Coverage for britney2/policies/autopkgtest.py: 90%

771 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-03-23 07:34 +0000

1# -*- coding: utf-8 -*- 

2 

3# Copyright (C) 2013 - 2016 Canonical Ltd. 

4# Authors: 

5# Colin Watson <cjwatson@ubuntu.com> 

6# Jean-Baptiste Lallement <jean-baptiste.lallement@canonical.com> 

7# Martin Pitt <martin.pitt@ubuntu.com> 

8 

9# This program is free software; you can redistribute it and/or modify 

10# it under the terms of the GNU General Public License as published by 

11# the Free Software Foundation; either version 2 of the License, or 

12# (at your option) any later version. 

13 

14# This program is distributed in the hope that it will be useful, 

15# but WITHOUT ANY WARRANTY; without even the implied warranty of 

16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

17# GNU General Public License for more details. 

18 

19import calendar 

20import collections 

21import io 

22import itertools 

23import json 

24import optparse 

25import os 

26import sys 

27import tarfile 

28import time 

29import urllib.parse 

30from copy import deepcopy 

31from enum import Enum 

32from functools import lru_cache, total_ordering 

33from typing import TYPE_CHECKING, Any, Optional, cast 

34from collections.abc import Iterator 

35from urllib.request import urlopen 

36 

37import apt_pkg 

38 

39import britney2.hints 

40from britney2 import ( 

41 BinaryPackageId, 

42 PackageId, 

43 SourcePackage, 

44 SuiteClass, 

45 Suites, 

46 TargetSuite, 

47) 

48from britney2.migrationitem import MigrationItem 

49from britney2.policies import PolicyVerdict 

50from britney2.policies.policy import AbstractBasePolicy 

51from britney2.utils import iter_except, parse_option 

52 

53if TYPE_CHECKING: 53 ↛ 54line 53 didn't jump to line 54, because the condition on line 53 was never true

54 import amqplib.client_0_8 as amqp 

55 

56 from ..britney import Britney 

57 from ..excuse import Excuse 

58 from ..hints import HintParser 

59 

60 

@total_ordering
class Result(Enum):
    """Outcome of an autopkgtest run.

    The ``OLD_*`` variants mark results that are stale (run against a
    version no longer in any suite, or an expired reference run) but are
    kept around; ``NONE`` means no result is known.  Ordering follows the
    numeric value, so ``PASS < NEUTRAL < FAIL < OLD_* < NONE``.
    """

    PASS = 1
    NEUTRAL = 2
    FAIL = 3
    OLD_PASS = 4
    OLD_NEUTRAL = 5
    OLD_FAIL = 6
    NONE = 7

    def __lt__(self, other: "Result") -> bool:
        # total_ordering derives the remaining comparisons from this and __eq__.
        return self.value < other.value

73 

74 

# Map of autopkgtest status codes to human-readable HTML labels used in the
# excuses output.  Background colours: green = pass, yellow = neutral or
# non-blocking failure, red = blocking failure/regression, blue = test still
# in progress.
EXCUSES_LABELS = {
    "PASS": '<span style="background:#87d96c">Pass</span>',
    "OLD_PASS": '<span style="background:#87d96c">Pass</span>',
    "NEUTRAL": '<span style="background:#e5c545">No tests, superficial or marked flaky</span>',
    "OLD_NEUTRAL": '<span style="background:#e5c545">No tests, superficial or marked flaky</span>',
    "FAIL": '<span style="background:#ff6666">Failed</span>',
    "OLD_FAIL": '<span style="background:#ff6666">Failed</span>',
    "ALWAYSFAIL": '<span style="background:#e5c545">Failed (not a regression)</span>',
    "REGRESSION": '<span style="background:#ff6666">Regression or new test</span>',
    "IGNORE-FAIL": '<span style="background:#e5c545">Ignored failure</span>',
    "RUNNING": '<span style="background:#99ddff">Test in progress</span>',
    "RUNNING-REFERENCE": '<span style="background:#ff6666">Reference test in progress, but real test failed already</span>',
    "RUNNING-ALWAYSFAIL": '<span style="background:#99ddff">Test in progress (will not be considered a regression)</span>',
}

# Pseudo-trigger ("source/version") used when requesting baseline
# ("reference") test runs against the target suite.
REF_TRIG = "migration-reference/0"

# Key stored in the pending-tests JSON file recording the file format version.
VERSION_KEY = "britney-autopkgtest-pending-file-version"

93 

94 

def srchash(src: str) -> str:
    """archive hash prefix for source package"""

    # Debian archive layout: "lib*" packages are pooled under a four-letter
    # prefix (e.g. "libf" for libfoo); everything else under the first letter.
    if not src.startswith("lib"):
        return src[0]
    return src[:4]

102 

103 

def added_pkgs_compared_to_target_suite(
    package_ids: "frozenset[BinaryPackageId]",
    target_suite: "TargetSuite",
    *,
    invert: bool = False,
) -> "Iterator[BinaryPackageId]":
    """Yield the package ids from *package_ids* that are new to the target suite.

    A package counts as "new" when no package of the same name is present
    in the target suite.  With ``invert=True`` the selection is flipped:
    yield only the ids whose name is already covered by the target suite.
    """
    in_suite = set(target_suite.which_of_these_are_in_the_suite(package_ids))
    if invert:
        skip_names = {pkg.package_name for pkg in package_ids - in_suite}
    else:
        skip_names = {pkg.package_name for pkg in in_suite}
    for pkg in package_ids:
        if pkg.package_name not in skip_names:
            yield pkg

121 

122 

def all_leaf_results(
    test_results: dict[str, dict[str, dict[str, list[Any]]]],
) -> Iterator[list[Any]]:
    """Iterate over every per-arch result list in the nested results map.

    The map is keyed trigger -> src -> arch; the leaves are the mutable
    ``[status, version, run_id, seen]`` lists, yielded so callers can
    inspect or rewrite them in place.
    """
    yield from itertools.chain.from_iterable(
        arch_map.values()
        for trigger_map in test_results.values()
        for arch_map in trigger_map.values()
    )

129 

130 

def mark_result_as_old(result: Result) -> Result:
    """Convert current result into corresponding old result"""

    # Only the three "current" outcomes have an aged counterpart; anything
    # else (already-old results, NONE) passes through unchanged.
    aged = {
        Result.FAIL: Result.OLD_FAIL,
        Result.PASS: Result.OLD_PASS,
        Result.NEUTRAL: Result.OLD_NEUTRAL,
    }
    return aged.get(result, result)

141 

142 

143class AutopkgtestPolicy(AbstractBasePolicy): 

144 """autopkgtest regression policy for source migrations 

145 

146 Run autopkgtests for the excuse and all of its reverse dependencies, and 

147 reject the upload if any of those regress. 

148 """ 

149 

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        """Set up the policy: option defaults, cache paths and Swift/PPA config.

        Only reads configuration; no I/O besides path construction happens
        here — files are read in initialise().
        """
        super().__init__(
            "autopkgtest", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
        )
        # tests requested in this and previous runs
        # trigger -> src -> [arch]
        self.pending_tests: Optional[dict[str, dict[str, dict[str, int]]]] = None
        self.pending_tests_file = os.path.join(
            self.state_dir, "autopkgtest-pending.json"
        )
        # inverse Testsuite-Triggers map (filled in initialise()): trigger -> {src}
        self.testsuite_triggers: dict[str, set[str]] = {}
        # memoized baseline results per (src, arch), see result_in_baseline usage
        self.result_in_baseline_cache: dict[str, dict[str, list[Any]]] = (
            collections.defaultdict(dict)
        )

        # file handle for the file:// AMQP stand-in; opened in initialise()
        self.amqp_file_handle: io.TextIOWrapper | None = None

        # Default values for this policy's options
        parse_option(options, "adt_baseline")
        parse_option(options, "adt_huge", to_int=True)
        parse_option(options, "adt_ppas")
        parse_option(options, "adt_reference_max_age", day_to_sec=True)
        parse_option(options, "adt_pending_max_age", default=5, day_to_sec=True)
        parse_option(options, "adt_regression_penalty", default=0, to_int=True)
        parse_option(options, "adt_log_url")  # see below for defaults
        parse_option(options, "adt_retry_url")  # see below for defaults
        parse_option(options, "adt_retry_older_than", day_to_sec=True)
        parse_option(options, "adt_results_cache_age", day_to_sec=True)
        parse_option(options, "adt_shared_results_cache")
        parse_option(options, "adt_success_bounty", default=0, to_int=True)
        parse_option(options, "adt_ignore_failure_for_new_tests", to_bool=True)

        # When ADT_RESULTS_CACHE_AGE is smaller than or equal to
        # ADT_REFERENCE_MAX_AGE old reference result will be removed from cache
        # before the newly scheduled results are in, potentially causing
        # additional waiting. For packages like glibc this might cause an
        # infinite delay as there will always be a package that's
        # waiting. Similarly for ADT_RETRY_OLDER_THAN.
        if self.options.adt_results_cache_age <= self.options.adt_reference_max_age:
            self.logger.warning(
                "Unexpected: ADT_REFERENCE_MAX_AGE bigger than ADT_RESULTS_CACHE_AGE"
            )
        if self.options.adt_results_cache_age <= self.options.adt_retry_older_than:
            self.logger.warning(
                "Unexpected: ADT_RETRY_OLDER_THAN bigger than ADT_RESULTS_CACHE_AGE"
            )

        if not self.options.adt_log_url:
            # Historical defaults
            if self.options.adt_swift_url.startswith("file://"):
                # Results served by the CI site itself rather than Swift.
                self.options.adt_log_url = os.path.join(
                    self.options.adt_ci_url,
                    "data",
                    "autopkgtest",
                    self.options.series,
                    "{arch}",
                    "{hash}",
                    "{package}",
                    "{run_id}",
                    "log.gz",
                )
            else:
                self.options.adt_log_url = os.path.join(
                    self.options.adt_swift_url,
                    "{swift_container}",
                    self.options.series,
                    "{arch}",
                    "{hash}",
                    "{package}",
                    "{run_id}",
                    "log.gz",
                )

        if hasattr(self.options, "adt_retry_url_mech"):
            # Legacy knob: warn, and honour it only when ADT_RETRY_URL is unset.
            self.logger.warning(
                "The ADT_RETRY_URL_MECH configuration has been deprecated."
            )
            self.logger.warning(
                "Instead britney now supports ADT_RETRY_URL for more flexibility."
            )
            if self.options.adt_retry_url:
                self.logger.error(
                    "Please remove the ADT_RETRY_URL_MECH as ADT_RETRY_URL will be used."
                )
            elif self.options.adt_retry_url_mech == "run_id":
                self.options.adt_retry_url = (
                    self.options.adt_ci_url + "api/v1/retry/{run_id}"
                )
        if not self.options.adt_retry_url:
            # Historical default
            self.options.adt_retry_url = (
                self.options.adt_ci_url
                + "request.cgi?"
                + "release={release}&arch={arch}&package={package}&trigger={trigger}{ppas}"
            )

        # results map: trigger -> src -> arch -> [passed, version, run_id, seen]
        # - trigger is "source/version" of an unstable package that triggered
        #   this test run.
        # - "passed" is a Result
        # - "version" is the package version of "src" of that test
        # - "run_id" is an opaque ID that identifies a particular test run for
        #   a given src/arch.
        # - "seen" is an approximate time stamp of the test run. How this is
        #   deduced depends on the interface used.
        self.test_results: dict[str, dict[str, dict[str, list[Any]]]] = {}
        if self.options.adt_shared_results_cache:
            self.results_cache_file = self.options.adt_shared_results_cache
        else:
            self.results_cache_file = os.path.join(
                self.state_dir, "autopkgtest-results.cache"
            )

        try:
            # ADT_PPAS is a whitespace-separated string in the config;
            # normalise to a list (AttributeError => option unset/non-string).
            self.options.adt_ppas = self.options.adt_ppas.strip().split()
        except AttributeError:
            self.options.adt_ppas = []

        # Swift container name; PPA runs get their own container keyed on
        # the last PPA in the list.
        self.swift_container = "autopkgtest-" + options.series
        if self.options.adt_ppas:
            self.swift_container += "-" + options.adt_ppas[-1].replace("/", "-")

        # restrict adt_arches to architectures we actually run for
        self.adt_arches = []
        for arch in self.options.adt_arches.split():
            if arch in self.options.architectures:
                self.adt_arches.append(arch)
            else:
                self.logger.info(
                    "Ignoring ADT_ARCHES %s as it is not in architectures list", arch
                )

282 def __del__(self) -> None: 

283 if self.amqp_file_handle: 283 ↛ exitline 283 didn't return from function '__del__', because the condition on line 283 was never false

284 try: 

285 self.amqp_file_handle.close() 

286 except AttributeError: 

287 pass 

288 

289 def register_hints(self, hint_parser: "HintParser") -> None: 

290 hint_parser.register_hint_type( 

291 "force-badtest", britney2.hints.split_into_one_hint_per_package 

292 ) 

293 hint_parser.register_hint_type( 

294 "force-skiptest", britney2.hints.split_into_one_hint_per_package 

295 ) 

296 

    def initialise(self, britney: "Britney") -> None:
        """Load caches, ingest new test results and set up the AMQP channel.

        Reads the pending-tests file and the results cache from state_dir,
        folds in newly published results (file:// debci export), ages out
        stale reference results, and finally connects to AMQP (or opens the
        file:// stand-in) unless this is a dry run.
        """
        super().initialise(britney)
        # We want to use the "current" time stamp in multiple locations
        time_now = round(time.time())
        # fake_runtime pins "now" for reproducible test runs
        if hasattr(self.options, "fake_runtime"):
            time_now = int(self.options.fake_runtime)
        self._now = time_now
        # compute inverse Testsuite-Triggers: map, unifying all series
        self.logger.info("Building inverse testsuite_triggers map")
        for suite in self.suite_info:
            for src, data in suite.sources.items():
                for trigger in data.testsuite_triggers:
                    self.testsuite_triggers.setdefault(trigger, set()).add(src)
        target_suite_name = self.suite_info.target_suite.name

        os.makedirs(self.state_dir, exist_ok=True)
        self.read_pending_tests()

        # read the cached results that we collected so far
        if os.path.exists(self.results_cache_file):
            with open(self.results_cache_file) as f:
                test_results = json.load(f)
                self.test_results = self.check_and_upgrade_cache(test_results)
            self.logger.info("Read previous results from %s", self.results_cache_file)
        else:
            self.logger.info(
                "%s does not exist, re-downloading all results from swift",
                self.results_cache_file,
            )

        # read in the new results
        if self.options.adt_swift_url.startswith("file://"):
            debci_file = self.options.adt_swift_url[7:]
            if os.path.exists(debci_file):
                with open(debci_file) as f:
                    test_results = json.load(f)
                self.logger.info("Read new results from %s", debci_file)
                for res in test_results["results"]:
                    # tests denied on infrastructure don't get a version
                    if res["version"] is None:
                        res["version"] = "blocked-on-ci-infra"
                    assert res["version"] is not None
                    # "seen" is derived from the debci updated_at timestamp
                    # (trailing 5 chars — fractional seconds/zone — stripped).
                    (test_suite, triggers, src, arch, ver, status, run_id, seen) = [
                        res["suite"],
                        res["trigger"],
                        res["package"],
                        res["arch"],
                        res["version"],
                        res["status"],
                        str(res["run_id"]),
                        round(
                            calendar.timegm(
                                time.strptime(
                                    res["updated_at"][0:-5], "%Y-%m-%dT%H:%M:%S"
                                )
                            )
                        ),
                    ]
                    if test_suite != target_suite_name:
                        # not requested for this target suite, so ignore
                        continue
                    if triggers is None:
                        # not requested for this policy, so ignore
                        continue
                    if status is None:
                        # still running => pending
                        continue
                    for trigger in triggers.split():
                        # remove matching test requests
                        self.remove_from_pending(trigger, src, arch, seen)
                        if status == "tmpfail":
                            # let's see if we still need it
                            continue
                        self.logger.debug(
                            "Results %s %s %s added", src, trigger, status
                        )
                        self.add_trigger_to_results(
                            trigger,
                            src,
                            ver,
                            arch,
                            run_id,
                            seen,
                            Result[status.upper()],
                        )
            else:
                self.logger.info(
                    "%s does not exist, no new data will be processed", debci_file
                )

        # The cache can contain results against versions of packages that
        # are not in any suite anymore. Strip those out, as we don't want
        # to use those results. Additionally, old references may be
        # filtered out.
        if self.options.adt_baseline == "reference":
            self.filter_old_results()

        # we need sources, binaries, and installability tester, so for now
        # remember the whole britney object
        self.britney = britney

        # Initialize AMQP connection
        self.amqp_channel: Optional["amqp.channel.Channel"] = None
        self.amqp_file_handle = None
        if self.options.dry_run:
            return

        amqp_url = self.options.adt_amqp

        if amqp_url.startswith("amqp://"):
            # imported lazily: only needed when a real AMQP server is configured
            import amqplib.client_0_8 as amqp

            # depending on the setup we connect to a AMQP server
            creds = urllib.parse.urlsplit(amqp_url, allow_fragments=False)
            self.amqp_con = amqp.Connection(
                creds.hostname, userid=creds.username, password=creds.password
            )
            self.amqp_channel = self.amqp_con.channel()
            self.logger.info("Connected to AMQP server")
        elif amqp_url.startswith("file://"):
            # or in Debian and in testing mode, adt_amqp will be a file:// URL
            amqp_file = amqp_url[7:]
            # line-buffered so test requests hit disk immediately
            self.amqp_file_handle = open(amqp_file, "w", 1)
        else:
            raise RuntimeError("Unknown ADT_AMQP schema %s" % amqp_url.split(":", 1)[0])

422 

423 def check_and_upgrade_cache( 

424 self, test_results: dict[str, dict[str, dict[str, list[Any]]]] 

425 ) -> dict[str, dict[str, dict[str, list[Any]]]]: 

426 for leaf_result in all_leaf_results(test_results): 

427 leaf_result[0] = Result[leaf_result[0]] 

428 

429 # Drop results older than ADT_RESULTS_CACHE_AGE 

430 for trigger in list(test_results.keys()): 

431 for pkg in list(test_results[trigger].keys()): 

432 for arch in list(test_results[trigger][pkg].keys()): 

433 arch_result = test_results[trigger][pkg][arch] 

434 if self._now - arch_result[3] > self.options.adt_results_cache_age: 434 ↛ 435line 434 didn't jump to line 435, because the condition on line 434 was never true

435 del test_results[trigger][pkg][arch] 

436 if not test_results[trigger][pkg]: 436 ↛ 437line 436 didn't jump to line 437, because the condition on line 436 was never true

437 del test_results[trigger][pkg] 

438 if not test_results[trigger]: 438 ↛ 439line 438 didn't jump to line 439, because the condition on line 438 was never true

439 del test_results[trigger] 

440 

441 return test_results 

442 

443 def filter_old_results(self) -> None: 

444 """Remove results for old versions and reference runs from the cache. 

445 

446 For now, only delete reference runs. If we delete regular 

447 results after a while, packages with lots of triggered tests may 

448 never have all the results at the same time.""" 

449 

450 test_results = self.test_results 

451 

452 for trigger, trigger_data in test_results.items(): 

453 for src, results in trigger_data.items(): 

454 for arch, result in results.items(): 

455 if ( 

456 trigger == REF_TRIG 

457 and self._now - result[3] > self.options.adt_reference_max_age 

458 ): 

459 result[0] = mark_result_as_old(result[0]) 

460 elif not self.test_version_in_any_suite(src, result[1]): 

461 result[0] = mark_result_as_old(result[0]) 

462 

463 def test_version_in_any_suite(self, src: str, version: str) -> bool: 

464 """Check if the mentioned version of src is found in a suite 

465 

466 To prevent regressions in the target suite, the result should be 

467 from a test with the version of the package in either the source 

468 suite or the target suite. The source suite is also valid, 

469 because due to versioned test dependencies and Breaks/Conflicts 

470 relations, regularly the version in the source suite is used 

471 during testing. 

472 """ 

473 

474 versions = set() 

475 for suite in self.suite_info: 

476 try: 

477 srcinfo = suite.sources[src] 

478 except KeyError: 

479 continue 

480 versions.add(srcinfo.version) 

481 

482 valid_version = False 

483 for ver in versions: 

484 if apt_pkg.version_compare(ver, version) == 0: 

485 valid_version = True 

486 break 

487 

488 return valid_version 

489 

490 def save_pending_json(self) -> None: 

491 # update the pending tests on-disk cache 

492 self.logger.info( 

493 "Updating pending requested tests in %s" % self.pending_tests_file 

494 ) 

495 # Shallow clone pending_tests as we only modify the toplevel and change its type. 

496 pending_tests: dict[str, Any] = {} 

497 if self.pending_tests: 

498 pending_tests = dict(self.pending_tests) 

499 # Avoid adding if there are no pending results at all (eases testing) 

500 pending_tests[VERSION_KEY] = 1 

501 with open(self.pending_tests_file + ".new", "w") as f: 

502 json.dump(pending_tests, f, indent=2) 

503 os.rename(self.pending_tests_file + ".new", self.pending_tests_file) 

504 

505 def save_state(self, britney: "Britney") -> None: 

506 super().save_state(britney) 

507 

508 # update the results on-disk cache, unless we are using a r/o shared one 

509 if not self.options.adt_shared_results_cache: 

510 self.logger.info("Updating results cache") 

511 test_results = deepcopy(self.test_results) 

512 for result in all_leaf_results(test_results): 

513 result[0] = result[0].name 

514 with open(self.results_cache_file + ".new", "w") as f: 

515 json.dump(test_results, f, indent=2) 

516 os.rename(self.results_cache_file + ".new", self.results_cache_file) 

517 

518 self.save_pending_json() 

519 

520 def format_retry_url( 

521 self, run_id: Optional[str], arch: str, testsrc: str, trigger: str 

522 ) -> str: 

523 if self.options.adt_ppas: 

524 ppas = "&" + urllib.parse.urlencode( 

525 [("ppa", p) for p in self.options.adt_ppas] 

526 ) 

527 else: 

528 ppas = "" 

529 return cast(str, self.options.adt_retry_url).format( 

530 run_id=run_id, 

531 release=self.options.series, 

532 arch=arch, 

533 package=testsrc, 

534 trigger=urllib.parse.quote_plus(trigger), 

535 ppas=ppas, 

536 ) 

537 

538 def format_log_url(self, testsrc: str, arch: str, run_id: str) -> str: 

539 return cast(str, self.options.adt_log_url).format( 

540 release=self.options.series, 

541 swift_container=self.swift_container, 

542 hash=srchash(testsrc), 

543 package=testsrc, 

544 arch=arch, 

545 run_id=run_id, 

546 ) 

547 

    def apply_src_policy_impl(
        self,
        tests_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Evaluate autopkgtest results for one migration item.

        Requests any missing tests per architecture, folds the collected
        per-(src, version, arch) statuses into a verdict, renders the HTML
        result lines onto the excuse, and applies force-skiptest hints,
        success bounties and regression penalties.
        """
        assert self.hints is not None  # for type checking
        # initialize
        verdict = PolicyVerdict.PASS
        all_self_tests_pass = False
        source_name = item.package
        results_info = []

        # skip/delay autopkgtests until new package is built somewhere
        if not source_data_srcdist.binaries:
            self.logger.debug(
                "%s hasnot been built anywhere, skipping autopkgtest policy",
                excuse.name,
            )
            verdict = PolicyVerdict.REJECTED_TEMPORARILY
            excuse.add_verdict_info(verdict, "nothing built yet, autopkgtest delayed")

        if "all" in excuse.missing_builds:
            self.logger.debug(
                "%s hasnot been built for arch:all, skipping autopkgtest policy",
                source_name,
            )
            verdict = PolicyVerdict.REJECTED_TEMPORARILY
            excuse.add_verdict_info(
                verdict, "arch:all not built yet, autopkgtest delayed"
            )

        if verdict == PolicyVerdict.PASS:
            self.logger.debug("Checking autopkgtests for %s", source_name)
            trigger = source_name + "/" + source_data_srcdist.version

            # build a (testsrc, testver) → arch → (status, run_id, log_url) map; we trigger/check test
            # results per architecture for technical/efficiency reasons, but we
            # want to evaluate and present the results by tested source package
            # first
            pkg_arch_result: dict[
                tuple[str, str], dict[str, tuple[str, Optional[str], str]]
            ] = collections.defaultdict(dict)
            for arch in self.adt_arches:
                if arch in excuse.missing_builds:
                    verdict = PolicyVerdict.REJECTED_TEMPORARILY
                    self.logger.debug(
                        "%s hasnot been built on arch %s, delay autopkgtest there",
                        source_name,
                        arch,
                    )
                    excuse.add_verdict_info(
                        verdict,
                        "arch:%s not built yet, autopkgtest delayed there" % arch,
                    )
                elif arch in excuse.policy_info["depends"].get(
                    "arch_all_not_installable", []
                ):
                    # uninstallability is tolerated here, so skip without rejecting
                    self.logger.debug(
                        "%s is uninstallable on arch %s (which is allowed), not running autopkgtest there",
                        source_name,
                        arch,
                    )
                    excuse.addinfo(
                        "uninstallable on arch %s (which is allowed), not running autopkgtest there"
                        % arch
                    )
                elif (
                    arch in excuse.unsatisfiable_on_archs
                    and arch
                    not in excuse.policy_info["depends"].get(
                        "autopkgtest_run_anyways", []
                    )
                ):
                    verdict = PolicyVerdict.REJECTED_TEMPORARILY
                    self.logger.debug(
                        "%s is uninstallable on arch %s, not running autopkgtest there",
                        source_name,
                        arch,
                    )
                    excuse.addinfo(
                        "uninstallable on arch %s, not running autopkgtest there" % arch
                    )
                else:
                    self.request_tests_for_source(
                        item, arch, source_data_srcdist, pkg_arch_result, excuse
                    )

            # add test result details to Excuse
            cloud_url = self.options.adt_ci_url + "packages/%(h)s/%(s)s/%(r)s/%(a)s"
            testver: Optional[str]
            for testsrc, testver in sorted(pkg_arch_result):
                assert testver is not None
                arch_results = pkg_arch_result[(testsrc, testver)]
                # set of distinct statuses across all arches for this test
                r = {v[0] for v in arch_results.values()}
                if "REGRESSION" in r:
                    verdict = PolicyVerdict.REJECTED_PERMANENTLY
                elif (
                    "RUNNING" in r or "RUNNING-REFERENCE" in r
                ) and verdict == PolicyVerdict.PASS:
                    verdict = PolicyVerdict.REJECTED_TEMPORARILY
                # skip version if still running on all arches
                if not r - {"RUNNING", "RUNNING-ALWAYSFAIL"}:
                    testver = None

                # A source package is eligible for the bounty if it has tests
                # of its own that pass on all tested architectures.
                if testsrc == source_name:
                    excuse.autopkgtest_results = r
                    if r == {"PASS"}:
                        all_self_tests_pass = True

                if testver:
                    testname = "%s/%s" % (testsrc, testver)
                else:
                    testname = testsrc

                html_archmsg = []
                for arch in sorted(arch_results):
                    (status, run_id, log_url) = arch_results[arch]
                    artifact_url = None
                    retry_url = None
                    reference_url = None
                    reference_retry_url = None
                    history_url = None
                    if self.options.adt_ppas:
                        # PPA runs have no public history page; link the
                        # artifacts tarball next to the log instead.
                        if log_url.endswith("log.gz"):
                            artifact_url = log_url.replace("log.gz", "artifacts.tar.gz")
                    else:
                        history_url = cloud_url % {
                            "h": srchash(testsrc),
                            "s": testsrc,
                            "r": self.options.series,
                            "a": arch,
                        }
                    if status in ("NEUTRAL", "REGRESSION", "RUNNING-REFERENCE"):
                        retry_url = self.format_retry_url(
                            run_id, arch, testsrc, trigger
                        )

                    baseline_result = self.result_in_baseline(testsrc, arch)
                    if baseline_result and baseline_result[0] != Result.NONE:
                        baseline_run_id = str(baseline_result[2])
                        reference_url = self.format_log_url(
                            testsrc, arch, baseline_run_id
                        )
                        if self.options.adt_baseline == "reference":
                            reference_retry_url = self.format_retry_url(
                                baseline_run_id, arch, testsrc, REF_TRIG
                            )
                    tests_info.setdefault(testname, {})[arch] = [
                        status,
                        log_url,
                        history_url,
                        artifact_url,
                        retry_url,
                    ]

                    # render HTML snippet for testsrc entry for current arch
                    if history_url:
                        message = '<a href="%s">%s</a>' % (history_url, arch)
                    else:
                        message = arch
                    message += ': <a href="%s">%s</a>' % (
                        log_url,
                        EXCUSES_LABELS[status],
                    )
                    if retry_url:
                        message += (
                            '<a href="%s" style="text-decoration: none;"> ♻</a>'
                            % retry_url
                        )
                    if reference_url:
                        message += ' (<a href="%s">reference</a>' % reference_url
                        if reference_retry_url:
                            message += (
                                '<a href="%s" style="text-decoration: none;"> ♻</a>'
                                % reference_retry_url
                            )
                        message += ")"
                    if artifact_url:
                        message += ' <a href="%s">[artifacts]</a>' % artifact_url
                    html_archmsg.append(message)

                # render HTML line for testsrc entry
                # - if action is or may be required
                # - for ones own package
                if (
                    r
                    - {
                        "PASS",
                        "NEUTRAL",
                        "RUNNING-ALWAYSFAIL",
                        "ALWAYSFAIL",
                        "IGNORE-FAIL",
                    }
                    or testsrc == source_name
                ):
                    if testver:
                        pkg = '<a href="#{0}">{0}</a>/{1}'.format(testsrc, testver)
                    else:
                        pkg = '<a href="#{0}">{0}</a>'.format(testsrc)
                    results_info.append(
                        "autopkgtest for %s: %s" % (pkg, ", ".join(html_archmsg))
                    )

        if verdict != PolicyVerdict.PASS:
            # check for force-skiptest hint
            hints = self.hints.search(
                "force-skiptest",
                package=source_name,
                version=source_data_srcdist.version,
            )
            if hints:
                excuse.addreason("skiptest")
                excuse.addinfo(
                    "Should wait for tests relating to %s %s, but forced by %s"
                    % (source_name, source_data_srcdist.version, hints[0].user)
                )
                verdict = PolicyVerdict.PASS_HINTED
            else:
                excuse.addreason("autopkgtest")

        if (
            self.options.adt_success_bounty
            and verdict == PolicyVerdict.PASS
            and all_self_tests_pass
        ):
            excuse.add_bounty("autopkgtest", self.options.adt_success_bounty)
        if self.options.adt_regression_penalty and verdict in {
            PolicyVerdict.REJECTED_PERMANENTLY,
            PolicyVerdict.REJECTED_TEMPORARILY,
        }:
            if self.options.adt_regression_penalty > 0:
                excuse.add_penalty("autopkgtest", self.options.adt_regression_penalty)
            # In case we give penalties instead of blocking, we must always pass
            verdict = PolicyVerdict.PASS
        for i in results_info:
            if verdict.is_rejected:
                excuse.add_verdict_info(verdict, i)
            else:
                excuse.addinfo(i)

        return verdict

794 

795 # 

796 # helper functions 

797 # 

798 

799 @staticmethod 

800 def has_autodep8(srcinfo: SourcePackage) -> bool: 

801 """Check if package is covered by autodep8 

802 

803 srcinfo is an item from self.britney.sources 

804 """ 

805 # autodep8? 

806 for t in srcinfo.testsuite: 

807 if t.startswith("autopkgtest-pkg"): 

808 return True 

809 

810 return False 

811 

    def request_tests_for_source(
        self,
        item: MigrationItem,
        arch: str,
        source_data_srcdist: SourcePackage,
        pkg_arch_result: dict[
            tuple[str, str], dict[str, tuple[str, Optional[str], str]]
        ],
        excuse: "Excuse",
    ) -> None:
        """Request autopkgtests for everything affected by this item on arch.

        Determines the list of "trigger" sources (the migrating source
        itself plus any sources whose binaries must be pulled from the
        source suite for the tests to be installable), requests any tests
        that do not yet have a result, and records the current result of
        every test into ``pkg_arch_result``, keyed by (test source,
        version) and architecture.

        :param item: migration item under consideration
        :param arch: architecture to request tests on
        :param source_data_srcdist: the source package in the source suite
        :param pkg_arch_result: out-parameter collecting per-arch results
        :param excuse: the excuse currently being evaluated for this item
        """
        pkg_universe = self.britney.pkg_universe
        target_suite = self.suite_info.target_suite
        source_suite = item.suite
        sources_t = target_suite.sources
        sources_s = item.suite.sources
        packages_s_a = item.suite.binaries[arch]
        source_name = item.package
        source_version = source_data_srcdist.version
        # request tests (unless they were already requested earlier or have a result)
        tests = self.tests_for_source(source_name, source_version, arch, excuse)
        is_huge = len(tests) > self.options.adt_huge

        # Here we figure out what is required from the source suite
        # for the test to install successfully.
        #
        # The ImplicitDependencyPolicy does a similar calculation, but
        # if I (elbrus) understand correctly, only in the reverse
        # dependency direction. We are doing something similar here
        # but in the dependency direction (note: this code is older).
        # We use the ImplicitDependencyPolicy result for the reverse
        # dependencies and we keep the code below for the
        # dependencies. Using the ImplicitDependencyPolicy results
        # also in the reverse direction seems to require quite some
        # reorganisation to get that information available here, as in
        # the current state only the current excuse is available here
        # and the required other excuses may not be calculated yet.
        #
        # Loop over all binary packages from trigger and
        # recursively look up which *versioned* dependencies are
        # only satisfied in the source suite.
        #
        # For all binaries found, look up which packages they
        # break/conflict with in the target suite, but not in the
        # source suite. The main reason to do this is to cover test
        # dependencies, so we will check Testsuite-Triggers as
        # well.
        #
        # OI: do we need to do the first check in a smart way
        # (i.e. only for the packages that are actually going to be
        # installed) for the breaks/conflicts set as well, i.e. do
        # we need to check if any of the packages that we now
        # enforce being from the source suite, actually have new
        # versioned depends and new breaks/conflicts.
        #
        # For all binaries found, add the set of unique source
        # packages to the list of triggers.

        bin_triggers: set[PackageId] = set()
        bin_new = set(source_data_srcdist.binaries)
        # iter_except + pop gives a work-list loop: entries appended to
        # bin_new during iteration are processed too (transitive closure).
        for n_binary in iter_except(bin_new.pop, KeyError):
            if n_binary in bin_triggers:
                continue
            bin_triggers.add(n_binary)

            # Check if there is a dependency that is not
            # available in the target suite.
            # We add slightly too much here, because new binaries
            # will also show up, but they are already properly
            # installed. Nevermind.
            depends = pkg_universe.dependencies_of(n_binary)
            # depends is a frozenset{frozenset{BinaryPackageId, ..}}
            for deps_of_bin in depends:
                if target_suite.any_of_these_are_in_the_suite(deps_of_bin):
                    # if any of the alternative dependencies is already
                    # satisfied in the target suite, we can just ignore it
                    continue
                # We'll figure out which version later
                bin_new.update(
                    added_pkgs_compared_to_target_suite(deps_of_bin, target_suite)
                )

        # Check if the package breaks/conflicts anything. We might
        # be adding slightly too many source packages due to the
        # check here as a binary package that is broken may be
        # coming from a different source package in the source
        # suite. Nevermind.
        bin_broken = set()
        for t_binary in bin_triggers:
            # broken is a frozenset{BinaryPackageId, ..}
            broken = pkg_universe.negative_dependencies_of(
                cast(BinaryPackageId, t_binary)
            )
            broken_in_target = {
                p.package_name
                for p in target_suite.which_of_these_are_in_the_suite(broken)
            }
            broken_in_source = {
                p.package_name
                for p in source_suite.which_of_these_are_in_the_suite(broken)
            }
            # We want packages with a newer version in the source suite that
            # no longer has the conflict. This is an approximation
            broken_filtered = set(
                p
                for p in broken
                if p.package_name in broken_in_target
                and p.package_name not in broken_in_source
            )
            # We add the version in the target suite, but the code below will
            # change it to the version in the source suite
            bin_broken.update(broken_filtered)
        bin_triggers.update(bin_broken)

        # The ImplicitDependencyPolicy also found packages that need
        # to migrate together, so add them to the triggers too.
        for bin_implicit in excuse.depends_packages_flattened:
            if bin_implicit.architecture == arch:
                bin_triggers.add(bin_implicit)

        # Turn the binary trigger set into "source/version" trigger strings,
        # skipping sources whose version is identical in the target suite.
        triggers = set()
        for t_binary2 in bin_triggers:
            if t_binary2.architecture == arch:
                try:
                    source_of_bin = packages_s_a[t_binary2.package_name].source
                    # If the version in the target suite is the same, don't add a trigger.
                    # Note that we looked up the source package in the source suite.
                    # If it were a different source package in the target suite, however, then
                    # we would not have this source package in the same version anyway.
                    if (
                        sources_t.get(source_of_bin, None) is None
                        or sources_s[source_of_bin].version
                        != sources_t[source_of_bin].version
                    ):
                        triggers.add(
                            source_of_bin + "/" + sources_s[source_of_bin].version
                        )
                except KeyError:
                    # Apparently the package was removed from
                    # unstable e.g. if packages are replaced
                    # (e.g. -dbg to -dbgsym)
                    pass
                if t_binary2 not in source_data_srcdist.binaries:
                    for tdep_src in self.testsuite_triggers.get(
                        t_binary2.package_name, set()
                    ):
                        try:
                            # Only add trigger if versions in the target and source suites are different
                            if (
                                sources_t.get(tdep_src, None) is None
                                or sources_s[tdep_src].version
                                != sources_t[tdep_src].version
                            ):
                                triggers.add(
                                    tdep_src + "/" + sources_s[tdep_src].version
                                )
                        except KeyError:
                            # Apparently the source was removed from
                            # unstable (testsuite_triggers are unified
                            # over all suites)
                            pass
        # The item's own trigger always goes first in the trigger list.
        trigger = source_name + "/" + source_version
        triggers.discard(trigger)
        triggers_list = sorted(list(triggers))
        triggers_list.insert(0, trigger)

        for testsrc, testver in tests:
            self.pkg_test_request(testsrc, arch, triggers_list, huge=is_huge)
            (result, real_ver, run_id, url) = self.pkg_test_result(
                testsrc, testver, arch, trigger
            )
            pkg_arch_result[(testsrc, real_ver)][arch] = (result, run_id, url)

983 

    def tests_for_source(
        self, src: str, ver: str, arch: str, excuse: "Excuse"
    ) -> list[tuple[str, str]]:
        """Return all (source, version) tests to run for given source and arch.

        Includes the package's own test (if any), the tests of direct
        reverse dependencies of its binaries, and packages declaring the
        binaries in Testsuite-Triggers.  The result is sorted by test
        source name and contains each test source at most once.
        """

        source_suite = self.suite_info.primary_source_suite
        target_suite = self.suite_info.target_suite
        sources_info = target_suite.sources
        binaries_info = target_suite.binaries[arch]

        # guards against adding the same test source twice
        reported_pkgs = set()

        tests = []

        # Debian doesn't have linux-meta, but Ubuntu does
        # for linux themselves we don't want to trigger tests -- these should
        # all come from linux-meta*. A new kernel ABI without a corresponding
        # -meta won't be installed and thus we can't sensibly run tests against
        # it.
        if (
            src.startswith("linux")
            and src.replace("linux", "linux-meta") in sources_info
        ):
            return []

        # we want to test the package itself, if it still has a test in unstable
        # but only if the package actually exists on this arch
        srcinfo = source_suite.sources[src]
        if ("autopkgtest" in srcinfo.testsuite or self.has_autodep8(srcinfo)) and len(
            excuse.packages[arch]
        ) > 0:
            reported_pkgs.add(src)
            tests.append((src, ver))

        extra_bins = []
        # Debian doesn't have linux-meta, but Ubuntu does
        # Hack: For new kernels trigger all DKMS packages by pretending that
        # linux-meta* builds a "dkms" binary as well. With that we ensure that we
        # don't regress DKMS drivers with new kernel versions.
        if src.startswith("linux-meta"):
            # does this have any image on this arch?
            for pkg_id in srcinfo.binaries:
                if pkg_id.architecture == arch and "-image" in pkg_id.package_name:
                    try:
                        extra_bins.append(binaries_info["dkms"].pkg_id)
                    except KeyError:
                        # no dkms binary on this arch; nothing to pretend
                        pass

        if not self.has_built_on_this_arch_or_is_arch_all(srcinfo, arch):
            return []

        pkg_universe = self.britney.pkg_universe
        # plus all direct reverse dependencies and test triggers of its
        # binaries which have an autopkgtest
        for binary in itertools.chain(srcinfo.binaries, extra_bins):
            rdeps = pkg_universe.reverse_dependencies_of(binary)
            for rdep in rdeps:
                try:
                    rdep_src = binaries_info[rdep.package_name].source
                    # Don't re-trigger the package itself here; this should
                    # have been done above if the package still continues to
                    # have an autopkgtest in unstable.
                    if rdep_src == src:
                        continue
                except KeyError:
                    # reverse dependency not in the target suite on this arch
                    continue

                rdep_src_info = sources_info[rdep_src]
                if "autopkgtest" in rdep_src_info.testsuite or self.has_autodep8(
                    rdep_src_info
                ):
                    if rdep_src not in reported_pkgs:
                        tests.append((rdep_src, rdep_src_info.version))
                        reported_pkgs.add(rdep_src)

            # sources whose Testsuite-Triggers mention this binary
            for tdep_src in self.testsuite_triggers.get(binary.package_name, set()):
                if tdep_src not in reported_pkgs:
                    try:
                        tdep_src_info = sources_info[tdep_src]
                    except KeyError:
                        # triggering source not (or no longer) in target suite
                        continue
                    if "autopkgtest" in tdep_src_info.testsuite or self.has_autodep8(
                        tdep_src_info
                    ):
                        # only schedule if the triggering source actually
                        # built something on this architecture
                        for pkg_id in tdep_src_info.binaries:
                            if pkg_id.architecture == arch:
                                tests.append((tdep_src, tdep_src_info.version))
                                reported_pkgs.add(tdep_src)
                                break

        tests.sort(key=lambda s_v: s_v[0])
        return tests

1076 

    def read_pending_tests(self) -> None:
        """Read pending test requests from previous britney runs

        Initialize self.pending_tests with that data.  Expired requests
        (older than adt_pending_max_age) are dropped, and files written
        in the old list-based format (without VERSION_KEY) are migrated
        to the current per-arch timestamp mapping.
        """
        assert self.pending_tests is None, "already initialized"
        if not os.path.exists(self.pending_tests_file):
            self.logger.info(
                "No %s, starting with no pending tests", self.pending_tests_file
            )
            self.pending_tests = {}
            return
        with open(self.pending_tests_file) as f:
            self.pending_tests = json.load(f)
            if VERSION_KEY in self.pending_tests:
                # current format: {trigger: {src: {arch: timestamp}}}
                del self.pending_tests[VERSION_KEY]
                # iterate over key snapshots (list(...)) because entries
                # are deleted from the dicts while walking them
                for trigger in list(self.pending_tests.keys()):
                    for pkg in list(self.pending_tests[trigger].keys()):
                        arch_dict = self.pending_tests[trigger][pkg]
                        for arch in list(arch_dict.keys()):
                            # expire requests we have waited too long for
                            if (
                                self._now - arch_dict[arch]
                                > self.options.adt_pending_max_age
                            ):
                                del arch_dict[arch]
                        # prune now-empty per-package and per-trigger maps
                        if not arch_dict:
                            del self.pending_tests[trigger][pkg]
                    if not self.pending_tests[trigger]:
                        del self.pending_tests[trigger]
            else:
                # Migration code:
                # old format stored a plain list of arches per package;
                # convert to {arch: timestamp} using the current time
                for trigger_data in self.pending_tests.values():
                    for pkg, arch_list in trigger_data.items():
                        trigger_data[pkg] = {}
                        for arch in arch_list:
                            trigger_data[pkg][arch] = self._now

        self.logger.info(
            "Read pending requested tests from %s", self.pending_tests_file
        )
        self.logger.debug("%s", self.pending_tests)

1118 

1119 # this requires iterating over all triggers and thus is expensive; 

1120 # cache the results 

1121 @lru_cache(None) 

1122 def latest_run_for_package(self, src: str, arch: str) -> str: 

1123 """Return latest run ID for src on arch""" 

1124 

1125 latest_run_id = "" 

1126 for srcmap in self.test_results.values(): 

1127 try: 

1128 run_id = srcmap[src][arch][2] 

1129 except KeyError: 

1130 continue 

1131 if run_id > latest_run_id: 

1132 latest_run_id = run_id 

1133 return latest_run_id 

1134 

    @lru_cache(None)
    def fetch_swift_results(self, swift_url: str, src: str, arch: str) -> None:
        """Download new results for source package/arch from swift.

        Lists result directories in the swift container newer than the
        newest run we already know about, then fetches each result.tar
        via fetch_one_result().  Exits the whole program on transient
        fetch failures so that the next run retries instead of silently
        re-requesting tests we already have results for.
        """

        # prepare query: get all runs with a timestamp later than the latest
        # run_id for this package/arch; '@' is at the end of each run id, to
        # mark the end of a test run directory path
        # example: <autopkgtest-wily>wily/amd64/libp/libpng/20150630_054517@/result.tar
        query = {
            "delimiter": "@",
            "prefix": "%s/%s/%s/%s/" % (self.options.series, arch, srchash(src), src),
        }

        # determine latest run_id from results
        if not self.options.adt_shared_results_cache:
            latest_run_id = self.latest_run_for_package(src, arch)
            if latest_run_id:
                # swift "marker" makes the listing start after this entry
                query["marker"] = query["prefix"] + latest_run_id

        # request new results from swift
        url = os.path.join(swift_url, self.swift_container)
        url += "?" + urllib.parse.urlencode(query)
        f = None
        try:
            f = urlopen(url, timeout=30)
            if f.getcode() == 200:
                result_paths = f.read().decode().strip().splitlines()
            elif f.getcode() == 204:  # No content
                result_paths = []
            else:
                # we should not ever end up here as we expect a HTTPError in
                # other cases; e. g. 3XX is something that tells us to adjust
                # our URLS, so fail hard on those
                raise NotImplementedError(
                    "fetch_swift_results(%s): cannot handle HTTP code %i"
                    % (url, f.getcode())
                )
        except IOError as e:
            # 401 "Unauthorized" is swift's way of saying "container does not exist"
            if getattr(e, "code", -1) == 401:
                self.logger.info(
                    "fetch_swift_results: %s does not exist yet or is inaccessible", url
                )
                return
            # Other status codes are usually a transient
            # network/infrastructure failure. Ignoring this can lead to
            # re-requesting tests which we already have results for, so
            # fail hard on this and let the next run retry.
            self.logger.error("Failure to fetch swift results from %s: %s", url, str(e))
            sys.exit(1)
        finally:
            if f is not None:
                f.close()

        for p in result_paths:
            self.fetch_one_result(
                os.path.join(swift_url, self.swift_container, p, "result.tar"),
                src,
                arch,
            )

1195 

1196 def fetch_one_result(self, url: str, src: str, arch: str) -> None: 

1197 """Download one result URL for source/arch 

1198 

1199 Remove matching pending_tests entries. 

1200 """ 

1201 f = None 

1202 try: 

1203 f = urlopen(url, timeout=30) 

1204 if f.getcode() == 200: 1204 ↛ 1207line 1204 didn't jump to line 1207, because the condition on line 1204 was never false

1205 tar_bytes = io.BytesIO(f.read()) 

1206 else: 

1207 raise NotImplementedError( 

1208 "fetch_one_result(%s): cannot handle HTTP code %i" 

1209 % (url, f.getcode()) 

1210 ) 

1211 except IOError as err: 

1212 self.logger.error("Failure to fetch %s: %s", url, str(err)) 

1213 # we tolerate "not found" (something went wrong on uploading the 

1214 # result), but other things indicate infrastructure problems 

1215 if getattr(err, "code", -1) == 404: 

1216 return 

1217 sys.exit(1) 

1218 finally: 

1219 if f is not None: 1219 ↛ exit,   1219 ↛ 12212 missed branches: 1) line 1219 didn't return from function 'fetch_one_result', because the return on line 1216 wasn't executed, 2) line 1219 didn't jump to line 1221, because the condition on line 1219 was never false

1220 f.close() 1220 ↛ exitline 1220 didn't return from function 'fetch_one_result', because the return on line 1216 wasn't executed

1221 try: 

1222 with tarfile.open(None, "r", tar_bytes) as tar: 

1223 exitcode = int(tar.extractfile("exitcode").read().strip()) # type: ignore[union-attr] 

1224 srcver = tar.extractfile("testpkg-version").read().decode().strip() # type: ignore[union-attr] 

1225 (ressrc, ver) = srcver.split() 

1226 testinfo = json.loads(tar.extractfile("testinfo.json").read().decode()) # type: ignore[union-attr] 

1227 except (KeyError, ValueError, tarfile.TarError) as err: 

1228 self.logger.error("%s is damaged, ignoring: %s", url, str(err)) 

1229 # ignore this; this will leave an orphaned request in autopkgtest-pending.json 

1230 # and thus require manual retries after fixing the tmpfail, but we 

1231 # can't just blindly attribute it to some pending test. 

1232 return 

1233 

1234 if src != ressrc: 1234 ↛ 1235line 1234 didn't jump to line 1235, because the condition on line 1234 was never true

1235 self.logger.error( 

1236 "%s is a result for package %s, but expected package %s", 

1237 url, 

1238 ressrc, 

1239 src, 

1240 ) 

1241 return 

1242 

1243 # parse recorded triggers in test result 

1244 for e in testinfo.get("custom_environment", []): 1244 ↛ 1249line 1244 didn't jump to line 1249, because the loop on line 1244 didn't complete

1245 if e.startswith("ADT_TEST_TRIGGERS="): 1245 ↛ 1244line 1245 didn't jump to line 1244, because the condition on line 1245 was never false

1246 result_triggers = [i for i in e.split("=", 1)[1].split() if "/" in i] 

1247 break 

1248 else: 

1249 self.logger.error("%s result has no ADT_TEST_TRIGGERS, ignoring") 

1250 return 

1251 

1252 run_id = os.path.basename(os.path.dirname(url)) 

1253 seen = round(calendar.timegm(time.strptime(run_id, "%Y%m%d_%H%M%S@"))) 

1254 # allow some skipped tests, but nothing else 

1255 if exitcode in [0, 2]: 

1256 result = Result.PASS 

1257 elif exitcode == 8: 1257 ↛ 1258line 1257 didn't jump to line 1258, because the condition on line 1257 was never true

1258 result = Result.NEUTRAL 

1259 else: 

1260 result = Result.FAIL 

1261 

1262 self.logger.info( 

1263 "Fetched test result for %s/%s/%s %s (triggers: %s): %s", 

1264 src, 

1265 ver, 

1266 arch, 

1267 run_id, 

1268 result_triggers, 

1269 result.name.lower(), 

1270 ) 

1271 

1272 # remove matching test requests 

1273 for trigger in result_triggers: 

1274 self.remove_from_pending(trigger, src, arch) 

1275 

1276 # add this result 

1277 for trigger in result_triggers: 

1278 self.add_trigger_to_results(trigger, src, ver, arch, run_id, seen, result) 

1279 

1280 def remove_from_pending( 

1281 self, trigger: str, src: str, arch: str, timestamp: int = sys.maxsize 

1282 ) -> None: 

1283 assert self.pending_tests is not None # for type checking 

1284 try: 

1285 arch_dict = self.pending_tests[trigger][src] 

1286 if timestamp < arch_dict[arch]: 

1287 # The result is from before the moment of scheduling, so it's 

1288 # not the one we're waiting for 

1289 return 

1290 del arch_dict[arch] 

1291 if not arch_dict: 

1292 del self.pending_tests[trigger][src] 

1293 if not self.pending_tests[trigger]: 

1294 del self.pending_tests[trigger] 

1295 self.logger.debug( 

1296 "-> matches pending request %s/%s for trigger %s", src, arch, trigger 

1297 ) 

1298 except KeyError: 

1299 self.logger.debug( 

1300 "-> does not match any pending request for %s/%s", src, arch 

1301 ) 

1302 

1303 def add_trigger_to_results( 

1304 self, 

1305 trigger: str, 

1306 src: str, 

1307 ver: str, 

1308 arch: str, 

1309 run_id: str, 

1310 timestamp: int, 

1311 status_to_add: Result, 

1312 ) -> None: 

1313 # Ensure that we got a new enough version 

1314 try: 

1315 (trigsrc, trigver) = trigger.split("/", 1) 

1316 except ValueError: 

1317 self.logger.info("Ignoring invalid test trigger %s", trigger) 

1318 return 

1319 if trigsrc == src and apt_pkg.version_compare(ver, trigver) < 0: 1319 ↛ 1320line 1319 didn't jump to line 1320, because the condition on line 1319 was never true

1320 self.logger.debug( 

1321 "test trigger %s, but run for older version %s, ignoring", trigger, ver 

1322 ) 

1323 return 

1324 

1325 stored_result = ( 

1326 self.test_results.setdefault(trigger, {}) 

1327 .setdefault(src, {}) 

1328 .setdefault(arch, [Result.FAIL, None, "", 0]) 

1329 ) 

1330 

1331 # reruns shouldn't flip the result from PASS or NEUTRAL to 

1332 # FAIL, so remember the most recent version of the best result 

1333 # we've seen. Except for reference updates, which we always 

1334 # want to update with the most recent result. The result data 

1335 # may not be ordered by timestamp, so we need to check time. 

1336 update = False 

1337 if self.options.adt_baseline == "reference" and trigger == REF_TRIG: 

1338 if stored_result[3] < timestamp: 

1339 update = True 

1340 elif status_to_add < stored_result[0]: 

1341 update = True 

1342 elif status_to_add == stored_result[0] and stored_result[3] < timestamp: 

1343 update = True 

1344 

1345 if update: 

1346 stored_result[0] = status_to_add 

1347 stored_result[1] = ver 

1348 stored_result[2] = run_id 

1349 stored_result[3] = timestamp 

1350 

    def send_test_request(
        self, src: str, arch: str, triggers: list[str], huge: bool = False
    ) -> None:
        """Send out AMQP request for testing src/arch for triggers

        If huge is true, then the request will be put into the -huge instead of
        normal queue.  With PPAs configured, the dedicated -ppa queue is
        used instead.  Depending on configuration the request is either
        published on the AMQP channel or appended to the request file.
        """
        if self.options.dry_run:
            # dry runs must not submit anything
            return

        params: dict[str, Any] = {"triggers": triggers}
        if self.options.adt_ppas:
            params["ppas"] = self.options.adt_ppas
            qname = "debci-ppa-%s-%s" % (self.options.series, arch)
        elif huge:
            qname = "debci-huge-%s-%s" % (self.options.series, arch)
        else:
            qname = "debci-%s-%s" % (self.options.series, arch)
        params["submit-time"] = time.strftime("%Y-%m-%d %H:%M:%S%z", time.gmtime())

        if self.amqp_channel:
            self.amqp_channel.basic_publish(
                amqp.Message(
                    src + "\n" + json.dumps(params), delivery_mode=2
                ),  # persistent
                routing_key=qname,
            )
            # we save pending.json with every request, so that if britney
            # crashes we don't re-request tests. This is only needed when using
            # real amqp, as with file-based submission the pending tests are
            # returned by debci along with the results each run.
            self.save_pending_json()
        else:
            # for file-based submission, triggers are space separated
            params["triggers"] = [" ".join(params["triggers"])]
            assert self.amqp_file_handle
            self.amqp_file_handle.write("%s:%s %s\n" % (qname, src, json.dumps(params)))

1389 

    def pkg_test_request(
        self, src: str, arch: str, all_triggers: list[str], huge: bool = False
    ) -> None:
        """Request one package test for a set of triggers

        all_triggers is a list of "pkgname/version". These are the packages
        that will be taken from the source suite. The first package in this
        list is the package that triggers the testing of src, the rest are
        additional packages required for installability of the test deps. If
        huge is true, then the request will be put into the -huge instead of
        normal queue.

        This will only be done if that test wasn't already requested in
        a previous run (i. e. if it's not already in self.pending_tests)
        or if there is already a fresh or a positive result for it. This
        ensures to download current results for this package before
        requesting any test."""
        trigger = all_triggers[0]
        # a file:// swift URL means results are delivered locally, so
        # there is nothing new to poll for
        uses_swift = not self.options.adt_swift_url.startswith("file://")
        try:
            result = self.test_results[trigger][src][arch]
            has_result = True
        except KeyError:
            has_result = False

        # NOTE: `result` is only referenced when has_result is True
        if has_result:
            result_state = result[0]
            if result_state in {Result.OLD_PASS, Result.OLD_FAIL, Result.OLD_NEUTRAL}:
                # stale result; fall through and look for fresher data
                pass
            elif (
                result_state == Result.FAIL
                and self.result_in_baseline(src, arch)[0]
                in {Result.PASS, Result.NEUTRAL, Result.OLD_PASS, Result.OLD_NEUTRAL}
                and self._now - result[3] > self.options.adt_retry_older_than
            ):
                # We might want to retry this failure, so continue
                pass
            elif not uses_swift:
                # We're done if we don't retrigger and we're not using swift
                return
            elif result_state in {Result.PASS, Result.NEUTRAL}:
                self.logger.debug(
                    "%s/%s triggered by %s already known", src, arch, trigger
                )
                return

        # Without swift we don't expect new results
        if uses_swift:
            self.logger.info(
                "Checking for new results for failed %s/%s for trigger %s",
                src,
                arch,
                trigger,
            )
            self.fetch_swift_results(self.options.adt_swift_url, src, arch)
            # do we have one now?
            try:
                self.test_results[trigger][src][arch]
                return
            except KeyError:
                pass

        self.request_test_if_not_queued(src, arch, trigger, all_triggers, huge=huge)

1453 

1454 def request_test_if_not_queued( 

1455 self, 

1456 src: str, 

1457 arch: str, 

1458 trigger: str, 

1459 all_triggers: list[str] = [], 

1460 huge: bool = False, 

1461 ) -> None: 

1462 assert self.pending_tests is not None # for type checking 

1463 if not all_triggers: 

1464 all_triggers = [trigger] 

1465 

1466 # Don't re-request if it's already pending 

1467 arch_dict = self.pending_tests.setdefault(trigger, {}).setdefault(src, {}) 

1468 if arch in arch_dict.keys(): 

1469 self.logger.debug( 

1470 "Test %s/%s for %s is already pending, not queueing", src, arch, trigger 

1471 ) 

1472 else: 

1473 self.logger.debug( 

1474 "Requesting %s autopkgtest on %s to verify %s", src, arch, trigger 

1475 ) 

1476 arch_dict[arch] = self._now 

1477 self.send_test_request(src, arch, all_triggers, huge=huge) 

1478 

    def result_in_baseline(self, src: str, arch: str) -> list[Any]:
        """Get the result for src on arch in the baseline

        The baseline is optionally all data or a reference set)

        Returns a [Result, version, run_id, timestamp] list; a synthetic
        entry (Result.NONE or Result.FAIL) is returned when nothing is
        recorded.  Results are cached in self.result_in_baseline_cache.
        """

        # this requires iterating over all cached results and thus is expensive;
        # cache the results
        try:
            return self.result_in_baseline_cache[src][arch]
        except KeyError:
            pass

        result_reference: list[Any] = [Result.NONE, None, "", 0]
        if self.options.adt_baseline == "reference":
            # packages not in the target suite have no reference baseline
            if src not in self.suite_info.target_suite.sources:
                return result_reference

            try:
                result_reference = self.test_results[REF_TRIG][src][arch]
                self.logger.debug(
                    "Found result for src %s in reference: %s",
                    src,
                    result_reference[0].name,
                )
            except KeyError:
                self.logger.debug(
                    "Found NO result for src %s in reference: %s",
                    src,
                    result_reference[0].name,
                )
            # deepcopy so later mutation of the live result does not
            # change the cached baseline
            self.result_in_baseline_cache[src][arch] = deepcopy(result_reference)
            return result_reference

        # non-reference baseline: best result seen for src/arch under any trigger
        result_ever: list[Any] = [Result.FAIL, None, "", 0]
        for srcmap in self.test_results.values():
            try:
                if srcmap[src][arch][0] != Result.FAIL:
                    result_ever = srcmap[src][arch]
                # If we are not looking at a reference run, We don't really
                # care about anything except the status, so we're done
                # once we find a PASS.
                if result_ever[0] == Result.PASS:
                    break
            except KeyError:
                pass

        self.result_in_baseline_cache[src][arch] = deepcopy(result_ever)
        self.logger.debug("Result for src %s ever: %s", src, result_ever[0].name)
        return result_ever

1529 

1530 def has_test_in_target(self, src: str) -> bool: 

1531 test_in_target = False 

1532 try: 

1533 srcinfo = self.suite_info.target_suite.sources[src] 

1534 if "autopkgtest" in srcinfo.testsuite or self.has_autodep8(srcinfo): 

1535 test_in_target = True 

1536 # AttributeError is only needed for the test suite as 

1537 # srcinfo can be a NoneType 

1538 except (KeyError, AttributeError): 

1539 pass 

1540 

1541 return test_in_target 

1542 

    def pkg_test_result(
        self, src: str, ver: str, arch: str, trigger: str
    ) -> tuple[str, str, Optional[str], str]:
        """Get current test status of a particular package

        Return (status, real_version, run_id, log_url) tuple; status is a key in
        EXCUSES_LABELS. run_id is None if the test is still running.

        Raises RuntimeError if the test is neither recorded in
        test_results nor pending — that would indicate internal
        inconsistency between requesting and result bookkeeping.
        """
        assert self.pending_tests is not None  # for type checking
        # determine current test result status
        run_id = None
        try:
            r = self.test_results[trigger][src][arch]
            ver = r[1]
            run_id = r[2]

            if r[0] in {Result.FAIL, Result.OLD_FAIL}:
                # determine current test result status
                baseline_result = self.result_in_baseline(src, arch)[0]

                # Special-case triggers from linux-meta*: we cannot compare
                # results against different kernels, as e. g. a DKMS module
                # might work against the default kernel but fail against a
                # different flavor; so for those, ignore the "ever
                # passed" check; FIXME: check against trigsrc only
                if self.options.adt_baseline != "reference" and (
                    trigger.startswith("linux-meta") or trigger.startswith("linux/")
                ):
                    baseline_result = Result.FAIL

                # Check if the autopkgtest (still) exists in the target suite
                test_in_target = self.has_test_in_target(src)

                # stale or missing baseline: schedule a reference run to
                # refresh it
                if test_in_target and baseline_result in {
                    Result.NONE,
                    Result.OLD_FAIL,
                    Result.OLD_NEUTRAL,
                    Result.OLD_PASS,
                }:
                    self.request_test_if_not_queued(src, arch, REF_TRIG)

                result = "REGRESSION"
                if baseline_result in {Result.FAIL, Result.OLD_FAIL}:
                    result = "ALWAYSFAIL"
                elif baseline_result == Result.NONE and test_in_target:
                    result = "RUNNING-REFERENCE"

                # a failing test that does not exist in the target suite is
                # a new test, optionally not held against the package
                if self.options.adt_ignore_failure_for_new_tests and not test_in_target:
                    result = "ALWAYSFAIL"

                if self.has_force_badtest(src, ver, arch):
                    result = "IGNORE-FAIL"
            else:
                result = r[0].name

            url = self.format_log_url(src, arch, run_id)
        except KeyError:
            # no result for src/arch; still running?
            if arch in self.pending_tests.get(trigger, {}).get(src, {}).keys():
                baseline_result = self.result_in_baseline(src, arch)[0]
                if (
                    self.options.adt_ignore_failure_for_new_tests
                    and not self.has_test_in_target(src)
                ):
                    result = "RUNNING-ALWAYSFAIL"
                elif baseline_result != Result.FAIL and not self.has_force_badtest(
                    src, ver, arch
                ):
                    result = "RUNNING"
                else:
                    result = "RUNNING-ALWAYSFAIL"
                url = self.options.adt_ci_url + "status/pending"
            else:
                raise RuntimeError(
                    "Result for %s/%s/%s (triggered by %s) is neither known nor pending!"
                    % (src, ver, arch, trigger)
                )

        return (result, ver, run_id, url)

1622 

1623 def has_force_badtest(self, src: str, ver: str, arch: str) -> bool: 

1624 """Check if src/ver/arch has a force-badtest hint""" 

1625 

1626 assert self.hints is not None 

1627 hints = self.hints.search("force-badtest", package=src) 

1628 if hints: 

1629 self.logger.info( 

1630 "Checking hints for %s/%s/%s: %s", 

1631 src, 

1632 ver, 

1633 arch, 

1634 [str(h) for h in hints], 

1635 ) 

1636 for hint in hints: 

1637 if [ 

1638 mi 

1639 for mi in hint.packages 

1640 if mi.architecture in ["source", arch] 

1641 and ( 

1642 mi.version == "all" 

1643 or apt_pkg.version_compare(ver, mi.version) <= 0 # type: ignore[arg-type] 

1644 ) 

1645 ]: 

1646 return True 

1647 

1648 return False 

1649 

1650 def has_built_on_this_arch_or_is_arch_all( 

1651 self, src_data: SourcePackage, arch: str 

1652 ) -> bool: 

1653 """When a source builds arch:all binaries, those binaries are 

1654 added to all architectures and thus the source 'exists' 

1655 everywhere. This function checks if the source has any arch 

1656 specific binaries on this architecture and if not, if it 

1657 has them on any architecture. 

1658 """ 

1659 packages_s_a = self.suite_info.primary_source_suite.binaries[arch] 

1660 has_unknown_binary = False 

1661 for binary_s in src_data.binaries: 

1662 try: 

1663 binary_u = packages_s_a[binary_s.package_name] 

1664 except KeyError: 

1665 # src_data.binaries has all the built binaries, so if 

1666 # we get here, we know that at least one architecture 

1667 # has architecture specific binaries 

1668 has_unknown_binary = True 

1669 continue 

1670 if binary_u.architecture == arch: 

1671 return True 

1672 # If we get here, we have only seen arch:all packages for this 

1673 # arch. 

1674 return not has_unknown_binary