Coverage for britney2/policies/autopkgtest.py: 90%
869 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-06-17 09:00 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-06-17 09:00 +0000
1# Copyright (C) 2013 - 2016 Canonical Ltd.
2# Authors:
3# Colin Watson <cjwatson@ubuntu.com>
4# Jean-Baptiste Lallement <jean-baptiste.lallement@canonical.com>
5# Martin Pitt <martin.pitt@ubuntu.com>
7# This program is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15# GNU General Public License for more details.
17import calendar
18import collections
19import http.client
20import io
21import itertools
22import json
23import optparse
24import os
25import sys
26import tarfile
27import time
28import urllib.parse
29from collections.abc import Iterator
30from copy import deepcopy
31from enum import Enum
32from functools import lru_cache, total_ordering
33from typing import TYPE_CHECKING, Any, Optional, cast
34from urllib.error import HTTPError
35from urllib.request import urlopen
36from urllib.response import addinfourl
38import apt_pkg
40from britney2 import (
41 BinaryPackageId,
42 PackageId,
43 SourcePackage,
44 SuiteClass,
45 Suites,
46 TargetSuite,
47)
48from britney2.hints import HintAnnotate, HintType
49from britney2.migrationitem import MigrationItem
50from britney2.policies import PolicyVerdict
51from britney2.policies.policy import AbstractBasePolicy
52from britney2.utils import (
53 binaries_from_source_version,
54 filter_out_faux,
55 filter_out_faux_gen,
56 get_dependency_solvers,
57 iter_except,
58 parse_option,
59)
61if TYPE_CHECKING: 61 ↛ 62line 61 didn't jump to line 62 because the condition on line 61 was never true
62 import amqplib.client_0_8 as amqp
64 from ..britney import Britney
65 from ..excuse import Excuse
66 from ..hints import HintParser
@total_ordering
class Result(Enum):
    """Status of an autopkgtest result as stored in the results map.

    The integer values define the sort order used by ``__lt__`` (the
    remaining rich comparisons are filled in by ``total_ordering``).  The
    ``OLD_*`` members are the counterparts that ``mark_result_as_old``
    substitutes for ``PASS``, ``NEUTRAL`` and ``FAIL``.
    """

    PASS = 1
    NEUTRAL = 2
    FAIL = 3
    OLD_PASS = 4
    OLD_NEUTRAL = 5
    OLD_FAIL = 6
    NONE = 7

    def __lt__(self, other: "Result") -> bool:
        """Order two results by their numeric value."""
        # Follow the comparison protocol for foreign operands: returning
        # NotImplemented lets Python try the reflected operation and raise
        # TypeError, instead of failing with an AttributeError on ".value".
        if not isinstance(other, Result):
            return NotImplemented
        return self.value < other.value
# Text (plain or HTML) shown in the excuses for each test status string.
EXCUSES_LABELS = {
    # finished runs
    "PASS": '<span style="background:#87d96c">Pass</span>',
    "OLD_PASS": '<span style="background:#87d96c">Pass</span>',
    "NEUTRAL": "No tests, superficial or marked flaky",
    "OLD_NEUTRAL": "No tests, superficial or marked flaky",
    "FAIL": '<span style="background:#ff6666">Failed</span>',
    "OLD_FAIL": '<span style="background:#ff6666">Failed</span>',
    # failures after comparison with the baseline or hints
    "ALWAYSFAIL": '<span style="background:#e5c545">Failed (not a regression)</span>',
    "REGRESSION": '<span style="background:#ff6666">Regression</span>',
    "IGNORE-FAIL": '<span style="background:#e5c545">Ignored failure</span>',
    # runs that have been requested but have no result yet
    "RUNNING": '<span style="background:#99ddff">Test triggered</span>',
    "RUNNING-ALWAYSFAIL": "Test triggered (will not be considered a regression)",
    "RUNNING-IGNORE": "Test triggered (failure will be ignored)",
    "RUNNING-REFERENCE": '<span style="background:#ff6666">Reference test triggered, but real test failed already</span>',
    "DEFERRED": '<span style="background:#99ddff">Test deferred</span>',
}

# Trigger under which reference (baseline) runs are recorded.
REF_TRIG = "migration-reference/0"

# Key stored in the pending-tests JSON file next to the real triggers.
VERSION_KEY = "britney-autopkgtest-pending-file-version"
def srchash(src: str) -> str:
    """Return the archive hash prefix of a source package name.

    Names starting with ``lib`` use their first four characters, every
    other name uses its first character.
    """
    return src[:4] if src.startswith("lib") else src[0]
def added_pkgs_compared_to_target_suite(
    package_ids: frozenset[BinaryPackageId],
    target_suite: TargetSuite,
    *,
    invert: bool = False,
) -> Iterator[BinaryPackageId]:
    """Yield the members of *package_ids* whose name is not ignored.

    By default the ignored names are those of the ids that the target suite
    reports as present.  With ``invert=True`` the ignored names are instead
    those of the ids that the target suite does *not* report as present.
    """
    present_ids = target_suite.which_of_these_are_in_the_suite(package_ids)
    if invert:
        ignored_ids = package_ids - set(present_ids)
    else:
        ignored_ids = present_ids
    ignored_names = {pkg_id.package_name for pkg_id in ignored_ids}

    for pkg_id in package_ids:
        if pkg_id.package_name not in ignored_names:
            yield pkg_id
def all_leaf_results(
    test_results: dict[str, dict[str, dict[str, list[Any]]]],
) -> Iterator[list[Any]]:
    """Yield every innermost result list of a three-level results map.

    The lists are yielded as-is (not copied), so callers may modify them in
    place.
    """
    for per_source in test_results.values():
        for per_arch in per_source.values():
            for leaf in per_arch.values():
                yield leaf
def mark_result_as_old(result: Result) -> Result:
    """Return the ``OLD_*`` counterpart of a current result.

    ``FAIL``, ``PASS`` and ``NEUTRAL`` are mapped to ``OLD_FAIL``,
    ``OLD_PASS`` and ``OLD_NEUTRAL``; any other value is returned unchanged.
    """
    old_counterpart = {
        Result.FAIL: Result.OLD_FAIL,
        Result.PASS: Result.OLD_PASS,
        Result.NEUTRAL: Result.OLD_NEUTRAL,
    }
    return old_counterpart.get(result, result)
def concat_bdeps(src_data: SourcePackage) -> str:
    """Join build_deps_arch and build_deps_indep into one comma-separated string.

    A missing (falsy) field counts as empty, and leading or trailing commas
    are stripped from the result.
    """
    arch_deps = src_data.build_deps_arch or ""
    indep_deps = src_data.build_deps_indep or ""
    return f"{arch_deps},{indep_deps}".strip(",")
160class AutopkgtestPolicy(AbstractBasePolicy):
161 """autopkgtest regression policy for source migrations
163 Run autopkgtests for the excuse and all of its reverse dependencies, and
164 reject the upload if any of those regress.
165 """
def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
    """Parse the ADT_* options and set up the policy's initial state.

    Besides parsing, this fills in the historical defaults of the log and
    retry URL templates, picks the results cache file, derives the swift
    container name and restricts the test architectures to the configured
    architectures.
    """
    super().__init__(
        "autopkgtest", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
    )
    # tests requested in this and previous runs
    # trigger -> src -> [arch]
    self.pending_tests: dict[str, dict[str, dict[str, int]]] | None = None
    self.pending_tests_file = os.path.join(
        self.state_dir, "autopkgtest-pending.json"
    )
    self.testsuite_triggers: dict[str, set[str]] = {}
    self.result_in_baseline_cache: dict[str, dict[str, list[Any]]] = (
        collections.defaultdict(dict)
    )

    self.amqp_file_handle: io.TextIOWrapper | None = None

    # Option name -> keyword arguments for parse_option(); the order of the
    # entries is the order in which the options are parsed.
    option_specs: tuple[tuple[str, dict[str, Any]], ...] = (
        ("adt_baseline", {}),
        ("adt_huge", {"to_int": True}),
        ("adt_ppas", {}),
        ("adt_reference_max_age", {"day_to_sec": True}),
        ("adt_pending_max_age", {"default": 5, "day_to_sec": True}),
        ("adt_regression_penalty", {"default": 0, "to_int": True}),
        ("adt_log_url", {}),  # default is filled in below
        ("adt_retry_url", {}),  # default is filled in below
        ("adt_retry_older_than", {"day_to_sec": True}),
        ("adt_results_cache_age", {"day_to_sec": True}),
        ("adt_shared_results_cache", {}),
        ("adt_success_bounty", {"default": 0, "to_int": True}),
        ("adt_ignore_failure_for_new_tests", {"to_bool": True}),
    )
    for opt_name, opt_kwargs in option_specs:
        parse_option(options, opt_name, **opt_kwargs)

    opts = self.options

    # If the results cache expires entries no later than reference results
    # (or retried results) are considered stale, an old reference is dropped
    # from the cache before its replacement has arrived, which may cause
    # additional waiting.  For packages like glibc that can mean waiting
    # forever, as there is always some package still pending.
    if opts.adt_results_cache_age <= opts.adt_reference_max_age:
        self.logger.warning(
            "Unexpected: ADT_REFERENCE_MAX_AGE bigger than ADT_RESULTS_CACHE_AGE"
        )
    if opts.adt_results_cache_age <= opts.adt_retry_older_than:
        self.logger.warning(
            "Unexpected: ADT_RETRY_OLDER_THAN bigger than ADT_RESULTS_CACHE_AGE"
        )

    if not opts.adt_log_url:
        # Historical defaults: only the head of the path depends on whether
        # the results come from a local file or from swift.
        if opts.adt_swift_url.startswith("file://"):
            log_url_head = (opts.adt_ci_url, "data", "autopkgtest")
        else:
            log_url_head = (opts.adt_swift_url, "{swift_container}")
        opts.adt_log_url = os.path.join(
            *log_url_head,
            opts.series,
            "{arch}",
            "{hash}",
            "{package}",
            "{run_id}",
            "log.gz",
        )

    if hasattr(opts, "adt_retry_url_mech"):
        self.logger.warning(
            "The ADT_RETRY_URL_MECH configuration has been deprecated."
        )
        self.logger.warning(
            "Instead britney now supports ADT_RETRY_URL for more flexibility."
        )
        if opts.adt_retry_url:
            self.logger.error(
                "Please remove the ADT_RETRY_URL_MECH as ADT_RETRY_URL will be used."
            )
        elif opts.adt_retry_url_mech == "run_id":
            opts.adt_retry_url = opts.adt_ci_url + "api/v1/retry/{run_id}"
    if not opts.adt_retry_url:
        # Historical default
        opts.adt_retry_url = (
            opts.adt_ci_url
            + "request.cgi?"
            + "release={release}&arch={arch}&package={package}&trigger={trigger}{ppas}"
        )

    # results map: trigger -> src -> arch -> [passed, version, run_id, seen]
    # - trigger is "source/version" of an unstable package that triggered
    #   this test run.
    # - "passed" is a Result
    # - "version" is the package version of "src" of that test
    # - "run_id" is an opaque ID that identifies a particular test run for
    #   a given src/arch.
    # - "seen" is an approximate time stamp of the test run. How this is
    #   deduced depends on the interface used.
    self.test_results: dict[str, dict[str, dict[str, list[Any]]]] = {}
    self.results_cache_file = opts.adt_shared_results_cache or os.path.join(
        self.state_dir, "autopkgtest-results.cache"
    )

    # adt_ppas is turned into a list; a value without .strip() (the
    # AttributeError case) yields an empty list.
    try:
        opts.adt_ppas = opts.adt_ppas.strip().split()
    except AttributeError:
        opts.adt_ppas = []

    container = "autopkgtest-" + options.series
    if opts.adt_ppas:
        container += "-" + options.adt_ppas[-1].replace("/", "-")
    self.swift_container = container

    # restrict adt_arches to architectures we actually run for
    self.adt_arches = []
    for arch in opts.adt_arches.split():
        if arch not in opts.architectures:
            self.logger.info(
                "Ignoring ADT_ARCHES %s as it is not in architectures list", arch
            )
            continue
        self.adt_arches.append(arch)
def __del__(self) -> None:
    """Close the file that receives the test requests, if one is open."""
    # __del__ also runs for instances whose __init__ raised before
    # amqp_file_handle was assigned, so the attribute may not exist.
    handle = getattr(self, "amqp_file_handle", None)
    if handle:
        try:
            handle.close()
        except AttributeError:
            # best effort: nothing useful can be done during teardown
            pass
def register_hints(self, hint_parser: "HintParser") -> None:
    """Register the hint types this policy looks up.

    ``force-badtest`` may optionally carry a version and an architecture;
    ``force-skiptest`` is registered with the HintType defaults.
    """
    badtest_hint = HintType(
        "force-badtest",
        versioned=HintAnnotate.OPTIONAL,
        architectured=HintAnnotate.OPTIONAL,
    )
    skiptest_hint = HintType("force-skiptest")
    for hint_type in (badtest_hint, skiptest_hint):
        hint_parser.register_hint_type(hint_type)
def initialise(self, britney: "Britney") -> None:
    """Load state and results, then open the channel for test requests.

    In order: fix the "current" time stamp, build the inverse
    testsuite-triggers map, read the pending tests and the results cache,
    merge new results from a ``file://`` ADT_SWIFT_URL, optionally age the
    results against the reference settings, and finally (unless this is a
    dry run) connect to AMQP or open the request file.

    Raises RuntimeError when ADT_AMQP uses neither ``amqp://`` nor
    ``file://``.
    """
    super().initialise(britney)

    # One "current" time stamp is used in multiple locations; the
    # fake_runtime option, when present, replaces the wall clock.
    if hasattr(self.options, "fake_runtime"):
        self._now = int(self.options.fake_runtime)
    else:
        self._now = round(time.time())

    # local copy for better performance
    parse_src_depends = apt_pkg.parse_src_depends

    # compute inverse Testsuite-Triggers: map, unifying all series
    self.logger.info("Building inverse testsuite_triggers map")
    reverse_triggers = self.testsuite_triggers
    for suite in self.suite_info:
        for src, data in suite.sources.items():
            # for now, let's assume that autodep8 uses builddeps (most do)
            if (
                self.has_autodep8(data)
                and "@builddeps@" not in data.testsuite_triggers
            ):
                data.testsuite_triggers.append("@builddeps@")
            for trigger in data.testsuite_triggers:
                if trigger != "@builddeps@":
                    reverse_triggers.setdefault(trigger, set()).add(src)
                    continue
                # "@builddeps@" stands for the first alternative of every
                # build dependency, on each test architecture.
                for arch in self.adt_arches:
                    for block in parse_src_depends(concat_bdeps(data), True, arch):
                        reverse_triggers.setdefault(block[0][0], set()).add(src)
    target_suite_name = self.suite_info.target_suite.name

    os.makedirs(self.state_dir, exist_ok=True)
    self.read_pending_tests()

    # read the cached results that we collected so far
    if os.path.exists(self.results_cache_file):
        with open(self.results_cache_file) as cache_fd:
            cached_results = json.load(cache_fd)
            self.test_results = self.check_and_upgrade_cache(cached_results)
        self.logger.info("Read previous results from %s", self.results_cache_file)
    else:
        self.logger.info(
            "%s does not exist, re-downloading all results from swift",
            self.results_cache_file,
        )

    # read in the new results
    if self.options.adt_swift_url.startswith("file://"):
        debci_file = self.options.adt_swift_url[7:]
        if not os.path.exists(debci_file):
            self.logger.info(
                "%s does not exist, no new data will be processed", debci_file
            )
        else:
            with open(debci_file) as debci_fd:
                debci_data = json.load(debci_fd)
            self.logger.info("Read new results from %s", debci_file)
            for res in debci_data["results"]:
                # if there's no date, the test didn't finish yet
                if res["date"] is None:
                    continue
                # not requested for this target suite, so ignore
                if res["suite"] != target_suite_name:
                    continue
                triggers = res["trigger"]
                # not requested for this policy, so ignore
                if triggers is None:
                    continue
                status = res["status"]
                # still running => pending
                if status is None:
                    continue
                src = res["package"]
                arch = res["arch"]
                ver = res["version"]
                run_id = str(res["run_id"])
                # the last five characters of the date are cut off before
                # parsing; the remainder is read as UTC
                finished = time.strptime(res["date"][0:-5], "%Y-%m-%dT%H:%M:%S")
                seen = round(calendar.timegm(finished))
                for trigger in triggers.split():
                    # remove matching test requests
                    self.remove_from_pending(trigger, src, arch, seen)
                    if status == "tmpfail":
                        # let's see if we still need it
                        continue
                    self.logger.debug(
                        "Results %s %s %s added", src, trigger, status
                    )
                    self.add_trigger_to_results(
                        trigger,
                        src,
                        ver,
                        arch,
                        run_id,
                        seen,
                        Result[status.upper()],
                    )

    # The cache can contain results against versions of packages that
    # are not in any suite anymore. Strip those out, as we don't want
    # to use those results. Additionally, old references may be
    # filtered out.
    if self.options.adt_baseline == "reference":
        self.filter_old_results()

    # we need sources, binaries, and installability tester, so for now
    # remember the whole britney object
    self.britney = britney

    # Initialize AMQP connection
    self.amqp_channel: Optional["amqp.channel.Channel"] = None
    self.amqp_file_handle = None
    if self.options.dry_run:
        return

    amqp_url = self.options.adt_amqp
    if amqp_url.startswith("amqp://"):
        import amqplib.client_0_8 as amqp

        # depending on the setup we connect to a AMQP server
        creds = urllib.parse.urlsplit(amqp_url, allow_fragments=False)
        self.amqp_con = amqp.Connection(
            creds.hostname, userid=creds.username, password=creds.password
        )
        self.amqp_channel = self.amqp_con.channel()
        self.logger.info("Connected to AMQP server")
    elif amqp_url.startswith("file://"):
        # or in Debian and in testing mode, adt_amqp will be a file:// URL
        self.amqp_file_handle = open(amqp_url[7:], "w", 1)
    else:
        raise RuntimeError("Unknown ADT_AMQP schema %s" % amqp_url.split(":", 1)[0])
def check_and_upgrade_cache(
    self, test_results: dict[str, dict[str, dict[str, list[Any]]]]
) -> dict[str, dict[str, dict[str, list[Any]]]]:
    """Prepare a freshly loaded results cache for use.

    The first element of every result is converted from its stored name to
    the ``Result`` member, results older than ADT_RESULTS_CACHE_AGE are
    dropped, and levels that become empty are removed.  *test_results* is
    modified in place and returned.
    """
    # Keys are collected first and deleted afterwards, so no dict changes
    # size while it is iterated and no full copy of the keys is needed.
    stale_triggers = []
    for trigger, per_source in test_results.items():
        stale_pkgs = []
        for pkg, per_arch in per_source.items():
            for entry in per_arch.values():
                entry[0] = Result[entry[0]]
            expired_arches = [
                arch
                for arch, entry in per_arch.items()
                if self._now - entry[3] > self.options.adt_results_cache_age
            ]
            for arch in expired_arches:
                del per_arch[arch]
            if not per_arch:
                stale_pkgs.append(pkg)
        for pkg in stale_pkgs:
            del per_source[pkg]
        if not per_source:
            stale_triggers.append(trigger)
    for trigger in stale_triggers:
        del test_results[trigger]

    return test_results
def filter_old_results(self) -> None:
    """Mark outdated results in the cache as old.

    A result is converted with ``mark_result_as_old`` when it is a
    reference run older than ADT_REFERENCE_MAX_AGE, or otherwise when the
    version it was run against is not found in any suite.  Nothing is
    removed from the cache.
    """
    for trigger, per_source in self.test_results.items():
        is_reference = trigger == REF_TRIG
        for src, per_arch in per_source.items():
            for entry in per_arch.values():
                expired_reference = (
                    is_reference
                    and self._now - entry[3] > self.options.adt_reference_max_age
                )
                # the version lookup is only needed when the age check
                # did not already decide
                if expired_reference or not self.test_version_in_any_suite(
                    src, entry[1]
                ):
                    entry[0] = mark_result_as_old(entry[0])
def test_version_in_any_suite(self, src: str, version: str) -> bool:
    """Tell whether *version* of *src* is currently present in some suite.

    To prevent regressions in the target suite, a result should come from
    a test of the version in either the source or the target suite.  The
    source suite counts as well because versioned test dependencies and
    Breaks/Conflicts relations regularly cause its version to be the one
    used during testing.
    """
    known_versions = {
        suite.sources[src].version
        for suite in self.suite_info
        if src in suite.sources
    }
    return any(
        apt_pkg.version_compare(candidate, version) == 0
        for candidate in known_versions
    )
def save_pending_json(self) -> None:
    """Write the pending test requests to their JSON file.

    The data is written to a ``.new`` file which is then renamed over the
    real one.
    """
    self.logger.info(
        "Updating pending requested tests in %s" % self.pending_tests_file
    )
    # Only the top level is copied, as only that level gains a key.  The
    # version marker is left out when nothing is pending (eases testing).
    payload: dict[str, Any] = {}
    if self.pending_tests:
        payload = {**self.pending_tests, VERSION_KEY: 1}

    tmp_path = self.pending_tests_file + ".new"
    with open(tmp_path, "w") as out:
        json.dump(payload, out, indent=2)
    os.rename(tmp_path, self.pending_tests_file)
def save_state(self, britney: "Britney") -> None:
    """Persist the results cache and the pending test requests.

    The results cache is only written when no shared (read-only) cache is
    configured; the pending requests are always written.
    """
    super().save_state(britney)

    if not self.options.adt_shared_results_cache:
        self.logger.info("Updating results cache")
        # work on a copy: Result members are replaced by their names so
        # that the data can be serialised as JSON
        serialisable = deepcopy(self.test_results)
        for leaf in all_leaf_results(serialisable):
            leaf[0] = leaf[0].name
        tmp_path = self.results_cache_file + ".new"
        with open(tmp_path, "w") as out:
            json.dump(serialisable, out, indent=2)
        os.rename(tmp_path, self.results_cache_file)

    self.save_pending_json()
562 def format_retry_url(
563 self, run_id: str | None, arch: str, testsrc: str, trigger: str
564 ) -> str:
565 if self.options.adt_ppas:
566 ppas = "&" + urllib.parse.urlencode(
567 [("ppa", p) for p in self.options.adt_ppas]
568 )
569 else:
570 ppas = ""
571 return cast(str, self.options.adt_retry_url).format(
572 run_id=run_id,
573 release=self.options.series,
574 arch=arch,
575 package=testsrc,
576 trigger=urllib.parse.quote_plus(trigger),
577 ppas=ppas,
578 )
def format_log_url(self, testsrc: str, arch: str, run_id: str) -> str:
    """Fill in the ADT_LOG_URL template for one test run.

    The template may use ``{release}``, ``{swift_container}``, ``{hash}``,
    ``{package}``, ``{arch}`` and ``{run_id}``.
    """
    template = cast(str, self.options.adt_log_url)
    fields = {
        "release": self.options.series,
        "swift_container": self.swift_container,
        "hash": srchash(testsrc),
        "package": testsrc,
        "arch": arch,
        "run_id": run_id,
    }
    return template.format(**fields)
def apply_src_policy_impl(
    self,
    tests_info: dict[str, Any],
    source_data_tdist: SourcePackage | None,
    source_data_srcdist: SourcePackage,
    excuse: "Excuse",
) -> PolicyVerdict:
    """Apply the autopkgtest policy to a source migration.

    Testing is deferred (temporary rejection) while the source has no
    binaries anywhere or lacks its arch:all build.  Otherwise tests are
    checked or requested per test architecture and the collected results
    are turned into the verdict and the excuse text.
    """
    verdict = PolicyVerdict.PASS
    source_name = excuse.item.package
    all_self_tests_pass = False
    results_info: list[str] = []

    # skip/delay autopkgtests until new package is built somewhere
    if not binaries_from_source_version(source_data_srcdist, self.suite_info)[0]:
        self.logger.debug(
            "%s hasnot been built anywhere, skipping autopkgtest policy",
            excuse.name,
        )
        verdict = PolicyVerdict.REJECTED_TEMPORARILY
        excuse.add_verdict_info(verdict, "Autopkgtest deferred: missing builds")
    elif "all" in excuse.missing_builds:
        self.logger.debug(
            "%s hasnot been built for arch:all, skipping autopkgtest policy",
            source_name,
        )
        verdict = PolicyVerdict.REJECTED_TEMPORARILY
        excuse.add_verdict_info(
            verdict, "Autopkgtest deferred: missing arch:all build"
        )

    if verdict.is_rejected:
        # nothing was tested; only hints and bounty/penalty handling remain
        return self.finalize_excuse(
            excuse, verdict, all_self_tests_pass, results_info
        )

    self.logger.debug("Checking autopkgtests for %s", source_name)
    trigger = source_name + "/" + source_data_srcdist.version

    # (testsrc, testver) → arch → (status, run_id, log_url); tests are
    # triggered and checked per architecture for technical/efficiency
    # reasons, but evaluated and presented by tested source package first
    pkg_arch_result: dict[
        tuple[str, str], dict[str, tuple[str, str | None, str]]
    ] = collections.defaultdict(dict)
    for arch in self.adt_arches:
        if arch not in excuse.missing_builds:
            verdict = self.check_and_request_arch(
                excuse,
                arch,
                source_data_srcdist,
                pkg_arch_result,
                trigger,
                verdict,
            )
            continue
        verdict = PolicyVerdict.REJECTED_TEMPORARILY
        self.logger.debug(
            "%s hasnot been built on arch %s, delay autopkgtest there",
            source_name,
            arch,
        )
        excuse.add_verdict_info(
            verdict,
            f"Autopkgtest deferred on {arch}: missing arch:{arch} build",
        )

    verdict, results_info, all_self_tests_pass = self.process_pkg_arch_results(
        tests_info, excuse, pkg_arch_result, verdict, trigger
    )
    return self.finalize_excuse(excuse, verdict, all_self_tests_pass, results_info)
def apply_srcarch_policy_impl(
    self,
    tests_info: dict[str, Any],
    arch: str,
    source_data_tdist: SourcePackage | None,
    source_data_srcdist: SourcePackage,
    excuse: "Excuse",
) -> PolicyVerdict:
    """Apply the autopkgtest policy to a binNMU on one architecture.

    The policy passes without testing when *arch* is not a test
    architecture, or when the binaries for *arch* do not agree on exactly
    one ``+b<number>`` version suffix.
    """
    assert self.hints is not None  # for type checking
    verdict = PolicyVerdict.PASS

    item_name = str(excuse.item)
    self.logger.debug("Checking autopkgtests for binNMU %s/%s", item_name, arch)

    if arch not in self.adt_arches:
        return verdict

    # find the binNMU version
    binnmu_numbers = set()
    for bin_pkg in source_data_srcdist.binaries:
        if bin_pkg.architecture != arch:
            continue
        parts = bin_pkg.version.split("+b")
        if len(parts) > 1 and parts[-1].isdigit():
            binnmu_numbers.add(parts[-1])
        else:
            self.logger.debug(
                "Version %s doesn't end with '+b#', skipping", bin_pkg.version
            )
    if len(binnmu_numbers) != 1:
        self.logger.debug("This migration item doesn't look like a binNMU")
        return verdict

    (binnmu_number,) = binnmu_numbers
    trigger = item_name + "/" + binnmu_number

    # While we don't need the arch here, this is common with apply_src_policy_impl()
    # (testsrc, testver) → arch → (status, run_id, log_url) map
    pkg_arch_result: dict[
        tuple[str, str], dict[str, tuple[str, str | None, str]]
    ] = collections.defaultdict(dict)

    verdict = self.check_and_request_arch(
        excuse, arch, source_data_srcdist, pkg_arch_result, trigger, verdict
    )
    verdict, results_info, all_self_tests_pass = self.process_pkg_arch_results(
        tests_info, excuse, pkg_arch_result, verdict, trigger
    )
    return self.finalize_excuse(excuse, verdict, all_self_tests_pass, results_info)
def check_and_request_arch(
    self,
    excuse: "Excuse",
    arch: str,
    source_data_srcdist: SourcePackage,
    pkg_arch_result: dict[tuple[str, str], dict[str, tuple[str, str | None, str]]],
    trigger: str,
    verdict: PolicyVerdict,
) -> PolicyVerdict:
    """Request tests/results for *arch* unless the package is uninstallable there.

    Returns *verdict* unchanged, except that an uninstallable package
    (without permission to run anyway) yields REJECTED_TEMPORARILY.
    """
    source_name = excuse.item.package
    depends_info = excuse.policy_info["depends"]

    if arch in depends_info.get("arch_all_not_installable", []):
        self.logger.debug(
            "%s is uninstallable on arch %s (which is allowed), not running autopkgtest there",
            source_name,
            arch,
        )
        excuse.addinfo(
            f"Autopkgtest skipped on {arch}: not installable (which is allowed)"
        )
        return verdict

    run_anyways = depends_info.get("autopkgtest_run_anyways", [])
    if arch in excuse.unsatisfiable_on_archs and arch not in run_anyways:
        self.logger.debug(
            "%s is uninstallable on arch %s, not running autopkgtest there",
            source_name,
            arch,
        )
        excuse.addinfo(f"Autopkgtest skipped on {arch}: not installable")
        return PolicyVerdict.REJECTED_TEMPORARILY

    self.request_tests_for_source(
        arch, source_data_srcdist, pkg_arch_result, excuse, trigger
    )
    return verdict
# Turn the collected per-architecture statuses into a verdict and excuse text.
#
# Parameters:
#   tests_info       dict filled in place: test name -> arch ->
#                    [status, log_url, history_url, artifact_url, retry_url]
#   excuse           excuse being evaluated; its autopkgtest_results attribute
#                    is set when the package's own tests are among the results
#   pkg_arch_result  (testsrc, testver) -> arch -> (status, run_id, log_url)
#   verdict          verdict reached so far
#   trigger          trigger string used when formatting retry URLs
# Returns (verdict, results_info, all_self_tests_pass): the possibly updated
# verdict, the HTML lines for the excuse, and whether the package's own tests
# gave nothing but "PASS".
#
# NOTE(review): this listing has lost its indentation, so the nesting of the
# statements below cannot be read off it -- check the real source file before
# restructuring this method.
760 def process_pkg_arch_results(
761 self,
762 tests_info: dict[str, Any],
763 excuse: "Excuse",
764 pkg_arch_result: dict[tuple[str, str], dict[str, tuple[str, str | None, str]]],
765 verdict: PolicyVerdict,
766 trigger: str,
767 ) -> tuple[PolicyVerdict, list[str], bool]:
768 """Calculate verdict based on results and render excuse text"""
770 source_name = excuse.item.package
771 all_self_tests_pass = False
772 results_info = []
774 # add test result details to Excuse
775 cloud_url = self.options.adt_ci_url + "packages/%(h)s/%(s)s/%(r)s/%(a)s"
776 testver: str | None
777 for testsrc, testver in sorted(pkg_arch_result):
778 assert testver is not None
779 arch_results = pkg_arch_result[(testsrc, testver)]
# r: the distinct status strings of this test over all its architectures
780 r = {v[0] for v in arch_results.values()}
781 if r & {"FAIL", "OLD_FAIL", "REGRESSION"}:
782 verdict = PolicyVerdict.REJECTED_PERMANENTLY
783 elif (
784 r & {"DEFERRED", "RUNNING", "RUNNING-REFERENCE"}
785 and not verdict.is_rejected
786 ):
787 verdict = PolicyVerdict.REJECTED_TEMPORARILY
788 # skip version if still running on all arches
789 if not r - {"DEFERRED", "RUNNING", "RUNNING-ALWAYSFAIL", "RUNNING-IGNORE"}:
790 testver = None
792 # A source package is eligible for the bounty if it has tests
793 # of its own that pass on all tested architectures.
794 if testsrc == source_name:
795 excuse.autopkgtest_results = r
796 if r == {"PASS"}:
797 all_self_tests_pass = True
799 if testver:
800 testname = f"{testsrc}/{testver}"
801 else:
802 testname = testsrc
804 html_archmsg = []
805 for arch in sorted(arch_results):
806 status, run_id, log_url = arch_results[arch]
# optional links; each stays None unless one of the conditions below sets it
807 artifact_url = None
808 retry_url = None
809 reference_url = None
810 reference_retry_url = None
811 history_url = None
812 if self.options.adt_ppas:
813 if log_url.endswith("log.gz"):
814 artifact_url = log_url.replace("log.gz", "artifacts.tar.gz")
815 else:
816 history_url = cloud_url % {
817 "h": srchash(testsrc),
818 "s": testsrc,
819 "r": self.options.series,
820 "a": arch,
821 }
822 if status not in ("DEFERRED", "PASS", "RUNNING", "RUNNING-IGNORE"):
823 retry_url = self.format_retry_url(run_id, arch, testsrc, trigger)
825 baseline_result = self.result_in_baseline(testsrc, arch)
826 if baseline_result and baseline_result[0] is not Result.NONE:
827 baseline_run_id = str(baseline_result[2])
828 reference_url = self.format_log_url(
829 testsrc, arch, baseline_run_id
830 )
831 if self.options.adt_baseline == "reference":
832 reference_retry_url = self.format_retry_url(
833 baseline_run_id, arch, testsrc, REF_TRIG
834 )
# record for tests_info; the reference links are not part of it
835 tests_info.setdefault(testname, {})[arch] = [
836 status,
837 log_url,
838 history_url,
839 artifact_url,
840 retry_url,
841 ]
843 # render HTML snippet for testsrc entry for current arch
# NOTE(review): URLs and names are inserted into the HTML without escaping --
# confirm they can never contain markup characters.
844 if history_url:
845 message = f'<a href="{history_url}">{arch}</a>'
846 else:
847 message = arch
848 message += ': <a href="{}">{}</a>'.format(
849 log_url,
850 EXCUSES_LABELS[status],
851 )
852 if retry_url:
853 message += (
854 '<a href="%s" style="text-decoration: none;"> ♻</a>' % retry_url
855 )
856 if reference_url:
857 message += ' (<a href="%s">reference</a>' % reference_url
858 if reference_retry_url:
859 message += (
860 '<a href="%s" style="text-decoration: none;"> ♻</a>'
861 % reference_retry_url
862 )
863 message += ")"
864 if artifact_url:
865 message += ' <a href="%s">[artifacts]</a>' % artifact_url
866 html_archmsg.append(message)
868 # render HTML line for testsrc entry
869 # - if action is or may be required
870 # - for ones own package
871 if (
872 r
873 - {
874 "PASS",
875 "NEUTRAL",
876 "RUNNING-ALWAYSFAIL",
877 "ALWAYSFAIL",
878 "IGNORE-FAIL",
879 }
880 or testsrc == source_name
881 ):
882 if testver:
883 pkg = '<a href="#{0}">{0}</a>/{1}'.format(testsrc, testver)
884 else:
885 pkg = '<a href="#{0}">{0}</a>'.format(testsrc)
886 results_info.append(
887 "Autopkgtest for {}: {}".format(pkg, ", ".join(html_archmsg))
888 )
890 return (verdict, results_info, all_self_tests_pass)
def finalize_excuse(
    self,
    excuse: "Excuse",
    verdict: PolicyVerdict,
    all_self_tests_pass: bool,
    results_info: list[str],
) -> PolicyVerdict:
    """Apply hints and the bounty/penalty settings, then fill in the excuse.

    A rejection is overridden by a matching force-skiptest hint.  When a
    regression penalty is configured, a rejection is converted into a pass
    (with the penalty added if it is positive).  The lines of
    *results_info* are attached as verdict info if the final verdict is a
    rejection, and as plain info otherwise.
    """
    package = excuse.item.package
    version = excuse.item.version

    assert self.hints is not None  # for type checking
    if verdict.is_rejected:
        # check for force-skiptest hint
        hint = self.hints.search_first(
            "force-skiptest",
            package=package,
            version=version,
        )
        if hint is None:
            excuse.addreason("autopkgtest")
        else:
            excuse.addreason("skiptest")
            excuse.addinfo(
                "Not waiting for autopkgtest results and failures are "
                f"ignored because of hint by {hint.user}"
            )
            verdict = PolicyVerdict.PASS_HINTED

    bounty = self.options.adt_success_bounty
    if bounty and verdict is PolicyVerdict.PASS and all_self_tests_pass:
        excuse.add_bounty("autopkgtest", bounty)

    penalty = self.options.adt_regression_penalty
    rejections = {
        PolicyVerdict.REJECTED_PERMANENTLY,
        PolicyVerdict.REJECTED_TEMPORARILY,
    }
    if penalty and verdict in rejections:
        if penalty > 0:
            excuse.add_penalty("autopkgtest", penalty)
        # In case we give penalties instead of blocking, we must always pass
        verdict = PolicyVerdict.PASS

    if verdict.is_rejected:
        for line in results_info:
            excuse.add_verdict_info(verdict, line)
    else:
        for line in results_info:
            excuse.addinfo(line)

    return verdict
950 @staticmethod
951 def has_autodep8(srcinfo: SourcePackage) -> bool:
952 """Check if package is covered by autodep8
954 srcinfo is an item from self.britney.sources
955 """
956 # autodep8?
957 for t in srcinfo.testsuite:
958 if t.startswith("autopkgtest-pkg"):
959 return True
961 return False
def request_tests_for_source(
    self,
    arch: str,
    source_data_srcdist: SourcePackage,
    pkg_arch_result: dict[tuple[str, str], dict[str, tuple[str, str | None, str]]],
    excuse: "Excuse",
    trigger: str,
) -> None:
    """Request all tests for the excuse's source on arch and record results.

    Works out which additional source packages must be taken from the
    source suite together with ``trigger`` (the extra triggers), requests
    every test returned by tests_for_source() with that trigger list, and
    stores the outcome per (source, version) and architecture in
    pkg_arch_result. Tests of sources whose binaries were reported as
    broken by the implicit-deps policy info are recorded as "DEFERRED"
    instead of being requested.
    """
    pkg_universe = self.britney.pkg_universe
    target_suite = self.suite_info.target_suite
    source_suite = excuse.item.suite
    sources_t = target_suite.sources
    sources_s = source_suite.sources
    packages_s_a = source_suite.binaries[arch]
    source_name = excuse.item.package
    source_version = source_data_srcdist.version
    # request tests (unless they were already requested earlier or have a result)
    tests = self.tests_for_source(source_name, source_version, arch, excuse)
    is_huge = len(tests) > self.options.adt_huge

    # local copies for better performance
    parse_src_depends = apt_pkg.parse_src_depends

    # Work out what is needed from the source suite for the test to
    # install successfully.
    #
    # The ImplicitDependencyPolicy does a similar calculation, but
    # (if elbrus understands correctly) only in the reverse dependency
    # direction. Here we do something similar in the dependency direction
    # (note: this code is older). The ImplicitDependencyPolicy result is
    # used for the reverse dependencies and the code below is kept for the
    # dependencies. Using the ImplicitDependencyPolicy results in the
    # reverse direction as well seems to need quite some reorganisation to
    # make that information available here: currently only the current
    # excuse is available and the other required excuses may not have been
    # calculated yet.
    #
    # Loop over all binary packages from trigger and recursively look up
    # which *versioned* dependencies are only satisfied in the source
    # suite.
    #
    # For all binaries found, look up which packages they break/conflict
    # with in the target suite but not in the source suite. The main
    # reason is to cover test dependencies, so Testsuite-Triggers are
    # checked as well.
    #
    # OI: do we need to do the first check in a smart way (i.e. only for
    # the packages that are actually going to be installed) for the
    # breaks/conflicts set as well, i.e. do we need to check whether any
    # of the packages that we now force to come from the source suite
    # actually have new versioned depends and new breaks/conflicts.
    #
    # For all binaries found, add the set of unique source packages to the
    # list of triggers.

    bin_triggers: set[PackageId] = set()
    bin_new = filter_out_faux(source_data_srcdist.binaries)
    # For each build-depends block (if any) check whether its first
    # alternative is satisfiable in the target suite. If it is not, but it
    # is in the source suite, seed the work set with its first solver.
    for block in parse_src_depends(concat_bdeps(source_data_srcdist), True, arch):
        if get_dependency_solvers(
            [block[0]],
            target_suite.binaries[arch],
            target_suite.provides_table[arch],
            build_depends=True,
        ):
            continue
        solvers = get_dependency_solvers(
            [block[0]],
            packages_s_a,
            source_suite.provides_table[arch],
            build_depends=True,
        )
        if solvers:
            bin_new.add(solvers[0].pkg_id)

    # Drain the work set; it grows while we discover new dependencies.
    for pending_bin in iter_except(bin_new.pop, KeyError):
        if pending_bin in bin_triggers:
            continue
        bin_triggers.add(pending_bin)

        # Look for dependencies that are not available in the target
        # suite. This adds slightly too much, because new binaries show up
        # as well although they are already properly installed. Nevermind.
        # dependencies_of gives a frozenset{frozenset{BinaryPackageId, ..}}
        for alternatives in pkg_universe.dependencies_of(pending_bin):
            # A clause that already has one alternative satisfied in the
            # target suite can be ignored.
            if not target_suite.any_of_these_are_in_the_suite(alternatives):
                # We'll figure out which version later
                bin_new.update(
                    added_pkgs_compared_to_target_suite(alternatives, target_suite)
                )

    # Check whether the packages break/conflict with anything. This may
    # add slightly too many source packages, as a broken binary may come
    # from a different source package in the source suite. Nevermind.
    bin_broken = set()
    for trig_bin in bin_triggers:
        # broken is a frozenset{BinaryPackageId, ..}
        broken = pkg_universe.negative_dependencies_of(
            cast(BinaryPackageId, trig_bin)
        )
        names_in_target = {
            pkg.package_name
            for pkg in target_suite.which_of_these_are_in_the_suite(broken)
        }
        names_in_source = {
            pkg.package_name
            for pkg in source_suite.which_of_these_are_in_the_suite(broken)
        }
        # We want packages with a newer version in the source suite that
        # no longer has the conflict. This is an approximation.
        # The version added is the one in the target suite; the code
        # further down switches it to the version in the source suite.
        bin_broken.update(
            pkg
            for pkg in broken
            if pkg.package_name in names_in_target
            and pkg.package_name not in names_in_source
        )
    bin_triggers.update(bin_broken)

    # The ImplicitDependencyPolicy also found packages that need to
    # migrate together, so add them to the triggers too.
    bin_triggers.update(
        implicit
        for implicit in excuse.depends_packages_flattened
        if implicit.architecture == arch
    )

    triggers = set()
    for candidate in bin_triggers:
        if candidate.architecture != arch:
            continue
        try:
            bin_src = packages_s_a[candidate.package_name].source
            # If the version in the target suite is the same, don't add a
            # trigger. The source package was looked up in the source
            # suite; if it were a different source package in the target
            # suite we would not have this source package in the same
            # version anyway.
            #
            # binNMU's exist, so also check whether the binary exists in
            # the target suite when the sources are the same.
            src_in_target = sources_t.get(bin_src, None)
            if (
                src_in_target is None
                or sources_s[bin_src].version != src_in_target.version
                or not target_suite.any_of_these_are_in_the_suite(
                    {cast(BinaryPackageId, candidate)}
                )
            ):
                triggers.add(bin_src + "/" + sources_s[bin_src].version)
        except KeyError:
            # Apparently the package was removed from unstable, e.g. if
            # packages are replaced (e.g. -dbg to -dbgsym)
            pass

        if candidate in source_data_srcdist.binaries:
            continue
        for tdep_src in self.testsuite_triggers.get(candidate.package_name, set()):
            try:
                # Only add a trigger if the versions in the target and
                # source suites differ
                tdep_in_target = sources_t.get(tdep_src, None)
                if (
                    tdep_in_target is None
                    or sources_s[tdep_src].version != tdep_in_target.version
                ):
                    triggers.add(tdep_src + "/" + sources_s[tdep_src].version)
            except KeyError:
                # Apparently the source was removed from unstable
                # (testsuite_triggers are unified over all suites)
                pass

    # The package under test itself is passed as the leading trigger.
    triggers.discard(source_name + "/" + source_version)
    triggers_list = [trigger]
    triggers_list.extend(sorted(triggers))

    impl_pids = excuse.policy_info.get("implicit-deps", {}).get(
        "broken-binaries", []
    )
    for testsrc, testver in tests:
        # Not if binaries from testsrc are not installable
        not_installable = any(
            bpid.architecture == arch
            and testsrc == target_suite.all_binaries_in_suite[bpid].source
            for bpid in (BinaryPackageId(*raw.split("/")) for raw in impl_pids)
        )
        if not_installable:
            pkg_arch_result[(testsrc, testver)][arch] = ("DEFERRED", None, "")
            continue
        self.pkg_test_request(testsrc, arch, triggers_list, huge=is_huge)
        result, real_ver, run_id, url = self.pkg_test_result(
            testsrc, testver, arch, trigger
        )
        pkg_arch_result[(testsrc, real_ver)][arch] = (result, run_id, url)
1176 def tests_for_source(
1177 self, src: str, ver: str, arch: str, excuse: "Excuse"
1178 ) -> list[tuple[str, str]]:
1179 """Iterate over all tests that should be run for given source and arch"""
1181 source_suite = self.suite_info.primary_source_suite
1182 target_suite = self.suite_info.target_suite
1183 sources_info = target_suite.sources
1184 binaries_info = target_suite.binaries[arch]
1186 reported_pkgs = set()
1188 tests = []
1190 # Debian doesn't have linux-meta, but Ubuntu does
1191 # for linux themselves we don't want to trigger tests -- these should
1192 # all come from linux-meta*. A new kernel ABI without a corresponding
1193 # -meta won't be installed and thus we can't sensibly run tests against
1194 # it.
1195 if ( 1195 ↛ 1199line 1195 didn't jump to line 1199
1196 src.startswith("linux")
1197 and src.replace("linux", "linux-meta") in sources_info
1198 ):
1199 return []
1201 # we want to test the package itself, if it still has a test in unstable
1202 # but only if the package actually exists on this arch
1203 srcinfo = source_suite.sources[src]
1204 if ("autopkgtest" in srcinfo.testsuite or self.has_autodep8(srcinfo)) and len(
1205 excuse.packages[arch]
1206 ) > 0:
1207 reported_pkgs.add(src)
1208 tests.append((src, ver))
1210 extra_bins = []
1211 # Debian doesn't have linux-meta, but Ubuntu does
1212 # Hack: For new kernels trigger all DKMS packages by pretending that
1213 # linux-meta* builds a "dkms" binary as well. With that we ensure that we
1214 # don't regress DKMS drivers with new kernel versions.
1215 if src.startswith("linux-meta"):
1216 # does this have any image on this arch?
1217 for pkg_id in srcinfo.binaries:
1218 if pkg_id.architecture == arch and "-image" in pkg_id.package_name:
1219 try:
1220 extra_bins.append(binaries_info["dkms"].pkg_id)
1221 except KeyError:
1222 pass
1224 if not self.has_built_on_this_arch_or_is_arch_all(srcinfo, arch):
1225 return []
1227 pkg_universe = self.britney.pkg_universe
1228 # plus all direct reverse dependencies and test triggers of its
1229 # binaries which have an autopkgtest
1230 for binary in itertools.chain(srcinfo.binaries, extra_bins):
1231 for rdep in filter_out_faux_gen(
1232 pkg_universe.reverse_dependencies_of(binary)
1233 ):
1234 try:
1235 rdep_src = binaries_info[rdep.package_name].source
1236 # Don't re-trigger the package itself here; this should
1237 # have been done above if the package still continues to
1238 # have an autopkgtest in unstable.
1239 if rdep_src == src:
1240 continue
1241 except KeyError:
1242 continue
1244 rdep_src_info = sources_info[rdep_src]
1245 if "autopkgtest" in rdep_src_info.testsuite or self.has_autodep8(
1246 rdep_src_info
1247 ):
1248 if rdep_src not in reported_pkgs:
1249 tests.append((rdep_src, rdep_src_info.version))
1250 reported_pkgs.add(rdep_src)
1252 for tdep_src in self.testsuite_triggers.get(binary.package_name, set()):
1253 if tdep_src not in reported_pkgs:
1254 try:
1255 tdep_src_info = sources_info[tdep_src]
1256 except KeyError:
1257 continue
1258 if "autopkgtest" in tdep_src_info.testsuite or self.has_autodep8( 1258 ↛ 1252line 1258 didn't jump to line 1252 because the condition on line 1258 was always true
1259 tdep_src_info
1260 ):
1261 for pkg_id in tdep_src_info.binaries: 1261 ↛ 1252line 1261 didn't jump to line 1252 because the loop on line 1261 didn't complete
1262 if pkg_id.architecture == arch:
1263 tests.append((tdep_src, tdep_src_info.version))
1264 reported_pkgs.add(tdep_src)
1265 break
1267 tests.sort(key=lambda s_v: s_v[0])
1268 return tests
def read_pending_tests(self) -> None:
    """Load the pending test requests left by previous britney runs.

    Initializes self.pending_tests from self.pending_tests_file (an empty
    dict when that file does not exist). For files carrying VERSION_KEY,
    that key is dropped and requests older than
    self.options.adt_pending_max_age are pruned; files without it are
    migrated by stamping every listed architecture with the current time.
    """
    assert self.pending_tests is None, "already initialized"
    if not os.path.exists(self.pending_tests_file):
        self.logger.info(
            "No %s, starting with no pending tests", self.pending_tests_file
        )
        self.pending_tests = {}
        return

    with open(self.pending_tests_file) as f:
        self.pending_tests = json.load(f)

    if VERSION_KEY in self.pending_tests:
        del self.pending_tests[VERSION_KEY]

        # Remember what to drop and delete afterwards, rather than copying
        # the key lists up front or mutating the dicts while iterating.
        stale_triggers = []
        for trigger, per_pkg in self.pending_tests.items():
            stale_pkgs = []
            for pkg, per_arch in per_pkg.items():
                expired = [
                    arch
                    for arch, requested_at in per_arch.items()
                    if self._now - requested_at > self.options.adt_pending_max_age
                ]
                for arch in expired:
                    del per_arch[arch]
                if not per_arch:
                    stale_pkgs.append(pkg)
            for pkg in stale_pkgs:
                del per_pkg[pkg]
            if not per_pkg:
                stale_triggers.append(trigger)
        for trigger in stale_triggers:
            del self.pending_tests[trigger]
    else:
        # Migration code: replace each list of architectures by a
        # mapping from architecture to the current time.
        for trigger_data in self.pending_tests.values():
            for pkg, arch_list in trigger_data.items():
                trigger_data[pkg] = {arch: self._now for arch in arch_list}

    self.logger.info(
        "Read pending requested tests from %s", self.pending_tests_file
    )
    self.logger.debug("%s", self.pending_tests)
# Walking over every trigger is expensive, so the answers are memoized.
@lru_cache(None)
def latest_run_for_package(self, src: str, arch: str) -> str:
    """Return the latest run ID known for src on arch.

    Looks at the third field of every stored result for src/arch across
    all triggers; returns the empty string when there is none.
    """
    newest = ""
    for per_source in self.test_results.values():
        try:
            candidate = per_source[src][arch][2]
        except KeyError:
            continue
        newest = max(newest, candidate)
    return newest
1338 def urlopen_retry(self, url: str) -> http.client.HTTPResponse | addinfourl:
1339 """A urlopen() that retries on time outs or errors"""
1341 exc: Exception
1342 for retry in range(5): 1342 ↛ 1363line 1342 didn't jump to line 1363 because the loop on line 1342 didn't complete
1343 try:
1344 req = urlopen(url, timeout=30)
1345 code = req.getcode()
1346 if not code or 200 <= code < 300: 1346 ↛ 1342line 1346 didn't jump to line 1342 because the condition on line 1346 was always true
1347 return req # type: ignore[no-any-return]
1348 except TimeoutError as e: 1348 ↛ 1349line 1348 didn't jump to line 1349 because the exception caught by line 1348 didn't happen
1349 self.logger.info(
1350 "Timeout downloading '%s', will retry %d more times."
1351 % (url, 5 - retry - 1)
1352 )
1353 exc = e
1354 except HTTPError as e:
1355 if e.code not in (503, 502): 1355 ↛ 1357line 1355 didn't jump to line 1357 because the condition on line 1355 was always true
1356 raise
1357 self.logger.info(
1358 "Caught error %d downloading '%s', will retry %d more times."
1359 % (e.code, url, 5 - retry - 1)
1360 )
1361 exc = e
1362 else:
1363 raise exc
@lru_cache(None)
def fetch_swift_results(self, swift_url: str, src: str, arch: str) -> None:
    """Download new results for source package/arch from swift.

    Lists the result directories for src/arch in the swift container and
    hands each one to fetch_one_result(). A 401 response is logged and
    ends the call; any other OSError is logged and terminates the program
    with exit status 1.
    """
    # Prepare the query: list all runs with a timestamp later than the
    # latest run_id for this package/arch. '@' ends each run id and so
    # marks the end of a test run directory path, for example:
    # <autopkgtest-wily>wily/amd64/libp/libpng/20150630_054517@/result.tar
    prefix = f"{self.options.series}/{arch}/{srchash(src)}/{src}/"
    query = {"delimiter": "@", "prefix": prefix}

    # determine latest run_id from results
    if not self.options.adt_shared_results_cache:
        latest_run_id = self.latest_run_for_package(src, arch)
        if latest_run_id:
            query["marker"] = query["prefix"] + latest_run_id

    # request new results from swift
    container_url = os.path.join(swift_url, self.swift_container)
    url = container_url + "?" + urllib.parse.urlencode(query)
    f = None
    try:
        f = self.urlopen_retry(url)
        if f.getcode() == 200:
            listing = f.read().decode().strip()
            result_paths = listing.splitlines()
        elif f.getcode() == 204:  # No content
            result_paths = []
        else:
            # We should never end up here, as an HTTPError is expected in
            # the other cases; e.g. 3XX tells us to adjust our URLs, so
            # fail hard on those.
            raise NotImplementedError(
                "fetch_swift_results(%s): cannot handle HTTP code %r"
                % (url, f.getcode())
            )
    except OSError as e:
        if getattr(e, "code", -1) != 401:
            # Other status codes are usually a transient
            # network/infrastructure failure. Ignoring this can lead to
            # re-requesting tests which we already have results for, so
            # fail hard on this and let the next run retry.
            self.logger.error(
                "Failure to fetch swift results from %s: %s", url, str(e)
            )
            sys.exit(1)
        # 401 "Unauthorized" is swift's way of saying "container does not exist"
        self.logger.info(
            "fetch_swift_results: %s does not exist yet or is inaccessible", url
        )
        return
    finally:
        if f is not None:
            f.close()

    for path in result_paths:
        self.fetch_one_result(
            os.path.join(swift_url, self.swift_container, path, "result.tar"),
            src,
            arch,
        )
def fetch_one_result(self, url: str, src: str, arch: str) -> None:
    """Download one result URL for source/arch.

    The result tarball is parsed for its exit code, tested package and
    version, and recorded triggers. For every recorded trigger the
    matching pending_tests entry is removed and the result is added to
    the stored results.

    A 404 is tolerated and so is a damaged tarball (both are logged and
    ignored); any other download failure is logged and terminates the
    program with exit status 1.
    """
    f = None
    try:
        f = self.urlopen_retry(url)
        if f.getcode() == 200:
            tar_bytes = io.BytesIO(f.read())
        else:
            raise NotImplementedError(
                "fetch_one_result(%s): cannot handle HTTP code %r"
                % (url, f.getcode())
            )
    except OSError as err:
        self.logger.error("Failure to fetch %s: %s", url, str(err))
        # we tolerate "not found" (something went wrong on uploading the
        # result), but other things indicate infrastructure problems
        if getattr(err, "code", -1) == 404:
            return
        sys.exit(1)
    finally:
        if f is not None:
            f.close()
    try:
        with tarfile.open(None, "r", tar_bytes) as tar:
            exitcode = int(tar.extractfile("exitcode").read().strip())  # type: ignore[union-attr]
            srcver = tar.extractfile("testpkg-version").read().decode().strip()  # type: ignore[union-attr]
            ressrc, ver = srcver.split()
            testinfo = json.loads(tar.extractfile("testinfo.json").read().decode())  # type: ignore[union-attr]
    except (KeyError, ValueError, tarfile.TarError) as err:
        self.logger.error("%s is damaged, ignoring: %s", url, str(err))
        # ignore this; this will leave an orphaned request in autopkgtest-pending.json
        # and thus require manual retries after fixing the tmpfail, but we
        # can't just blindly attribute it to some pending test.
        return

    if src != ressrc:
        self.logger.error(
            "%s is a result for package %s, but expected package %s",
            url,
            ressrc,
            src,
        )
        return

    # parse recorded triggers in test result
    for e in testinfo.get("custom_environment", []):
        if e.startswith("ADT_TEST_TRIGGERS="):
            result_triggers = [i for i in e.split("=", 1)[1].split() if "/" in i]
            break
    else:
        # The format string has a %s placeholder for the URL; pass it so
        # the message identifies the offending result.
        self.logger.error("%s result has no ADT_TEST_TRIGGERS, ignoring", url)
        return

    # The run id is the name of the directory holding result.tar
    run_id = os.path.basename(os.path.dirname(url))
    seen = round(calendar.timegm(time.strptime(run_id, "%Y%m%d_%H%M%S@")))
    # allow some skipped tests, but nothing else
    if exitcode in [0, 2]:
        result = Result.PASS
    elif exitcode == 8:
        result = Result.NEUTRAL
    else:
        result = Result.FAIL

    self.logger.info(
        "Fetched test result for %s/%s/%s %s (triggers: %s): %s",
        src,
        ver,
        arch,
        run_id,
        result_triggers,
        result.name.lower(),
    )

    # remove matching test requests
    for trigger in result_triggers:
        self.remove_from_pending(trigger, src, arch)

    # add this result
    for trigger in result_triggers:
        self.add_trigger_to_results(trigger, src, ver, arch, run_id, seen, result)
def remove_from_pending(
    self, trigger: str, src: str, arch: str, timestamp: int = sys.maxsize
) -> None:
    """Drop the pending request for src/arch under trigger, if any.

    Nothing is removed when timestamp is older than the time the request
    was scheduled. Mappings that become empty are removed as well.
    """
    assert self.pending_tests is not None  # for type checking
    pending = self.pending_tests
    try:
        per_arch = pending[trigger][src]
        scheduled_at = per_arch[arch]
    except KeyError:
        self.logger.debug(
            "-> does not match any pending request for %s/%s", src, arch
        )
        return

    if timestamp < scheduled_at:
        # The result is from before the moment of scheduling, so it's
        # not the one we're waiting for
        return

    del per_arch[arch]
    if not per_arch:
        del pending[trigger][src]
    if not pending[trigger]:
        del pending[trigger]
    self.logger.debug(
        "-> matches pending request %s/%s for trigger %s", src, arch, trigger
    )
def add_trigger_to_results(
    self,
    trigger: str,
    src: str,
    ver: str,
    arch: str,
    run_id: str,
    timestamp: int,
    status_to_add: Result,
) -> None:
    """Record a test result for src/arch under trigger.

    Triggers that do not consist of two or four "/"-separated parts are
    ignored, as are runs of the triggering source itself against a
    version older than the triggered one. The stored entry is a list of
    [status, version, run id, timestamp] and is only replaced according
    to the rules described in the body.
    """
    # Ensure that we got a new enough version
    parts = trigger.split("/")
    if len(parts) == 2:
        trigsrc, trigver = parts
    elif len(parts) == 4:
        trigsrc, _trigarch, trigver, _rebuild = parts
    else:
        self.logger.info("Ignoring invalid test trigger %s", trigger)
        return
    if trigsrc == src and apt_pkg.version_compare(ver, trigver) < 0:
        self.logger.debug(
            "test trigger %s, but run for older version %s, ignoring", trigger, ver
        )
        return

    per_trigger = self.test_results.setdefault(trigger, {})
    per_source = per_trigger.setdefault(src, {})
    stored = per_source.setdefault(arch, [Result.FAIL, None, "", 0])

    # Reruns shouldn't flip the result from PASS or NEUTRAL to FAIL, so
    # remember the most recent version of the best result we've seen.
    # Reference updates are the exception: those always take the most
    # recent result. The result data may not be ordered by timestamp, so
    # the time has to be checked.
    if self.options.adt_baseline == "reference" and trigger == REF_TRIG:
        update = stored[3] < timestamp
    else:
        update = status_to_add < stored[0] or (
            status_to_add == stored[0] and stored[3] < timestamp
        )

    if update:
        stored[0], stored[1], stored[2], stored[3] = (
            status_to_add,
            ver,
            run_id,
            timestamp,
        )
def send_test_request(
    self, src: str, arch: str, triggers: list[str], huge: bool = False
) -> None:
    """Send out AMQP request for testing src/arch for triggers.

    If huge is true, the request goes to the -huge queue instead of the
    normal one; configured PPAs take precedence and select the -ppa
    queue. Nothing is sent in dry-run mode. Without an AMQP channel the
    request is written to self.amqp_file_handle instead.
    """
    if self.options.dry_run:
        return

    params: dict[str, Any] = {"triggers": triggers}
    if self.options.adt_ppas:
        params["ppas"] = self.options.adt_ppas
        queue_prefix = "debci-ppa"
    elif huge:
        queue_prefix = "debci-huge"
    else:
        queue_prefix = "debci"
    qname = f"{queue_prefix}-{self.options.series}-{arch}"
    params["submit-time"] = time.strftime("%Y-%m-%d %H:%M:%S%z", time.gmtime())

    if not self.amqp_channel:
        # for file-based submission, triggers are space separated
        params["triggers"] = [" ".join(params["triggers"])]
        assert self.amqp_file_handle
        self.amqp_file_handle.write(f"{qname}:{src} {json.dumps(params)}\n")
        return

    body = src + "\n" + json.dumps(params)
    self.amqp_channel.basic_publish(
        amqp.Message(body, delivery_mode=2),  # persistent
        routing_key=qname,
    )
    # pending.json is saved with every request so that tests are not
    # re-requested if britney crashes. This is only needed with real
    # amqp; with file-based submission the pending tests are returned by
    # debci along with the results each run.
    self.save_pending_json()
def pkg_test_request(
    self, src: str, arch: str, all_triggers: list[str], huge: bool = False
) -> None:
    """Request one package test for a set of triggers.

    all_triggers is a list of "pkgname/version". These are the packages
    that will be taken from the source suite. The first package in this
    list is the one that triggers the testing of src; the rest are
    additional packages required for installability of the test deps. If
    huge is true, the request is put into the -huge queue instead of the
    normal one.

    The request is only made if the test wasn't already requested in a
    previous run (i.e. it is not already in self.pending_tests) and there
    isn't already a fresh or positive result for it. Current results for
    this package are downloaded before any test is requested.
    """
    trigger = all_triggers[0]
    uses_swift = not self.options.adt_swift_url.startswith("file://")

    try:
        result = self.test_results[trigger][src][arch]
    except KeyError:
        pass
    else:
        state = result[0]
        outdated = state in {Result.OLD_PASS, Result.OLD_FAIL, Result.OLD_NEUTRAL}
        # A failure against a passing/neutral baseline that is old enough
        # is one we might want to retry.
        retriable = not outdated and (
            state is Result.FAIL
            and self.result_in_baseline(src, arch)[0]
            in {Result.PASS, Result.NEUTRAL, Result.OLD_PASS, Result.OLD_NEUTRAL}
            and self._now - result[3] > self.options.adt_retry_older_than
        )
        if not outdated and not retriable:
            if not uses_swift:
                # We're done if we don't retrigger and we're not using swift
                return
            if state in {Result.PASS, Result.NEUTRAL}:
                self.logger.debug(
                    "%s/%s triggered by %s already known", src, arch, trigger
                )
                return

    # Without swift we don't expect new results
    if uses_swift:
        self.logger.info(
            "Checking for new results for failed %s/%s for trigger %s",
            src,
            arch,
            trigger,
        )
        self.fetch_swift_results(self.options.adt_swift_url, src, arch)
        # do we have one now?
        try:
            self.test_results[trigger][src][arch]
        except KeyError:
            pass
        else:
            return

    self.request_test_if_not_queued(src, arch, trigger, all_triggers, huge=huge)
1688 def request_test_if_not_queued(
1689 self,
1690 src: str,
1691 arch: str,
1692 trigger: str,
1693 all_triggers: list[str] = [],
1694 huge: bool = False,
1695 ) -> None:
1696 assert self.pending_tests is not None # for type checking
1697 if not all_triggers:
1698 all_triggers = [trigger]
1700 # Don't re-request if it's already pending
1701 arch_dict = self.pending_tests.setdefault(trigger, {}).setdefault(src, {})
1702 if arch in arch_dict.keys():
1703 self.logger.debug(
1704 "Test %s/%s for %s is already pending, not queueing", src, arch, trigger
1705 )
1706 else:
1707 self.logger.debug(
1708 "Requesting %s autopkgtest on %s to verify %s", src, arch, trigger
1709 )
1710 arch_dict[arch] = self._now
1711 self.send_test_request(src, arch, all_triggers, huge=huge)
def result_in_baseline(self, src: str, arch: str) -> list[Any]:
    """Get the result for src on arch in the baseline.

    The baseline is either the reference set (when
    self.options.adt_baseline is "reference") or all stored results.
    Computed answers are kept in self.result_in_baseline_cache.
    """
    # Walking over all cached results is expensive, so look in the
    # per-source/arch cache first.
    cache = self.result_in_baseline_cache
    try:
        return cache[src][arch]
    except KeyError:
        pass

    if self.options.adt_baseline == "reference":
        result_reference: list[Any] = [Result.NONE, None, "", 0]
        if src not in self.suite_info.target_suite.sources:
            return result_reference

        try:
            result_reference = self.test_results[REF_TRIG][src][arch]
        except KeyError:
            self.logger.debug(
                "Found NO result for src %s in reference: %s",
                src,
                result_reference[0].name,
            )
        else:
            self.logger.debug(
                "Found result for src %s in reference: %s",
                src,
                result_reference[0].name,
            )
        cache[src][arch] = deepcopy(result_reference)
        return result_reference

    result_ever: list[Any] = [Result.FAIL, None, "", 0]
    for srcmap in self.test_results.values():
        try:
            candidate = srcmap[src][arch]
        except KeyError:
            continue
        if candidate[0] is not Result.FAIL:
            result_ever = candidate
        # When not looking at a reference run, only the status really
        # matters, so we're done once we find a PASS.
        if result_ever[0] is Result.PASS:
            break

    cache[src][arch] = deepcopy(result_ever)
    self.logger.debug("Result for src %s ever: %s", src, result_ever[0].name)
    return result_ever
def has_test_in_target(self, src: str) -> bool:
    """Tell whether src has an autopkgtest (or autodep8 test) in the target suite.

    Returns False when src is not in the target suite.
    """
    try:
        srcinfo = self.suite_info.target_suite.sources[src]
        if "autopkgtest" in srcinfo.testsuite or self.has_autodep8(srcinfo):
            return True
    except (KeyError, AttributeError):
        # AttributeError is only needed for the test suite, as srcinfo
        # can be a NoneType
        pass
    return False
def pkg_test_result(
    self, src: str, ver: str, arch: str, trigger: str
) -> tuple[str, str, str | None, str]:
    """Report the current test status of src on arch for a trigger

    Returns a (status, real_version, run_id, log_url) tuple; status is a
    key in EXCUSES_LABELS.  When no result is recorded yet the test must
    be pending: run_id is then None, the version is the ver passed in
    and the URL points at the pending-status page.
    """
    assert self.pending_tests is not None  # for type checking
    failed_states = {Result.FAIL, Result.OLD_FAIL}
    run_id = None
    try:
        record = self.test_results[trigger][src][arch]
        ver = record[1]
        run_id = record[2]
        outcome = record[0]

        if outcome not in failed_states:
            result = outcome.name
        else:
            # A failure is judged against the baseline for src/arch.
            baseline = self.result_in_baseline(src, arch)[0]

            # Special-case triggers from linux-meta*: we cannot compare
            # results against different kernels, as e. g. a DKMS module
            # might work against the default kernel but fail against a
            # different flavor; so for those, ignore the "ever
            # passed" check; FIXME: check against trigsrc only
            if self.options.adt_baseline != "reference" and trigger.startswith(
                ("linux-meta", "linux/")
            ):
                baseline = Result.FAIL

            # Check if the autopkgtest (still) exists in the target suite
            in_target = self.has_test_in_target(src)

            # A missing or outdated baseline triggers a reference run.
            outdated_baselines = {
                Result.NONE,
                Result.OLD_FAIL,
                Result.OLD_NEUTRAL,
                Result.OLD_PASS,
            }
            if in_target and baseline in outdated_baselines:
                self.request_test_if_not_queued(src, arch, REF_TRIG)

            if self.has_force_badtest(src, ver, arch):
                result = "IGNORE-FAIL"
            elif not in_target:
                if self.options.adt_ignore_failure_for_new_tests:
                    result = "IGNORE-FAIL"
                else:
                    result = outcome.name
            elif baseline in failed_states:
                result = "ALWAYSFAIL"
            elif baseline is Result.NONE:
                result = "RUNNING-REFERENCE"
            else:
                result = "REGRESSION"

        url = self.format_log_url(src, arch, run_id)
    except KeyError:
        # no result for src/arch; still running?
        assert arch in self.pending_tests.get(trigger, {}).get(src, {}).keys(), (
            "Result for %s/%s/%s (triggered by %s) is neither known nor pending!"
            % (src, ver, arch, trigger)
        )

        if self.has_force_badtest(src, ver, arch):
            result = "RUNNING-IGNORE"
        elif self.has_test_in_target(src):
            pending_baseline = self.result_in_baseline(src, arch)[0]
            if pending_baseline is Result.FAIL:
                result = "RUNNING-ALWAYSFAIL"
            else:
                result = "RUNNING"
        elif self.options.adt_ignore_failure_for_new_tests:
            result = "RUNNING-IGNORE"
        else:
            result = "RUNNING"
        url = self.options.adt_ci_url + "status/pending"

    return (result, ver, run_id, url)
def has_force_badtest(self, src: str, ver: str, arch: str) -> bool:
    """Check if src/ver/arch has a force-badtest hint

    A hinted package matches when its architecture is "source" or arch
    and it is either unversioned (no version, or the historical "all")
    or ver is not newer than the hinted version.  The first matching
    hint is logged and True is returned; False if no hint matches.
    """

    assert self.hints is not None
    for hint in self.hints.search("force-badtest", package=src):
        # Evaluate the match condition itself rather than the truthiness
        # of the matching items, so the outcome never depends on how a
        # migration item behaves in a boolean context.
        if any(
            mi.architecture in ("source", arch)
            and (
                mi.version is None
                or mi.version == "all"  # Historical unversioned hint
                or apt_pkg.version_compare(ver, mi.version) <= 0
            )
            for mi in hint.packages
        ):
            self.logger.info(
                "Checking hints for %s/%s/%s: %s",
                src,
                arch,
                ver,
                hint,
            )
            return True

    return False
def has_built_on_this_arch_or_is_arch_all(
    self, src_data: SourcePackage, arch: str
) -> bool:
    """Tell whether the source has really built on arch, or is arch:all only

    A source that builds arch:all binaries has those binaries added to
    every architecture, so it 'exists' everywhere.  This returns True as
    soon as one of the source's binaries found on arch is itself built
    for arch.  Failing that, it returns True only if none of the source's
    binaries was missing on arch, i.e. only arch:all packages were seen.
    """
    arch_binaries = self.suite_info.primary_source_suite.binaries[arch]
    missing_on_arch = False
    for pkg_id in filter_out_faux_gen(src_data.binaries):
        try:
            found = arch_binaries[pkg_id.package_name]
        except KeyError:
            # src_data.binaries has all the built binaries, so a binary
            # absent here means at least one architecture has
            # architecture specific binaries.
            missing_on_arch = True
        else:
            if found.architecture == arch:
                return True
    # Reaching this point means only arch:all packages were seen for
    # this arch.
    return not missing_on_arch