Coverage for britney2/policies/autopkgtest.py: 90%
847 statements
coverage.py v7.6.0, created at 2026-01-29 17:21 +0000
1# Copyright (C) 2013 - 2016 Canonical Ltd.
2# Authors:
3# Colin Watson <cjwatson@ubuntu.com>
4# Jean-Baptiste Lallement <jean-baptiste.lallement@canonical.com>
5# Martin Pitt <martin.pitt@ubuntu.com>
7# This program is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15# GNU General Public License for more details.
17import calendar
18import collections
19import http.client
20import io
21import itertools
22import json
23import optparse
24import os
25import sys
26import tarfile
27import time
28import urllib.parse
29from collections.abc import Iterator
30from copy import deepcopy
31from enum import Enum
32from functools import lru_cache, total_ordering
33from typing import TYPE_CHECKING, Any, Optional, cast
34from urllib.error import HTTPError
35from urllib.request import urlopen
36from urllib.response import addinfourl
38import apt_pkg
40from britney2 import (
41 BinaryPackageId,
42 PackageId,
43 SourcePackage,
44 SuiteClass,
45 Suites,
46 TargetSuite,
47)
48from britney2.hints import HintAnnotate, HintType
49from britney2.migrationitem import MigrationItem
50from britney2.policies import PolicyVerdict
51from britney2.policies.policy import AbstractBasePolicy
52from britney2.utils import (
53 filter_out_faux,
54 get_dependency_solvers,
55 iter_except,
56 parse_option,
57)
59if TYPE_CHECKING: 59 ↛ 60line 59 didn't jump to line 60 because the condition on line 59 was never true
60 import amqplib.client_0_8 as amqp
62 from ..britney import Britney
63 from ..excuse import Excuse
64 from ..hints import HintParser
67@total_ordering
68class Result(Enum):
69 PASS = 1
70 NEUTRAL = 2
71 FAIL = 3
72 OLD_PASS = 4
73 OLD_NEUTRAL = 5
74 OLD_FAIL = 6
75 NONE = 7
77 def __lt__(self, other: "Result") -> bool:
78 return self.value < other.value
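# For illustration (this follows directly from the enum values above): with
# @total_ordering the ordering is
#   Result.PASS < Result.NEUTRAL < Result.FAIL < Result.OLD_PASS < Result.NONE
# and add_trigger_to_results() below only replaces a stored result with one
# that compares lower, or compares equal but has a newer timestamp.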
81EXCUSES_LABELS = {
82 "PASS": '<span style="background:#87d96c">Pass</span>',
83 "OLD_PASS": '<span style="background:#87d96c">Pass</span>',
84 "NEUTRAL": "No tests, superficial or marked flaky",
85 "OLD_NEUTRAL": "No tests, superficial or marked flaky",
86 "FAIL": '<span style="background:#ff6666">Failed</span>',
87 "OLD_FAIL": '<span style="background:#ff6666">Failed</span>',
88 "ALWAYSFAIL": '<span style="background:#e5c545">Failed (not a regression)</span>',
89 "REGRESSION": '<span style="background:#ff6666">Regression</span>',
90 "IGNORE-FAIL": '<span style="background:#e5c545">Ignored failure</span>',
91 "RUNNING": '<span style="background:#99ddff">Test triggered</span>',
92 "RUNNING-ALWAYSFAIL": "Test triggered (will not be considered a regression)",
93 "RUNNING-IGNORE": "Test triggered (failure will be ignored)",
94 "RUNNING-REFERENCE": '<span style="background:#ff6666">Reference test triggered, but real test failed already</span>',
95}
97REF_TRIG = "migration-reference/0"
99VERSION_KEY = "britney-autopkgtest-pending-file-version"
102def srchash(src: str) -> str:
103 """archive hash prefix for source package"""
105 if src.startswith("lib"): 105 ↛ 106line 105 didn't jump to line 106 because the condition on line 105 was never true
106 return src[:4]
107 else:
108 return src[0]
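# For example, following the rule above: srchash("glibc") returns "g" and
# srchash("libpng") returns "libp", matching the pool-style hash prefixes
# used in the result and log URLs below.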
111def added_pkgs_compared_to_target_suite(
112 package_ids: frozenset[BinaryPackageId],
113 target_suite: TargetSuite,
114 *,
115 invert: bool = False,
116) -> Iterator[BinaryPackageId]:
117 if invert: 117 ↛ 118line 117 didn't jump to line 118 because the condition on line 117 was never true
118 pkgs_ids_to_ignore = package_ids - set(
119 target_suite.which_of_these_are_in_the_suite(package_ids)
120 )
121 names_ignored = {p.package_name for p in pkgs_ids_to_ignore}
122 else:
123 names_ignored = {
124 p.package_name
125 for p in target_suite.which_of_these_are_in_the_suite(package_ids)
126 }
127 yield from (p for p in package_ids if p.package_name not in names_ignored)
130def all_leaf_results(
131 test_results: dict[str, dict[str, dict[str, list[Any]]]],
132) -> Iterator[list[Any]]:
133 for trigger in test_results.values():
134 for arch in trigger.values():
135 yield from arch.values()
138def mark_result_as_old(result: Result) -> Result:
139 """Convert current result into corresponding old result"""
141 if result == Result.FAIL:
142 result = Result.OLD_FAIL
143 elif result == Result.PASS:
144 result = Result.OLD_PASS
145 elif result == Result.NEUTRAL: 145 ↛ 147line 145 didn't jump to line 147 because the condition on line 145 was always true
146 result = Result.OLD_NEUTRAL
147 return result
150def concat_bdeps(src_data: SourcePackage) -> str:
151 """Concatenate build_deps_arch and build_deps_indep"""
152 return ",".join(
153 (src_data.build_deps_arch or "", src_data.build_deps_indep or "")
154 ).strip(",")
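# A small sketch of the behaviour (field values are made up): with
# build_deps_arch = "debhelper-compat (= 13)" and build_deps_indep = "python3-sphinx"
# this returns "debhelper-compat (= 13),python3-sphinx"; if either field is
# empty or None, the leftover leading/trailing comma is stripped again.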
157class AutopkgtestPolicy(AbstractBasePolicy):
158 """autopkgtest regression policy for source migrations
160 Run autopkgtests for the excuse and all of its reverse dependencies, and
161 reject the upload if any of those regress.
162 """
164 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
165 super().__init__(
166 "autopkgtest", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
167 )
168 # tests requested in this and previous runs
169 # trigger -> src -> {arch: timestamp of the request}
170 self.pending_tests: dict[str, dict[str, dict[str, int]]] | None = None
171 self.pending_tests_file = os.path.join(
172 self.state_dir, "autopkgtest-pending.json"
173 )
174 self.testsuite_triggers: dict[str, set[str]] = {}
175 self.result_in_baseline_cache: dict[str, dict[str, list[Any]]] = (
176 collections.defaultdict(dict)
177 )
179 self.amqp_file_handle: io.TextIOWrapper | None = None
181 # Default values for this policy's options
182 parse_option(options, "adt_baseline")
183 parse_option(options, "adt_huge", to_int=True)
184 parse_option(options, "adt_ppas")
185 parse_option(options, "adt_reference_max_age", day_to_sec=True)
186 parse_option(options, "adt_pending_max_age", default=5, day_to_sec=True)
187 parse_option(options, "adt_regression_penalty", default=0, to_int=True)
188 parse_option(options, "adt_log_url") # see below for defaults
189 parse_option(options, "adt_retry_url") # see below for defaults
190 parse_option(options, "adt_retry_older_than", day_to_sec=True)
191 parse_option(options, "adt_results_cache_age", day_to_sec=True)
192 parse_option(options, "adt_shared_results_cache")
193 parse_option(options, "adt_success_bounty", default=0, to_int=True)
194 parse_option(options, "adt_ignore_failure_for_new_tests", to_bool=True)
196 # When ADT_RESULTS_CACHE_AGE is smaller than or equal to
197 # ADT_REFERENCE_MAX_AGE, old reference results will be removed from the
198 # before the newly scheduled results are in, potentially causing
199 # additional waiting. For packages like glibc this might cause an
200 # infinite delay as there will always be a package that's
201 # waiting. Similarly for ADT_RETRY_OLDER_THAN.
202 if self.options.adt_results_cache_age <= self.options.adt_reference_max_age:
203 self.logger.warning(
204 "Unexpected: ADT_REFERENCE_MAX_AGE bigger than ADT_RESULTS_CACHE_AGE"
205 )
206 if self.options.adt_results_cache_age <= self.options.adt_retry_older_than:
207 self.logger.warning(
208 "Unexpected: ADT_RETRY_OLDER_THAN bigger than ADT_RESULTS_CACHE_AGE"
209 )
211 if not self.options.adt_log_url: 211 ↛ 237line 211 didn't jump to line 237 because the condition on line 211 was always true
212 # Historical defaults
213 if self.options.adt_swift_url.startswith("file://"):
214 self.options.adt_log_url = os.path.join(
215 self.options.adt_ci_url,
216 "data",
217 "autopkgtest",
218 self.options.series,
219 "{arch}",
220 "{hash}",
221 "{package}",
222 "{run_id}",
223 "log.gz",
224 )
225 else:
226 self.options.adt_log_url = os.path.join(
227 self.options.adt_swift_url,
228 "{swift_container}",
229 self.options.series,
230 "{arch}",
231 "{hash}",
232 "{package}",
233 "{run_id}",
234 "log.gz",
235 )
237 if hasattr(self.options, "adt_retry_url_mech"): 237 ↛ 238line 237 didn't jump to line 238 because the condition on line 237 was never true
238 self.logger.warning(
239 "The ADT_RETRY_URL_MECH configuration has been deprecated."
240 )
241 self.logger.warning(
242 "Instead britney now supports ADT_RETRY_URL for more flexibility."
243 )
244 if self.options.adt_retry_url:
245 self.logger.error(
246 "Please remove the ADT_RETRY_URL_MECH as ADT_RETRY_URL will be used."
247 )
248 elif self.options.adt_retry_url_mech == "run_id":
249 self.options.adt_retry_url = (
250 self.options.adt_ci_url + "api/v1/retry/{run_id}"
251 )
252 if not self.options.adt_retry_url: 252 ↛ 269line 252 didn't jump to line 269 because the condition on line 252 was always true
253 # Historical default
254 self.options.adt_retry_url = (
255 self.options.adt_ci_url
256 + "request.cgi?"
257 + "release={release}&arch={arch}&package={package}&trigger={trigger}{ppas}"
258 )
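# Roughly, with made-up package and series names, the two defaults above
# later expand to URLs of the shape
#   <adt_ci_url>request.cgi?release=<series>&arch=amd64&package=glib2.0&trigger=glib2.0%2F2.80-1
# (the trigger gets URL-encoded in format_retry_url() below) and
#   <adt_swift_url>/autopkgtest-<series>/<series>/amd64/g/glib2.0/<run_id>/log.gz
# (the "g" path component comes from srchash()).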
260 # results map: trigger -> src -> arch -> [passed, version, run_id, seen]
261 # - trigger is "source/version" of an unstable package that triggered
262 # this test run.
263 # - "passed" is a Result
264 # - "version" is the package version of "src" of that test
265 # - "run_id" is an opaque ID that identifies a particular test run for
266 # a given src/arch.
267 # - "seen" is an approximate time stamp of the test run. How this is
268 # deduced depends on the interface used.
269 self.test_results: dict[str, dict[str, dict[str, list[Any]]]] = {}
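# Illustrative shape of this map (package names, versions and timestamps
# are invented); in memory the first field is a Result member, on disk it
# is stored by name (see check_and_upgrade_cache() and save_state()):
#   {
#       "glib2.0/2.80-1": {
#           "gtk4": {
#               "amd64": [Result.PASS, "4.14.2-1", "20240101_120000@", 1704103200],
#           },
#       },
#   }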
270 if self.options.adt_shared_results_cache:
271 self.results_cache_file = self.options.adt_shared_results_cache
272 else:
273 self.results_cache_file = os.path.join(
274 self.state_dir, "autopkgtest-results.cache"
275 )
277 try:
278 self.options.adt_ppas = self.options.adt_ppas.strip().split()
279 except AttributeError:
280 self.options.adt_ppas = []
282 self.swift_container = "autopkgtest-" + options.series
283 if self.options.adt_ppas:
284 self.swift_container += "-" + options.adt_ppas[-1].replace("/", "-")
286 # restrict adt_arches to architectures we actually run for
287 self.adt_arches = []
288 for arch in self.options.adt_arches.split():
289 if arch in self.options.architectures:
290 self.adt_arches.append(arch)
291 else:
292 self.logger.info(
293 "Ignoring ADT_ARCHES %s as it is not in architectures list", arch
294 )
296 def __del__(self) -> None:
297 if self.amqp_file_handle: 297 ↛ exitline 297 didn't return from function '__del__' because the condition on line 297 was always true
298 try:
299 self.amqp_file_handle.close()
300 except AttributeError:
301 pass
303 def register_hints(self, hint_parser: "HintParser") -> None:
304 hint_parser.register_hint_type(
305 HintType(
306 "force-badtest",
307 versioned=HintAnnotate.OPTIONAL,
308 architectured=HintAnnotate.OPTIONAL,
309 )
310 )
311 hint_parser.register_hint_type(HintType("force-skiptest"))
313 def initialise(self, britney: "Britney") -> None:
314 super().initialise(britney)
315 # We want to use the "current" time stamp in multiple locations
316 time_now = round(time.time())
317 if hasattr(self.options, "fake_runtime"):
318 time_now = int(self.options.fake_runtime)
319 self._now = time_now
321 # local copies for better performance
322 parse_src_depends = apt_pkg.parse_src_depends
324 # compute inverse Testsuite-Triggers map, unifying all series
325 self.logger.info("Building inverse testsuite_triggers map")
326 for suite in self.suite_info:
327 for src, data in suite.sources.items():
328 # for now, let's assume that autodep8 uses builddeps (most do)
329 if (
330 self.has_autodep8(data)
331 and "@builddeps@" not in data.testsuite_triggers
332 ):
333 data.testsuite_triggers.append("@builddeps@")
334 for trigger in data.testsuite_triggers:
335 if trigger == "@builddeps@":
336 for arch in self.adt_arches:
337 for block in parse_src_depends(
338 concat_bdeps(data), True, arch
339 ):
340 self.testsuite_triggers.setdefault(
341 block[0][0], set()
342 ).add(src)
343 else:
344 self.testsuite_triggers.setdefault(trigger, set()).add(src)
345 target_suite_name = self.suite_info.target_suite.name
347 os.makedirs(self.state_dir, exist_ok=True)
348 self.read_pending_tests()
350 # read the cached results that we collected so far
351 if os.path.exists(self.results_cache_file):
352 with open(self.results_cache_file) as f:
353 test_results = json.load(f)
354 self.test_results = self.check_and_upgrade_cache(test_results)
355 self.logger.info("Read previous results from %s", self.results_cache_file)
356 else:
357 self.logger.info(
358 "%s does not exist, re-downloading all results from swift",
359 self.results_cache_file,
360 )
362 # read in the new results
363 if self.options.adt_swift_url.startswith("file://"):
364 debci_file = self.options.adt_swift_url[7:]
365 if os.path.exists(debci_file):
366 with open(debci_file) as f:
367 test_results = json.load(f)
368 self.logger.info("Read new results from %s", debci_file)
369 for res in test_results["results"]:
370 # if there's no date, the test didn't finish yet
371 if res["date"] is None: 371 ↛ 372line 371 didn't jump to line 372 because the condition on line 371 was never true
372 continue
373 (test_suite, triggers, src, arch, ver, status, run_id, seen) = [
374 res["suite"],
375 res["trigger"],
376 res["package"],
377 res["arch"],
378 res["version"],
379 res["status"],
380 str(res["run_id"]),
381 round(
382 calendar.timegm(
383 time.strptime(res["date"][0:-5], "%Y-%m-%dT%H:%M:%S")
384 )
385 ),
386 ]
387 if test_suite != target_suite_name: 387 ↛ 389line 387 didn't jump to line 389 because the condition on line 387 was never true
388 # not requested for this target suite, so ignore
389 continue
390 if triggers is None: 390 ↛ 392line 390 didn't jump to line 392 because the condition on line 390 was never true
391 # not requested for this policy, so ignore
392 continue
393 if status is None:
394 # still running => pending
395 continue
396 for trigger in triggers.split():
397 # remove matching test requests
398 self.remove_from_pending(trigger, src, arch, seen)
399 if status == "tmpfail": 399 ↛ 401line 399 didn't jump to line 401 because the condition on line 399 was never true
400 # let's see if we still need it
401 continue
402 self.logger.debug(
403 "Results %s %s %s added", src, trigger, status
404 )
405 self.add_trigger_to_results(
406 trigger,
407 src,
408 ver,
409 arch,
410 run_id,
411 seen,
412 Result[status.upper()],
413 )
414 else:
415 self.logger.info(
416 "%s does not exist, no new data will be processed", debci_file
417 )
419 # The cache can contain results against versions of packages that
420 # are not in any suite anymore. Strip those out, as we don't want
421 # to use those results. Additionally, old references may be
422 # filtered out.
423 if self.options.adt_baseline == "reference":
424 self.filter_old_results()
426 # we need sources, binaries, and installability tester, so for now
427 # remember the whole britney object
428 self.britney = britney
430 # Initialize AMQP connection
431 self.amqp_channel: Optional["amqp.channel.Channel"] = None
432 self.amqp_file_handle = None
433 if self.options.dry_run: 433 ↛ 434line 433 didn't jump to line 434 because the condition on line 433 was never true
434 return
436 amqp_url = self.options.adt_amqp
438 if amqp_url.startswith("amqp://"): 438 ↛ 439line 438 didn't jump to line 439 because the condition on line 438 was never true
439 import amqplib.client_0_8 as amqp
441 # depending on the setup we connect to an AMQP server
442 creds = urllib.parse.urlsplit(amqp_url, allow_fragments=False)
443 self.amqp_con = amqp.Connection(
444 creds.hostname, userid=creds.username, password=creds.password
445 )
446 self.amqp_channel = self.amqp_con.channel()
447 self.logger.info("Connected to AMQP server")
448 elif amqp_url.startswith("file://"): 448 ↛ 453line 448 didn't jump to line 453 because the condition on line 448 was always true
449 # or in Debian and in testing mode, adt_amqp will be a file:// URL
450 amqp_file = amqp_url[7:]
451 self.amqp_file_handle = open(amqp_file, "w", 1)
452 else:
453 raise RuntimeError("Unknown ADT_AMQP scheme %s" % amqp_url.split(":", 1)[0])
455 def check_and_upgrade_cache(
456 self, test_results: dict[str, dict[str, dict[str, list[Any]]]]
457 ) -> dict[str, dict[str, dict[str, list[Any]]]]:
458 for leaf_result in all_leaf_results(test_results):
459 leaf_result[0] = Result[leaf_result[0]]
461 # Drop results older than ADT_RESULTS_CACHE_AGE
462 for trigger in list(test_results.keys()):
463 for pkg in list(test_results[trigger].keys()):
464 for arch in list(test_results[trigger][pkg].keys()):
465 arch_result = test_results[trigger][pkg][arch]
466 if self._now - arch_result[3] > self.options.adt_results_cache_age: 466 ↛ 467line 466 didn't jump to line 467 because the condition on line 466 was never true
467 del test_results[trigger][pkg][arch]
468 if not test_results[trigger][pkg]: 468 ↛ 469line 468 didn't jump to line 469 because the condition on line 468 was never true
469 del test_results[trigger][pkg]
470 if not test_results[trigger]: 470 ↛ 471line 470 didn't jump to line 471 because the condition on line 470 was never true
471 del test_results[trigger]
473 return test_results
475 def filter_old_results(self) -> None:
476 """Remove results for old versions and reference runs from the cache.
478 For now, only delete reference runs. If we delete regular
479 results after a while, packages with lots of triggered tests may
480 never have all the results at the same time."""
482 test_results = self.test_results
484 for trigger, trigger_data in test_results.items():
485 for src, results in trigger_data.items():
486 for arch, result in results.items():
487 if (
488 trigger == REF_TRIG
489 and self._now - result[3] > self.options.adt_reference_max_age
490 ):
491 result[0] = mark_result_as_old(result[0])
492 elif not self.test_version_in_any_suite(src, result[1]):
493 result[0] = mark_result_as_old(result[0])
495 def test_version_in_any_suite(self, src: str, version: str) -> bool:
496 """Check if the mentioned version of src is found in a suite
498 To prevent regressions in the target suite, the result should be
499 from a test with the version of the package in either the source
500 suite or the target suite. The source suite is also valid,
501 because due to versioned test dependencies and Breaks/Conflicts
502 relations, the version from the source suite is regularly used
503 during testing.
504 """
506 versions = set()
507 for suite in self.suite_info:
508 try:
509 srcinfo = suite.sources[src]
510 except KeyError:
511 continue
512 versions.add(srcinfo.version)
514 valid_version = False
515 for ver in versions:
516 if apt_pkg.version_compare(ver, version) == 0:
517 valid_version = True
518 break
520 return valid_version
522 def save_pending_json(self) -> None:
523 # update the pending tests on-disk cache
524 self.logger.info(
525 "Updating pending requested tests in %s" % self.pending_tests_file
526 )
527 # Shallow clone pending_tests as we only modify the toplevel and change its type.
528 pending_tests: dict[str, Any] = {}
529 if self.pending_tests:
530 pending_tests = dict(self.pending_tests)
531 # Avoid adding if there are no pending results at all (eases testing)
532 pending_tests[VERSION_KEY] = 1
533 with open(self.pending_tests_file + ".new", "w") as f:
534 json.dump(pending_tests, f, indent=2)
535 os.rename(self.pending_tests_file + ".new", self.pending_tests_file)
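# The file written above then looks roughly like this (package names and
# timestamps are invented):
#   {
#       "britney-autopkgtest-pending-file-version": 1,
#       "glib2.0/2.80-1": {"gtk4": {"amd64": 1704103200}}
#   }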
537 def save_state(self, britney: "Britney") -> None:
538 super().save_state(britney)
540 # update the results on-disk cache, unless we are using a r/o shared one
541 if not self.options.adt_shared_results_cache:
542 self.logger.info("Updating results cache")
543 test_results = deepcopy(self.test_results)
544 for result in all_leaf_results(test_results):
545 result[0] = result[0].name
546 with open(self.results_cache_file + ".new", "w") as f:
547 json.dump(test_results, f, indent=2)
548 os.rename(self.results_cache_file + ".new", self.results_cache_file)
550 self.save_pending_json()
552 def format_retry_url(
553 self, run_id: str | None, arch: str, testsrc: str, trigger: str
554 ) -> str:
555 if self.options.adt_ppas:
556 ppas = "&" + urllib.parse.urlencode(
557 [("ppa", p) for p in self.options.adt_ppas]
558 )
559 else:
560 ppas = ""
561 return cast(str, self.options.adt_retry_url).format(
562 run_id=run_id,
563 release=self.options.series,
564 arch=arch,
565 package=testsrc,
566 trigger=urllib.parse.quote_plus(trigger),
567 ppas=ppas,
568 )
570 def format_log_url(self, testsrc: str, arch: str, run_id: str) -> str:
571 return cast(str, self.options.adt_log_url).format(
572 release=self.options.series,
573 swift_container=self.swift_container,
574 hash=srchash(testsrc),
575 package=testsrc,
576 arch=arch,
577 run_id=run_id,
578 )
580 def apply_src_policy_impl(
581 self,
582 tests_info: dict[str, Any],
583 source_data_tdist: SourcePackage | None,
584 source_data_srcdist: SourcePackage,
585 excuse: "Excuse",
586 ) -> PolicyVerdict:
588 # initialize
589 verdict = PolicyVerdict.PASS
590 source_name = excuse.item.package
592 # skip/delay autopkgtests until new package is built somewhere
593 if not filter_out_faux(source_data_srcdist.binaries):
594 self.logger.debug(
595 "%s hasnot been built anywhere, skipping autopkgtest policy",
596 excuse.name,
597 )
598 verdict = PolicyVerdict.REJECTED_TEMPORARILY
599 excuse.add_verdict_info(verdict, "Autopkgtest deferred: missing all builds")
601 if "all" in excuse.missing_builds:
602 self.logger.debug(
603 "%s hasnot been built for arch:all, skipping autopkgtest policy",
604 source_name,
605 )
606 verdict = PolicyVerdict.REJECTED_TEMPORARILY
607 excuse.add_verdict_info(
608 verdict, "Autopkgtest deferred: missing arch:all build"
609 )
611 all_self_tests_pass = False
612 results_info: list[str] = []
613 if not verdict.is_rejected:
614 self.logger.debug("Checking autopkgtests for %s", source_name)
615 trigger = source_name + "/" + source_data_srcdist.version
617 # build a (testsrc, testver) → arch → (status, run_id, log_url) map; we trigger/check test
618 # results per architecture for technical/efficiency reasons, but we
619 # want to evaluate and present the results by tested source package
620 # first
621 pkg_arch_result: dict[
622 tuple[str, str], dict[str, tuple[str, str | None, str]]
623 ] = collections.defaultdict(dict)
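# e.g., with invented values, one entry of this map could be
#   {("gtk4", "4.14.2-1"): {"amd64": ("REGRESSION", "20240101_120000@", "<log url>")}}
# where the status strings are the EXCUSES_LABELS keys as returned by
# pkg_test_result().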
624 for arch in self.adt_arches:
625 if arch in excuse.missing_builds:
626 verdict = PolicyVerdict.REJECTED_TEMPORARILY
627 self.logger.debug(
628 "%s hasnot been built on arch %s, delay autopkgtest there",
629 source_name,
630 arch,
631 )
632 excuse.add_verdict_info(
633 verdict,
634 f"Autopkgtest deferred on {arch}: missing arch:{arch} build",
635 )
636 else:
637 verdict = self.check_and_request_arch(
638 excuse,
639 arch,
640 source_data_srcdist,
641 pkg_arch_result,
642 trigger,
643 verdict,
644 )
646 verdict, results_info, all_self_tests_pass = self.process_pkg_arch_results(
647 tests_info, excuse, pkg_arch_result, verdict, trigger
648 )
650 verdict = self.finalize_excuse(
651 excuse, verdict, all_self_tests_pass, results_info
652 )
653 return verdict
655 def apply_srcarch_policy_impl(
656 self,
657 tests_info: dict[str, Any],
658 arch: str,
659 source_data_tdist: SourcePackage | None,
660 source_data_srcdist: SourcePackage,
661 excuse: "Excuse",
662 ) -> PolicyVerdict:
664 assert self.hints is not None # for type checking
665 # initialize
666 verdict = PolicyVerdict.PASS
668 self.logger.debug(f"Checking autopkgtests for binNMU {str(excuse.item)}/{arch}")
670 if arch not in self.adt_arches:
671 return verdict
673 # find the binNMU version
674 versions = set()
675 for bin_pkg in source_data_srcdist.binaries:
676 if bin_pkg.architecture == arch:
677 if (
678 len(parts := bin_pkg.version.split("+b")) > 1
679 and parts[-1].isdigit()
680 ):
681 versions.add(parts[-1])
682 else:
683 self.logger.debug(
684 f"Version {bin_pkg.version} doesn't end with '+b#', skipping"
685 )
686 if not versions or len(versions) > 1:
687 self.logger.debug("This migration item doesn't look like a binNMU")
688 return verdict
690 trigger = str(excuse.item) + "/" + versions.pop()
692 # While we don't need the arch here, this is common with apply_src_policy_impl()
693 # (testsrc, testver) → arch → (status, run_id, log_url) map
694 pkg_arch_result: dict[
695 tuple[str, str], dict[str, tuple[str, str | None, str]]
696 ] = collections.defaultdict(dict)
698 verdict = self.check_and_request_arch(
699 excuse, arch, source_data_srcdist, pkg_arch_result, trigger, verdict
700 )
702 verdict, results_info, all_self_tests_pass = self.process_pkg_arch_results(
703 tests_info, excuse, pkg_arch_result, verdict, trigger
704 )
706 verdict = self.finalize_excuse(
707 excuse, verdict, all_self_tests_pass, results_info
708 )
709 return verdict
711 def check_and_request_arch(
712 self,
713 excuse: "Excuse",
714 arch: str,
715 source_data_srcdist: SourcePackage,
716 pkg_arch_result: dict[tuple[str, str], dict[str, tuple[str, str | None, str]]],
717 trigger: str,
718 verdict: PolicyVerdict,
719 ) -> PolicyVerdict:
720 """Perform sanity checks and request test/results when they pass"""
722 source_name = excuse.item.package
723 if arch in excuse.policy_info["depends"].get("arch_all_not_installable", []):
724 self.logger.debug(
725 "%s is uninstallable on arch %s (which is allowed), not running autopkgtest there",
726 source_name,
727 arch,
728 )
729 excuse.addinfo(
730 f"Autopkgtest skipped on {arch}: not installable (which is allowed)"
731 )
732 elif arch in excuse.unsatisfiable_on_archs and arch not in excuse.policy_info[
733 "depends"
734 ].get("autopkgtest_run_anyways", []):
735 verdict = PolicyVerdict.REJECTED_TEMPORARILY
736 self.logger.debug(
737 "%s is uninstallable on arch %s, not running autopkgtest there",
738 source_name,
739 arch,
740 )
741 excuse.addinfo(f"Autopkgtest skipped on {arch}: not installable")
742 else:
743 self.request_tests_for_source(
744 arch, source_data_srcdist, pkg_arch_result, excuse, trigger
745 )
747 return verdict
749 def process_pkg_arch_results(
750 self,
751 tests_info: dict[str, Any],
752 excuse: "Excuse",
753 pkg_arch_result: dict[tuple[str, str], dict[str, tuple[str, str | None, str]]],
754 verdict: PolicyVerdict,
755 trigger: str,
756 ) -> tuple[PolicyVerdict, list[str], bool]:
757 """Calculate verdict based on results and render excuse text"""
759 source_name = excuse.item.package
760 all_self_tests_pass = False
761 results_info = []
763 # add test result details to Excuse
764 cloud_url = self.options.adt_ci_url + "packages/%(h)s/%(s)s/%(r)s/%(a)s"
765 testver: str | None
766 for testsrc, testver in sorted(pkg_arch_result):
767 assert testver is not None
768 arch_results = pkg_arch_result[(testsrc, testver)]
769 r = {v[0] for v in arch_results.values()}
770 if r & {"FAIL", "OLD_FAIL", "REGRESSION"}:
771 verdict = PolicyVerdict.REJECTED_PERMANENTLY
772 elif r & {"RUNNING", "RUNNING-REFERENCE"} and not verdict.is_rejected:
773 verdict = PolicyVerdict.REJECTED_TEMPORARILY
774 # skip version if still running on all arches
775 if not r - {"RUNNING", "RUNNING-ALWAYSFAIL", "RUNNING-IGNORE"}:
776 testver = None
778 # A source package is eligible for the bounty if it has tests
779 # of its own that pass on all tested architectures.
780 if testsrc == source_name:
781 excuse.autopkgtest_results = r
782 if r == {"PASS"}:
783 all_self_tests_pass = True
785 if testver:
786 testname = f"{testsrc}/{testver}"
787 else:
788 testname = testsrc
790 html_archmsg = []
791 for arch in sorted(arch_results):
792 (status, run_id, log_url) = arch_results[arch]
793 artifact_url = None
794 retry_url = None
795 reference_url = None
796 reference_retry_url = None
797 history_url = None
798 if self.options.adt_ppas:
799 if log_url.endswith("log.gz"):
800 artifact_url = log_url.replace("log.gz", "artifacts.tar.gz")
801 else:
802 history_url = cloud_url % {
803 "h": srchash(testsrc),
804 "s": testsrc,
805 "r": self.options.series,
806 "a": arch,
807 }
808 if status not in ("PASS", "RUNNING", "RUNNING-IGNORE"):
809 retry_url = self.format_retry_url(run_id, arch, testsrc, trigger)
811 baseline_result = self.result_in_baseline(testsrc, arch)
812 if baseline_result and baseline_result[0] != Result.NONE:
813 baseline_run_id = str(baseline_result[2])
814 reference_url = self.format_log_url(
815 testsrc, arch, baseline_run_id
816 )
817 if self.options.adt_baseline == "reference":
818 reference_retry_url = self.format_retry_url(
819 baseline_run_id, arch, testsrc, REF_TRIG
820 )
821 tests_info.setdefault(testname, {})[arch] = [
822 status,
823 log_url,
824 history_url,
825 artifact_url,
826 retry_url,
827 ]
829 # render HTML snippet for testsrc entry for current arch
830 if history_url:
831 message = f'<a href="{history_url}">{arch}</a>'
832 else:
833 message = arch
834 message += ': <a href="{}">{}</a>'.format(
835 log_url,
836 EXCUSES_LABELS[status],
837 )
838 if retry_url:
839 message += (
840 '<a href="%s" style="text-decoration: none;"> ♻</a>' % retry_url
841 )
842 if reference_url:
843 message += ' (<a href="%s">reference</a>' % reference_url
844 if reference_retry_url:
845 message += (
846 '<a href="%s" style="text-decoration: none;"> ♻</a>'
847 % reference_retry_url
848 )
849 message += ")"
850 if artifact_url:
851 message += ' <a href="%s">[artifacts]</a>' % artifact_url
852 html_archmsg.append(message)
854 # render HTML line for testsrc entry
855 # - if action is or may be required
856 # - for one's own package
857 if (
858 r
859 - {
860 "PASS",
861 "NEUTRAL",
862 "RUNNING-ALWAYSFAIL",
863 "ALWAYSFAIL",
864 "IGNORE-FAIL",
865 }
866 or testsrc == source_name
867 ):
868 if testver:
869 pkg = '<a href="#{0}">{0}</a>/{1}'.format(testsrc, testver)
870 else:
871 pkg = '<a href="#{0}">{0}</a>'.format(testsrc)
872 results_info.append(
873 "Autopkgtest for {}: {}".format(pkg, ", ".join(html_archmsg))
874 )
876 return (verdict, results_info, all_self_tests_pass)
878 def finalize_excuse(
879 self,
880 excuse: "Excuse",
881 verdict: PolicyVerdict,
882 all_self_tests_pass: bool,
883 results_info: list[str],
884 ) -> PolicyVerdict:
885 """Updates excuses and verdict for hints and bounty/penalty config
887 Given the verdict so far, hints and configuration, the verdict may be
888 updated. Depending on the end verdict, the content of results_info is
889 added as info or as excuse.
890 """
892 package = excuse.item.package
893 version = excuse.item.version
895 assert self.hints is not None # for type checking
896 if verdict.is_rejected:
897 # check for force-skiptest hint
898 hints = self.hints.search(
899 "force-skiptest",
900 package=package,
901 version=version,
902 )
903 if hints:
904 excuse.addreason("skiptest")
905 excuse.addinfo(
906 "Autopkgtest check should wait for tests relating to %s %s, but forced by %s"
907 % (package, version, hints[0].user)
908 )
909 verdict = PolicyVerdict.PASS_HINTED
910 else:
911 excuse.addreason("autopkgtest")
913 if (
914 self.options.adt_success_bounty
915 and verdict == PolicyVerdict.PASS
916 and all_self_tests_pass
917 ):
918 excuse.add_bounty("autopkgtest", self.options.adt_success_bounty)
919 if self.options.adt_regression_penalty and verdict in {
920 PolicyVerdict.REJECTED_PERMANENTLY,
921 PolicyVerdict.REJECTED_TEMPORARILY,
922 }:
923 if self.options.adt_regression_penalty > 0: 923 ↛ 926line 923 didn't jump to line 926 because the condition on line 923 was always true
924 excuse.add_penalty("autopkgtest", self.options.adt_regression_penalty)
925 # In case we give penalties instead of blocking, we must always pass
926 verdict = PolicyVerdict.PASS
927 for i in results_info:
928 if verdict.is_rejected:
929 excuse.add_verdict_info(verdict, i)
930 else:
931 excuse.addinfo(i)
933 return verdict
935 @staticmethod
936 def has_autodep8(srcinfo: SourcePackage) -> bool:
937 """Check if package is covered by autodep8
939 srcinfo is an item from self.britney.sources
940 """
941 # autodep8?
942 for t in srcinfo.testsuite:
943 if t.startswith("autopkgtest-pkg"):
944 return True
946 return False
948 def request_tests_for_source(
949 self,
950 arch: str,
951 source_data_srcdist: SourcePackage,
952 pkg_arch_result: dict[tuple[str, str], dict[str, tuple[str, str | None, str]]],
953 excuse: "Excuse",
954 trigger: str,
955 ) -> None:
956 pkg_universe = self.britney.pkg_universe
957 target_suite = self.suite_info.target_suite
958 source_suite = excuse.item.suite
959 sources_t = target_suite.sources
960 sources_s = excuse.item.suite.sources
961 packages_s_a = excuse.item.suite.binaries[arch]
962 source_name = excuse.item.package
963 source_version = source_data_srcdist.version
964 # request tests (unless they were already requested earlier or have a result)
965 tests = self.tests_for_source(source_name, source_version, arch, excuse)
966 is_huge = len(tests) > self.options.adt_huge
968 # local copies for better performance
969 parse_src_depends = apt_pkg.parse_src_depends
971 # Here we figure out what is required from the source suite
972 # for the test to install successfully.
973 #
974 # The ImplicitDependencyPolicy does a similar calculation, but
975 # if I (elbrus) understand correctly, only in the reverse
976 # dependency direction. We are doing something similar here
977 # but in the dependency direction (note: this code is older).
978 # We use the ImplicitDependencyPolicy result for the reverse
979 # dependencies and we keep the code below for the
980 # dependencies. Using the ImplicitDependencyPolicy results
981 # also in the reverse direction seems to require quite some
982 # reorganisation to get that information available here, as in
983 # the current state only the current excuse is available here
984 # and the required other excuses may not be calculated yet.
985 #
986 # Loop over all binary packages from trigger and
987 # recursively look up which *versioned* dependencies are
988 # only satisfied in the source suite.
989 #
990 # For all binaries found, look up which packages they
991 # break/conflict with in the target suite, but not in the
992 # source suite. The main reason to do this is to cover test
993 # dependencies, so we will check Testsuite-Triggers as
994 # well.
995 #
996 # OI: do we need to do the first check in a smart way
997 # (i.e. only for the packages that are actually going to be
998 # installed) for the breaks/conflicts set as well, i.e. do
999 # we need to check if any of the packages that we now
1000 # enforce being from the source suite, actually have new
1001 # versioned depends and new breaks/conflicts.
1002 #
1003 # For all binaries found, add the set of unique source
1004 # packages to the list of triggers.
1006 bin_triggers: set[PackageId] = set()
1007 bin_new = set(filter_out_faux(source_data_srcdist.binaries))
1008 # For each build-depends block (if any) check if the first alternative
1009 # is satisfiable in the target suite. If not, add it to the initial set
1010 # used for checking.
1011 for block in parse_src_depends(concat_bdeps(source_data_srcdist), True, arch):
1012 if not get_dependency_solvers(
1013 [block[0]],
1014 target_suite.binaries[arch],
1015 target_suite.provides_table[arch],
1016 build_depends=True,
1017 ) and (
1018 solvers := get_dependency_solvers(
1019 [block[0]],
1020 packages_s_a,
1021 source_suite.provides_table[arch],
1022 build_depends=True,
1023 )
1024 ):
1025 bin_new.add(solvers[0].pkg_id)
1026 for n_binary in iter_except(bin_new.pop, KeyError):
1027 if n_binary in bin_triggers:
1028 continue
1029 bin_triggers.add(n_binary)
1031 # Check if there is a dependency that is not
1032 # available in the target suite.
1033 # We add slightly too much here, because new binaries
1034 # will also show up, but they are already properly
1035 # installed. Nevermind.
1036 depends = pkg_universe.dependencies_of(n_binary)
1037 # depends is a frozenset{frozenset{BinaryPackageId, ..}}
1038 for deps_of_bin in depends:
1039 if target_suite.any_of_these_are_in_the_suite(deps_of_bin):
1040 # if any of the alternative dependencies is already
1041 # satisfied in the target suite, we can just ignore it
1042 continue
1043 # We'll figure out which version later
1044 bin_new.update(
1045 added_pkgs_compared_to_target_suite(deps_of_bin, target_suite)
1046 )
1048 # Check if the package breaks/conflicts anything. We might
1049 # be adding slightly too many source packages due to the
1050 # check here as a binary package that is broken may be
1051 # coming from a different source package in the source
1052 # suite. Nevermind.
1053 bin_broken = set()
1054 for t_binary in bin_triggers:
1055 # broken is a frozenset{BinaryPackageId, ..}
1056 broken = pkg_universe.negative_dependencies_of(
1057 cast(BinaryPackageId, t_binary)
1058 )
1059 broken_in_target = {
1060 p.package_name
1061 for p in target_suite.which_of_these_are_in_the_suite(broken)
1062 }
1063 broken_in_source = {
1064 p.package_name
1065 for p in source_suite.which_of_these_are_in_the_suite(broken)
1066 }
1067 # We want packages with a newer version in the source suite that
1068 # no longer has the conflict. This is an approximation
1069 broken_filtered = {
1070 p
1071 for p in broken
1072 if p.package_name in broken_in_target
1073 and p.package_name not in broken_in_source
1074 }
1075 # We add the version in the target suite, but the code below will
1076 # change it to the version in the source suite
1077 bin_broken.update(broken_filtered)
1078 bin_triggers.update(bin_broken)
1080 # The ImplicitDependencyPolicy also found packages that need
1081 # to migrate together, so add them to the triggers too.
1082 for bin_implicit in excuse.depends_packages_flattened:
1083 if bin_implicit.architecture == arch:
1084 bin_triggers.add(bin_implicit)
1086 triggers = set()
1087 for t_binary2 in bin_triggers:
1088 if t_binary2.architecture == arch:
1089 try:
1090 source_of_bin = packages_s_a[t_binary2.package_name].source
1091 # If the version in the target suite is the same, don't add a trigger.
1092 # Note that we looked up the source package in the source suite.
1093 # If it were a different source package in the target suite, however, then
1094 # we would not have this source package in the same version anyway.
1095 #
1096 # binNMU's exist, so let's also check if t_binary2 exists
1097 # in the target suite if the sources are the same.
1098 if (
1099 sources_t.get(source_of_bin, None) is None
1100 or sources_s[source_of_bin].version
1101 != sources_t[source_of_bin].version
1102 or not target_suite.any_of_these_are_in_the_suite(
1103 {cast(BinaryPackageId, t_binary2)}
1104 )
1105 ):
1106 triggers.add(
1107 source_of_bin + "/" + sources_s[source_of_bin].version
1108 )
1109 except KeyError:
1110 # Apparently the package was removed from
1111 # unstable e.g. if packages are replaced
1112 # (e.g. -dbg to -dbgsym)
1113 pass
1114 if t_binary2 not in source_data_srcdist.binaries:
1115 for tdep_src in self.testsuite_triggers.get(
1116 t_binary2.package_name, set()
1117 ):
1118 try:
1119 # Only add trigger if versions in the target and source suites are different
1120 if ( 1120 ↛ 1115line 1120 didn't jump to line 1115
1121 sources_t.get(tdep_src, None) is None
1122 or sources_s[tdep_src].version
1123 != sources_t[tdep_src].version
1124 ):
1125 triggers.add(
1126 tdep_src + "/" + sources_s[tdep_src].version
1127 )
1128 except KeyError:
1129 # Apparently the source was removed from
1130 # unstable (testsuite_triggers are unified
1131 # over all suites)
1132 pass
1133 source_trigger = source_name + "/" + source_version
1134 triggers.discard(source_trigger)
1135 triggers_list = sorted(list(triggers))
1136 triggers_list.insert(0, trigger)
1138 for testsrc, testver in tests:
1139 self.pkg_test_request(testsrc, arch, triggers_list, huge=is_huge)
1140 (result, real_ver, run_id, url) = self.pkg_test_result(
1141 testsrc, testver, arch, trigger
1142 )
1143 pkg_arch_result[(testsrc, real_ver)][arch] = (result, run_id, url)
1145 def tests_for_source(
1146 self, src: str, ver: str, arch: str, excuse: "Excuse"
1147 ) -> list[tuple[str, str]]:
1148 """Iterate over all tests that should be run for given source and arch"""
1150 source_suite = self.suite_info.primary_source_suite
1151 target_suite = self.suite_info.target_suite
1152 sources_info = target_suite.sources
1153 binaries_info = target_suite.binaries[arch]
1155 reported_pkgs = set()
1157 tests = []
1159 # Debian doesn't have linux-meta, but Ubuntu does
1160 # for linux itself we don't want to trigger tests -- these should
1161 # all come from linux-meta*. A new kernel ABI without a corresponding
1162 # -meta won't be installed and thus we can't sensibly run tests against
1163 # it.
1164 if ( 1164 ↛ 1168line 1164 didn't jump to line 1168
1165 src.startswith("linux")
1166 and src.replace("linux", "linux-meta") in sources_info
1167 ):
1168 return []
1170 # we want to test the package itself, if it still has a test in unstable
1171 # but only if the package actually exists on this arch
1172 srcinfo = source_suite.sources[src]
1173 if ("autopkgtest" in srcinfo.testsuite or self.has_autodep8(srcinfo)) and len(
1174 excuse.packages[arch]
1175 ) > 0:
1176 reported_pkgs.add(src)
1177 tests.append((src, ver))
1179 extra_bins = []
1180 # Debian doesn't have linux-meta, but Ubuntu does
1181 # Hack: For new kernels trigger all DKMS packages by pretending that
1182 # linux-meta* builds a "dkms" binary as well. With that we ensure that we
1183 # don't regress DKMS drivers with new kernel versions.
1184 if src.startswith("linux-meta"):
1185 # does this have any image on this arch?
1186 for pkg_id in srcinfo.binaries:
1187 if pkg_id.architecture == arch and "-image" in pkg_id.package_name:
1188 try:
1189 extra_bins.append(binaries_info["dkms"].pkg_id)
1190 except KeyError:
1191 pass
1193 if not self.has_built_on_this_arch_or_is_arch_all(srcinfo, arch):
1194 return []
1196 pkg_universe = self.britney.pkg_universe
1197 # plus all direct reverse dependencies and test triggers of its
1198 # binaries which have an autopkgtest
1199 for binary in itertools.chain(srcinfo.binaries, extra_bins):
1200 rdeps = filter_out_faux(pkg_universe.reverse_dependencies_of(binary))
1201 for rdep in rdeps:
1202 try:
1203 rdep_src = binaries_info[rdep.package_name].source
1204 # Don't re-trigger the package itself here; this should
1205 # have been done above if the package still continues to
1206 # have an autopkgtest in unstable.
1207 if rdep_src == src:
1208 continue
1209 except KeyError:
1210 continue
1212 rdep_src_info = sources_info[rdep_src]
1213 if "autopkgtest" in rdep_src_info.testsuite or self.has_autodep8(
1214 rdep_src_info
1215 ):
1216 if rdep_src not in reported_pkgs:
1217 tests.append((rdep_src, rdep_src_info.version))
1218 reported_pkgs.add(rdep_src)
1220 for tdep_src in self.testsuite_triggers.get(binary.package_name, set()):
1221 if tdep_src not in reported_pkgs:
1222 try:
1223 tdep_src_info = sources_info[tdep_src]
1224 except KeyError:
1225 continue
1226 if "autopkgtest" in tdep_src_info.testsuite or self.has_autodep8( 1226 ↛ 1220line 1226 didn't jump to line 1220 because the condition on line 1226 was always true
1227 tdep_src_info
1228 ):
1229 for pkg_id in tdep_src_info.binaries: 1229 ↛ 1220line 1229 didn't jump to line 1220 because the loop on line 1229 didn't complete
1230 if pkg_id.architecture == arch:
1231 tests.append((tdep_src, tdep_src_info.version))
1232 reported_pkgs.add(tdep_src)
1233 break
1235 tests.sort(key=lambda s_v: s_v[0])
1236 return tests
1238 def read_pending_tests(self) -> None:
1239 """Read pending test requests from previous britney runs
1241 Initialize self.pending_tests with that data.
1242 """
1243 assert self.pending_tests is None, "already initialized"
1244 if not os.path.exists(self.pending_tests_file):
1245 self.logger.info(
1246 "No %s, starting with no pending tests", self.pending_tests_file
1247 )
1248 self.pending_tests = {}
1249 return
1250 with open(self.pending_tests_file) as f:
1251 self.pending_tests = json.load(f)
1252 if VERSION_KEY in self.pending_tests:
1253 del self.pending_tests[VERSION_KEY]
1254 for trigger in list(self.pending_tests.keys()):
1255 for pkg in list(self.pending_tests[trigger].keys()):
1256 arch_dict = self.pending_tests[trigger][pkg]
1257 for arch in list(arch_dict.keys()):
1258 if (
1259 self._now - arch_dict[arch]
1260 > self.options.adt_pending_max_age
1261 ):
1262 del arch_dict[arch]
1263 if not arch_dict:
1264 del self.pending_tests[trigger][pkg]
1265 if not self.pending_tests[trigger]:
1266 del self.pending_tests[trigger]
1267 else:
1268 # Migration code:
1269 for trigger_data in self.pending_tests.values(): 1269 ↛ 1270line 1269 didn't jump to line 1270 because the loop on line 1269 never started
1270 for pkg, arch_list in trigger_data.items():
1271 trigger_data[pkg] = {}
1272 for arch in arch_list:
1273 trigger_data[pkg][arch] = self._now
1275 self.logger.info(
1276 "Read pending requested tests from %s", self.pending_tests_file
1277 )
1278 self.logger.debug("%s", self.pending_tests)
1280 # this requires iterating over all triggers and thus is expensive;
1281 # cache the results
1282 @lru_cache(None)
1283 def latest_run_for_package(self, src: str, arch: str) -> str:
1284 """Return latest run ID for src on arch"""
1286 latest_run_id = ""
1287 for srcmap in self.test_results.values():
1288 try:
1289 run_id = srcmap[src][arch][2]
1290 except KeyError:
1291 continue
1292 if run_id > latest_run_id:
1293 latest_run_id = run_id
1294 return latest_run_id
1296 def urlopen_retry(self, url: str) -> http.client.HTTPResponse | addinfourl:
1297 """A urlopen() that retries on time outs or errors"""
1299 exc: Exception
1300 for retry in range(5): 1300 ↛ 1321line 1300 didn't jump to line 1321 because the loop on line 1300 didn't complete
1301 try:
1302 req = urlopen(url, timeout=30)
1303 code = req.getcode()
1304 if not code or 200 <= code < 300: 1304 ↛ 1300line 1304 didn't jump to line 1300 because the condition on line 1304 was always true
1305 return req # type: ignore[no-any-return]
1306 except TimeoutError as e: 1306 ↛ 1307line 1306 didn't jump to line 1307 because the exception caught by line 1306 didn't happen
1307 self.logger.info(
1308 "Timeout downloading '%s', will retry %d more times."
1309 % (url, 5 - retry - 1)
1310 )
1311 exc = e
1312 except HTTPError as e:
1313 if e.code not in (503, 502): 1313 ↛ 1315line 1313 didn't jump to line 1315 because the condition on line 1313 was always true
1314 raise
1315 self.logger.info(
1316 "Caught error %d downloading '%s', will retry %d more times."
1317 % (e.code, url, 5 - retry - 1)
1318 )
1319 exc = e
1320 else:
1321 raise exc
1323 @lru_cache(None)
1324 def fetch_swift_results(self, swift_url: str, src: str, arch: str) -> None:
1325 """Download new results for source package/arch from swift"""
1327 # prepare query: get all runs with a timestamp later than the latest
1328 # run_id for this package/arch; '@' is at the end of each run id, to
1329 # mark the end of a test run directory path
1330 # example: <autopkgtest-wily>wily/amd64/libp/libpng/20150630_054517@/result.tar
1331 query = {
1332 "delimiter": "@",
1333 "prefix": f"{self.options.series}/{arch}/{srchash(src)}/{src}/",
1334 }
1336 # determine latest run_id from results
1337 if not self.options.adt_shared_results_cache:
1338 latest_run_id = self.latest_run_for_package(src, arch)
1339 if latest_run_id:
1340 query["marker"] = query["prefix"] + latest_run_id
1342 # request new results from swift
1343 url = os.path.join(swift_url, self.swift_container)
1344 url += "?" + urllib.parse.urlencode(query)
1345 f = None
1346 try:
1347 f = self.urlopen_retry(url)
1348 if f.getcode() == 200:
1349 result_paths = f.read().decode().strip().splitlines()
1350 elif f.getcode() == 204: # No content 1350 ↛ 1356line 1350 didn't jump to line 1356 because the condition on line 1350 was always true
1351 result_paths = []
1352 else:
1353 # we should never end up here as we expect an HTTPError in
1354 # other cases; e.g. 3XX is something that tells us to adjust
1355 # our URLs, so fail hard on those
1356 raise NotImplementedError(
1357 "fetch_swift_results(%s): cannot handle HTTP code %r"
1358 % (url, f.getcode())
1359 )
1360 except OSError as e:
1361 # 401 "Unauthorized" is swift's way of saying "container does not exist"
1362 if getattr(e, "code", -1) == 401: 1362 ↛ 1371line 1362 didn't jump to line 1371 because the condition on line 1362 was always true
1363 self.logger.info(
1364 "fetch_swift_results: %s does not exist yet or is inaccessible", url
1365 )
1366 return
1367 # Other status codes are usually a transient
1368 # network/infrastructure failure. Ignoring this can lead to
1369 # re-requesting tests which we already have results for, so
1370 # fail hard on this and let the next run retry.
1371 self.logger.error("Failure to fetch swift results from %s: %s", url, str(e))
1372 sys.exit(1)
1373 finally:
1374 if f is not None: 1374 ↛ 1377line 1374 didn't jump to line 1377 because the condition on line 1374 was always true
1375 f.close() 1375 ↛ exitline 1375 didn't return from function 'fetch_swift_results' because the return on line 1366 wasn't executed
1377 for p in result_paths:
1378 self.fetch_one_result(
1379 os.path.join(swift_url, self.swift_container, p, "result.tar"),
1380 src,
1381 arch,
1382 )
1384 def fetch_one_result(self, url: str, src: str, arch: str) -> None:
1385 """Download one result URL for source/arch
1387 Remove matching pending_tests entries.
1388 """
1389 f = None
1390 try:
1391 f = self.urlopen_retry(url)
1392 if f.getcode() == 200: 1392 ↛ 1395line 1392 didn't jump to line 1395 because the condition on line 1392 was always true
1393 tar_bytes = io.BytesIO(f.read())
1394 else:
1395 raise NotImplementedError(
1396 "fetch_one_result(%s): cannot handle HTTP code %r"
1397 % (url, f.getcode())
1398 )
1399 except OSError as err:
1400 self.logger.error("Failure to fetch %s: %s", url, str(err))
1401 # we tolerate "not found" (something went wrong on uploading the
1402 # result), but other things indicate infrastructure problems
1403 if getattr(err, "code", -1) == 404:
1404 return
1405 sys.exit(1)
1406 finally:
1407 if f is not None: 1407 ↛ exit, 1407 ↛ 1409 (2 missed branches): 1) line 1407 didn't return from function 'fetch_one_result' because the return on line 1404 wasn't executed, 2) line 1407 didn't jump to line 1409 because the condition on line 1407 was always true
1408 f.close() 1408 ↛ exitline 1408 didn't return from function 'fetch_one_result' because the return on line 1404 wasn't executed
1409 try:
1410 with tarfile.open(None, "r", tar_bytes) as tar:
1411 exitcode = int(tar.extractfile("exitcode").read().strip()) # type: ignore[union-attr]
1412 srcver = tar.extractfile("testpkg-version").read().decode().strip() # type: ignore[union-attr]
1413 (ressrc, ver) = srcver.split()
1414 testinfo = json.loads(tar.extractfile("testinfo.json").read().decode()) # type: ignore[union-attr]
1415 except (KeyError, ValueError, tarfile.TarError) as err:
1416 self.logger.error("%s is damaged, ignoring: %s", url, str(err))
1417 # ignore this; this will leave an orphaned request in autopkgtest-pending.json
1418 # and thus require manual retries after fixing the tmpfail, but we
1419 # can't just blindly attribute it to some pending test.
1420 return
1422 if src != ressrc: 1422 ↛ 1423line 1422 didn't jump to line 1423 because the condition on line 1422 was never true
1423 self.logger.error(
1424 "%s is a result for package %s, but expected package %s",
1425 url,
1426 ressrc,
1427 src,
1428 )
1429 return
1431 # parse recorded triggers in test result
1432 for e in testinfo.get("custom_environment", []): 1432 ↛ 1437line 1432 didn't jump to line 1437 because the loop on line 1432 didn't complete
1433 if e.startswith("ADT_TEST_TRIGGERS="): 1433 ↛ 1432line 1433 didn't jump to line 1432 because the condition on line 1433 was always true
1434 result_triggers = [i for i in e.split("=", 1)[1].split() if "/" in i]
1435 break
1436 else:
1437 self.logger.error("%s result has no ADT_TEST_TRIGGERS, ignoring", url)
1438 return
1440 run_id = os.path.basename(os.path.dirname(url))
1441 seen = round(calendar.timegm(time.strptime(run_id, "%Y%m%d_%H%M%S@")))
1442 # allow some skipped tests, but nothing else
1443 if exitcode in [0, 2]:
1444 result = Result.PASS
1445 elif exitcode == 8: 1445 ↛ 1446line 1445 didn't jump to line 1446 because the condition on line 1445 was never true
1446 result = Result.NEUTRAL
1447 else:
1448 result = Result.FAIL
1450 self.logger.info(
1451 "Fetched test result for %s/%s/%s %s (triggers: %s): %s",
1452 src,
1453 ver,
1454 arch,
1455 run_id,
1456 result_triggers,
1457 result.name.lower(),
1458 )
1460 # remove matching test requests
1461 for trigger in result_triggers:
1462 self.remove_from_pending(trigger, src, arch)
1464 # add this result
1465 for trigger in result_triggers:
1466 self.add_trigger_to_results(trigger, src, ver, arch, run_id, seen, result)
1468 def remove_from_pending(
1469 self, trigger: str, src: str, arch: str, timestamp: int = sys.maxsize
1470 ) -> None:
1471 assert self.pending_tests is not None # for type checking
1472 try:
1473 arch_dict = self.pending_tests[trigger][src]
1474 if timestamp < arch_dict[arch]:
1475 # The result is from before the moment of scheduling, so it's
1476 # not the one we're waiting for
1477 return
1478 del arch_dict[arch]
1479 if not arch_dict:
1480 del self.pending_tests[trigger][src]
1481 if not self.pending_tests[trigger]:
1482 del self.pending_tests[trigger]
1483 self.logger.debug(
1484 "-> matches pending request %s/%s for trigger %s", src, arch, trigger
1485 )
1486 except KeyError:
1487 self.logger.debug(
1488 "-> does not match any pending request for %s/%s", src, arch
1489 )
1491 def add_trigger_to_results(
1492 self,
1493 trigger: str,
1494 src: str,
1495 ver: str,
1496 arch: str,
1497 run_id: str,
1498 timestamp: int,
1499 status_to_add: Result,
1500 ) -> None:
1501 # Ensure that we got a new enough version
1502 parts = trigger.split("/")
1503 match len(parts):
1504 case 2:
1505 trigsrc, trigver = parts
1506 case 4: 1506 ↛ 1508line 1506 didn't jump to line 1508 because the pattern on line 1506 always matched
1507 trigsrc, trigarch, trigver, rebuild = parts
1508 case _:
1509 self.logger.info("Ignoring invalid test trigger %s", trigger)
1510 return
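# The two accepted trigger shapes are thus, with invented versions:
#   "glib2.0/2.80-1"           (source/version, also REF_TRIG "migration-reference/0")
#   "glib2.0/amd64/2.80-1/1"   (source/arch/version/rebuild)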
1511 if trigsrc == src and apt_pkg.version_compare(ver, trigver) < 0: 1511 ↛ 1512line 1511 didn't jump to line 1512 because the condition on line 1511 was never true
1512 self.logger.debug(
1513 "test trigger %s, but run for older version %s, ignoring", trigger, ver
1514 )
1515 return
1517 stored_result = (
1518 self.test_results.setdefault(trigger, {})
1519 .setdefault(src, {})
1520 .setdefault(arch, [Result.FAIL, None, "", 0])
1521 )
1523 # reruns shouldn't flip the result from PASS or NEUTRAL to
1524 # FAIL, so remember the most recent version of the best result
1525 # we've seen. Except for reference updates, which we always
1526 # want to update with the most recent result. The result data
1527 # may not be ordered by timestamp, so we need to check time.
1528 update = False
1529 if self.options.adt_baseline == "reference" and trigger == REF_TRIG:
1530 if stored_result[3] < timestamp:
1531 update = True
1532 elif status_to_add < stored_result[0]:
1533 update = True
1534 elif status_to_add == stored_result[0] and stored_result[3] < timestamp:
1535 update = True
1537 if update:
1538 stored_result[0] = status_to_add
1539 stored_result[1] = ver
1540 stored_result[2] = run_id
1541 stored_result[3] = timestamp
1543 def send_test_request(
1544 self, src: str, arch: str, triggers: list[str], huge: bool = False
1545 ) -> None:
1546 """Send out AMQP request for testing src/arch for triggers
1548 If huge is true, then the request will be put into the -huge instead of
1549 normal queue.
1550 """
1551 if self.options.dry_run: 1551 ↛ 1552line 1551 didn't jump to line 1552 because the condition on line 1551 was never true
1552 return
1554 params: dict[str, Any] = {"triggers": triggers}
1555 if self.options.adt_ppas:
1556 params["ppas"] = self.options.adt_ppas
1557 qname = f"debci-ppa-{self.options.series}-{arch}"
1558 elif huge:
1559 qname = f"debci-huge-{self.options.series}-{arch}"
1560 else:
1561 qname = f"debci-{self.options.series}-{arch}"
1562 params["submit-time"] = time.strftime("%Y-%m-%d %H:%M:%S%z", time.gmtime())
1564 if self.amqp_channel: 1564 ↛ 1565line 1564 didn't jump to line 1565 because the condition on line 1564 was never true
1565 self.amqp_channel.basic_publish(
1566 amqp.Message(
1567 src + "\n" + json.dumps(params), delivery_mode=2
1568 ), # persistent
1569 routing_key=qname,
1570 )
1571 # we save pending.json with every request, so that if britney
1572 # crashes we don't re-request tests. This is only needed when using
1573 # real amqp, as with file-based submission the pending tests are
1574 # returned by debci along with the results each run.
1575 self.save_pending_json()
1576 else:
1577 # for file-based submission, triggers are space separated
1578 params["triggers"] = [" ".join(params["triggers"])]
1579 assert self.amqp_file_handle
1580 self.amqp_file_handle.write(f"{qname}:{src} {json.dumps(params)}\n")
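# Illustrative sketch (not part of britney2): what one file-based request line
# written above could look like. Series, architecture, package and trigger
# values are made up; only the "queue:source {json}" layout, the queue naming
# and the space-joined trigger list mirror the code above.
import json
import time

series, arch, src = "trixie", "amd64", "glib2.0"  # hypothetical values
params = {
    "triggers": [" ".join(["glib2.0/2.80.0-1", "dbus/1.14.10-4"])],
    "submit-time": time.strftime("%Y-%m-%d %H:%M:%S%z", time.gmtime()),
}
line = f"debci-{series}-{arch}:{src} {json.dumps(params)}\n"
# -> 'debci-trixie-amd64:glib2.0 {"triggers": ["glib2.0/2.80.0-1 dbus/1.14.10-4"], ...}\n'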
1582 def pkg_test_request(
1583 self, src: str, arch: str, all_triggers: list[str], huge: bool = False
1584 ) -> None:
1585 """Request one package test for a set of triggers
1587 all_triggers is a list of "pkgname/version". These are the packages
1588 that will be taken from the source suite. The first package in this
1589 list is the package that triggers the testing of src, the rest are
1590 additional packages required for installability of the test deps. If
1591 huge is true, then the request will be put into the -huge queue instead
1592 of the normal one.
1594 This will only be done if that test wasn't already requested in
1595 a previous run (i.e. if it's not already in self.pending_tests)
1596 and if there isn't already a fresh or a positive result for it. This
1597 ensures that current results for this package are downloaded before
1598 any test is requested."""
1599 trigger = all_triggers[0]
1600 uses_swift = not self.options.adt_swift_url.startswith("file://")
1601 try:
1602 result = self.test_results[trigger][src][arch]
1603 has_result = True
1604 except KeyError:
1605 has_result = False
1607 if has_result:
1608 result_state = result[0]
1609 if result_state in {Result.OLD_PASS, Result.OLD_FAIL, Result.OLD_NEUTRAL}:
1610 pass
1611 elif (
1612 result_state == Result.FAIL
1613 and self.result_in_baseline(src, arch)[0]
1614 in {Result.PASS, Result.NEUTRAL, Result.OLD_PASS, Result.OLD_NEUTRAL}
1615 and self._now - result[3] > self.options.adt_retry_older_than
1616 ):
1617 # We might want to retry this failure, so continue
1618 pass
1619 elif not uses_swift:
1620 # We're done if we don't retrigger and we're not using swift
1621 return
1622 elif result_state in {Result.PASS, Result.NEUTRAL}:
1623 self.logger.debug(
1624 "%s/%s triggered by %s already known", src, arch, trigger
1625 )
1626 return
1628 # Without swift we don't expect new results
1629 if uses_swift:
1630 self.logger.info(
1631 "Checking for new results for failed %s/%s for trigger %s",
1632 src,
1633 arch,
1634 trigger,
1635 )
1636 self.fetch_swift_results(self.options.adt_swift_url, src, arch)
1637 # do we have one now?
1638 try:
1639 self.test_results[trigger][src][arch]
1640 return
1641 except KeyError:
1642 pass
1644 self.request_test_if_not_queued(src, arch, trigger, all_triggers, huge=huge)
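# Illustrative sketch (not part of britney2): the retry condition used above,
# simplified into a standalone predicate. Strings stand in for Result members,
# the [status, version, run_id, timestamp] layout matches the stored results,
# and the numbers are made up.
def _may_retry_failure(result, baseline_result, now, retry_older_than):
    """A FAIL is worth retrying if the baseline passed and the result is stale."""
    return (
        result[0] == "FAIL"
        and baseline_result in {"PASS", "NEUTRAL", "OLD_PASS", "OLD_NEUTRAL"}
        and now - result[3] > retry_older_than
    )


assert _may_retry_failure(["FAIL", "1.0", "r1", 0], "PASS", now=700000, retry_older_than=604800)
assert not _may_retry_failure(["FAIL", "1.0", "r1", 650000], "PASS", now=700000, retry_older_than=604800)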
1646 def request_test_if_not_queued(
1647 self,
1648 src: str,
1649 arch: str,
1650 trigger: str,
1651 all_triggers: list[str] = [],
1652 huge: bool = False,
1653 ) -> None:
1654 assert self.pending_tests is not None # for type checking
1655 if not all_triggers:
1656 all_triggers = [trigger]
1658 # Don't re-request if it's already pending
1659 arch_dict = self.pending_tests.setdefault(trigger, {}).setdefault(src, {})
1660 if arch in arch_dict.keys():
1661 self.logger.debug(
1662 "Test %s/%s for %s is already pending, not queueing", src, arch, trigger
1663 )
1664 else:
1665 self.logger.debug(
1666 "Requesting %s autopkgtest on %s to verify %s", src, arch, trigger
1667 )
1668 arch_dict[arch] = self._now
1669 self.send_test_request(src, arch, all_triggers, huge=huge)
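# Illustrative sketch (not part of britney2): the shape of the pending_tests
# mapping used above -- trigger -> source -> architecture -> request timestamp --
# and the setdefault idiom that records a new request. All values are made up.
pending_tests: dict[str, dict[str, dict[str, int]]] = {}

trigger, src, arch, now = "glib2.0/2.80.0-1", "nautilus", "amd64", 1706540000
arch_dict = pending_tests.setdefault(trigger, {}).setdefault(src, {})
if arch not in arch_dict:
    arch_dict[arch] = now  # remember when the test was requested

assert pending_tests == {"glib2.0/2.80.0-1": {"nautilus": {"amd64": 1706540000}}}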
1671 def result_in_baseline(self, src: str, arch: str) -> list[Any]:
1672 """Get the result for src on arch in the baseline
1674 The baseline is optionally all data or a reference set.
1675 """
1677 # this requires iterating over all cached results and thus is expensive;
1678 # cache the results
1679 try:
1680 return self.result_in_baseline_cache[src][arch]
1681 except KeyError:
1682 pass
1684 result_reference: list[Any] = [Result.NONE, None, "", 0]
1685 if self.options.adt_baseline == "reference":
1686 if src not in self.suite_info.target_suite.sources: 1686 ↛ 1687 line 1686 didn't jump to line 1687 because the condition on line 1686 was never true
1687 return result_reference
1689 try:
1690 result_reference = self.test_results[REF_TRIG][src][arch]
1691 self.logger.debug(
1692 "Found result for src %s in reference: %s",
1693 src,
1694 result_reference[0].name,
1695 )
1696 except KeyError:
1697 self.logger.debug(
1698 "Found NO result for src %s in reference: %s",
1699 src,
1700 result_reference[0].name,
1701 )
1702 self.result_in_baseline_cache[src][arch] = deepcopy(result_reference)
1703 return result_reference
1705 result_ever: list[Any] = [Result.FAIL, None, "", 0]
1706 for srcmap in self.test_results.values():
1707 try:
1708 if srcmap[src][arch][0] != Result.FAIL:
1709 result_ever = srcmap[src][arch]
1710 # If we are not looking at a reference run, we don't really
1711 # care about anything except the status, so we're done
1712 # once we find a PASS.
1713 if result_ever[0] == Result.PASS:
1714 break
1715 except KeyError:
1716 pass
1718 self.result_in_baseline_cache[src][arch] = deepcopy(result_ever)
1719 self.logger.debug("Result for src %s ever: %s", src, result_ever[0].name)
1720 return result_ever
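# Illustrative sketch (not part of britney2): the "ever passed" scan above as a
# standalone function over a test_results-like mapping
# (trigger -> source -> arch -> [status, version, run_id, timestamp]). Strings
# stand in for Result members and the data is made up; only the
# scan-until-first-PASS logic follows the code above.
def _best_ever(test_results, src, arch):
    best = ["FAIL", None, "", 0]
    for srcmap in test_results.values():
        try:
            if srcmap[src][arch][0] != "FAIL":
                best = srcmap[src][arch]
                if best[0] == "PASS":
                    break
        except KeyError:
            pass
    return best


results = {
    "foo/1.0": {"bar": {"amd64": ["FAIL", "2.0", "r1", 10]}},
    "foo/1.1": {"bar": {"amd64": ["PASS", "2.0", "r2", 20]}},
}
assert _best_ever(results, "bar", "amd64")[0] == "PASS"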
1722 def has_test_in_target(self, src: str) -> bool:
1723 test_in_target = False
1724 try:
1725 srcinfo = self.suite_info.target_suite.sources[src]
1726 if "autopkgtest" in srcinfo.testsuite or self.has_autodep8(srcinfo):
1727 test_in_target = True
1728 # AttributeError is only needed for the test suite as
1729 # srcinfo can be None
1730 except (KeyError, AttributeError):
1731 pass
1733 return test_in_target
1735 def pkg_test_result(
1736 self, src: str, ver: str, arch: str, trigger: str
1737 ) -> tuple[str, str, str | None, str]:
1738 """Get current test status of a particular package
1740 Return (status, real_version, run_id, log_url) tuple; status is a key in
1741 EXCUSES_LABELS. run_id is None if the test is still running.
1742 """
1743 assert self.pending_tests is not None # for type checking
1744 # determine current test result status
1745 run_id = None
1746 try:
1747 r = self.test_results[trigger][src][arch]
1748 ver = r[1]
1749 run_id = r[2]
1751 if r[0] in {Result.FAIL, Result.OLD_FAIL}:
1752 # compare this failure against the baseline result
1753 baseline_result = self.result_in_baseline(src, arch)[0]
1755 # Special-case triggers from linux-meta*: we cannot compare
1756 # results against different kernels, as e. g. a DKMS module
1757 # might work against the default kernel but fail against a
1758 # different flavor; so for those, ignore the "ever
1759 # passed" check; FIXME: check against trigsrc only
1760 if self.options.adt_baseline != "reference" and (
1761 trigger.startswith("linux-meta") or trigger.startswith("linux/")
1762 ):
1763 baseline_result = Result.FAIL
1765 # Check if the autopkgtest (still) exists in the target suite
1766 test_in_target = self.has_test_in_target(src)
1768 if test_in_target and baseline_result in {
1769 Result.NONE,
1770 Result.OLD_FAIL,
1771 Result.OLD_NEUTRAL,
1772 Result.OLD_PASS,
1773 }:
1774 self.request_test_if_not_queued(src, arch, REF_TRIG)
1776 if self.has_force_badtest(src, ver, arch):
1777 result = "IGNORE-FAIL"
1778 elif not test_in_target:
1779 if self.options.adt_ignore_failure_for_new_tests:
1780 result = "IGNORE-FAIL"
1781 else:
1782 result = r[0].name
1783 elif baseline_result in {Result.FAIL, Result.OLD_FAIL}:
1784 result = "ALWAYSFAIL"
1785 elif baseline_result == Result.NONE: 1785 ↛ 1786 line 1785 didn't jump to line 1786 because the condition on line 1785 was never true
1786 result = "RUNNING-REFERENCE"
1787 else:
1788 result = "REGRESSION"
1790 else:
1791 result = r[0].name
1793 url = self.format_log_url(src, arch, run_id)
1794 except KeyError:
1795 # no result for src/arch; still running?
1796 assert arch in self.pending_tests.get(trigger, {}).get(src, {}).keys(), (
1797 "Result for %s/%s/%s (triggered by %s) is neither known nor pending!"
1798 % (src, ver, arch, trigger)
1799 )
1801 if self.has_force_badtest(src, ver, arch):
1802 result = "RUNNING-IGNORE"
1803 else:
1804 if self.has_test_in_target(src):
1805 baseline_result = self.result_in_baseline(src, arch)[0]
1806 if baseline_result == Result.FAIL:
1807 result = "RUNNING-ALWAYSFAIL"
1808 else:
1809 result = "RUNNING"
1810 else:
1811 if self.options.adt_ignore_failure_for_new_tests:
1812 result = "RUNNING-IGNORE"
1813 else:
1814 result = "RUNNING"
1815 url = self.options.adt_ci_url + "status/pending"
1817 return (result, ver, run_id, url)
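# Illustrative sketch (not part of britney2): a simplified decision table for a
# run that FAILed, following the branches above. It ignores force-badtest hints,
# the adt_ignore_failure_for_new_tests option and the linux-meta special case;
# strings stand in for Result members and EXCUSES_LABELS keys.
def _label_for_failure(baseline_result, test_in_target):
    if not test_in_target:
        return "FAIL"  # no test in the target suite: report the raw failure
    if baseline_result in {"FAIL", "OLD_FAIL"}:
        return "ALWAYSFAIL"  # failed in the baseline too, so not a regression
    if baseline_result == "NONE":
        return "RUNNING-REFERENCE"  # reference run still outstanding
    return "REGRESSION"  # baseline passed or was neutral, this run did not


assert _label_for_failure("PASS", test_in_target=True) == "REGRESSION"
assert _label_for_failure("FAIL", test_in_target=True) == "ALWAYSFAIL"
assert _label_for_failure("NONE", test_in_target=True) == "RUNNING-REFERENCE"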
1819 def has_force_badtest(self, src: str, ver: str, arch: str) -> bool:
1820 """Check if src/ver/arch has a force-badtest hint"""
1822 assert self.hints is not None
1823 hints = self.hints.search("force-badtest", package=src)
1824 if hints:
1825 self.logger.info(
1826 "Checking hints for %s/%s/%s: %s",
1827 src,
1828 arch,
1829 ver,
1830 [str(h) for h in hints],
1831 )
1832 for hint in hints:
1833 if [
1834 mi
1835 for mi in hint.packages
1836 if mi.architecture in ["source", arch]
1837 and (
1838 mi.version is None
1839 or mi.version == "all" # Historical unversioned hint
1840 or apt_pkg.version_compare(ver, mi.version) <= 0
1841 )
1842 ]:
1843 return True
1845 return False
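# Illustrative sketch (not part of britney2): how the version clause above
# decides whether a force-badtest hint covers a given candidate version, using
# apt_pkg directly (python-apt must be available). A hint without a version or
# with the historical "all" version matches everything; otherwise it covers the
# hinted version and everything older. The version strings are made up.
import apt_pkg

apt_pkg.init_system()


def _hint_version_matches(candidate_ver, hinted_ver):
    return (
        hinted_ver is None
        or hinted_ver == "all"
        or apt_pkg.version_compare(candidate_ver, hinted_ver) <= 0
    )


assert _hint_version_matches("1.2-1", None)
assert _hint_version_matches("1.2-1", "1.2-1")
assert not _hint_version_matches("1.3-1", "1.2-1")  # hint is for an older version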
1847 def has_built_on_this_arch_or_is_arch_all(
1848 self, src_data: SourcePackage, arch: str
1849 ) -> bool:
1850 """When a source builds arch:all binaries, those binaries are
1851 added to all architectures and thus the source 'exists'
1852 everywhere. This function checks whether the source has any
1853 architecture-specific binaries on this architecture and, if not,
1854 whether it has them on any other architecture.
1855 """
1856 packages_s_a = self.suite_info.primary_source_suite.binaries[arch]
1857 has_unknown_binary = False
1858 for binary_s in filter_out_faux(src_data.binaries):
1859 try:
1860 binary_u = packages_s_a[binary_s.package_name]
1861 except KeyError:
1862 # src_data.binaries has all the built binaries, so if
1863 # we get here, we know that at least one architecture
1864 # has architecture specific binaries
1865 has_unknown_binary = True
1866 continue
1867 if binary_u.architecture == arch:
1868 return True
1869 # If we get here, we have only seen arch:all packages for this
1870 # arch.
1871 return not has_unknown_binary
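# Illustrative sketch (not part of britney2): the decision above over plain
# data. Each list element is the architecture one of the source's binaries is
# built for as seen on this architecture ("all" or the architecture itself), or
# None when the binary is not present here at all (i.e. it only exists as an
# arch-specific build elsewhere). All sample data is made up.
def _built_here_or_arch_all(binary_archs, arch):
    has_unknown_binary = False
    for built_for in binary_archs:
        if built_for is None:  # not present on this architecture
            has_unknown_binary = True
            continue
        if built_for == arch:  # a real arch-specific build on this architecture
            return True
    # only arch:all binaries seen here; fine unless some binary is missing
    return not has_unknown_binary


assert _built_here_or_arch_all(["all", "amd64"], "amd64")  # built here
assert _built_here_or_arch_all(["all"], "amd64")  # purely arch:all source
assert not _built_here_or_arch_all(["all", None], "amd64")  # built only elsewhere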