Coverage for britney2/policies/autopkgtest.py: 90%
771 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-03-23 07:34 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-03-23 07:34 +0000
1# -*- coding: utf-8 -*-
3# Copyright (C) 2013 - 2016 Canonical Ltd.
4# Authors:
5# Colin Watson <cjwatson@ubuntu.com>
6# Jean-Baptiste Lallement <jean-baptiste.lallement@canonical.com>
7# Martin Pitt <martin.pitt@ubuntu.com>
9# This program is free software; you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation; either version 2 of the License, or
12# (at your option) any later version.
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
19import calendar
20import collections
21import io
22import itertools
23import json
24import optparse
25import os
26import sys
27import tarfile
28import time
29import urllib.parse
30from copy import deepcopy
31from enum import Enum
32from functools import lru_cache, total_ordering
33from typing import TYPE_CHECKING, Any, Optional, cast
34from collections.abc import Iterator
35from urllib.request import urlopen
37import apt_pkg
39import britney2.hints
40from britney2 import (
41 BinaryPackageId,
42 PackageId,
43 SourcePackage,
44 SuiteClass,
45 Suites,
46 TargetSuite,
47)
48from britney2.migrationitem import MigrationItem
49from britney2.policies import PolicyVerdict
50from britney2.policies.policy import AbstractBasePolicy
51from britney2.utils import iter_except, parse_option
53if TYPE_CHECKING: 53 ↛ 54line 53 didn't jump to line 54, because the condition on line 53 was never true
54 import amqplib.client_0_8 as amqp
56 from ..britney import Britney
57 from ..excuse import Excuse
58 from ..hints import HintParser
61@total_ordering
62class Result(Enum):
63 PASS = 1
64 NEUTRAL = 2
65 FAIL = 3
66 OLD_PASS = 4
67 OLD_NEUTRAL = 5
68 OLD_FAIL = 6
69 NONE = 7
71 def __lt__(self, other: "Result") -> bool:
72 return True if self.value < other.value else False
75EXCUSES_LABELS = {
76 "PASS": '<span style="background:#87d96c">Pass</span>',
77 "OLD_PASS": '<span style="background:#87d96c">Pass</span>',
78 "NEUTRAL": '<span style="background:#e5c545">No tests, superficial or marked flaky</span>',
79 "OLD_NEUTRAL": '<span style="background:#e5c545">No tests, superficial or marked flaky</span>',
80 "FAIL": '<span style="background:#ff6666">Failed</span>',
81 "OLD_FAIL": '<span style="background:#ff6666">Failed</span>',
82 "ALWAYSFAIL": '<span style="background:#e5c545">Failed (not a regression)</span>',
83 "REGRESSION": '<span style="background:#ff6666">Regression or new test</span>',
84 "IGNORE-FAIL": '<span style="background:#e5c545">Ignored failure</span>',
85 "RUNNING": '<span style="background:#99ddff">Test in progress</span>',
86 "RUNNING-REFERENCE": '<span style="background:#ff6666">Reference test in progress, but real test failed already</span>',
87 "RUNNING-ALWAYSFAIL": '<span style="background:#99ddff">Test in progress (will not be considered a regression)</span>',
88}
90REF_TRIG = "migration-reference/0"
92VERSION_KEY = "britney-autopkgtest-pending-file-version"
95def srchash(src: str) -> str:
96 """archive hash prefix for source package"""
98 if src.startswith("lib"): 98 ↛ 99line 98 didn't jump to line 99, because the condition on line 98 was never true
99 return src[:4]
100 else:
101 return src[0]
104def added_pkgs_compared_to_target_suite(
105 package_ids: frozenset[BinaryPackageId],
106 target_suite: TargetSuite,
107 *,
108 invert: bool = False,
109) -> Iterator[BinaryPackageId]:
110 if invert: 110 ↛ 111line 110 didn't jump to line 111, because the condition on line 110 was never true
111 pkgs_ids_to_ignore = package_ids - set(
112 target_suite.which_of_these_are_in_the_suite(package_ids)
113 )
114 names_ignored = {p.package_name for p in pkgs_ids_to_ignore}
115 else:
116 names_ignored = {
117 p.package_name
118 for p in target_suite.which_of_these_are_in_the_suite(package_ids)
119 }
120 yield from (p for p in package_ids if p.package_name not in names_ignored)
123def all_leaf_results(
124 test_results: dict[str, dict[str, dict[str, list[Any]]]],
125) -> Iterator[list[Any]]:
126 for trigger in test_results.values():
127 for arch in trigger.values():
128 yield from arch.values()
131def mark_result_as_old(result: Result) -> Result:
132 """Convert current result into corresponding old result"""
134 if result == Result.FAIL:
135 result = Result.OLD_FAIL
136 elif result == Result.PASS:
137 result = Result.OLD_PASS
138 elif result == Result.NEUTRAL: 138 ↛ 140line 138 didn't jump to line 140, because the condition on line 138 was never false
139 result = Result.OLD_NEUTRAL
140 return result
143class AutopkgtestPolicy(AbstractBasePolicy):
144 """autopkgtest regression policy for source migrations
146 Run autopkgtests for the excuse and all of its reverse dependencies, and
147 reject the upload if any of those regress.
148 """
150 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
151 super().__init__(
152 "autopkgtest", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
153 )
154 # tests requested in this and previous runs
155 # trigger -> src -> [arch]
156 self.pending_tests: Optional[dict[str, dict[str, dict[str, int]]]] = None
157 self.pending_tests_file = os.path.join(
158 self.state_dir, "autopkgtest-pending.json"
159 )
160 self.testsuite_triggers: dict[str, set[str]] = {}
161 self.result_in_baseline_cache: dict[str, dict[str, list[Any]]] = (
162 collections.defaultdict(dict)
163 )
165 self.amqp_file_handle: io.TextIOWrapper | None = None
167 # Default values for this policy's options
168 parse_option(options, "adt_baseline")
169 parse_option(options, "adt_huge", to_int=True)
170 parse_option(options, "adt_ppas")
171 parse_option(options, "adt_reference_max_age", day_to_sec=True)
172 parse_option(options, "adt_pending_max_age", default=5, day_to_sec=True)
173 parse_option(options, "adt_regression_penalty", default=0, to_int=True)
174 parse_option(options, "adt_log_url") # see below for defaults
175 parse_option(options, "adt_retry_url") # see below for defaults
176 parse_option(options, "adt_retry_older_than", day_to_sec=True)
177 parse_option(options, "adt_results_cache_age", day_to_sec=True)
178 parse_option(options, "adt_shared_results_cache")
179 parse_option(options, "adt_success_bounty", default=0, to_int=True)
180 parse_option(options, "adt_ignore_failure_for_new_tests", to_bool=True)
182 # When ADT_RESULTS_CACHE_AGE is smaller than or equal to
183 # ADT_REFERENCE_MAX_AGE old reference result will be removed from cache
184 # before the newly scheduled results are in, potentially causing
185 # additional waiting. For packages like glibc this might cause an
186 # infinite delay as there will always be a package that's
187 # waiting. Similarly for ADT_RETRY_OLDER_THAN.
188 if self.options.adt_results_cache_age <= self.options.adt_reference_max_age:
189 self.logger.warning(
190 "Unexpected: ADT_REFERENCE_MAX_AGE bigger than ADT_RESULTS_CACHE_AGE"
191 )
192 if self.options.adt_results_cache_age <= self.options.adt_retry_older_than:
193 self.logger.warning(
194 "Unexpected: ADT_RETRY_OLDER_THAN bigger than ADT_RESULTS_CACHE_AGE"
195 )
197 if not self.options.adt_log_url: 197 ↛ 223line 197 didn't jump to line 223, because the condition on line 197 was never false
198 # Historical defaults
199 if self.options.adt_swift_url.startswith("file://"):
200 self.options.adt_log_url = os.path.join(
201 self.options.adt_ci_url,
202 "data",
203 "autopkgtest",
204 self.options.series,
205 "{arch}",
206 "{hash}",
207 "{package}",
208 "{run_id}",
209 "log.gz",
210 )
211 else:
212 self.options.adt_log_url = os.path.join(
213 self.options.adt_swift_url,
214 "{swift_container}",
215 self.options.series,
216 "{arch}",
217 "{hash}",
218 "{package}",
219 "{run_id}",
220 "log.gz",
221 )
223 if hasattr(self.options, "adt_retry_url_mech"): 223 ↛ 224line 223 didn't jump to line 224, because the condition on line 223 was never true
224 self.logger.warning(
225 "The ADT_RETRY_URL_MECH configuration has been deprecated."
226 )
227 self.logger.warning(
228 "Instead britney now supports ADT_RETRY_URL for more flexibility."
229 )
230 if self.options.adt_retry_url:
231 self.logger.error(
232 "Please remove the ADT_RETRY_URL_MECH as ADT_RETRY_URL will be used."
233 )
234 elif self.options.adt_retry_url_mech == "run_id":
235 self.options.adt_retry_url = (
236 self.options.adt_ci_url + "api/v1/retry/{run_id}"
237 )
238 if not self.options.adt_retry_url: 238 ↛ 255line 238 didn't jump to line 255, because the condition on line 238 was never false
239 # Historical default
240 self.options.adt_retry_url = (
241 self.options.adt_ci_url
242 + "request.cgi?"
243 + "release={release}&arch={arch}&package={package}&trigger={trigger}{ppas}"
244 )
246 # results map: trigger -> src -> arch -> [passed, version, run_id, seen]
247 # - trigger is "source/version" of an unstable package that triggered
248 # this test run.
249 # - "passed" is a Result
250 # - "version" is the package version of "src" of that test
251 # - "run_id" is an opaque ID that identifies a particular test run for
252 # a given src/arch.
253 # - "seen" is an approximate time stamp of the test run. How this is
254 # deduced depends on the interface used.
255 self.test_results: dict[str, dict[str, dict[str, list[Any]]]] = {}
256 if self.options.adt_shared_results_cache:
257 self.results_cache_file = self.options.adt_shared_results_cache
258 else:
259 self.results_cache_file = os.path.join(
260 self.state_dir, "autopkgtest-results.cache"
261 )
263 try:
264 self.options.adt_ppas = self.options.adt_ppas.strip().split()
265 except AttributeError:
266 self.options.adt_ppas = []
268 self.swift_container = "autopkgtest-" + options.series
269 if self.options.adt_ppas:
270 self.swift_container += "-" + options.adt_ppas[-1].replace("/", "-")
272 # restrict adt_arches to architectures we actually run for
273 self.adt_arches = []
274 for arch in self.options.adt_arches.split():
275 if arch in self.options.architectures:
276 self.adt_arches.append(arch)
277 else:
278 self.logger.info(
279 "Ignoring ADT_ARCHES %s as it is not in architectures list", arch
280 )
282 def __del__(self) -> None:
283 if self.amqp_file_handle: 283 ↛ exitline 283 didn't return from function '__del__', because the condition on line 283 was never false
284 try:
285 self.amqp_file_handle.close()
286 except AttributeError:
287 pass
289 def register_hints(self, hint_parser: "HintParser") -> None:
290 hint_parser.register_hint_type(
291 "force-badtest", britney2.hints.split_into_one_hint_per_package
292 )
293 hint_parser.register_hint_type(
294 "force-skiptest", britney2.hints.split_into_one_hint_per_package
295 )
297 def initialise(self, britney: "Britney") -> None:
298 super().initialise(britney)
299 # We want to use the "current" time stamp in multiple locations
300 time_now = round(time.time())
301 if hasattr(self.options, "fake_runtime"):
302 time_now = int(self.options.fake_runtime)
303 self._now = time_now
304 # compute inverse Testsuite-Triggers: map, unifying all series
305 self.logger.info("Building inverse testsuite_triggers map")
306 for suite in self.suite_info:
307 for src, data in suite.sources.items():
308 for trigger in data.testsuite_triggers:
309 self.testsuite_triggers.setdefault(trigger, set()).add(src)
310 target_suite_name = self.suite_info.target_suite.name
312 os.makedirs(self.state_dir, exist_ok=True)
313 self.read_pending_tests()
315 # read the cached results that we collected so far
316 if os.path.exists(self.results_cache_file):
317 with open(self.results_cache_file) as f:
318 test_results = json.load(f)
319 self.test_results = self.check_and_upgrade_cache(test_results)
320 self.logger.info("Read previous results from %s", self.results_cache_file)
321 else:
322 self.logger.info(
323 "%s does not exist, re-downloading all results from swift",
324 self.results_cache_file,
325 )
327 # read in the new results
328 if self.options.adt_swift_url.startswith("file://"):
329 debci_file = self.options.adt_swift_url[7:]
330 if os.path.exists(debci_file):
331 with open(debci_file) as f:
332 test_results = json.load(f)
333 self.logger.info("Read new results from %s", debci_file)
334 for res in test_results["results"]:
335 # tests denied on infrastructure don't get a version
336 if res["version"] is None: 336 ↛ 337line 336 didn't jump to line 337, because the condition on line 336 was never true
337 res["version"] = "blocked-on-ci-infra"
338 assert res["version"] is not None
339 (test_suite, triggers, src, arch, ver, status, run_id, seen) = [
340 res["suite"],
341 res["trigger"],
342 res["package"],
343 res["arch"],
344 res["version"],
345 res["status"],
346 str(res["run_id"]),
347 round(
348 calendar.timegm(
349 time.strptime(
350 res["updated_at"][0:-5], "%Y-%m-%dT%H:%M:%S"
351 )
352 )
353 ),
354 ]
355 if test_suite != target_suite_name: 355 ↛ 357line 355 didn't jump to line 357, because the condition on line 355 was never true
356 # not requested for this target suite, so ignore
357 continue
358 if triggers is None: 358 ↛ 360line 358 didn't jump to line 360, because the condition on line 358 was never true
359 # not requested for this policy, so ignore
360 continue
361 if status is None:
362 # still running => pending
363 continue
364 for trigger in triggers.split():
365 # remove matching test requests
366 self.remove_from_pending(trigger, src, arch, seen)
367 if status == "tmpfail": 367 ↛ 369line 367 didn't jump to line 369, because the condition on line 367 was never true
368 # let's see if we still need it
369 continue
370 self.logger.debug(
371 "Results %s %s %s added", src, trigger, status
372 )
373 self.add_trigger_to_results(
374 trigger,
375 src,
376 ver,
377 arch,
378 run_id,
379 seen,
380 Result[status.upper()],
381 )
382 else:
383 self.logger.info(
384 "%s does not exist, no new data will be processed", debci_file
385 )
387 # The cache can contain results against versions of packages that
388 # are not in any suite anymore. Strip those out, as we don't want
389 # to use those results. Additionally, old references may be
390 # filtered out.
391 if self.options.adt_baseline == "reference":
392 self.filter_old_results()
394 # we need sources, binaries, and installability tester, so for now
395 # remember the whole britney object
396 self.britney = britney
398 # Initialize AMQP connection
399 self.amqp_channel: Optional["amqp.channel.Channel"] = None
400 self.amqp_file_handle = None
401 if self.options.dry_run: 401 ↛ 402line 401 didn't jump to line 402, because the condition on line 401 was never true
402 return
404 amqp_url = self.options.adt_amqp
406 if amqp_url.startswith("amqp://"): 406 ↛ 407line 406 didn't jump to line 407, because the condition on line 406 was never true
407 import amqplib.client_0_8 as amqp
409 # depending on the setup we connect to a AMQP server
410 creds = urllib.parse.urlsplit(amqp_url, allow_fragments=False)
411 self.amqp_con = amqp.Connection(
412 creds.hostname, userid=creds.username, password=creds.password
413 )
414 self.amqp_channel = self.amqp_con.channel()
415 self.logger.info("Connected to AMQP server")
416 elif amqp_url.startswith("file://"): 416 ↛ 421line 416 didn't jump to line 421, because the condition on line 416 was never false
417 # or in Debian and in testing mode, adt_amqp will be a file:// URL
418 amqp_file = amqp_url[7:]
419 self.amqp_file_handle = open(amqp_file, "w", 1)
420 else:
421 raise RuntimeError("Unknown ADT_AMQP schema %s" % amqp_url.split(":", 1)[0])
423 def check_and_upgrade_cache(
424 self, test_results: dict[str, dict[str, dict[str, list[Any]]]]
425 ) -> dict[str, dict[str, dict[str, list[Any]]]]:
426 for leaf_result in all_leaf_results(test_results):
427 leaf_result[0] = Result[leaf_result[0]]
429 # Drop results older than ADT_RESULTS_CACHE_AGE
430 for trigger in list(test_results.keys()):
431 for pkg in list(test_results[trigger].keys()):
432 for arch in list(test_results[trigger][pkg].keys()):
433 arch_result = test_results[trigger][pkg][arch]
434 if self._now - arch_result[3] > self.options.adt_results_cache_age: 434 ↛ 435line 434 didn't jump to line 435, because the condition on line 434 was never true
435 del test_results[trigger][pkg][arch]
436 if not test_results[trigger][pkg]: 436 ↛ 437line 436 didn't jump to line 437, because the condition on line 436 was never true
437 del test_results[trigger][pkg]
438 if not test_results[trigger]: 438 ↛ 439line 438 didn't jump to line 439, because the condition on line 438 was never true
439 del test_results[trigger]
441 return test_results
443 def filter_old_results(self) -> None:
444 """Remove results for old versions and reference runs from the cache.
446 For now, only delete reference runs. If we delete regular
447 results after a while, packages with lots of triggered tests may
448 never have all the results at the same time."""
450 test_results = self.test_results
452 for trigger, trigger_data in test_results.items():
453 for src, results in trigger_data.items():
454 for arch, result in results.items():
455 if (
456 trigger == REF_TRIG
457 and self._now - result[3] > self.options.adt_reference_max_age
458 ):
459 result[0] = mark_result_as_old(result[0])
460 elif not self.test_version_in_any_suite(src, result[1]):
461 result[0] = mark_result_as_old(result[0])
463 def test_version_in_any_suite(self, src: str, version: str) -> bool:
464 """Check if the mentioned version of src is found in a suite
466 To prevent regressions in the target suite, the result should be
467 from a test with the version of the package in either the source
468 suite or the target suite. The source suite is also valid,
469 because due to versioned test dependencies and Breaks/Conflicts
470 relations, regularly the version in the source suite is used
471 during testing.
472 """
474 versions = set()
475 for suite in self.suite_info:
476 try:
477 srcinfo = suite.sources[src]
478 except KeyError:
479 continue
480 versions.add(srcinfo.version)
482 valid_version = False
483 for ver in versions:
484 if apt_pkg.version_compare(ver, version) == 0:
485 valid_version = True
486 break
488 return valid_version
490 def save_pending_json(self) -> None:
491 # update the pending tests on-disk cache
492 self.logger.info(
493 "Updating pending requested tests in %s" % self.pending_tests_file
494 )
495 # Shallow clone pending_tests as we only modify the toplevel and change its type.
496 pending_tests: dict[str, Any] = {}
497 if self.pending_tests:
498 pending_tests = dict(self.pending_tests)
499 # Avoid adding if there are no pending results at all (eases testing)
500 pending_tests[VERSION_KEY] = 1
501 with open(self.pending_tests_file + ".new", "w") as f:
502 json.dump(pending_tests, f, indent=2)
503 os.rename(self.pending_tests_file + ".new", self.pending_tests_file)
505 def save_state(self, britney: "Britney") -> None:
506 super().save_state(britney)
508 # update the results on-disk cache, unless we are using a r/o shared one
509 if not self.options.adt_shared_results_cache:
510 self.logger.info("Updating results cache")
511 test_results = deepcopy(self.test_results)
512 for result in all_leaf_results(test_results):
513 result[0] = result[0].name
514 with open(self.results_cache_file + ".new", "w") as f:
515 json.dump(test_results, f, indent=2)
516 os.rename(self.results_cache_file + ".new", self.results_cache_file)
518 self.save_pending_json()
520 def format_retry_url(
521 self, run_id: Optional[str], arch: str, testsrc: str, trigger: str
522 ) -> str:
523 if self.options.adt_ppas:
524 ppas = "&" + urllib.parse.urlencode(
525 [("ppa", p) for p in self.options.adt_ppas]
526 )
527 else:
528 ppas = ""
529 return cast(str, self.options.adt_retry_url).format(
530 run_id=run_id,
531 release=self.options.series,
532 arch=arch,
533 package=testsrc,
534 trigger=urllib.parse.quote_plus(trigger),
535 ppas=ppas,
536 )
538 def format_log_url(self, testsrc: str, arch: str, run_id: str) -> str:
539 return cast(str, self.options.adt_log_url).format(
540 release=self.options.series,
541 swift_container=self.swift_container,
542 hash=srchash(testsrc),
543 package=testsrc,
544 arch=arch,
545 run_id=run_id,
546 )
548 def apply_src_policy_impl(
549 self,
550 tests_info: dict[str, Any],
551 item: MigrationItem,
552 source_data_tdist: Optional[SourcePackage],
553 source_data_srcdist: SourcePackage,
554 excuse: "Excuse",
555 ) -> PolicyVerdict:
556 assert self.hints is not None # for type checking
557 # initialize
558 verdict = PolicyVerdict.PASS
559 all_self_tests_pass = False
560 source_name = item.package
561 results_info = []
563 # skip/delay autopkgtests until new package is built somewhere
564 if not source_data_srcdist.binaries:
565 self.logger.debug(
566 "%s hasnot been built anywhere, skipping autopkgtest policy",
567 excuse.name,
568 )
569 verdict = PolicyVerdict.REJECTED_TEMPORARILY
570 excuse.add_verdict_info(verdict, "nothing built yet, autopkgtest delayed")
572 if "all" in excuse.missing_builds:
573 self.logger.debug(
574 "%s hasnot been built for arch:all, skipping autopkgtest policy",
575 source_name,
576 )
577 verdict = PolicyVerdict.REJECTED_TEMPORARILY
578 excuse.add_verdict_info(
579 verdict, "arch:all not built yet, autopkgtest delayed"
580 )
582 if verdict == PolicyVerdict.PASS:
583 self.logger.debug("Checking autopkgtests for %s", source_name)
584 trigger = source_name + "/" + source_data_srcdist.version
586 # build a (testsrc, testver) → arch → (status, run_id, log_url) map; we trigger/check test
587 # results per architecture for technical/efficiency reasons, but we
588 # want to evaluate and present the results by tested source package
589 # first
590 pkg_arch_result: dict[
591 tuple[str, str], dict[str, tuple[str, Optional[str], str]]
592 ] = collections.defaultdict(dict)
593 for arch in self.adt_arches:
594 if arch in excuse.missing_builds:
595 verdict = PolicyVerdict.REJECTED_TEMPORARILY
596 self.logger.debug(
597 "%s hasnot been built on arch %s, delay autopkgtest there",
598 source_name,
599 arch,
600 )
601 excuse.add_verdict_info(
602 verdict,
603 "arch:%s not built yet, autopkgtest delayed there" % arch,
604 )
605 elif arch in excuse.policy_info["depends"].get(
606 "arch_all_not_installable", []
607 ):
608 self.logger.debug(
609 "%s is uninstallable on arch %s (which is allowed), not running autopkgtest there",
610 source_name,
611 arch,
612 )
613 excuse.addinfo(
614 "uninstallable on arch %s (which is allowed), not running autopkgtest there"
615 % arch
616 )
617 elif (
618 arch in excuse.unsatisfiable_on_archs
619 and arch
620 not in excuse.policy_info["depends"].get(
621 "autopkgtest_run_anyways", []
622 )
623 ):
624 verdict = PolicyVerdict.REJECTED_TEMPORARILY
625 self.logger.debug(
626 "%s is uninstallable on arch %s, not running autopkgtest there",
627 source_name,
628 arch,
629 )
630 excuse.addinfo(
631 "uninstallable on arch %s, not running autopkgtest there" % arch
632 )
633 else:
634 self.request_tests_for_source(
635 item, arch, source_data_srcdist, pkg_arch_result, excuse
636 )
638 # add test result details to Excuse
639 cloud_url = self.options.adt_ci_url + "packages/%(h)s/%(s)s/%(r)s/%(a)s"
640 testver: Optional[str]
641 for testsrc, testver in sorted(pkg_arch_result):
642 assert testver is not None
643 arch_results = pkg_arch_result[(testsrc, testver)]
644 r = {v[0] for v in arch_results.values()}
645 if "REGRESSION" in r:
646 verdict = PolicyVerdict.REJECTED_PERMANENTLY
647 elif (
648 "RUNNING" in r or "RUNNING-REFERENCE" in r
649 ) and verdict == PolicyVerdict.PASS:
650 verdict = PolicyVerdict.REJECTED_TEMPORARILY
651 # skip version if still running on all arches
652 if not r - {"RUNNING", "RUNNING-ALWAYSFAIL"}:
653 testver = None
655 # A source package is eligible for the bounty if it has tests
656 # of its own that pass on all tested architectures.
657 if testsrc == source_name:
658 excuse.autopkgtest_results = r
659 if r == {"PASS"}:
660 all_self_tests_pass = True
662 if testver:
663 testname = "%s/%s" % (testsrc, testver)
664 else:
665 testname = testsrc
667 html_archmsg = []
668 for arch in sorted(arch_results):
669 (status, run_id, log_url) = arch_results[arch]
670 artifact_url = None
671 retry_url = None
672 reference_url = None
673 reference_retry_url = None
674 history_url = None
675 if self.options.adt_ppas:
676 if log_url.endswith("log.gz"):
677 artifact_url = log_url.replace("log.gz", "artifacts.tar.gz")
678 else:
679 history_url = cloud_url % {
680 "h": srchash(testsrc),
681 "s": testsrc,
682 "r": self.options.series,
683 "a": arch,
684 }
685 if status in ("NEUTRAL", "REGRESSION", "RUNNING-REFERENCE"):
686 retry_url = self.format_retry_url(
687 run_id, arch, testsrc, trigger
688 )
690 baseline_result = self.result_in_baseline(testsrc, arch)
691 if baseline_result and baseline_result[0] != Result.NONE:
692 baseline_run_id = str(baseline_result[2])
693 reference_url = self.format_log_url(
694 testsrc, arch, baseline_run_id
695 )
696 if self.options.adt_baseline == "reference":
697 reference_retry_url = self.format_retry_url(
698 baseline_run_id, arch, testsrc, REF_TRIG
699 )
700 tests_info.setdefault(testname, {})[arch] = [
701 status,
702 log_url,
703 history_url,
704 artifact_url,
705 retry_url,
706 ]
708 # render HTML snippet for testsrc entry for current arch
709 if history_url:
710 message = '<a href="%s">%s</a>' % (history_url, arch)
711 else:
712 message = arch
713 message += ': <a href="%s">%s</a>' % (
714 log_url,
715 EXCUSES_LABELS[status],
716 )
717 if retry_url:
718 message += (
719 '<a href="%s" style="text-decoration: none;"> ♻</a>'
720 % retry_url
721 )
722 if reference_url:
723 message += ' (<a href="%s">reference</a>' % reference_url
724 if reference_retry_url:
725 message += (
726 '<a href="%s" style="text-decoration: none;"> ♻</a>'
727 % reference_retry_url
728 )
729 message += ")"
730 if artifact_url:
731 message += ' <a href="%s">[artifacts]</a>' % artifact_url
732 html_archmsg.append(message)
734 # render HTML line for testsrc entry
735 # - if action is or may be required
736 # - for ones own package
737 if (
738 r
739 - {
740 "PASS",
741 "NEUTRAL",
742 "RUNNING-ALWAYSFAIL",
743 "ALWAYSFAIL",
744 "IGNORE-FAIL",
745 }
746 or testsrc == source_name
747 ):
748 if testver:
749 pkg = '<a href="#{0}">{0}</a>/{1}'.format(testsrc, testver)
750 else:
751 pkg = '<a href="#{0}">{0}</a>'.format(testsrc)
752 results_info.append(
753 "autopkgtest for %s: %s" % (pkg, ", ".join(html_archmsg))
754 )
756 if verdict != PolicyVerdict.PASS:
757 # check for force-skiptest hint
758 hints = self.hints.search(
759 "force-skiptest",
760 package=source_name,
761 version=source_data_srcdist.version,
762 )
763 if hints:
764 excuse.addreason("skiptest")
765 excuse.addinfo(
766 "Should wait for tests relating to %s %s, but forced by %s"
767 % (source_name, source_data_srcdist.version, hints[0].user)
768 )
769 verdict = PolicyVerdict.PASS_HINTED
770 else:
771 excuse.addreason("autopkgtest")
773 if (
774 self.options.adt_success_bounty
775 and verdict == PolicyVerdict.PASS
776 and all_self_tests_pass
777 ):
778 excuse.add_bounty("autopkgtest", self.options.adt_success_bounty)
779 if self.options.adt_regression_penalty and verdict in {
780 PolicyVerdict.REJECTED_PERMANENTLY,
781 PolicyVerdict.REJECTED_TEMPORARILY,
782 }:
783 if self.options.adt_regression_penalty > 0: 783 ↛ 786line 783 didn't jump to line 786, because the condition on line 783 was never false
784 excuse.add_penalty("autopkgtest", self.options.adt_regression_penalty)
785 # In case we give penalties instead of blocking, we must always pass
786 verdict = PolicyVerdict.PASS
787 for i in results_info:
788 if verdict.is_rejected:
789 excuse.add_verdict_info(verdict, i)
790 else:
791 excuse.addinfo(i)
793 return verdict
795 #
796 # helper functions
797 #
799 @staticmethod
800 def has_autodep8(srcinfo: SourcePackage) -> bool:
801 """Check if package is covered by autodep8
803 srcinfo is an item from self.britney.sources
804 """
805 # autodep8?
806 for t in srcinfo.testsuite:
807 if t.startswith("autopkgtest-pkg"):
808 return True
810 return False
812 def request_tests_for_source(
813 self,
814 item: MigrationItem,
815 arch: str,
816 source_data_srcdist: SourcePackage,
817 pkg_arch_result: dict[
818 tuple[str, str], dict[str, tuple[str, Optional[str], str]]
819 ],
820 excuse: "Excuse",
821 ) -> None:
822 pkg_universe = self.britney.pkg_universe
823 target_suite = self.suite_info.target_suite
824 source_suite = item.suite
825 sources_t = target_suite.sources
826 sources_s = item.suite.sources
827 packages_s_a = item.suite.binaries[arch]
828 source_name = item.package
829 source_version = source_data_srcdist.version
830 # request tests (unless they were already requested earlier or have a result)
831 tests = self.tests_for_source(source_name, source_version, arch, excuse)
832 is_huge = len(tests) > self.options.adt_huge
834 # Here we figure out what is required from the source suite
835 # for the test to install successfully.
836 #
837 # The ImplicitDependencyPolicy does a similar calculation, but
838 # if I (elbrus) understand correctly, only in the reverse
839 # dependency direction. We are doing something similar here
840 # but in the dependency direction (note: this code is older).
841 # We use the ImplicitDependencyPolicy result for the reverse
842 # dependencies and we keep the code below for the
843 # dependencies. Using the ImplicitDependencyPolicy results
844 # also in the reverse direction seems to require quite some
845 # reorganisation to get that information available here, as in
846 # the current state only the current excuse is available here
847 # and the required other excuses may not be calculated yet.
848 #
849 # Loop over all binary packages from trigger and
850 # recursively look up which *versioned* dependencies are
851 # only satisfied in the source suite.
852 #
853 # For all binaries found, look up which packages they
854 # break/conflict with in the target suite, but not in the
855 # source suite. The main reason to do this is to cover test
856 # dependencies, so we will check Testsuite-Triggers as
857 # well.
858 #
859 # OI: do we need to do the first check in a smart way
860 # (i.e. only for the packages that are actually going to be
861 # installed) for the breaks/conflicts set as well, i.e. do
862 # we need to check if any of the packages that we now
863 # enforce being from the source suite, actually have new
864 # versioned depends and new breaks/conflicts.
865 #
866 # For all binaries found, add the set of unique source
867 # packages to the list of triggers.
869 bin_triggers: set[PackageId] = set()
870 bin_new = set(source_data_srcdist.binaries)
871 for n_binary in iter_except(bin_new.pop, KeyError):
872 if n_binary in bin_triggers:
873 continue
874 bin_triggers.add(n_binary)
876 # Check if there is a dependency that is not
877 # available in the target suite.
878 # We add slightly too much here, because new binaries
879 # will also show up, but they are already properly
880 # installed. Nevermind.
881 depends = pkg_universe.dependencies_of(n_binary)
882 # depends is a frozenset{frozenset{BinaryPackageId, ..}}
883 for deps_of_bin in depends:
884 if target_suite.any_of_these_are_in_the_suite(deps_of_bin):
885 # if any of the alternative dependencies is already
886 # satisfied in the target suite, we can just ignore it
887 continue
888 # We'll figure out which version later
889 bin_new.update(
890 added_pkgs_compared_to_target_suite(deps_of_bin, target_suite)
891 )
893 # Check if the package breaks/conflicts anything. We might
894 # be adding slightly too many source packages due to the
895 # check here as a binary package that is broken may be
896 # coming from a different source package in the source
897 # suite. Nevermind.
898 bin_broken = set()
899 for t_binary in bin_triggers:
900 # broken is a frozenset{BinaryPackageId, ..}
901 broken = pkg_universe.negative_dependencies_of(
902 cast(BinaryPackageId, t_binary)
903 )
904 broken_in_target = {
905 p.package_name
906 for p in target_suite.which_of_these_are_in_the_suite(broken)
907 }
908 broken_in_source = {
909 p.package_name
910 for p in source_suite.which_of_these_are_in_the_suite(broken)
911 }
912 # We want packages with a newer version in the source suite that
913 # no longer has the conflict. This is an approximation
914 broken_filtered = set(
915 p
916 for p in broken
917 if p.package_name in broken_in_target
918 and p.package_name not in broken_in_source
919 )
920 # We add the version in the target suite, but the code below will
921 # change it to the version in the source suite
922 bin_broken.update(broken_filtered)
923 bin_triggers.update(bin_broken)
925 # The ImplicitDependencyPolicy also found packages that need
926 # to migrate together, so add them to the triggers too.
927 for bin_implicit in excuse.depends_packages_flattened:
928 if bin_implicit.architecture == arch:
929 bin_triggers.add(bin_implicit)
931 triggers = set()
932 for t_binary2 in bin_triggers:
933 if t_binary2.architecture == arch:
934 try:
935 source_of_bin = packages_s_a[t_binary2.package_name].source
936 # If the version in the target suite is the same, don't add a trigger.
937 # Note that we looked up the source package in the source suite.
938 # If it were a different source package in the target suite, however, then
939 # we would not have this source package in the same version anyway.
940 if (
941 sources_t.get(source_of_bin, None) is None
942 or sources_s[source_of_bin].version
943 != sources_t[source_of_bin].version
944 ):
945 triggers.add(
946 source_of_bin + "/" + sources_s[source_of_bin].version
947 )
948 except KeyError:
949 # Apparently the package was removed from
950 # unstable e.g. if packages are replaced
951 # (e.g. -dbg to -dbgsym)
952 pass
953 if t_binary2 not in source_data_srcdist.binaries:
954 for tdep_src in self.testsuite_triggers.get( 954 ↛ 957line 954 didn't jump to line 957, because the loop on line 954 never started
955 t_binary2.package_name, set()
956 ):
957 try:
958 # Only add trigger if versions in the target and source suites are different
959 if (
960 sources_t.get(tdep_src, None) is None
961 or sources_s[tdep_src].version
962 != sources_t[tdep_src].version
963 ):
964 triggers.add(
965 tdep_src + "/" + sources_s[tdep_src].version
966 )
967 except KeyError:
968 # Apparently the source was removed from
969 # unstable (testsuite_triggers are unified
970 # over all suites)
971 pass
972 trigger = source_name + "/" + source_version
973 triggers.discard(trigger)
974 triggers_list = sorted(list(triggers))
975 triggers_list.insert(0, trigger)
977 for testsrc, testver in tests:
978 self.pkg_test_request(testsrc, arch, triggers_list, huge=is_huge)
979 (result, real_ver, run_id, url) = self.pkg_test_result(
980 testsrc, testver, arch, trigger
981 )
982 pkg_arch_result[(testsrc, real_ver)][arch] = (result, run_id, url)
984 def tests_for_source(
985 self, src: str, ver: str, arch: str, excuse: "Excuse"
986 ) -> list[tuple[str, str]]:
987 """Iterate over all tests that should be run for given source and arch"""
989 source_suite = self.suite_info.primary_source_suite
990 target_suite = self.suite_info.target_suite
991 sources_info = target_suite.sources
992 binaries_info = target_suite.binaries[arch]
994 reported_pkgs = set()
996 tests = []
998 # Debian doesn't have linux-meta, but Ubuntu does
999 # for linux themselves we don't want to trigger tests -- these should
1000 # all come from linux-meta*. A new kernel ABI without a corresponding
1001 # -meta won't be installed and thus we can't sensibly run tests against
1002 # it.
1003 if ( 1003 ↛ 1007line 1003 didn't jump to line 1007
1004 src.startswith("linux")
1005 and src.replace("linux", "linux-meta") in sources_info
1006 ):
1007 return []
1009 # we want to test the package itself, if it still has a test in unstable
1010 # but only if the package actually exists on this arch
1011 srcinfo = source_suite.sources[src]
1012 if ("autopkgtest" in srcinfo.testsuite or self.has_autodep8(srcinfo)) and len(
1013 excuse.packages[arch]
1014 ) > 0:
1015 reported_pkgs.add(src)
1016 tests.append((src, ver))
1018 extra_bins = []
1019 # Debian doesn't have linux-meta, but Ubuntu does
1020 # Hack: For new kernels trigger all DKMS packages by pretending that
1021 # linux-meta* builds a "dkms" binary as well. With that we ensure that we
1022 # don't regress DKMS drivers with new kernel versions.
1023 if src.startswith("linux-meta"):
1024 # does this have any image on this arch?
1025 for pkg_id in srcinfo.binaries:
1026 if pkg_id.architecture == arch and "-image" in pkg_id.package_name:
1027 try:
1028 extra_bins.append(binaries_info["dkms"].pkg_id)
1029 except KeyError:
1030 pass
1032 if not self.has_built_on_this_arch_or_is_arch_all(srcinfo, arch):
1033 return []
1035 pkg_universe = self.britney.pkg_universe
1036 # plus all direct reverse dependencies and test triggers of its
1037 # binaries which have an autopkgtest
1038 for binary in itertools.chain(srcinfo.binaries, extra_bins):
1039 rdeps = pkg_universe.reverse_dependencies_of(binary)
1040 for rdep in rdeps:
1041 try:
1042 rdep_src = binaries_info[rdep.package_name].source
1043 # Don't re-trigger the package itself here; this should
1044 # have been done above if the package still continues to
1045 # have an autopkgtest in unstable.
1046 if rdep_src == src:
1047 continue
1048 except KeyError:
1049 continue
1051 rdep_src_info = sources_info[rdep_src]
1052 if "autopkgtest" in rdep_src_info.testsuite or self.has_autodep8(
1053 rdep_src_info
1054 ):
1055 if rdep_src not in reported_pkgs:
1056 tests.append((rdep_src, rdep_src_info.version))
1057 reported_pkgs.add(rdep_src)
1059 for tdep_src in self.testsuite_triggers.get(binary.package_name, set()):
1060 if tdep_src not in reported_pkgs:
1061 try:
1062 tdep_src_info = sources_info[tdep_src]
1063 except KeyError:
1064 continue
1065 if "autopkgtest" in tdep_src_info.testsuite or self.has_autodep8( 1065 ↛ 1059line 1065 didn't jump to line 1059, because the condition on line 1065 was never false
1066 tdep_src_info
1067 ):
1068 for pkg_id in tdep_src_info.binaries: 1068 ↛ 1059line 1068 didn't jump to line 1059, because the loop on line 1068 didn't complete
1069 if pkg_id.architecture == arch:
1070 tests.append((tdep_src, tdep_src_info.version))
1071 reported_pkgs.add(tdep_src)
1072 break
1074 tests.sort(key=lambda s_v: s_v[0])
1075 return tests
1077 def read_pending_tests(self) -> None:
1078 """Read pending test requests from previous britney runs
1080 Initialize self.pending_tests with that data.
1081 """
1082 assert self.pending_tests is None, "already initialized"
1083 if not os.path.exists(self.pending_tests_file):
1084 self.logger.info(
1085 "No %s, starting with no pending tests", self.pending_tests_file
1086 )
1087 self.pending_tests = {}
1088 return
1089 with open(self.pending_tests_file) as f:
1090 self.pending_tests = json.load(f)
1091 if VERSION_KEY in self.pending_tests:
1092 del self.pending_tests[VERSION_KEY]
1093 for trigger in list(self.pending_tests.keys()):
1094 for pkg in list(self.pending_tests[trigger].keys()):
1095 arch_dict = self.pending_tests[trigger][pkg]
1096 for arch in list(arch_dict.keys()):
1097 if (
1098 self._now - arch_dict[arch]
1099 > self.options.adt_pending_max_age
1100 ):
1101 del arch_dict[arch]
1102 if not arch_dict:
1103 del self.pending_tests[trigger][pkg]
1104 if not self.pending_tests[trigger]:
1105 del self.pending_tests[trigger]
1106 else:
1107 # Migration code:
1108 for trigger_data in self.pending_tests.values(): 1108 ↛ 1109line 1108 didn't jump to line 1109, because the loop on line 1108 never started
1109 for pkg, arch_list in trigger_data.items():
1110 trigger_data[pkg] = {}
1111 for arch in arch_list:
1112 trigger_data[pkg][arch] = self._now
1114 self.logger.info(
1115 "Read pending requested tests from %s", self.pending_tests_file
1116 )
1117 self.logger.debug("%s", self.pending_tests)
1119 # this requires iterating over all triggers and thus is expensive;
1120 # cache the results
1121 @lru_cache(None)
1122 def latest_run_for_package(self, src: str, arch: str) -> str:
1123 """Return latest run ID for src on arch"""
1125 latest_run_id = ""
1126 for srcmap in self.test_results.values():
1127 try:
1128 run_id = srcmap[src][arch][2]
1129 except KeyError:
1130 continue
1131 if run_id > latest_run_id:
1132 latest_run_id = run_id
1133 return latest_run_id
1135 @lru_cache(None)
1136 def fetch_swift_results(self, swift_url: str, src: str, arch: str) -> None:
1137 """Download new results for source package/arch from swift"""
1139 # prepare query: get all runs with a timestamp later than the latest
1140 # run_id for this package/arch; '@' is at the end of each run id, to
1141 # mark the end of a test run directory path
1142 # example: <autopkgtest-wily>wily/amd64/libp/libpng/20150630_054517@/result.tar
1143 query = {
1144 "delimiter": "@",
1145 "prefix": "%s/%s/%s/%s/" % (self.options.series, arch, srchash(src), src),
1146 }
1148 # determine latest run_id from results
1149 if not self.options.adt_shared_results_cache:
1150 latest_run_id = self.latest_run_for_package(src, arch)
1151 if latest_run_id:
1152 query["marker"] = query["prefix"] + latest_run_id
1154 # request new results from swift
1155 url = os.path.join(swift_url, self.swift_container)
1156 url += "?" + urllib.parse.urlencode(query)
1157 f = None
1158 try:
1159 f = urlopen(url, timeout=30)
1160 if f.getcode() == 200:
1161 result_paths = f.read().decode().strip().splitlines()
1162 elif f.getcode() == 204: # No content 1162 ↛ 1168line 1162 didn't jump to line 1168, because the condition on line 1162 was never false
1163 result_paths = []
1164 else:
1165 # we should not ever end up here as we expect a HTTPError in
1166 # other cases; e. g. 3XX is something that tells us to adjust
1167 # our URLS, so fail hard on those
1168 raise NotImplementedError(
1169 "fetch_swift_results(%s): cannot handle HTTP code %i"
1170 % (url, f.getcode())
1171 )
1172 except IOError as e:
1173 # 401 "Unauthorized" is swift's way of saying "container does not exist"
1174 if getattr(e, "code", -1) == 401: 1174 ↛ 1183line 1174 didn't jump to line 1183, because the condition on line 1174 was never false
1175 self.logger.info(
1176 "fetch_swift_results: %s does not exist yet or is inaccessible", url
1177 )
1178 return
1179 # Other status codes are usually a transient
1180 # network/infrastructure failure. Ignoring this can lead to
1181 # re-requesting tests which we already have results for, so
1182 # fail hard on this and let the next run retry.
1183 self.logger.error("Failure to fetch swift results from %s: %s", url, str(e))
1184 sys.exit(1)
1185 finally:
1186 if f is not None: 1186 ↛ 1189line 1186 didn't jump to line 1189, because the condition on line 1186 was never false
1187 f.close() 1187 ↛ exitline 1187 didn't return from function 'fetch_swift_results', because the return on line 1178 wasn't executed
1189 for p in result_paths:
1190 self.fetch_one_result(
1191 os.path.join(swift_url, self.swift_container, p, "result.tar"),
1192 src,
1193 arch,
1194 )
1196 def fetch_one_result(self, url: str, src: str, arch: str) -> None:
1197 """Download one result URL for source/arch
1199 Remove matching pending_tests entries.
1200 """
1201 f = None
1202 try:
1203 f = urlopen(url, timeout=30)
1204 if f.getcode() == 200: 1204 ↛ 1207line 1204 didn't jump to line 1207, because the condition on line 1204 was never false
1205 tar_bytes = io.BytesIO(f.read())
1206 else:
1207 raise NotImplementedError(
1208 "fetch_one_result(%s): cannot handle HTTP code %i"
1209 % (url, f.getcode())
1210 )
1211 except IOError as err:
1212 self.logger.error("Failure to fetch %s: %s", url, str(err))
1213 # we tolerate "not found" (something went wrong on uploading the
1214 # result), but other things indicate infrastructure problems
1215 if getattr(err, "code", -1) == 404:
1216 return
1217 sys.exit(1)
1218 finally:
1219 if f is not None: 1219 ↛ exit, 1219 ↛ 12212 missed branches: 1) line 1219 didn't return from function 'fetch_one_result', because the return on line 1216 wasn't executed, 2) line 1219 didn't jump to line 1221, because the condition on line 1219 was never false
1220 f.close() 1220 ↛ exitline 1220 didn't return from function 'fetch_one_result', because the return on line 1216 wasn't executed
1221 try:
1222 with tarfile.open(None, "r", tar_bytes) as tar:
1223 exitcode = int(tar.extractfile("exitcode").read().strip()) # type: ignore[union-attr]
1224 srcver = tar.extractfile("testpkg-version").read().decode().strip() # type: ignore[union-attr]
1225 (ressrc, ver) = srcver.split()
1226 testinfo = json.loads(tar.extractfile("testinfo.json").read().decode()) # type: ignore[union-attr]
1227 except (KeyError, ValueError, tarfile.TarError) as err:
1228 self.logger.error("%s is damaged, ignoring: %s", url, str(err))
1229 # ignore this; this will leave an orphaned request in autopkgtest-pending.json
1230 # and thus require manual retries after fixing the tmpfail, but we
1231 # can't just blindly attribute it to some pending test.
1232 return
1234 if src != ressrc: 1234 ↛ 1235line 1234 didn't jump to line 1235, because the condition on line 1234 was never true
1235 self.logger.error(
1236 "%s is a result for package %s, but expected package %s",
1237 url,
1238 ressrc,
1239 src,
1240 )
1241 return
1243 # parse recorded triggers in test result
1244 for e in testinfo.get("custom_environment", []): 1244 ↛ 1249line 1244 didn't jump to line 1249, because the loop on line 1244 didn't complete
1245 if e.startswith("ADT_TEST_TRIGGERS="): 1245 ↛ 1244line 1245 didn't jump to line 1244, because the condition on line 1245 was never false
1246 result_triggers = [i for i in e.split("=", 1)[1].split() if "/" in i]
1247 break
1248 else:
1249 self.logger.error("%s result has no ADT_TEST_TRIGGERS, ignoring")
1250 return
1252 run_id = os.path.basename(os.path.dirname(url))
1253 seen = round(calendar.timegm(time.strptime(run_id, "%Y%m%d_%H%M%S@")))
1254 # allow some skipped tests, but nothing else
1255 if exitcode in [0, 2]:
1256 result = Result.PASS
1257 elif exitcode == 8: 1257 ↛ 1258line 1257 didn't jump to line 1258, because the condition on line 1257 was never true
1258 result = Result.NEUTRAL
1259 else:
1260 result = Result.FAIL
1262 self.logger.info(
1263 "Fetched test result for %s/%s/%s %s (triggers: %s): %s",
1264 src,
1265 ver,
1266 arch,
1267 run_id,
1268 result_triggers,
1269 result.name.lower(),
1270 )
1272 # remove matching test requests
1273 for trigger in result_triggers:
1274 self.remove_from_pending(trigger, src, arch)
1276 # add this result
1277 for trigger in result_triggers:
1278 self.add_trigger_to_results(trigger, src, ver, arch, run_id, seen, result)
1280 def remove_from_pending(
1281 self, trigger: str, src: str, arch: str, timestamp: int = sys.maxsize
1282 ) -> None:
1283 assert self.pending_tests is not None # for type checking
1284 try:
1285 arch_dict = self.pending_tests[trigger][src]
1286 if timestamp < arch_dict[arch]:
1287 # The result is from before the moment of scheduling, so it's
1288 # not the one we're waiting for
1289 return
1290 del arch_dict[arch]
1291 if not arch_dict:
1292 del self.pending_tests[trigger][src]
1293 if not self.pending_tests[trigger]:
1294 del self.pending_tests[trigger]
1295 self.logger.debug(
1296 "-> matches pending request %s/%s for trigger %s", src, arch, trigger
1297 )
1298 except KeyError:
1299 self.logger.debug(
1300 "-> does not match any pending request for %s/%s", src, arch
1301 )
1303 def add_trigger_to_results(
1304 self,
1305 trigger: str,
1306 src: str,
1307 ver: str,
1308 arch: str,
1309 run_id: str,
1310 timestamp: int,
1311 status_to_add: Result,
1312 ) -> None:
1313 # Ensure that we got a new enough version
1314 try:
1315 (trigsrc, trigver) = trigger.split("/", 1)
1316 except ValueError:
1317 self.logger.info("Ignoring invalid test trigger %s", trigger)
1318 return
1319 if trigsrc == src and apt_pkg.version_compare(ver, trigver) < 0: 1319 ↛ 1320line 1319 didn't jump to line 1320, because the condition on line 1319 was never true
1320 self.logger.debug(
1321 "test trigger %s, but run for older version %s, ignoring", trigger, ver
1322 )
1323 return
1325 stored_result = (
1326 self.test_results.setdefault(trigger, {})
1327 .setdefault(src, {})
1328 .setdefault(arch, [Result.FAIL, None, "", 0])
1329 )
1331 # reruns shouldn't flip the result from PASS or NEUTRAL to
1332 # FAIL, so remember the most recent version of the best result
1333 # we've seen. Except for reference updates, which we always
1334 # want to update with the most recent result. The result data
1335 # may not be ordered by timestamp, so we need to check time.
1336 update = False
1337 if self.options.adt_baseline == "reference" and trigger == REF_TRIG:
1338 if stored_result[3] < timestamp:
1339 update = True
1340 elif status_to_add < stored_result[0]:
1341 update = True
1342 elif status_to_add == stored_result[0] and stored_result[3] < timestamp:
1343 update = True
1345 if update:
1346 stored_result[0] = status_to_add
1347 stored_result[1] = ver
1348 stored_result[2] = run_id
1349 stored_result[3] = timestamp
1351 def send_test_request(
1352 self, src: str, arch: str, triggers: list[str], huge: bool = False
1353 ) -> None:
1354 """Send out AMQP request for testing src/arch for triggers
1356 If huge is true, then the request will be put into the -huge instead of
1357 normal queue.
1358 """
1359 if self.options.dry_run: 1359 ↛ 1360line 1359 didn't jump to line 1360, because the condition on line 1359 was never true
1360 return
1362 params: dict[str, Any] = {"triggers": triggers}
1363 if self.options.adt_ppas:
1364 params["ppas"] = self.options.adt_ppas
1365 qname = "debci-ppa-%s-%s" % (self.options.series, arch)
1366 elif huge:
1367 qname = "debci-huge-%s-%s" % (self.options.series, arch)
1368 else:
1369 qname = "debci-%s-%s" % (self.options.series, arch)
1370 params["submit-time"] = time.strftime("%Y-%m-%d %H:%M:%S%z", time.gmtime())
1372 if self.amqp_channel: 1372 ↛ 1373line 1372 didn't jump to line 1373, because the condition on line 1372 was never true
1373 self.amqp_channel.basic_publish(
1374 amqp.Message(
1375 src + "\n" + json.dumps(params), delivery_mode=2
1376 ), # persistent
1377 routing_key=qname,
1378 )
1379 # we save pending.json with every request, so that if britney
1380 # crashes we don't re-request tests. This is only needed when using
1381 # real amqp, as with file-based submission the pending tests are
1382 # returned by debci along with the results each run.
1383 self.save_pending_json()
1384 else:
1385 # for file-based submission, triggers are space separated
1386 params["triggers"] = [" ".join(params["triggers"])]
1387 assert self.amqp_file_handle
1388 self.amqp_file_handle.write("%s:%s %s\n" % (qname, src, json.dumps(params)))
1390 def pkg_test_request(
1391 self, src: str, arch: str, all_triggers: list[str], huge: bool = False
1392 ) -> None:
1393 """Request one package test for a set of triggers
1395 all_triggers is a list of "pkgname/version". These are the packages
1396 that will be taken from the source suite. The first package in this
1397 list is the package that triggers the testing of src, the rest are
1398 additional packages required for installability of the test deps. If
1399 huge is true, then the request will be put into the -huge instead of
1400 normal queue.
1402 This will only be done if that test wasn't already requested in
1403 a previous run (i. e. if it's not already in self.pending_tests)
1404 or if there is already a fresh or a positive result for it. This
1405 ensures to download current results for this package before
1406 requesting any test."""
1407 trigger = all_triggers[0]
1408 uses_swift = not self.options.adt_swift_url.startswith("file://")
1409 try:
1410 result = self.test_results[trigger][src][arch]
1411 has_result = True
1412 except KeyError:
1413 has_result = False
1415 if has_result:
1416 result_state = result[0]
1417 if result_state in {Result.OLD_PASS, Result.OLD_FAIL, Result.OLD_NEUTRAL}:
1418 pass
1419 elif (
1420 result_state == Result.FAIL
1421 and self.result_in_baseline(src, arch)[0]
1422 in {Result.PASS, Result.NEUTRAL, Result.OLD_PASS, Result.OLD_NEUTRAL}
1423 and self._now - result[3] > self.options.adt_retry_older_than
1424 ):
1425 # We might want to retry this failure, so continue
1426 pass
1427 elif not uses_swift:
1428 # We're done if we don't retrigger and we're not using swift
1429 return
1430 elif result_state in {Result.PASS, Result.NEUTRAL}:
1431 self.logger.debug(
1432 "%s/%s triggered by %s already known", src, arch, trigger
1433 )
1434 return
1436 # Without swift we don't expect new results
1437 if uses_swift:
1438 self.logger.info(
1439 "Checking for new results for failed %s/%s for trigger %s",
1440 src,
1441 arch,
1442 trigger,
1443 )
1444 self.fetch_swift_results(self.options.adt_swift_url, src, arch)
1445 # do we have one now?
1446 try:
1447 self.test_results[trigger][src][arch]
1448 return
1449 except KeyError:
1450 pass
1452 self.request_test_if_not_queued(src, arch, trigger, all_triggers, huge=huge)
1454 def request_test_if_not_queued(
1455 self,
1456 src: str,
1457 arch: str,
1458 trigger: str,
1459 all_triggers: list[str] = [],
1460 huge: bool = False,
1461 ) -> None:
1462 assert self.pending_tests is not None # for type checking
1463 if not all_triggers:
1464 all_triggers = [trigger]
1466 # Don't re-request if it's already pending
1467 arch_dict = self.pending_tests.setdefault(trigger, {}).setdefault(src, {})
1468 if arch in arch_dict.keys():
1469 self.logger.debug(
1470 "Test %s/%s for %s is already pending, not queueing", src, arch, trigger
1471 )
1472 else:
1473 self.logger.debug(
1474 "Requesting %s autopkgtest on %s to verify %s", src, arch, trigger
1475 )
1476 arch_dict[arch] = self._now
1477 self.send_test_request(src, arch, all_triggers, huge=huge)
1479 def result_in_baseline(self, src: str, arch: str) -> list[Any]:
1480 """Get the result for src on arch in the baseline
1482 The baseline is optionally all data or a reference set)
1483 """
1485 # this requires iterating over all cached results and thus is expensive;
1486 # cache the results
1487 try:
1488 return self.result_in_baseline_cache[src][arch]
1489 except KeyError:
1490 pass
1492 result_reference: list[Any] = [Result.NONE, None, "", 0]
1493 if self.options.adt_baseline == "reference":
1494 if src not in self.suite_info.target_suite.sources:
1495 return result_reference
1497 try:
1498 result_reference = self.test_results[REF_TRIG][src][arch]
1499 self.logger.debug(
1500 "Found result for src %s in reference: %s",
1501 src,
1502 result_reference[0].name,
1503 )
1504 except KeyError:
1505 self.logger.debug(
1506 "Found NO result for src %s in reference: %s",
1507 src,
1508 result_reference[0].name,
1509 )
1510 self.result_in_baseline_cache[src][arch] = deepcopy(result_reference)
1511 return result_reference
1513 result_ever: list[Any] = [Result.FAIL, None, "", 0]
1514 for srcmap in self.test_results.values():
1515 try:
1516 if srcmap[src][arch][0] != Result.FAIL:
1517 result_ever = srcmap[src][arch]
1518 # If we are not looking at a reference run, We don't really
1519 # care about anything except the status, so we're done
1520 # once we find a PASS.
1521 if result_ever[0] == Result.PASS:
1522 break
1523 except KeyError:
1524 pass
1526 self.result_in_baseline_cache[src][arch] = deepcopy(result_ever)
1527 self.logger.debug("Result for src %s ever: %s", src, result_ever[0].name)
1528 return result_ever
1530 def has_test_in_target(self, src: str) -> bool:
1531 test_in_target = False
1532 try:
1533 srcinfo = self.suite_info.target_suite.sources[src]
1534 if "autopkgtest" in srcinfo.testsuite or self.has_autodep8(srcinfo):
1535 test_in_target = True
1536 # AttributeError is only needed for the test suite as
1537 # srcinfo can be a NoneType
1538 except (KeyError, AttributeError):
1539 pass
1541 return test_in_target
1543 def pkg_test_result(
1544 self, src: str, ver: str, arch: str, trigger: str
1545 ) -> tuple[str, str, Optional[str], str]:
1546 """Get current test status of a particular package
1548 Return (status, real_version, run_id, log_url) tuple; status is a key in
1549 EXCUSES_LABELS. run_id is None if the test is still running.
1550 """
1551 assert self.pending_tests is not None # for type checking
1552 # determine current test result status
1553 run_id = None
1554 try:
1555 r = self.test_results[trigger][src][arch]
1556 ver = r[1]
1557 run_id = r[2]
1559 if r[0] in {Result.FAIL, Result.OLD_FAIL}:
1560 # determine current test result status
1561 baseline_result = self.result_in_baseline(src, arch)[0]
1563 # Special-case triggers from linux-meta*: we cannot compare
1564 # results against different kernels, as e. g. a DKMS module
1565 # might work against the default kernel but fail against a
1566 # different flavor; so for those, ignore the "ever
1567 # passed" check; FIXME: check against trigsrc only
1568 if self.options.adt_baseline != "reference" and (
1569 trigger.startswith("linux-meta") or trigger.startswith("linux/")
1570 ):
1571 baseline_result = Result.FAIL
1573 # Check if the autopkgtest (still) exists in the target suite
1574 test_in_target = self.has_test_in_target(src)
1576 if test_in_target and baseline_result in {
1577 Result.NONE,
1578 Result.OLD_FAIL,
1579 Result.OLD_NEUTRAL,
1580 Result.OLD_PASS,
1581 }:
1582 self.request_test_if_not_queued(src, arch, REF_TRIG)
1584 result = "REGRESSION"
1585 if baseline_result in {Result.FAIL, Result.OLD_FAIL}:
1586 result = "ALWAYSFAIL"
1587 elif baseline_result == Result.NONE and test_in_target: 1587 ↛ 1588line 1587 didn't jump to line 1588, because the condition on line 1587 was never true
1588 result = "RUNNING-REFERENCE"
1590 if self.options.adt_ignore_failure_for_new_tests and not test_in_target:
1591 result = "ALWAYSFAIL"
1593 if self.has_force_badtest(src, ver, arch):
1594 result = "IGNORE-FAIL"
1595 else:
1596 result = r[0].name
1598 url = self.format_log_url(src, arch, run_id)
1599 except KeyError:
1600 # no result for src/arch; still running?
1601 if arch in self.pending_tests.get(trigger, {}).get(src, {}).keys(): 1601 ↛ 1616line 1601 didn't jump to line 1616, because the condition on line 1601 was never false
1602 baseline_result = self.result_in_baseline(src, arch)[0]
1603 if (
1604 self.options.adt_ignore_failure_for_new_tests
1605 and not self.has_test_in_target(src)
1606 ):
1607 result = "RUNNING-ALWAYSFAIL"
1608 elif baseline_result != Result.FAIL and not self.has_force_badtest(
1609 src, ver, arch
1610 ):
1611 result = "RUNNING"
1612 else:
1613 result = "RUNNING-ALWAYSFAIL"
1614 url = self.options.adt_ci_url + "status/pending"
1615 else:
1616 raise RuntimeError(
1617 "Result for %s/%s/%s (triggered by %s) is neither known nor pending!"
1618 % (src, ver, arch, trigger)
1619 )
1621 return (result, ver, run_id, url)
1623 def has_force_badtest(self, src: str, ver: str, arch: str) -> bool:
1624 """Check if src/ver/arch has a force-badtest hint"""
1626 assert self.hints is not None
1627 hints = self.hints.search("force-badtest", package=src)
1628 if hints:
1629 self.logger.info(
1630 "Checking hints for %s/%s/%s: %s",
1631 src,
1632 ver,
1633 arch,
1634 [str(h) for h in hints],
1635 )
1636 for hint in hints:
1637 if [
1638 mi
1639 for mi in hint.packages
1640 if mi.architecture in ["source", arch]
1641 and (
1642 mi.version == "all"
1643 or apt_pkg.version_compare(ver, mi.version) <= 0 # type: ignore[arg-type]
1644 )
1645 ]:
1646 return True
1648 return False
1650 def has_built_on_this_arch_or_is_arch_all(
1651 self, src_data: SourcePackage, arch: str
1652 ) -> bool:
1653 """When a source builds arch:all binaries, those binaries are
1654 added to all architectures and thus the source 'exists'
1655 everywhere. This function checks if the source has any arch
1656 specific binaries on this architecture and if not, if it
1657 has them on any architecture.
1658 """
1659 packages_s_a = self.suite_info.primary_source_suite.binaries[arch]
1660 has_unknown_binary = False
1661 for binary_s in src_data.binaries:
1662 try:
1663 binary_u = packages_s_a[binary_s.package_name]
1664 except KeyError:
1665 # src_data.binaries has all the built binaries, so if
1666 # we get here, we know that at least one architecture
1667 # has architecture specific binaries
1668 has_unknown_binary = True
1669 continue
1670 if binary_u.architecture == arch:
1671 return True
1672 # If we get here, we have only seen arch:all packages for this
1673 # arch.
1674 return not has_unknown_binary