Coverage for britney2/policies/autopkgtest.py: 90%

847 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-01-29 17:21 +0000

1# Copyright (C) 2013 - 2016 Canonical Ltd. 

2# Authors: 

3# Colin Watson <cjwatson@ubuntu.com> 

4# Jean-Baptiste Lallement <jean-baptiste.lallement@canonical.com> 

5# Martin Pitt <martin.pitt@ubuntu.com> 

6 

7# This program is free software; you can redistribute it and/or modify 

8# it under the terms of the GNU General Public License as published by 

9# the Free Software Foundation; either version 2 of the License, or 

10# (at your option) any later version. 

11 

12# This program is distributed in the hope that it will be useful, 

13# but WITHOUT ANY WARRANTY; without even the implied warranty of 

14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

15# GNU General Public License for more details. 

16 

17import calendar 

18import collections 

19import http.client 

20import io 

21import itertools 

22import json 

23import optparse 

24import os 

25import sys 

26import tarfile 

27import time 

28import urllib.parse 

29from collections.abc import Iterator 

30from copy import deepcopy 

31from enum import Enum 

32from functools import lru_cache, total_ordering 

33from typing import TYPE_CHECKING, Any, Optional, cast 

34from urllib.error import HTTPError 

35from urllib.request import urlopen 

36from urllib.response import addinfourl 

37 

38import apt_pkg 

39 

40from britney2 import ( 

41 BinaryPackageId, 

42 PackageId, 

43 SourcePackage, 

44 SuiteClass, 

45 Suites, 

46 TargetSuite, 

47) 

48from britney2.hints import HintAnnotate, HintType 

49from britney2.migrationitem import MigrationItem 

50from britney2.policies import PolicyVerdict 

51from britney2.policies.policy import AbstractBasePolicy 

52from britney2.utils import ( 

53 filter_out_faux, 

54 get_dependency_solvers, 

55 iter_except, 

56 parse_option, 

57) 

58 

59if TYPE_CHECKING: 59 ↛ 60line 59 didn't jump to line 60 because the condition on line 59 was never true

60 import amqplib.client_0_8 as amqp 

61 

62 from ..britney import Britney 

63 from ..excuse import Excuse 

64 from ..hints import HintParser 

65 

66 

67@total_ordering 

68class Result(Enum): 

69 PASS = 1 

70 NEUTRAL = 2 

71 FAIL = 3 

72 OLD_PASS = 4 

73 OLD_NEUTRAL = 5 

74 OLD_FAIL = 6 

75 NONE = 7 

76 

77 def __lt__(self, other: "Result") -> bool: 

78 return True if self.value < other.value else False 

79 

80 

81EXCUSES_LABELS = { 

82 "PASS": '<span style="background:#87d96c">Pass</span>', 

83 "OLD_PASS": '<span style="background:#87d96c">Pass</span>', 

84 "NEUTRAL": "No tests, superficial or marked flaky", 

85 "OLD_NEUTRAL": "No tests, superficial or marked flaky", 

86 "FAIL": '<span style="background:#ff6666">Failed</span>', 

87 "OLD_FAIL": '<span style="background:#ff6666">Failed</span>', 

88 "ALWAYSFAIL": '<span style="background:#e5c545">Failed (not a regression)</span>', 

89 "REGRESSION": '<span style="background:#ff6666">Regression</span>', 

90 "IGNORE-FAIL": '<span style="background:#e5c545">Ignored failure</span>', 

91 "RUNNING": '<span style="background:#99ddff">Test triggered</span>', 

92 "RUNNING-ALWAYSFAIL": "Test triggered (will not be considered a regression)", 

93 "RUNNING-IGNORE": "Test triggered (failure will be ignored)", 

94 "RUNNING-REFERENCE": '<span style="background:#ff6666">Reference test triggered, but real test failed already</span>', 

95} 

96 

97REF_TRIG = "migration-reference/0" 

98 

99VERSION_KEY = "britney-autopkgtest-pending-file-version" 

100 

101 

102def srchash(src: str) -> str: 

103 """archive hash prefix for source package""" 

104 

105 if src.startswith("lib"): 105 ↛ 106line 105 didn't jump to line 106 because the condition on line 105 was never true

106 return src[:4] 

107 else: 

108 return src[0] 

109 

110 

111def added_pkgs_compared_to_target_suite( 

112 package_ids: frozenset[BinaryPackageId], 

113 target_suite: TargetSuite, 

114 *, 

115 invert: bool = False, 

116) -> Iterator[BinaryPackageId]: 

117 if invert: 117 ↛ 118line 117 didn't jump to line 118 because the condition on line 117 was never true

118 pkgs_ids_to_ignore = package_ids - set( 

119 target_suite.which_of_these_are_in_the_suite(package_ids) 

120 ) 

121 names_ignored = {p.package_name for p in pkgs_ids_to_ignore} 

122 else: 

123 names_ignored = { 

124 p.package_name 

125 for p in target_suite.which_of_these_are_in_the_suite(package_ids) 

126 } 

127 yield from (p for p in package_ids if p.package_name not in names_ignored) 

128 

129 

130def all_leaf_results( 

131 test_results: dict[str, dict[str, dict[str, list[Any]]]], 

132) -> Iterator[list[Any]]: 

133 for trigger in test_results.values(): 

134 for arch in trigger.values(): 

135 yield from arch.values() 

136 

137 

138def mark_result_as_old(result: Result) -> Result: 

139 """Convert current result into corresponding old result""" 

140 

141 if result == Result.FAIL: 

142 result = Result.OLD_FAIL 

143 elif result == Result.PASS: 

144 result = Result.OLD_PASS 

145 elif result == Result.NEUTRAL: 145 ↛ 147line 145 didn't jump to line 147 because the condition on line 145 was always true

146 result = Result.OLD_NEUTRAL 

147 return result 

148 

149 

150def concat_bdeps(src_data: SourcePackage) -> str: 

151 """Concatenate build_deps_arch and build_deps_indep""" 

152 return ",".join( 

153 (src_data.build_deps_arch or "", src_data.build_deps_indep or "") 

154 ).strip(",") 

155 

156 

157class AutopkgtestPolicy(AbstractBasePolicy): 

158 """autopkgtest regression policy for source migrations 

159 

160 Run autopkgtests for the excuse and all of its reverse dependencies, and 

161 reject the upload if any of those regress. 

162 """ 

163 

164 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

165 super().__init__( 

166 "autopkgtest", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE} 

167 ) 

168 # tests requested in this and previous runs 

169 # trigger -> src -> [arch] 

170 self.pending_tests: dict[str, dict[str, dict[str, int]]] | None = None 

171 self.pending_tests_file = os.path.join( 

172 self.state_dir, "autopkgtest-pending.json" 

173 ) 

174 self.testsuite_triggers: dict[str, set[str]] = {} 

175 self.result_in_baseline_cache: dict[str, dict[str, list[Any]]] = ( 

176 collections.defaultdict(dict) 

177 ) 

178 

179 self.amqp_file_handle: io.TextIOWrapper | None = None 

180 

181 # Default values for this policy's options 

182 parse_option(options, "adt_baseline") 

183 parse_option(options, "adt_huge", to_int=True) 

184 parse_option(options, "adt_ppas") 

185 parse_option(options, "adt_reference_max_age", day_to_sec=True) 

186 parse_option(options, "adt_pending_max_age", default=5, day_to_sec=True) 

187 parse_option(options, "adt_regression_penalty", default=0, to_int=True) 

188 parse_option(options, "adt_log_url") # see below for defaults 

189 parse_option(options, "adt_retry_url") # see below for defaults 

190 parse_option(options, "adt_retry_older_than", day_to_sec=True) 

191 parse_option(options, "adt_results_cache_age", day_to_sec=True) 

192 parse_option(options, "adt_shared_results_cache") 

193 parse_option(options, "adt_success_bounty", default=0, to_int=True) 

194 parse_option(options, "adt_ignore_failure_for_new_tests", to_bool=True) 

195 

196 # When ADT_RESULTS_CACHE_AGE is smaller than or equal to 

197 # ADT_REFERENCE_MAX_AGE old reference result will be removed from cache 

198 # before the newly scheduled results are in, potentially causing 

199 # additional waiting. For packages like glibc this might cause an 

200 # infinite delay as there will always be a package that's 

201 # waiting. Similarly for ADT_RETRY_OLDER_THAN. 

202 if self.options.adt_results_cache_age <= self.options.adt_reference_max_age: 

203 self.logger.warning( 

204 "Unexpected: ADT_REFERENCE_MAX_AGE bigger than ADT_RESULTS_CACHE_AGE" 

205 ) 

206 if self.options.adt_results_cache_age <= self.options.adt_retry_older_than: 

207 self.logger.warning( 

208 "Unexpected: ADT_RETRY_OLDER_THAN bigger than ADT_RESULTS_CACHE_AGE" 

209 ) 

210 

211 if not self.options.adt_log_url: 211 ↛ 237line 211 didn't jump to line 237 because the condition on line 211 was always true

212 # Historical defaults 

213 if self.options.adt_swift_url.startswith("file://"): 

214 self.options.adt_log_url = os.path.join( 

215 self.options.adt_ci_url, 

216 "data", 

217 "autopkgtest", 

218 self.options.series, 

219 "{arch}", 

220 "{hash}", 

221 "{package}", 

222 "{run_id}", 

223 "log.gz", 

224 ) 

225 else: 

226 self.options.adt_log_url = os.path.join( 

227 self.options.adt_swift_url, 

228 "{swift_container}", 

229 self.options.series, 

230 "{arch}", 

231 "{hash}", 

232 "{package}", 

233 "{run_id}", 

234 "log.gz", 

235 ) 

236 

237 if hasattr(self.options, "adt_retry_url_mech"): 237 ↛ 238line 237 didn't jump to line 238 because the condition on line 237 was never true

238 self.logger.warning( 

239 "The ADT_RETRY_URL_MECH configuration has been deprecated." 

240 ) 

241 self.logger.warning( 

242 "Instead britney now supports ADT_RETRY_URL for more flexibility." 

243 ) 

244 if self.options.adt_retry_url: 

245 self.logger.error( 

246 "Please remove the ADT_RETRY_URL_MECH as ADT_RETRY_URL will be used." 

247 ) 

248 elif self.options.adt_retry_url_mech == "run_id": 

249 self.options.adt_retry_url = ( 

250 self.options.adt_ci_url + "api/v1/retry/{run_id}" 

251 ) 

252 if not self.options.adt_retry_url: 252 ↛ 269line 252 didn't jump to line 269 because the condition on line 252 was always true

253 # Historical default 

254 self.options.adt_retry_url = ( 

255 self.options.adt_ci_url 

256 + "request.cgi?" 

257 + "release={release}&arch={arch}&package={package}&trigger={trigger}{ppas}" 

258 ) 

259 

260 # results map: trigger -> src -> arch -> [passed, version, run_id, seen] 

261 # - trigger is "source/version" of an unstable package that triggered 

262 # this test run. 

263 # - "passed" is a Result 

264 # - "version" is the package version of "src" of that test 

265 # - "run_id" is an opaque ID that identifies a particular test run for 

266 # a given src/arch. 

267 # - "seen" is an approximate time stamp of the test run. How this is 

268 # deduced depends on the interface used. 

269 self.test_results: dict[str, dict[str, dict[str, list[Any]]]] = {} 

270 if self.options.adt_shared_results_cache: 

271 self.results_cache_file = self.options.adt_shared_results_cache 

272 else: 

273 self.results_cache_file = os.path.join( 

274 self.state_dir, "autopkgtest-results.cache" 

275 ) 

276 

277 try: 

278 self.options.adt_ppas = self.options.adt_ppas.strip().split() 

279 except AttributeError: 

280 self.options.adt_ppas = [] 

281 

282 self.swift_container = "autopkgtest-" + options.series 

283 if self.options.adt_ppas: 

284 self.swift_container += "-" + options.adt_ppas[-1].replace("/", "-") 

285 

286 # restrict adt_arches to architectures we actually run for 

287 self.adt_arches = [] 

288 for arch in self.options.adt_arches.split(): 

289 if arch in self.options.architectures: 

290 self.adt_arches.append(arch) 

291 else: 

292 self.logger.info( 

293 "Ignoring ADT_ARCHES %s as it is not in architectures list", arch 

294 ) 

295 

296 def __del__(self) -> None: 

297 if self.amqp_file_handle: 297 ↛ exitline 297 didn't return from function '__del__' because the condition on line 297 was always true

298 try: 

299 self.amqp_file_handle.close() 

300 except AttributeError: 

301 pass 

302 

303 def register_hints(self, hint_parser: "HintParser") -> None: 

304 hint_parser.register_hint_type( 

305 HintType( 

306 "force-badtest", 

307 versioned=HintAnnotate.OPTIONAL, 

308 architectured=HintAnnotate.OPTIONAL, 

309 ) 

310 ) 

311 hint_parser.register_hint_type(HintType("force-skiptest")) 

312 

313 def initialise(self, britney: "Britney") -> None: 

314 super().initialise(britney) 

315 # We want to use the "current" time stamp in multiple locations 

316 time_now = round(time.time()) 

317 if hasattr(self.options, "fake_runtime"): 

318 time_now = int(self.options.fake_runtime) 

319 self._now = time_now 

320 

321 # local copies for better performance 

322 parse_src_depends = apt_pkg.parse_src_depends 

323 

324 # compute inverse Testsuite-Triggers: map, unifying all series 

325 self.logger.info("Building inverse testsuite_triggers map") 

326 for suite in self.suite_info: 

327 for src, data in suite.sources.items(): 

328 # for now, let's assume that autodep8 uses builddeps (most do) 

329 if ( 

330 self.has_autodep8(data) 

331 and "@builddeps@" not in data.testsuite_triggers 

332 ): 

333 data.testsuite_triggers.append("@builddeps@") 

334 for trigger in data.testsuite_triggers: 

335 if trigger == "@builddeps@": 

336 for arch in self.adt_arches: 

337 for block in parse_src_depends( 

338 concat_bdeps(data), True, arch 

339 ): 

340 self.testsuite_triggers.setdefault( 

341 block[0][0], set() 

342 ).add(src) 

343 else: 

344 self.testsuite_triggers.setdefault(trigger, set()).add(src) 

345 target_suite_name = self.suite_info.target_suite.name 

346 

347 os.makedirs(self.state_dir, exist_ok=True) 

348 self.read_pending_tests() 

349 

350 # read the cached results that we collected so far 

351 if os.path.exists(self.results_cache_file): 

352 with open(self.results_cache_file) as f: 

353 test_results = json.load(f) 

354 self.test_results = self.check_and_upgrade_cache(test_results) 

355 self.logger.info("Read previous results from %s", self.results_cache_file) 

356 else: 

357 self.logger.info( 

358 "%s does not exist, re-downloading all results from swift", 

359 self.results_cache_file, 

360 ) 

361 

362 # read in the new results 

363 if self.options.adt_swift_url.startswith("file://"): 

364 debci_file = self.options.adt_swift_url[7:] 

365 if os.path.exists(debci_file): 

366 with open(debci_file) as f: 

367 test_results = json.load(f) 

368 self.logger.info("Read new results from %s", debci_file) 

369 for res in test_results["results"]: 

370 # if there's no date, the test didn't finish yet 

371 if res["date"] is None: 371 ↛ 372line 371 didn't jump to line 372 because the condition on line 371 was never true

372 continue 

373 (test_suite, triggers, src, arch, ver, status, run_id, seen) = [ 

374 res["suite"], 

375 res["trigger"], 

376 res["package"], 

377 res["arch"], 

378 res["version"], 

379 res["status"], 

380 str(res["run_id"]), 

381 round( 

382 calendar.timegm( 

383 time.strptime(res["date"][0:-5], "%Y-%m-%dT%H:%M:%S") 

384 ) 

385 ), 

386 ] 

387 if test_suite != target_suite_name: 387 ↛ 389line 387 didn't jump to line 389 because the condition on line 387 was never true

388 # not requested for this target suite, so ignore 

389 continue 

390 if triggers is None: 390 ↛ 392line 390 didn't jump to line 392 because the condition on line 390 was never true

391 # not requested for this policy, so ignore 

392 continue 

393 if status is None: 

394 # still running => pending 

395 continue 

396 for trigger in triggers.split(): 

397 # remove matching test requests 

398 self.remove_from_pending(trigger, src, arch, seen) 

399 if status == "tmpfail": 399 ↛ 401line 399 didn't jump to line 401 because the condition on line 399 was never true

400 # let's see if we still need it 

401 continue 

402 self.logger.debug( 

403 "Results %s %s %s added", src, trigger, status 

404 ) 

405 self.add_trigger_to_results( 

406 trigger, 

407 src, 

408 ver, 

409 arch, 

410 run_id, 

411 seen, 

412 Result[status.upper()], 

413 ) 

414 else: 

415 self.logger.info( 

416 "%s does not exist, no new data will be processed", debci_file 

417 ) 

418 

419 # The cache can contain results against versions of packages that 

420 # are not in any suite anymore. Strip those out, as we don't want 

421 # to use those results. Additionally, old references may be 

422 # filtered out. 

423 if self.options.adt_baseline == "reference": 

424 self.filter_old_results() 

425 

426 # we need sources, binaries, and installability tester, so for now 

427 # remember the whole britney object 

428 self.britney = britney 

429 

430 # Initialize AMQP connection 

431 self.amqp_channel: Optional["amqp.channel.Channel"] = None 

432 self.amqp_file_handle = None 

433 if self.options.dry_run: 433 ↛ 434line 433 didn't jump to line 434 because the condition on line 433 was never true

434 return 

435 

436 amqp_url = self.options.adt_amqp 

437 

438 if amqp_url.startswith("amqp://"): 438 ↛ 439line 438 didn't jump to line 439 because the condition on line 438 was never true

439 import amqplib.client_0_8 as amqp 

440 

441 # depending on the setup we connect to a AMQP server 

442 creds = urllib.parse.urlsplit(amqp_url, allow_fragments=False) 

443 self.amqp_con = amqp.Connection( 

444 creds.hostname, userid=creds.username, password=creds.password 

445 ) 

446 self.amqp_channel = self.amqp_con.channel() 

447 self.logger.info("Connected to AMQP server") 

448 elif amqp_url.startswith("file://"): 448 ↛ 453line 448 didn't jump to line 453 because the condition on line 448 was always true

449 # or in Debian and in testing mode, adt_amqp will be a file:// URL 

450 amqp_file = amqp_url[7:] 

451 self.amqp_file_handle = open(amqp_file, "w", 1) 

452 else: 

453 raise RuntimeError("Unknown ADT_AMQP schema %s" % amqp_url.split(":", 1)[0]) 

454 

455 def check_and_upgrade_cache( 

456 self, test_results: dict[str, dict[str, dict[str, list[Any]]]] 

457 ) -> dict[str, dict[str, dict[str, list[Any]]]]: 

458 for leaf_result in all_leaf_results(test_results): 

459 leaf_result[0] = Result[leaf_result[0]] 

460 

461 # Drop results older than ADT_RESULTS_CACHE_AGE 

462 for trigger in list(test_results.keys()): 

463 for pkg in list(test_results[trigger].keys()): 

464 for arch in list(test_results[trigger][pkg].keys()): 

465 arch_result = test_results[trigger][pkg][arch] 

466 if self._now - arch_result[3] > self.options.adt_results_cache_age: 466 ↛ 467line 466 didn't jump to line 467 because the condition on line 466 was never true

467 del test_results[trigger][pkg][arch] 

468 if not test_results[trigger][pkg]: 468 ↛ 469line 468 didn't jump to line 469 because the condition on line 468 was never true

469 del test_results[trigger][pkg] 

470 if not test_results[trigger]: 470 ↛ 471line 470 didn't jump to line 471 because the condition on line 470 was never true

471 del test_results[trigger] 

472 

473 return test_results 

474 

475 def filter_old_results(self) -> None: 

476 """Remove results for old versions and reference runs from the cache. 

477 

478 For now, only delete reference runs. If we delete regular 

479 results after a while, packages with lots of triggered tests may 

480 never have all the results at the same time.""" 

481 

482 test_results = self.test_results 

483 

484 for trigger, trigger_data in test_results.items(): 

485 for src, results in trigger_data.items(): 

486 for arch, result in results.items(): 

487 if ( 

488 trigger == REF_TRIG 

489 and self._now - result[3] > self.options.adt_reference_max_age 

490 ): 

491 result[0] = mark_result_as_old(result[0]) 

492 elif not self.test_version_in_any_suite(src, result[1]): 

493 result[0] = mark_result_as_old(result[0]) 

494 

495 def test_version_in_any_suite(self, src: str, version: str) -> bool: 

496 """Check if the mentioned version of src is found in a suite 

497 

498 To prevent regressions in the target suite, the result should be 

499 from a test with the version of the package in either the source 

500 suite or the target suite. The source suite is also valid, 

501 because due to versioned test dependencies and Breaks/Conflicts 

502 relations, regularly the version in the source suite is used 

503 during testing. 

504 """ 

505 

506 versions = set() 

507 for suite in self.suite_info: 

508 try: 

509 srcinfo = suite.sources[src] 

510 except KeyError: 

511 continue 

512 versions.add(srcinfo.version) 

513 

514 valid_version = False 

515 for ver in versions: 

516 if apt_pkg.version_compare(ver, version) == 0: 

517 valid_version = True 

518 break 

519 

520 return valid_version 

521 

522 def save_pending_json(self) -> None: 

523 # update the pending tests on-disk cache 

524 self.logger.info( 

525 "Updating pending requested tests in %s" % self.pending_tests_file 

526 ) 

527 # Shallow clone pending_tests as we only modify the toplevel and change its type. 

528 pending_tests: dict[str, Any] = {} 

529 if self.pending_tests: 

530 pending_tests = dict(self.pending_tests) 

531 # Avoid adding if there are no pending results at all (eases testing) 

532 pending_tests[VERSION_KEY] = 1 

533 with open(self.pending_tests_file + ".new", "w") as f: 

534 json.dump(pending_tests, f, indent=2) 

535 os.rename(self.pending_tests_file + ".new", self.pending_tests_file) 

536 

537 def save_state(self, britney: "Britney") -> None: 

538 super().save_state(britney) 

539 

540 # update the results on-disk cache, unless we are using a r/o shared one 

541 if not self.options.adt_shared_results_cache: 

542 self.logger.info("Updating results cache") 

543 test_results = deepcopy(self.test_results) 

544 for result in all_leaf_results(test_results): 

545 result[0] = result[0].name 

546 with open(self.results_cache_file + ".new", "w") as f: 

547 json.dump(test_results, f, indent=2) 

548 os.rename(self.results_cache_file + ".new", self.results_cache_file) 

549 

550 self.save_pending_json() 

551 

552 def format_retry_url( 

553 self, run_id: str | None, arch: str, testsrc: str, trigger: str 

554 ) -> str: 

555 if self.options.adt_ppas: 

556 ppas = "&" + urllib.parse.urlencode( 

557 [("ppa", p) for p in self.options.adt_ppas] 

558 ) 

559 else: 

560 ppas = "" 

561 return cast(str, self.options.adt_retry_url).format( 

562 run_id=run_id, 

563 release=self.options.series, 

564 arch=arch, 

565 package=testsrc, 

566 trigger=urllib.parse.quote_plus(trigger), 

567 ppas=ppas, 

568 ) 

569 

570 def format_log_url(self, testsrc: str, arch: str, run_id: str) -> str: 

571 return cast(str, self.options.adt_log_url).format( 

572 release=self.options.series, 

573 swift_container=self.swift_container, 

574 hash=srchash(testsrc), 

575 package=testsrc, 

576 arch=arch, 

577 run_id=run_id, 

578 ) 

579 

580 def apply_src_policy_impl( 

581 self, 

582 tests_info: dict[str, Any], 

583 source_data_tdist: SourcePackage | None, 

584 source_data_srcdist: SourcePackage, 

585 excuse: "Excuse", 

586 ) -> PolicyVerdict: 

587 

588 # initialize 

589 verdict = PolicyVerdict.PASS 

590 source_name = excuse.item.package 

591 

592 # skip/delay autopkgtests until new package is built somewhere 

593 if not filter_out_faux(source_data_srcdist.binaries): 

594 self.logger.debug( 

595 "%s hasnot been built anywhere, skipping autopkgtest policy", 

596 excuse.name, 

597 ) 

598 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

599 excuse.add_verdict_info(verdict, "Autopkgtest deferred: missing all builds") 

600 

601 if "all" in excuse.missing_builds: 

602 self.logger.debug( 

603 "%s hasnot been built for arch:all, skipping autopkgtest policy", 

604 source_name, 

605 ) 

606 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

607 excuse.add_verdict_info( 

608 verdict, "Autopkgtest deferred: missing arch:all build" 

609 ) 

610 

611 all_self_tests_pass = False 

612 results_info: list[str] = [] 

613 if not verdict.is_rejected: 

614 self.logger.debug("Checking autopkgtests for %s", source_name) 

615 trigger = source_name + "/" + source_data_srcdist.version 

616 

617 # build a (testsrc, testver) → arch → (status, run_id, log_url) map; we trigger/check test 

618 # results per architecture for technical/efficiency reasons, but we 

619 # want to evaluate and present the results by tested source package 

620 # first 

621 pkg_arch_result: dict[ 

622 tuple[str, str], dict[str, tuple[str, str | None, str]] 

623 ] = collections.defaultdict(dict) 

624 for arch in self.adt_arches: 

625 if arch in excuse.missing_builds: 

626 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

627 self.logger.debug( 

628 "%s hasnot been built on arch %s, delay autopkgtest there", 

629 source_name, 

630 arch, 

631 ) 

632 excuse.add_verdict_info( 

633 verdict, 

634 f"Autopkgtest deferred on {arch}: missing arch:{arch} build", 

635 ) 

636 else: 

637 verdict = self.check_and_request_arch( 

638 excuse, 

639 arch, 

640 source_data_srcdist, 

641 pkg_arch_result, 

642 trigger, 

643 verdict, 

644 ) 

645 

646 verdict, results_info, all_self_tests_pass = self.process_pkg_arch_results( 

647 tests_info, excuse, pkg_arch_result, verdict, trigger 

648 ) 

649 

650 verdict = self.finalize_excuse( 

651 excuse, verdict, all_self_tests_pass, results_info 

652 ) 

653 return verdict 

654 

655 def apply_srcarch_policy_impl( 

656 self, 

657 tests_info: dict[str, Any], 

658 arch: str, 

659 source_data_tdist: SourcePackage | None, 

660 source_data_srcdist: SourcePackage, 

661 excuse: "Excuse", 

662 ) -> PolicyVerdict: 

663 

664 assert self.hints is not None # for type checking 

665 # initialize 

666 verdict = PolicyVerdict.PASS 

667 

668 self.logger.debug(f"Checking autopkgtests for binNMU {str(excuse.item)}/{arch}") 

669 

670 if arch not in self.adt_arches: 

671 return verdict 

672 

673 # find the binNMU version 

674 versions = set() 

675 for bin_pkg in source_data_srcdist.binaries: 

676 if bin_pkg.architecture == arch: 

677 if ( 

678 len(parts := bin_pkg.version.split("+b")) > 1 

679 and parts[-1].isdigit() 

680 ): 

681 versions.add(parts[-1]) 

682 else: 

683 self.logger.debug( 

684 f"Version {bin_pkg.version} doesn't end with '+b#', skipping" 

685 ) 

686 if not versions or len(versions) > 1: 

687 self.logger.debug("This migration item doesn't look like a binNMU") 

688 return verdict 

689 

690 trigger = str(excuse.item) + "/" + versions.pop() 

691 

692 # While we don't need the arch here, this is common with apply_src_policy_impl() 

693 # (testsrc, testver) → arch → (status, run_id, log_url) map 

694 pkg_arch_result: dict[ 

695 tuple[str, str], dict[str, tuple[str, str | None, str]] 

696 ] = collections.defaultdict(dict) 

697 

698 verdict = self.check_and_request_arch( 

699 excuse, arch, source_data_srcdist, pkg_arch_result, trigger, verdict 

700 ) 

701 

702 verdict, results_info, all_self_tests_pass = self.process_pkg_arch_results( 

703 tests_info, excuse, pkg_arch_result, verdict, trigger 

704 ) 

705 

706 verdict = self.finalize_excuse( 

707 excuse, verdict, all_self_tests_pass, results_info 

708 ) 

709 return verdict 

710 

711 def check_and_request_arch( 

712 self, 

713 excuse: "Excuse", 

714 arch: str, 

715 source_data_srcdist: SourcePackage, 

716 pkg_arch_result: dict[tuple[str, str], dict[str, tuple[str, str | None, str]]], 

717 trigger: str, 

718 verdict: PolicyVerdict, 

719 ) -> PolicyVerdict: 

720 """Perform sanity checks and request test/results when they pass""" 

721 

722 source_name = excuse.item.package 

723 if arch in excuse.policy_info["depends"].get("arch_all_not_installable", []): 

724 self.logger.debug( 

725 "%s is uninstallable on arch %s (which is allowed), not running autopkgtest there", 

726 source_name, 

727 arch, 

728 ) 

729 excuse.addinfo( 

730 f"Autopkgtest skipped on {arch}: not installable (which is allowed)" 

731 ) 

732 elif arch in excuse.unsatisfiable_on_archs and arch not in excuse.policy_info[ 

733 "depends" 

734 ].get("autopkgtest_run_anyways", []): 

735 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

736 self.logger.debug( 

737 "%s is uninstallable on arch %s, not running autopkgtest there", 

738 source_name, 

739 arch, 

740 ) 

741 excuse.addinfo(f"Autopkgtest skipped on {arch}: not installable") 

742 else: 

743 self.request_tests_for_source( 

744 arch, source_data_srcdist, pkg_arch_result, excuse, trigger 

745 ) 

746 

747 return verdict 

748 

749 def process_pkg_arch_results( 

750 self, 

751 tests_info: dict[str, Any], 

752 excuse: "Excuse", 

753 pkg_arch_result: dict[tuple[str, str], dict[str, tuple[str, str | None, str]]], 

754 verdict: PolicyVerdict, 

755 trigger: str, 

756 ) -> tuple[PolicyVerdict, list[str], bool]: 

757 """Calculate verdict based on results and render excuse text""" 

758 

759 source_name = excuse.item.package 

760 all_self_tests_pass = False 

761 results_info = [] 

762 

763 # add test result details to Excuse 

764 cloud_url = self.options.adt_ci_url + "packages/%(h)s/%(s)s/%(r)s/%(a)s" 

765 testver: str | None 

766 for testsrc, testver in sorted(pkg_arch_result): 

767 assert testver is not None 

768 arch_results = pkg_arch_result[(testsrc, testver)] 

769 r = {v[0] for v in arch_results.values()} 

770 if r & {"FAIL", "OLD_FAIL", "REGRESSION"}: 

771 verdict = PolicyVerdict.REJECTED_PERMANENTLY 

772 elif r & {"RUNNING", "RUNNING-REFERENCE"} and not verdict.is_rejected: 

773 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

774 # skip version if still running on all arches 

775 if not r - {"RUNNING", "RUNNING-ALWAYSFAIL", "RUNNING-IGNORE"}: 

776 testver = None 

777 

778 # A source package is eligible for the bounty if it has tests 

779 # of its own that pass on all tested architectures. 

780 if testsrc == source_name: 

781 excuse.autopkgtest_results = r 

782 if r == {"PASS"}: 

783 all_self_tests_pass = True 

784 

785 if testver: 

786 testname = f"{testsrc}/{testver}" 

787 else: 

788 testname = testsrc 

789 

790 html_archmsg = [] 

791 for arch in sorted(arch_results): 

792 (status, run_id, log_url) = arch_results[arch] 

793 artifact_url = None 

794 retry_url = None 

795 reference_url = None 

796 reference_retry_url = None 

797 history_url = None 

798 if self.options.adt_ppas: 

799 if log_url.endswith("log.gz"): 

800 artifact_url = log_url.replace("log.gz", "artifacts.tar.gz") 

801 else: 

802 history_url = cloud_url % { 

803 "h": srchash(testsrc), 

804 "s": testsrc, 

805 "r": self.options.series, 

806 "a": arch, 

807 } 

808 if status not in ("PASS", "RUNNING", "RUNNING-IGNORE"): 

809 retry_url = self.format_retry_url(run_id, arch, testsrc, trigger) 

810 

811 baseline_result = self.result_in_baseline(testsrc, arch) 

812 if baseline_result and baseline_result[0] != Result.NONE: 

813 baseline_run_id = str(baseline_result[2]) 

814 reference_url = self.format_log_url( 

815 testsrc, arch, baseline_run_id 

816 ) 

817 if self.options.adt_baseline == "reference": 

818 reference_retry_url = self.format_retry_url( 

819 baseline_run_id, arch, testsrc, REF_TRIG 

820 ) 

821 tests_info.setdefault(testname, {})[arch] = [ 

822 status, 

823 log_url, 

824 history_url, 

825 artifact_url, 

826 retry_url, 

827 ] 

828 

829 # render HTML snippet for testsrc entry for current arch 

830 if history_url: 

831 message = f'<a href="{history_url}">{arch}</a>' 

832 else: 

833 message = arch 

834 message += ': <a href="{}">{}</a>'.format( 

835 log_url, 

836 EXCUSES_LABELS[status], 

837 ) 

838 if retry_url: 

839 message += ( 

840 '<a href="%s" style="text-decoration: none;"> ♻</a>' % retry_url 

841 ) 

842 if reference_url: 

843 message += ' (<a href="%s">reference</a>' % reference_url 

844 if reference_retry_url: 

845 message += ( 

846 '<a href="%s" style="text-decoration: none;"> ♻</a>' 

847 % reference_retry_url 

848 ) 

849 message += ")" 

850 if artifact_url: 

851 message += ' <a href="%s">[artifacts]</a>' % artifact_url 

852 html_archmsg.append(message) 

853 

854 # render HTML line for testsrc entry 

855 # - if action is or may be required 

856 # - for ones own package 

857 if ( 

858 r 

859 - { 

860 "PASS", 

861 "NEUTRAL", 

862 "RUNNING-ALWAYSFAIL", 

863 "ALWAYSFAIL", 

864 "IGNORE-FAIL", 

865 } 

866 or testsrc == source_name 

867 ): 

868 if testver: 

869 pkg = '<a href="#{0}">{0}</a>/{1}'.format(testsrc, testver) 

870 else: 

871 pkg = '<a href="#{0}">{0}</a>'.format(testsrc) 

872 results_info.append( 

873 "Autopkgtest for {}: {}".format(pkg, ", ".join(html_archmsg)) 

874 ) 

875 

876 return (verdict, results_info, all_self_tests_pass) 

877 

878 def finalize_excuse( 

879 self, 

880 excuse: "Excuse", 

881 verdict: PolicyVerdict, 

882 all_self_tests_pass: bool, 

883 results_info: list[str], 

884 ) -> PolicyVerdict: 

885 """Updates excuses and verdict for hints and bounty/penalty config 

886 

887 Given the verdict so far, hints and configuration, the verdict may be 

888 updated. Depending of the end verdict, the content of results_info is 

889 added as info or as excuse. 

890 """ 

891 

892 package = excuse.item.package 

893 version = excuse.item.version 

894 

895 assert self.hints is not None # for type checking 

896 if verdict.is_rejected: 

897 # check for force-skiptest hint 

898 hints = self.hints.search( 

899 "force-skiptest", 

900 package=package, 

901 version=version, 

902 ) 

903 if hints: 

904 excuse.addreason("skiptest") 

905 excuse.addinfo( 

906 "Autopkgtest check should wait for tests relating to %s %s, but forced by %s" 

907 % (package, version, hints[0].user) 

908 ) 

909 verdict = PolicyVerdict.PASS_HINTED 

910 else: 

911 excuse.addreason("autopkgtest") 

912 

913 if ( 

914 self.options.adt_success_bounty 

915 and verdict == PolicyVerdict.PASS 

916 and all_self_tests_pass 

917 ): 

918 excuse.add_bounty("autopkgtest", self.options.adt_success_bounty) 

919 if self.options.adt_regression_penalty and verdict in { 

920 PolicyVerdict.REJECTED_PERMANENTLY, 

921 PolicyVerdict.REJECTED_TEMPORARILY, 

922 }: 

923 if self.options.adt_regression_penalty > 0: 923 ↛ 926line 923 didn't jump to line 926 because the condition on line 923 was always true

924 excuse.add_penalty("autopkgtest", self.options.adt_regression_penalty) 

925 # In case we give penalties instead of blocking, we must always pass 

926 verdict = PolicyVerdict.PASS 

927 for i in results_info: 

928 if verdict.is_rejected: 

929 excuse.add_verdict_info(verdict, i) 

930 else: 

931 excuse.addinfo(i) 

932 

933 return verdict 

934 

935 @staticmethod 

936 def has_autodep8(srcinfo: SourcePackage) -> bool: 

937 """Check if package is covered by autodep8 

938 

939 srcinfo is an item from self.britney.sources 

940 """ 

941 # autodep8? 

942 for t in srcinfo.testsuite: 

943 if t.startswith("autopkgtest-pkg"): 

944 return True 

945 

946 return False 

947 

948 def request_tests_for_source( 

949 self, 

950 arch: str, 

951 source_data_srcdist: SourcePackage, 

952 pkg_arch_result: dict[tuple[str, str], dict[str, tuple[str, str | None, str]]], 

953 excuse: "Excuse", 

954 trigger: str, 

955 ) -> None: 

956 pkg_universe = self.britney.pkg_universe 

957 target_suite = self.suite_info.target_suite 

958 source_suite = excuse.item.suite 

959 sources_t = target_suite.sources 

960 sources_s = excuse.item.suite.sources 

961 packages_s_a = excuse.item.suite.binaries[arch] 

962 source_name = excuse.item.package 

963 source_version = source_data_srcdist.version 

964 # request tests (unless they were already requested earlier or have a result) 

965 tests = self.tests_for_source(source_name, source_version, arch, excuse) 

966 is_huge = len(tests) > self.options.adt_huge 

967 

968 # local copies for better performance 

969 parse_src_depends = apt_pkg.parse_src_depends 

970 

971 # Here we figure out what is required from the source suite 

972 # for the test to install successfully. 

973 # 

974 # The ImplicitDependencyPolicy does a similar calculation, but 

975 # if I (elbrus) understand correctly, only in the reverse 

976 # dependency direction. We are doing something similar here 

977 # but in the dependency direction (note: this code is older). 

978 # We use the ImplicitDependencyPolicy result for the reverse 

979 # dependencies and we keep the code below for the 

980 # dependencies. Using the ImplicitDependencyPolicy results 

981 # also in the reverse direction seems to require quite some 

982 # reorganisation to get that information available here, as in 

983 # the current state only the current excuse is available here 

984 # and the required other excuses may not be calculated yet. 

985 # 

986 # Loop over all binary packages from trigger and 

987 # recursively look up which *versioned* dependencies are 

988 # only satisfied in the source suite. 

989 # 

990 # For all binaries found, look up which packages they 

991 # break/conflict with in the target suite, but not in the 

992 # source suite. The main reason to do this is to cover test 

993 # dependencies, so we will check Testsuite-Triggers as 

994 # well. 

995 # 

996 # OI: do we need to do the first check in a smart way 

997 # (i.e. only for the packages that are actually going to be 

998 # installed) for the breaks/conflicts set as well, i.e. do 

999 # we need to check if any of the packages that we now 

1000 # enforce being from the source suite, actually have new 

1001 # versioned depends and new breaks/conflicts. 

1002 # 

1003 # For all binaries found, add the set of unique source 

1004 # packages to the list of triggers. 

1005 

1006 bin_triggers: set[PackageId] = set() 

1007 bin_new = set(filter_out_faux(source_data_srcdist.binaries)) 

1008 # For each build-depends block (if any) check if the first alternative 

1009 # is satisfiable in the target suite. If not, add it to the initial set 

1010 # used for checking. 

1011 for block in parse_src_depends(concat_bdeps(source_data_srcdist), True, arch): 

1012 if not get_dependency_solvers( 

1013 [block[0]], 

1014 target_suite.binaries[arch], 

1015 target_suite.provides_table[arch], 

1016 build_depends=True, 

1017 ) and ( 

1018 solvers := get_dependency_solvers( 

1019 [block[0]], 

1020 packages_s_a, 

1021 source_suite.provides_table[arch], 

1022 build_depends=True, 

1023 ) 

1024 ): 

1025 bin_new.add(solvers[0].pkg_id) 

1026 for n_binary in iter_except(bin_new.pop, KeyError): 

1027 if n_binary in bin_triggers: 

1028 continue 

1029 bin_triggers.add(n_binary) 

1030 

1031 # Check if there is a dependency that is not 

1032 # available in the target suite. 

1033 # We add slightly too much here, because new binaries 

1034 # will also show up, but they are already properly 

1035 # installed. Nevermind. 

1036 depends = pkg_universe.dependencies_of(n_binary) 

1037 # depends is a frozenset{frozenset{BinaryPackageId, ..}} 

1038 for deps_of_bin in depends: 

1039 if target_suite.any_of_these_are_in_the_suite(deps_of_bin): 

1040 # if any of the alternative dependencies is already 

1041 # satisfied in the target suite, we can just ignore it 

1042 continue 

1043 # We'll figure out which version later 

1044 bin_new.update( 

1045 added_pkgs_compared_to_target_suite(deps_of_bin, target_suite) 

1046 ) 

1047 

1048 # Check if the package breaks/conflicts anything. We might 

1049 # be adding slightly too many source packages due to the 

1050 # check here as a binary package that is broken may be 

1051 # coming from a different source package in the source 

1052 # suite. Nevermind. 

1053 bin_broken = set() 

1054 for t_binary in bin_triggers: 

1055 # broken is a frozenset{BinaryPackageId, ..} 

1056 broken = pkg_universe.negative_dependencies_of( 

1057 cast(BinaryPackageId, t_binary) 

1058 ) 

1059 broken_in_target = { 

1060 p.package_name 

1061 for p in target_suite.which_of_these_are_in_the_suite(broken) 

1062 } 

1063 broken_in_source = { 

1064 p.package_name 

1065 for p in source_suite.which_of_these_are_in_the_suite(broken) 

1066 } 

1067 # We want packages with a newer version in the source suite that 

1068 # no longer has the conflict. This is an approximation 

1069 broken_filtered = { 

1070 p 

1071 for p in broken 

1072 if p.package_name in broken_in_target 

1073 and p.package_name not in broken_in_source 

1074 } 

1075 # We add the version in the target suite, but the code below will 

1076 # change it to the version in the source suite 

1077 bin_broken.update(broken_filtered) 

1078 bin_triggers.update(bin_broken) 

1079 

1080 # The ImplicitDependencyPolicy also found packages that need 

1081 # to migrate together, so add them to the triggers too. 

1082 for bin_implicit in excuse.depends_packages_flattened: 

1083 if bin_implicit.architecture == arch: 

1084 bin_triggers.add(bin_implicit) 

1085 

1086 triggers = set() 

1087 for t_binary2 in bin_triggers: 

1088 if t_binary2.architecture == arch: 

1089 try: 

1090 source_of_bin = packages_s_a[t_binary2.package_name].source 

1091 # If the version in the target suite is the same, don't add a trigger. 

1092 # Note that we looked up the source package in the source suite. 

1093 # If it were a different source package in the target suite, however, then 

1094 # we would not have this source package in the same version anyway. 

1095 # 

1096 # binNMU's exist, so let's also check if t_binary2 exists 

1097 # in the target suite if the sources are the same. 

1098 if ( 

1099 sources_t.get(source_of_bin, None) is None 

1100 or sources_s[source_of_bin].version 

1101 != sources_t[source_of_bin].version 

1102 or not target_suite.any_of_these_are_in_the_suite( 

1103 {cast(BinaryPackageId, t_binary2)} 

1104 ) 

1105 ): 

1106 triggers.add( 

1107 source_of_bin + "/" + sources_s[source_of_bin].version 

1108 ) 

1109 except KeyError: 

1110 # Apparently the package was removed from 

1111 # unstable e.g. if packages are replaced 

1112 # (e.g. -dbg to -dbgsym) 

1113 pass 

1114 if t_binary2 not in source_data_srcdist.binaries: 

1115 for tdep_src in self.testsuite_triggers.get( 

1116 t_binary2.package_name, set() 

1117 ): 

1118 try: 

1119 # Only add trigger if versions in the target and source suites are different 

1120 if ( 1120 ↛ 1115line 1120 didn't jump to line 1115

1121 sources_t.get(tdep_src, None) is None 

1122 or sources_s[tdep_src].version 

1123 != sources_t[tdep_src].version 

1124 ): 

1125 triggers.add( 

1126 tdep_src + "/" + sources_s[tdep_src].version 

1127 ) 

1128 except KeyError: 

1129 # Apparently the source was removed from 

1130 # unstable (testsuite_triggers are unified 

1131 # over all suites) 

1132 pass 

1133 source_trigger = source_name + "/" + source_version 

1134 triggers.discard(source_trigger) 

1135 triggers_list = sorted(list(triggers)) 

1136 triggers_list.insert(0, trigger) 

1137 

1138 for testsrc, testver in tests: 

1139 self.pkg_test_request(testsrc, arch, triggers_list, huge=is_huge) 

1140 (result, real_ver, run_id, url) = self.pkg_test_result( 

1141 testsrc, testver, arch, trigger 

1142 ) 

1143 pkg_arch_result[(testsrc, real_ver)][arch] = (result, run_id, url) 

1144 

1145 def tests_for_source( 

1146 self, src: str, ver: str, arch: str, excuse: "Excuse" 

1147 ) -> list[tuple[str, str]]: 

1148 """Iterate over all tests that should be run for given source and arch""" 

1149 

1150 source_suite = self.suite_info.primary_source_suite 

1151 target_suite = self.suite_info.target_suite 

1152 sources_info = target_suite.sources 

1153 binaries_info = target_suite.binaries[arch] 

1154 

1155 reported_pkgs = set() 

1156 

1157 tests = [] 

1158 

1159 # Debian doesn't have linux-meta, but Ubuntu does 

1160 # for linux themselves we don't want to trigger tests -- these should 

1161 # all come from linux-meta*. A new kernel ABI without a corresponding 

1162 # -meta won't be installed and thus we can't sensibly run tests against 

1163 # it. 

1164 if ( 1164 ↛ 1168line 1164 didn't jump to line 1168

1165 src.startswith("linux") 

1166 and src.replace("linux", "linux-meta") in sources_info 

1167 ): 

1168 return [] 

1169 

1170 # we want to test the package itself, if it still has a test in unstable 

1171 # but only if the package actually exists on this arch 

1172 srcinfo = source_suite.sources[src] 

1173 if ("autopkgtest" in srcinfo.testsuite or self.has_autodep8(srcinfo)) and len( 

1174 excuse.packages[arch] 

1175 ) > 0: 

1176 reported_pkgs.add(src) 

1177 tests.append((src, ver)) 

1178 

1179 extra_bins = [] 

1180 # Debian doesn't have linux-meta, but Ubuntu does 

1181 # Hack: For new kernels trigger all DKMS packages by pretending that 

1182 # linux-meta* builds a "dkms" binary as well. With that we ensure that we 

1183 # don't regress DKMS drivers with new kernel versions. 

1184 if src.startswith("linux-meta"): 

1185 # does this have any image on this arch? 

1186 for pkg_id in srcinfo.binaries: 

1187 if pkg_id.architecture == arch and "-image" in pkg_id.package_name: 

1188 try: 

1189 extra_bins.append(binaries_info["dkms"].pkg_id) 

1190 except KeyError: 

1191 pass 

1192 

1193 if not self.has_built_on_this_arch_or_is_arch_all(srcinfo, arch): 

1194 return [] 

1195 

1196 pkg_universe = self.britney.pkg_universe 

1197 # plus all direct reverse dependencies and test triggers of its 

1198 # binaries which have an autopkgtest 

1199 for binary in itertools.chain(srcinfo.binaries, extra_bins): 

1200 rdeps = filter_out_faux(pkg_universe.reverse_dependencies_of(binary)) 

1201 for rdep in rdeps: 

1202 try: 

1203 rdep_src = binaries_info[rdep.package_name].source 

1204 # Don't re-trigger the package itself here; this should 

1205 # have been done above if the package still continues to 

1206 # have an autopkgtest in unstable. 

1207 if rdep_src == src: 

1208 continue 

1209 except KeyError: 

1210 continue 

1211 

1212 rdep_src_info = sources_info[rdep_src] 

1213 if "autopkgtest" in rdep_src_info.testsuite or self.has_autodep8( 

1214 rdep_src_info 

1215 ): 

1216 if rdep_src not in reported_pkgs: 

1217 tests.append((rdep_src, rdep_src_info.version)) 

1218 reported_pkgs.add(rdep_src) 

1219 

1220 for tdep_src in self.testsuite_triggers.get(binary.package_name, set()): 

1221 if tdep_src not in reported_pkgs: 

1222 try: 

1223 tdep_src_info = sources_info[tdep_src] 

1224 except KeyError: 

1225 continue 

1226 if "autopkgtest" in tdep_src_info.testsuite or self.has_autodep8( 1226 ↛ 1220line 1226 didn't jump to line 1220 because the condition on line 1226 was always true

1227 tdep_src_info 

1228 ): 

1229 for pkg_id in tdep_src_info.binaries: 1229 ↛ 1220line 1229 didn't jump to line 1220 because the loop on line 1229 didn't complete

1230 if pkg_id.architecture == arch: 

1231 tests.append((tdep_src, tdep_src_info.version)) 

1232 reported_pkgs.add(tdep_src) 

1233 break 

1234 

1235 tests.sort(key=lambda s_v: s_v[0]) 

1236 return tests 

1237 

1238 def read_pending_tests(self) -> None: 

1239 """Read pending test requests from previous britney runs 

1240 

1241 Initialize self.pending_tests with that data. 

1242 """ 

1243 assert self.pending_tests is None, "already initialized" 

1244 if not os.path.exists(self.pending_tests_file): 

1245 self.logger.info( 

1246 "No %s, starting with no pending tests", self.pending_tests_file 

1247 ) 

1248 self.pending_tests = {} 

1249 return 

1250 with open(self.pending_tests_file) as f: 

1251 self.pending_tests = json.load(f) 

1252 if VERSION_KEY in self.pending_tests: 

1253 del self.pending_tests[VERSION_KEY] 

1254 for trigger in list(self.pending_tests.keys()): 

1255 for pkg in list(self.pending_tests[trigger].keys()): 

1256 arch_dict = self.pending_tests[trigger][pkg] 

1257 for arch in list(arch_dict.keys()): 

1258 if ( 

1259 self._now - arch_dict[arch] 

1260 > self.options.adt_pending_max_age 

1261 ): 

1262 del arch_dict[arch] 

1263 if not arch_dict: 

1264 del self.pending_tests[trigger][pkg] 

1265 if not self.pending_tests[trigger]: 

1266 del self.pending_tests[trigger] 

1267 else: 

1268 # Migration code: 

1269 for trigger_data in self.pending_tests.values(): 1269 ↛ 1270line 1269 didn't jump to line 1270 because the loop on line 1269 never started

1270 for pkg, arch_list in trigger_data.items(): 

1271 trigger_data[pkg] = {} 

1272 for arch in arch_list: 

1273 trigger_data[pkg][arch] = self._now 

1274 

1275 self.logger.info( 

1276 "Read pending requested tests from %s", self.pending_tests_file 

1277 ) 

1278 self.logger.debug("%s", self.pending_tests) 

1279 

1280 # this requires iterating over all triggers and thus is expensive; 

1281 # cache the results 

1282 @lru_cache(None) 

1283 def latest_run_for_package(self, src: str, arch: str) -> str: 

1284 """Return latest run ID for src on arch""" 

1285 

1286 latest_run_id = "" 

1287 for srcmap in self.test_results.values(): 

1288 try: 

1289 run_id = srcmap[src][arch][2] 

1290 except KeyError: 

1291 continue 

1292 if run_id > latest_run_id: 

1293 latest_run_id = run_id 

1294 return latest_run_id 

1295 

1296 def urlopen_retry(self, url: str) -> http.client.HTTPResponse | addinfourl: 

1297 """A urlopen() that retries on time outs or errors""" 

1298 

1299 exc: Exception 

1300 for retry in range(5): 1300 ↛ 1321line 1300 didn't jump to line 1321 because the loop on line 1300 didn't complete

1301 try: 

1302 req = urlopen(url, timeout=30) 

1303 code = req.getcode() 

1304 if not code or 200 <= code < 300: 1304 ↛ 1300line 1304 didn't jump to line 1300 because the condition on line 1304 was always true

1305 return req # type: ignore[no-any-return] 

1306 except TimeoutError as e: 1306 ↛ 1307line 1306 didn't jump to line 1307 because the exception caught by line 1306 didn't happen

1307 self.logger.info( 

1308 "Timeout downloading '%s', will retry %d more times." 

1309 % (url, 5 - retry - 1) 

1310 ) 

1311 exc = e 

1312 except HTTPError as e: 

1313 if e.code not in (503, 502): 1313 ↛ 1315line 1313 didn't jump to line 1315 because the condition on line 1313 was always true

1314 raise 

1315 self.logger.info( 

1316 "Caught error %d downloading '%s', will retry %d more times." 

1317 % (e.code, url, 5 - retry - 1) 

1318 ) 

1319 exc = e 

1320 else: 

1321 raise exc 

1322 

1323 @lru_cache(None) 

1324 def fetch_swift_results(self, swift_url: str, src: str, arch: str) -> None: 

1325 """Download new results for source package/arch from swift""" 

1326 

1327 # prepare query: get all runs with a timestamp later than the latest 

1328 # run_id for this package/arch; '@' is at the end of each run id, to 

1329 # mark the end of a test run directory path 

1330 # example: <autopkgtest-wily>wily/amd64/libp/libpng/20150630_054517@/result.tar 

1331 query = { 

1332 "delimiter": "@", 

1333 "prefix": f"{self.options.series}/{arch}/{srchash(src)}/{src}/", 

1334 } 

1335 

1336 # determine latest run_id from results 

1337 if not self.options.adt_shared_results_cache: 

1338 latest_run_id = self.latest_run_for_package(src, arch) 

1339 if latest_run_id: 

1340 query["marker"] = query["prefix"] + latest_run_id 

1341 

1342 # request new results from swift 

1343 url = os.path.join(swift_url, self.swift_container) 

1344 url += "?" + urllib.parse.urlencode(query) 

1345 f = None 

1346 try: 

1347 f = self.urlopen_retry(url) 

1348 if f.getcode() == 200: 

1349 result_paths = f.read().decode().strip().splitlines() 

1350 elif f.getcode() == 204: # No content 1350 ↛ 1356line 1350 didn't jump to line 1356 because the condition on line 1350 was always true

1351 result_paths = [] 

1352 else: 

1353 # we should not ever end up here as we expect a HTTPError in 

1354 # other cases; e. g. 3XX is something that tells us to adjust 

1355 # our URLS, so fail hard on those 

1356 raise NotImplementedError( 

1357 "fetch_swift_results(%s): cannot handle HTTP code %r" 

1358 % (url, f.getcode()) 

1359 ) 

1360 except OSError as e: 

1361 # 401 "Unauthorized" is swift's way of saying "container does not exist" 

1362 if getattr(e, "code", -1) == 401: 1362 ↛ 1371line 1362 didn't jump to line 1371 because the condition on line 1362 was always true

1363 self.logger.info( 

1364 "fetch_swift_results: %s does not exist yet or is inaccessible", url 

1365 ) 

1366 return 

1367 # Other status codes are usually a transient 

1368 # network/infrastructure failure. Ignoring this can lead to 

1369 # re-requesting tests which we already have results for, so 

1370 # fail hard on this and let the next run retry. 

1371 self.logger.error("Failure to fetch swift results from %s: %s", url, str(e)) 

1372 sys.exit(1) 

1373 finally: 

1374 if f is not None: 1374 ↛ 1377line 1374 didn't jump to line 1377 because the condition on line 1374 was always true

1375 f.close() 1375 ↛ exitline 1375 didn't return from function 'fetch_swift_results' because the return on line 1366 wasn't executed

1376 

1377 for p in result_paths: 

1378 self.fetch_one_result( 

1379 os.path.join(swift_url, self.swift_container, p, "result.tar"), 

1380 src, 

1381 arch, 

1382 ) 

1383 

1384 def fetch_one_result(self, url: str, src: str, arch: str) -> None: 

1385 """Download one result URL for source/arch 

1386 

1387 Remove matching pending_tests entries. 

1388 """ 

1389 f = None 

1390 try: 

1391 f = self.urlopen_retry(url) 

1392 if f.getcode() == 200: 1392 ↛ 1395line 1392 didn't jump to line 1395 because the condition on line 1392 was always true

1393 tar_bytes = io.BytesIO(f.read()) 

1394 else: 

1395 raise NotImplementedError( 

1396 "fetch_one_result(%s): cannot handle HTTP code %r" 

1397 % (url, f.getcode()) 

1398 ) 

1399 except OSError as err: 

1400 self.logger.error("Failure to fetch %s: %s", url, str(err)) 

1401 # we tolerate "not found" (something went wrong on uploading the 

1402 # result), but other things indicate infrastructure problems 

1403 if getattr(err, "code", -1) == 404: 

1404 return 

1405 sys.exit(1) 

1406 finally: 

1407 if f is not None: 1407 ↛ exit,   1407 ↛ 14092 missed branches: 1) line 1407 didn't return from function 'fetch_one_result' because the return on line 1404 wasn't executed, 2) line 1407 didn't jump to line 1409 because the condition on line 1407 was always true

1408 f.close() 1408 ↛ exitline 1408 didn't return from function 'fetch_one_result' because the return on line 1404 wasn't executed

1409 try: 

1410 with tarfile.open(None, "r", tar_bytes) as tar: 

1411 exitcode = int(tar.extractfile("exitcode").read().strip()) # type: ignore[union-attr] 

1412 srcver = tar.extractfile("testpkg-version").read().decode().strip() # type: ignore[union-attr] 

1413 (ressrc, ver) = srcver.split() 

1414 testinfo = json.loads(tar.extractfile("testinfo.json").read().decode()) # type: ignore[union-attr] 

1415 except (KeyError, ValueError, tarfile.TarError) as err: 

1416 self.logger.error("%s is damaged, ignoring: %s", url, str(err)) 

1417 # ignore this; this will leave an orphaned request in autopkgtest-pending.json 

1418 # and thus require manual retries after fixing the tmpfail, but we 

1419 # can't just blindly attribute it to some pending test. 

1420 return 

1421 

1422 if src != ressrc: 1422 ↛ 1423line 1422 didn't jump to line 1423 because the condition on line 1422 was never true

1423 self.logger.error( 

1424 "%s is a result for package %s, but expected package %s", 

1425 url, 

1426 ressrc, 

1427 src, 

1428 ) 

1429 return 

1430 

1431 # parse recorded triggers in test result 

1432 for e in testinfo.get("custom_environment", []): 1432 ↛ 1437line 1432 didn't jump to line 1437 because the loop on line 1432 didn't complete

1433 if e.startswith("ADT_TEST_TRIGGERS="): 1433 ↛ 1432line 1433 didn't jump to line 1432 because the condition on line 1433 was always true

1434 result_triggers = [i for i in e.split("=", 1)[1].split() if "/" in i] 

1435 break 

1436 else: 

1437 self.logger.error("%s result has no ADT_TEST_TRIGGERS, ignoring") 

1438 return 

1439 

1440 run_id = os.path.basename(os.path.dirname(url)) 

1441 seen = round(calendar.timegm(time.strptime(run_id, "%Y%m%d_%H%M%S@"))) 

1442 # allow some skipped tests, but nothing else 

1443 if exitcode in [0, 2]: 

1444 result = Result.PASS 

1445 elif exitcode == 8: 1445 ↛ 1446line 1445 didn't jump to line 1446 because the condition on line 1445 was never true

1446 result = Result.NEUTRAL 

1447 else: 

1448 result = Result.FAIL 

1449 

1450 self.logger.info( 

1451 "Fetched test result for %s/%s/%s %s (triggers: %s): %s", 

1452 src, 

1453 ver, 

1454 arch, 

1455 run_id, 

1456 result_triggers, 

1457 result.name.lower(), 

1458 ) 

1459 

1460 # remove matching test requests 

1461 for trigger in result_triggers: 

1462 self.remove_from_pending(trigger, src, arch) 

1463 

1464 # add this result 

1465 for trigger in result_triggers: 

1466 self.add_trigger_to_results(trigger, src, ver, arch, run_id, seen, result) 

1467 

1468 def remove_from_pending( 

1469 self, trigger: str, src: str, arch: str, timestamp: int = sys.maxsize 

1470 ) -> None: 

1471 assert self.pending_tests is not None # for type checking 

1472 try: 

1473 arch_dict = self.pending_tests[trigger][src] 

1474 if timestamp < arch_dict[arch]: 

1475 # The result is from before the moment of scheduling, so it's 

1476 # not the one we're waiting for 

1477 return 

1478 del arch_dict[arch] 

1479 if not arch_dict: 

1480 del self.pending_tests[trigger][src] 

1481 if not self.pending_tests[trigger]: 

1482 del self.pending_tests[trigger] 

1483 self.logger.debug( 

1484 "-> matches pending request %s/%s for trigger %s", src, arch, trigger 

1485 ) 

1486 except KeyError: 

1487 self.logger.debug( 

1488 "-> does not match any pending request for %s/%s", src, arch 

1489 ) 

1490 

1491 def add_trigger_to_results( 

1492 self, 

1493 trigger: str, 

1494 src: str, 

1495 ver: str, 

1496 arch: str, 

1497 run_id: str, 

1498 timestamp: int, 

1499 status_to_add: Result, 

1500 ) -> None: 

1501 # Ensure that we got a new enough version 

1502 parts = trigger.split("/") 

1503 match len(parts): 

1504 case 2: 

1505 trigsrc, trigver = parts 

1506 case 4: 1506 ↛ 1508line 1506 didn't jump to line 1508 because the pattern on line 1506 always matched

1507 trigsrc, trigarch, trigver, rebuild = parts 

1508 case _: 

1509 self.logger.info("Ignoring invalid test trigger %s", trigger) 

1510 return 

1511 if trigsrc == src and apt_pkg.version_compare(ver, trigver) < 0: 1511 ↛ 1512line 1511 didn't jump to line 1512 because the condition on line 1511 was never true

1512 self.logger.debug( 

1513 "test trigger %s, but run for older version %s, ignoring", trigger, ver 

1514 ) 

1515 return 

1516 

1517 stored_result = ( 

1518 self.test_results.setdefault(trigger, {}) 

1519 .setdefault(src, {}) 

1520 .setdefault(arch, [Result.FAIL, None, "", 0]) 

1521 ) 

1522 

1523 # reruns shouldn't flip the result from PASS or NEUTRAL to 

1524 # FAIL, so remember the most recent version of the best result 

1525 # we've seen. Except for reference updates, which we always 

1526 # want to update with the most recent result. The result data 

1527 # may not be ordered by timestamp, so we need to check time. 

1528 update = False 

1529 if self.options.adt_baseline == "reference" and trigger == REF_TRIG: 

1530 if stored_result[3] < timestamp: 

1531 update = True 

1532 elif status_to_add < stored_result[0]: 

1533 update = True 

1534 elif status_to_add == stored_result[0] and stored_result[3] < timestamp: 

1535 update = True 

1536 

1537 if update: 

1538 stored_result[0] = status_to_add 

1539 stored_result[1] = ver 

1540 stored_result[2] = run_id 

1541 stored_result[3] = timestamp 

1542 

1543 def send_test_request( 

1544 self, src: str, arch: str, triggers: list[str], huge: bool = False 

1545 ) -> None: 

1546 """Send out AMQP request for testing src/arch for triggers 

1547 

1548 If huge is true, then the request will be put into the -huge instead of 

1549 normal queue. 

1550 """ 

1551 if self.options.dry_run: 1551 ↛ 1552line 1551 didn't jump to line 1552 because the condition on line 1551 was never true

1552 return 

1553 

1554 params: dict[str, Any] = {"triggers": triggers} 

1555 if self.options.adt_ppas: 

1556 params["ppas"] = self.options.adt_ppas 

1557 qname = f"debci-ppa-{self.options.series}-{arch}" 

1558 elif huge: 

1559 qname = f"debci-huge-{self.options.series}-{arch}" 

1560 else: 

1561 qname = f"debci-{self.options.series}-{arch}" 

1562 params["submit-time"] = time.strftime("%Y-%m-%d %H:%M:%S%z", time.gmtime()) 

1563 

1564 if self.amqp_channel: 1564 ↛ 1565line 1564 didn't jump to line 1565 because the condition on line 1564 was never true

1565 self.amqp_channel.basic_publish( 

1566 amqp.Message( 

1567 src + "\n" + json.dumps(params), delivery_mode=2 

1568 ), # persistent 

1569 routing_key=qname, 

1570 ) 

1571 # we save pending.json with every request, so that if britney 

1572 # crashes we don't re-request tests. This is only needed when using 

1573 # real amqp, as with file-based submission the pending tests are 

1574 # returned by debci along with the results each run. 

1575 self.save_pending_json() 

1576 else: 

1577 # for file-based submission, triggers are space separated 

1578 params["triggers"] = [" ".join(params["triggers"])] 

1579 assert self.amqp_file_handle 

1580 self.amqp_file_handle.write(f"{qname}:{src} {json.dumps(params)}\n") 

1581 

1582 def pkg_test_request( 

1583 self, src: str, arch: str, all_triggers: list[str], huge: bool = False 

1584 ) -> None: 

1585 """Request one package test for a set of triggers 

1586 

1587 all_triggers is a list of "pkgname/version". These are the packages 

1588 that will be taken from the source suite. The first package in this 

1589 list is the package that triggers the testing of src, the rest are 

1590 additional packages required for installability of the test deps. If 

1591 huge is true, then the request will be put into the -huge instead of 

1592 normal queue. 

1593 

1594 This will only be done if that test wasn't already requested in 

1595 a previous run (i. e. if it's not already in self.pending_tests) 

1596 or if there is already a fresh or a positive result for it. This 

1597 ensures to download current results for this package before 

1598 requesting any test.""" 

1599 trigger = all_triggers[0] 

1600 uses_swift = not self.options.adt_swift_url.startswith("file://") 

1601 try: 

1602 result = self.test_results[trigger][src][arch] 

1603 has_result = True 

1604 except KeyError: 

1605 has_result = False 

1606 

1607 if has_result: 

1608 result_state = result[0] 

1609 if result_state in {Result.OLD_PASS, Result.OLD_FAIL, Result.OLD_NEUTRAL}: 

1610 pass 

1611 elif ( 

1612 result_state == Result.FAIL 

1613 and self.result_in_baseline(src, arch)[0] 

1614 in {Result.PASS, Result.NEUTRAL, Result.OLD_PASS, Result.OLD_NEUTRAL} 

1615 and self._now - result[3] > self.options.adt_retry_older_than 

1616 ): 

1617 # We might want to retry this failure, so continue 

1618 pass 

1619 elif not uses_swift: 

1620 # We're done if we don't retrigger and we're not using swift 

1621 return 

1622 elif result_state in {Result.PASS, Result.NEUTRAL}: 

1623 self.logger.debug( 

1624 "%s/%s triggered by %s already known", src, arch, trigger 

1625 ) 

1626 return 

1627 

1628 # Without swift we don't expect new results 

1629 if uses_swift: 

1630 self.logger.info( 

1631 "Checking for new results for failed %s/%s for trigger %s", 

1632 src, 

1633 arch, 

1634 trigger, 

1635 ) 

1636 self.fetch_swift_results(self.options.adt_swift_url, src, arch) 

1637 # do we have one now? 

1638 try: 

1639 self.test_results[trigger][src][arch] 

1640 return 

1641 except KeyError: 

1642 pass 

1643 

1644 self.request_test_if_not_queued(src, arch, trigger, all_triggers, huge=huge) 

1645 

1646 def request_test_if_not_queued( 

1647 self, 

1648 src: str, 

1649 arch: str, 

1650 trigger: str, 

1651 all_triggers: list[str] = [], 

1652 huge: bool = False, 

1653 ) -> None: 

1654 assert self.pending_tests is not None # for type checking 

1655 if not all_triggers: 

1656 all_triggers = [trigger] 

1657 

1658 # Don't re-request if it's already pending 

1659 arch_dict = self.pending_tests.setdefault(trigger, {}).setdefault(src, {}) 

1660 if arch in arch_dict.keys(): 

1661 self.logger.debug( 

1662 "Test %s/%s for %s is already pending, not queueing", src, arch, trigger 

1663 ) 

1664 else: 

1665 self.logger.debug( 

1666 "Requesting %s autopkgtest on %s to verify %s", src, arch, trigger 

1667 ) 

1668 arch_dict[arch] = self._now 

1669 self.send_test_request(src, arch, all_triggers, huge=huge) 

1670 

1671 def result_in_baseline(self, src: str, arch: str) -> list[Any]: 

1672 """Get the result for src on arch in the baseline 

1673 

1674 The baseline is optionally all data or a reference set) 

1675 """ 

1676 

1677 # this requires iterating over all cached results and thus is expensive; 

1678 # cache the results 

1679 try: 

1680 return self.result_in_baseline_cache[src][arch] 

1681 except KeyError: 

1682 pass 

1683 

1684 result_reference: list[Any] = [Result.NONE, None, "", 0] 

1685 if self.options.adt_baseline == "reference": 

1686 if src not in self.suite_info.target_suite.sources: 1686 ↛ 1687line 1686 didn't jump to line 1687 because the condition on line 1686 was never true

1687 return result_reference 

1688 

1689 try: 

1690 result_reference = self.test_results[REF_TRIG][src][arch] 

1691 self.logger.debug( 

1692 "Found result for src %s in reference: %s", 

1693 src, 

1694 result_reference[0].name, 

1695 ) 

1696 except KeyError: 

1697 self.logger.debug( 

1698 "Found NO result for src %s in reference: %s", 

1699 src, 

1700 result_reference[0].name, 

1701 ) 

1702 self.result_in_baseline_cache[src][arch] = deepcopy(result_reference) 

1703 return result_reference 

1704 

1705 result_ever: list[Any] = [Result.FAIL, None, "", 0] 

1706 for srcmap in self.test_results.values(): 

1707 try: 

1708 if srcmap[src][arch][0] != Result.FAIL: 

1709 result_ever = srcmap[src][arch] 

1710 # If we are not looking at a reference run, We don't really 

1711 # care about anything except the status, so we're done 

1712 # once we find a PASS. 

1713 if result_ever[0] == Result.PASS: 

1714 break 

1715 except KeyError: 

1716 pass 

1717 

1718 self.result_in_baseline_cache[src][arch] = deepcopy(result_ever) 

1719 self.logger.debug("Result for src %s ever: %s", src, result_ever[0].name) 

1720 return result_ever 

1721 

1722 def has_test_in_target(self, src: str) -> bool: 

1723 test_in_target = False 

1724 try: 

1725 srcinfo = self.suite_info.target_suite.sources[src] 

1726 if "autopkgtest" in srcinfo.testsuite or self.has_autodep8(srcinfo): 

1727 test_in_target = True 

1728 # AttributeError is only needed for the test suite as 

1729 # srcinfo can be a NoneType 

1730 except (KeyError, AttributeError): 

1731 pass 

1732 

1733 return test_in_target 

1734 

1735 def pkg_test_result( 

1736 self, src: str, ver: str, arch: str, trigger: str 

1737 ) -> tuple[str, str, str | None, str]: 

1738 """Get current test status of a particular package 

1739 

1740 Return (status, real_version, run_id, log_url) tuple; status is a key in 

1741 EXCUSES_LABELS. run_id is None if the test is still running. 

1742 """ 

1743 assert self.pending_tests is not None # for type checking 

1744 # determine current test result status 

1745 run_id = None 

1746 try: 

1747 r = self.test_results[trigger][src][arch] 

1748 ver = r[1] 

1749 run_id = r[2] 

1750 

1751 if r[0] in {Result.FAIL, Result.OLD_FAIL}: 

1752 # determine current test result status 

1753 baseline_result = self.result_in_baseline(src, arch)[0] 

1754 

1755 # Special-case triggers from linux-meta*: we cannot compare 

1756 # results against different kernels, as e. g. a DKMS module 

1757 # might work against the default kernel but fail against a 

1758 # different flavor; so for those, ignore the "ever 

1759 # passed" check; FIXME: check against trigsrc only 

1760 if self.options.adt_baseline != "reference" and ( 

1761 trigger.startswith("linux-meta") or trigger.startswith("linux/") 

1762 ): 

1763 baseline_result = Result.FAIL 

1764 

1765 # Check if the autopkgtest (still) exists in the target suite 

1766 test_in_target = self.has_test_in_target(src) 

1767 

1768 if test_in_target and baseline_result in { 

1769 Result.NONE, 

1770 Result.OLD_FAIL, 

1771 Result.OLD_NEUTRAL, 

1772 Result.OLD_PASS, 

1773 }: 

1774 self.request_test_if_not_queued(src, arch, REF_TRIG) 

1775 

1776 if self.has_force_badtest(src, ver, arch): 

1777 result = "IGNORE-FAIL" 

1778 elif not test_in_target: 

1779 if self.options.adt_ignore_failure_for_new_tests: 

1780 result = "IGNORE-FAIL" 

1781 else: 

1782 result = r[0].name 

1783 elif baseline_result in {Result.FAIL, Result.OLD_FAIL}: 

1784 result = "ALWAYSFAIL" 

1785 elif baseline_result == Result.NONE: 1785 ↛ 1786line 1785 didn't jump to line 1786 because the condition on line 1785 was never true

1786 result = "RUNNING-REFERENCE" 

1787 else: 

1788 result = "REGRESSION" 

1789 

1790 else: 

1791 result = r[0].name 

1792 

1793 url = self.format_log_url(src, arch, run_id) 

1794 except KeyError: 

1795 # no result for src/arch; still running? 

1796 assert arch in self.pending_tests.get(trigger, {}).get(src, {}).keys(), ( 

1797 "Result for %s/%s/%s (triggered by %s) is neither known nor pending!" 

1798 % (src, ver, arch, trigger) 

1799 ) 

1800 

1801 if self.has_force_badtest(src, ver, arch): 

1802 result = "RUNNING-IGNORE" 

1803 else: 

1804 if self.has_test_in_target(src): 

1805 baseline_result = self.result_in_baseline(src, arch)[0] 

1806 if baseline_result == Result.FAIL: 

1807 result = "RUNNING-ALWAYSFAIL" 

1808 else: 

1809 result = "RUNNING" 

1810 else: 

1811 if self.options.adt_ignore_failure_for_new_tests: 

1812 result = "RUNNING-IGNORE" 

1813 else: 

1814 result = "RUNNING" 

1815 url = self.options.adt_ci_url + "status/pending" 

1816 

1817 return (result, ver, run_id, url) 

1818 

1819 def has_force_badtest(self, src: str, ver: str, arch: str) -> bool: 

1820 """Check if src/ver/arch has a force-badtest hint""" 

1821 

1822 assert self.hints is not None 

1823 hints = self.hints.search("force-badtest", package=src) 

1824 if hints: 

1825 self.logger.info( 

1826 "Checking hints for %s/%s/%s: %s", 

1827 src, 

1828 arch, 

1829 ver, 

1830 [str(h) for h in hints], 

1831 ) 

1832 for hint in hints: 

1833 if [ 

1834 mi 

1835 for mi in hint.packages 

1836 if mi.architecture in ["source", arch] 

1837 and ( 

1838 mi.version is None 

1839 or mi.version == "all" # Historical unversioned hint 

1840 or apt_pkg.version_compare(ver, mi.version) <= 0 

1841 ) 

1842 ]: 

1843 return True 

1844 

1845 return False 

1846 

1847 def has_built_on_this_arch_or_is_arch_all( 

1848 self, src_data: SourcePackage, arch: str 

1849 ) -> bool: 

1850 """When a source builds arch:all binaries, those binaries are 

1851 added to all architectures and thus the source 'exists' 

1852 everywhere. This function checks if the source has any arch 

1853 specific binaries on this architecture and if not, if it 

1854 has them on any architecture. 

1855 """ 

1856 packages_s_a = self.suite_info.primary_source_suite.binaries[arch] 

1857 has_unknown_binary = False 

1858 for binary_s in filter_out_faux(src_data.binaries): 

1859 try: 

1860 binary_u = packages_s_a[binary_s.package_name] 

1861 except KeyError: 

1862 # src_data.binaries has all the built binaries, so if 

1863 # we get here, we know that at least one architecture 

1864 # has architecture specific binaries 

1865 has_unknown_binary = True 

1866 continue 

1867 if binary_u.architecture == arch: 

1868 return True 

1869 # If we get here, we have only seen arch:all packages for this 

1870 # arch. 

1871 return not has_unknown_binary