Coverage for britney2/policies/autopkgtest.py: 89%
795 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-10-17 17:32 +0000
1# Copyright (C) 2013 - 2016 Canonical Ltd.
2# Authors:
3# Colin Watson <cjwatson@ubuntu.com>
4# Jean-Baptiste Lallement <jean-baptiste.lallement@canonical.com>
5# Martin Pitt <martin.pitt@ubuntu.com>
7# This program is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15# GNU General Public License for more details.
17import calendar
18import collections
19import http.client
20import io
21import itertools
22import json
23import optparse
24import os
25import socket
26import sys
27import tarfile
28import time
29import urllib.parse
30from collections.abc import Iterator
31from copy import deepcopy
32from enum import Enum
33from functools import lru_cache, total_ordering
34from typing import TYPE_CHECKING, Any, Optional, cast
35from urllib.error import HTTPError
36from urllib.request import urlopen
37from urllib.response import addinfourl
39import apt_pkg
41from britney2 import (
42 BinaryPackageId,
43 PackageId,
44 SourcePackage,
45 SuiteClass,
46 Suites,
47 TargetSuite,
48)
49from britney2.hints import HintAnnotate, HintType
50from britney2.migrationitem import MigrationItem
51from britney2.policies import PolicyVerdict
52from britney2.policies.policy import AbstractBasePolicy
53from britney2.utils import iter_except, parse_option
55if TYPE_CHECKING: 55 ↛ 56line 55 didn't jump to line 56 because the condition on line 55 was never true
56 import amqplib.client_0_8 as amqp
58 from ..britney import Britney
59 from ..excuse import Excuse
60 from ..hints import HintParser
@total_ordering
class Result(Enum):
    """Outcome of an autopkgtest run.

    The OLD_* variants mark results that this policy has flagged as outdated
    (e.g. for a package version no longer present in any suite); NONE means
    no result is known.  Ordering comparisons compare the underlying value.
    """

    PASS = 1
    NEUTRAL = 2
    FAIL = 3
    OLD_PASS = 4
    OLD_NEUTRAL = 5
    OLD_FAIL = 6
    NONE = 7

    def __lt__(self, other: "Result") -> bool:
        # total_ordering derives the remaining rich comparisons from
        # __lt__ plus Enum's identity-based __eq__.
        return self.value < other.value
# HTML labels used when rendering a test state into the excuses output.
# Keys cover the Result enum names plus the derived states used by this
# policy (REGRESSION, ALWAYSFAIL, IGNORE-FAIL and the RUNNING-* variants).
EXCUSES_LABELS = {
    "PASS": '<span style="background:#87d96c">Pass</span>',
    "OLD_PASS": '<span style="background:#87d96c">Pass</span>',
    "NEUTRAL": '<span style="background:#e5c545">No tests, superficial or marked flaky</span>',
    "OLD_NEUTRAL": '<span style="background:#e5c545">No tests, superficial or marked flaky</span>',
    "FAIL": '<span style="background:#ff6666">Failed</span>',
    "OLD_FAIL": '<span style="background:#ff6666">Failed</span>',
    "ALWAYSFAIL": '<span style="background:#e5c545">Failed (not a regression)</span>',
    "REGRESSION": '<span style="background:#ff6666">Regression</span>',
    "IGNORE-FAIL": '<span style="background:#e5c545">Ignored failure</span>',
    "RUNNING": '<span style="background:#99ddff">Test triggered</span>',
    "RUNNING-ALWAYSFAIL": '<span style="background:#99ddff">Test triggered (will not be considered a regression)</span>',
    "RUNNING-IGNORE": '<span style="background:#99ddff">Test triggered (failure will be ignored)</span>',
    "RUNNING-REFERENCE": '<span style="background:#ff6666">Reference test triggered, but real test failed already</span>',
}

# Trigger name used for baseline ("reference") test runs.
REF_TRIG = "migration-reference/0"

# Key carrying the format version of the pending-tests JSON file.
VERSION_KEY = "britney-autopkgtest-pending-file-version"
def srchash(src: str) -> str:
    """archive hash prefix for source package"""
    # Debian archive layout: "lib*" packages hash on the first four
    # characters, everything else on the first character.
    return src[:4] if src.startswith("lib") else src[0]
def added_pkgs_compared_to_target_suite(
    package_ids: frozenset[BinaryPackageId],
    target_suite: TargetSuite,
    *,
    invert: bool = False,
) -> Iterator[BinaryPackageId]:
    """Yield the package ids whose package name is absent from the target suite.

    With invert=True the selection flips: yield the ids whose name *is*
    present in the target suite.  Filtering happens by package name, not by
    exact (name, version, arch) id.
    """
    if invert:
        present = set(target_suite.which_of_these_are_in_the_suite(package_ids))
        ignored_names = {p.package_name for p in package_ids - present}
    else:
        ignored_names = {
            p.package_name
            for p in target_suite.which_of_these_are_in_the_suite(package_ids)
        }
    for pkg_id in package_ids:
        if pkg_id.package_name not in ignored_names:
            yield pkg_id
def all_leaf_results(
    test_results: dict[str, dict[str, dict[str, list[Any]]]],
) -> Iterator[list[Any]]:
    """Iterate over every leaf result list of a trigger -> src -> arch map."""
    yield from itertools.chain.from_iterable(
        arch_map.values()
        for src_map in test_results.values()
        for arch_map in src_map.values()
    )
def mark_result_as_old(result: Result) -> Result:
    """Convert current result into corresponding old result"""
    # Any result without an OLD_* counterpart is returned unchanged.
    conversion = {
        Result.FAIL: Result.OLD_FAIL,
        Result.PASS: Result.OLD_PASS,
        Result.NEUTRAL: Result.OLD_NEUTRAL,
    }
    return conversion.get(result, result)
146class AutopkgtestPolicy(AbstractBasePolicy):
147 """autopkgtest regression policy for source migrations
149 Run autopkgtests for the excuse and all of its reverse dependencies, and
150 reject the upload if any of those regress.
151 """
    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        """Set up state file paths, option defaults and result caches.

        :param options: parsed britney configuration options
        :param suite_info: the collection of suites britney operates on
        """
        super().__init__(
            "autopkgtest", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
        )
        # tests requested in this and previous runs
        # trigger -> src -> [arch]
        self.pending_tests: dict[str, dict[str, dict[str, int]]] | None = None
        self.pending_tests_file = os.path.join(
            self.state_dir, "autopkgtest-pending.json"
        )
        # inverse Testsuite-Triggers map, filled in initialise():
        # trigger source -> set of sources whose tests it triggers
        self.testsuite_triggers: dict[str, set[str]] = {}
        # baseline result cache: src -> arch -> result list
        self.result_in_baseline_cache: dict[str, dict[str, list[Any]]] = (
            collections.defaultdict(dict)
        )

        # file handle used when ADT_AMQP is a file:// URL (testing mode)
        self.amqp_file_handle: io.TextIOWrapper | None = None

        # Default values for this policy's options
        parse_option(options, "adt_baseline")
        parse_option(options, "adt_huge", to_int=True)
        parse_option(options, "adt_ppas")
        parse_option(options, "adt_reference_max_age", day_to_sec=True)
        parse_option(options, "adt_pending_max_age", default=5, day_to_sec=True)
        parse_option(options, "adt_regression_penalty", default=0, to_int=True)
        parse_option(options, "adt_log_url")  # see below for defaults
        parse_option(options, "adt_retry_url")  # see below for defaults
        parse_option(options, "adt_retry_older_than", day_to_sec=True)
        parse_option(options, "adt_results_cache_age", day_to_sec=True)
        parse_option(options, "adt_shared_results_cache")
        parse_option(options, "adt_success_bounty", default=0, to_int=True)
        parse_option(options, "adt_ignore_failure_for_new_tests", to_bool=True)

        # When ADT_RESULTS_CACHE_AGE is smaller than or equal to
        # ADT_REFERENCE_MAX_AGE old reference result will be removed from cache
        # before the newly scheduled results are in, potentially causing
        # additional waiting. For packages like glibc this might cause an
        # infinite delay as there will always be a package that's
        # waiting. Similarly for ADT_RETRY_OLDER_THAN.
        if self.options.adt_results_cache_age <= self.options.adt_reference_max_age:
            self.logger.warning(
                "Unexpected: ADT_REFERENCE_MAX_AGE bigger than ADT_RESULTS_CACHE_AGE"
            )
        if self.options.adt_results_cache_age <= self.options.adt_retry_older_than:
            self.logger.warning(
                "Unexpected: ADT_RETRY_OLDER_THAN bigger than ADT_RESULTS_CACHE_AGE"
            )

        if not self.options.adt_log_url:
            # Historical defaults
            if self.options.adt_swift_url.startswith("file://"):
                self.options.adt_log_url = os.path.join(
                    self.options.adt_ci_url,
                    "data",
                    "autopkgtest",
                    self.options.series,
                    "{arch}",
                    "{hash}",
                    "{package}",
                    "{run_id}",
                    "log.gz",
                )
            else:
                self.options.adt_log_url = os.path.join(
                    self.options.adt_swift_url,
                    "{swift_container}",
                    self.options.series,
                    "{arch}",
                    "{hash}",
                    "{package}",
                    "{run_id}",
                    "log.gz",
                )

        if hasattr(self.options, "adt_retry_url_mech"):
            self.logger.warning(
                "The ADT_RETRY_URL_MECH configuration has been deprecated."
            )
            self.logger.warning(
                "Instead britney now supports ADT_RETRY_URL for more flexibility."
            )
            if self.options.adt_retry_url:
                self.logger.error(
                    "Please remove the ADT_RETRY_URL_MECH as ADT_RETRY_URL will be used."
                )
            elif self.options.adt_retry_url_mech == "run_id":
                self.options.adt_retry_url = (
                    self.options.adt_ci_url + "api/v1/retry/{run_id}"
                )
        if not self.options.adt_retry_url:
            # Historical default
            self.options.adt_retry_url = (
                self.options.adt_ci_url
                + "request.cgi?"
                + "release={release}&arch={arch}&package={package}&trigger={trigger}{ppas}"
            )

        # results map: trigger -> src -> arch -> [passed, version, run_id, seen]
        # - trigger is "source/version" of an unstable package that triggered
        #   this test run.
        # - "passed" is a Result
        # - "version" is the package version of "src" of that test
        # - "run_id" is an opaque ID that identifies a particular test run for
        #   a given src/arch.
        # - "seen" is an approximate time stamp of the test run. How this is
        #   deduced depends on the interface used.
        self.test_results: dict[str, dict[str, dict[str, list[Any]]]] = {}
        if self.options.adt_shared_results_cache:
            self.results_cache_file = self.options.adt_shared_results_cache
        else:
            self.results_cache_file = os.path.join(
                self.state_dir, "autopkgtest-results.cache"
            )

        # ADT_PPAS may be unset (not a string) -> normalise to an empty list
        try:
            self.options.adt_ppas = self.options.adt_ppas.strip().split()
        except AttributeError:
            self.options.adt_ppas = []

        self.swift_container = "autopkgtest-" + options.series
        if self.options.adt_ppas:
            self.swift_container += "-" + options.adt_ppas[-1].replace("/", "-")

        # restrict adt_arches to architectures we actually run for
        self.adt_arches = []
        for arch in self.options.adt_arches.split():
            if arch in self.options.architectures:
                self.adt_arches.append(arch)
            else:
                self.logger.info(
                    "Ignoring ADT_ARCHES %s as it is not in architectures list", arch
                )
285 def __del__(self) -> None:
286 if self.amqp_file_handle: 286 ↛ exitline 286 didn't return from function '__del__' because the condition on line 286 was always true
287 try:
288 self.amqp_file_handle.close()
289 except AttributeError:
290 pass
292 def register_hints(self, hint_parser: "HintParser") -> None:
293 hint_parser.register_hint_type(
294 HintType(
295 "force-badtest",
296 versioned=HintAnnotate.OPTIONAL,
297 architectured=HintAnnotate.OPTIONAL,
298 )
299 )
300 hint_parser.register_hint_type(HintType("force-skiptest"))
    def initialise(self, britney: "Britney") -> None:
        """Load cached and new test results and set up the AMQP channel.

        Reads the pending-tests and results caches from state_dir, merges in
        new results from the debci export file (when ADT_SWIFT_URL is a
        file:// URL), optionally filters old/reference results, and finally
        connects to the AMQP server (or opens the output file) used for
        requesting test runs.
        """
        super().initialise(britney)
        # We want to use the "current" time stamp in multiple locations
        time_now = round(time.time())
        if hasattr(self.options, "fake_runtime"):
            # test-suite hook: override "now" for reproducible runs
            time_now = int(self.options.fake_runtime)
        self._now = time_now
        # compute inverse Testsuite-Triggers: map, unifying all series
        self.logger.info("Building inverse testsuite_triggers map")
        for suite in self.suite_info:
            for src, data in suite.sources.items():
                for trigger in data.testsuite_triggers:
                    self.testsuite_triggers.setdefault(trigger, set()).add(src)
        target_suite_name = self.suite_info.target_suite.name

        os.makedirs(self.state_dir, exist_ok=True)
        self.read_pending_tests()

        # read the cached results that we collected so far
        if os.path.exists(self.results_cache_file):
            with open(self.results_cache_file) as f:
                test_results = json.load(f)
            self.test_results = self.check_and_upgrade_cache(test_results)
            self.logger.info("Read previous results from %s", self.results_cache_file)
        else:
            self.logger.info(
                "%s does not exist, re-downloading all results from swift",
                self.results_cache_file,
            )

        # read in the new results
        if self.options.adt_swift_url.startswith("file://"):
            debci_file = self.options.adt_swift_url[7:]
            if os.path.exists(debci_file):
                with open(debci_file) as f:
                    test_results = json.load(f)
                self.logger.info("Read new results from %s", debci_file)
                for res in test_results["results"]:
                    # if there's no date, the test didn't finish yet
                    if res["date"] is None:
                        continue
                    # "date" looks like ISO-8601 with a 5-char suffix that is
                    # stripped before parsing; "seen" becomes a UTC epoch stamp
                    (test_suite, triggers, src, arch, ver, status, run_id, seen) = [
                        res["suite"],
                        res["trigger"],
                        res["package"],
                        res["arch"],
                        res["version"],
                        res["status"],
                        str(res["run_id"]),
                        round(
                            calendar.timegm(
                                time.strptime(res["date"][0:-5], "%Y-%m-%dT%H:%M:%S")
                            )
                        ),
                    ]
                    if test_suite != target_suite_name:
                        # not requested for this target suite, so ignore
                        continue
                    if triggers is None:
                        # not requested for this policy, so ignore
                        continue
                    if status is None:
                        # still running => pending
                        continue
                    for trigger in triggers.split():
                        # remove matching test requests
                        self.remove_from_pending(trigger, src, arch, seen)
                        if status == "tmpfail":
                            # let's see if we still need it
                            continue
                        self.logger.debug(
                            "Results %s %s %s added", src, trigger, status
                        )
                        self.add_trigger_to_results(
                            trigger,
                            src,
                            ver,
                            arch,
                            run_id,
                            seen,
                            Result[status.upper()],
                        )
            else:
                self.logger.info(
                    "%s does not exist, no new data will be processed", debci_file
                )

        # The cache can contain results against versions of packages that
        # are not in any suite anymore. Strip those out, as we don't want
        # to use those results. Additionally, old references may be
        # filtered out.
        if self.options.adt_baseline == "reference":
            self.filter_old_results()

        # we need sources, binaries, and installability tester, so for now
        # remember the whole britney object
        self.britney = britney

        # Initialize AMQP connection
        self.amqp_channel: Optional["amqp.channel.Channel"] = None
        self.amqp_file_handle = None
        if self.options.dry_run:
            # dry runs must not request any test runs
            return

        amqp_url = self.options.adt_amqp

        if amqp_url.startswith("amqp://"):
            import amqplib.client_0_8 as amqp

            # depending on the setup we connect to a AMQP server
            creds = urllib.parse.urlsplit(amqp_url, allow_fragments=False)
            self.amqp_con = amqp.Connection(
                creds.hostname, userid=creds.username, password=creds.password
            )
            self.amqp_channel = self.amqp_con.channel()
            self.logger.info("Connected to AMQP server")
        elif amqp_url.startswith("file://"):
            # or in Debian and in testing mode, adt_amqp will be a file:// URL
            amqp_file = amqp_url[7:]
            self.amqp_file_handle = open(amqp_file, "w", 1)
        else:
            raise RuntimeError("Unknown ADT_AMQP schema %s" % amqp_url.split(":", 1)[0])
425 def check_and_upgrade_cache(
426 self, test_results: dict[str, dict[str, dict[str, list[Any]]]]
427 ) -> dict[str, dict[str, dict[str, list[Any]]]]:
428 for leaf_result in all_leaf_results(test_results):
429 leaf_result[0] = Result[leaf_result[0]]
431 # Drop results older than ADT_RESULTS_CACHE_AGE
432 for trigger in list(test_results.keys()):
433 for pkg in list(test_results[trigger].keys()):
434 for arch in list(test_results[trigger][pkg].keys()):
435 arch_result = test_results[trigger][pkg][arch]
436 if self._now - arch_result[3] > self.options.adt_results_cache_age: 436 ↛ 437line 436 didn't jump to line 437 because the condition on line 436 was never true
437 del test_results[trigger][pkg][arch]
438 if not test_results[trigger][pkg]: 438 ↛ 439line 438 didn't jump to line 439 because the condition on line 438 was never true
439 del test_results[trigger][pkg]
440 if not test_results[trigger]: 440 ↛ 441line 440 didn't jump to line 441 because the condition on line 440 was never true
441 del test_results[trigger]
443 return test_results
445 def filter_old_results(self) -> None:
446 """Remove results for old versions and reference runs from the cache.
448 For now, only delete reference runs. If we delete regular
449 results after a while, packages with lots of triggered tests may
450 never have all the results at the same time."""
452 test_results = self.test_results
454 for trigger, trigger_data in test_results.items():
455 for src, results in trigger_data.items():
456 for arch, result in results.items():
457 if (
458 trigger == REF_TRIG
459 and self._now - result[3] > self.options.adt_reference_max_age
460 ):
461 result[0] = mark_result_as_old(result[0])
462 elif not self.test_version_in_any_suite(src, result[1]):
463 result[0] = mark_result_as_old(result[0])
465 def test_version_in_any_suite(self, src: str, version: str) -> bool:
466 """Check if the mentioned version of src is found in a suite
468 To prevent regressions in the target suite, the result should be
469 from a test with the version of the package in either the source
470 suite or the target suite. The source suite is also valid,
471 because due to versioned test dependencies and Breaks/Conflicts
472 relations, regularly the version in the source suite is used
473 during testing.
474 """
476 versions = set()
477 for suite in self.suite_info:
478 try:
479 srcinfo = suite.sources[src]
480 except KeyError:
481 continue
482 versions.add(srcinfo.version)
484 valid_version = False
485 for ver in versions:
486 if apt_pkg.version_compare(ver, version) == 0:
487 valid_version = True
488 break
490 return valid_version
492 def save_pending_json(self) -> None:
493 # update the pending tests on-disk cache
494 self.logger.info(
495 "Updating pending requested tests in %s" % self.pending_tests_file
496 )
497 # Shallow clone pending_tests as we only modify the toplevel and change its type.
498 pending_tests: dict[str, Any] = {}
499 if self.pending_tests:
500 pending_tests = dict(self.pending_tests)
501 # Avoid adding if there are no pending results at all (eases testing)
502 pending_tests[VERSION_KEY] = 1
503 with open(self.pending_tests_file + ".new", "w") as f:
504 json.dump(pending_tests, f, indent=2)
505 os.rename(self.pending_tests_file + ".new", self.pending_tests_file)
507 def save_state(self, britney: "Britney") -> None:
508 super().save_state(britney)
510 # update the results on-disk cache, unless we are using a r/o shared one
511 if not self.options.adt_shared_results_cache:
512 self.logger.info("Updating results cache")
513 test_results = deepcopy(self.test_results)
514 for result in all_leaf_results(test_results):
515 result[0] = result[0].name
516 with open(self.results_cache_file + ".new", "w") as f:
517 json.dump(test_results, f, indent=2)
518 os.rename(self.results_cache_file + ".new", self.results_cache_file)
520 self.save_pending_json()
522 def format_retry_url(
523 self, run_id: str | None, arch: str, testsrc: str, trigger: str
524 ) -> str:
525 if self.options.adt_ppas:
526 ppas = "&" + urllib.parse.urlencode(
527 [("ppa", p) for p in self.options.adt_ppas]
528 )
529 else:
530 ppas = ""
531 return cast(str, self.options.adt_retry_url).format(
532 run_id=run_id,
533 release=self.options.series,
534 arch=arch,
535 package=testsrc,
536 trigger=urllib.parse.quote_plus(trigger),
537 ppas=ppas,
538 )
540 def format_log_url(self, testsrc: str, arch: str, run_id: str) -> str:
541 return cast(str, self.options.adt_log_url).format(
542 release=self.options.series,
543 swift_container=self.swift_container,
544 hash=srchash(testsrc),
545 package=testsrc,
546 arch=arch,
547 run_id=run_id,
548 )
    def apply_src_policy_impl(
        self,
        tests_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Evaluate autopkgtest results for one source migration item.

        Delays the item while builds are missing, requests tests per
        architecture, folds the collected per-arch results into a verdict,
        renders HTML result details into the excuse, honours the
        force-skiptest hint and applies bounty/penalty policy options.
        """
        assert self.hints is not None  # for type checking
        # initialize
        verdict = PolicyVerdict.PASS
        all_self_tests_pass = False
        source_name = item.package
        results_info = []

        # skip/delay autopkgtests until new package is built somewhere
        if not source_data_srcdist.binaries:
            self.logger.debug(
                "%s hasnot been built anywhere, skipping autopkgtest policy",
                excuse.name,
            )
            verdict = PolicyVerdict.REJECTED_TEMPORARILY
            excuse.add_verdict_info(verdict, "nothing built yet, autopkgtest delayed")

        if "all" in excuse.missing_builds:
            self.logger.debug(
                "%s hasnot been built for arch:all, skipping autopkgtest policy",
                source_name,
            )
            verdict = PolicyVerdict.REJECTED_TEMPORARILY
            excuse.add_verdict_info(
                verdict, "arch:all not built yet, autopkgtest delayed"
            )

        if not verdict.is_rejected:
            self.logger.debug("Checking autopkgtests for %s", source_name)
            trigger = source_name + "/" + source_data_srcdist.version

            # build a (testsrc, testver) → arch → (status, run_id, log_url) map; we trigger/check test
            # results per architecture for technical/efficiency reasons, but we
            # want to evaluate and present the results by tested source package
            # first
            pkg_arch_result: dict[
                tuple[str, str], dict[str, tuple[str, str | None, str]]
            ] = collections.defaultdict(dict)
            for arch in self.adt_arches:
                if arch in excuse.missing_builds:
                    verdict = PolicyVerdict.REJECTED_TEMPORARILY
                    self.logger.debug(
                        "%s hasnot been built on arch %s, delay autopkgtest there",
                        source_name,
                        arch,
                    )
                    excuse.add_verdict_info(
                        verdict,
                        "arch:%s not built yet, autopkgtest delayed there" % arch,
                    )
                elif arch in excuse.policy_info["depends"].get(
                    "arch_all_not_installable", []
                ):
                    self.logger.debug(
                        "%s is uninstallable on arch %s (which is allowed), not running autopkgtest there",
                        source_name,
                        arch,
                    )
                    excuse.addinfo(
                        "uninstallable on arch %s (which is allowed), not running autopkgtest there"
                        % arch
                    )
                elif (
                    arch in excuse.unsatisfiable_on_archs
                    and arch
                    not in excuse.policy_info["depends"].get(
                        "autopkgtest_run_anyways", []
                    )
                ):
                    verdict = PolicyVerdict.REJECTED_TEMPORARILY
                    self.logger.debug(
                        "%s is uninstallable on arch %s, not running autopkgtest there",
                        source_name,
                        arch,
                    )
                    excuse.addinfo(
                        "uninstallable on arch %s, not running autopkgtest there" % arch
                    )
                else:
                    self.request_tests_for_source(
                        item, arch, source_data_srcdist, pkg_arch_result, excuse
                    )

            # add test result details to Excuse
            cloud_url = self.options.adt_ci_url + "packages/%(h)s/%(s)s/%(r)s/%(a)s"
            testver: str | None
            for testsrc, testver in sorted(pkg_arch_result):
                assert testver is not None
                arch_results = pkg_arch_result[(testsrc, testver)]
                # r is the set of per-arch statuses for this tested source
                r = {v[0] for v in arch_results.values()}
                if r & {"FAIL", "OLD_FAIL", "REGRESSION"}:
                    verdict = PolicyVerdict.REJECTED_PERMANENTLY
                elif r & {"RUNNING", "RUNNING-REFERENCE"} and not verdict.is_rejected:
                    verdict = PolicyVerdict.REJECTED_TEMPORARILY
                # skip version if still running on all arches
                if not r - {"RUNNING", "RUNNING-ALWAYSFAIL", "RUNNING-IGNORE"}:
                    testver = None

                # A source package is eligible for the bounty if it has tests
                # of its own that pass on all tested architectures.
                if testsrc == source_name:
                    excuse.autopkgtest_results = r
                    if r == {"PASS"}:
                        all_self_tests_pass = True

                if testver:
                    testname = f"{testsrc}/{testver}"
                else:
                    testname = testsrc

                html_archmsg = []
                for arch in sorted(arch_results):
                    (status, run_id, log_url) = arch_results[arch]
                    artifact_url = None
                    retry_url = None
                    reference_url = None
                    reference_retry_url = None
                    history_url = None
                    if self.options.adt_ppas:
                        if log_url.endswith("log.gz"):
                            artifact_url = log_url.replace("log.gz", "artifacts.tar.gz")
                    else:
                        history_url = cloud_url % {
                            "h": srchash(testsrc),
                            "s": testsrc,
                            "r": self.options.series,
                            "a": arch,
                        }
                    if status not in ("PASS", "RUNNING", "RUNNING-IGNORE"):
                        retry_url = self.format_retry_url(
                            run_id, arch, testsrc, trigger
                        )

                    baseline_result = self.result_in_baseline(testsrc, arch)
                    if baseline_result and baseline_result[0] != Result.NONE:
                        baseline_run_id = str(baseline_result[2])
                        reference_url = self.format_log_url(
                            testsrc, arch, baseline_run_id
                        )
                        if self.options.adt_baseline == "reference":
                            reference_retry_url = self.format_retry_url(
                                baseline_run_id, arch, testsrc, REF_TRIG
                            )
                    tests_info.setdefault(testname, {})[arch] = [
                        status,
                        log_url,
                        history_url,
                        artifact_url,
                        retry_url,
                    ]

                    # render HTML snippet for testsrc entry for current arch
                    if history_url:
                        message = f'<a href="{history_url}">{arch}</a>'
                    else:
                        message = arch
                    message += ': <a href="{}">{}</a>'.format(
                        log_url,
                        EXCUSES_LABELS[status],
                    )
                    if retry_url:
                        message += (
                            '<a href="%s" style="text-decoration: none;"> ♻</a>'
                            % retry_url
                        )
                    if reference_url:
                        message += ' (<a href="%s">reference</a>' % reference_url
                        if reference_retry_url:
                            message += (
                                '<a href="%s" style="text-decoration: none;"> ♻</a>'
                                % reference_retry_url
                            )
                        message += ")"
                    if artifact_url:
                        message += ' <a href="%s">[artifacts]</a>' % artifact_url
                    html_archmsg.append(message)

                # render HTML line for testsrc entry
                # - if action is or may be required
                # - for ones own package
                if (
                    r
                    - {
                        "PASS",
                        "NEUTRAL",
                        "RUNNING-ALWAYSFAIL",
                        "ALWAYSFAIL",
                        "IGNORE-FAIL",
                    }
                    or testsrc == source_name
                ):
                    if testver:
                        pkg = '<a href="#{0}">{0}</a>/{1}'.format(testsrc, testver)
                    else:
                        pkg = '<a href="#{0}">{0}</a>'.format(testsrc)
                    results_info.append(
                        "autopkgtest for {}: {}".format(pkg, ", ".join(html_archmsg))
                    )

        if verdict.is_rejected:
            # check for force-skiptest hint
            hints = self.hints.search(
                "force-skiptest",
                package=source_name,
                version=source_data_srcdist.version,
            )
            if hints:
                excuse.addreason("skiptest")
                excuse.addinfo(
                    "Should wait for tests relating to %s %s, but forced by %s"
                    % (source_name, source_data_srcdist.version, hints[0].user)
                )
                verdict = PolicyVerdict.PASS_HINTED
            else:
                excuse.addreason("autopkgtest")

        if (
            self.options.adt_success_bounty
            and verdict == PolicyVerdict.PASS
            and all_self_tests_pass
        ):
            excuse.add_bounty("autopkgtest", self.options.adt_success_bounty)
        if self.options.adt_regression_penalty and verdict in {
            PolicyVerdict.REJECTED_PERMANENTLY,
            PolicyVerdict.REJECTED_TEMPORARILY,
        }:
            if self.options.adt_regression_penalty > 0:
                excuse.add_penalty("autopkgtest", self.options.adt_regression_penalty)
            # In case we give penalties instead of blocking, we must always pass
            verdict = PolicyVerdict.PASS
        for i in results_info:
            if verdict.is_rejected:
                excuse.add_verdict_info(verdict, i)
            else:
                excuse.addinfo(i)

        return verdict
795 #
796 # helper functions
797 #
799 @staticmethod
800 def has_autodep8(srcinfo: SourcePackage) -> bool:
801 """Check if package is covered by autodep8
803 srcinfo is an item from self.britney.sources
804 """
805 # autodep8?
806 for t in srcinfo.testsuite:
807 if t.startswith("autopkgtest-pkg"):
808 return True
810 return False
812 def request_tests_for_source(
813 self,
814 item: MigrationItem,
815 arch: str,
816 source_data_srcdist: SourcePackage,
817 pkg_arch_result: dict[tuple[str, str], dict[str, tuple[str, str | None, str]]],
818 excuse: "Excuse",
819 ) -> None:
820 pkg_universe = self.britney.pkg_universe
821 target_suite = self.suite_info.target_suite
822 source_suite = item.suite
823 sources_t = target_suite.sources
824 sources_s = item.suite.sources
825 packages_s_a = item.suite.binaries[arch]
826 source_name = item.package
827 source_version = source_data_srcdist.version
828 # request tests (unless they were already requested earlier or have a result)
829 tests = self.tests_for_source(source_name, source_version, arch, excuse)
830 is_huge = len(tests) > self.options.adt_huge
832 # Here we figure out what is required from the source suite
833 # for the test to install successfully.
834 #
835 # The ImplicitDependencyPolicy does a similar calculation, but
836 # if I (elbrus) understand correctly, only in the reverse
837 # dependency direction. We are doing something similar here
838 # but in the dependency direction (note: this code is older).
839 # We use the ImplicitDependencyPolicy result for the reverse
840 # dependencies and we keep the code below for the
841 # dependencies. Using the ImplicitDependencyPolicy results
842 # also in the reverse direction seems to require quite some
843 # reorganisation to get that information available here, as in
844 # the current state only the current excuse is available here
845 # and the required other excuses may not be calculated yet.
846 #
847 # Loop over all binary packages from trigger and
848 # recursively look up which *versioned* dependencies are
849 # only satisfied in the source suite.
850 #
851 # For all binaries found, look up which packages they
852 # break/conflict with in the target suite, but not in the
853 # source suite. The main reason to do this is to cover test
854 # dependencies, so we will check Testsuite-Triggers as
855 # well.
856 #
857 # OI: do we need to do the first check in a smart way
858 # (i.e. only for the packages that are actually going to be
859 # installed) for the breaks/conflicts set as well, i.e. do
860 # we need to check if any of the packages that we now
861 # enforce being from the source suite, actually have new
862 # versioned depends and new breaks/conflicts.
863 #
864 # For all binaries found, add the set of unique source
865 # packages to the list of triggers.
867 bin_triggers: set[PackageId] = set()
868 bin_new = set(source_data_srcdist.binaries)
869 for n_binary in iter_except(bin_new.pop, KeyError):
870 if n_binary in bin_triggers:
871 continue
872 bin_triggers.add(n_binary)
874 # Check if there is a dependency that is not
875 # available in the target suite.
876 # We add slightly too much here, because new binaries
877 # will also show up, but they are already properly
878 # installed. Nevermind.
879 depends = pkg_universe.dependencies_of(n_binary)
880 # depends is a frozenset{frozenset{BinaryPackageId, ..}}
881 for deps_of_bin in depends:
882 if target_suite.any_of_these_are_in_the_suite(deps_of_bin):
883 # if any of the alternative dependencies is already
884 # satisfied in the target suite, we can just ignore it
885 continue
886 # We'll figure out which version later
887 bin_new.update(
888 added_pkgs_compared_to_target_suite(deps_of_bin, target_suite)
889 )
891 # Check if the package breaks/conflicts anything. We might
892 # be adding slightly too many source packages due to the
893 # check here as a binary package that is broken may be
894 # coming from a different source package in the source
895 # suite. Nevermind.
896 bin_broken = set()
897 for t_binary in bin_triggers:
898 # broken is a frozenset{BinaryPackageId, ..}
899 broken = pkg_universe.negative_dependencies_of(
900 cast(BinaryPackageId, t_binary)
901 )
902 broken_in_target = {
903 p.package_name
904 for p in target_suite.which_of_these_are_in_the_suite(broken)
905 }
906 broken_in_source = {
907 p.package_name
908 for p in source_suite.which_of_these_are_in_the_suite(broken)
909 }
910 # We want packages with a newer version in the source suite that
911 # no longer has the conflict. This is an approximation
912 broken_filtered = {
913 p
914 for p in broken
915 if p.package_name in broken_in_target
916 and p.package_name not in broken_in_source
917 }
918 # We add the version in the target suite, but the code below will
919 # change it to the version in the source suite
920 bin_broken.update(broken_filtered)
921 bin_triggers.update(bin_broken)
923 # The ImplicitDependencyPolicy also found packages that need
924 # to migrate together, so add them to the triggers too.
925 for bin_implicit in excuse.depends_packages_flattened:
926 if bin_implicit.architecture == arch:
927 bin_triggers.add(bin_implicit)
929 triggers = set()
930 for t_binary2 in bin_triggers:
931 if t_binary2.architecture == arch:
932 try:
933 source_of_bin = packages_s_a[t_binary2.package_name].source
934 # If the version in the target suite is the same, don't add a trigger.
935 # Note that we looked up the source package in the source suite.
936 # If it were a different source package in the target suite, however, then
937 # we would not have this source package in the same version anyway.
938 if (
939 sources_t.get(source_of_bin, None) is None
940 or sources_s[source_of_bin].version
941 != sources_t[source_of_bin].version
942 ):
943 triggers.add(
944 source_of_bin + "/" + sources_s[source_of_bin].version
945 )
946 except KeyError:
947 # Apparently the package was removed from
948 # unstable e.g. if packages are replaced
949 # (e.g. -dbg to -dbgsym)
950 pass
951 if t_binary2 not in source_data_srcdist.binaries:
952 for tdep_src in self.testsuite_triggers.get( 952 ↛ 955line 952 didn't jump to line 955 because the loop on line 952 never started
953 t_binary2.package_name, set()
954 ):
955 try:
956 # Only add trigger if versions in the target and source suites are different
957 if (
958 sources_t.get(tdep_src, None) is None
959 or sources_s[tdep_src].version
960 != sources_t[tdep_src].version
961 ):
962 triggers.add(
963 tdep_src + "/" + sources_s[tdep_src].version
964 )
965 except KeyError:
966 # Apparently the source was removed from
967 # unstable (testsuite_triggers are unified
968 # over all suites)
969 pass
970 trigger = source_name + "/" + source_version
971 triggers.discard(trigger)
972 triggers_list = sorted(list(triggers))
973 triggers_list.insert(0, trigger)
975 for testsrc, testver in tests:
976 self.pkg_test_request(testsrc, arch, triggers_list, huge=is_huge)
977 (result, real_ver, run_id, url) = self.pkg_test_result(
978 testsrc, testver, arch, trigger
979 )
980 pkg_arch_result[(testsrc, real_ver)][arch] = (result, run_id, url)
    def tests_for_source(
        self, src: str, ver: str, arch: str, excuse: "Excuse"
    ) -> list[tuple[str, str]]:
        """Iterate over all tests that should be run for given source and arch

        Returns a sorted list of (test-source, test-version) pairs: the
        package itself (if it has an autopkgtest and built binaries on this
        arch), plus direct reverse dependencies and Testsuite-Triggers
        entries of its binaries that declare an autopkgtest in the target
        suite.
        """

        source_suite = self.suite_info.primary_source_suite
        target_suite = self.suite_info.target_suite
        sources_info = target_suite.sources
        binaries_info = target_suite.binaries[arch]

        # sources for which a test has already been queued (dedup guard)
        reported_pkgs = set()

        tests = []

        # Debian doesn't have linux-meta, but Ubuntu does
        # for linux themselves we don't want to trigger tests -- these should
        # all come from linux-meta*. A new kernel ABI without a corresponding
        # -meta won't be installed and thus we can't sensibly run tests against
        # it.
        if (
            src.startswith("linux")
            and src.replace("linux", "linux-meta") in sources_info
        ):
            return []

        # we want to test the package itself, if it still has a test in unstable
        # but only if the package actually exists on this arch
        srcinfo = source_suite.sources[src]
        if ("autopkgtest" in srcinfo.testsuite or self.has_autodep8(srcinfo)) and len(
            excuse.packages[arch]
        ) > 0:
            reported_pkgs.add(src)
            tests.append((src, ver))

        extra_bins = []
        # Debian doesn't have linux-meta, but Ubuntu does
        # Hack: For new kernels trigger all DKMS packages by pretending that
        # linux-meta* builds a "dkms" binary as well. With that we ensure that we
        # don't regress DKMS drivers with new kernel versions.
        if src.startswith("linux-meta"):
            # does this have any image on this arch?
            for pkg_id in srcinfo.binaries:
                if pkg_id.architecture == arch and "-image" in pkg_id.package_name:
                    try:
                        extra_bins.append(binaries_info["dkms"].pkg_id)
                    except KeyError:
                        pass

        # arch:all-only sources "exist" on every arch; only skip when the
        # source has arch-specific binaries and none of them are on this arch
        if not self.has_built_on_this_arch_or_is_arch_all(srcinfo, arch):
            return []

        pkg_universe = self.britney.pkg_universe
        # plus all direct reverse dependencies and test triggers of its
        # binaries which have an autopkgtest
        for binary in itertools.chain(srcinfo.binaries, extra_bins):
            rdeps = pkg_universe.reverse_dependencies_of(binary)
            for rdep in rdeps:
                try:
                    rdep_src = binaries_info[rdep.package_name].source
                    # Don't re-trigger the package itself here; this should
                    # have been done above if the package still continues to
                    # have an autopkgtest in unstable.
                    if rdep_src == src:
                        continue
                except KeyError:
                    continue

                rdep_src_info = sources_info[rdep_src]
                if "autopkgtest" in rdep_src_info.testsuite or self.has_autodep8(
                    rdep_src_info
                ):
                    if rdep_src not in reported_pkgs:
                        tests.append((rdep_src, rdep_src_info.version))
                        reported_pkgs.add(rdep_src)

            # sources that declared this binary in Testsuite-Triggers
            for tdep_src in self.testsuite_triggers.get(binary.package_name, set()):
                if tdep_src not in reported_pkgs:
                    try:
                        tdep_src_info = sources_info[tdep_src]
                    except KeyError:
                        continue
                    if "autopkgtest" in tdep_src_info.testsuite or self.has_autodep8(
                        tdep_src_info
                    ):
                        # only trigger if the triggered source has a binary
                        # on this arch at all
                        for pkg_id in tdep_src_info.binaries:
                            if pkg_id.architecture == arch:
                                tests.append((tdep_src, tdep_src_info.version))
                                reported_pkgs.add(tdep_src)
                                break

        tests.sort(key=lambda s_v: s_v[0])
        return tests
    def read_pending_tests(self) -> None:
        """Read pending test requests from previous britney runs

        Initialize self.pending_tests with that data.

        The on-disk layout is {trigger: {src: {arch: request_timestamp}}}.
        Entries older than adt_pending_max_age are dropped; files written
        before VERSION_KEY existed use {trigger: {src: [arch, ...]}} and are
        migrated in place, stamping each arch with the current time.
        """
        assert self.pending_tests is None, "already initialized"
        if not os.path.exists(self.pending_tests_file):
            self.logger.info(
                "No %s, starting with no pending tests", self.pending_tests_file
            )
            self.pending_tests = {}
            return
        with open(self.pending_tests_file) as f:
            self.pending_tests = json.load(f)
            if VERSION_KEY in self.pending_tests:
                del self.pending_tests[VERSION_KEY]
                # list() copies below because we delete keys while walking
                for trigger in list(self.pending_tests.keys()):
                    for pkg in list(self.pending_tests[trigger].keys()):
                        arch_dict = self.pending_tests[trigger][pkg]
                        for arch in list(arch_dict.keys()):
                            # expire requests we have waited too long for
                            if (
                                self._now - arch_dict[arch]
                                > self.options.adt_pending_max_age
                            ):
                                del arch_dict[arch]
                        # prune now-empty levels bottom-up
                        if not arch_dict:
                            del self.pending_tests[trigger][pkg]
                    if not self.pending_tests[trigger]:
                        del self.pending_tests[trigger]
            else:
                # Migration code:
                # old format stored a plain list of arches per package;
                # convert to {arch: timestamp} using the current time
                for trigger_data in self.pending_tests.values():
                    for pkg, arch_list in trigger_data.items():
                        trigger_data[pkg] = {}
                        for arch in arch_list:
                            trigger_data[pkg][arch] = self._now

        self.logger.info(
            "Read pending requested tests from %s", self.pending_tests_file
        )
        self.logger.debug("%s", self.pending_tests)
1117 # this requires iterating over all triggers and thus is expensive;
1118 # cache the results
1119 @lru_cache(None)
1120 def latest_run_for_package(self, src: str, arch: str) -> str:
1121 """Return latest run ID for src on arch"""
1123 latest_run_id = ""
1124 for srcmap in self.test_results.values():
1125 try:
1126 run_id = srcmap[src][arch][2]
1127 except KeyError:
1128 continue
1129 if run_id > latest_run_id:
1130 latest_run_id = run_id
1131 return latest_run_id
1133 def urlopen_retry(self, url: str) -> http.client.HTTPResponse | addinfourl:
1134 """A urlopen() that retries on time outs or errors"""
1136 exc: Exception
1137 for retry in range(5): 1137 ↛ 1158line 1137 didn't jump to line 1158 because the loop on line 1137 didn't complete
1138 try:
1139 req = urlopen(url, timeout=30)
1140 code = req.getcode()
1141 if not code or 200 <= code < 300: 1141 ↛ 1137line 1141 didn't jump to line 1137 because the condition on line 1141 was always true
1142 return req # type: ignore[no-any-return]
1143 except TimeoutError as e: 1143 ↛ 1144line 1143 didn't jump to line 1144 because the exception caught by line 1143 didn't happen
1144 self.logger.info(
1145 "Timeout downloading '%s', will retry %d more times."
1146 % (url, 5 - retry - 1)
1147 )
1148 exc = e
1149 except HTTPError as e:
1150 if e.code not in (503, 502): 1150 ↛ 1152line 1150 didn't jump to line 1152 because the condition on line 1150 was always true
1151 raise
1152 self.logger.info(
1153 "Caught error %d downloading '%s', will retry %d more times."
1154 % (e.code, url, 5 - retry - 1)
1155 )
1156 exc = e
1157 else:
1158 raise exc
    # lru_cache deduplicates identical fetches within one britney run;
    # NOTE(review): caching on an instance method also keys on self and keeps
    # the policy object alive for the process lifetime — fine for a batch run,
    # but confirm if this class is ever long-lived.
    @lru_cache(None)
    def fetch_swift_results(self, swift_url: str, src: str, arch: str) -> None:
        """Download new results for source package/arch from swift

        Lists the container for run directories newer than the latest run
        we already know about, then downloads each result.tar via
        fetch_one_result(). Exits the process on unexpected fetch failures
        so the next run can retry cleanly.
        """

        # prepare query: get all runs with a timestamp later than the latest
        # run_id for this package/arch; '@' is at the end of each run id, to
        # mark the end of a test run directory path
        # example: <autopkgtest-wily>wily/amd64/libp/libpng/20150630_054517@/result.tar
        query = {
            "delimiter": "@",
            "prefix": f"{self.options.series}/{arch}/{srchash(src)}/{src}/",
        }

        # determine latest run_id from results
        if not self.options.adt_shared_results_cache:
            latest_run_id = self.latest_run_for_package(src, arch)
            if latest_run_id:
                # swift "marker" makes the listing start after this entry
                query["marker"] = query["prefix"] + latest_run_id

        # request new results from swift
        url = os.path.join(swift_url, self.swift_container)
        url += "?" + urllib.parse.urlencode(query)
        f = None
        try:
            f = self.urlopen_retry(url)
            if f.getcode() == 200:
                result_paths = f.read().decode().strip().splitlines()
            elif f.getcode() == 204:  # No content
                result_paths = []
            else:
                # we should not ever end up here as we expect a HTTPError in
                # other cases; e. g. 3XX is something that tells us to adjust
                # our URLS, so fail hard on those
                raise NotImplementedError(
                    "fetch_swift_results(%s): cannot handle HTTP code %r"
                    % (url, f.getcode())
                )
        except OSError as e:
            # 401 "Unauthorized" is swift's way of saying "container does not exist"
            if getattr(e, "code", -1) == 401:
                self.logger.info(
                    "fetch_swift_results: %s does not exist yet or is inaccessible", url
                )
                return
            # Other status codes are usually a transient
            # network/infrastructure failure. Ignoring this can lead to
            # re-requesting tests which we already have results for, so
            # fail hard on this and let the next run retry.
            self.logger.error("Failure to fetch swift results from %s: %s", url, str(e))
            sys.exit(1)
        finally:
            if f is not None:
                f.close()

        for p in result_paths:
            self.fetch_one_result(
                os.path.join(swift_url, self.swift_container, p, "result.tar"),
                src,
                arch,
            )
1221 def fetch_one_result(self, url: str, src: str, arch: str) -> None:
1222 """Download one result URL for source/arch
1224 Remove matching pending_tests entries.
1225 """
1226 f = None
1227 try:
1228 f = self.urlopen_retry(url)
1229 if f.getcode() == 200: 1229 ↛ 1232line 1229 didn't jump to line 1232 because the condition on line 1229 was always true
1230 tar_bytes = io.BytesIO(f.read())
1231 else:
1232 raise NotImplementedError(
1233 "fetch_one_result(%s): cannot handle HTTP code %r"
1234 % (url, f.getcode())
1235 )
1236 except OSError as err:
1237 self.logger.error("Failure to fetch %s: %s", url, str(err))
1238 # we tolerate "not found" (something went wrong on uploading the
1239 # result), but other things indicate infrastructure problems
1240 if getattr(err, "code", -1) == 404:
1241 return
1242 sys.exit(1)
1243 finally:
1244 if f is not None: 1244 ↛ exit, 1244 ↛ 12462 missed branches: 1) line 1244 didn't return from function 'fetch_one_result' because the return on line 1241 wasn't executed, 2) line 1244 didn't jump to line 1246 because the condition on line 1244 was always true
1245 f.close() 1245 ↛ exitline 1245 didn't return from function 'fetch_one_result' because the return on line 1241 wasn't executed
1246 try:
1247 with tarfile.open(None, "r", tar_bytes) as tar:
1248 exitcode = int(tar.extractfile("exitcode").read().strip()) # type: ignore[union-attr]
1249 srcver = tar.extractfile("testpkg-version").read().decode().strip() # type: ignore[union-attr]
1250 (ressrc, ver) = srcver.split()
1251 testinfo = json.loads(tar.extractfile("testinfo.json").read().decode()) # type: ignore[union-attr]
1252 except (KeyError, ValueError, tarfile.TarError) as err:
1253 self.logger.error("%s is damaged, ignoring: %s", url, str(err))
1254 # ignore this; this will leave an orphaned request in autopkgtest-pending.json
1255 # and thus require manual retries after fixing the tmpfail, but we
1256 # can't just blindly attribute it to some pending test.
1257 return
1259 if src != ressrc: 1259 ↛ 1260line 1259 didn't jump to line 1260 because the condition on line 1259 was never true
1260 self.logger.error(
1261 "%s is a result for package %s, but expected package %s",
1262 url,
1263 ressrc,
1264 src,
1265 )
1266 return
1268 # parse recorded triggers in test result
1269 for e in testinfo.get("custom_environment", []): 1269 ↛ 1274line 1269 didn't jump to line 1274 because the loop on line 1269 didn't complete
1270 if e.startswith("ADT_TEST_TRIGGERS="): 1270 ↛ 1269line 1270 didn't jump to line 1269 because the condition on line 1270 was always true
1271 result_triggers = [i for i in e.split("=", 1)[1].split() if "/" in i]
1272 break
1273 else:
1274 self.logger.error("%s result has no ADT_TEST_TRIGGERS, ignoring")
1275 return
1277 run_id = os.path.basename(os.path.dirname(url))
1278 seen = round(calendar.timegm(time.strptime(run_id, "%Y%m%d_%H%M%S@")))
1279 # allow some skipped tests, but nothing else
1280 if exitcode in [0, 2]:
1281 result = Result.PASS
1282 elif exitcode == 8: 1282 ↛ 1283line 1282 didn't jump to line 1283 because the condition on line 1282 was never true
1283 result = Result.NEUTRAL
1284 else:
1285 result = Result.FAIL
1287 self.logger.info(
1288 "Fetched test result for %s/%s/%s %s (triggers: %s): %s",
1289 src,
1290 ver,
1291 arch,
1292 run_id,
1293 result_triggers,
1294 result.name.lower(),
1295 )
1297 # remove matching test requests
1298 for trigger in result_triggers:
1299 self.remove_from_pending(trigger, src, arch)
1301 # add this result
1302 for trigger in result_triggers:
1303 self.add_trigger_to_results(trigger, src, ver, arch, run_id, seen, result)
1305 def remove_from_pending(
1306 self, trigger: str, src: str, arch: str, timestamp: int = sys.maxsize
1307 ) -> None:
1308 assert self.pending_tests is not None # for type checking
1309 try:
1310 arch_dict = self.pending_tests[trigger][src]
1311 if timestamp < arch_dict[arch]:
1312 # The result is from before the moment of scheduling, so it's
1313 # not the one we're waiting for
1314 return
1315 del arch_dict[arch]
1316 if not arch_dict:
1317 del self.pending_tests[trigger][src]
1318 if not self.pending_tests[trigger]:
1319 del self.pending_tests[trigger]
1320 self.logger.debug(
1321 "-> matches pending request %s/%s for trigger %s", src, arch, trigger
1322 )
1323 except KeyError:
1324 self.logger.debug(
1325 "-> does not match any pending request for %s/%s", src, arch
1326 )
    def add_trigger_to_results(
        self,
        trigger: str,
        src: str,
        ver: str,
        arch: str,
        run_id: str,
        timestamp: int,
        status_to_add: Result,
    ) -> None:
        """Merge one fetched result into self.test_results.

        Stored rows are mutable lists [Result, version, run_id, timestamp].
        Invalid triggers and runs for a version older than the trigger's
        own version are ignored.
        """
        # Ensure that we got a new enough version
        try:
            (trigsrc, trigver) = trigger.split("/", 1)
        except ValueError:
            self.logger.info("Ignoring invalid test trigger %s", trigger)
            return
        if trigsrc == src and apt_pkg.version_compare(ver, trigver) < 0:
            self.logger.debug(
                "test trigger %s, but run for older version %s, ignoring", trigger, ver
            )
            return

        stored_result = (
            self.test_results.setdefault(trigger, {})
            .setdefault(src, {})
            .setdefault(arch, [Result.FAIL, None, "", 0])
        )

        # reruns shouldn't flip the result from PASS or NEUTRAL to
        # FAIL, so remember the most recent version of the best result
        # we've seen. Except for reference updates, which we always
        # want to update with the most recent result. The result data
        # may not be ordered by timestamp, so we need to check time.
        update = False
        if self.options.adt_baseline == "reference" and trigger == REF_TRIG:
            # reference runs: newest result always wins
            if stored_result[3] < timestamp:
                update = True
        elif status_to_add < stored_result[0]:
            # a smaller Result compares as "better" here, per the comment
            # above (best result is remembered)
            update = True
        elif status_to_add == stored_result[0] and stored_result[3] < timestamp:
            # same status: keep the most recent occurrence
            update = True

        if update:
            stored_result[0] = status_to_add
            stored_result[1] = ver
            stored_result[2] = run_id
            stored_result[3] = timestamp
    def send_test_request(
        self, src: str, arch: str, triggers: list[str], huge: bool = False
    ) -> None:
        """Send out AMQP request for testing src/arch for triggers

        If huge is true, then the request will be put into the -huge instead of
        normal queue.

        With adt_ppas set, the -ppa queue takes precedence over -huge.
        Requests go either to a real AMQP channel or, as a fallback, are
        appended to self.amqp_file_handle for file-based submission.
        """
        if self.options.dry_run:
            return

        params: dict[str, Any] = {"triggers": triggers}
        if self.options.adt_ppas:
            params["ppas"] = self.options.adt_ppas
            qname = f"debci-ppa-{self.options.series}-{arch}"
        elif huge:
            qname = f"debci-huge-{self.options.series}-{arch}"
        else:
            qname = f"debci-{self.options.series}-{arch}"
        params["submit-time"] = time.strftime("%Y-%m-%d %H:%M:%S%z", time.gmtime())

        # NOTE(review): `amqp` is imported only under TYPE_CHECKING at the top
        # of this file; if this branch is taken without a runtime import of
        # amqplib happening elsewhere, amqp.Message would raise NameError —
        # confirm how amqp_channel gets set up.
        if self.amqp_channel:
            self.amqp_channel.basic_publish(
                amqp.Message(
                    src + "\n" + json.dumps(params), delivery_mode=2
                ),  # persistent
                routing_key=qname,
            )
            # we save pending.json with every request, so that if britney
            # crashes we don't re-request tests. This is only needed when using
            # real amqp, as with file-based submission the pending tests are
            # returned by debci along with the results each run.
            self.save_pending_json()
        else:
            # for file-based submission, triggers are space separated
            params["triggers"] = [" ".join(params["triggers"])]
            assert self.amqp_file_handle
            self.amqp_file_handle.write(f"{qname}:{src} {json.dumps(params)}\n")
    def pkg_test_request(
        self, src: str, arch: str, all_triggers: list[str], huge: bool = False
    ) -> None:
        """Request one package test for a set of triggers

        all_triggers is a list of "pkgname/version". These are the packages
        that will be taken from the source suite. The first package in this
        list is the package that triggers the testing of src, the rest are
        additional packages required for installability of the test deps. If
        huge is true, then the request will be put into the -huge instead of
        normal queue.

        This will only be done if that test wasn't already requested in
        a previous run (i. e. if it's not already in self.pending_tests)
        or if there is already a fresh or a positive result for it. This
        ensures to download current results for this package before
        requesting any test."""
        trigger = all_triggers[0]
        uses_swift = not self.options.adt_swift_url.startswith("file://")
        try:
            # result rows are [Result, version, run_id, timestamp]
            result = self.test_results[trigger][src][arch]
            has_result = True
        except KeyError:
            has_result = False

        if has_result:
            result_state = result[0]
            if result_state in {Result.OLD_PASS, Result.OLD_FAIL, Result.OLD_NEUTRAL}:
                # stale result: fall through to check for newer results below
                pass
            elif (
                result_state == Result.FAIL
                and self.result_in_baseline(src, arch)[0]
                in {Result.PASS, Result.NEUTRAL, Result.OLD_PASS, Result.OLD_NEUTRAL}
                and self._now - result[3] > self.options.adt_retry_older_than
            ):
                # We might want to retry this failure, so continue
                pass
            elif not uses_swift:
                # We're done if we don't retrigger and we're not using swift
                return
            elif result_state in {Result.PASS, Result.NEUTRAL}:
                self.logger.debug(
                    "%s/%s triggered by %s already known", src, arch, trigger
                )
                return

        # Without swift we don't expect new results
        if uses_swift:
            self.logger.info(
                "Checking for new results for failed %s/%s for trigger %s",
                src,
                arch,
                trigger,
            )
            self.fetch_swift_results(self.options.adt_swift_url, src, arch)
            # do we have one now?
            try:
                self.test_results[trigger][src][arch]
                return
            except KeyError:
                pass

        self.request_test_if_not_queued(src, arch, trigger, all_triggers, huge=huge)
1479 def request_test_if_not_queued(
1480 self,
1481 src: str,
1482 arch: str,
1483 trigger: str,
1484 all_triggers: list[str] = [],
1485 huge: bool = False,
1486 ) -> None:
1487 assert self.pending_tests is not None # for type checking
1488 if not all_triggers:
1489 all_triggers = [trigger]
1491 # Don't re-request if it's already pending
1492 arch_dict = self.pending_tests.setdefault(trigger, {}).setdefault(src, {})
1493 if arch in arch_dict.keys():
1494 self.logger.debug(
1495 "Test %s/%s for %s is already pending, not queueing", src, arch, trigger
1496 )
1497 else:
1498 self.logger.debug(
1499 "Requesting %s autopkgtest on %s to verify %s", src, arch, trigger
1500 )
1501 arch_dict[arch] = self._now
1502 self.send_test_request(src, arch, all_triggers, huge=huge)
    def result_in_baseline(self, src: str, arch: str) -> list[Any]:
        """Get the result for src on arch in the baseline

        The baseline is optionally all data or a reference set)

        Returns a [Result, version, run_id, timestamp] row. With
        adt_baseline == "reference" only results triggered by REF_TRIG
        count; otherwise the best result ever seen across all triggers
        is used.
        """

        # this requires iterating over all cached results and thus is expensive;
        # cache the results
        try:
            return self.result_in_baseline_cache[src][arch]
        except KeyError:
            pass

        result_reference: list[Any] = [Result.NONE, None, "", 0]
        if self.options.adt_baseline == "reference":
            # a source no longer in the target suite has no reference baseline
            if src not in self.suite_info.target_suite.sources:
                return result_reference

            try:
                result_reference = self.test_results[REF_TRIG][src][arch]
                self.logger.debug(
                    "Found result for src %s in reference: %s",
                    src,
                    result_reference[0].name,
                )
            except KeyError:
                self.logger.debug(
                    "Found NO result for src %s in reference: %s",
                    src,
                    result_reference[0].name,
                )
            # deepcopy so later mutation of the live row doesn't alter the cache
            self.result_in_baseline_cache[src][arch] = deepcopy(result_reference)
            return result_reference

        result_ever: list[Any] = [Result.FAIL, None, "", 0]
        for srcmap in self.test_results.values():
            try:
                if srcmap[src][arch][0] != Result.FAIL:
                    result_ever = srcmap[src][arch]
                    # If we are not looking at a reference run, We don't really
                    # care about anything except the status, so we're done
                    # once we find a PASS.
                    if result_ever[0] == Result.PASS:
                        break
            except KeyError:
                pass

        self.result_in_baseline_cache[src][arch] = deepcopy(result_ever)
        self.logger.debug("Result for src %s ever: %s", src, result_ever[0].name)
        return result_ever
1555 def has_test_in_target(self, src: str) -> bool:
1556 test_in_target = False
1557 try:
1558 srcinfo = self.suite_info.target_suite.sources[src]
1559 if "autopkgtest" in srcinfo.testsuite or self.has_autodep8(srcinfo):
1560 test_in_target = True
1561 # AttributeError is only needed for the test suite as
1562 # srcinfo can be a NoneType
1563 except (KeyError, AttributeError):
1564 pass
1566 return test_in_target
    def pkg_test_result(
        self, src: str, ver: str, arch: str, trigger: str
    ) -> tuple[str, str, str | None, str]:
        """Get current test status of a particular package

        Return (status, real_version, run_id, log_url) tuple; status is a key in
        EXCUSES_LABELS. run_id is None if the test is still running.

        A KeyError on the result lookup means no result exists yet; in that
        case the test must be pending (asserted) and a RUNNING* status is
        derived instead.
        """
        assert self.pending_tests is not None  # for type checking
        # determine current test result status
        run_id = None
        try:
            # result rows are [Result, version, run_id, timestamp]
            r = self.test_results[trigger][src][arch]
            ver = r[1]
            run_id = r[2]

            if r[0] in {Result.FAIL, Result.OLD_FAIL}:
                # determine current test result status
                baseline_result = self.result_in_baseline(src, arch)[0]

                # Special-case triggers from linux-meta*: we cannot compare
                # results against different kernels, as e. g. a DKMS module
                # might work against the default kernel but fail against a
                # different flavor; so for those, ignore the "ever
                # passed" check; FIXME: check against trigsrc only
                if self.options.adt_baseline != "reference" and (
                    trigger.startswith("linux-meta") or trigger.startswith("linux/")
                ):
                    baseline_result = Result.FAIL

                # Check if the autopkgtest (still) exists in the target suite
                test_in_target = self.has_test_in_target(src)

                # stale or missing baseline: schedule a reference run to
                # refresh it
                if test_in_target and baseline_result in {
                    Result.NONE,
                    Result.OLD_FAIL,
                    Result.OLD_NEUTRAL,
                    Result.OLD_PASS,
                }:
                    self.request_test_if_not_queued(src, arch, REF_TRIG)

                if self.has_force_badtest(src, ver, arch):
                    result = "IGNORE-FAIL"
                elif not test_in_target:
                    if self.options.adt_ignore_failure_for_new_tests:
                        result = "IGNORE-FAIL"
                    else:
                        result = r[0].name
                elif baseline_result in {Result.FAIL, Result.OLD_FAIL}:
                    result = "ALWAYSFAIL"
                elif baseline_result == Result.NONE:
                    result = "RUNNING-REFERENCE"
                else:
                    result = "REGRESSION"

            else:
                result = r[0].name

            url = self.format_log_url(src, arch, run_id)
        except KeyError:
            # no result for src/arch; still running?
            assert arch in self.pending_tests.get(trigger, {}).get(src, {}).keys(), (
                "Result for %s/%s/%s (triggered by %s) is neither known nor pending!"
                % (src, ver, arch, trigger)
            )
            if self.has_force_badtest(src, ver, arch):
                result = "RUNNING-IGNORE"
            else:
                if self.has_test_in_target(src):
                    baseline_result = self.result_in_baseline(src, arch)[0]
                    if baseline_result == Result.FAIL:
                        result = "RUNNING-ALWAYSFAIL"
                    else:
                        result = "RUNNING"
                else:
                    if self.options.adt_ignore_failure_for_new_tests:
                        result = "RUNNING-IGNORE"
                    else:
                        result = "RUNNING"
            url = self.options.adt_ci_url + "status/pending"

        return (result, ver, run_id, url)
1652 def has_force_badtest(self, src: str, ver: str, arch: str) -> bool:
1653 """Check if src/ver/arch has a force-badtest hint"""
1655 assert self.hints is not None
1656 hints = self.hints.search("force-badtest", package=src)
1657 if hints:
1658 self.logger.info(
1659 "Checking hints for %s/%s/%s: %s",
1660 src,
1661 arch,
1662 ver,
1663 [str(h) for h in hints],
1664 )
1665 for hint in hints:
1666 if [
1667 mi
1668 for mi in hint.packages
1669 if mi.architecture in ["source", arch]
1670 and (
1671 mi.version is None
1672 or mi.version == "all" # Historical unversioned hint
1673 or apt_pkg.version_compare(ver, mi.version) <= 0
1674 )
1675 ]:
1676 return True
1678 return False
1680 def has_built_on_this_arch_or_is_arch_all(
1681 self, src_data: SourcePackage, arch: str
1682 ) -> bool:
1683 """When a source builds arch:all binaries, those binaries are
1684 added to all architectures and thus the source 'exists'
1685 everywhere. This function checks if the source has any arch
1686 specific binaries on this architecture and if not, if it
1687 has them on any architecture.
1688 """
1689 packages_s_a = self.suite_info.primary_source_suite.binaries[arch]
1690 has_unknown_binary = False
1691 for binary_s in src_data.binaries:
1692 try:
1693 binary_u = packages_s_a[binary_s.package_name]
1694 except KeyError:
1695 # src_data.binaries has all the built binaries, so if
1696 # we get here, we know that at least one architecture
1697 # has architecture specific binaries
1698 has_unknown_binary = True
1699 continue
1700 if binary_u.architecture == arch:
1701 return True
1702 # If we get here, we have only seen arch:all packages for this
1703 # arch.
1704 return not has_unknown_binary