Coverage for britney2/policies/policy.py: 85%
1249 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-12-13 17:57 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-12-13 17:57 +0000
1import json
2import logging
3import optparse
4import os
5import re
6import sys
7import time
8from abc import ABC, abstractmethod
9from collections import defaultdict
10from collections.abc import Callable, Container
11from enum import IntEnum, unique
12from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar, cast
13from urllib.parse import quote
15import apt_pkg
16import yaml
18from britney2 import (
19 BinaryPackage,
20 BinaryPackageId,
21 DependencyType,
22 PackageId,
23 SourcePackage,
24 Suite,
25 SuiteClass,
26 Suites,
27 TargetSuite,
28)
29from britney2.excusedeps import DependencySpec
30from britney2.hints import (
31 Hint,
32 HintAnnotate,
33 HintCollection,
34 HintParser,
35 HintType,
36 PolicyHintParserProto,
37)
38from britney2.inputs.suiteloader import SuiteContentLoader
39from britney2.migrationitem import MigrationItem, MigrationItemFactory
40from britney2.policies import ApplySrcPolicy, PolicyVerdict
41from britney2.utils import (
42 GetDependencySolversProto,
43 compute_reverse_tree,
44 find_newer_binaries,
45 get_dependency_solvers,
46 is_smooth_update_allowed,
47 parse_option,
48)
50if TYPE_CHECKING: 50 ↛ 51line 50 didn't jump to line 51 because the condition on line 50 was never true
51 from ..britney import Britney
52 from ..excuse import Excuse
53 from ..installability.universe import BinaryPackageUniverse
class PolicyLoadRequest:
    """Deferred request to build a policy, optionally gated by a config option.

    A request pairs a policy constructor with the name of the option that
    switches the policy on or off and the value to assume when that option is
    absent from the configuration.
    """

    __slots__ = ("_options_name", "_default_value", "_policy_constructor")

    def __init__(
        self,
        policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"],
        options_name: str | None,
        default_value: bool,
    ) -> None:
        """Record how and when the policy should be built.

        :param policy_constructor: Callable that builds the policy from the
          options and the suite information.

        :param options_name: Name of the option deciding whether the policy
          is enabled, or None when the policy is always loaded.

        :param default_value: Whether the policy is enabled when the option
          is not set.
        """
        self._default_value = default_value
        self._options_name = options_name
        self._policy_constructor = policy_constructor

    def is_enabled(self, options: optparse.Values) -> bool:
        """Tell whether the policy should be loaded for the given options.

        :param options: The options to look the enabling option up in.

        :return: True for unconditional requests; otherwise the default when
          the option is unset, else whether its lower-cased value is one of
          "yes", "y", "true" or "t".
        """
        option = self._options_name
        if option is None:
            # An unconditional request must carry a true default.
            assert self._default_value
            return True
        configured = getattr(options, option, None)
        return (
            self._default_value
            if configured is None
            else configured.lower() in ("yes", "y", "true", "t")
        )

    def load(self, options: optparse.Values, suite_info: Suites) -> "BasePolicy":
        """Build the policy by calling the stored constructor."""
        build = self._policy_constructor
        return build(options, suite_info)

    @classmethod
    def always_load(
        cls, policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"]
    ) -> "PolicyLoadRequest":
        """Create a request for a policy that cannot be disabled."""
        return cls(policy_constructor, options_name=None, default_value=True)

    @classmethod
    def conditionally_load(
        cls,
        policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"],
        option_name: str,
        default_value: bool,
    ) -> "PolicyLoadRequest":
        """Create a request whose policy is toggled by *option_name*."""
        return cls(
            policy_constructor,
            options_name=option_name,
            default_value=default_value,
        )
class PolicyEngine:
    """Registry of the active policies and driver that applies them to excuses."""

    def __init__(self) -> None:
        """Start with no policies registered."""
        self._policies: list["BasePolicy"] = []

    def add_policy(self, policy: "BasePolicy") -> None:
        """Register *policy*; policies are applied in registration order."""
        self._policies.append(policy)

    def load_policies(
        self,
        options: optparse.Values,
        suite_info: Suites,
        policy_load_requests: list[PolicyLoadRequest],
    ) -> None:
        """Build and register every requested policy that is enabled."""
        # Lazy filter: each request is checked right before it is loaded.
        enabled = (req for req in policy_load_requests if req.is_enabled(options))
        for request in enabled:
            self.add_policy(request.load(options, suite_info))

    def register_policy_hints(self, hint_parser: HintParser) -> None:
        """Let every registered policy declare its hint types."""
        for registered in self._policies:
            registered.register_hints(hint_parser)

    def initialise(self, britney: "Britney", hints: HintCollection) -> None:
        """Hand the hints to each policy, then run its one-time initialisation."""
        for registered in self._policies:
            registered.hints = hints
            registered.initialise(britney)

    def save_state(self, britney: "Britney") -> None:
        """Ask every registered policy to save its state."""
        for registered in self._policies:
            registered.save_state(britney)

    def apply_src_policies(
        self,
        item: MigrationItem,
        source_t: SourcePackage | None,
        source_u: SourcePackage,
        excuse: "Excuse",
    ) -> None:
        """Run the policies for a source migration and record the outcome.

        For each policy applicable to the item's suite class, the per-arch
        implementation is run for every configured architecture and/or the
        source implementation is run, as selected by the policy's src_policy.
        The worst verdict is stored on *excuse*, together with the policy
        info of each policy that returned an applicable verdict.
        """
        overall = excuse.policy_verdict
        suite_class = item.suite.suite_class
        for policy in self._policies:
            pinfo: dict[str, Any] = {}
            verdict = PolicyVerdict.NOT_APPLICABLE
            if suite_class in policy.applicable_suites:
                mode = policy.src_policy
                if mode.run_arch:
                    for arch in policy.options.architectures:
                        arch_verdict = policy.apply_srcarch_policy_impl(
                            pinfo, item, arch, source_t, source_u, excuse
                        )
                        verdict = PolicyVerdict.worst_of(verdict, arch_verdict)
                if policy.src_policy.run_src:
                    src_verdict = policy.apply_src_policy_impl(
                        pinfo, item, source_t, source_u, excuse
                    )
                    verdict = PolicyVerdict.worst_of(verdict, src_verdict)
            # "verdict" is reserved for the engine; policies must not set it.
            assert "verdict" not in pinfo
            if verdict == PolicyVerdict.NOT_APPLICABLE:
                continue
            pinfo["verdict"] = verdict.name
            excuse.policy_info[policy.policy_id] = pinfo
            overall = PolicyVerdict.worst_of(verdict, overall)
        excuse.policy_verdict = overall

    def apply_srcarch_policies(
        self,
        item: MigrationItem,
        arch: str,
        source_t: SourcePackage | None,
        source_u: SourcePackage,
        excuse: "Excuse",
    ) -> None:
        """Run the per-architecture policies for *arch* and record the outcome.

        Only policies applicable to the item's suite class are consulted.
        The worst verdict is stored on *excuse*, together with the policy
        info of each policy that returned an applicable verdict.
        """
        overall = excuse.policy_verdict
        suite_class = item.suite.suite_class
        for policy in self._policies:
            pinfo: dict[str, Any] = {}
            if suite_class not in policy.applicable_suites:
                continue
            verdict = policy.apply_srcarch_policy_impl(
                pinfo, item, arch, source_t, source_u, excuse
            )
            overall = PolicyVerdict.worst_of(verdict, overall)
            # "verdict" is reserved for the engine; policies must not set it.
            assert "verdict" not in pinfo
            if verdict != PolicyVerdict.NOT_APPLICABLE:
                pinfo["verdict"] = verdict.name
                excuse.policy_info[policy.policy_id] = pinfo
        excuse.policy_verdict = overall
class BasePolicy(ABC):
    """Interface that every migration policy implements.

    The methods defined here are the hooks a policy may override: hint
    registration, one-time initialisation, state saving and the two
    "apply" methods. The default "apply" implementations return
    PolicyVerdict.NOT_APPLICABLE.
    """

    britney: "Britney"
    policy_id: str
    hints: HintCollection | None
    applicable_suites: set[SuiteClass]
    src_policy: ApplySrcPolicy
    options: optparse.Values
    suite_info: Suites

    def __init__(
        self,
        options: optparse.Values,
        suite_info: Suites,
    ) -> None:
        """The BasePolicy constructor

        This base constructor does not store its arguments.

        :param options: The options member of Britney with all the
          config values.

        :param suite_info: The suites Britney is working with.
        """

    @property
    @abstractmethod
    def state_dir(self) -> str: ...

    def register_hints(self, hint_parser: HintParser) -> None:  # pragma: no cover
        """Register new hints that this policy accepts

        :param hint_parser: (see HintParser.register_hint_type)
        """

    def initialise(self, britney: "Britney") -> None:  # pragma: no cover
        """Called once to make the policy initialise any data structures

        This is useful for e.g. parsing files or other "heavy do-once" work.

        :param britney: This is the instance of the "Britney" class.
        """
        self.britney = britney

    def save_state(self, britney: "Britney") -> None:  # pragma: no cover
        """Called once at the end of the run to make the policy save any persistent data

        Note this will *not* be called for "dry-runs" as such runs should not change
        the state.

        :param britney: This is the instance of the "Britney" class.
        """

    def apply_src_policy_impl(
        self,
        policy_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:  # pragma: no cover
        """Apply a policy on a given source migration

        Britney will call this method on a given source package, when
        Britney is considering to migrate it from the given source
        suite to the target suite.  The policy will then evaluate the
        migration and then return a verdict.

        :param policy_info: A dictionary of all policy results.  The
          policy can add a value stored in a key related to its name.
          (e.g. policy_info['age'] = {...}).  This will go directly into
          the "excuses.yaml" output.

        :param item: The migration item the policy is applied to.

        :param source_data_tdist: Information about the source package
          in the target distribution (e.g. "testing"), or None when there
          is none.  This is the data structure in
          target_suite.sources[source_name]

        :param source_data_srcdist: Information about the source
          package in the source distribution (e.g. "unstable" or "tpu").
          This is the data structure in source_suite.sources[source_name]

        :param excuse: The excuse of the migration item.

        :return: A Policy Verdict (e.g. PolicyVerdict.PASS)
        """
        return PolicyVerdict.NOT_APPLICABLE

    def apply_srcarch_policy_impl(
        self,
        policy_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Apply a policy on a given binary migration

        Britney will call this method on binaries from a given source package
        on a given architecture, when Britney is considering to migrate them
        from the given source suite to the target suite.  The policy will then
        evaluate the migration and then return a verdict.

        :param policy_info: A dictionary of all policy results.  The
          policy can add a value stored in a key related to its name.
          (e.g. policy_info['age'] = {...}).  This will go directly into
          the "excuses.yaml" output.

        :param item: The migration item the policy is applied to.

        :param arch: The architecture the item is applied to. This is mostly
          relevant for policies where src_policy is not ApplySrcPolicy.RUN_SRC
          (as that is the only case where arch can differ from item.architecture)

        :param source_data_tdist: Information about the source package
          in the target distribution (e.g. "testing"), or None when there
          is none.  This is the data structure in
          target_suite.sources[source_name]

        :param source_data_srcdist: Information about the source
          package in the source distribution (e.g. "unstable" or "tpu").
          This is the data structure in source_suite.sources[source_name]

        :param excuse: The excuse of the migration item.

        :return: A Policy Verdict (e.g. PolicyVerdict.PASS)
        """
        # if the policy doesn't implement this function, assume it's OK
        return PolicyVerdict.NOT_APPLICABLE
class AbstractBasePolicy(BasePolicy):
    """
    Common concrete groundwork for BasePolicy implementations.

    BasePolicy keeps a two-argument constructor because
    tests/test_policy.py:initialize_policy() must be able to build policies
    that way, whereas every other BasePolicy-derived object is built with the
    five-argument constructor defined here.  The split into two classes
    exists to document that difference.
    """

    def __init__(
        self,
        policy_id: str,
        options: optparse.Values,
        suite_info: Suites,
        applicable_suites: set[SuiteClass],
        src_policy: ApplySrcPolicy = ApplySrcPolicy.RUN_SRC,
    ) -> None:
        """Concrete initializer.

        :param policy_id: Identifies the policy.  It will
          determine the key used for the excuses.yaml etc.

        :param options: The options member of Britney with all the
          config values.

        :param suite_info: The suites Britney is working with.

        :param applicable_suites: Where this policy applies.

        :param src_policy: Selects whether the source and/or the
          per-architecture implementation is run for source migrations.
        """
        self.policy_id = policy_id
        self.options = options
        self.suite_info = suite_info
        self.applicable_suites = applicable_suites
        self.src_policy = src_policy
        self.hints: HintCollection | None = None
        # One logger per concrete policy class, named "<module>.<class>".
        klass = self.__class__
        self.logger = logging.getLogger(f"{klass.__module__}.{klass.__name__}")

    @property
    def state_dir(self) -> str:
        """The ``state_dir`` value taken from the options."""
        configured = self.options.state_dir
        return cast(str, configured)
_T = TypeVar("_T")


class SimplePolicyHint(Hint, Generic[_T]):
    """A hint that carries one policy-specific parameter of type ``_T``."""

    def __init__(
        self,
        user: str,
        hint_type: HintType,
        policy_parameter: _T,
        packages: list[MigrationItem],
    ) -> None:
        """Create the hint and remember its policy parameter."""
        super().__init__(user, hint_type, packages)
        self._policy_parameter = policy_parameter

    def __eq__(self, other: Any) -> bool:
        """Compare hint type and policy parameter, then defer to the base class."""
        differs = (
            self.type != other.type
            or self._policy_parameter != other._policy_parameter
        )
        return False if differs else super().__eq__(other)

    # NOTE(review): this method is named ``str``, not ``__str__``, so the
    # built-in str() does not use it - confirm whether that is intended.
    def str(self) -> str:
        """Render the hint as "<type> <parameter> <package names>"."""
        package_names = " ".join(pkg.name for pkg in self._packages)
        return f"{self._type} {self._policy_parameter!s} {package_names}"
class AgeDayHint(SimplePolicyHint[int]):
    """Simple policy hint whose parameter is an integer number of days."""

    @property
    def days(self) -> int:
        """The hint's policy parameter (a number of days)."""
        return self._policy_parameter
class IgnoreRCBugHint(SimplePolicyHint[frozenset[str]]):
    """Simple policy hint whose parameter is a frozen set of bug identifiers."""

    @property
    def ignored_rcbugs(self) -> frozenset[str]:
        """The hint's policy parameter (the bugs to ignore)."""
        return self._policy_parameter
def simple_policy_hint_parser_function(
    class_name: Callable[[str, HintType, _T, list[MigrationItem]], Hint],
    converter: Callable[[str], _T],
) -> PolicyHintParserProto:
    """Build a parser for hints shaped like ``<type> <parameter> <item>...``.

    :param class_name: Callable creating the hint from the user, the hint
      type, the converted parameter and a one-element list of items.

    :param converter: Turns the textual parameter into the value handed to
      *class_name*.

    :return: A parser function that adds one hint per parsed item.
    """

    def parse_hint(
        mi_factory: MigrationItemFactory,
        hints: HintCollection,
        who: str,
        hint_type: HintType,
        *args: str,
    ) -> None:
        # The first argument is the policy parameter, the rest name the items.
        raw_parameter = args[0]
        item_specs = args[1:]
        for migration_item in mi_factory.parse_items(*item_specs):
            hint = class_name(
                who, hint_type, converter(raw_parameter), [migration_item]
            )
            hints.add_hint(hint)

    return parse_hint
411class AgePolicy(AbstractBasePolicy):
412 """Configurable Aging policy for source migrations
414 The AgePolicy will let packages stay in the source suite for a pre-defined
415 number of days before letting them migrate (based on their urgency, if any).
417 The AgePolicy's decision is influenced by the following:
419 State files:
420 * ${STATE_DIR}/age-policy-urgencies: File containing urgencies for source
421 packages. Note that urgencies are "sticky" and the most "urgent" urgency
422 will be used (i.e. the one with lowest age-requirements).
423 - This file needs to be updated externally, if the policy should take
424 urgencies into consideration. If empty (or not updated), the policy
425 will simply use the default urgency (see the "Config" section below)
426 - In Debian, these values are taken from the .changes file, but that is
427 not a requirement for Britney.
428 * ${STATE_DIR}/age-policy-dates: File containing the age of all source
429 packages.
430 - The policy will automatically update this file.
431 Config:
432 * DEFAULT_URGENCY: Name of the urgency used for packages without an urgency
433 (or for unknown urgencies). Will also be used to set the "minimum"
434 aging requirements for packages not in the target suite.
435 * MINDAYS_<URGENCY>: The age-requirements in days for packages with the
436 given urgency.
437 - Commonly used urgencies are: low, medium, high, emergency, critical
438 Hints:
439 * urgent <source>/<version>: Disregard the age requirements for a given
440 source/version.
441 * age-days X <source>/<version>: Set the age requirements for a given
442 source/version to X days. Note that X can exceed the highest
443 age-requirement normally given.
445 """
def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
    """Set up the age policy from the configuration.

    Reads the MINDAYS_* table, works out the current britney "day", and
    prepares empty date/urgency tables that are filled in later.

    :param options: The options member of Britney with all the config values.
    :param suite_info: The suites Britney is working with.
    """
    super().__init__("age", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE})
    self._min_days = self._generate_mindays_table()
    # Placeholder value; the real default is looked up later.
    self._min_days_default = 0

    # britney's "day" begins at 7pm (we want aging to occur in the 22:00Z run
    # and we run Britney 2-4 times a day)
    # NB: _date_now is used in tests
    now = time.time()
    if hasattr(self.options, "fake_runtime"):
        now = int(self.options.fake_runtime)
        self.logger.info("overriding runtime with fake_runtime %d" % now)
    hours_since_epoch = now / (60 * 60)
    self._date_now = int((hours_since_epoch - 19) / 24)

    self._dates: dict[str, tuple[str, int]] = {}
    self._urgencies: dict[str, str] = {}
    self._default_urgency: str = self.options.default_urgency

    # Urgencies listed in "no_penalties" are exempt from age penalties.
    immune: frozenset[str] = frozenset()
    if hasattr(self.options, "no_penalties"):
        immune = frozenset(map(str.strip, self.options.no_penalties.split()))
    self._penalty_immune_urgencies: frozenset[str] = immune

    self._bounty_min_age: int | None = None  # initialised later
469 def _generate_mindays_table(self) -> dict[str, int]:
470 mindays: dict[str, int] = {}
471 for k in dir(self.options):
472 if not k.startswith("mindays_"):
473 continue
474 v = getattr(self.options, k)
475 try:
476 as_days = int(v)
477 except ValueError:
478 raise ValueError(
479 "Unable to parse "
480 + k
481 + " as a number of days. Must be 0 or a positive integer"
482 )
483 if as_days < 0: 483 ↛ 484line 483 didn't jump to line 484 because the condition on line 483 was never true
484 raise ValueError(
485 "The value of " + k + " must be zero or a positive integer"
486 )
487 mindays[k.split("_")[1]] = as_days
488 return mindays
def register_hints(self, hint_parser: HintParser) -> None:
    """Register the ``age-days`` and ``urgent`` hint types.

    :param hint_parser: (see HintParser.register_hint_type)
    """
    # "age-days" takes the number of days followed by at least one item.
    age_days_parser = simple_policy_hint_parser_function(AgeDayHint, int)
    age_days_type = HintType("age-days", age_days_parser, min_args=2)
    hint_parser.register_hint_type(age_days_type)

    urgent_type = HintType("urgent")
    hint_parser.register_hint_type(urgent_type)
def initialise(self, britney: "Britney") -> None:
    """Load the policy's state files and resolve its configured age limits.

    :param britney: This is the instance of the "Britney" class.

    :raises ValueError: if there is no MINDAYS_<urgency> value for the
      default urgency, or if BOUNTY_MIN_AGE is neither an integer nor the
      name of a known urgency.
    """
    super().initialise(britney)
    if self._default_urgency not in self._min_days:  # pragma: no cover
        raise ValueError(
            "Missing age-requirement for default urgency (MINDAYS_%s)"
            % self._default_urgency
        )
    # Resolve the default *before* reading the urgencies file:
    # _read_urgencies_file() uses _min_days_default as the age requirement
    # of urgencies it does not know.  With the placeholder 0 from __init__
    # an unknown urgency would outrank every known one.
    self._min_days_default = self._min_days[self._default_urgency]
    self._read_dates_file()
    self._read_urgencies_file()
    try:
        self._bounty_min_age = int(self.options.bounty_min_age)
    except ValueError:
        # Not a number: accept the name of a configured urgency instead.
        if self.options.bounty_min_age in self._min_days:
            self._bounty_min_age = self._min_days[self.options.bounty_min_age]
        else:  # pragma: no cover
            raise ValueError(
                "Please fix BOUNTY_MIN_AGE in the britney configuration"
            )
    except AttributeError:
        # The option wasn't defined in the configuration
        self._bounty_min_age = 0
def save_state(self, britney: "Britney") -> None:
    """Save the policy's state by writing the dates file.

    :param britney: This is the instance of the "Britney" class.
    """
    super().save_state(britney)
    self._write_dates_file()
def apply_src_policy_impl(
    self,
    age_info: dict[str, Any],
    item: MigrationItem,
    source_data_tdist: SourcePackage | None,
    source_data_srcdist: SourcePackage,
    excuse: "Excuse",
) -> PolicyVerdict:
    """Check whether the source package is old enough to migrate.

    The required age starts from the MINDAYS value of the package's urgency,
    is adjusted by the bounties and penalties recorded on *excuse*, is kept
    from dropping below the bounty minimum, and can be overridden by
    "age-days" and "urgent" hints.

    :param age_info: Dictionary receiving this policy's results.

    :param item: The migration item the policy is applied to.

    :param source_data_tdist: The source package in the target suite, or
      None if it is not there.

    :param source_data_srcdist: The source package in the source suite.

    :param excuse: The excuse to read bounties/penalties from and to report
      the age information on.

    :return: PASS when old enough, PASS_HINTED when too young but pushed by
      an "urgent" hint, REJECTED_TEMPORARILY when too young.
    """
    # retrieve the urgency for the upload, ignoring it if this is a NEW package
    # (not present in the target suite)
    source_name = item.package
    urgency = self._urgencies.get(source_name, self._default_urgency)

    if urgency not in self._min_days:
        age_info["unknown-urgency"] = urgency
        urgency = self._default_urgency

    if not source_data_tdist:
        if self._min_days[urgency] < self._min_days_default:
            age_info["urgency-reduced"] = {
                "from": urgency,
                "to": self._default_urgency,
            }
            urgency = self._default_urgency

    # (Re)start the clock for sources seen for the first time or whose
    # version changed since it was recorded.
    version = source_data_srcdist.version
    if source_name not in self._dates or self._dates[source_name][0] != version:
        self._dates[source_name] = (version, self._date_now)

    days_old = self._date_now - self._dates[source_name][1]
    min_days = self._min_days[urgency]
    for bounty in excuse.bounty:
        if excuse.bounty[bounty]:
            self.logger.info(
                "Applying bounty for %s granted by %s: %d days",
                source_name,
                bounty,
                excuse.bounty[bounty],
            )
            excuse.addinfo(
                "Required age reduced by %d days because of %s"
                % (excuse.bounty[bounty], bounty)
            )
            assert excuse.bounty[bounty] > 0, "negative bounties shouldn't happen"
            min_days -= excuse.bounty[bounty]
    if urgency not in self._penalty_immune_urgencies:
        for penalty in excuse.penalty:
            if excuse.penalty[penalty]:
                self.logger.info(
                    "Applying penalty for %s given by %s: %d days",
                    source_name,
                    penalty,
                    excuse.penalty[penalty],
                )
                excuse.addinfo(
                    "Required age increased by %d days because of %s"
                    % (excuse.penalty[penalty], penalty)
                )
                assert (
                    excuse.penalty[penalty] > 0
                ), "negative penalties should be handled earlier"
                min_days += excuse.penalty[penalty]

    assert self._bounty_min_age is not None
    # the age in BOUNTY_MIN_AGE can be higher than the one associated with
    # the real urgency, so don't forget to take it into account
    bounty_min_age = min(self._bounty_min_age, self._min_days[urgency])
    if min_days < bounty_min_age:
        min_days = bounty_min_age
        excuse.addinfo(
            "Required age is not allowed to drop below %d days" % min_days
        )

    age_info["current-age"] = days_old

    assert self.hints is not None
    for age_days_hint in cast(
        "list[AgeDayHint]",
        self.hints.search("age-days", package=source_name, version=version),
    ):
        new_req = age_days_hint.days
        age_info["age-requirement-reduced"] = {
            "new-requirement": new_req,
            "changed-by": age_days_hint.user,
        }
        # Remember the requirement as it was before the first hint applied.
        if "original-age-requirement" not in age_info:
            age_info["original-age-requirement"] = min_days
        min_days = new_req

    age_info["age-requirement"] = min_days
    res = PolicyVerdict.PASS

    if days_old < min_days:
        urgent_hints = self.hints.search(
            "urgent", package=source_name, version=version
        )
        if urgent_hints:
            age_info["age-requirement-reduced"] = {
                "new-requirement": 0,
                "changed-by": urgent_hints[0].user,
            }
            res = PolicyVerdict.PASS_HINTED
        else:
            res = PolicyVerdict.REJECTED_TEMPORARILY

    # update excuse
    age_hint = age_info.get("age-requirement-reduced", None)
    age_min_req = age_info["age-requirement"]
    if age_hint:
        new_req = age_hint["new-requirement"]
        who = age_hint["changed-by"]
        if new_req:
            # "age-requirement" already holds the hinted value, so the
            # "from" part must come from the requirement recorded before
            # the hint was applied; otherwise both numbers are identical.
            overridden_req = age_info.get("original-age-requirement", age_min_req)
            excuse.addinfo(
                "Overriding age needed from %d days to %d by %s"
                % (overridden_req, new_req, who)
            )
            age_min_req = new_req
        else:
            excuse.addinfo("Too young, but urgency pushed by %s" % who)
            age_min_req = 0
    excuse.setdaysold(age_info["current-age"], age_min_req)

    if age_min_req == 0:
        excuse.addinfo("%d days old" % days_old)
    elif days_old < age_min_req:
        excuse.add_verdict_info(
            res, "Too young, only %d of %d days old" % (days_old, age_min_req)
        )
    else:
        excuse.addinfo("%d days old (needed %d days)" % (days_old, age_min_req))

    return res
663 def _read_dates_file(self) -> None:
664 """Parse the dates file"""
665 dates = self._dates
666 fallback_filename = os.path.join(self.suite_info.target_suite.path, "Dates")
667 using_new_name = False
668 try:
669 filename = os.path.join(self.state_dir, "age-policy-dates")
670 if not os.path.exists(filename) and os.path.exists(fallback_filename): 670 ↛ 671line 670 didn't jump to line 671 because the condition on line 670 was never true
671 filename = fallback_filename
672 else:
673 using_new_name = True
674 except AttributeError:
675 if os.path.exists(fallback_filename):
676 filename = fallback_filename
677 else:
678 raise RuntimeError("Please set STATE_DIR in the britney configuration")
680 try:
681 with open(filename, encoding="utf-8") as fd:
682 for line in fd:
683 if line.startswith("#"):
684 # Ignore comment lines (mostly used for tests)
685 continue
686 # <source> <version> <date>)
687 ln = line.split()
688 if len(ln) != 3: # pragma: no cover
689 continue
690 try:
691 dates[ln[0]] = (ln[1], int(ln[2]))
692 except ValueError: # pragma: no cover
693 pass
694 except FileNotFoundError:
695 if not using_new_name: 695 ↛ 697line 695 didn't jump to line 697 because the condition on line 695 was never true
696 # If we using the legacy name, then just give up
697 raise
698 self.logger.info("%s does not appear to exist. Creating it", filename)
699 with open(filename, mode="x", encoding="utf-8"):
700 pass
702 def _read_urgencies_file(self) -> None:
703 urgencies = self._urgencies
704 min_days_default = self._min_days_default
705 fallback_filename = os.path.join(self.suite_info.target_suite.path, "Urgency")
706 try:
707 filename = os.path.join(self.state_dir, "age-policy-urgencies")
708 if not os.path.exists(filename) and os.path.exists(fallback_filename): 708 ↛ 709line 708 didn't jump to line 709 because the condition on line 708 was never true
709 filename = fallback_filename
710 except AttributeError:
711 filename = fallback_filename
713 sources_s = self.suite_info.primary_source_suite.sources
714 sources_t = self.suite_info.target_suite.sources
716 with open(filename, errors="surrogateescape", encoding="ascii") as fd:
717 for line in fd:
718 if line.startswith("#"):
719 # Ignore comment lines (mostly used for tests)
720 continue
721 # <source> <version> <urgency>
722 ln = line.split()
723 if len(ln) != 3: 723 ↛ 724line 723 didn't jump to line 724 because the condition on line 723 was never true
724 continue
726 # read the minimum days associated with the urgencies
727 urgency_old = urgencies.get(ln[0], None)
728 mindays_old = self._min_days.get(urgency_old, 1000) # type: ignore[arg-type]
729 mindays_new = self._min_days.get(ln[2], min_days_default)
731 # if the new urgency is lower (so the min days are higher), do nothing
732 if mindays_old <= mindays_new:
733 continue
735 # if the package exists in the target suite and it is more recent, do nothing
736 tsrcv = sources_t.get(ln[0], None)
737 if tsrcv and apt_pkg.version_compare(tsrcv.version, ln[1]) >= 0:
738 continue
740 # if the package doesn't exist in the primary source suite or it is older, do nothing
741 usrcv = sources_s.get(ln[0], None)
742 if not usrcv or apt_pkg.version_compare(usrcv.version, ln[1]) < 0: 742 ↛ 743line 742 didn't jump to line 743 because the condition on line 742 was never true
743 continue
745 # update the urgency for the package
746 urgencies[ln[0]] = ln[2]
748 def _write_dates_file(self) -> None:
749 dates = self._dates
750 try:
751 directory = self.state_dir
752 basename = "age-policy-dates"
753 old_file = os.path.join(self.suite_info.target_suite.path, "Dates")
754 except AttributeError:
755 directory = self.suite_info.target_suite.path
756 basename = "Dates"
757 old_file = None
758 filename = os.path.join(directory, basename)
759 filename_tmp = os.path.join(directory, "%s_new" % basename)
760 with open(filename_tmp, "w", encoding="utf-8") as fd:
761 for pkg in sorted(dates):
762 version, date = dates[pkg]
763 fd.write("%s %s %d\n" % (pkg, version, date))
764 os.rename(filename_tmp, filename)
765 if old_file is not None and os.path.exists(old_file): 765 ↛ 766line 765 didn't jump to line 766 because the condition on line 765 was never true
766 self.logger.info("Removing old age-policy-dates file %s", old_file)
767 os.unlink(old_file)
770class RCBugPolicy(AbstractBasePolicy):
771 """RC bug regression policy for source migrations
773 The RCBugPolicy will read provided list of RC bugs and block any
774 source upload that would introduce a *new* RC bug in the target
775 suite.
777 The RCBugPolicy's decision is influenced by the following:
779 State files:
780 * ${STATE_DIR}/rc-bugs-${SUITE_NAME}: File containing RC bugs for packages in
781 the given suite (one for both primary source suite and the target suite is
782 needed).
783 - These files need to be updated externally.
784 """
def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
    """Create the "rc-bugs" policy for the primary source suite.

    :param options: The options member of Britney with all the config values.
    :param suite_info: The suites Britney is working with.
    """
    super().__init__(
        "rc-bugs", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
    )
    # Bug tables (name -> set of bug ids) for the source and the target
    # suite; None until initialise() has loaded them.
    self._bugs_source: dict[str, set[str]] | None = None
    self._bugs_target: dict[str, set[str]] | None = None
def register_hints(self, hint_parser: HintParser) -> None:
    """Register the ``ignore-rc-bugs`` hint type.

    :param hint_parser: (see HintParser.register_hint_type)
    """

    def parse_bug_list(raw: str) -> frozenset[str]:
        # The bugs are given as one comma-separated argument.
        return frozenset(raw.split(","))

    parser = simple_policy_hint_parser_function(IgnoreRCBugHint, parse_bug_list)
    ignore_type = HintType("ignore-rc-bugs", parser, min_args=2)
    hint_parser.register_hint_type(ignore_type)
def initialise(self, britney: "Britney") -> None:
    """Load the RC bug lists of the primary source suite and the target suite.

    The ``rc-bugs-<suite name>`` files in the state directory are used,
    unless neither of them exists while both legacy ``BugsV`` files in the
    suite directories do, or no state directory is configured; in those
    cases the legacy files are read.

    :param britney: This is the instance of the "Britney" class.
    """
    super().initialise(britney)
    source_suite = self.suite_info.primary_source_suite
    target_suite = self.suite_info.target_suite
    legacy_source = os.path.join(source_suite.path, "BugsV")
    legacy_target = os.path.join(target_suite.path, "BugsV")
    try:
        source_file = os.path.join(self.state_dir, "rc-bugs-%s" % source_suite.name)
        target_file = os.path.join(self.state_dir, "rc-bugs-%s" % target_suite.name)
        no_state_files = not os.path.exists(source_file) and not os.path.exists(
            target_file
        )
        if (
            no_state_files
            and os.path.exists(legacy_source)
            and os.path.exists(legacy_target)
        ):
            source_file, target_file = legacy_source, legacy_target
    except AttributeError:
        source_file, target_file = legacy_source, legacy_target
    self._bugs_source = self._read_bugs(source_file)
    self._bugs_target = self._read_bugs(target_file)
def apply_src_policy_impl(
    self,
    rcbugs_info: dict[str, Any],
    item: MigrationItem,
    source_data_tdist: SourcePackage | None,
    source_data_srcdist: SourcePackage,
    excuse: "Excuse",
) -> PolicyVerdict:
    """Reject the migration if it would add RC bugs to the target suite.

    The bugs of the source package and of its binaries are collected for
    both suites, bugs named by an "ignore-rc-bugs" hint are removed, and the
    two sets are compared.

    :param rcbugs_info: Dictionary receiving this policy's results
      ("shared-bugs", "unique-source-bugs", "unique-target-bugs" and, when a
      hint applied, "ignored-bugs").

    :param item: The migration item the policy is applied to.

    :param source_data_tdist: The source package in the target suite, or
      None if it is not there.

    :param source_data_srcdist: The source package in the source suite.

    :param excuse: The excuse to report the bugs on.

    :return: REJECTED_PERMANENTLY if there are bugs only present in the
      source suite, otherwise PASS_HINTED if a hint was applied, else PASS.
    """
    assert self._bugs_source is not None  # for type checking
    assert self._bugs_target is not None  # for type checking
    bugs_t: set[str] = set()
    bugs_s: set[str] = set()
    source_name = item.package
    binaries_s = {x[0] for x in source_data_srcdist.binaries}
    if source_data_tdist is not None:
        binaries_t = {x[0] for x in source_data_tdist.binaries}
    else:
        binaries_t = set()

    src_key = f"src:{source_name}"
    if source_data_tdist and src_key in self._bugs_target:
        bugs_t.update(self._bugs_target[src_key])
    if src_key in self._bugs_source:
        bugs_s.update(self._bugs_source[src_key])

    for pkg in binaries_s:
        if pkg in self._bugs_source:
            bugs_s |= self._bugs_source[pkg]
    for pkg in binaries_t:
        if pkg in self._bugs_target:
            bugs_t |= self._bugs_target[pkg]

    def is_binary_in(suite: Any) -> bool:
        # Short-circuits on the first match instead of materialising the
        # name of every binary in the suite.
        return any(
            pkg_id.package_name == source_name
            for pkg_id in suite.all_binaries_in_suite
        )

    # The bts seems to support filing source bugs against a binary of the
    # same name if that binary isn't built by any source.  An example is bug
    # 820347 against Package: juce (in the live-2016-04-11 test).  Add those
    # bugs too.
    if (
        source_name not in (binaries_s | binaries_t)
        and not is_binary_in(self.suite_info.primary_source_suite)
        and not is_binary_in(self.suite_info.target_suite)
    ):
        if source_name in self._bugs_source:
            bugs_s |= self._bugs_source[source_name]
        if source_name in self._bugs_target:
            bugs_t |= self._bugs_target[source_name]

    # If a package is not in the target suite, it has no RC bugs per
    # definition.  Unfortunately, it seems that the live-data is
    # not always accurate (e.g. live-2011-12-13 suggests that
    # obdgpslogger had the same bug in testing and unstable,
    # but obdgpslogger was not in testing at that time).
    # - For the curious, obdgpslogger was removed on that day
    #   and the BTS probably had not caught up with that fact.
    #   (https://tracker.debian.org/news/415935)
    assert not bugs_t or source_data_tdist, (
        "%s had bugs in the target suite but is not present" % source_name
    )

    verdict = PolicyVerdict.PASS

    assert self.hints is not None
    for ignore_hint in cast(
        list[IgnoreRCBugHint],
        self.hints.search(
            "ignore-rc-bugs",
            package=source_name,
            version=source_data_srcdist.version,
        ),
    ):
        ignored_bugs = ignore_hint.ignored_rcbugs

        # Only handle one hint for now
        if "ignored-bugs" in rcbugs_info:
            self.logger.info(
                "Ignoring ignore-rc-bugs hint from %s on %s due to another hint from %s",
                ignore_hint.user,
                source_name,
                rcbugs_info["ignored-bugs"]["issued-by"],
            )
            continue
        if not ignored_bugs.isdisjoint(bugs_s):
            bugs_s -= ignored_bugs
            bugs_t -= ignored_bugs
            rcbugs_info["ignored-bugs"] = {
                "bugs": sorted(ignored_bugs),
                "issued-by": ignore_hint.user,
            }
            verdict = PolicyVerdict.PASS_HINTED
        else:
            self.logger.info(
                "Ignoring ignore-rc-bugs hint from %s on %s as none of %s affect the package",
                ignore_hint.user,
                source_name,
                str(ignored_bugs),
            )

    rcbugs_info["shared-bugs"] = sorted(bugs_s & bugs_t)
    rcbugs_info["unique-source-bugs"] = sorted(bugs_s - bugs_t)
    rcbugs_info["unique-target-bugs"] = sorted(bugs_t - bugs_s)

    # update excuse
    new_bugs = rcbugs_info["unique-source-bugs"]
    old_bugs = rcbugs_info["unique-target-bugs"]
    excuse.setbugs(old_bugs, new_bugs)

    def bug_links(bugs: list[str]) -> str:
        # HTML links to the bug reports, comma separated.
        return ", ".join(
            '<a href="https://bugs.debian.org/%s">#%s</a>' % (quote(bug), bug)
            for bug in bugs
        )

    target_name = self.suite_info.target_suite.name
    if new_bugs:
        verdict = PolicyVerdict.REJECTED_PERMANENTLY
        excuse.add_verdict_info(
            verdict,
            "Updating %s would introduce bugs in %s: %s"
            % (source_name, target_name, bug_links(new_bugs)),
        )

    if old_bugs:
        excuse.addinfo(
            "Updating %s will fix bugs in %s: %s"
            % (source_name, target_name, bug_links(old_bugs))
        )

    return verdict
def _read_bugs(self, filename: str) -> dict[str, set[str]]:
    """Load the release critical bug summary stored in *filename*.

    Every row of the file is expected to look like:

        <package-name> <bug number>[,<bug number>...]

    Rows that do not split into exactly two whitespace-separated fields
    are reported with a warning and skipped.

    The returned dictionary maps each package name found in the file to
    the set of bug numbers (kept as strings) listed for it.  A package
    that appears on several rows gets the union of all of them.
    """
    self.logger.info("Loading RC bugs data from %s", filename)
    bugs_by_package: dict[str, set[str]] = {}
    with open(filename, encoding="ascii") as bug_file:
        for row in bug_file:
            fields = row.split()
            if len(fields) != 2:  # pragma: no cover
                self.logger.warning("Malformed line found in line %s", row)
                continue
            package, bug_numbers = fields
            bugs_by_package.setdefault(package, set()).update(
                bug_numbers.split(",")
            )
    return bugs_by_package
class PiupartsPolicy(AbstractBasePolicy):
    """Policy that judges a source migration by its piuparts results.

    During ``initialise`` two summaries are loaded from the state directory:
    one named after the primary source suite and one named after the target
    suite.  A source whose state is "F" in the source suite but not in the
    target suite is treated as a regression and rejected, unless an
    ``ignore-piuparts`` hint matches the source and version.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "piuparts", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
        )
        # Both maps are filled by initialise().  They map a source package
        # name to a (state, url) pair; the url is None when it was not kept.
        self._piuparts_source: dict[str, tuple[str, str | None]] | None = None
        self._piuparts_target: dict[str, tuple[str, str | None]] | None = None

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the ``ignore-piuparts`` hint type with *hint_parser*."""
        hint_parser.register_hint_type(HintType("ignore-piuparts"))

    def initialise(self, britney: "Britney") -> None:
        """Load the piuparts summaries for the source and target suites.

        Raises RuntimeError when building the file names fails with an
        AttributeError (reported as a missing STATE_DIR setting).
        """
        super().initialise(britney)
        source_suite = self.suite_info.primary_source_suite
        target_suite = self.suite_info.target_suite
        try:
            filename_unstable = os.path.join(
                self.state_dir, "piuparts-summary-%s.json" % source_suite.name
            )
            filename_testing = os.path.join(
                self.state_dir, "piuparts-summary-%s.json" % target_suite.name
            )
        except AttributeError as e:  # pragma: no cover
            raise RuntimeError(
                "Please set STATE_DIR in the britney configuration"
            ) from e
        # Only the source suite's URLs are shown in the excuses, so the
        # target suite summary is loaded without them.
        self._piuparts_source = self._read_piuparts_summary(
            filename_unstable, keep_url=True
        )
        self._piuparts_target = self._read_piuparts_summary(
            filename_testing, keep_url=False
        )

    def apply_src_policy_impl(
        self,
        piuparts_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Compute the piuparts verdict for *item* and record it.

        The outcome is written to ``piuparts_info["test-results"]`` and a
        matching message is added to *excuse*.  A rejection is turned into
        PASS_HINTED when an ``ignore-piuparts`` hint exists for the source at
        the version in the source suite.
        """
        assert self._piuparts_source is not None  # for type checking
        assert self._piuparts_target is not None  # for type checking
        source_name = item.package

        # Sources without an entry get the placeholder state "X", which is
        # handled by the final "can't test" branch below.
        if source_name in self._piuparts_target:
            testing_state = self._piuparts_target[source_name][0]
        else:
            testing_state = "X"
        url: str | None
        if source_name in self._piuparts_source:
            unstable_state, url = self._piuparts_source[source_name]
        else:
            unstable_state = "X"
            url = None
        url_html = "(no link yet)"
        if url is not None:
            url_html = '<a href="{0}">{0}</a>'.format(url)

        if unstable_state == "P":
            # Not a regression
            msg = f"Piuparts tested OK - {url_html}"
            result = PolicyVerdict.PASS
            piuparts_info["test-results"] = "pass"
        elif unstable_state == "F":
            if testing_state != unstable_state:
                piuparts_info["test-results"] = "regression"
                msg = f"Piuparts regression - {url_html}"
                result = PolicyVerdict.REJECTED_PERMANENTLY
            else:
                # Already failing in the target suite: not made worse.
                piuparts_info["test-results"] = "failed"
                msg = f"Piuparts failure (not a regression) - {url_html}"
                result = PolicyVerdict.PASS
        elif unstable_state == "W":
            msg = f"Piuparts check waiting for test results - {url_html}"
            result = PolicyVerdict.REJECTED_TEMPORARILY
            piuparts_info["test-results"] = "waiting-for-test-results"
        else:
            msg = f"Piuparts can't test {source_name} (not a blocker) - {url_html}"
            piuparts_info["test-results"] = "cannot-be-tested"
            result = PolicyVerdict.PASS

        if url is not None:
            piuparts_info["piuparts-test-url"] = url
        if result.is_rejected:
            excuse.add_verdict_info(result, msg)
        else:
            excuse.addinfo(msg)

        if result.is_rejected:
            assert self.hints is not None
            # The first matching hint wins; further hints are not consulted.
            for ignore_hint in self.hints.search(
                "ignore-piuparts",
                package=source_name,
                version=source_data_srcdist.version,
            ):
                piuparts_info["ignored-piuparts"] = {"issued-by": ignore_hint.user}
                result = PolicyVerdict.PASS_HINTED
                excuse.addinfo(
                    f"Piuparts issue ignored as requested by {ignore_hint.user}"
                )
                break

        return result

    def _read_piuparts_summary(
        self, filename: str, keep_url: bool = True
    ) -> dict[str, tuple[str, str | None]]:
        """Parse the piuparts summary in *filename*.

        The file must be JSON with ``_id`` equal to "Piuparts Package Test
        Results Summary", ``_version`` equal to "1.0", and a ``packages``
        mapping.  Each entry of ``packages`` must hold exactly one result
        set, which is unpacked as three items: the state, an ignored item
        and the url.  An empty file yields an empty summary.

        Returns a mapping from source name to ``(state, url)``.  The url is
        replaced by None when *keep_url* is false.

        Raises ValueError when the id/version fields are missing or wrong,
        or when a source does not have exactly one result set.
        """
        summary: dict[str, tuple[str, str | None]] = {}
        self.logger.info("Loading piuparts report from %s", filename)
        # JSON text is UTF-8; do not let the locale decide how it is decoded.
        with open(filename, encoding="utf-8") as fd:
            if os.fstat(fd.fileno()).st_size < 1:
                return summary
            data = json.load(fd)
        try:
            if (
                data["_id"] != "Piuparts Package Test Results Summary"
                or data["_version"] != "1.0"
            ):  # pragma: no cover
                raise ValueError(
                    f"Piuparts results in {filename} does not have the correct ID or version"
                )
        except KeyError as e:  # pragma: no cover
            raise ValueError(
                f"Piuparts results in {filename} is missing id or version field"
            ) from e
        for source, suite_data in data["packages"].items():
            if len(suite_data) != 1:  # pragma: no cover
                raise ValueError(
                    f"Piuparts results in {filename}, the source {source} does not have "
                    "exactly one result set"
                )
            item = next(iter(suite_data.values()))
            state, _, url = item
            if not keep_url:
                url = None
            summary[source] = (state, url)

        return summary
class DependsPolicy(AbstractBasePolicy):
    """Per-architecture policy checking the dependencies of an item's binaries.

    For every binary of the item on the given architecture the policy records
    whether it is listed in the universe's broken packages, rejects the item
    when such a binary is not allowed to be uninstallable, and registers
    package dependencies on the excuse for relations that neither the target
    suite nor the item itself can satisfy.
    """

    pkg_universe: "BinaryPackageUniverse"
    broken_packages: frozenset["BinaryPackageId"]
    all_binaries: dict["BinaryPackageId", "BinaryPackage"]
    allow_uninst: dict[str, set[str | None]]

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "depends",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )
        # Copied from the options in initialise().
        self.nobreakall_arches = None
        self.new_arches = None
        self.break_arches = None

    def initialise(self, britney: "Britney") -> None:
        """Cache the universe, binaries and architecture options from *britney*."""
        super().initialise(britney)
        self.pkg_universe = britney.pkg_universe
        self.broken_packages = self.pkg_universe.broken_packages
        self.all_binaries = britney.all_binaries
        self.nobreakall_arches = self.options.nobreakall_arches
        self.new_arches = self.options.new_arches
        self.break_arches = self.options.break_arches
        self.allow_uninst = britney.allow_uninst

    def apply_srcarch_policy_impl(
        self,
        deps_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check the dependencies of the item's binaries on *arch*.

        Returns PASS immediately for architectures listed in the break or
        new architectures.  Otherwise returns REJECTED_PERMANENTLY when a
        binary that must stay installable is among the broken packages, and
        PASS in all other cases.  As side effects the excuse is updated with
        unsatisfiable architectures, verdict information and package
        dependencies, and *deps_info* gets the hints consumed by the
        autopkgtest policy ("autopkgtest_run_anyways",
        "arch_all_not_installable").
        """
        verdict = PolicyVerdict.PASS

        assert self.break_arches is not None
        assert self.new_arches is not None
        if arch in self.break_arches or arch in self.new_arches:
            # we don't check these in the policy (TODO - for now?)
            return verdict

        source_suite = item.suite
        target_suite = self.suite_info.target_suite

        packages_s_a = source_suite.binaries[arch]
        packages_t_a = target_suite.binaries[arch]

        # Sorted for a deterministic processing order; the set is used for
        # the membership tests in the innermost loop below.
        my_bins = sorted(excuse.packages[arch])
        my_bins_set = set(my_bins)

        arch_all_installable = set()
        arch_arch_installable = set()
        consider_it_regression = True

        for pkg_id in my_bins:
            pkg_name = pkg_id.package_name
            binary_u = packages_s_a[pkg_name]
            pkg_arch = binary_u.architecture

            # in some cases, we want to track the uninstallability of a
            # package (because the autopkgtest policy uses this), but we still
            # want to allow the package to be uninstallable
            skip_dep_check = False

            if binary_u.source_version != source_data_srcdist.version:
                # don't check cruft in unstable
                continue

            if item.architecture != "source" and pkg_arch == "all":
                # we don't care about the existing arch: all binaries when
                # checking a binNMU item, because the arch: all binaries won't
                # migrate anyway
                skip_dep_check = True

            if pkg_arch == "all" and arch not in self.nobreakall_arches:
                skip_dep_check = True

            if pkg_name in self.allow_uninst[arch]:
                # this binary is allowed to become uninstallable, so we don't
                # need to check anything
                skip_dep_check = True

            if pkg_name in packages_t_a:
                oldbin = packages_t_a[pkg_name]
                if not target_suite.is_installable(oldbin.pkg_id):
                    # as the current binary in testing is already
                    # uninstallable, the newer version is allowed to be
                    # uninstallable as well, so we don't need to check
                    # anything
                    skip_dep_check = True
                    consider_it_regression = False

            if pkg_id in self.broken_packages:
                if pkg_arch == "all":
                    arch_all_installable.add(False)
                else:
                    arch_arch_installable.add(False)
                # dependencies can't be satisfied by all the known binaries -
                # this certainly won't work...
                excuse.add_unsatisfiable_on_arch(arch)
                if skip_dep_check:
                    # ...but if the binary is allowed to become uninstallable,
                    # we don't care
                    # we still want the binary to be listed as uninstallable,
                    continue
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                excuse.add_verdict_info(
                    verdict, f"{pkg_name}/{arch} has unsatisfiable dependency"
                )
                excuse.addreason("depends")
            else:
                if pkg_arch == "all":
                    arch_all_installable.add(True)
                else:
                    arch_arch_installable.add(True)

            if skip_dep_check:
                continue

            deps = self.pkg_universe.dependencies_of(pkg_id)

            for dep in deps:
                # dep is a list of packages, each of which satisfy the
                # dependency

                if dep == frozenset():
                    continue
                is_ok = False
                needed_for_dep = set()

                for alternative in dep:
                    if target_suite.is_pkg_in_the_suite(alternative):
                        # dep can be satisfied in testing - ok
                        is_ok = True
                    elif alternative in my_bins_set:
                        # can be satisfied by binary from same item: will be
                        # ok if item migrates
                        is_ok = True
                    else:
                        needed_for_dep.add(alternative)

                if not is_ok:
                    spec = DependencySpec(DependencyType.DEPENDS, arch)
                    excuse.add_package_depends(spec, needed_for_dep)

        # The autopkgtest policy needs delicate trade offs for
        # non-installability. The current choice (considering source
        # migration and only binaries built by the version of the
        # source):
        #
        # * Run autopkgtest if all arch:$arch binaries are installable
        #   (but some or all arch:all binaries are not)
        #
        # * Don't schedule nor wait for not installable arch:all only package
        #   on ! NOBREAKALL_ARCHES
        #
        # * Run autopkgtest if installability isn't a regression (there are (or
        #   rather, should) not be a lot of packages in this state, and most
        #   likely they'll just fail quickly)
        #
        # * Don't schedule, but wait otherwise
        if arch_arch_installable == {True} and False in arch_all_installable:
            deps_info.setdefault("autopkgtest_run_anyways", []).append(arch)
        elif (
            arch not in self.nobreakall_arches
            and arch_arch_installable == set()
            and False in arch_all_installable
        ):
            deps_info.setdefault("arch_all_not_installable", []).append(arch)
        elif not consider_it_regression:
            deps_info.setdefault("autopkgtest_run_anyways", []).append(arch)

        return verdict
@unique
class BuildDepResult(IntEnum):
    """Outcome of checking a build-dependency relation.

    The values grow from the best outcome to the worst one, so results can
    be compared numerically.
    """

    OK = 1  # relation is satisfied in target
    DEPENDS = 2  # relation can be satisfied by other packages in source
    FAILED = 3  # relation cannot be satisfied
class BuildDependsPolicy(AbstractBasePolicy):
    """Policy checking that a source's build-dependencies can be satisfied.

    Architecture-specific build-dependencies are checked on every relevant
    architecture.  Architecture-independent ones only need to be satisfiable
    on one architecture, tried in a preferred order.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "build-depends",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
        )
        # Architectures tried first for arch-independent build-dependencies;
        # filled from the "all_buildarch" option in initialise().
        self._all_buildarch: list[str] = []

        parse_option(options, "all_buildarch")

    def initialise(self, britney: "Britney") -> None:
        """Read the configured "all_buildarch" list, when one is set."""
        super().initialise(britney)
        configured = self.options.all_buildarch
        if configured:
            self._all_buildarch = SuiteContentLoader.config_str_as_list(
                configured, []
            )

    def apply_src_policy_impl(
        self,
        build_deps_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
        get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers,
    ) -> PolicyVerdict:
        """Check both build-dependency fields and return the worst verdict.

        A field that is empty or missing is not checked at all.
        """
        verdict = PolicyVerdict.PASS

        # analyze the dependency fields (if present)
        arch_deps = source_data_srcdist.build_deps_arch
        if arch_deps:
            arch_verdict = self._check_build_deps(
                arch_deps,
                DependencyType.BUILD_DEPENDS,
                build_deps_info,
                item,
                source_data_tdist,
                source_data_srcdist,
                excuse,
                get_dependency_solvers=get_dependency_solvers,
            )
            verdict = PolicyVerdict.worst_of(verdict, arch_verdict)

        indep_deps = source_data_srcdist.build_deps_indep
        if indep_deps:
            indep_verdict = self._check_build_deps(
                indep_deps,
                DependencyType.BUILD_DEPENDS_INDEP,
                build_deps_info,
                item,
                source_data_tdist,
                source_data_srcdist,
                excuse,
                get_dependency_solvers=get_dependency_solvers,
            )
            verdict = PolicyVerdict.worst_of(verdict, indep_verdict)

        return verdict

    def _get_check_archs(
        self, archs: Container[str], dep_type: DependencyType
    ) -> list[str]:
        """Return the ordered list of architectures to check for *dep_type*.

        For BUILD_DEPENDS these are the configured architectures that are in
        *archs*.  For any other type the order is: the "all_buildarch"
        architectures, then the configured architectures in *archs*, then
        the remaining configured architectures.  Out-of-sync architectures
        are always left out.
        """
        out_of_sync = self.options.outofsync_arches
        configured = self.options.architectures

        if dep_type == DependencyType.BUILD_DEPENDS:
            selected = []
            for arch in configured:
                if arch in archs and arch not in out_of_sync:
                    selected.append(arch)
            return selected

        # first try the all buildarch
        ordered = list(self._all_buildarch)
        # then try the architectures where this source has arch specific
        # binaries (in the order of the architecture config file)
        for arch in configured:
            if arch in archs and arch not in ordered:
                ordered.append(arch)
        # then try all other architectures
        for arch in configured:
            if arch not in ordered:
                ordered.append(arch)

        # and drop OUTOFSYNC_ARCHES
        return [arch for arch in ordered if arch not in out_of_sync]

    def _add_info_for_arch(
        self,
        arch: str,
        excuses_info: dict[str, list[str]],
        blockers: dict[str, set[BinaryPackageId]],
        results: dict[str, BuildDepResult],
        dep_type: DependencyType,
        target_suite: TargetSuite,
        source_suite: Suite,
        excuse: "Excuse",
        verdict: PolicyVerdict,
    ) -> PolicyVerdict:
        """Transfer the findings collected for *arch* onto *excuse*.

        Returns *verdict*, worsened to REJECTED_PERMANENTLY when the result
        recorded for *arch* is FAILED.
        """
        # for the solving packages, update the excuse to add the dependencies
        if arch in blockers and arch not in self.options.break_arches:
            for blocker in blockers[arch]:
                excuse.add_package_depends(DependencySpec(dep_type, arch), {blocker})

        if results.get(arch) == BuildDepResult.FAILED:
            verdict = PolicyVerdict.worst_of(
                verdict, PolicyVerdict.REJECTED_PERMANENTLY
            )

        for excuse_text in excuses_info.get(arch, ()):
            if verdict.is_rejected:
                excuse.add_verdict_info(verdict, excuse_text)
            else:
                excuse.addinfo(excuse_text)

        return verdict

    def _check_build_deps(
        self,
        deps: str,
        dep_type: DependencyType,
        build_deps_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
        get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers,
    ) -> PolicyVerdict:
        """Check one build-dependency field (*deps*) of the item's source.

        Each comma-separated clause is tested per architecture: first
        against the target suite, then against the item's source suite.
        Clauses only satisfiable from the source suite are recorded as
        blockers; clauses not satisfiable at all mark the architecture as
        FAILED and are stored in *build_deps_info* under
        "unsatisfiable-arch-build-depends".

        When a single architecture is sufficient, the first architecture
        with the best result is reported and stored in *build_deps_info*
        under "check-<reason>-on-arch".
        """
        verdict = PolicyVerdict.PASS
        one_arch_suffices = dep_type == DependencyType.BUILD_DEPENDS_INDEP

        britney = self.britney

        # local copies for better performance
        parse_src_depends = apt_pkg.parse_src_depends

        source_name = item.package
        source_suite = item.suite
        target_suite = self.suite_info.target_suite
        src_binaries = source_suite.binaries
        src_provides = source_suite.provides_table
        tgt_binaries = target_suite.binaries
        tgt_provides = target_suite.provides_table
        unsatisfiable: dict[str, list[str]] = {}
        relevant_archs: set[str] = set()
        for binary in source_data_srcdist.binaries:
            if britney.all_binaries[binary].architecture != "all":
                relevant_archs.add(binary.architecture)

        excuses_info: dict[str, list[str]] = defaultdict(list)
        blockers: dict[str, set[BinaryPackageId]] = defaultdict(set)
        arch_results = {}
        result_archs = defaultdict(list)
        best_result = BuildDepResult.FAILED
        check_archs = self._get_check_archs(relevant_archs, dep_type)
        if not check_archs:
            # when the arch list is empty, we check the b-d on any arch, instead of all archs
            # this happens for Build-Depends on a source package that only produces arch: all binaries
            one_arch_suffices = True
            check_archs = self._get_check_archs(
                self.options.architectures, DependencyType.BUILD_DEPENDS_INDEP
            )

        for arch in check_archs:
            # retrieve the binary package from the specified suite and arch
            src_binaries_a = src_binaries[arch]
            src_provides_a = src_provides[arch]
            tgt_binaries_a = tgt_binaries[arch]
            tgt_provides_a = tgt_provides[arch]
            arch_results[arch] = BuildDepResult.OK
            # for every dependency block (formed as conjunction of disjunction)
            for clause_txt in deps.split(","):
                parsed = parse_src_depends(clause_txt, False, arch)
                # Unlike regular dependencies, some clauses of the Build-Depends(-Arch|-Indep) can be
                # filtered out by (e.g.) architecture restrictions. We need to cope with this while
                # keeping the clause text and the parsed clause aligned.
                if not parsed:
                    # Relation is not relevant for this architecture.
                    continue
                clause = parsed[0]
                # if the clause is satisfied in the target suite, then skip it
                if get_dependency_solvers(
                    clause, tgt_binaries_a, tgt_provides_a, build_depends=True
                ):
                    # Satisfied in the target suite; all ok.
                    continue

                # check if the clause can be satisfied in the source suite, and list the solving packages
                solvers = get_dependency_solvers(
                    clause, src_binaries_a, src_provides_a, build_depends=True
                )

                # if the dependency can be satisfied by the same source package, skip the clause:
                # obviously both binary packages will enter the target suite together
                if any(solver.source == source_name for solver in solvers):
                    continue

                # if no package can satisfy the dependency, add this information to the excuse
                if not solvers:
                    clause_stripped = clause_txt.strip()
                    excuses_info[arch].append(
                        "%s unsatisfiable %s on %s: %s"
                        % (source_name, dep_type, arch, clause_stripped)
                    )
                    unsatisfiable.setdefault(arch, []).append(clause_stripped)
                    arch_results[arch] = BuildDepResult.FAILED
                    continue

                blockers[arch].update(solver.pkg_id for solver in solvers)
                # never improve on an earlier FAILED for this architecture
                arch_results[arch] = max(arch_results[arch], BuildDepResult.DEPENDS)

            if one_arch_suffices:
                this_result = arch_results[arch]
                best_result = min(best_result, this_result)
                result_archs[this_result].append(arch)
                if best_result == BuildDepResult.OK:
                    # we found an architecture where the b-deps-indep are
                    # satisfied in the target suite, so we can stop
                    break

        if one_arch_suffices:
            # report only the first architecture that reached the best result
            chosen_arch = result_archs[best_result][0]
            excuse.add_detailed_info(
                f"Checking {dep_type.get_description()} on {chosen_arch}"
            )
            key = "check-%s-on-arch" % dep_type.get_reason()
            build_deps_info[key] = chosen_arch
            archs_to_report = [chosen_arch]
        else:
            archs_to_report = check_archs

        for arch in archs_to_report:
            verdict = self._add_info_for_arch(
                arch,
                excuses_info,
                blockers,
                arch_results,
                dep_type,
                target_suite,
                source_suite,
                excuse,
                verdict,
            )

        if unsatisfiable:
            build_deps_info["unsatisfiable-arch-build-depends"] = unsatisfiable

        return verdict
class BuiltUsingPolicy(AbstractBasePolicy):
    """Built-Using policy

    A binary that incorporates (part of) another source package has to name
    that source in its 'Built-Using' field.

    For every such entry this policy looks for the named source in the
    target suite.  When it is missing there but is a candidate for
    migration, a dependency on it is added to the excuse.

    A binary that incorporates a version of a source newer than any
    candidate is not accepted: rebuilding it later in the primary suite
    would only pull in that newer version again.

    A binary that incorporates an older version is accepted when a newer
    version of the source is available; the assumption is that the binary
    gets rebuilt at some point during the development cycle.

    Insisting on the exact source version would not help in practice, as a
    later upload of that source is not held back by this policy and the
    Built-Using information would become outdated anyway.

    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "built-using",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )

    def initialise(self, britney: "Britney") -> None:
        super().initialise(britney)

    def apply_srcarch_policy_impl(
        self,
        build_deps_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check the Built-Using entries of the item's binaries on *arch*.

        Returns REJECTED_PERMANENTLY when an entry cannot be satisfied by
        the target suite, the item's suite or (for additional source suites)
        the primary source suite, unless *arch* is one of the break
        architectures.  Returns PASS otherwise.
        """
        verdict = PolicyVerdict.PASS

        item_suite = item.suite
        target_suite = self.suite_info.target_suite
        arch_is_breakable = arch in self.options.break_arches

        def satisfied_by_target(wanted_source: str, wanted_version: str) -> bool:
            # True when the target suite has the source at a high enough version.
            if wanted_source not in target_suite.sources:
                return False
            available = target_suite.sources[wanted_source].version
            return apt_pkg.version_compare(available, wanted_version) >= 0

        def check_bu_in_suite(
            binary_name: str, wanted_source: str, wanted_version: str, suite: Suite
        ) -> bool:
            # True when *suite* has the source at a high enough version; in
            # that case the excuse is told about the resulting dependency.
            if wanted_source not in suite.sources:
                return False
            available = suite.sources[wanted_source].version
            if apt_pkg.version_compare(available, wanted_version) < 0:
                return False

            dep = PackageId(wanted_source, available, "source")
            if arch_is_breakable:
                excuse.add_detailed_info(
                    "Ignoring Built-Using for %s/%s on %s"
                    % (binary_name, arch, dep.uvname)
                )
            else:
                spec = DependencySpec(DependencyType.BUILT_USING, arch)
                excuse.add_package_depends(spec, {dep})
                excuse.add_detailed_info(
                    f"{binary_name}/{arch} has Built-Using on {dep.uvname}"
                )
            return True

        arch_binaries = sorted(
            x for x in source_data_srcdist.binaries if x.architecture == arch
        )
        for pkg_id in arch_binaries:
            pkg_name = pkg_id.package_name

            # retrieve the corresponding binary package from the item's suite
            binary_s = item_suite.binaries[arch][pkg_name]

            for bu in binary_s.builtusing:
                bu_source = bu[0]
                bu_version = bu[1]

                if satisfied_by_target(bu_source, bu_version):
                    continue
                if check_bu_in_suite(pkg_name, bu_source, bu_version, item_suite):
                    continue
                if item_suite.suite_class.is_additional_source and check_bu_in_suite(
                    pkg_name,
                    bu_source,
                    bu_version,
                    self.suite_info.primary_source_suite,
                ):
                    continue

                # nothing provides the requested source version
                if arch_is_breakable:
                    excuse.add_detailed_info(
                        "Ignoring unsatisfiable Built-Using for %s/%s on %s %s"
                        % (pkg_name, arch, bu_source, bu_version)
                    )
                    continue
                verdict = PolicyVerdict.worst_of(
                    verdict, PolicyVerdict.REJECTED_PERMANENTLY
                )
                excuse.add_verdict_info(
                    verdict,
                    "%s/%s has unsatisfiable Built-Using on %s %s"
                    % (pkg_name, arch, bu_source, bu_version),
                )

        return verdict
1719class BlockPolicy(AbstractBasePolicy):
1720 BLOCK_HINT_REGEX = re.compile("^(un)?(block-?.*)$")
1722 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
1723 super().__init__(
1724 "block",
1725 options,
1726 suite_info,
1727 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
1728 )
1729 self._blockall: dict[str | None, Hint] = {}
1731 def initialise(self, britney: "Britney") -> None:
1732 super().initialise(britney)
1733 assert self.hints is not None
1734 for hint in self.hints.search(type="block-all"):
1735 self._blockall[hint.package] = hint
1737 self._key_packages = []
1738 if "key" in self._blockall:
1739 self._key_packages = self._read_key_packages()
1741 def _read_key_packages(self) -> list[str]:
1742 """Read the list of key packages
1744 The file contains data in the yaml format :
1746 - reason: <something>
1747 source: <package>
1749 The method returns a list of all key packages.
1750 """
1751 filename = os.path.join(self.state_dir, "key_packages.yaml")
1752 self.logger.info("Loading key packages from %s", filename)
1753 if os.path.exists(filename): 1753 ↛ 1758line 1753 didn't jump to line 1758 because the condition on line 1753 was always true
1754 with open(filename) as f:
1755 data = yaml.safe_load(f)
1756 key_packages = [item["source"] for item in data]
1757 else:
1758 self.logger.error(
1759 "Britney was asked to block key packages, "
1760 + "but no key_packages.yaml file was found."
1761 )
1762 sys.exit(1)
1764 return key_packages
1766 def register_hints(self, hint_parser: HintParser) -> None:
1767 # block related hints are currently defined in hint.py
1768 pass
1770 def _check_blocked(
1771 self, item: "MigrationItem", arch: str, version: str, excuse: "Excuse"
1772 ) -> PolicyVerdict:
1773 verdict = PolicyVerdict.PASS
1774 blocked = {}
1775 unblocked = {}
1776 block_info = {}
1777 source_suite = item.suite
1778 suite_name = source_suite.name
1779 src = item.package
1780 is_primary = source_suite.suite_class == SuiteClass.PRIMARY_SOURCE_SUITE
1782 tooltip = (
1783 "please contact %s-release if update is needed" % self.options.distribution
1784 )
1786 assert self.hints is not None
1787 shints = self.hints.search(package=src)
1788 mismatches = False
1789 r = self.BLOCK_HINT_REGEX
1790 for hint in shints:
1791 m = r.match(hint.type)
1792 if m:
1793 if m.group(1) == "un":
1794 assert hint.suite is not None
1795 if (
1796 hint.version != version
1797 or hint.suite.name != suite_name
1798 or (hint.architecture != arch and hint.architecture != "source")
1799 ):
1800 self.logger.info(
1801 "hint mismatch: %s %s %s", version, arch, suite_name
1802 )
1803 mismatches = True
1804 else:
1805 unblocked[m.group(2)] = hint.user
1806 excuse.add_hint(hint)
1807 else:
1808 # block(-*) hint: only accepts a source, so this will
1809 # always match
1810 blocked[m.group(2)] = hint.user
1811 excuse.add_hint(hint)
1813 if "block" not in blocked and is_primary:
1814 # if there is a specific block hint for this package, we don't
1815 # check for the general hints
1817 if self.options.distribution == "debian": 1817 ↛ 1824line 1817 didn't jump to line 1824 because the condition on line 1817 was always true
1818 url = "https://release.debian.org/testing/freeze_policy.html"
1819 tooltip = (
1820 'Follow the <a href="%s">freeze policy</a> when applying for an unblock'
1821 % url
1822 )
1824 if "source" in self._blockall:
1825 blocked["block"] = self._blockall["source"].user
1826 excuse.add_hint(self._blockall["source"])
1827 elif (
1828 "new-source" in self._blockall
1829 and src not in self.suite_info.target_suite.sources
1830 ):
1831 blocked["block"] = self._blockall["new-source"].user
1832 excuse.add_hint(self._blockall["new-source"])
1833 # no tooltip: new sources will probably not be accepted anyway
1834 block_info["block"] = "blocked by {}: is not in {}".format(
1835 self._blockall["new-source"].user,
1836 self.suite_info.target_suite.name,
1837 )
1838 elif "key" in self._blockall and src in self._key_packages:
1839 blocked["block"] = self._blockall["key"].user
1840 excuse.add_hint(self._blockall["key"])
1841 block_info["block"] = "blocked by {}: is a key package ({})".format(
1842 self._blockall["key"].user,
1843 tooltip,
1844 )
1845 elif "no-autopkgtest" in self._blockall:
1846 if excuse.autopkgtest_results == {"PASS"}:
1847 if not blocked: 1847 ↛ 1873line 1847 didn't jump to line 1873 because the condition on line 1847 was always true
1848 excuse.addinfo("not blocked: has successful autopkgtest")
1849 else:
1850 blocked["block"] = self._blockall["no-autopkgtest"].user
1851 excuse.add_hint(self._blockall["no-autopkgtest"])
1852 if not excuse.autopkgtest_results:
1853 block_info["block"] = (
1854 "blocked by %s: does not have autopkgtest (%s)"
1855 % (
1856 self._blockall["no-autopkgtest"].user,
1857 tooltip,
1858 )
1859 )
1860 else:
1861 block_info["block"] = (
1862 "blocked by %s: autopkgtest not fully successful (%s)"
1863 % (
1864 self._blockall["no-autopkgtest"].user,
1865 tooltip,
1866 )
1867 )
1869 elif not is_primary:
1870 blocked["block"] = suite_name
1871 excuse.needs_approval = True
1873 for block_cmd in blocked:
1874 unblock_cmd = "un" + block_cmd
1875 if block_cmd in unblocked:
1876 if is_primary or block_cmd == "block-udeb":
1877 excuse.addinfo(
1878 "Ignoring %s request by %s, due to %s request by %s"
1879 % (
1880 block_cmd,
1881 blocked[block_cmd],
1882 unblock_cmd,
1883 unblocked[block_cmd],
1884 )
1885 )
1886 else:
1887 excuse.addinfo("Approved by %s" % (unblocked[block_cmd]))
1888 else:
1889 verdict = PolicyVerdict.REJECTED_NEEDS_APPROVAL
1890 if is_primary or block_cmd == "block-udeb":
1891 # redirect people to d-i RM for udeb things:
1892 if block_cmd == "block-udeb":
1893 tooltip = "please contact the d-i release manager if an update is needed"
1894 if block_cmd in block_info:
1895 info = block_info[block_cmd]
1896 else:
1897 info = (
1898 "Not touching package due to {} request by {} ({})".format(
1899 block_cmd,
1900 blocked[block_cmd],
1901 tooltip,
1902 )
1903 )
1904 excuse.add_verdict_info(verdict, info)
1905 else:
1906 excuse.add_verdict_info(verdict, "NEEDS APPROVAL BY RM")
1907 excuse.addreason("block")
1908 if mismatches:
1909 excuse.add_detailed_info(
1910 "Some hints for %s do not match this item" % src
1911 )
1912 return verdict
def apply_src_policy_impl(
    self,
    block_info: dict[str, Any],
    item: MigrationItem,
    source_data_tdist: SourcePackage | None,
    source_data_srcdist: SourcePackage,
    excuse: "Excuse",
) -> PolicyVerdict:
    """Run the block check for a source-level migration item.

    Delegates to ``_check_blocked`` using the pseudo-architecture
    ``"source"`` and the version of the candidate in the source suite.
    ``block_info`` and ``source_data_tdist`` are accepted for interface
    compatibility but are not consulted here.
    """
    candidate_version = source_data_srcdist.version
    return self._check_blocked(item, "source", candidate_version, excuse)
def apply_srcarch_policy_impl(
    self,
    block_info: dict[str, Any],
    item: MigrationItem,
    arch: str,
    source_data_tdist: SourcePackage | None,
    source_data_srcdist: SourcePackage,
    excuse: "Excuse",
) -> PolicyVerdict:
    """Run the block check for a single-architecture migration item.

    Delegates to ``_check_blocked`` with the given ``arch`` and the version
    of the candidate in the source suite.  ``block_info`` and
    ``source_data_tdist`` are accepted for interface compatibility but are
    not consulted here.
    """
    candidate_version = source_data_srcdist.version
    return self._check_blocked(item, arch, candidate_version, excuse)
class BuiltOnBuilddPolicy(AbstractBasePolicy):
    """Reject binaries that were not built on a buildd.

    Signer information is loaded from ``signers.json`` in the state
    directory.  It is looked up as
    ``signers[package_name][binary_version][architecture]`` and each entry
    is expected to carry a ``"buildd"`` flag and a ``"uid"``.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "builtonbuildd",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )
        # "signerinfo" is filled in by initialise()
        self._builtonbuildd: dict[str, Any] = {
            "signerinfo": None,
        }

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the ``allow-archall-maintainer-upload`` hint type."""
        hint_parser.register_hint_type(
            HintType(
                "allow-archall-maintainer-upload",
                versioned=HintAnnotate.FORBIDDEN,
            )
        )

    def initialise(self, britney: "Britney") -> None:
        """Load the signer information from ``<state_dir>/signers.json``.

        :raises RuntimeError: if no state directory is configured.
        """
        super().initialise(britney)
        try:
            filename_signerinfo = os.path.join(self.state_dir, "signers.json")
        except AttributeError as e:  # pragma: no cover
            raise RuntimeError(
                "Please set STATE_DIR in the britney configuration"
            ) from e
        self._builtonbuildd["signerinfo"] = self._read_signerinfo(filename_signerinfo)

    def apply_srcarch_policy_impl(
        self,
        buildd_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check who uploaded the binaries of ``item`` on ``arch``.

        For every binary of the source on ``arch`` that matches the source
        version, the signer is looked up.  A binary passes when it was
        signed by a buildd, when the source is outside ``main``, or when an
        ``allow-archall-maintainer-upload`` hint covers an arch:all binary.

        ``buildd_info["signed-by"]`` records the uploader uid per binary
        architecture; excuse messages are only emitted the first time an
        architecture is seen.

        :return: the worst verdict over all checked binaries.
        """
        verdict = PolicyVerdict.PASS
        signers = self._builtonbuildd["signerinfo"]
        signed_by = buildd_info.setdefault("signed-by", {})

        source_suite = item.suite

        # horrible hard-coding, but currently, we don't keep track of the
        # component when loading the packages files
        component = "main"
        # we use the source component, because a binary in contrib can
        # belong to a source in main
        section = source_data_srcdist.section
        if "/" in section:
            component = section.split("/")[0]

        packages_s_a = source_suite.binaries[arch]
        assert self.hints is not None

        for pkg_id in sorted(
            x for x in source_data_srcdist.binaries if x.architecture == arch
        ):
            pkg_name = pkg_id.package_name
            binary_u = packages_s_a[pkg_name]
            pkg_arch = binary_u.architecture

            if binary_u.source_version != source_data_srcdist.version:
                continue

            if item.architecture != "source" and pkg_arch == "all":
                # we don't care about the existing arch: all binaries when
                # checking a binNMU item, because the arch: all binaries won't
                # migrate anyway
                continue

            uid = None
            uidinfo = ""
            buildd_ok = False
            failure_verdict = PolicyVerdict.REJECTED_PERMANENTLY
            try:
                signer = signers[pkg_name][pkg_id.version][pkg_arch]
                if signer["buildd"]:
                    buildd_ok = True
                uid = signer["uid"]
                uidinfo = f"arch {pkg_arch} binaries uploaded by {uid}"
            except KeyError:
                self.logger.info(
                    "signer info for %s %s (%s) on %s not found ",
                    pkg_name,
                    binary_u.version,
                    pkg_arch,
                    arch,
                )
                uidinfo = "upload info for arch %s binaries not found" % (pkg_arch)
                failure_verdict = PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT

            # "signed-by" is only updated at the end of the iteration, so
            # this stays valid for the whole loop body
            first_report = pkg_arch not in signed_by

            if not buildd_ok:
                if component != "main":
                    if first_report:
                        excuse.add_detailed_info(
                            f"{uidinfo}, but package in {component}"
                        )
                    buildd_ok = True
                elif pkg_arch == "all":
                    allow_hints = self.hints.search(
                        "allow-archall-maintainer-upload", package=item.package
                    )
                    if allow_hints:
                        buildd_ok = True
                        verdict = PolicyVerdict.worst_of(
                            verdict, PolicyVerdict.PASS_HINTED
                        )
                        if first_report:
                            excuse.addinfo(
                                "%s, but whitelisted by %s"
                                % (uidinfo, allow_hints[0].user)
                            )

            if not buildd_ok:
                # Fold into the running verdict instead of overwriting it, so
                # that a later, milder failure cannot hide an earlier
                # permanent rejection.
                verdict = PolicyVerdict.worst_of(verdict, failure_verdict)
                if first_report:
                    if pkg_arch == "all":
                        uidinfo += (
                            ", a new source-only upload is needed to allow migration"
                        )
                    excuse.add_verdict_info(
                        failure_verdict, "Not built on buildd: %s" % (uidinfo)
                    )

            if not first_report and signed_by[pkg_arch] != uid:
                self.logger.info(
                    "signer mismatch for %s (%s %s) on %s: %s, while %s already listed",
                    pkg_name,
                    binary_u.source,
                    binary_u.source_version,
                    pkg_arch,
                    uid,
                    signed_by[pkg_arch],
                )

            signed_by[pkg_arch] = uid

        return verdict

    def _read_signerinfo(self, filename: str) -> dict[str, Any]:
        """Return the parsed JSON content of ``filename``.

        An empty file yields an empty dict.
        """
        self.logger.info("Loading signer info from %s", filename)
        # JSON is UTF-8; do not depend on the locale's default encoding
        with open(filename, encoding="utf-8") as fd:
            if os.fstat(fd.fileno()).st_size < 1:
                return {}
            signerinfo: dict[str, Any] = json.load(fd)

        return signerinfo
class ImplicitDependencyPolicy(AbstractBasePolicy):
    """Implicit Dependency policy

    Upgrading a package pkg-a can break the installability of a package pkg-b.
    A newer version (or the removal) of pkg-b might fix the issue. In that
    case, pkg-a has an 'implicit dependency' on pkg-b, because pkg-a can only
    migrate if pkg-b also migrates.

    This policy tries to discover a few common cases, and adds the relevant
    info to the excuses. If another item is needed to fix the
    uninstallability, a dependency is added. If no newer item can fix it, this
    excuse will be blocked.

    Note that the migration step will check the installability of every
    package, so this policy doesn't need to handle every corner case. It
    must, however, make sure that no excuse is unnecessarily blocked.

    Some cases that should be detected by this policy:

    * pkg-a is upgraded from 1.0-1 to 2.0-1, while
      pkg-b has "Depends: pkg-a (<< 2.0)"
      This typically happens if pkg-b has a strict dependency on pkg-a because
      it uses some non-stable internal interface (examples are glibc,
      binutils, python3-defaults, ...)

    * pkg-a is upgraded from 1.0-1 to 2.0-1, and
      pkg-a 1.0-1 has "Provides: provides-1",
      pkg-a 2.0-1 has "Provides: provides-2",
      pkg-b has "Depends: provides-1"
      This typically happens when pkg-a has an interface that changes between
      versions, and a virtual package is used to identify the version of this
      interface (e.g. perl-api-x.y)

    """

    _pkg_universe: "BinaryPackageUniverse"
    _all_binaries: dict["BinaryPackageId", "BinaryPackage"]
    _allow_uninst: dict[str, set[str | None]]
    _nobreakall_arches: list[str]

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "implicit-deps",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )

    def initialise(self, britney: "Britney") -> None:
        """Cache the britney state and options this policy consults."""
        super().initialise(britney)

        # state owned by the britney instance
        self._pkg_universe = britney.pkg_universe
        self._all_binaries = britney.all_binaries
        self._allow_uninst = britney.allow_uninst
        self._smooth_updates = britney.options.smooth_updates

        # architecture lists from this policy's options
        opts = self.options
        self._nobreakall_arches = opts.nobreakall_arches
        self._new_arches = opts.new_arches
        self._break_arches = opts.break_arches
        self._outofsync_arches = opts.outofsync_arches

    def can_be_removed(self, pkg: BinaryPackage) -> bool:
        """Tell whether ``pkg`` is a candidate for removal from the target suite."""
        suites = self.suite_info
        source_name = pkg.source

        # TODO these conditions shouldn't be hardcoded here
        # ideally, we would be able to look up excuses to see if the removal
        # is in there, but in the current flow, this policy is called before
        # all possible excuses exist, so there is no list for us to check

        # the source is gone from the primary source suite
        if source_name not in suites.primary_source_suite.sources:
            return True

        testing = suites.target_suite
        version_in_testing = testing.sources[source_name].version
        assert self.hints is not None
        removal_hints = self.hints.search(
            "remove", package=source_name, version=version_in_testing
        )
        # a removal hint exists for the source version in the target suite
        if removal_hints:
            return True

        # cruft in the target suite: its removal will be tried
        if testing.is_cruft(pkg):
            return True

        # the case were the newer version of the source no longer includes the
        # binary (or includes a cruft version of the binary) will be handled
        # separately (in that case there might be an implicit dependency on
        # the newer source)
        return False

    def should_skip_rdep(
        self, pkg: BinaryPackage, source_name: str, myarch: str
    ) -> bool:
        """Tell whether reverse dependency ``pkg`` needs no installability check.

        The conditions are evaluated in order and short-circuit on the first
        one that holds.
        """
        testing = self.suite_info.target_suite

        return bool(
            # not in the target suite: the migration cannot break it
            not testing.is_pkg_in_the_suite(pkg.pkg_id)
            # built from the same source: it is upgraded together with it
            or pkg.source == source_name
            # may be removed, in which case it cannot end up broken
            or self.can_be_removed(pkg)
            # arch:all may become uninstallable outside the nobreakall arches
            or (pkg.architecture == "all" and myarch not in self._nobreakall_arches)
            # a hint allows this binary to become uninstallable
            or pkg.pkg_id.package_name in self._allow_uninst[myarch]
            # already uninstallable in the target suite: nothing to break
            or not testing.is_installable(pkg.pkg_id)
        )

    def breaks_installability(
        self,
        pkg_id_t: BinaryPackageId,
        pkg_id_s: BinaryPackageId | None,
        pkg_to_check: BinaryPackageId,
    ) -> bool:
        """
        Check if upgrading pkg_id_t to pkg_id_s breaks the installability of
        pkg_to_check.

        To check if removing pkg_id_t breaks pkg_to_check, set pkg_id_s to
        None.
        """
        universe = self._pkg_universe
        conflicting = universe.negative_dependencies_of(pkg_to_check)
        upgraded_name = pkg_id_t.package_name

        for clause in universe.dependencies_of(pkg_to_check):
            if pkg_id_t not in clause:
                # pkg_id_t is not an alternative of this clause, so the
                # upgrade cannot affect it
                continue

            # An alternative keeps the clause satisfiable when it does not
            # conflict with pkg_to_check and is either a different binary
            # or exactly pkg_id_s.  Any other version of the upgraded binary
            # (pkg_id_t included) will be gone once pkg_id_s has migrated.
            still_satisfiable = any(
                alt not in conflicting
                and (alt.package_name != upgraded_name or alt == pkg_id_s)
                for alt in clause
            )
            if not still_satisfiable:
                return True

        return False

    def check_upgrade(
        self,
        pkg_id_t: BinaryPackageId,
        pkg_id_s: BinaryPackageId | None,
        source_name: str,
        myarch: str,
        broken_binaries: set[str],
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check the reverse dependencies of ``pkg_id_t`` against the upgrade.

        For each reverse dependency that the upgrade to ``pkg_id_s`` (or the
        removal, when ``pkg_id_s`` is None) would break, either an implicit
        dependency on newer versions that are not broken is added to
        ``excuse``, or the binary name is added to ``broken_binaries`` and
        the excuse is rejected.
        """
        result = PolicyVerdict.PASS
        binaries = self._all_binaries

        # walk the reverse dependencies of the binary in the target suite
        for rdep_id in sorted(self._pkg_universe.reverse_dependencies_of(pkg_id_t)):
            rdep = binaries[rdep_id]

            # cases where the rdep won't break, or where we don't care
            if self.should_skip_rdep(rdep, source_name, myarch):
                continue

            # not broken by this upgrade: no implicit dependency
            if not self.breaks_installability(pkg_id_t, pkg_id_s, rdep_id):
                continue

            # The upgrade breaks the rdep.  Look for newer versions of the
            # rdep that would not be broken: if any exists there is an
            # implicit dependency, otherwise the upgrade will fail.
            fixes = set()
            for candidate, _suite in find_newer_binaries(
                self.suite_info, rdep, add_source_for_dropped_bin=True
            ):
                if candidate.architecture == "source":
                    # A newer source that dropped the binary is reported as
                    # the 'newer version'; once it migrates the binary is
                    # gone and therefore cannot be uninstallable.
                    fixes.add(candidate)
                    continue
                assert isinstance(candidate, BinaryPackageId)
                if not self.breaks_installability(pkg_id_t, pkg_id_s, candidate):
                    fixes.add(candidate)

            if fixes:
                excuse.add_package_depends(
                    DependencySpec(DependencyType.IMPLICIT_DEPENDENCY, myarch),
                    fixes,
                )
                continue

            # nothing newer helps: no possible solution
            broken_binaries.add(rdep_id.name)
            target_name = self.suite_info.target_suite.name
            if pkg_id_s:
                action = "migrating {} to {}".format(pkg_id_s.name, target_name)
            else:
                action = "removing {} from {}".format(pkg_id_t.name, target_name)
            info = '{0} makes <a href="#{1}">{1}</a> uninstallable'.format(
                action, rdep_id.name
            )
            result = PolicyVerdict.REJECTED_PERMANENTLY
            excuse.add_verdict_info(result, info)

        return result

    def apply_srcarch_policy_impl(
        self,
        implicit_dep_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check the binaries of ``item`` on ``arch`` for implicit dependencies.

        The sorted names of the binaries that would become uninstallable are
        merged into ``implicit_dep_info["implicit-deps"]["broken-binaries"]``.
        """
        verdict = PolicyVerdict.PASS

        if not source_data_tdist:
            # this item is not currently in testing: no implicit dependency
            return verdict

        if excuse.hasreason("missingbuild"):
            # if the build is missing, the policy would treat this as if the
            # binaries would be removed, which would give incorrect (and
            # confusing) info
            excuse.add_detailed_info(
                "missing build, not checking implicit dependencies on %s" % (arch)
            )
            return verdict

        source_suite = item.suite
        source_name = item.package
        target_suite = self.suite_info.target_suite
        all_binaries = self._all_binaries
        new_version = source_data_srcdist.version

        broken_binaries: set[str] = set()

        assert self.hints is not None
        # every binary of this excuse that is currently in testing, on
        # architectures this policy cares about
        for pkg_id_t in sorted(
            x
            for x in source_data_tdist.binaries
            if (arch == "source" or x.architecture == arch)
            and x.package_name in target_suite.binaries[x.architecture]
            and x.architecture not in self._new_arches
            and x.architecture not in self._break_arches
            and x.architecture not in self._outofsync_arches
        ):
            mypkg = pkg_id_t.package_name
            myarch = pkg_id_t.architecture
            binaries_t_a = target_suite.binaries[myarch]
            binaries_s_a = source_suite.binaries[myarch]

            if target_suite.is_cruft(all_binaries[pkg_id_t]):
                # this binary is cruft in testing: it will stay around as long
                # as necessary to satisfy dependencies, so we don't need to
                # care
                continue

            pkg_id_s: BinaryPackageId | None = None
            if mypkg in binaries_s_a:
                mybin = binaries_s_a[mypkg]
                pkg_id_s = mybin.pkg_id
                # A binary built from another source is a hijack: that is
                # too complicated to check, so pkg_id_s is kept as is (the
                # migration code checks installability later anyway).
                if mybin.source == source_name:
                    if mybin.source_version != new_version:
                        # cruft in source suite: pretend the binary doesn't
                        # exist
                        pkg_id_s = None
                    elif pkg_id_t == pkg_id_s:
                        # same binary (probably arch: all from a binNMU):
                        # 'upgrading' doesn't change anything for this
                        # binary, so it won't break anything
                        continue

            if not pkg_id_s:
                if is_smooth_update_allowed(
                    binaries_t_a[mypkg], self._smooth_updates, self.hints
                ):
                    # the binary isn't in the new version (or is cruft
                    # there), and smooth updates are allowed: the binary can
                    # stay around if that is necessary to satisfy
                    # dependencies, so we don't need to check it
                    continue

                if (
                    source_data_tdist.version == new_version
                    and source_suite.suite_class == SuiteClass.ADDITIONAL_SOURCE_SUITE
                    and binaries_t_a[mypkg].architecture == "all"
                ):
                    # we're very probably migrating a binNMU built in tpu
                    # where the arch:all binaries were not copied to it as
                    # that's not needed. This policy could needlessly block.
                    continue

            verdict = PolicyVerdict.worst_of(
                verdict,
                self.check_upgrade(
                    pkg_id_t, pkg_id_s, source_name, myarch, broken_binaries, excuse
                ),
            )

        # each arch is processed separately, so if we already have info from
        # other archs, we need to merge the info from this arch
        if "implicit-deps" in implicit_dep_info:
            merged = set(implicit_dep_info["implicit-deps"]["broken-binaries"])
        else:
            implicit_dep_info["implicit-deps"] = {}
            merged = set()
        merged |= broken_binaries

        implicit_dep_info["implicit-deps"]["broken-binaries"] = sorted(merged)

        return verdict
class ReverseRemovalPolicy(AbstractBasePolicy):
    """Block sources that (transitively) depend on a source with a remove hint.

    An ``ignore-reverse-remove`` hint overrides the block.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "reverseremoval",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
        )

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the ``ignore-reverse-remove`` hint type."""
        hint_parser.register_hint_type(HintType("ignore-reverse-remove"))

    def initialise(self, britney: "Britney") -> None:
        """Precompute which ``source/version`` keys are affected by remove hints.

        The result is stored in ``self._block_src_for_rm_hint`` and maps each
        key to the set of hinted source names that caused it.
        """
        super().initialise(britney)

        universe = britney.pkg_universe
        suites = britney.suite_info
        testing = suites.target_suite

        assert self.hints is not None

        # binary -> names of hinted sources whose reverse tree contains it
        reasons_by_binary: dict[BinaryPackageId, set[str]] = defaultdict(set)
        for remove_hint in self.hints.search("remove"):
            for hinted in remove_hint.packages:
                hinted_name = hinted.uvname
                # I think we don't need to look at the target suite
                for src_suite in suites.source_suites:
                    try:
                        affected = set(src_suite.sources[hinted_name].binaries)
                    except KeyError:
                        continue
                    # called for its effect on `affected`; the return value
                    # is not used
                    compute_reverse_tree(universe, affected)
                    for binary_id in affected:
                        reasons_by_binary[binary_id].add(hinted_name)

        # fold the per-binary reasons into per-"source/version" reasons
        reasons_by_source: dict[str, set[str]] = defaultdict(set)
        for binary_id, reasons in reasons_by_binary.items():
            if target_suite_has(testing, binary_id):
                # If the pkg is in the target suite, there's nothing this
                # policy wants to do.
                continue
            binary = britney.all_binaries[binary_id]
            source_key = binary.source + "/" + binary.source_version
            reasons_by_source[source_key].update(reasons)

        self._block_src_for_rm_hint = reasons_by_source

    def apply_src_policy_impl(
        self,
        rev_remove_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Reject ``item`` when a remove hint hits one of its dependencies.

        Returns ``PASS`` when the item is not affected, ``PASS_HINTED`` when
        an ``ignore-reverse-remove`` hint applies, and
        ``REJECTED_PERMANENTLY`` otherwise.
        """
        blockers = self._block_src_for_rm_hint
        if item.name not in blockers:
            return PolicyVerdict.PASS

        reason = ", ".join(sorted(blockers[item.name]))
        assert self.hints is not None
        ignore_hints = self.hints.search(
            "ignore-reverse-remove", package=item.uvname, version=item.version
        )
        excuse.addreason("reverseremoval")

        if not ignore_hints:
            excuse.addinfo("Remove hint for (transitive) dependency: %s" % reason)
            return PolicyVerdict.REJECTED_PERMANENTLY

        excuse.addreason("ignore-reverse-remove")
        excuse.addinfo(
            "Should block migration because of remove hint for %s, but forced by %s"
            % (reason, ignore_hints[0].user)
        )
        return PolicyVerdict.PASS_HINTED


def target_suite_has(target_suite: TargetSuite, bin_pkg: BinaryPackageId) -> bool:
    """Tell whether ``bin_pkg`` is present in ``target_suite``."""
    return target_suite.is_pkg_in_the_suite(bin_pkg)
class ReproduciblePolicy(AbstractBasePolicy):
    """Judge migration items on their reproducibility status.

    The status is read from ``reproducible.json`` in the state directory and
    kept per suite role (``"source"`` / ``"target"``), architecture and source
    package name.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "reproducible",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )
        # role -> architecture -> source package name -> result entry
        self._reproducible: dict[str, Any] = {
            "source": {},
            "target": {},
        }

        # Default values for this policy's options
        parse_option(options, "repro_success_bounty", default=0, to_int=True)
        parse_option(options, "repro_regression_penalty", default=0, to_int=True)
        parse_option(options, "repro_url")
        parse_option(options, "repro_retry_url")
        parse_option(options, "repro_components")

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the ``ignore-reproducible`` hint type."""
        hint_parser.register_hint_type(
            HintType("ignore-reproducible", architectured=HintAnnotate.OPTIONAL)
        )

    def initialise(self, britney: "Britney") -> None:
        """Load the reproducibility report from ``<state_dir>/reproducible.json``.

        :raises RuntimeError: if no state directory is configured.
        """
        super().initialise(britney)
        source_suite = self.suite_info.primary_source_suite
        target_suite = self.suite_info.target_suite
        try:
            filename = os.path.join(self.state_dir, "reproducible.json")
        except AttributeError as e:  # pragma: no cover
            raise RuntimeError(
                "Please set STATE_DIR in the britney configuration"
            ) from e

        self._reproducible = self._read_repro_status(
            filename,
            source={source_suite.name, source_suite.codename},
            target={target_suite.name, target_suite.codename},
        )

    def apply_srcarch_policy_impl(
        self,
        reproducible_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Compare the reproducibility of ``item`` on ``arch`` in both suites.

        Results are recorded in ``reproducible_info`` (``"test-results"``,
        ``"reproducible-test-url"``, ``"ignored-reproducible"``) and on
        ``excuse``.  ``ignore-reproducible`` hints turn a rejection into
        ``PASS_HINTED``.  When the ``repro_regression_penalty`` option is
        non-zero a rejection is turned into ``PASS``, with a penalty added if
        the option is positive.

        :raises KeyError: if the source suite status is not a known state.
        """
        verdict = PolicyVerdict.PASS

        # we don't want to apply this policy (yet) on binNMUs
        if item.architecture != "source":
            return verdict

        # we're not supposed to judge on this arch
        if arch not in self.options.repro_arches:
            return verdict

        # bail out if this arch has no packages for this source (not build
        # here)
        if arch not in excuse.packages:
            return verdict

        # horrible hard-coding, but currently, we don't keep track of the
        # component when loading the packages files
        component = "main"
        if "/" in (section := source_data_srcdist.section):
            component = section.split("/")[0]

        if (
            self.options.repro_components
            and component not in self.options.repro_components
        ):
            return verdict

        source_name = item.package
        try:
            tar_res = self._reproducible["target"][arch]
            src_res = self._reproducible["source"][arch]
        except KeyError:
            verdict = PolicyVerdict.REJECTED_TEMPORARILY
            msg = "No reproducible data available at all for %s" % arch
            excuse.add_verdict_info(verdict, msg)
            return verdict

        if source_data_tdist is None:
            target_suite_state = "new"
        elif source_name not in tar_res:
            target_suite_state = "unknown"
        elif tar_res[source_name]["version"] == source_data_tdist.version:
            target_suite_state = tar_res[source_name]["status"]
        else:
            target_suite_state = "stale"

        if source_name in src_res and src_res[source_name]["version"] == item.version:
            source_suite_state = src_res[source_name]["status"]
        else:
            source_suite_state = "unknown"

        # status of ['E404', 'FTBFS', 'FTBR', 'NFU', 'blacklisted', 'depwait',
        # 'reproducible', 'timeout'] with ['new', 'stale', 'unknown']
        wait_states = ("E404", "depwait", "stale", "timeout", "unknown")
        no_build_states = ("FTBFS", "NFU", "blacklisted")

        # if this package doesn't build on this architecture, we don't need to
        # judge it
        # FTBFS: Fails to build from source on r-b infra
        # NFU: the package explicitly doesn't support building on this arch
        # blacklisted: per package per arch per suite
        if source_suite_state in no_build_states:
            return verdict
        # Assume depwait in the source suite only are intermittent (might not
        # be true, e.g. with new build depends)
        if source_suite_state == target_suite_state and target_suite_state == "depwait":
            return verdict

        url_html = ""
        if self.options.repro_url:
            url = self.options.repro_url.format(package=quote(source_name), arch=arch)
            url_html = ' - <a href="%s">info</a>' % url
            if self.options.repro_retry_url:
                url_html += (
                    ' <a href="%s">♻ </a>'
                    % self.options.repro_retry_url.format(
                        package=quote(source_name), arch=arch
                    )
                )
            # When run on multiple archs, the last one "wins"
            reproducible_info["reproducible-test-url"] = url

        # Each branch picks a verdict, the excuse message and the entry that
        # is appended to reproducible_info["test-results"] below.
        eligible_for_bounty = False
        if source_suite_state == "reproducible":
            verdict = PolicyVerdict.PASS
            msg = f"Reproducible on {arch}{url_html}"
            test_result = "reproducible on %s" % arch
            eligible_for_bounty = True
        elif source_suite_state == "FTBR":
            if target_suite_state == "new":
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                msg = f"New but not reproducible on {arch}{url_html}"
                test_result = "new but not reproducible on %s" % arch
            elif target_suite_state in wait_states:
                verdict = PolicyVerdict.REJECTED_TEMPORARILY
                msg = "Waiting for reproducibility reference results on {}{}".format(
                    arch,
                    url_html,
                )
                test_result = "waiting-for-reference-results on %s" % arch
            elif target_suite_state == "reproducible":
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                msg = f"Reproducibility regression on {arch}{url_html}"
                test_result = "regression on %s" % arch
            elif target_suite_state == "FTBR":
                verdict = PolicyVerdict.PASS
                msg = "Ignoring non-reproducibility on {} (not a regression){}".format(
                    arch,
                    url_html,
                )
                test_result = "not reproducible on %s" % arch
            else:
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                msg = "No reference result, but not reproducibility on {}{}".format(
                    arch,
                    url_html,
                )
                test_result = f"reference {target_suite_state} on {arch}"
        elif source_suite_state in wait_states:
            verdict = PolicyVerdict.REJECTED_TEMPORARILY
            msg = f"Waiting for reproducibility test results on {arch}{url_html}"
            test_result = "waiting-for-test-results on %s" % arch
        else:
            raise KeyError("Unhandled reproducibility state %s" % source_suite_state)

        reproducible_info.setdefault("test-results", []).append(test_result)

        if verdict.is_rejected:
            assert self.hints is not None
            for hint_arch in ("source", arch):
                for ignore_hint in self.hints.search(
                    "ignore-reproducible",
                    package=source_name,
                    version=source_data_srcdist.version,
                    architecture=hint_arch,
                ):
                    verdict = PolicyVerdict.PASS_HINTED
                    reproducible_info.setdefault("ignored-reproducible", {}).setdefault(
                        arch, {}
                    ).setdefault("issued-by", []).append(ignore_hint.user)
                    excuse.addinfo(
                        "Ignoring reproducibility issue on %s as requested "
                        "by %s" % (arch, ignore_hint.user)
                    )
                    # only the first matching hint per hint architecture is
                    # recorded
                    break

        if self.options.repro_success_bounty and eligible_for_bounty:
            excuse.add_bounty("reproducibility", self.options.repro_success_bounty)

        if self.options.repro_regression_penalty and verdict in {
            PolicyVerdict.REJECTED_PERMANENTLY,
            PolicyVerdict.REJECTED_TEMPORARILY,
        }:
            if self.options.repro_regression_penalty > 0:
                excuse.add_penalty(
                    "reproducibility", self.options.repro_regression_penalty
                )
            # In case we give penalties instead of blocking, we must always pass
            verdict = PolicyVerdict.PASS

        if verdict.is_rejected:
            excuse.add_verdict_info(verdict, msg)
        else:
            excuse.addinfo(msg)

        return verdict

    def _read_repro_status(
        self, filename: str, source: set[str], target: set[str]
    ) -> dict[str, Any]:
        """Merge the report in ``filename`` into ``self._reproducible``.

        Every entry of the JSON list is filed under ``"source"`` and/or
        ``"target"`` depending on whether its ``"suite"`` is in ``source`` or
        ``target``, keyed by its ``"architecture"`` and ``"package"``.  An
        empty file leaves the summary untouched.

        :return: the (updated) ``self._reproducible`` mapping.
        """
        summary = self._reproducible
        self.logger.info("Loading reproducibility report from %s", filename)
        # JSON is UTF-8; do not depend on the locale's default encoding
        with open(filename, encoding="utf-8") as fd:
            if os.fstat(fd.fileno()).st_size < 1:
                return summary
            data = json.load(fd)

        for result in data:
            if result["suite"] in source:
                summary["source"].setdefault(result["architecture"], {})[
                    result["package"]
                ] = result
            if result["suite"] in target:
                summary["target"].setdefault(result["architecture"], {})[
                    result["package"]
                ] = result

        return summary