Coverage for britney2/policies/autopkgtest.py: 90%

869 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-06-17 09:00 +0000

1# Copyright (C) 2013 - 2016 Canonical Ltd. 

2# Authors: 

3# Colin Watson <cjwatson@ubuntu.com> 

4# Jean-Baptiste Lallement <jean-baptiste.lallement@canonical.com> 

5# Martin Pitt <martin.pitt@ubuntu.com> 

6 

7# This program is free software; you can redistribute it and/or modify 

8# it under the terms of the GNU General Public License as published by 

9# the Free Software Foundation; either version 2 of the License, or 

10# (at your option) any later version. 

11 

12# This program is distributed in the hope that it will be useful, 

13# but WITHOUT ANY WARRANTY; without even the implied warranty of 

14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

15# GNU General Public License for more details. 

16 

17import calendar 

18import collections 

19import http.client 

20import io 

21import itertools 

22import json 

23import optparse 

24import os 

25import sys 

26import tarfile 

27import time 

28import urllib.parse 

29from collections.abc import Iterator 

30from copy import deepcopy 

31from enum import Enum 

32from functools import lru_cache, total_ordering 

33from typing import TYPE_CHECKING, Any, Optional, cast 

34from urllib.error import HTTPError 

35from urllib.request import urlopen 

36from urllib.response import addinfourl 

37 

38import apt_pkg 

39 

40from britney2 import ( 

41 BinaryPackageId, 

42 PackageId, 

43 SourcePackage, 

44 SuiteClass, 

45 Suites, 

46 TargetSuite, 

47) 

48from britney2.hints import HintAnnotate, HintType 

49from britney2.migrationitem import MigrationItem 

50from britney2.policies import PolicyVerdict 

51from britney2.policies.policy import AbstractBasePolicy 

52from britney2.utils import ( 

53 binaries_from_source_version, 

54 filter_out_faux, 

55 filter_out_faux_gen, 

56 get_dependency_solvers, 

57 iter_except, 

58 parse_option, 

59) 

60 

61if TYPE_CHECKING: 61 ↛ 62line 61 didn't jump to line 62 because the condition on line 61 was never true

62 import amqplib.client_0_8 as amqp 

63 

64 from ..britney import Britney 

65 from ..excuse import Excuse 

66 from ..hints import HintParser 

67 

68 

69@total_ordering 

70class Result(Enum): 

71 PASS = 1 

72 NEUTRAL = 2 

73 FAIL = 3 

74 OLD_PASS = 4 

75 OLD_NEUTRAL = 5 

76 OLD_FAIL = 6 

77 NONE = 7 

78 

79 def __lt__(self, other: "Result") -> bool: 

80 return self.value < other.value 

81 

82 

83EXCUSES_LABELS = { 

84 "PASS": '<span style="background:#87d96c">Pass</span>', 

85 "OLD_PASS": '<span style="background:#87d96c">Pass</span>', 

86 "NEUTRAL": "No tests, superficial or marked flaky", 

87 "OLD_NEUTRAL": "No tests, superficial or marked flaky", 

88 "FAIL": '<span style="background:#ff6666">Failed</span>', 

89 "OLD_FAIL": '<span style="background:#ff6666">Failed</span>', 

90 "ALWAYSFAIL": '<span style="background:#e5c545">Failed (not a regression)</span>', 

91 "REGRESSION": '<span style="background:#ff6666">Regression</span>', 

92 "IGNORE-FAIL": '<span style="background:#e5c545">Ignored failure</span>', 

93 "RUNNING": '<span style="background:#99ddff">Test triggered</span>', 

94 "RUNNING-ALWAYSFAIL": "Test triggered (will not be considered a regression)", 

95 "RUNNING-IGNORE": "Test triggered (failure will be ignored)", 

96 "RUNNING-REFERENCE": '<span style="background:#ff6666">Reference test triggered, but real test failed already</span>', 

97 "DEFERRED": '<span style="background:#99ddff">Test deferred</span>', 

98} 

99 

100REF_TRIG = "migration-reference/0" 

101 

102VERSION_KEY = "britney-autopkgtest-pending-file-version" 

103 

104 

105def srchash(src: str) -> str: 

106 """archive hash prefix for source package""" 

107 

108 if src.startswith("lib"): 108 ↛ 109line 108 didn't jump to line 109 because the condition on line 108 was never true

109 return src[:4] 

110 else: 

111 return src[0] 

112 

113 

114def added_pkgs_compared_to_target_suite( 

115 package_ids: frozenset[BinaryPackageId], 

116 target_suite: TargetSuite, 

117 *, 

118 invert: bool = False, 

119) -> Iterator[BinaryPackageId]: 

120 if invert: 120 ↛ 121line 120 didn't jump to line 121 because the condition on line 120 was never true

121 pkgs_ids_to_ignore = package_ids - set( 

122 target_suite.which_of_these_are_in_the_suite(package_ids) 

123 ) 

124 names_ignored = {p.package_name for p in pkgs_ids_to_ignore} 

125 else: 

126 names_ignored = { 

127 p.package_name 

128 for p in target_suite.which_of_these_are_in_the_suite(package_ids) 

129 } 

130 yield from (p for p in package_ids if p.package_name not in names_ignored) 

131 

132 

133def all_leaf_results( 

134 test_results: dict[str, dict[str, dict[str, list[Any]]]], 

135) -> Iterator[list[Any]]: 

136 for trigger in test_results.values(): 

137 for arch in trigger.values(): 

138 yield from arch.values() 

139 

140 

141def mark_result_as_old(result: Result) -> Result: 

142 """Convert current result into corresponding old result""" 

143 

144 if result is Result.FAIL: 

145 result = Result.OLD_FAIL 

146 elif result is Result.PASS: 

147 result = Result.OLD_PASS 

148 elif result is Result.NEUTRAL: 148 ↛ 150line 148 didn't jump to line 150 because the condition on line 148 was always true

149 result = Result.OLD_NEUTRAL 

150 return result 

151 

152 

153def concat_bdeps(src_data: SourcePackage) -> str: 

154 """Concatenate build_deps_arch and build_deps_indep""" 

155 return ",".join( 

156 (src_data.build_deps_arch or "", src_data.build_deps_indep or "") 

157 ).strip(",") 

158 

159 

160class AutopkgtestPolicy(AbstractBasePolicy): 

161 """autopkgtest regression policy for source migrations 

162 

163 Run autopkgtests for the excuse and all of its reverse dependencies, and 

164 reject the upload if any of those regress. 

165 """ 

166 

167 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

168 super().__init__( 

169 "autopkgtest", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE} 

170 ) 

171 # tests requested in this and previous runs 

172 # trigger -> src -> [arch] 

173 self.pending_tests: dict[str, dict[str, dict[str, int]]] | None = None 

174 self.pending_tests_file = os.path.join( 

175 self.state_dir, "autopkgtest-pending.json" 

176 ) 

177 self.testsuite_triggers: dict[str, set[str]] = {} 

178 self.result_in_baseline_cache: dict[str, dict[str, list[Any]]] = ( 

179 collections.defaultdict(dict) 

180 ) 

181 

182 self.amqp_file_handle: io.TextIOWrapper | None = None 

183 

184 # Default values for this policy's options 

185 parse_option(options, "adt_baseline") 

186 parse_option(options, "adt_huge", to_int=True) 

187 parse_option(options, "adt_ppas") 

188 parse_option(options, "adt_reference_max_age", day_to_sec=True) 

189 parse_option(options, "adt_pending_max_age", default=5, day_to_sec=True) 

190 parse_option(options, "adt_regression_penalty", default=0, to_int=True) 

191 parse_option(options, "adt_log_url") # see below for defaults 

192 parse_option(options, "adt_retry_url") # see below for defaults 

193 parse_option(options, "adt_retry_older_than", day_to_sec=True) 

194 parse_option(options, "adt_results_cache_age", day_to_sec=True) 

195 parse_option(options, "adt_shared_results_cache") 

196 parse_option(options, "adt_success_bounty", default=0, to_int=True) 

197 parse_option(options, "adt_ignore_failure_for_new_tests", to_bool=True) 

198 

199 # When ADT_RESULTS_CACHE_AGE is smaller than or equal to 

200 # ADT_REFERENCE_MAX_AGE old reference result will be removed from cache 

201 # before the newly scheduled results are in, potentially causing 

202 # additional waiting. For packages like glibc this might cause an 

203 # infinite delay as there will always be a package that's 

204 # waiting. Similarly for ADT_RETRY_OLDER_THAN. 

205 if self.options.adt_results_cache_age <= self.options.adt_reference_max_age: 

206 self.logger.warning( 

207 "Unexpected: ADT_REFERENCE_MAX_AGE bigger than ADT_RESULTS_CACHE_AGE" 

208 ) 

209 if self.options.adt_results_cache_age <= self.options.adt_retry_older_than: 

210 self.logger.warning( 

211 "Unexpected: ADT_RETRY_OLDER_THAN bigger than ADT_RESULTS_CACHE_AGE" 

212 ) 

213 

214 if not self.options.adt_log_url: 214 ↛ 240line 214 didn't jump to line 240 because the condition on line 214 was always true

215 # Historical defaults 

216 if self.options.adt_swift_url.startswith("file://"): 

217 self.options.adt_log_url = os.path.join( 

218 self.options.adt_ci_url, 

219 "data", 

220 "autopkgtest", 

221 self.options.series, 

222 "{arch}", 

223 "{hash}", 

224 "{package}", 

225 "{run_id}", 

226 "log.gz", 

227 ) 

228 else: 

229 self.options.adt_log_url = os.path.join( 

230 self.options.adt_swift_url, 

231 "{swift_container}", 

232 self.options.series, 

233 "{arch}", 

234 "{hash}", 

235 "{package}", 

236 "{run_id}", 

237 "log.gz", 

238 ) 

239 

240 if hasattr(self.options, "adt_retry_url_mech"): 240 ↛ 241line 240 didn't jump to line 241 because the condition on line 240 was never true

241 self.logger.warning( 

242 "The ADT_RETRY_URL_MECH configuration has been deprecated." 

243 ) 

244 self.logger.warning( 

245 "Instead britney now supports ADT_RETRY_URL for more flexibility." 

246 ) 

247 if self.options.adt_retry_url: 

248 self.logger.error( 

249 "Please remove the ADT_RETRY_URL_MECH as ADT_RETRY_URL will be used." 

250 ) 

251 elif self.options.adt_retry_url_mech == "run_id": 

252 self.options.adt_retry_url = ( 

253 self.options.adt_ci_url + "api/v1/retry/{run_id}" 

254 ) 

255 if not self.options.adt_retry_url: 255 ↛ 272line 255 didn't jump to line 272 because the condition on line 255 was always true

256 # Historical default 

257 self.options.adt_retry_url = ( 

258 self.options.adt_ci_url 

259 + "request.cgi?" 

260 + "release={release}&arch={arch}&package={package}&trigger={trigger}{ppas}" 

261 ) 

262 

263 # results map: trigger -> src -> arch -> [passed, version, run_id, seen] 

264 # - trigger is "source/version" of an unstable package that triggered 

265 # this test run. 

266 # - "passed" is a Result 

267 # - "version" is the package version of "src" of that test 

268 # - "run_id" is an opaque ID that identifies a particular test run for 

269 # a given src/arch. 

270 # - "seen" is an approximate time stamp of the test run. How this is 

271 # deduced depends on the interface used. 

272 self.test_results: dict[str, dict[str, dict[str, list[Any]]]] = {} 

273 if self.options.adt_shared_results_cache: 

274 self.results_cache_file = self.options.adt_shared_results_cache 

275 else: 

276 self.results_cache_file = os.path.join( 

277 self.state_dir, "autopkgtest-results.cache" 

278 ) 

279 

280 try: 

281 self.options.adt_ppas = self.options.adt_ppas.strip().split() 

282 except AttributeError: 

283 self.options.adt_ppas = [] 

284 

285 self.swift_container = "autopkgtest-" + options.series 

286 if self.options.adt_ppas: 

287 self.swift_container += "-" + options.adt_ppas[-1].replace("/", "-") 

288 

289 # restrict adt_arches to architectures we actually run for 

290 self.adt_arches = [] 

291 for arch in self.options.adt_arches.split(): 

292 if arch in self.options.architectures: 

293 self.adt_arches.append(arch) 

294 else: 

295 self.logger.info( 

296 "Ignoring ADT_ARCHES %s as it is not in architectures list", arch 

297 ) 

298 

299 def __del__(self) -> None: 

300 if self.amqp_file_handle: 300 ↛ exitline 300 didn't return from function '__del__' because the condition on line 300 was always true

301 try: 

302 self.amqp_file_handle.close() 

303 except AttributeError: 

304 pass 

305 

306 def register_hints(self, hint_parser: "HintParser") -> None: 

307 hint_parser.register_hint_type( 

308 HintType( 

309 "force-badtest", 

310 versioned=HintAnnotate.OPTIONAL, 

311 architectured=HintAnnotate.OPTIONAL, 

312 ) 

313 ) 

314 hint_parser.register_hint_type(HintType("force-skiptest")) 

315 

316 def initialise(self, britney: "Britney") -> None: 

317 super().initialise(britney) 

318 # We want to use the "current" time stamp in multiple locations 

319 time_now = round(time.time()) 

320 if hasattr(self.options, "fake_runtime"): 

321 time_now = int(self.options.fake_runtime) 

322 self._now = time_now 

323 

324 # local copies for better performance 

325 parse_src_depends = apt_pkg.parse_src_depends 

326 

327 # compute inverse Testsuite-Triggers: map, unifying all series 

328 self.logger.info("Building inverse testsuite_triggers map") 

329 for suite in self.suite_info: 

330 for src, data in suite.sources.items(): 

331 # for now, let's assume that autodep8 uses builddeps (most do) 

332 if ( 

333 self.has_autodep8(data) 

334 and "@builddeps@" not in data.testsuite_triggers 

335 ): 

336 data.testsuite_triggers.append("@builddeps@") 

337 for trigger in data.testsuite_triggers: 

338 if trigger == "@builddeps@": 

339 for arch in self.adt_arches: 

340 for block in parse_src_depends( 

341 concat_bdeps(data), True, arch 

342 ): 

343 self.testsuite_triggers.setdefault( 

344 block[0][0], set() 

345 ).add(src) 

346 else: 

347 self.testsuite_triggers.setdefault(trigger, set()).add(src) 

348 target_suite_name = self.suite_info.target_suite.name 

349 

350 os.makedirs(self.state_dir, exist_ok=True) 

351 self.read_pending_tests() 

352 

353 # read the cached results that we collected so far 

354 if os.path.exists(self.results_cache_file): 

355 with open(self.results_cache_file) as f: 

356 test_results = json.load(f) 

357 self.test_results = self.check_and_upgrade_cache(test_results) 

358 self.logger.info("Read previous results from %s", self.results_cache_file) 

359 else: 

360 self.logger.info( 

361 "%s does not exist, re-downloading all results from swift", 

362 self.results_cache_file, 

363 ) 

364 

365 # read in the new results 

366 if self.options.adt_swift_url.startswith("file://"): 

367 debci_file = self.options.adt_swift_url[7:] 

368 if os.path.exists(debci_file): 

369 with open(debci_file) as f: 

370 test_results = json.load(f) 

371 self.logger.info("Read new results from %s", debci_file) 

372 for res in test_results["results"]: 

373 # if there's no date, the test didn't finish yet 

374 if res["date"] is None: 374 ↛ 375line 374 didn't jump to line 375 because the condition on line 374 was never true

375 continue 

376 test_suite = res["suite"] 

377 if test_suite != target_suite_name: 377 ↛ 379line 377 didn't jump to line 379 because the condition on line 377 was never true

378 # not requested for this target suite, so ignore 

379 continue 

380 triggers = res["trigger"] 

381 if triggers is None: 381 ↛ 383line 381 didn't jump to line 383 because the condition on line 381 was never true

382 # not requested for this policy, so ignore 

383 continue 

384 status = res["status"] 

385 if status is None: 

386 # still running => pending 

387 continue 

388 src = res["package"] 

389 arch = res["arch"] 

390 ver = res["version"] 

391 run_id = str(res["run_id"]) 

392 seen = round( 

393 calendar.timegm( 

394 time.strptime(res["date"][0:-5], "%Y-%m-%dT%H:%M:%S") 

395 ) 

396 ) 

397 for trigger in triggers.split(): 

398 # remove matching test requests 

399 self.remove_from_pending(trigger, src, arch, seen) 

400 if status == "tmpfail": 400 ↛ 402line 400 didn't jump to line 402 because the condition on line 400 was never true

401 # let's see if we still need it 

402 continue 

403 self.logger.debug( 

404 "Results %s %s %s added", src, trigger, status 

405 ) 

406 self.add_trigger_to_results( 

407 trigger, 

408 src, 

409 ver, 

410 arch, 

411 run_id, 

412 seen, 

413 Result[status.upper()], 

414 ) 

415 else: 

416 self.logger.info( 

417 "%s does not exist, no new data will be processed", debci_file 

418 ) 

419 

420 # The cache can contain results against versions of packages that 

421 # are not in any suite anymore. Strip those out, as we don't want 

422 # to use those results. Additionally, old references may be 

423 # filtered out. 

424 if self.options.adt_baseline == "reference": 

425 self.filter_old_results() 

426 

427 # we need sources, binaries, and installability tester, so for now 

428 # remember the whole britney object 

429 self.britney = britney 

430 

431 # Initialize AMQP connection 

432 self.amqp_channel: Optional["amqp.channel.Channel"] = None 

433 self.amqp_file_handle = None 

434 if self.options.dry_run: 434 ↛ 435line 434 didn't jump to line 435 because the condition on line 434 was never true

435 return 

436 

437 amqp_url = self.options.adt_amqp 

438 

439 if amqp_url.startswith("amqp://"): 439 ↛ 440line 439 didn't jump to line 440 because the condition on line 439 was never true

440 import amqplib.client_0_8 as amqp 

441 

442 # depending on the setup we connect to a AMQP server 

443 creds = urllib.parse.urlsplit(amqp_url, allow_fragments=False) 

444 self.amqp_con = amqp.Connection( 

445 creds.hostname, userid=creds.username, password=creds.password 

446 ) 

447 self.amqp_channel = self.amqp_con.channel() 

448 self.logger.info("Connected to AMQP server") 

449 elif amqp_url.startswith("file://"): 449 ↛ 454line 449 didn't jump to line 454 because the condition on line 449 was always true

450 # or in Debian and in testing mode, adt_amqp will be a file:// URL 

451 amqp_file = amqp_url[7:] 

452 self.amqp_file_handle = open(amqp_file, "w", 1) 

453 else: 

454 raise RuntimeError("Unknown ADT_AMQP schema %s" % amqp_url.split(":", 1)[0]) 

455 

456 def check_and_upgrade_cache( 

457 self, test_results: dict[str, dict[str, dict[str, list[Any]]]] 

458 ) -> dict[str, dict[str, dict[str, list[Any]]]]: 

459 # Drop results older than ADT_RESULTS_CACHE_AGE 

460 

461 # Collect keys to delete instead of copying the full list of keys and 

462 # changing the dicts on the fly. The lists containing the keys to delete 

463 # only reaches the upper bound if all entries are too old. 

464 to_delete_trigger = [] 

465 for trigger, trigger_data in test_results.items(): 

466 to_delete_pkg = [] 

467 for pkg, results in trigger_data.items(): 

468 to_delete_arch = [] 

469 for arch, arch_result in results.items(): 

470 arch_result[0] = Result[arch_result[0]] 

471 if self._now - arch_result[3] > self.options.adt_results_cache_age: 471 ↛ 472line 471 didn't jump to line 472 because the condition on line 471 was never true

472 to_delete_arch.append(arch) 

473 

474 for arch in to_delete_arch: 474 ↛ 475line 474 didn't jump to line 475 because the loop on line 474 never started

475 del results[arch] 

476 if not results: 476 ↛ 477line 476 didn't jump to line 477 because the condition on line 476 was never true

477 to_delete_pkg.append(pkg) 

478 for pkg in to_delete_pkg: 478 ↛ 479line 478 didn't jump to line 479 because the loop on line 478 never started

479 del trigger_data[pkg] 

480 if not trigger_data: 480 ↛ 481line 480 didn't jump to line 481 because the condition on line 480 was never true

481 to_delete_trigger.append(trigger) 

482 for trigger in to_delete_trigger: 482 ↛ 483line 482 didn't jump to line 483 because the loop on line 482 never started

483 del test_results[trigger] 

484 

485 return test_results 

486 

487 def filter_old_results(self) -> None: 

488 """Remove results for old versions and reference runs from the cache. 

489 

490 For now, only delete reference runs. If we delete regular 

491 results after a while, packages with lots of triggered tests may 

492 never have all the results at the same time.""" 

493 

494 test_results = self.test_results 

495 

496 for trigger, trigger_data in test_results.items(): 

497 for src, results in trigger_data.items(): 

498 for arch, result in results.items(): 

499 if ( 

500 trigger == REF_TRIG 

501 and self._now - result[3] > self.options.adt_reference_max_age 

502 ): 

503 result[0] = mark_result_as_old(result[0]) 

504 elif not self.test_version_in_any_suite(src, result[1]): 

505 result[0] = mark_result_as_old(result[0]) 

506 

507 def test_version_in_any_suite(self, src: str, version: str) -> bool: 

508 """Check if the mentioned version of src is found in a suite 

509 

510 To prevent regressions in the target suite, the result should be 

511 from a test with the version of the package in either the source 

512 suite or the target suite. The source suite is also valid, 

513 because due to versioned test dependencies and Breaks/Conflicts 

514 relations, regularly the version in the source suite is used 

515 during testing. 

516 """ 

517 

518 versions = { 

519 suite.sources[src].version 

520 for suite in self.suite_info 

521 if src in suite.sources 

522 } 

523 

524 valid_version = False 

525 for ver in versions: 

526 if apt_pkg.version_compare(ver, version) == 0: 

527 valid_version = True 

528 break 

529 

530 return valid_version 

531 

532 def save_pending_json(self) -> None: 

533 # update the pending tests on-disk cache 

534 self.logger.info( 

535 "Updating pending requested tests in %s" % self.pending_tests_file 

536 ) 

537 # Shallow clone pending_tests as we only modify the toplevel and change its type. 

538 pending_tests: dict[str, Any] = {} 

539 if self.pending_tests: 

540 pending_tests = dict(self.pending_tests) 

541 # Avoid adding if there are no pending results at all (eases testing) 

542 pending_tests[VERSION_KEY] = 1 

543 with open(self.pending_tests_file + ".new", "w") as f: 

544 json.dump(pending_tests, f, indent=2) 

545 os.rename(self.pending_tests_file + ".new", self.pending_tests_file) 

546 

547 def save_state(self, britney: "Britney") -> None: 

548 super().save_state(britney) 

549 

550 # update the results on-disk cache, unless we are using a r/o shared one 

551 if not self.options.adt_shared_results_cache: 

552 self.logger.info("Updating results cache") 

553 test_results = deepcopy(self.test_results) 

554 for result in all_leaf_results(test_results): 

555 result[0] = result[0].name 

556 with open(self.results_cache_file + ".new", "w") as f: 

557 json.dump(test_results, f, indent=2) 

558 os.rename(self.results_cache_file + ".new", self.results_cache_file) 

559 

560 self.save_pending_json() 

561 

562 def format_retry_url( 

563 self, run_id: str | None, arch: str, testsrc: str, trigger: str 

564 ) -> str: 

565 if self.options.adt_ppas: 

566 ppas = "&" + urllib.parse.urlencode( 

567 [("ppa", p) for p in self.options.adt_ppas] 

568 ) 

569 else: 

570 ppas = "" 

571 return cast(str, self.options.adt_retry_url).format( 

572 run_id=run_id, 

573 release=self.options.series, 

574 arch=arch, 

575 package=testsrc, 

576 trigger=urllib.parse.quote_plus(trigger), 

577 ppas=ppas, 

578 ) 

579 

580 def format_log_url(self, testsrc: str, arch: str, run_id: str) -> str: 

581 return cast(str, self.options.adt_log_url).format( 

582 release=self.options.series, 

583 swift_container=self.swift_container, 

584 hash=srchash(testsrc), 

585 package=testsrc, 

586 arch=arch, 

587 run_id=run_id, 

588 ) 

589 

590 def apply_src_policy_impl( 

591 self, 

592 tests_info: dict[str, Any], 

593 source_data_tdist: SourcePackage | None, 

594 source_data_srcdist: SourcePackage, 

595 excuse: "Excuse", 

596 ) -> PolicyVerdict: 

597 

598 # initialize 

599 verdict = PolicyVerdict.PASS 

600 source_name = excuse.item.package 

601 

602 # skip/delay autopkgtests until new package is built somewhere 

603 if not binaries_from_source_version(source_data_srcdist, self.suite_info)[0]: 

604 self.logger.debug( 

605 "%s hasnot been built anywhere, skipping autopkgtest policy", 

606 excuse.name, 

607 ) 

608 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

609 excuse.add_verdict_info(verdict, "Autopkgtest deferred: missing builds") 

610 

611 elif "all" in excuse.missing_builds: 

612 self.logger.debug( 

613 "%s hasnot been built for arch:all, skipping autopkgtest policy", 

614 source_name, 

615 ) 

616 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

617 excuse.add_verdict_info( 

618 verdict, "Autopkgtest deferred: missing arch:all build" 

619 ) 

620 

621 all_self_tests_pass = False 

622 results_info: list[str] = [] 

623 if not verdict.is_rejected: 

624 self.logger.debug("Checking autopkgtests for %s", source_name) 

625 trigger = source_name + "/" + source_data_srcdist.version 

626 

627 # build a (testsrc, testver) → arch → (status, run_id, log_url) map; we trigger/check test 

628 # results per architecture for technical/efficiency reasons, but we 

629 # want to evaluate and present the results by tested source package 

630 # first 

631 pkg_arch_result: dict[ 

632 tuple[str, str], dict[str, tuple[str, str | None, str]] 

633 ] = collections.defaultdict(dict) 

634 for arch in self.adt_arches: 

635 if arch in excuse.missing_builds: 

636 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

637 self.logger.debug( 

638 "%s hasnot been built on arch %s, delay autopkgtest there", 

639 source_name, 

640 arch, 

641 ) 

642 excuse.add_verdict_info( 

643 verdict, 

644 f"Autopkgtest deferred on {arch}: missing arch:{arch} build", 

645 ) 

646 else: 

647 verdict = self.check_and_request_arch( 

648 excuse, 

649 arch, 

650 source_data_srcdist, 

651 pkg_arch_result, 

652 trigger, 

653 verdict, 

654 ) 

655 

656 verdict, results_info, all_self_tests_pass = self.process_pkg_arch_results( 

657 tests_info, excuse, pkg_arch_result, verdict, trigger 

658 ) 

659 

660 verdict = self.finalize_excuse( 

661 excuse, verdict, all_self_tests_pass, results_info 

662 ) 

663 return verdict 

664 

665 def apply_srcarch_policy_impl( 

666 self, 

667 tests_info: dict[str, Any], 

668 arch: str, 

669 source_data_tdist: SourcePackage | None, 

670 source_data_srcdist: SourcePackage, 

671 excuse: "Excuse", 

672 ) -> PolicyVerdict: 

673 

674 assert self.hints is not None # for type checking 

675 # initialize 

676 verdict = PolicyVerdict.PASS 

677 

678 str_excuses = str(excuse.item) 

679 self.logger.debug("Checking autopkgtests for binNMU %s/%s", str_excuses, arch) 

680 

681 if arch not in self.adt_arches: 

682 return verdict 

683 

684 # find the binNMU version 

685 versions = set() 

686 for bin_pkg in source_data_srcdist.binaries: 

687 if bin_pkg.architecture == arch: 

688 if ( 

689 len(parts := bin_pkg.version.split("+b")) > 1 

690 and parts[-1].isdigit() 

691 ): 

692 versions.add(parts[-1]) 

693 else: 

694 self.logger.debug( 

695 "Version %s doesn't end with '+b#', skipping", bin_pkg.version 

696 ) 

697 if not versions or len(versions) > 1: 

698 self.logger.debug("This migration item doesn't look like a binNMU") 

699 return verdict 

700 

701 trigger = str_excuses + "/" + versions.pop() 

702 

703 # While we don't need the arch here, this is common with apply_src_policy_impl() 

704 # (testsrc, testver) → arch → (status, run_id, log_url) map 

705 pkg_arch_result: dict[ 

706 tuple[str, str], dict[str, tuple[str, str | None, str]] 

707 ] = collections.defaultdict(dict) 

708 

709 verdict = self.check_and_request_arch( 

710 excuse, arch, source_data_srcdist, pkg_arch_result, trigger, verdict 

711 ) 

712 

713 verdict, results_info, all_self_tests_pass = self.process_pkg_arch_results( 

714 tests_info, excuse, pkg_arch_result, verdict, trigger 

715 ) 

716 

717 verdict = self.finalize_excuse( 

718 excuse, verdict, all_self_tests_pass, results_info 

719 ) 

720 return verdict 

721 

722 def check_and_request_arch( 

723 self, 

724 excuse: "Excuse", 

725 arch: str, 

726 source_data_srcdist: SourcePackage, 

727 pkg_arch_result: dict[tuple[str, str], dict[str, tuple[str, str | None, str]]], 

728 trigger: str, 

729 verdict: PolicyVerdict, 

730 ) -> PolicyVerdict: 

731 """Perform sanity checks and request test/results when they pass""" 

732 

733 source_name = excuse.item.package 

734 if arch in excuse.policy_info["depends"].get("arch_all_not_installable", []): 

735 self.logger.debug( 

736 "%s is uninstallable on arch %s (which is allowed), not running autopkgtest there", 

737 source_name, 

738 arch, 

739 ) 

740 excuse.addinfo( 

741 f"Autopkgtest skipped on {arch}: not installable (which is allowed)" 

742 ) 

743 elif arch in excuse.unsatisfiable_on_archs and arch not in excuse.policy_info[ 

744 "depends" 

745 ].get("autopkgtest_run_anyways", []): 

746 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

747 self.logger.debug( 

748 "%s is uninstallable on arch %s, not running autopkgtest there", 

749 source_name, 

750 arch, 

751 ) 

752 excuse.addinfo(f"Autopkgtest skipped on {arch}: not installable") 

753 else: 

754 self.request_tests_for_source( 

755 arch, source_data_srcdist, pkg_arch_result, excuse, trigger 

756 ) 

757 

758 return verdict 

759 

760 def process_pkg_arch_results( 

761 self, 

762 tests_info: dict[str, Any], 

763 excuse: "Excuse", 

764 pkg_arch_result: dict[tuple[str, str], dict[str, tuple[str, str | None, str]]], 

765 verdict: PolicyVerdict, 

766 trigger: str, 

767 ) -> tuple[PolicyVerdict, list[str], bool]: 

768 """Calculate verdict based on results and render excuse text""" 

769 

770 source_name = excuse.item.package 

771 all_self_tests_pass = False 

772 results_info = [] 

773 

774 # add test result details to Excuse 

775 cloud_url = self.options.adt_ci_url + "packages/%(h)s/%(s)s/%(r)s/%(a)s" 

776 testver: str | None 

777 for testsrc, testver in sorted(pkg_arch_result): 

778 assert testver is not None 

779 arch_results = pkg_arch_result[(testsrc, testver)] 

780 r = {v[0] for v in arch_results.values()} 

781 if r & {"FAIL", "OLD_FAIL", "REGRESSION"}: 

782 verdict = PolicyVerdict.REJECTED_PERMANENTLY 

783 elif ( 

784 r & {"DEFERRED", "RUNNING", "RUNNING-REFERENCE"} 

785 and not verdict.is_rejected 

786 ): 

787 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

788 # skip version if still running on all arches 

789 if not r - {"DEFERRED", "RUNNING", "RUNNING-ALWAYSFAIL", "RUNNING-IGNORE"}: 

790 testver = None 

791 

792 # A source package is eligible for the bounty if it has tests 

793 # of its own that pass on all tested architectures. 

794 if testsrc == source_name: 

795 excuse.autopkgtest_results = r 

796 if r == {"PASS"}: 

797 all_self_tests_pass = True 

798 

799 if testver: 

800 testname = f"{testsrc}/{testver}" 

801 else: 

802 testname = testsrc 

803 

804 html_archmsg = [] 

805 for arch in sorted(arch_results): 

806 status, run_id, log_url = arch_results[arch] 

807 artifact_url = None 

808 retry_url = None 

809 reference_url = None 

810 reference_retry_url = None 

811 history_url = None 

812 if self.options.adt_ppas: 

813 if log_url.endswith("log.gz"): 

814 artifact_url = log_url.replace("log.gz", "artifacts.tar.gz") 

815 else: 

816 history_url = cloud_url % { 

817 "h": srchash(testsrc), 

818 "s": testsrc, 

819 "r": self.options.series, 

820 "a": arch, 

821 } 

822 if status not in ("DEFERRED", "PASS", "RUNNING", "RUNNING-IGNORE"): 

823 retry_url = self.format_retry_url(run_id, arch, testsrc, trigger) 

824 

825 baseline_result = self.result_in_baseline(testsrc, arch) 

826 if baseline_result and baseline_result[0] is not Result.NONE: 

827 baseline_run_id = str(baseline_result[2]) 

828 reference_url = self.format_log_url( 

829 testsrc, arch, baseline_run_id 

830 ) 

831 if self.options.adt_baseline == "reference": 

832 reference_retry_url = self.format_retry_url( 

833 baseline_run_id, arch, testsrc, REF_TRIG 

834 ) 

835 tests_info.setdefault(testname, {})[arch] = [ 

836 status, 

837 log_url, 

838 history_url, 

839 artifact_url, 

840 retry_url, 

841 ] 

842 

843 # render HTML snippet for testsrc entry for current arch 

844 if history_url: 

845 message = f'<a href="{history_url}">{arch}</a>' 

846 else: 

847 message = arch 

848 message += ': <a href="{}">{}</a>'.format( 

849 log_url, 

850 EXCUSES_LABELS[status], 

851 ) 

852 if retry_url: 

853 message += ( 

854 '<a href="%s" style="text-decoration: none;"> ♻</a>' % retry_url 

855 ) 

856 if reference_url: 

857 message += ' (<a href="%s">reference</a>' % reference_url 

858 if reference_retry_url: 

859 message += ( 

860 '<a href="%s" style="text-decoration: none;"> ♻</a>' 

861 % reference_retry_url 

862 ) 

863 message += ")" 

864 if artifact_url: 

865 message += ' <a href="%s">[artifacts]</a>' % artifact_url 

866 html_archmsg.append(message) 

867 

868 # render HTML line for testsrc entry 

869 # - if action is or may be required 

870 # - for ones own package 

871 if ( 

872 r 

873 - { 

874 "PASS", 

875 "NEUTRAL", 

876 "RUNNING-ALWAYSFAIL", 

877 "ALWAYSFAIL", 

878 "IGNORE-FAIL", 

879 } 

880 or testsrc == source_name 

881 ): 

882 if testver: 

883 pkg = '<a href="#{0}">{0}</a>/{1}'.format(testsrc, testver) 

884 else: 

885 pkg = '<a href="#{0}">{0}</a>'.format(testsrc) 

886 results_info.append( 

887 "Autopkgtest for {}: {}".format(pkg, ", ".join(html_archmsg)) 

888 ) 

889 

890 return (verdict, results_info, all_self_tests_pass) 

891 

892 def finalize_excuse( 

893 self, 

894 excuse: "Excuse", 

895 verdict: PolicyVerdict, 

896 all_self_tests_pass: bool, 

897 results_info: list[str], 

898 ) -> PolicyVerdict: 

899 """Updates excuses and verdict for hints and bounty/penalty config 

900 

901 Given the verdict so far, hints and configuration, the verdict may be 

902 updated. Depending of the end verdict, the content of results_info is 

903 added as info or as excuse. 

904 """ 

905 

906 package = excuse.item.package 

907 version = excuse.item.version 

908 

909 assert self.hints is not None # for type checking 

910 if verdict.is_rejected: 

911 # check for force-skiptest hint 

912 if ( 

913 hint := self.hints.search_first( 

914 "force-skiptest", 

915 package=package, 

916 version=version, 

917 ) 

918 ) is not None: 

919 excuse.addreason("skiptest") 

920 excuse.addinfo( 

921 "Not waiting for autopkgtest results and failures are " 

922 f"ignored because of hint by {hint.user}" 

923 ) 

924 verdict = PolicyVerdict.PASS_HINTED 

925 else: 

926 excuse.addreason("autopkgtest") 

927 

928 if ( 

929 self.options.adt_success_bounty 

930 and verdict is PolicyVerdict.PASS 

931 and all_self_tests_pass 

932 ): 

933 excuse.add_bounty("autopkgtest", self.options.adt_success_bounty) 

934 if self.options.adt_regression_penalty and verdict in { 

935 PolicyVerdict.REJECTED_PERMANENTLY, 

936 PolicyVerdict.REJECTED_TEMPORARILY, 

937 }: 

938 if self.options.adt_regression_penalty > 0: 938 ↛ 941line 938 didn't jump to line 941 because the condition on line 938 was always true

939 excuse.add_penalty("autopkgtest", self.options.adt_regression_penalty) 

940 # In case we give penalties instead of blocking, we must always pass 

941 verdict = PolicyVerdict.PASS 

942 for i in results_info: 

943 if verdict.is_rejected: 

944 excuse.add_verdict_info(verdict, i) 

945 else: 

946 excuse.addinfo(i) 

947 

948 return verdict 

949 

950 @staticmethod 

951 def has_autodep8(srcinfo: SourcePackage) -> bool: 

952 """Check if package is covered by autodep8 

953 

954 srcinfo is an item from self.britney.sources 

955 """ 

956 # autodep8? 

957 for t in srcinfo.testsuite: 

958 if t.startswith("autopkgtest-pkg"): 

959 return True 

960 

961 return False 

962 

963 def request_tests_for_source( 

964 self, 

965 arch: str, 

966 source_data_srcdist: SourcePackage, 

967 pkg_arch_result: dict[tuple[str, str], dict[str, tuple[str, str | None, str]]], 

968 excuse: "Excuse", 

969 trigger: str, 

970 ) -> None: 

971 pkg_universe = self.britney.pkg_universe 

972 target_suite = self.suite_info.target_suite 

973 source_suite = excuse.item.suite 

974 sources_t = target_suite.sources 

975 sources_s = excuse.item.suite.sources 

976 packages_s_a = excuse.item.suite.binaries[arch] 

977 source_name = excuse.item.package 

978 source_version = source_data_srcdist.version 

979 # request tests (unless they were already requested earlier or have a result) 

980 tests = self.tests_for_source(source_name, source_version, arch, excuse) 

981 is_huge = len(tests) > self.options.adt_huge 

982 

983 # local copies for better performance 

984 parse_src_depends = apt_pkg.parse_src_depends 

985 

986 # Here we figure out what is required from the source suite 

987 # for the test to install successfully. 

988 # 

989 # The ImplicitDependencyPolicy does a similar calculation, but 

990 # if I (elbrus) understand correctly, only in the reverse 

991 # dependency direction. We are doing something similar here 

992 # but in the dependency direction (note: this code is older). 

993 # We use the ImplicitDependencyPolicy result for the reverse 

994 # dependencies and we keep the code below for the 

995 # dependencies. Using the ImplicitDependencyPolicy results 

996 # also in the reverse direction seems to require quite some 

997 # reorganisation to get that information available here, as in 

998 # the current state only the current excuse is available here 

999 # and the required other excuses may not be calculated yet. 

1000 # 

1001 # Loop over all binary packages from trigger and 

1002 # recursively look up which *versioned* dependencies are 

1003 # only satisfied in the source suite. 

1004 # 

1005 # For all binaries found, look up which packages they 

1006 # break/conflict with in the target suite, but not in the 

1007 # source suite. The main reason to do this is to cover test 

1008 # dependencies, so we will check Testsuite-Triggers as 

1009 # well. 

1010 # 

1011 # OI: do we need to do the first check in a smart way 

1012 # (i.e. only for the packages that are actually going to be 

1013 # installed) for the breaks/conflicts set as well, i.e. do 

1014 # we need to check if any of the packages that we now 

1015 # enforce being from the source suite, actually have new 

1016 # versioned depends and new breaks/conflicts. 

1017 # 

1018 # For all binaries found, add the set of unique source 

1019 # packages to the list of triggers. 

1020 

1021 bin_triggers: set[PackageId] = set() 

1022 bin_new = filter_out_faux(source_data_srcdist.binaries) 

1023 # For each build-depends block (if any) check if the first alternative 

1024 # is satisfiable in the target suite. If not, add it to the initial set 

1025 # used for checking. 

1026 for block in parse_src_depends(concat_bdeps(source_data_srcdist), True, arch): 

1027 if not get_dependency_solvers( 

1028 [block[0]], 

1029 target_suite.binaries[arch], 

1030 target_suite.provides_table[arch], 

1031 build_depends=True, 

1032 ) and ( 

1033 solvers := get_dependency_solvers( 

1034 [block[0]], 

1035 packages_s_a, 

1036 source_suite.provides_table[arch], 

1037 build_depends=True, 

1038 ) 

1039 ): 

1040 bin_new.add(solvers[0].pkg_id) 

1041 for n_binary in iter_except(bin_new.pop, KeyError): 

1042 if n_binary in bin_triggers: 

1043 continue 

1044 bin_triggers.add(n_binary) 

1045 

1046 # Check if there is a dependency that is not 

1047 # available in the target suite. 

1048 # We add slightly too much here, because new binaries 

1049 # will also show up, but they are already properly 

1050 # installed. Nevermind. 

1051 depends = pkg_universe.dependencies_of(n_binary) 

1052 # depends is a frozenset{frozenset{BinaryPackageId, ..}} 

1053 for deps_of_bin in depends: 

1054 if target_suite.any_of_these_are_in_the_suite(deps_of_bin): 

1055 # if any of the alternative dependencies is already 

1056 # satisfied in the target suite, we can just ignore it 

1057 continue 

1058 # We'll figure out which version later 

1059 bin_new.update( 

1060 added_pkgs_compared_to_target_suite(deps_of_bin, target_suite) 

1061 ) 

1062 

1063 # Check if the package breaks/conflicts anything. We might 

1064 # be adding slightly too many source packages due to the 

1065 # check here as a binary package that is broken may be 

1066 # coming from a different source package in the source 

1067 # suite. Nevermind. 

1068 bin_broken = set() 

1069 for t_binary in bin_triggers: 

1070 # broken is a frozenset{BinaryPackageId, ..} 

1071 broken = pkg_universe.negative_dependencies_of( 

1072 cast(BinaryPackageId, t_binary) 

1073 ) 

1074 broken_in_target = { 

1075 p.package_name 

1076 for p in target_suite.which_of_these_are_in_the_suite(broken) 

1077 } 

1078 broken_in_source = { 

1079 p.package_name 

1080 for p in source_suite.which_of_these_are_in_the_suite(broken) 

1081 } 

1082 # We want packages with a newer version in the source suite that 

1083 # no longer has the conflict. This is an approximation 

1084 broken_filtered = { 

1085 p 

1086 for p in broken 

1087 if p.package_name in broken_in_target 

1088 and p.package_name not in broken_in_source 

1089 } 

1090 # We add the version in the target suite, but the code below will 

1091 # change it to the version in the source suite 

1092 bin_broken.update(broken_filtered) 

1093 bin_triggers.update(bin_broken) 

1094 

1095 # The ImplicitDependencyPolicy also found packages that need 

1096 # to migrate together, so add them to the triggers too. 

1097 for bin_implicit in excuse.depends_packages_flattened: 

1098 if bin_implicit.architecture == arch: 

1099 bin_triggers.add(bin_implicit) 

1100 

1101 triggers = set() 

1102 for t_binary2 in bin_triggers: 

1103 if t_binary2.architecture == arch: 

1104 try: 

1105 source_of_bin = packages_s_a[t_binary2.package_name].source 

1106 # If the version in the target suite is the same, don't add a trigger. 

1107 # Note that we looked up the source package in the source suite. 

1108 # If it were a different source package in the target suite, however, then 

1109 # we would not have this source package in the same version anyway. 

1110 # 

1111 # binNMU's exist, so let's also check if t_binary2 exists 

1112 # in the target suite if the sources are the same. 

1113 if ( 

1114 sources_t.get(source_of_bin, None) is None 

1115 or sources_s[source_of_bin].version 

1116 != sources_t[source_of_bin].version 

1117 or not target_suite.any_of_these_are_in_the_suite( 

1118 {cast(BinaryPackageId, t_binary2)} 

1119 ) 

1120 ): 

1121 triggers.add( 

1122 source_of_bin + "/" + sources_s[source_of_bin].version 

1123 ) 

1124 except KeyError: 

1125 # Apparently the package was removed from 

1126 # unstable e.g. if packages are replaced 

1127 # (e.g. -dbg to -dbgsym) 

1128 pass 

1129 if t_binary2 not in source_data_srcdist.binaries: 

1130 for tdep_src in self.testsuite_triggers.get( 

1131 t_binary2.package_name, set() 

1132 ): 

1133 try: 

1134 # Only add trigger if versions in the target and source suites are different 

1135 if ( 1135 ↛ 1130line 1135 didn't jump to line 1130

1136 sources_t.get(tdep_src, None) is None 

1137 or sources_s[tdep_src].version 

1138 != sources_t[tdep_src].version 

1139 ): 

1140 triggers.add( 

1141 tdep_src + "/" + sources_s[tdep_src].version 

1142 ) 

1143 except KeyError: 

1144 # Apparently the source was removed from 

1145 # unstable (testsuite_triggers are unified 

1146 # over all suites) 

1147 pass 

1148 source_trigger = source_name + "/" + source_version 

1149 triggers.discard(source_trigger) 

1150 triggers_list = sorted(list(triggers)) 

1151 triggers_list.insert(0, trigger) 

1152 

1153 impl_pids = excuse.policy_info.get("implicit-deps", {}).get( 

1154 "broken-binaries", [] 

1155 ) 

1156 for testsrc, testver in tests: 

1157 # Not if binaries from testsrc are not installable 

1158 skip = False 

1159 for bpid_s in impl_pids: 

1160 bpid = BinaryPackageId(*bpid_s.split("/")) 

1161 if ( 

1162 bpid.architecture == arch 

1163 and testsrc == target_suite.all_binaries_in_suite[bpid].source 

1164 ): 

1165 skip = True 

1166 break 

1167 if skip: 

1168 pkg_arch_result[(testsrc, testver)][arch] = ("DEFERRED", None, "") 

1169 else: 

1170 self.pkg_test_request(testsrc, arch, triggers_list, huge=is_huge) 

1171 result, real_ver, run_id, url = self.pkg_test_result( 

1172 testsrc, testver, arch, trigger 

1173 ) 

1174 pkg_arch_result[(testsrc, real_ver)][arch] = (result, run_id, url) 

1175 

1176 def tests_for_source( 

1177 self, src: str, ver: str, arch: str, excuse: "Excuse" 

1178 ) -> list[tuple[str, str]]: 

1179 """Iterate over all tests that should be run for given source and arch""" 

1180 

1181 source_suite = self.suite_info.primary_source_suite 

1182 target_suite = self.suite_info.target_suite 

1183 sources_info = target_suite.sources 

1184 binaries_info = target_suite.binaries[arch] 

1185 

1186 reported_pkgs = set() 

1187 

1188 tests = [] 

1189 

1190 # Debian doesn't have linux-meta, but Ubuntu does 

1191 # for linux themselves we don't want to trigger tests -- these should 

1192 # all come from linux-meta*. A new kernel ABI without a corresponding 

1193 # -meta won't be installed and thus we can't sensibly run tests against 

1194 # it. 

1195 if ( 1195 ↛ 1199line 1195 didn't jump to line 1199

1196 src.startswith("linux") 

1197 and src.replace("linux", "linux-meta") in sources_info 

1198 ): 

1199 return [] 

1200 

1201 # we want to test the package itself, if it still has a test in unstable 

1202 # but only if the package actually exists on this arch 

1203 srcinfo = source_suite.sources[src] 

1204 if ("autopkgtest" in srcinfo.testsuite or self.has_autodep8(srcinfo)) and len( 

1205 excuse.packages[arch] 

1206 ) > 0: 

1207 reported_pkgs.add(src) 

1208 tests.append((src, ver)) 

1209 

1210 extra_bins = [] 

1211 # Debian doesn't have linux-meta, but Ubuntu does 

1212 # Hack: For new kernels trigger all DKMS packages by pretending that 

1213 # linux-meta* builds a "dkms" binary as well. With that we ensure that we 

1214 # don't regress DKMS drivers with new kernel versions. 

1215 if src.startswith("linux-meta"): 

1216 # does this have any image on this arch? 

1217 for pkg_id in srcinfo.binaries: 

1218 if pkg_id.architecture == arch and "-image" in pkg_id.package_name: 

1219 try: 

1220 extra_bins.append(binaries_info["dkms"].pkg_id) 

1221 except KeyError: 

1222 pass 

1223 

1224 if not self.has_built_on_this_arch_or_is_arch_all(srcinfo, arch): 

1225 return [] 

1226 

1227 pkg_universe = self.britney.pkg_universe 

1228 # plus all direct reverse dependencies and test triggers of its 

1229 # binaries which have an autopkgtest 

1230 for binary in itertools.chain(srcinfo.binaries, extra_bins): 

1231 for rdep in filter_out_faux_gen( 

1232 pkg_universe.reverse_dependencies_of(binary) 

1233 ): 

1234 try: 

1235 rdep_src = binaries_info[rdep.package_name].source 

1236 # Don't re-trigger the package itself here; this should 

1237 # have been done above if the package still continues to 

1238 # have an autopkgtest in unstable. 

1239 if rdep_src == src: 

1240 continue 

1241 except KeyError: 

1242 continue 

1243 

1244 rdep_src_info = sources_info[rdep_src] 

1245 if "autopkgtest" in rdep_src_info.testsuite or self.has_autodep8( 

1246 rdep_src_info 

1247 ): 

1248 if rdep_src not in reported_pkgs: 

1249 tests.append((rdep_src, rdep_src_info.version)) 

1250 reported_pkgs.add(rdep_src) 

1251 

1252 for tdep_src in self.testsuite_triggers.get(binary.package_name, set()): 

1253 if tdep_src not in reported_pkgs: 

1254 try: 

1255 tdep_src_info = sources_info[tdep_src] 

1256 except KeyError: 

1257 continue 

1258 if "autopkgtest" in tdep_src_info.testsuite or self.has_autodep8( 1258 ↛ 1252line 1258 didn't jump to line 1252 because the condition on line 1258 was always true

1259 tdep_src_info 

1260 ): 

1261 for pkg_id in tdep_src_info.binaries: 1261 ↛ 1252line 1261 didn't jump to line 1252 because the loop on line 1261 didn't complete

1262 if pkg_id.architecture == arch: 

1263 tests.append((tdep_src, tdep_src_info.version)) 

1264 reported_pkgs.add(tdep_src) 

1265 break 

1266 

1267 tests.sort(key=lambda s_v: s_v[0]) 

1268 return tests 

1269 

1270 def read_pending_tests(self) -> None: 

1271 """Read pending test requests from previous britney runs 

1272 

1273 Initialize self.pending_tests with that data. 

1274 """ 

1275 assert self.pending_tests is None, "already initialized" 

1276 if not os.path.exists(self.pending_tests_file): 

1277 self.logger.info( 

1278 "No %s, starting with no pending tests", self.pending_tests_file 

1279 ) 

1280 self.pending_tests = {} 

1281 return 

1282 with open(self.pending_tests_file) as f: 

1283 self.pending_tests = json.load(f) 

1284 if VERSION_KEY in self.pending_tests: 

1285 del self.pending_tests[VERSION_KEY] 

1286 

1287 # Collect keys to delete instead of copying the full list of keys and 

1288 # changing the dicts on the fly. The lists containing the keys to delete 

1289 # only reaches the upper bound if all entries are too old. 

1290 to_delete_trigger = [] 

1291 for trigger, trigger_results in self.pending_tests.items(): 

1292 to_delete_pkg = [] 

1293 for pkg, arch_dict in trigger_results.items(): 

1294 to_delete_arch = [] 

1295 for arch, pending_test in arch_dict.items(): 

1296 if self._now - pending_test > self.options.adt_pending_max_age: 

1297 to_delete_arch.append(arch) 

1298 

1299 for key in to_delete_arch: 

1300 del arch_dict[key] 

1301 if not arch_dict: 

1302 to_delete_pkg.append(pkg) 

1303 for key in to_delete_pkg: 

1304 del trigger_results[key] 

1305 if not trigger_results: 

1306 to_delete_trigger.append(trigger) 

1307 for key in to_delete_trigger: 

1308 del self.pending_tests[key] 

1309 else: 

1310 # Migration code: 

1311 for trigger_data in self.pending_tests.values(): 1311 ↛ 1312line 1311 didn't jump to line 1312 because the loop on line 1311 never started

1312 for pkg, arch_list in trigger_data.items(): 

1313 trigger_data[pkg] = {} 

1314 for arch in arch_list: 

1315 trigger_data[pkg][arch] = self._now 

1316 

1317 self.logger.info( 

1318 "Read pending requested tests from %s", self.pending_tests_file 

1319 ) 

1320 self.logger.debug("%s", self.pending_tests) 

1321 

1322 # this requires iterating over all triggers and thus is expensive; 

1323 # cache the results 

1324 @lru_cache(None) 

1325 def latest_run_for_package(self, src: str, arch: str) -> str: 

1326 """Return latest run ID for src on arch""" 

1327 

1328 latest_run_id = "" 

1329 for srcmap in self.test_results.values(): 

1330 try: 

1331 run_id = srcmap[src][arch][2] 

1332 except KeyError: 

1333 continue 

1334 if run_id > latest_run_id: 

1335 latest_run_id = run_id 

1336 return latest_run_id 

1337 

1338 def urlopen_retry(self, url: str) -> http.client.HTTPResponse | addinfourl: 

1339 """A urlopen() that retries on time outs or errors""" 

1340 

1341 exc: Exception 

1342 for retry in range(5): 1342 ↛ 1363line 1342 didn't jump to line 1363 because the loop on line 1342 didn't complete

1343 try: 

1344 req = urlopen(url, timeout=30) 

1345 code = req.getcode() 

1346 if not code or 200 <= code < 300: 1346 ↛ 1342line 1346 didn't jump to line 1342 because the condition on line 1346 was always true

1347 return req # type: ignore[no-any-return] 

1348 except TimeoutError as e: 1348 ↛ 1349line 1348 didn't jump to line 1349 because the exception caught by line 1348 didn't happen

1349 self.logger.info( 

1350 "Timeout downloading '%s', will retry %d more times." 

1351 % (url, 5 - retry - 1) 

1352 ) 

1353 exc = e 

1354 except HTTPError as e: 

1355 if e.code not in (503, 502): 1355 ↛ 1357line 1355 didn't jump to line 1357 because the condition on line 1355 was always true

1356 raise 

1357 self.logger.info( 

1358 "Caught error %d downloading '%s', will retry %d more times." 

1359 % (e.code, url, 5 - retry - 1) 

1360 ) 

1361 exc = e 

1362 else: 

1363 raise exc 

1364 

1365 @lru_cache(None) 

1366 def fetch_swift_results(self, swift_url: str, src: str, arch: str) -> None: 

1367 """Download new results for source package/arch from swift""" 

1368 

1369 # prepare query: get all runs with a timestamp later than the latest 

1370 # run_id for this package/arch; '@' is at the end of each run id, to 

1371 # mark the end of a test run directory path 

1372 # example: <autopkgtest-wily>wily/amd64/libp/libpng/20150630_054517@/result.tar 

1373 query = { 

1374 "delimiter": "@", 

1375 "prefix": f"{self.options.series}/{arch}/{srchash(src)}/{src}/", 

1376 } 

1377 

1378 # determine latest run_id from results 

1379 if not self.options.adt_shared_results_cache: 

1380 latest_run_id = self.latest_run_for_package(src, arch) 

1381 if latest_run_id: 

1382 query["marker"] = query["prefix"] + latest_run_id 

1383 

1384 # request new results from swift 

1385 url = os.path.join(swift_url, self.swift_container) 

1386 url += "?" + urllib.parse.urlencode(query) 

1387 f = None 

1388 try: 

1389 f = self.urlopen_retry(url) 

1390 if f.getcode() == 200: 

1391 result_paths = f.read().decode().strip().splitlines() 

1392 elif f.getcode() == 204: # No content 1392 ↛ 1398line 1392 didn't jump to line 1398 because the condition on line 1392 was always true

1393 result_paths = [] 

1394 else: 

1395 # we should not ever end up here as we expect a HTTPError in 

1396 # other cases; e. g. 3XX is something that tells us to adjust 

1397 # our URLS, so fail hard on those 

1398 raise NotImplementedError( 

1399 "fetch_swift_results(%s): cannot handle HTTP code %r" 

1400 % (url, f.getcode()) 

1401 ) 

1402 except OSError as e: 

1403 # 401 "Unauthorized" is swift's way of saying "container does not exist" 

1404 if getattr(e, "code", -1) == 401: 1404 ↛ 1413line 1404 didn't jump to line 1413 because the condition on line 1404 was always true

1405 self.logger.info( 

1406 "fetch_swift_results: %s does not exist yet or is inaccessible", url 

1407 ) 

1408 return 

1409 # Other status codes are usually a transient 

1410 # network/infrastructure failure. Ignoring this can lead to 

1411 # re-requesting tests which we already have results for, so 

1412 # fail hard on this and let the next run retry. 

1413 self.logger.error("Failure to fetch swift results from %s: %s", url, str(e)) 

1414 sys.exit(1) 

1415 finally: 

1416 if f is not None: 1416 ↛ 1419line 1416 didn't jump to line 1419 because the condition on line 1416 was always true

1417 f.close() 1417 ↛ exitline 1417 didn't return from function 'fetch_swift_results' because the return on line 1408 wasn't executed

1418 

1419 for p in result_paths: 

1420 self.fetch_one_result( 

1421 os.path.join(swift_url, self.swift_container, p, "result.tar"), 

1422 src, 

1423 arch, 

1424 ) 

1425 

1426 def fetch_one_result(self, url: str, src: str, arch: str) -> None: 

1427 """Download one result URL for source/arch 

1428 

1429 Remove matching pending_tests entries. 

1430 """ 

1431 f = None 

1432 try: 

1433 f = self.urlopen_retry(url) 

1434 if f.getcode() == 200: 1434 ↛ 1437line 1434 didn't jump to line 1437 because the condition on line 1434 was always true

1435 tar_bytes = io.BytesIO(f.read()) 

1436 else: 

1437 raise NotImplementedError( 

1438 "fetch_one_result(%s): cannot handle HTTP code %r" 

1439 % (url, f.getcode()) 

1440 ) 

1441 except OSError as err: 

1442 self.logger.error("Failure to fetch %s: %s", url, str(err)) 

1443 # we tolerate "not found" (something went wrong on uploading the 

1444 # result), but other things indicate infrastructure problems 

1445 if getattr(err, "code", -1) == 404: 

1446 return 

1447 sys.exit(1) 

1448 finally: 

1449 if f is not None: 1449 ↛ exit,   1449 ↛ 14512 missed branches: 1) line 1449 didn't return from function 'fetch_one_result' because the return on line 1446 wasn't executed, 2) line 1449 didn't jump to line 1451 because the condition on line 1449 was always true

1450 f.close() 1450 ↛ exitline 1450 didn't return from function 'fetch_one_result' because the return on line 1446 wasn't executed

1451 try: 

1452 with tarfile.open(None, "r", tar_bytes) as tar: 

1453 exitcode = int(tar.extractfile("exitcode").read().strip()) # type: ignore[union-attr] 

1454 srcver = tar.extractfile("testpkg-version").read().decode().strip() # type: ignore[union-attr] 

1455 ressrc, ver = srcver.split() 

1456 testinfo = json.loads(tar.extractfile("testinfo.json").read().decode()) # type: ignore[union-attr] 

1457 except (KeyError, ValueError, tarfile.TarError) as err: 

1458 self.logger.error("%s is damaged, ignoring: %s", url, str(err)) 

1459 # ignore this; this will leave an orphaned request in autopkgtest-pending.json 

1460 # and thus require manual retries after fixing the tmpfail, but we 

1461 # can't just blindly attribute it to some pending test. 

1462 return 

1463 

1464 if src != ressrc: 1464 ↛ 1465line 1464 didn't jump to line 1465 because the condition on line 1464 was never true

1465 self.logger.error( 

1466 "%s is a result for package %s, but expected package %s", 

1467 url, 

1468 ressrc, 

1469 src, 

1470 ) 

1471 return 

1472 

1473 # parse recorded triggers in test result 

1474 for e in testinfo.get("custom_environment", []): 1474 ↛ 1479line 1474 didn't jump to line 1479 because the loop on line 1474 didn't complete

1475 if e.startswith("ADT_TEST_TRIGGERS="): 1475 ↛ 1474line 1475 didn't jump to line 1474 because the condition on line 1475 was always true

1476 result_triggers = [i for i in e.split("=", 1)[1].split() if "/" in i] 

1477 break 

1478 else: 

1479 self.logger.error("%s result has no ADT_TEST_TRIGGERS, ignoring") 

1480 return 

1481 

1482 run_id = os.path.basename(os.path.dirname(url)) 

1483 seen = round(calendar.timegm(time.strptime(run_id, "%Y%m%d_%H%M%S@"))) 

1484 # allow some skipped tests, but nothing else 

1485 if exitcode in [0, 2]: 

1486 result = Result.PASS 

1487 elif exitcode == 8: 1487 ↛ 1488line 1487 didn't jump to line 1488 because the condition on line 1487 was never true

1488 result = Result.NEUTRAL 

1489 else: 

1490 result = Result.FAIL 

1491 

1492 self.logger.info( 

1493 "Fetched test result for %s/%s/%s %s (triggers: %s): %s", 

1494 src, 

1495 ver, 

1496 arch, 

1497 run_id, 

1498 result_triggers, 

1499 result.name.lower(), 

1500 ) 

1501 

1502 # remove matching test requests 

1503 for trigger in result_triggers: 

1504 self.remove_from_pending(trigger, src, arch) 

1505 

1506 # add this result 

1507 for trigger in result_triggers: 

1508 self.add_trigger_to_results(trigger, src, ver, arch, run_id, seen, result) 

1509 

1510 def remove_from_pending( 

1511 self, trigger: str, src: str, arch: str, timestamp: int = sys.maxsize 

1512 ) -> None: 

1513 assert self.pending_tests is not None # for type checking 

1514 try: 

1515 arch_dict = self.pending_tests[trigger][src] 

1516 if timestamp < arch_dict[arch]: 

1517 # The result is from before the moment of scheduling, so it's 

1518 # not the one we're waiting for 

1519 return 

1520 del arch_dict[arch] 

1521 if not arch_dict: 

1522 del self.pending_tests[trigger][src] 

1523 if not self.pending_tests[trigger]: 

1524 del self.pending_tests[trigger] 

1525 self.logger.debug( 

1526 "-> matches pending request %s/%s for trigger %s", src, arch, trigger 

1527 ) 

1528 except KeyError: 

1529 self.logger.debug( 

1530 "-> does not match any pending request for %s/%s", src, arch 

1531 ) 

1532 

1533 def add_trigger_to_results( 

1534 self, 

1535 trigger: str, 

1536 src: str, 

1537 ver: str, 

1538 arch: str, 

1539 run_id: str, 

1540 timestamp: int, 

1541 status_to_add: Result, 

1542 ) -> None: 

1543 # Ensure that we got a new enough version 

1544 parts = trigger.split("/") 

1545 match len(parts): 

1546 case 2: 

1547 trigsrc, trigver = parts 

1548 case 4: 1548 ↛ 1550line 1548 didn't jump to line 1550 because the pattern on line 1548 always matched

1549 trigsrc, trigarch, trigver, rebuild = parts 

1550 case _: 

1551 self.logger.info("Ignoring invalid test trigger %s", trigger) 

1552 return 

1553 if trigsrc == src and apt_pkg.version_compare(ver, trigver) < 0: 1553 ↛ 1554line 1553 didn't jump to line 1554 because the condition on line 1553 was never true

1554 self.logger.debug( 

1555 "test trigger %s, but run for older version %s, ignoring", trigger, ver 

1556 ) 

1557 return 

1558 

1559 stored_result = ( 

1560 self.test_results.setdefault(trigger, {}) 

1561 .setdefault(src, {}) 

1562 .setdefault(arch, [Result.FAIL, None, "", 0]) 

1563 ) 

1564 

1565 # reruns shouldn't flip the result from PASS or NEUTRAL to 

1566 # FAIL, so remember the most recent version of the best result 

1567 # we've seen. Except for reference updates, which we always 

1568 # want to update with the most recent result. The result data 

1569 # may not be ordered by timestamp, so we need to check time. 

1570 update = False 

1571 if self.options.adt_baseline == "reference" and trigger == REF_TRIG: 

1572 if stored_result[3] < timestamp: 

1573 update = True 

1574 elif status_to_add < stored_result[0]: 

1575 update = True 

1576 elif status_to_add == stored_result[0] and stored_result[3] < timestamp: 

1577 update = True 

1578 

1579 if update: 

1580 stored_result[0] = status_to_add 

1581 stored_result[1] = ver 

1582 stored_result[2] = run_id 

1583 stored_result[3] = timestamp 

1584 

1585 def send_test_request( 

1586 self, src: str, arch: str, triggers: list[str], huge: bool = False 

1587 ) -> None: 

1588 """Send out AMQP request for testing src/arch for triggers 

1589 

1590 If huge is true, then the request will be put into the -huge instead of 

1591 normal queue. 

1592 """ 

1593 if self.options.dry_run: 1593 ↛ 1594line 1593 didn't jump to line 1594 because the condition on line 1593 was never true

1594 return 

1595 

1596 params: dict[str, Any] = {"triggers": triggers} 

1597 if self.options.adt_ppas: 

1598 params["ppas"] = self.options.adt_ppas 

1599 qname = f"debci-ppa-{self.options.series}-{arch}" 

1600 elif huge: 

1601 qname = f"debci-huge-{self.options.series}-{arch}" 

1602 else: 

1603 qname = f"debci-{self.options.series}-{arch}" 

1604 params["submit-time"] = time.strftime("%Y-%m-%d %H:%M:%S%z", time.gmtime()) 

1605 

1606 if self.amqp_channel: 1606 ↛ 1607line 1606 didn't jump to line 1607 because the condition on line 1606 was never true

1607 self.amqp_channel.basic_publish( 

1608 amqp.Message( 

1609 src + "\n" + json.dumps(params), delivery_mode=2 

1610 ), # persistent 

1611 routing_key=qname, 

1612 ) 

1613 # we save pending.json with every request, so that if britney 

1614 # crashes we don't re-request tests. This is only needed when using 

1615 # real amqp, as with file-based submission the pending tests are 

1616 # returned by debci along with the results each run. 

1617 self.save_pending_json() 

1618 else: 

1619 # for file-based submission, triggers are space separated 

1620 params["triggers"] = [" ".join(params["triggers"])] 

1621 assert self.amqp_file_handle 

1622 self.amqp_file_handle.write(f"{qname}:{src} {json.dumps(params)}\n") 

1623 

1624 def pkg_test_request( 

1625 self, src: str, arch: str, all_triggers: list[str], huge: bool = False 

1626 ) -> None: 

1627 """Request one package test for a set of triggers 

1628 

1629 all_triggers is a list of "pkgname/version". These are the packages 

1630 that will be taken from the source suite. The first package in this 

1631 list is the package that triggers the testing of src, the rest are 

1632 additional packages required for installability of the test deps. If 

1633 huge is true, then the request will be put into the -huge instead of 

1634 normal queue. 

1635 

1636 This will only be done if that test wasn't already requested in 

1637 a previous run (i. e. if it's not already in self.pending_tests) 

1638 or if there is already a fresh or a positive result for it. This 

1639 ensures to download current results for this package before 

1640 requesting any test.""" 

1641 trigger = all_triggers[0] 

1642 uses_swift = not self.options.adt_swift_url.startswith("file://") 

1643 try: 

1644 result = self.test_results[trigger][src][arch] 

1645 has_result = True 

1646 except KeyError: 

1647 has_result = False 

1648 

1649 if has_result: 

1650 result_state = result[0] 

1651 if result_state in {Result.OLD_PASS, Result.OLD_FAIL, Result.OLD_NEUTRAL}: 

1652 pass 

1653 elif ( 

1654 result_state is Result.FAIL 

1655 and self.result_in_baseline(src, arch)[0] 

1656 in {Result.PASS, Result.NEUTRAL, Result.OLD_PASS, Result.OLD_NEUTRAL} 

1657 and self._now - result[3] > self.options.adt_retry_older_than 

1658 ): 

1659 # We might want to retry this failure, so continue 

1660 pass 

1661 elif not uses_swift: 

1662 # We're done if we don't retrigger and we're not using swift 

1663 return 

1664 elif result_state in {Result.PASS, Result.NEUTRAL}: 

1665 self.logger.debug( 

1666 "%s/%s triggered by %s already known", src, arch, trigger 

1667 ) 

1668 return 

1669 

1670 # Without swift we don't expect new results 

1671 if uses_swift: 

1672 self.logger.info( 

1673 "Checking for new results for failed %s/%s for trigger %s", 

1674 src, 

1675 arch, 

1676 trigger, 

1677 ) 

1678 self.fetch_swift_results(self.options.adt_swift_url, src, arch) 

1679 # do we have one now? 

1680 try: 

1681 self.test_results[trigger][src][arch] 

1682 return 

1683 except KeyError: 

1684 pass 

1685 

1686 self.request_test_if_not_queued(src, arch, trigger, all_triggers, huge=huge) 

1687 

1688 def request_test_if_not_queued( 

1689 self, 

1690 src: str, 

1691 arch: str, 

1692 trigger: str, 

1693 all_triggers: list[str] = [], 

1694 huge: bool = False, 

1695 ) -> None: 

1696 assert self.pending_tests is not None # for type checking 

1697 if not all_triggers: 

1698 all_triggers = [trigger] 

1699 

1700 # Don't re-request if it's already pending 

1701 arch_dict = self.pending_tests.setdefault(trigger, {}).setdefault(src, {}) 

1702 if arch in arch_dict.keys(): 

1703 self.logger.debug( 

1704 "Test %s/%s for %s is already pending, not queueing", src, arch, trigger 

1705 ) 

1706 else: 

1707 self.logger.debug( 

1708 "Requesting %s autopkgtest on %s to verify %s", src, arch, trigger 

1709 ) 

1710 arch_dict[arch] = self._now 

1711 self.send_test_request(src, arch, all_triggers, huge=huge) 

1712 

1713 def result_in_baseline(self, src: str, arch: str) -> list[Any]: 

1714 """Get the result for src on arch in the baseline 

1715 

1716 The baseline is optionally all data or a reference set) 

1717 """ 

1718 

1719 # this requires iterating over all cached results and thus is expensive; 

1720 # cache the results 

1721 try: 

1722 return self.result_in_baseline_cache[src][arch] 

1723 except KeyError: 

1724 pass 

1725 

1726 result_reference: list[Any] = [Result.NONE, None, "", 0] 

1727 if self.options.adt_baseline == "reference": 

1728 if src not in self.suite_info.target_suite.sources: 1728 ↛ 1729line 1728 didn't jump to line 1729 because the condition on line 1728 was never true

1729 return result_reference 

1730 

1731 try: 

1732 result_reference = self.test_results[REF_TRIG][src][arch] 

1733 self.logger.debug( 

1734 "Found result for src %s in reference: %s", 

1735 src, 

1736 result_reference[0].name, 

1737 ) 

1738 except KeyError: 

1739 self.logger.debug( 

1740 "Found NO result for src %s in reference: %s", 

1741 src, 

1742 result_reference[0].name, 

1743 ) 

1744 self.result_in_baseline_cache[src][arch] = deepcopy(result_reference) 

1745 return result_reference 

1746 

1747 result_ever: list[Any] = [Result.FAIL, None, "", 0] 

1748 for srcmap in self.test_results.values(): 

1749 try: 

1750 if srcmap[src][arch][0] is not Result.FAIL: 

1751 result_ever = srcmap[src][arch] 

1752 # If we are not looking at a reference run, We don't really 

1753 # care about anything except the status, so we're done 

1754 # once we find a PASS. 

1755 if result_ever[0] is Result.PASS: 

1756 break 

1757 except KeyError: 

1758 pass 

1759 

1760 self.result_in_baseline_cache[src][arch] = deepcopy(result_ever) 

1761 self.logger.debug("Result for src %s ever: %s", src, result_ever[0].name) 

1762 return result_ever 

1763 

1764 def has_test_in_target(self, src: str) -> bool: 

1765 test_in_target = False 

1766 try: 

1767 srcinfo = self.suite_info.target_suite.sources[src] 

1768 if "autopkgtest" in srcinfo.testsuite or self.has_autodep8(srcinfo): 

1769 test_in_target = True 

1770 # AttributeError is only needed for the test suite as 

1771 # srcinfo can be a NoneType 

1772 except (KeyError, AttributeError): 

1773 pass 

1774 

1775 return test_in_target 

1776 

1777 def pkg_test_result( 

1778 self, src: str, ver: str, arch: str, trigger: str 

1779 ) -> tuple[str, str, str | None, str]: 

1780 """Get current test status of a particular package 

1781 

1782 Return (status, real_version, run_id, log_url) tuple; status is a key in 

1783 EXCUSES_LABELS. run_id is None if the test is still running. 

1784 """ 

1785 assert self.pending_tests is not None # for type checking 

1786 # determine current test result status 

1787 run_id = None 

1788 try: 

1789 r = self.test_results[trigger][src][arch] 

1790 ver = r[1] 

1791 run_id = r[2] 

1792 

1793 if r[0] in {Result.FAIL, Result.OLD_FAIL}: 

1794 # determine current test result status 

1795 baseline_result = self.result_in_baseline(src, arch)[0] 

1796 

1797 # Special-case triggers from linux-meta*: we cannot compare 

1798 # results against different kernels, as e. g. a DKMS module 

1799 # might work against the default kernel but fail against a 

1800 # different flavor; so for those, ignore the "ever 

1801 # passed" check; FIXME: check against trigsrc only 

1802 if self.options.adt_baseline != "reference" and ( 

1803 trigger.startswith("linux-meta") or trigger.startswith("linux/") 

1804 ): 

1805 baseline_result = Result.FAIL 

1806 

1807 # Check if the autopkgtest (still) exists in the target suite 

1808 test_in_target = self.has_test_in_target(src) 

1809 

1810 if test_in_target and baseline_result in { 

1811 Result.NONE, 

1812 Result.OLD_FAIL, 

1813 Result.OLD_NEUTRAL, 

1814 Result.OLD_PASS, 

1815 }: 

1816 self.request_test_if_not_queued(src, arch, REF_TRIG) 

1817 

1818 if self.has_force_badtest(src, ver, arch): 

1819 result = "IGNORE-FAIL" 

1820 elif not test_in_target: 

1821 if self.options.adt_ignore_failure_for_new_tests: 

1822 result = "IGNORE-FAIL" 

1823 else: 

1824 result = r[0].name 

1825 elif baseline_result in {Result.FAIL, Result.OLD_FAIL}: 

1826 result = "ALWAYSFAIL" 

1827 elif baseline_result is Result.NONE: 1827 ↛ 1828line 1827 didn't jump to line 1828 because the condition on line 1827 was never true

1828 result = "RUNNING-REFERENCE" 

1829 else: 

1830 result = "REGRESSION" 

1831 

1832 else: 

1833 result = r[0].name 

1834 

1835 url = self.format_log_url(src, arch, run_id) 

1836 except KeyError: 

1837 # no result for src/arch; still running? 

1838 assert arch in self.pending_tests.get(trigger, {}).get(src, {}).keys(), ( 

1839 "Result for %s/%s/%s (triggered by %s) is neither known nor pending!" 

1840 % (src, ver, arch, trigger) 

1841 ) 

1842 

1843 if self.has_force_badtest(src, ver, arch): 

1844 result = "RUNNING-IGNORE" 

1845 else: 

1846 if self.has_test_in_target(src): 

1847 baseline_result = self.result_in_baseline(src, arch)[0] 

1848 if baseline_result is Result.FAIL: 

1849 result = "RUNNING-ALWAYSFAIL" 

1850 else: 

1851 result = "RUNNING" 

1852 else: 

1853 if self.options.adt_ignore_failure_for_new_tests: 

1854 result = "RUNNING-IGNORE" 

1855 else: 

1856 result = "RUNNING" 

1857 url = self.options.adt_ci_url + "status/pending" 

1858 

1859 return (result, ver, run_id, url) 

1860 

1861 def has_force_badtest(self, src: str, ver: str, arch: str) -> bool: 

1862 """Check if src/ver/arch has a force-badtest hint""" 

1863 

1864 assert self.hints is not None 

1865 for hint in self.hints.search("force-badtest", package=src): 

1866 if any( 

1867 mi 

1868 for mi in hint.packages 

1869 if mi.architecture in ("source", arch) 

1870 and ( 

1871 mi.version is None 

1872 or mi.version == "all" # Historical unversioned hint 

1873 or apt_pkg.version_compare(ver, mi.version) <= 0 

1874 ) 

1875 ): 

1876 self.logger.info( 

1877 "Checking hints for %s/%s/%s: %s", 

1878 src, 

1879 arch, 

1880 ver, 

1881 hint, 

1882 ) 

1883 return True 

1884 

1885 return False 

1886 

1887 def has_built_on_this_arch_or_is_arch_all( 

1888 self, src_data: SourcePackage, arch: str 

1889 ) -> bool: 

1890 """When a source builds arch:all binaries, those binaries are 

1891 added to all architectures and thus the source 'exists' 

1892 everywhere. This function checks if the source has any arch 

1893 specific binaries on this architecture and if not, if it 

1894 has them on any architecture. 

1895 """ 

1896 packages_s_a = self.suite_info.primary_source_suite.binaries[arch] 

1897 has_unknown_binary = False 

1898 for binary_s in filter_out_faux_gen(src_data.binaries): 

1899 try: 

1900 binary_u = packages_s_a[binary_s.package_name] 

1901 except KeyError: 

1902 # src_data.binaries has all the built binaries, so if 

1903 # we get here, we know that at least one architecture 

1904 # has architecture specific binaries 

1905 has_unknown_binary = True 

1906 continue 

1907 if binary_u.architecture == arch: 

1908 return True 

1909 # If we get here, we have only seen arch:all packages for this 

1910 # arch. 

1911 return not has_unknown_binary