Coverage for britney2/policies/autopkgtest.py: 90%

795 statements  

coverage.py v6.5.0, created at 2025-08-31 22:10 +0000

1# -*- coding: utf-8 -*- 

2 

3# Copyright (C) 2013 - 2016 Canonical Ltd. 

4# Authors: 

5# Colin Watson <cjwatson@ubuntu.com> 

6# Jean-Baptiste Lallement <jean-baptiste.lallement@canonical.com> 

7# Martin Pitt <martin.pitt@ubuntu.com> 

8 

9# This program is free software; you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation; either version 2 of the License, or 

12# (at your option) any later version. 

13 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18 

19import calendar 

20import collections 

21import http.client 

22import io 

23import itertools 

24import json 

25import optparse 

26import os 

27import socket 

28import sys 

29import tarfile 

30import time 

31import urllib.parse 

32from copy import deepcopy 

33from enum import Enum 

34from functools import lru_cache, total_ordering 

35from typing import TYPE_CHECKING, Any, Optional, cast 

36from collections.abc import Iterator 

37from urllib.error import HTTPError 

38from urllib.request import urlopen 

39from urllib.response import addinfourl 

40 

41import apt_pkg 

42 

43import britney2.hints 

44from britney2 import ( 

45 BinaryPackageId, 

46 PackageId, 

47 SourcePackage, 

48 SuiteClass, 

49 Suites, 

50 TargetSuite, 

51) 

52from britney2.migrationitem import MigrationItem 

53from britney2.policies import PolicyVerdict 

54from britney2.policies.policy import AbstractBasePolicy 

55from britney2.utils import iter_except, parse_option 

56 

57if TYPE_CHECKING: 57 ↛ 58 (line 57 didn't jump to line 58, because the condition on line 57 was never true)

58 import amqplib.client_0_8 as amqp 

59 

60 from ..britney import Britney 

61 from ..excuse import Excuse 

62 from ..hints import HintParser 

63 

64 

65@total_ordering 

66class Result(Enum): 

67 PASS = 1 

68 NEUTRAL = 2 

69 FAIL = 3 

70 OLD_PASS = 4 

71 OLD_NEUTRAL = 5 

72 OLD_FAIL = 6 

73 NONE = 7 

74 

75 def __lt__(self, other: "Result") -> bool: 

76 return self.value < other.value

77 
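# Note (derived from the enum above): a lower value means a "better" result, so
# with @total_ordering this gives Result.PASS < Result.NEUTRAL < Result.FAIL <
# Result.OLD_PASS < ... < Result.NONE. add_trigger_to_results() below relies on
# this ordering when deciding whether a newly fetched status improves on the
# stored one (status_to_add < stored_result[0]).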

78 

79EXCUSES_LABELS = { 

80 "PASS": '<span style="background:#87d96c">Pass</span>', 

81 "OLD_PASS": '<span style="background:#87d96c">Pass</span>', 

82 "NEUTRAL": '<span style="background:#e5c545">No tests, superficial or marked flaky</span>', 

83 "OLD_NEUTRAL": '<span style="background:#e5c545">No tests, superficial or marked flaky</span>', 

84 "FAIL": '<span style="background:#ff6666">Failed</span>', 

85 "OLD_FAIL": '<span style="background:#ff6666">Failed</span>', 

86 "ALWAYSFAIL": '<span style="background:#e5c545">Failed (not a regression)</span>', 

87 "REGRESSION": '<span style="background:#ff6666">Regression</span>', 

88 "IGNORE-FAIL": '<span style="background:#e5c545">Ignored failure</span>', 

89 "RUNNING": '<span style="background:#99ddff">Test triggered</span>', 

90 "RUNNING-ALWAYSFAIL": '<span style="background:#99ddff">Test triggered (will not be considered a regression)</span>', 

91 "RUNNING-IGNORE": '<span style="background:#99ddff">Test triggered (failure will be ignored)</span>', 

92 "RUNNING-REFERENCE": '<span style="background:#ff6666">Reference test triggered, but real test failed already</span>', 

93} 

94 

95REF_TRIG = "migration-reference/0" 

96 

97VERSION_KEY = "britney-autopkgtest-pending-file-version" 

98 

99 

100def srchash(src: str) -> str: 

101 """archive hash prefix for source package""" 

102 

103 if src.startswith("lib"): 103 ↛ 104 (line 103 didn't jump to line 104, because the condition on line 103 was never true)

104 return src[:4] 

105 else: 

106 return src[0] 

107 
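# For illustration (package names are just examples): srchash("libpng") == "libp"
# and srchash("glibc") == "g", matching the pool-style layout of swift result
# paths such as ".../amd64/libp/libpng/<run_id>/result.tar" used further below.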

108 

109def added_pkgs_compared_to_target_suite( 

110 package_ids: frozenset[BinaryPackageId], 

111 target_suite: TargetSuite, 

112 *, 

113 invert: bool = False, 

114) -> Iterator[BinaryPackageId]: 

115 if invert: 115 ↛ 116 (line 115 didn't jump to line 116, because the condition on line 115 was never true)

116 pkgs_ids_to_ignore = package_ids - set( 

117 target_suite.which_of_these_are_in_the_suite(package_ids) 

118 ) 

119 names_ignored = {p.package_name for p in pkgs_ids_to_ignore} 

120 else: 

121 names_ignored = { 

122 p.package_name 

123 for p in target_suite.which_of_these_are_in_the_suite(package_ids) 

124 } 

125 yield from (p for p in package_ids if p.package_name not in names_ignored) 

126 

127 

128def all_leaf_results( 

129 test_results: dict[str, dict[str, dict[str, list[Any]]]], 

130) -> Iterator[list[Any]]: 

131 for trigger in test_results.values(): 

132 for arch in trigger.values(): 

133 yield from arch.values() 

134 

135 

136def mark_result_as_old(result: Result) -> Result: 

137 """Convert current result into corresponding old result""" 

138 

139 if result == Result.FAIL: 

140 result = Result.OLD_FAIL 

141 elif result == Result.PASS: 

142 result = Result.OLD_PASS 

143 elif result == Result.NEUTRAL: 143 ↛ 145 (line 143 didn't jump to line 145, because the condition on line 143 was never false)

144 result = Result.OLD_NEUTRAL 

145 return result 

146 
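# For example, mark_result_as_old(Result.PASS) returns Result.OLD_PASS, while a
# result that is already OLD_* (or NONE) falls through unchanged.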

147 

148class AutopkgtestPolicy(AbstractBasePolicy): 

149 """autopkgtest regression policy for source migrations 

150 

151 Run autopkgtests for the excuse and all of its reverse dependencies, and 

152 reject the upload if any of those regress. 

153 """ 

154 

155 def __init__(self, options: optparse.Values, suite_info: Suites) -> None: 

156 super().__init__( 

157 "autopkgtest", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE} 

158 ) 

159 # tests requested in this and previous runs 

160 # trigger -> src -> {arch: time_requested}

161 self.pending_tests: Optional[dict[str, dict[str, dict[str, int]]]] = None 

162 self.pending_tests_file = os.path.join( 

163 self.state_dir, "autopkgtest-pending.json" 

164 ) 

165 self.testsuite_triggers: dict[str, set[str]] = {} 

166 self.result_in_baseline_cache: dict[str, dict[str, list[Any]]] = ( 

167 collections.defaultdict(dict) 

168 ) 

169 

170 self.amqp_file_handle: io.TextIOWrapper | None = None 

171 

172 # Default values for this policy's options 

173 parse_option(options, "adt_baseline") 

174 parse_option(options, "adt_huge", to_int=True) 

175 parse_option(options, "adt_ppas") 

176 parse_option(options, "adt_reference_max_age", day_to_sec=True) 

177 parse_option(options, "adt_pending_max_age", default=5, day_to_sec=True) 

178 parse_option(options, "adt_regression_penalty", default=0, to_int=True) 

179 parse_option(options, "adt_log_url") # see below for defaults 

180 parse_option(options, "adt_retry_url") # see below for defaults 

181 parse_option(options, "adt_retry_older_than", day_to_sec=True) 

182 parse_option(options, "adt_results_cache_age", day_to_sec=True) 

183 parse_option(options, "adt_shared_results_cache") 

184 parse_option(options, "adt_success_bounty", default=0, to_int=True) 

185 parse_option(options, "adt_ignore_failure_for_new_tests", to_bool=True) 

186 

187 # When ADT_RESULTS_CACHE_AGE is smaller than or equal to 

188 # ADT_REFERENCE_MAX_AGE old reference result will be removed from cache 

189 # before the newly scheduled results are in, potentially causing 

190 # additional waiting. For packages like glibc this might cause an 

191 # infinite delay as there will always be a package that's 

192 # waiting. Similarly for ADT_RETRY_OLDER_THAN. 

193 if self.options.adt_results_cache_age <= self.options.adt_reference_max_age: 

194 self.logger.warning( 

195 "Unexpected: ADT_REFERENCE_MAX_AGE bigger than ADT_RESULTS_CACHE_AGE" 

196 ) 

197 if self.options.adt_results_cache_age <= self.options.adt_retry_older_than: 

198 self.logger.warning( 

199 "Unexpected: ADT_RETRY_OLDER_THAN bigger than ADT_RESULTS_CACHE_AGE" 

200 ) 

201 

202 if not self.options.adt_log_url: 202 ↛ 228 (line 202 didn't jump to line 228, because the condition on line 202 was never false)

203 # Historical defaults 

204 if self.options.adt_swift_url.startswith("file://"): 

205 self.options.adt_log_url = os.path.join( 

206 self.options.adt_ci_url, 

207 "data", 

208 "autopkgtest", 

209 self.options.series, 

210 "{arch}", 

211 "{hash}", 

212 "{package}", 

213 "{run_id}", 

214 "log.gz", 

215 ) 

216 else: 

217 self.options.adt_log_url = os.path.join( 

218 self.options.adt_swift_url, 

219 "{swift_container}", 

220 self.options.series, 

221 "{arch}", 

222 "{hash}", 

223 "{package}", 

224 "{run_id}", 

225 "log.gz", 

226 ) 

227 

228 if hasattr(self.options, "adt_retry_url_mech"): 228 ↛ 229 (line 228 didn't jump to line 229, because the condition on line 228 was never true)

229 self.logger.warning( 

230 "The ADT_RETRY_URL_MECH configuration has been deprecated." 

231 ) 

232 self.logger.warning( 

233 "Instead britney now supports ADT_RETRY_URL for more flexibility." 

234 ) 

235 if self.options.adt_retry_url: 

236 self.logger.error( 

237 "Please remove the ADT_RETRY_URL_MECH as ADT_RETRY_URL will be used." 

238 ) 

239 elif self.options.adt_retry_url_mech == "run_id": 

240 self.options.adt_retry_url = ( 

241 self.options.adt_ci_url + "api/v1/retry/{run_id}" 

242 ) 

243 if not self.options.adt_retry_url: 243 ↛ 260 (line 243 didn't jump to line 260, because the condition on line 243 was never false)

244 # Historical default 

245 self.options.adt_retry_url = ( 

246 self.options.adt_ci_url 

247 + "request.cgi?" 

248 + "release={release}&arch={arch}&package={package}&trigger={trigger}{ppas}" 

249 ) 

250 

251 # results map: trigger -> src -> arch -> [passed, version, run_id, seen] 

252 # - trigger is "source/version" of an unstable package that triggered 

253 # this test run. 

254 # - "passed" is a Result 

255 # - "version" is the package version of "src" of that test 

256 # - "run_id" is an opaque ID that identifies a particular test run for 

257 # a given src/arch. 

258 # - "seen" is an approximate time stamp of the test run. How this is 

259 # deduced depends on the interface used. 
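# An illustrative (hypothetical) entry, to make the shape concrete:
#   {"glibc/2.36-1": {"libpng": {"amd64": [Result.PASS, "1.6.40-1", "20250101_120000@", 1735732800]}}}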

260 self.test_results: dict[str, dict[str, dict[str, list[Any]]]] = {} 

261 if self.options.adt_shared_results_cache: 

262 self.results_cache_file = self.options.adt_shared_results_cache 

263 else: 

264 self.results_cache_file = os.path.join( 

265 self.state_dir, "autopkgtest-results.cache" 

266 ) 

267 

268 try: 

269 self.options.adt_ppas = self.options.adt_ppas.strip().split() 

270 except AttributeError: 

271 self.options.adt_ppas = [] 

272 

273 self.swift_container = "autopkgtest-" + options.series 

274 if self.options.adt_ppas: 

275 self.swift_container += "-" + options.adt_ppas[-1].replace("/", "-") 

276 

277 # restrict adt_arches to architectures we actually run for 

278 self.adt_arches = [] 

279 for arch in self.options.adt_arches.split(): 

280 if arch in self.options.architectures: 

281 self.adt_arches.append(arch) 

282 else: 

283 self.logger.info( 

284 "Ignoring ADT_ARCHES %s as it is not in architectures list", arch 

285 ) 

286 

287 def __del__(self) -> None: 

288 if self.amqp_file_handle: 288 ↛ exit (line 288 didn't return from function '__del__', because the condition on line 288 was never false)

289 try: 

290 self.amqp_file_handle.close() 

291 except AttributeError: 

292 pass 

293 

294 def register_hints(self, hint_parser: "HintParser") -> None: 

295 hint_parser.register_hint_type( 

296 "force-badtest", britney2.hints.split_into_one_hint_per_package 

297 ) 

298 hint_parser.register_hint_type( 

299 "force-skiptest", britney2.hints.split_into_one_hint_per_package 

300 ) 

301 

302 def initialise(self, britney: "Britney") -> None: 

303 super().initialise(britney) 

304 # We want to use the "current" time stamp in multiple locations 

305 time_now = round(time.time()) 

306 if hasattr(self.options, "fake_runtime"): 

307 time_now = int(self.options.fake_runtime) 

308 self._now = time_now 

309 # compute inverse Testsuite-Triggers: map, unifying all series 

310 self.logger.info("Building inverse testsuite_triggers map") 

311 for suite in self.suite_info: 

312 for src, data in suite.sources.items(): 

313 for trigger in data.testsuite_triggers: 

314 self.testsuite_triggers.setdefault(trigger, set()).add(src) 

315 target_suite_name = self.suite_info.target_suite.name 

316 

317 os.makedirs(self.state_dir, exist_ok=True) 

318 self.read_pending_tests() 

319 

320 # read the cached results that we collected so far 

321 if os.path.exists(self.results_cache_file): 

322 with open(self.results_cache_file) as f: 

323 test_results = json.load(f) 

324 self.test_results = self.check_and_upgrade_cache(test_results) 

325 self.logger.info("Read previous results from %s", self.results_cache_file) 

326 else: 

327 self.logger.info( 

328 "%s does not exist, re-downloading all results from swift", 

329 self.results_cache_file, 

330 ) 

331 

332 # read in the new results 

333 if self.options.adt_swift_url.startswith("file://"): 

334 debci_file = self.options.adt_swift_url[7:] 

335 if os.path.exists(debci_file): 

336 with open(debci_file) as f: 

337 test_results = json.load(f) 

338 self.logger.info("Read new results from %s", debci_file) 

339 for res in test_results["results"]: 

340 # if there's no date, the test didn't finish yet 

341 if res["date"] is None: 341 ↛ 342 (line 341 didn't jump to line 342, because the condition on line 341 was never true)

342 continue 

343 (test_suite, triggers, src, arch, ver, status, run_id, seen) = [ 

344 res["suite"], 

345 res["trigger"], 

346 res["package"], 

347 res["arch"], 

348 res["version"], 

349 res["status"], 

350 str(res["run_id"]), 

351 round( 

352 calendar.timegm( 

353 time.strptime(res["date"][0:-5], "%Y-%m-%dT%H:%M:%S") 

354 ) 

355 ), 

356 ] 

357 if test_suite != target_suite_name: 357 ↛ 359 (line 357 didn't jump to line 359, because the condition on line 357 was never true)

358 # not requested for this target suite, so ignore 

359 continue 

360 if triggers is None: 360 ↛ 362 (line 360 didn't jump to line 362, because the condition on line 360 was never true)

361 # not requested for this policy, so ignore 

362 continue 

363 if status is None: 

364 # still running => pending 

365 continue 

366 for trigger in triggers.split(): 

367 # remove matching test requests 

368 self.remove_from_pending(trigger, src, arch, seen) 

369 if status == "tmpfail": 369 ↛ 371 (line 369 didn't jump to line 371, because the condition on line 369 was never true)

370 # let's see if we still need it 

371 continue 

372 self.logger.debug( 

373 "Results %s %s %s added", src, trigger, status 

374 ) 

375 self.add_trigger_to_results( 

376 trigger, 

377 src, 

378 ver, 

379 arch, 

380 run_id, 

381 seen, 

382 Result[status.upper()], 

383 ) 

384 else: 

385 self.logger.info( 

386 "%s does not exist, no new data will be processed", debci_file 

387 ) 

388 

389 # The cache can contain results against versions of packages that 

390 # are not in any suite anymore. Strip those out, as we don't want 

391 # to use those results. Additionally, old references may be 

392 # filtered out. 

393 if self.options.adt_baseline == "reference": 

394 self.filter_old_results() 

395 

396 # we need sources, binaries, and installability tester, so for now 

397 # remember the whole britney object 

398 self.britney = britney 

399 

400 # Initialize AMQP connection 

401 self.amqp_channel: Optional["amqp.channel.Channel"] = None 

402 self.amqp_file_handle = None 

403 if self.options.dry_run: 403 ↛ 404 (line 403 didn't jump to line 404, because the condition on line 403 was never true)

404 return 

405 

406 amqp_url = self.options.adt_amqp 

407 

408 if amqp_url.startswith("amqp://"): 408 ↛ 409 (line 408 didn't jump to line 409, because the condition on line 408 was never true)

409 import amqplib.client_0_8 as amqp 

410 

411 # depending on the setup we connect to a AMQP server 

412 creds = urllib.parse.urlsplit(amqp_url, allow_fragments=False) 

413 self.amqp_con = amqp.Connection( 

414 creds.hostname, userid=creds.username, password=creds.password 

415 ) 

416 self.amqp_channel = self.amqp_con.channel() 

417 self.logger.info("Connected to AMQP server") 

418 elif amqp_url.startswith("file://"): 418 ↛ 423 (line 418 didn't jump to line 423, because the condition on line 418 was never false)

419 # or in Debian and in testing mode, adt_amqp will be a file:// URL 

420 amqp_file = amqp_url[7:] 

421 self.amqp_file_handle = open(amqp_file, "w", 1) 

422 else: 

423 raise RuntimeError("Unknown ADT_AMQP schema %s" % amqp_url.split(":", 1)[0]) 

424 

425 def check_and_upgrade_cache( 

426 self, test_results: dict[str, dict[str, dict[str, list[Any]]]] 

427 ) -> dict[str, dict[str, dict[str, list[Any]]]]: 

428 for leaf_result in all_leaf_results(test_results): 

429 leaf_result[0] = Result[leaf_result[0]] 

430 

431 # Drop results older than ADT_RESULTS_CACHE_AGE 

432 for trigger in list(test_results.keys()): 

433 for pkg in list(test_results[trigger].keys()): 

434 for arch in list(test_results[trigger][pkg].keys()): 

435 arch_result = test_results[trigger][pkg][arch] 

436 if self._now - arch_result[3] > self.options.adt_results_cache_age: 436 ↛ 437 (line 436 didn't jump to line 437, because the condition on line 436 was never true)

437 del test_results[trigger][pkg][arch] 

438 if not test_results[trigger][pkg]: 438 ↛ 439 (line 438 didn't jump to line 439, because the condition on line 438 was never true)

439 del test_results[trigger][pkg] 

440 if not test_results[trigger]: 440 ↛ 441 (line 440 didn't jump to line 441, because the condition on line 440 was never true)

441 del test_results[trigger] 

442 

443 return test_results 

444 

445 def filter_old_results(self) -> None: 

446 """Remove results for old versions and reference runs from the cache. 

447 

448 For now, only delete reference runs. If we delete regular 

449 results after a while, packages with lots of triggered tests may 

450 never have all the results at the same time.""" 

451 

452 test_results = self.test_results 

453 

454 for trigger, trigger_data in test_results.items(): 

455 for src, results in trigger_data.items(): 

456 for arch, result in results.items(): 

457 if ( 

458 trigger == REF_TRIG 

459 and self._now - result[3] > self.options.adt_reference_max_age 

460 ): 

461 result[0] = mark_result_as_old(result[0]) 

462 elif not self.test_version_in_any_suite(src, result[1]): 

463 result[0] = mark_result_as_old(result[0]) 

464 

465 def test_version_in_any_suite(self, src: str, version: str) -> bool: 

466 """Check if the mentioned version of src is found in a suite 

467 

468 To prevent regressions in the target suite, the result should be 

469 from a test with the version of the package in either the source 

470 suite or the target suite. The source suite is also valid, 

471 because due to versioned test dependencies and Breaks/Conflicts 

472 relations, the version in the source suite is regularly used

473 during testing. 

474 """ 

475 

476 versions = set() 

477 for suite in self.suite_info: 

478 try: 

479 srcinfo = suite.sources[src] 

480 except KeyError: 

481 continue 

482 versions.add(srcinfo.version) 

483 

484 valid_version = False 

485 for ver in versions: 

486 if apt_pkg.version_compare(ver, version) == 0: 

487 valid_version = True 

488 break 

489 

490 return valid_version 

491 

492 def save_pending_json(self) -> None: 

493 # update the pending tests on-disk cache 

494 self.logger.info( 

495 "Updating pending requested tests in %s" % self.pending_tests_file 

496 ) 

497 # Shallow clone pending_tests as we only modify the toplevel and change its type. 

498 pending_tests: dict[str, Any] = {} 

499 if self.pending_tests: 

500 pending_tests = dict(self.pending_tests) 

501 # Avoid adding if there are no pending results at all (eases testing) 

502 pending_tests[VERSION_KEY] = 1 

503 with open(self.pending_tests_file + ".new", "w") as f: 

504 json.dump(pending_tests, f, indent=2) 

505 os.rename(self.pending_tests_file + ".new", self.pending_tests_file) 

506 
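# The resulting autopkgtest-pending.json then looks roughly like this (the
# trigger, package, arch and timestamp values here are hypothetical):
#   {"britney-autopkgtest-pending-file-version": 1,
#    "glibc/2.36-1": {"libpng": {"amd64": 1735732800}}}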

507 def save_state(self, britney: "Britney") -> None: 

508 super().save_state(britney) 

509 

510 # update the results on-disk cache, unless we are using a r/o shared one 

511 if not self.options.adt_shared_results_cache: 

512 self.logger.info("Updating results cache") 

513 test_results = deepcopy(self.test_results) 

514 for result in all_leaf_results(test_results): 

515 result[0] = result[0].name 

516 with open(self.results_cache_file + ".new", "w") as f: 

517 json.dump(test_results, f, indent=2) 

518 os.rename(self.results_cache_file + ".new", self.results_cache_file) 

519 

520 self.save_pending_json() 

521 

522 def format_retry_url( 

523 self, run_id: Optional[str], arch: str, testsrc: str, trigger: str 

524 ) -> str: 

525 if self.options.adt_ppas: 

526 ppas = "&" + urllib.parse.urlencode( 

527 [("ppa", p) for p in self.options.adt_ppas] 

528 ) 

529 else: 

530 ppas = "" 

531 return cast(str, self.options.adt_retry_url).format( 

532 run_id=run_id, 

533 release=self.options.series, 

534 arch=arch, 

535 package=testsrc, 

536 trigger=urllib.parse.quote_plus(trigger), 

537 ppas=ppas, 

538 ) 

539 

540 def format_log_url(self, testsrc: str, arch: str, run_id: str) -> str: 

541 return cast(str, self.options.adt_log_url).format( 

542 release=self.options.series, 

543 swift_container=self.swift_container, 

544 hash=srchash(testsrc), 

545 package=testsrc, 

546 arch=arch, 

547 run_id=run_id, 

548 ) 

549 
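# With the historical swift default for ADT_LOG_URL set in __init__, this
# expands to something like (all values illustrative):
#   <adt_swift_url>/autopkgtest-<series>/<series>/amd64/libp/libpng/20250101_120000@/log.gz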

550 def apply_src_policy_impl( 

551 self, 

552 tests_info: dict[str, Any], 

553 item: MigrationItem, 

554 source_data_tdist: Optional[SourcePackage], 

555 source_data_srcdist: SourcePackage, 

556 excuse: "Excuse", 

557 ) -> PolicyVerdict: 

558 assert self.hints is not None # for type checking 

559 # initialize 

560 verdict = PolicyVerdict.PASS 

561 all_self_tests_pass = False 

562 source_name = item.package 

563 results_info = [] 

564 

565 # skip/delay autopkgtests until new package is built somewhere 

566 if not source_data_srcdist.binaries: 

567 self.logger.debug( 

568 "%s hasnot been built anywhere, skipping autopkgtest policy", 

569 excuse.name, 

570 ) 

571 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

572 excuse.add_verdict_info(verdict, "nothing built yet, autopkgtest delayed") 

573 

574 if "all" in excuse.missing_builds: 

575 self.logger.debug( 

576 "%s hasnot been built for arch:all, skipping autopkgtest policy", 

577 source_name, 

578 ) 

579 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

580 excuse.add_verdict_info( 

581 verdict, "arch:all not built yet, autopkgtest delayed" 

582 ) 

583 

584 if not verdict.is_rejected: 

585 self.logger.debug("Checking autopkgtests for %s", source_name) 

586 trigger = source_name + "/" + source_data_srcdist.version 

587 

588 # build a (testsrc, testver) → arch → (status, run_id, log_url) map; we trigger/check test 

589 # results per architecture for technical/efficiency reasons, but we 

590 # want to evaluate and present the results by tested source package 

591 # first 

592 pkg_arch_result: dict[ 

593 tuple[str, str], dict[str, tuple[str, Optional[str], str]] 

594 ] = collections.defaultdict(dict) 

595 for arch in self.adt_arches: 

596 if arch in excuse.missing_builds: 

597 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

598 self.logger.debug( 

599 "%s hasnot been built on arch %s, delay autopkgtest there", 

600 source_name, 

601 arch, 

602 ) 

603 excuse.add_verdict_info( 

604 verdict, 

605 "arch:%s not built yet, autopkgtest delayed there" % arch, 

606 ) 

607 elif arch in excuse.policy_info["depends"].get( 

608 "arch_all_not_installable", [] 

609 ): 

610 self.logger.debug( 

611 "%s is uninstallable on arch %s (which is allowed), not running autopkgtest there", 

612 source_name, 

613 arch, 

614 ) 

615 excuse.addinfo( 

616 "uninstallable on arch %s (which is allowed), not running autopkgtest there" 

617 % arch 

618 ) 

619 elif ( 

620 arch in excuse.unsatisfiable_on_archs 

621 and arch 

622 not in excuse.policy_info["depends"].get( 

623 "autopkgtest_run_anyways", [] 

624 ) 

625 ): 

626 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

627 self.logger.debug( 

628 "%s is uninstallable on arch %s, not running autopkgtest there", 

629 source_name, 

630 arch, 

631 ) 

632 excuse.addinfo( 

633 "uninstallable on arch %s, not running autopkgtest there" % arch 

634 ) 

635 else: 

636 self.request_tests_for_source( 

637 item, arch, source_data_srcdist, pkg_arch_result, excuse 

638 ) 

639 

640 # add test result details to Excuse 

641 cloud_url = self.options.adt_ci_url + "packages/%(h)s/%(s)s/%(r)s/%(a)s" 

642 testver: Optional[str] 

643 for testsrc, testver in sorted(pkg_arch_result): 

644 assert testver is not None 

645 arch_results = pkg_arch_result[(testsrc, testver)] 

646 r = {v[0] for v in arch_results.values()} 

647 if r & {"FAIL", "OLD_FAIL", "REGRESSION"}: 

648 verdict = PolicyVerdict.REJECTED_PERMANENTLY 

649 elif r & {"RUNNING", "RUNNING-REFERENCE"} and not verdict.is_rejected: 

650 verdict = PolicyVerdict.REJECTED_TEMPORARILY 

651 # skip version if still running on all arches 

652 if not r - {"RUNNING", "RUNNING-ALWAYSFAIL", "RUNNING-IGNORE"}: 

653 testver = None 

654 

655 # A source package is eligible for the bounty if it has tests 

656 # of its own that pass on all tested architectures. 

657 if testsrc == source_name: 

658 excuse.autopkgtest_results = r 

659 if r == {"PASS"}: 

660 all_self_tests_pass = True 

661 

662 if testver: 

663 testname = "%s/%s" % (testsrc, testver) 

664 else: 

665 testname = testsrc 

666 

667 html_archmsg = [] 

668 for arch in sorted(arch_results): 

669 (status, run_id, log_url) = arch_results[arch] 

670 artifact_url = None 

671 retry_url = None 

672 reference_url = None 

673 reference_retry_url = None 

674 history_url = None 

675 if self.options.adt_ppas: 

676 if log_url.endswith("log.gz"): 

677 artifact_url = log_url.replace("log.gz", "artifacts.tar.gz") 

678 else: 

679 history_url = cloud_url % { 

680 "h": srchash(testsrc), 

681 "s": testsrc, 

682 "r": self.options.series, 

683 "a": arch, 

684 } 

685 if status not in ("PASS", "RUNNING", "RUNNING-IGNORE"): 

686 retry_url = self.format_retry_url( 

687 run_id, arch, testsrc, trigger 

688 ) 

689 

690 baseline_result = self.result_in_baseline(testsrc, arch) 

691 if baseline_result and baseline_result[0] != Result.NONE: 

692 baseline_run_id = str(baseline_result[2]) 

693 reference_url = self.format_log_url( 

694 testsrc, arch, baseline_run_id 

695 ) 

696 if self.options.adt_baseline == "reference": 

697 reference_retry_url = self.format_retry_url( 

698 baseline_run_id, arch, testsrc, REF_TRIG 

699 ) 

700 tests_info.setdefault(testname, {})[arch] = [ 

701 status, 

702 log_url, 

703 history_url, 

704 artifact_url, 

705 retry_url, 

706 ] 

707 

708 # render HTML snippet for testsrc entry for current arch 

709 if history_url: 

710 message = '<a href="%s">%s</a>' % (history_url, arch) 

711 else: 

712 message = arch 

713 message += ': <a href="%s">%s</a>' % ( 

714 log_url, 

715 EXCUSES_LABELS[status], 

716 ) 

717 if retry_url: 

718 message += ( 

719 '<a href="%s" style="text-decoration: none;"> ♻</a>' 

720 % retry_url 

721 ) 

722 if reference_url: 

723 message += ' (<a href="%s">reference</a>' % reference_url 

724 if reference_retry_url: 

725 message += ( 

726 '<a href="%s" style="text-decoration: none;"> ♻</a>' 

727 % reference_retry_url 

728 ) 

729 message += ")" 

730 if artifact_url: 

731 message += ' <a href="%s">[artifacts]</a>' % artifact_url 

732 html_archmsg.append(message) 

733 

734 # render HTML line for testsrc entry 

735 # - if action is or may be required 

736 # - for one's own package 

737 if ( 

738 r 

739 - { 

740 "PASS", 

741 "NEUTRAL", 

742 "RUNNING-ALWAYSFAIL", 

743 "ALWAYSFAIL", 

744 "IGNORE-FAIL", 

745 } 

746 or testsrc == source_name 

747 ): 

748 if testver: 

749 pkg = '<a href="#{0}">{0}</a>/{1}'.format(testsrc, testver) 

750 else: 

751 pkg = '<a href="#{0}">{0}</a>'.format(testsrc) 

752 results_info.append( 

753 "autopkgtest for %s: %s" % (pkg, ", ".join(html_archmsg)) 

754 ) 

755 

756 if verdict.is_rejected: 

757 # check for force-skiptest hint 

758 hints = self.hints.search( 

759 "force-skiptest", 

760 package=source_name, 

761 version=source_data_srcdist.version, 

762 ) 

763 if hints: 

764 excuse.addreason("skiptest") 

765 excuse.addinfo( 

766 "Should wait for tests relating to %s %s, but forced by %s" 

767 % (source_name, source_data_srcdist.version, hints[0].user) 

768 ) 

769 verdict = PolicyVerdict.PASS_HINTED 

770 else: 

771 excuse.addreason("autopkgtest") 

772 

773 if ( 

774 self.options.adt_success_bounty 

775 and verdict == PolicyVerdict.PASS 

776 and all_self_tests_pass 

777 ): 

778 excuse.add_bounty("autopkgtest", self.options.adt_success_bounty) 

779 if self.options.adt_regression_penalty and verdict in { 

780 PolicyVerdict.REJECTED_PERMANENTLY, 

781 PolicyVerdict.REJECTED_TEMPORARILY, 

782 }: 

783 if self.options.adt_regression_penalty > 0: 783 ↛ 786 (line 783 didn't jump to line 786, because the condition on line 783 was never false)

784 excuse.add_penalty("autopkgtest", self.options.adt_regression_penalty) 

785 # In case we give penalties instead of blocking, we must always pass 

786 verdict = PolicyVerdict.PASS 

787 for i in results_info: 

788 if verdict.is_rejected: 

789 excuse.add_verdict_info(verdict, i) 

790 else: 

791 excuse.addinfo(i) 

792 

793 return verdict 

794 

795 # 

796 # helper functions 

797 # 

798 

799 @staticmethod 

800 def has_autodep8(srcinfo: SourcePackage) -> bool: 

801 """Check if package is covered by autodep8 

802 

803 srcinfo is an item from self.britney.sources 

804 """ 

805 # autodep8? 

806 for t in srcinfo.testsuite: 

807 if t.startswith("autopkgtest-pkg"): 

808 return True 

809 

810 return False 

811 
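# For instance, a source whose Testsuite field yields entries such as
# "autopkgtest-pkg-perl" or "autopkgtest-pkg-python" (examples of autodep8-style
# values) is treated as having tests even without declaring "autopkgtest" itself.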

812 def request_tests_for_source( 

813 self, 

814 item: MigrationItem, 

815 arch: str, 

816 source_data_srcdist: SourcePackage, 

817 pkg_arch_result: dict[ 

818 tuple[str, str], dict[str, tuple[str, Optional[str], str]] 

819 ], 

820 excuse: "Excuse", 

821 ) -> None: 

822 pkg_universe = self.britney.pkg_universe 

823 target_suite = self.suite_info.target_suite 

824 source_suite = item.suite 

825 sources_t = target_suite.sources 

826 sources_s = item.suite.sources 

827 packages_s_a = item.suite.binaries[arch] 

828 source_name = item.package 

829 source_version = source_data_srcdist.version 

830 # request tests (unless they were already requested earlier or have a result) 

831 tests = self.tests_for_source(source_name, source_version, arch, excuse) 

832 is_huge = len(tests) > self.options.adt_huge 

833 

834 # Here we figure out what is required from the source suite 

835 # for the test to install successfully. 

836 # 

837 # The ImplicitDependencyPolicy does a similar calculation, but 

838 # if I (elbrus) understand correctly, only in the reverse 

839 # dependency direction. We are doing something similar here 

840 # but in the dependency direction (note: this code is older). 

841 # We use the ImplicitDependencyPolicy result for the reverse 

842 # dependencies and we keep the code below for the 

843 # dependencies. Using the ImplicitDependencyPolicy results 

844 # also in the reverse direction seems to require quite some 

845 # reorganisation to get that information available here, as in 

846 # the current state only the current excuse is available here 

847 # and the required other excuses may not be calculated yet. 

848 # 

849 # Loop over all binary packages from trigger and 

850 # recursively look up which *versioned* dependencies are 

851 # only satisfied in the source suite. 

852 # 

853 # For all binaries found, look up which packages they 

854 # break/conflict with in the target suite, but not in the 

855 # source suite. The main reason to do this is to cover test 

856 # dependencies, so we will check Testsuite-Triggers as 

857 # well. 

858 # 

859 # OI: do we need to do the first check in a smart way 

860 # (i.e. only for the packages that are actually going to be 

861 # installed) for the breaks/conflicts set as well, i.e. do 

862 # we need to check if any of the packages that we now 

863 # enforce being from the source suite, actually have new 

864 # versioned depends and new breaks/conflicts. 

865 # 

866 # For all binaries found, add the set of unique source 

867 # packages to the list of triggers. 

868 

869 bin_triggers: set[PackageId] = set() 

870 bin_new = set(source_data_srcdist.binaries) 

871 for n_binary in iter_except(bin_new.pop, KeyError): 

872 if n_binary in bin_triggers: 

873 continue 

874 bin_triggers.add(n_binary) 

875 

876 # Check if there is a dependency that is not 

877 # available in the target suite. 

878 # We add slightly too much here, because new binaries 

879 # will also show up, but they are already properly 

880 # installed. Nevermind. 

881 depends = pkg_universe.dependencies_of(n_binary) 

882 # depends is a frozenset{frozenset{BinaryPackageId, ..}} 

883 for deps_of_bin in depends: 

884 if target_suite.any_of_these_are_in_the_suite(deps_of_bin): 

885 # if any of the alternative dependencies is already 

886 # satisfied in the target suite, we can just ignore it 

887 continue 

888 # We'll figure out which version later 

889 bin_new.update( 

890 added_pkgs_compared_to_target_suite(deps_of_bin, target_suite) 

891 ) 

892 

893 # Check if the package breaks/conflicts anything. We might 

894 # be adding slightly too many source packages due to the 

895 # check here as a binary package that is broken may be 

896 # coming from a different source package in the source 

897 # suite. Nevermind. 

898 bin_broken = set() 

899 for t_binary in bin_triggers: 

900 # broken is a frozenset{BinaryPackageId, ..} 

901 broken = pkg_universe.negative_dependencies_of( 

902 cast(BinaryPackageId, t_binary) 

903 ) 

904 broken_in_target = { 

905 p.package_name 

906 for p in target_suite.which_of_these_are_in_the_suite(broken) 

907 } 

908 broken_in_source = { 

909 p.package_name 

910 for p in source_suite.which_of_these_are_in_the_suite(broken) 

911 } 

912 # We want packages with a newer version in the source suite that 

913 # no longer has the conflict. This is an approximation 

914 broken_filtered = set( 

915 p 

916 for p in broken 

917 if p.package_name in broken_in_target 

918 and p.package_name not in broken_in_source 

919 ) 

920 # We add the version in the target suite, but the code below will 

921 # change it to the version in the source suite 

922 bin_broken.update(broken_filtered) 

923 bin_triggers.update(bin_broken) 

924 

925 # The ImplicitDependencyPolicy also found packages that need 

926 # to migrate together, so add them to the triggers too. 

927 for bin_implicit in excuse.depends_packages_flattened: 

928 if bin_implicit.architecture == arch: 

929 bin_triggers.add(bin_implicit) 

930 

931 triggers = set() 

932 for t_binary2 in bin_triggers: 

933 if t_binary2.architecture == arch: 

934 try: 

935 source_of_bin = packages_s_a[t_binary2.package_name].source 

936 # If the version in the target suite is the same, don't add a trigger. 

937 # Note that we looked up the source package in the source suite. 

938 # If it were a different source package in the target suite, however, then 

939 # we would not have this source package in the same version anyway. 

940 if ( 

941 sources_t.get(source_of_bin, None) is None 

942 or sources_s[source_of_bin].version 

943 != sources_t[source_of_bin].version 

944 ): 

945 triggers.add( 

946 source_of_bin + "/" + sources_s[source_of_bin].version 

947 ) 

948 except KeyError: 

949 # Apparently the package was removed from 

950 # unstable e.g. if packages are replaced 

951 # (e.g. -dbg to -dbgsym) 

952 pass 

953 if t_binary2 not in source_data_srcdist.binaries: 

954 for tdep_src in self.testsuite_triggers.get( 954 ↛ 957 (line 954 didn't jump to line 957, because the loop on line 954 never started)

955 t_binary2.package_name, set() 

956 ): 

957 try: 

958 # Only add trigger if versions in the target and source suites are different 

959 if ( 

960 sources_t.get(tdep_src, None) is None 

961 or sources_s[tdep_src].version 

962 != sources_t[tdep_src].version 

963 ): 

964 triggers.add( 

965 tdep_src + "/" + sources_s[tdep_src].version 

966 ) 

967 except KeyError: 

968 # Apparently the source was removed from 

969 # unstable (testsuite_triggers are unified 

970 # over all suites) 

971 pass 

972 trigger = source_name + "/" + source_version 

973 triggers.discard(trigger) 

974 triggers_list = sorted(list(triggers)) 

975 triggers_list.insert(0, trigger) 

976 

977 for testsrc, testver in tests: 

978 self.pkg_test_request(testsrc, arch, triggers_list, huge=is_huge) 

979 (result, real_ver, run_id, url) = self.pkg_test_result( 

980 testsrc, testver, arch, trigger 

981 ) 

982 pkg_arch_result[(testsrc, real_ver)][arch] = (result, run_id, url) 

983 

984 def tests_for_source( 

985 self, src: str, ver: str, arch: str, excuse: "Excuse" 

986 ) -> list[tuple[str, str]]: 

987 """Iterate over all tests that should be run for given source and arch""" 

988 

989 source_suite = self.suite_info.primary_source_suite 

990 target_suite = self.suite_info.target_suite 

991 sources_info = target_suite.sources 

992 binaries_info = target_suite.binaries[arch] 

993 

994 reported_pkgs = set() 

995 

996 tests = [] 

997 

998 # Debian doesn't have linux-meta, but Ubuntu does 

999 # for the linux packages themselves we don't want to trigger tests -- these should 

1000 # all come from linux-meta*. A new kernel ABI without a corresponding 

1001 # -meta won't be installed and thus we can't sensibly run tests against 

1002 # it. 

1003 if ( 1003 ↛ 1007 (line 1003 didn't jump to line 1007)

1004 src.startswith("linux") 

1005 and src.replace("linux", "linux-meta") in sources_info 

1006 ): 

1007 return [] 

1008 

1009 # we want to test the package itself, if it still has a test in unstable 

1010 # but only if the package actually exists on this arch 

1011 srcinfo = source_suite.sources[src] 

1012 if ("autopkgtest" in srcinfo.testsuite or self.has_autodep8(srcinfo)) and len( 

1013 excuse.packages[arch] 

1014 ) > 0: 

1015 reported_pkgs.add(src) 

1016 tests.append((src, ver)) 

1017 

1018 extra_bins = [] 

1019 # Debian doesn't have linux-meta, but Ubuntu does 

1020 # Hack: For new kernels trigger all DKMS packages by pretending that 

1021 # linux-meta* builds a "dkms" binary as well. With that we ensure that we 

1022 # don't regress DKMS drivers with new kernel versions. 

1023 if src.startswith("linux-meta"): 

1024 # does this have any image on this arch? 

1025 for pkg_id in srcinfo.binaries: 

1026 if pkg_id.architecture == arch and "-image" in pkg_id.package_name: 

1027 try: 

1028 extra_bins.append(binaries_info["dkms"].pkg_id) 

1029 except KeyError: 

1030 pass 

1031 

1032 if not self.has_built_on_this_arch_or_is_arch_all(srcinfo, arch): 

1033 return [] 

1034 

1035 pkg_universe = self.britney.pkg_universe 

1036 # plus all direct reverse dependencies and test triggers of its 

1037 # binaries which have an autopkgtest 

1038 for binary in itertools.chain(srcinfo.binaries, extra_bins): 

1039 rdeps = pkg_universe.reverse_dependencies_of(binary) 

1040 for rdep in rdeps: 

1041 try: 

1042 rdep_src = binaries_info[rdep.package_name].source 

1043 # Don't re-trigger the package itself here; this should 

1044 # have been done above if the package still continues to 

1045 # have an autopkgtest in unstable. 

1046 if rdep_src == src: 

1047 continue 

1048 except KeyError: 

1049 continue 

1050 

1051 rdep_src_info = sources_info[rdep_src] 

1052 if "autopkgtest" in rdep_src_info.testsuite or self.has_autodep8( 

1053 rdep_src_info 

1054 ): 

1055 if rdep_src not in reported_pkgs: 

1056 tests.append((rdep_src, rdep_src_info.version)) 

1057 reported_pkgs.add(rdep_src) 

1058 

1059 for tdep_src in self.testsuite_triggers.get(binary.package_name, set()): 

1060 if tdep_src not in reported_pkgs: 

1061 try: 

1062 tdep_src_info = sources_info[tdep_src] 

1063 except KeyError: 

1064 continue 

1065 if "autopkgtest" in tdep_src_info.testsuite or self.has_autodep8( 1065 ↛ 1059line 1065 didn't jump to line 1059, because the condition on line 1065 was never false

1066 tdep_src_info 

1067 ): 

1068 for pkg_id in tdep_src_info.binaries: 1068 ↛ 1059 (line 1068 didn't jump to line 1059, because the loop on line 1068 didn't complete)

1069 if pkg_id.architecture == arch: 

1070 tests.append((tdep_src, tdep_src_info.version)) 

1071 reported_pkgs.add(tdep_src) 

1072 break 

1073 

1074 tests.sort(key=lambda s_v: s_v[0]) 

1075 return tests 

1076 

1077 def read_pending_tests(self) -> None: 

1078 """Read pending test requests from previous britney runs 

1079 

1080 Initialize self.pending_tests with that data. 

1081 """ 

1082 assert self.pending_tests is None, "already initialized" 

1083 if not os.path.exists(self.pending_tests_file): 

1084 self.logger.info( 

1085 "No %s, starting with no pending tests", self.pending_tests_file 

1086 ) 

1087 self.pending_tests = {} 

1088 return 

1089 with open(self.pending_tests_file) as f: 

1090 self.pending_tests = json.load(f) 

1091 if VERSION_KEY in self.pending_tests: 

1092 del self.pending_tests[VERSION_KEY] 

1093 for trigger in list(self.pending_tests.keys()): 

1094 for pkg in list(self.pending_tests[trigger].keys()): 

1095 arch_dict = self.pending_tests[trigger][pkg] 

1096 for arch in list(arch_dict.keys()): 

1097 if ( 

1098 self._now - arch_dict[arch] 

1099 > self.options.adt_pending_max_age 

1100 ): 

1101 del arch_dict[arch] 

1102 if not arch_dict: 

1103 del self.pending_tests[trigger][pkg] 

1104 if not self.pending_tests[trigger]: 

1105 del self.pending_tests[trigger] 

1106 else: 

1107 # Migration code: 

1108 for trigger_data in self.pending_tests.values(): 1108 ↛ 1109 (line 1108 didn't jump to line 1109, because the loop on line 1108 never started)

1109 for pkg, arch_list in trigger_data.items(): 

1110 trigger_data[pkg] = {} 

1111 for arch in arch_list: 

1112 trigger_data[pkg][arch] = self._now 

1113 

1114 self.logger.info( 

1115 "Read pending requested tests from %s", self.pending_tests_file 

1116 ) 

1117 self.logger.debug("%s", self.pending_tests) 

1118 

1119 # this requires iterating over all triggers and thus is expensive; 

1120 # cache the results 

1121 @lru_cache(None) 

1122 def latest_run_for_package(self, src: str, arch: str) -> str: 

1123 """Return latest run ID for src on arch""" 

1124 

1125 latest_run_id = "" 

1126 for srcmap in self.test_results.values(): 

1127 try: 

1128 run_id = srcmap[src][arch][2] 

1129 except KeyError: 

1130 continue 

1131 if run_id > latest_run_id: 

1132 latest_run_id = run_id 

1133 return latest_run_id 

1134 

1135 def urlopen_retry(self, url: str) -> http.client.HTTPResponse | addinfourl: 

1136 """A urlopen() that retries on time outs or errors""" 

1137 

1138 exc: Exception 

1139 for retry in range(5): 1139 ↛ 1160 (line 1139 didn't jump to line 1160, because the loop on line 1139 didn't complete)

1140 try: 

1141 req = urlopen(url, timeout=30) 

1142 code = req.getcode() 

1143 if not code or 200 <= code < 300: 1143 ↛ 1139 (line 1143 didn't jump to line 1139, because the condition on line 1143 was never false)

1144 return req # type: ignore[no-any-return] 

1145 except socket.timeout as e: 1145 ↛ 1146 (line 1145 didn't jump to line 1146, because the exception caught by line 1145 didn't happen)

1146 self.logger.info( 

1147 "Timeout downloading '%s', will retry %d more times." 

1148 % (url, 5 - retry - 1) 

1149 ) 

1150 exc = e 

1151 except HTTPError as e: 

1152 if e.code not in (503, 502): 1152 ↛ 1154 (line 1152 didn't jump to line 1154, because the condition on line 1152 was never false)

1153 raise 

1154 self.logger.info( 

1155 "Caught error %d downloading '%s', will retry %d more times." 

1156 % (e.code, url, 5 - retry - 1) 

1157 ) 

1158 exc = e 

1159 else: 

1160 raise exc 

1161 

1162 @lru_cache(None) 

1163 def fetch_swift_results(self, swift_url: str, src: str, arch: str) -> None: 

1164 """Download new results for source package/arch from swift""" 

1165 

1166 # prepare query: get all runs with a timestamp later than the latest 

1167 # run_id for this package/arch; '@' is at the end of each run id, to 

1168 # mark the end of a test run directory path 

1169 # example: <autopkgtest-wily>wily/amd64/libp/libpng/20150630_054517@/result.tar 

1170 query = { 

1171 "delimiter": "@", 

1172 "prefix": "%s/%s/%s/%s/" % (self.options.series, arch, srchash(src), src), 

1173 } 

1174 

1175 # determine latest run_id from results 

1176 if not self.options.adt_shared_results_cache: 

1177 latest_run_id = self.latest_run_for_package(src, arch) 

1178 if latest_run_id: 

1179 query["marker"] = query["prefix"] + latest_run_id 

1180 

1181 # request new results from swift 

1182 url = os.path.join(swift_url, self.swift_container) 

1183 url += "?" + urllib.parse.urlencode(query) 

1184 f = None 

1185 try: 

1186 f = self.urlopen_retry(url) 

1187 if f.getcode() == 200: 

1188 result_paths = f.read().decode().strip().splitlines() 

1189 elif f.getcode() == 204: # No content 1189 ↛ 1195 (line 1189 didn't jump to line 1195, because the condition on line 1189 was never false)

1190 result_paths = [] 

1191 else: 

1192 # we should not ever end up here as we expect a HTTPError in 

1193 # other cases; e. g. 3XX is something that tells us to adjust 

1194 # our URLS, so fail hard on those 

1195 raise NotImplementedError( 

1196 "fetch_swift_results(%s): cannot handle HTTP code %r" 

1197 % (url, f.getcode()) 

1198 ) 

1199 except IOError as e: 

1200 # 401 "Unauthorized" is swift's way of saying "container does not exist" 

1201 if getattr(e, "code", -1) == 401: 1201 ↛ 1210 (line 1201 didn't jump to line 1210, because the condition on line 1201 was never false)

1202 self.logger.info( 

1203 "fetch_swift_results: %s does not exist yet or is inaccessible", url 

1204 ) 

1205 return 

1206 # Other status codes are usually a transient 

1207 # network/infrastructure failure. Ignoring this can lead to 

1208 # re-requesting tests which we already have results for, so 

1209 # fail hard on this and let the next run retry. 

1210 self.logger.error("Failure to fetch swift results from %s: %s", url, str(e)) 

1211 sys.exit(1) 

1212 finally: 

1213 if f is not None: 1213 ↛ 1216 (line 1213 didn't jump to line 1216, because the condition on line 1213 was never false)

1214 f.close() 1214 ↛ exit (line 1214 didn't return from function 'fetch_swift_results', because the return on line 1205 wasn't executed)

1215 

1216 for p in result_paths: 

1217 self.fetch_one_result( 

1218 os.path.join(swift_url, self.swift_container, p, "result.tar"), 

1219 src, 

1220 arch, 

1221 ) 

1222 

1223 def fetch_one_result(self, url: str, src: str, arch: str) -> None: 

1224 """Download one result URL for source/arch 

1225 

1226 Remove matching pending_tests entries. 

1227 """ 

1228 f = None 

1229 try: 

1230 f = self.urlopen_retry(url) 

1231 if f.getcode() == 200: 1231 ↛ 1234 (line 1231 didn't jump to line 1234, because the condition on line 1231 was never false)

1232 tar_bytes = io.BytesIO(f.read()) 

1233 else: 

1234 raise NotImplementedError( 

1235 "fetch_one_result(%s): cannot handle HTTP code %r" 

1236 % (url, f.getcode()) 

1237 ) 

1238 except IOError as err: 

1239 self.logger.error("Failure to fetch %s: %s", url, str(err)) 

1240 # we tolerate "not found" (something went wrong on uploading the 

1241 # result), but other things indicate infrastructure problems 

1242 if getattr(err, "code", -1) == 404: 

1243 return 

1244 sys.exit(1) 

1245 finally: 

1246 if f is not None: 1246 ↛ exit, 1246 ↛ 1248 (2 missed branches: 1) line 1246 didn't return from function 'fetch_one_result', because the return on line 1243 wasn't executed, 2) line 1246 didn't jump to line 1248, because the condition on line 1246 was never false)

1247 f.close() 1247 ↛ exit (line 1247 didn't return from function 'fetch_one_result', because the return on line 1243 wasn't executed)

1248 try: 

1249 with tarfile.open(None, "r", tar_bytes) as tar: 

1250 exitcode = int(tar.extractfile("exitcode").read().strip()) # type: ignore[union-attr] 

1251 srcver = tar.extractfile("testpkg-version").read().decode().strip() # type: ignore[union-attr] 

1252 (ressrc, ver) = srcver.split() 

1253 testinfo = json.loads(tar.extractfile("testinfo.json").read().decode()) # type: ignore[union-attr] 

1254 except (KeyError, ValueError, tarfile.TarError) as err: 

1255 self.logger.error("%s is damaged, ignoring: %s", url, str(err)) 

1256 # ignore this; this will leave an orphaned request in autopkgtest-pending.json 

1257 # and thus require manual retries after fixing the tmpfail, but we 

1258 # can't just blindly attribute it to some pending test. 

1259 return 

1260 

1261 if src != ressrc: 1261 ↛ 1262 (line 1261 didn't jump to line 1262, because the condition on line 1261 was never true)

1262 self.logger.error( 

1263 "%s is a result for package %s, but expected package %s", 

1264 url, 

1265 ressrc, 

1266 src, 

1267 ) 

1268 return 

1269 

1270 # parse recorded triggers in test result 

1271 for e in testinfo.get("custom_environment", []): 1271 ↛ 1276 (line 1271 didn't jump to line 1276, because the loop on line 1271 didn't complete)

1272 if e.startswith("ADT_TEST_TRIGGERS="): 1272 ↛ 1271 (line 1272 didn't jump to line 1271, because the condition on line 1272 was never false)

1273 result_triggers = [i for i in e.split("=", 1)[1].split() if "/" in i] 

1274 break 

1275 else: 

1276 self.logger.error("%s result has no ADT_TEST_TRIGGERS, ignoring", url) 

1277 return 

1278 

1279 run_id = os.path.basename(os.path.dirname(url)) 

1280 seen = round(calendar.timegm(time.strptime(run_id, "%Y%m%d_%H%M%S@"))) 

1281 # allow some skipped tests, but nothing else 

1282 if exitcode in [0, 2]: 

1283 result = Result.PASS 

1284 elif exitcode == 8: 1284 ↛ 1285 (line 1284 didn't jump to line 1285, because the condition on line 1284 was never true)

1285 result = Result.NEUTRAL 

1286 else: 

1287 result = Result.FAIL 

1288 

1289 self.logger.info( 

1290 "Fetched test result for %s/%s/%s %s (triggers: %s): %s", 

1291 src, 

1292 ver, 

1293 arch, 

1294 run_id, 

1295 result_triggers, 

1296 result.name.lower(), 

1297 ) 

1298 

1299 # remove matching test requests 

1300 for trigger in result_triggers: 

1301 self.remove_from_pending(trigger, src, arch) 

1302 

1303 # add this result 

1304 for trigger in result_triggers: 

1305 self.add_trigger_to_results(trigger, src, ver, arch, run_id, seen, result) 

1306 

1307 def remove_from_pending( 

1308 self, trigger: str, src: str, arch: str, timestamp: int = sys.maxsize 

1309 ) -> None: 

1310 assert self.pending_tests is not None # for type checking 

1311 try: 

1312 arch_dict = self.pending_tests[trigger][src] 

1313 if timestamp < arch_dict[arch]: 

1314 # The result is from before the moment of scheduling, so it's 

1315 # not the one we're waiting for 

1316 return 

1317 del arch_dict[arch] 

1318 if not arch_dict: 

1319 del self.pending_tests[trigger][src] 

1320 if not self.pending_tests[trigger]: 

1321 del self.pending_tests[trigger] 

1322 self.logger.debug( 

1323 "-> matches pending request %s/%s for trigger %s", src, arch, trigger 

1324 ) 

1325 except KeyError: 

1326 self.logger.debug( 

1327 "-> does not match any pending request for %s/%s", src, arch 

1328 ) 

1329 

1330 def add_trigger_to_results( 

1331 self, 

1332 trigger: str, 

1333 src: str, 

1334 ver: str, 

1335 arch: str, 

1336 run_id: str, 

1337 timestamp: int, 

1338 status_to_add: Result, 

1339 ) -> None: 

1340 # Ensure that we got a new enough version 

1341 try: 

1342 (trigsrc, trigver) = trigger.split("/", 1) 

1343 except ValueError: 

1344 self.logger.info("Ignoring invalid test trigger %s", trigger) 

1345 return 

1346 if trigsrc == src and apt_pkg.version_compare(ver, trigver) < 0: 1346 ↛ 1347 (line 1346 didn't jump to line 1347, because the condition on line 1346 was never true)

1347 self.logger.debug( 

1348 "test trigger %s, but run for older version %s, ignoring", trigger, ver 

1349 ) 

1350 return 

1351 

1352 stored_result = ( 

1353 self.test_results.setdefault(trigger, {}) 

1354 .setdefault(src, {}) 

1355 .setdefault(arch, [Result.FAIL, None, "", 0]) 

1356 ) 

1357 

1358 # reruns shouldn't flip the result from PASS or NEUTRAL to 

1359 # FAIL, so remember the most recent version of the best result 

1360 # we've seen. Except for reference updates, which we always 

1361 # want to update with the most recent result. The result data 

1362 # may not be ordered by timestamp, so we need to check time. 

1363 update = False 

1364 if self.options.adt_baseline == "reference" and trigger == REF_TRIG: 

1365 if stored_result[3] < timestamp: 

1366 update = True 

1367 elif status_to_add < stored_result[0]: 

1368 update = True 

1369 elif status_to_add == stored_result[0] and stored_result[3] < timestamp: 

1370 update = True 

1371 

1372 if update: 

1373 stored_result[0] = status_to_add 

1374 stored_result[1] = ver 

1375 stored_result[2] = run_id 

1376 stored_result[3] = timestamp 

1377 

1378 def send_test_request( 

1379 self, src: str, arch: str, triggers: list[str], huge: bool = False 

1380 ) -> None: 

1381 """Send out AMQP request for testing src/arch for triggers 

1382 

1383 If huge is true, then the request will be put into the -huge instead of 

1384 normal queue. 

1385 """ 

1386 if self.options.dry_run: 1386 ↛ 1387 (line 1386 didn't jump to line 1387, because the condition on line 1386 was never true)

1387 return 

1388 

1389 params: dict[str, Any] = {"triggers": triggers} 

1390 if self.options.adt_ppas: 

1391 params["ppas"] = self.options.adt_ppas 

1392 qname = "debci-ppa-%s-%s" % (self.options.series, arch) 

1393 elif huge: 

1394 qname = "debci-huge-%s-%s" % (self.options.series, arch) 

1395 else: 

1396 qname = "debci-%s-%s" % (self.options.series, arch) 

1397 params["submit-time"] = time.strftime("%Y-%m-%d %H:%M:%S%z", time.gmtime()) 

1398 

1399 if self.amqp_channel: 1399 ↛ 1400 (line 1399 didn't jump to line 1400, because the condition on line 1399 was never true)

1400 self.amqp_channel.basic_publish( 

1401 amqp.Message( 

1402 src + "\n" + json.dumps(params), delivery_mode=2 

1403 ), # persistent 

1404 routing_key=qname, 

1405 ) 

1406 # we save pending.json with every request, so that if britney 

1407 # crashes we don't re-request tests. This is only needed when using 

1408 # real amqp, as with file-based submission the pending tests are 

1409 # returned by debci along with the results each run. 

1410 self.save_pending_json() 

1411 else: 

1412 # for file-based submission, triggers are space separated 

1413 params["triggers"] = [" ".join(params["triggers"])] 

1414 assert self.amqp_file_handle 

1415 self.amqp_file_handle.write("%s:%s %s\n" % (qname, src, json.dumps(params))) 

1416 
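# A file-based request written above is a single line of the form shown below
# (series, package and trigger values are hypothetical):
#   debci-unstable-amd64:libpng {"triggers": ["glibc/2.36-1"], "submit-time": "2025-01-01 12:00:00+0000"}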

1417 def pkg_test_request( 

1418 self, src: str, arch: str, all_triggers: list[str], huge: bool = False 

1419 ) -> None: 

1420 """Request one package test for a set of triggers 

1421 

1422 all_triggers is a list of "pkgname/version". These are the packages 

1423 that will be taken from the source suite. The first package in this 

1424 list is the package that triggers the testing of src, the rest are 

1425 additional packages required for installability of the test deps. If 

1426 huge is true, then the request will be put into the -huge instead of 

1427 normal queue. 

1428 

1429 This will only be done if that test wasn't already requested in 

1430 a previous run (i. e. if it's not already in self.pending_tests) 

1431 and if there is no fresh or positive result for it already. This 

1432 ensures that current results are downloaded for this package before 

1433 requesting any test.""" 

1434 trigger = all_triggers[0] 

1435 uses_swift = not self.options.adt_swift_url.startswith("file://") 

1436 try: 

1437 result = self.test_results[trigger][src][arch] 

1438 has_result = True 

1439 except KeyError: 

1440 has_result = False 

1441 

1442 if has_result: 

1443 result_state = result[0] 

1444 if result_state in {Result.OLD_PASS, Result.OLD_FAIL, Result.OLD_NEUTRAL}: 

1445 pass 

1446 elif ( 

1447 result_state == Result.FAIL 

1448 and self.result_in_baseline(src, arch)[0] 

1449 in {Result.PASS, Result.NEUTRAL, Result.OLD_PASS, Result.OLD_NEUTRAL} 

1450 and self._now - result[3] > self.options.adt_retry_older_than 

1451 ): 

1452 # We might want to retry this failure, so continue 

1453 pass 

1454 elif not uses_swift: 

1455 # We're done if we don't retrigger and we're not using swift 

1456 return 

1457 elif result_state in {Result.PASS, Result.NEUTRAL}: 

1458 self.logger.debug( 

1459 "%s/%s triggered by %s already known", src, arch, trigger 

1460 ) 

1461 return 

1462 

1463 # Without swift we don't expect new results 

1464 if uses_swift: 

1465 self.logger.info( 

1466 "Checking for new results for failed %s/%s for trigger %s", 

1467 src, 

1468 arch, 

1469 trigger, 

1470 ) 

1471 self.fetch_swift_results(self.options.adt_swift_url, src, arch) 

1472 # do we have one now? 

1473 try: 

1474 self.test_results[trigger][src][arch] 

1475 return 

1476 except KeyError: 

1477 pass 

1478 

1479 self.request_test_if_not_queued(src, arch, trigger, all_triggers, huge=huge) 

1480 
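The retry branch above only re-requests a known failure when the baseline looks healthy and the cached result is old enough. A sketch of that predicate, assuming the britney2 package is importable; should_retry_failure() is a hypothetical helper, and retry_older_than is assumed to already be in seconds, matching the comparison against self._now above:

import time
from typing import Optional

from britney2.policies.autopkgtest import Result

# Hypothetical predicate mirroring the retry condition above.  `result` is
# the cached [status, version, run_id, timestamp] list and `baseline_status`
# is the first element returned by result_in_baseline().
def should_retry_failure(result: list, baseline_status: Result,
                         retry_older_than: float,
                         now: Optional[float] = None) -> bool:
    now = time.time() if now is None else now
    return (
        result[0] == Result.FAIL
        and baseline_status in {Result.PASS, Result.NEUTRAL,
                                Result.OLD_PASS, Result.OLD_NEUTRAL}
        and now - result[3] > retry_older_than
    )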

1481 def request_test_if_not_queued( 

1482 self, 

1483 src: str, 

1484 arch: str, 

1485 trigger: str, 

1486 all_triggers: list[str] = [], 

1487 huge: bool = False, 

1488 ) -> None: 

1489 assert self.pending_tests is not None # for type checking 

1490 if not all_triggers: 

1491 all_triggers = [trigger] 

1492 

1493 # Don't re-request if it's already pending 

1494 arch_dict = self.pending_tests.setdefault(trigger, {}).setdefault(src, {}) 

1495 if arch in arch_dict.keys(): 

1496 self.logger.debug( 

1497 "Test %s/%s for %s is already pending, not queueing", src, arch, trigger 

1498 ) 

1499 else: 

1500 self.logger.debug( 

1501 "Requesting %s autopkgtest on %s to verify %s", src, arch, trigger 

1502 ) 

1503 arch_dict[arch] = self._now 

1504 self.send_test_request(src, arch, all_triggers, huge=huge) 

1505 
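Pending requests are tracked in a nested mapping keyed trigger -> source -> architecture -> timestamp of the request; the setdefault() chain above creates the intermediate dictionaries on demand. A small sketch of how that structure grows, with purely illustrative trigger and package names; mark_pending() is a hypothetical stand-in for the bookkeeping done above:

import time

# pending_tests: trigger -> source -> arch -> timestamp of the request
pending_tests: dict[str, dict[str, dict[str, int]]] = {}

def mark_pending(trigger: str, src: str, arch: str, now: int) -> bool:
    """Record a pending request; return False if it was already pending."""
    arch_dict = pending_tests.setdefault(trigger, {}).setdefault(src, {})
    if arch in arch_dict:
        return False
    arch_dict[arch] = now
    return True

# Illustrative values only:
mark_pending("gtk4/4.14.2-1", "glib2.0", "amd64", int(time.time()))  # True: queued
mark_pending("gtk4/4.14.2-1", "glib2.0", "amd64", int(time.time()))  # False: already pending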

1506 def result_in_baseline(self, src: str, arch: str) -> list[Any]: 

1507 """Get the result for src on arch in the baseline 

1508 

1509 The baseline is either all recorded results or, optionally, a reference set. 

1510 """ 

1511 

1512 # this requires iterating over all cached results and thus is expensive; 

1513 # cache the results 

1514 try: 

1515 return self.result_in_baseline_cache[src][arch] 

1516 except KeyError: 

1517 pass 

1518 

1519 result_reference: list[Any] = [Result.NONE, None, "", 0] 

1520 if self.options.adt_baseline == "reference": 

1521 if src not in self.suite_info.target_suite.sources: 1521 ↛ 1522  line 1521 didn't jump to line 1522, because the condition on line 1521 was never true

1522 return result_reference 

1523 

1524 try: 

1525 result_reference = self.test_results[REF_TRIG][src][arch] 

1526 self.logger.debug( 

1527 "Found result for src %s in reference: %s", 

1528 src, 

1529 result_reference[0].name, 

1530 ) 

1531 except KeyError: 

1532 self.logger.debug( 

1533 "Found NO result for src %s in reference: %s", 

1534 src, 

1535 result_reference[0].name, 

1536 ) 

1537 self.result_in_baseline_cache[src][arch] = deepcopy(result_reference) 

1538 return result_reference 

1539 

1540 result_ever: list[Any] = [Result.FAIL, None, "", 0] 

1541 for srcmap in self.test_results.values(): 

1542 try: 

1543 if srcmap[src][arch][0] != Result.FAIL: 

1544 result_ever = srcmap[src][arch] 

1545 # If we are not looking at a reference run, we don't really 

1546 # care about anything except the status, so we're done 

1547 # once we find a PASS. 

1548 if result_ever[0] == Result.PASS: 

1549 break 

1550 except KeyError: 

1551 pass 

1552 

1553 self.result_in_baseline_cache[src][arch] = deepcopy(result_ever) 

1554 self.logger.debug("Result for src %s ever: %s", src, result_ever[0].name) 

1555 return result_ever 

1556 
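When adt_baseline is not "reference", the baseline for src/arch is the best result ever recorded across all triggers, short-circuiting on the first PASS; in "reference" mode it is simply the cached result of the reference trigger. A sketch of the "ever" scan, assuming the britney2 package is importable; best_result_ever() is a hypothetical helper:

from britney2.policies.autopkgtest import Result

# Hypothetical helper mirroring the non-reference branch above.
# `test_results` maps trigger -> source -> arch -> [status, version, run_id,
# timestamp]; the scan keeps the first non-FAIL result it finds and stops
# early on PASS.
def best_result_ever(test_results: dict, src: str, arch: str) -> list:
    result_ever = [Result.FAIL, None, "", 0]
    for srcmap in test_results.values():
        try:
            candidate = srcmap[src][arch]
        except KeyError:
            continue
        if candidate[0] != Result.FAIL:
            result_ever = candidate
            if result_ever[0] == Result.PASS:
                break
    return result_ever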

1557 def has_test_in_target(self, src: str) -> bool: 

1558 test_in_target = False 

1559 try: 

1560 srcinfo = self.suite_info.target_suite.sources[src] 

1561 if "autopkgtest" in srcinfo.testsuite or self.has_autodep8(srcinfo): 

1562 test_in_target = True 

1563 # AttributeError is only needed for the test suite as 

1564 # srcinfo can be None 

1565 except (KeyError, AttributeError): 

1566 pass 

1567 

1568 return test_in_target 

1569 

1570 def pkg_test_result( 

1571 self, src: str, ver: str, arch: str, trigger: str 

1572 ) -> tuple[str, str, Optional[str], str]: 

1573 """Get current test status of a particular package 

1574 

1575 Return (status, real_version, run_id, log_url) tuple; status is a key in 

1576 EXCUSES_LABELS. run_id is None if the test is still running. 

1577 """ 

1578 assert self.pending_tests is not None # for type checking 

1579 # determine current test result status 

1580 run_id = None 

1581 try: 

1582 r = self.test_results[trigger][src][arch] 

1583 ver = r[1] 

1584 run_id = r[2] 

1585 

1586 if r[0] in {Result.FAIL, Result.OLD_FAIL}: 

1587 # determine current test result status 

1588 baseline_result = self.result_in_baseline(src, arch)[0] 

1589 

1590 # Special-case triggers from linux-meta*: we cannot compare 

1591 # results against different kernels, as e. g. a DKMS module 

1592 # might work against the default kernel but fail against a 

1593 # different flavor; so for those, ignore the "ever 

1594 # passed" check; FIXME: check against trigsrc only 

1595 if self.options.adt_baseline != "reference" and ( 

1596 trigger.startswith("linux-meta") or trigger.startswith("linux/") 

1597 ): 

1598 baseline_result = Result.FAIL 

1599 

1600 # Check if the autopkgtest (still) exists in the target suite 

1601 test_in_target = self.has_test_in_target(src) 

1602 

1603 if test_in_target and baseline_result in { 

1604 Result.NONE, 

1605 Result.OLD_FAIL, 

1606 Result.OLD_NEUTRAL, 

1607 Result.OLD_PASS, 

1608 }: 

1609 self.request_test_if_not_queued(src, arch, REF_TRIG) 

1610 

1611 if self.has_force_badtest(src, ver, arch): 

1612 result = "IGNORE-FAIL" 

1613 elif not test_in_target: 

1614 if self.options.adt_ignore_failure_for_new_tests: 

1615 result = "IGNORE-FAIL" 

1616 else: 

1617 result = r[0].name 

1618 elif baseline_result in {Result.FAIL, Result.OLD_FAIL}: 

1619 result = "ALWAYSFAIL" 

1620 elif baseline_result == Result.NONE: 1620 ↛ 1621  line 1620 didn't jump to line 1621, because the condition on line 1620 was never true

1621 result = "RUNNING-REFERENCE" 

1622 else: 

1623 result = "REGRESSION" 

1624 

1625 else: 

1626 result = r[0].name 

1627 

1628 url = self.format_log_url(src, arch, run_id) 

1629 except KeyError: 

1630 # no result for src/arch; still running? 

1631 assert arch in self.pending_tests.get(trigger, {}).get(src, {}).keys(), ( 

1632 "Result for %s/%s/%s (triggered by %s) is neither known nor pending!" 

1633 % (src, ver, arch, trigger) 

1634 ) 

1635 

1636 if self.has_force_badtest(src, ver, arch): 

1637 result = "RUNNING-IGNORE" 

1638 else: 

1639 if self.has_test_in_target(src): 

1640 baseline_result = self.result_in_baseline(src, arch)[0] 

1641 if baseline_result == Result.FAIL: 

1642 result = "RUNNING-ALWAYSFAIL" 

1643 else: 

1644 result = "RUNNING" 

1645 else: 

1646 if self.options.adt_ignore_failure_for_new_tests: 

1647 result = "RUNNING-IGNORE" 

1648 else: 

1649 result = "RUNNING" 

1650 url = self.options.adt_ci_url + "status/pending" 

1651 

1652 return (result, ver, run_id, url) 

1653 
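For a failed run, the status reported to the excuses depends mainly on the baseline: force-badtest hints and tests that are new in the target suite can downgrade the failure to IGNORE-FAIL, a failing baseline yields ALWAYSFAIL, a still-missing reference result yields RUNNING-REFERENCE, and everything else is a REGRESSION. A condensed sketch of that mapping, assuming the britney2 package is importable; classify_failure() is a hypothetical helper whose boolean arguments stand in for has_force_badtest(), has_test_in_target() and the adt_ignore_failure_for_new_tests option:

from britney2.policies.autopkgtest import Result

# Hypothetical condensation of the failure branch above; every returned
# string is a key in EXCUSES_LABELS.
def classify_failure(raw_status: Result, baseline: Result, forced_bad: bool,
                     test_in_target: bool, ignore_new_tests: bool) -> str:
    if forced_bad:
        return "IGNORE-FAIL"
    if not test_in_target:
        return "IGNORE-FAIL" if ignore_new_tests else raw_status.name
    if baseline in {Result.FAIL, Result.OLD_FAIL}:
        return "ALWAYSFAIL"
    if baseline == Result.NONE:
        return "RUNNING-REFERENCE"
    return "REGRESSION"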

1654 def has_force_badtest(self, src: str, ver: str, arch: str) -> bool: 

1655 """Check if src/ver/arch has a force-badtest hint""" 

1656 

1657 assert self.hints is not None 

1658 hints = self.hints.search("force-badtest", package=src) 

1659 if hints: 

1660 self.logger.info( 

1661 "Checking hints for %s/%s/%s: %s", 

1662 src, 

1663 arch, 

1664 ver, 

1665 [str(h) for h in hints], 

1666 ) 

1667 for hint in hints: 

1668 if [ 

1669 mi 

1670 for mi in hint.packages 

1671 if mi.architecture in ["source", arch] 

1672 and ( 

1673 mi.version == "all" 

1674 or apt_pkg.version_compare(ver, mi.version) <= 0 # type: ignore[arg-type] 

1675 ) 

1676 ]: 

1677 return True 

1678 

1679 return False 

1680 
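A force-badtest hint matches when one of its hinted packages applies to the architecture ("source" or the concrete arch) and its version is either the literal "all" or not older than the version under test (apt_pkg.version_compare(ver, mi.version) <= 0). A sketch of that per-entry check, with the hint fields passed as plain arguments; badtest_entry_matches() is a hypothetical helper and the example versions are illustrative:

import apt_pkg

apt_pkg.init_system()  # required before calling version_compare()

# Hypothetical stand-alone version of the per-entry check above; hint_arch
# and hint_version correspond to mi.architecture and mi.version.
def badtest_entry_matches(ver: str, arch: str, hint_arch: str, hint_version: str) -> bool:
    if hint_arch not in ("source", arch):
        return False
    return hint_version == "all" or apt_pkg.version_compare(ver, hint_version) <= 0

# Illustrative calls:
# badtest_entry_matches("1.2-3", "amd64", "source", "all")    -> True
# badtest_entry_matches("1.2-3", "amd64", "source", "1.2-4")  -> True  (hint covers up to 1.2-4)
# badtest_entry_matches("1.2-5", "amd64", "source", "1.2-4")  -> False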

1681 def has_built_on_this_arch_or_is_arch_all( 

1682 self, src_data: SourcePackage, arch: str 

1683 ) -> bool: 

1684 """When a source builds arch:all binaries, those binaries are 

1685 added to all architectures and thus the source 'exists' 

1686 everywhere. This function checks whether the source has any 

1687 architecture-specific binaries on this architecture and, if not, 

1688 whether it has any on another architecture. 

1689 """ 

1690 packages_s_a = self.suite_info.primary_source_suite.binaries[arch] 

1691 has_unknown_binary = False 

1692 for binary_s in src_data.binaries: 

1693 try: 

1694 binary_u = packages_s_a[binary_s.package_name] 

1695 except KeyError: 

1696 # src_data.binaries has all the built binaries, so if 

1697 # we get here, we know that at least one architecture 

1698 # has architecture specific binaries 

1699 has_unknown_binary = True 

1700 continue 

1701 if binary_u.architecture == arch: 

1702 return True 

1703 # If we get here, we have only seen arch:all packages for this 

1704 # arch. 

1705 return not has_unknown_binary
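
Put differently: the check returns True as soon as one of the source's binaries was built for this architecture; if every binary visible here is arch:all and none is missing from the per-architecture table, the source is effectively arch:all only and also counts as present; only when some binary exists solely on another architecture does it return False. A toy sketch with plain dictionaries standing in for the suite data structures; built_here_or_arch_all() and its arguments are hypothetical simplifications:

# Toy stand-ins: `source_binaries` lists every binary the source built
# anywhere; `binaries_on_arch` maps binary name -> architecture it was built
# for ("all" or a concrete arch) as seen on the architecture under test.
def built_here_or_arch_all(source_binaries: list[str],
                           binaries_on_arch: dict[str, str],
                           arch: str) -> bool:
    has_unknown_binary = False
    for name in source_binaries:
        built_for = binaries_on_arch.get(name)
        if built_for is None:
            # arch-specific binary that only exists on another architecture
            has_unknown_binary = True
            continue
        if built_for == arch:
            return True
    return not has_unknown_binary

# A pure arch:all source is considered present:
# built_here_or_arch_all(["foo-doc"], {"foo-doc": "all"}, "amd64")         -> True
# An arch-specific binary built only elsewhere makes it absent here:
# built_here_or_arch_all(["foo", "foo-doc"], {"foo-doc": "all"}, "amd64")  -> False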