Coverage for britney2/policies/policy.py: 86%
1243 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-03-23 07:34 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-03-23 07:34 +0000
1import json
2import logging
3import optparse
4import os
5import re
6import sys
7import time
8from abc import ABC, abstractmethod
9from collections import defaultdict
10from collections.abc import Callable, Container
11from enum import IntEnum, unique
12from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar, cast
13from urllib.parse import quote
15import apt_pkg
16import yaml
18from britney2 import (
19 BinaryPackage,
20 BinaryPackageId,
21 DependencyType,
22 PackageId,
23 SourcePackage,
24 Suite,
25 SuiteClass,
26 Suites,
27 TargetSuite,
28)
29from britney2.excusedeps import DependencySpec
30from britney2.hints import (
31 Hint,
32 HintCollection,
33 HintParser,
34 PolicyHintParserProto,
35 split_into_one_hint_per_package,
36)
37from britney2.inputs.suiteloader import SuiteContentLoader
38from britney2.migrationitem import MigrationItem, MigrationItemFactory
39from britney2.policies import ApplySrcPolicy, PolicyVerdict
40from britney2.utils import (
41 compute_reverse_tree,
42 find_newer_binaries,
43 get_dependency_solvers,
44 is_smooth_update_allowed,
45 parse_option,
46 GetDependencySolversProto,
47)
49if TYPE_CHECKING: 49 ↛ 50line 49 didn't jump to line 50, because the condition on line 49 was never true
50 from ..britney import Britney
51 from ..excuse import Excuse
52 from ..installability.universe import BinaryPackageUniverse
class PolicyLoadRequest:
    """Deferred request to construct a policy, optionally gated by an option.

    A request either always loads its policy (see :meth:`always_load`) or
    loads it only when a named attribute of the options object enables it
    (see :meth:`conditionally_load`).
    """

    __slots__ = ("_options_name", "_default_value", "_policy_constructor")

    def __init__(
        self,
        policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"],
        options_name: Optional[str],
        default_value: bool,
    ) -> None:
        """Create a load request.

        :param policy_constructor: Callable invoked with ``(options, suite_info)``
          to build the policy.

        :param options_name: Name of the attribute on the options object that
          enables/disables the policy, or None if the policy is unconditional.

        :param default_value: Whether the policy is enabled when the option
          is absent (or None).
        """
        self._policy_constructor = policy_constructor
        self._options_name = options_name
        self._default_value = default_value

    def is_enabled(self, options: optparse.Values) -> bool:
        """Tell whether the policy should be loaded for the given options.

        :param options: The options object to look the enabling attribute up on.

        :return: True for unconditional requests; the default when the option
          is missing or None; the value itself when it is already a bool;
          otherwise whether the (case-insensitive) string value is one of
          "yes", "y", "true" or "t".
        """
        if self._options_name is None:
            # Unconditional requests are always created with a true default.
            assert self._default_value
            return True
        actual_value = getattr(options, self._options_name, None)
        if actual_value is None:
            return self._default_value
        if isinstance(actual_value, bool):
            # Options set programmatically may already be booleans; calling
            # .lower() on them would raise AttributeError.
            return actual_value
        return actual_value.lower() in ("yes", "y", "true", "t")

    def load(self, options: optparse.Values, suite_info: Suites) -> "BasePolicy":
        """Construct the policy by calling the stored constructor.

        :param options: Passed through to the policy constructor.

        :param suite_info: Passed through to the policy constructor.

        :return: Whatever the policy constructor returns.
        """
        return self._policy_constructor(options, suite_info)

    @classmethod
    def always_load(
        cls, policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"]
    ) -> "PolicyLoadRequest":
        """Build a request for a policy that is loaded unconditionally."""
        return cls(policy_constructor, None, True)

    @classmethod
    def conditionally_load(
        cls,
        policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"],
        option_name: str,
        default_value: bool,
    ) -> "PolicyLoadRequest":
        """Build a request for a policy that is loaded only if an option enables it.

        :param policy_constructor: Callable building the policy.

        :param option_name: Name of the attribute on the options object that
          controls the policy.

        :param default_value: Whether to load the policy when the option is unset.
        """
        return cls(policy_constructor, option_name, default_value)
class PolicyEngine(object):
    """Holds the active policies and runs each of them against an excuse."""

    def __init__(self) -> None:
        """Start with no policies registered."""
        self._policies: list["BasePolicy"] = []

    def add_policy(self, policy: "BasePolicy") -> None:
        """Append a policy; policies are run in the order they were added."""
        self._policies.append(policy)

    def load_policies(
        self,
        options: optparse.Values,
        suite_info: Suites,
        policy_load_requests: list[PolicyLoadRequest],
    ) -> None:
        """Construct and register every requested policy that is enabled.

        :param options: Handed to each request to decide whether it is
          enabled and to construct the policy.

        :param suite_info: Handed to each request when constructing the policy.

        :param policy_load_requests: The requests to consider, in order.
        """
        for request in policy_load_requests:
            if not request.is_enabled(options):
                continue
            self.add_policy(request.load(options, suite_info))

    def register_policy_hints(self, hint_parser: HintParser) -> None:
        """Let every registered policy register its hint types on the parser."""
        for registered in self._policies:
            registered.register_hints(hint_parser)

    def initialise(self, britney: "Britney", hints: HintCollection) -> None:
        """Give every policy the hint collection, then initialise it."""
        for registered in self._policies:
            registered.hints = hints
            registered.initialise(britney)

    def save_state(self, britney: "Britney") -> None:
        """Ask every policy to persist its state."""
        for registered in self._policies:
            registered.save_state(britney)

    def apply_src_policies(
        self,
        item: MigrationItem,
        source_t: Optional[SourcePackage],
        source_u: SourcePackage,
        excuse: "Excuse",
    ) -> None:
        """Run all applicable policies for a source migration.

        For each policy whose applicable suites contain the suite class of
        the item's suite, the per-architecture implementation is run for
        every configured architecture (when ``src_policy.run_arch`` is set)
        and the source implementation is run (when ``src_policy.run_src`` is
        set).  Results other than NOT_APPLICABLE are stored in
        ``excuse.policy_info`` under the policy id, and the worst verdict
        seen is written back to ``excuse.policy_verdict``.
        """
        overall = excuse.policy_verdict
        suite_class = item.suite.suite_class
        for policy in self._policies:
            info: dict[str, Any] = {}
            verdict = PolicyVerdict.NOT_APPLICABLE
            if suite_class in policy.applicable_suites:
                if policy.src_policy.run_arch:
                    for arch in policy.options.architectures:
                        arch_verdict = policy.apply_srcarch_policy_impl(
                            info, item, arch, source_t, source_u, excuse
                        )
                        verdict = PolicyVerdict.worst_of(verdict, arch_verdict)
                if policy.src_policy.run_src:
                    src_verdict = policy.apply_src_policy_impl(
                        info, item, source_t, source_u, excuse
                    )
                    verdict = PolicyVerdict.worst_of(verdict, src_verdict)
            # The "verdict" key is filled in here, so the policy must not set it
            assert "verdict" not in info
            if verdict == PolicyVerdict.NOT_APPLICABLE:
                continue
            excuse.policy_info[policy.policy_id] = info
            info["verdict"] = verdict.name
            overall = PolicyVerdict.worst_of(verdict, overall)
        excuse.policy_verdict = overall

    def apply_srcarch_policies(
        self,
        item: MigrationItem,
        arch: str,
        source_t: Optional[SourcePackage],
        source_u: SourcePackage,
        excuse: "Excuse",
    ) -> None:
        """Run all applicable policies for a migration on one architecture.

        Only the per-architecture implementation of each applicable policy is
        called.  Results other than NOT_APPLICABLE are stored in
        ``excuse.policy_info`` under the policy id, and the worst verdict
        seen is written back to ``excuse.policy_verdict``.
        """
        overall = excuse.policy_verdict
        suite_class = item.suite.suite_class
        for policy in self._policies:
            info: dict[str, Any] = {}
            if suite_class not in policy.applicable_suites:
                continue
            verdict = policy.apply_srcarch_policy_impl(
                info, item, arch, source_t, source_u, excuse
            )
            overall = PolicyVerdict.worst_of(verdict, overall)
            # The "verdict" key is filled in here, so the policy must not set it
            assert "verdict" not in info
            if verdict != PolicyVerdict.NOT_APPLICABLE:
                excuse.policy_info[policy.policy_id] = info
                info["verdict"] = verdict.name
        excuse.policy_verdict = overall
class BasePolicy(ABC):
    """Abstract interface implemented by every migration policy.

    PolicyEngine (in this module) drives instances through register_hints(),
    initialise(), apply_src_policy_impl() / apply_srcarch_policy_impl() and
    save_state().  The attributes below are only declared here; assigning
    them is left to subclasses and to the engine.
    """

    # The Britney instance; assigned by initialise().
    britney: "Britney"
    # Key under which the engine stores this policy's results in
    # excuse.policy_info.
    policy_id: str
    # Hints available to the policy; PolicyEngine.initialise() assigns this
    # right before calling initialise().
    hints: Optional[HintCollection]
    # Suite classes this policy applies to; the engine checks the suite class
    # of the migration item's suite against this set.
    applicable_suites: set[SuiteClass]
    # Tells the engine (via its run_src / run_arch attributes) which of the
    # two apply_*_policy_impl methods to call for source migrations.
    src_policy: ApplySrcPolicy
    options: optparse.Values
    suite_info: Suites

    def __init__(
        self,
        options: optparse.Values,
        suite_info: Suites,
    ) -> None:
        """The BasePolicy constructor

        Note that this base implementation stores nothing.

        :param options: The options member of Britney with all the
          config values.

        :param suite_info: The suites Britney operates on.
        """

    @property
    @abstractmethod
    def state_dir(self) -> str: ...

    def register_hints(self, hint_parser: HintParser) -> None:  # pragma: no cover
        """Register new hints that this policy accepts

        The default implementation registers nothing.

        :param hint_parser: (see HintParser.register_hint_type)
        """

    def initialise(self, britney: "Britney") -> None:  # pragma: no cover
        """Called once to make the policy initialise any data structures

        This is useful for e.g. parsing files or other "heavy do-once" work.
        The default implementation only remembers the Britney instance.

        :param britney: This is the instance of the "Britney" class.
        """
        self.britney = britney

    def save_state(self, britney: "Britney") -> None:  # pragma: no cover
        """Called once at the end of the run to make the policy save any persistent data

        Note this will *not* be called for "dry-runs" as such runs should not change
        the state.  The default implementation saves nothing.

        :param britney: This is the instance of the "Britney" class.
        """

    def apply_src_policy_impl(
        self,
        policy_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:  # pragma: no cover
        """Apply a policy on a given source migration

        Britney will call this method on a given source package, when
        Britney is considering to migrate it from the given source
        suite to the target suite.  The policy will then evaluate the
        migration and then return a verdict.

        :param policy_info: A dictionary the policy can record its results
          in.  The engine stores it in excuse.policy_info under the policy's
          id and adds the "verdict" key itself, so implementations must not
          set that key.  This will go directly into the "excuses.yaml" output.

        :param item: The migration item the policy is applied to.

        :param source_data_tdist: Information about the source package
          in the target distribution (e.g. "testing"), or None.
          Presumably the data structure in target_suite.sources[source_name]
          (the previous wording named the source suite here, contradicting
          the description; confirm against the caller).

        :param source_data_srcdist: Information about the source
          package in the source distribution (e.g. "unstable" or "tpu").
          Presumably the data structure in source_suite.sources[source_name]
          (confirm against the caller).

        :param excuse: The excuse being built for this migration.

        :return: A Policy Verdict (e.g. PolicyVerdict.PASS).  The default
          implementation returns PolicyVerdict.NOT_APPLICABLE.
        """
        return PolicyVerdict.NOT_APPLICABLE

    def apply_srcarch_policy_impl(
        self,
        policy_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Apply a policy on a given binary migration

        Britney will call this method on binaries from a given source package
        on a given architecture, when Britney is considering to migrate them
        from the given source suite to the target suite.  The policy will then
        evaluate the migration and then return a verdict.

        :param policy_info: A dictionary the policy can record its results
          in.  The engine stores it in excuse.policy_info under the policy's
          id and adds the "verdict" key itself, so implementations must not
          set that key.  This will go directly into the "excuses.yaml" output.

        :param item: The migration item the policy is applied to.

        :param arch: The architecture the item is applied to.  This is mostly
          relevant for policies where src_policy is not ApplySrcPolicy.RUN_SRC
          (as that is the only case where arch can differ from item.architecture)

        :param source_data_tdist: Information about the source package
          in the target distribution (e.g. "testing"), or None.
          Presumably the data structure in target_suite.sources[source_name]
          (the previous wording named the source suite here, contradicting
          the description; confirm against the caller).

        :param source_data_srcdist: Information about the source
          package in the source distribution (e.g. "unstable" or "tpu").
          Presumably the data structure in source_suite.sources[source_name]
          (confirm against the caller).

        :param excuse: The excuse being built for this migration.

        :return: A Policy Verdict (e.g. PolicyVerdict.PASS).  The default
          implementation returns PolicyVerdict.NOT_APPLICABLE.
        """
        # if the policy doesn't implement this function, assume it's OK
        return PolicyVerdict.NOT_APPLICABLE
class AbstractBasePolicy(BasePolicy):
    """
    Common concrete foundation for BasePolicy implementations.

    tests/test_policy.py:initialize_policy() must be able to create BasePolicy
    objects through a two-argument constructor, whereas every other user of a
    BasePolicy-derived object needs the five-argument one.  This class exists
    to make that split explicit.
    """

    def __init__(
        self,
        policy_id: str,
        options: optparse.Values,
        suite_info: Suites,
        applicable_suites: set[SuiteClass],
        src_policy: ApplySrcPolicy = ApplySrcPolicy.RUN_SRC,
    ) -> None:
        """Store the policy configuration and create a per-class logger.

        :param policy_id: Identifies the policy.  It will
          determine the key used for the excuses.yaml etc.

        :param options: The options member of Britney with all the
          config values.

        :param suite_info: The suites Britney operates on.

        :param applicable_suites: Where this policy applies.

        :param src_policy: Which policy implementation(s) to run for source
          migrations; defaults to ApplySrcPolicy.RUN_SRC.
        """
        self.policy_id = policy_id
        self.options = options
        self.suite_info = suite_info
        self.applicable_suites = applicable_suites
        self.src_policy = src_policy
        # No hints until someone assigns a collection
        self.hints: Optional[HintCollection] = None
        # Logger is named "<module>.<class>" after the concrete class
        concrete = self.__class__
        self.logger = logging.getLogger(f"{concrete.__module__}.{concrete.__name__}")

    @property
    def state_dir(self) -> str:
        """The ``state_dir`` value of the options object."""
        configured = self.options.state_dir
        return cast(str, configured)
350_T = TypeVar("_T")
class SimplePolicyHint(Hint, Generic[_T]):
    """A hint that carries one extra, policy-specific parameter."""

    def __init__(
        self,
        user: str,
        hint_type: str,
        policy_parameter: _T,
        packages: list[MigrationItem],
    ) -> None:
        """Create the hint.

        :param user: Passed on to the Hint constructor.

        :param hint_type: Passed on to the Hint constructor.

        :param policy_parameter: The policy-specific value kept on the hint.

        :param packages: Passed on to the Hint constructor.
        """
        super().__init__(user, hint_type, packages)
        self._policy_parameter = policy_parameter

    def __eq__(self, other: Any) -> bool:
        """Compare type and policy parameter first, then defer to Hint."""
        if self.type != other.type:
            return False
        if self._policy_parameter != other._policy_parameter:
            return False
        return super().__eq__(other)

    def str(self) -> str:
        """Render as "<type> <parameter> <package names separated by spaces>"."""
        package_names = " ".join(pkg.name for pkg in self._packages)
        return f"{self._type!s} {self._policy_parameter!s} {package_names}"
class AgeDayHint(SimplePolicyHint[int]):
    """Policy hint whose parameter is an integer number of days.

    AgePolicy registers it for the "age-days" hint type.
    """

    @property
    def days(self) -> int:
        """The number of days carried by the hint (its policy parameter)."""
        return self._policy_parameter
class IgnoreRCBugHint(SimplePolicyHint[frozenset[str]]):
    """Policy hint whose parameter is a frozen set of bug identifiers.

    RCBugPolicy registers it for the "ignore-rc-bugs" hint type.
    """

    @property
    def ignored_rcbugs(self) -> frozenset[str]:
        """The bug identifiers carried by the hint (its policy parameter)."""
        return self._policy_parameter
def simple_policy_hint_parser_function(
    class_name: Callable[[str, str, _T, list[MigrationItem]], Hint],
    converter: Callable[[str], _T],
) -> PolicyHintParserProto:
    """Build a hint parser for hints of the form "<name> <parameter> <item>...".

    :param class_name: Callable (typically a SimplePolicyHint subclass) that
      is invoked as ``class_name(who, hint_name, parameter, [item])``.

    :param converter: Turns the raw parameter string into the value handed
      to ``class_name``.  It is called once per parsed item.

    :return: A parser callback that adds one hint per parsed item to the
      hint collection it is given.
    """

    def _parse(
        mi_factory: MigrationItemFactory,
        hints: HintCollection,
        who: str,
        hint_name: str,
        *args: str,
    ) -> None:
        # First argument is the policy parameter, the rest are item specs
        raw_parameter = args[0]
        item_specs = args[1:]
        for parsed_item in mi_factory.parse_items(*item_specs):
            hint = class_name(who, hint_name, converter(raw_parameter), [parsed_item])
            hints.add_hint(hint)

    return _parse
class AgePolicy(AbstractBasePolicy):
    """Configurable Aging policy for source migrations

    The AgePolicy will let packages stay in the source suite for a pre-defined
    amount of days before letting them migrate (based on their urgency, if any).

    The AgePolicy's decision is influenced by the following:

    State files:
     * ${STATE_DIR}/age-policy-urgencies: File containing urgencies for source
       packages. Note that urgencies are "sticky" and the most "urgent" urgency
       will be used (i.e. the one with lowest age-requirements).
       - This file needs to be updated externally, if the policy should take
         urgencies into consideration.  If empty (or not updated), the policy
         will simply use the default urgency (see the "Config" section below)
       - In Debian, these values are taken from the .changes file, but that is
         not a requirement for Britney.
     * ${STATE_DIR}/age-policy-dates: File containing the age of all source
       packages.
       - The policy will automatically update this file.
    Config:
     * DEFAULT_URGENCY: Name of the urgency used for packages without an urgency
       (or for unknown urgencies).  Will also be used to set the "minimum"
       aging requirements for packages not in the target suite.
     * MINDAYS_<URGENCY>: The age-requirements in days for packages with the
       given urgency.
       - Commonly used urgencies are: low, medium, high, emergency, critical
    Hints:
     * urgent <source>/<version>: Disregard the age requirements for a given
       source/version.
     * age-days X <source>/<version>: Set the age requirements for a given
       source/version to X days.  Note that X can exceed the highest
       age-requirement normally given.

    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        """Set the policy up from the configuration.

        The state files are not touched here; they are read by initialise().

        :param options: The options member of Britney with all the config values.

        :param suite_info: The suites Britney operates on.
        """
        super().__init__("age", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE})
        self._min_days = self._generate_mindays_table()
        # Placeholder; the real value is looked up in initialise()
        self._min_days_default = 0
        # britney's "day" begins at 7pm (we want aging to occur in the 22:00Z run and we run Britney 2-4 times a day)
        # NB: _date_now is used in tests
        time_now = time.time()
        if hasattr(self.options, "fake_runtime"):
            time_now = int(self.options.fake_runtime)
            self.logger.info("overriding runtime with fake_runtime %d" % time_now)

        self._date_now = int(((time_now / (60 * 60)) - 19) / 24)
        # source name -> (version, day number on which that version was first seen)
        self._dates: dict[str, tuple[str, int]] = {}
        # source name -> urgency name
        self._urgencies: dict[str, str] = {}
        self._default_urgency: str = self.options.default_urgency
        self._penalty_immune_urgencies: frozenset[str] = frozenset()
        if hasattr(self.options, "no_penalties"):
            self._penalty_immune_urgencies = frozenset(
                x.strip() for x in self.options.no_penalties.split()
            )
        self._bounty_min_age: Optional[int] = None  # initialised later

    def _generate_mindays_table(self) -> dict[str, int]:
        """Collect the ``mindays_<urgency>`` options into an urgency -> days table.

        :return: Mapping from urgency name (everything after the ``mindays_``
          prefix) to the number of days.

        :raises ValueError: If a value is not an integer or is negative.
        """
        prefix = "mindays_"
        mindays: dict[str, int] = {}
        for k in dir(self.options):
            if not k.startswith(prefix):
                continue
            v = getattr(self.options, k)
            try:
                as_days = int(v)
            except ValueError as err:
                raise ValueError(
                    "Unable to parse "
                    + k
                    + " as a number of days. Must be 0 or a positive integer"
                ) from err
            if as_days < 0:
                raise ValueError(
                    "The value of " + k + " must be zero or a positive integer"
                )
            # Strip only the prefix: splitting on "_" would truncate urgency
            # names that themselves contain an underscore.
            mindays[k[len(prefix):]] = as_days
        return mindays

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the "age-days" and "urgent" hint types."""
        hint_parser.register_hint_type(
            "age-days", simple_policy_hint_parser_function(AgeDayHint, int), min_args=2
        )
        hint_parser.register_hint_type("urgent", split_into_one_hint_per_package)

    def initialise(self, britney: "Britney") -> None:
        """Resolve the default urgency, read the state files and the bounty floor.

        :param britney: This is the instance of the "Britney" class.

        :raises ValueError: If there is no MINDAYS_<DEFAULT_URGENCY> option or
          BOUNTY_MIN_AGE is neither an integer nor a known urgency.
        """
        super().initialise(britney)
        if self._default_urgency not in self._min_days:  # pragma: no cover
            raise ValueError(
                "Missing age-requirement for default urgency (MINDAYS_%s)"
                % self._default_urgency
            )
        # Must be known before the urgencies file is read: that reader uses it
        # as the age-requirement of unknown urgencies.
        self._min_days_default = self._min_days[self._default_urgency]
        self._read_dates_file()
        self._read_urgencies_file()
        try:
            self._bounty_min_age = int(self.options.bounty_min_age)
        except ValueError:
            # Not a number; accept the name of a configured urgency instead
            if self.options.bounty_min_age in self._min_days:
                self._bounty_min_age = self._min_days[self.options.bounty_min_age]
            else:  # pragma: no cover
                raise ValueError(
                    "Please fix BOUNTY_MIN_AGE in the britney configuration"
                )
        except AttributeError:
            # The option wasn't defined in the configuration
            self._bounty_min_age = 0

    def save_state(self, britney: "Britney") -> None:
        """Write the dates file back to disk."""
        super().save_state(britney)
        self._write_dates_file()

    def apply_src_policy_impl(
        self,
        age_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check whether the source has aged long enough in the source suite.

        :param age_info: Dictionary receiving the policy results (keys such as
          "current-age", "age-requirement", "age-requirement-reduced").

        :param item: The migration item the policy is applied to.

        :param source_data_tdist: The source in the target suite, or a false
          value if it is not there.

        :param source_data_srcdist: The source in the source suite.

        :param excuse: The excuse; its bounties and penalties adjust the
          required age, and the result is reported on it.

        :return: PASS if old enough, PASS_HINTED if too young but covered by
          an "urgent" hint, REJECTED_TEMPORARILY otherwise.
        """
        # retrieve the urgency for the upload, ignoring it if this is a NEW package
        # (not present in the target suite)
        source_name = item.package
        urgency = self._urgencies.get(source_name, self._default_urgency)

        if urgency not in self._min_days:
            age_info["unknown-urgency"] = urgency
            urgency = self._default_urgency

        if not source_data_tdist:
            if self._min_days[urgency] < self._min_days_default:
                age_info["urgency-reduced"] = {
                    "from": urgency,
                    "to": self._default_urgency,
                }
                urgency = self._default_urgency

        # (Re)start the clock for sources or versions not seen before
        if (
            source_name not in self._dates
            or self._dates[source_name][0] != source_data_srcdist.version
        ):
            self._dates[source_name] = (source_data_srcdist.version, self._date_now)

        days_old = self._date_now - self._dates[source_name][1]
        min_days = self._min_days[urgency]
        for bounty in excuse.bounty:
            if excuse.bounty[bounty]:
                self.logger.info(
                    "Applying bounty for %s granted by %s: %d days",
                    source_name,
                    bounty,
                    excuse.bounty[bounty],
                )
                excuse.addinfo(
                    "Required age reduced by %d days because of %s"
                    % (excuse.bounty[bounty], bounty)
                )
                assert excuse.bounty[bounty] > 0, "negative bounties shouldn't happen"
                min_days -= excuse.bounty[bounty]
        if urgency not in self._penalty_immune_urgencies:
            for penalty in excuse.penalty:
                if excuse.penalty[penalty]:
                    self.logger.info(
                        "Applying penalty for %s given by %s: %d days",
                        source_name,
                        penalty,
                        excuse.penalty[penalty],
                    )
                    excuse.addinfo(
                        "Required age increased by %d days because of %s"
                        % (excuse.penalty[penalty], penalty)
                    )
                    assert (
                        excuse.penalty[penalty] > 0
                    ), "negative penalties should be handled earlier"
                    min_days += excuse.penalty[penalty]

        assert self._bounty_min_age is not None
        # the age in BOUNTY_MIN_AGE can be higher than the one associated with
        # the real urgency, so don't forget to take it into account
        bounty_min_age = min(self._bounty_min_age, self._min_days[urgency])
        if min_days < bounty_min_age:
            min_days = bounty_min_age
            excuse.addinfo(
                "Required age is not allowed to drop below %d days" % min_days
            )

        age_info["current-age"] = days_old

        assert self.hints is not None
        for age_days_hint in cast(
            "list[AgeDayHint]",
            self.hints.search(
                "age-days", package=source_name, version=source_data_srcdist.version
            ),
        ):
            new_req = age_days_hint.days
            age_info["age-requirement-reduced"] = {
                "new-requirement": new_req,
                "changed-by": age_days_hint.user,
            }
            if "original-age-requirement" not in age_info:
                age_info["original-age-requirement"] = min_days
            min_days = new_req

        age_info["age-requirement"] = min_days
        res = PolicyVerdict.PASS

        if days_old < min_days:
            urgent_hints = self.hints.search(
                "urgent", package=source_name, version=source_data_srcdist.version
            )
            if urgent_hints:
                age_info["age-requirement-reduced"] = {
                    "new-requirement": 0,
                    "changed-by": urgent_hints[0].user,
                }
                res = PolicyVerdict.PASS_HINTED
            else:
                res = PolicyVerdict.REJECTED_TEMPORARILY

        # update excuse
        age_hint = age_info.get("age-requirement-reduced", None)
        age_min_req = age_info["age-requirement"]
        if age_hint:
            new_req = age_hint["new-requirement"]
            who = age_hint["changed-by"]
            if new_req:
                # "age-requirement" already holds the hinted value at this
                # point, so report the requirement that was in force before
                # the hint was applied.
                previous_req = age_info.get("original-age-requirement", age_min_req)
                excuse.addinfo(
                    "Overriding age needed from %d days to %d by %s"
                    % (previous_req, new_req, who)
                )
                age_min_req = new_req
            else:
                excuse.addinfo("Too young, but urgency pushed by %s" % who)
                age_min_req = 0
        excuse.setdaysold(age_info["current-age"], age_min_req)

        if age_min_req == 0:
            excuse.addinfo("%d days old" % days_old)
        elif days_old < age_min_req:
            excuse.add_verdict_info(
                res, "Too young, only %d of %d days old" % (days_old, age_min_req)
            )
        else:
            excuse.addinfo("%d days old (needed %d days)" % (days_old, age_min_req))

        return res

    def _read_dates_file(self) -> None:
        """Parse the dates file

        The file is ${STATE_DIR}/age-policy-dates, falling back to the legacy
        "Dates" file in the target suite directory when only that one exists.
        A missing file under the new name is created empty.

        :raises RuntimeError: If STATE_DIR is not configured and there is no
          legacy file.

        :raises FileNotFoundError: If the legacy file disappears before it is read.
        """
        dates = self._dates
        fallback_filename = os.path.join(self.suite_info.target_suite.path, "Dates")
        using_new_name = False
        try:
            filename = os.path.join(self.state_dir, "age-policy-dates")
            if not os.path.exists(filename) and os.path.exists(fallback_filename):
                filename = fallback_filename
            else:
                using_new_name = True
        except AttributeError:
            # state_dir is not available on the options object
            if os.path.exists(fallback_filename):
                filename = fallback_filename
            else:
                raise RuntimeError("Please set STATE_DIR in the britney configuration")

        try:
            with open(filename, encoding="utf-8") as fd:
                for line in fd:
                    if line.startswith("#"):
                        # Ignore comment lines (mostly used for tests)
                        continue
                    # <source> <version> <date>
                    ln = line.split()
                    if len(ln) != 3:  # pragma: no cover
                        continue
                    try:
                        dates[ln[0]] = (ln[1], int(ln[2]))
                    except ValueError:  # pragma: no cover
                        # Deliberately skip lines whose date is not a number
                        pass
        except FileNotFoundError:
            if not using_new_name:
                # If we are using the legacy name, then just give up
                raise
            self.logger.info("%s does not appear to exist.  Creating it", filename)
            with open(filename, mode="x", encoding="utf-8"):
                pass

    def _read_urgencies_file(self) -> None:
        """Parse the urgencies file, keeping the most urgent urgency per source.

        The file is ${STATE_DIR}/age-policy-urgencies, falling back to the
        legacy "Urgency" file in the target suite directory.  Unknown
        urgencies are weighed with the default age-requirement, so
        ``_min_days_default`` must be set before this is called.
        """
        urgencies = self._urgencies
        min_days_default = self._min_days_default
        fallback_filename = os.path.join(self.suite_info.target_suite.path, "Urgency")
        try:
            filename = os.path.join(self.state_dir, "age-policy-urgencies")
            if not os.path.exists(filename) and os.path.exists(fallback_filename):
                filename = fallback_filename
        except AttributeError:
            # state_dir is not available on the options object
            filename = fallback_filename

        sources_s = self.suite_info.primary_source_suite.sources
        sources_t = self.suite_info.target_suite.sources

        with open(filename, errors="surrogateescape", encoding="ascii") as fd:
            for line in fd:
                if line.startswith("#"):
                    # Ignore comment lines (mostly used for tests)
                    continue
                # <source> <version> <urgency>
                ln = line.split()
                if len(ln) != 3:
                    continue

                # read the minimum days associated with the urgencies
                urgency_old = urgencies.get(ln[0], None)
                mindays_old = self._min_days.get(urgency_old, 1000)  # type: ignore[arg-type]
                mindays_new = self._min_days.get(ln[2], min_days_default)

                # if the new urgency is lower (so the min days are higher), do nothing
                if mindays_old <= mindays_new:
                    continue

                # if the package exists in the target suite and it is more recent, do nothing
                tsrcv = sources_t.get(ln[0], None)
                if tsrcv and apt_pkg.version_compare(tsrcv.version, ln[1]) >= 0:
                    continue

                # if the package doesn't exist in the primary source suite or it is older, do nothing
                usrcv = sources_s.get(ln[0], None)
                if not usrcv or apt_pkg.version_compare(usrcv.version, ln[1]) < 0:
                    continue

                # update the urgency for the package
                urgencies[ln[0]] = ln[2]

    def _write_dates_file(self) -> None:
        """Write the dates table to disk via a temporary file and a rename.

        When STATE_DIR is in use and a legacy "Dates" file still exists in the
        target suite directory, the legacy file is removed afterwards.
        """
        dates = self._dates
        try:
            directory = self.state_dir
            basename = "age-policy-dates"
            old_file = os.path.join(self.suite_info.target_suite.path, "Dates")
        except AttributeError:
            # state_dir is not available: keep using the legacy location
            directory = self.suite_info.target_suite.path
            basename = "Dates"
            old_file = None
        filename = os.path.join(directory, basename)
        filename_tmp = os.path.join(directory, "%s_new" % basename)
        with open(filename_tmp, "w", encoding="utf-8") as fd:
            for pkg in sorted(dates):
                version, date = dates[pkg]
                fd.write("%s %s %d\n" % (pkg, version, date))
        os.rename(filename_tmp, filename)
        if old_file is not None and os.path.exists(old_file):
            self.logger.info("Removing old age-policy-dates file %s", old_file)
            os.unlink(old_file)
class RCBugPolicy(AbstractBasePolicy):
    """RC bug regression policy for source migrations

    The RCBugPolicy reads the provided lists of RC bugs and blocks any
    source upload that would introduce a *new* RC bug in the target
    suite.

    The RCBugPolicy's decision is influenced by the following:

    State files:
     * ${STATE_DIR}/rc-bugs-${SUITE_NAME}: File containing RC bugs for packages in
       the given suite (one is needed for the primary source suite and one for
       the target suite).
       - These files need to be updated externally.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        """Create the policy; the bug lists are loaded later by initialise()."""
        super().__init__(
            "rc-bugs", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
        )
        self._bugs_source: Optional[dict[str, set[str]]] = None
        self._bugs_target: Optional[dict[str, set[str]]] = None

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the "ignore-rc-bugs" hint (comma separated bug list + items)."""

        def to_bug_set(raw: str) -> frozenset[str]:
            return frozenset(raw.split(","))

        parser = simple_policy_hint_parser_function(IgnoreRCBugHint, to_bug_set)
        hint_parser.register_hint_type("ignore-rc-bugs", parser, min_args=2)

    def initialise(self, britney: "Britney") -> None:
        """Load the RC bug lists of the primary source suite and the target suite.

        The files under STATE_DIR are used unless neither of them exists while
        both legacy "BugsV" files (in the suite directories) do, or unless
        STATE_DIR is not configured at all.
        """
        super().initialise(britney)
        source_suite = self.suite_info.primary_source_suite
        target_suite = self.suite_info.target_suite
        legacy_source_file = os.path.join(source_suite.path, "BugsV")
        legacy_target_file = os.path.join(target_suite.path, "BugsV")
        try:
            source_file = os.path.join(self.state_dir, "rc-bugs-%s" % source_suite.name)
            target_file = os.path.join(self.state_dir, "rc-bugs-%s" % target_suite.name)
            if (
                not (os.path.exists(source_file) or os.path.exists(target_file))
                and os.path.exists(legacy_source_file)
                and os.path.exists(legacy_target_file)
            ):
                source_file, target_file = legacy_source_file, legacy_target_file
        except AttributeError:
            # state_dir is not available on the options object
            source_file, target_file = legacy_source_file, legacy_target_file
        self._bugs_source = self._read_bugs(source_file)
        self._bugs_target = self._read_bugs(target_file)

    @staticmethod
    def _bug_links(bug_ids: list[str]) -> str:
        """Render bug ids as a comma separated list of HTML links."""
        return ", ".join(
            '<a href="https://bugs.debian.org/%s">#%s</a>' % (quote(bug), bug)
            for bug in bug_ids
        )

    def apply_src_policy_impl(
        self,
        rcbugs_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Reject the migration if it would bring new RC bugs to the target suite.

        :param rcbugs_info: Dictionary receiving "shared-bugs",
          "unique-source-bugs", "unique-target-bugs" and, when a hint was
          applied, "ignored-bugs".

        :param item: The migration item the policy is applied to.

        :param source_data_tdist: The source in the target suite, or a false
          value if it is not there.

        :param source_data_srcdist: The source in the source suite.

        :param excuse: The excuse the bug lists and messages are reported on.

        :return: REJECTED_PERMANENTLY if bugs exist only on the source side
          (after applying a hint), PASS_HINTED if a hint was applied and no
          such bugs remain, PASS otherwise.
        """
        assert self._bugs_source is not None  # for type checking
        assert self._bugs_target is not None  # for type checking
        source_name = item.package
        target_bugs = set()
        source_bugs = set()

        # Bugs may be filed against "<name>" or "src:<name>"
        for key in (source_name, "src:%s" % source_name):
            if source_data_tdist and key in self._bugs_target:
                target_bugs.update(self._bugs_target[key])
            if key in self._bugs_source:
                source_bugs.update(self._bugs_source[key])

        # ... as well as against any of the binaries built from the source
        for binary_name, _, _ in source_data_srcdist.binaries:
            if binary_name in self._bugs_source:
                source_bugs |= self._bugs_source[binary_name]
        if source_data_tdist:
            for binary_name, _, _ in source_data_tdist.binaries:
                if binary_name in self._bugs_target:
                    target_bugs |= self._bugs_target[binary_name]

        # If a package is not in the target suite, it has no RC bugs per
        # definition.  Unfortunately, it seems that the live-data is
        # not always accurate (e.g. live-2011-12-13 suggests that
        # obdgpslogger had the same bug in testing and unstable,
        # but obdgpslogger was not in testing at that time).
        # - For the curious, obdgpslogger was removed on that day
        #   and the BTS probably had not caught up with that fact.
        #   (https://tracker.debian.org/news/415935)
        assert not target_bugs or source_data_tdist, (
            "%s had bugs in the target suite but is not present" % source_name
        )

        verdict = PolicyVerdict.PASS

        assert self.hints is not None
        matching_hints = cast(
            list[IgnoreRCBugHint],
            self.hints.search(
                "ignore-rc-bugs",
                package=source_name,
                version=source_data_srcdist.version,
            ),
        )
        for hint in matching_hints:
            ignored = hint.ignored_rcbugs

            # Only handle one hint for now
            if "ignored-bugs" in rcbugs_info:
                self.logger.info(
                    "Ignoring ignore-rc-bugs hint from %s on %s due to another hint from %s",
                    hint.user,
                    source_name,
                    rcbugs_info["ignored-bugs"]["issued-by"],
                )
                continue
            if ignored.isdisjoint(source_bugs):
                self.logger.info(
                    "Ignoring ignore-rc-bugs hint from %s on %s as none of %s affect the package",
                    hint.user,
                    source_name,
                    str(ignored),
                )
                continue
            source_bugs -= ignored
            target_bugs -= ignored
            rcbugs_info["ignored-bugs"] = {
                "bugs": sorted(ignored),
                "issued-by": hint.user,
            }
            verdict = PolicyVerdict.PASS_HINTED

        rcbugs_info["shared-bugs"] = sorted(source_bugs & target_bugs)
        rcbugs_info["unique-source-bugs"] = sorted(source_bugs - target_bugs)
        rcbugs_info["unique-target-bugs"] = sorted(target_bugs - source_bugs)

        # update excuse
        new_bugs = rcbugs_info["unique-source-bugs"]
        old_bugs = rcbugs_info["unique-target-bugs"]
        excuse.setbugs(old_bugs, new_bugs)

        target_name = self.suite_info.target_suite.name
        if new_bugs:
            verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse.add_verdict_info(
                verdict,
                "Updating %s would introduce bugs in %s: %s"
                % (source_name, target_name, self._bug_links(new_bugs)),
            )

        if old_bugs:
            excuse.addinfo(
                "Updating %s will fix bugs in %s: %s"
                % (source_name, target_name, self._bug_links(old_bugs))
            )

        return verdict

    def _read_bugs(self, filename: str) -> dict[str, set[str]]:
        """Read the release critical bug summary from the specified file

        The file contains rows with the format:

        <package-name> <bug number>[,<bug number>...]

        Rows that do not consist of exactly two fields are logged and skipped.

        :param filename: Path of the file to read (opened as ASCII).

        :return: A dictionary where the key is the package name and the value
          is the set of open RC bugs for it.
        """
        bugs: dict[str, set[str]] = {}
        self.logger.info("Loading RC bugs data from %s", filename)
        with open(filename, encoding="ascii") as bug_file:
            for row in bug_file:
                fields = row.split()
                if len(fields) != 2:  # pragma: no cover
                    self.logger.warning("Malformed line found in line %s", row)
                    continue
                package, bug_list = fields
                bugs.setdefault(package, set()).update(bug_list.split(","))
        return bugs
class PiupartsPolicy(AbstractBasePolicy):
    """Policy that rejects sources introducing a piuparts regression.

    The policy reads one piuparts summary (JSON) per suite from the state
    directory and compares the state of a source in the primary source
    suite against its state in the target suite.  A rejection can be
    overridden with an ``ignore-piuparts`` hint.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "piuparts", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
        )
        # source name -> (state, url); the url is None when it was not kept
        self._piuparts_source: Optional[dict[str, tuple[str, Optional[str]]]] = None
        self._piuparts_target: Optional[dict[str, tuple[str, Optional[str]]]] = None

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the ``ignore-piuparts`` hint type with *hint_parser*."""
        hint_parser.register_hint_type(
            "ignore-piuparts", split_into_one_hint_per_package
        )

    def initialise(self, britney: "Britney") -> None:
        """Load the piuparts summaries of the source and the target suite.

        Raises RuntimeError when the state directory is not configured.
        """
        super().initialise(britney)
        source_suite = self.suite_info.primary_source_suite
        target_suite = self.suite_info.target_suite
        try:
            filename_unstable = os.path.join(
                self.state_dir, "piuparts-summary-%s.json" % source_suite.name
            )
            filename_testing = os.path.join(
                self.state_dir, "piuparts-summary-%s.json" % target_suite.name
            )
        except AttributeError as e:  # pragma: no cover
            raise RuntimeError(
                "Please set STATE_DIR in the britney configuration"
            ) from e
        # only the source-suite URLs are ever shown, so drop the target ones
        self._piuparts_source = self._read_piuparts_summary(
            filename_unstable, keep_url=True
        )
        self._piuparts_target = self._read_piuparts_summary(
            filename_testing, keep_url=False
        )

    def apply_src_policy_impl(
        self,
        piuparts_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Compute the piuparts verdict for *item* and record it.

        States read from the summaries are "P" (pass), "F" (fail) and "W"
        (waiting); any other state, including a source missing from the
        summary ("X"), is reported as not testable and does not block.  A
        failure only rejects when the target suite state differs from the
        source suite state.  The outcome is written to *piuparts_info* and
        to *excuse*.
        """
        assert self._piuparts_source is not None  # for type checking
        assert self._piuparts_target is not None  # for type checking
        source_name = item.package

        if source_name in self._piuparts_target:
            testing_state = self._piuparts_target[source_name][0]
        else:
            testing_state = "X"
        url: Optional[str]
        if source_name in self._piuparts_source:
            unstable_state, url = self._piuparts_source[source_name]
        else:
            unstable_state = "X"
            url = None
        url_html = "(no link yet)"
        if url is not None:
            url_html = '<a href="{0}">{0}</a>'.format(url)

        if unstable_state == "P":
            # Not a regression
            msg = "Piuparts tested OK - {0}".format(url_html)
            result = PolicyVerdict.PASS
            piuparts_info["test-results"] = "pass"
        elif unstable_state == "F":
            if testing_state != unstable_state:
                piuparts_info["test-results"] = "regression"
                msg = "Rejected due to piuparts regression - {0}".format(url_html)
                result = PolicyVerdict.REJECTED_PERMANENTLY
            else:
                piuparts_info["test-results"] = "failed"
                msg = "Ignoring piuparts failure (Not a regression) - {0}".format(
                    url_html
                )
                result = PolicyVerdict.PASS
        elif unstable_state == "W":
            msg = "Waiting for piuparts test results (stalls migration) - {0}".format(
                url_html
            )
            result = PolicyVerdict.REJECTED_TEMPORARILY
            piuparts_info["test-results"] = "waiting-for-test-results"
        else:
            msg = "Cannot be tested by piuparts (not a blocker) - {0}".format(url_html)
            piuparts_info["test-results"] = "cannot-be-tested"
            result = PolicyVerdict.PASS

        if url is not None:
            piuparts_info["piuparts-test-url"] = url
        if result.is_rejected:
            excuse.add_verdict_info(result, msg)
        else:
            excuse.addinfo(msg)

        if result.is_rejected:
            assert self.hints is not None
            # the first matching hint is enough to override the rejection
            for ignore_hint in self.hints.search(
                "ignore-piuparts",
                package=source_name,
                version=source_data_srcdist.version,
            ):
                piuparts_info["ignored-piuparts"] = {"issued-by": ignore_hint.user}
                result = PolicyVerdict.PASS_HINTED
                excuse.addinfo(
                    "Ignoring piuparts issue as requested by {0}".format(
                        ignore_hint.user
                    )
                )
                break

        return result

    def _read_piuparts_summary(
        self, filename: str, keep_url: bool = True
    ) -> dict[str, tuple[str, Optional[str]]]:
        """Parse the piuparts summary in *filename*.

        Returns a mapping from source name to ``(state, url)``; the url is
        replaced by None when *keep_url* is false.  An empty file yields an
        empty mapping.

        Raises ValueError when the id/version header is missing or wrong,
        or when a source does not have exactly one result set.
        """
        summary: dict[str, tuple[str, Optional[str]]] = {}
        self.logger.info("Loading piuparts report from %s", filename)
        # JSON is UTF-8; do not depend on the locale's default encoding
        with open(filename, encoding="utf-8") as fd:
            if os.fstat(fd.fileno()).st_size < 1:
                return summary
            data = json.load(fd)
        try:
            if (
                data["_id"] != "Piuparts Package Test Results Summary"
                or data["_version"] != "1.0"
            ):  # pragma: no cover
                raise ValueError(
                    "Piuparts results in {0} does not have the correct ID or version".format(
                        filename
                    )
                )
        except KeyError as e:  # pragma: no cover
            raise ValueError(
                "Piuparts results in {0} is missing id or version field".format(
                    filename
                )
            ) from e
        for source, suite_data in data["packages"].items():
            if len(suite_data) != 1:  # pragma: no cover
                raise ValueError(
                    "Piuparts results in {0}, the source {1} does not have exactly one result set".format(
                        filename, source
                    )
                )
            item = next(iter(suite_data.values()))
            # each result set is a 3-element sequence; the middle one is unused
            state, _, url = item
            summary[source] = (state, url if keep_url else None)

        return summary
class DependsPolicy(AbstractBasePolicy):
    """Policy checking, per architecture, the dependencies of an item's binaries.

    Binaries whose dependencies cannot be satisfied at all reject the
    item; dependencies that are only satisfiable by packages outside the
    target suite are recorded on the excuse as package dependencies.
    """

    pkg_universe: "BinaryPackageUniverse"
    broken_packages: frozenset["BinaryPackageId"]
    all_binaries: dict["BinaryPackageId", "BinaryPackage"]
    allow_uninst: dict[str, set[Optional[str]]]

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "depends",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )
        # filled in from the options by initialise()
        self.nobreakall_arches = None
        self.new_arches = None
        self.break_arches = None

    def initialise(self, britney: "Britney") -> None:
        """Cache the package universe and architecture options from *britney*."""
        super().initialise(britney)
        self.pkg_universe = britney.pkg_universe
        self.broken_packages = self.pkg_universe.broken_packages
        self.all_binaries = britney.all_binaries
        self.nobreakall_arches = self.options.nobreakall_arches
        self.new_arches = self.options.new_arches
        self.break_arches = self.options.break_arches
        self.allow_uninst = britney.allow_uninst

    def apply_srcarch_policy_impl(
        self,
        deps_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check the dependencies of the binaries of *item* on *arch*.

        Returns REJECTED_PERMANENTLY when a binary that must stay
        installable is listed in the universe's broken packages, PASS
        otherwise.  Architectures in break_arches or new_arches are not
        checked.  Hints for the autopkgtest policy are stored in
        *deps_info*.
        """
        verdict = PolicyVerdict.PASS

        assert self.break_arches is not None
        assert self.new_arches is not None
        assert self.nobreakall_arches is not None
        if arch in self.break_arches or arch in self.new_arches:
            # we don't check these in the policy (TODO - for now?)
            return verdict

        source_suite = item.suite
        target_suite = self.suite_info.target_suite

        packages_s_a = source_suite.binaries[arch]
        packages_t_a = target_suite.binaries[arch]

        my_bins = sorted(excuse.packages[arch])
        # constant-time membership test for the alternatives loop below
        my_bin_ids = frozenset(my_bins)

        arch_all_installable = set()
        arch_arch_installable = set()
        consider_it_regression = True

        for pkg_id in my_bins:
            pkg_name = pkg_id.package_name
            binary_u = packages_s_a[pkg_name]
            pkg_arch = binary_u.architecture

            # in some cases, we want to track the uninstallability of a
            # package (because the autopkgtest policy uses this), but we still
            # want to allow the package to be uninstallable
            skip_dep_check = False

            if binary_u.source_version != source_data_srcdist.version:
                # don't check cruft in unstable
                continue

            if item.architecture != "source" and pkg_arch == "all":
                # we don't care about the existing arch: all binaries when
                # checking a binNMU item, because the arch: all binaries won't
                # migrate anyway
                skip_dep_check = True

            if pkg_arch == "all" and arch not in self.nobreakall_arches:
                skip_dep_check = True

            if pkg_name in self.allow_uninst[arch]:
                # this binary is allowed to become uninstallable, so we don't
                # need to check anything
                skip_dep_check = True

            if pkg_name in packages_t_a:
                oldbin = packages_t_a[pkg_name]
                if not target_suite.is_installable(oldbin.pkg_id):
                    # as the current binary in testing is already
                    # uninstallable, the newer version is allowed to be
                    # uninstallable as well, so we don't need to check
                    # anything
                    skip_dep_check = True
                    consider_it_regression = False

            if pkg_id in self.broken_packages:
                if pkg_arch == "all":
                    arch_all_installable.add(False)
                else:
                    arch_arch_installable.add(False)
                # dependencies can't be satisfied by all the known binaries -
                # this certainly won't work...
                excuse.add_unsatisfiable_on_arch(arch)
                if skip_dep_check:
                    # ...but if the binary is allowed to become uninstallable,
                    # we don't care
                    # we still want the binary to be listed as uninstallable,
                    continue
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                excuse.add_verdict_info(
                    verdict, "%s/%s has unsatisfiable dependency" % (pkg_name, arch)
                )
                excuse.addreason("depends")
            else:
                if pkg_arch == "all":
                    arch_all_installable.add(True)
                else:
                    arch_arch_installable.add(True)

            if skip_dep_check:
                continue

            deps = self.pkg_universe.dependencies_of(pkg_id)

            for dep in deps:
                # dep is a list of packages, each of which satisfy the
                # dependency

                if dep == frozenset():
                    continue
                is_ok = False
                needed_for_dep = set()

                for alternative in dep:
                    if target_suite.is_pkg_in_the_suite(alternative):
                        # dep can be satisfied in testing - ok
                        is_ok = True
                    elif alternative in my_bin_ids:
                        # can be satisfied by binary from same item: will be
                        # ok if item migrates
                        is_ok = True
                    else:
                        needed_for_dep.add(alternative)

                if not is_ok:
                    spec = DependencySpec(DependencyType.DEPENDS, arch)
                    excuse.add_package_depends(spec, needed_for_dep)

        # The autopkgtest policy needs delicate trade offs for
        # non-installability. The current choice (considering source
        # migration and only binaries built by the version of the
        # source):
        #
        # * Run autopkgtest if all arch:$arch binaries are installable
        #   (but some or all arch:all binaries are not)
        #
        # * Don't schedule nor wait for not installable arch:all only package
        #   on ! NOBREAKALL_ARCHES
        #
        # * Run autopkgtest if installability isn't a regression (there are (or
        #   rather, should) not be a lot of packages in this state, and most
        #   likely they'll just fail quickly)
        #
        # * Don't schedule, but wait otherwise
        if arch_arch_installable == {True} and False in arch_all_installable:
            deps_info.setdefault("autopkgtest_run_anyways", []).append(arch)
        elif (
            arch not in self.nobreakall_arches
            and arch_arch_installable == set()
            and False in arch_all_installable
        ):
            deps_info.setdefault("arch_all_not_installable", []).append(arch)
        elif not consider_it_regression:
            deps_info.setdefault("autopkgtest_run_anyways", []).append(arch)

        return verdict
@unique
class BuildDepResult(IntEnum):
    """Outcome of checking a build-dependency relation on one architecture.

    The values are ordered so that a smaller value is a better result.
    """

    # relation is satisfied in target
    OK = 1
    # relation can be satisfied by other packages in source
    DEPENDS = 2
    # relation cannot be satisfied
    FAILED = 3
class BuildDependsPolicy(AbstractBasePolicy):
    """Policy checking that the build-dependencies of a source are satisfiable.

    Architecture-specific build-dependencies are checked on every relevant
    architecture; architecture-independent ones only need to be
    satisfiable on one architecture.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "build-depends",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
        )
        # architectures tried first for Build-Depends-Indep (see initialise)
        self._all_buildarch: list[str] = []

        parse_option(options, "all_buildarch")

    def initialise(self, britney: "Britney") -> None:
        """Read the optional ``all_buildarch`` option into a list."""
        super().initialise(britney)
        if self.options.all_buildarch:
            self._all_buildarch = SuiteContentLoader.config_str_as_list(
                self.options.all_buildarch, []
            )

    def apply_src_policy_impl(
        self,
        build_deps_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
        get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers,
    ) -> PolicyVerdict:
        """Check the arch and indep build-dependency fields of the source.

        Returns the worst verdict produced by the fields that are present.
        """
        verdict = PolicyVerdict.PASS

        # analyze the dependency fields (if present), arch-specific first
        for deps, dep_type in (
            (source_data_srcdist.build_deps_arch, DependencyType.BUILD_DEPENDS),
            (
                source_data_srcdist.build_deps_indep,
                DependencyType.BUILD_DEPENDS_INDEP,
            ),
        ):
            if not deps:
                continue
            v = self._check_build_deps(
                deps,
                dep_type,
                build_deps_info,
                item,
                source_data_tdist,
                source_data_srcdist,
                excuse,
                get_dependency_solvers=get_dependency_solvers,
            )
            verdict = PolicyVerdict.worst_of(verdict, v)

        return verdict

    def _get_check_archs(
        self, archs: Container[str], dep_type: DependencyType
    ) -> list[str]:
        """Return the architectures to check for *dep_type*, in checking order.

        For BUILD_DEPENDS only the configured architectures contained in
        *archs* are returned.  For other types all configured architectures
        are returned, ordered by preference.  Out-of-sync architectures are
        always dropped.
        """
        oos = self.options.outofsync_arches

        if dep_type == DependencyType.BUILD_DEPENDS:
            return [
                arch
                for arch in self.options.architectures
                if arch in archs and arch not in oos
            ]

        # first try the all buildarch
        checkarchs = list(self._all_buildarch)
        # then try the architectures where this source has arch specific
        # binaries (in the order of the architecture config file)
        checkarchs.extend(
            arch
            for arch in self.options.architectures
            if arch in archs and arch not in checkarchs
        )
        # then try all other architectures
        checkarchs.extend(
            arch for arch in self.options.architectures if arch not in checkarchs
        )

        # and drop OUTOFSYNC_ARCHES
        return [arch for arch in checkarchs if arch not in oos]

    def _add_info_for_arch(
        self,
        arch: str,
        excuses_info: dict[str, list[str]],
        blockers: dict[str, set[BinaryPackageId]],
        results: dict[str, BuildDepResult],
        dep_type: DependencyType,
        target_suite: TargetSuite,
        source_suite: Suite,
        excuse: "Excuse",
        verdict: PolicyVerdict,
    ) -> PolicyVerdict:
        """Transfer the findings for *arch* to *excuse*.

        Returns *verdict*, worsened to REJECTED_PERMANENTLY when the result
        for *arch* is FAILED.
        """
        # for the solving packages, update the excuse to add the dependencies
        # (the break_arches test does not depend on the package, so do it once)
        if arch in blockers and arch not in self.options.break_arches:
            for p in blockers[arch]:
                spec = DependencySpec(dep_type, arch)
                excuse.add_package_depends(spec, {p})

        if arch in results and results[arch] == BuildDepResult.FAILED:
            verdict = PolicyVerdict.worst_of(
                verdict, PolicyVerdict.REJECTED_PERMANENTLY
            )

        if arch in excuses_info:
            for excuse_text in excuses_info[arch]:
                if verdict.is_rejected:
                    excuse.add_verdict_info(verdict, excuse_text)
                else:
                    excuse.addinfo(excuse_text)

        return verdict

    def _check_build_deps(
        self,
        deps: str,
        dep_type: DependencyType,
        build_deps_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
        get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers,
    ) -> PolicyVerdict:
        """Check the build-dependency field *deps* of kind *dep_type*.

        Every comma-separated relation is first looked up in the target
        suite and then in the item's source suite.  Relations only
        satisfiable from the source suite become package dependencies of
        the excuse; relations that cannot be satisfied at all are recorded
        in *build_deps_info* and make the architecture fail.
        """
        verdict = PolicyVerdict.PASS
        any_arch_ok = dep_type == DependencyType.BUILD_DEPENDS_INDEP

        britney = self.britney

        # local copies for better performance
        parse_src_depends = apt_pkg.parse_src_depends

        source_name = item.package
        source_suite = item.suite
        target_suite = self.suite_info.target_suite
        binaries_s = source_suite.binaries
        provides_s = source_suite.provides_table
        binaries_t = target_suite.binaries
        provides_t = target_suite.provides_table
        unsat_bd: dict[str, list[str]] = {}
        relevant_archs: set[str] = {
            binary.architecture
            for binary in source_data_srcdist.binaries
            if britney.all_binaries[binary].architecture != "all"
        }

        excuses_info: dict[str, list[str]] = defaultdict(list)
        blockers: dict[str, set[BinaryPackageId]] = defaultdict(set)
        arch_results: dict[str, BuildDepResult] = {}
        result_archs: dict[BuildDepResult, list[str]] = defaultdict(list)
        bestresult = BuildDepResult.FAILED
        check_archs = self._get_check_archs(relevant_archs, dep_type)
        if not check_archs:
            # when the arch list is empty, we check the b-d on any arch, instead of all archs
            # this happens for Build-Depends on a source package that only produces arch: all binaries
            any_arch_ok = True
            check_archs = self._get_check_archs(
                self.options.architectures, DependencyType.BUILD_DEPENDS_INDEP
            )

        for arch in check_archs:
            # retrieve the binary package from the specified suite and arch
            binaries_s_a = binaries_s[arch]
            provides_s_a = provides_s[arch]
            binaries_t_a = binaries_t[arch]
            provides_t_a = provides_t[arch]
            arch_results[arch] = BuildDepResult.OK
            # for every dependency block (formed as conjunction of disjunction)
            for block_txt in deps.split(","):
                block_list = parse_src_depends(block_txt, False, arch)
                # Unlike regular dependencies, some clauses of the Build-Depends(-Arch|-Indep) can be
                # filtered out by (e.g.) architecture restrictions.  We need to cope with this while
                # keeping block_txt and block aligned.
                if not block_list:
                    # Relation is not relevant for this architecture.
                    continue
                block = block_list[0]
                # if the block is satisfied in the target suite, then skip the block
                if get_dependency_solvers(
                    block, binaries_t_a, provides_t_a, build_depends=True
                ):
                    # Satisfied in the target suite; all ok.
                    continue

                # check if the block can be satisfied in the source suite, and list the solving packages
                packages = get_dependency_solvers(
                    block, binaries_s_a, provides_s_a, build_depends=True
                )

                # if the dependency can be satisfied by the same source package, skip the block:
                # obviously both binary packages will enter the target suite together
                if any(p.source == source_name for p in packages):
                    continue

                # if no package can satisfy the dependency, add this information to the excuse
                if not packages:
                    excuses_info[arch].append(
                        "%s unsatisfiable %s on %s: %s"
                        % (source_name, dep_type, arch, block_txt.strip())
                    )
                    unsat_bd.setdefault(arch, []).append(block_txt.strip())
                    arch_results[arch] = BuildDepResult.FAILED
                    continue

                blockers[arch].update(p.pkg_id for p in packages)
                # never improve an arch that already FAILED on an earlier block
                if arch_results[arch] < BuildDepResult.DEPENDS:
                    arch_results[arch] = BuildDepResult.DEPENDS

            if any_arch_ok:
                if arch_results[arch] < bestresult:
                    bestresult = arch_results[arch]
                result_archs[arch_results[arch]].append(arch)
                if bestresult == BuildDepResult.OK:
                    # we found an architecture where the b-deps-indep are
                    # satisfied in the target suite, so we can stop
                    break

        if any_arch_ok:
            # report only the first architecture that reached the best result
            arch = result_archs[bestresult][0]
            excuse.add_detailed_info(
                "Checking %s on %s" % (dep_type.get_description(), arch)
            )
            key = "check-%s-on-arch" % dep_type.get_reason()
            build_deps_info[key] = arch
            verdict = self._add_info_for_arch(
                arch,
                excuses_info,
                blockers,
                arch_results,
                dep_type,
                target_suite,
                source_suite,
                excuse,
                verdict,
            )

        else:
            for arch in check_archs:
                verdict = self._add_info_for_arch(
                    arch,
                    excuses_info,
                    blockers,
                    arch_results,
                    dep_type,
                    target_suite,
                    source_suite,
                    excuse,
                    verdict,
                )

            if unsat_bd:
                build_deps_info["unsatisfiable-arch-build-depends"] = unsat_bd

        return verdict
class BuiltUsingPolicy(AbstractBasePolicy):
    """Built-Using policy

    Binaries that incorporate (part of) another source package must list these
    sources under 'Built-Using'.

    This policy checks if the corresponding sources are available in the
    target suite. If they are not, but they are candidates for migration, a
    dependency is added.

    If the binary incorporates a newer version of a source, that is not (yet)
    a candidate, we don't want to accept that binary. A rebuild later in the
    primary suite wouldn't fix the issue, because that would incorporate the
    newer version again.

    If the binary incorporates an older version of the source, a newer version
    will be accepted as a replacement. We assume that this can be fixed by
    rebuilding the binary at some point during the development cycle.

    Requiring exact version of the source would not be useful in practice. A
    newer upload of that source wouldn't be blocked by this policy, so the
    built-using would be outdated anyway.

    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        applicable_suites = {
            SuiteClass.PRIMARY_SOURCE_SUITE,
            SuiteClass.ADDITIONAL_SOURCE_SUITE,
        }
        super().__init__(
            "built-using",
            options,
            suite_info,
            applicable_suites,
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )

    def initialise(self, britney: "Britney") -> None:
        super().initialise(britney)

    def apply_srcarch_policy_impl(
        self,
        build_deps_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Verify the Built-Using entries of the item's binaries on *arch*.

        An entry is satisfied when the target suite, the item's suite or
        (for additional source suites) the primary source suite carries
        the named source in at least the listed version.  Unsatisfied
        entries reject the item unless *arch* is one of the break arches.
        """
        verdict = PolicyVerdict.PASS

        item_suite = item.suite
        target_suite = self.suite_info.target_suite
        arch_binaries = item_suite.binaries[arch]

        def satisfied_from(wanted_source: str, wanted_version: str, suite: Suite) -> bool:
            # True when *suite* has the source in a new enough version; in
            # that case the excuse is updated as a side effect
            if wanted_source not in suite.sources:
                return False
            available = suite.sources[wanted_source].version
            if apt_pkg.version_compare(available, wanted_version) < 0:
                return False
            dep = PackageId(wanted_source, available, "source")
            if arch in self.options.break_arches:
                excuse.add_detailed_info(
                    "Ignoring Built-Using for %s/%s on %s"
                    % (pkg_name, arch, dep.uvname)
                )
                return True
            excuse.add_package_depends(
                DependencySpec(DependencyType.BUILT_USING, arch), {dep}
            )
            excuse.add_detailed_info(
                "%s/%s has Built-Using on %s" % (pkg_name, arch, dep.uvname)
            )
            return True

        arch_pkg_ids = sorted(
            b for b in source_data_srcdist.binaries if b.architecture == arch
        )
        for pkg_id in arch_pkg_ids:
            pkg_name = pkg_id.package_name

            # the binary as it exists in the item's (source) suite
            for entry in arch_binaries[pkg_name].builtusing:
                bu_source, bu_version = entry[0], entry[1]

                found = (
                    bu_source in target_suite.sources
                    and apt_pkg.version_compare(
                        target_suite.sources[bu_source].version, bu_version
                    )
                    >= 0
                )
                found = found or satisfied_from(bu_source, bu_version, item_suite)

                if not found and item_suite.suite_class.is_additional_source:
                    found = satisfied_from(
                        bu_source, bu_version, self.suite_info.primary_source_suite
                    )

                if found:
                    continue

                if arch in self.options.break_arches:
                    excuse.add_detailed_info(
                        "Ignoring unsatisfiable Built-Using for %s/%s on %s %s"
                        % (pkg_name, arch, bu_source, bu_version)
                    )
                    continue

                verdict = PolicyVerdict.worst_of(
                    verdict, PolicyVerdict.REJECTED_PERMANENTLY
                )
                excuse.add_verdict_info(
                    verdict,
                    "%s/%s has unsatisfiable Built-Using on %s %s"
                    % (pkg_name, arch, bu_source, bu_version),
                )

        return verdict
class BlockPolicy(AbstractBasePolicy):
    """Policy implementing the block/unblock family of hints.

    An item is rejected (needs approval) when a block hint, a matching
    ``block-all`` hint or its origin in a non-primary suite applies and no
    matching unblock hint overrides it.
    """

    BLOCK_HINT_REGEX = re.compile("^(un)?(block-?.*)$")

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "block",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
        )
        # package field of each block-all hint -> the hint itself
        self._blockall: dict[Optional[str], Hint] = {}
        # source names of the key packages; filled in by initialise()
        self._key_packages: list[str] = []

    def initialise(self, britney: "Britney") -> None:
        """Collect the ``block-all`` hints and, if needed, the key packages."""
        super().initialise(britney)
        assert self.hints is not None
        for hint in self.hints.search(type="block-all"):
            self._blockall[hint.package] = hint

        self._key_packages = []
        if "key" in self._blockall:
            self._key_packages = self._read_key_packages()

    def _read_key_packages(self) -> list[str]:
        """Read the list of key packages

        The file contains data in the yaml format :

        - reason: <something>
          source: <package>

        The method returns a list of all key packages.  If the file does
        not exist, an error is logged and the program exits with status 1.
        """
        filename = os.path.join(self.state_dir, "key_packages.yaml")
        self.logger.info("Loading key packages from %s", filename)
        if os.path.exists(filename):
            with open(filename) as f:
                data = yaml.safe_load(f)
            key_packages = [item["source"] for item in data]
        else:
            self.logger.error(
                "Britney was asked to block key packages, "
                + "but no key_packages.yaml file was found."
            )
            sys.exit(1)

        return key_packages

    def register_hints(self, hint_parser: HintParser) -> None:
        # block related hints are currently defined in hint.py
        pass

    def _check_blocked(
        self, item: "MigrationItem", arch: str, version: str, excuse: "Excuse"
    ) -> PolicyVerdict:
        """Decide whether *item* is blocked for *arch* and *version*.

        Returns REJECTED_NEEDS_APPROVAL when at least one block request is
        not overridden by a matching unblock hint, PASS otherwise.  The
        hints used and the explanations are recorded on *excuse*.
        """
        verdict = PolicyVerdict.PASS
        blocked = {}
        unblocked = {}
        block_info = {}
        source_suite = item.suite
        suite_name = source_suite.name
        src = item.package
        is_primary = source_suite.suite_class == SuiteClass.PRIMARY_SOURCE_SUITE

        tooltip = (
            "please contact %s-release if update is needed" % self.options.distribution
        )

        assert self.hints is not None
        shints = self.hints.search(package=src)
        mismatches = False
        r = self.BLOCK_HINT_REGEX
        for hint in shints:
            m = r.match(hint.type)
            if m:
                if m.group(1) == "un":
                    assert hint.suite is not None
                    if (
                        hint.version != version
                        or hint.suite.name != suite_name
                        or (hint.architecture != arch and hint.architecture != "source")
                    ):
                        self.logger.info(
                            "hint mismatch: %s %s %s", version, arch, suite_name
                        )
                        mismatches = True
                    else:
                        unblocked[m.group(2)] = hint.user
                        excuse.add_hint(hint)
                else:
                    # block(-*) hint: only accepts a source, so this will
                    # always match
                    blocked[m.group(2)] = hint.user
                    excuse.add_hint(hint)

        if "block" not in blocked and is_primary:
            # if there is a specific block hint for this package, we don't
            # check for the general hints

            if self.options.distribution == "debian":
                url = "https://release.debian.org/testing/freeze_policy.html"
                tooltip = (
                    'Follow the <a href="%s">freeze policy</a> when applying for an unblock'
                    % url
                )

            if "source" in self._blockall:
                blocked["block"] = self._blockall["source"].user
                excuse.add_hint(self._blockall["source"])
            elif (
                "new-source" in self._blockall
                and src not in self.suite_info.target_suite.sources
            ):
                blocked["block"] = self._blockall["new-source"].user
                excuse.add_hint(self._blockall["new-source"])
                # no tooltip: new sources will probably not be accepted anyway
                block_info["block"] = "blocked by %s: is not in %s" % (
                    self._blockall["new-source"].user,
                    self.suite_info.target_suite.name,
                )
            elif "key" in self._blockall and src in self._key_packages:
                blocked["block"] = self._blockall["key"].user
                excuse.add_hint(self._blockall["key"])
                block_info["block"] = "blocked by %s: is a key package (%s)" % (
                    self._blockall["key"].user,
                    tooltip,
                )
            elif "no-autopkgtest" in self._blockall:
                if excuse.autopkgtest_results == {"PASS"}:
                    if not blocked:
                        excuse.addinfo("not blocked: has successful autopkgtest")
                else:
                    blocked["block"] = self._blockall["no-autopkgtest"].user
                    excuse.add_hint(self._blockall["no-autopkgtest"])
                    if not excuse.autopkgtest_results:
                        block_info["block"] = (
                            "blocked by %s: does not have autopkgtest (%s)"
                            % (
                                self._blockall["no-autopkgtest"].user,
                                tooltip,
                            )
                        )
                    else:
                        block_info["block"] = (
                            "blocked by %s: autopkgtest not fully successful (%s)"
                            % (
                                self._blockall["no-autopkgtest"].user,
                                tooltip,
                            )
                        )

        elif not is_primary:
            blocked["block"] = suite_name
            excuse.needs_approval = True

        for block_cmd in blocked:
            unblock_cmd = "un" + block_cmd
            if block_cmd in unblocked:
                if is_primary or block_cmd == "block-udeb":
                    excuse.addinfo(
                        "Ignoring %s request by %s, due to %s request by %s"
                        % (
                            block_cmd,
                            blocked[block_cmd],
                            unblock_cmd,
                            unblocked[block_cmd],
                        )
                    )
                else:
                    excuse.addinfo("Approved by %s" % (unblocked[block_cmd]))
            else:
                verdict = PolicyVerdict.REJECTED_NEEDS_APPROVAL
                if is_primary or block_cmd == "block-udeb":
                    # Use a per-command tooltip so that the d-i wording does
                    # not leak into the message of a later block command.
                    cmd_tooltip = tooltip
                    # redirect people to d-i RM for udeb things:
                    if block_cmd == "block-udeb":
                        cmd_tooltip = "please contact the d-i release manager if an update is needed"
                    if block_cmd in block_info:
                        info = block_info[block_cmd]
                    else:
                        info = "Not touching package due to %s request by %s (%s)" % (
                            block_cmd,
                            blocked[block_cmd],
                            cmd_tooltip,
                        )
                    excuse.add_verdict_info(verdict, info)
                else:
                    excuse.add_verdict_info(verdict, "NEEDS APPROVAL BY RM")
                excuse.addreason("block")
        if mismatches:
            excuse.add_detailed_info(
                "Some hints for %s do not match this item" % src
            )
        return verdict

    def apply_src_policy_impl(
        self,
        block_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check block hints for a source item (architecture "source")."""
        return self._check_blocked(item, "source", source_data_srcdist.version, excuse)

    def apply_srcarch_policy_impl(
        self,
        block_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check block hints for an item restricted to *arch*."""
        return self._check_blocked(item, arch, source_data_srcdist.version, excuse)
class BuiltOnBuilddPolicy(AbstractBasePolicy):
    """Require that migrating binaries were uploaded by a buildd.

    Upload information is read from ``signers.json`` in the state directory.
    It is indexed as ``signers[package][version][architecture]`` and every
    entry provides a ``buildd`` flag and the uploader ``uid``.

    Binaries that were not uploaded by a buildd are still accepted when the
    source lives outside the ``main`` component, or when they are
    ``arch: all`` and an ``allow-archall-maintainer-upload`` hint exists for
    the item.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "builtonbuildd",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )
        # "signerinfo" is populated by initialise()
        self._builtonbuildd: dict[str, Any] = {
            "signerinfo": None,
        }

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the ``allow-archall-maintainer-upload`` hint type."""
        hint_parser.register_hint_type(
            "allow-archall-maintainer-upload", split_into_one_hint_per_package
        )

    def initialise(self, britney: "Britney") -> None:
        """Load ``signers.json`` from the state directory.

        Raises:
            RuntimeError: if accessing ``self.state_dir`` raises
                ``AttributeError``.
        """
        super().initialise(britney)
        try:
            filename_signerinfo = os.path.join(self.state_dir, "signers.json")
        except AttributeError as e:  # pragma: no cover
            raise RuntimeError(
                "Please set STATE_DIR in the britney configuration"
            ) from e
        self._builtonbuildd["signerinfo"] = self._read_signerinfo(filename_signerinfo)

    def apply_srcarch_policy_impl(
        self,
        buildd_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check who uploaded the binaries of ``item`` on ``arch``.

        ``buildd_info["signed-by"]`` maps a binary architecture to the uid of
        the uploader recorded for it.  It is shared between calls for the
        same excuse, and messages for a binary architecture are only added to
        ``excuse`` the first time that architecture is seen.

        Returns the worst verdict over all binaries that were checked.
        """
        verdict = PolicyVerdict.PASS
        signers = self._builtonbuildd["signerinfo"]
        signed_by = buildd_info.setdefault("signed-by", {})

        source_suite = item.suite

        # horrible hard-coding, but currently, we don't keep track of the
        # component when loading the packages files
        component = "main"
        # we use the source component, because a binary in contrib can
        # belong to a source in main
        section = source_data_srcdist.section
        if "/" in section:
            component = section.split("/")[0]

        packages_s_a = source_suite.binaries[arch]
        assert self.hints is not None

        for pkg_id in sorted(
            x for x in source_data_srcdist.binaries if x.architecture == arch
        ):
            pkg_name = pkg_id.package_name
            binary_u = packages_s_a[pkg_name]
            pkg_arch = binary_u.architecture

            if binary_u.source_version != source_data_srcdist.version:
                continue

            if item.architecture != "source" and pkg_arch == "all":
                # we don't care about the existing arch: all binaries when
                # checking a binNMU item, because the arch: all binaries won't
                # migrate anyway
                continue

            # messages are only emitted once per binary architecture
            already_reported = pkg_arch in signed_by

            uid = None
            buildd_ok = False
            failure_verdict = PolicyVerdict.REJECTED_PERMANENTLY
            try:
                signer = signers[pkg_name][pkg_id.version][pkg_arch]
                if signer["buildd"]:
                    buildd_ok = True
                uid = signer["uid"]
                uidinfo = "arch %s binaries uploaded by %s" % (pkg_arch, uid)
            except KeyError:
                self.logger.info(
                    "signer info for %s %s (%s) on %s not found ",
                    pkg_name,
                    binary_u.version,
                    pkg_arch,
                    arch,
                )
                uidinfo = "upload info for arch %s binaries not found" % (pkg_arch)
                failure_verdict = PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT

            if not buildd_ok:
                if component != "main":
                    if not already_reported:
                        excuse.add_detailed_info(
                            "%s, but package in %s" % (uidinfo, component)
                        )
                    buildd_ok = True
                elif pkg_arch == "all":
                    allow_hints = self.hints.search(
                        "allow-archall-maintainer-upload", package=item.package
                    )
                    if allow_hints:
                        buildd_ok = True
                        verdict = PolicyVerdict.worst_of(
                            verdict, PolicyVerdict.PASS_HINTED
                        )
                        if not already_reported:
                            excuse.addinfo(
                                "%s, but whitelisted by %s"
                                % (uidinfo, allow_hints[0].user)
                            )

            if not buildd_ok:
                # Aggregate instead of overwriting, so that the result does
                # not depend on the order in which the binaries are visited
                # (a later "cannot determine" must not hide an earlier
                # permanent rejection).
                verdict = PolicyVerdict.worst_of(verdict, failure_verdict)
                if not already_reported:
                    if pkg_arch == "all":
                        uidinfo += (
                            ", a new source-only upload is needed to allow migration"
                        )
                    excuse.add_verdict_info(
                        failure_verdict, "Not built on buildd: %s" % (uidinfo)
                    )

            if already_reported and signed_by[pkg_arch] != uid:
                self.logger.info(
                    "signer mismatch for %s (%s %s) on %s: %s, while %s already listed",
                    pkg_name,
                    binary_u.source,
                    binary_u.source_version,
                    pkg_arch,
                    uid,
                    signed_by[pkg_arch],
                )

            signed_by[pkg_arch] = uid

        return verdict

    def _read_signerinfo(self, filename: str) -> dict[str, Any]:
        """Return the parsed content of ``filename``.

        An empty file yields an empty dict.  The file is decoded as UTF-8
        (the JSON interchange encoding) rather than with the locale default.
        """
        signerinfo: dict[str, Any] = {}
        self.logger.info("Loading signer info from %s", filename)
        with open(filename, encoding="utf-8") as fd:
            if os.fstat(fd.fileno()).st_size < 1:
                return signerinfo
            signerinfo = json.load(fd)

        return signerinfo
class ImplicitDependencyPolicy(AbstractBasePolicy):
    """Implicit Dependency policy

    Upgrading a package pkg-a can break the installability of a package pkg-b.
    A newer version (or the removal) of pkg-b might fix the issue. In that
    case, pkg-a has an 'implicit dependency' on pkg-b, because pkg-a can only
    migrate if pkg-b also migrates.

    This policy tries to discover a few common cases, and adds the relevant
    info to the excuses. If another item is needed to fix the
    uninstallability, a dependency is added. If no newer item can fix it, this
    excuse will be blocked.

    Note that the migration step will check the installability of every
    package, so this policy doesn't need to handle every corner case. It
    must, however, make sure that no excuse is unnecessarily blocked.

    Some cases that should be detected by this policy:

    * pkg-a is upgraded from 1.0-1 to 2.0-1, while
      pkg-b has "Depends: pkg-a (<< 2.0)"
      This typically happens if pkg-b has a strict dependency on pkg-a because
      it uses some non-stable internal interface (examples are glibc,
      binutils, python3-defaults, ...)

    * pkg-a is upgraded from 1.0-1 to 2.0-1, and
      pkg-a 1.0-1 has "Provides: provides-1",
      pkg-a 2.0-1 has "Provides: provides-2",
      pkg-b has "Depends: provides-1"
      This typically happens when pkg-a has an interface that changes between
      versions, and a virtual package is used to identify the version of this
      interface (e.g. perl-api-x.y)

    """

    _pkg_universe: "BinaryPackageUniverse"
    _all_binaries: dict["BinaryPackageId", "BinaryPackage"]
    _allow_uninst: dict[str, set[Optional[str]]]
    _nobreakall_arches: list[str]

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "implicit-deps",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )

    def initialise(self, britney: "Britney") -> None:
        """Cache the package universe, binaries and options used below."""
        super().initialise(britney)
        self._pkg_universe = britney.pkg_universe
        self._all_binaries = britney.all_binaries
        self._smooth_updates = britney.options.smooth_updates
        self._nobreakall_arches = self.options.nobreakall_arches
        self._new_arches = self.options.new_arches
        self._break_arches = self.options.break_arches
        self._allow_uninst = britney.allow_uninst
        self._outofsync_arches = self.options.outofsync_arches

    def can_be_removed(self, pkg: BinaryPackage) -> bool:
        """Return True if ``pkg`` is a candidate for removal from the target suite."""
        src = pkg.source
        target_suite = self.suite_info.target_suite

        # TODO these conditions shouldn't be hardcoded here
        # ideally, we would be able to look up excuses to see if the removal
        # is in there, but in the current flow, this policy is called before
        # all possible excuses exist, so there is no list for us to check

        if src not in self.suite_info.primary_source_suite.sources:
            # source for pkg not in unstable: candidate for removal
            return True

        source_t = target_suite.sources[src]
        assert self.hints is not None
        if self.hints.search("remove", package=src, version=source_t.version):
            # removal hint for the source in testing: candidate for removal
            return True

        if target_suite.is_cruft(pkg):
            # if pkg is cruft in testing, removal will be tried
            return True

        # the case where the newer version of the source no longer includes the
        # binary (or includes a cruft version of the binary) will be handled
        # separately (in that case there might be an implicit dependency on
        # the newer source)

        return False

    def should_skip_rdep(
        self, pkg: BinaryPackage, source_name: str, myarch: str
    ) -> bool:
        """Return True if reverse dependency ``pkg`` needs no further checking.

        That is the case when migrating ``source_name`` cannot make ``pkg``
        uninstallable on ``myarch``, or when it would not matter if it did.
        """
        target_suite = self.suite_info.target_suite

        if not target_suite.is_pkg_in_the_suite(pkg.pkg_id):
            # it is not in the target suite, migration cannot break anything
            return True

        if pkg.source == source_name:
            # if it is built from the same source, it will be upgraded
            # with the source
            return True

        if self.can_be_removed(pkg):
            # could potentially be removed, so if that happens, it won't be
            # broken
            return True

        if pkg.architecture == "all" and myarch not in self._nobreakall_arches:
            # arch all on non nobreakarch is allowed to become uninstallable
            return True

        if pkg.pkg_id.package_name in self._allow_uninst[myarch]:
            # there is a hint to allow this binary to become uninstallable
            return True

        if not target_suite.is_installable(pkg.pkg_id):
            # it is already uninstallable in the target suite, migration
            # cannot break anything
            return True

        return False

    def breaks_installability(
        self,
        pkg_id_t: BinaryPackageId,
        pkg_id_s: Optional[BinaryPackageId],
        pkg_to_check: BinaryPackageId,
    ) -> bool:
        """
        Check if upgrading pkg_id_t to pkg_id_s breaks the installability of
        pkg_to_check.

        To check if removing pkg_id_t breaks pkg_to_check, set pkg_id_s to
        None.
        """

        pkg_universe = self._pkg_universe
        negative_deps = pkg_universe.negative_dependencies_of(pkg_to_check)

        for dep in pkg_universe.dependencies_of(pkg_to_check):
            if pkg_id_t not in dep:
                # this depends doesn't have pkg_id_t as alternative, so
                # upgrading pkg_id_t cannot break this dependency clause
                continue

            # We check all the alternatives for this dependency, to find one
            # that can satisfy it when pkg_id_t is upgraded to pkg_id_s.
            #
            # * An alternative that conflicts with pkg_to_check cannot be
            #   used (this commonly happens when breaks are added to
            #   pkg_id_s).
            # * A binary different from pkg_id_t can satisfy the dep.
            # * Of the versions of the binary itself, only pkg_id_s will be
            #   in the target suite after the migration, so only that one
            #   counts (this excludes pkg_id_t, but also other versions).
            found_alternative = any(
                d not in negative_deps
                and (d.package_name != pkg_id_t.package_name or d == pkg_id_s)
                for d in dep
            )

            if not found_alternative:
                return True
        return False

    def check_upgrade(
        self,
        pkg_id_t: BinaryPackageId,
        pkg_id_s: Optional[BinaryPackageId],
        source_name: str,
        myarch: str,
        broken_binaries: set[str],
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check the reverse dependencies of ``pkg_id_t`` for breakage.

        For every reverse dependency that the upgrade to ``pkg_id_s`` (or the
        removal, when ``pkg_id_s`` is None) would break, either an implicit
        dependency on the newer versions that are not broken is added to
        ``excuse``, or - when there is none - the name of the reverse
        dependency is added to ``broken_binaries`` and the verdict becomes
        REJECTED_PERMANENTLY.
        """
        verdict = PolicyVerdict.PASS

        pkg_universe = self._pkg_universe
        all_binaries = self._all_binaries

        # check all rdeps of the package in testing
        rdeps_t = pkg_universe.reverse_dependencies_of(pkg_id_t)

        for rdep_pkg in sorted(rdeps_t):
            rdep_p = all_binaries[rdep_pkg]

            # check some cases where the rdep won't become uninstallable, or
            # where we don't care if it does
            if self.should_skip_rdep(rdep_p, source_name, myarch):
                continue

            if not self.breaks_installability(pkg_id_t, pkg_id_s, rdep_pkg):
                # if upgrading pkg_id_t to pkg_id_s doesn't break rdep_pkg,
                # there is no implicit dependency
                continue

            # The upgrade breaks the installability of the rdep. We need to
            # find out if there is a newer version of the rdep that solves the
            # uninstallability. If that is the case, there is an implicit
            # dependency. If not, the upgrade will fail.

            # check source versions
            newer_versions = find_newer_binaries(
                self.suite_info, rdep_p, add_source_for_dropped_bin=True
            )
            good_newer_versions = set()
            for npkg, _suite in newer_versions:
                if npkg.architecture == "source":
                    # When a newer version of the source package doesn't have
                    # the binary, we get the source as 'newer version'. In
                    # this case, the binary will not be uninstallable if the
                    # newer source migrates, because it is no longer there.
                    good_newer_versions.add(npkg)
                    continue
                assert isinstance(npkg, BinaryPackageId)
                if not self.breaks_installability(pkg_id_t, pkg_id_s, npkg):
                    good_newer_versions.add(npkg)

            if good_newer_versions:
                spec = DependencySpec(DependencyType.IMPLICIT_DEPENDENCY, myarch)
                excuse.add_package_depends(spec, good_newer_versions)
            else:
                # no good newer versions: no possible solution
                broken_binaries.add(rdep_pkg.name)
                if pkg_id_s:
                    action = "migrating %s to %s" % (
                        pkg_id_s.name,
                        self.suite_info.target_suite.name,
                    )
                else:
                    action = "removing %s from %s" % (
                        pkg_id_t.name,
                        self.suite_info.target_suite.name,
                    )
                info = '{0} makes <a href="#{1}">{1}</a> uninstallable'.format(
                    action, rdep_pkg.name
                )
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                excuse.add_verdict_info(verdict, info)

        return verdict

    def apply_srcarch_policy_impl(
        self,
        implicit_dep_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Look for implicit dependencies of ``item`` on ``arch``.

        The names of the binaries that would become uninstallable are merged
        into ``implicit_dep_info["implicit-deps"]["broken-binaries"]`` as a
        sorted list.
        """
        verdict = PolicyVerdict.PASS

        if not source_data_tdist:
            # this item is not currently in testing: no implicit dependency
            return verdict

        if excuse.hasreason("missingbuild"):
            # if the build is missing, the policy would treat this as if the
            # binaries would be removed, which would give incorrect (and
            # confusing) info
            info = "missing build, not checking implicit dependencies on %s" % (arch)
            excuse.add_detailed_info(info)
            return verdict

        source_suite = item.suite
        source_name = item.package
        target_suite = self.suite_info.target_suite
        all_binaries = self._all_binaries

        # we check all binaries for this excuse that are currently in testing
        relevant_binaries = [
            x
            for x in source_data_tdist.binaries
            if (arch == "source" or x.architecture == arch)
            and x.package_name in target_suite.binaries[x.architecture]
            and x.architecture not in self._new_arches
            and x.architecture not in self._break_arches
            and x.architecture not in self._outofsync_arches
        ]

        broken_binaries: set[str] = set()

        assert self.hints is not None
        for pkg_id_t in sorted(relevant_binaries):
            mypkg = pkg_id_t.package_name
            myarch = pkg_id_t.architecture
            binaries_t_a = target_suite.binaries[myarch]
            binaries_s_a = source_suite.binaries[myarch]

            if target_suite.is_cruft(all_binaries[pkg_id_t]):
                # this binary is cruft in testing: it will stay around as long
                # as necessary to satisfy dependencies, so we don't need to
                # care
                continue

            if mypkg in binaries_s_a:
                mybin = binaries_s_a[mypkg]
                pkg_id_s: Optional["BinaryPackageId"] = mybin.pkg_id
                if mybin.source != source_name:
                    # hijack: this is too complicated to check, so we ignore
                    # it (the migration code will check the installability
                    # later anyway)
                    pass
                elif mybin.source_version != source_data_srcdist.version:
                    # cruft in source suite: pretend the binary doesn't exist
                    pkg_id_s = None
                elif pkg_id_t == pkg_id_s:
                    # same binary (probably arch: all from a binNMU):
                    # 'upgrading' doesn't change anything, for this binary, so
                    # it won't break anything
                    continue
            else:
                pkg_id_s = None

            if not pkg_id_s and is_smooth_update_allowed(
                binaries_t_a[mypkg], self._smooth_updates, self.hints
            ):
                # the binary isn't in the new version (or is cruft there), and
                # smooth updates are allowed: the binary can stay around if
                # that is necessary to satisfy dependencies, so we don't need
                # to check it
                continue

            if (
                not pkg_id_s
                and source_data_tdist.version == source_data_srcdist.version
                and source_suite.suite_class == SuiteClass.ADDITIONAL_SOURCE_SUITE
                and binaries_t_a[mypkg].architecture == "all"
            ):
                # we're very probably migrating a binNMU built in tpu where the arch:all
                # binaries were not copied to it as that's not needed. This policy could
                # needlessly block.
                continue

            v = self.check_upgrade(
                pkg_id_t, pkg_id_s, source_name, myarch, broken_binaries, excuse
            )
            verdict = PolicyVerdict.worst_of(verdict, v)

        # each arch is processed separately, so if we already have info from
        # other archs, we need to merge the info from this arch
        dep_info = implicit_dep_info.setdefault("implicit-deps", {})
        broken_old = set(dep_info.get("broken-binaries", ()))
        dep_info["broken-binaries"] = sorted(broken_old | broken_binaries)

        return verdict
class ReverseRemovalPolicy(AbstractBasePolicy):
    """Block sources that (transitively) depend on a package with a remove hint.

    During initialisation the reverse dependency tree of every source named
    in a ``remove`` hint is computed.  Sources that own a binary from that
    tree which is not in the target suite are rejected, unless an
    ``ignore-reverse-remove`` hint overrides that.
    """

    # Maps "source/version" to the names of the sources with a remove hint
    # that caused it to be blocked; set by initialise().
    _block_src_for_rm_hint: dict[str, set[str]]

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "reverseremoval",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
        )

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the ``ignore-reverse-remove`` hint type."""
        hint_parser.register_hint_type(
            "ignore-reverse-remove", split_into_one_hint_per_package
        )

    def initialise(self, britney: "Britney") -> None:
        """Compute which sources are affected by the current remove hints."""
        super().initialise(britney)

        pkg_universe = britney.pkg_universe
        source_suites = britney.suite_info.source_suites
        target_suite = britney.suite_info.target_suite

        # Build set of the sources of reverse (Build-) Depends
        assert self.hints is not None
        hints = self.hints.search("remove")

        rev_bin: dict[BinaryPackageId, set[str]] = defaultdict(set)
        for hint in hints:
            for item in hint.packages:
                # I think we don't need to look at the target suite
                for src_suite in source_suites:
                    try:
                        my_bins = set(src_suite.sources[item.uvname].binaries)
                    except KeyError:
                        continue
                    compute_reverse_tree(pkg_universe, my_bins)
                    for this_bin in my_bins:
                        rev_bin[this_bin].add(item.uvname)

        rev_src: dict[str, set[str]] = defaultdict(set)
        for bin_pkg, reasons in rev_bin.items():
            # If the pkg is in the target suite, there's nothing this
            # policy wants to do.
            if target_suite.is_pkg_in_the_suite(bin_pkg):
                continue
            that_bin = britney.all_binaries[bin_pkg]
            bin_src = that_bin.source + "/" + that_bin.source_version
            rev_src[bin_src].update(reasons)
        # store a plain dict, so that later lookups can never insert keys
        self._block_src_for_rm_hint = dict(rev_src)

    def apply_src_policy_impl(
        self,
        rev_remove_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Reject ``item`` if it depends on a package with a remove hint.

        Returns PASS when the item is not affected, PASS_HINTED when an
        ``ignore-reverse-remove`` hint matches the item and
        REJECTED_PERMANENTLY otherwise.
        """
        verdict = PolicyVerdict.PASS

        if item.name in self._block_src_for_rm_hint:
            reason = ", ".join(sorted(self._block_src_for_rm_hint[item.name]))
            assert self.hints is not None
            ignore_hints = self.hints.search(
                "ignore-reverse-remove", package=item.uvname, version=item.version
            )
            excuse.addreason("reverseremoval")
            if ignore_hints:
                excuse.addreason("ignore-reverse-remove")
                excuse.addinfo(
                    "Should block migration because of remove hint for %s, but forced by %s"
                    % (reason, ignore_hints[0].user)
                )
                verdict = PolicyVerdict.PASS_HINTED
            else:
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                # report the rejection together with its verdict, like the
                # other policies do for their rejections
                excuse.add_verdict_info(
                    verdict, "Remove hint for (transitive) dependency: %s" % reason
                )

        return verdict
class ReproduciblePolicy(AbstractBasePolicy):
    """Judge migrations on the results of reproducibility tests.

    The results are read from ``reproducible.json`` in the state directory
    and are kept per suite role (``"source"``/``"target"``), architecture and
    source package.  The state of the candidate in the source suite is
    compared with the reference state in the target suite.

    Depending on the ``repro_success_bounty`` and ``repro_regression_penalty``
    options, a bounty is granted for reproducible packages and rejections are
    turned into penalties.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "reproducible",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )
        self._reproducible: dict[str, Any] = {
            "source": {},
            "target": {},
        }

        # Default values for this policy's options
        parse_option(options, "repro_success_bounty", default=0, to_int=True)
        parse_option(options, "repro_regression_penalty", default=0, to_int=True)
        parse_option(options, "repro_url")
        parse_option(options, "repro_retry_url")
        parse_option(options, "repro_components")

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the ``ignore-reproducible`` hint type."""
        hint_parser.register_hint_type(
            "ignore-reproducible", split_into_one_hint_per_package
        )

    def initialise(self, britney: "Britney") -> None:
        """Load ``reproducible.json`` from the state directory.

        Raises:
            RuntimeError: if accessing ``self.state_dir`` raises
                ``AttributeError``.
        """
        super().initialise(britney)
        source_suite = self.suite_info.primary_source_suite
        target_suite = self.suite_info.target_suite
        try:
            filename = os.path.join(self.state_dir, "reproducible.json")
        except AttributeError as e:  # pragma: no cover
            raise RuntimeError(
                "Please set STATE_DIR in the britney configuration"
            ) from e

        self._reproducible = self._read_repro_status(
            filename,
            source={source_suite.name, source_suite.codename},
            target={target_suite.name, target_suite.codename},
        )

    def apply_srcarch_policy_impl(
        self,
        reproducible_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: Optional[SourcePackage],
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Judge the reproducibility of ``item`` on ``arch``.

        A short description of the outcome is appended to
        ``reproducible_info["test-results"]``.

        Raises:
            KeyError: if the state recorded for the source suite is not one
                of the states handled here.
        """
        verdict = PolicyVerdict.PASS

        # we don't want to apply this policy (yet) on binNMUs
        if item.architecture != "source":
            return verdict

        # we're not supposed to judge on this arch
        if arch not in self.options.repro_arches:
            return verdict

        # bail out if this arch has no packages for this source (not build
        # here)
        if arch not in excuse.packages:
            return verdict

        # horrible hard-coding, but currently, we don't keep track of the
        # component when loading the packages files
        component = "main"
        section = source_data_srcdist.section
        if "/" in section:
            component = section.split("/")[0]

        if (
            self.options.repro_components
            and component not in self.options.repro_components
        ):
            return verdict

        source_name = item.package
        try:
            tar_res = self._reproducible["target"][arch]
            src_res = self._reproducible["source"][arch]
        except KeyError:
            verdict = PolicyVerdict.REJECTED_TEMPORARILY
            msg = "No reproducible data available at all for %s" % arch
            excuse.add_verdict_info(verdict, msg)
            return verdict

        if source_data_tdist is None:
            target_suite_state = "new"
        elif source_name not in tar_res:
            target_suite_state = "unknown"
        elif tar_res[source_name]["version"] == source_data_tdist.version:
            target_suite_state = tar_res[source_name]["status"]
        else:
            target_suite_state = "stale"

        if source_name in src_res and src_res[source_name]["version"] == item.version:
            source_suite_state = src_res[source_name]["status"]
        else:
            source_suite_state = "unknown"

        # status of ['E404', 'FTBFS', 'FTBR', 'NFU', 'blacklisted', 'depwait',
        # 'reproducible', 'timeout'] with ['new', 'stale', 'unknown']
        wait_states = ("E404", "depwait", "stale", "timeout", "unknown")
        no_build_states = ("FTBFS", "NFU", "blacklisted")

        # if this package doesn't build on this architecture, we don't need to
        # judge it
        # FTBFS: Fails to build from source on r-b infra
        # NFU: the package explicitly doesn't support building on this arch
        # blacklisted: per package per arch per suite
        if source_suite_state in no_build_states:
            return verdict
        # Assume depwait in the source suite only are intermittent (might not
        # be true, e.g. with new build depends)
        if source_suite_state == target_suite_state == "depwait":
            return verdict

        url_html = ""
        if self.options.repro_url:
            url = self.options.repro_url.format(package=quote(source_name), arch=arch)
            url_html = ' - <a href="%s">info</a>' % url
            if self.options.repro_retry_url:
                url_html += (
                    ' <a href="%s">♻ </a>'
                    % self.options.repro_retry_url.format(
                        package=quote(source_name), arch=arch
                    )
                )
            # When run on multiple archs, the last one "wins"
            reproducible_info["reproducible-test-url"] = url

        # Every branch below sets the verdict, the message for the excuse
        # (msg) and the entry for the "test-results" list (result).
        eligible_for_bounty = False
        if source_suite_state == "reproducible":
            verdict = PolicyVerdict.PASS
            msg = "Reproducible on %s%s" % (arch, url_html)
            result = "reproducible on %s" % arch
            eligible_for_bounty = True
        elif source_suite_state == "FTBR":
            if target_suite_state == "new":
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                msg = "New but not reproducible on %s%s" % (arch, url_html)
                result = "new but not reproducible on %s" % arch
            elif target_suite_state in wait_states:
                verdict = PolicyVerdict.REJECTED_TEMPORARILY
                msg = "Waiting for reproducibility reference results on %s%s" % (
                    arch,
                    url_html,
                )
                result = "waiting-for-reference-results on %s" % arch
            elif target_suite_state == "reproducible":
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                msg = "Reproducibility regression on %s%s" % (arch, url_html)
                result = "regression on %s" % arch
            elif target_suite_state == "FTBR":
                verdict = PolicyVerdict.PASS
                msg = "Ignoring non-reproducibility on %s (not a regression)%s" % (
                    arch,
                    url_html,
                )
                result = "not reproducible on %s" % arch
            else:
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                msg = "No reference result, but not reproducibility on %s%s" % (
                    arch,
                    url_html,
                )
                result = "reference %s on %s" % (target_suite_state, arch)
        elif source_suite_state in wait_states:
            verdict = PolicyVerdict.REJECTED_TEMPORARILY
            msg = "Waiting for reproducibility test results on %s%s" % (arch, url_html)
            result = "waiting-for-test-results on %s" % arch
        else:
            raise KeyError("Unhandled reproducibility state %s" % source_suite_state)
        reproducible_info.setdefault("test-results", []).append(result)

        if verdict.is_rejected:
            assert self.hints is not None
            for hint_arch in ("source", arch):
                for ignore_hint in self.hints.search(
                    "ignore-reproducible",
                    package=source_name,
                    version=source_data_srcdist.version,
                    architecture=hint_arch,
                ):
                    verdict = PolicyVerdict.PASS_HINTED
                    reproducible_info.setdefault("ignored-reproducible", {}).setdefault(
                        arch, {}
                    ).setdefault("issued-by", []).append(ignore_hint.user)
                    excuse.addinfo(
                        "Ignoring reproducibility issue on %s as requested "
                        "by %s" % (arch, ignore_hint.user)
                    )
                    # one matching hint per hint architecture is enough
                    break

        if self.options.repro_success_bounty and eligible_for_bounty:
            excuse.add_bounty("reproducibility", self.options.repro_success_bounty)

        if self.options.repro_regression_penalty and verdict in {
            PolicyVerdict.REJECTED_PERMANENTLY,
            PolicyVerdict.REJECTED_TEMPORARILY,
        }:
            if self.options.repro_regression_penalty > 0:
                excuse.add_penalty(
                    "reproducibility", self.options.repro_regression_penalty
                )
            # In case we give penalties instead of blocking, we must always pass
            verdict = PolicyVerdict.PASS

        if verdict.is_rejected:
            excuse.add_verdict_info(verdict, msg)
        else:
            excuse.addinfo(msg)

        return verdict

    def _read_repro_status(
        self, filename: str, source: set[str], target: set[str]
    ) -> dict[str, Any]:
        """Load the reproducibility results from ``filename``.

        Every result whose ``suite`` is listed in ``source`` (respectively
        ``target``) is stored in ``self._reproducible`` under
        ``["source"]`` (respectively ``["target"]``), keyed by the
        ``architecture`` and the ``package`` of the result.  That same dict
        is returned; it is returned unchanged when the file is empty.

        The file is decoded as UTF-8 (the JSON interchange encoding) rather
        than with the locale default.
        """
        summary = self._reproducible
        self.logger.info("Loading reproducibility report from %s", filename)
        with open(filename, encoding="utf-8") as fd:
            if os.fstat(fd.fileno()).st_size < 1:
                return summary
            data = json.load(fd)

        for result in data:
            if result["suite"] in source:
                summary["source"].setdefault(result["architecture"], {})[
                    result["package"]
                ] = result
            if result["suite"] in target:
                summary["target"].setdefault(result["architecture"], {})[
                    result["package"]
                ] = result

        return summary