Coverage for britney2/policies/autopkgtest.py: 91%
856 statements
« prev ^ index » next — coverage.py v7.6.0, created at 2026-04-19 18:02 +0000
1# Copyright (C) 2013 - 2016 Canonical Ltd.
2# Authors:
3# Colin Watson <cjwatson@ubuntu.com>
4# Jean-Baptiste Lallement <jean-baptiste.lallement@canonical.com>
5# Martin Pitt <martin.pitt@ubuntu.com>
7# This program is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15# GNU General Public License for more details.
17import calendar
18import collections
19import http.client
20import io
21import itertools
22import json
23import optparse
24import os
25import sys
26import tarfile
27import time
28import urllib.parse
29from collections.abc import Iterator
30from copy import deepcopy
31from enum import Enum
32from functools import lru_cache, total_ordering
33from typing import TYPE_CHECKING, Any, Optional, cast
34from urllib.error import HTTPError
35from urllib.request import urlopen
36from urllib.response import addinfourl
38import apt_pkg
40from britney2 import (
41 BinaryPackageId,
42 PackageId,
43 SourcePackage,
44 SuiteClass,
45 Suites,
46 TargetSuite,
47)
48from britney2.hints import HintAnnotate, HintType
49from britney2.migrationitem import MigrationItem
50from britney2.policies import PolicyVerdict
51from britney2.policies.policy import AbstractBasePolicy
52from britney2.utils import (
53 binaries_from_source_version,
54 filter_out_faux,
55 get_dependency_solvers,
56 iter_except,
57 parse_option,
58)
60if TYPE_CHECKING: 60 ↛ 61line 60 didn't jump to line 61 because the condition on line 60 was never true
61 import amqplib.client_0_8 as amqp
63 from ..britney import Britney
64 from ..excuse import Excuse
65 from ..hints import HintParser
@total_ordering
class Result(Enum):
    """Outcome of an autopkgtest run, ordered from best (PASS) to worst (NONE).

    The OLD_* variants are the "aged" counterparts of the first three (see
    mark_result_as_old()); NONE means no result is available at all.
    """

    PASS = 1
    NEUTRAL = 2
    FAIL = 3
    OLD_PASS = 4
    OLD_NEUTRAL = 5
    OLD_FAIL = 6
    NONE = 7

    def __lt__(self, other: object) -> bool:
        # total_ordering derives <=, > and >= from this plus Enum's __eq__.
        # Return NotImplemented (instead of raising AttributeError) for
        # non-Result operands, per the comparison protocol.
        if not isinstance(other, Result):
            return NotImplemented
        return self.value < other.value
# Human-readable (HTML) labels shown in the excuses output for each test
# state.  Keys cover both stored Result names (PASS, FAIL, ...) and the
# synthetic states computed while evaluating results (REGRESSION, RUNNING,
# DEFERRED, ...).  Values are either plain text or small HTML snippets with
# a background colour encoding severity (green/yellow/red/blue).
EXCUSES_LABELS = {
    "PASS": '<span style="background:#87d96c">Pass</span>',
    "OLD_PASS": '<span style="background:#87d96c">Pass</span>',
    "NEUTRAL": "No tests, superficial or marked flaky",
    "OLD_NEUTRAL": "No tests, superficial or marked flaky",
    "FAIL": '<span style="background:#ff6666">Failed</span>',
    "OLD_FAIL": '<span style="background:#ff6666">Failed</span>',
    "ALWAYSFAIL": '<span style="background:#e5c545">Failed (not a regression)</span>',
    "REGRESSION": '<span style="background:#ff6666">Regression</span>',
    "IGNORE-FAIL": '<span style="background:#e5c545">Ignored failure</span>',
    "RUNNING": '<span style="background:#99ddff">Test triggered</span>',
    "RUNNING-ALWAYSFAIL": "Test triggered (will not be considered a regression)",
    "RUNNING-IGNORE": "Test triggered (failure will be ignored)",
    "RUNNING-REFERENCE": '<span style="background:#ff6666">Reference test triggered, but real test failed already</span>',
    "DEFERRED": '<span style="background:#99ddff">Test deferred</span>',
}

# Trigger string used to identify baseline ("reference") test runs.
REF_TRIG = "migration-reference/0"

# Version marker key written into the autopkgtest-pending.json file.
VERSION_KEY = "britney-autopkgtest-pending-file-version"
def srchash(src: str) -> str:
    """archive hash prefix for source package"""
    # Debian-archive pool convention: "lib*" packages hash to their first
    # four characters, everything else to the first character.
    return src[:4] if src.startswith("lib") else src[0]
def added_pkgs_compared_to_target_suite(
    package_ids: frozenset[BinaryPackageId],
    target_suite: TargetSuite,
    *,
    invert: bool = False,
) -> Iterator[BinaryPackageId]:
    """Yield the package ids whose names do not already occur in the target suite.

    With invert=True the filter is flipped: yield those whose names are NOT
    new compared to the target suite.
    """
    in_target = set(target_suite.which_of_these_are_in_the_suite(package_ids))
    if invert:
        skip_names = {pkg.package_name for pkg in package_ids - in_target}
    else:
        skip_names = {pkg.package_name for pkg in in_target}
    for pkg in package_ids:
        if pkg.package_name not in skip_names:
            yield pkg
def all_leaf_results(
    test_results: dict[str, dict[str, dict[str, list[Any]]]],
) -> Iterator[list[Any]]:
    """Iterate over every per-arch result list in a trigger->src->arch map."""
    # Flatten the two outer mapping levels lazily; each yielded item is the
    # innermost [passed, version, run_id, seen] list.
    return itertools.chain.from_iterable(
        arch_map.values()
        for trigger_map in test_results.values()
        for arch_map in trigger_map.values()
    )
def mark_result_as_old(result: Result) -> Result:
    """Convert current result into corresponding old result"""
    # Results without an OLD_* counterpart are returned unchanged.
    aged = {
        Result.FAIL: Result.OLD_FAIL,
        Result.PASS: Result.OLD_PASS,
        Result.NEUTRAL: Result.OLD_NEUTRAL,
    }
    return aged.get(result, result)
def concat_bdeps(src_data: SourcePackage) -> str:
    """Concatenate build_deps_arch and build_deps_indep"""
    # Either field may be None; the strip removes the separator when one
    # (or both) of the fields is empty.
    arch_deps = src_data.build_deps_arch or ""
    indep_deps = src_data.build_deps_indep or ""
    return f"{arch_deps},{indep_deps}".strip(",")
159class AutopkgtestPolicy(AbstractBasePolicy):
160 """autopkgtest regression policy for source migrations
162 Run autopkgtests for the excuse and all of its reverse dependencies, and
163 reject the upload if any of those regress.
164 """
    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        """Set up state-file paths, option defaults and derived settings.

        No disk or network I/O beyond option parsing happens here; the
        caches are read in initialise().
        """
        super().__init__(
            "autopkgtest", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
        )
        # tests requested in this and previous runs
        # trigger -> src -> [arch]
        self.pending_tests: dict[str, dict[str, dict[str, int]]] | None = None
        self.pending_tests_file = os.path.join(
            self.state_dir, "autopkgtest-pending.json"
        )
        # inverse Testsuite-Triggers map: trigger package -> set of sources
        self.testsuite_triggers: dict[str, set[str]] = {}
        # per-src, per-arch baseline results, filled lazily
        self.result_in_baseline_cache: dict[str, dict[str, list[Any]]] = (
            collections.defaultdict(dict)
        )

        # open handle when ADT_AMQP is a file:// URL (set in initialise())
        self.amqp_file_handle: io.TextIOWrapper | None = None

        # Default values for this policy's options
        parse_option(options, "adt_baseline")
        parse_option(options, "adt_huge", to_int=True)
        parse_option(options, "adt_ppas")
        parse_option(options, "adt_reference_max_age", day_to_sec=True)
        parse_option(options, "adt_pending_max_age", default=5, day_to_sec=True)
        parse_option(options, "adt_regression_penalty", default=0, to_int=True)
        parse_option(options, "adt_log_url")  # see below for defaults
        parse_option(options, "adt_retry_url")  # see below for defaults
        parse_option(options, "adt_retry_older_than", day_to_sec=True)
        parse_option(options, "adt_results_cache_age", day_to_sec=True)
        parse_option(options, "adt_shared_results_cache")
        parse_option(options, "adt_success_bounty", default=0, to_int=True)
        parse_option(options, "adt_ignore_failure_for_new_tests", to_bool=True)

        # When ADT_RESULTS_CACHE_AGE is smaller than or equal to
        # ADT_REFERENCE_MAX_AGE old reference result will be removed from cache
        # before the newly scheduled results are in, potentially causing
        # additional waiting. For packages like glibc this might cause an
        # infinite delay as there will always be a package that's
        # waiting. Similarly for ADT_RETRY_OLDER_THAN.
        if self.options.adt_results_cache_age <= self.options.adt_reference_max_age:
            self.logger.warning(
                "Unexpected: ADT_REFERENCE_MAX_AGE bigger than ADT_RESULTS_CACHE_AGE"
            )
        if self.options.adt_results_cache_age <= self.options.adt_retry_older_than:
            self.logger.warning(
                "Unexpected: ADT_RETRY_OLDER_THAN bigger than ADT_RESULTS_CACHE_AGE"
            )

        if not self.options.adt_log_url:
            # Historical defaults
            if self.options.adt_swift_url.startswith("file://"):
                # file-based results (testing mode): logs live under the CI URL
                self.options.adt_log_url = os.path.join(
                    self.options.adt_ci_url,
                    "data",
                    "autopkgtest",
                    self.options.series,
                    "{arch}",
                    "{hash}",
                    "{package}",
                    "{run_id}",
                    "log.gz",
                )
            else:
                # swift-based results: logs live in the swift container
                self.options.adt_log_url = os.path.join(
                    self.options.adt_swift_url,
                    "{swift_container}",
                    self.options.series,
                    "{arch}",
                    "{hash}",
                    "{package}",
                    "{run_id}",
                    "log.gz",
                )

        if hasattr(self.options, "adt_retry_url_mech"):
            self.logger.warning(
                "The ADT_RETRY_URL_MECH configuration has been deprecated."
            )
            self.logger.warning(
                "Instead britney now supports ADT_RETRY_URL for more flexibility."
            )
            if self.options.adt_retry_url:
                self.logger.error(
                    "Please remove the ADT_RETRY_URL_MECH as ADT_RETRY_URL will be used."
                )
            elif self.options.adt_retry_url_mech == "run_id":
                self.options.adt_retry_url = (
                    self.options.adt_ci_url + "api/v1/retry/{run_id}"
                )
        if not self.options.adt_retry_url:
            # Historical default
            self.options.adt_retry_url = (
                self.options.adt_ci_url
                + "request.cgi?"
                + "release={release}&arch={arch}&package={package}&trigger={trigger}{ppas}"
            )

        # results map: trigger -> src -> arch -> [passed, version, run_id, seen]
        # - trigger is "source/version" of an unstable package that triggered
        #   this test run.
        # - "passed" is a Result
        # - "version" is the package version of "src" of that test
        # - "run_id" is an opaque ID that identifies a particular test run for
        #   a given src/arch.
        # - "seen" is an approximate time stamp of the test run. How this is
        #   deduced depends on the interface used.
        self.test_results: dict[str, dict[str, dict[str, list[Any]]]] = {}
        if self.options.adt_shared_results_cache:
            self.results_cache_file = self.options.adt_shared_results_cache
        else:
            self.results_cache_file = os.path.join(
                self.state_dir, "autopkgtest-results.cache"
            )

        try:
            # option may be a whitespace-separated string of PPAs
            self.options.adt_ppas = self.options.adt_ppas.strip().split()
        except AttributeError:
            # not a string (unset or already a list) -> no PPAs
            self.options.adt_ppas = []

        self.swift_container = "autopkgtest-" + options.series
        if self.options.adt_ppas:
            # the last PPA determines the container name; "/" is not valid there
            self.swift_container += "-" + options.adt_ppas[-1].replace("/", "-")

        # restrict adt_arches to architectures we actually run for
        self.adt_arches = []
        for arch in self.options.adt_arches.split():
            if arch in self.options.architectures:
                self.adt_arches.append(arch)
            else:
                self.logger.info(
                    "Ignoring ADT_ARCHES %s as it is not in architectures list", arch
                )
298 def __del__(self) -> None:
299 if self.amqp_file_handle: 299 ↛ exitline 299 didn't return from function '__del__' because the condition on line 299 was always true
300 try:
301 self.amqp_file_handle.close()
302 except AttributeError:
303 pass
305 def register_hints(self, hint_parser: "HintParser") -> None:
306 hint_parser.register_hint_type(
307 HintType(
308 "force-badtest",
309 versioned=HintAnnotate.OPTIONAL,
310 architectured=HintAnnotate.OPTIONAL,
311 )
312 )
313 hint_parser.register_hint_type(HintType("force-skiptest"))
    def initialise(self, britney: "Britney") -> None:
        """Load caches, ingest new test results and set up the AMQP channel.

        Reads the on-disk results cache, then merges in freshly downloaded
        results (file:// interface), prunes stale entries and finally opens
        the request channel (AMQP server or file, depending on ADT_AMQP).
        """
        super().initialise(britney)
        # We want to use the "current" time stamp in multiple locations
        time_now = round(time.time())
        if hasattr(self.options, "fake_runtime"):
            # test suites pin the clock for reproducibility
            time_now = int(self.options.fake_runtime)
        self._now = time_now

        # local copies for better performance
        parse_src_depends = apt_pkg.parse_src_depends

        # compute inverse Testsuite-Triggers: map, unifying all series
        self.logger.info("Building inverse testsuite_triggers map")
        for suite in self.suite_info:
            for src, data in suite.sources.items():
                # for now, let's assume that autodep8 uses builddeps (most do)
                if (
                    self.has_autodep8(data)
                    and "@builddeps@" not in data.testsuite_triggers
                ):
                    data.testsuite_triggers.append("@builddeps@")
                for trigger in data.testsuite_triggers:
                    if trigger == "@builddeps@":
                        # expand to the actual (arch-specific) build deps
                        for arch in self.adt_arches:
                            for block in parse_src_depends(
                                concat_bdeps(data), True, arch
                            ):
                                # block[0][0] is the first alternative's name
                                self.testsuite_triggers.setdefault(
                                    block[0][0], set()
                                ).add(src)
                    else:
                        self.testsuite_triggers.setdefault(trigger, set()).add(src)
        target_suite_name = self.suite_info.target_suite.name

        os.makedirs(self.state_dir, exist_ok=True)
        self.read_pending_tests()

        # read the cached results that we collected so far
        if os.path.exists(self.results_cache_file):
            with open(self.results_cache_file) as f:
                test_results = json.load(f)
                self.test_results = self.check_and_upgrade_cache(test_results)
            self.logger.info("Read previous results from %s", self.results_cache_file)
        else:
            self.logger.info(
                "%s does not exist, re-downloading all results from swift",
                self.results_cache_file,
            )

        # read in the new results
        if self.options.adt_swift_url.startswith("file://"):
            debci_file = self.options.adt_swift_url[7:]
            if os.path.exists(debci_file):
                with open(debci_file) as f:
                    test_results = json.load(f)
                self.logger.info("Read new results from %s", debci_file)
                for res in test_results["results"]:
                    # if there's no date, the test didn't finish yet
                    if res["date"] is None:
                        continue
                    (test_suite, triggers, src, arch, ver, status, run_id, seen) = [
                        res["suite"],
                        res["trigger"],
                        res["package"],
                        res["arch"],
                        res["version"],
                        res["status"],
                        str(res["run_id"]),
                        # strip the trailing timezone chunk before parsing
                        round(
                            calendar.timegm(
                                time.strptime(res["date"][0:-5], "%Y-%m-%dT%H:%M:%S")
                            )
                        ),
                    ]
                    if test_suite != target_suite_name:
                        # not requested for this target suite, so ignore
                        continue
                    if triggers is None:
                        # not requested for this policy, so ignore
                        continue
                    if status is None:
                        # still running => pending
                        continue
                    for trigger in triggers.split():
                        # remove matching test requests
                        self.remove_from_pending(trigger, src, arch, seen)
                        if status == "tmpfail":
                            # let's see if we still need it
                            continue
                        self.logger.debug(
                            "Results %s %s %s added", src, trigger, status
                        )
                        self.add_trigger_to_results(
                            trigger,
                            src,
                            ver,
                            arch,
                            run_id,
                            seen,
                            Result[status.upper()],
                        )
            else:
                self.logger.info(
                    "%s does not exist, no new data will be processed", debci_file
                )

        # The cache can contain results against versions of packages that
        # are not in any suite anymore. Strip those out, as we don't want
        # to use those results. Additionally, old references may be
        # filtered out.
        if self.options.adt_baseline == "reference":
            self.filter_old_results()

        # we need sources, binaries, and installability tester, so for now
        # remember the whole britney object
        self.britney = britney

        # Initialize AMQP connection
        self.amqp_channel: Optional["amqp.channel.Channel"] = None
        self.amqp_file_handle = None
        if self.options.dry_run:
            # dry runs never request tests, so no channel is needed
            return

        amqp_url = self.options.adt_amqp

        if amqp_url.startswith("amqp://"):
            import amqplib.client_0_8 as amqp

            # depending on the setup we connect to a AMQP server
            creds = urllib.parse.urlsplit(amqp_url, allow_fragments=False)
            self.amqp_con = amqp.Connection(
                creds.hostname, userid=creds.username, password=creds.password
            )
            self.amqp_channel = self.amqp_con.channel()
            self.logger.info("Connected to AMQP server")
        elif amqp_url.startswith("file://"):
            # or in Debian and in testing mode, adt_amqp will be a file:// URL
            amqp_file = amqp_url[7:]
            # line-buffered so requests are visible immediately
            self.amqp_file_handle = open(amqp_file, "w", 1)
        else:
            raise RuntimeError("Unknown ADT_AMQP schema %s" % amqp_url.split(":", 1)[0])
457 def check_and_upgrade_cache(
458 self, test_results: dict[str, dict[str, dict[str, list[Any]]]]
459 ) -> dict[str, dict[str, dict[str, list[Any]]]]:
460 for leaf_result in all_leaf_results(test_results):
461 leaf_result[0] = Result[leaf_result[0]]
463 # Drop results older than ADT_RESULTS_CACHE_AGE
464 for trigger in list(test_results.keys()):
465 for pkg in list(test_results[trigger].keys()):
466 for arch in list(test_results[trigger][pkg].keys()):
467 arch_result = test_results[trigger][pkg][arch]
468 if self._now - arch_result[3] > self.options.adt_results_cache_age: 468 ↛ 469line 468 didn't jump to line 469 because the condition on line 468 was never true
469 del test_results[trigger][pkg][arch]
470 if not test_results[trigger][pkg]: 470 ↛ 471line 470 didn't jump to line 471 because the condition on line 470 was never true
471 del test_results[trigger][pkg]
472 if not test_results[trigger]: 472 ↛ 473line 472 didn't jump to line 473 because the condition on line 472 was never true
473 del test_results[trigger]
475 return test_results
477 def filter_old_results(self) -> None:
478 """Remove results for old versions and reference runs from the cache.
480 For now, only delete reference runs. If we delete regular
481 results after a while, packages with lots of triggered tests may
482 never have all the results at the same time."""
484 test_results = self.test_results
486 for trigger, trigger_data in test_results.items():
487 for src, results in trigger_data.items():
488 for arch, result in results.items():
489 if (
490 trigger == REF_TRIG
491 and self._now - result[3] > self.options.adt_reference_max_age
492 ):
493 result[0] = mark_result_as_old(result[0])
494 elif not self.test_version_in_any_suite(src, result[1]):
495 result[0] = mark_result_as_old(result[0])
497 def test_version_in_any_suite(self, src: str, version: str) -> bool:
498 """Check if the mentioned version of src is found in a suite
500 To prevent regressions in the target suite, the result should be
501 from a test with the version of the package in either the source
502 suite or the target suite. The source suite is also valid,
503 because due to versioned test dependencies and Breaks/Conflicts
504 relations, regularly the version in the source suite is used
505 during testing.
506 """
508 versions = set()
509 for suite in self.suite_info:
510 try:
511 srcinfo = suite.sources[src]
512 except KeyError:
513 continue
514 versions.add(srcinfo.version)
516 valid_version = False
517 for ver in versions:
518 if apt_pkg.version_compare(ver, version) == 0:
519 valid_version = True
520 break
522 return valid_version
524 def save_pending_json(self) -> None:
525 # update the pending tests on-disk cache
526 self.logger.info(
527 "Updating pending requested tests in %s" % self.pending_tests_file
528 )
529 # Shallow clone pending_tests as we only modify the toplevel and change its type.
530 pending_tests: dict[str, Any] = {}
531 if self.pending_tests:
532 pending_tests = dict(self.pending_tests)
533 # Avoid adding if there are no pending results at all (eases testing)
534 pending_tests[VERSION_KEY] = 1
535 with open(self.pending_tests_file + ".new", "w") as f:
536 json.dump(pending_tests, f, indent=2)
537 os.rename(self.pending_tests_file + ".new", self.pending_tests_file)
539 def save_state(self, britney: "Britney") -> None:
540 super().save_state(britney)
542 # update the results on-disk cache, unless we are using a r/o shared one
543 if not self.options.adt_shared_results_cache:
544 self.logger.info("Updating results cache")
545 test_results = deepcopy(self.test_results)
546 for result in all_leaf_results(test_results):
547 result[0] = result[0].name
548 with open(self.results_cache_file + ".new", "w") as f:
549 json.dump(test_results, f, indent=2)
550 os.rename(self.results_cache_file + ".new", self.results_cache_file)
552 self.save_pending_json()
554 def format_retry_url(
555 self, run_id: str | None, arch: str, testsrc: str, trigger: str
556 ) -> str:
557 if self.options.adt_ppas:
558 ppas = "&" + urllib.parse.urlencode(
559 [("ppa", p) for p in self.options.adt_ppas]
560 )
561 else:
562 ppas = ""
563 return cast(str, self.options.adt_retry_url).format(
564 run_id=run_id,
565 release=self.options.series,
566 arch=arch,
567 package=testsrc,
568 trigger=urllib.parse.quote_plus(trigger),
569 ppas=ppas,
570 )
572 def format_log_url(self, testsrc: str, arch: str, run_id: str) -> str:
573 return cast(str, self.options.adt_log_url).format(
574 release=self.options.series,
575 swift_container=self.swift_container,
576 hash=srchash(testsrc),
577 package=testsrc,
578 arch=arch,
579 run_id=run_id,
580 )
    def apply_src_policy_impl(
        self,
        tests_info: dict[str, Any],
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Evaluate the autopkgtest policy for a source migration.

        Defers (REJECTED_TEMPORARILY) while builds are missing, otherwise
        requests/collects tests per architecture and folds the results and
        any hints into the final verdict.
        """

        # initialize
        verdict = PolicyVerdict.PASS
        source_name = excuse.item.package

        # skip/delay autopkgtests until new package is built somewhere
        if not binaries_from_source_version(source_data_srcdist, self.suite_info):
            self.logger.debug(
                "%s hasnot been built anywhere, skipping autopkgtest policy",
                excuse.name,
            )
            verdict = PolicyVerdict.REJECTED_TEMPORARILY
            excuse.add_verdict_info(verdict, "Autopkgtest deferred: missing builds")

        elif "all" in excuse.missing_builds:
            self.logger.debug(
                "%s hasnot been built for arch:all, skipping autopkgtest policy",
                source_name,
            )
            verdict = PolicyVerdict.REJECTED_TEMPORARILY
            excuse.add_verdict_info(
                verdict, "Autopkgtest deferred: missing arch:all build"
            )

        all_self_tests_pass = False
        results_info: list[str] = []
        if not verdict.is_rejected:
            self.logger.debug("Checking autopkgtests for %s", source_name)
            trigger = source_name + "/" + source_data_srcdist.version

            # build a (testsrc, testver) → arch → (status, run_id, log_url) map; we trigger/check test
            # results per architecture for technical/efficiency reasons, but we
            # want to evaluate and present the results by tested source package
            # first
            pkg_arch_result: dict[
                tuple[str, str], dict[str, tuple[str, str | None, str]]
            ] = collections.defaultdict(dict)
            for arch in self.adt_arches:
                if arch in excuse.missing_builds:
                    # defer this arch until its build exists
                    verdict = PolicyVerdict.REJECTED_TEMPORARILY
                    self.logger.debug(
                        "%s hasnot been built on arch %s, delay autopkgtest there",
                        source_name,
                        arch,
                    )
                    excuse.add_verdict_info(
                        verdict,
                        f"Autopkgtest deferred on {arch}: missing arch:{arch} build",
                    )
                else:
                    verdict = self.check_and_request_arch(
                        excuse,
                        arch,
                        source_data_srcdist,
                        pkg_arch_result,
                        trigger,
                        verdict,
                    )

            verdict, results_info, all_self_tests_pass = self.process_pkg_arch_results(
                tests_info, excuse, pkg_arch_result, verdict, trigger
            )

        # hints and bounty/penalty configuration may still change the verdict
        verdict = self.finalize_excuse(
            excuse, verdict, all_self_tests_pass, results_info
        )
        return verdict
    def apply_srcarch_policy_impl(
        self,
        tests_info: dict[str, Any],
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Evaluate the autopkgtest policy for a binNMU on one architecture.

        Only applies when the item looks like a binNMU (all binaries on this
        arch share one "+bN" suffix); otherwise the item passes untouched.
        """

        assert self.hints is not None  # for type checking
        # initialize
        verdict = PolicyVerdict.PASS

        self.logger.debug(f"Checking autopkgtests for binNMU {str(excuse.item)}/{arch}")

        if arch not in self.adt_arches:
            # we do not run tests on this architecture at all
            return verdict

        # find the binNMU version
        versions = set()
        for bin_pkg in source_data_srcdist.binaries:
            if bin_pkg.architecture == arch:
                # a binNMU version ends in "+b<number>"
                if (
                    len(parts := bin_pkg.version.split("+b")) > 1
                    and parts[-1].isdigit()
                ):
                    versions.add(parts[-1])
                else:
                    self.logger.debug(
                        f"Version {bin_pkg.version} doesn't end with '+b#', skipping"
                    )
        # all binaries must agree on exactly one binNMU number
        if not versions or len(versions) > 1:
            self.logger.debug("This migration item doesn't look like a binNMU")
            return verdict

        trigger = str(excuse.item) + "/" + versions.pop()

        # While we don't need the arch here, this is common with apply_src_policy_impl()
        # (testsrc, testver) → arch → (status, run_id, log_url) map
        pkg_arch_result: dict[
            tuple[str, str], dict[str, tuple[str, str | None, str]]
        ] = collections.defaultdict(dict)

        verdict = self.check_and_request_arch(
            excuse, arch, source_data_srcdist, pkg_arch_result, trigger, verdict
        )

        verdict, results_info, all_self_tests_pass = self.process_pkg_arch_results(
            tests_info, excuse, pkg_arch_result, verdict, trigger
        )

        verdict = self.finalize_excuse(
            excuse, verdict, all_self_tests_pass, results_info
        )
        return verdict
713 def check_and_request_arch(
714 self,
715 excuse: "Excuse",
716 arch: str,
717 source_data_srcdist: SourcePackage,
718 pkg_arch_result: dict[tuple[str, str], dict[str, tuple[str, str | None, str]]],
719 trigger: str,
720 verdict: PolicyVerdict,
721 ) -> PolicyVerdict:
722 """Perform sanity checks and request test/results when they pass"""
724 source_name = excuse.item.package
725 if arch in excuse.policy_info["depends"].get("arch_all_not_installable", []):
726 self.logger.debug(
727 "%s is uninstallable on arch %s (which is allowed), not running autopkgtest there",
728 source_name,
729 arch,
730 )
731 excuse.addinfo(
732 f"Autopkgtest skipped on {arch}: not installable (which is allowed)"
733 )
734 elif arch in excuse.unsatisfiable_on_archs and arch not in excuse.policy_info[
735 "depends"
736 ].get("autopkgtest_run_anyways", []):
737 verdict = PolicyVerdict.REJECTED_TEMPORARILY
738 self.logger.debug(
739 "%s is uninstallable on arch %s, not running autopkgtest there",
740 source_name,
741 arch,
742 )
743 excuse.addinfo(f"Autopkgtest skipped on {arch}: not installable")
744 else:
745 self.request_tests_for_source(
746 arch, source_data_srcdist, pkg_arch_result, excuse, trigger
747 )
749 return verdict
    def process_pkg_arch_results(
        self,
        tests_info: dict[str, Any],
        excuse: "Excuse",
        pkg_arch_result: dict[tuple[str, str], dict[str, tuple[str, str | None, str]]],
        verdict: PolicyVerdict,
        trigger: str,
    ) -> tuple[PolicyVerdict, list[str], bool]:
        """Calculate verdict based on results and render excuse text

        Returns (verdict, results_info, all_self_tests_pass) where
        results_info is a list of HTML lines and all_self_tests_pass is True
        when the package's own tests passed on every tested architecture.
        """

        source_name = excuse.item.package
        all_self_tests_pass = False
        results_info = []

        # add test result details to Excuse
        cloud_url = self.options.adt_ci_url + "packages/%(h)s/%(s)s/%(r)s/%(a)s"
        testver: str | None
        for testsrc, testver in sorted(pkg_arch_result):
            assert testver is not None
            arch_results = pkg_arch_result[(testsrc, testver)]
            # the set of distinct statuses across all tested architectures
            r = {v[0] for v in arch_results.values()}
            if r & {"FAIL", "OLD_FAIL", "REGRESSION"}:
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
            elif (
                r & {"DEFERRED", "RUNNING", "RUNNING-REFERENCE"}
                and not verdict.is_rejected
            ):
                verdict = PolicyVerdict.REJECTED_TEMPORARILY
            # skip version if still running on all arches
            if not r - {"DEFERRED", "RUNNING", "RUNNING-ALWAYSFAIL", "RUNNING-IGNORE"}:
                testver = None

            # A source package is eligible for the bounty if it has tests
            # of its own that pass on all tested architectures.
            if testsrc == source_name:
                excuse.autopkgtest_results = r
                if r == {"PASS"}:
                    all_self_tests_pass = True

            if testver:
                testname = f"{testsrc}/{testver}"
            else:
                testname = testsrc

            html_archmsg = []
            for arch in sorted(arch_results):
                (status, run_id, log_url) = arch_results[arch]
                artifact_url = None
                retry_url = None
                reference_url = None
                reference_retry_url = None
                history_url = None
                if self.options.adt_ppas:
                    # PPA runs have no history page but may expose artifacts
                    if log_url.endswith("log.gz"):
                        artifact_url = log_url.replace("log.gz", "artifacts.tar.gz")
                else:
                    history_url = cloud_url % {
                        "h": srchash(testsrc),
                        "s": testsrc,
                        "r": self.options.series,
                        "a": arch,
                    }
                # offer a retry link for anything not passing/in-flight
                if status not in ("DEFERRED", "PASS", "RUNNING", "RUNNING-IGNORE"):
                    retry_url = self.format_retry_url(run_id, arch, testsrc, trigger)

                baseline_result = self.result_in_baseline(testsrc, arch)
                if baseline_result and baseline_result[0] != Result.NONE:
                    baseline_run_id = str(baseline_result[2])
                    reference_url = self.format_log_url(
                        testsrc, arch, baseline_run_id
                    )
                    if self.options.adt_baseline == "reference":
                        reference_retry_url = self.format_retry_url(
                            baseline_run_id, arch, testsrc, REF_TRIG
                        )
                tests_info.setdefault(testname, {})[arch] = [
                    status,
                    log_url,
                    history_url,
                    artifact_url,
                    retry_url,
                ]

                # render HTML snippet for testsrc entry for current arch
                if history_url:
                    message = f'<a href="{history_url}">{arch}</a>'
                else:
                    message = arch
                message += ': <a href="{}">{}</a>'.format(
                    log_url,
                    EXCUSES_LABELS[status],
                )
                if retry_url:
                    message += (
                        '<a href="%s" style="text-decoration: none;"> ♻</a>' % retry_url
                    )
                if reference_url:
                    message += ' (<a href="%s">reference</a>' % reference_url
                    if reference_retry_url:
                        message += (
                            '<a href="%s" style="text-decoration: none;"> ♻</a>'
                            % reference_retry_url
                        )
                    message += ")"
                if artifact_url:
                    message += ' <a href="%s">[artifacts]</a>' % artifact_url
                html_archmsg.append(message)

            # render HTML line for testsrc entry
            # - if action is or may be required
            # - for ones own package
            if (
                r
                - {
                    "PASS",
                    "NEUTRAL",
                    "RUNNING-ALWAYSFAIL",
                    "ALWAYSFAIL",
                    "IGNORE-FAIL",
                }
                or testsrc == source_name
            ):
                if testver:
                    pkg = '<a href="#{0}">{0}</a>/{1}'.format(testsrc, testver)
                else:
                    pkg = '<a href="#{0}">{0}</a>'.format(testsrc)
                results_info.append(
                    "Autopkgtest for {}: {}".format(pkg, ", ".join(html_archmsg))
                )

        return (verdict, results_info, all_self_tests_pass)
    def finalize_excuse(
        self,
        excuse: "Excuse",
        verdict: PolicyVerdict,
        all_self_tests_pass: bool,
        results_info: list[str],
    ) -> PolicyVerdict:
        """Updates excuses and verdict for hints and bounty/penalty config

        Given the verdict so far, hints and configuration, the verdict may be
        updated. Depending of the end verdict, the content of results_info is
        added as info or as excuse.
        """

        package = excuse.item.package
        version = excuse.item.version

        assert self.hints is not None  # for type checking
        if verdict.is_rejected:
            # check for force-skiptest hint
            hints = self.hints.search(
                "force-skiptest",
                package=package,
                version=version,
            )
            if hints:
                excuse.addreason("skiptest")
                excuse.addinfo(
                    "Autopkgtest check should wait for tests relating to %s %s, but forced by %s"
                    % (package, version, hints[0].user)
                )
                verdict = PolicyVerdict.PASS_HINTED
            else:
                excuse.addreason("autopkgtest")

        # bounty only applies to a clean PASS (not PASS_HINTED) with own tests
        if (
            self.options.adt_success_bounty
            and verdict == PolicyVerdict.PASS
            and all_self_tests_pass
        ):
            excuse.add_bounty("autopkgtest", self.options.adt_success_bounty)
        if self.options.adt_regression_penalty and verdict in {
            PolicyVerdict.REJECTED_PERMANENTLY,
            PolicyVerdict.REJECTED_TEMPORARILY,
        }:
            if self.options.adt_regression_penalty > 0:
                excuse.add_penalty("autopkgtest", self.options.adt_regression_penalty)
            # In case we give penalties instead of blocking, we must always pass
            verdict = PolicyVerdict.PASS
        # rejected: attach result lines to the verdict; otherwise plain info
        for i in results_info:
            if verdict.is_rejected:
                excuse.add_verdict_info(verdict, i)
            else:
                excuse.addinfo(i)

        return verdict
940 @staticmethod
941 def has_autodep8(srcinfo: SourcePackage) -> bool:
942 """Check if package is covered by autodep8
944 srcinfo is an item from self.britney.sources
945 """
946 # autodep8?
947 for t in srcinfo.testsuite:
948 if t.startswith("autopkgtest-pkg"):
949 return True
951 return False
    def request_tests_for_source(
        self,
        arch: str,
        source_data_srcdist: SourcePackage,
        pkg_arch_result: dict[tuple[str, str], dict[str, tuple[str, str | None, str]]],
        excuse: "Excuse",
        trigger: str,
    ) -> None:
        """Request and collect autopkgtest results for one source on one arch.

        Determines the tests to run (via tests_for_source) and the set of
        trigger packages that must be installed from the source suite,
        requests every test that has no fresh result yet (pkg_test_request),
        and records the current state per (testsrc, testver) and arch in
        pkg_arch_result.
        """
        pkg_universe = self.britney.pkg_universe
        target_suite = self.suite_info.target_suite
        source_suite = excuse.item.suite
        sources_t = target_suite.sources
        sources_s = excuse.item.suite.sources
        packages_s_a = excuse.item.suite.binaries[arch]
        source_name = excuse.item.package
        source_version = source_data_srcdist.version
        # request tests (unless they were already requested earlier or have a result)
        tests = self.tests_for_source(source_name, source_version, arch, excuse)
        is_huge = len(tests) > self.options.adt_huge

        # local copies for better performance
        parse_src_depends = apt_pkg.parse_src_depends

        # Here we figure out what is required from the source suite
        # for the test to install successfully.
        #
        # The ImplicitDependencyPolicy does a similar calculation, but
        # if I (elbrus) understand correctly, only in the reverse
        # dependency direction. We are doing something similar here
        # but in the dependency direction (note: this code is older).
        # We use the ImplicitDependencyPolicy result for the reverse
        # dependencies and we keep the code below for the
        # dependencies. Using the ImplicitDependencyPolicy results
        # also in the reverse direction seems to require quite some
        # reorganisation to get that information available here, as in
        # the current state only the current excuse is available here
        # and the required other excuses may not be calculated yet.
        #
        # Loop over all binary packages from trigger and
        # recursively look up which *versioned* dependencies are
        # only satisfied in the source suite.
        #
        # For all binaries found, look up which packages they
        # break/conflict with in the target suite, but not in the
        # source suite. The main reason to do this is to cover test
        # dependencies, so we will check Testsuite-Triggers as
        # well.
        #
        # OI: do we need to do the first check in a smart way
        # (i.e. only for the packages that are actually going to be
        # installed) for the breaks/conflicts set as well, i.e. do
        # we need to check if any of the packages that we now
        # enforce being from the source suite, actually have new
        # versioned depends and new breaks/conflicts.
        #
        # For all binaries found, add the set of unique source
        # packages to the list of triggers.

        bin_triggers: set[PackageId] = set()
        bin_new = set(filter_out_faux(source_data_srcdist.binaries))
        # For each build-depends block (if any) check if the first alternative
        # is satisfiable in the target suite. If not, add it to the initial set
        # used for checking.
        for block in parse_src_depends(concat_bdeps(source_data_srcdist), True, arch):
            if not get_dependency_solvers(
                [block[0]],
                target_suite.binaries[arch],
                target_suite.provides_table[arch],
                build_depends=True,
            ) and (
                solvers := get_dependency_solvers(
                    [block[0]],
                    packages_s_a,
                    source_suite.provides_table[arch],
                    build_depends=True,
                )
            ):
                bin_new.add(solvers[0].pkg_id)
        # drain bin_new as a work queue until no new binaries turn up
        for n_binary in iter_except(bin_new.pop, KeyError):
            if n_binary in bin_triggers:
                continue
            bin_triggers.add(n_binary)

            # Check if there is a dependency that is not
            # available in the target suite.
            # We add slightly too much here, because new binaries
            # will also show up, but they are already properly
            # installed. Nevermind.
            depends = pkg_universe.dependencies_of(n_binary)
            # depends is a frozenset{frozenset{BinaryPackageId, ..}}
            for deps_of_bin in depends:
                if target_suite.any_of_these_are_in_the_suite(deps_of_bin):
                    # if any of the alternative dependencies is already
                    # satisfied in the target suite, we can just ignore it
                    continue
                # We'll figure out which version later
                bin_new.update(
                    added_pkgs_compared_to_target_suite(deps_of_bin, target_suite)
                )

        # Check if the package breaks/conflicts anything. We might
        # be adding slightly too many source packages due to the
        # check here as a binary package that is broken may be
        # coming from a different source package in the source
        # suite. Nevermind.
        bin_broken = set()
        for t_binary in bin_triggers:
            # broken is a frozenset{BinaryPackageId, ..}
            broken = pkg_universe.negative_dependencies_of(
                cast(BinaryPackageId, t_binary)
            )
            broken_in_target = {
                p.package_name
                for p in target_suite.which_of_these_are_in_the_suite(broken)
            }
            broken_in_source = {
                p.package_name
                for p in source_suite.which_of_these_are_in_the_suite(broken)
            }
            # We want packages with a newer version in the source suite that
            # no longer has the conflict. This is an approximation
            broken_filtered = {
                p
                for p in broken
                if p.package_name in broken_in_target
                and p.package_name not in broken_in_source
            }
            # We add the version in the target suite, but the code below will
            # change it to the version in the source suite
            bin_broken.update(broken_filtered)
        bin_triggers.update(bin_broken)

        # The ImplicitDependencyPolicy also found packages that need
        # to migrate together, so add them to the triggers too.
        for bin_implicit in excuse.depends_packages_flattened:
            if bin_implicit.architecture == arch:
                bin_triggers.add(bin_implicit)

        triggers = set()
        for t_binary2 in bin_triggers:
            if t_binary2.architecture == arch:
                try:
                    source_of_bin = packages_s_a[t_binary2.package_name].source
                    # If the version in the target suite is the same, don't add a trigger.
                    # Note that we looked up the source package in the source suite.
                    # If it were a different source package in the target suite, however, then
                    # we would not have this source package in the same version anyway.
                    #
                    # binNMU's exist, so let's also check if t_binary2 exists
                    # in the target suite if the sources are the same.
                    if (
                        sources_t.get(source_of_bin, None) is None
                        or sources_s[source_of_bin].version
                        != sources_t[source_of_bin].version
                        or not target_suite.any_of_these_are_in_the_suite(
                            {cast(BinaryPackageId, t_binary2)}
                        )
                    ):
                        triggers.add(
                            source_of_bin + "/" + sources_s[source_of_bin].version
                        )
                except KeyError:
                    # Apparently the package was removed from
                    # unstable e.g. if packages are replaced
                    # (e.g. -dbg to -dbgsym)
                    pass
                if t_binary2 not in source_data_srcdist.binaries:
                    for tdep_src in self.testsuite_triggers.get(
                        t_binary2.package_name, set()
                    ):
                        try:
                            # Only add trigger if versions in the target and source suites are different
                            if (
                                sources_t.get(tdep_src, None) is None
                                or sources_s[tdep_src].version
                                != sources_t[tdep_src].version
                            ):
                                triggers.add(
                                    tdep_src + "/" + sources_s[tdep_src].version
                                )
                        except KeyError:
                            # Apparently the source was removed from
                            # unstable (testsuite_triggers are unified
                            # over all suites)
                            pass
        # the package under test itself is passed separately as `trigger`
        source_trigger = source_name + "/" + source_version
        triggers.discard(source_trigger)
        triggers_list = sorted(list(triggers))
        triggers_list.insert(0, trigger)

        impl_pids = excuse.policy_info.get("implicit-deps", {}).get(
            "broken-binaries", []
        )
        for testsrc, testver in tests:
            # Not if binaries from testsrc are not installable
            skip = False
            for bpid_s in impl_pids:
                bpid = BinaryPackageId(*bpid_s.split("/"))
                if (
                    bpid.architecture == arch
                    and testsrc == target_suite.all_binaries_in_suite[bpid].source
                ):
                    skip = True
                    break
            if skip:
                pkg_arch_result[(testsrc, testver)][arch] = ("DEFERRED", None, "")
            else:
                self.pkg_test_request(testsrc, arch, triggers_list, huge=is_huge)
                (result, real_ver, run_id, url) = self.pkg_test_result(
                    testsrc, testver, arch, trigger
                )
                pkg_arch_result[(testsrc, real_ver)][arch] = (result, run_id, url)
    def tests_for_source(
        self, src: str, ver: str, arch: str, excuse: "Excuse"
    ) -> list[tuple[str, str]]:
        """Iterate over all tests that should be run for given source and arch

        Returns a list of (test source, version) tuples sorted by source name:
        the package itself (if it declares a test and built on this arch),
        plus all reverse dependencies and Testsuite-Triggers of its binaries
        that declare an autopkgtest or are covered by autodep8.
        """

        source_suite = self.suite_info.primary_source_suite
        target_suite = self.suite_info.target_suite
        sources_info = target_suite.sources
        binaries_info = target_suite.binaries[arch]

        # sources for which a test was already added, to avoid duplicates
        reported_pkgs = set()

        tests = []

        # Debian doesn't have linux-meta, but Ubuntu does
        # for linux themselves we don't want to trigger tests -- these should
        # all come from linux-meta*. A new kernel ABI without a corresponding
        # -meta won't be installed and thus we can't sensibly run tests against
        # it.
        if (
            src.startswith("linux")
            and src.replace("linux", "linux-meta") in sources_info
        ):
            return []

        # we want to test the package itself, if it still has a test in unstable
        # but only if the package actually exists on this arch
        srcinfo = source_suite.sources[src]
        if ("autopkgtest" in srcinfo.testsuite or self.has_autodep8(srcinfo)) and len(
            excuse.packages[arch]
        ) > 0:
            reported_pkgs.add(src)
            tests.append((src, ver))

        extra_bins = []
        # Debian doesn't have linux-meta, but Ubuntu does
        # Hack: For new kernels trigger all DKMS packages by pretending that
        # linux-meta* builds a "dkms" binary as well. With that we ensure that we
        # don't regress DKMS drivers with new kernel versions.
        if src.startswith("linux-meta"):
            # does this have any image on this arch?
            for pkg_id in srcinfo.binaries:
                if pkg_id.architecture == arch and "-image" in pkg_id.package_name:
                    try:
                        extra_bins.append(binaries_info["dkms"].pkg_id)
                    except KeyError:
                        pass

        if not self.has_built_on_this_arch_or_is_arch_all(srcinfo, arch):
            return []

        pkg_universe = self.britney.pkg_universe
        # plus all direct reverse dependencies and test triggers of its
        # binaries which have an autopkgtest
        for binary in itertools.chain(srcinfo.binaries, extra_bins):
            rdeps = filter_out_faux(pkg_universe.reverse_dependencies_of(binary))
            for rdep in rdeps:
                try:
                    rdep_src = binaries_info[rdep.package_name].source
                    # Don't re-trigger the package itself here; this should
                    # have been done above if the package still continues to
                    # have an autopkgtest in unstable.
                    if rdep_src == src:
                        continue
                except KeyError:
                    continue

                rdep_src_info = sources_info[rdep_src]
                if "autopkgtest" in rdep_src_info.testsuite or self.has_autodep8(
                    rdep_src_info
                ):
                    if rdep_src not in reported_pkgs:
                        tests.append((rdep_src, rdep_src_info.version))
                        reported_pkgs.add(rdep_src)

            for tdep_src in self.testsuite_triggers.get(binary.package_name, set()):
                if tdep_src not in reported_pkgs:
                    try:
                        tdep_src_info = sources_info[tdep_src]
                    except KeyError:
                        continue
                    if "autopkgtest" in tdep_src_info.testsuite or self.has_autodep8(
                        tdep_src_info
                    ):
                        # only trigger if the test source built on this arch
                        for pkg_id in tdep_src_info.binaries:
                            if pkg_id.architecture == arch:
                                tests.append((tdep_src, tdep_src_info.version))
                                reported_pkgs.add(tdep_src)
                                break

        tests.sort(key=lambda s_v: s_v[0])
        return tests
    def read_pending_tests(self) -> None:
        """Read pending test requests from previous britney runs

        Initialize self.pending_tests with that data.  Entries older than
        adt_pending_max_age are pruned.  The current file layout is
        {trigger: {src: {arch: request_timestamp}}} marked by VERSION_KEY;
        the old layout ({trigger: {src: [arch, ...]}}) is migrated by
        stamping each entry with the current time.
        """
        assert self.pending_tests is None, "already initialized"
        if not os.path.exists(self.pending_tests_file):
            self.logger.info(
                "No %s, starting with no pending tests", self.pending_tests_file
            )
            self.pending_tests = {}
            return
        with open(self.pending_tests_file) as f:
            self.pending_tests = json.load(f)
        if VERSION_KEY in self.pending_tests:
            del self.pending_tests[VERSION_KEY]
            # iterate over copies of the key lists as entries are deleted
            for trigger in list(self.pending_tests.keys()):
                for pkg in list(self.pending_tests[trigger].keys()):
                    arch_dict = self.pending_tests[trigger][pkg]
                    for arch in list(arch_dict.keys()):
                        if (
                            self._now - arch_dict[arch]
                            > self.options.adt_pending_max_age
                        ):
                            del arch_dict[arch]
                    if not arch_dict:
                        del self.pending_tests[trigger][pkg]
                if not self.pending_tests[trigger]:
                    del self.pending_tests[trigger]
        else:
            # Migration code:
            for trigger_data in self.pending_tests.values():
                for pkg, arch_list in trigger_data.items():
                    trigger_data[pkg] = {}
                    for arch in arch_list:
                        trigger_data[pkg][arch] = self._now

        self.logger.info(
            "Read pending requested tests from %s", self.pending_tests_file
        )
        self.logger.debug("%s", self.pending_tests)
1301 # this requires iterating over all triggers and thus is expensive;
1302 # cache the results
1303 @lru_cache(None)
1304 def latest_run_for_package(self, src: str, arch: str) -> str:
1305 """Return latest run ID for src on arch"""
1307 latest_run_id = ""
1308 for srcmap in self.test_results.values():
1309 try:
1310 run_id = srcmap[src][arch][2]
1311 except KeyError:
1312 continue
1313 if run_id > latest_run_id:
1314 latest_run_id = run_id
1315 return latest_run_id
    def urlopen_retry(self, url: str) -> http.client.HTTPResponse | addinfourl:
        """A urlopen() that retries on time outs or errors

        Retries up to 5 times on timeouts and on HTTP 502/503 (transient
        proxy/service errors); any other HTTPError is re-raised immediately.
        After the retries are exhausted the last caught exception is raised.
        """

        exc: Exception
        for retry in range(5):
            try:
                req = urlopen(url, timeout=30)
                code = req.getcode()
                # no code (e.g. non-HTTP URLs return None) or 2xx is success
                if not code or 200 <= code < 300:
                    return req  # type: ignore[no-any-return]
                # NOTE(review): a non-2xx code that does not raise falls
                # through to a retry without setting `exc`; if that happened
                # on every attempt, the final `raise exc` would hit an
                # UnboundLocalError — confirm whether this path is reachable
            except TimeoutError as e:
                self.logger.info(
                    "Timeout downloading '%s', will retry %d more times."
                    % (url, 5 - retry - 1)
                )
                exc = e
            except HTTPError as e:
                if e.code not in (503, 502):
                    raise
                self.logger.info(
                    "Caught error %d downloading '%s', will retry %d more times."
                    % (e.code, url, 5 - retry - 1)
                )
                exc = e
        else:
            # all retries failed; surface the last transient error
            raise exc
    # lru_cache ensures we only query swift once per src/arch within a run
    @lru_cache(None)
    def fetch_swift_results(self, swift_url: str, src: str, arch: str) -> None:
        """Download new results for source package/arch from swift

        Lists all runs newer than the latest run we already know for
        src/arch and fetches each run's result.tar.  A missing container
        (reported as 401 by swift) is tolerated; any other fetch failure
        aborts britney so that tests are not re-requested for results we
        temporarily could not see.
        """

        # prepare query: get all runs with a timestamp later than the latest
        # run_id for this package/arch; '@' is at the end of each run id, to
        # mark the end of a test run directory path
        # example: <autopkgtest-wily>wily/amd64/libp/libpng/20150630_054517@/result.tar
        query = {
            "delimiter": "@",
            "prefix": f"{self.options.series}/{arch}/{srchash(src)}/{src}/",
        }

        # determine latest run_id from results
        if not self.options.adt_shared_results_cache:
            latest_run_id = self.latest_run_for_package(src, arch)
            if latest_run_id:
                query["marker"] = query["prefix"] + latest_run_id

        # request new results from swift
        url = os.path.join(swift_url, self.swift_container)
        url += "?" + urllib.parse.urlencode(query)
        f = None
        try:
            f = self.urlopen_retry(url)
            if f.getcode() == 200:
                result_paths = f.read().decode().strip().splitlines()
            elif f.getcode() == 204:  # No content
                result_paths = []
            else:
                # we should not ever end up here as we expect a HTTPError in
                # other cases; e. g. 3XX is something that tells us to adjust
                # our URLS, so fail hard on those
                raise NotImplementedError(
                    "fetch_swift_results(%s): cannot handle HTTP code %r"
                    % (url, f.getcode())
                )
        except OSError as e:
            # 401 "Unauthorized" is swift's way of saying "container does not exist"
            if getattr(e, "code", -1) == 401:
                self.logger.info(
                    "fetch_swift_results: %s does not exist yet or is inaccessible", url
                )
                return
            # Other status codes are usually a transient
            # network/infrastructure failure. Ignoring this can lead to
            # re-requesting tests which we already have results for, so
            # fail hard on this and let the next run retry.
            self.logger.error("Failure to fetch swift results from %s: %s", url, str(e))
            sys.exit(1)
        finally:
            if f is not None:
                f.close()

        for p in result_paths:
            self.fetch_one_result(
                os.path.join(swift_url, self.swift_container, p, "result.tar"),
                src,
                arch,
            )
1405 def fetch_one_result(self, url: str, src: str, arch: str) -> None:
1406 """Download one result URL for source/arch
1408 Remove matching pending_tests entries.
1409 """
1410 f = None
1411 try:
1412 f = self.urlopen_retry(url)
1413 if f.getcode() == 200: 1413 ↛ 1416line 1413 didn't jump to line 1416 because the condition on line 1413 was always true
1414 tar_bytes = io.BytesIO(f.read())
1415 else:
1416 raise NotImplementedError(
1417 "fetch_one_result(%s): cannot handle HTTP code %r"
1418 % (url, f.getcode())
1419 )
1420 except OSError as err:
1421 self.logger.error("Failure to fetch %s: %s", url, str(err))
1422 # we tolerate "not found" (something went wrong on uploading the
1423 # result), but other things indicate infrastructure problems
1424 if getattr(err, "code", -1) == 404:
1425 return
1426 sys.exit(1)
1427 finally:
1428 if f is not None: 1428 ↛ exit, 1428 ↛ 14302 missed branches: 1) line 1428 didn't return from function 'fetch_one_result' because the return on line 1425 wasn't executed, 2) line 1428 didn't jump to line 1430 because the condition on line 1428 was always true
1429 f.close() 1429 ↛ exitline 1429 didn't return from function 'fetch_one_result' because the return on line 1425 wasn't executed
1430 try:
1431 with tarfile.open(None, "r", tar_bytes) as tar:
1432 exitcode = int(tar.extractfile("exitcode").read().strip()) # type: ignore[union-attr]
1433 srcver = tar.extractfile("testpkg-version").read().decode().strip() # type: ignore[union-attr]
1434 (ressrc, ver) = srcver.split()
1435 testinfo = json.loads(tar.extractfile("testinfo.json").read().decode()) # type: ignore[union-attr]
1436 except (KeyError, ValueError, tarfile.TarError) as err:
1437 self.logger.error("%s is damaged, ignoring: %s", url, str(err))
1438 # ignore this; this will leave an orphaned request in autopkgtest-pending.json
1439 # and thus require manual retries after fixing the tmpfail, but we
1440 # can't just blindly attribute it to some pending test.
1441 return
1443 if src != ressrc: 1443 ↛ 1444line 1443 didn't jump to line 1444 because the condition on line 1443 was never true
1444 self.logger.error(
1445 "%s is a result for package %s, but expected package %s",
1446 url,
1447 ressrc,
1448 src,
1449 )
1450 return
1452 # parse recorded triggers in test result
1453 for e in testinfo.get("custom_environment", []): 1453 ↛ 1458line 1453 didn't jump to line 1458 because the loop on line 1453 didn't complete
1454 if e.startswith("ADT_TEST_TRIGGERS="): 1454 ↛ 1453line 1454 didn't jump to line 1453 because the condition on line 1454 was always true
1455 result_triggers = [i for i in e.split("=", 1)[1].split() if "/" in i]
1456 break
1457 else:
1458 self.logger.error("%s result has no ADT_TEST_TRIGGERS, ignoring")
1459 return
1461 run_id = os.path.basename(os.path.dirname(url))
1462 seen = round(calendar.timegm(time.strptime(run_id, "%Y%m%d_%H%M%S@")))
1463 # allow some skipped tests, but nothing else
1464 if exitcode in [0, 2]:
1465 result = Result.PASS
1466 elif exitcode == 8: 1466 ↛ 1467line 1466 didn't jump to line 1467 because the condition on line 1466 was never true
1467 result = Result.NEUTRAL
1468 else:
1469 result = Result.FAIL
1471 self.logger.info(
1472 "Fetched test result for %s/%s/%s %s (triggers: %s): %s",
1473 src,
1474 ver,
1475 arch,
1476 run_id,
1477 result_triggers,
1478 result.name.lower(),
1479 )
1481 # remove matching test requests
1482 for trigger in result_triggers:
1483 self.remove_from_pending(trigger, src, arch)
1485 # add this result
1486 for trigger in result_triggers:
1487 self.add_trigger_to_results(trigger, src, ver, arch, run_id, seen, result)
1489 def remove_from_pending(
1490 self, trigger: str, src: str, arch: str, timestamp: int = sys.maxsize
1491 ) -> None:
1492 assert self.pending_tests is not None # for type checking
1493 try:
1494 arch_dict = self.pending_tests[trigger][src]
1495 if timestamp < arch_dict[arch]:
1496 # The result is from before the moment of scheduling, so it's
1497 # not the one we're waiting for
1498 return
1499 del arch_dict[arch]
1500 if not arch_dict:
1501 del self.pending_tests[trigger][src]
1502 if not self.pending_tests[trigger]:
1503 del self.pending_tests[trigger]
1504 self.logger.debug(
1505 "-> matches pending request %s/%s for trigger %s", src, arch, trigger
1506 )
1507 except KeyError:
1508 self.logger.debug(
1509 "-> does not match any pending request for %s/%s", src, arch
1510 )
    def add_trigger_to_results(
        self,
        trigger: str,
        src: str,
        ver: str,
        arch: str,
        run_id: str,
        timestamp: int,
        status_to_add: Result,
    ) -> None:
        """Merge one fetched run into self.test_results.

        trigger is either "<src>/<ver>" or "<src>/<arch>/<ver>/<rebuild>";
        any other shape is logged and ignored.  A run for an older version
        of the triggering source itself is ignored as well.
        """
        # Ensure that we got a new enough version
        parts = trigger.split("/")
        match len(parts):
            case 2:
                # "<source>/<version>"
                trigsrc, trigver = parts
            case 4:
                # "<source>/<arch>/<version>/<rebuild>"
                trigsrc, trigarch, trigver, rebuild = parts
            case _:
                self.logger.info("Ignoring invalid test trigger %s", trigger)
                return
        if trigsrc == src and apt_pkg.version_compare(ver, trigver) < 0:
            self.logger.debug(
                "test trigger %s, but run for older version %s, ignoring", trigger, ver
            )
            return

        # stored per trigger/src/arch as [status, version, run_id, timestamp]
        stored_result = (
            self.test_results.setdefault(trigger, {})
            .setdefault(src, {})
            .setdefault(arch, [Result.FAIL, None, "", 0])
        )

        # reruns shouldn't flip the result from PASS or NEUTRAL to
        # FAIL, so remember the most recent version of the best result
        # we've seen. Except for reference updates, which we always
        # want to update with the most recent result. The result data
        # may not be ordered by timestamp, so we need to check time.
        update = False
        if self.options.adt_baseline == "reference" and trigger == REF_TRIG:
            if stored_result[3] < timestamp:
                update = True
        elif status_to_add < stored_result[0]:
            # Result ordering makes a "better" status compare smaller
            update = True
        elif status_to_add == stored_result[0] and stored_result[3] < timestamp:
            update = True

        if update:
            stored_result[0] = status_to_add
            stored_result[1] = ver
            stored_result[2] = run_id
            stored_result[3] = timestamp
    def send_test_request(
        self, src: str, arch: str, triggers: list[str], huge: bool = False
    ) -> None:
        """Send out AMQP request for testing src/arch for triggers

        If huge is true, then the request will be put into the -huge instead of
        normal queue; with adt_ppas configured, the -ppa queue is used instead.
        Without an AMQP channel the request is appended to the file-based
        submission handle.  No-op in dry-run mode.
        """
        if self.options.dry_run:
            return

        params: dict[str, Any] = {"triggers": triggers}
        if self.options.adt_ppas:
            params["ppas"] = self.options.adt_ppas
            qname = f"debci-ppa-{self.options.series}-{arch}"
        elif huge:
            qname = f"debci-huge-{self.options.series}-{arch}"
        else:
            qname = f"debci-{self.options.series}-{arch}"
        params["submit-time"] = time.strftime("%Y-%m-%d %H:%M:%S%z", time.gmtime())

        if self.amqp_channel:
            # NOTE(review): `amqp` is not among the visible top-of-file
            # imports; presumably imported conditionally when AMQP is
            # configured — confirm
            self.amqp_channel.basic_publish(
                amqp.Message(
                    src + "\n" + json.dumps(params), delivery_mode=2
                ),  # persistent
                routing_key=qname,
            )
            # we save pending.json with every request, so that if britney
            # crashes we don't re-request tests. This is only needed when using
            # real amqp, as with file-based submission the pending tests are
            # returned by debci along with the results each run.
            self.save_pending_json()
        else:
            # for file-based submission, triggers are space separated
            params["triggers"] = [" ".join(params["triggers"])]
            assert self.amqp_file_handle
            self.amqp_file_handle.write(f"{qname}:{src} {json.dumps(params)}\n")
    def pkg_test_request(
        self, src: str, arch: str, all_triggers: list[str], huge: bool = False
    ) -> None:
        """Request one package test for a set of triggers

        all_triggers is a list of "pkgname/version". These are the packages
        that will be taken from the source suite. The first package in this
        list is the package that triggers the testing of src, the rest are
        additional packages required for installability of the test deps. If
        huge is true, then the request will be put into the -huge instead of
        normal queue.

        This will only be done if that test wasn't already requested in
        a previous run (i. e. if it's not already in self.pending_tests)
        or if there is already a fresh or a positive result for it. This
        ensures to download current results for this package before
        requesting any test."""
        trigger = all_triggers[0]
        uses_swift = not self.options.adt_swift_url.startswith("file://")
        try:
            result = self.test_results[trigger][src][arch]
            has_result = True
        except KeyError:
            has_result = False

        if has_result:
            result_state = result[0]
            if result_state in {Result.OLD_PASS, Result.OLD_FAIL, Result.OLD_NEUTRAL}:
                # result from an older version: fall through and refresh
                pass
            elif (
                result_state == Result.FAIL
                and self.result_in_baseline(src, arch)[0]
                in {Result.PASS, Result.NEUTRAL, Result.OLD_PASS, Result.OLD_NEUTRAL}
                and self._now - result[3] > self.options.adt_retry_older_than
            ):
                # We might want to retry this failure, so continue
                pass
            elif not uses_swift:
                # We're done if we don't retrigger and we're not using swift
                return
            elif result_state in {Result.PASS, Result.NEUTRAL}:
                self.logger.debug(
                    "%s/%s triggered by %s already known", src, arch, trigger
                )
                return

        # Without swift we don't expect new results
        if uses_swift:
            self.logger.info(
                "Checking for new results for failed %s/%s for trigger %s",
                src,
                arch,
                trigger,
            )
            self.fetch_swift_results(self.options.adt_swift_url, src, arch)
            # do we have one now?
            try:
                self.test_results[trigger][src][arch]
                return
            except KeyError:
                pass

        self.request_test_if_not_queued(src, arch, trigger, all_triggers, huge=huge)
1667 def request_test_if_not_queued(
1668 self,
1669 src: str,
1670 arch: str,
1671 trigger: str,
1672 all_triggers: list[str] = [],
1673 huge: bool = False,
1674 ) -> None:
1675 assert self.pending_tests is not None # for type checking
1676 if not all_triggers:
1677 all_triggers = [trigger]
1679 # Don't re-request if it's already pending
1680 arch_dict = self.pending_tests.setdefault(trigger, {}).setdefault(src, {})
1681 if arch in arch_dict.keys():
1682 self.logger.debug(
1683 "Test %s/%s for %s is already pending, not queueing", src, arch, trigger
1684 )
1685 else:
1686 self.logger.debug(
1687 "Requesting %s autopkgtest on %s to verify %s", src, arch, trigger
1688 )
1689 arch_dict[arch] = self._now
1690 self.send_test_request(src, arch, all_triggers, huge=huge)
    def result_in_baseline(self, src: str, arch: str) -> list[Any]:
        """Get the result for src on arch in the baseline

        The baseline is optionally all data or a reference set.
        Returns [status, version, run_id, timestamp].
        """

        # this requires iterating over all cached results and thus is expensive;
        # cache the results
        try:
            return self.result_in_baseline_cache[src][arch]
        except KeyError:
            pass

        result_reference: list[Any] = [Result.NONE, None, "", 0]
        if self.options.adt_baseline == "reference":
            # reference baseline: only runs recorded under REF_TRIG count
            if src not in self.suite_info.target_suite.sources:
                return result_reference

            try:
                result_reference = self.test_results[REF_TRIG][src][arch]
                self.logger.debug(
                    "Found result for src %s in reference: %s",
                    src,
                    result_reference[0].name,
                )
            except KeyError:
                self.logger.debug(
                    "Found NO result for src %s in reference: %s",
                    src,
                    result_reference[0].name,
                )
            self.result_in_baseline_cache[src][arch] = deepcopy(result_reference)
            return result_reference

        # otherwise the baseline is "ever passed": scan results from all
        # triggers for any non-FAIL outcome
        result_ever: list[Any] = [Result.FAIL, None, "", 0]
        for srcmap in self.test_results.values():
            try:
                if srcmap[src][arch][0] != Result.FAIL:
                    result_ever = srcmap[src][arch]
                # If we are not looking at a reference run, We don't really
                # care about anything except the status, so we're done
                # once we find a PASS.
                if result_ever[0] == Result.PASS:
                    break
            except KeyError:
                pass

        self.result_in_baseline_cache[src][arch] = deepcopy(result_ever)
        self.logger.debug("Result for src %s ever: %s", src, result_ever[0].name)
        return result_ever
1743 def has_test_in_target(self, src: str) -> bool:
1744 test_in_target = False
1745 try:
1746 srcinfo = self.suite_info.target_suite.sources[src]
1747 if "autopkgtest" in srcinfo.testsuite or self.has_autodep8(srcinfo):
1748 test_in_target = True
1749 # AttributeError is only needed for the test suite as
1750 # srcinfo can be a NoneType
1751 except (KeyError, AttributeError):
1752 pass
1754 return test_in_target
    def pkg_test_result(
        self, src: str, ver: str, arch: str, trigger: str
    ) -> tuple[str, str, str | None, str]:
        """Get current test status of a particular package

        Return (status, real_version, run_id, log_url) tuple; status is a key in
        EXCUSES_LABELS. run_id is None if the test is still running.

        May request a new baseline (reference) run as a side effect when a
        failure cannot yet be classified against a settled baseline.
        """
        assert self.pending_tests is not None  # for type checking
        # determine current test result status
        run_id = None
        try:
            # Raises KeyError if there is no recorded result for this
            # trigger/src/arch combination; handled below as "still pending".
            r = self.test_results[trigger][src][arch]
            # Prefer the version the test actually ran against over the
            # caller-supplied one.
            ver = r[1]
            run_id = r[2]

            if r[0] in {Result.FAIL, Result.OLD_FAIL}:
                # The test failed; compare against the baseline to decide
                # whether this is a regression or a pre-existing failure.
                baseline_result = self.result_in_baseline(src, arch)[0]

                # Special-case triggers from linux-meta*: we cannot compare
                # results against different kernels, as e. g. a DKMS module
                # might work against the default kernel but fail against a
                # different flavor; so for those, ignore the "ever
                # passed" check; FIXME: check against trigsrc only
                if self.options.adt_baseline != "reference" and (
                    trigger.startswith("linux-meta") or trigger.startswith("linux/")
                ):
                    baseline_result = Result.FAIL

                # Check if the autopkgtest (still) exists in the target suite
                test_in_target = self.has_test_in_target(src)

                # Baseline is unsettled (missing or stale) but the test still
                # exists in the target: queue a fresh reference run so a
                # later britney iteration can classify this failure.
                if test_in_target and baseline_result in {
                    Result.NONE,
                    Result.OLD_FAIL,
                    Result.OLD_NEUTRAL,
                    Result.OLD_PASS,
                }:
                    self.request_test_if_not_queued(src, arch, REF_TRIG)

                # Classification order matters: an explicit force-badtest
                # hint wins, then tests that vanished from the target, then
                # baseline comparison.
                if self.has_force_badtest(src, ver, arch):
                    result = "IGNORE-FAIL"
                elif not test_in_target:
                    if self.options.adt_ignore_failure_for_new_tests:
                        result = "IGNORE-FAIL"
                    else:
                        result = r[0].name
                elif baseline_result in {Result.FAIL, Result.OLD_FAIL}:
                    # It failed in the baseline too — not a regression.
                    result = "ALWAYSFAIL"
                elif baseline_result == Result.NONE:
                    # Reference run requested above has not finished yet.
                    result = "RUNNING-REFERENCE"
                else:
                    result = "REGRESSION"
            else:
                # PASS/NEUTRAL (or their OLD_ variants): report as-is.
                result = r[0].name

            url = self.format_log_url(src, arch, run_id)
        except KeyError:
            # no result for src/arch; still running?
            assert arch in self.pending_tests.get(trigger, {}).get(src, {}).keys(), (
                "Result for %s/%s/%s (triggered by %s) is neither known nor pending!"
                % (src, ver, arch, trigger)
            )

            if self.has_force_badtest(src, ver, arch):
                result = "RUNNING-IGNORE"
            else:
                if self.has_test_in_target(src):
                    baseline_result = self.result_in_baseline(src, arch)[0]
                    if baseline_result == Result.FAIL:
                        # Known to fail in the baseline, so a failure here
                        # would not block migration anyway.
                        result = "RUNNING-ALWAYSFAIL"
                    else:
                        result = "RUNNING"
                else:
                    # Test is new (not in the target suite yet).
                    if self.options.adt_ignore_failure_for_new_tests:
                        result = "RUNNING-IGNORE"
                    else:
                        result = "RUNNING"
            url = self.options.adt_ci_url + "status/pending"

        return (result, ver, run_id, url)
1840 def has_force_badtest(self, src: str, ver: str, arch: str) -> bool:
1841 """Check if src/ver/arch has a force-badtest hint"""
1843 assert self.hints is not None
1844 hints = self.hints.search("force-badtest", package=src)
1845 if hints:
1846 self.logger.info(
1847 "Checking hints for %s/%s/%s: %s",
1848 src,
1849 arch,
1850 ver,
1851 [str(h) for h in hints],
1852 )
1853 for hint in hints:
1854 if [
1855 mi
1856 for mi in hint.packages
1857 if mi.architecture in ["source", arch]
1858 and (
1859 mi.version is None
1860 or mi.version == "all" # Historical unversioned hint
1861 or apt_pkg.version_compare(ver, mi.version) <= 0
1862 )
1863 ]:
1864 return True
1866 return False
1868 def has_built_on_this_arch_or_is_arch_all(
1869 self, src_data: SourcePackage, arch: str
1870 ) -> bool:
1871 """When a source builds arch:all binaries, those binaries are
1872 added to all architectures and thus the source 'exists'
1873 everywhere. This function checks if the source has any arch
1874 specific binaries on this architecture and if not, if it
1875 has them on any architecture.
1876 """
1877 packages_s_a = self.suite_info.primary_source_suite.binaries[arch]
1878 has_unknown_binary = False
1879 for binary_s in filter_out_faux(src_data.binaries):
1880 try:
1881 binary_u = packages_s_a[binary_s.package_name]
1882 except KeyError:
1883 # src_data.binaries has all the built binaries, so if
1884 # we get here, we know that at least one architecture
1885 # has architecture specific binaries
1886 has_unknown_binary = True
1887 continue
1888 if binary_u.architecture == arch:
1889 return True
1890 # If we get here, we have only seen arch:all packages for this
1891 # arch.
1892 return not has_unknown_binary