Coverage for britney2/policies/autopkgtest.py: 91%

856 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-04-19 18:02 +0000

1# Copyright (C) 2013 - 2016 Canonical Ltd. 

2# Authors: 

3# Colin Watson <cjwatson@ubuntu.com> 

4# Jean-Baptiste Lallement <jean-baptiste.lallement@canonical.com> 

5# Martin Pitt <martin.pitt@ubuntu.com> 

6 

7# This program is free software; you can redistribute it and/or modify 

8# it under the terms of the GNU General Public License as published by 

9# the Free Software Foundation; either version 2 of the License, or 

10# (at your option) any later version. 

11 

12# This program is distributed in the hope that it will be useful, 

13# but WITHOUT ANY WARRANTY; without even the implied warranty of 

14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

15# GNU General Public License for more details. 

16 

17import calendar 

18import collections 

19import http.client 

20import io 

21import itertools 

22import json 

23import optparse 

24import os 

25import sys 

26import tarfile 

27import time 

28import urllib.parse 

29from collections.abc import Iterator 

30from copy import deepcopy 

31from enum import Enum 

32from functools import lru_cache, total_ordering 

33from typing import TYPE_CHECKING, Any, Optional, cast 

34from urllib.error import HTTPError 

35from urllib.request import urlopen 

36from urllib.response import addinfourl 

37 

38import apt_pkg 

39 

40from britney2 import ( 

41 BinaryPackageId, 

42 PackageId, 

43 SourcePackage, 

44 SuiteClass, 

45 Suites, 

46 TargetSuite, 

47) 

48from britney2.hints import HintAnnotate, HintType 

49from britney2.migrationitem import MigrationItem 

50from britney2.policies import PolicyVerdict 

51from britney2.policies.policy import AbstractBasePolicy 

52from britney2.utils import ( 

53 binaries_from_source_version, 

54 filter_out_faux, 

55 get_dependency_solvers, 

56 iter_except, 

57 parse_option, 

58) 

59 

60if TYPE_CHECKING: 60 ↛ 61line 60 didn't jump to line 61 because the condition on line 60 was never true

61 import amqplib.client_0_8 as amqp 

62 

63 from ..britney import Britney 

64 from ..excuse import Excuse 

65 from ..hints import HintParser 

66 

67 

@total_ordering
class Result(Enum):
    """Outcome of a single autopkgtest run.

    ``PASS``, ``NEUTRAL`` and ``FAIL`` each have an ``OLD_*`` counterpart,
    and ``NONE`` is the last member.  Members are ordered by their numeric
    value, so ``Result.PASS`` compares lowest and ``Result.NONE`` highest.
    """

    PASS = 1
    NEUTRAL = 2
    FAIL = 3
    OLD_PASS = 4
    OLD_NEUTRAL = 5
    OLD_FAIL = 6
    NONE = 7

    def __lt__(self, other: "Result") -> bool:
        """Return True when this member's value is smaller than *other*'s.

        ``total_ordering`` derives the remaining rich comparisons from this
        method and the ``__eq__`` provided by ``Enum``.
        """
        if not isinstance(other, Result):
            # Let Python try the reflected operation / raise TypeError
            # instead of failing on a missing ``.value`` attribute.
            return NotImplemented
        return self.value < other.value

80 

81 

# Text or HTML snippet shown for each per-architecture test status when the
# autopkgtest line of an excuse is rendered.  Keys are the status strings
# stored in the per-arch result tuples.
EXCUSES_LABELS = {
    "PASS": '<span style="background:#87d96c">Pass</span>',
    "OLD_PASS": '<span style="background:#87d96c">Pass</span>',
    "NEUTRAL": "No tests, superficial or marked flaky",
    "OLD_NEUTRAL": "No tests, superficial or marked flaky",
    "FAIL": '<span style="background:#ff6666">Failed</span>',
    "OLD_FAIL": '<span style="background:#ff6666">Failed</span>',
    "ALWAYSFAIL": '<span style="background:#e5c545">Failed (not a regression)</span>',
    "REGRESSION": '<span style="background:#ff6666">Regression</span>',
    "IGNORE-FAIL": '<span style="background:#e5c545">Ignored failure</span>',
    "RUNNING": '<span style="background:#99ddff">Test triggered</span>',
    "RUNNING-ALWAYSFAIL": "Test triggered (will not be considered a regression)",
    "RUNNING-IGNORE": "Test triggered (failure will be ignored)",
    "RUNNING-REFERENCE": '<span style="background:#ff6666">Reference test triggered, but real test failed already</span>',
    "DEFERRED": '<span style="background:#99ddff">Test deferred</span>',
}

# Trigger name under which reference (baseline) test runs are recorded.
REF_TRIG = "migration-reference/0"

# Extra top-level key written into the pending-tests JSON file to record the
# version of its format.
VERSION_KEY = "britney-autopkgtest-pending-file-version"

102 

103 

def srchash(src: str) -> str:
    """Return the archive hash prefix for the source package *src*.

    Names starting with ``lib`` are hashed on their first four characters,
    every other name on its first character.
    """
    return src[:4] if src.startswith("lib") else src[0]

111 

112 

def added_pkgs_compared_to_target_suite(
    package_ids: frozenset[BinaryPackageId],
    target_suite: TargetSuite,
    *,
    invert: bool = False,
) -> Iterator[BinaryPackageId]:
    """Yield the members of *package_ids* whose package name is not ignored.

    By default the ignored names are those of the ids that *target_suite*
    reports as being in the suite.  With ``invert=True`` the ignored names
    are instead those of the ids the suite does *not* report.
    """
    in_target = target_suite.which_of_these_are_in_the_suite(package_ids)
    if invert:
        skipped_ids = package_ids - set(in_target)
    else:
        skipped_ids = in_target
    skipped_names = {pkg_id.package_name for pkg_id in skipped_ids}

    for pkg_id in package_ids:
        if pkg_id.package_name not in skipped_names:
            yield pkg_id

130 

131 

def all_leaf_results(
    test_results: dict[str, dict[str, dict[str, list[Any]]]],
) -> Iterator[list[Any]]:
    """Yield every innermost result list of a three-level results mapping.

    The lists are yielded as-is (not copied), so callers may modify them in
    place.
    """
    for second_level in test_results.values():
        for third_level in second_level.values():
            for leaf in third_level.values():
                yield leaf

138 

139 

def mark_result_as_old(result: Result) -> Result:
    """Return the ``OLD_*`` counterpart of a current result.

    ``FAIL``, ``PASS`` and ``NEUTRAL`` are mapped to ``OLD_FAIL``,
    ``OLD_PASS`` and ``OLD_NEUTRAL``; any other value is returned unchanged.
    """
    conversions = (
        (Result.FAIL, Result.OLD_FAIL),
        (Result.PASS, Result.OLD_PASS),
        (Result.NEUTRAL, Result.OLD_NEUTRAL),
    )
    for current, aged in conversions:
        if result == current:
            return aged
    return result

150 

151 

def concat_bdeps(src_data: SourcePackage) -> str:
    """Join ``build_deps_arch`` and ``build_deps_indep`` with a comma.

    A missing (falsy) field counts as an empty string, and commas at either
    end of the joined string are stripped.
    """
    arch_deps = src_data.build_deps_arch or ""
    indep_deps = src_data.build_deps_indep or ""
    combined = ",".join([arch_deps, indep_deps])
    return combined.strip(",")

157 

158 

159class AutopkgtestPolicy(AbstractBasePolicy): 

160 """autopkgtest regression policy for source migrations 

161 

162 Run autopkgtests for the excuse and all of its reverse dependencies, and 

163 reject the upload if any of those regress. 

164 """ 

165 

def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
    """Set up option defaults, URL templates and cache file locations.

    :param options: britney's configuration.  The ``adt_*`` options are
        passed through ``parse_option`` and some derived values
        (``adt_log_url``, ``adt_retry_url``, ``adt_ppas``) are written back
        onto the policy's options object.
    :param suite_info: the suites britney works on; handed to the base
        policy together with the policy name ``"autopkgtest"``.
    """
    super().__init__(
        "autopkgtest", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
    )
    # tests requested in this and previous runs
    # trigger -> src -> arch -> int
    # (NOTE(review): the int is presumably the request time stamp; confirm
    # against the code that fills this mapping)
    self.pending_tests: dict[str, dict[str, dict[str, int]]] | None = None
    self.pending_tests_file = os.path.join(
        self.state_dir, "autopkgtest-pending.json"
    )
    # filled in initialise(): trigger name -> set of source packages
    self.testsuite_triggers: dict[str, set[str]] = {}
    self.result_in_baseline_cache: dict[str, dict[str, list[Any]]] = (
        collections.defaultdict(dict)
    )

    # only set (in initialise()) when ADT_AMQP is a file:// URL
    self.amqp_file_handle: io.TextIOWrapper | None = None

    # Default values for this policy's options
    parse_option(options, "adt_baseline")
    parse_option(options, "adt_huge", to_int=True)
    parse_option(options, "adt_ppas")
    parse_option(options, "adt_reference_max_age", day_to_sec=True)
    parse_option(options, "adt_pending_max_age", default=5, day_to_sec=True)
    parse_option(options, "adt_regression_penalty", default=0, to_int=True)
    parse_option(options, "adt_log_url")  # see below for defaults
    parse_option(options, "adt_retry_url")  # see below for defaults
    parse_option(options, "adt_retry_older_than", day_to_sec=True)
    parse_option(options, "adt_results_cache_age", day_to_sec=True)
    parse_option(options, "adt_shared_results_cache")
    parse_option(options, "adt_success_bounty", default=0, to_int=True)
    parse_option(options, "adt_ignore_failure_for_new_tests", to_bool=True)

    # When ADT_RESULTS_CACHE_AGE is smaller than or equal to
    # ADT_REFERENCE_MAX_AGE old reference result will be removed from cache
    # before the newly scheduled results are in, potentially causing
    # additional waiting. For packages like glibc this might cause an
    # infinite delay as there will always be a package that's
    # waiting. Similarly for ADT_RETRY_OLDER_THAN.
    if self.options.adt_results_cache_age <= self.options.adt_reference_max_age:
        self.logger.warning(
            "Unexpected: ADT_REFERENCE_MAX_AGE bigger than ADT_RESULTS_CACHE_AGE"
        )
    if self.options.adt_results_cache_age <= self.options.adt_retry_older_than:
        self.logger.warning(
            "Unexpected: ADT_RETRY_OLDER_THAN bigger than ADT_RESULTS_CACHE_AGE"
        )

    # The {arch}/{hash}/{package}/{run_id}/{swift_container} placeholders
    # stay in the template; they are substituted by format_log_url().
    if not self.options.adt_log_url:
        # Historical defaults
        if self.options.adt_swift_url.startswith("file://"):
            self.options.adt_log_url = os.path.join(
                self.options.adt_ci_url,
                "data",
                "autopkgtest",
                self.options.series,
                "{arch}",
                "{hash}",
                "{package}",
                "{run_id}",
                "log.gz",
            )
        else:
            self.options.adt_log_url = os.path.join(
                self.options.adt_swift_url,
                "{swift_container}",
                self.options.series,
                "{arch}",
                "{hash}",
                "{package}",
                "{run_id}",
                "log.gz",
            )

    # Deprecated ADT_RETRY_URL_MECH: only honoured (for the value "run_id")
    # when ADT_RETRY_URL itself is not set.
    if hasattr(self.options, "adt_retry_url_mech"):
        self.logger.warning(
            "The ADT_RETRY_URL_MECH configuration has been deprecated."
        )
        self.logger.warning(
            "Instead britney now supports ADT_RETRY_URL for more flexibility."
        )
        if self.options.adt_retry_url:
            self.logger.error(
                "Please remove the ADT_RETRY_URL_MECH as ADT_RETRY_URL will be used."
            )
        elif self.options.adt_retry_url_mech == "run_id":
            self.options.adt_retry_url = (
                self.options.adt_ci_url + "api/v1/retry/{run_id}"
            )
    # The placeholders below are substituted by format_retry_url().
    if not self.options.adt_retry_url:
        # Historical default
        self.options.adt_retry_url = (
            self.options.adt_ci_url
            + "request.cgi?"
            + "release={release}&arch={arch}&package={package}&trigger={trigger}{ppas}"
        )

    # results map: trigger -> src -> arch -> [passed, version, run_id, seen]
    # - trigger is "source/version" of an unstable package that triggered
    #   this test run.
    # - "passed" is a Result
    # - "version" is the package version of "src" of that test
    # - "run_id" is an opaque ID that identifies a particular test run for
    #   a given src/arch.
    # - "seen" is an approximate time stamp of the test run. How this is
    #   deduced depends on the interface used.
    self.test_results: dict[str, dict[str, dict[str, list[Any]]]] = {}
    if self.options.adt_shared_results_cache:
        self.results_cache_file = self.options.adt_shared_results_cache
    else:
        self.results_cache_file = os.path.join(
            self.state_dir, "autopkgtest-results.cache"
        )

    # Turn the whitespace-separated ADT_PPAS string into a list.  A value
    # without .strip() (presumably None when the option is unset) becomes
    # an empty list.
    try:
        self.options.adt_ppas = self.options.adt_ppas.strip().split()
    except AttributeError:
        self.options.adt_ppas = []

    # Container name: "autopkgtest-<series>", plus the last PPA (with "/"
    # replaced by "-") when PPAs are configured.
    self.swift_container = "autopkgtest-" + options.series
    if self.options.adt_ppas:
        self.swift_container += "-" + options.adt_ppas[-1].replace("/", "-")

    # restrict adt_arches to architectures we actually run for
    self.adt_arches = []
    for arch in self.options.adt_arches.split():
        if arch in self.options.architectures:
            self.adt_arches.append(arch)
        else:
            self.logger.info(
                "Ignoring ADT_ARCHES %s as it is not in architectures list", arch
            )

297 

298 def __del__(self) -> None: 

299 if self.amqp_file_handle: 299 ↛ exitline 299 didn't return from function '__del__' because the condition on line 299 was always true

300 try: 

301 self.amqp_file_handle.close() 

302 except AttributeError: 

303 pass 

304 

def register_hints(self, hint_parser: "HintParser") -> None:
    """Register the hint types of this policy with *hint_parser*.

    ``force-badtest`` accepts an optional version and an optional
    architecture; ``force-skiptest`` is registered with the ``HintType``
    defaults.
    """
    register = hint_parser.register_hint_type

    badtest_hint = HintType(
        "force-badtest",
        versioned=HintAnnotate.OPTIONAL,
        architectured=HintAnnotate.OPTIONAL,
    )
    register(badtest_hint)
    register(HintType("force-skiptest"))

314 

def initialise(self, britney: "Britney") -> None:
    """Load test state and open the channel used to request tests.

    In order this: fixes the policy's notion of "now", builds the inverse
    Testsuite-Triggers map, reads the pending tests and the results cache,
    merges new results from a ``file://`` ADT_SWIFT_URL, optionally ages
    old results, and finally opens the ADT_AMQP target unless running in
    dry-run mode.

    :param britney: the running Britney instance; kept on ``self.britney``.
    :raises RuntimeError: if ADT_AMQP is neither an ``amqp://`` nor a
        ``file://`` URL (not checked in dry-run mode).
    """
    super().initialise(britney)
    # We want to use the "current" time stamp in multiple locations
    time_now = round(time.time())
    # options.fake_runtime, when present, overrides the wall clock
    if hasattr(self.options, "fake_runtime"):
        time_now = int(self.options.fake_runtime)
    self._now = time_now

    # local copies for better performance
    parse_src_depends = apt_pkg.parse_src_depends

    # compute inverse Testsuite-Triggers: map, unifying all series
    self.logger.info("Building inverse testsuite_triggers map")
    for suite in self.suite_info:
        for src, data in suite.sources.items():
            # for now, let's assume that autodep8 uses builddeps (most do)
            if (
                self.has_autodep8(data)
                and "@builddeps@" not in data.testsuite_triggers
            ):
                data.testsuite_triggers.append("@builddeps@")
            for trigger in data.testsuite_triggers:
                if trigger == "@builddeps@":
                    # expand the marker into the package's build
                    # dependencies, keyed on the first alternative of
                    # each dependency block
                    for arch in self.adt_arches:
                        for block in parse_src_depends(
                            concat_bdeps(data), True, arch
                        ):
                            self.testsuite_triggers.setdefault(
                                block[0][0], set()
                            ).add(src)
                else:
                    self.testsuite_triggers.setdefault(trigger, set()).add(src)
    target_suite_name = self.suite_info.target_suite.name

    os.makedirs(self.state_dir, exist_ok=True)
    self.read_pending_tests()

    # read the cached results that we collected so far
    if os.path.exists(self.results_cache_file):
        with open(self.results_cache_file) as f:
            test_results = json.load(f)
            self.test_results = self.check_and_upgrade_cache(test_results)
        self.logger.info("Read previous results from %s", self.results_cache_file)
    else:
        self.logger.info(
            "%s does not exist, re-downloading all results from swift",
            self.results_cache_file,
        )

    # read in the new results
    if self.options.adt_swift_url.startswith("file://"):
        # strip the "file://" prefix to get the local path
        debci_file = self.options.adt_swift_url[7:]
        if os.path.exists(debci_file):
            with open(debci_file) as f:
                test_results = json.load(f)
            self.logger.info("Read new results from %s", debci_file)
            for res in test_results["results"]:
                # if there's no date, the test didn't finish yet
                if res["date"] is None:
                    continue
                # "seen" is the result's date as seconds since the epoch;
                # the last five characters of the date string are dropped
                # before parsing (presumably fractional seconds and zone
                # suffix — confirm against the producer of this file).
                (test_suite, triggers, src, arch, ver, status, run_id, seen) = [
                    res["suite"],
                    res["trigger"],
                    res["package"],
                    res["arch"],
                    res["version"],
                    res["status"],
                    str(res["run_id"]),
                    round(
                        calendar.timegm(
                            time.strptime(res["date"][0:-5], "%Y-%m-%dT%H:%M:%S")
                        )
                    ),
                ]
                if test_suite != target_suite_name:
                    # not requested for this target suite, so ignore
                    continue
                if triggers is None:
                    # not requested for this policy, so ignore
                    continue
                if status is None:
                    # still running => pending
                    continue
                # one result may answer several whitespace-separated triggers
                for trigger in triggers.split():
                    # remove matching test requests
                    self.remove_from_pending(trigger, src, arch, seen)
                    if status == "tmpfail":
                        # let's see if we still need it
                        continue
                    self.logger.debug(
                        "Results %s %s %s added", src, trigger, status
                    )
                    self.add_trigger_to_results(
                        trigger,
                        src,
                        ver,
                        arch,
                        run_id,
                        seen,
                        Result[status.upper()],
                    )
        else:
            self.logger.info(
                "%s does not exist, no new data will be processed", debci_file
            )

    # The cache can contain results against versions of packages that
    # are not in any suite anymore. Strip those out, as we don't want
    # to use those results. Additionally, old references may be
    # filtered out.
    if self.options.adt_baseline == "reference":
        self.filter_old_results()

    # we need sources, binaries, and installability tester, so for now
    # remember the whole britney object
    self.britney = britney

    # Initialize AMQP connection
    self.amqp_channel: Optional["amqp.channel.Channel"] = None
    self.amqp_file_handle = None
    # in dry-run mode neither the connection nor the file is opened
    if self.options.dry_run:
        return

    amqp_url = self.options.adt_amqp

    if amqp_url.startswith("amqp://"):
        # imported here so amqplib is only needed for amqp:// URLs
        import amqplib.client_0_8 as amqp

        # depending on the setup we connect to a AMQP server
        creds = urllib.parse.urlsplit(amqp_url, allow_fragments=False)
        self.amqp_con = amqp.Connection(
            creds.hostname, userid=creds.username, password=creds.password
        )
        self.amqp_channel = self.amqp_con.channel()
        self.logger.info("Connected to AMQP server")
    elif amqp_url.startswith("file://"):
        # or in Debian and in testing mode, adt_amqp will be a file:// URL
        amqp_file = amqp_url[7:]
        # opened line-buffered (buffering=1); closed again in __del__
        self.amqp_file_handle = open(amqp_file, "w", 1)
    else:
        raise RuntimeError("Unknown ADT_AMQP schema %s" % amqp_url.split(":", 1)[0])

456 

def check_and_upgrade_cache(
    self, test_results: dict[str, dict[str, dict[str, list[Any]]]]
) -> dict[str, dict[str, dict[str, list[Any]]]]:
    """Convert freshly loaded cache data in place and prune aged entries.

    The first element of every leaf result is replaced by the ``Result``
    member of that name.  Afterwards every entry whose fourth element
    (the "seen" time stamp) is more than ADT_RESULTS_CACHE_AGE before
    ``self._now`` is dropped, together with any source or trigger mapping
    that ends up empty.  The same, modified mapping is returned.
    """
    for leaf in all_leaf_results(test_results):
        leaf[0] = Result[leaf[0]]

    # Drop results older than ADT_RESULTS_CACHE_AGE; iterate over snapshots
    # because the mappings are modified while walking them.
    for trigger, by_pkg in list(test_results.items()):
        for pkg, by_arch in list(by_pkg.items()):
            for arch, entry in list(by_arch.items()):
                if self._now - entry[3] > self.options.adt_results_cache_age:
                    del by_arch[arch]
            if not by_arch:
                del by_pkg[pkg]
        if not by_pkg:
            del test_results[trigger]

    return test_results

476 

def filter_old_results(self) -> None:
    """Mark outdated cached results as old.

    Nothing is removed from the cache: an entry only has its result
    converted by ``mark_result_as_old``.  This happens for reference runs
    (trigger ``REF_TRIG``) seen more than ADT_REFERENCE_MAX_AGE ago, and
    for any other entry whose tested version is not found in any suite.
    Regular results are kept so that packages with lots of triggered tests
    can still have all their results at the same time.
    """
    for trigger, per_source in self.test_results.items():
        is_reference = trigger == REF_TRIG
        for src, per_arch in per_source.items():
            for entry in per_arch.values():
                reference_expired = is_reference and (
                    self._now - entry[3] > self.options.adt_reference_max_age
                )
                if reference_expired or not self.test_version_in_any_suite(
                    src, entry[1]
                ):
                    entry[0] = mark_result_as_old(entry[0])

496 

def test_version_in_any_suite(self, src: str, version: str) -> bool:
    """Tell whether *version* of *src* is the version in any known suite.

    A result is only useful for preventing regressions in the target suite
    when it was obtained with the version of the package in the source or
    the target suite.  The source suite counts as well because versioned
    test dependencies and Breaks/Conflicts relations regularly cause its
    version to be used during testing.
    """
    known_versions = {
        suite.sources[src].version
        for suite in self.suite_info
        if src in suite.sources
    }
    return any(
        apt_pkg.version_compare(candidate, version) == 0
        for candidate in known_versions
    )

523 

def save_pending_json(self) -> None:
    """Write the pending test requests to ``self.pending_tests_file``.

    The data is first written to ``<file>.new`` and then renamed over the
    real file.  When there are pending tests, a ``VERSION_KEY`` entry is
    added to the top level of the written mapping; ``self.pending_tests``
    itself is not modified.
    """
    # update the pending tests on-disk cache
    # (lazy %-style arguments, like the other logging calls in this file)
    self.logger.info(
        "Updating pending requested tests in %s", self.pending_tests_file
    )
    # Shallow clone pending_tests as we only modify the toplevel and change its type.
    pending_tests: dict[str, Any] = {}
    if self.pending_tests:
        pending_tests = dict(self.pending_tests)
        # Avoid adding if there are no pending results at all (eases testing)
        pending_tests[VERSION_KEY] = 1

    new_file = self.pending_tests_file + ".new"
    with open(new_file, "w") as f:
        json.dump(pending_tests, f, indent=2)
    os.rename(new_file, self.pending_tests_file)

538 

def save_state(self, britney: "Britney") -> None:
    """Persist the results cache and the pending test requests.

    The results cache is only written when no shared results cache is
    configured.  It is serialised from a deep copy in which each
    ``Result`` is replaced by its name, written to ``<cache>.new`` and
    renamed into place.  The pending tests file is always updated.
    """
    super().save_state(britney)

    # update the results on-disk cache, unless we are using a r/o shared one
    if not self.options.adt_shared_results_cache:
        self.logger.info("Updating results cache")
        serialisable = deepcopy(self.test_results)
        for leaf in all_leaf_results(serialisable):
            leaf[0] = leaf[0].name
        scratch_file = self.results_cache_file + ".new"
        with open(scratch_file, "w") as out:
            json.dump(serialisable, out, indent=2)
        os.rename(scratch_file, self.results_cache_file)

    self.save_pending_json()

553 

554 def format_retry_url( 

555 self, run_id: str | None, arch: str, testsrc: str, trigger: str 

556 ) -> str: 

557 if self.options.adt_ppas: 

558 ppas = "&" + urllib.parse.urlencode( 

559 [("ppa", p) for p in self.options.adt_ppas] 

560 ) 

561 else: 

562 ppas = "" 

563 return cast(str, self.options.adt_retry_url).format( 

564 run_id=run_id, 

565 release=self.options.series, 

566 arch=arch, 

567 package=testsrc, 

568 trigger=urllib.parse.quote_plus(trigger), 

569 ppas=ppas, 

570 ) 

571 

def format_log_url(self, testsrc: str, arch: str, run_id: str) -> str:
    """Fill in the ADT_LOG_URL template for one test run.

    Available placeholders are ``release`` (the series),
    ``swift_container``, ``hash`` (see ``srchash``), ``package``, ``arch``
    and ``run_id``.
    """
    template = cast(str, self.options.adt_log_url)
    substitutions = {
        "release": self.options.series,
        "swift_container": self.swift_container,
        "hash": srchash(testsrc),
        "package": testsrc,
        "arch": arch,
        "run_id": run_id,
    }
    return template.format(**substitutions)

581 

def apply_src_policy_impl(
    self,
    tests_info: dict[str, Any],
    source_data_tdist: SourcePackage | None,
    source_data_srcdist: SourcePackage,
    excuse: "Excuse",
) -> PolicyVerdict:
    """Apply the autopkgtest policy to a source migration.

    Tests are deferred (REJECTED_TEMPORARILY) while the source has not
    been built anywhere or its arch:all build is missing.  Otherwise each
    architecture in ``self.adt_arches`` is either deferred (missing build
    there) or handed to ``check_and_request_arch``; the collected results
    are then evaluated by ``process_pkg_arch_results``.  In every case the
    verdict finally passes through ``finalize_excuse``.

    :param tests_info: mapping that ``process_pkg_arch_results`` fills
        with per-test details.
    :param source_data_tdist: not used by this method.
    :param source_data_srcdist: the source package being considered; its
        version is part of the trigger.
    :param excuse: the excuse to annotate.
    :return: the resulting verdict.
    """

    # initialize
    verdict = PolicyVerdict.PASS
    source_name = excuse.item.package

    # skip/delay autopkgtests until new package is built somewhere
    if not binaries_from_source_version(source_data_srcdist, self.suite_info):
        self.logger.debug(
            "%s hasnot been built anywhere, skipping autopkgtest policy",
            excuse.name,
        )
        verdict = PolicyVerdict.REJECTED_TEMPORARILY
        excuse.add_verdict_info(verdict, "Autopkgtest deferred: missing builds")

    elif "all" in excuse.missing_builds:
        self.logger.debug(
            "%s hasnot been built for arch:all, skipping autopkgtest policy",
            source_name,
        )
        verdict = PolicyVerdict.REJECTED_TEMPORARILY
        excuse.add_verdict_info(
            verdict, "Autopkgtest deferred: missing arch:all build"
        )

    # defaults handed to finalize_excuse() when the checks above rejected
    all_self_tests_pass = False
    results_info: list[str] = []
    if not verdict.is_rejected:
        self.logger.debug("Checking autopkgtests for %s", source_name)
        # triggers have the form "source/version"
        trigger = source_name + "/" + source_data_srcdist.version

        # build a (testsrc, testver) → arch → (status, run_id, log_url) map; we trigger/check test
        # results per architecture for technical/efficiency reasons, but we
        # want to evaluate and present the results by tested source package
        # first
        pkg_arch_result: dict[
            tuple[str, str], dict[str, tuple[str, str | None, str]]
        ] = collections.defaultdict(dict)
        for arch in self.adt_arches:
            if arch in excuse.missing_builds:
                verdict = PolicyVerdict.REJECTED_TEMPORARILY
                self.logger.debug(
                    "%s hasnot been built on arch %s, delay autopkgtest there",
                    source_name,
                    arch,
                )
                excuse.add_verdict_info(
                    verdict,
                    f"Autopkgtest deferred on {arch}: missing arch:{arch} build",
                )
            else:
                verdict = self.check_and_request_arch(
                    excuse,
                    arch,
                    source_data_srcdist,
                    pkg_arch_result,
                    trigger,
                    verdict,
                )

        verdict, results_info, all_self_tests_pass = self.process_pkg_arch_results(
            tests_info, excuse, pkg_arch_result, verdict, trigger
        )

    verdict = self.finalize_excuse(
        excuse, verdict, all_self_tests_pass, results_info
    )
    return verdict

656 

def apply_srcarch_policy_impl(
    self,
    tests_info: dict[str, Any],
    arch: str,
    source_data_tdist: SourcePackage | None,
    source_data_srcdist: SourcePackage,
    excuse: "Excuse",
) -> PolicyVerdict:
    """Apply the autopkgtest policy to a single-architecture item (binNMU).

    Returns ``PolicyVerdict.PASS`` straight away when *arch* is not one of
    ``self.adt_arches``, or when the binaries of the source on *arch* do
    not share exactly one numeric ``+b<N>`` version suffix.  Otherwise the
    trigger is ``"<item>/<N>"`` and the tests are requested and evaluated
    like for a source migration.
    """

    assert self.hints is not None  # for type checking
    verdict = PolicyVerdict.PASS

    self.logger.debug(f"Checking autopkgtests for binNMU {str(excuse.item)}/{arch}")

    if arch not in self.adt_arches:
        return verdict

    # collect the binNMU counters of all binaries built on this arch
    binnmu_counters = set()
    for bin_pkg in source_data_srcdist.binaries:
        if bin_pkg.architecture != arch:
            continue
        pieces = bin_pkg.version.split("+b")
        if len(pieces) > 1 and pieces[-1].isdigit():
            binnmu_counters.add(pieces[-1])
        else:
            self.logger.debug(
                f"Version {bin_pkg.version} doesn't end with '+b#', skipping"
            )
    if len(binnmu_counters) != 1:
        self.logger.debug("This migration item doesn't look like a binNMU")
        return verdict

    (binnmu_counter,) = binnmu_counters
    trigger = str(excuse.item) + "/" + binnmu_counter

    # Same (testsrc, testver) → arch → (status, run_id, log_url) map as in
    # apply_src_policy_impl(), although only one arch is involved here.
    pkg_arch_result: dict[
        tuple[str, str], dict[str, tuple[str, str | None, str]]
    ] = collections.defaultdict(dict)

    verdict = self.check_and_request_arch(
        excuse, arch, source_data_srcdist, pkg_arch_result, trigger, verdict
    )
    verdict, results_info, all_self_tests_pass = self.process_pkg_arch_results(
        tests_info, excuse, pkg_arch_result, verdict, trigger
    )
    return self.finalize_excuse(excuse, verdict, all_self_tests_pass, results_info)

712 

def check_and_request_arch(
    self,
    excuse: "Excuse",
    arch: str,
    source_data_srcdist: SourcePackage,
    pkg_arch_result: dict[tuple[str, str], dict[str, tuple[str, str | None, str]]],
    trigger: str,
    verdict: PolicyVerdict,
) -> PolicyVerdict:
    """Request tests on *arch* unless the package is uninstallable there.

    Returns *verdict* unchanged, except when the package is uninstallable
    on *arch* without that being allowed or overridden: then
    ``REJECTED_TEMPORARILY`` is returned.  In both "skipped" cases a note
    is added to the excuse and no tests are requested.
    """

    source_name = excuse.item.package
    depends_info = excuse.policy_info["depends"]

    if arch in depends_info.get("arch_all_not_installable", []):
        self.logger.debug(
            "%s is uninstallable on arch %s (which is allowed), not running autopkgtest there",
            source_name,
            arch,
        )
        excuse.addinfo(
            f"Autopkgtest skipped on {arch}: not installable (which is allowed)"
        )
        return verdict

    if arch in excuse.unsatisfiable_on_archs and arch not in depends_info.get(
        "autopkgtest_run_anyways", []
    ):
        self.logger.debug(
            "%s is uninstallable on arch %s, not running autopkgtest there",
            source_name,
            arch,
        )
        excuse.addinfo(f"Autopkgtest skipped on {arch}: not installable")
        return PolicyVerdict.REJECTED_TEMPORARILY

    self.request_tests_for_source(
        arch, source_data_srcdist, pkg_arch_result, excuse, trigger
    )
    return verdict

750 

def process_pkg_arch_results(
    self,
    tests_info: dict[str, Any],
    excuse: "Excuse",
    pkg_arch_result: dict[tuple[str, str], dict[str, tuple[str, str | None, str]]],
    verdict: PolicyVerdict,
    trigger: str,
) -> tuple[PolicyVerdict, list[str], bool]:
    """Calculate verdict based on results and render excuse text

    :param tests_info: filled in place; for every test name and
        architecture it receives
        ``[status, log_url, history_url, artifact_url, retry_url]``.
    :param excuse: the excuse being processed; ``autopkgtest_results`` is
        set on it when the package's own tests are among the results.
    :param pkg_arch_result: (testsrc, testver) → arch →
        (status, run_id, log_url) map to evaluate.
    :param verdict: the verdict reached so far.
    :param trigger: the trigger the tests ran for; used for retry URLs.
    :return: tuple of the updated verdict, the rendered HTML lines and
        whether all of the package's own tests passed.
    """

    source_name = excuse.item.package
    all_self_tests_pass = False
    results_info = []

    # add test result details to Excuse
    cloud_url = self.options.adt_ci_url + "packages/%(h)s/%(s)s/%(r)s/%(a)s"
    testver: str | None
    for testsrc, testver in sorted(pkg_arch_result):
        assert testver is not None
        arch_results = pkg_arch_result[(testsrc, testver)]
        # set of statuses seen for this test across all architectures
        r = {v[0] for v in arch_results.values()}
        if r & {"FAIL", "OLD_FAIL", "REGRESSION"}:
            verdict = PolicyVerdict.REJECTED_PERMANENTLY
        elif (
            r & {"DEFERRED", "RUNNING", "RUNNING-REFERENCE"}
            and not verdict.is_rejected
        ):
            verdict = PolicyVerdict.REJECTED_TEMPORARILY
        # skip version if still running on all arches
        if not r - {"DEFERRED", "RUNNING", "RUNNING-ALWAYSFAIL", "RUNNING-IGNORE"}:
            testver = None

        # A source package is eligible for the bounty if it has tests
        # of its own that pass on all tested architectures.
        if testsrc == source_name:
            excuse.autopkgtest_results = r
            if r == {"PASS"}:
                all_self_tests_pass = True

        # key used in tests_info: "src/version", or just "src" when the
        # version was dropped above
        if testver:
            testname = f"{testsrc}/{testver}"
        else:
            testname = testsrc

        html_archmsg = []
        for arch in sorted(arch_results):
            (status, run_id, log_url) = arch_results[arch]
            artifact_url = None
            retry_url = None
            reference_url = None
            reference_retry_url = None
            history_url = None
            # with PPAs configured an artifacts link is derived from the
            # log URL; without PPAs a history link is offered instead
            if self.options.adt_ppas:
                if log_url.endswith("log.gz"):
                    artifact_url = log_url.replace("log.gz", "artifacts.tar.gz")
            else:
                history_url = cloud_url % {
                    "h": srchash(testsrc),
                    "s": testsrc,
                    "r": self.options.series,
                    "a": arch,
                }
            # no retry link for these statuses
            if status not in ("DEFERRED", "PASS", "RUNNING", "RUNNING-IGNORE"):
                retry_url = self.format_retry_url(run_id, arch, testsrc, trigger)

            # link to the baseline run, if there is a usable one
            baseline_result = self.result_in_baseline(testsrc, arch)
            if baseline_result and baseline_result[0] != Result.NONE:
                baseline_run_id = str(baseline_result[2])
                reference_url = self.format_log_url(
                    testsrc, arch, baseline_run_id
                )
                if self.options.adt_baseline == "reference":
                    reference_retry_url = self.format_retry_url(
                        baseline_run_id, arch, testsrc, REF_TRIG
                    )
            tests_info.setdefault(testname, {})[arch] = [
                status,
                log_url,
                history_url,
                artifact_url,
                retry_url,
            ]

            # render HTML snippet for testsrc entry for current arch
            if history_url:
                message = f'<a href="{history_url}">{arch}</a>'
            else:
                message = arch
            message += ': <a href="{}">{}</a>'.format(
                log_url,
                EXCUSES_LABELS[status],
            )
            if retry_url:
                message += (
                    '<a href="%s" style="text-decoration: none;"> ♻</a>' % retry_url
                )
            if reference_url:
                message += ' (<a href="%s">reference</a>' % reference_url
                if reference_retry_url:
                    message += (
                        '<a href="%s" style="text-decoration: none;"> ♻</a>'
                        % reference_retry_url
                    )
                message += ")"
            if artifact_url:
                message += ' <a href="%s">[artifacts]</a>' % artifact_url
            html_archmsg.append(message)

        # render HTML line for testsrc entry
        # - if action is or may be required
        # - for ones own package
        if (
            r
            - {
                "PASS",
                "NEUTRAL",
                "RUNNING-ALWAYSFAIL",
                "ALWAYSFAIL",
                "IGNORE-FAIL",
            }
            or testsrc == source_name
        ):
            if testver:
                pkg = '<a href="#{0}">{0}</a>/{1}'.format(testsrc, testver)
            else:
                pkg = '<a href="#{0}">{0}</a>'.format(testsrc)
            results_info.append(
                "Autopkgtest for {}: {}".format(pkg, ", ".join(html_archmsg))
            )

    return (verdict, results_info, all_self_tests_pass)

882 

def finalize_excuse(
    self,
    excuse: "Excuse",
    verdict: PolicyVerdict,
    all_self_tests_pass: bool,
    results_info: list[str],
) -> PolicyVerdict:
    """Adjust the verdict for hints and bounty/penalty configuration.

    A rejected verdict becomes PASS_HINTED when a matching
    ``force-skiptest`` hint is found; otherwise the "autopkgtest" reason
    is recorded on the excuse. A configured success bounty is granted
    when the verdict is PASS and all_self_tests_pass is true. When a
    regression penalty is configured, a rejection is turned into PASS
    (adding the penalty to the excuse if it is positive).

    Finally every entry of results_info is attached to the excuse: as
    verdict info if the end verdict is still a rejection, as plain info
    otherwise. Returns the end verdict.
    """
    item = excuse.item
    package = item.package
    version = item.version

    assert self.hints is not None  # for type checking
    if verdict.is_rejected:
        # a force-skiptest hint overrides the rejection
        skip_hints = self.hints.search(
            "force-skiptest", package=package, version=version
        )
        if not skip_hints:
            excuse.addreason("autopkgtest")
        else:
            excuse.addreason("skiptest")
            excuse.addinfo(
                "Autopkgtest check should wait for tests relating to %s %s, but forced by %s"
                % (package, version, skip_hints[0].user)
            )
            verdict = PolicyVerdict.PASS_HINTED

    bounty = self.options.adt_success_bounty
    if bounty and verdict == PolicyVerdict.PASS and all_self_tests_pass:
        excuse.add_bounty("autopkgtest", bounty)

    penalty = self.options.adt_regression_penalty
    if penalty and verdict in {
        PolicyVerdict.REJECTED_PERMANENTLY,
        PolicyVerdict.REJECTED_TEMPORARILY,
    }:
        if penalty > 0:
            excuse.add_penalty("autopkgtest", penalty)
        # Penalties replace blocking, so the verdict must always pass here
        verdict = PolicyVerdict.PASS

    if verdict.is_rejected:
        for line in results_info:
            excuse.add_verdict_info(verdict, line)
    else:
        for line in results_info:
            excuse.addinfo(line)

    return verdict

939 

940 @staticmethod 

941 def has_autodep8(srcinfo: SourcePackage) -> bool: 

942 """Check if package is covered by autodep8 

943 

944 srcinfo is an item from self.britney.sources 

945 """ 

946 # autodep8? 

947 for t in srcinfo.testsuite: 

948 if t.startswith("autopkgtest-pkg"): 

949 return True 

950 

951 return False 

952 

def request_tests_for_source(
    self,
    arch: str,
    source_data_srcdist: SourcePackage,
    pkg_arch_result: dict[tuple[str, str], dict[str, tuple[str, str | None, str]]],
    excuse: "Excuse",
    trigger: str,
) -> None:
    """Request the tests for the excuse's source on arch and record results.

    The tests to run come from tests_for_source(). In addition to
    ``trigger``, a sorted list of extra "source/version" triggers is
    computed: packages that have to be taken from the source suite for
    the tests to be installable. Each test is then either marked
    "DEFERRED" (when binaries of the test's source are listed as broken
    by the implicit-deps policy info on this arch) or requested via
    pkg_test_request(), with the outcome of pkg_test_result() stored in
    pkg_arch_result.

    NOTE(review): pkg_arch_result is indexed as
    ``pkg_arch_result[(src, ver)][arch] = ...`` without creating the inner
    mapping first, so it presumably is a defaultdict - confirm with the
    caller.
    """
    universe = self.britney.pkg_universe
    target_suite = self.suite_info.target_suite
    source_suite = excuse.item.suite
    sources_t = target_suite.sources
    sources_s = source_suite.sources
    packages_s_a = source_suite.binaries[arch]
    source_name = excuse.item.package
    source_version = source_data_srcdist.version
    # request tests (unless they were already requested earlier or have a result)
    tests = self.tests_for_source(source_name, source_version, arch, excuse)
    is_huge = len(tests) > self.options.adt_huge

    # What follows works out what is needed from the source suite for the
    # tests to install successfully.
    #
    # The ImplicitDependencyPolicy does a similar calculation, but (if
    # elbrus understands correctly) only in the reverse dependency
    # direction. Here the same kind of thing is done in the dependency
    # direction (note: this code is older). The ImplicitDependencyPolicy
    # result is used for the reverse dependencies, the code below is kept
    # for the dependencies. Using the ImplicitDependencyPolicy results in
    # the reverse direction too would need quite some reorganisation, as
    # only the current excuse is available here and the other required
    # excuses may not have been calculated yet.
    #
    # Steps:
    # 1. Starting from the binaries of the trigger, recursively collect
    #    the *versioned* dependencies that are only satisfied in the
    #    source suite.
    # 2. For all binaries found, look up what they break/conflict with in
    #    the target suite but not in the source suite. The main reason is
    #    to cover test dependencies, so Testsuite-Triggers are checked as
    #    well.
    # 3. Add the unique source packages of all binaries found to the list
    #    of triggers.
    #
    # OI: does step 1 need to be done in a smart way (i.e. only for the
    # packages that are actually going to be installed) for the
    # breaks/conflicts set as well, i.e. do we need to check whether any
    # of the packages now enforced from the source suite have new
    # versioned depends and new breaks/conflicts?

    bin_triggers: set[PackageId] = set()
    bin_new = set(filter_out_faux(source_data_srcdist.binaries))

    # For each build-depends block (if any), check whether its first
    # alternative is satisfiable in the target suite. If it is not, but
    # the source suite can satisfy it, seed the search with that solver.
    for block in apt_pkg.parse_src_depends(
        concat_bdeps(source_data_srcdist), True, arch
    ):
        if get_dependency_solvers(
            [block[0]],
            target_suite.binaries[arch],
            target_suite.provides_table[arch],
            build_depends=True,
        ):
            continue
        solvers = get_dependency_solvers(
            [block[0]],
            packages_s_a,
            source_suite.provides_table[arch],
            build_depends=True,
        )
        if solvers:
            bin_new.add(solvers[0].pkg_id)

    # Work list: drain bin_new, pushing newly discovered binaries back on
    # it until nothing new shows up.
    while bin_new:
        n_binary = bin_new.pop()
        if n_binary in bin_triggers:
            continue
        bin_triggers.add(n_binary)

        # Look for dependencies that are not available in the target
        # suite. This adds slightly too much, because new binaries show
        # up as well, but those are properly installed already.
        # dependencies_of() yields a frozenset{frozenset{BinaryPackageId, ..}}
        for alternatives in universe.dependencies_of(n_binary):
            if target_suite.any_of_these_are_in_the_suite(alternatives):
                # one of the alternatives is already satisfied in the
                # target suite, nothing to pull in
                continue
            # Which version gets used is figured out later
            bin_new.update(
                added_pkgs_compared_to_target_suite(alternatives, target_suite)
            )

    # Check whether the collected binaries break/conflict with anything.
    # This may add slightly too many source packages, as a broken binary
    # may come from a different source package in the source suite.
    bin_broken = set()
    for t_binary in bin_triggers:
        # broken is a frozenset{BinaryPackageId, ..}
        broken = universe.negative_dependencies_of(cast(BinaryPackageId, t_binary))
        names_in_target = {
            p.package_name
            for p in target_suite.which_of_these_are_in_the_suite(broken)
        }
        names_in_source = {
            p.package_name
            for p in source_suite.which_of_these_are_in_the_suite(broken)
        }
        # Wanted: packages with a newer version in the source suite that
        # no longer has the conflict. This is an approximation.
        # The target suite version is added here; the trigger computation
        # below replaces it with the source suite version.
        bin_broken.update(
            p
            for p in broken
            if p.package_name in names_in_target
            and p.package_name not in names_in_source
        )
    bin_triggers.update(bin_broken)

    # The ImplicitDependencyPolicy also found packages that need to
    # migrate together, so they become triggers too.
    bin_triggers.update(
        bin_implicit
        for bin_implicit in excuse.depends_packages_flattened
        if bin_implicit.architecture == arch
    )

    triggers = set()
    for t_binary2 in bin_triggers:
        if t_binary2.architecture != arch:
            continue
        try:
            source_of_bin = packages_s_a[t_binary2.package_name].source
            # No trigger is needed when the target suite has the same
            # version. The source package was looked up in the source
            # suite; if it were a different source package in the target
            # suite, it would not have the same version anyway.
            #
            # binNMUs exist, so with identical sources also check that
            # t_binary2 itself exists in the target suite.
            if (
                sources_t.get(source_of_bin, None) is None
                or sources_s[source_of_bin].version
                != sources_t[source_of_bin].version
                or not target_suite.any_of_these_are_in_the_suite(
                    {cast(BinaryPackageId, t_binary2)}
                )
            ):
                triggers.add(source_of_bin + "/" + sources_s[source_of_bin].version)
        except KeyError:
            # Apparently the package was removed from unstable, e.g. if
            # packages are replaced (e.g. -dbg to -dbgsym)
            pass
        if t_binary2 not in source_data_srcdist.binaries:
            for tdep_src in self.testsuite_triggers.get(
                t_binary2.package_name, set()
            ):
                try:
                    # Only add a trigger if the versions in the target
                    # and source suites differ
                    if (
                        sources_t.get(tdep_src, None) is None
                        or sources_s[tdep_src].version != sources_t[tdep_src].version
                    ):
                        triggers.add(tdep_src + "/" + sources_s[tdep_src].version)
                except KeyError:
                    # Apparently the source was removed from unstable
                    # (testsuite_triggers are unified over all suites)
                    pass

    # The package's own source/version is dropped from the extra triggers;
    # the primary trigger always comes first, followed by the rest sorted.
    triggers.discard(source_name + "/" + source_version)
    triggers_list = [trigger, *sorted(triggers)]

    impl_pids = excuse.policy_info.get("implicit-deps", {}).get(
        "broken-binaries", []
    )
    for testsrc, testver in tests:
        # Do not run the test if binaries from testsrc are not installable
        uninstallable = any(
            bpid.architecture == arch
            and testsrc == target_suite.all_binaries_in_suite[bpid].source
            for bpid in (BinaryPackageId(*bpid_s.split("/")) for bpid_s in impl_pids)
        )
        if uninstallable:
            pkg_arch_result[(testsrc, testver)][arch] = ("DEFERRED", None, "")
            continue
        self.pkg_test_request(testsrc, arch, triggers_list, huge=is_huge)
        (result, real_ver, run_id, url) = self.pkg_test_result(
            testsrc, testver, arch, trigger
        )
        pkg_arch_result[(testsrc, real_ver)][arch] = (result, run_id, url)

1165 

1166 def tests_for_source( 

1167 self, src: str, ver: str, arch: str, excuse: "Excuse" 

1168 ) -> list[tuple[str, str]]: 

1169 """Iterate over all tests that should be run for given source and arch""" 

1170 

1171 source_suite = self.suite_info.primary_source_suite 

1172 target_suite = self.suite_info.target_suite 

1173 sources_info = target_suite.sources 

1174 binaries_info = target_suite.binaries[arch] 

1175 

1176 reported_pkgs = set() 

1177 

1178 tests = [] 

1179 

1180 # Debian doesn't have linux-meta, but Ubuntu does 

1181 # for linux themselves we don't want to trigger tests -- these should 

1182 # all come from linux-meta*. A new kernel ABI without a corresponding 

1183 # -meta won't be installed and thus we can't sensibly run tests against 

1184 # it. 

1185 if ( 1185 ↛ 1189line 1185 didn't jump to line 1189

1186 src.startswith("linux") 

1187 and src.replace("linux", "linux-meta") in sources_info 

1188 ): 

1189 return [] 

1190 

1191 # we want to test the package itself, if it still has a test in unstable 

1192 # but only if the package actually exists on this arch 

1193 srcinfo = source_suite.sources[src] 

1194 if ("autopkgtest" in srcinfo.testsuite or self.has_autodep8(srcinfo)) and len( 

1195 excuse.packages[arch] 

1196 ) > 0: 

1197 reported_pkgs.add(src) 

1198 tests.append((src, ver)) 

1199 

1200 extra_bins = [] 

1201 # Debian doesn't have linux-meta, but Ubuntu does 

1202 # Hack: For new kernels trigger all DKMS packages by pretending that 

1203 # linux-meta* builds a "dkms" binary as well. With that we ensure that we 

1204 # don't regress DKMS drivers with new kernel versions. 

1205 if src.startswith("linux-meta"): 

1206 # does this have any image on this arch? 

1207 for pkg_id in srcinfo.binaries: 

1208 if pkg_id.architecture == arch and "-image" in pkg_id.package_name: 

1209 try: 

1210 extra_bins.append(binaries_info["dkms"].pkg_id) 

1211 except KeyError: 

1212 pass 

1213 

1214 if not self.has_built_on_this_arch_or_is_arch_all(srcinfo, arch): 

1215 return [] 

1216 

1217 pkg_universe = self.britney.pkg_universe 

1218 # plus all direct reverse dependencies and test triggers of its 

1219 # binaries which have an autopkgtest 

1220 for binary in itertools.chain(srcinfo.binaries, extra_bins): 

1221 rdeps = filter_out_faux(pkg_universe.reverse_dependencies_of(binary)) 

1222 for rdep in rdeps: 

1223 try: 

1224 rdep_src = binaries_info[rdep.package_name].source 

1225 # Don't re-trigger the package itself here; this should 

1226 # have been done above if the package still continues to 

1227 # have an autopkgtest in unstable. 

1228 if rdep_src == src: 

1229 continue 

1230 except KeyError: 

1231 continue 

1232 

1233 rdep_src_info = sources_info[rdep_src] 

1234 if "autopkgtest" in rdep_src_info.testsuite or self.has_autodep8( 

1235 rdep_src_info 

1236 ): 

1237 if rdep_src not in reported_pkgs: 

1238 tests.append((rdep_src, rdep_src_info.version)) 

1239 reported_pkgs.add(rdep_src) 

1240 

1241 for tdep_src in self.testsuite_triggers.get(binary.package_name, set()): 

1242 if tdep_src not in reported_pkgs: 

1243 try: 

1244 tdep_src_info = sources_info[tdep_src] 

1245 except KeyError: 

1246 continue 

1247 if "autopkgtest" in tdep_src_info.testsuite or self.has_autodep8( 1247 ↛ 1241line 1247 didn't jump to line 1241 because the condition on line 1247 was always true

1248 tdep_src_info 

1249 ): 

1250 for pkg_id in tdep_src_info.binaries: 1250 ↛ 1241line 1250 didn't jump to line 1241 because the loop on line 1250 didn't complete

1251 if pkg_id.architecture == arch: 

1252 tests.append((tdep_src, tdep_src_info.version)) 

1253 reported_pkgs.add(tdep_src) 

1254 break 

1255 

1256 tests.sort(key=lambda s_v: s_v[0]) 

1257 return tests 

1258 

def read_pending_tests(self) -> None:
    """Load the pending test requests left behind by previous britney runs.

    Sets self.pending_tests from self.pending_tests_file. A missing file
    yields an empty mapping. For data carrying VERSION_KEY, that key is
    dropped and requests older than the adt_pending_max_age option are
    pruned (along with any package/trigger entries that become empty).
    Data without VERSION_KEY is migrated: each package's list of
    architectures becomes a mapping from architecture to the current time.
    """
    assert self.pending_tests is None, "already initialized"
    if not os.path.exists(self.pending_tests_file):
        self.logger.info(
            "No %s, starting with no pending tests", self.pending_tests_file
        )
        self.pending_tests = {}
        return

    with open(self.pending_tests_file) as f:
        pending = json.load(f)

    if VERSION_KEY in pending:
        del pending[VERSION_KEY]
        # iterate over snapshots of the keys, as entries are deleted
        for trigger in list(pending):
            pkgs = pending[trigger]
            for pkg in list(pkgs):
                arch_dict = pkgs[pkg]
                expired = [
                    arch
                    for arch, requested in arch_dict.items()
                    if self._now - requested > self.options.adt_pending_max_age
                ]
                for arch in expired:
                    del arch_dict[arch]
                if not arch_dict:
                    del pkgs[pkg]
            if not pkgs:
                del pending[trigger]
    else:
        # Migration code: arch lists become {arch: timestamp} mappings
        for trigger_data in pending.values():
            for pkg, arch_list in trigger_data.items():
                trigger_data[pkg] = dict.fromkeys(arch_list, self._now)

    self.pending_tests = pending
    self.logger.info("Read pending requested tests from %s", self.pending_tests_file)
    self.logger.debug("%s", self.pending_tests)

1300 

# Every trigger has to be visited to answer this, which is expensive,
# hence the cache.
@lru_cache(None)
def latest_run_for_package(self, src: str, arch: str) -> str:
    """Return the greatest run ID recorded for src on arch.

    All triggers in self.test_results are scanned; the empty string is
    returned when none of them has a result for src/arch.
    """
    newest = ""
    for per_source in self.test_results.values():
        try:
            candidate = per_source[src][arch][2]
        except KeyError:
            # this trigger has no result for src/arch
            continue
        newest = max(newest, candidate)
    return newest

1316 

1317 def urlopen_retry(self, url: str) -> http.client.HTTPResponse | addinfourl: 

1318 """A urlopen() that retries on time outs or errors""" 

1319 

1320 exc: Exception 

1321 for retry in range(5): 1321 ↛ 1342line 1321 didn't jump to line 1342 because the loop on line 1321 didn't complete

1322 try: 

1323 req = urlopen(url, timeout=30) 

1324 code = req.getcode() 

1325 if not code or 200 <= code < 300: 1325 ↛ 1321line 1325 didn't jump to line 1321 because the condition on line 1325 was always true

1326 return req # type: ignore[no-any-return] 

1327 except TimeoutError as e: 1327 ↛ 1328line 1327 didn't jump to line 1328 because the exception caught by line 1327 didn't happen

1328 self.logger.info( 

1329 "Timeout downloading '%s', will retry %d more times." 

1330 % (url, 5 - retry - 1) 

1331 ) 

1332 exc = e 

1333 except HTTPError as e: 

1334 if e.code not in (503, 502): 1334 ↛ 1336line 1334 didn't jump to line 1336 because the condition on line 1334 was always true

1335 raise 

1336 self.logger.info( 

1337 "Caught error %d downloading '%s', will retry %d more times." 

1338 % (e.code, url, 5 - retry - 1) 

1339 ) 

1340 exc = e 

1341 else: 

1342 raise exc 

1343 

@lru_cache(None)
def fetch_swift_results(self, swift_url: str, src: str, arch: str) -> None:
    """Download new results for source package/arch from swift.

    The container listing is queried for the runs of src on arch and each
    listed run is handed to fetch_one_result(). A 401 answer is logged
    and ignored; any other OSError terminates the program via
    sys.exit(1).
    """
    # Query for all runs of this package/arch; '@' is at the end of each
    # run id, to mark the end of a test run directory path.
    # example: <autopkgtest-wily>wily/amd64/libp/libpng/20150630_054517@/result.tar
    prefix = f"{self.options.series}/{arch}/{srchash(src)}/{src}/"
    query = {"delimiter": "@", "prefix": prefix}

    # Unless a shared results cache is used, only ask for runs later than
    # the latest run_id already known for this package/arch.
    if not self.options.adt_shared_results_cache:
        latest_run_id = self.latest_run_for_package(src, arch)
        if latest_run_id:
            query["marker"] = prefix + latest_run_id

    # request new results from swift
    url = os.path.join(swift_url, self.swift_container)
    url += "?" + urllib.parse.urlencode(query)
    f = None
    try:
        f = self.urlopen_retry(url)
        status = f.getcode()
        if status == 200:
            result_paths = f.read().decode().strip().splitlines()
        elif status == 204:  # No content
            result_paths = []
        else:
            # This should never be reached, as a HTTPError is expected in
            # other cases; e. g. 3XX means our URLs need adjusting, so
            # fail hard on those.
            raise NotImplementedError(
                "fetch_swift_results(%s): cannot handle HTTP code %r" % (url, status)
            )
    except OSError as e:
        # 401 "Unauthorized" is swift's way of saying "container does not exist"
        if getattr(e, "code", -1) == 401:
            self.logger.info(
                "fetch_swift_results: %s does not exist yet or is inaccessible", url
            )
            return
        # Other status codes are usually a transient
        # network/infrastructure failure. Ignoring them can lead to
        # re-requesting tests that already have results, so fail hard
        # and let the next run retry.
        self.logger.error("Failure to fetch swift results from %s: %s", url, str(e))
        sys.exit(1)
    finally:
        if f is not None:
            f.close()

    for run_path in result_paths:
        self.fetch_one_result(
            os.path.join(swift_url, self.swift_container, run_path, "result.tar"),
            src,
            arch,
        )

1404 

def fetch_one_result(self, url: str, src: str, arch: str) -> None:
    """Download one result URL for source/arch.

    The result tarball is parsed for the exit code, the tested package
    version and the recorded triggers. For every trigger the matching
    pending_tests entry is removed and the result is added to the test
    results.

    A "404 not found" download failure, a damaged tarball, a result for
    another package, or a result without ADT_TEST_TRIGGERS is logged and
    ignored. Any other download failure terminates the program via
    sys.exit(1).
    """
    f = None
    try:
        f = self.urlopen_retry(url)
        if f.getcode() == 200:
            tar_bytes = io.BytesIO(f.read())
        else:
            raise NotImplementedError(
                "fetch_one_result(%s): cannot handle HTTP code %r"
                % (url, f.getcode())
            )
    except OSError as err:
        self.logger.error("Failure to fetch %s: %s", url, str(err))
        # we tolerate "not found" (something went wrong on uploading the
        # result), but other things indicate infrastructure problems
        if getattr(err, "code", -1) == 404:
            return
        sys.exit(1)
    finally:
        if f is not None:
            f.close()

    try:
        with tarfile.open(None, "r", tar_bytes) as tar:
            exitcode = int(tar.extractfile("exitcode").read().strip())  # type: ignore[union-attr]
            srcver = tar.extractfile("testpkg-version").read().decode().strip()  # type: ignore[union-attr]
            (ressrc, ver) = srcver.split()
            testinfo = json.loads(tar.extractfile("testinfo.json").read().decode())  # type: ignore[union-attr]
    except (KeyError, ValueError, tarfile.TarError) as err:
        self.logger.error("%s is damaged, ignoring: %s", url, str(err))
        # ignore this; this will leave an orphaned request in autopkgtest-pending.json
        # and thus require manual retries after fixing the tmpfail, but we
        # can't just blindly attribute it to some pending test.
        return

    if src != ressrc:
        self.logger.error(
            "%s is a result for package %s, but expected package %s",
            url,
            ressrc,
            src,
        )
        return

    # parse recorded triggers in test result
    for e in testinfo.get("custom_environment", []):
        if e.startswith("ADT_TEST_TRIGGERS="):
            # only "source/version"-like entries are kept
            result_triggers = [i for i in e.split("=", 1)[1].split() if "/" in i]
            break
    else:
        # the URL must be passed along, otherwise a literal "%s" is logged
        self.logger.error("%s result has no ADT_TEST_TRIGGERS, ignoring", url)
        return

    # the run id is the name of the directory holding result.tar
    run_id = os.path.basename(os.path.dirname(url))
    seen = round(calendar.timegm(time.strptime(run_id, "%Y%m%d_%H%M%S@")))
    # allow some skipped tests, but nothing else
    if exitcode in [0, 2]:
        result = Result.PASS
    elif exitcode == 8:
        result = Result.NEUTRAL
    else:
        result = Result.FAIL

    self.logger.info(
        "Fetched test result for %s/%s/%s %s (triggers: %s): %s",
        src,
        ver,
        arch,
        run_id,
        result_triggers,
        result.name.lower(),
    )

    # remove matching test requests
    for trigger in result_triggers:
        self.remove_from_pending(trigger, src, arch)

    # add this result
    for trigger in result_triggers:
        self.add_trigger_to_results(trigger, src, ver, arch, run_id, seen, result)

1488 

def remove_from_pending(
    self, trigger: str, src: str, arch: str, timestamp: int = sys.maxsize
) -> None:
    """Drop the pending request for src/arch under trigger, if there is one.

    Nothing is removed when the request was recorded after ``timestamp``,
    i.e. when the result predates the moment of scheduling. Package and
    trigger entries that become empty are removed as well.
    """
    assert self.pending_tests is not None  # for type checking
    pending = self.pending_tests
    try:
        per_trigger = pending[trigger]
        arch_dict = per_trigger[src]
        scheduled = arch_dict[arch]
    except KeyError:
        self.logger.debug(
            "-> does not match any pending request for %s/%s", src, arch
        )
        return

    if timestamp < scheduled:
        # The result is from before the moment of scheduling, so it's
        # not the one we're waiting for
        return

    del arch_dict[arch]
    if not arch_dict:
        del per_trigger[src]
    if not per_trigger:
        del pending[trigger]
    self.logger.debug(
        "-> matches pending request %s/%s for trigger %s", src, arch, trigger
    )

1511 

def add_trigger_to_results(
    self,
    trigger: str,
    src: str,
    ver: str,
    arch: str,
    run_id: str,
    timestamp: int,
    status_to_add: Result,
) -> None:
    """Record a test result of src/arch for the given trigger.

    The trigger must consist of two or four "/"-separated parts;
    anything else is logged and ignored. A run of the triggering package
    itself for a version older than the triggering version is ignored as
    well. Otherwise the entry in self.test_results is created if needed
    and updated according to the rules described below.
    """
    # Ensure that we got a new enough version
    parts = trigger.split("/")
    if len(parts) == 2:
        trigsrc, trigver = parts
    elif len(parts) == 4:
        trigsrc, _trigarch, trigver, _rebuild = parts
    else:
        self.logger.info("Ignoring invalid test trigger %s", trigger)
        return
    if trigsrc == src and apt_pkg.version_compare(ver, trigver) < 0:
        self.logger.debug(
            "test trigger %s, but run for older version %s, ignoring", trigger, ver
        )
        return

    per_arch = self.test_results.setdefault(trigger, {}).setdefault(src, {})
    stored_result = per_arch.setdefault(arch, [Result.FAIL, None, "", 0])

    # Reruns shouldn't flip the result from PASS or NEUTRAL to FAIL, so
    # the most recent version of the best result seen is remembered.
    # Reference updates are the exception: those always take the most
    # recent result. The result data may not be ordered by timestamp, so
    # the time has to be checked explicitly.
    if self.options.adt_baseline == "reference" and trigger == REF_TRIG:
        update = stored_result[3] < timestamp
    else:
        update = status_to_add < stored_result[0] or (
            status_to_add == stored_result[0] and stored_result[3] < timestamp
        )

    if not update:
        return
    stored_result[0] = status_to_add
    stored_result[1] = ver
    stored_result[2] = run_id
    stored_result[3] = timestamp

1563 

def send_test_request(
    self, src: str, arch: str, triggers: list[str], huge: bool = False
) -> None:
    """Send out the request for testing src/arch for triggers.

    The request goes to the AMQP channel if there is one, otherwise it is
    written to the AMQP file handle. With PPAs configured the -ppa queue
    is used; otherwise, if huge is true, the -huge queue is used instead
    of the normal one. Nothing is sent in dry-run mode.
    """
    if self.options.dry_run:
        return

    params: dict[str, Any] = {"triggers": triggers}
    if self.options.adt_ppas:
        params["ppas"] = self.options.adt_ppas
        qname = f"debci-ppa-{self.options.series}-{arch}"
    elif huge:
        qname = f"debci-huge-{self.options.series}-{arch}"
    else:
        qname = f"debci-{self.options.series}-{arch}"
    params["submit-time"] = time.strftime("%Y-%m-%d %H:%M:%S%z", time.gmtime())

    if not self.amqp_channel:
        # for file-based submission, triggers are space separated
        params["triggers"] = [" ".join(params["triggers"])]
        assert self.amqp_file_handle
        self.amqp_file_handle.write(f"{qname}:{src} {json.dumps(params)}\n")
        return

    body = src + "\n" + json.dumps(params)
    self.amqp_channel.basic_publish(
        amqp.Message(body, delivery_mode=2),  # persistent
        routing_key=qname,
    )
    # pending.json is saved with every request, so that tests are not
    # re-requested if britney crashes. This is only needed when using
    # real amqp, as with file-based submission the pending tests are
    # returned by debci along with the results each run.
    self.save_pending_json()

1602 

def pkg_test_request(
    self, src: str, arch: str, all_triggers: list[str], huge: bool = False
) -> None:
    """Request one package test for a set of triggers.

    all_triggers is a list of "pkgname/version". These are the packages
    that will be taken from the source suite. The first package in this
    list is the package that triggers the testing of src, the rest are
    additional packages required for installability of the test deps. If
    huge is true, then the request will be put into the -huge instead of
    normal queue.

    No request is made when a result for the trigger is already known
    that is neither an old result nor a failure worth retrying. With a
    swift results URL, current results for the package are downloaded
    first and no request is made if a result is then available. The
    request itself goes through request_test_if_not_queued().
    """
    trigger = all_triggers[0]
    uses_swift = not self.options.adt_swift_url.startswith("file://")

    try:
        result = self.test_results[trigger][src][arch]
    except KeyError:
        # no result known yet for this trigger
        pass
    else:
        result_state = result[0]
        is_old = result_state in {
            Result.OLD_PASS,
            Result.OLD_FAIL,
            Result.OLD_NEUTRAL,
        }
        # A failure may be retried if the baseline does not fail and the
        # failing result is older than the retry threshold.
        retry_failure = (
            not is_old
            and result_state == Result.FAIL
            and self.result_in_baseline(src, arch)[0]
            in {Result.PASS, Result.NEUTRAL, Result.OLD_PASS, Result.OLD_NEUTRAL}
            and self._now - result[3] > self.options.adt_retry_older_than
        )
        if not is_old and not retry_failure:
            if not uses_swift:
                # We're done if we don't retrigger and we're not using swift
                return
            if result_state in {Result.PASS, Result.NEUTRAL}:
                self.logger.debug(
                    "%s/%s triggered by %s already known", src, arch, trigger
                )
                return

    # Without swift we don't expect new results
    if uses_swift:
        self.logger.info(
            "Checking for new results for failed %s/%s for trigger %s",
            src,
            arch,
            trigger,
        )
        self.fetch_swift_results(self.options.adt_swift_url, src, arch)
        # do we have one now?
        try:
            self.test_results[trigger][src][arch]
        except KeyError:
            pass
        else:
            return

    self.request_test_if_not_queued(src, arch, trigger, all_triggers, huge=huge)

1666 

1667 def request_test_if_not_queued( 

1668 self, 

1669 src: str, 

1670 arch: str, 

1671 trigger: str, 

1672 all_triggers: list[str] = [], 

1673 huge: bool = False, 

1674 ) -> None: 

1675 assert self.pending_tests is not None # for type checking 

1676 if not all_triggers: 

1677 all_triggers = [trigger] 

1678 

1679 # Don't re-request if it's already pending 

1680 arch_dict = self.pending_tests.setdefault(trigger, {}).setdefault(src, {}) 

1681 if arch in arch_dict.keys(): 

1682 self.logger.debug( 

1683 "Test %s/%s for %s is already pending, not queueing", src, arch, trigger 

1684 ) 

1685 else: 

1686 self.logger.debug( 

1687 "Requesting %s autopkgtest on %s to verify %s", src, arch, trigger 

1688 ) 

1689 arch_dict[arch] = self._now 

1690 self.send_test_request(src, arch, all_triggers, huge=huge) 

1691 

def result_in_baseline(self, src: str, arch: str) -> list[Any]:
    """Return the baseline result entry for src on arch.

    When options.adt_baseline is "reference", the baseline is the entry
    stored under REF_TRIG (or a Result.NONE placeholder if there is none).
    Otherwise it is a non-FAIL entry found under any trigger, falling back
    to a Result.FAIL placeholder; the scan stops at the first PASS.

    The first element of the returned list is a Result member.
    """
    # Walking every cached result is expensive, so answers are memoised
    # per src/arch in self.result_in_baseline_cache.
    try:
        return self.result_in_baseline_cache[src][arch]
    except KeyError:
        pass

    if self.options.adt_baseline == "reference":
        baseline: list[Any] = [Result.NONE, None, "", 0]
        if src not in self.suite_info.target_suite.sources:
            # Not cached: the placeholder is returned as-is.
            return baseline

        try:
            baseline = self.test_results[REF_TRIG][src][arch]
        except KeyError:
            message = "Found NO result for src %s in reference: %s"
        else:
            message = "Found result for src %s in reference: %s"
        self.logger.debug(message, src, baseline[0].name)
        self.result_in_baseline_cache[src][arch] = deepcopy(baseline)
        return baseline

    best: list[Any] = [Result.FAIL, None, "", 0]
    for per_trigger in self.test_results.values():
        try:
            candidate = per_trigger[src][arch]
        except KeyError:
            continue
        if candidate[0] == Result.FAIL:
            continue
        best = candidate
        # Outside of reference mode only the status matters, so a PASS
        # ends the search.
        if best[0] == Result.PASS:
            break

    self.result_in_baseline_cache[src][arch] = deepcopy(best)
    self.logger.debug("Result for src %s ever: %s", src, best[0].name)
    return best

1742 

def has_test_in_target(self, src: str) -> bool:
    """Tell whether src has an autopkgtest in the target suite.

    True when the target suite's source entry lists "autopkgtest" in its
    testsuite, or when self.has_autodep8() accepts that entry. A source
    that is missing from the target suite yields False.
    """
    try:
        srcinfo = self.suite_info.target_suite.sources[src]
        found = "autopkgtest" in srcinfo.testsuite or self.has_autodep8(srcinfo)
    except (KeyError, AttributeError):
        # AttributeError is only needed for the test suite, where srcinfo
        # can be a NoneType
        return False
    return bool(found)

1755 

def pkg_test_result(
    self, src: str, ver: str, arch: str, trigger: str
) -> tuple[str, str, str | None, str]:
    """Report the current test status of src on arch for trigger.

    Returns a (status, real_version, run_id, log_url) tuple; status is a
    key in EXCUSES_LABELS. run_id is None if the test is still running.
    """
    assert self.pending_tests is not None  # for type checking
    run_id = None
    try:
        # Everything up to the log URL stays inside this try: a KeyError
        # raised anywhere in here is handled as "no result yet" below.
        record = self.test_results[trigger][src][arch]
        ver = record[1]
        run_id = record[2]

        if record[0] not in {Result.FAIL, Result.OLD_FAIL}:
            result = record[0].name
        else:
            # A failure needs to be weighed against the baseline.
            baseline_result = self.result_in_baseline(src, arch)[0]

            # Special-case triggers from linux-meta*: we cannot compare
            # results against different kernels, as e. g. a DKMS module
            # might work against the default kernel but fail against a
            # different flavor; so for those, ignore the "ever
            # passed" check; FIXME: check against trigsrc only
            if self.options.adt_baseline != "reference" and trigger.startswith(
                ("linux-meta", "linux/")
            ):
                baseline_result = Result.FAIL

            # Check if the autopkgtest (still) exists in the target suite
            test_in_target = self.has_test_in_target(src)

            # A missing or outdated baseline gets a reference run queued.
            stale_baseline = {
                Result.NONE,
                Result.OLD_FAIL,
                Result.OLD_NEUTRAL,
                Result.OLD_PASS,
            }
            if test_in_target and baseline_result in stale_baseline:
                self.request_test_if_not_queued(src, arch, REF_TRIG)

            if self.has_force_badtest(src, ver, arch):
                result = "IGNORE-FAIL"
            elif not test_in_target:
                result = (
                    "IGNORE-FAIL"
                    if self.options.adt_ignore_failure_for_new_tests
                    else record[0].name
                )
            elif baseline_result in {Result.FAIL, Result.OLD_FAIL}:
                result = "ALWAYSFAIL"
            elif baseline_result == Result.NONE:
                result = "RUNNING-REFERENCE"
            else:
                result = "REGRESSION"

        url = self.format_log_url(src, arch, run_id)
    except KeyError:
        # no result for src/arch; still running?
        pending_arches = self.pending_tests.get(trigger, {}).get(src, {})
        assert arch in pending_arches.keys(), (
            "Result for %s/%s/%s (triggered by %s) is neither known nor pending!"
            % (src, ver, arch, trigger)
        )

        if self.has_force_badtest(src, ver, arch):
            result = "RUNNING-IGNORE"
        elif self.has_test_in_target(src):
            baseline_result = self.result_in_baseline(src, arch)[0]
            result = (
                "RUNNING-ALWAYSFAIL" if baseline_result == Result.FAIL else "RUNNING"
            )
        elif self.options.adt_ignore_failure_for_new_tests:
            result = "RUNNING-IGNORE"
        else:
            result = "RUNNING"
        url = self.options.adt_ci_url + "status/pending"

    return (result, ver, run_id, url)

1839 

def has_force_badtest(self, src: str, ver: str, arch: str) -> bool:
    """Tell whether a force-badtest hint covers src/ver/arch.

    A hint item matches when its architecture is "source" or arch, and it
    is either unversioned (version None or "all") or ver compares less
    than or equal to its version under apt_pkg.version_compare().
    """
    assert self.hints is not None
    candidates = self.hints.search("force-badtest", package=src)
    if candidates:
        self.logger.info(
            "Checking hints for %s/%s/%s: %s",
            src,
            arch,
            ver,
            [str(h) for h in candidates],
        )

    wanted_arches = ("source", arch)
    for hint in candidates:
        for item in hint.packages:
            if item.architecture not in wanted_arches:
                continue
            if (
                item.version is None
                or item.version == "all"  # Historical unversioned hint
                or apt_pkg.version_compare(ver, item.version) <= 0
            ):
                return True

    return False

1867 

def has_built_on_this_arch_or_is_arch_all(
    self, src_data: SourcePackage, arch: str
) -> bool:
    """Check whether the source is really present on arch.

    When a source builds arch:all binaries, those binaries are added to
    all architectures and thus the source 'exists' everywhere. This
    returns True if one of the source's binaries in the primary source
    suite for arch has that architecture. Otherwise it returns True only
    if every (non-faux) binary of the source was found there, i.e. no
    binary is missing on arch.
    """
    arch_binaries = self.suite_info.primary_source_suite.binaries[arch]
    missing_on_arch = False
    for pkg_id in filter_out_faux(src_data.binaries):
        try:
            found = arch_binaries[pkg_id.package_name]
        except KeyError:
            # src_data.binaries has all the built binaries, so a binary
            # that is absent here means at least one architecture has
            # architecture specific binaries
            missing_on_arch = True
        else:
            if found.architecture == arch:
                return True
    # Only arch:all packages were seen for this arch.
    return not missing_on_arch