Coverage for britney2/policies/autopkgtest.py: 89%

790 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-08-23 07:57 +0000

1# -*- coding: utf-8 -*- 

2 

3# Copyright (C) 2013 - 2016 Canonical Ltd. 

4# Authors: 

5# Colin Watson <cjwatson@ubuntu.com> 

6# Jean-Baptiste Lallement <jean-baptiste.lallement@canonical.com> 

7# Martin Pitt <martin.pitt@ubuntu.com> 

8 

9# This program is free software; you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation; either version 2 of the License, or 

12# (at your option) any later version. 

13 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18 

19import calendar 

20import collections 

21import http.client 

22import io 

23import itertools 

24import json 

25import optparse 

26import os 

27import socket 

28import sys 

29import tarfile 

30import time 

31import urllib.parse 

32from copy import deepcopy 

33from enum import Enum 

34from functools import lru_cache, total_ordering 

35from typing import TYPE_CHECKING, Any, Optional, cast 

36from collections.abc import Iterator 

37from urllib.error import HTTPError 

38from urllib.request import urlopen 

39from urllib.response import addinfourl 

40 

41import apt_pkg 

42 

43import britney2.hints 

44from britney2 import ( 

45 BinaryPackageId, 

46 PackageId, 

47 SourcePackage, 

48 SuiteClass, 

49 Suites, 

50 TargetSuite, 

51) 

52from britney2.migrationitem import MigrationItem 

53from britney2.policies import PolicyVerdict 

54from britney2.policies.policy import AbstractBasePolicy 

55from britney2.utils import iter_except, parse_option 

56 

57if TYPE_CHECKING: 57 ↛ 58line 57 didn't jump to line 58, because the condition on line 57 was never true

58 import amqplib.client_0_8 as amqp 

59 

60 from ..britney import Britney 

61 from ..excuse import Excuse 

62 from ..hints import HintParser 

63 

64 

@total_ordering
class Result(Enum):
    """Outcome of an autopkgtest run.

    The OLD_* variants mirror PASS/NEUTRAL/FAIL but mark results that are
    considered outdated (see mark_result_as_old()); NONE means no result is
    available.  Ordering follows the enum values, so "better" results
    compare lower than worse/older ones.
    """

    PASS = 1
    NEUTRAL = 2
    FAIL = 3
    OLD_PASS = 4
    OLD_NEUTRAL = 5
    OLD_FAIL = 6
    NONE = 7

    def __lt__(self, other: "Result") -> bool:
        # total_ordering derives the remaining rich comparisons from this
        # and the identity-based __eq__ that Enum provides.
        return self.value < other.value

77 

78 

# HTML snippets shown on the excuses page for each autopkgtest status
# keyword.  The hex colours encode severity: #87d96c (green) = OK,
# #e5c545 (yellow) = tolerated, #ff6666 (red) = blocking,
# #99ddff (blue) = still in progress.
EXCUSES_LABELS = {
    "PASS": '<span style="background:#87d96c">Pass</span>',
    "OLD_PASS": '<span style="background:#87d96c">Pass</span>',
    "NEUTRAL": '<span style="background:#e5c545">No tests, superficial or marked flaky</span>',
    "OLD_NEUTRAL": '<span style="background:#e5c545">No tests, superficial or marked flaky</span>',
    "FAIL": '<span style="background:#ff6666">Failed</span>',
    "OLD_FAIL": '<span style="background:#ff6666">Failed</span>',
    "ALWAYSFAIL": '<span style="background:#e5c545">Failed (not a regression)</span>',
    "REGRESSION": '<span style="background:#ff6666">Regression or new test</span>',
    "IGNORE-FAIL": '<span style="background:#e5c545">Ignored failure</span>',
    "RUNNING": '<span style="background:#99ddff">Test in progress</span>',
    "RUNNING-REFERENCE": '<span style="background:#ff6666">Reference test in progress, but real test failed already</span>',
    "RUNNING-ALWAYSFAIL": '<span style="background:#99ddff">Test in progress (will not be considered a regression)</span>',
}

# Pseudo trigger name used for baseline ("reference") test runs.
REF_TRIG = "migration-reference/0"

# Key holding the file-format version in the pending-tests JSON file.
VERSION_KEY = "britney-autopkgtest-pending-file-version"

97 

98 

def srchash(src: str) -> str:
    """archive hash prefix for source package"""
    # Debian archive layout: "lib*" sources are filed under a four-letter
    # prefix, everything else under the first letter.
    return src[:4] if src.startswith("lib") else src[0]

106 

107 

def added_pkgs_compared_to_target_suite(
    package_ids: frozenset[BinaryPackageId],
    target_suite: TargetSuite,
    *,
    invert: bool = False,
) -> Iterator[BinaryPackageId]:
    """Yield the members of package_ids whose name is absent from the
    target suite (or, with invert=True, present in it)."""
    in_target = set(target_suite.which_of_these_are_in_the_suite(package_ids))
    if invert:
        skip_names = {pkg.package_name for pkg in package_ids - in_target}
    else:
        skip_names = {pkg.package_name for pkg in in_target}
    yield from (pkg for pkg in package_ids if pkg.package_name not in skip_names)

125 

126 

def all_leaf_results(
    test_results: dict[str, dict[str, dict[str, list[Any]]]],
) -> Iterator[list[Any]]:
    """Flatten a trigger -> src -> arch -> result mapping, yielding every
    innermost result list."""
    per_src_maps = (per_trigger.values() for per_trigger in test_results.values())
    for per_arch in itertools.chain.from_iterable(per_src_maps):
        yield from per_arch.values()

133 

134 

def mark_result_as_old(result: Result) -> Result:
    """Convert current result into corresponding old result"""
    # Map the "fresh" states to their aged counterparts; everything else
    # (already-old states and NONE) passes through unchanged.
    aging = {
        Result.FAIL: Result.OLD_FAIL,
        Result.PASS: Result.OLD_PASS,
        Result.NEUTRAL: Result.OLD_NEUTRAL,
    }
    return aging.get(result, result)

145 

146 

147class AutopkgtestPolicy(AbstractBasePolicy): 

148 """autopkgtest regression policy for source migrations 

149 

150 Run autopkgtests for the excuse and all of its reverse dependencies, and 

151 reject the upload if any of those regress. 

152 """ 

153 

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        """Set up caches, option defaults and derived settings.

        Heavier initialisation that needs the Britney object (reading
        result caches, connecting to AMQP) happens in initialise().
        """
        super().__init__(
            "autopkgtest", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
        )
        # tests requested in this and previous runs
        # trigger -> src -> [arch]
        self.pending_tests: Optional[dict[str, dict[str, dict[str, int]]]] = None
        self.pending_tests_file = os.path.join(
            self.state_dir, "autopkgtest-pending.json"
        )
        # inverse Testsuite-Triggers map: trigger source -> set of sources
        # whose tests it triggers (filled in initialise())
        self.testsuite_triggers: dict[str, set[str]] = {}
        # per-source, per-arch cache of baseline results
        self.result_in_baseline_cache: dict[str, dict[str, list[Any]]] = (
            collections.defaultdict(dict)
        )

        # open handle when ADT_AMQP is a file:// URL (set in initialise())
        self.amqp_file_handle: io.TextIOWrapper | None = None

        # Default values for this policy's options
        parse_option(options, "adt_baseline")
        parse_option(options, "adt_huge", to_int=True)
        parse_option(options, "adt_ppas")
        parse_option(options, "adt_reference_max_age", day_to_sec=True)
        parse_option(options, "adt_pending_max_age", default=5, day_to_sec=True)
        parse_option(options, "adt_regression_penalty", default=0, to_int=True)
        parse_option(options, "adt_log_url")  # see below for defaults
        parse_option(options, "adt_retry_url")  # see below for defaults
        parse_option(options, "adt_retry_older_than", day_to_sec=True)
        parse_option(options, "adt_results_cache_age", day_to_sec=True)
        parse_option(options, "adt_shared_results_cache")
        parse_option(options, "adt_success_bounty", default=0, to_int=True)
        parse_option(options, "adt_ignore_failure_for_new_tests", to_bool=True)

        # When ADT_RESULTS_CACHE_AGE is smaller than or equal to
        # ADT_REFERENCE_MAX_AGE old reference result will be removed from cache
        # before the newly scheduled results are in, potentially causing
        # additional waiting. For packages like glibc this might cause an
        # infinite delay as there will always be a package that's
        # waiting. Similarly for ADT_RETRY_OLDER_THAN.
        if self.options.adt_results_cache_age <= self.options.adt_reference_max_age:
            self.logger.warning(
                "Unexpected: ADT_REFERENCE_MAX_AGE bigger than ADT_RESULTS_CACHE_AGE"
            )
        if self.options.adt_results_cache_age <= self.options.adt_retry_older_than:
            self.logger.warning(
                "Unexpected: ADT_RETRY_OLDER_THAN bigger than ADT_RESULTS_CACHE_AGE"
            )

        if not self.options.adt_log_url:
            # Historical defaults
            if self.options.adt_swift_url.startswith("file://"):
                self.options.adt_log_url = os.path.join(
                    self.options.adt_ci_url,
                    "data",
                    "autopkgtest",
                    self.options.series,
                    "{arch}",
                    "{hash}",
                    "{package}",
                    "{run_id}",
                    "log.gz",
                )
            else:
                self.options.adt_log_url = os.path.join(
                    self.options.adt_swift_url,
                    "{swift_container}",
                    self.options.series,
                    "{arch}",
                    "{hash}",
                    "{package}",
                    "{run_id}",
                    "log.gz",
                )

        if hasattr(self.options, "adt_retry_url_mech"):
            self.logger.warning(
                "The ADT_RETRY_URL_MECH configuration has been deprecated."
            )
            self.logger.warning(
                "Instead britney now supports ADT_RETRY_URL for more flexibility."
            )
            if self.options.adt_retry_url:
                self.logger.error(
                    "Please remove the ADT_RETRY_URL_MECH as ADT_RETRY_URL will be used."
                )
            elif self.options.adt_retry_url_mech == "run_id":
                self.options.adt_retry_url = (
                    self.options.adt_ci_url + "api/v1/retry/{run_id}"
                )
        if not self.options.adt_retry_url:
            # Historical default
            self.options.adt_retry_url = (
                self.options.adt_ci_url
                + "request.cgi?"
                + "release={release}&arch={arch}&package={package}&trigger={trigger}{ppas}"
            )

        # results map: trigger -> src -> arch -> [passed, version, run_id, seen]
        # - trigger is "source/version" of an unstable package that triggered
        #   this test run.
        # - "passed" is a Result
        # - "version" is the package version of "src" of that test
        # - "run_id" is an opaque ID that identifies a particular test run for
        #   a given src/arch.
        # - "seen" is an approximate time stamp of the test run. How this is
        #   deduced depends on the interface used.
        self.test_results: dict[str, dict[str, dict[str, list[Any]]]] = {}
        if self.options.adt_shared_results_cache:
            self.results_cache_file = self.options.adt_shared_results_cache
        else:
            self.results_cache_file = os.path.join(
                self.state_dir, "autopkgtest-results.cache"
            )

        try:
            self.options.adt_ppas = self.options.adt_ppas.strip().split()
        except AttributeError:
            # adt_ppas was not a string (unset/None) -> no PPAs configured
            self.options.adt_ppas = []

        self.swift_container = "autopkgtest-" + options.series
        if self.options.adt_ppas:
            # the last PPA determines the container name ("/" is not
            # allowed in container names)
            self.swift_container += "-" + options.adt_ppas[-1].replace("/", "-")

        # restrict adt_arches to architectures we actually run for
        self.adt_arches = []
        for arch in self.options.adt_arches.split():
            if arch in self.options.architectures:
                self.adt_arches.append(arch)
            else:
                self.logger.info(
                    "Ignoring ADT_ARCHES %s as it is not in architectures list", arch
                )

285 

286 def __del__(self) -> None: 

287 if self.amqp_file_handle: 287 ↛ exitline 287 didn't return from function '__del__', because the condition on line 287 was never false

288 try: 

289 self.amqp_file_handle.close() 

290 except AttributeError: 

291 pass 

292 

293 def register_hints(self, hint_parser: "HintParser") -> None: 

294 hint_parser.register_hint_type( 

295 "force-badtest", britney2.hints.split_into_one_hint_per_package 

296 ) 

297 hint_parser.register_hint_type( 

298 "force-skiptest", britney2.hints.split_into_one_hint_per_package 

299 ) 

300 

    def initialise(self, britney: "Britney") -> None:
        """Load cached and new test results and set up the request channel.

        Reads the on-disk results cache, ingests freshly published debci
        results (when ADT_SWIFT_URL is a file:// URL), ages out stale
        entries, and connects to either an AMQP server or a request file
        for submitting new test requests.
        """
        super().initialise(britney)
        # We want to use the "current" time stamp in multiple locations
        time_now = round(time.time())
        if hasattr(self.options, "fake_runtime"):
            # allow the test suite / callers to pin "now" deterministically
            time_now = int(self.options.fake_runtime)
        self._now = time_now
        # compute inverse Testsuite-Triggers: map, unifying all series
        self.logger.info("Building inverse testsuite_triggers map")
        for suite in self.suite_info:
            for src, data in suite.sources.items():
                for trigger in data.testsuite_triggers:
                    self.testsuite_triggers.setdefault(trigger, set()).add(src)
        target_suite_name = self.suite_info.target_suite.name

        os.makedirs(self.state_dir, exist_ok=True)
        self.read_pending_tests()

        # read the cached results that we collected so far
        if os.path.exists(self.results_cache_file):
            with open(self.results_cache_file) as f:
                test_results = json.load(f)
            self.test_results = self.check_and_upgrade_cache(test_results)
            self.logger.info("Read previous results from %s", self.results_cache_file)
        else:
            self.logger.info(
                "%s does not exist, re-downloading all results from swift",
                self.results_cache_file,
            )

        # read in the new results
        if self.options.adt_swift_url.startswith("file://"):
            debci_file = self.options.adt_swift_url[7:]
            if os.path.exists(debci_file):
                with open(debci_file) as f:
                    test_results = json.load(f)
                self.logger.info("Read new results from %s", debci_file)
                for res in test_results["results"]:
                    # if there's no date, the test didn't finish yet
                    if res["date"] is None:
                        continue
                    (test_suite, triggers, src, arch, ver, status, run_id, seen) = [
                        res["suite"],
                        res["trigger"],
                        res["package"],
                        res["arch"],
                        res["version"],
                        res["status"],
                        str(res["run_id"]),
                        # "date" looks ISO-8601-like; the last 5 chars are
                        # stripped before parsing — TODO confirm exact format
                        round(
                            calendar.timegm(
                                time.strptime(res["date"][0:-5], "%Y-%m-%dT%H:%M:%S")
                            )
                        ),
                    ]
                    if test_suite != target_suite_name:
                        # not requested for this target suite, so ignore
                        continue
                    if triggers is None:
                        # not requested for this policy, so ignore
                        continue
                    if status is None:
                        # still running => pending
                        continue
                    for trigger in triggers.split():
                        # remove matching test requests
                        self.remove_from_pending(trigger, src, arch, seen)
                        if status == "tmpfail":
                            # let's see if we still need it
                            continue
                        self.logger.debug(
                            "Results %s %s %s added", src, trigger, status
                        )
                        self.add_trigger_to_results(
                            trigger,
                            src,
                            ver,
                            arch,
                            run_id,
                            seen,
                            Result[status.upper()],
                        )
            else:
                self.logger.info(
                    "%s does not exist, no new data will be processed", debci_file
                )

        # The cache can contain results against versions of packages that
        # are not in any suite anymore. Strip those out, as we don't want
        # to use those results. Additionally, old references may be
        # filtered out.
        if self.options.adt_baseline == "reference":
            self.filter_old_results()

        # we need sources, binaries, and installability tester, so for now
        # remember the whole britney object
        self.britney = britney

        # Initialize AMQP connection
        self.amqp_channel: Optional["amqp.channel.Channel"] = None
        self.amqp_file_handle = None
        if self.options.dry_run:
            # dry runs must not request any tests
            return

        amqp_url = self.options.adt_amqp

        if amqp_url.startswith("amqp://"):
            import amqplib.client_0_8 as amqp

            # depending on the setup we connect to a AMQP server
            creds = urllib.parse.urlsplit(amqp_url, allow_fragments=False)
            self.amqp_con = amqp.Connection(
                creds.hostname, userid=creds.username, password=creds.password
            )
            self.amqp_channel = self.amqp_con.channel()
            self.logger.info("Connected to AMQP server")
        elif amqp_url.startswith("file://"):
            # or in Debian and in testing mode, adt_amqp will be a file:// URL
            amqp_file = amqp_url[7:]
            self.amqp_file_handle = open(amqp_file, "w", 1)
        else:
            raise RuntimeError("Unknown ADT_AMQP schema %s" % amqp_url.split(":", 1)[0])

423 

    def check_and_upgrade_cache(
        self, test_results: dict[str, dict[str, dict[str, list[Any]]]]
    ) -> dict[str, dict[str, dict[str, list[Any]]]]:
        """Convert a freshly deserialised results cache into runtime form.

        Replaces the JSON-serialised status names with Result enum members
        and drops entries whose "seen" timestamp is older than
        ADT_RESULTS_CACHE_AGE, pruning emptied sub-dicts.  Returns the
        (mutated) mapping.
        """
        for leaf_result in all_leaf_results(test_results):
            # the cache stores the Result by name; convert back to the enum
            leaf_result[0] = Result[leaf_result[0]]

        # Drop results older than ADT_RESULTS_CACHE_AGE
        # (iterate over list() snapshots as entries are deleted while walking)
        for trigger in list(test_results.keys()):
            for pkg in list(test_results[trigger].keys()):
                for arch in list(test_results[trigger][pkg].keys()):
                    arch_result = test_results[trigger][pkg][arch]
                    # index 3 is the "seen" timestamp of the run
                    if self._now - arch_result[3] > self.options.adt_results_cache_age:
                        del test_results[trigger][pkg][arch]
                if not test_results[trigger][pkg]:
                    del test_results[trigger][pkg]
            if not test_results[trigger]:
                del test_results[trigger]

        return test_results

443 

444 def filter_old_results(self) -> None: 

445 """Remove results for old versions and reference runs from the cache. 

446 

447 For now, only delete reference runs. If we delete regular 

448 results after a while, packages with lots of triggered tests may 

449 never have all the results at the same time.""" 

450 

451 test_results = self.test_results 

452 

453 for trigger, trigger_data in test_results.items(): 

454 for src, results in trigger_data.items(): 

455 for arch, result in results.items(): 

456 if ( 

457 trigger == REF_TRIG 

458 and self._now - result[3] > self.options.adt_reference_max_age 

459 ): 

460 result[0] = mark_result_as_old(result[0]) 

461 elif not self.test_version_in_any_suite(src, result[1]): 

462 result[0] = mark_result_as_old(result[0]) 

463 

464 def test_version_in_any_suite(self, src: str, version: str) -> bool: 

465 """Check if the mentioned version of src is found in a suite 

466 

467 To prevent regressions in the target suite, the result should be 

468 from a test with the version of the package in either the source 

469 suite or the target suite. The source suite is also valid, 

470 because due to versioned test dependencies and Breaks/Conflicts 

471 relations, regularly the version in the source suite is used 

472 during testing. 

473 """ 

474 

475 versions = set() 

476 for suite in self.suite_info: 

477 try: 

478 srcinfo = suite.sources[src] 

479 except KeyError: 

480 continue 

481 versions.add(srcinfo.version) 

482 

483 valid_version = False 

484 for ver in versions: 

485 if apt_pkg.version_compare(ver, version) == 0: 

486 valid_version = True 

487 break 

488 

489 return valid_version 

490 

491 def save_pending_json(self) -> None: 

492 # update the pending tests on-disk cache 

493 self.logger.info( 

494 "Updating pending requested tests in %s" % self.pending_tests_file 

495 ) 

496 # Shallow clone pending_tests as we only modify the toplevel and change its type. 

497 pending_tests: dict[str, Any] = {} 

498 if self.pending_tests: 

499 pending_tests = dict(self.pending_tests) 

500 # Avoid adding if there are no pending results at all (eases testing) 

501 pending_tests[VERSION_KEY] = 1 

502 with open(self.pending_tests_file + ".new", "w") as f: 

503 json.dump(pending_tests, f, indent=2) 

504 os.rename(self.pending_tests_file + ".new", self.pending_tests_file) 

505 

506 def save_state(self, britney: "Britney") -> None: 

507 super().save_state(britney) 

508 

509 # update the results on-disk cache, unless we are using a r/o shared one 

510 if not self.options.adt_shared_results_cache: 

511 self.logger.info("Updating results cache") 

512 test_results = deepcopy(self.test_results) 

513 for result in all_leaf_results(test_results): 

514 result[0] = result[0].name 

515 with open(self.results_cache_file + ".new", "w") as f: 

516 json.dump(test_results, f, indent=2) 

517 os.rename(self.results_cache_file + ".new", self.results_cache_file) 

518 

519 self.save_pending_json() 

520 

521 def format_retry_url( 

522 self, run_id: Optional[str], arch: str, testsrc: str, trigger: str 

523 ) -> str: 

524 if self.options.adt_ppas: 

525 ppas = "&" + urllib.parse.urlencode( 

526 [("ppa", p) for p in self.options.adt_ppas] 

527 ) 

528 else: 

529 ppas = "" 

530 return cast(str, self.options.adt_retry_url).format( 

531 run_id=run_id, 

532 release=self.options.series, 

533 arch=arch, 

534 package=testsrc, 

535 trigger=urllib.parse.quote_plus(trigger), 

536 ppas=ppas, 

537 ) 

538 

539 def format_log_url(self, testsrc: str, arch: str, run_id: str) -> str: 

540 return cast(str, self.options.adt_log_url).format( 

541 release=self.options.series, 

542 swift_container=self.swift_container, 

543 hash=srchash(testsrc), 

544 package=testsrc, 

545 arch=arch, 

546 run_id=run_id, 

547 ) 

548 

    def apply_src_policy_impl(
        self,
        tests_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Evaluate autopkgtest results for one source migration item.

        Delays the item until it is built, requests missing tests per
        architecture, renders per-arch result links into the excuse, and
        returns the verdict — possibly overridden by a force-skiptest
        hint or converted into a penalty (ADT_REGRESSION_PENALTY).
        """
        assert self.hints is not None  # for type checking
        # initialize
        verdict = PolicyVerdict.PASS
        all_self_tests_pass = False
        source_name = item.package
        results_info = []

        # skip/delay autopkgtests until new package is built somewhere
        if not source_data_srcdist.binaries:
            self.logger.debug(
                "%s hasnot been built anywhere, skipping autopkgtest policy",
                excuse.name,
            )
            verdict = PolicyVerdict.REJECTED_TEMPORARILY
            excuse.add_verdict_info(verdict, "nothing built yet, autopkgtest delayed")

        if "all" in excuse.missing_builds:
            self.logger.debug(
                "%s hasnot been built for arch:all, skipping autopkgtest policy",
                source_name,
            )
            verdict = PolicyVerdict.REJECTED_TEMPORARILY
            excuse.add_verdict_info(
                verdict, "arch:all not built yet, autopkgtest delayed"
            )

        if not verdict.is_rejected:
            self.logger.debug("Checking autopkgtests for %s", source_name)
            trigger = source_name + "/" + source_data_srcdist.version

            # build a (testsrc, testver) → arch → (status, run_id, log_url) map; we trigger/check test
            # results per architecture for technical/efficiency reasons, but we
            # want to evaluate and present the results by tested source package
            # first
            pkg_arch_result: dict[
                tuple[str, str], dict[str, tuple[str, Optional[str], str]]
            ] = collections.defaultdict(dict)
            for arch in self.adt_arches:
                if arch in excuse.missing_builds:
                    verdict = PolicyVerdict.REJECTED_TEMPORARILY
                    self.logger.debug(
                        "%s hasnot been built on arch %s, delay autopkgtest there",
                        source_name,
                        arch,
                    )
                    excuse.add_verdict_info(
                        verdict,
                        "arch:%s not built yet, autopkgtest delayed there" % arch,
                    )
                elif arch in excuse.policy_info["depends"].get(
                    "arch_all_not_installable", []
                ):
                    # tolerated uninstallability: skip testing, do not reject
                    self.logger.debug(
                        "%s is uninstallable on arch %s (which is allowed), not running autopkgtest there",
                        source_name,
                        arch,
                    )
                    excuse.addinfo(
                        "uninstallable on arch %s (which is allowed), not running autopkgtest there"
                        % arch
                    )
                elif (
                    arch in excuse.unsatisfiable_on_archs
                    and arch
                    not in excuse.policy_info["depends"].get(
                        "autopkgtest_run_anyways", []
                    )
                ):
                    verdict = PolicyVerdict.REJECTED_TEMPORARILY
                    self.logger.debug(
                        "%s is uninstallable on arch %s, not running autopkgtest there",
                        source_name,
                        arch,
                    )
                    excuse.addinfo(
                        "uninstallable on arch %s, not running autopkgtest there" % arch
                    )
                else:
                    # buildable and installable: request/collect tests
                    self.request_tests_for_source(
                        item, arch, source_data_srcdist, pkg_arch_result, excuse
                    )

            # add test result details to Excuse
            cloud_url = self.options.adt_ci_url + "packages/%(h)s/%(s)s/%(r)s/%(a)s"
            testver: Optional[str]
            for testsrc, testver in sorted(pkg_arch_result):
                assert testver is not None
                arch_results = pkg_arch_result[(testsrc, testver)]
                # set of status keywords across all architectures
                r = {v[0] for v in arch_results.values()}
                if "REGRESSION" in r:
                    verdict = PolicyVerdict.REJECTED_PERMANENTLY
                elif (
                    "RUNNING" in r or "RUNNING-REFERENCE" in r
                ) and not verdict.is_rejected:
                    verdict = PolicyVerdict.REJECTED_TEMPORARILY
                # skip version if still running on all arches
                if not r - {"RUNNING", "RUNNING-ALWAYSFAIL"}:
                    testver = None

                # A source package is eligible for the bounty if it has tests
                # of its own that pass on all tested architectures.
                if testsrc == source_name:
                    excuse.autopkgtest_results = r
                    if r == {"PASS"}:
                        all_self_tests_pass = True

                if testver:
                    testname = "%s/%s" % (testsrc, testver)
                else:
                    testname = testsrc

                html_archmsg = []
                for arch in sorted(arch_results):
                    (status, run_id, log_url) = arch_results[arch]
                    artifact_url = None
                    retry_url = None
                    reference_url = None
                    reference_retry_url = None
                    history_url = None
                    if self.options.adt_ppas:
                        if log_url.endswith("log.gz"):
                            artifact_url = log_url.replace("log.gz", "artifacts.tar.gz")
                    else:
                        history_url = cloud_url % {
                            "h": srchash(testsrc),
                            "s": testsrc,
                            "r": self.options.series,
                            "a": arch,
                        }
                    if status in ("NEUTRAL", "REGRESSION", "RUNNING-REFERENCE"):
                        retry_url = self.format_retry_url(
                            run_id, arch, testsrc, trigger
                        )

                    baseline_result = self.result_in_baseline(testsrc, arch)
                    if baseline_result and baseline_result[0] != Result.NONE:
                        baseline_run_id = str(baseline_result[2])
                        reference_url = self.format_log_url(
                            testsrc, arch, baseline_run_id
                        )
                        if self.options.adt_baseline == "reference":
                            reference_retry_url = self.format_retry_url(
                                baseline_run_id, arch, testsrc, REF_TRIG
                            )
                    tests_info.setdefault(testname, {})[arch] = [
                        status,
                        log_url,
                        history_url,
                        artifact_url,
                        retry_url,
                    ]

                    # render HTML snippet for testsrc entry for current arch
                    if history_url:
                        message = '<a href="%s">%s</a>' % (history_url, arch)
                    else:
                        message = arch
                    message += ': <a href="%s">%s</a>' % (
                        log_url,
                        EXCUSES_LABELS[status],
                    )
                    if retry_url:
                        message += (
                            '<a href="%s" style="text-decoration: none;"> ♻</a>'
                            % retry_url
                        )
                    if reference_url:
                        message += ' (<a href="%s">reference</a>' % reference_url
                        if reference_retry_url:
                            message += (
                                '<a href="%s" style="text-decoration: none;"> ♻</a>'
                                % reference_retry_url
                            )
                        message += ")"
                    if artifact_url:
                        message += ' <a href="%s">[artifacts]</a>' % artifact_url
                    html_archmsg.append(message)

                # render HTML line for testsrc entry
                # - if action is or may be required
                # - for ones own package
                if (
                    r
                    - {
                        "PASS",
                        "NEUTRAL",
                        "RUNNING-ALWAYSFAIL",
                        "ALWAYSFAIL",
                        "IGNORE-FAIL",
                    }
                    or testsrc == source_name
                ):
                    if testver:
                        pkg = '<a href="#{0}">{0}</a>/{1}'.format(testsrc, testver)
                    else:
                        pkg = '<a href="#{0}">{0}</a>'.format(testsrc)
                    results_info.append(
                        "autopkgtest for %s: %s" % (pkg, ", ".join(html_archmsg))
                    )

        if verdict.is_rejected:
            # check for force-skiptest hint
            hints = self.hints.search(
                "force-skiptest",
                package=source_name,
                version=source_data_srcdist.version,
            )
            if hints:
                excuse.addreason("skiptest")
                excuse.addinfo(
                    "Should wait for tests relating to %s %s, but forced by %s"
                    % (source_name, source_data_srcdist.version, hints[0].user)
                )
                verdict = PolicyVerdict.PASS_HINTED
            else:
                excuse.addreason("autopkgtest")

        if (
            self.options.adt_success_bounty
            and verdict == PolicyVerdict.PASS
            and all_self_tests_pass
        ):
            excuse.add_bounty("autopkgtest", self.options.adt_success_bounty)
        if self.options.adt_regression_penalty and verdict in {
            PolicyVerdict.REJECTED_PERMANENTLY,
            PolicyVerdict.REJECTED_TEMPORARILY,
        }:
            if self.options.adt_regression_penalty > 0:
                excuse.add_penalty("autopkgtest", self.options.adt_regression_penalty)
            # In case we give penalties instead of blocking, we must always pass
            verdict = PolicyVerdict.PASS
        for i in results_info:
            if verdict.is_rejected:
                excuse.add_verdict_info(verdict, i)
            else:
                excuse.addinfo(i)

        return verdict

795 

796 # 

797 # helper functions 

798 # 

799 

800 @staticmethod 

801 def has_autodep8(srcinfo: SourcePackage) -> bool: 

802 """Check if package is covered by autodep8 

803 

804 srcinfo is an item from self.britney.sources 

805 """ 

806 # autodep8? 

807 for t in srcinfo.testsuite: 

808 if t.startswith("autopkgtest-pkg"): 

809 return True 

810 

811 return False 

812 

    def request_tests_for_source(
        self,
        item: MigrationItem,
        arch: str,
        source_data_srcdist: SourcePackage,
        pkg_arch_result: dict[
            tuple[str, str], dict[str, tuple[str, Optional[str], str]]
        ],
        excuse: "Excuse",
    ) -> None:
        """Request autopkgtests for a migration item on one architecture.

        Computes the set of trigger packages ("source/version" strings: the
        migrating source plus everything that has to come along from the
        source suite for the tests to install), requests a test run for
        every test returned by tests_for_source(), and records the current
        result of each test into pkg_arch_result (updated in place), keyed
        by (test source, real version) and then by architecture.
        """
        pkg_universe = self.britney.pkg_universe
        target_suite = self.suite_info.target_suite
        source_suite = item.suite
        sources_t = target_suite.sources
        sources_s = item.suite.sources
        packages_s_a = item.suite.binaries[arch]
        source_name = item.package
        source_version = source_data_srcdist.version
        # request tests (unless they were already requested earlier or have a result)
        tests = self.tests_for_source(source_name, source_version, arch, excuse)
        # above the adt_huge threshold the requests go to the -huge queue
        is_huge = len(tests) > self.options.adt_huge

        # Here we figure out what is required from the source suite
        # for the test to install successfully.
        #
        # The ImplicitDependencyPolicy does a similar calculation, but
        # if I (elbrus) understand correctly, only in the reverse
        # dependency direction. We are doing something similar here
        # but in the dependency direction (note: this code is older).
        # We use the ImplicitDependencyPolicy result for the reverse
        # dependencies and we keep the code below for the
        # dependencies. Using the ImplicitDependencyPolicy results
        # also in the reverse direction seems to require quite some
        # reorganisation to get that information available here, as in
        # the current state only the current excuse is available here
        # and the required other excuses may not be calculated yet.
        #
        # Loop over all binary packages from trigger and
        # recursively look up which *versioned* dependencies are
        # only satisfied in the source suite.
        #
        # For all binaries found, look up which packages they
        # break/conflict with in the target suite, but not in the
        # source suite. The main reason to do this is to cover test
        # dependencies, so we will check Testsuite-Triggers as
        # well.
        #
        # OI: do we need to do the first check in a smart way
        # (i.e. only for the packages that are actually going to be
        # installed) for the breaks/conflicts set as well, i.e. do
        # we need to check if any of the packages that we now
        # enforce being from the source suite, actually have new
        # versioned depends and new breaks/conflicts.
        #
        # For all binaries found, add the set of unique source
        # packages to the list of triggers.

        bin_triggers: set[PackageId] = set()
        # worklist algorithm: bin_new is the frontier still to process,
        # bin_triggers the binaries already handled
        bin_new = set(source_data_srcdist.binaries)
        for n_binary in iter_except(bin_new.pop, KeyError):
            if n_binary in bin_triggers:
                continue
            bin_triggers.add(n_binary)

            # Check if there is a dependency that is not
            # available in the target suite.
            # We add slightly too much here, because new binaries
            # will also show up, but they are already properly
            # installed. Nevermind.
            depends = pkg_universe.dependencies_of(n_binary)
            # depends is a frozenset{frozenset{BinaryPackageId, ..}}
            for deps_of_bin in depends:
                if target_suite.any_of_these_are_in_the_suite(deps_of_bin):
                    # if any of the alternative dependencies is already
                    # satisfied in the target suite, we can just ignore it
                    continue
                # We'll figure out which version later
                bin_new.update(
                    added_pkgs_compared_to_target_suite(deps_of_bin, target_suite)
                )

        # Check if the package breaks/conflicts anything. We might
        # be adding slightly too many source packages due to the
        # check here as a binary package that is broken may be
        # coming from a different source package in the source
        # suite. Nevermind.
        bin_broken = set()
        for t_binary in bin_triggers:
            # broken is a frozenset{BinaryPackageId, ..}
            broken = pkg_universe.negative_dependencies_of(
                cast(BinaryPackageId, t_binary)
            )
            broken_in_target = {
                p.package_name
                for p in target_suite.which_of_these_are_in_the_suite(broken)
            }
            broken_in_source = {
                p.package_name
                for p in source_suite.which_of_these_are_in_the_suite(broken)
            }
            # We want packages with a newer version in the source suite that
            # no longer has the conflict. This is an approximation
            broken_filtered = set(
                p
                for p in broken
                if p.package_name in broken_in_target
                and p.package_name not in broken_in_source
            )
            # We add the version in the target suite, but the code below will
            # change it to the version in the source suite
            bin_broken.update(broken_filtered)
        bin_triggers.update(bin_broken)

        # The ImplicitDependencyPolicy also found packages that need
        # to migrate together, so add them to the triggers too.
        for bin_implicit in excuse.depends_packages_flattened:
            if bin_implicit.architecture == arch:
                bin_triggers.add(bin_implicit)

        # Translate the collected binaries into unique "source/version"
        # trigger strings (source suite versions).
        triggers = set()
        for t_binary2 in bin_triggers:
            if t_binary2.architecture == arch:
                try:
                    source_of_bin = packages_s_a[t_binary2.package_name].source
                    # If the version in the target suite is the same, don't add a trigger.
                    # Note that we looked up the source package in the source suite.
                    # If it were a different source package in the target suite, however, then
                    # we would not have this source package in the same version anyway.
                    if (
                        sources_t.get(source_of_bin, None) is None
                        or sources_s[source_of_bin].version
                        != sources_t[source_of_bin].version
                    ):
                        triggers.add(
                            source_of_bin + "/" + sources_s[source_of_bin].version
                        )
                except KeyError:
                    # Apparently the package was removed from
                    # unstable e.g. if packages are replaced
                    # (e.g. -dbg to -dbgsym)
                    pass
                if t_binary2 not in source_data_srcdist.binaries:
                    for tdep_src in self.testsuite_triggers.get(
                        t_binary2.package_name, set()
                    ):
                        try:
                            # Only add trigger if versions in the target and source suites are different
                            if (
                                sources_t.get(tdep_src, None) is None
                                or sources_s[tdep_src].version
                                != sources_t[tdep_src].version
                            ):
                                triggers.add(
                                    tdep_src + "/" + sources_s[tdep_src].version
                                )
                        except KeyError:
                            # Apparently the source was removed from
                            # unstable (testsuite_triggers are unified
                            # over all suites)
                            pass
        # The migrating item itself is always the first trigger.
        trigger = source_name + "/" + source_version
        triggers.discard(trigger)
        triggers_list = sorted(list(triggers))
        triggers_list.insert(0, trigger)

        for testsrc, testver in tests:
            self.pkg_test_request(testsrc, arch, triggers_list, huge=is_huge)
            (result, real_ver, run_id, url) = self.pkg_test_result(
                testsrc, testver, arch, trigger
            )
            pkg_arch_result[(testsrc, real_ver)][arch] = (result, run_id, url)

984 

    def tests_for_source(
        self, src: str, ver: str, arch: str, excuse: "Excuse"
    ) -> list[tuple[str, str]]:
        """Iterate over all tests that should be run for given source and arch

        Returns a list of (test source, test version) pairs sorted by
        source name: the package itself (if it declares an autopkgtest or
        autodep8 test), reverse dependencies of its binaries that have a
        test, and sources declaring one of its binaries in
        Testsuite-Triggers.
        """

        source_suite = self.suite_info.primary_source_suite
        target_suite = self.suite_info.target_suite
        sources_info = target_suite.sources
        binaries_info = target_suite.binaries[arch]

        # sources already added to "tests"; avoids duplicates
        reported_pkgs = set()

        tests = []

        # Debian doesn't have linux-meta, but Ubuntu does
        # for linux themselves we don't want to trigger tests -- these should
        # all come from linux-meta*. A new kernel ABI without a corresponding
        # -meta won't be installed and thus we can't sensibly run tests against
        # it.
        if (
            src.startswith("linux")
            and src.replace("linux", "linux-meta") in sources_info
        ):
            return []

        # we want to test the package itself, if it still has a test in unstable
        # but only if the package actually exists on this arch
        srcinfo = source_suite.sources[src]
        if ("autopkgtest" in srcinfo.testsuite or self.has_autodep8(srcinfo)) and len(
            excuse.packages[arch]
        ) > 0:
            reported_pkgs.add(src)
            tests.append((src, ver))

        extra_bins = []
        # Debian doesn't have linux-meta, but Ubuntu does
        # Hack: For new kernels trigger all DKMS packages by pretending that
        # linux-meta* builds a "dkms" binary as well. With that we ensure that we
        # don't regress DKMS drivers with new kernel versions.
        if src.startswith("linux-meta"):
            # does this have any image on this arch?
            for pkg_id in srcinfo.binaries:
                if pkg_id.architecture == arch and "-image" in pkg_id.package_name:
                    try:
                        extra_bins.append(binaries_info["dkms"].pkg_id)
                    except KeyError:
                        pass

        if not self.has_built_on_this_arch_or_is_arch_all(srcinfo, arch):
            return []

        pkg_universe = self.britney.pkg_universe
        # plus all direct reverse dependencies and test triggers of its
        # binaries which have an autopkgtest
        for binary in itertools.chain(srcinfo.binaries, extra_bins):
            rdeps = pkg_universe.reverse_dependencies_of(binary)
            for rdep in rdeps:
                try:
                    rdep_src = binaries_info[rdep.package_name].source
                    # Don't re-trigger the package itself here; this should
                    # have been done above if the package still continues to
                    # have an autopkgtest in unstable.
                    if rdep_src == src:
                        continue
                except KeyError:
                    continue

                rdep_src_info = sources_info[rdep_src]
                if "autopkgtest" in rdep_src_info.testsuite or self.has_autodep8(
                    rdep_src_info
                ):
                    if rdep_src not in reported_pkgs:
                        tests.append((rdep_src, rdep_src_info.version))
                        reported_pkgs.add(rdep_src)

            for tdep_src in self.testsuite_triggers.get(binary.package_name, set()):
                if tdep_src not in reported_pkgs:
                    try:
                        tdep_src_info = sources_info[tdep_src]
                    except KeyError:
                        continue
                    if "autopkgtest" in tdep_src_info.testsuite or self.has_autodep8(
                        tdep_src_info
                    ):
                        # only add the test if the triggering source built a
                        # binary on this architecture
                        for pkg_id in tdep_src_info.binaries:
                            if pkg_id.architecture == arch:
                                tests.append((tdep_src, tdep_src_info.version))
                                reported_pkgs.add(tdep_src)
                                break

        tests.sort(key=lambda s_v: s_v[0])
        return tests

1077 

1078 def read_pending_tests(self) -> None: 

1079 """Read pending test requests from previous britney runs 

1080 

1081 Initialize self.pending_tests with that data. 

1082 """ 

1083 assert self.pending_tests is None, "already initialized" 

1084 if not os.path.exists(self.pending_tests_file): 

1085 self.logger.info( 

1086 "No %s, starting with no pending tests", self.pending_tests_file 

1087 ) 

1088 self.pending_tests = {} 

1089 return 

1090 with open(self.pending_tests_file) as f: 

1091 self.pending_tests = json.load(f) 

1092 if VERSION_KEY in self.pending_tests: 

1093 del self.pending_tests[VERSION_KEY] 

1094 for trigger in list(self.pending_tests.keys()): 

1095 for pkg in list(self.pending_tests[trigger].keys()): 

1096 arch_dict = self.pending_tests[trigger][pkg] 

1097 for arch in list(arch_dict.keys()): 

1098 if ( 

1099 self._now - arch_dict[arch] 

1100 > self.options.adt_pending_max_age 

1101 ): 

1102 del arch_dict[arch] 

1103 if not arch_dict: 

1104 del self.pending_tests[trigger][pkg] 

1105 if not self.pending_tests[trigger]: 

1106 del self.pending_tests[trigger] 

1107 else: 

1108 # Migration code: 

1109 for trigger_data in self.pending_tests.values(): 1109 ↛ 1110line 1109 didn't jump to line 1110, because the loop on line 1109 never started

1110 for pkg, arch_list in trigger_data.items(): 

1111 trigger_data[pkg] = {} 

1112 for arch in arch_list: 

1113 trigger_data[pkg][arch] = self._now 

1114 

1115 self.logger.info( 

1116 "Read pending requested tests from %s", self.pending_tests_file 

1117 ) 

1118 self.logger.debug("%s", self.pending_tests) 

1119 

1120 # this requires iterating over all triggers and thus is expensive; 

1121 # cache the results 

1122 @lru_cache(None) 

1123 def latest_run_for_package(self, src: str, arch: str) -> str: 

1124 """Return latest run ID for src on arch""" 

1125 

1126 latest_run_id = "" 

1127 for srcmap in self.test_results.values(): 

1128 try: 

1129 run_id = srcmap[src][arch][2] 

1130 except KeyError: 

1131 continue 

1132 if run_id > latest_run_id: 

1133 latest_run_id = run_id 

1134 return latest_run_id 

1135 

    def urlopen_retry(self, url: str) -> http.client.HTTPResponse | addinfourl:
        """A urlopen() that retries on time outs or errors

        Tries up to 5 times (30 s timeout each) on socket timeouts and on
        HTTP 502/503; any other HTTPError is re-raised immediately. After
        all retries fail, the last timeout/502/503 error is re-raised.
        """

        exc: Exception
        for retry in range(5):
            try:
                req = urlopen(url, timeout=30)
                code = req.getcode()
                # a missing code (e.g. non-HTTP URLs) counts as success
                if not code or 200 <= code < 300:
                    return req  # type: ignore[no-any-return]
                # NOTE(review): a non-2xx code that does not raise falls
                # through to the next retry without setting `exc`; if that
                # happened on every attempt, `raise exc` below would raise
                # UnboundLocalError — confirm whether urlopen can return
                # such codes without raising HTTPError.
            except socket.timeout as e:
                self.logger.info(
                    "Timeout downloading '%s', will retry %d more times."
                    % (url, 5 - retry - 1)
                )
                exc = e
            except HTTPError as e:
                # 502/503 are transient gateway/service errors worth retrying
                if e.code not in (503, 502):
                    raise
                self.logger.info(
                    "Caught error %d downloading '%s', will retry %d more times."
                    % (e.code, url, 5 - retry - 1)
                )
                exc = e
        else:
            # all retries exhausted: re-raise the last retryable error
            raise exc

1162 

    # memoized per (swift_url, src, arch) so each combination is only
    # fetched once per britney run
    @lru_cache(None)
    def fetch_swift_results(self, swift_url: str, src: str, arch: str) -> None:
        """Download new results for source package/arch from swift

        Lists result directories newer than the latest known run (unless
        adt_shared_results_cache is set) and feeds each result.tar to
        fetch_one_result().  Exits the program on unexpected download
        failures to avoid re-requesting tests that already have results.
        """

        # prepare query: get all runs with a timestamp later than the latest
        # run_id for this package/arch; '@' is at the end of each run id, to
        # mark the end of a test run directory path
        # example: <autopkgtest-wily>wily/amd64/libp/libpng/20150630_054517@/result.tar
        query = {
            "delimiter": "@",
            "prefix": "%s/%s/%s/%s/" % (self.options.series, arch, srchash(src), src),
        }

        # determine latest run_id from results
        if not self.options.adt_shared_results_cache:
            latest_run_id = self.latest_run_for_package(src, arch)
            if latest_run_id:
                # swift "marker" makes the listing start after this entry
                query["marker"] = query["prefix"] + latest_run_id

        # request new results from swift
        url = os.path.join(swift_url, self.swift_container)
        url += "?" + urllib.parse.urlencode(query)
        f = None
        try:
            f = self.urlopen_retry(url)
            if f.getcode() == 200:
                result_paths = f.read().decode().strip().splitlines()
            elif f.getcode() == 204:  # No content
                result_paths = []
            else:
                # we should not ever end up here as we expect a HTTPError in
                # other cases; e. g. 3XX is something that tells us to adjust
                # our URLS, so fail hard on those
                raise NotImplementedError(
                    "fetch_swift_results(%s): cannot handle HTTP code %r"
                    % (url, f.getcode())
                )
        except IOError as e:
            # 401 "Unauthorized" is swift's way of saying "container does not exist"
            if getattr(e, "code", -1) == 401:
                self.logger.info(
                    "fetch_swift_results: %s does not exist yet or is inaccessible", url
                )
                return
            # Other status codes are usually a transient
            # network/infrastructure failure. Ignoring this can lead to
            # re-requesting tests which we already have results for, so
            # fail hard on this and let the next run retry.
            self.logger.error("Failure to fetch swift results from %s: %s", url, str(e))
            sys.exit(1)
        finally:
            if f is not None:
                f.close()

        for p in result_paths:
            self.fetch_one_result(
                os.path.join(swift_url, self.swift_container, p, "result.tar"),
                src,
                arch,
            )

1223 

1224 def fetch_one_result(self, url: str, src: str, arch: str) -> None: 

1225 """Download one result URL for source/arch 

1226 

1227 Remove matching pending_tests entries. 

1228 """ 

1229 f = None 

1230 try: 

1231 f = self.urlopen_retry(url) 

1232 if f.getcode() == 200: 1232 ↛ 1235line 1232 didn't jump to line 1235, because the condition on line 1232 was never false

1233 tar_bytes = io.BytesIO(f.read()) 

1234 else: 

1235 raise NotImplementedError( 

1236 "fetch_one_result(%s): cannot handle HTTP code %r" 

1237 % (url, f.getcode()) 

1238 ) 

1239 except IOError as err: 

1240 self.logger.error("Failure to fetch %s: %s", url, str(err)) 

1241 # we tolerate "not found" (something went wrong on uploading the 

1242 # result), but other things indicate infrastructure problems 

1243 if getattr(err, "code", -1) == 404: 

1244 return 

1245 sys.exit(1) 

1246 finally: 

1247 if f is not None: 1247 ↛ exit,   1247 ↛ 12492 missed branches: 1) line 1247 didn't return from function 'fetch_one_result', because the return on line 1244 wasn't executed, 2) line 1247 didn't jump to line 1249, because the condition on line 1247 was never false

1248 f.close() 1248 ↛ exitline 1248 didn't return from function 'fetch_one_result', because the return on line 1244 wasn't executed

1249 try: 

1250 with tarfile.open(None, "r", tar_bytes) as tar: 

1251 exitcode = int(tar.extractfile("exitcode").read().strip()) # type: ignore[union-attr] 

1252 srcver = tar.extractfile("testpkg-version").read().decode().strip() # type: ignore[union-attr] 

1253 (ressrc, ver) = srcver.split() 

1254 testinfo = json.loads(tar.extractfile("testinfo.json").read().decode()) # type: ignore[union-attr] 

1255 except (KeyError, ValueError, tarfile.TarError) as err: 

1256 self.logger.error("%s is damaged, ignoring: %s", url, str(err)) 

1257 # ignore this; this will leave an orphaned request in autopkgtest-pending.json 

1258 # and thus require manual retries after fixing the tmpfail, but we 

1259 # can't just blindly attribute it to some pending test. 

1260 return 

1261 

1262 if src != ressrc: 1262 ↛ 1263line 1262 didn't jump to line 1263, because the condition on line 1262 was never true

1263 self.logger.error( 

1264 "%s is a result for package %s, but expected package %s", 

1265 url, 

1266 ressrc, 

1267 src, 

1268 ) 

1269 return 

1270 

1271 # parse recorded triggers in test result 

1272 for e in testinfo.get("custom_environment", []): 1272 ↛ 1277line 1272 didn't jump to line 1277, because the loop on line 1272 didn't complete

1273 if e.startswith("ADT_TEST_TRIGGERS="): 1273 ↛ 1272line 1273 didn't jump to line 1272, because the condition on line 1273 was never false

1274 result_triggers = [i for i in e.split("=", 1)[1].split() if "/" in i] 

1275 break 

1276 else: 

1277 self.logger.error("%s result has no ADT_TEST_TRIGGERS, ignoring") 

1278 return 

1279 

1280 run_id = os.path.basename(os.path.dirname(url)) 

1281 seen = round(calendar.timegm(time.strptime(run_id, "%Y%m%d_%H%M%S@"))) 

1282 # allow some skipped tests, but nothing else 

1283 if exitcode in [0, 2]: 

1284 result = Result.PASS 

1285 elif exitcode == 8: 1285 ↛ 1286line 1285 didn't jump to line 1286, because the condition on line 1285 was never true

1286 result = Result.NEUTRAL 

1287 else: 

1288 result = Result.FAIL 

1289 

1290 self.logger.info( 

1291 "Fetched test result for %s/%s/%s %s (triggers: %s): %s", 

1292 src, 

1293 ver, 

1294 arch, 

1295 run_id, 

1296 result_triggers, 

1297 result.name.lower(), 

1298 ) 

1299 

1300 # remove matching test requests 

1301 for trigger in result_triggers: 

1302 self.remove_from_pending(trigger, src, arch) 

1303 

1304 # add this result 

1305 for trigger in result_triggers: 

1306 self.add_trigger_to_results(trigger, src, ver, arch, run_id, seen, result) 

1307 

1308 def remove_from_pending( 

1309 self, trigger: str, src: str, arch: str, timestamp: int = sys.maxsize 

1310 ) -> None: 

1311 assert self.pending_tests is not None # for type checking 

1312 try: 

1313 arch_dict = self.pending_tests[trigger][src] 

1314 if timestamp < arch_dict[arch]: 

1315 # The result is from before the moment of scheduling, so it's 

1316 # not the one we're waiting for 

1317 return 

1318 del arch_dict[arch] 

1319 if not arch_dict: 

1320 del self.pending_tests[trigger][src] 

1321 if not self.pending_tests[trigger]: 

1322 del self.pending_tests[trigger] 

1323 self.logger.debug( 

1324 "-> matches pending request %s/%s for trigger %s", src, arch, trigger 

1325 ) 

1326 except KeyError: 

1327 self.logger.debug( 

1328 "-> does not match any pending request for %s/%s", src, arch 

1329 ) 

1330 

    def add_trigger_to_results(
        self,
        trigger: str,
        src: str,
        ver: str,
        arch: str,
        run_id: str,
        timestamp: int,
        status_to_add: Result,
    ) -> None:
        """Merge one fetched test result into self.test_results.

        trigger is a "source/version" string.  The stored entry per
        trigger/src/arch is a mutable list [status, version, run_id,
        timestamp], created as [Result.FAIL, None, "", 0] on first sight.
        Results for a version older than the trigger's own version are
        discarded.
        """
        # Ensure that we got a new enough version
        try:
            (trigsrc, trigver) = trigger.split("/", 1)
        except ValueError:
            self.logger.info("Ignoring invalid test trigger %s", trigger)
            return
        if trigsrc == src and apt_pkg.version_compare(ver, trigver) < 0:
            self.logger.debug(
                "test trigger %s, but run for older version %s, ignoring", trigger, ver
            )
            return

        stored_result = (
            self.test_results.setdefault(trigger, {})
            .setdefault(src, {})
            .setdefault(arch, [Result.FAIL, None, "", 0])
        )

        # reruns shouldn't flip the result from PASS or NEUTRAL to
        # FAIL, so remember the most recent version of the best result
        # we've seen. Except for reference updates, which we always
        # want to update with the most recent result. The result data
        # may not be ordered by timestamp, so we need to check time.
        update = False
        if self.options.adt_baseline == "reference" and trigger == REF_TRIG:
            if stored_result[3] < timestamp:
                update = True
        elif status_to_add < stored_result[0]:
            # presumably a smaller Result compares as a better outcome
            # (ordering defined on the Result enum elsewhere) — the
            # "best result wins" comment above relies on that
            update = True
        elif status_to_add == stored_result[0] and stored_result[3] < timestamp:
            # same status: prefer the newer run
            update = True

        if update:
            stored_result[0] = status_to_add
            stored_result[1] = ver
            stored_result[2] = run_id
            stored_result[3] = timestamp

1378 

    def send_test_request(
        self, src: str, arch: str, triggers: list[str], huge: bool = False
    ) -> None:
        """Send out AMQP request for testing src/arch for triggers

        triggers is a list of "source/version" strings.  If huge is true,
        then the request will be put into the -huge instead of
        normal queue.  With adt_ppas set, requests go to the PPA queue
        instead.  No-op in dry-run mode.
        """
        if self.options.dry_run:
            return

        params: dict[str, Any] = {"triggers": triggers}
        if self.options.adt_ppas:
            params["ppas"] = self.options.adt_ppas
            qname = "debci-ppa-%s-%s" % (self.options.series, arch)
        elif huge:
            qname = "debci-huge-%s-%s" % (self.options.series, arch)
        else:
            qname = "debci-%s-%s" % (self.options.series, arch)
        params["submit-time"] = time.strftime("%Y-%m-%d %H:%M:%S%z", time.gmtime())

        if self.amqp_channel:
            self.amqp_channel.basic_publish(
                amqp.Message(
                    src + "\n" + json.dumps(params), delivery_mode=2
                ),  # persistent
                routing_key=qname,
            )
            # we save pending.json with every request, so that if britney
            # crashes we don't re-request tests. This is only needed when using
            # real amqp, as with file-based submission the pending tests are
            # returned by debci along with the results each run.
            self.save_pending_json()
        else:
            # for file-based submission, triggers are space separated
            params["triggers"] = [" ".join(params["triggers"])]
            assert self.amqp_file_handle
            self.amqp_file_handle.write("%s:%s %s\n" % (qname, src, json.dumps(params)))

1417 

    def pkg_test_request(
        self, src: str, arch: str, all_triggers: list[str], huge: bool = False
    ) -> None:
        """Request one package test for a set of triggers

        all_triggers is a list of "pkgname/version". These are the packages
        that will be taken from the source suite. The first package in this
        list is the package that triggers the testing of src, the rest are
        additional packages required for installability of the test deps. If
        huge is true, then the request will be put into the -huge instead of
        normal queue.

        This will only be done if that test wasn't already requested in
        a previous run (i. e. if it's not already in self.pending_tests)
        or if there is already a fresh or a positive result for it. This
        ensures to download current results for this package before
        requesting any test."""
        trigger = all_triggers[0]
        # file:// URLs mean a local (non-swift) result store
        uses_swift = not self.options.adt_swift_url.startswith("file://")
        try:
            result = self.test_results[trigger][src][arch]
            has_result = True
        except KeyError:
            has_result = False

        if has_result:
            result_state = result[0]
            if result_state in {Result.OLD_PASS, Result.OLD_FAIL, Result.OLD_NEUTRAL}:
                # stale ("OLD_*") results never short-circuit: fall
                # through to fetch fresh results / re-request below
                pass
            elif (
                result_state == Result.FAIL
                and self.result_in_baseline(src, arch)[0]
                in {Result.PASS, Result.NEUTRAL, Result.OLD_PASS, Result.OLD_NEUTRAL}
                and self._now - result[3] > self.options.adt_retry_older_than
            ):
                # We might want to retry this failure, so continue
                pass
            elif not uses_swift:
                # We're done if we don't retrigger and we're not using swift
                return
            elif result_state in {Result.PASS, Result.NEUTRAL}:
                self.logger.debug(
                    "%s/%s triggered by %s already known", src, arch, trigger
                )
                return

        # Without swift we don't expect new results
        if uses_swift:
            self.logger.info(
                "Checking for new results for failed %s/%s for trigger %s",
                src,
                arch,
                trigger,
            )
            self.fetch_swift_results(self.options.adt_swift_url, src, arch)
            # do we have one now?
            try:
                self.test_results[trigger][src][arch]
                return
            except KeyError:
                pass

        self.request_test_if_not_queued(src, arch, trigger, all_triggers, huge=huge)

1481 

1482 def request_test_if_not_queued( 

1483 self, 

1484 src: str, 

1485 arch: str, 

1486 trigger: str, 

1487 all_triggers: list[str] = [], 

1488 huge: bool = False, 

1489 ) -> None: 

1490 assert self.pending_tests is not None # for type checking 

1491 if not all_triggers: 

1492 all_triggers = [trigger] 

1493 

1494 # Don't re-request if it's already pending 

1495 arch_dict = self.pending_tests.setdefault(trigger, {}).setdefault(src, {}) 

1496 if arch in arch_dict.keys(): 

1497 self.logger.debug( 

1498 "Test %s/%s for %s is already pending, not queueing", src, arch, trigger 

1499 ) 

1500 else: 

1501 self.logger.debug( 

1502 "Requesting %s autopkgtest on %s to verify %s", src, arch, trigger 

1503 ) 

1504 arch_dict[arch] = self._now 

1505 self.send_test_request(src, arch, all_triggers, huge=huge) 

1506 

    def result_in_baseline(self, src: str, arch: str) -> list[Any]:
        """Get the result for src on arch in the baseline

        The baseline is optionally all data or a reference set.

        Returns the stored [status, version, run_id, timestamp] list; a
        copy is memoized in self.result_in_baseline_cache.
        """

        # this requires iterating over all cached results and thus is expensive;
        # cache the results
        try:
            return self.result_in_baseline_cache[src][arch]
        except KeyError:
            pass

        result_reference: list[Any] = [Result.NONE, None, "", 0]
        if self.options.adt_baseline == "reference":
            # packages not in the target suite have no reference baseline
            if src not in self.suite_info.target_suite.sources:
                return result_reference

            try:
                result_reference = self.test_results[REF_TRIG][src][arch]
                self.logger.debug(
                    "Found result for src %s in reference: %s",
                    src,
                    result_reference[0].name,
                )
            except KeyError:
                self.logger.debug(
                    "Found NO result for src %s in reference: %s",
                    src,
                    result_reference[0].name,
                )
            self.result_in_baseline_cache[src][arch] = deepcopy(result_reference)
            return result_reference

        # non-reference baseline: scan all triggers for the best result ever
        result_ever: list[Any] = [Result.FAIL, None, "", 0]
        for srcmap in self.test_results.values():
            try:
                if srcmap[src][arch][0] != Result.FAIL:
                    result_ever = srcmap[src][arch]
                # If we are not looking at a reference run, We don't really
                # care about anything except the status, so we're done
                # once we find a PASS.
                if result_ever[0] == Result.PASS:
                    break
            except KeyError:
                pass

        self.result_in_baseline_cache[src][arch] = deepcopy(result_ever)
        self.logger.debug("Result for src %s ever: %s", src, result_ever[0].name)
        return result_ever

1557 

1558 def has_test_in_target(self, src: str) -> bool: 

1559 test_in_target = False 

1560 try: 

1561 srcinfo = self.suite_info.target_suite.sources[src] 

1562 if "autopkgtest" in srcinfo.testsuite or self.has_autodep8(srcinfo): 

1563 test_in_target = True 

1564 # AttributeError is only needed for the test suite as 

1565 # srcinfo can be a NoneType 

1566 except (KeyError, AttributeError): 

1567 pass 

1568 

1569 return test_in_target 

1570 

    def pkg_test_result(
        self, src: str, ver: str, arch: str, trigger: str
    ) -> tuple[str, str, Optional[str], str]:
        """Get current test status of a particular package

        Return (status, real_version, run_id, log_url) tuple; status is a key in
        EXCUSES_LABELS. run_id is None if the test is still running.

        Raises RuntimeError when there is neither a recorded result nor a
        pending request for src/arch under this trigger.
        """
        assert self.pending_tests is not None  # for type checking
        # determine current test result status
        run_id = None
        try:
            r = self.test_results[trigger][src][arch]
            ver = r[1]
            run_id = r[2]

            if r[0] in {Result.FAIL, Result.OLD_FAIL}:
                # determine current test result status
                baseline_result = self.result_in_baseline(src, arch)[0]

                # Special-case triggers from linux-meta*: we cannot compare
                # results against different kernels, as e. g. a DKMS module
                # might work against the default kernel but fail against a
                # different flavor; so for those, ignore the "ever
                # passed" check; FIXME: check against trigsrc only
                if self.options.adt_baseline != "reference" and (
                    trigger.startswith("linux-meta") or trigger.startswith("linux/")
                ):
                    baseline_result = Result.FAIL

                # Check if the autopkgtest (still) exists in the target suite
                test_in_target = self.has_test_in_target(src)

                if test_in_target and baseline_result in {
                    Result.NONE,
                    Result.OLD_FAIL,
                    Result.OLD_NEUTRAL,
                    Result.OLD_PASS,
                }:
                    # baseline missing or stale: (re)run the reference test
                    self.request_test_if_not_queued(src, arch, REF_TRIG)

                result = "REGRESSION"
                if baseline_result in {Result.FAIL, Result.OLD_FAIL}:
                    # it also failed in the baseline: not a regression
                    result = "ALWAYSFAIL"
                elif baseline_result == Result.NONE and test_in_target:
                    result = "RUNNING-REFERENCE"

                if self.options.adt_ignore_failure_for_new_tests and not test_in_target:
                    # test does not exist in the target suite yet; option
                    # says such failures should not block
                    result = "ALWAYSFAIL"

                if self.has_force_badtest(src, ver, arch):
                    result = "IGNORE-FAIL"
            else:
                result = r[0].name

            url = self.format_log_url(src, arch, run_id)
        except KeyError:
            # no result for src/arch; still running?
            if arch in self.pending_tests.get(trigger, {}).get(src, {}).keys():
                baseline_result = self.result_in_baseline(src, arch)[0]
                if (
                    self.options.adt_ignore_failure_for_new_tests
                    and not self.has_test_in_target(src)
                ):
                    result = "RUNNING-ALWAYSFAIL"
                elif baseline_result != Result.FAIL and not self.has_force_badtest(
                    src, ver, arch
                ):
                    result = "RUNNING"
                else:
                    result = "RUNNING-ALWAYSFAIL"
                url = self.options.adt_ci_url + "status/pending"
            else:
                raise RuntimeError(
                    "Result for %s/%s/%s (triggered by %s) is neither known nor pending!"
                    % (src, ver, arch, trigger)
                )

        return (result, ver, run_id, url)

1650 

1651 def has_force_badtest(self, src: str, ver: str, arch: str) -> bool: 

1652 """Check if src/ver/arch has a force-badtest hint""" 

1653 

1654 assert self.hints is not None 

1655 hints = self.hints.search("force-badtest", package=src) 

1656 if hints: 

1657 self.logger.info( 

1658 "Checking hints for %s/%s/%s: %s", 

1659 src, 

1660 ver, 

1661 arch, 

1662 [str(h) for h in hints], 

1663 ) 

1664 for hint in hints: 

1665 if [ 

1666 mi 

1667 for mi in hint.packages 

1668 if mi.architecture in ["source", arch] 

1669 and ( 

1670 mi.version == "all" 

1671 or apt_pkg.version_compare(ver, mi.version) <= 0 # type: ignore[arg-type] 

1672 ) 

1673 ]: 

1674 return True 

1675 

1676 return False 

1677 

1678 def has_built_on_this_arch_or_is_arch_all( 

1679 self, src_data: SourcePackage, arch: str 

1680 ) -> bool: 

1681 """When a source builds arch:all binaries, those binaries are 

1682 added to all architectures and thus the source 'exists' 

1683 everywhere. This function checks if the source has any arch 

1684 specific binaries on this architecture and if not, if it 

1685 has them on any architecture. 

1686 """ 

1687 packages_s_a = self.suite_info.primary_source_suite.binaries[arch] 

1688 has_unknown_binary = False 

1689 for binary_s in src_data.binaries: 

1690 try: 

1691 binary_u = packages_s_a[binary_s.package_name] 

1692 except KeyError: 

1693 # src_data.binaries has all the built binaries, so if 

1694 # we get here, we know that at least one architecture 

1695 # has architecture specific binaries 

1696 has_unknown_binary = True 

1697 continue 

1698 if binary_u.architecture == arch: 

1699 return True 

1700 # If we get here, we have only seen arch:all packages for this 

1701 # arch. 

1702 return not has_unknown_binary