Coverage for britney2/policies/policy.py: 84%
1240 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-10-17 17:32 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-10-17 17:32 +0000
1import json
2import logging
3import optparse
4import os
5import re
6import sys
7import time
8from abc import ABC, abstractmethod
9from collections import defaultdict
10from collections.abc import Callable, Container
11from enum import IntEnum, unique
12from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar, cast
13from urllib.parse import quote
15import apt_pkg
16import yaml
18from britney2 import (
19 BinaryPackage,
20 BinaryPackageId,
21 DependencyType,
22 PackageId,
23 SourcePackage,
24 Suite,
25 SuiteClass,
26 Suites,
27 TargetSuite,
28)
29from britney2.excusedeps import DependencySpec
30from britney2.hints import (
31 Hint,
32 HintAnnotate,
33 HintCollection,
34 HintParser,
35 HintType,
36 PolicyHintParserProto,
37)
38from britney2.inputs.suiteloader import SuiteContentLoader
39from britney2.migrationitem import MigrationItem, MigrationItemFactory
40from britney2.policies import ApplySrcPolicy, PolicyVerdict
41from britney2.utils import (
42 GetDependencySolversProto,
43 compute_reverse_tree,
44 find_newer_binaries,
45 get_dependency_solvers,
46 is_smooth_update_allowed,
47 parse_option,
48)
50if TYPE_CHECKING: 50 ↛ 51line 50 didn't jump to line 51 because the condition on line 50 was never true
51 from ..britney import Britney
52 from ..excuse import Excuse
53 from ..installability.universe import BinaryPackageUniverse
class PolicyLoadRequest:
    """Deferred request to construct a policy, optionally gated by an option.

    A request built with :meth:`always_load` is always enabled.  A request
    built with :meth:`conditionally_load` is enabled when the named option is
    one of "yes", "y", "true" or "t" (case-insensitively); when the option is
    absent (or ``None``) the supplied default decides.
    """

    __slots__ = ("_options_name", "_default_value", "_policy_constructor")

    def __init__(
        self,
        policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"],
        options_name: str | None,
        default_value: bool,
    ) -> None:
        """Remember how to build the policy and when it is enabled.

        :param policy_constructor: Callable invoked with the options and the
          suites to create the policy instance.

        :param options_name: Name of the attribute on the options object that
          toggles the policy, or None if the policy is unconditional.

        :param default_value: Whether the policy is enabled when the option
          is not set.
        """
        self._options_name = options_name
        self._default_value = default_value
        self._policy_constructor = policy_constructor

    def is_enabled(self, options: optparse.Values) -> bool:
        """Tell whether the policy should be loaded for the given options."""
        option = self._options_name
        if option is None:
            # Unconditional requests are only ever created with a True default.
            assert self._default_value
            return True
        configured = getattr(options, option, None)
        if configured is not None:
            return configured.lower() in ("yes", "y", "true", "t")
        return self._default_value

    def load(self, options: optparse.Values, suite_info: Suites) -> "BasePolicy":
        """Construct the policy by calling the stored constructor."""
        construct = self._policy_constructor
        return construct(options, suite_info)

    @classmethod
    def always_load(
        cls, policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"]
    ) -> "PolicyLoadRequest":
        """Create a request that is enabled regardless of the options."""
        return cls(policy_constructor, None, True)

    @classmethod
    def conditionally_load(
        cls,
        policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"],
        option_name: str,
        default_value: bool,
    ) -> "PolicyLoadRequest":
        """Create a request that is toggled by the option ``option_name``."""
        return cls(policy_constructor, option_name, default_value)
class PolicyEngine:
    """Holds the active policies and runs them against migration items."""

    def __init__(self) -> None:
        self._policies: list["BasePolicy"] = []

    def add_policy(self, policy: "BasePolicy") -> None:
        """Append a policy; policies are applied in insertion order."""
        self._policies.append(policy)

    def load_policies(
        self,
        options: optparse.Values,
        suite_info: Suites,
        policy_load_requests: list[PolicyLoadRequest],
    ) -> None:
        """Construct and register every requested policy that is enabled."""
        for request in policy_load_requests:
            if not request.is_enabled(options):
                continue
            self.add_policy(request.load(options, suite_info))

    def register_policy_hints(self, hint_parser: HintParser) -> None:
        """Let every policy register its hint types with the parser."""
        for registered in self._policies:
            registered.register_hints(hint_parser)

    def initialise(self, britney: "Britney", hints: HintCollection) -> None:
        """Hand the hints to each policy, then run its initialisation."""
        for registered in self._policies:
            registered.hints = hints
            registered.initialise(britney)

    def save_state(self, britney: "Britney") -> None:
        """Ask every policy to save its state."""
        for registered in self._policies:
            registered.save_state(britney)

    def apply_src_policies(
        self,
        item: MigrationItem,
        source_t: SourcePackage | None,
        source_u: SourcePackage,
        excuse: "Excuse",
    ) -> None:
        """Run all applicable policies for a source migration.

        For each policy whose applicable suites include the item's suite
        class, the per-architecture implementation is run for every
        configured architecture (if the policy's src_policy asks for it) and
        the source implementation is run (if asked for).  The worst verdict
        per policy is stored in ``excuse.policy_info`` and folded into
        ``excuse.policy_verdict``.
        """
        overall = excuse.policy_verdict
        suite_class = item.suite.suite_class
        for policy in self._policies:
            pinfo: dict[str, Any] = {}
            worst = PolicyVerdict.NOT_APPLICABLE
            if suite_class in policy.applicable_suites:
                if policy.src_policy.run_arch:
                    for arch in policy.options.architectures:
                        arch_verdict = policy.apply_srcarch_policy_impl(
                            pinfo, item, arch, source_t, source_u, excuse
                        )
                        worst = PolicyVerdict.worst_of(worst, arch_verdict)
                if policy.src_policy.run_src:
                    src_verdict = policy.apply_src_policy_impl(
                        pinfo, item, source_t, source_u, excuse
                    )
                    worst = PolicyVerdict.worst_of(worst, src_verdict)
            # The base policy provides this field, so the subclass should leave it blank
            assert "verdict" not in pinfo
            if worst == PolicyVerdict.NOT_APPLICABLE:
                continue
            pinfo["verdict"] = worst.name
            excuse.policy_info[policy.policy_id] = pinfo
            overall = PolicyVerdict.worst_of(worst, overall)
        excuse.policy_verdict = overall

    def apply_srcarch_policies(
        self,
        item: MigrationItem,
        arch: str,
        source_t: SourcePackage | None,
        source_u: SourcePackage,
        excuse: "Excuse",
    ) -> None:
        """Run all applicable policies for a migration on one architecture.

        Only the per-architecture implementation of each applicable policy is
        invoked.  Verdicts other than NOT_APPLICABLE are recorded in
        ``excuse.policy_info``; the worst verdict seen is written back to
        ``excuse.policy_verdict``.
        """
        overall = excuse.policy_verdict
        suite_class = item.suite.suite_class
        for policy in self._policies:
            if suite_class not in policy.applicable_suites:
                continue
            pinfo: dict[str, Any] = {}
            verdict = policy.apply_srcarch_policy_impl(
                pinfo, item, arch, source_t, source_u, excuse
            )
            overall = PolicyVerdict.worst_of(verdict, overall)
            # The base policy provides this field, so the subclass should leave it blank
            assert "verdict" not in pinfo
            if verdict == PolicyVerdict.NOT_APPLICABLE:
                continue
            pinfo["verdict"] = verdict.name
            excuse.policy_info[policy.policy_id] = pinfo
        excuse.policy_verdict = overall
class BasePolicy(ABC):
    """Abstract interface that the PolicyEngine drives for every policy.

    The default implementations of the ``apply_*_policy_impl`` methods return
    PolicyVerdict.NOT_APPLICABLE, so a subclass only overrides the hooks it
    actually needs.
    """

    # The Britney instance; assigned by initialise().
    britney: "Britney"
    # Key under which the policy's results are stored in excuse.policy_info.
    policy_id: str
    # Parsed hints; assigned by PolicyEngine.initialise() before initialise()
    # is called on the policy.
    hints: HintCollection | None
    # Suite classes for which the PolicyEngine will run this policy.
    applicable_suites: set[SuiteClass]
    # Selects whether the engine runs the source hook, the per-architecture
    # hook, or both (via its run_src / run_arch attributes).
    src_policy: ApplySrcPolicy
    # The options member of Britney with all the config values.
    options: optparse.Values
    # The suites Britney operates on.
    suite_info: Suites

    def __init__(
        self,
        options: optparse.Values,
        suite_info: Suites,
    ) -> None:
        """The BasePolicy constructor

        This base implementation stores nothing; it only fixes the two-argument
        constructor signature.

        :param options: The options member of Britney with all the
          config values.

        :param suite_info: The suites Britney operates on.
        """

    # Directory for the policy's state files; concrete policies must provide it.
    @property
    @abstractmethod
    def state_dir(self) -> str: ...

    def register_hints(self, hint_parser: HintParser) -> None:  # pragma: no cover
        """Register new hints that this policy accepts

        :param hint_parser: (see HintParser.register_hint_type)
        """

    def initialise(self, britney: "Britney") -> None:  # pragma: no cover
        """Called once to make the policy initialise any data structures

        This is useful for e.g. parsing files or other "heavy do-once" work.
        The base implementation only stores the Britney instance on the policy.

        :param britney: This is the instance of the "Britney" class.
        """
        self.britney = britney

    def save_state(self, britney: "Britney") -> None:  # pragma: no cover
        """Called once at the end of the run to make the policy save any persistent data

        Note this will *not* be called for "dry-runs" as such runs should not change
        the state.

        :param britney: This is the instance of the "Britney" class.
        """

    def apply_src_policy_impl(
        self,
        policy_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:  # pragma: no cover
        """Apply a policy on a given source migration

        Britney will call this method on a given source package, when
        Britney is considering to migrate it from the given source
        suite to the target suite.  The policy will then evaluate the
        migration and then return a verdict.

        :param policy_info: A dictionary for the results of this policy.
          The PolicyEngine passes in a fresh, empty dictionary and stores
          it under the policy's policy_id in the excuse when the verdict
          is applicable, from where it goes into the "excuses.yaml"
          output.  The key "verdict" is reserved for the PolicyEngine and
          must not be set by the policy.

        :param item: The migration item the policy is applied to.

        :param source_data_tdist: Information about the source package
          in the target distribution (e.g. "testing"), i.e. the data
          structure in target_suite.sources[source_name].  None if the
          package is not present in the target distribution.

        :param source_data_srcdist: Information about the source
          package in the source distribution (e.g. "unstable" or "tpu"),
          i.e. the data structure in source_suite.sources[source_name].

        :param excuse: The excuse of the migration item; policies can
          add information to it.

        :return: A Policy Verdict (e.g. PolicyVerdict.PASS)
        """
        return PolicyVerdict.NOT_APPLICABLE

    def apply_srcarch_policy_impl(
        self,
        policy_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Apply a policy on a given binary migration

        Britney will call this method on binaries from a given source package
        on a given architecture, when Britney is considering to migrate them
        from the given source suite to the target suite.  The policy will then
        evaluate the migration and then return a verdict.

        :param policy_info: A dictionary for the results of this policy.
          The PolicyEngine passes in a fresh, empty dictionary and stores
          it under the policy's policy_id in the excuse when the verdict
          is applicable, from where it goes into the "excuses.yaml"
          output.  The key "verdict" is reserved for the PolicyEngine and
          must not be set by the policy.

        :param item: The migration item the policy is applied to.

        :param arch: The architecture the item is applied to. This is mostly
          relevant for policies where src_policy is not ApplySrcPolicy.RUN_SRC
          (as that is the only case where arch can differ from item.architecture)

        :param source_data_tdist: Information about the source package
          in the target distribution (e.g. "testing"), i.e. the data
          structure in target_suite.sources[source_name].  None if the
          package is not present in the target distribution.

        :param source_data_srcdist: Information about the source
          package in the source distribution (e.g. "unstable" or "tpu"),
          i.e. the data structure in source_suite.sources[source_name].

        :param excuse: The excuse of the migration item; policies can
          add information to it.

        :return: A Policy Verdict (e.g. PolicyVerdict.PASS)
        """
        # if the policy doesn't implement this function, assume it's OK
        return PolicyVerdict.NOT_APPLICABLE
class AbstractBasePolicy(BasePolicy):
    """
    Common concrete foundation for policies derived from BasePolicy.

    tests/test_policy.py:initialize_policy() must be able to build BasePolicy
    objects through a constructor taking only two items, whereas every other
    user of BasePolicy-derived objects needs the constructor taking 5 items.
    AbstractBasePolicy exists as a separate class to document that split.
    """

    def __init__(
        self,
        policy_id: str,
        options: optparse.Values,
        suite_info: Suites,
        applicable_suites: set[SuiteClass],
        src_policy: ApplySrcPolicy = ApplySrcPolicy.RUN_SRC,
    ) -> None:
        """Store the policy configuration and set up a per-class logger.

        :param policy_id: Identifies the policy.  It will
          determine the key used for the excuses.yaml etc.

        :param options: The options member of Britney with all the
          config values.

        :param suite_info: The suites Britney operates on.

        :param applicable_suites: Where this policy applies.

        :param src_policy: Which of the source / per-architecture hooks
          the policy engine should run for this policy.
        """
        self.hints: HintCollection | None = None
        self.policy_id = policy_id
        self.applicable_suites = applicable_suites
        self.src_policy = src_policy
        self.options = options
        self.suite_info = suite_info
        # The logger is named after the concrete class: "<module>.<ClassName>".
        concrete = self.__class__
        self.logger = logging.getLogger(f"{concrete.__module__}.{concrete.__name__}")

    @property
    def state_dir(self) -> str:
        """The ``state_dir`` option (AttributeError if it is not set)."""
        configured = self.options.state_dir
        return cast(str, configured)
# Type of the (already converted) policy parameter carried by a SimplePolicyHint.
_T = TypeVar("_T")
class SimplePolicyHint(Hint, Generic[_T]):
    """A hint that carries one policy-specific parameter besides its packages."""

    def __init__(
        self,
        user: str,
        hint_type: HintType,
        policy_parameter: _T,
        packages: list[MigrationItem],
    ) -> None:
        """Create the hint and remember its policy parameter.

        :param user: Passed through to the Hint constructor.
        :param hint_type: Passed through to the Hint constructor.
        :param policy_parameter: The value the policy will read from the hint.
        :param packages: Passed through to the Hint constructor.
        """
        super().__init__(user, hint_type, packages)
        self._policy_parameter = policy_parameter

    def __eq__(self, other: Any) -> bool:
        """Equal when type and policy parameter match and Hint agrees."""
        if self.type != other.type:
            return False
        if self._policy_parameter != other._policy_parameter:
            return False
        return super().__eq__(other)

    def str(self) -> str:
        """Render the hint as "<type> <parameter> <package names>"."""
        type_part = self._type
        parameter_part = str(self._policy_parameter)
        names_part = " ".join(package.name for package in self._packages)
        return f"{type_part} {parameter_part} {names_part}"
class AgeDayHint(SimplePolicyHint[int]):
    """Simple policy hint whose parameter is an integer number of days."""

    @property
    def days(self) -> int:
        """The policy parameter of the hint, exposed as a number of days."""
        return self._policy_parameter
class IgnoreRCBugHint(SimplePolicyHint[frozenset[str]]):
    """Simple policy hint whose parameter is a frozen set of bug identifiers."""

    @property
    def ignored_rcbugs(self) -> frozenset[str]:
        """The policy parameter of the hint: the bugs to be ignored."""
        return self._policy_parameter
def simple_policy_hint_parser_function(
    class_name: Callable[[str, HintType, _T, list[MigrationItem]], Hint],
    converter: Callable[[str], _T],
) -> PolicyHintParserProto:
    """Build a hint parser for hints of the form "<parameter> <item>...".

    The returned parser treats its first argument as the raw policy
    parameter and the remaining arguments as migration items.  One hint is
    added per parsed item; the raw parameter is run through ``converter``
    for each of them.

    :param class_name: Factory (typically a SimplePolicyHint subclass) called
      with the user, the hint type, the converted parameter and a one-item
      list holding the migration item.

    :param converter: Turns the raw parameter string into the policy
      parameter.

    :return: The parser function.
    """

    def parse_simple_policy_hint(
        mi_factory: MigrationItemFactory,
        hints: HintCollection,
        who: str,
        hint_type: HintType,
        *args: str,
    ) -> None:
        raw_parameter = args[0]
        item_specs = args[1:]
        for migration_item in mi_factory.parse_items(*item_specs):
            hint = class_name(
                who, hint_type, converter(raw_parameter), [migration_item]
            )
            hints.add_hint(hint)

    return parse_simple_policy_hint
class AgePolicy(AbstractBasePolicy):
    """Configurable Aging policy for source migrations

    The AgePolicy will let packages stay in the source suite for a pre-defined
    amount of days before letting them migrate (based on their urgency, if any).

    The AgePolicy's decision is influenced by the following:

    State files:
     * ${STATE_DIR}/age-policy-urgencies: File containing urgencies for source
       packages. Note that urgencies are "sticky" and the most "urgent" urgency
       will be used (i.e. the one with lowest age-requirements).
       - This file needs to be updated externally, if the policy should take
         urgencies into consideration.  If empty (or not updated), the policy
         will simply use the default urgency (see the "Config" section below)
       - In Debian, these values are taken from the .changes file, but that is
         not a requirement for Britney.
     * ${STATE_DIR}/age-policy-dates: File containing the age of all source
       packages.
       - The policy will automatically update this file.
    Config:
     * DEFAULT_URGENCY: Name of the urgency used for packages without an urgency
       (or for unknown urgencies).  Will also be used to set the "minimum"
       aging requirements for packages not in the target suite.
     * MINDAYS_<URGENCY>: The age-requirements in days for packages with the
       given urgency.
       - Commonly used urgencies are: low, medium, high, emergency, critical
    Hints:
     * urgent <source>/<version>: Disregard the age requirements for a given
       source/version.
     * age-days X <source>/<version>: Set the age requirements for a given
       source/version to X days.  Note that X can exceed the highest
       age-requirement normally given.

    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        """Set up the policy from the options; no files are read here.

        :param options: The options member of Britney with all the
          config values.

        :param suite_info: The suites Britney operates on.

        :raises ValueError: If a MINDAYS_<URGENCY> option is not a
          non-negative integer.
        """
        super().__init__("age", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE})
        self._min_days = self._generate_mindays_table()
        # Placeholder; the real value is resolved in initialise().
        self._min_days_default = 0
        # britney's "day" begins at 7pm (we want aging to occur in the 22:00Z run and we run Britney 2-4 times a day)
        # NB: _date_now is used in tests
        time_now: float = time.time()
        if hasattr(self.options, "fake_runtime"):
            time_now = int(self.options.fake_runtime)
            self.logger.info("overriding runtime with fake_runtime %d", time_now)

        # Whole days since the epoch, shifted by 19 hours so that the day
        # counter advances at 19:00 rather than at midnight.
        self._date_now = int(((time_now / (60 * 60)) - 19) / 24)
        # source name -> (version, day on which that version was first seen)
        self._dates: dict[str, tuple[str, int]] = {}
        # source name -> urgency name
        self._urgencies: dict[str, str] = {}
        self._default_urgency: str = self.options.default_urgency
        self._penalty_immune_urgencies: frozenset[str] = frozenset()
        if hasattr(self.options, "no_penalties"):
            self._penalty_immune_urgencies = frozenset(
                x.strip() for x in self.options.no_penalties.split()
            )
        self._bounty_min_age: int | None = None  # initialised later

    def _generate_mindays_table(self) -> dict[str, int]:
        """Collect the ``mindays_<urgency>`` options into a lookup table.

        The urgency name is everything after the ``mindays_`` prefix, so
        urgency names may themselves contain underscores.

        :return: Mapping from urgency name to its age requirement in days.

        :raises ValueError: If a value is not an integer or is negative.
        """
        prefix = "mindays_"
        mindays: dict[str, int] = {}
        for k in dir(self.options):
            if not k.startswith(prefix):
                continue
            v = getattr(self.options, k)
            try:
                as_days = int(v)
            except ValueError as e:
                raise ValueError(
                    "Unable to parse "
                    + k
                    + " as a number of days. Must be 0 or a positive integer"
                ) from e
            if as_days < 0:
                raise ValueError(
                    "The value of " + k + " must be zero or a positive integer"
                )
            mindays[k[len(prefix):]] = as_days
        return mindays

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the "age-days" and "urgent" hint types."""
        hint_parser.register_hint_type(
            HintType(
                "age-days",
                simple_policy_hint_parser_function(AgeDayHint, int),
                min_args=2,
            )
        )
        hint_parser.register_hint_type(HintType("urgent"))

    def initialise(self, britney: "Britney") -> None:
        """Resolve the default urgency, load the state files and BOUNTY_MIN_AGE.

        :param britney: This is the instance of the "Britney" class.

        :raises ValueError: If there is no MINDAYS_<URGENCY> entry for the
          default urgency, or BOUNTY_MIN_AGE is neither a number nor a known
          urgency.
        """
        super().initialise(britney)
        if self._default_urgency not in self._min_days:  # pragma: no cover
            raise ValueError(
                "Missing age-requirement for default urgency (MINDAYS_%s)"
                % self._default_urgency
            )
        # This must be resolved before the urgencies file is read: the reader
        # uses it as the age requirement of urgencies it does not know.
        self._min_days_default = self._min_days[self._default_urgency]
        self._read_dates_file()
        self._read_urgencies_file()
        try:
            self._bounty_min_age = int(self.options.bounty_min_age)
        except ValueError:
            # Not a number; accept the name of a known urgency instead.
            if self.options.bounty_min_age in self._min_days:
                self._bounty_min_age = self._min_days[self.options.bounty_min_age]
            else:  # pragma: no cover
                raise ValueError(
                    "Please fix BOUNTY_MIN_AGE in the britney configuration"
                )
        except AttributeError:
            # The option wasn't defined in the configuration
            self._bounty_min_age = 0

    def save_state(self, britney: "Britney") -> None:
        """Write the dates file back to disk.

        :param britney: This is the instance of the "Britney" class.
        """
        super().save_state(britney)
        self._write_dates_file()

    def apply_src_policy_impl(
        self,
        age_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check whether the source package is old enough to migrate.

        The age requirement starts from the urgency of the package, is
        lowered by bounties and raised by penalties recorded on the excuse
        (penalties are skipped for penalty-immune urgencies), is clamped from
        below by BOUNTY_MIN_AGE and can finally be replaced by an "age-days"
        hint.  A package that is still too young passes only if there is an
        "urgent" hint for it.

        :param age_info: Dictionary receiving the results of this policy.

        :param item: The migration item the policy is applied to.

        :param source_data_tdist: The source package in the target suite, or
          None if it is not present there.

        :param source_data_srcdist: The source package in the source suite.

        :param excuse: The excuse of the item; read for bounties and
          penalties and updated with age information.

        :return: PASS, PASS_HINTED or REJECTED_TEMPORARILY.
        """
        # retrieve the urgency for the upload, ignoring it if this is a NEW package
        # (not present in the target suite)
        source_name = item.package
        urgency = self._urgencies.get(source_name, self._default_urgency)

        if urgency not in self._min_days:
            age_info["unknown-urgency"] = urgency
            urgency = self._default_urgency

        if not source_data_tdist:
            if self._min_days[urgency] < self._min_days_default:
                age_info["urgency-reduced"] = {
                    "from": urgency,
                    "to": self._default_urgency,
                }
                urgency = self._default_urgency

        # (Re)start the clock for packages or versions not seen before.
        known = self._dates.get(source_name)
        if known is None or known[0] != source_data_srcdist.version:
            self._dates[source_name] = (source_data_srcdist.version, self._date_now)

        days_old = self._date_now - self._dates[source_name][1]
        min_days = self._min_days[urgency]
        for bounty, bounty_days in excuse.bounty.items():
            if bounty_days:
                self.logger.info(
                    "Applying bounty for %s granted by %s: %d days",
                    source_name,
                    bounty,
                    bounty_days,
                )
                excuse.addinfo(
                    "Required age reduced by %d days because of %s"
                    % (bounty_days, bounty)
                )
                assert bounty_days > 0, "negative bounties shouldn't happen"
                min_days -= bounty_days
        if urgency not in self._penalty_immune_urgencies:
            for penalty, penalty_days in excuse.penalty.items():
                if penalty_days:
                    self.logger.info(
                        "Applying penalty for %s given by %s: %d days",
                        source_name,
                        penalty,
                        penalty_days,
                    )
                    excuse.addinfo(
                        "Required age increased by %d days because of %s"
                        % (penalty_days, penalty)
                    )
                    assert (
                        penalty_days > 0
                    ), "negative penalties should be handled earlier"
                    min_days += penalty_days

        assert self._bounty_min_age is not None
        # the age in BOUNTY_MIN_AGE can be higher than the one associated with
        # the real urgency, so don't forget to take it into account
        bounty_min_age = min(self._bounty_min_age, self._min_days[urgency])
        if min_days < bounty_min_age:
            min_days = bounty_min_age
            excuse.addinfo(
                "Required age is not allowed to drop below %d days" % min_days
            )

        age_info["current-age"] = days_old

        assert self.hints is not None
        for age_days_hint in cast(
            "list[AgeDayHint]",
            self.hints.search(
                "age-days", package=source_name, version=source_data_srcdist.version
            ),
        ):
            new_req = age_days_hint.days
            age_info["age-requirement-reduced"] = {
                "new-requirement": new_req,
                "changed-by": age_days_hint.user,
            }
            # Keep the requirement from before the first hint; later hints
            # only replace the effective requirement.
            if "original-age-requirement" not in age_info:
                age_info["original-age-requirement"] = min_days
            min_days = new_req

        age_info["age-requirement"] = min_days
        res = PolicyVerdict.PASS

        if days_old < min_days:
            urgent_hints = self.hints.search(
                "urgent", package=source_name, version=source_data_srcdist.version
            )
            if urgent_hints:
                age_info["age-requirement-reduced"] = {
                    "new-requirement": 0,
                    "changed-by": urgent_hints[0].user,
                }
                res = PolicyVerdict.PASS_HINTED
            else:
                res = PolicyVerdict.REJECTED_TEMPORARILY

        # update excuse
        age_hint = age_info.get("age-requirement-reduced", None)
        age_min_req = age_info["age-requirement"]
        if age_hint:
            new_req = age_hint["new-requirement"]
            who = age_hint["changed-by"]
            if new_req:
                # "age-requirement" already holds the hinted value at this
                # point, so the "from" figure has to come from the
                # requirement recorded before the hint was applied.
                overridden_req = age_info.get("original-age-requirement", age_min_req)
                excuse.addinfo(
                    "Overriding age needed from %d days to %d by %s"
                    % (overridden_req, new_req, who)
                )
                age_min_req = new_req
            else:
                excuse.addinfo("Too young, but urgency pushed by %s" % who)
                age_min_req = 0
        excuse.setdaysold(age_info["current-age"], age_min_req)

        if age_min_req == 0:
            excuse.addinfo("%d days old" % days_old)
        elif days_old < age_min_req:
            excuse.add_verdict_info(
                res, "Too young, only %d of %d days old" % (days_old, age_min_req)
            )
        else:
            excuse.addinfo("%d days old (needed %d days)" % (days_old, age_min_req))

        return res

    def _read_dates_file(self) -> None:
        """Parse the dates file

        The file is ${STATE_DIR}/age-policy-dates; the legacy "Dates" file in
        the target suite directory is used when only that one exists or when
        no state directory is configured.  A missing file under the new name
        is created empty.

        :raises RuntimeError: If no state directory is configured and the
          legacy file does not exist.

        :raises FileNotFoundError: If the legacy file was selected but cannot
          be opened.
        """
        dates = self._dates
        fallback_filename = os.path.join(self.suite_info.target_suite.path, "Dates")
        using_new_name = False
        try:
            filename = os.path.join(self.state_dir, "age-policy-dates")
            if not os.path.exists(filename) and os.path.exists(fallback_filename):
                filename = fallback_filename
            else:
                using_new_name = True
        except AttributeError:
            # No state directory configured.
            if os.path.exists(fallback_filename):
                filename = fallback_filename
            else:
                raise RuntimeError("Please set STATE_DIR in the britney configuration")

        try:
            with open(filename, encoding="utf-8") as fd:
                for line in fd:
                    if line.startswith("#"):
                        # Ignore comment lines (mostly used for tests)
                        continue
                    # <source> <version> <date>
                    ln = line.split()
                    if len(ln) != 3:  # pragma: no cover
                        continue
                    try:
                        dates[ln[0]] = (ln[1], int(ln[2]))
                    except ValueError:  # pragma: no cover
                        # Lines with a non-numeric date are skipped.
                        pass
        except FileNotFoundError:
            if not using_new_name:
                # If we are using the legacy name, then just give up
                raise
            self.logger.info("%s does not appear to exist.  Creating it", filename)
            with open(filename, mode="x", encoding="utf-8"):
                pass

    def _read_urgencies_file(self) -> None:
        """Parse the urgencies file into the urgency table.

        The file is ${STATE_DIR}/age-policy-urgencies, with the legacy
        "Urgency" file in the target suite directory as fallback.  For each
        package the urgency with the lowest age requirement wins; urgencies
        without a MINDAYS entry count with the age requirement of the default
        urgency.  Entries are ignored when the target suite already has that
        version (or a newer one) or when the primary source suite does not
        have that version (or a newer one).
        """
        urgencies = self._urgencies
        min_days_default = self._min_days_default
        fallback_filename = os.path.join(self.suite_info.target_suite.path, "Urgency")
        try:
            filename = os.path.join(self.state_dir, "age-policy-urgencies")
            if not os.path.exists(filename) and os.path.exists(fallback_filename):
                filename = fallback_filename
        except AttributeError:
            # No state directory configured.
            filename = fallback_filename

        sources_s = self.suite_info.primary_source_suite.sources
        sources_t = self.suite_info.target_suite.sources

        with open(filename, errors="surrogateescape", encoding="ascii") as fd:
            for line in fd:
                if line.startswith("#"):
                    # Ignore comment lines (mostly used for tests)
                    continue
                # <source> <version> <urgency>
                ln = line.split()
                if len(ln) != 3:
                    continue

                # read the minimum days associated with the urgencies
                urgency_old = urgencies.get(ln[0], None)
                # 1000 stands in for "no urgency recorded yet".
                mindays_old = self._min_days.get(urgency_old, 1000)  # type: ignore[arg-type]
                mindays_new = self._min_days.get(ln[2], min_days_default)

                # if the new urgency is lower (so the min days are higher), do nothing
                if mindays_old <= mindays_new:
                    continue

                # if the package exists in the target suite and it is more recent, do nothing
                tsrcv = sources_t.get(ln[0], None)
                if tsrcv and apt_pkg.version_compare(tsrcv.version, ln[1]) >= 0:
                    continue

                # if the package doesn't exist in the primary source suite or it is older, do nothing
                usrcv = sources_s.get(ln[0], None)
                if not usrcv or apt_pkg.version_compare(usrcv.version, ln[1]) < 0:
                    continue

                # update the urgency for the package
                urgencies[ln[0]] = ln[2]

    def _write_dates_file(self) -> None:
        """Write the dates table to disk, replacing the previous file.

        The data is written to a "<name>_new" file which is then renamed over
        the real file.  When the file lives in the state directory, a legacy
        "Dates" file in the target suite directory is removed afterwards.
        """
        dates = self._dates
        try:
            directory = self.state_dir
            basename = "age-policy-dates"
            old_file = os.path.join(self.suite_info.target_suite.path, "Dates")
        except AttributeError:
            # No state directory configured; keep using the legacy location.
            directory = self.suite_info.target_suite.path
            basename = "Dates"
            old_file = None
        filename = os.path.join(directory, basename)
        filename_tmp = os.path.join(directory, "%s_new" % basename)
        with open(filename_tmp, "w", encoding="utf-8") as fd:
            for pkg in sorted(dates):
                version, date = dates[pkg]
                fd.write("%s %s %d\n" % (pkg, version, date))
        os.rename(filename_tmp, filename)
        if old_file is not None and os.path.exists(old_file):
            self.logger.info("Removing old age-policy-dates file %s", old_file)
            os.unlink(old_file)
770class RCBugPolicy(AbstractBasePolicy):
771 """RC bug regression policy for source migrations
773 The RCBugPolicy will read provided list of RC bugs and block any
774 source upload that would introduce a *new* RC bug in the target
775 suite.
777 The RCBugPolicy's decision is influenced by the following:
779 State files:
780 * ${STATE_DIR}/rc-bugs-${SUITE_NAME}: File containing RC bugs for packages in
781 the given suite (one for both primary source suite and the target sutie is
782 needed).
783 - These files need to be updated externally.
784 """
786 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
787 super().__init__(
788 "rc-bugs", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
789 )
790 self._bugs_source: dict[str, set[str]] | None = None
791 self._bugs_target: dict[str, set[str]] | None = None
793 def register_hints(self, hint_parser: HintParser) -> None:
794 f = simple_policy_hint_parser_function(
795 IgnoreRCBugHint, lambda x: frozenset(x.split(","))
796 )
797 hint_parser.register_hint_type(HintType("ignore-rc-bugs", f, min_args=2))
799 def initialise(self, britney: "Britney") -> None:
800 super().initialise(britney)
801 source_suite = self.suite_info.primary_source_suite
802 target_suite = self.suite_info.target_suite
803 fallback_unstable = os.path.join(source_suite.path, "BugsV")
804 fallback_testing = os.path.join(target_suite.path, "BugsV")
805 try:
806 filename_unstable = os.path.join(
807 self.state_dir, "rc-bugs-%s" % source_suite.name
808 )
809 filename_testing = os.path.join(
810 self.state_dir, "rc-bugs-%s" % target_suite.name
811 )
812 if ( 812 ↛ 818line 812 didn't jump to line 818
813 not os.path.exists(filename_unstable)
814 and not os.path.exists(filename_testing)
815 and os.path.exists(fallback_unstable)
816 and os.path.exists(fallback_testing)
817 ):
818 filename_unstable = fallback_unstable
819 filename_testing = fallback_testing
820 except AttributeError:
821 filename_unstable = fallback_unstable
822 filename_testing = fallback_testing
823 self._bugs_source = self._read_bugs(filename_unstable)
824 self._bugs_target = self._read_bugs(filename_testing)
826 def apply_src_policy_impl(
827 self,
828 rcbugs_info: dict[str, Any],
829 item: MigrationItem,
830 source_data_tdist: SourcePackage | None,
831 source_data_srcdist: SourcePackage,
832 excuse: "Excuse",
833 ) -> PolicyVerdict:
834 assert self._bugs_source is not None # for type checking
835 assert self._bugs_target is not None # for type checking
836 bugs_t = set()
837 bugs_u = set()
838 source_name = item.package
840 for src_key in (source_name, "src:%s" % source_name):
841 if source_data_tdist and src_key in self._bugs_target:
842 bugs_t.update(self._bugs_target[src_key])
843 if src_key in self._bugs_source:
844 bugs_u.update(self._bugs_source[src_key])
846 for pkg, _, _ in source_data_srcdist.binaries:
847 if pkg in self._bugs_source:
848 bugs_u |= self._bugs_source[pkg]
849 if source_data_tdist:
850 for pkg, _, _ in source_data_tdist.binaries:
851 if pkg in self._bugs_target:
852 bugs_t |= self._bugs_target[pkg]
854 # If a package is not in the target suite, it has no RC bugs per
855 # definition. Unfortunately, it seems that the live-data is
856 # not always accurate (e.g. live-2011-12-13 suggests that
857 # obdgpslogger had the same bug in testing and unstable,
858 # but obdgpslogger was not in testing at that time).
859 # - For the curious, obdgpslogger was removed on that day
860 # and the BTS probably had not caught up with that fact.
861 # (https://tracker.debian.org/news/415935)
862 assert not bugs_t or source_data_tdist, (
863 "%s had bugs in the target suite but is not present" % source_name
864 )
866 verdict = PolicyVerdict.PASS
868 assert self.hints is not None
869 for ignore_hint in cast(
870 list[IgnoreRCBugHint],
871 self.hints.search(
872 "ignore-rc-bugs",
873 package=source_name,
874 version=source_data_srcdist.version,
875 ),
876 ):
877 ignored_bugs = ignore_hint.ignored_rcbugs
879 # Only handle one hint for now
880 if "ignored-bugs" in rcbugs_info: 880 ↛ 881line 880 didn't jump to line 881 because the condition on line 880 was never true
881 self.logger.info(
882 "Ignoring ignore-rc-bugs hint from %s on %s due to another hint from %s",
883 ignore_hint.user,
884 source_name,
885 rcbugs_info["ignored-bugs"]["issued-by"],
886 )
887 continue
888 if not ignored_bugs.isdisjoint(bugs_u): 888 ↛ 897line 888 didn't jump to line 897 because the condition on line 888 was always true
889 bugs_u -= ignored_bugs
890 bugs_t -= ignored_bugs
891 rcbugs_info["ignored-bugs"] = {
892 "bugs": sorted(ignored_bugs),
893 "issued-by": ignore_hint.user,
894 }
895 verdict = PolicyVerdict.PASS_HINTED
896 else:
897 self.logger.info(
898 "Ignoring ignore-rc-bugs hint from %s on %s as none of %s affect the package",
899 ignore_hint.user,
900 source_name,
901 str(ignored_bugs),
902 )
904 rcbugs_info["shared-bugs"] = sorted(bugs_u & bugs_t)
905 rcbugs_info["unique-source-bugs"] = sorted(bugs_u - bugs_t)
906 rcbugs_info["unique-target-bugs"] = sorted(bugs_t - bugs_u)
908 # update excuse
909 new_bugs = rcbugs_info["unique-source-bugs"]
910 old_bugs = rcbugs_info["unique-target-bugs"]
911 excuse.setbugs(old_bugs, new_bugs)
913 if new_bugs:
914 verdict = PolicyVerdict.REJECTED_PERMANENTLY
915 excuse.add_verdict_info(
916 verdict,
917 "Updating %s would introduce bugs in %s: %s"
918 % (
919 source_name,
920 self.suite_info.target_suite.name,
921 ", ".join(
922 [
923 '<a href="https://bugs.debian.org/%s">#%s</a>'
924 % (quote(a), a)
925 for a in new_bugs
926 ]
927 ),
928 ),
929 )
931 if old_bugs:
932 excuse.addinfo(
933 "Updating %s will fix bugs in %s: %s"
934 % (
935 source_name,
936 self.suite_info.target_suite.name,
937 ", ".join(
938 [
939 '<a href="https://bugs.debian.org/%s">#%s</a>'
940 % (quote(a), a)
941 for a in old_bugs
942 ]
943 ),
944 )
945 )
947 return verdict
def _read_bugs(self, filename: str) -> dict[str, set[str]]:
    """Load the release-critical bug summary stored in *filename*.

    Every well-formed row has exactly two whitespace-separated fields:

        <package-name> <bug number>[,<bug number>...]

    Rows with any other number of fields are logged as a warning and
    skipped.  Bug numbers from several rows naming the same package are
    merged.

    Returns a dictionary that maps each package name to the set of bug
    numbers (kept as strings) listed for it.
    """
    self.logger.info("Loading RC bugs data from %s", filename)
    bugs_by_package: dict[str, set[str]] = {}
    with open(filename, encoding="ascii") as bug_file:
        for row in bug_file:
            fields = row.split()
            if len(fields) == 2:
                package, bug_list = fields
                bugs_by_package.setdefault(package, set()).update(
                    bug_list.split(",")
                )
            else:  # pragma: no cover
                self.logger.warning("Malformed line found in line %s", row)
    return bugs_by_package
class PiupartsPolicy(AbstractBasePolicy):
    """Policy that rejects migrations introducing a piuparts regression.

    One piuparts summary per suite is read from the state directory
    (``piuparts-summary-<suite name>.json``).  The state recorded for a
    source in the primary source suite is compared with the state recorded
    in the target suite.  A matching ``ignore-piuparts`` hint turns a
    rejection into ``PASS_HINTED``.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "piuparts", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
        )
        # source name -> (state, url).  The url is None when the summary was
        # loaded with keep_url=False.  Both stay None until initialise() ran.
        self._piuparts_source: dict[str, tuple[str, str | None]] | None = None
        self._piuparts_target: dict[str, tuple[str, str | None]] | None = None

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the ``ignore-piuparts`` hint type with the parser."""
        hint_parser.register_hint_type(HintType("ignore-piuparts"))

    def initialise(self, britney: "Britney") -> None:
        """Load the piuparts summaries of the source and the target suite.

        Raises:
            RuntimeError: if accessing ``self.state_dir`` raises
                AttributeError (STATE_DIR is not configured).
        """
        super().initialise(britney)
        source_suite = self.suite_info.primary_source_suite
        target_suite = self.suite_info.target_suite
        try:
            filename_unstable = os.path.join(
                self.state_dir, "piuparts-summary-%s.json" % source_suite.name
            )
            filename_testing = os.path.join(
                self.state_dir, "piuparts-summary-%s.json" % target_suite.name
            )
        except AttributeError as e:  # pragma: no cover
            raise RuntimeError(
                "Please set STATE_DIR in the britney configuration"
            ) from e
        # Only the source suite's URLs are shown in the excuses, so the
        # target suite's URLs are dropped while loading.
        self._piuparts_source = self._read_piuparts_summary(
            filename_unstable, keep_url=True
        )
        self._piuparts_target = self._read_piuparts_summary(
            filename_testing, keep_url=False
        )

    def apply_src_policy_impl(
        self,
        piuparts_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Compute the piuparts verdict for the source of *item*.

        The outcome is recorded in *piuparts_info* under ``test-results``
        (plus ``piuparts-test-url`` when a URL is known, and
        ``ignored-piuparts`` when a hint overrides a rejection) and explained
        on *excuse*.

        Returns the resulting verdict: a failure in the source suite is only
        a permanent rejection when the target suite state differs; a pending
        test is a temporary rejection; everything else passes.
        """
        assert self._piuparts_source is not None  # for type checking
        assert self._piuparts_target is not None  # for type checking
        source_name = item.package

        # "X" is the placeholder state for sources absent from a summary.
        if source_name in self._piuparts_target:
            testing_state = self._piuparts_target[source_name][0]
        else:
            testing_state = "X"
        url: str | None
        if source_name in self._piuparts_source:
            unstable_state, url = self._piuparts_source[source_name]
        else:
            unstable_state = "X"
            url = None
        url_html = "(no link yet)"
        if url is not None:
            url_html = '<a href="{0}">{0}</a>'.format(url)

        if unstable_state == "P":
            # Not a regression
            msg = f"Piuparts tested OK - {url_html}"
            result = PolicyVerdict.PASS
            piuparts_info["test-results"] = "pass"
        elif unstable_state == "F":
            if testing_state != unstable_state:
                piuparts_info["test-results"] = "regression"
                msg = f"Rejected due to piuparts regression - {url_html}"
                result = PolicyVerdict.REJECTED_PERMANENTLY
            else:
                piuparts_info["test-results"] = "failed"
                msg = f"Ignoring piuparts failure (Not a regression) - {url_html}"
                result = PolicyVerdict.PASS
        elif unstable_state == "W":
            msg = f"Waiting for piuparts test results (stalls migration) - {url_html}"
            result = PolicyVerdict.REJECTED_TEMPORARILY
            piuparts_info["test-results"] = "waiting-for-test-results"
        else:
            msg = f"Cannot be tested by piuparts (not a blocker) - {url_html}"
            piuparts_info["test-results"] = "cannot-be-tested"
            result = PolicyVerdict.PASS

        if url is not None:
            piuparts_info["piuparts-test-url"] = url
        if result.is_rejected:
            excuse.add_verdict_info(result, msg)
        else:
            excuse.addinfo(msg)

        if result.is_rejected:
            assert self.hints is not None
            # Only the first matching hint is honoured.
            for ignore_hint in self.hints.search(
                "ignore-piuparts",
                package=source_name,
                version=source_data_srcdist.version,
            ):
                piuparts_info["ignored-piuparts"] = {"issued-by": ignore_hint.user}
                result = PolicyVerdict.PASS_HINTED
                excuse.addinfo(
                    f"Ignoring piuparts issue as requested by {ignore_hint.user}"
                )
                break

        return result

    def _read_piuparts_summary(
        self, filename: str, keep_url: bool = True
    ) -> dict[str, tuple[str, str | None]]:
        """Parse a piuparts summary file into ``{source: (state, url)}``.

        An empty file yields an empty dictionary.  When *keep_url* is false
        the url of every entry is replaced by None.

        Raises:
            ValueError: if the id or version field is missing or unexpected,
                or if a source does not have exactly one result set.
        """
        summary: dict[str, tuple[str, str | None]] = {}
        self.logger.info("Loading piuparts report from %s", filename)
        # JSON text is UTF-8 by specification; do not depend on the locale.
        with open(filename, encoding="utf-8") as fd:
            if os.fstat(fd.fileno()).st_size < 1:
                return summary
            data = json.load(fd)
        try:
            if (
                data["_id"] != "Piuparts Package Test Results Summary"
                or data["_version"] != "1.0"
            ):  # pragma: no cover
                raise ValueError(
                    f"Piuparts results in {filename} does not have the correct ID or version"
                )
        except KeyError as e:  # pragma: no cover
            raise ValueError(
                f"Piuparts results in {filename} is missing id or version field"
            ) from e
        for source, suite_data in data["packages"].items():
            if len(suite_data) != 1:  # pragma: no cover
                raise ValueError(
                    f"Piuparts results in {filename}, the source {source} does not have "
                    "exactly one result set"
                )
            item = next(iter(suite_data.values()))
            # Each result set is a triple; the middle element is not used.
            state, _, url = item
            if not keep_url:
                url = None
            summary[source] = (state, url)

        return summary
class DependsPolicy(AbstractBasePolicy):
    """Policy checking the installability of the binaries of an item.

    For every architecture it rejects the item when one of its binaries is
    in the universe's set of broken packages (unless uninstallability is
    tolerated for that binary).  It also records on the excuse which packages
    outside the target suite are needed to satisfy dependencies.
    """

    pkg_universe: "BinaryPackageUniverse"
    broken_packages: frozenset["BinaryPackageId"]
    all_binaries: dict["BinaryPackageId", "BinaryPackage"]
    allow_uninst: dict[str, set[str | None]]

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "depends",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )
        # Copied from the options by initialise().
        self.nobreakall_arches = None
        self.new_arches = None
        self.break_arches = None

    def initialise(self, britney: "Britney") -> None:
        """Cache the universe, binaries and architecture options."""
        super().initialise(britney)
        self.pkg_universe = britney.pkg_universe
        self.broken_packages = self.pkg_universe.broken_packages
        self.all_binaries = britney.all_binaries
        self.nobreakall_arches = self.options.nobreakall_arches
        self.new_arches = self.options.new_arches
        self.break_arches = self.options.break_arches
        self.allow_uninst = britney.allow_uninst

    def apply_srcarch_policy_impl(
        self,
        deps_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check the dependencies of the binaries of *item* on *arch*.

        Unsatisfiable binaries and required packages are recorded on
        *excuse*.  *deps_info* receives the ``autopkgtest_run_anyways`` and
        ``arch_all_not_installable`` architecture lists.

        Returns REJECTED_PERMANENTLY when a binary that must stay installable
        is in the set of broken packages, PASS otherwise.  Architectures
        listed in break_arches or new_arches always pass unchecked.
        """
        verdict = PolicyVerdict.PASS

        assert self.break_arches is not None
        assert self.new_arches is not None
        assert self.nobreakall_arches is not None
        if arch in self.break_arches or arch in self.new_arches:
            # we don't check these in the policy (TODO - for now?)
            return verdict

        source_suite = item.suite
        target_suite = self.suite_info.target_suite

        packages_s_a = source_suite.binaries[arch]
        packages_t_a = target_suite.binaries[arch]

        # Sorted for a deterministic processing order; the frozenset gives
        # constant-time membership tests in the dependency loop below.
        my_bins = sorted(excuse.packages[arch])
        my_bins_lookup = frozenset(my_bins)

        # Installability outcomes seen so far, split by the binary's own
        # architecture ("all" versus architecture specific).
        arch_all_installable: set[bool] = set()
        arch_arch_installable: set[bool] = set()
        consider_it_regression = True

        for pkg_id in my_bins:
            pkg_name = pkg_id.package_name
            binary_u = packages_s_a[pkg_name]
            pkg_arch = binary_u.architecture

            # in some cases, we want to track the uninstallability of a
            # package (because the autopkgtest policy uses this), but we still
            # want to allow the package to be uninstallable
            skip_dep_check = False

            if binary_u.source_version != source_data_srcdist.version:
                # don't check cruft in unstable
                continue

            if item.architecture != "source" and pkg_arch == "all":
                # we don't care about the existing arch: all binaries when
                # checking a binNMU item, because the arch: all binaries won't
                # migrate anyway
                skip_dep_check = True

            if pkg_arch == "all" and arch not in self.nobreakall_arches:
                skip_dep_check = True

            if pkg_name in self.allow_uninst[arch]:
                # this binary is allowed to become uninstallable, so we don't
                # need to check anything
                skip_dep_check = True

            if pkg_name in packages_t_a:
                oldbin = packages_t_a[pkg_name]
                if not target_suite.is_installable(oldbin.pkg_id):
                    # as the current binary in testing is already
                    # uninstallable, the newer version is allowed to be
                    # uninstallable as well, so we don't need to check
                    # anything
                    skip_dep_check = True
                    consider_it_regression = False

            if pkg_id in self.broken_packages:
                if pkg_arch == "all":
                    arch_all_installable.add(False)
                else:
                    arch_arch_installable.add(False)
                # dependencies can't be satisfied by all the known binaries -
                # this certainly won't work...
                excuse.add_unsatisfiable_on_arch(arch)
                if skip_dep_check:
                    # ...but if the binary is allowed to become uninstallable,
                    # we don't care; it is still listed as uninstallable
                    continue
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                excuse.add_verdict_info(
                    verdict, f"{pkg_name}/{arch} has unsatisfiable dependency"
                )
                excuse.addreason("depends")
            else:
                if pkg_arch == "all":
                    arch_all_installable.add(True)
                else:
                    arch_arch_installable.add(True)

            if skip_dep_check:
                continue

            deps = self.pkg_universe.dependencies_of(pkg_id)

            for dep in deps:
                # dep is a collection of packages, each of which satisfies
                # the dependency

                if dep == frozenset():
                    continue
                is_ok = False
                needed_for_dep = set()

                for alternative in dep:
                    if target_suite.is_pkg_in_the_suite(alternative):
                        # dep can be satisfied in testing - ok
                        is_ok = True
                    elif alternative in my_bins_lookup:
                        # can be satisfied by binary from same item: will be
                        # ok if item migrates
                        is_ok = True
                    else:
                        needed_for_dep.add(alternative)

                if not is_ok:
                    spec = DependencySpec(DependencyType.DEPENDS, arch)
                    excuse.add_package_depends(spec, needed_for_dep)

        # The autopkgtest policy needs delicate trade offs for
        # non-installability. The current choice (considering source
        # migration and only binaries built by the version of the
        # source):
        #
        # * Run autopkgtest if all arch:$arch binaries are installable
        #   (but some or all arch:all binaries are not)
        #
        # * Don't schedule nor wait for not installable arch:all only package
        #   on ! NOBREAKALL_ARCHES
        #
        # * Run autopkgtest if installability isn't a regression (there are (or
        #   rather, should) not be a lot of packages in this state, and most
        #   likely they'll just fail quickly)
        #
        # * Don't schedule, but wait otherwise
        if arch_arch_installable == {True} and False in arch_all_installable:
            deps_info.setdefault("autopkgtest_run_anyways", []).append(arch)
        elif (
            arch not in self.nobreakall_arches
            and arch_arch_installable == set()
            and False in arch_all_installable
        ):
            deps_info.setdefault("arch_all_not_installable", []).append(arch)
        elif not consider_it_regression:
            deps_info.setdefault("autopkgtest_run_anyways", []).append(arch)

        return verdict
@unique
class BuildDepResult(IntEnum):
    """Outcome of checking a build-dependency relation, best to worst.

    The members are integers so that results can be ordered and compared.
    """

    OK = 1  # relation is satisfied in target
    DEPENDS = 2  # relation can be satisfied by other packages in source
    FAILED = 3  # relation cannot be satisfied
class BuildDependsPolicy(AbstractBasePolicy):
    """Policy verifying that build-dependencies of a source can be satisfied.

    Architecture specific build-dependencies are checked on every relevant
    architecture.  Architecture independent ones only need a single
    architecture on which they can be satisfied.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "build-depends",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
        )
        # Architectures tried first for arch-independent build-dependencies;
        # filled from the "all_buildarch" option by initialise().
        self._all_buildarch: list[str] = []

        parse_option(options, "all_buildarch")

    def initialise(self, britney: "Britney") -> None:
        """Turn the ``all_buildarch`` option, when set, into a list."""
        super().initialise(britney)
        if self.options.all_buildarch:
            self._all_buildarch = SuiteContentLoader.config_str_as_list(
                self.options.all_buildarch, []
            )

    def apply_src_policy_impl(
        self,
        build_deps_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
        get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers,
    ) -> PolicyVerdict:
        """Check both build-dependency fields and return the worst verdict.

        Fields that are empty or absent are not checked.
        """
        verdict = PolicyVerdict.PASS

        # analyze the dependency fields (if present)
        fields = (
            (source_data_srcdist.build_deps_arch, DependencyType.BUILD_DEPENDS),
            (
                source_data_srcdist.build_deps_indep,
                DependencyType.BUILD_DEPENDS_INDEP,
            ),
        )
        for relation_text, relation_type in fields:
            if not relation_text:
                continue
            field_verdict = self._check_build_deps(
                relation_text,
                relation_type,
                build_deps_info,
                item,
                source_data_tdist,
                source_data_srcdist,
                excuse,
                get_dependency_solvers=get_dependency_solvers,
            )
            verdict = PolicyVerdict.worst_of(verdict, field_verdict)

        return verdict

    def _get_check_archs(
        self, archs: Container[str], dep_type: DependencyType
    ) -> list[str]:
        """Return the architectures to check, in the order to try them.

        Architectures listed in the ``outofsync_arches`` option are never
        returned.
        """
        oos = self.options.outofsync_arches
        configured = self.options.architectures

        if dep_type == DependencyType.BUILD_DEPENDS:
            return [
                candidate
                for candidate in configured
                if candidate in archs and candidate not in oos
            ]

        # first try the all buildarch
        ordered = list(self._all_buildarch)
        # then try the architectures where this source has arch specific
        # binaries (in the order of the architecture config file)
        for candidate in configured:
            if candidate in archs and candidate not in ordered:
                ordered.append(candidate)
        # then try all other architectures
        for candidate in configured:
            if candidate not in ordered:
                ordered.append(candidate)

        # and drop OUTOFSYNC_ARCHES
        return [candidate for candidate in ordered if candidate not in oos]

    def _add_info_for_arch(
        self,
        arch: str,
        excuses_info: dict[str, list[str]],
        blockers: dict[str, set[BinaryPackageId]],
        results: dict[str, BuildDepResult],
        dep_type: DependencyType,
        target_suite: TargetSuite,
        source_suite: Suite,
        excuse: "Excuse",
        verdict: PolicyVerdict,
    ) -> PolicyVerdict:
        """Transfer the findings collected for *arch* onto *excuse*.

        Returns *verdict*, worsened to REJECTED_PERMANENTLY when the result
        recorded for *arch* is FAILED.
        """
        # for the solving packages, update the excuse to add the dependencies
        # (nothing is added for architectures in break_arches)
        if arch in blockers and arch not in self.options.break_arches:
            for solver_id in blockers[arch]:
                excuse.add_package_depends(
                    DependencySpec(dep_type, arch), {solver_id}
                )

        if results.get(arch) == BuildDepResult.FAILED:
            verdict = PolicyVerdict.worst_of(
                verdict, PolicyVerdict.REJECTED_PERMANENTLY
            )

        for excuse_text in excuses_info.get(arch, ()):
            if verdict.is_rejected:
                excuse.add_verdict_info(verdict, excuse_text)
            else:
                excuse.addinfo(excuse_text)

        return verdict

    def _check_build_deps(
        self,
        deps: str,
        dep_type: DependencyType,
        build_deps_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
        get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers,
    ) -> PolicyVerdict:
        """Check the comma-separated relation field *deps* of kind *dep_type*.

        Each clause is first tried against the target suite, then against the
        source suite of *item*.  Unsatisfiable clauses are stored in
        *build_deps_info* under ``unsatisfiable-arch-build-depends``.

        Returns the verdict after reporting the findings on *excuse*.
        """
        verdict = PolicyVerdict.PASS
        any_arch_ok = dep_type == DependencyType.BUILD_DEPENDS_INDEP

        britney = self.britney

        # local copies for better performance
        parse_src_depends = apt_pkg.parse_src_depends

        source_name = item.package
        source_suite = item.suite
        target_suite = self.suite_info.target_suite
        binaries_s = source_suite.binaries
        provides_s = source_suite.provides_table
        binaries_t = target_suite.binaries
        provides_t = target_suite.provides_table
        unsat_bd: dict[str, list[str]] = {}
        relevant_archs: set[str] = {
            binary.architecture
            for binary in source_data_srcdist.binaries
            if britney.all_binaries[binary].architecture != "all"
        }

        excuses_info: dict[str, list[str]] = defaultdict(list)
        blockers: dict[str, set[BinaryPackageId]] = defaultdict(set)
        arch_results = {}
        result_archs = defaultdict(list)
        bestresult = BuildDepResult.FAILED
        check_archs = self._get_check_archs(relevant_archs, dep_type)
        if not check_archs:
            # when the arch list is empty, we check the b-d on any arch, instead of all archs
            # this happens for Build-Depends on a source package that only produces arch: all binaries
            any_arch_ok = True
            check_archs = self._get_check_archs(
                self.options.architectures, DependencyType.BUILD_DEPENDS_INDEP
            )

        for arch in check_archs:
            # retrieve the binary package from the specified suite and arch
            source_bins = binaries_s[arch]
            source_provides = provides_s[arch]
            target_bins = binaries_t[arch]
            target_provides = provides_t[arch]
            outcome = BuildDepResult.OK
            # for every dependency block (formed as conjunction of disjunction)
            for block_txt in deps.split(","):
                parsed = parse_src_depends(block_txt, False, arch)
                # Unlike regular dependencies, some clauses of the Build-Depends(-Arch|-Indep) can be
                # filtered out by (e.g.) architecture restrictions. We need to cope with this while
                # keeping block_txt and block aligned.
                if not parsed:
                    # Relation is not relevant for this architecture.
                    continue
                block = parsed[0]
                # if the block is satisfied in the target suite, then skip the block
                if get_dependency_solvers(
                    block, target_bins, target_provides, build_depends=True
                ):
                    # Satisfied in the target suite; all ok.
                    continue

                # check if the block can be satisfied in the source suite, and list the solving packages
                solvers = get_dependency_solvers(
                    block, source_bins, source_provides, build_depends=True
                )

                # if the dependency can be satisfied by the same source package, skip the block:
                # obviously both binary packages will enter the target suite together
                if any(solver.source == source_name for solver in solvers):
                    continue

                # if no package can satisfy the dependency, add this information to the excuse
                if not solvers:
                    clause = block_txt.strip()
                    excuses_info[arch].append(
                        "%s unsatisfiable %s on %s: %s"
                        % (source_name, dep_type, arch, clause)
                    )
                    unsat_bd.setdefault(arch, []).append(clause)
                    outcome = BuildDepResult.FAILED
                    continue

                blockers[arch].update(solver.pkg_id for solver in solvers)
                # never improve on an earlier FAILED clause
                outcome = max(outcome, BuildDepResult.DEPENDS)

            arch_results[arch] = outcome

            if any_arch_ok:
                if outcome < bestresult:
                    bestresult = outcome
                result_archs[outcome].append(arch)
                if bestresult == BuildDepResult.OK:
                    # we found an architecture where the b-deps-indep are
                    # satisfied in the target suite, so we can stop
                    break

        if any_arch_ok:
            # report only the first architecture that reached the best result
            chosen_arch = result_archs[bestresult][0]
            excuse.add_detailed_info(
                f"Checking {dep_type.get_description()} on {chosen_arch}"
            )
            key = "check-%s-on-arch" % dep_type.get_reason()
            build_deps_info[key] = chosen_arch
            report_archs = [chosen_arch]
        else:
            report_archs = check_archs

        for arch in report_archs:
            verdict = self._add_info_for_arch(
                arch,
                excuses_info,
                blockers,
                arch_results,
                dep_type,
                target_suite,
                source_suite,
                excuse,
                verdict,
            )

        if unsat_bd:
            build_deps_info["unsatisfiable-arch-build-depends"] = unsat_bd

        return verdict
class BuiltUsingPolicy(AbstractBasePolicy):
    """Built-Using policy

    Binaries that incorporate (part of) another source package must list these
    sources under 'Built-Using'.

    This policy checks if the corresponding sources are available in the
    target suite. If they are not, but they are candidates for migration, a
    dependency is added.

    If the binary incorporates a newer version of a source, that is not (yet)
    a candidate, we don't want to accept that binary. A rebuild later in the
    primary suite wouldn't fix the issue, because that would incorporate the
    newer version again.

    If the binary incorporates an older version of the source, a newer version
    will be accepted as a replacement. We assume that this can be fixed by
    rebuilding the binary at some point during the development cycle.

    Requiring exact version of the source would not be useful in practice. A
    newer upload of that source wouldn't be blocked by this policy, so the
    built-using would be outdated anyway.

    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "built-using",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )

    def initialise(self, britney: "Britney") -> None:
        super().initialise(britney)

    def apply_srcarch_policy_impl(
        self,
        build_deps_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check the Built-Using entries of the item's binaries on *arch*.

        Returns REJECTED_PERMANENTLY when an entry cannot be satisfied by the
        target suite, the item's suite or (for additional source suites) the
        primary source suite, unless *arch* is in break_arches.  Returns PASS
        otherwise.
        """
        verdict = PolicyVerdict.PASS

        source_suite = item.suite
        target_suite = self.suite_info.target_suite
        binaries_s = source_suite.binaries

        def check_bu_in_suite(
            bu_source: str, bu_version: str, source_suite: Suite
        ) -> bool:
            # True when the given suite holds bu_source at bu_version or
            # newer; in that case the finding is also noted on the excuse.
            # pkg_name is the binary currently processed by the loop below.
            if bu_source not in source_suite.sources:
                return False
            s_ver = source_suite.sources[bu_source].version
            if apt_pkg.version_compare(s_ver, bu_version) < 0:
                return False
            dep = PackageId(bu_source, s_ver, "source")
            if arch in self.options.break_arches:
                excuse.add_detailed_info(
                    "Ignoring Built-Using for %s/%s on %s"
                    % (pkg_name, arch, dep.uvname)
                )
            else:
                spec = DependencySpec(DependencyType.BUILT_USING, arch)
                excuse.add_package_depends(spec, {dep})
                excuse.add_detailed_info(
                    f"{pkg_name}/{arch} has Built-Using on {dep.uvname}"
                )
            return True

        def in_target_suite(bu_source: str, bu_version: str) -> bool:
            # True when the target suite already has a new enough version.
            if bu_source not in target_suite.sources:
                return False
            t_ver = target_suite.sources[bu_source].version
            return apt_pkg.version_compare(t_ver, bu_version) >= 0

        own_binaries = sorted(
            x for x in source_data_srcdist.binaries if x.architecture == arch
        )
        for pkg_id in own_binaries:
            pkg_name = pkg_id.package_name

            # retrieve the corresponding binary package from the item's suite
            binary_s = binaries_s[arch][pkg_name]

            for bu in binary_s.builtusing:
                bu_source = bu[0]
                bu_version = bu[1]

                # Try the target suite, then the item's own suite and finally
                # the primary source suite; stop at the first that satisfies.
                found = (
                    in_target_suite(bu_source, bu_version)
                    or check_bu_in_suite(bu_source, bu_version, source_suite)
                    or (
                        source_suite.suite_class.is_additional_source
                        and check_bu_in_suite(
                            bu_source,
                            bu_version,
                            self.suite_info.primary_source_suite,
                        )
                    )
                )
                if found:
                    continue

                if arch in self.options.break_arches:
                    excuse.add_detailed_info(
                        "Ignoring unsatisfiable Built-Using for %s/%s on %s %s"
                        % (pkg_name, arch, bu_source, bu_version)
                    )
                else:
                    verdict = PolicyVerdict.worst_of(
                        verdict, PolicyVerdict.REJECTED_PERMANENTLY
                    )
                    excuse.add_verdict_info(
                        verdict,
                        "%s/%s has unsatisfiable Built-Using on %s %s"
                        % (pkg_name, arch, bu_source, bu_version),
                    )

        return verdict
1693class BlockPolicy(AbstractBasePolicy):
1694 BLOCK_HINT_REGEX = re.compile("^(un)?(block-?.*)$")
1696 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
1697 super().__init__(
1698 "block",
1699 options,
1700 suite_info,
1701 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
1702 )
1703 self._blockall: dict[str | None, Hint] = {}
1705 def initialise(self, britney: "Britney") -> None:
1706 super().initialise(britney)
1707 assert self.hints is not None
1708 for hint in self.hints.search(type="block-all"):
1709 self._blockall[hint.package] = hint
1711 self._key_packages = []
1712 if "key" in self._blockall:
1713 self._key_packages = self._read_key_packages()
1715 def _read_key_packages(self) -> list[str]:
1716 """Read the list of key packages
1718 The file contains data in the yaml format :
1720 - reason: <something>
1721 source: <package>
1723 The method returns a list of all key packages.
1724 """
1725 filename = os.path.join(self.state_dir, "key_packages.yaml")
1726 self.logger.info("Loading key packages from %s", filename)
1727 if os.path.exists(filename): 1727 ↛ 1732line 1727 didn't jump to line 1732 because the condition on line 1727 was always true
1728 with open(filename) as f:
1729 data = yaml.safe_load(f)
1730 key_packages = [item["source"] for item in data]
1731 else:
1732 self.logger.error(
1733 "Britney was asked to block key packages, "
1734 + "but no key_packages.yaml file was found."
1735 )
1736 sys.exit(1)
1738 return key_packages
1740 def register_hints(self, hint_parser: HintParser) -> None:
1741 # block related hints are currently defined in hint.py
1742 pass
1744 def _check_blocked(
1745 self, item: "MigrationItem", arch: str, version: str, excuse: "Excuse"
1746 ) -> PolicyVerdict:
1747 verdict = PolicyVerdict.PASS
1748 blocked = {}
1749 unblocked = {}
1750 block_info = {}
1751 source_suite = item.suite
1752 suite_name = source_suite.name
1753 src = item.package
1754 is_primary = source_suite.suite_class == SuiteClass.PRIMARY_SOURCE_SUITE
1756 tooltip = (
1757 "please contact %s-release if update is needed" % self.options.distribution
1758 )
1760 assert self.hints is not None
1761 shints = self.hints.search(package=src)
1762 mismatches = False
1763 r = self.BLOCK_HINT_REGEX
1764 for hint in shints:
1765 m = r.match(hint.type)
1766 if m:
1767 if m.group(1) == "un":
1768 assert hint.suite is not None
1769 if (
1770 hint.version != version
1771 or hint.suite.name != suite_name
1772 or (hint.architecture != arch and hint.architecture != "source")
1773 ):
1774 self.logger.info(
1775 "hint mismatch: %s %s %s", version, arch, suite_name
1776 )
1777 mismatches = True
1778 else:
1779 unblocked[m.group(2)] = hint.user
1780 excuse.add_hint(hint)
1781 else:
1782 # block(-*) hint: only accepts a source, so this will
1783 # always match
1784 blocked[m.group(2)] = hint.user
1785 excuse.add_hint(hint)
1787 if "block" not in blocked and is_primary:
1788 # if there is a specific block hint for this package, we don't
1789 # check for the general hints
1791 if self.options.distribution == "debian": 1791 ↛ 1798line 1791 didn't jump to line 1798 because the condition on line 1791 was always true
1792 url = "https://release.debian.org/testing/freeze_policy.html"
1793 tooltip = (
1794 'Follow the <a href="%s">freeze policy</a> when applying for an unblock'
1795 % url
1796 )
1798 if "source" in self._blockall:
1799 blocked["block"] = self._blockall["source"].user
1800 excuse.add_hint(self._blockall["source"])
1801 elif (
1802 "new-source" in self._blockall
1803 and src not in self.suite_info.target_suite.sources
1804 ):
1805 blocked["block"] = self._blockall["new-source"].user
1806 excuse.add_hint(self._blockall["new-source"])
1807 # no tooltip: new sources will probably not be accepted anyway
1808 block_info["block"] = "blocked by {}: is not in {}".format(
1809 self._blockall["new-source"].user,
1810 self.suite_info.target_suite.name,
1811 )
1812 elif "key" in self._blockall and src in self._key_packages:
1813 blocked["block"] = self._blockall["key"].user
1814 excuse.add_hint(self._blockall["key"])
1815 block_info["block"] = "blocked by {}: is a key package ({})".format(
1816 self._blockall["key"].user,
1817 tooltip,
1818 )
1819 elif "no-autopkgtest" in self._blockall:
1820 if excuse.autopkgtest_results == {"PASS"}:
1821 if not blocked: 1821 ↛ 1847line 1821 didn't jump to line 1847 because the condition on line 1821 was always true
1822 excuse.addinfo("not blocked: has successful autopkgtest")
1823 else:
1824 blocked["block"] = self._blockall["no-autopkgtest"].user
1825 excuse.add_hint(self._blockall["no-autopkgtest"])
1826 if not excuse.autopkgtest_results:
1827 block_info["block"] = (
1828 "blocked by %s: does not have autopkgtest (%s)"
1829 % (
1830 self._blockall["no-autopkgtest"].user,
1831 tooltip,
1832 )
1833 )
1834 else:
1835 block_info["block"] = (
1836 "blocked by %s: autopkgtest not fully successful (%s)"
1837 % (
1838 self._blockall["no-autopkgtest"].user,
1839 tooltip,
1840 )
1841 )
1843 elif not is_primary:
1844 blocked["block"] = suite_name
1845 excuse.needs_approval = True
1847 for block_cmd in blocked:
1848 unblock_cmd = "un" + block_cmd
1849 if block_cmd in unblocked:
1850 if is_primary or block_cmd == "block-udeb":
1851 excuse.addinfo(
1852 "Ignoring %s request by %s, due to %s request by %s"
1853 % (
1854 block_cmd,
1855 blocked[block_cmd],
1856 unblock_cmd,
1857 unblocked[block_cmd],
1858 )
1859 )
1860 else:
1861 excuse.addinfo("Approved by %s" % (unblocked[block_cmd]))
1862 else:
1863 verdict = PolicyVerdict.REJECTED_NEEDS_APPROVAL
1864 if is_primary or block_cmd == "block-udeb":
1865 # redirect people to d-i RM for udeb things:
1866 if block_cmd == "block-udeb":
1867 tooltip = "please contact the d-i release manager if an update is needed"
1868 if block_cmd in block_info:
1869 info = block_info[block_cmd]
1870 else:
1871 info = (
1872 "Not touching package due to {} request by {} ({})".format(
1873 block_cmd,
1874 blocked[block_cmd],
1875 tooltip,
1876 )
1877 )
1878 excuse.add_verdict_info(verdict, info)
1879 else:
1880 excuse.add_verdict_info(verdict, "NEEDS APPROVAL BY RM")
1881 excuse.addreason("block")
1882 if mismatches:
1883 excuse.add_detailed_info(
1884 "Some hints for %s do not match this item" % src
1885 )
1886 return verdict
def apply_src_policy_impl(
    self,
    block_info: dict[str, Any],
    item: MigrationItem,
    source_data_tdist: SourcePackage | None,
    source_data_srcdist: SourcePackage,
    excuse: "Excuse",
) -> PolicyVerdict:
    """Run the block check for a source-level item.

    Delegates to ``_check_blocked`` using the literal architecture
    ``"source"`` and the version found in the source suite.  ``block_info``
    and ``source_data_tdist`` are accepted but not used here.
    """
    candidate_version = source_data_srcdist.version
    return self._check_blocked(item, "source", candidate_version, excuse)
def apply_srcarch_policy_impl(
    self,
    block_info: dict[str, Any],
    item: MigrationItem,
    arch: str,
    source_data_tdist: SourcePackage | None,
    source_data_srcdist: SourcePackage,
    excuse: "Excuse",
) -> PolicyVerdict:
    """Run the block check for a single architecture of an item.

    Delegates to ``_check_blocked`` with ``arch`` and the version found in
    the source suite.  ``block_info`` and ``source_data_tdist`` are accepted
    but not used here.
    """
    candidate_version = source_data_srcdist.version
    return self._check_blocked(item, arch, candidate_version, excuse)
class BuiltOnBuilddPolicy(AbstractBasePolicy):
    """Reject binaries that were not uploaded by a buildd.

    Signer information is read from ``signers.json`` in the state directory
    and looked up per binary package name, version and architecture.  A
    binary that is not flagged as buildd-built is still accepted when the
    source is outside the ``main`` component, or when the binary is
    ``arch: all`` and an ``allow-archall-maintainer-upload`` hint exists for
    the item's package.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "builtonbuildd",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )
        # "signerinfo" is filled in by initialise() from signers.json
        self._builtonbuildd: dict[str, Any] = {
            "signerinfo": None,
        }

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the unversioned ``allow-archall-maintainer-upload`` hint."""
        hint_parser.register_hint_type(
            HintType(
                "allow-archall-maintainer-upload",
                versioned=HintAnnotate.FORBIDDEN,
            )
        )

    def initialise(self, britney: "Britney") -> None:
        """Load the signer information from ``<state_dir>/signers.json``.

        Raises ``RuntimeError`` when the state directory is not configured.
        """
        super().initialise(britney)
        try:
            filename_signerinfo = os.path.join(self.state_dir, "signers.json")
        except AttributeError as e:  # pragma: no cover
            raise RuntimeError(
                "Please set STATE_DIR in the britney configuration"
            ) from e
        self._builtonbuildd["signerinfo"] = self._read_signerinfo(filename_signerinfo)

    def apply_srcarch_policy_impl(
        self,
        buildd_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check who uploaded the binaries of the item on ``arch``.

        ``buildd_info["signed-by"]`` maps a binary architecture to the uid
        recorded for it; it is created on first use and updated for every
        binary that is checked.  Excuse messages are only emitted for the
        first binary seen per binary architecture.

        Returns the worst verdict over all checked binaries.
        """
        verdict = PolicyVerdict.PASS
        signers = self._builtonbuildd["signerinfo"]

        signed_by = buildd_info.setdefault("signed-by", {})

        source_suite = item.suite

        # horrible hard-coding, but currently, we don't keep track of the
        # component when loading the packages files
        component = "main"
        # we use the source component, because a binary in contrib can
        # belong to a source in main
        section = source_data_srcdist.section
        if "/" in section:
            component = section.split("/")[0]

        packages_s_a = source_suite.binaries[arch]
        assert self.hints is not None

        for pkg_id in sorted(
            x for x in source_data_srcdist.binaries if x.architecture == arch
        ):
            pkg_name = pkg_id.package_name
            binary_u = packages_s_a[pkg_name]
            pkg_arch = binary_u.architecture

            if binary_u.source_version != source_data_srcdist.version:
                continue

            if item.architecture != "source" and pkg_arch == "all":
                # we don't care about the existing arch: all binaries when
                # checking a binNMU item, because the arch: all binaries won't
                # migrate anyway
                continue

            uid = None
            uidinfo = ""
            buildd_ok = False
            failure_verdict = PolicyVerdict.REJECTED_PERMANENTLY
            try:
                signer = signers[pkg_name][pkg_id.version][pkg_arch]
                if signer["buildd"]:
                    buildd_ok = True
                uid = signer["uid"]
                uidinfo = f"arch {pkg_arch} binaries uploaded by {uid}"
            except KeyError:
                self.logger.info(
                    "signer info for %s %s (%s) on %s not found ",
                    pkg_name,
                    binary_u.version,
                    pkg_arch,
                    arch,
                )
                uidinfo = "upload info for arch %s binaries not found" % (pkg_arch)
                failure_verdict = PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT

            # signed_by is only updated at the end of the iteration, so this
            # holds for every message emitted below
            first_for_arch = pkg_arch not in signed_by

            if not buildd_ok:
                if component != "main":
                    if first_for_arch:
                        excuse.add_detailed_info(
                            f"{uidinfo}, but package in {component}"
                        )
                    buildd_ok = True
                elif pkg_arch == "all":
                    allow_hints = self.hints.search(
                        "allow-archall-maintainer-upload", package=item.package
                    )
                    if allow_hints:
                        buildd_ok = True
                        verdict = PolicyVerdict.worst_of(
                            verdict, PolicyVerdict.PASS_HINTED
                        )
                        if first_for_arch:
                            excuse.addinfo(
                                "%s, but whitelisted by %s"
                                % (uidinfo, allow_hints[0].user)
                            )
            if not buildd_ok:
                # accumulate instead of overwrite, so that a later binary
                # with a milder failure cannot hide an earlier, worse one
                verdict = PolicyVerdict.worst_of(verdict, failure_verdict)
                if first_for_arch:
                    if pkg_arch == "all":
                        uidinfo += (
                            ", a new source-only upload is needed to allow migration"
                        )
                    excuse.add_verdict_info(
                        failure_verdict, "Not built on buildd: %s" % (uidinfo)
                    )

            if not first_for_arch and signed_by[pkg_arch] != uid:
                self.logger.info(
                    "signer mismatch for %s (%s %s) on %s: %s, while %s already listed",
                    pkg_name,
                    binary_u.source,
                    binary_u.source_version,
                    pkg_arch,
                    uid,
                    signed_by[pkg_arch],
                )

            signed_by[pkg_arch] = uid

        return verdict

    def _read_signerinfo(self, filename: str) -> dict[str, Any]:
        """Return the parsed JSON content of ``filename``.

        An empty file yields an empty dict.
        """
        signerinfo: dict[str, Any] = {}
        self.logger.info("Loading signer info from %s", filename)
        with open(filename) as fd:
            if os.fstat(fd.fileno()).st_size < 1:
                return signerinfo
            signerinfo = json.load(fd)

        return signerinfo
class ImplicitDependencyPolicy(AbstractBasePolicy):
    """Implicit Dependency policy

    Migrating a new version of pkg-a may leave some other package pkg-b
    uninstallable.  If a newer version of pkg-b (or its removal) repairs
    that, pkg-a has an 'implicit dependency' on pkg-b: pkg-a can only
    migrate together with pkg-b.

    This policy looks for a few common situations and records them in the
    excuses.  When another item is needed to repair the uninstallability, a
    dependency on it is added.  When no newer item can repair it, the excuse
    is blocked.

    The migration step checks the installability of every package anyway,
    so not every corner case has to be handled here.  What must be avoided
    is blocking an excuse without need.

    Examples of cases this policy should detect:

    * pkg-a goes from 1.0-1 to 2.0-1, while
      pkg-b has "Depends: pkg-a (<< 2.0)"
      Typical when pkg-b depends strictly on pkg-a because it relies on a
      non-stable internal interface (e.g. glibc, binutils,
      python3-defaults, ...)

    * pkg-a goes from 1.0-1 to 2.0-1, and
      pkg-a 1.0-1 has "Provides: provides-1",
      pkg-a 2.0-1 has "Provides: provides-2",
      pkg-b has "Depends: provides-1"
      Typical when pkg-a exposes an interface that changes between
      versions and a virtual package names the interface version
      (e.g. perl-api-x.y)

    """

    _pkg_universe: "BinaryPackageUniverse"
    _all_binaries: dict["BinaryPackageId", "BinaryPackage"]
    _allow_uninst: dict[str, set[str | None]]
    _nobreakall_arches: list[str]

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "implicit-deps",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )

    def initialise(self, britney: "Britney") -> None:
        """Cache the britney state and the options this policy consults."""
        super().initialise(britney)

        # state taken from the britney instance
        self._pkg_universe = britney.pkg_universe
        self._all_binaries = britney.all_binaries
        self._smooth_updates = britney.options.smooth_updates
        self._allow_uninst = britney.allow_uninst

        # architecture lists taken from this policy's options
        opts = self.options
        self._nobreakall_arches = opts.nobreakall_arches
        self._new_arches = opts.new_arches
        self._break_arches = opts.break_arches
        self._outofsync_arches = opts.outofsync_arches

    def can_be_removed(self, pkg: BinaryPackage) -> bool:
        """Tell whether ``pkg`` is a plausible removal candidate in testing."""
        source_name = pkg.source
        testing = self.suite_info.target_suite

        # TODO these conditions shouldn't be hardcoded here
        # ideally, we would be able to look up excuses to see if the removal
        # is in there, but in the current flow, this policy is called before
        # all possible excuses exist, so there is no list for us to check

        if source_name not in self.suite_info.primary_source_suite.sources:
            # the source is gone from unstable: candidate for removal
            return True

        testing_version = testing.sources[source_name].version
        assert self.hints is not None
        if self.hints.search("remove", package=source_name, version=testing_version):
            # a removal hint matches the source in testing
            return True

        if testing.is_cruft(pkg):
            # cruft in testing: its removal will be attempted
            return True

        # the case were the newer version of the source no longer includes the
        # binary (or includes a cruft version of the binary) will be handled
        # separately (in that case there might be an implicit dependency on
        # the newer source)

        return False

    def should_skip_rdep(
        self, pkg: BinaryPackage, source_name: str, myarch: str
    ) -> bool:
        """Tell whether reverse dependency ``pkg`` needs no further checking.

        The conditions are evaluated in order and the first match wins.
        """
        testing = self.suite_info.target_suite

        return (
            # not in the target suite: the migration cannot break it
            not testing.is_pkg_in_the_suite(pkg.pkg_id)
            # built from the same source: it is upgraded along with it
            or pkg.source == source_name
            # may be removed, in which case it will not be broken
            or self.can_be_removed(pkg)
            # arch all on a non nobreakall arch may become uninstallable
            or (pkg.architecture == "all" and myarch not in self._nobreakall_arches)
            # a hint allows this binary to become uninstallable
            or pkg.pkg_id.package_name in self._allow_uninst[myarch]
            # already uninstallable in the target suite: nothing to break
            or not testing.is_installable(pkg.pkg_id)
        )

    def breaks_installability(
        self,
        pkg_id_t: BinaryPackageId,
        pkg_id_s: BinaryPackageId | None,
        pkg_to_check: BinaryPackageId,
    ) -> bool:
        """
        Tell whether replacing pkg_id_t by pkg_id_s leaves a dependency
        clause of pkg_to_check without any usable alternative.

        Pass None as pkg_id_s to check the effect of removing pkg_id_t.
        """

        universe = self._pkg_universe
        conflicting = universe.negative_dependencies_of(pkg_to_check)
        upgraded_name = pkg_id_t.package_name

        def still_usable(alternative: BinaryPackageId) -> bool:
            # An alternative that conflicts with pkg_to_check can never
            # satisfy the clause (commonly: breaks added to pkg_id_s).
            if alternative in conflicting:
                return False
            # Any other binary satisfies the clause regardless of the
            # upgrade.  Of the upgraded binary, only pkg_id_s itself will
            # exist in the target suite afterwards; every other version
            # (pkg_id_t included) is gone.
            return alternative.package_name != upgraded_name or alternative == pkg_id_s

        for clause in universe.dependencies_of(pkg_to_check):
            if pkg_id_t not in clause:
                # pkg_id_t is no alternative here, so the upgrade cannot
                # affect this clause
                continue
            if not any(still_usable(alternative) for alternative in clause):
                return True
        return False

    def check_upgrade(
        self,
        pkg_id_t: BinaryPackageId,
        pkg_id_s: BinaryPackageId | None,
        source_name: str,
        myarch: str,
        broken_binaries: set[str],
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check the reverse dependencies of ``pkg_id_t`` in testing.

        For every reverse dependency that the upgrade breaks, either an
        implicit dependency on the newer versions that fix it is added to
        ``excuse``, or its name is added to ``broken_binaries`` and the
        excuse is rejected permanently.
        """
        outcome = PolicyVerdict.PASS

        for rdep_id in sorted(self._pkg_universe.reverse_dependencies_of(pkg_id_t)):
            rdep = self._all_binaries[rdep_id]

            # cases where the rdep won't become uninstallable, or where we
            # don't care if it does
            if self.should_skip_rdep(rdep, source_name, myarch):
                continue

            if not self.breaks_installability(pkg_id_t, pkg_id_s, rdep_id):
                # the upgrade leaves rdep_id installable: no implicit
                # dependency
                continue

            # The upgrade breaks the rdep.  Look for a newer version of the
            # rdep that is not broken by it: if one exists there is an
            # implicit dependency, otherwise the upgrade will fail.
            candidates = find_newer_binaries(
                self.suite_info, rdep, add_source_for_dropped_bin=True
            )
            fixing = set()
            for newer, _suite in candidates:
                if newer.architecture == "source":
                    # A newer source that dropped the binary is reported as
                    # the 'newer version'.  Once that source migrates the
                    # binary is gone, so it cannot be uninstallable.
                    fixing.add(newer)
                    continue
                assert isinstance(newer, BinaryPackageId)
                if not self.breaks_installability(pkg_id_t, pkg_id_s, newer):
                    fixing.add(newer)

            if fixing:
                dependency = DependencySpec(DependencyType.IMPLICIT_DEPENDENCY, myarch)
                excuse.add_package_depends(dependency, fixing)
                continue

            # nothing newer fixes it: no possible solution
            rdep_name = rdep_id.name
            broken_binaries.add(rdep_name)
            target_name = self.suite_info.target_suite.name
            if pkg_id_s:
                action = f"migrating {pkg_id_s.name} to {target_name}"
            else:
                action = f"removing {pkg_id_t.name} from {target_name}"
            info = f'{action} makes <a href="#{rdep_name}">{rdep_name}</a> uninstallable'
            outcome = PolicyVerdict.REJECTED_PERMANENTLY
            excuse.add_verdict_info(outcome, info)

        return outcome

    def apply_srcarch_policy_impl(
        self,
        implicit_dep_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Look for implicit dependencies of ``item`` on ``arch``.

        The sorted names of binaries that would become uninstallable are
        stored in ``implicit_dep_info["implicit-deps"]["broken-binaries"]``,
        merged with whatever an earlier architecture already recorded.
        """
        verdict = PolicyVerdict.PASS

        if not source_data_tdist:
            # the item is not in testing yet: no implicit dependency
            return verdict

        if excuse.hasreason("missingbuild"):
            # a missing build would look like a removal of the binaries,
            # which would produce incorrect (and confusing) info
            excuse.add_detailed_info(
                "missing build, not checking implicit dependencies on %s" % (arch)
            )
            return verdict

        source_suite = item.suite
        source_name = item.package
        testing = self.suite_info.target_suite
        all_binaries = self._all_binaries

        # every binary of this excuse that is currently in testing
        in_testing = [
            b
            for b in source_data_tdist.binaries
            if (arch == "source" or b.architecture == arch)
            and b.package_name in testing.binaries[b.architecture]
            and b.architecture not in self._new_arches
            and b.architecture not in self._break_arches
            and b.architecture not in self._outofsync_arches
        ]

        broken_binaries: set[str] = set()

        assert self.hints is not None
        for pkg_id_t in sorted(in_testing):
            mypkg = pkg_id_t.package_name
            myarch = pkg_id_t.architecture
            binaries_t_a = testing.binaries[myarch]
            binaries_s_a = source_suite.binaries[myarch]

            if testing.is_cruft(all_binaries[pkg_id_t]):
                # cruft in testing stays around as long as it is needed to
                # satisfy dependencies, so there is nothing to check
                continue

            pkg_id_s: Optional["BinaryPackageId"] = None
            if mypkg in binaries_s_a:
                replacement = binaries_s_a[mypkg]
                if replacement.source != source_name:
                    # hijack: too complicated to check, so it is taken as
                    # is (the migration code checks installability later
                    # anyway)
                    pkg_id_s = replacement.pkg_id
                elif replacement.source_version == source_data_srcdist.version:
                    if pkg_id_t == replacement.pkg_id:
                        # same binary (probably arch: all from a binNMU):
                        # the 'upgrade' changes nothing for it, so it
                        # cannot break anything
                        continue
                    pkg_id_s = replacement.pkg_id
                # otherwise: cruft in the source suite, treated as absent

            if not pkg_id_s and is_smooth_update_allowed(
                binaries_t_a[mypkg], self._smooth_updates, self.hints
            ):
                # absent (or cruft) in the new version and smooth updates
                # are allowed: the binary can stay around for as long as
                # dependencies need it, so there is nothing to check
                continue

            if (
                not pkg_id_s
                and source_data_tdist.version == source_data_srcdist.version
                and source_suite.suite_class == SuiteClass.ADDITIONAL_SOURCE_SUITE
                and binaries_t_a[mypkg].architecture == "all"
            ):
                # very probably a binNMU built in tpu whose arch:all
                # binaries were not copied there because that is not
                # needed; checking it could block without need
                continue

            verdict = PolicyVerdict.worst_of(
                verdict,
                self.check_upgrade(
                    pkg_id_t, pkg_id_s, source_name, myarch, broken_binaries, excuse
                ),
            )

        # architectures are processed one at a time, so results recorded by
        # an earlier architecture have to be merged with this one
        if "implicit-deps" in implicit_dep_info:
            recorded = set(implicit_dep_info["implicit-deps"]["broken-binaries"])
        else:
            implicit_dep_info["implicit-deps"] = {}
            recorded = set()

        implicit_dep_info["implicit-deps"]["broken-binaries"] = sorted(
            recorded | broken_binaries
        )

        return verdict
class ReverseRemovalPolicy(AbstractBasePolicy):
    """Block items whose binaries are in the reverse tree of a removal hint.

    ``initialise`` maps ``"<source>/<source version>"`` to the names of the
    hinted-for-removal packages whose reverse tree contains one of that
    source's binaries, considering only binaries that are not in the target
    suite.  An ``ignore-reverse-remove`` hint overrides the block.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "reverseremoval",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
        )

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the ``ignore-reverse-remove`` hint."""
        hint_parser.register_hint_type(HintType("ignore-reverse-remove"))

    def initialise(self, britney: "Britney") -> None:
        """Compute which sources are blocked by the current remove hints."""
        super().initialise(britney)

        universe = britney.pkg_universe
        source_suites = britney.suite_info.source_suites
        testing = britney.suite_info.target_suite

        # Build set of the sources of reverse (Build-) Depends
        assert self.hints is not None
        remove_hints = self.hints.search("remove")

        # binary -> names of the hinted packages it (transitively) relies on
        reasons_by_binary: dict[BinaryPackageId, set[str]] = defaultdict(set)
        for remove_hint in remove_hints:
            for hinted in remove_hint.packages:
                # I think we don't need to look at the target suite
                for suite in source_suites:
                    try:
                        affected = set(suite.sources[hinted.uvname].binaries)
                    except KeyError:
                        continue
                    compute_reverse_tree(universe, affected)
                    for binary in affected:
                        reasons_by_binary[binary].add(hinted.uvname)

        # "source/version" -> names of the hinted packages
        reasons_by_source: dict[str, set[str]] = defaultdict(set)
        for binary, reasons in reasons_by_binary.items():
            # If the pkg is in the target suite, there's nothing this
            # policy wants to do.
            if testing.is_pkg_in_the_suite(binary):
                continue
            details = britney.all_binaries[binary]
            reasons_by_source[f"{details.source}/{details.source_version}"].update(
                reasons
            )
        self._block_src_for_rm_hint = reasons_by_source

    def apply_src_policy_impl(
        self,
        rev_remove_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Reject ``item`` when a remove hint affects it, unless overridden."""
        if item.name not in self._block_src_for_rm_hint:
            return PolicyVerdict.PASS

        reason = ", ".join(sorted(self._block_src_for_rm_hint[item.name]))
        assert self.hints is not None
        overrides = self.hints.search(
            "ignore-reverse-remove", package=item.uvname, version=item.version
        )
        excuse.addreason("reverseremoval")

        if not overrides:
            excuse.addinfo("Remove hint for (transitive) dependency: %s" % reason)
            return PolicyVerdict.REJECTED_PERMANENTLY

        excuse.addreason("ignore-reverse-remove")
        excuse.addinfo(
            "Should block migration because of remove hint for %s, but forced by %s"
            % (reason, overrides[0].user)
        )
        return PolicyVerdict.PASS_HINTED
class ReproduciblePolicy(AbstractBasePolicy):
    """Judge source uploads on their reproducibility test results.

    Results are read from ``reproducible.json`` in the state directory and
    kept per suite role ("source"/"target"), architecture and package.
    """

    # target/source states for which no usable result is available (yet)
    _WAIT_STATES = ("E404", "depwait", "stale", "timeout", "unknown")
    # source states meaning the package is not built on the architecture:
    # FTBFS: Fails to build from source on r-b infra
    # NFU: the package explicitly doesn't support building on this arch
    # blacklisted: per package per arch per suite
    _NO_BUILD_STATES = ("FTBFS", "NFU", "blacklisted")

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "reproducible",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )
        self._reproducible: dict[str, Any] = {
            "source": {},
            "target": {},
        }

        # Default values for this policy's options
        parse_option(options, "repro_success_bounty", default=0, to_int=True)
        parse_option(options, "repro_regression_penalty", default=0, to_int=True)
        parse_option(options, "repro_url")
        parse_option(options, "repro_retry_url")
        parse_option(options, "repro_components")

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register ``ignore-reproducible`` (architecture is optional)."""
        hint_parser.register_hint_type(
            HintType("ignore-reproducible", architectured=HintAnnotate.OPTIONAL)
        )

    def initialise(self, britney: "Britney") -> None:
        """Load ``<state_dir>/reproducible.json`` for both suites.

        Raises ``RuntimeError`` when the state directory is not configured.
        """
        super().initialise(britney)
        unstable = self.suite_info.primary_source_suite
        testing = self.suite_info.target_suite
        try:
            filename = os.path.join(self.state_dir, "reproducible.json")
        except AttributeError as e:  # pragma: no cover
            raise RuntimeError(
                "Please set STATE_DIR in the britney configuration"
            ) from e

        self._reproducible = self._read_repro_status(
            filename,
            source={unstable.name, unstable.codename},
            target={testing.name, testing.codename},
        )

    def _assess_states(
        self, source_state: str, target_state: str, arch: str, url_html: str
    ) -> tuple[PolicyVerdict, str, str]:
        """Map the two suite states to (verdict, excuse message, test result).

        Raises ``KeyError`` for a source state that is not handled.
        """
        if source_state == "reproducible":
            return (
                PolicyVerdict.PASS,
                f"Reproducible on {arch}{url_html}",
                "reproducible on %s" % arch,
            )

        if source_state == "FTBR":
            if target_state == "new":
                return (
                    PolicyVerdict.REJECTED_PERMANENTLY,
                    f"New but not reproducible on {arch}{url_html}",
                    "new but not reproducible on %s" % arch,
                )
            if target_state in self._WAIT_STATES:
                return (
                    PolicyVerdict.REJECTED_TEMPORARILY,
                    f"Waiting for reproducibility reference results on {arch}{url_html}",
                    "waiting-for-reference-results on %s" % arch,
                )
            if target_state == "reproducible":
                return (
                    PolicyVerdict.REJECTED_PERMANENTLY,
                    f"Reproducibility regression on {arch}{url_html}",
                    "regression on %s" % arch,
                )
            if target_state == "FTBR":
                return (
                    PolicyVerdict.PASS,
                    f"Ignoring non-reproducibility on {arch} (not a regression){url_html}",
                    "not reproducible on %s" % arch,
                )
            return (
                PolicyVerdict.REJECTED_PERMANENTLY,
                f"No reference result, but not reproducibility on {arch}{url_html}",
                f"reference {target_state} on {arch}",
            )

        if source_state in self._WAIT_STATES:
            return (
                PolicyVerdict.REJECTED_TEMPORARILY,
                f"Waiting for reproducibility test results on {arch}{url_html}",
                "waiting-for-test-results on %s" % arch,
            )

        raise KeyError("Unhandled reproducibility state %s" % source_state)

    def apply_srcarch_policy_impl(
        self,
        reproducible_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Judge the reproducibility of ``item`` on ``arch``.

        Passes without comment for binNMU items, for architectures outside
        ``repro_arches``, for architectures without packages in the excuse
        and for components outside a non-empty ``repro_components``.
        Results, the test URL and honoured hints are recorded in
        ``reproducible_info``.
        """
        verdict = PolicyVerdict.PASS

        # we don't want to apply this policy (yet) on binNMUs
        if item.architecture != "source":
            return verdict

        # we're not supposed to judge on this arch
        if arch not in self.options.repro_arches:
            return verdict

        # bail out if this arch has no packages for this source (not build
        # here)
        if arch not in excuse.packages:
            return verdict

        # horrible hard-coding, but currently, we don't keep track of the
        # component when loading the packages files
        section = source_data_srcdist.section
        component = section.split("/")[0] if "/" in section else "main"

        wanted_components = self.options.repro_components
        if wanted_components and component not in wanted_components:
            return verdict

        source_name = item.package
        try:
            tar_res = self._reproducible["target"][arch]
            src_res = self._reproducible["source"][arch]
        except KeyError:
            verdict = PolicyVerdict.REJECTED_TEMPORARILY
            excuse.add_verdict_info(
                verdict, "No reproducible data available at all for %s" % arch
            )
            return verdict

        # state of the reference result in the target suite
        if source_data_tdist is None:
            target_suite_state = "new"
        elif source_name not in tar_res:
            target_suite_state = "unknown"
        elif tar_res[source_name]["version"] == source_data_tdist.version:
            target_suite_state = tar_res[source_name]["status"]
        else:
            target_suite_state = "stale"

        # state of the candidate in the source suite
        source_suite_state = "unknown"
        if source_name in src_res and src_res[source_name]["version"] == item.version:
            source_suite_state = src_res[source_name]["status"]

        # status of ['E404', 'FTBFS', 'FTBR', 'NFU', 'blacklisted', 'depwait',
        # 'reproducible', 'timeout'] with ['new', 'stale', 'unknown']

        # a package that doesn't build on this architecture needs no verdict
        if source_suite_state in self._NO_BUILD_STATES:
            return verdict
        # Assume depwait in the source suite only are intermittent (might not
        # be true, e.g. with new build depends)
        if source_suite_state == target_suite_state and target_suite_state == "depwait":
            return verdict

        url_html = ""
        if self.options.repro_url:
            url = self.options.repro_url.format(package=quote(source_name), arch=arch)
            url_html = ' - <a href="%s">info</a>' % url
            if self.options.repro_retry_url:
                retry_url = self.options.repro_retry_url.format(
                    package=quote(source_name), arch=arch
                )
                url_html += ' <a href="%s">♻ </a>' % retry_url
            # When run on multiple archs, the last one "wins"
            reproducible_info["reproducible-test-url"] = url

        verdict, msg, test_result = self._assess_states(
            source_suite_state, target_suite_state, arch, url_html
        )
        reproducible_info.setdefault("test-results", []).append(test_result)
        eligible_for_bounty = source_suite_state == "reproducible"

        if verdict.is_rejected:
            assert self.hints is not None
            for hint_arch in ("source", arch):
                matching = self.hints.search(
                    "ignore-reproducible",
                    package=source_name,
                    version=source_data_srcdist.version,
                    architecture=hint_arch,
                )
                if not matching:
                    continue
                # only the first matching hint per hint architecture counts
                issuer = matching[0].user
                verdict = PolicyVerdict.PASS_HINTED
                ignored = reproducible_info.setdefault("ignored-reproducible", {})
                ignored.setdefault(arch, {}).setdefault("issued-by", []).append(issuer)
                excuse.addinfo(
                    "Ignoring reproducibility issue on %s as requested "
                    "by %s" % (arch, issuer)
                )

        if self.options.repro_success_bounty and eligible_for_bounty:
            excuse.add_bounty("reproducibility", self.options.repro_success_bounty)

        penalty = self.options.repro_regression_penalty
        if penalty and verdict in {
            PolicyVerdict.REJECTED_PERMANENTLY,
            PolicyVerdict.REJECTED_TEMPORARILY,
        }:
            if penalty > 0:
                excuse.add_penalty("reproducibility", penalty)
            # In case we give penalties instead of blocking, we must always pass
            verdict = PolicyVerdict.PASS

        if verdict.is_rejected:
            excuse.add_verdict_info(verdict, msg)
        else:
            excuse.addinfo(msg)

        return verdict

    def _read_repro_status(
        self, filename: str, source: set[str], target: set[str]
    ) -> dict[str, dict[str, str]]:
        """Add the results found in ``filename`` to ``self._reproducible``.

        A result whose ``suite`` is in ``source`` (``target``) is stored
        under ``summary["source"]`` (``summary["target"]``), keyed by its
        architecture and then its package.  An empty file leaves the
        summary untouched.  The updated summary is returned.
        """
        summary = self._reproducible
        self.logger.info("Loading reproducibility report from %s", filename)
        with open(filename) as fd:
            if os.fstat(fd.fileno()).st_size < 1:
                return summary
            results = json.load(fd)

        for result in results:
            for role, suite_names in (("source", source), ("target", target)):
                if result["suite"] in suite_names:
                    per_arch = summary[role].setdefault(result["architecture"], {})
                    per_arch[result["package"]] = result

        return summary