Coverage for britney2/policies/policy.py: 91%
1378 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-06-17 09:00 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-06-17 09:00 +0000
1import json
2import logging
3import optparse
4import os
5import re
6import sys
7import time
8from abc import ABC, abstractmethod
9from collections import defaultdict
10from collections.abc import Callable, Container, Generator
11from dataclasses import dataclass, field
12from enum import Enum, IntEnum, StrEnum, auto, unique
13from itertools import chain
14from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar, cast
15from urllib.parse import quote
17import apt_pkg
18import yaml
20from britney2 import (
21 BinaryPackage,
22 BinaryPackageId,
23 DependencyType,
24 PackageId,
25 SourcePackage,
26 Suite,
27 SuiteClass,
28 Suites,
29 TargetSuite,
30)
31from britney2.excusedeps import DependencySpec
32from britney2.hints import (
33 Hint,
34 HintAnnotate,
35 HintCollection,
36 HintParser,
37 HintType,
38 PolicyHintParserProto,
39)
40from britney2.inputs.suiteloader import SuiteContentLoader
41from britney2.migrationitem import MigrationItem, MigrationItemFactory
42from britney2.policies import ApplySrcPolicy, PolicyVerdict
43from britney2.utils import (
44 GetDependencySolversProto,
45 binaries_from_source_version,
46 compute_reverse_tree,
47 filter_out_faux,
48 filter_out_faux_gen,
49 find_newer_binaries,
50 get_component,
51 get_dependency_solvers,
52 is_smooth_update_allowed,
53 parse_option,
54)
56if TYPE_CHECKING: 56 ↛ 57line 56 didn't jump to line 57 because the condition on line 56 was never true
57 from ..britney import Britney
58 from ..excuse import Excuse
59 from ..installability.universe import BinaryPackageUniverse
62class PolicyLoadRequest:
63 __slots__ = ("_options_name", "_default_value", "_policy_constructor")
65 def __init__(
66 self,
67 policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"],
68 options_name: str | None,
69 default_value: bool,
70 ) -> None:
71 self._policy_constructor = policy_constructor
72 self._options_name = options_name
73 self._default_value = default_value
75 def is_enabled(self, options: optparse.Values) -> bool:
76 if self._options_name is None:
77 assert self._default_value
78 return True
79 actual_value = getattr(options, self._options_name, None)
80 if actual_value is None:
81 return self._default_value
82 return actual_value.lower() in ("yes", "y", "true", "t")
84 def load(self, options: optparse.Values, suite_info: Suites) -> "BasePolicy":
85 return self._policy_constructor(options, suite_info)
87 @classmethod
88 def always_load(
89 cls, policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"]
90 ) -> "PolicyLoadRequest":
91 return cls(policy_constructor, None, True)
93 @classmethod
94 def conditionally_load(
95 cls,
96 policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"],
97 option_name: str,
98 default_value: bool,
99 ) -> "PolicyLoadRequest":
100 return cls(policy_constructor, option_name, default_value)
103class PolicyEngine:
104 def __init__(self) -> None:
105 self._policies: list["BasePolicy"] = []
107 def add_policy(self, policy: "BasePolicy") -> None:
108 self._policies.append(policy)
110 def load_policies(
111 self,
112 options: optparse.Values,
113 suite_info: Suites,
114 policy_load_requests: list[PolicyLoadRequest],
115 ) -> None:
116 for policy_load_request in policy_load_requests:
117 if policy_load_request.is_enabled(options):
118 self.add_policy(policy_load_request.load(options, suite_info))
120 def register_policy_hints(self, hint_parser: HintParser) -> None:
121 for policy in self._policies:
122 policy.register_hints(hint_parser)
124 def initialise(self, britney: "Britney", hints: HintCollection) -> None:
125 for policy in self._policies:
126 policy.hints = hints
127 policy.initialise(britney)
129 def save_state(self, britney: "Britney") -> None:
130 for policy in self._policies:
131 policy.save_state(britney)
133 def apply_src_policies(
134 self,
135 source_t: SourcePackage | None,
136 source_u: SourcePackage,
137 excuse: "Excuse",
138 ) -> None:
139 excuse_verdict = excuse.policy_verdict
140 source_suite = excuse.item.suite
141 suite_class = source_suite.suite_class
142 for policy in self._policies:
143 pinfo: dict[str, Any] = {}
144 policy_verdict = PolicyVerdict.NOT_APPLICABLE
145 if suite_class in policy.applicable_suites:
146 if policy.src_policy.run_arch:
147 for arch in policy.options.architectures:
148 v = policy.apply_srcarch_policy_impl(
149 pinfo, arch, source_t, source_u, excuse
150 )
151 policy_verdict = PolicyVerdict.worst_of(policy_verdict, v)
152 if policy.src_policy.run_src:
153 v = policy.apply_src_policy_impl(pinfo, source_t, source_u, excuse)
154 policy_verdict = PolicyVerdict.worst_of(policy_verdict, v)
155 # The base policy provides this field, so the subclass should leave it blank
156 assert "verdict" not in pinfo
157 if policy_verdict is not PolicyVerdict.NOT_APPLICABLE:
158 excuse.policy_info[policy.policy_id] = pinfo
159 pinfo["verdict"] = policy_verdict.name
160 excuse_verdict = PolicyVerdict.worst_of(policy_verdict, excuse_verdict)
161 excuse.policy_verdict = excuse_verdict
163 def apply_srcarch_policies(
164 self,
165 arch: str,
166 source_t: SourcePackage | None,
167 source_u: SourcePackage,
168 excuse: "Excuse",
169 ) -> None:
170 excuse_verdict = excuse.policy_verdict
171 source_suite = excuse.item.suite
172 suite_class = source_suite.suite_class
173 for policy in self._policies:
174 pinfo: dict[str, Any] = {}
175 if suite_class in policy.applicable_suites:
176 policy_verdict = policy.apply_srcarch_policy_impl(
177 pinfo, arch, source_t, source_u, excuse
178 )
179 excuse_verdict = PolicyVerdict.worst_of(policy_verdict, excuse_verdict)
180 # The base policy provides this field, so the subclass should leave it blank
181 assert "verdict" not in pinfo
182 if policy_verdict is not PolicyVerdict.NOT_APPLICABLE:
183 excuse.policy_info[policy.policy_id] = pinfo
184 pinfo["verdict"] = policy_verdict.name
185 excuse.policy_verdict = excuse_verdict
188class BasePolicy(ABC):
189 britney: "Britney"
190 policy_id: str
191 hints: HintCollection | None
192 applicable_suites: set[SuiteClass]
193 src_policy: ApplySrcPolicy
194 options: optparse.Values
195 suite_info: Suites
197 def __init__(
198 self,
199 options: optparse.Values,
200 suite_info: Suites,
201 ) -> None:
202 """The BasePolicy constructor
204 :param options: The options member of Britney with all the
205 config values.
206 """
208 @property
209 @abstractmethod
210 def state_dir(self) -> str: ... 210 ↛ exitline 210 didn't return from function 'state_dir' because
212 def register_hints(self, hint_parser: HintParser) -> None: # pragma: no cover
213 """Register new hints that this policy accepts
215 :param hint_parser: (see HintParser.register_hint_type)
216 """
218 def initialise(self, britney: "Britney") -> None: # pragma: no cover
219 """Called once to make the policy initialise any data structures
221 This is useful for e.g. parsing files or other "heavy do-once" work.
223 :param britney: This is the instance of the "Britney" class.
224 """
225 self.britney = britney
227 def save_state(self, britney: "Britney") -> None: # pragma: no cover
228 """Called once at the end of the run to make the policy save any persistent data
230 Note this will *not* be called for "dry-runs" as such runs should not change
231 the state.
233 :param britney: This is the instance of the "Britney" class.
234 """
236 def apply_src_policy_impl(
237 self,
238 policy_info: dict[str, Any],
239 source_data_tdist: SourcePackage | None,
240 source_data_srcdist: SourcePackage,
241 excuse: "Excuse",
242 ) -> PolicyVerdict: # pragma: no cover
243 """Apply a policy on a given source migration
245 Britney will call this method on a given source package, when
246 Britney is considering to migrate it from the given source
247 suite to the target suite. The policy will then evaluate the
248 the migration and then return a verdict.
250 :param policy_info: A dictionary of all policy results. The
251 policy can add a value stored in a key related to its name.
252 (e.g. policy_info['age'] = {...}). This will go directly into
253 the "excuses.yaml" output.
255 :param source_data_tdist: Information about the source package
256 in the target distribution (e.g. "testing"). This is the
257 data structure in source_suite.sources[source_name]
259 :param source_data_srcdist: Information about the source
260 package in the source distribution (e.g. "unstable" or "tpu").
261 This is the data structure in target_suite.sources[source_name]
263 :return: A Policy Verdict (e.g. PolicyVerdict.PASS)
264 """
265 return PolicyVerdict.NOT_APPLICABLE
267 def apply_srcarch_policy_impl(
268 self,
269 policy_info: dict[str, Any],
270 arch: str,
271 source_data_tdist: SourcePackage | None,
272 source_data_srcdist: SourcePackage,
273 excuse: "Excuse",
274 ) -> PolicyVerdict:
275 """Apply a policy on a given binary migration
277 Britney will call this method on binaries from a given source package
278 on a given architecture, when Britney is considering to migrate them
279 from the given source suite to the target suite. The policy will then
280 evaluate the migration and then return a verdict.
282 :param policy_info: A dictionary of all policy results. The
283 policy can add a value stored in a key related to its name.
284 (e.g. policy_info['age'] = {...}). This will go directly into
285 the "excuses.yaml" output.
287 :param arch: The architecture the item is applied to. This is mostly
288 relevant for policies where src_policy is not ApplySrcPolicy.RUN_SRC
289 (as that is the only case where arch can differ from item.architecture)
291 :param source_data_tdist: Information about the source package
292 in the target distribution (e.g. "testing"). This is the
293 data structure in source_suite.sources[source_name]
295 :param source_data_srcdist: Information about the source
296 package in the source distribution (e.g. "unstable" or "tpu").
297 This is the data structure in target_suite.sources[source_name]
299 :return: A Policy Verdict (e.g. PolicyVerdict.PASS)
300 """
301 # if the policy doesn't implement this function, assume it's OK
302 return PolicyVerdict.NOT_APPLICABLE
305class AbstractBasePolicy(BasePolicy):
306 """
307 A shared abstract class for building BasePolicy objects.
309 tests/test_policy.py:initialize_policy() needs to be able to build BasePolicy
310 objects with just a two-item constructor, while all other uses of BasePolicy-
311 derived objects need the 5-item constructor. So AbstractBasePolicy was split
312 out to document this.
313 """
315 def __init__(
316 self,
317 policy_id: str,
318 options: optparse.Values,
319 suite_info: Suites,
320 applicable_suites: set[SuiteClass],
321 src_policy: ApplySrcPolicy = ApplySrcPolicy.RUN_SRC,
322 ) -> None:
323 """Concrete initializer.
325 :param policy_id: Identifies the policy. It will
326 determine the key used for the excuses.yaml etc.
328 :param options: The options member of Britney with all the
329 config values.
331 :param applicable_suites: Where this policy applies.
332 """
333 self.policy_id = policy_id
334 self.options = options
335 self.suite_info = suite_info
336 self.applicable_suites = applicable_suites
337 self.src_policy = src_policy
338 self.hints: HintCollection | None = None
339 logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
340 self.logger = logging.getLogger(logger_name)
342 @property
343 def state_dir(self) -> str:
344 return cast(str, self.options.state_dir)
347_T = TypeVar("_T")
350class SimplePolicyHint(Hint, Generic[_T]):
351 def __init__(
352 self,
353 user: str,
354 hint_type: HintType,
355 policy_parameter: _T,
356 packages: list[MigrationItem],
357 ) -> None:
358 super().__init__(user, hint_type, packages)
359 self._policy_parameter = policy_parameter
361 def __eq__(self, other: Any) -> bool:
362 if self.type != other.type or self._policy_parameter != other._policy_parameter:
363 return False
364 return super().__eq__(other)
366 def str(self) -> str:
367 return "{} {} {}".format(
368 self._type,
369 str(self._policy_parameter),
370 " ".join(x.name for x in self._packages),
371 )
374class AgeDayHint(SimplePolicyHint[int]):
375 @property
376 def days(self) -> int:
377 return self._policy_parameter
380class IgnoreRCBugHint(SimplePolicyHint[frozenset[str]]):
381 @property
382 def ignored_rcbugs(self) -> frozenset[str]:
383 return self._policy_parameter
386def simple_policy_hint_parser_function(
387 class_name: Callable[[str, HintType, _T, list[MigrationItem]], Hint],
388 converter: Callable[[str], _T],
389) -> PolicyHintParserProto:
390 def f(
391 mi_factory: MigrationItemFactory,
392 hints: HintCollection,
393 who: str,
394 hint_type: HintType,
395 *args: str,
396 ) -> None:
397 policy_parameter = args[0]
398 args = args[1:]
399 for item in mi_factory.parse_items(*args):
400 hints.add_hint(
401 class_name(who, hint_type, converter(policy_parameter), [item])
402 )
404 return f
407class AgePolicy(AbstractBasePolicy):
408 """Configurable Aging policy for source migrations
410 The AgePolicy will let packages stay in the source suite for a pre-defined
411 amount of days before letting migrate (based on their urgency, if any).
413 The AgePolicy's decision is influenced by the following:
415 State files:
416 * ${STATE_DIR}/age-policy-urgencies: File containing urgencies for source
417 packages. Note that urgencies are "sticky" and the most "urgent" urgency
418 will be used (i.e. the one with lowest age-requirements).
419 - This file needs to be updated externally, if the policy should take
420 urgencies into consideration. If empty (or not updated), the policy
421 will simply use the default urgency (see the "Config" section below)
422 - In Debian, these values are taken from the .changes file, but that is
423 not a requirement for Britney.
424 * ${STATE_DIR}/age-policy-dates: File containing the age of all source
425 packages.
426 - The policy will automatically update this file.
427 Config:
428 * DEFAULT_URGENCY: Name of the urgency used for packages without an urgency
429 (or for unknown urgencies). Will also be used to set the "minimum"
430 aging requirements for packages not in the target suite.
431 * MINDAYS_<URGENCY>: The age-requirements in days for packages with the
432 given urgency.
433 - Commonly used urgencies are: low, medium, high, emergency, critical
434 Hints:
435 * urgent <source>/<version>: Disregard the age requirements for a given
436 source/version.
437 * age-days X <source>/<version>: Set the age requirements for a given
438 source/version to X days. Note that X can exceed the highest
439 age-requirement normally given.
441 """
443 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
444 super().__init__("age", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE})
445 self._min_days = self._generate_mindays_table()
446 self._min_days_default = 0
447 # britney's "day" begins at 7pm (we want aging to occur in the 22:00Z run and we run Britney 2-4 times a day)
448 # NB: _date_now is used in tests
449 time_now = time.time()
450 if hasattr(self.options, "fake_runtime"):
451 time_now = int(self.options.fake_runtime)
452 self.logger.info("overriding runtime with fake_runtime %d", time_now)
454 self._date_now = int(((time_now / (60 * 60)) - 19) / 24)
455 self._dates: dict[str, tuple[str, int]] = {}
456 self._urgencies: dict[str, str] = {}
457 self._default_urgency: str = self.options.default_urgency
458 self._penalty_immune_urgencies: frozenset[str] = frozenset()
459 if hasattr(self.options, "no_penalties"):
460 self._penalty_immune_urgencies = frozenset(
461 x.strip() for x in self.options.no_penalties.split()
462 )
463 self._bounty_min_age: int | None = None # initialised later
465 def _generate_mindays_table(self) -> dict[str, int]:
466 mindays: dict[str, int] = {}
467 for k in dir(self.options):
468 if not k.startswith("mindays_"):
469 continue
470 v = getattr(self.options, k)
471 try:
472 as_days = int(v)
473 except ValueError:
474 raise ValueError(
475 "Unable to parse "
476 + k
477 + " as a number of days. Must be 0 or a positive integer"
478 )
479 if as_days < 0: 479 ↛ 480line 479 didn't jump to line 480 because the condition on line 479 was never true
480 raise ValueError(
481 "The value of " + k + " must be zero or a positive integer"
482 )
483 mindays[k.split("_")[1]] = as_days
484 return mindays
486 def register_hints(self, hint_parser: HintParser) -> None:
487 hint_parser.register_hint_type(
488 HintType(
489 "age-days",
490 simple_policy_hint_parser_function(AgeDayHint, int),
491 min_args=2,
492 )
493 )
494 hint_parser.register_hint_type(HintType("urgent"))
496 def initialise(self, britney: "Britney") -> None:
497 super().initialise(britney)
498 self._read_dates_file()
499 self._read_urgencies_file()
500 if self._default_urgency not in self._min_days: # pragma: no cover
501 raise ValueError(
502 "Missing age-requirement for default urgency (MINDAYS_%s)"
503 % self._default_urgency
504 )
505 self._min_days_default = self._min_days[self._default_urgency]
506 try:
507 self._bounty_min_age = int(self.options.bounty_min_age)
508 except ValueError: 508 ↛ 509line 508 didn't jump to line 509 because the exception caught by line 508 didn't happen
509 if self.options.bounty_min_age in self._min_days:
510 self._bounty_min_age = self._min_days[self.options.bounty_min_age]
511 else: # pragma: no cover
512 raise ValueError(
513 "Please fix BOUNTY_MIN_AGE in the britney configuration"
514 )
515 except AttributeError:
516 # The option wasn't defined in the configuration
517 self._bounty_min_age = 0
519 def save_state(self, britney: "Britney") -> None:
520 super().save_state(britney)
521 self._write_dates_file()
523 def apply_src_policy_impl(
524 self,
525 age_info: dict[str, Any],
526 source_data_tdist: SourcePackage | None,
527 source_data_srcdist: SourcePackage,
528 excuse: "Excuse",
529 ) -> PolicyVerdict:
530 # retrieve the urgency for the upload, ignoring it if this is a NEW package
531 # (not present in the target suite)
532 source_name = excuse.item.package
533 urgency = self._urgencies.get(source_name, self._default_urgency)
535 if urgency not in self._min_days: 535 ↛ 536line 535 didn't jump to line 536 because the condition on line 535 was never true
536 age_info["unknown-urgency"] = urgency
537 urgency = self._default_urgency
539 if not source_data_tdist:
540 if self._min_days[urgency] < self._min_days_default:
541 age_info["urgency-reduced"] = {
542 "from": urgency,
543 "to": self._default_urgency,
544 }
545 urgency = self._default_urgency
547 if source_name not in self._dates:
548 self._dates[source_name] = (source_data_srcdist.version, self._date_now)
549 elif self._dates[source_name][0] != source_data_srcdist.version:
550 self._dates[source_name] = (source_data_srcdist.version, self._date_now)
552 days_old = self._date_now - self._dates[source_name][1]
553 min_days = self._min_days[urgency]
554 for bounty, bounty_value in excuse.bounty.items():
555 if bounty_value: 555 ↛ 554line 555 didn't jump to line 554 because the condition on line 555 was always true
556 self.logger.info(
557 "Applying bounty for %s granted by %s: %d days",
558 source_name,
559 bounty,
560 bounty_value,
561 )
562 excuse.addinfo(
563 "Required age reduced by %d days because of %s"
564 % (bounty_value, bounty)
565 )
566 assert bounty_value > 0, "negative bounties shouldn't happen"
567 min_days -= bounty_value
568 if urgency not in self._penalty_immune_urgencies:
569 for penalty, penalty_value in excuse.penalty.items():
570 if penalty_value: 570 ↛ 569line 570 didn't jump to line 569 because the condition on line 570 was always true
571 self.logger.info(
572 "Applying penalty for %s given by %s: %d days",
573 source_name,
574 penalty,
575 penalty_value,
576 )
577 excuse.addinfo(
578 "Required age increased by %d days because of %s"
579 % (penalty_value, penalty)
580 )
581 assert (
582 penalty_value > 0
583 ), "negative penalties should be handled earlier"
584 min_days += penalty_value
586 assert self._bounty_min_age is not None
587 # the age in BOUNTY_MIN_AGE can be higher than the one associated with
588 # the real urgency, so don't forget to take it into account
589 bounty_min_age = min(self._bounty_min_age, self._min_days[urgency])
590 if min_days < bounty_min_age:
591 min_days = bounty_min_age
592 excuse.addinfo(
593 "Required age is not allowed to drop below %d days" % min_days
594 )
596 age_info["current-age"] = days_old
598 assert self.hints is not None
599 for hint in self.hints.search(
600 "age-days", package=source_name, version=source_data_srcdist.version
601 ):
602 age_days_hint = cast("AgeDayHint", hint)
604 new_req = age_days_hint.days
605 age_info["age-requirement-reduced"] = {
606 "new-requirement": new_req,
607 "changed-by": age_days_hint.user,
608 }
609 if "original-age-requirement" not in age_info: 609 ↛ 611line 609 didn't jump to line 611 because the condition on line 609 was always true
610 age_info["original-age-requirement"] = min_days
611 min_days = new_req
613 age_info["age-requirement"] = min_days
614 res = PolicyVerdict.PASS
616 if days_old < min_days:
617 if (
618 urgent_hint := self.hints.search_first(
619 "urgent", package=source_name, version=source_data_srcdist.version
620 )
621 ) is not None:
622 age_info["age-requirement-reduced"] = {
623 "new-requirement": 0,
624 "changed-by": urgent_hint.user,
625 }
626 res = PolicyVerdict.PASS_HINTED
627 else:
628 res = PolicyVerdict.REJECTED_TEMPORARILY
630 # update excuse
631 age_hint = age_info.get("age-requirement-reduced", None)
632 age_min_req = age_info["age-requirement"]
633 if age_hint:
634 new_req = age_hint["new-requirement"]
635 who = age_hint["changed-by"]
636 if new_req:
637 excuse.addinfo(
638 "Overriding age needed from %d days to %d by %s"
639 % (age_min_req, new_req, who)
640 )
641 age_min_req = new_req
642 else:
643 excuse.addinfo("Too young, but urgency pushed by %s" % who)
644 age_min_req = 0
645 excuse.setdaysold(age_info["current-age"], age_min_req)
647 if age_min_req == 0:
648 excuse.addinfo("%d days old" % days_old)
649 elif days_old < age_min_req:
650 excuse.add_verdict_info(
651 res, "Too young, only %d of %d days old" % (days_old, age_min_req)
652 )
653 else:
654 excuse.addinfo("%d days old (needed %d days)" % (days_old, age_min_req))
656 return res
658 def _read_dates_file(self) -> None:
659 """Parse the dates file"""
660 dates = self._dates
661 fallback_filename = os.path.join(self.suite_info.target_suite.path, "Dates")
662 using_new_name = False
663 try:
664 filename = os.path.join(self.state_dir, "age-policy-dates")
665 if not os.path.exists(filename) and os.path.exists(fallback_filename): 665 ↛ 666line 665 didn't jump to line 666 because the condition on line 665 was never true
666 filename = fallback_filename
667 else:
668 using_new_name = True
669 except AttributeError:
670 if os.path.exists(fallback_filename):
671 filename = fallback_filename
672 else:
673 raise RuntimeError("Please set STATE_DIR in the britney configuration")
675 try:
676 with open(filename, encoding="utf-8") as fd:
677 for line in fd:
678 if line.startswith("#"):
679 # Ignore comment lines (mostly used for tests)
680 continue
681 # <source> <version> <date>)
682 ln = line.split()
683 if len(ln) != 3: # pragma: no cover
684 continue
685 try:
686 dates[ln[0]] = (ln[1], int(ln[2]))
687 except ValueError: # pragma: no cover
688 pass
689 except FileNotFoundError:
690 if not using_new_name: 690 ↛ 692line 690 didn't jump to line 692 because the condition on line 690 was never true
691 # If we using the legacy name, then just give up
692 raise
693 self.logger.info("%s does not appear to exist. Creating it", filename)
694 with open(filename, mode="x", encoding="utf-8"):
695 pass
697 def _read_urgencies_file(self) -> None:
698 urgencies = self._urgencies
699 min_days_default = self._min_days_default
700 fallback_filename = os.path.join(self.suite_info.target_suite.path, "Urgency")
701 try:
702 filename = os.path.join(self.state_dir, "age-policy-urgencies")
703 if not os.path.exists(filename) and os.path.exists(fallback_filename): 703 ↛ 704line 703 didn't jump to line 704 because the condition on line 703 was never true
704 filename = fallback_filename
705 except AttributeError:
706 filename = fallback_filename
708 sources_s = self.suite_info.primary_source_suite.sources
709 sources_t = self.suite_info.target_suite.sources
711 with open(filename, errors="surrogateescape", encoding="ascii") as fd:
712 for line in fd:
713 if line.startswith("#"):
714 # Ignore comment lines (mostly used for tests)
715 continue
716 # <source> <version> <urgency>
717 ln = line.split()
718 if len(ln) != 3: 718 ↛ 719line 718 didn't jump to line 719 because the condition on line 718 was never true
719 continue
721 # read the minimum days associated with the urgencies
722 urgency_old = urgencies.get(ln[0], None)
723 mindays_old = self._min_days.get(urgency_old, 1000) # type: ignore[arg-type]
724 mindays_new = self._min_days.get(ln[2], min_days_default)
726 # if the new urgency is lower (so the min days are higher), do nothing
727 if mindays_old <= mindays_new:
728 continue
730 # if the package exists in the target suite and it is more recent, do nothing
731 tsrcv = sources_t.get(ln[0], None)
732 if tsrcv and apt_pkg.version_compare(tsrcv.version, ln[1]) >= 0:
733 continue
735 # if the package doesn't exist in the primary source suite or it is older, do nothing
736 usrcv = sources_s.get(ln[0], None)
737 if not usrcv or apt_pkg.version_compare(usrcv.version, ln[1]) < 0: 737 ↛ 738line 737 didn't jump to line 738 because the condition on line 737 was never true
738 continue
740 # update the urgency for the package
741 urgencies[ln[0]] = ln[2]
743 def _write_dates_file(self) -> None:
744 dates = self._dates
745 try:
746 directory = self.state_dir
747 basename = "age-policy-dates"
748 old_file = os.path.join(self.suite_info.target_suite.path, "Dates")
749 except AttributeError:
750 directory = self.suite_info.target_suite.path
751 basename = "Dates"
752 old_file = None
753 filename = os.path.join(directory, basename)
754 filename_tmp = os.path.join(directory, f"{basename}_new")
755 with open(filename_tmp, "w", encoding="utf-8") as fd:
756 for pkg in sorted(dates):
757 version, date = dates[pkg]
758 fd.write("%s %s %d\n" % (pkg, version, date))
759 os.rename(filename_tmp, filename)
760 if old_file is not None and os.path.exists(old_file): 760 ↛ 761line 760 didn't jump to line 761 because the condition on line 760 was never true
761 self.logger.info("Removing old age-policy-dates file %s", old_file)
762 os.unlink(old_file)
765class RCBugPolicy(AbstractBasePolicy):
766 """RC bug regression policy for source migrations
768 The RCBugPolicy will read provided list of RC bugs and block any
769 source upload that would introduce a *new* RC bug in the target
770 suite.
772 The RCBugPolicy's decision is influenced by the following:
774 State files:
775 * ${STATE_DIR}/rc-bugs-${SUITE_NAME}: File containing RC bugs for packages in
776 the given suite (one for both primary source suite and the target sutie is
777 needed).
778 - These files need to be updated externally.
779 """
781 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
782 super().__init__(
783 "rc-bugs", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
784 )
785 self._bugs_source: dict[str, set[str]] | None = None
786 self._bugs_target: dict[str, set[str]] | None = None
788 def register_hints(self, hint_parser: HintParser) -> None:
789 f = simple_policy_hint_parser_function(
790 IgnoreRCBugHint, lambda x: frozenset(x.split(","))
791 )
792 hint_parser.register_hint_type(HintType("ignore-rc-bugs", f, min_args=2))
794 def initialise(self, britney: "Britney") -> None:
795 super().initialise(britney)
796 source_suite = self.suite_info.primary_source_suite
797 target_suite = self.suite_info.target_suite
798 fallback_unstable = os.path.join(source_suite.path, "BugsV")
799 fallback_testing = os.path.join(target_suite.path, "BugsV")
800 try:
801 filename_unstable = os.path.join(
802 self.state_dir, f"rc-bugs-{source_suite.name}"
803 )
804 filename_testing = os.path.join(
805 self.state_dir, f"rc-bugs-{target_suite.name}"
806 )
807 if ( 807 ↛ 813line 807 didn't jump to line 813
808 not os.path.exists(filename_unstable)
809 and not os.path.exists(filename_testing)
810 and os.path.exists(fallback_unstable)
811 and os.path.exists(fallback_testing)
812 ):
813 filename_unstable = fallback_unstable
814 filename_testing = fallback_testing
815 except AttributeError:
816 filename_unstable = fallback_unstable
817 filename_testing = fallback_testing
818 self._bugs_source = self._read_bugs(filename_unstable)
819 self._bugs_target = self._read_bugs(filename_testing)
821 def apply_src_policy_impl(
822 self,
823 rcbugs_info: dict[str, Any],
824 source_data_tdist: SourcePackage | None,
825 source_data_srcdist: SourcePackage,
826 excuse: "Excuse",
827 ) -> PolicyVerdict:
828 assert self._bugs_source is not None # for type checking
829 assert self._bugs_target is not None # for type checking
830 bugs_t = set()
831 bugs_s = set()
832 source_name = excuse.item.package
833 binaries_s = {x.package_name for x in source_data_srcdist.binaries}
834 try:
835 binaries_t = {x.package_name for x in source_data_tdist.binaries} # type: ignore[union-attr]
836 except AttributeError:
837 binaries_t = set()
839 src_key = f"src:{source_name}"
840 if source_data_tdist and src_key in self._bugs_target:
841 bugs_t.update(self._bugs_target[src_key])
842 if src_key in self._bugs_source:
843 bugs_s.update(self._bugs_source[src_key])
845 for pkg in binaries_s:
846 if pkg in self._bugs_source:
847 bugs_s |= self._bugs_source[pkg]
848 for pkg in binaries_t:
849 if pkg in self._bugs_target:
850 bugs_t |= self._bugs_target[pkg]
852 # The bts seems to support filing source bugs against a binary of the
853 # same name if that binary isn't built by any source. An example is bug
854 # 820347 against Package: juce (in the live-2016-04-11 test). Add those
855 # bugs too.
856 if (
857 source_name not in (binaries_s | binaries_t)
858 and source_name
859 not in {
860 x.package_name
861 for x in self.suite_info.primary_source_suite.all_binaries_in_suite.keys()
862 }
863 and source_name
864 not in {
865 x.package_name
866 for x in self.suite_info.target_suite.all_binaries_in_suite.keys()
867 }
868 ):
869 if source_name in self._bugs_source:
870 bugs_s |= self._bugs_source[source_name]
871 if source_name in self._bugs_target: 871 ↛ 872line 871 didn't jump to line 872 because the condition on line 871 was never true
872 bugs_t |= self._bugs_target[source_name]
874 # If a package is not in the target suite, it has no RC bugs per
875 # definition. Unfortunately, it seems that the live-data is
876 # not always accurate (e.g. live-2011-12-13 suggests that
877 # obdgpslogger had the same bug in testing and unstable,
878 # but obdgpslogger was not in testing at that time).
879 # - For the curious, obdgpslogger was removed on that day
880 # and the BTS probably had not caught up with that fact.
881 # (https://tracker.debian.org/news/415935)
882 assert not bugs_t or source_data_tdist, (
883 "%s had bugs in the target suite but is not present" % source_name
884 )
886 verdict = PolicyVerdict.PASS
888 assert self.hints is not None
889 for hint in self.hints.search(
890 "ignore-rc-bugs",
891 package=source_name,
892 version=source_data_srcdist.version,
893 ):
894 ignore_hint = cast(IgnoreRCBugHint, hint)
895 ignored_bugs = ignore_hint.ignored_rcbugs
897 # Only handle one hint for now
898 if "ignored-bugs" in rcbugs_info:
899 self.logger.info(
900 "Ignoring ignore-rc-bugs hint from %s on %s due to another hint from %s",
901 ignore_hint.user,
902 source_name,
903 rcbugs_info["ignored-bugs"]["issued-by"],
904 )
905 continue
906 if not ignored_bugs.isdisjoint(bugs_s): 906 ↛ 915line 906 didn't jump to line 915 because the condition on line 906 was always true
907 bugs_s -= ignored_bugs
908 bugs_t -= ignored_bugs
909 rcbugs_info["ignored-bugs"] = {
910 "bugs": sorted(ignored_bugs),
911 "issued-by": ignore_hint.user,
912 }
913 verdict = PolicyVerdict.PASS_HINTED
914 else:
915 self.logger.info(
916 "Ignoring ignore-rc-bugs hint from %s on %s as none of %s affect the package",
917 ignore_hint.user,
918 source_name,
919 str(ignored_bugs),
920 )
922 rcbugs_info["shared-bugs"] = sorted(bugs_s & bugs_t)
923 rcbugs_info["unique-source-bugs"] = sorted(bugs_s - bugs_t)
924 rcbugs_info["unique-target-bugs"] = sorted(bugs_t - bugs_s)
926 # update excuse
927 new_bugs = rcbugs_info["unique-source-bugs"]
928 old_bugs = rcbugs_info["unique-target-bugs"]
929 excuse.setbugs(old_bugs, new_bugs)
931 if new_bugs:
932 verdict = PolicyVerdict.REJECTED_PERMANENTLY
933 excuse.add_verdict_info(
934 verdict,
935 "Updating %s would introduce bugs in %s: %s"
936 % (
937 source_name,
938 self.suite_info.target_suite.name,
939 ", ".join(
940 f'<a href="https://bugs.debian.org/{quote(a)}">#{a}</a>'
941 for a in new_bugs
942 ),
943 ),
944 )
946 if old_bugs:
947 excuse.addinfo(
948 "Updating %s will fix bugs in %s: %s"
949 % (
950 source_name,
951 self.suite_info.target_suite.name,
952 ", ".join(
953 f'<a href="https://bugs.debian.org/{quote(a)}">#{a}</a>'
954 for a in old_bugs
955 ),
956 )
957 )
959 return verdict
961 def _read_bugs(self, filename: str) -> dict[str, set[str]]:
962 """Read the release critical bug summary from the specified file
964 The file contains rows with the format:
966 <package-name> <bug number>[,<bug number>...]
968 The method returns a dictionary where the key is the binary package
969 name and the value is the list of open RC bugs for it.
970 """
971 bugs: dict[str, set[str]] = {}
972 self.logger.info("Loading RC bugs data from %s", filename)
973 with open(filename, encoding="ascii") as f:
974 for line in f:
975 ln = line.split()
976 if len(ln) != 2: # pragma: no cover
977 self.logger.warning("Malformed line found in line %s", line)
978 continue
979 pkg = ln[0]
980 if pkg not in bugs:
981 bugs[pkg] = set()
982 bugs[pkg].update(ln[1].split(","))
983 return bugs
986class PiupartsState(Enum):
987 FAIL = auto()
988 PASS = auto()
989 WAITING = auto()
990 UNKNOWN = auto()
992 @staticmethod
993 def from_str(val: str) -> "PiupartsState":
994 match val:
995 case "F":
996 return PiupartsState.FAIL
997 case "P":
998 return PiupartsState.PASS
999 case "W": 999 ↛ 1001line 999 didn't jump to line 1001 because the pattern on line 999 always matched
1000 return PiupartsState.WAITING
1001 case "X":
1002 return PiupartsState.UNKNOWN
1003 case _:
1004 raise ValueError(f"Invalid piuparts state {val}")
1007class PiupartsResult(StrEnum):
1008 PASS = "pass"
1009 REGRESSION = "regression"
1010 FAILED = "failed"
1011 WAITING_FOR_TESTS = "waiting-for-test-results"
1012 CANNOT_BE_TESTED = "cannot-be-tested"
1015class PiupartsPolicy(AbstractBasePolicy):
1016 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
1017 super().__init__(
1018 "piuparts", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
1019 )
1020 self._piuparts_source: dict[str, tuple[PiupartsState, str]] | None = None
1021 self._piuparts_target: dict[str, PiupartsState] | None = None
1023 def register_hints(self, hint_parser: HintParser) -> None:
1024 hint_parser.register_hint_type(HintType("ignore-piuparts"))
1026 def initialise(self, britney: "Britney") -> None:
1027 super().initialise(britney)
1028 source_suite = self.suite_info.primary_source_suite
1029 target_suite = self.suite_info.target_suite
1030 try:
1031 filename_unstable = os.path.join(
1032 self.state_dir, f"piuparts-summary-{source_suite.name}.json"
1033 )
1034 filename_testing = os.path.join(
1035 self.state_dir, f"piuparts-summary-{target_suite.name}.json"
1036 )
1037 except AttributeError as e: # pragma: no cover
1038 raise RuntimeError(
1039 "Please set STATE_DIR in the britney configuration"
1040 ) from e
1041 self._piuparts_source = self._read_piuparts_summary(filename_unstable)
1042 self._piuparts_target = self._read_piuparts_summary_without_url(
1043 filename_testing
1044 )
1046 def apply_src_policy_impl(
1047 self,
1048 piuparts_info: dict[str, Any],
1049 source_data_tdist: SourcePackage | None,
1050 source_data_srcdist: SourcePackage,
1051 excuse: "Excuse",
1052 ) -> PolicyVerdict:
1053 assert self._piuparts_source is not None # for type checking
1054 assert self._piuparts_target is not None # for type checking
1055 source_name = excuse.item.package
1057 if source_name in self._piuparts_target:
1058 testing_state = self._piuparts_target[source_name]
1059 else:
1060 testing_state = PiupartsState.UNKNOWN
1061 url: str | None
1062 if source_name in self._piuparts_source:
1063 unstable_state, url = self._piuparts_source[source_name]
1064 else:
1065 unstable_state = PiupartsState.UNKNOWN
1066 url = None
1067 url_html = "(no link yet)"
1068 if url is not None:
1069 url_html = '<a href="{0}">{0}</a>'.format(url)
1071 match unstable_state:
1072 case PiupartsState.PASS:
1073 # Not a regression
1074 msg = f"Piuparts tested OK - {url_html}"
1075 result = PolicyVerdict.PASS
1076 piuparts_info["test-results"] = PiupartsResult.PASS
1077 case PiupartsState.FAIL if testing_state is not PiupartsState.FAIL:
1078 piuparts_info["test-results"] = PiupartsResult.REGRESSION
1079 msg = f"Piuparts regression - {url_html}"
1080 result = PolicyVerdict.REJECTED_PERMANENTLY
1081 case PiupartsState.FAIL:
1082 piuparts_info["test-results"] = PiupartsResult.FAILED
1083 msg = f"Piuparts failure (not a regression) - {url_html}"
1084 result = PolicyVerdict.PASS
1085 case PiupartsState.WAITING:
1086 msg = f"Piuparts check waiting for test results - {url_html}"
1087 result = PolicyVerdict.REJECTED_TEMPORARILY
1088 piuparts_info["test-results"] = PiupartsResult.WAITING_FOR_TESTS
1089 case _:
1090 msg = f"Piuparts can't test {source_name} (not a blocker) - {url_html}"
1091 piuparts_info["test-results"] = PiupartsResult.CANNOT_BE_TESTED
1092 result = PolicyVerdict.PASS
1094 if url is not None:
1095 piuparts_info["piuparts-test-url"] = url
1096 if result.is_rejected:
1097 excuse.add_verdict_info(result, msg)
1098 else:
1099 excuse.addinfo(msg)
1101 if result.is_rejected:
1102 assert self.hints is not None
1103 if (
1104 ignore_hint := self.hints.search_first(
1105 "ignore-piuparts",
1106 package=source_name,
1107 version=source_data_srcdist.version,
1108 )
1109 ) is not None:
1110 piuparts_info["ignored-piuparts"] = {"issued-by": ignore_hint.user}
1111 result = PolicyVerdict.PASS_HINTED
1112 excuse.addinfo(
1113 f"Piuparts issue ignored as requested by {ignore_hint.user}"
1114 )
1116 return result
1118 def _read_piuparts_summary_gen(
1119 self, filename: str
1120 ) -> Generator[tuple[str, PiupartsState, str], None, None]:
1121 self.logger.info("Loading piuparts report from %s", filename)
1122 with open(filename) as fd: 1122 ↛ exitline 1122 didn't return from function '_read_piuparts_summary_gen' because the return on line 1124 wasn't executed
1123 if os.fstat(fd.fileno()).st_size < 1: 1123 ↛ 1124line 1123 didn't jump to line 1124 because the condition on line 1123 was never true
1124 return
1125 data = json.load(fd)
1126 try:
1127 if (
1128 data["_id"] != "Piuparts Package Test Results Summary"
1129 or data["_version"] != "1.0"
1130 ): # pragma: no cover
1131 raise ValueError(
1132 f"Piuparts results in {filename} does not have the correct ID or version"
1133 )
1134 except KeyError as e: # pragma: no cover
1135 raise ValueError(
1136 f"Piuparts results in {filename} is missing id or version field"
1137 ) from e
1138 for source, suite_data in data["packages"].items():
1139 if len(suite_data) != 1: # pragma: no cover
1140 raise ValueError(
1141 f"Piuparts results in {filename}, the source {source} does not have "
1142 "exactly one result set"
1143 )
1144 item = next(iter(suite_data.values()))
1145 state, _, url = item
1146 yield (source, PiupartsState.from_str(state), url)
1148 def _read_piuparts_summary(
1149 self, filename: str
1150 ) -> dict[str, tuple[PiupartsState, str]]:
1151 return {
1152 source: (state, url)
1153 for (source, state, url) in self._read_piuparts_summary_gen(filename)
1154 }
1156 def _read_piuparts_summary_without_url(
1157 self, filename: str
1158 ) -> dict[str, PiupartsState]:
1159 return {
1160 source: state
1161 for (source, state, _) in self._read_piuparts_summary_gen(filename)
1162 }
1165class DependsPolicy(AbstractBasePolicy):
1166 pkg_universe: "BinaryPackageUniverse"
1167 broken_packages: frozenset["BinaryPackageId"]
1168 all_binaries: dict["BinaryPackageId", "BinaryPackage"]
1169 allow_uninst: dict[str, set[str | None]]
1171 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
1172 super().__init__(
1173 "depends",
1174 options,
1175 suite_info,
1176 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
1177 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
1178 )
1179 self.nobreakall_arches = None
1180 self.new_arches = None
1181 self.break_arches = None
1183 def initialise(self, britney: "Britney") -> None:
1184 super().initialise(britney)
1185 self.pkg_universe = britney.pkg_universe
1186 self.broken_packages = self.pkg_universe.broken_packages
1187 self.all_binaries = britney.all_binaries
1188 self.nobreakall_arches = self.options.nobreakall_arches
1189 self.new_arches = self.options.new_arches
1190 self.break_arches = self.options.break_arches
1191 self.allow_uninst = britney.allow_uninst
1193 def apply_srcarch_policy_impl(
1194 self,
1195 deps_info: dict[str, Any],
1196 arch: str,
1197 source_data_tdist: SourcePackage | None,
1198 source_data_srcdist: SourcePackage,
1199 excuse: "Excuse",
1200 ) -> PolicyVerdict:
1201 verdict = PolicyVerdict.PASS
1203 assert self.break_arches is not None
1204 assert self.new_arches is not None
1205 if arch in self.break_arches or arch in self.new_arches:
1206 # we don't check these in the policy (TODO - for now?)
1207 return verdict
1209 item = excuse.item
1210 source_suite = item.suite
1211 target_suite = self.suite_info.target_suite
1213 packages_s_a = source_suite.binaries[arch]
1214 packages_t_a = target_suite.binaries[arch]
1216 my_bins = sorted(filter_out_faux_gen(excuse.packages[arch]))
1218 arch_all_installable = set()
1219 arch_arch_installable = set()
1220 consider_it_regression = True
1222 for pkg_id in my_bins:
1223 pkg_name = pkg_id.package_name
1224 binary_u = packages_s_a[pkg_name]
1225 pkg_arch = binary_u.architecture
1227 # in some cases, we want to track the uninstallability of a
1228 # package (because the autopkgtest policy uses this), but we still
1229 # want to allow the package to be uninstallable
1230 skip_dep_check = False
1232 if binary_u.source_version != source_data_srcdist.version:
1233 # don't check cruft in unstable
1234 continue
1236 if item.architecture != "source" and pkg_arch == "all":
1237 # we don't care about the existing arch: all binaries when
1238 # checking a binNMU item, because the arch: all binaries won't
1239 # migrate anyway
1240 skip_dep_check = True
1242 if pkg_arch == "all" and arch not in self.nobreakall_arches:
1243 skip_dep_check = True
1245 if pkg_name in self.allow_uninst[arch]: 1245 ↛ 1248line 1245 didn't jump to line 1248 because the condition on line 1245 was never true
1246 # this binary is allowed to become uninstallable, so we don't
1247 # need to check anything
1248 skip_dep_check = True
1250 if pkg_name in packages_t_a:
1251 oldbin = packages_t_a[pkg_name]
1252 if not target_suite.is_installable(oldbin.pkg_id):
1253 # as the current binary in testing is already
1254 # uninstallable, the newer version is allowed to be
1255 # uninstallable as well, so we don't need to check
1256 # anything
1257 skip_dep_check = True
1258 consider_it_regression = False
1260 if pkg_id in self.broken_packages:
1261 if pkg_arch == "all":
1262 arch_all_installable.add(False)
1263 else:
1264 arch_arch_installable.add(False)
1265 # dependencies can't be satisfied by all the known binaries -
1266 # this certainly won't work...
1267 excuse.add_unsatisfiable_on_arch(arch)
1268 if skip_dep_check:
1269 # ...but if the binary is allowed to become uninstallable,
1270 # we don't care
1271 # we still want the binary to be listed as uninstallable,
1272 continue
1273 verdict = PolicyVerdict.REJECTED_PERMANENTLY
1274 if pkg_name.endswith("-faux-build-depends"): 1274 ↛ 1275line 1274 didn't jump to line 1275 because the condition on line 1274 was never true
1275 name = pkg_name.removesuffix("-faux-build-depends")
1276 excuse.add_verdict_info(
1277 verdict,
1278 f"src:{name} has unsatisfiable build dependency",
1279 )
1280 else:
1281 excuse.add_verdict_info(
1282 verdict, f"{pkg_name}/{arch} has unsatisfiable dependency"
1283 )
1284 excuse.addreason("depends")
1285 else:
1286 if pkg_arch == "all":
1287 arch_all_installable.add(True)
1288 else:
1289 arch_arch_installable.add(True)
1291 if skip_dep_check:
1292 continue
1294 deps = self.pkg_universe.dependencies_of(pkg_id)
1296 for dep in deps:
1297 # dep is a list of packages, each of which satisfy the
1298 # dependency
1300 if not dep:
1301 continue
1302 is_ok = False
1303 needed_for_dep = set()
1305 for alternative in dep:
1306 if target_suite.is_pkg_in_the_suite(alternative):
1307 # dep can be satisfied in testing - ok
1308 is_ok = True
1309 elif alternative in my_bins:
1310 # can be satisfied by binary from same item: will be
1311 # ok if item migrates
1312 is_ok = True
1313 else:
1314 needed_for_dep.add(alternative)
1316 if not is_ok:
1317 spec = DependencySpec(DependencyType.DEPENDS, arch)
1318 excuse.add_package_depends(spec, needed_for_dep)
1320 # The autopkgtest policy needs delicate trade offs for
1321 # non-installability. The current choice (considering source
1322 # migration and only binaries built by the version of the
1323 # source):
1324 #
1325 # * Run autopkgtest if all arch:$arch binaries are installable
1326 # (but some or all arch:all binaries are not)
1327 #
1328 # * Don't schedule nor wait for not installable arch:all only package
1329 # on ! NOBREAKALL_ARCHES
1330 #
1331 # * Run autopkgtest if installability isn't a regression (there are (or
1332 # rather, should) not be a lot of packages in this state, and most
1333 # likely they'll just fail quickly)
1334 #
1335 # * Don't schedule, but wait otherwise
1336 if arch_arch_installable == {True} and False in arch_all_installable:
1337 deps_info.setdefault("autopkgtest_run_anyways", []).append(arch)
1338 elif (
1339 arch not in self.nobreakall_arches
1340 and not arch_arch_installable
1341 and False in arch_all_installable
1342 ):
1343 deps_info.setdefault("arch_all_not_installable", []).append(arch)
1344 elif not consider_it_regression:
1345 deps_info.setdefault("autopkgtest_run_anyways", []).append(arch)
1347 return verdict
1350@unique
1351class BuildDepResult(IntEnum):
1352 # relation is satisfied in target
1353 OK = 1
1354 # relation can be satisfied by other packages in source
1355 DEPENDS = 2
1356 # relation cannot be satisfied
1357 FAILED = 3
1360class BuildDependsPolicy(AbstractBasePolicy):
1362 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
1363 super().__init__(
1364 "build-depends",
1365 options,
1366 suite_info,
1367 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
1368 )
1369 self._all_buildarch: list[str] = []
1371 parse_option(options, "all_buildarch")
1373 def initialise(self, britney: "Britney") -> None:
1374 super().initialise(britney)
1375 if self.options.all_buildarch:
1376 self._all_buildarch = SuiteContentLoader.config_str_as_list(
1377 self.options.all_buildarch, []
1378 )
1380 def apply_src_policy_impl(
1381 self,
1382 build_deps_info: dict[str, Any],
1383 source_data_tdist: SourcePackage | None,
1384 source_data_srcdist: SourcePackage,
1385 excuse: "Excuse",
1386 get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers,
1387 ) -> PolicyVerdict:
1388 verdict = PolicyVerdict.PASS
1390 # analyze the dependency fields (if present)
1391 if deps := source_data_srcdist.build_deps_arch:
1392 v = self._check_build_deps(
1393 deps,
1394 DependencyType.BUILD_DEPENDS,
1395 build_deps_info,
1396 source_data_tdist,
1397 source_data_srcdist,
1398 excuse,
1399 get_dependency_solvers=get_dependency_solvers,
1400 )
1401 verdict = PolicyVerdict.worst_of(verdict, v)
1403 if ideps := source_data_srcdist.build_deps_indep:
1404 v = self._check_build_deps(
1405 ideps,
1406 DependencyType.BUILD_DEPENDS_INDEP,
1407 build_deps_info,
1408 source_data_tdist,
1409 source_data_srcdist,
1410 excuse,
1411 get_dependency_solvers=get_dependency_solvers,
1412 )
1413 verdict = PolicyVerdict.worst_of(verdict, v)
1415 return verdict
1417 def _get_check_archs(
1418 self, archs: Container[str], dep_type: DependencyType
1419 ) -> list[str]:
1420 oos = self.options.outofsync_arches
1422 if dep_type is DependencyType.BUILD_DEPENDS:
1423 return [
1424 arch
1425 for arch in self.options.architectures
1426 if arch in archs and arch not in oos
1427 ]
1429 # first try the all buildarch
1430 checkarchs = list(self._all_buildarch)
1431 # then try the architectures where this source has arch specific
1432 # binaries (in the order of the architecture config file)
1433 checkarchs.extend(
1434 arch
1435 for arch in self.options.architectures
1436 if arch in archs and arch not in checkarchs
1437 )
1438 # then try all other architectures
1439 checkarchs.extend(
1440 arch for arch in self.options.architectures if arch not in checkarchs
1441 )
1443 # and drop OUTOFSYNC_ARCHES
1444 return [arch for arch in checkarchs if arch not in oos]
1446 def _add_info_for_arch(
1447 self,
1448 arch: str,
1449 excuses_info: dict[str, list[str]],
1450 blockers: dict[str, set[BinaryPackageId]],
1451 results: dict[str, BuildDepResult],
1452 dep_type: DependencyType,
1453 target_suite: TargetSuite,
1454 source_suite: Suite,
1455 excuse: "Excuse",
1456 verdict: PolicyVerdict,
1457 ) -> PolicyVerdict:
1458 if arch in blockers:
1459 packages = blockers[arch]
1461 # for the solving packages, update the excuse to add the dependencies
1462 for p in packages:
1463 if arch not in self.options.break_arches: 1463 ↛ 1462line 1463 didn't jump to line 1462 because the condition on line 1463 was always true
1464 spec = DependencySpec(dep_type, arch)
1465 excuse.add_package_depends(spec, {p})
1467 if arch in results and results[arch] is BuildDepResult.FAILED:
1468 verdict = PolicyVerdict.worst_of(
1469 verdict, PolicyVerdict.REJECTED_PERMANENTLY
1470 )
1472 if arch in excuses_info:
1473 for excuse_text in excuses_info[arch]:
1474 if verdict.is_rejected: 1474 ↛ 1477line 1474 didn't jump to line 1477 because the condition on line 1474 was always true
1475 excuse.add_verdict_info(verdict, excuse_text)
1476 else:
1477 excuse.addinfo(excuse_text)
1479 return verdict
1481 def _check_build_deps(
1482 self,
1483 deps: str,
1484 dep_type: DependencyType,
1485 build_deps_info: dict[str, Any],
1486 source_data_tdist: SourcePackage | None,
1487 source_data_srcdist: SourcePackage,
1488 excuse: "Excuse",
1489 get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers,
1490 ) -> PolicyVerdict:
1491 verdict = PolicyVerdict.PASS
1492 any_arch_ok = dep_type is DependencyType.BUILD_DEPENDS_INDEP
1494 britney = self.britney
1496 # local copies for better performance
1497 parse_src_depends = apt_pkg.parse_src_depends
1499 source_name = excuse.item.package
1500 source_suite = excuse.item.suite
1501 target_suite = self.suite_info.target_suite
1502 binaries_s = source_suite.binaries
1503 provides_s = source_suite.provides_table
1504 binaries_t = target_suite.binaries
1505 provides_t = target_suite.provides_table
1506 unsat_bd: dict[str, list[str]] = {}
1507 relevant_archs: set[str] = {
1508 binary.architecture
1509 for binary in filter_out_faux_gen(source_data_srcdist.binaries)
1510 if britney.all_binaries[binary].architecture != "all"
1511 }
1513 excuses_info: dict[str, list[str]] = defaultdict(list)
1514 blockers: dict[str, set[BinaryPackageId]] = defaultdict(set)
1515 arch_results = {}
1516 result_archs = defaultdict(list)
1517 bestresult = BuildDepResult.FAILED
1518 check_archs = self._get_check_archs(relevant_archs, dep_type)
1519 if not check_archs:
1520 # when the arch list is empty, we check the b-d on any arch, instead of all archs
1521 # this happens for Build-Depens on a source package that only produces arch: all binaries
1522 any_arch_ok = True
1523 check_archs = self._get_check_archs(
1524 self.options.architectures, DependencyType.BUILD_DEPENDS_INDEP
1525 )
1527 for arch in check_archs:
1528 # retrieve the binary package from the specified suite and arch
1529 binaries_s_a = binaries_s[arch]
1530 provides_s_a = provides_s[arch]
1531 binaries_t_a = binaries_t[arch]
1532 provides_t_a = provides_t[arch]
1533 arch_results[arch] = BuildDepResult.OK
1534 # for every dependency block (formed as conjunction of disjunction)
1535 for block_txt in deps.split(","):
1536 block_list = parse_src_depends(block_txt, False, arch)
1537 # Unlike regular dependencies, some clauses of the Build-Depends(-Arch|-Indep) can be
1538 # filtered out by (e.g.) architecture restrictions. We need to cope with this while
1539 # keeping block_txt and block aligned.
1540 if not block_list:
1541 # Relation is not relevant for this architecture.
1542 continue
1543 block = block_list[0]
1544 # if the block is satisfied in the target suite, then skip the block
1545 if get_dependency_solvers(
1546 block, binaries_t_a, provides_t_a, build_depends=True
1547 ):
1548 # Satisfied in the target suite; all ok.
1549 continue
1551 # check if the block can be satisfied in the source suite, and list the solving packages
1552 packages = get_dependency_solvers(
1553 block, binaries_s_a, provides_s_a, build_depends=True
1554 )
1556 # if the dependency can be satisfied by the same source package, skip the block:
1557 # obviously both binary packages will enter the target suite together
1558 if any(source_name == p.source for p in packages): 1558 ↛ 1559line 1558 didn't jump to line 1559 because the condition on line 1558 was never true
1559 continue
1561 # if no package can satisfy the dependency, add this information to the excuse
1562 if not packages:
1563 excuses_info[arch].append(
1564 "%s unsatisfiable %s on %s: %s"
1565 % (source_name, dep_type, arch, block_txt.strip())
1566 )
1567 if arch not in unsat_bd: 1567 ↛ 1569line 1567 didn't jump to line 1569 because the condition on line 1567 was always true
1568 unsat_bd[arch] = []
1569 unsat_bd[arch].append(block_txt.strip())
1570 arch_results[arch] = BuildDepResult.FAILED
1571 continue
1573 blockers[arch].update(p.pkg_id for p in packages)
1574 if arch_results[arch] < BuildDepResult.DEPENDS:
1575 arch_results[arch] = BuildDepResult.DEPENDS
1577 if any_arch_ok:
1578 if arch_results[arch] < bestresult:
1579 bestresult = arch_results[arch]
1580 result_archs[arch_results[arch]].append(arch)
1581 if bestresult is BuildDepResult.OK:
1582 # we found an architecture where the b-deps-indep are
1583 # satisfied in the target suite, so we can stop
1584 break
1586 if any_arch_ok:
1587 arch = result_archs[bestresult][0]
1588 excuse.add_detailed_info(f"Checking {dep_type.get_description()} on {arch}")
1589 key = "check-%s-on-arch" % dep_type.get_reason()
1590 build_deps_info[key] = arch
1591 verdict = self._add_info_for_arch(
1592 arch,
1593 excuses_info,
1594 blockers,
1595 arch_results,
1596 dep_type,
1597 target_suite,
1598 source_suite,
1599 excuse,
1600 verdict,
1601 )
1603 else:
1604 for arch in check_archs:
1605 verdict = self._add_info_for_arch(
1606 arch,
1607 excuses_info,
1608 blockers,
1609 arch_results,
1610 dep_type,
1611 target_suite,
1612 source_suite,
1613 excuse,
1614 verdict,
1615 )
1617 if unsat_bd:
1618 build_deps_info["unsatisfiable-arch-build-depends"] = unsat_bd
1620 return verdict
1623class BuiltUsingPolicy(AbstractBasePolicy):
1624 """Built-Using policy
1626 Binaries that incorporate (part of) another source package must list these
1627 sources under 'Built-Using'.
1629 This policy checks if the corresponding sources are available in the
1630 target suite. If they are not, but they are candidates for migration, a
1631 dependency is added.
1633 If the binary incorporates a newer version of a source, that is not (yet)
1634 a candidate, we don't want to accept that binary. A rebuild later in the
1635 primary suite wouldn't fix the issue, because that would incorporate the
1636 newer version again.
1638 If the binary incorporates an older version of the source, a newer version
1639 will be accepted as a replacement. We assume that this can be fixed by
1640 rebuilding the binary at some point during the development cycle.
1642 Requiring exact version of the source would not be useful in practice. A
1643 newer upload of that source wouldn't be blocked by this policy, so the
1644 built-using would be outdated anyway.
1646 """
1648 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
1649 super().__init__(
1650 "built-using",
1651 options,
1652 suite_info,
1653 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
1654 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
1655 )
1657 def initialise(self, britney: "Britney") -> None:
1658 super().initialise(britney)
1660 def apply_srcarch_policy_impl(
1661 self,
1662 build_deps_info: dict[str, Any],
1663 arch: str,
1664 source_data_tdist: SourcePackage | None,
1665 source_data_srcdist: SourcePackage,
1666 excuse: "Excuse",
1667 ) -> PolicyVerdict:
1668 verdict = PolicyVerdict.PASS
1670 source_suite = excuse.item.suite
1671 target_suite = self.suite_info.target_suite
1672 binaries_s = source_suite.binaries
1674 def check_bu_in_suite(
1675 bu_source: str, bu_version: str, source_suite: Suite
1676 ) -> bool:
1677 found = False
1678 if bu_source not in source_suite.sources:
1679 return found
1680 s_source = source_suite.sources[bu_source]
1681 s_ver = s_source.version
1682 if apt_pkg.version_compare(s_ver, bu_version) >= 0:
1683 found = True
1684 dep = PackageId(bu_source, s_ver, "source")
1685 if arch in self.options.break_arches:
1686 excuse.add_detailed_info(
1687 "Ignoring Built-Using for %s/%s on %s"
1688 % (pkg_name, arch, dep.uvname)
1689 )
1690 else:
1691 spec = DependencySpec(DependencyType.BUILT_USING, arch)
1692 excuse.add_package_depends(spec, {dep})
1693 excuse.add_detailed_info(
1694 f"{pkg_name}/{arch} has Built-Using on {dep.uvname}"
1695 )
1697 return found
1699 for pkg_id in sorted(
1700 x
1701 for x in filter_out_faux_gen(source_data_srcdist.binaries)
1702 if x.architecture == arch
1703 ):
1704 pkg_name = pkg_id.package_name
1706 # retrieve the testing (if present) and unstable corresponding binary packages
1707 binary_s = binaries_s[arch][pkg_name]
1708 if binary_s.builtusing is None:
1709 continue
1711 for bu in binary_s.builtusing:
1712 bu_source = bu[0]
1713 bu_version = bu[1]
1714 found = False
1715 if bu_source in target_suite.sources:
1716 t_source = target_suite.sources[bu_source]
1717 t_ver = t_source.version
1718 if apt_pkg.version_compare(t_ver, bu_version) >= 0:
1719 found = True
1721 if not found:
1722 found = check_bu_in_suite(bu_source, bu_version, source_suite)
1724 if not found and source_suite.suite_class.is_additional_source:
1725 found = check_bu_in_suite(
1726 bu_source, bu_version, self.suite_info.primary_source_suite
1727 )
1729 if not found:
1730 if arch in self.options.break_arches:
1731 excuse.add_detailed_info(
1732 "Ignoring unsatisfiable Built-Using for %s/%s on %s %s"
1733 % (pkg_name, arch, bu_source, bu_version)
1734 )
1735 else:
1736 verdict = PolicyVerdict.worst_of(
1737 verdict, PolicyVerdict.REJECTED_PERMANENTLY
1738 )
1739 excuse.add_verdict_info(
1740 verdict,
1741 "%s/%s has unsatisfiable Built-Using on %s %s"
1742 % (pkg_name, arch, bu_source, bu_version),
1743 )
1745 return verdict
1748class BlockPolicy(AbstractBasePolicy):
1749 BLOCK_HINT_REGEX = re.compile("^(un)?(block-?.*)$")
1751 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
1752 super().__init__(
1753 "block",
1754 options,
1755 suite_info,
1756 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
1757 )
1758 self._blockall: dict[str | None, Hint] = {}
1760 def initialise(self, britney: "Britney") -> None:
1761 super().initialise(britney)
1762 assert self.hints is not None
1763 for hint in self.hints.search(type="block-all"):
1764 self._blockall[hint.package] = hint
1766 self._key_packages: frozenset[str] = frozenset()
1767 if "key" in self._blockall:
1768 self._key_packages = self._read_key_packages()
1770 def _read_key_packages(self) -> frozenset[str]:
1771 """Read the list of key packages
1773 The file contains data in the yaml format :
1775 - reason: <something>
1776 source: <package>
1778 The method returns a list of all key packages.
1779 """
1780 filename = os.path.join(self.state_dir, "key_packages.yaml")
1781 self.logger.info("Loading key packages from %s", filename)
1782 if os.path.exists(filename): 1782 ↛ 1787line 1782 didn't jump to line 1787 because the condition on line 1782 was always true
1783 with open(filename) as f:
1784 data = yaml.safe_load(f)
1785 key_packages = frozenset(item["source"] for item in data)
1786 else:
1787 self.logger.error(
1788 "Britney was asked to block key packages, "
1789 + "but no key_packages.yaml file was found."
1790 )
1791 sys.exit(1)
1793 return key_packages
1795 def register_hints(self, hint_parser: HintParser) -> None:
1796 # block related hints are currently defined in hint.py
1797 pass
1799 def _check_blocked(
1800 self, arch: str, version: str, excuse: "Excuse"
1801 ) -> PolicyVerdict:
1802 verdict = PolicyVerdict.PASS
1803 blocked = {}
1804 unblocked = {}
1805 block_info = {}
1806 source_suite = excuse.item.suite
1807 suite_name = source_suite.name
1808 src = excuse.item.package
1809 is_primary = source_suite.suite_class is SuiteClass.PRIMARY_SOURCE_SUITE
1811 tooltip = (
1812 f"please contact {self.options.distribution}-release if update is needed"
1813 )
1815 assert self.hints is not None
1816 mismatches = False
1817 r = self.BLOCK_HINT_REGEX
1818 for hint in self.hints.search(package=src):
1819 m = r.match(hint.type)
1820 if m:
1821 if m.group(1) == "un":
1822 assert hint.suite is not None
1823 if (
1824 hint.version != version
1825 or hint.suite.name != suite_name
1826 or (hint.architecture != arch and hint.architecture != "source")
1827 ):
1828 self.logger.info(
1829 "hint mismatch: %s %s %s", version, arch, suite_name
1830 )
1831 mismatches = True
1832 else:
1833 unblocked[m.group(2)] = hint.user
1834 excuse.add_hint(hint)
1835 else:
1836 # block(-*) hint: only accepts a source, so this will
1837 # always match
1838 blocked[m.group(2)] = hint.user
1839 excuse.add_hint(hint)
1841 if "block" not in blocked and is_primary:
1842 # if there is a specific block hint for this package, we don't
1843 # check for the general hints
1845 if self.options.distribution == "debian": 1845 ↛ 1849line 1845 didn't jump to line 1849 because the condition on line 1845 was always true
1846 url = "https://release.debian.org/testing/freeze_policy.html"
1847 tooltip = f'Follow the <a href="{url}">freeze policy</a> when applying for an unblock'
1849 if "source" in self._blockall:
1850 blocked["block"] = self._blockall["source"].user
1851 excuse.add_hint(self._blockall["source"])
1852 elif (
1853 "new-source" in self._blockall
1854 and src not in self.suite_info.target_suite.sources
1855 ):
1856 blocked["block"] = self._blockall["new-source"].user
1857 excuse.add_hint(self._blockall["new-source"])
1858 # no tooltip: new sources will probably not be accepted anyway
1859 block_info["block"] = "blocked by {}: is not in {}".format(
1860 self._blockall["new-source"].user,
1861 self.suite_info.target_suite.name,
1862 )
1863 elif "key" in self._blockall and src in self._key_packages:
1864 blocked["block"] = self._blockall["key"].user
1865 excuse.add_hint(self._blockall["key"])
1866 block_info["block"] = "blocked by {}: is a key package ({})".format(
1867 self._blockall["key"].user,
1868 tooltip,
1869 )
1870 elif "no-autopkgtest" in self._blockall:
1871 if excuse.autopkgtest_results == {"PASS"}:
1872 if not blocked: 1872 ↛ 1898line 1872 didn't jump to line 1898 because the condition on line 1872 was always true
1873 excuse.addinfo("not blocked: has successful autopkgtest")
1874 else:
1875 blocked["block"] = self._blockall["no-autopkgtest"].user
1876 excuse.add_hint(self._blockall["no-autopkgtest"])
1877 if not excuse.autopkgtest_results:
1878 block_info["block"] = (
1879 "blocked by %s: does not have autopkgtest (%s)"
1880 % (
1881 self._blockall["no-autopkgtest"].user,
1882 tooltip,
1883 )
1884 )
1885 else:
1886 block_info["block"] = (
1887 "blocked by %s: autopkgtest not fully successful (%s)"
1888 % (
1889 self._blockall["no-autopkgtest"].user,
1890 tooltip,
1891 )
1892 )
1894 elif not is_primary:
1895 blocked["block"] = suite_name
1896 excuse.needs_approval = True
1898 for block_cmd in blocked:
1899 unblock_cmd = "un" + block_cmd
1900 if block_cmd in unblocked:
1901 if is_primary or block_cmd == "block-udeb":
1902 excuse.addinfo(
1903 "Ignoring %s request by %s, due to %s request by %s"
1904 % (
1905 block_cmd,
1906 blocked[block_cmd],
1907 unblock_cmd,
1908 unblocked[block_cmd],
1909 )
1910 )
1911 else:
1912 excuse.addinfo("Approved by %s" % (unblocked[block_cmd]))
1913 else:
1914 verdict = PolicyVerdict.REJECTED_NEEDS_APPROVAL
1915 if is_primary or block_cmd == "block-udeb":
1916 # redirect people to d-i RM for udeb things:
1917 if block_cmd == "block-udeb":
1918 tooltip = "please contact the d-i release manager if an update is needed"
1919 if block_cmd in block_info:
1920 info = block_info[block_cmd]
1921 else:
1922 info = (
1923 "Not touching package due to {} request by {} ({})".format(
1924 block_cmd,
1925 blocked[block_cmd],
1926 tooltip,
1927 )
1928 )
1929 excuse.add_verdict_info(verdict, info)
1930 else:
1931 excuse.add_verdict_info(verdict, "NEEDS APPROVAL BY RM")
1932 excuse.addreason("block")
1933 if mismatches:
1934 excuse.add_detailed_info(
1935 f"Some hints for {src} do not match this item"
1936 )
1937 return verdict
1939 def apply_src_policy_impl(
1940 self,
1941 block_info: dict[str, Any],
1942 source_data_tdist: SourcePackage | None,
1943 source_data_srcdist: SourcePackage,
1944 excuse: "Excuse",
1945 ) -> PolicyVerdict:
1946 return self._check_blocked("source", source_data_srcdist.version, excuse)
1948 def apply_srcarch_policy_impl(
1949 self,
1950 block_info: dict[str, Any],
1951 arch: str,
1952 source_data_tdist: SourcePackage | None,
1953 source_data_srcdist: SourcePackage,
1954 excuse: "Excuse",
1955 ) -> PolicyVerdict:
1956 return self._check_blocked(arch, source_data_srcdist.version, excuse)
1959class BuiltOnBuilddPolicy(AbstractBasePolicy):
1961 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
1962 super().__init__(
1963 "builtonbuildd",
1964 options,
1965 suite_info,
1966 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
1967 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
1968 )
1969 self._builtonbuildd: dict[str, Any] = {
1970 "signerinfo": None,
1971 }
1973 def register_hints(self, hint_parser: HintParser) -> None:
1974 hint_parser.register_hint_type(
1975 HintType(
1976 "allow-archall-maintainer-upload",
1977 versioned=HintAnnotate.FORBIDDEN,
1978 )
1979 )
1981 def initialise(self, britney: "Britney") -> None:
1982 super().initialise(britney)
1983 try:
1984 filename_signerinfo = os.path.join(self.state_dir, "signers.json")
1985 except AttributeError as e: # pragma: no cover
1986 raise RuntimeError(
1987 "Please set STATE_DIR in the britney configuration"
1988 ) from e
1989 self._builtonbuildd["signerinfo"] = self._read_signerinfo(filename_signerinfo)
1991 def apply_srcarch_policy_impl(
1992 self,
1993 buildd_info: dict[str, Any],
1994 arch: str,
1995 source_data_tdist: SourcePackage | None,
1996 source_data_srcdist: SourcePackage,
1997 excuse: "Excuse",
1998 ) -> PolicyVerdict:
1999 verdict = PolicyVerdict.PASS
2000 signers = self._builtonbuildd["signerinfo"]
2002 if "signed-by" not in buildd_info:
2003 buildd_info["signed-by"] = {}
2005 item = excuse.item
2006 source_suite = item.suite
2008 # we use the source component, because a binary in contrib can
2009 # belong to a source in main
2010 component = get_component(source_data_srcdist.section)
2012 packages_s_a = source_suite.binaries[arch]
2013 assert self.hints is not None
2015 for pkg_id in sorted(
2016 x
2017 for x in filter_out_faux_gen(source_data_srcdist.binaries)
2018 if x.architecture == arch
2019 ):
2020 pkg_name = pkg_id.package_name
2021 binary_u = packages_s_a[pkg_name]
2022 pkg_arch = binary_u.architecture
2024 if binary_u.source_version != source_data_srcdist.version: 2024 ↛ 2025line 2024 didn't jump to line 2025 because the condition on line 2024 was never true
2025 continue
2027 if item.architecture != "source" and pkg_arch == "all":
2028 # we don't care about the existing arch: all binaries when
2029 # checking a binNMU item, because the arch: all binaries won't
2030 # migrate anyway
2031 continue
2033 signer = None
2034 uid = None
2035 uidinfo = ""
2036 buildd_ok = False
2037 failure_verdict = PolicyVerdict.REJECTED_PERMANENTLY
2038 try:
2039 signer = signers[pkg_name][pkg_id.version][pkg_arch]
2040 if signer["buildd"]:
2041 buildd_ok = True
2042 uid = signer["uid"]
2043 uidinfo = f"arch {pkg_arch} binaries uploaded by {uid}"
2044 except KeyError:
2045 self.logger.info(
2046 "signer info for %s %s (%s) on %s not found",
2047 pkg_name,
2048 binary_u.version,
2049 pkg_arch,
2050 arch,
2051 )
2052 uidinfo = f"upload info for arch {pkg_arch} binaries not found"
2053 failure_verdict = PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT
2054 if not buildd_ok:
2055 if component != "main":
2056 if not buildd_ok and pkg_arch not in buildd_info["signed-by"]: 2056 ↛ 2060line 2056 didn't jump to line 2060 because the condition on line 2056 was always true
2057 excuse.add_detailed_info(
2058 f"{uidinfo}, but package in {component}"
2059 )
2060 buildd_ok = True
2061 elif pkg_arch == "all":
2062 if (
2063 allow_hint := self.hints.search_first(
2064 "allow-archall-maintainer-upload", package=item.package
2065 )
2066 ) is not None:
2067 buildd_ok = True
2068 verdict = PolicyVerdict.worst_of(
2069 verdict, PolicyVerdict.PASS_HINTED
2070 )
2071 if pkg_arch not in buildd_info["signed-by"]:
2072 excuse.addinfo(
2073 f"{uidinfo}, but whitelisted by {allow_hint.user}"
2074 )
2075 if not buildd_ok:
2076 verdict = failure_verdict
2077 if pkg_arch not in buildd_info["signed-by"]:
2078 if pkg_arch == "all":
2079 uidinfo += (
2080 ", a new source-only upload is needed to allow migration"
2081 )
2082 excuse.add_verdict_info(verdict, f"Not built on buildd: {uidinfo}")
2084 if ( 2084 ↛ 2088line 2084 didn't jump to line 2088
2085 pkg_arch in buildd_info["signed-by"]
2086 and buildd_info["signed-by"][pkg_arch] != uid
2087 ):
2088 self.logger.info(
2089 "signer mismatch for %s (%s %s) on %s: %s, while %s already listed",
2090 pkg_name,
2091 binary_u.source,
2092 binary_u.source_version,
2093 pkg_arch,
2094 uid,
2095 buildd_info["signed-by"][pkg_arch],
2096 )
2098 buildd_info["signed-by"][pkg_arch] = uid
2100 return verdict
2102 def _read_signerinfo(self, filename: str) -> dict[str, Any]:
2103 signerinfo: dict[str, Any] = {}
2104 self.logger.info("Loading signer info from %s", filename)
2105 with open(filename) as fd: 2105 ↛ exitline 2105 didn't return from function '_read_signerinfo' because the return on line 2107 wasn't executed
2106 if os.fstat(fd.fileno()).st_size < 1: 2106 ↛ 2107line 2106 didn't jump to line 2107 because the condition on line 2106 was never true
2107 return signerinfo
2108 signerinfo = json.load(fd)
2110 return signerinfo
2113class ImplicitDependencyPolicy(AbstractBasePolicy):
2114 """Implicit Dependency policy
2116 Upgrading a package pkg-a can break the installability of a package pkg-b.
2117 A newer version (or the removal) of pkg-b might fix the issue. In that
2118 case, pkg-a has an 'implicit dependency' on pkg-b, because pkg-a can only
2119 migrate if pkg-b also migrates.
2121 This policy tries to discover a few common cases, and adds the relevant
2122 info to the excuses. If another item is needed to fix the
2123 uninstallability, a dependency is added. If no newer item can fix it, this
2124 excuse will be blocked.
2126 Note that the migration step will check the installability of every
2127 package, so this policy doesn't need to handle every corner case. It
2128 must, however, make sure that no excuse is unnecessarily blocked.
2130 Some cases that should be detected by this policy:
2132 * pkg-a is upgraded from 1.0-1 to 2.0-1, while
2133 pkg-b has "Depends: pkg-a (<< 2.0)"
2134 This typically happens if pkg-b has a strict dependency on pkg-a because
2135 it uses some non-stable internal interface (examples are glibc,
2136 binutils, python3-defaults, ...)
2138 * pkg-a is upgraded from 1.0-1 to 2.0-1, and
2139 pkg-a 1.0-1 has "Provides: provides-1",
2140 pkg-a 2.0-1 has "Provides: provides-2",
2141 pkg-b has "Depends: provides-1"
2142 This typically happens when pkg-a has an interface that changes between
2143 versions, and a virtual package is used to identify the version of this
2144 interface (e.g. perl-api-x.y)
2146 """
2148 _pkg_universe: "BinaryPackageUniverse"
2149 _all_binaries: dict["BinaryPackageId", "BinaryPackage"]
2150 _allow_uninst: dict[str, set[str | None]]
2151 _nobreakall_arches: list[str]
2153 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
2154 super().__init__(
2155 "implicit-deps",
2156 options,
2157 suite_info,
2158 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
2159 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
2160 )
2162 def initialise(self, britney: "Britney") -> None:
2163 super().initialise(britney)
2164 self._pkg_universe = britney.pkg_universe
2165 self._all_binaries = britney.all_binaries
2166 self._smooth_updates = britney.options.smooth_updates
2167 self._nobreakall_arches = self.options.nobreakall_arches
2168 self._new_arches = self.options.new_arches
2169 self._break_arches = self.options.break_arches
2170 self._allow_uninst = britney.allow_uninst
2171 self._outofsync_arches = self.options.outofsync_arches
2173 def can_be_removed(self, pkg: BinaryPackage) -> bool:
2174 src = pkg.source
2175 target_suite = self.suite_info.target_suite
2177 # TODO these conditions shouldn't be hardcoded here
2178 # ideally, we would be able to look up excuses to see if the removal
2179 # is in there, but in the current flow, this policy is called before
2180 # all possible excuses exist, so there is no list for us to check
2182 if src not in self.suite_info.primary_source_suite.sources:
2183 # source for pkg not in unstable: candidate for removal
2184 return True
2186 source_t = target_suite.sources[src]
2187 assert self.hints is not None
2188 if self.hints.has_hint("remove", package=src, version=source_t.version):
2189 # removal hint for the source in testing: candidate for removal
2190 return True
2192 if target_suite.is_cruft(pkg):
2193 # if pkg is cruft in testing, removal will be tried
2194 return True
2196 # the case were the newer version of the source no longer includes the
2197 # binary (or includes a cruft version of the binary) will be handled
2198 # separately (in that case there might be an implicit dependency on
2199 # the newer source)
2201 return False
2203 def should_skip_rdep(
2204 self, pkg: BinaryPackage, source_name: str, myarch: str
2205 ) -> bool:
2206 target_suite = self.suite_info.target_suite
2208 if not target_suite.is_pkg_in_the_suite(pkg.pkg_id):
2209 # it is not in the target suite, migration cannot break anything
2210 return True
2212 if pkg.source == source_name:
2213 # if it is built from the same source, it will be upgraded
2214 # with the source
2215 return True
2217 if self.can_be_removed(pkg):
2218 # could potentially be removed, so if that happens, it won't be
2219 # broken
2220 return True
2222 if pkg.architecture == "all" and myarch not in self._nobreakall_arches:
2223 # arch all on non nobreakarch is allowed to become uninstallable
2224 return True
2226 if pkg.pkg_id.package_name in self._allow_uninst[myarch]:
2227 # there is a hint to allow this binary to become uninstallable
2228 return True
2230 if not target_suite.is_installable(pkg.pkg_id):
2231 # it is already uninstallable in the target suite, migration
2232 # cannot break anything
2233 return True
2235 return False
2237 def breaks_installability(
2238 self,
2239 pkg_id_t: BinaryPackageId,
2240 pkg_id_s: BinaryPackageId | None,
2241 pkg_to_check: BinaryPackageId,
2242 ) -> bool:
2243 """
2244 Check if upgrading pkg_id_t to pkg_id_s breaks the installability of
2245 pkg_to_check.
2247 To check if removing pkg_id_t breaks pkg_to_check, set pkg_id_s to
2248 None.
2249 """
2251 pkg_universe = self._pkg_universe
2252 negative_deps = pkg_universe.negative_dependencies_of(pkg_to_check)
2254 for dep in pkg_universe.dependencies_of(pkg_to_check):
2255 if pkg_id_t not in dep:
2256 # this depends doesn't have pkg_id_t as alternative, so
2257 # upgrading pkg_id_t cannot break this dependency clause
2258 continue
2260 # We check all the alternatives for this dependency, to find one
2261 # that can satisfy it when pkg_id_t is upgraded to pkg_id_s
2262 found_alternative = False
2263 for d in dep:
2264 if d in negative_deps:
2265 # If this alternative dependency conflicts with
2266 # pkg_to_check, it cannot be used to satisfy the
2267 # dependency.
2268 # This commonly happens when breaks are added to pkg_id_s.
2269 continue
2271 if d.package_name != pkg_id_t.package_name:
2272 # a binary different from pkg_id_t can satisfy the dep, so
2273 # upgrading pkg_id_t won't break this dependency
2274 found_alternative = True
2275 break
2277 if d != pkg_id_s:
2278 # We want to know the impact of the upgrade of
2279 # pkg_id_t to pkg_id_s. If pkg_id_s migrates to the
2280 # target suite, any other version of this binary will
2281 # not be there, so it cannot satisfy this dependency.
2282 # This includes pkg_id_t, but also other versions.
2283 continue
2285 # pkg_id_s can satisfy the dep
2286 found_alternative = True
2287 break
2289 if not found_alternative:
2290 return True
2291 return False
2293 def check_upgrade(
2294 self,
2295 pkg_id_t: BinaryPackageId,
2296 pkg_id_s: BinaryPackageId | None,
2297 source_name: str,
2298 myarch: str,
2299 broken_binaries: set[str],
2300 excuse: "Excuse",
2301 ) -> PolicyVerdict:
2302 verdict = PolicyVerdict.PASS
2304 pkg_universe = self._pkg_universe
2305 all_binaries = self._all_binaries
2307 # check all rdeps of the package in testing
2308 rdeps_t = pkg_universe.reverse_dependencies_of(pkg_id_t)
2310 for rdep_pkg in sorted(rdeps_t):
2311 rdep_p = all_binaries[rdep_pkg]
2313 # check some cases where the rdep won't become uninstallable, or
2314 # where we don't care if it does
2315 if self.should_skip_rdep(rdep_p, source_name, myarch):
2316 continue
2318 if not self.breaks_installability(pkg_id_t, pkg_id_s, rdep_pkg):
2319 # if upgrading pkg_id_t to pkg_id_s doesn't break rdep_pkg,
2320 # there is no implicit dependency
2321 continue
2323 # The upgrade breaks the installability of the rdep. We need to
2324 # find out if there is a newer version of the rdep that solves the
2325 # uninstallability. If that is the case, there is an implicit
2326 # dependency. If not, the upgrade will fail.
2328 # check source versions
2329 good_newer_versions = set()
2330 for npkg, suite in find_newer_binaries(
2331 self.suite_info, rdep_p, add_source_for_dropped_bin=True
2332 ):
2333 if npkg.architecture == "source":
2334 # When a newer version of the source package doesn't have
2335 # the binary, we get the source as 'newer version'. In
2336 # this case, the binary will not be uninstallable if the
2337 # newer source migrates, because it is no longer there.
2338 good_newer_versions.add(npkg)
2339 continue
2340 assert isinstance(npkg, BinaryPackageId)
2341 if not self.breaks_installability(pkg_id_t, pkg_id_s, npkg):
2342 good_newer_versions.add(npkg)
2344 if good_newer_versions:
2345 spec = DependencySpec(DependencyType.IMPLICIT_DEPENDENCY, myarch)
2346 excuse.add_package_depends(spec, good_newer_versions)
2347 else:
2348 # no good newer versions: no possible solution
2349 broken_binaries.add(rdep_pkg.name)
2350 if pkg_id_s:
2351 action = "migrating {} to {}".format(
2352 pkg_id_s.name,
2353 self.suite_info.target_suite.name,
2354 )
2355 else:
2356 action = "removing {} from {}".format(
2357 pkg_id_t.name,
2358 self.suite_info.target_suite.name,
2359 )
2360 if rdep_pkg.package_name.endswith("-faux-build-depends"):
2361 name = rdep_pkg.package_name.removesuffix("-faux-build-depends")
2362 info = f'{action} makes Build-Depends of src:<a href="#{name}">{name}</a> uninstallable'
2363 else:
2364 info = '{0} makes <a href="#{1}">{1}</a> uninstallable'.format(
2365 action, rdep_pkg.name
2366 )
2367 verdict = PolicyVerdict.REJECTED_PERMANENTLY
2368 excuse.add_verdict_info(verdict, info)
2370 return verdict
2372 def apply_srcarch_policy_impl(
2373 self,
2374 implicit_dep_info: dict[str, Any],
2375 arch: str,
2376 source_data_tdist: SourcePackage | None,
2377 source_data_srcdist: SourcePackage,
2378 excuse: "Excuse",
2379 ) -> PolicyVerdict:
2380 verdict = PolicyVerdict.PASS
2382 if not source_data_tdist:
2383 # this item is not currently in testing: no implicit dependency
2384 return verdict
2386 if excuse.hasreason("missingbuild"):
2387 # if the build is missing, the policy would treat this as if the
2388 # binaries would be removed, which would give incorrect (and
2389 # confusing) info
2390 info = "missing build, not checking implicit dependencies on %s" % (arch)
2391 excuse.add_detailed_info(info)
2392 return verdict
2394 source_suite = excuse.item.suite
2395 source_name = excuse.item.package
2396 target_suite = self.suite_info.target_suite
2397 all_binaries = self._all_binaries
2399 # we check all binaries for this excuse that are currently in testing
2400 relevant_binaries = sorted(
2401 x
2402 for x in source_data_tdist.binaries
2403 if (arch == "source" or x.architecture == arch)
2404 and x.package_name in target_suite.binaries[x.architecture]
2405 and x.architecture not in self._new_arches
2406 and x.architecture not in self._break_arches
2407 and x.architecture not in self._outofsync_arches
2408 )
2410 broken_binaries: set[str] = set()
2412 assert self.hints is not None
2413 for pkg_id_t in relevant_binaries:
2414 mypkg = pkg_id_t.package_name
2415 myarch = pkg_id_t.architecture
2416 binaries_t_a = target_suite.binaries[myarch]
2417 binaries_s_a = source_suite.binaries[myarch]
2419 if target_suite.is_cruft(all_binaries[pkg_id_t]):
2420 # this binary is cruft in testing: it will stay around as long
2421 # as necessary to satisfy dependencies, so we don't need to
2422 # care
2423 continue
2425 if mypkg in binaries_s_a:
2426 mybin = binaries_s_a[mypkg]
2427 pkg_id_s: Optional["BinaryPackageId"] = mybin.pkg_id
2428 if mybin.source != source_name:
2429 # hijack: this is too complicated to check, so we ignore
2430 # it (the migration code will check the installability
2431 # later anyway)
2432 pass
2433 elif mybin.source_version != source_data_srcdist.version:
2434 # cruft in source suite: pretend the binary doesn't exist
2435 pkg_id_s = None
2436 elif pkg_id_t == pkg_id_s:
2437 # same binary (probably arch: all from a binNMU):
2438 # 'upgrading' doesn't change anything, for this binary, so
2439 # it won't break anything
2440 continue
2441 else:
2442 pkg_id_s = None
2444 if not pkg_id_s and is_smooth_update_allowed(
2445 binaries_t_a[mypkg], self._smooth_updates, self.hints
2446 ):
2447 # the binary isn't in the new version (or is cruft there), and
2448 # smooth updates are allowed: the binary can stay around if
2449 # that is necessary to satisfy dependencies, so we don't need
2450 # to check it
2451 continue
2453 if (
2454 not pkg_id_s
2455 and source_data_tdist.version == source_data_srcdist.version
2456 and source_suite.suite_class is SuiteClass.ADDITIONAL_SOURCE_SUITE
2457 and binaries_t_a[mypkg].architecture == "all"
2458 ):
2459 # we're very probably migrating a binNMU built in tpu where the arch:all
2460 # binaries were not copied to it as that's not needed. This policy could
2461 # needlessly block.
2462 continue
2464 v = self.check_upgrade(
2465 pkg_id_t, pkg_id_s, source_name, myarch, broken_binaries, excuse
2466 )
2467 verdict = PolicyVerdict.worst_of(verdict, v)
2469 # each arch is processed separately, so if we already have info from
2470 # other archs, we need to merge the info from this arch
2471 broken_old = set(implicit_dep_info.get("broken-binaries", []))
2472 implicit_dep_info["broken-binaries"] = sorted(broken_old | broken_binaries)
2474 return verdict
2477class ReverseRemovalPolicy(AbstractBasePolicy):
2478 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
2479 super().__init__(
2480 "reverseremoval",
2481 options,
2482 suite_info,
2483 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
2484 )
2486 def register_hints(self, hint_parser: HintParser) -> None:
2487 hint_parser.register_hint_type(HintType("ignore-reverse-remove"))
2489 def initialise(self, britney: "Britney") -> None:
2490 super().initialise(britney)
2492 pkg_universe = britney.pkg_universe
2493 source_suites = britney.suite_info.source_suites
2494 target_suite = britney.suite_info.target_suite
2496 # Build set of the sources of reverse (Build-) Depends
2497 assert self.hints is not None
2499 rev_bin: dict[BinaryPackageId, set[str]] = defaultdict(set)
2500 for hint in self.hints.search("remove"):
2501 for item in hint.packages:
2502 # I think we don't need to look at the target suite
2503 for src_suite in source_suites:
2504 try:
2505 # Explicitly not running filter_out_faux here
2506 my_bins = set(src_suite.sources[item.uvname].binaries)
2507 except KeyError:
2508 continue
2509 compute_reverse_tree(pkg_universe, my_bins)
2510 for this_bin in my_bins:
2511 rev_bin.setdefault(this_bin, set()).add(item.uvname)
2513 rev_src: dict[str, set[str]] = defaultdict(set)
2514 for bin_pkg, reasons in rev_bin.items():
2515 # If the pkg is in the target suite, there's nothing this
2516 # policy wants to do.
2517 if target_suite.is_pkg_in_the_suite(bin_pkg):
2518 continue
2519 that_bin = britney.all_binaries[bin_pkg]
2520 bin_src = that_bin.source + "/" + that_bin.source_version
2521 rev_src.setdefault(bin_src, set()).update(reasons)
2522 self._block_src_for_rm_hint = rev_src
2524 def apply_src_policy_impl(
2525 self,
2526 rev_remove_info: dict[str, Any],
2527 source_data_tdist: SourcePackage | None,
2528 source_data_srcdist: SourcePackage,
2529 excuse: "Excuse",
2530 ) -> PolicyVerdict:
2531 verdict = PolicyVerdict.PASS
2533 item = excuse.item
2534 if item.name in self._block_src_for_rm_hint:
2535 reason = ", ".join(sorted(self._block_src_for_rm_hint[item.name]))
2536 assert self.hints is not None
2537 excuse.addreason("reverseremoval")
2538 if (
2539 ignore_hint := self.hints.search_first(
2540 "ignore-reverse-remove", package=item.uvname, version=item.version
2541 )
2542 ) is not None:
2543 excuse.addreason("ignore-reverse-remove")
2544 excuse.addinfo(
2545 "Should block migration because of remove hint for %s, but forced by %s"
2546 % (reason, ignore_hint.user)
2547 )
2548 verdict = PolicyVerdict.PASS_HINTED
2549 else:
2550 verdict = PolicyVerdict.REJECTED_PERMANENTLY
2551 excuse.add_verdict_info(
2552 verdict, "Remove hint for (transitive) dependency: %s" % reason
2553 )
2555 return verdict
2558class ReproducibleState(Enum):
2559 BAD = auto()
2560 FAIL = auto()
2561 GOOD = auto()
2562 UNKNOWN = auto()
2564 @staticmethod
2565 def from_str(val: str | None) -> "ReproducibleState":
2566 match val:
2567 case "BAD":
2568 return ReproducibleState.BAD
2569 case "FAIL": 2569 ↛ 2570line 2569 didn't jump to line 2570 because the pattern on line 2569 never matched
2570 return ReproducibleState.FAIL
2571 case "GOOD": 2571 ↛ 2573line 2571 didn't jump to line 2573 because the pattern on line 2571 always matched
2572 return ReproducibleState.GOOD
2573 case "UNKNOWN" | None:
2574 return ReproducibleState.UNKNOWN
2575 case _:
2576 raise ValueError(f"Invalid reproducability state f{str}")
2579@dataclass(slots=True, frozen=True)
2580class ReproducibleData:
2581 state: ReproducibleState
2582 build_id: str | None = field(default=None, kw_only=True)
2583 diffoscope_log_id: str | None = field(default=None, kw_only=True)
2584 artifact_id: str | None = field(default=None, kw_only=True)
2587class ReproduciblePolicy(AbstractBasePolicy):
2588 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
2589 super().__init__(
2590 "reproducible",
2591 options,
2592 suite_info,
2593 {SuiteClass.PRIMARY_SOURCE_SUITE},
2594 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
2595 )
2596 self._reproducible: dict[str, dict[tuple[str, str], ReproducibleData]] = {}
2598 # Default values for this policy's options
2599 parse_option(options, "repro_success_bounty", default=0, to_int=True)
2600 parse_option(options, "repro_regression_penalty", default=0, to_int=True)
2601 parse_option(options, "repro_log_url")
2602 parse_option(options, "repro_excuse_url")
2603 parse_option(options, "repro_retry_url")
2604 parse_option(options, "repro_components")
2606 def register_hints(self, hint_parser: HintParser) -> None:
2607 hint_parser.register_hint_type(
2608 HintType(
2609 "ignore-reproducible-src",
2610 versioned=HintAnnotate.OPTIONAL,
2611 architectured=HintAnnotate.OPTIONAL,
2612 )
2613 )
2614 hint_parser.register_hint_type(
2615 HintType(
2616 "ignore-reproducible",
2617 versioned=HintAnnotate.OPTIONAL,
2618 architectured=HintAnnotate.OPTIONAL,
2619 )
2620 )
2622 def initialise(self, britney: "Britney") -> None:
2623 super().initialise(britney)
2624 summary = self._reproducible
2626 valid_release_names = {
2627 suite.codename
2628 for suite in chain(
2629 (britney.suite_info.target_suite,),
2630 britney.suite_info.source_suites,
2631 )
2632 } | {
2633 suite.name
2634 for suite in chain(
2635 (britney.suite_info.target_suite,),
2636 britney.suite_info.source_suites,
2637 )
2638 }
2640 assert hasattr(
2641 self, "state_dir"
2642 ), "Please set STATE_DIR in the britney configuration"
2643 assert (
2644 self.options.repro_components
2645 ), "Please set REPRO_COMPONENTS in the britney configuration"
2646 for file in os.listdir(self.state_dir):
2647 if not file.startswith("reproducible-") or not file.endswith(".json"): 2647 ↛ 2648line 2647 didn't jump to line 2648 because the condition on line 2647 was never true
2648 continue
2649 filename = os.path.join(self.state_dir, file)
2651 self.logger.info("Loading reproducibility report from %s", filename)
2652 with open(filename) as fd: 2652 ↛ 2646line 2652 didn't jump to line 2646 because the continue on line 2654 wasn't executed
2653 if os.fstat(fd.fileno()).st_size < 1: 2653 ↛ 2654line 2653 didn't jump to line 2654 because the condition on line 2653 was never true
2654 continue
2655 data = json.load(fd)
2657 for result in data["records"]:
2658 if ( 2658 ↛ 2662line 2658 didn't jump to line 2662 because the condition on line 2658 was never true
2659 release := result.get("release")
2660 ) is not None and release not in valid_release_names:
2661 # tests do not have a release set
2662 continue
2664 state = ReproducibleState.from_str(result.get("status"))
2665 repo = {
2666 key: value
2667 for key, value in result.items()
2668 if key in ("build_id", "diffoscope_log_id", "artifact_id")
2669 }
2671 summary.setdefault(result["architecture"], {})[
2672 (result["name"], result["version"])
2673 ] = ReproducibleData(state, **repo)
2675 def _lookup_data(
2676 self, package_name: str, version: str, arch: str
2677 ) -> tuple[ReproducibleData, str] | None:
2678 key = (package_name, version)
2679 if (repo := self._reproducible[arch].get(key)) is not None:
2680 return repo, arch
2682 repo = self._reproducible["all"].get(key)
2683 return (repo, "all") if repo is not None else None
2685 def _format_link(self, bpid: BinaryPackageId, arch: str) -> str:
2686 data = self._lookup_data(bpid.package_name, bpid.version, arch)
2687 assert data is not None
2688 repo, arch = data
2689 if repo.diffoscope_log_id and (diff_id := repo.artifact_id): 2689 ↛ 2690line 2689 didn't jump to line 2690 because the condition on line 2689 was never true
2690 endpoint = f"artifacts/{diff_id}/diffoscope"
2691 else:
2692 endpoint = "log"
2693 url = self.options.repro_log_url.format(
2694 arch=arch, build_id=repo.build_id, endpoint=endpoint
2695 )
2696 return f'<a href="{url}">{bpid.package_name}</a>'
2698 def _create_link_to_log(self, arch: str, failed_bpids: set[BinaryPackageId]) -> str:
2699 if not self.options.repro_log_url: 2699 ↛ 2700line 2699 didn't jump to line 2700 because the condition on line 2699 was never true
2700 return ": " + ", ".join(bpid.package_name for bpid in sorted(failed_bpids))
2702 return ": " + ", ".join(
2703 self._format_link(bpid, arch) for bpid in sorted(failed_bpids)
2704 )
2706 def apply_srcarch_policy_impl(
2707 self,
2708 policy_info: dict[str, Any],
2709 arch: str,
2710 source_data_tdist: SourcePackage | None,
2711 source_data_srcdist: SourcePackage,
2712 excuse: "Excuse",
2713 ) -> PolicyVerdict:
2714 verdict = PolicyVerdict.PASS
2715 eligible_for_bounty = False
2716 all_hints = []
2718 assert self.hints is not None # Needed for type checking / mypy
2720 # we don't want to apply this policy (yet) on binNMUs
2721 if excuse.item.architecture != "source": 2721 ↛ 2722line 2721 didn't jump to line 2722 because the condition on line 2721 was never true
2722 return verdict
2724 # we're not supposed to judge on this arch
2725 if arch not in self.options.repro_arches: 2725 ↛ 2726line 2725 didn't jump to line 2726 because the condition on line 2725 was never true
2726 return verdict
2728 # bail out if this arch has no packages for this source (not build
2729 # here)
2730 if arch not in excuse.packages: 2730 ↛ 2731line 2730 didn't jump to line 2731 because the condition on line 2730 was never true
2731 return verdict
2733 component = get_component(source_data_srcdist.section)
2735 if ( 2735 ↛ 2739line 2735 didn't jump to line 2739
2736 self.options.repro_components
2737 and component not in self.options.repro_components.split()
2738 ):
2739 return verdict
2741 source_name = excuse.item.package
2743 if self.options.repro_excuse_url: 2743 ↛ 2744line 2743 didn't jump to line 2744 because the condition on line 2743 was never true
2744 url = self.options.repro_excuse_url.format(
2745 package=quote(source_name), arch=arch
2746 )
2747 url_html = ' - <a href="%s">info</a>' % url
2748 # When run on multiple archs, the last one "wins"
2749 policy_info["status-url"] = url
2750 else:
2751 url = None
2752 url_html = ""
2754 if arch not in self._reproducible: 2754 ↛ 2755line 2754 didn't jump to line 2755 because the condition on line 2754 was never true
2755 verdict = PolicyVerdict.REJECTED_TEMPORARILY
2756 msg = f"No reproducibility data available at all for {arch}"
2757 excuse.add_verdict_info(verdict, msg)
2758 return verdict
2759 if "all" not in self._reproducible: 2759 ↛ 2760line 2759 didn't jump to line 2760 because the condition on line 2759 was never true
2760 verdict = PolicyVerdict.REJECTED_TEMPORARILY
2761 msg = "No reproducibility data available at all for arch:all"
2762 excuse.add_verdict_info(verdict, msg)
2763 return verdict
2765 # skip/delay policy until both arch:arch and arch:all builds are done
2766 if (arch or "all") in excuse.missing_builds: 2766 ↛ 2767line 2766 didn't jump to line 2767 because the condition on line 2766 was never true
2767 self.logger.debug(
2768 "%s not built for %s or all, skipping reproducible policy",
2769 excuse.name,
2770 arch,
2771 )
2772 verdict = PolicyVerdict.REJECTED_TEMPORARILY
2773 excuse.add_verdict_info(
2774 verdict,
2775 f"Reproducibility check deferred on {arch}: missing builds{url_html}",
2776 )
2777 return verdict
2779 source_suite_state = "not-unknown"
2780 failed_bpids: set[BinaryPackageId] = set()
2781 # The states should either be GOOD/BAD for all binaries, UNKNOWN for all
2782 # binaries, or missing for all binaries, but let's not assume that.
2783 # They can be from different components after all.
2784 bins_src, src_suite_name = binaries_from_source_version(
2785 source_data_srcdist, self.suite_info
2786 )
2787 for bpid in bins_src:
2788 if bpid.architecture not in ("all", arch): 2788 ↛ 2789line 2788 didn't jump to line 2789 because the condition on line 2788 was never true
2789 continue
2790 in_component = True
2791 for suite in self.suite_info.source_suites:
2792 if suite.name == src_suite_name and ( 2792 ↛ 2800line 2792 didn't jump to line 2800 because the condition on line 2792 was never true
2793 (
2794 component := get_component(
2795 suite.all_binaries_in_suite[bpid].section
2796 )
2797 )
2798 not in self.options.repro_components
2799 ):
2800 self.logger.debug(
2801 "repro check for %s skipped due to component %s",
2802 bpid,
2803 component,
2804 )
2805 in_component = False
2806 break
2807 if not in_component: 2807 ↛ 2809line 2807 didn't jump to line 2809 because the condition on line 2807 was never true
2808 # TODO: should we update the excuses text?
2809 continue
2811 if (
2812 data := self._lookup_data(bpid.package_name, bpid.version, arch)
2813 ) is not None:
2814 pkg_info, _ = data
2815 self.logger.debug("repro data for %s: %s", bpid, pkg_info.state)
2816 if pkg_info.state is ReproducibleState.BAD:
2817 failed_bpids.add(bpid)
2818 # not changing source_suite_state here on purpose
2819 elif ( 2819 ↛ 2823line 2819 didn't jump to line 2823
2820 pkg_info.state is ReproducibleState.FAIL
2821 or pkg_info.state is ReproducibleState.UNKNOWN
2822 ):
2823 source_suite_state = "unknown"
2824 else:
2825 self.logger.debug("No repro data found for %s", bpid)
2826 # but maybe it's hinted (e.g. at the time of writing
2827 # reproduce.debian.net has a bug where udebs go missing)
2828 if ( 2828 ↛ 2836line 2828 didn't jump to line 2836 because the condition on line 2828 was never true
2829 bpid_hints := self.hints.search_first(
2830 "ignore-reproducible",
2831 package=bpid.package_name,
2832 version=bpid.version,
2833 architecture=bpid.architecture,
2834 )
2835 ) is not None:
2836 all_hints.append(bpid_hints)
2837 self.logger.debug(f"repro: hint found for {source_name}: {bpid}")
2838 else:
2839 source_suite_state = "unknown"
2840 break
2842 if source_suite_state == "not-unknown":
2843 source_suite_state = "known"
2845 excuse_info = []
2846 if source_suite_state == "unknown":
2847 verdict = PolicyVerdict.REJECTED_TEMPORARILY
2848 excuse_info.append(
2849 f"Reproducibility check waiting for results on {arch}{url_html}"
2850 )
2851 policy_info.setdefault("state", {}).setdefault(arch, "unavailable")
2852 elif failed_bpids:
2853 ignored_bpids: set[BinaryPackageId] = set()
2854 if source_data_tdist is None: 2854 ↛ 2855line 2854 didn't jump to line 2855 because the condition on line 2854 was never true
2855 target_suite_state = "new"
2856 else:
2857 target_suite_state = "reproducible"
2858 for bpid in failed_bpids:
2859 pkg_name = bpid.package_name
2860 for bpid_t in filter_out_faux_gen(source_data_tdist.binaries):
2861 if bpid_t.architecture not in ("all", arch): 2861 ↛ 2862line 2861 didn't jump to line 2862 because the condition on line 2861 was never true
2862 continue
2863 if pkg_name != bpid_t.package_name: 2863 ↛ 2864line 2863 didn't jump to line 2864 because the condition on line 2863 was never true
2864 continue
2865 if ( 2865 ↛ 2880line 2865 didn't jump to line 2880 because the condition on line 2865 was always true
2866 data := self._lookup_data(pkg_name, bpid_t.version, arch)
2867 ) is not None:
2868 pkg_info, _ = data
2869 self.logger.debug(
2870 "testing repro data for %s: %s", bpid_t, pkg_info.state
2871 )
2872 if pkg_info.state is ReproducibleState.BAD:
2873 ignored_bpids.add(bpid)
2874 elif ( 2874 ↛ 2878line 2874 didn't jump to line 2878
2875 pkg_info.state is ReproducibleState.FAIL
2876 or pkg_info.state is ReproducibleState.UNKNOWN
2877 ):
2878 target_suite_state = "unknown"
2879 else:
2880 self.logger.debug(
2881 "No testing repro data found for %s", bpid_t
2882 )
2883 # This shouldn't happen as for the past migration
2884 # to have been allowed, there should be data.
2885 target_suite_state = "unknown"
2886 break
2888 # Reminder: code here is part of the non-reproducibile source-suite branch
2889 if target_suite_state == "new": 2889 ↛ 2890line 2889 didn't jump to line 2890 because the condition on line 2889 was never true
2890 verdict = PolicyVerdict.REJECTED_PERMANENTLY
2891 excuse_info.append(
2892 f"New but not reproduced on {arch}{url_html}"
2893 f"{self._create_link_to_log(arch, failed_bpids)}"
2894 )
2895 policy_info.setdefault("state", {}).setdefault(
2896 arch, "new but not reproducible"
2897 )
2898 elif target_suite_state == "unknown": 2898 ↛ 2900line 2898 didn't jump to line 2900 because the condition on line 2898 was never true
2899 # Shouldn't happen after initial bootstrap once blocking
2900 verdict = PolicyVerdict.REJECTED_TEMPORARILY
2901 excuse_info.append(
2902 f"Reproducibility check failed and now waiting for reference "
2903 f"results on {arch}{url_html}"
2904 f"{self._create_link_to_log(arch, failed_bpids)}"
2905 )
2906 policy_info.setdefault("state", {}).setdefault(
2907 arch, "waiting for reference"
2908 )
2909 elif failed_bpids <= ignored_bpids:
2910 # For the forseeable future we want to prevent regressions, one day
2911 # we might want to even block these.
2912 # verdict = PolicyVerdict.REJECTED_PERMANENTLY
2913 excuse_info.append(
2914 f"Not reproduced on {arch} (not a regression)"
2915 f"{self._create_link_to_log(arch, failed_bpids)}"
2916 )
2917 policy_info.setdefault("state", {}).setdefault(arch, "not reproducible")
2918 else:
2919 verdict = PolicyVerdict.REJECTED_PERMANENTLY
2920 excuse_info.append(
2921 f"Reproducibility regression on {arch}"
2922 f"{self._create_link_to_log(arch, failed_bpids - ignored_bpids)}"
2923 )
2924 policy_info.setdefault("state", {}).setdefault(arch, "regression")
2926 # non-reproducible source-suite cases are handled above, so here we
2927 # handle the last of the source-suite cases
2928 else:
2929 excuse_info.append(f"Reproduced on {arch}{url_html}")
2930 policy_info.setdefault("state", {}).setdefault(arch, "reproducible")
2931 eligible_for_bounty = True
2933 if verdict.is_rejected:
2934 for hint_arch in ("source", arch):
2935 if (
2936 ignore_hint := self.hints.search_first(
2937 "ignore-reproducible-src",
2938 package=source_name,
2939 version=source_data_srcdist.version,
2940 architecture=hint_arch,
2941 )
2942 ) is not None:
2943 # one hint is enough, take the first one encountered
2944 verdict = PolicyVerdict.PASS_HINTED
2945 policy_info.setdefault("hints", {}).setdefault(arch, []).append(
2946 f"{ignore_hint.user}: {str(ignore_hint)}"
2947 )
2948 if hint_arch == arch: 2948 ↛ 2951line 2948 didn't jump to line 2951 because the condition on line 2948 was always true
2949 on_arch = f" on {arch}"
2950 else:
2951 on_arch = ""
2952 excuse_info.append(
2953 f"Reproducibility issues ignored for src:{ignore_hint.package}"
2954 f"{on_arch} as requested by {ignore_hint.user}"
2955 )
2956 break
2958 if verdict.is_rejected:
2959 if source_suite_state == "known":
2960 check_bpids = failed_bpids - ignored_bpids
2961 else:
2962 # Let's not wait for results if all binaries have a hint
2963 check_bpids = filter_out_faux(source_data_srcdist.binaries)
2964 missed_bpids = set()
2966 for bpid in check_bpids:
2967 if (
2968 bpid_hint := self.hints.search_first(
2969 "ignore-reproducible",
2970 package=bpid.package_name,
2971 version=bpid.version,
2972 architecture=bpid.architecture,
2973 )
2974 ) is not None:
2975 # one hint per binary is enough
2976 all_hints.append(bpid_hint)
2977 self.logger.debug(
2978 "repro: hint found for %s: %s", source_name, bpid
2979 )
2980 else:
2981 missed_bpids.add(bpid)
2983 if not missed_bpids:
2984 verdict = PolicyVerdict.PASS_HINTED
2985 for hint in all_hints:
2986 policy_info.setdefault("hints", {}).setdefault(arch, []).append(
2987 hint.user + ": " + str(hint)
2988 )
2989 # TODO: we're going to print this for arch:all binaries on each arch
2990 excuse_info.append(
2991 f"Reproducibility issues ignored for {hint.package} on {arch} as "
2992 f"requested by {hint.user}"
2993 )
2994 elif all_hints: 2994 ↛ 2995line 2994 didn't jump to line 2995 because the condition on line 2994 was never true
2995 self.logger.info(
2996 "repro: binary hints for %s ignored as they don't cover these binaries %s",
2997 source_name,
2998 missed_bpids,
2999 )
3001 # A binary without results got hinted
3002 if not verdict.is_rejected and all_hints:
3003 for hint in all_hints:
3004 excuse_info.append(
3005 f"Reproducibility unknown for {hint.package} but ignored on {arch} as "
3006 f"requested by {hint.user}"
3007 )
3009 if self.options.repro_success_bounty and eligible_for_bounty: 3009 ↛ 3010line 3009 didn't jump to line 3010 because the condition on line 3009 was never true
3010 excuse.add_bounty("reproducibility", self.options.repro_success_bounty)
3012 if verdict.is_rejected and self.options.repro_regression_penalty: 3012 ↛ 3014line 3012 didn't jump to line 3014 because the condition on line 3012 was never true
3013 # With a non-zero penalty, we shouldn't block on this policy
3014 verdict = PolicyVerdict.PASS
3015 if self.options.repro_regression_penalty > 0:
3016 excuse.add_penalty(
3017 "reproducibility", self.options.repro_regression_penalty
3018 )
3020 for msg in excuse_info:
3021 if verdict.is_rejected:
3022 excuse.add_verdict_info(verdict, msg)
3023 else:
3024 excuse.addinfo(msg)
3026 return verdict