Coverage for britney2/policies/autopkgtest.py: 90%
770 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-05-18 10:34 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-05-18 10:34 +0000
1# -*- coding: utf-8 -*-
3# Copyright (C) 2013 - 2016 Canonical Ltd.
4# Authors:
5# Colin Watson <cjwatson@ubuntu.com>
6# Jean-Baptiste Lallement <jean-baptiste.lallement@canonical.com>
7# Martin Pitt <martin.pitt@ubuntu.com>
9# This program is free software; you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation; either version 2 of the License, or
12# (at your option) any later version.
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
19import calendar
20import collections
21import io
22import itertools
23import json
24import optparse
25import os
26import sys
27import tarfile
28import time
29import urllib.parse
30from copy import deepcopy
31from enum import Enum
32from functools import lru_cache, total_ordering
33from typing import TYPE_CHECKING, Any, Optional, cast
34from collections.abc import Iterator
35from urllib.request import urlopen
37import apt_pkg
39import britney2.hints
40from britney2 import (
41 BinaryPackageId,
42 PackageId,
43 SourcePackage,
44 SuiteClass,
45 Suites,
46 TargetSuite,
47)
48from britney2.migrationitem import MigrationItem
49from britney2.policies import PolicyVerdict
50from britney2.policies.policy import AbstractBasePolicy
51from britney2.utils import iter_except, parse_option
53if TYPE_CHECKING: 53 ↛ 54line 53 didn't jump to line 54, because the condition on line 53 was never true
54 import amqplib.client_0_8 as amqp
56 from ..britney import Britney
57 from ..excuse import Excuse
58 from ..hints import HintParser
@total_ordering
class Result(Enum):
    """Possible outcomes recorded for a single autopkgtest run.

    Members are ordered by their numeric value; ``total_ordering`` derives
    the remaining rich comparisons from ``__lt__``.
    """

    PASS = 1
    NEUTRAL = 2
    FAIL = 3
    OLD_PASS = 4
    OLD_NEUTRAL = 5
    OLD_FAIL = 6
    NONE = 7

    def __lt__(self, other: "Result") -> bool:
        """Return True when this member's value is smaller than *other*'s.

        Returns ``NotImplemented`` for operands that are not ``Result``
        members, so Python raises the usual ``TypeError`` instead of an
        ``AttributeError`` from a missing ``.value``.
        """
        if not isinstance(other, Result):
            return NotImplemented
        return self.value < other.value
# HTML snippets used when rendering a test status in the excuses output.
# Keys are status labels; values are the coloured <span> shown for them.
EXCUSES_LABELS = {
    "PASS": '<span style="background:#87d96c">Pass</span>',
    "OLD_PASS": '<span style="background:#87d96c">Pass</span>',
    "NEUTRAL": '<span style="background:#e5c545">No tests, superficial or marked flaky</span>',
    "OLD_NEUTRAL": '<span style="background:#e5c545">No tests, superficial or marked flaky</span>',
    "FAIL": '<span style="background:#ff6666">Failed</span>',
    "OLD_FAIL": '<span style="background:#ff6666">Failed</span>',
    "ALWAYSFAIL": '<span style="background:#e5c545">Failed (not a regression)</span>',
    "REGRESSION": '<span style="background:#ff6666">Regression or new test</span>',
    "IGNORE-FAIL": '<span style="background:#e5c545">Ignored failure</span>',
    "RUNNING": '<span style="background:#99ddff">Test in progress</span>',
    "RUNNING-REFERENCE": '<span style="background:#ff6666">Reference test in progress, but real test failed already</span>',
    "RUNNING-ALWAYSFAIL": '<span style="background:#99ddff">Test in progress (will not be considered a regression)</span>',
}

# Trigger name under which reference (baseline) test runs are stored.
REF_TRIG = "migration-reference/0"

# Key written into the pending-tests JSON file to record its format version.
VERSION_KEY = "britney-autopkgtest-pending-file-version"
def srchash(src: str) -> str:
    """Return the archive hash prefix for source package *src*.

    Names starting with "lib" map to their first four characters; every
    other name maps to its first character.
    """
    return src[:4] if src.startswith("lib") else src[0]
def added_pkgs_compared_to_target_suite(
    package_ids: frozenset[BinaryPackageId],
    target_suite: TargetSuite,
    *,
    invert: bool = False,
) -> Iterator[BinaryPackageId]:
    """Yield the members of *package_ids* whose name is not being ignored.

    By default the ignored names are those of the packages from
    *package_ids* that the target suite reports as present.  With
    ``invert=True`` the ignored names are instead those of the packages
    from *package_ids* that are NOT reported as present.
    """
    present = target_suite.which_of_these_are_in_the_suite(package_ids)
    if invert:
        ignored_pkgs = package_ids - set(present)
    else:
        ignored_pkgs = present
    ignored_names = {pkg.package_name for pkg in ignored_pkgs}
    for pkg in package_ids:
        if pkg.package_name not in ignored_names:
            yield pkg
def all_leaf_results(
    test_results: dict[str, dict[str, dict[str, list[Any]]]],
) -> Iterator[list[Any]]:
    """Yield every innermost result list of a three-level results mapping.

    The lists are yielded as-is (not copied), so callers may mutate them
    in place.
    """
    for second_level in test_results.values():
        for third_level in second_level.values():
            for leaf in third_level.values():
                yield leaf
def mark_result_as_old(result: Result) -> Result:
    """Return the OLD_* counterpart of a current result.

    FAIL, PASS and NEUTRAL map to OLD_FAIL, OLD_PASS and OLD_NEUTRAL;
    any other value is returned unchanged.
    """
    old_counterpart = {
        Result.FAIL: Result.OLD_FAIL,
        Result.PASS: Result.OLD_PASS,
        Result.NEUTRAL: Result.OLD_NEUTRAL,
    }
    return old_counterpart.get(result, result)
143class AutopkgtestPolicy(AbstractBasePolicy):
144 """autopkgtest regression policy for source migrations
146 Run autopkgtests for the excuse and all of its reverse dependencies, and
147 reject the upload if any of those regress.
148 """
def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
    """Set up the policy state and derive defaults from the configuration.

    Registers the policy as "autopkgtest" for the primary source suite,
    runs every ADT_* option through ``parse_option``, fills in the
    historical defaults for the log and retry URLs, and restricts the
    configured test architectures to those britney actually runs for.
    """
    super().__init__(
        "autopkgtest", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
    )
    # tests requested in this and previous runs
    # trigger -> src -> [arch]
    self.pending_tests: Optional[dict[str, dict[str, dict[str, int]]]] = None
    self.pending_tests_file = os.path.join(
        self.state_dir, "autopkgtest-pending.json"
    )
    self.testsuite_triggers: dict[str, set[str]] = {}
    self.result_in_baseline_cache: dict[str, dict[str, list[Any]]] = (
        collections.defaultdict(dict)
    )

    self.amqp_file_handle: io.TextIOWrapper | None = None

    # Default values for this policy's options, applied in this order.
    # (adt_log_url and adt_retry_url get their defaults further down.)
    option_specs = (
        ("adt_baseline", {}),
        ("adt_huge", {"to_int": True}),
        ("adt_ppas", {}),
        ("adt_reference_max_age", {"day_to_sec": True}),
        ("adt_pending_max_age", {"default": 5, "day_to_sec": True}),
        ("adt_regression_penalty", {"default": 0, "to_int": True}),
        ("adt_log_url", {}),
        ("adt_retry_url", {}),
        ("adt_retry_older_than", {"day_to_sec": True}),
        ("adt_results_cache_age", {"day_to_sec": True}),
        ("adt_shared_results_cache", {}),
        ("adt_success_bounty", {"default": 0, "to_int": True}),
        ("adt_ignore_failure_for_new_tests", {"to_bool": True}),
    )
    for option_name, parse_kwargs in option_specs:
        parse_option(options, option_name, **parse_kwargs)

    # When ADT_RESULTS_CACHE_AGE is smaller than or equal to
    # ADT_REFERENCE_MAX_AGE old reference result will be removed from cache
    # before the newly scheduled results are in, potentially causing
    # additional waiting. For packages like glibc this might cause an
    # infinite delay as there will always be a package that's
    # waiting. Similarly for ADT_RETRY_OLDER_THAN.
    if self.options.adt_results_cache_age <= self.options.adt_reference_max_age:
        self.logger.warning(
            "Unexpected: ADT_REFERENCE_MAX_AGE bigger than ADT_RESULTS_CACHE_AGE"
        )
    if self.options.adt_results_cache_age <= self.options.adt_retry_older_than:
        self.logger.warning(
            "Unexpected: ADT_RETRY_OLDER_THAN bigger than ADT_RESULTS_CACHE_AGE"
        )

    if not self.options.adt_log_url:
        # Historical defaults: both variants share the same trailing
        # path components and only differ in where they are rooted.
        if self.options.adt_swift_url.startswith("file://"):
            url_head = (self.options.adt_ci_url, "data", "autopkgtest")
        else:
            url_head = (self.options.adt_swift_url, "{swift_container}")
        url_tail = (
            self.options.series,
            "{arch}",
            "{hash}",
            "{package}",
            "{run_id}",
            "log.gz",
        )
        self.options.adt_log_url = os.path.join(*url_head, *url_tail)

    if hasattr(self.options, "adt_retry_url_mech"):
        self.logger.warning(
            "The ADT_RETRY_URL_MECH configuration has been deprecated."
        )
        self.logger.warning(
            "Instead britney now supports ADT_RETRY_URL for more flexibility."
        )
        if self.options.adt_retry_url:
            self.logger.error(
                "Please remove the ADT_RETRY_URL_MECH as ADT_RETRY_URL will be used."
            )
        elif self.options.adt_retry_url_mech == "run_id":
            self.options.adt_retry_url = (
                self.options.adt_ci_url + "api/v1/retry/{run_id}"
            )
    if not self.options.adt_retry_url:
        # Historical default
        self.options.adt_retry_url = (
            self.options.adt_ci_url
            + "request.cgi?"
            + "release={release}&arch={arch}&package={package}&trigger={trigger}{ppas}"
        )

    # results map: trigger -> src -> arch -> [passed, version, run_id, seen]
    # - trigger is "source/version" of an unstable package that triggered
    #   this test run.
    # - "passed" is a Result
    # - "version" is the package version of "src" of that test
    # - "run_id" is an opaque ID that identifies a particular test run for
    #   a given src/arch.
    # - "seen" is an approximate time stamp of the test run. How this is
    #   deduced depends on the interface used.
    self.test_results: dict[str, dict[str, dict[str, list[Any]]]] = {}
    shared_cache = self.options.adt_shared_results_cache
    if shared_cache:
        self.results_cache_file = shared_cache
    else:
        self.results_cache_file = os.path.join(
            self.state_dir, "autopkgtest-results.cache"
        )

    # adt_ppas is a whitespace separated string when set; anything without
    # string methods (e.g. None) becomes an empty list.
    try:
        self.options.adt_ppas = self.options.adt_ppas.strip().split()
    except AttributeError:
        self.options.adt_ppas = []

    self.swift_container = "autopkgtest-" + options.series
    if self.options.adt_ppas:
        self.swift_container += "-" + options.adt_ppas[-1].replace("/", "-")

    # restrict adt_arches to architectures we actually run for
    self.adt_arches = []
    for arch in self.options.adt_arches.split():
        if arch not in self.options.architectures:
            self.logger.info(
                "Ignoring ADT_ARCHES %s as it is not in architectures list", arch
            )
            continue
        self.adt_arches.append(arch)
def __del__(self) -> None:
    """Close the AMQP output file handle, if one is set."""
    # __del__ is also invoked for instances whose __init__ raised before
    # amqp_file_handle was assigned, so the attribute may be missing.
    handle = getattr(self, "amqp_file_handle", None)
    if handle:
        try:
            handle.close()
        except AttributeError:
            # Best effort only: the handle has no usable close().
            pass
def register_hints(self, hint_parser: "HintParser") -> None:
    """Register the hint types understood by this policy.

    Both "force-badtest" and "force-skiptest" are registered with the
    one-hint-per-package splitter from ``britney2.hints``.
    """
    for hint_type in ("force-badtest", "force-skiptest"):
        hint_parser.register_hint_type(
            hint_type, britney2.hints.split_into_one_hint_per_package
        )
def initialise(self, britney: "Britney") -> None:
    """Load cached and new test results and open the test request channel.

    Steps, in order: fix the "current" time stamp, build the inverse
    Testsuite-Triggers map, read the pending tests, read the results
    cache, merge new results from a ``file://`` ADT_SWIFT_URL, optionally
    age out old results, and finally (unless this is a dry run) open
    either an AMQP connection or a plain output file for test requests.

    Raises RuntimeError when ADT_AMQP uses neither the ``amqp://`` nor
    the ``file://`` scheme.
    """
    super().initialise(britney)
    # We want to use the "current" time stamp in multiple locations
    now = round(time.time())
    if hasattr(self.options, "fake_runtime"):
        now = int(self.options.fake_runtime)
    self._now = now

    # compute inverse Testsuite-Triggers: map, unifying all series
    self.logger.info("Building inverse testsuite_triggers map")
    for suite in self.suite_info:
        for src, src_data in suite.sources.items():
            for triggering_pkg in src_data.testsuite_triggers:
                self.testsuite_triggers.setdefault(triggering_pkg, set()).add(src)
    target_suite_name = self.suite_info.target_suite.name

    os.makedirs(self.state_dir, exist_ok=True)
    self.read_pending_tests()

    # read the cached results that we collected so far
    if os.path.exists(self.results_cache_file):
        with open(self.results_cache_file) as cache_fd:
            cached = json.load(cache_fd)
            self.test_results = self.check_and_upgrade_cache(cached)
        self.logger.info("Read previous results from %s", self.results_cache_file)
    else:
        self.logger.info(
            "%s does not exist, re-downloading all results from swift",
            self.results_cache_file,
        )

    # read in the new results
    if self.options.adt_swift_url.startswith("file://"):
        debci_file = self.options.adt_swift_url[7:]
        if os.path.exists(debci_file):
            with open(debci_file) as debci_fd:
                new_data = json.load(debci_fd)
            self.logger.info("Read new results from %s", debci_file)
            for res in new_data["results"]:
                # if there's no date, the test didn't finish yet
                if res["date"] is None:
                    continue
                test_suite = res["suite"]
                triggers = res["trigger"]
                src = res["package"]
                arch = res["arch"]
                ver = res["version"]
                status = res["status"]
                run_id = str(res["run_id"])
                # The last five characters of the date are dropped
                # before parsing; the rest is interpreted as UTC.
                seen = round(
                    calendar.timegm(
                        time.strptime(res["date"][0:-5], "%Y-%m-%dT%H:%M:%S")
                    )
                )
                if test_suite != target_suite_name:
                    # not requested for this target suite, so ignore
                    continue
                if triggers is None:
                    # not requested for this policy, so ignore
                    continue
                if status is None:
                    # still running => pending
                    continue
                for trigger in triggers.split():
                    # remove matching test requests
                    self.remove_from_pending(trigger, src, arch, seen)
                    if status == "tmpfail":
                        # let's see if we still need it
                        continue
                    self.logger.debug(
                        "Results %s %s %s added", src, trigger, status
                    )
                    self.add_trigger_to_results(
                        trigger,
                        src,
                        ver,
                        arch,
                        run_id,
                        seen,
                        Result[status.upper()],
                    )
        else:
            self.logger.info(
                "%s does not exist, no new data will be processed", debci_file
            )

    # The cache can contain results against versions of packages that
    # are not in any suite anymore. Strip those out, as we don't want
    # to use those results. Additionally, old references may be
    # filtered out.
    if self.options.adt_baseline == "reference":
        self.filter_old_results()

    # we need sources, binaries, and installability tester, so for now
    # remember the whole britney object
    self.britney = britney

    # Initialize AMQP connection
    self.amqp_channel: Optional["amqp.channel.Channel"] = None
    self.amqp_file_handle = None
    if self.options.dry_run:
        return

    amqp_url = self.options.adt_amqp

    if amqp_url.startswith("amqp://"):
        # Imported lazily so amqplib is only needed for real AMQP setups.
        import amqplib.client_0_8 as amqp

        # depending on the setup we connect to a AMQP server
        creds = urllib.parse.urlsplit(amqp_url, allow_fragments=False)
        self.amqp_con = amqp.Connection(
            creds.hostname, userid=creds.username, password=creds.password
        )
        self.amqp_channel = self.amqp_con.channel()
        self.logger.info("Connected to AMQP server")
    elif amqp_url.startswith("file://"):
        # or in Debian and in testing mode, adt_amqp will be a file:// URL
        amqp_file = amqp_url[7:]
        # line buffered (buffering=1)
        self.amqp_file_handle = open(amqp_file, "w", 1)
    else:
        raise RuntimeError("Unknown ADT_AMQP schema %s" % amqp_url.split(":", 1)[0])
def check_and_upgrade_cache(
    self, test_results: dict[str, dict[str, dict[str, list[Any]]]]
) -> dict[str, dict[str, dict[str, list[Any]]]]:
    """Convert a freshly loaded results cache in place and prune it.

    The first element of every leaf result is replaced by the ``Result``
    member of that name.  Results whose time stamp (element 3) is older
    than ADT_RESULTS_CACHE_AGE are dropped, together with any mapping
    that becomes empty.  The same (mutated) mapping is returned.
    """
    for leaf in all_leaf_results(test_results):
        leaf[0] = Result[leaf[0]]

    # Drop results older than ADT_RESULTS_CACHE_AGE.  Iterate over
    # snapshots because entries are deleted while walking the mapping.
    for trigger, per_pkg in list(test_results.items()):
        for pkg, per_arch in list(per_pkg.items()):
            for arch, entry in list(per_arch.items()):
                if self._now - entry[3] > self.options.adt_results_cache_age:
                    del per_arch[arch]
            if not per_arch:
                del per_pkg[pkg]
        if not per_pkg:
            del test_results[trigger]

    return test_results
def filter_old_results(self) -> None:
    """Mark outdated results in the cache as old.

    A result is marked old when it is a reference run older than
    ADT_REFERENCE_MAX_AGE, or when the tested version is no longer found
    in any suite.  Results are only re-labelled, never deleted: if
    regular results were deleted after a while, packages with lots of
    triggered tests may never have all the results at the same time.
    """
    for trigger, per_source in self.test_results.items():
        for src, per_arch in per_source.items():
            for entry in per_arch.values():
                stale_reference = (
                    trigger == REF_TRIG
                    and self._now - entry[3] > self.options.adt_reference_max_age
                )
                if stale_reference or not self.test_version_in_any_suite(
                    src, entry[1]
                ):
                    entry[0] = mark_result_as_old(entry[0])
def test_version_in_any_suite(self, src: str, version: str) -> bool:
    """Tell whether *version* of *src* is present in any known suite.

    To prevent regressions in the target suite, the result should be
    from a test with the version of the package in either the source
    suite or the target suite. The source suite is also valid,
    because due to versioned test dependencies and Breaks/Conflicts
    relations, regularly the version in the source suite is used
    during testing.
    """
    known_versions = set()
    for suite in self.suite_info:
        try:
            known_versions.add(suite.sources[src].version)
        except KeyError:
            # src does not exist in this suite
            continue

    # Versions are compared with apt's ordering rather than as strings.
    return any(
        apt_pkg.version_compare(candidate, version) == 0
        for candidate in known_versions
    )
def save_pending_json(self) -> None:
    """Write the pending test requests to their on-disk JSON cache.

    The data is written to a ".new" sibling file first and then renamed
    over the real file.
    """
    self.logger.info(
        "Updating pending requested tests in %s" % self.pending_tests_file
    )
    # Only the top level is copied, as only the top level is modified.
    # The version key is left out when there are no pending results at
    # all (eases testing).
    payload: dict[str, Any] = {}
    if self.pending_tests:
        payload = {**self.pending_tests, VERSION_KEY: 1}

    scratch_file = self.pending_tests_file + ".new"
    with open(scratch_file, "w") as out:
        json.dump(payload, out, indent=2)
    os.rename(scratch_file, self.pending_tests_file)
def save_state(self, britney: "Britney") -> None:
    """Persist the results cache and the pending test requests.

    The results cache is only written when no shared (read-only) results
    cache is configured; the pending tests file is always written.
    """
    super().save_state(britney)

    # update the results on-disk cache, unless we are using a r/o shared one
    if not self.options.adt_shared_results_cache:
        self.logger.info("Updating results cache")
        # Work on a copy: the in-memory results keep their Result members
        # while the copy stores the member names, which JSON can encode.
        snapshot = deepcopy(self.test_results)
        for per_source in snapshot.values():
            for per_arch in per_source.values():
                for leaf in per_arch.values():
                    leaf[0] = leaf[0].name
        scratch_file = self.results_cache_file + ".new"
        with open(scratch_file, "w") as out:
            json.dump(snapshot, out, indent=2)
        os.rename(scratch_file, self.results_cache_file)

    self.save_pending_json()
def format_retry_url(
    self, run_id: Optional[str], arch: str, testsrc: str, trigger: str
) -> str:
    """Fill in the ADT_RETRY_URL template for one test.

    The template may use the fields ``run_id``, ``release``, ``arch``,
    ``package``, ``trigger`` (URL-quoted) and ``ppas`` (either empty or
    "&" followed by one URL-encoded ``ppa=`` parameter per configured
    PPA).
    """
    configured_ppas = self.options.adt_ppas
    ppas = ""
    if configured_ppas:
        ppas = "&" + urllib.parse.urlencode(
            [("ppa", ppa) for ppa in configured_ppas]
        )

    template = cast(str, self.options.adt_retry_url)
    fields = {
        "run_id": run_id,
        "release": self.options.series,
        "arch": arch,
        "package": testsrc,
        "trigger": urllib.parse.quote_plus(trigger),
        "ppas": ppas,
    }
    return template.format(**fields)
def format_log_url(self, testsrc: str, arch: str, run_id: str) -> str:
    """Fill in the ADT_LOG_URL template for one test run.

    The template may use the fields ``release``, ``swift_container``,
    ``hash`` (the archive hash prefix of *testsrc*), ``package``,
    ``arch`` and ``run_id``.
    """
    template = cast(str, self.options.adt_log_url)
    fields = {
        "release": self.options.series,
        "swift_container": self.swift_container,
        "hash": srchash(testsrc),
        "package": testsrc,
        "arch": arch,
        "run_id": run_id,
    }
    return template.format(**fields)
def apply_src_policy_impl(
    self,
    tests_info: dict[str, Any],
    item: MigrationItem,
    source_data_tdist: Optional[SourcePackage],
    source_data_srcdist: SourcePackage,
    excuse: "Excuse",
) -> PolicyVerdict:
    """Apply the autopkgtest policy to one source migration item.

    Tests are requested/looked up per architecture, then evaluated and
    reported per tested source package.  Per-test details are stored in
    *tests_info* (test name -> arch -> [status, log_url, history_url,
    artifact_url, retry_url]) and rendered as HTML lines on *excuse*.

    A "REGRESSION" status rejects permanently; a running test or a
    missing/uninstallable build rejects temporarily.  A matching
    force-skiptest hint turns a rejection into PASS_HINTED.  With a
    non-zero ADT_REGRESSION_PENALTY, rejections are turned into PASS
    (plus a penalty when the value is positive); with ADT_SUCCESS_BOUNTY
    a bounty is added when the package's own tests all pass.

    *source_data_tdist* is not used by this implementation.
    """
    assert self.hints is not None  # for type checking
    # initialize
    verdict = PolicyVerdict.PASS
    own_tests_all_pass = False
    source_name = item.package
    excuse_lines = []

    # skip/delay autopkgtests until new package is built somewhere
    if not source_data_srcdist.binaries:
        self.logger.debug(
            "%s hasnot been built anywhere, skipping autopkgtest policy",
            excuse.name,
        )
        verdict = PolicyVerdict.REJECTED_TEMPORARILY
        excuse.add_verdict_info(verdict, "nothing built yet, autopkgtest delayed")

    if "all" in excuse.missing_builds:
        self.logger.debug(
            "%s hasnot been built for arch:all, skipping autopkgtest policy",
            source_name,
        )
        verdict = PolicyVerdict.REJECTED_TEMPORARILY
        excuse.add_verdict_info(
            verdict, "arch:all not built yet, autopkgtest delayed"
        )

    if verdict == PolicyVerdict.PASS:
        self.logger.debug("Checking autopkgtests for %s", source_name)
        trigger = source_name + "/" + source_data_srcdist.version

        # build a (testsrc, testver) → arch → (status, run_id, log_url) map; we trigger/check test
        # results per architecture for technical/efficiency reasons, but we
        # want to evaluate and present the results by tested source package
        # first
        pkg_arch_result: dict[
            tuple[str, str], dict[str, tuple[str, Optional[str], str]]
        ] = collections.defaultdict(dict)
        for arch in self.adt_arches:
            if arch in excuse.missing_builds:
                verdict = PolicyVerdict.REJECTED_TEMPORARILY
                self.logger.debug(
                    "%s hasnot been built on arch %s, delay autopkgtest there",
                    source_name,
                    arch,
                )
                excuse.add_verdict_info(
                    verdict,
                    "arch:%s not built yet, autopkgtest delayed there" % arch,
                )
                continue

            if arch in excuse.policy_info["depends"].get(
                "arch_all_not_installable", []
            ):
                self.logger.debug(
                    "%s is uninstallable on arch %s (which is allowed), not running autopkgtest there",
                    source_name,
                    arch,
                )
                excuse.addinfo(
                    "uninstallable on arch %s (which is allowed), not running autopkgtest there"
                    % arch
                )
                continue

            if (
                arch in excuse.unsatisfiable_on_archs
                and arch
                not in excuse.policy_info["depends"].get(
                    "autopkgtest_run_anyways", []
                )
            ):
                verdict = PolicyVerdict.REJECTED_TEMPORARILY
                self.logger.debug(
                    "%s is uninstallable on arch %s, not running autopkgtest there",
                    source_name,
                    arch,
                )
                excuse.addinfo(
                    "uninstallable on arch %s, not running autopkgtest there" % arch
                )
                continue

            self.request_tests_for_source(
                item, arch, source_data_srcdist, pkg_arch_result, excuse
            )

        # add test result details to Excuse
        cloud_url = self.options.adt_ci_url + "packages/%(h)s/%(s)s/%(r)s/%(a)s"
        testver: Optional[str]
        for testsrc, testver in sorted(pkg_arch_result):
            assert testver is not None
            arch_results = pkg_arch_result[(testsrc, testver)]
            # set of statuses seen for this test across all architectures
            statuses = {entry[0] for entry in arch_results.values()}
            if "REGRESSION" in statuses:
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
            elif (
                "RUNNING" in statuses or "RUNNING-REFERENCE" in statuses
            ) and verdict == PolicyVerdict.PASS:
                verdict = PolicyVerdict.REJECTED_TEMPORARILY
            # skip version if still running on all arches
            if not statuses - {"RUNNING", "RUNNING-ALWAYSFAIL"}:
                testver = None

            # A source package is eligible for the bounty if it has tests
            # of its own that pass on all tested architectures.
            if testsrc == source_name:
                excuse.autopkgtest_results = statuses
                if statuses == {"PASS"}:
                    own_tests_all_pass = True

            testname = "%s/%s" % (testsrc, testver) if testver else testsrc

            html_archmsg = []
            for arch in sorted(arch_results):
                (status, run_id, log_url) = arch_results[arch]
                artifact_url = None
                retry_url = None
                reference_url = None
                reference_retry_url = None
                history_url = None
                if self.options.adt_ppas:
                    if log_url.endswith("log.gz"):
                        artifact_url = log_url.replace("log.gz", "artifacts.tar.gz")
                else:
                    history_url = cloud_url % {
                        "h": srchash(testsrc),
                        "s": testsrc,
                        "r": self.options.series,
                        "a": arch,
                    }
                if status in ("NEUTRAL", "REGRESSION", "RUNNING-REFERENCE"):
                    retry_url = self.format_retry_url(
                        run_id, arch, testsrc, trigger
                    )

                baseline_result = self.result_in_baseline(testsrc, arch)
                if baseline_result and baseline_result[0] != Result.NONE:
                    baseline_run_id = str(baseline_result[2])
                    reference_url = self.format_log_url(
                        testsrc, arch, baseline_run_id
                    )
                    if self.options.adt_baseline == "reference":
                        reference_retry_url = self.format_retry_url(
                            baseline_run_id, arch, testsrc, REF_TRIG
                        )
                tests_info.setdefault(testname, {})[arch] = [
                    status,
                    log_url,
                    history_url,
                    artifact_url,
                    retry_url,
                ]

                # render HTML snippet for testsrc entry for current arch
                if history_url:
                    pieces = ['<a href="%s">%s</a>' % (history_url, arch)]
                else:
                    pieces = [arch]
                pieces.append(
                    ': <a href="%s">%s</a>'
                    % (
                        log_url,
                        EXCUSES_LABELS[status],
                    )
                )
                if retry_url:
                    pieces.append(
                        '<a href="%s" style="text-decoration: none;"> ♻</a>'
                        % retry_url
                    )
                if reference_url:
                    pieces.append(' (<a href="%s">reference</a>' % reference_url)
                    if reference_retry_url:
                        pieces.append(
                            '<a href="%s" style="text-decoration: none;"> ♻</a>'
                            % reference_retry_url
                        )
                    pieces.append(")")
                if artifact_url:
                    pieces.append(' <a href="%s">[artifacts]</a>' % artifact_url)
                html_archmsg.append("".join(pieces))

            # render HTML line for testsrc entry
            # - if action is or may be required
            # - for ones own package
            needs_attention = statuses - {
                "PASS",
                "NEUTRAL",
                "RUNNING-ALWAYSFAIL",
                "ALWAYSFAIL",
                "IGNORE-FAIL",
            }
            if needs_attention or testsrc == source_name:
                if testver:
                    pkg = '<a href="#{0}">{0}</a>/{1}'.format(testsrc, testver)
                else:
                    pkg = '<a href="#{0}">{0}</a>'.format(testsrc)
                excuse_lines.append(
                    "autopkgtest for %s: %s" % (pkg, ", ".join(html_archmsg))
                )

    if verdict != PolicyVerdict.PASS:
        # check for force-skiptest hint
        hints = self.hints.search(
            "force-skiptest",
            package=source_name,
            version=source_data_srcdist.version,
        )
        if hints:
            excuse.addreason("skiptest")
            excuse.addinfo(
                "Should wait for tests relating to %s %s, but forced by %s"
                % (source_name, source_data_srcdist.version, hints[0].user)
            )
            verdict = PolicyVerdict.PASS_HINTED
        else:
            excuse.addreason("autopkgtest")

    if (
        self.options.adt_success_bounty
        and verdict == PolicyVerdict.PASS
        and own_tests_all_pass
    ):
        excuse.add_bounty("autopkgtest", self.options.adt_success_bounty)
    if self.options.adt_regression_penalty and verdict in {
        PolicyVerdict.REJECTED_PERMANENTLY,
        PolicyVerdict.REJECTED_TEMPORARILY,
    }:
        if self.options.adt_regression_penalty > 0:
            excuse.add_penalty("autopkgtest", self.options.adt_regression_penalty)
        # In case we give penalties instead of blocking, we must always pass
        verdict = PolicyVerdict.PASS

    for line in excuse_lines:
        if verdict.is_rejected:
            excuse.add_verdict_info(verdict, line)
        else:
            excuse.addinfo(line)

    return verdict
792 #
793 # helper functions
794 #
@staticmethod
def has_autodep8(srcinfo: SourcePackage) -> bool:
    """Tell whether a source package is covered by autodep8.

    That is the case when any entry of ``srcinfo.testsuite`` starts with
    "autopkgtest-pkg".

    srcinfo is an item from self.britney.sources
    """
    return any(
        testsuite.startswith("autopkgtest-pkg") for testsuite in srcinfo.testsuite
    )
809 def request_tests_for_source(
810 self,
811 item: MigrationItem,
812 arch: str,
813 source_data_srcdist: SourcePackage,
814 pkg_arch_result: dict[
815 tuple[str, str], dict[str, tuple[str, Optional[str], str]]
816 ],
817 excuse: "Excuse",
818 ) -> None:
819 pkg_universe = self.britney.pkg_universe
820 target_suite = self.suite_info.target_suite
821 source_suite = item.suite
822 sources_t = target_suite.sources
823 sources_s = item.suite.sources
824 packages_s_a = item.suite.binaries[arch]
825 source_name = item.package
826 source_version = source_data_srcdist.version
827 # request tests (unless they were already requested earlier or have a result)
828 tests = self.tests_for_source(source_name, source_version, arch, excuse)
829 is_huge = len(tests) > self.options.adt_huge
831 # Here we figure out what is required from the source suite
832 # for the test to install successfully.
833 #
834 # The ImplicitDependencyPolicy does a similar calculation, but
835 # if I (elbrus) understand correctly, only in the reverse
836 # dependency direction. We are doing something similar here
837 # but in the dependency direction (note: this code is older).
838 # We use the ImplicitDependencyPolicy result for the reverse
839 # dependencies and we keep the code below for the
840 # dependencies. Using the ImplicitDependencyPolicy results
841 # also in the reverse direction seems to require quite some
842 # reorganisation to get that information available here, as in
843 # the current state only the current excuse is available here
844 # and the required other excuses may not be calculated yet.
845 #
846 # Loop over all binary packages from trigger and
847 # recursively look up which *versioned* dependencies are
848 # only satisfied in the source suite.
849 #
850 # For all binaries found, look up which packages they
851 # break/conflict with in the target suite, but not in the
852 # source suite. The main reason to do this is to cover test
853 # dependencies, so we will check Testsuite-Triggers as
854 # well.
855 #
856 # OI: do we need to do the first check in a smart way
857 # (i.e. only for the packages that are actually going to be
858 # installed) for the breaks/conflicts set as well, i.e. do
859 # we need to check if any of the packages that we now
860 # enforce being from the source suite, actually have new
861 # versioned depends and new breaks/conflicts.
862 #
863 # For all binaries found, add the set of unique source
864 # packages to the list of triggers.
866 bin_triggers: set[PackageId] = set()
867 bin_new = set(source_data_srcdist.binaries)
868 for n_binary in iter_except(bin_new.pop, KeyError):
869 if n_binary in bin_triggers:
870 continue
871 bin_triggers.add(n_binary)
873 # Check if there is a dependency that is not
874 # available in the target suite.
875 # We add slightly too much here, because new binaries
876 # will also show up, but they are already properly
877 # installed. Nevermind.
878 depends = pkg_universe.dependencies_of(n_binary)
879 # depends is a frozenset{frozenset{BinaryPackageId, ..}}
880 for deps_of_bin in depends:
881 if target_suite.any_of_these_are_in_the_suite(deps_of_bin):
882 # if any of the alternative dependencies is already
883 # satisfied in the target suite, we can just ignore it
884 continue
885 # We'll figure out which version later
886 bin_new.update(
887 added_pkgs_compared_to_target_suite(deps_of_bin, target_suite)
888 )
890 # Check if the package breaks/conflicts anything. We might
891 # be adding slightly too many source packages due to the
892 # check here as a binary package that is broken may be
893 # coming from a different source package in the source
894 # suite. Nevermind.
895 bin_broken = set()
896 for t_binary in bin_triggers:
897 # broken is a frozenset{BinaryPackageId, ..}
898 broken = pkg_universe.negative_dependencies_of(
899 cast(BinaryPackageId, t_binary)
900 )
901 broken_in_target = {
902 p.package_name
903 for p in target_suite.which_of_these_are_in_the_suite(broken)
904 }
905 broken_in_source = {
906 p.package_name
907 for p in source_suite.which_of_these_are_in_the_suite(broken)
908 }
909 # We want packages with a newer version in the source suite that
910 # no longer has the conflict. This is an approximation
911 broken_filtered = set(
912 p
913 for p in broken
914 if p.package_name in broken_in_target
915 and p.package_name not in broken_in_source
916 )
917 # We add the version in the target suite, but the code below will
918 # change it to the version in the source suite
919 bin_broken.update(broken_filtered)
920 bin_triggers.update(bin_broken)
922 # The ImplicitDependencyPolicy also found packages that need
923 # to migrate together, so add them to the triggers too.
924 for bin_implicit in excuse.depends_packages_flattened:
925 if bin_implicit.architecture == arch:
926 bin_triggers.add(bin_implicit)
928 triggers = set()
929 for t_binary2 in bin_triggers:
930 if t_binary2.architecture == arch:
931 try:
932 source_of_bin = packages_s_a[t_binary2.package_name].source
933 # If the version in the target suite is the same, don't add a trigger.
934 # Note that we looked up the source package in the source suite.
935 # If it were a different source package in the target suite, however, then
936 # we would not have this source package in the same version anyway.
937 if (
938 sources_t.get(source_of_bin, None) is None
939 or sources_s[source_of_bin].version
940 != sources_t[source_of_bin].version
941 ):
942 triggers.add(
943 source_of_bin + "/" + sources_s[source_of_bin].version
944 )
945 except KeyError:
946 # Apparently the package was removed from
947 # unstable e.g. if packages are replaced
948 # (e.g. -dbg to -dbgsym)
949 pass
950 if t_binary2 not in source_data_srcdist.binaries:
951 for tdep_src in self.testsuite_triggers.get( 951 ↛ 954line 951 didn't jump to line 954, because the loop on line 951 never started
952 t_binary2.package_name, set()
953 ):
954 try:
955 # Only add trigger if versions in the target and source suites are different
956 if (
957 sources_t.get(tdep_src, None) is None
958 or sources_s[tdep_src].version
959 != sources_t[tdep_src].version
960 ):
961 triggers.add(
962 tdep_src + "/" + sources_s[tdep_src].version
963 )
964 except KeyError:
965 # Apparently the source was removed from
966 # unstable (testsuite_triggers are unified
967 # over all suites)
968 pass
969 trigger = source_name + "/" + source_version
970 triggers.discard(trigger)
971 triggers_list = sorted(list(triggers))
972 triggers_list.insert(0, trigger)
974 for testsrc, testver in tests:
975 self.pkg_test_request(testsrc, arch, triggers_list, huge=is_huge)
976 (result, real_ver, run_id, url) = self.pkg_test_result(
977 testsrc, testver, arch, trigger
978 )
979 pkg_arch_result[(testsrc, real_ver)][arch] = (result, run_id, url)
981 def tests_for_source(
982 self, src: str, ver: str, arch: str, excuse: "Excuse"
983 ) -> list[tuple[str, str]]:
984 """Iterate over all tests that should be run for given source and arch"""
986 source_suite = self.suite_info.primary_source_suite
987 target_suite = self.suite_info.target_suite
988 sources_info = target_suite.sources
989 binaries_info = target_suite.binaries[arch]
991 reported_pkgs = set()
993 tests = []
995 # Debian doesn't have linux-meta, but Ubuntu does
996 # for linux themselves we don't want to trigger tests -- these should
997 # all come from linux-meta*. A new kernel ABI without a corresponding
998 # -meta won't be installed and thus we can't sensibly run tests against
999 # it.
1000 if ( 1000 ↛ 1004line 1000 didn't jump to line 1004
1001 src.startswith("linux")
1002 and src.replace("linux", "linux-meta") in sources_info
1003 ):
1004 return []
1006 # we want to test the package itself, if it still has a test in unstable
1007 # but only if the package actually exists on this arch
1008 srcinfo = source_suite.sources[src]
1009 if ("autopkgtest" in srcinfo.testsuite or self.has_autodep8(srcinfo)) and len(
1010 excuse.packages[arch]
1011 ) > 0:
1012 reported_pkgs.add(src)
1013 tests.append((src, ver))
1015 extra_bins = []
1016 # Debian doesn't have linux-meta, but Ubuntu does
1017 # Hack: For new kernels trigger all DKMS packages by pretending that
1018 # linux-meta* builds a "dkms" binary as well. With that we ensure that we
1019 # don't regress DKMS drivers with new kernel versions.
1020 if src.startswith("linux-meta"):
1021 # does this have any image on this arch?
1022 for pkg_id in srcinfo.binaries:
1023 if pkg_id.architecture == arch and "-image" in pkg_id.package_name:
1024 try:
1025 extra_bins.append(binaries_info["dkms"].pkg_id)
1026 except KeyError:
1027 pass
1029 if not self.has_built_on_this_arch_or_is_arch_all(srcinfo, arch):
1030 return []
1032 pkg_universe = self.britney.pkg_universe
1033 # plus all direct reverse dependencies and test triggers of its
1034 # binaries which have an autopkgtest
1035 for binary in itertools.chain(srcinfo.binaries, extra_bins):
1036 rdeps = pkg_universe.reverse_dependencies_of(binary)
1037 for rdep in rdeps:
1038 try:
1039 rdep_src = binaries_info[rdep.package_name].source
1040 # Don't re-trigger the package itself here; this should
1041 # have been done above if the package still continues to
1042 # have an autopkgtest in unstable.
1043 if rdep_src == src:
1044 continue
1045 except KeyError:
1046 continue
1048 rdep_src_info = sources_info[rdep_src]
1049 if "autopkgtest" in rdep_src_info.testsuite or self.has_autodep8(
1050 rdep_src_info
1051 ):
1052 if rdep_src not in reported_pkgs:
1053 tests.append((rdep_src, rdep_src_info.version))
1054 reported_pkgs.add(rdep_src)
1056 for tdep_src in self.testsuite_triggers.get(binary.package_name, set()):
1057 if tdep_src not in reported_pkgs:
1058 try:
1059 tdep_src_info = sources_info[tdep_src]
1060 except KeyError:
1061 continue
1062 if "autopkgtest" in tdep_src_info.testsuite or self.has_autodep8( 1062 ↛ 1056line 1062 didn't jump to line 1056, because the condition on line 1062 was never false
1063 tdep_src_info
1064 ):
1065 for pkg_id in tdep_src_info.binaries: 1065 ↛ 1056line 1065 didn't jump to line 1056, because the loop on line 1065 didn't complete
1066 if pkg_id.architecture == arch:
1067 tests.append((tdep_src, tdep_src_info.version))
1068 reported_pkgs.add(tdep_src)
1069 break
1071 tests.sort(key=lambda s_v: s_v[0])
1072 return tests
def read_pending_tests(self) -> None:
    """Load pending test requests left behind by previous britney runs.

    Initializes self.pending_tests from self.pending_tests_file; a missing
    file yields an empty mapping. For files that carry VERSION_KEY, entries
    older than options.adt_pending_max_age are dropped. Files without it
    are treated as the old format (lists of architectures) and converted,
    stamping every entry with the current time.
    """
    assert self.pending_tests is None, "already initialized"
    if not os.path.exists(self.pending_tests_file):
        self.logger.info(
            "No %s, starting with no pending tests", self.pending_tests_file
        )
        self.pending_tests = {}
        return

    with open(self.pending_tests_file) as f:
        pending = json.load(f)
    self.pending_tests = pending

    if VERSION_KEY in pending:
        del pending[VERSION_KEY]
        # Iterate over snapshots: entries are deleted while walking.
        for trigger, by_pkg in list(pending.items()):
            for pkg, by_arch in list(by_pkg.items()):
                for arch, requested_at in list(by_arch.items()):
                    if self._now - requested_at > self.options.adt_pending_max_age:
                        del by_arch[arch]
                if not by_arch:
                    del by_pkg[pkg]
            if not by_pkg:
                del pending[trigger]
    else:
        # Migration code: old files map each package to a list of
        # architectures instead of an {arch: timestamp} dict.
        for by_pkg in pending.values():
            for pkg, arch_list in by_pkg.items():
                by_pkg[pkg] = {arch: self._now for arch in arch_list}

    self.logger.info(
        "Read pending requested tests from %s", self.pending_tests_file
    )
    self.logger.debug("%s", self.pending_tests)
# Walking every trigger's results is expensive, so the answer is cached
# per (self, src, arch).
@lru_cache(None)
def latest_run_for_package(self, src: str, arch: str) -> str:
    """Return the greatest run ID recorded for src on arch ("" if none)."""
    newest = ""
    for per_source in self.test_results.values():
        try:
            candidate = per_source[src][arch][2]
        except KeyError:
            # this trigger has no result for src/arch
            continue
        newest = max(newest, candidate)
    return newest
@lru_cache(None)
def fetch_swift_results(self, swift_url: str, src: str, arch: str) -> None:
    """Download new results for source package/arch from swift.

    Lists the run directories for src/arch in the swift container and hands
    each one to fetch_one_result(). A 401 response is logged and ignored;
    any other I/O error terminates the process via sys.exit(1).
    """
    # Prepare query: get all runs with a timestamp later than the latest
    # run_id for this package/arch; '@' is at the end of each run id, to
    # mark the end of a test run directory path.
    # example: <autopkgtest-wily>wily/amd64/libp/libpng/20150630_054517@/result.tar
    prefix = "%s/%s/%s/%s/" % (self.options.series, arch, srchash(src), src)
    query = {"delimiter": "@", "prefix": prefix}

    # Only ask for runs newer than the latest one already known, unless
    # the shared results cache option is set.
    if not self.options.adt_shared_results_cache:
        newest_known = self.latest_run_for_package(src, arch)
        if newest_known:
            query["marker"] = prefix + newest_known

    # request new results from swift
    container_url = os.path.join(swift_url, self.swift_container)
    url = container_url + "?" + urllib.parse.urlencode(query)
    response = None
    try:
        response = urlopen(url, timeout=30)
        status = response.getcode()
        if status == 200:
            result_paths = response.read().decode().strip().splitlines()
        elif status == 204:  # No content
            result_paths = []
        else:
            # We should not ever end up here as we expect a HTTPError in
            # other cases; e. g. 3XX is something that tells us to adjust
            # our URLS, so fail hard on those.
            raise NotImplementedError(
                "fetch_swift_results(%s): cannot handle HTTP code %i"
                % (url, status)
            )
    except IOError as e:
        if getattr(e, "code", -1) != 401:
            # Other status codes are usually a transient
            # network/infrastructure failure. Ignoring this can lead to
            # re-requesting tests which we already have results for, so
            # fail hard on this and let the next run retry.
            self.logger.error("Failure to fetch swift results from %s: %s", url, str(e))
            sys.exit(1)
        # 401 "Unauthorized" is swift's way of saying "container does not exist"
        self.logger.info(
            "fetch_swift_results: %s does not exist yet or is inaccessible", url
        )
        return
    finally:
        if response is not None:
            response.close()

    for path in result_paths:
        self.fetch_one_result(
            os.path.join(container_url, path, "result.tar"),
            src,
            arch,
        )
def fetch_one_result(self, url: str, src: str, arch: str) -> None:
    """Download one result URL for source/arch

    Remove matching pending_tests entries and record the result for every
    trigger listed in the result's ADT_TEST_TRIGGERS.

    A 404 on download, a damaged tarball, a result that belongs to a
    different source package, or a result without ADT_TEST_TRIGGERS is
    logged and skipped. Any other I/O error while downloading terminates
    the process via sys.exit(1).
    """
    f = None
    try:
        f = urlopen(url, timeout=30)
        if f.getcode() == 200:
            tar_bytes = io.BytesIO(f.read())
        else:
            raise NotImplementedError(
                "fetch_one_result(%s): cannot handle HTTP code %i"
                % (url, f.getcode())
            )
    except IOError as err:
        self.logger.error("Failure to fetch %s: %s", url, str(err))
        # we tolerate "not found" (something went wrong on uploading the
        # result), but other things indicate infrastructure problems
        if getattr(err, "code", -1) == 404:
            return
        sys.exit(1)
    finally:
        if f is not None:
            f.close()

    try:
        with tarfile.open(None, "r", tar_bytes) as tar:
            exitcode = int(tar.extractfile("exitcode").read().strip())  # type: ignore[union-attr]
            srcver = tar.extractfile("testpkg-version").read().decode().strip()  # type: ignore[union-attr]
            (ressrc, ver) = srcver.split()
            testinfo = json.loads(tar.extractfile("testinfo.json").read().decode())  # type: ignore[union-attr]
    except (KeyError, ValueError, tarfile.TarError) as err:
        self.logger.error("%s is damaged, ignoring: %s", url, str(err))
        # ignore this; this will leave an orphaned request in autopkgtest-pending.json
        # and thus require manual retries after fixing the tmpfail, but we
        # can't just blindly attribute it to some pending test.
        return

    if src != ressrc:
        self.logger.error(
            "%s is a result for package %s, but expected package %s",
            url,
            ressrc,
            src,
        )
        return

    # parse recorded triggers in test result
    for e in testinfo.get("custom_environment", []):
        if e.startswith("ADT_TEST_TRIGGERS="):
            result_triggers = [i for i in e.split("=", 1)[1].split() if "/" in i]
            break
    else:
        # The format string has a placeholder for the URL; it must be
        # passed, otherwise the message is logged with a literal "%s".
        self.logger.error("%s result has no ADT_TEST_TRIGGERS, ignoring", url)
        return

    # The run id is the name of the directory that holds result.tar.
    run_id = os.path.basename(os.path.dirname(url))
    seen = round(calendar.timegm(time.strptime(run_id, "%Y%m%d_%H%M%S@")))
    # allow some skipped tests, but nothing else
    if exitcode in [0, 2]:
        result = Result.PASS
    elif exitcode == 8:
        result = Result.NEUTRAL
    else:
        result = Result.FAIL

    self.logger.info(
        "Fetched test result for %s/%s/%s %s (triggers: %s): %s",
        src,
        ver,
        arch,
        run_id,
        result_triggers,
        result.name.lower(),
    )

    # remove matching test requests
    for trigger in result_triggers:
        self.remove_from_pending(trigger, src, arch)

    # add this result
    for trigger in result_triggers:
        self.add_trigger_to_results(trigger, src, ver, arch, run_id, seen, result)
def remove_from_pending(
    self, trigger: str, src: str, arch: str, timestamp: int = sys.maxsize
) -> None:
    """Drop the pending request for trigger/src/arch, if there is one.

    A request whose scheduling time is later than timestamp is kept: the
    result then predates the request. Emptied src and trigger levels are
    removed from self.pending_tests as well.
    """
    assert self.pending_tests is not None  # for type checking
    pending = self.pending_tests
    try:
        by_src = pending[trigger]
        by_arch = by_src[src]
        requested_at = by_arch[arch]
    except KeyError:
        self.logger.debug(
            "-> does not match any pending request for %s/%s", src, arch
        )
        return

    if timestamp < requested_at:
        # The result is from before the moment of scheduling, so it's
        # not the one we're waiting for
        return

    del by_arch[arch]
    if not by_arch:
        del by_src[src]
        if not by_src:
            del pending[trigger]
    self.logger.debug(
        "-> matches pending request %s/%s for trigger %s", src, arch, trigger
    )
def add_trigger_to_results(
    self,
    trigger: str,
    src: str,
    ver: str,
    arch: str,
    run_id: str,
    timestamp: int,
    status_to_add: Result,
) -> None:
    """Record a test result for trigger/src/arch in self.test_results.

    Triggers without a "/" are ignored, as are runs of src itself against
    a version older than the one named in the trigger. An existing entry
    is only replaced according to the rules described below.
    """
    trigsrc, slash, trigver = trigger.partition("/")
    if not slash:
        self.logger.info("Ignoring invalid test trigger %s", trigger)
        return

    # Ensure that we got a new enough version
    if trigsrc == src and apt_pkg.version_compare(ver, trigver) < 0:
        self.logger.debug(
            "test trigger %s, but run for older version %s, ignoring", trigger, ver
        )
        return

    per_src = self.test_results.setdefault(trigger, {}).setdefault(src, {})
    stored = per_src.setdefault(arch, [Result.FAIL, None, "", 0])

    # reruns shouldn't flip the result from PASS or NEUTRAL to
    # FAIL, so remember the most recent version of the best result
    # we've seen. Except for reference updates, which we always
    # want to update with the most recent result. The result data
    # may not be ordered by timestamp, so we need to check time.
    if self.options.adt_baseline == "reference" and trigger == REF_TRIG:
        replace = stored[3] < timestamp
    else:
        replace = status_to_add < stored[0] or (
            status_to_add == stored[0] and stored[3] < timestamp
        )

    if replace:
        stored[0], stored[1], stored[2], stored[3] = (
            status_to_add,
            ver,
            run_id,
            timestamp,
        )
def send_test_request(
    self, src: str, arch: str, triggers: list[str], huge: bool = False
) -> None:
    """Send out AMQP request for testing src/arch for triggers

    The queue is the -ppa one when options.adt_ppas is set, otherwise the
    -huge one if huge is true, otherwise the normal one. Nothing is sent
    in dry-run mode. Without an AMQP channel the request is written to
    self.amqp_file_handle instead.
    """
    options = self.options
    if options.dry_run:
        return

    params: dict[str, Any] = {"triggers": triggers}
    ppas = options.adt_ppas
    if ppas:
        params["ppas"] = ppas
        qname = "debci-ppa-%s-%s" % (options.series, arch)
    elif huge:
        qname = "debci-huge-%s-%s" % (options.series, arch)
    else:
        qname = "debci-%s-%s" % (options.series, arch)
    params["submit-time"] = time.strftime("%Y-%m-%d %H:%M:%S%z", time.gmtime())

    if not self.amqp_channel:
        # for file-based submission, triggers are space separated
        params["triggers"] = [" ".join(triggers)]
        assert self.amqp_file_handle
        self.amqp_file_handle.write("%s:%s %s\n" % (qname, src, json.dumps(params)))
        return

    message = amqp.Message(
        src + "\n" + json.dumps(params), delivery_mode=2
    )  # persistent
    self.amqp_channel.basic_publish(message, routing_key=qname)
    # we save pending.json with every request, so that if britney
    # crashes we don't re-request tests. This is only needed when using
    # real amqp, as with file-based submission the pending tests are
    # returned by debci along with the results each run.
    self.save_pending_json()
def pkg_test_request(
    self, src: str, arch: str, all_triggers: list[str], huge: bool = False
) -> None:
    """Request one package test for a set of triggers

    all_triggers is a list of "pkgname/version". These are the packages
    that will be taken from the source suite. The first package in this
    list is the package that triggers the testing of src, the rest are
    additional packages required for installability of the test deps. If
    huge is true, then the request will be put into the -huge instead of
    normal queue.

    A request is skipped when a usable result is already known. When
    results come from swift, current results for this package are
    downloaded first, and nothing is requested if a result exists
    afterwards. Otherwise the test is requested unless it is already
    pending.
    """
    trigger = all_triggers[0]
    swift_url = self.options.adt_swift_url
    uses_swift = not swift_url.startswith("file://")

    try:
        known = self.test_results[trigger][src][arch]
    except KeyError:
        known = None

    if known is not None:
        state = known[0]
        # Old results, and sufficiently old failures of tests that are
        # not failing in the baseline, are candidates for a new run.
        may_rerun = state in {
            Result.OLD_PASS,
            Result.OLD_FAIL,
            Result.OLD_NEUTRAL,
        } or (
            state == Result.FAIL
            and self.result_in_baseline(src, arch)[0]
            in {Result.PASS, Result.NEUTRAL, Result.OLD_PASS, Result.OLD_NEUTRAL}
            and self._now - known[3] > self.options.adt_retry_older_than
        )
        if not may_rerun:
            if not uses_swift:
                # We're done if we don't retrigger and we're not using swift
                return
            if state in {Result.PASS, Result.NEUTRAL}:
                self.logger.debug(
                    "%s/%s triggered by %s already known", src, arch, trigger
                )
                return

    # Without swift we don't expect new results
    if uses_swift:
        self.logger.info(
            "Checking for new results for failed %s/%s for trigger %s",
            src,
            arch,
            trigger,
        )
        self.fetch_swift_results(swift_url, src, arch)
        # do we have one now?
        try:
            self.test_results[trigger][src][arch]
        except KeyError:
            pass
        else:
            return

    self.request_test_if_not_queued(src, arch, trigger, all_triggers, huge=huge)
def request_test_if_not_queued(
    self,
    src: str,
    arch: str,
    trigger: str,
    all_triggers: Optional[list[str]] = None,
    huge: bool = False,
) -> None:
    """Request a test of src on arch for trigger unless one is pending.

    all_triggers is the full trigger list passed on with the request; when
    it is omitted or empty, only trigger itself is sent. A new request is
    recorded in self.pending_tests with the current time before it is sent
    via send_test_request().
    """
    assert self.pending_tests is not None  # for type checking
    # None (rather than a shared mutable list) is the "not given" default.
    if not all_triggers:
        all_triggers = [trigger]

    # Don't re-request if it's already pending
    arch_dict = self.pending_tests.setdefault(trigger, {}).setdefault(src, {})
    if arch in arch_dict:
        self.logger.debug(
            "Test %s/%s for %s is already pending, not queueing", src, arch, trigger
        )
    else:
        self.logger.debug(
            "Requesting %s autopkgtest on %s to verify %s", src, arch, trigger
        )
        arch_dict[arch] = self._now
        self.send_test_request(src, arch, all_triggers, huge=huge)
def result_in_baseline(self, src: str, arch: str) -> list[Any]:
    """Get the result for src on arch in the baseline

    The baseline is either the reference set (when options.adt_baseline
    is "reference") or all recorded results.
    """
    # Computing this requires iterating over all cached results and thus
    # is expensive; serve it from the cache when possible.
    try:
        return self.result_in_baseline_cache[src][arch]
    except KeyError:
        pass

    result_reference: list[Any] = [Result.NONE, None, "", 0]
    if self.options.adt_baseline == "reference":
        # Sources missing from the target suite get the placeholder; this
        # answer is not cached.
        if src not in self.suite_info.target_suite.sources:
            return result_reference

        try:
            result_reference = self.test_results[REF_TRIG][src][arch]
        except KeyError:
            self.logger.debug(
                "Found NO result for src %s in reference: %s",
                src,
                result_reference[0].name,
            )
        else:
            self.logger.debug(
                "Found result for src %s in reference: %s",
                src,
                result_reference[0].name,
            )
        self.result_in_baseline_cache[src][arch] = deepcopy(result_reference)
        return result_reference

    result_ever: list[Any] = [Result.FAIL, None, "", 0]
    for srcmap in self.test_results.values():
        try:
            candidate = srcmap[src][arch]
        except KeyError:
            continue
        if candidate[0] != Result.FAIL:
            result_ever = candidate
        # If we are not looking at a reference run, we don't really
        # care about anything except the status, so we're done
        # once we find a PASS.
        if result_ever[0] == Result.PASS:
            break

    self.result_in_baseline_cache[src][arch] = deepcopy(result_ever)
    self.logger.debug("Result for src %s ever: %s", src, result_ever[0].name)
    return result_ever
def has_test_in_target(self, src: str) -> bool:
    """Tell whether src has an autopkgtest in the target suite.

    Returns False when src is not in the target suite at all.
    """
    try:
        srcinfo = self.suite_info.target_suite.sources[src]
        return bool(
            "autopkgtest" in srcinfo.testsuite or self.has_autodep8(srcinfo)
        )
    except (KeyError, AttributeError):
        # AttributeError is only needed for the test suite as
        # srcinfo can be a NoneType
        return False
def pkg_test_result(
    self, src: str, ver: str, arch: str, trigger: str
) -> tuple[str, str, Optional[str], str]:
    """Get current test status of a particular package

    Return (status, real_version, run_id, log_url) tuple; status is a key in
    EXCUSES_LABELS. run_id is None if the test is still running.

    Raises RuntimeError when there is neither a result nor a pending
    request for src/arch under trigger.
    """
    assert self.pending_tests is not None  # for type checking
    run_id = None
    try:
        recorded = self.test_results[trigger][src][arch]
        ver = recorded[1]
        run_id = recorded[2]

        if recorded[0] not in {Result.FAIL, Result.OLD_FAIL}:
            result = recorded[0].name
        else:
            # A failure is classified against the baseline result.
            baseline_result = self.result_in_baseline(src, arch)[0]

            # Special-case triggers from linux-meta*: we cannot compare
            # results against different kernels, as e. g. a DKMS module
            # might work against the default kernel but fail against a
            # different flavor; so for those, ignore the "ever
            # passed" check; FIXME: check against trigsrc only
            if self.options.adt_baseline != "reference" and trigger.startswith(
                ("linux-meta", "linux/")
            ):
                baseline_result = Result.FAIL

            # Check if the autopkgtest (still) exists in the target suite
            test_in_target = self.has_test_in_target(src)

            # Without a current baseline result, ask for a reference run.
            if test_in_target and baseline_result in {
                Result.NONE,
                Result.OLD_FAIL,
                Result.OLD_NEUTRAL,
                Result.OLD_PASS,
            }:
                self.request_test_if_not_queued(src, arch, REF_TRIG)

            # Highest priority first: a force-badtest hint wins over
            # everything, then the "always failed" verdicts.
            if self.has_force_badtest(src, ver, arch):
                result = "IGNORE-FAIL"
            elif (
                self.options.adt_ignore_failure_for_new_tests and not test_in_target
            ) or baseline_result in {Result.FAIL, Result.OLD_FAIL}:
                result = "ALWAYSFAIL"
            elif baseline_result == Result.NONE and test_in_target:
                result = "RUNNING-REFERENCE"
            else:
                result = "REGRESSION"

        url = self.format_log_url(src, arch, run_id)
    except KeyError:
        # no result for src/arch; still running?
        if arch not in self.pending_tests.get(trigger, {}).get(src, {}):
            raise RuntimeError(
                "Result for %s/%s/%s (triggered by %s) is neither known nor pending!"
                % (src, ver, arch, trigger)
            )
        baseline_result = self.result_in_baseline(src, arch)[0]
        if (
            self.options.adt_ignore_failure_for_new_tests
            and not self.has_test_in_target(src)
        ):
            result = "RUNNING-ALWAYSFAIL"
        elif baseline_result != Result.FAIL and not self.has_force_badtest(
            src, ver, arch
        ):
            result = "RUNNING"
        else:
            result = "RUNNING-ALWAYSFAIL"
        url = self.options.adt_ci_url + "status/pending"

    return (result, ver, run_id, url)
def has_force_badtest(self, src: str, ver: str, arch: str) -> bool:
    """Check if src/ver/arch has a force-badtest hint

    A hint applies when one of its items is for "source" or for arch and
    its version is "all" or not older than ver.
    """
    assert self.hints is not None

    def applies(item) -> bool:
        if item.architecture not in ["source", arch]:
            return False
        return (
            item.version == "all"
            or apt_pkg.version_compare(ver, item.version) <= 0  # type: ignore[arg-type]
        )

    hints = self.hints.search("force-badtest", package=src)
    if not hints:
        return False

    self.logger.info(
        "Checking hints for %s/%s/%s: %s",
        src,
        ver,
        arch,
        [str(h) for h in hints],
    )
    for hint in hints:
        # Evaluate every item of the hint before deciding.
        matching = [item for item in hint.packages if applies(item)]
        if matching:
            return True

    return False
def has_built_on_this_arch_or_is_arch_all(
    self, src_data: SourcePackage, arch: str
) -> bool:
    """Tell whether the source is really present on arch.

    When a source builds arch:all binaries, those binaries are added to
    all architectures and thus the source 'exists' everywhere. This
    returns True as soon as one of the source's binaries is found on arch
    with architecture equal to arch. Otherwise it returns True only if
    every binary of the source is known on arch, i.e. only arch:all
    packages were seen.
    """
    arch_binaries = self.suite_info.primary_source_suite.binaries[arch]
    saw_unknown_binary = False
    for built in src_data.binaries:
        try:
            known = arch_binaries[built.package_name]
        except KeyError:
            # src_data.binaries has all the built binaries, so if
            # we get here, we know that at least one architecture
            # has architecture specific binaries
            saw_unknown_binary = True
        else:
            if known.architecture == arch:
                return True
    # If we get here, we have only seen arch:all packages for this arch.
    return not saw_unknown_binary