Coverage for britney2/policies/policy.py: 85%
1255 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-08 19:15 +0000
1import json
2import logging
3import optparse
4import os
5import re
6import sys
7import time
8from abc import ABC, abstractmethod
9from collections import defaultdict
10from collections.abc import Callable, Container
11from enum import IntEnum, unique
12from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar, cast
13from urllib.parse import quote
15import apt_pkg
16import yaml
18from britney2 import (
19 BinaryPackage,
20 BinaryPackageId,
21 DependencyType,
22 PackageId,
23 SourcePackage,
24 Suite,
25 SuiteClass,
26 Suites,
27 TargetSuite,
28)
29from britney2.excusedeps import DependencySpec
30from britney2.hints import (
31 Hint,
32 HintAnnotate,
33 HintCollection,
34 HintParser,
35 HintType,
36 PolicyHintParserProto,
37)
38from britney2.inputs.suiteloader import SuiteContentLoader
39from britney2.migrationitem import MigrationItem, MigrationItemFactory
40from britney2.policies import ApplySrcPolicy, PolicyVerdict
41from britney2.utils import (
42 GetDependencySolversProto,
43 compute_reverse_tree,
44 filter_out_faux,
45 find_newer_binaries,
46 get_dependency_solvers,
47 is_smooth_update_allowed,
48 parse_option,
49)
51if TYPE_CHECKING: 51 ↛ 52line 51 didn't jump to line 52 because the condition on line 51 was never true
52 from ..britney import Britney
53 from ..excuse import Excuse
54 from ..installability.universe import BinaryPackageUniverse
57class PolicyLoadRequest:
58 __slots__ = ("_options_name", "_default_value", "_policy_constructor")
60 def __init__(
61 self,
62 policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"],
63 options_name: str | None,
64 default_value: bool,
65 ) -> None:
66 self._policy_constructor = policy_constructor
67 self._options_name = options_name
68 self._default_value = default_value
70 def is_enabled(self, options: optparse.Values) -> bool:
71 if self._options_name is None:
72 assert self._default_value
73 return True
74 actual_value = getattr(options, self._options_name, None)
75 if actual_value is None:
76 return self._default_value
77 return actual_value.lower() in ("yes", "y", "true", "t")
79 def load(self, options: optparse.Values, suite_info: Suites) -> "BasePolicy":
80 return self._policy_constructor(options, suite_info)
82 @classmethod
83 def always_load(
84 cls, policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"]
85 ) -> "PolicyLoadRequest":
86 return cls(policy_constructor, None, True)
88 @classmethod
89 def conditionally_load(
90 cls,
91 policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"],
92 option_name: str,
93 default_value: bool,
94 ) -> "PolicyLoadRequest":
95 return cls(policy_constructor, option_name, default_value)
class PolicyEngine:
    """Holds the loaded policies and runs them against migration items."""

    def __init__(self) -> None:
        self._policies: list["BasePolicy"] = []

    def add_policy(self, policy: "BasePolicy") -> None:
        """Append a policy to the evaluation order."""
        self._policies.append(policy)

    def load_policies(
        self,
        options: optparse.Values,
        suite_info: "Suites",
        policy_load_requests: "list[PolicyLoadRequest]",
    ) -> None:
        """Instantiate and register every request that the options enable."""
        for request in policy_load_requests:
            if request.is_enabled(options):
                self.add_policy(request.load(options, suite_info))

    def register_policy_hints(self, hint_parser: "HintParser") -> None:
        """Let each policy declare the hint types it understands."""
        for policy in self._policies:
            policy.register_hints(hint_parser)

    def initialise(self, britney: "Britney", hints: "HintCollection") -> None:
        """Hand the parsed hints to each policy and run its one-off setup."""
        for policy in self._policies:
            policy.hints = hints
            policy.initialise(britney)

    def save_state(self, britney: "Britney") -> None:
        """Ask every policy to persist its state (not called on dry-runs)."""
        for policy in self._policies:
            policy.save_state(britney)

    def apply_src_policies(
        self,
        item: "MigrationItem",
        source_t: "SourcePackage | None",
        source_u: "SourcePackage",
        excuse: "Excuse",
    ) -> None:
        """Run every applicable policy on a source migration and fold the
        resulting verdicts into the excuse."""
        excuse_verdict = excuse.policy_verdict
        suite_class = item.suite.suite_class
        for policy in self._policies:
            pinfo: dict[str, Any] = {}
            policy_verdict = PolicyVerdict.NOT_APPLICABLE
            if suite_class in policy.applicable_suites:
                if policy.src_policy.run_arch:
                    for arch in policy.options.architectures:
                        arch_verdict = policy.apply_srcarch_policy_impl(
                            pinfo, item, arch, source_t, source_u, excuse
                        )
                        policy_verdict = PolicyVerdict.worst_of(
                            policy_verdict, arch_verdict
                        )
                if policy.src_policy.run_src:
                    src_verdict = policy.apply_src_policy_impl(
                        pinfo, item, source_t, source_u, excuse
                    )
                    policy_verdict = PolicyVerdict.worst_of(policy_verdict, src_verdict)
            # The engine owns the "verdict" field, so policies must leave it blank.
            assert "verdict" not in pinfo
            if policy_verdict != PolicyVerdict.NOT_APPLICABLE:
                excuse.policy_info[policy.policy_id] = pinfo
                pinfo["verdict"] = policy_verdict.name
            excuse_verdict = PolicyVerdict.worst_of(policy_verdict, excuse_verdict)
        excuse.policy_verdict = excuse_verdict

    def apply_srcarch_policies(
        self,
        item: "MigrationItem",
        arch: str,
        source_t: "SourcePackage | None",
        source_u: "SourcePackage",
        excuse: "Excuse",
    ) -> None:
        """Run every applicable policy on a binary (per-arch) migration and
        fold the resulting verdicts into the excuse."""
        excuse_verdict = excuse.policy_verdict
        suite_class = item.suite.suite_class
        for policy in self._policies:
            pinfo: dict[str, Any] = {}
            if suite_class in policy.applicable_suites:
                policy_verdict = policy.apply_srcarch_policy_impl(
                    pinfo, item, arch, source_t, source_u, excuse
                )
                excuse_verdict = PolicyVerdict.worst_of(policy_verdict, excuse_verdict)
                # The engine owns the "verdict" field, so policies must leave it blank.
                assert "verdict" not in pinfo
                if policy_verdict != PolicyVerdict.NOT_APPLICABLE:
                    excuse.policy_info[policy.policy_id] = pinfo
                    pinfo["verdict"] = policy_verdict.name
        excuse.policy_verdict = excuse_verdict
class BasePolicy(ABC):
    """Abstract interface that every migration policy implements.

    A policy inspects a proposed migration and renders a PolicyVerdict;
    the PolicyEngine aggregates the verdicts of all loaded policies.
    """

    # Set by initialise(): the running Britney instance.
    britney: "Britney"
    # Key used for this policy's entry in excuses.yaml.
    policy_id: str
    hints: "HintCollection | None"
    applicable_suites: "set[SuiteClass]"
    src_policy: "ApplySrcPolicy"
    options: optparse.Values
    suite_info: "Suites"

    def __init__(
        self,
        options: optparse.Values,
        suite_info: "Suites",
    ) -> None:
        """The BasePolicy constructor

        :param options: The options member of Britney with all the
        config values.
        """

    @property
    @abstractmethod
    def state_dir(self) -> str: ...

    def register_hints(self, hint_parser: "HintParser") -> None:  # pragma: no cover
        """Register new hints that this policy accepts

        :param hint_parser: (see HintParser.register_hint_type)
        """

    def initialise(self, britney: "Britney") -> None:  # pragma: no cover
        """Called once to make the policy initialise any data structures

        This is useful for e.g. parsing files or other "heavy do-once" work.

        :param britney: This is the instance of the "Britney" class.
        """
        self.britney = britney

    def save_state(self, britney: "Britney") -> None:  # pragma: no cover
        """Called once at the end of the run to make the policy save any
        persistent data.

        Note this will *not* be called for "dry-runs" as such runs should not
        change the state.

        :param britney: This is the instance of the "Britney" class.
        """

    def apply_src_policy_impl(
        self,
        policy_info: dict[str, Any],
        item: "MigrationItem",
        source_data_tdist: "SourcePackage | None",
        source_data_srcdist: "SourcePackage",
        excuse: "Excuse",
    ) -> "PolicyVerdict":  # pragma: no cover
        """Apply a policy on a given source migration

        Britney will call this method on a given source package, when
        Britney is considering to migrate it from the given source
        suite to the target suite. The policy will then evaluate the
        migration and return a verdict.

        :param policy_info: A dictionary of all policy results. The
        policy can add a value stored in a key related to its name.
        (e.g. policy_info['age'] = {...}). This will go directly into
        the "excuses.yaml" output.

        :param item: The migration item the policy is applied to.

        :param source_data_tdist: Information about the source package
        in the target distribution (e.g. "testing"). This is the
        data structure in source_suite.sources[source_name]

        :param source_data_srcdist: Information about the source
        package in the source distribution (e.g. "unstable" or "tpu").
        This is the data structure in target_suite.sources[source_name]

        :return: A Policy Verdict (e.g. PolicyVerdict.PASS)
        """
        return PolicyVerdict.NOT_APPLICABLE

    def apply_srcarch_policy_impl(
        self,
        policy_info: dict[str, Any],
        item: "MigrationItem",
        arch: str,
        source_data_tdist: "SourcePackage | None",
        source_data_srcdist: "SourcePackage",
        excuse: "Excuse",
    ) -> "PolicyVerdict":
        """Apply a policy on a given binary migration

        Britney will call this method on binaries from a given source package
        on a given architecture, when Britney is considering to migrate them
        from the given source suite to the target suite. The policy will then
        evaluate the migration and return a verdict.

        :param policy_info: A dictionary of all policy results. The
        policy can add a value stored in a key related to its name.
        (e.g. policy_info['age'] = {...}). This will go directly into
        the "excuses.yaml" output.

        :param item: The migration item the policy is applied to.

        :param arch: The architecture the item is applied to. This is mostly
        relevant for policies where src_policy is not ApplySrcPolicy.RUN_SRC
        (as that is the only case where arch can differ from item.architecture)

        :param source_data_tdist: Information about the source package
        in the target distribution (e.g. "testing"). This is the
        data structure in source_suite.sources[source_name]

        :param source_data_srcdist: Information about the source
        package in the source distribution (e.g. "unstable" or "tpu").
        This is the data structure in target_suite.sources[source_name]

        :return: A Policy Verdict (e.g. PolicyVerdict.PASS)
        """
        # If the policy doesn't implement this function, assume it's OK.
        return PolicyVerdict.NOT_APPLICABLE
class AbstractBasePolicy(BasePolicy):
    """Shared concrete constructor for BasePolicy subclasses.

    tests/test_policy.py:initialize_policy() needs to build BasePolicy
    objects with just the two-argument constructor, while every other use of
    BasePolicy-derived objects needs the five-argument constructor below.
    AbstractBasePolicy was split out to document that difference.
    """

    def __init__(
        self,
        policy_id: str,
        options: optparse.Values,
        suite_info: "Suites",
        applicable_suites: "set[SuiteClass]",
        src_policy: "ApplySrcPolicy" = ApplySrcPolicy.RUN_SRC,
    ) -> None:
        """Concrete initializer.

        :param policy_id: Identifies the policy. It will
        determine the key used for the excuses.yaml etc.

        :param options: The options member of Britney with all the
        config values.

        :param applicable_suites: Where this policy applies.
        """
        self.policy_id = policy_id
        self.options = options
        self.suite_info = suite_info
        self.applicable_suites = applicable_suites
        self.src_policy = src_policy
        # No hints until the PolicyEngine hands them over in initialise().
        self.hints = None
        self.logger = logging.getLogger(
            ".".join((self.__class__.__module__, self.__class__.__name__))
        )

    @property
    def state_dir(self) -> str:
        # STATE_DIR from the britney configuration.
        return cast(str, self.options.state_dir)
# Type of the policy-specific parameter carried by SimplePolicyHint subclasses.
_T = TypeVar("_T")
class SimplePolicyHint(Hint, Generic[_T]):
    """A Hint carrying exactly one extra policy-specific parameter.

    Concrete subclasses expose the parameter under a domain-specific
    property name (e.g. ``AgeDayHint.days``).
    """

    def __init__(
        self,
        user: str,
        hint_type: "HintType",
        policy_parameter: _T,
        packages: "list[MigrationItem]",
    ) -> None:
        super().__init__(user, hint_type, packages)
        self._policy_parameter = policy_parameter

    def __eq__(self, other: Any) -> Any:
        # Bug fix: comparing against an object without _policy_parameter
        # (e.g. a plain Hint) used to raise AttributeError.  Defer to the
        # other operand instead, per the data-model convention.
        if not isinstance(other, SimplePolicyHint):
            return NotImplemented
        if self.type != other.type or self._policy_parameter != other._policy_parameter:
            return False
        return super().__eq__(other)

    def str(self) -> str:
        """Render the hint in its textual "hint file" form."""
        return "{} {} {}".format(
            self._type,
            str(self._policy_parameter),
            " ".join(x.name for x in self._packages),
        )
class AgeDayHint(SimplePolicyHint[int]):
    """"age-days" hint: overrides the age requirement (in days) for an item."""

    @property
    def days(self) -> int:
        # The policy parameter is the new age requirement in days.
        return self._policy_parameter
class IgnoreRCBugHint(SimplePolicyHint[frozenset[str]]):
    """"ignore-rc-bugs" hint: RC bug numbers to disregard for an item."""

    @property
    def ignored_rcbugs(self) -> frozenset[str]:
        # The policy parameter is the set of bug numbers (as strings).
        return self._policy_parameter
def simple_policy_hint_parser_function(
    class_name: "Callable[[str, HintType, _T, list[MigrationItem]], Hint]",
    converter: "Callable[[str], _T]",
) -> "PolicyHintParserProto":
    """Build a hint-parser callback for hints of the form
    ``<hint-name> <parameter> <item>...``.

    :param class_name: hint class to instantiate (one instance per item)
    :param converter: converts the raw first argument into the policy
        parameter (e.g. ``int`` for "age-days")
    """

    def parse(
        mi_factory: "MigrationItemFactory",
        hints: "HintCollection",
        who: str,
        hint_type: "HintType",
        *args: str,
    ) -> None:
        # First argument is the policy parameter; the rest are the items.
        raw_parameter, *item_args = args
        for item in mi_factory.parse_items(*item_args):
            hints.add_hint(
                class_name(who, hint_type, converter(raw_parameter), [item])
            )

    return parse
class AgePolicy(AbstractBasePolicy):
    """Configurable aging policy for source migrations.

    The AgePolicy lets packages stay in the source suite for a pre-defined
    number of days before they may migrate (based on their urgency, if any).

    The AgePolicy's decision is influenced by the following:

    State files:
     * ${STATE_DIR}/age-policy-urgencies: urgencies for source packages.
       Note that urgencies are "sticky" and the most "urgent" urgency will
       be used (i.e. the one with the lowest age requirement).
       - This file needs to be updated externally if the policy should take
         urgencies into consideration.  If empty (or not updated), the
         policy will simply use the default urgency (see "Config" below).
       - In Debian, these values are taken from the .changes file, but that
         is not a requirement for Britney.
     * ${STATE_DIR}/age-policy-dates: the age of all source packages.
       - The policy will automatically update this file.

    Config:
     * DEFAULT_URGENCY: urgency used for packages without an urgency (or
       with an unknown one).  Also sets the "minimum" aging requirement for
       packages not in the target suite.
     * MINDAYS_<URGENCY>: the age requirement in days for the given urgency.
       - Commonly used urgencies are: low, medium, high, emergency, critical.

    Hints:
     * urgent <source>/<version>: disregard the age requirement for a given
       source/version.
     * age-days X <source>/<version>: set the age requirement for a given
       source/version to X days.  Note that X can exceed the highest
       age requirement normally given.
    """

    def __init__(self, options: optparse.Values, suite_info: "Suites") -> None:
        super().__init__("age", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE})
        self._min_days = self._generate_mindays_table()
        self._min_days_default = 0  # replaced in initialise()
        time_now = time.time()
        if hasattr(self.options, "fake_runtime"):
            time_now = int(self.options.fake_runtime)
            self.logger.info("overriding runtime with fake_runtime %d" % time_now)
        # britney's "day" begins at 7pm: we want aging to occur in the
        # 22:00Z run and we run Britney 2-4 times a day.
        # NB: _date_now is used in tests.
        self._date_now = int(((time_now / (60 * 60)) - 19) / 24)
        self._dates: dict[str, tuple[str, int]] = {}
        self._urgencies: dict[str, str] = {}
        self._default_urgency: str = self.options.default_urgency
        self._penalty_immune_urgencies: frozenset[str] = frozenset()
        if hasattr(self.options, "no_penalties"):
            self._penalty_immune_urgencies = frozenset(
                entry.strip() for entry in self.options.no_penalties.split()
            )
        # Resolved in initialise() once the mindays table has been validated.
        self._bounty_min_age: int | None = None

    def _generate_mindays_table(self) -> dict[str, int]:
        """Collect all MINDAYS_<URGENCY> options into an urgency -> days map."""
        mindays: dict[str, int] = {}
        for option_name in dir(self.options):
            if not option_name.startswith("mindays_"):
                continue
            raw_value = getattr(self.options, option_name)
            try:
                as_days = int(raw_value)
            except ValueError:
                raise ValueError(
                    "Unable to parse "
                    + option_name
                    + " as a number of days. Must be 0 or a positive integer"
                )
            if as_days < 0:
                raise ValueError(
                    "The value of " + option_name + " must be zero or a positive integer"
                )
            mindays[option_name.split("_")[1]] = as_days
        return mindays

    def register_hints(self, hint_parser: "HintParser") -> None:
        """Register the "age-days" and "urgent" hint types."""
        hint_parser.register_hint_type(
            HintType(
                "age-days",
                simple_policy_hint_parser_function(AgeDayHint, int),
                min_args=2,
            )
        )
        hint_parser.register_hint_type(HintType("urgent"))

    def initialise(self, britney: "Britney") -> None:
        """Load state files and resolve configuration-derived values."""
        super().initialise(britney)
        self._read_dates_file()
        self._read_urgencies_file()
        if self._default_urgency not in self._min_days:  # pragma: no cover
            raise ValueError(
                "Missing age-requirement for default urgency (MINDAYS_%s)"
                % self._default_urgency
            )
        self._min_days_default = self._min_days[self._default_urgency]
        try:
            # BOUNTY_MIN_AGE may be a plain number of days...
            self._bounty_min_age = int(self.options.bounty_min_age)
        except ValueError:
            # ... or the name of a configured urgency.
            if self.options.bounty_min_age in self._min_days:
                self._bounty_min_age = self._min_days[self.options.bounty_min_age]
            else:  # pragma: no cover
                raise ValueError(
                    "Please fix BOUNTY_MIN_AGE in the britney configuration"
                )
        except AttributeError:
            # The option wasn't defined in the configuration.
            self._bounty_min_age = 0

    def save_state(self, britney: "Britney") -> None:
        """Persist the updated dates file at the end of the run."""
        super().save_state(britney)
        self._write_dates_file()

    def apply_src_policy_impl(
        self,
        age_info: dict[str, Any],
        item: "MigrationItem",
        source_data_tdist: "SourcePackage | None",
        source_data_srcdist: "SourcePackage",
        excuse: "Excuse",
    ) -> "PolicyVerdict":
        """Check whether the item has aged long enough to migrate."""
        source_name = item.package
        # Urgency of the upload; fall back to the default for unknown ones.
        urgency = self._urgencies.get(source_name, self._default_urgency)

        if urgency not in self._min_days:
            age_info["unknown-urgency"] = urgency
            urgency = self._default_urgency

        if not source_data_tdist:
            # NEW packages (not in the target suite) never age faster than
            # the default urgency allows.
            if self._min_days[urgency] < self._min_days_default:
                age_info["urgency-reduced"] = {
                    "from": urgency,
                    "to": self._default_urgency,
                }
                urgency = self._default_urgency

        # (Re)start the clock whenever a new version shows up.
        if (
            source_name not in self._dates
            or self._dates[source_name][0] != source_data_srcdist.version
        ):
            self._dates[source_name] = (source_data_srcdist.version, self._date_now)

        days_old = self._date_now - self._dates[source_name][1]
        min_days = self._min_days[urgency]
        # Bounties lower the requirement; penalties raise it.
        for bounty in excuse.bounty:
            if excuse.bounty[bounty]:
                self.logger.info(
                    "Applying bounty for %s granted by %s: %d days",
                    source_name,
                    bounty,
                    excuse.bounty[bounty],
                )
                excuse.addinfo(
                    "Required age reduced by %d days because of %s"
                    % (excuse.bounty[bounty], bounty)
                )
                assert excuse.bounty[bounty] > 0, "negative bounties shouldn't happen"
                min_days -= excuse.bounty[bounty]
        if urgency not in self._penalty_immune_urgencies:
            for penalty in excuse.penalty:
                if excuse.penalty[penalty]:
                    self.logger.info(
                        "Applying penalty for %s given by %s: %d days",
                        source_name,
                        penalty,
                        excuse.penalty[penalty],
                    )
                    excuse.addinfo(
                        "Required age increased by %d days because of %s"
                        % (excuse.penalty[penalty], penalty)
                    )
                    assert (
                        excuse.penalty[penalty] > 0
                    ), "negative penalties should be handled earlier"
                    min_days += excuse.penalty[penalty]

        assert self._bounty_min_age is not None
        # The age in BOUNTY_MIN_AGE can be higher than the one associated
        # with the real urgency, so don't forget to take it into account.
        bounty_min_age = min(self._bounty_min_age, self._min_days[urgency])
        if min_days < bounty_min_age:
            min_days = bounty_min_age
            excuse.addinfo(
                "Required age is not allowed to drop below %d days" % min_days
            )

        age_info["current-age"] = days_old

        assert self.hints is not None
        for age_days_hint in cast(
            "list[AgeDayHint]",
            self.hints.search(
                "age-days", package=source_name, version=source_data_srcdist.version
            ),
        ):
            new_req = age_days_hint.days
            age_info["age-requirement-reduced"] = {
                "new-requirement": new_req,
                "changed-by": age_days_hint.user,
            }
            if "original-age-requirement" not in age_info:
                age_info["original-age-requirement"] = min_days
            min_days = new_req

        age_info["age-requirement"] = min_days
        res = PolicyVerdict.PASS

        if days_old < min_days:
            # Too young: an "urgent" hint can still wave it through.
            urgent_hints = self.hints.search(
                "urgent", package=source_name, version=source_data_srcdist.version
            )
            if urgent_hints:
                age_info["age-requirement-reduced"] = {
                    "new-requirement": 0,
                    "changed-by": urgent_hints[0].user,
                }
                res = PolicyVerdict.PASS_HINTED
            else:
                res = PolicyVerdict.REJECTED_TEMPORARILY

        # Update the excuse with the human-readable summary.
        age_hint = age_info.get("age-requirement-reduced", None)
        age_min_req = age_info["age-requirement"]
        if age_hint:
            new_req = age_hint["new-requirement"]
            who = age_hint["changed-by"]
            if new_req:
                excuse.addinfo(
                    "Overriding age needed from %d days to %d by %s"
                    % (age_min_req, new_req, who)
                )
                age_min_req = new_req
            else:
                excuse.addinfo("Too young, but urgency pushed by %s" % who)
                age_min_req = 0
        excuse.setdaysold(age_info["current-age"], age_min_req)

        if age_min_req == 0:
            excuse.addinfo("%d days old" % days_old)
        elif days_old < age_min_req:
            excuse.add_verdict_info(
                res, "Too young, only %d of %d days old" % (days_old, age_min_req)
            )
        else:
            excuse.addinfo("%d days old (needed %d days)" % (days_old, age_min_req))

        return res

    def _read_dates_file(self) -> None:
        """Parse the dates file.

        Prefers ${STATE_DIR}/age-policy-dates, falling back to the legacy
        "Dates" file in the target suite directory.  Missing STATE_DIR with
        no legacy file present is a configuration error.
        """
        dates = self._dates
        fallback_filename = os.path.join(self.suite_info.target_suite.path, "Dates")
        using_new_name = False
        try:
            filename = os.path.join(self.state_dir, "age-policy-dates")
            if not os.path.exists(filename) and os.path.exists(fallback_filename):
                filename = fallback_filename
            else:
                using_new_name = True
        except AttributeError:
            if os.path.exists(fallback_filename):
                filename = fallback_filename
            else:
                raise RuntimeError("Please set STATE_DIR in the britney configuration")

        try:
            with open(filename, encoding="utf-8") as fd:
                for line in fd:
                    # Ignore comment lines (mostly used for tests).
                    if line.startswith("#"):
                        continue
                    # Format: <source> <version> <date>
                    fields = line.split()
                    if len(fields) != 3:  # pragma: no cover
                        continue
                    try:
                        dates[fields[0]] = (fields[1], int(fields[2]))
                    except ValueError:  # pragma: no cover
                        pass
        except FileNotFoundError:
            if not using_new_name:
                # If we are using the legacy name, then just give up.
                raise
            self.logger.info("%s does not appear to exist. Creating it", filename)
            with open(filename, mode="x", encoding="utf-8"):
                pass

    def _read_urgencies_file(self) -> None:
        """Parse the urgencies file, keeping the most urgent entry per source."""
        urgencies = self._urgencies
        min_days_default = self._min_days_default
        fallback_filename = os.path.join(self.suite_info.target_suite.path, "Urgency")
        try:
            filename = os.path.join(self.state_dir, "age-policy-urgencies")
            if not os.path.exists(filename) and os.path.exists(fallback_filename):
                filename = fallback_filename
        except AttributeError:
            filename = fallback_filename

        sources_s = self.suite_info.primary_source_suite.sources
        sources_t = self.suite_info.target_suite.sources

        with open(filename, errors="surrogateescape", encoding="ascii") as fd:
            for line in fd:
                # Ignore comment lines (mostly used for tests).
                if line.startswith("#"):
                    continue
                # Format: <source> <version> <urgency>
                fields = line.split()
                if len(fields) != 3:
                    continue

                # Minimum days for the urgency we already know vs the new one.
                urgency_old = urgencies.get(fields[0], None)
                mindays_old = self._min_days.get(urgency_old, 1000)  # type: ignore[arg-type]
                mindays_new = self._min_days.get(fields[2], min_days_default)

                # If the new urgency is lower (min days higher), do nothing.
                if mindays_old <= mindays_new:
                    continue

                # If the target suite has this version (or newer), do nothing.
                tsrcv = sources_t.get(fields[0], None)
                if tsrcv and apt_pkg.version_compare(tsrcv.version, fields[1]) >= 0:
                    continue

                # If the primary source suite lacks this version (or has only
                # an older one), do nothing.
                usrcv = sources_s.get(fields[0], None)
                if not usrcv or apt_pkg.version_compare(usrcv.version, fields[1]) < 0:
                    continue

                # Update the urgency for the package.
                urgencies[fields[0]] = fields[2]

    def _write_dates_file(self) -> None:
        """Persist the dates table atomically (write to a temp file + rename)."""
        dates = self._dates
        try:
            directory = self.state_dir
            basename = "age-policy-dates"
            old_file = os.path.join(self.suite_info.target_suite.path, "Dates")
        except AttributeError:
            # No STATE_DIR configured: fall back to the legacy location/name.
            directory = self.suite_info.target_suite.path
            basename = "Dates"
            old_file = None
        filename = os.path.join(directory, basename)
        filename_tmp = os.path.join(directory, "%s_new" % basename)
        with open(filename_tmp, "w", encoding="utf-8") as fd:
            for pkg in sorted(dates):
                version, date = dates[pkg]
                fd.write("%s %s %d\n" % (pkg, version, date))
        os.rename(filename_tmp, filename)
        if old_file is not None and os.path.exists(old_file):
            self.logger.info("Removing old age-policy-dates file %s", old_file)
            os.unlink(old_file)
771class RCBugPolicy(AbstractBasePolicy):
772 """RC bug regression policy for source migrations
774 The RCBugPolicy will read provided list of RC bugs and block any
775 source upload that would introduce a *new* RC bug in the target
776 suite.
778 The RCBugPolicy's decision is influenced by the following:
780 State files:
781 * ${STATE_DIR}/rc-bugs-${SUITE_NAME}: File containing RC bugs for packages in
782 the given suite (one for both primary source suite and the target sutie is
783 needed).
784 - These files need to be updated externally.
785 """
787 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
788 super().__init__(
789 "rc-bugs", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
790 )
791 self._bugs_source: dict[str, set[str]] | None = None
792 self._bugs_target: dict[str, set[str]] | None = None
794 def register_hints(self, hint_parser: HintParser) -> None:
795 f = simple_policy_hint_parser_function(
796 IgnoreRCBugHint, lambda x: frozenset(x.split(","))
797 )
798 hint_parser.register_hint_type(HintType("ignore-rc-bugs", f, min_args=2))
800 def initialise(self, britney: "Britney") -> None:
801 super().initialise(britney)
802 source_suite = self.suite_info.primary_source_suite
803 target_suite = self.suite_info.target_suite
804 fallback_unstable = os.path.join(source_suite.path, "BugsV")
805 fallback_testing = os.path.join(target_suite.path, "BugsV")
806 try:
807 filename_unstable = os.path.join(
808 self.state_dir, "rc-bugs-%s" % source_suite.name
809 )
810 filename_testing = os.path.join(
811 self.state_dir, "rc-bugs-%s" % target_suite.name
812 )
813 if ( 813 ↛ 819line 813 didn't jump to line 819
814 not os.path.exists(filename_unstable)
815 and not os.path.exists(filename_testing)
816 and os.path.exists(fallback_unstable)
817 and os.path.exists(fallback_testing)
818 ):
819 filename_unstable = fallback_unstable
820 filename_testing = fallback_testing
821 except AttributeError:
822 filename_unstable = fallback_unstable
823 filename_testing = fallback_testing
824 self._bugs_source = self._read_bugs(filename_unstable)
825 self._bugs_target = self._read_bugs(filename_testing)
827 def apply_src_policy_impl(
828 self,
829 rcbugs_info: dict[str, Any],
830 item: MigrationItem,
831 source_data_tdist: SourcePackage | None,
832 source_data_srcdist: SourcePackage,
833 excuse: "Excuse",
834 ) -> PolicyVerdict:
835 assert self._bugs_source is not None # for type checking
836 assert self._bugs_target is not None # for type checking
837 bugs_t = set()
838 bugs_s = set()
839 source_name = item.package
840 binaries_s = {x[0] for x in source_data_srcdist.binaries}
841 try:
842 binaries_t = {x[0] for x in source_data_tdist.binaries} # type: ignore[union-attr]
843 except AttributeError:
844 binaries_t = set()
846 src_key = f"src:{source_name}"
847 if source_data_tdist and src_key in self._bugs_target:
848 bugs_t.update(self._bugs_target[src_key])
849 if src_key in self._bugs_source:
850 bugs_s.update(self._bugs_source[src_key])
852 for pkg in binaries_s:
853 if pkg in self._bugs_source:
854 bugs_s |= self._bugs_source[pkg]
855 for pkg in binaries_t:
856 if pkg in self._bugs_target:
857 bugs_t |= self._bugs_target[pkg]
859 # The bts seems to support filing source bugs against a binary of the
860 # same name if that binary isn't built by any source. An example is bug
861 # 820347 against Package: juce (in the live-2016-04-11 test). Add those
862 # bugs too.
863 if (
864 source_name not in (binaries_s | binaries_t)
865 and source_name
866 not in {
867 x.package_name
868 for x in self.suite_info.primary_source_suite.all_binaries_in_suite.keys()
869 }
870 and source_name
871 not in {
872 x.package_name
873 for x in self.suite_info.target_suite.all_binaries_in_suite.keys()
874 }
875 ):
876 if source_name in self._bugs_source:
877 bugs_s |= self._bugs_source[source_name]
878 if source_name in self._bugs_target: 878 ↛ 879line 878 didn't jump to line 879 because the condition on line 878 was never true
879 bugs_t |= self._bugs_target[source_name]
881 # If a package is not in the target suite, it has no RC bugs per
882 # definition. Unfortunately, it seems that the live-data is
883 # not always accurate (e.g. live-2011-12-13 suggests that
884 # obdgpslogger had the same bug in testing and unstable,
885 # but obdgpslogger was not in testing at that time).
886 # - For the curious, obdgpslogger was removed on that day
887 # and the BTS probably had not caught up with that fact.
888 # (https://tracker.debian.org/news/415935)
889 assert not bugs_t or source_data_tdist, (
890 "%s had bugs in the target suite but is not present" % source_name
891 )
893 verdict = PolicyVerdict.PASS
895 assert self.hints is not None
896 for ignore_hint in cast(
897 list[IgnoreRCBugHint],
898 self.hints.search(
899 "ignore-rc-bugs",
900 package=source_name,
901 version=source_data_srcdist.version,
902 ),
903 ):
904 ignored_bugs = ignore_hint.ignored_rcbugs
906 # Only handle one hint for now
907 if "ignored-bugs" in rcbugs_info:
908 self.logger.info(
909 "Ignoring ignore-rc-bugs hint from %s on %s due to another hint from %s",
910 ignore_hint.user,
911 source_name,
912 rcbugs_info["ignored-bugs"]["issued-by"],
913 )
914 continue
915 if not ignored_bugs.isdisjoint(bugs_s): 915 ↛ 924line 915 didn't jump to line 924 because the condition on line 915 was always true
916 bugs_s -= ignored_bugs
917 bugs_t -= ignored_bugs
918 rcbugs_info["ignored-bugs"] = {
919 "bugs": sorted(ignored_bugs),
920 "issued-by": ignore_hint.user,
921 }
922 verdict = PolicyVerdict.PASS_HINTED
923 else:
924 self.logger.info(
925 "Ignoring ignore-rc-bugs hint from %s on %s as none of %s affect the package",
926 ignore_hint.user,
927 source_name,
928 str(ignored_bugs),
929 )
931 rcbugs_info["shared-bugs"] = sorted(bugs_s & bugs_t)
932 rcbugs_info["unique-source-bugs"] = sorted(bugs_s - bugs_t)
933 rcbugs_info["unique-target-bugs"] = sorted(bugs_t - bugs_s)
935 # update excuse
936 new_bugs = rcbugs_info["unique-source-bugs"]
937 old_bugs = rcbugs_info["unique-target-bugs"]
938 excuse.setbugs(old_bugs, new_bugs)
940 if new_bugs:
941 verdict = PolicyVerdict.REJECTED_PERMANENTLY
942 excuse.add_verdict_info(
943 verdict,
944 "Updating %s would introduce bugs in %s: %s"
945 % (
946 source_name,
947 self.suite_info.target_suite.name,
948 ", ".join(
949 [
950 '<a href="https://bugs.debian.org/%s">#%s</a>'
951 % (quote(a), a)
952 for a in new_bugs
953 ]
954 ),
955 ),
956 )
958 if old_bugs:
959 excuse.addinfo(
960 "Updating %s will fix bugs in %s: %s"
961 % (
962 source_name,
963 self.suite_info.target_suite.name,
964 ", ".join(
965 [
966 '<a href="https://bugs.debian.org/%s">#%s</a>'
967 % (quote(a), a)
968 for a in old_bugs
969 ]
970 ),
971 )
972 )
974 return verdict
976 def _read_bugs(self, filename: str) -> dict[str, set[str]]:
977 """Read the release critical bug summary from the specified file
979 The file contains rows with the format:
981 <package-name> <bug number>[,<bug number>...]
983 The method returns a dictionary where the key is the binary package
984 name and the value is the list of open RC bugs for it.
985 """
986 bugs: dict[str, set[str]] = {}
987 self.logger.info("Loading RC bugs data from %s", filename)
988 with open(filename, encoding="ascii") as f:
989 for line in f:
990 ln = line.split()
991 if len(ln) != 2: # pragma: no cover
992 self.logger.warning("Malformed line found in line %s", line)
993 continue
994 pkg = ln[0]
995 if pkg not in bugs:
996 bugs[pkg] = set()
997 bugs[pkg].update(ln[1].split(","))
998 return bugs
class PiupartsPolicy(AbstractBasePolicy):
    """Policy gating migration on piuparts (install/upgrade/purge) results.

    Reads per-suite piuparts JSON summaries from STATE_DIR and rejects an
    item when its piuparts state in the source suite is a regression
    compared to the target suite.  An ``ignore-piuparts`` hint can override
    a rejection.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "piuparts", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
        )
        # source package name -> (state, url); the url is only retained for
        # the source suite (keep_url=True below), hence "str | None".
        self._piuparts_source: dict[str, tuple[str, str | None]] | None = None
        self._piuparts_target: dict[str, tuple[str, str | None]] | None = None

    def register_hints(self, hint_parser: HintParser) -> None:
        hint_parser.register_hint_type(HintType("ignore-piuparts"))

    def initialise(self, britney: "Britney") -> None:
        """Load the piuparts summaries for the source and target suites."""
        super().initialise(britney)
        source_suite = self.suite_info.primary_source_suite
        target_suite = self.suite_info.target_suite
        try:
            filename_unstable = os.path.join(
                self.state_dir, "piuparts-summary-%s.json" % source_suite.name
            )
            filename_testing = os.path.join(
                self.state_dir, "piuparts-summary-%s.json" % target_suite.name
            )
        except AttributeError as e:  # pragma: no cover
            raise RuntimeError(
                "Please set STATE_DIR in the britney configuration"
            ) from e
        self._piuparts_source = self._read_piuparts_summary(
            filename_unstable, keep_url=True
        )
        self._piuparts_target = self._read_piuparts_summary(
            filename_testing, keep_url=False
        )

    def apply_src_policy_impl(
        self,
        piuparts_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Evaluate the piuparts state of one source item.

        States (from the summary files): "P" pass, "F" fail, "W" waiting,
        anything else / missing is treated as "X" (cannot be tested).
        """
        assert self._piuparts_source is not None  # for type checking
        assert self._piuparts_target is not None  # for type checking
        source_name = item.package

        if source_name in self._piuparts_target:
            testing_state = self._piuparts_target[source_name][0]
        else:
            testing_state = "X"
        url: str | None
        if source_name in self._piuparts_source:
            unstable_state, url = self._piuparts_source[source_name]
        else:
            unstable_state = "X"
            url = None
        url_html = "(no link yet)"
        if url is not None:
            url_html = '<a href="{0}">{0}</a>'.format(url)

        if unstable_state == "P":
            # Not a regression
            msg = f"Piuparts tested OK - {url_html}"
            result = PolicyVerdict.PASS
            piuparts_info["test-results"] = "pass"
        elif unstable_state == "F":
            # Only a *regression* (fail in source, not failing in target)
            # is a blocker; a pre-existing failure passes.
            if testing_state != unstable_state:
                piuparts_info["test-results"] = "regression"
                msg = f"Piuparts regression - {url_html}"
                result = PolicyVerdict.REJECTED_PERMANENTLY
            else:
                piuparts_info["test-results"] = "failed"
                msg = f"Piuparts failure (not a regression) - {url_html}"
                result = PolicyVerdict.PASS
        elif unstable_state == "W":
            msg = f"Piuparts check waiting for test results - {url_html}"
            result = PolicyVerdict.REJECTED_TEMPORARILY
            piuparts_info["test-results"] = "waiting-for-test-results"
        else:
            msg = f"Piuparts can't test {source_name} (not a blocker) - {url_html}"
            piuparts_info["test-results"] = "cannot-be-tested"
            result = PolicyVerdict.PASS

        if url is not None:
            piuparts_info["piuparts-test-url"] = url
        if result.is_rejected:
            excuse.add_verdict_info(result, msg)
        else:
            excuse.addinfo(msg)

        if result.is_rejected:
            assert self.hints is not None
            # An ignore-piuparts hint for this exact version downgrades the
            # rejection to PASS_HINTED; only the first matching hint counts.
            for ignore_hint in self.hints.search(
                "ignore-piuparts",
                package=source_name,
                version=source_data_srcdist.version,
            ):
                piuparts_info["ignored-piuparts"] = {"issued-by": ignore_hint.user}
                result = PolicyVerdict.PASS_HINTED
                excuse.addinfo(
                    f"Piuparts issue ignored as requested by {ignore_hint.user}"
                )
                break

        return result

    def _read_piuparts_summary(
        self, filename: str, keep_url: bool = True
    ) -> dict[str, tuple[str, str | None]]:
        """Parse one piuparts summary JSON file.

        :param filename: path to the summary file; an empty file yields an
            empty mapping
        :param keep_url: when False the per-package report URL is dropped
            (stored as None)
        :return: mapping of source package name to (state, url)
        :raises ValueError: if the file's id/version header is missing or
            wrong, or a source has more than one result set
        """
        summary: dict[str, tuple[str, str | None]] = {}
        self.logger.info("Loading piuparts report from %s", filename)
        with open(filename) as fd:
            if os.fstat(fd.fileno()).st_size < 1:
                return summary
            data = json.load(fd)
        try:
            if (
                data["_id"] != "Piuparts Package Test Results Summary"
                or data["_version"] != "1.0"
            ):  # pragma: no cover
                # NOTE: the message previously had an f-prefix with no
                # placeholder; include the offending filename as intended.
                raise ValueError(
                    f"Piuparts results in {filename} does not have the correct ID or version"
                )
        except KeyError as e:  # pragma: no cover
            raise ValueError(
                f"Piuparts results in {filename} is missing id or version field"
            ) from e
        for source, suite_data in data["packages"].items():
            if len(suite_data) != 1:  # pragma: no cover
                raise ValueError(
                    f"Piuparts results in {filename}, the source {source} does not have "
                    "exactly one result set"
                )
            item = next(iter(suite_data.values()))
            state, _, url = item
            if not keep_url:
                url = None
            summary[source] = (state, url)

        return summary
class DependsPolicy(AbstractBasePolicy):
    """Policy checking installability of a source's binaries per architecture.

    Rejects an item when a binary built by the candidate source version is
    known-uninstallable (a "broken package") on an architecture where that
    matters, and records per-arch installability hints in ``deps_info`` for
    the autopkgtest policy to consume.
    """

    pkg_universe: "BinaryPackageUniverse"
    broken_packages: frozenset["BinaryPackageId"]
    all_binaries: dict["BinaryPackageId", "BinaryPackage"]
    allow_uninst: dict[str, set[str | None]]

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "depends",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )
        # Filled in by initialise() from self.options; None until then.
        self.nobreakall_arches = None
        self.new_arches = None
        self.break_arches = None

    def initialise(self, britney: "Britney") -> None:
        # Cache britney-wide state for fast access during the per-arch runs.
        super().initialise(britney)
        self.pkg_universe = britney.pkg_universe
        self.broken_packages = self.pkg_universe.broken_packages
        self.all_binaries = britney.all_binaries
        self.nobreakall_arches = self.options.nobreakall_arches
        self.new_arches = self.options.new_arches
        self.break_arches = self.options.break_arches
        self.allow_uninst = britney.allow_uninst

    def apply_srcarch_policy_impl(
        self,
        deps_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check (un)installability of the item's binaries on one arch.

        Returns REJECTED_PERMANENTLY when a non-exempt binary is broken;
        otherwise PASS.  Also appends to deps_info["autopkgtest_run_anyways"]
        / deps_info["arch_all_not_installable"] for the autopkgtest policy.
        """
        verdict = PolicyVerdict.PASS

        assert self.break_arches is not None
        assert self.new_arches is not None
        if arch in self.break_arches or arch in self.new_arches:
            # we don't check these in the policy (TODO - for now?)
            return verdict

        source_suite = item.suite
        target_suite = self.suite_info.target_suite

        packages_s_a = source_suite.binaries[arch]
        packages_t_a = target_suite.binaries[arch]

        my_bins = sorted(filter_out_faux(excuse.packages[arch]))

        # Track installability separately for arch:all and arch:$arch
        # binaries; the autopkgtest trade-off at the bottom needs both.
        arch_all_installable = set()
        arch_arch_installable = set()
        consider_it_regression = True

        for pkg_id in my_bins:
            pkg_name = pkg_id.package_name
            binary_u = packages_s_a[pkg_name]
            pkg_arch = binary_u.architecture

            # in some cases, we want to track the uninstallability of a
            # package (because the autopkgtest policy uses this), but we still
            # want to allow the package to be uninstallable
            skip_dep_check = False

            if binary_u.source_version != source_data_srcdist.version:
                # don't check cruft in unstable
                continue

            if item.architecture != "source" and pkg_arch == "all":
                # we don't care about the existing arch: all binaries when
                # checking a binNMU item, because the arch: all binaries won't
                # migrate anyway
                skip_dep_check = True

            if pkg_arch == "all" and arch not in self.nobreakall_arches:
                skip_dep_check = True

            if pkg_name in self.allow_uninst[arch]:
                # this binary is allowed to become uninstallable, so we don't
                # need to check anything
                skip_dep_check = True

            if pkg_name in packages_t_a:
                oldbin = packages_t_a[pkg_name]
                if not target_suite.is_installable(oldbin.pkg_id):
                    # as the current binary in testing is already
                    # uninstallable, the newer version is allowed to be
                    # uninstallable as well, so we don't need to check
                    # anything
                    skip_dep_check = True
                    consider_it_regression = False

            if pkg_id in self.broken_packages:
                if pkg_arch == "all":
                    arch_all_installable.add(False)
                else:
                    arch_arch_installable.add(False)
                # dependencies can't be satisfied by all the known binaries -
                # this certainly won't work...
                excuse.add_unsatisfiable_on_arch(arch)
                if skip_dep_check:
                    # ...but if the binary is allowed to become uninstallable,
                    # we don't care
                    # we still want the binary to be listed as uninstallable,
                    continue
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                if pkg_name.endswith("-faux-build-depends"):
                    # Faux packages model build-dependencies; report them as
                    # a build-dependency problem of the real source.
                    name = pkg_name.replace("-faux-build-depends", "")
                    excuse.add_verdict_info(
                        verdict,
                        f"src:{name} has unsatisfiable build dependency",
                    )
                else:
                    excuse.add_verdict_info(
                        verdict, f"{pkg_name}/{arch} has unsatisfiable dependency"
                    )
                excuse.addreason("depends")
            else:
                if pkg_arch == "all":
                    arch_all_installable.add(True)
                else:
                    arch_arch_installable.add(True)

            if skip_dep_check:
                continue

            deps = self.pkg_universe.dependencies_of(pkg_id)

            for dep in deps:
                # dep is a list of packages, each of which satisfy the
                # dependency

                if dep == frozenset():
                    continue
                is_ok = False
                needed_for_dep = set()

                for alternative in dep:
                    if target_suite.is_pkg_in_the_suite(alternative):
                        # dep can be satisfied in testing - ok
                        is_ok = True
                    elif alternative in my_bins:
                        # can be satisfied by binary from same item: will be
                        # ok if item migrates
                        is_ok = True
                    else:
                        needed_for_dep.add(alternative)

                if not is_ok:
                    spec = DependencySpec(DependencyType.DEPENDS, arch)
                    excuse.add_package_depends(spec, needed_for_dep)

        # The autopkgtest policy needs delicate trade offs for
        # non-installability. The current choice (considering source
        # migration and only binaries built by the version of the
        # source):
        #
        # * Run autopkgtest if all arch:$arch binaries are installable
        #   (but some or all arch:all binaries are not)
        #
        # * Don't schedule nor wait for not installable arch:all only package
        #   on ! NOBREAKALL_ARCHES
        #
        # * Run autopkgtest if installability isn't a regression (there are (or
        #   rather, should) not be a lot of packages in this state, and most
        #   likely they'll just fail quickly)
        #
        # * Don't schedule, but wait otherwise
        if arch_arch_installable == {True} and False in arch_all_installable:
            deps_info.setdefault("autopkgtest_run_anyways", []).append(arch)
        elif (
            arch not in self.nobreakall_arches
            and arch_arch_installable == set()
            and False in arch_all_installable
        ):
            deps_info.setdefault("arch_all_not_installable", []).append(arch)
        elif not consider_it_regression:
            deps_info.setdefault("autopkgtest_run_anyways", []).append(arch)

        return verdict
@unique
class BuildDepResult(IntEnum):
    """Outcome of checking one build-dependency relation on an architecture.

    Values are ordered from best to worst; the build-depends policy picks
    the numerically smallest result when any single architecture suffices.
    """

    #: the relation is already satisfied in the target suite
    OK = 1
    #: the relation can be satisfied by other packages in the source suite
    DEPENDS = 2
    #: the relation cannot be satisfied at all
    FAILED = 3
class BuildDependsPolicy(AbstractBasePolicy):
    """Policy checking that a source's Build-Depends(-Arch/-Indep) can be met.

    Build-Depends must be satisfiable on every relevant architecture;
    Build-Depends-Indep only needs a single architecture to succeed
    (``any_arch_ok`` below).  Unsatisfiable relations reject the item;
    relations satisfiable only from the source suite become dependencies
    of the excuse.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "build-depends",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
        )
        # Preferred architectures for checking arch:all builds; filled from
        # the ALL_BUILDARCH option in initialise().
        self._all_buildarch: list[str] = []

        parse_option(options, "all_buildarch")

    def initialise(self, britney: "Britney") -> None:
        super().initialise(britney)
        if self.options.all_buildarch:
            self._all_buildarch = SuiteContentLoader.config_str_as_list(
                self.options.all_buildarch, []
            )

    def apply_src_policy_impl(
        self,
        build_deps_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
        get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers,
    ) -> PolicyVerdict:
        """Check both dependency fields and keep the worst verdict.

        ``get_dependency_solvers`` is injectable for testing; it defaults
        to the module-level helper.
        """
        verdict = PolicyVerdict.PASS

        # analyze the dependency fields (if present)
        if deps := source_data_srcdist.build_deps_arch:
            v = self._check_build_deps(
                deps,
                DependencyType.BUILD_DEPENDS,
                build_deps_info,
                item,
                source_data_tdist,
                source_data_srcdist,
                excuse,
                get_dependency_solvers=get_dependency_solvers,
            )
            verdict = PolicyVerdict.worst_of(verdict, v)

        if ideps := source_data_srcdist.build_deps_indep:
            v = self._check_build_deps(
                ideps,
                DependencyType.BUILD_DEPENDS_INDEP,
                build_deps_info,
                item,
                source_data_tdist,
                source_data_srcdist,
                excuse,
                get_dependency_solvers=get_dependency_solvers,
            )
            verdict = PolicyVerdict.worst_of(verdict, v)

        return verdict

    def _get_check_archs(
        self, archs: Container[str], dep_type: DependencyType
    ) -> list[str]:
        """Return the ordered list of architectures to check for dep_type.

        For BUILD_DEPENDS: the configured architectures present in ``archs``
        minus OUTOFSYNC_ARCHES.  For the -Indep case the order expresses
        preference, since the caller stops at the first fully-OK arch.
        """
        oos = self.options.outofsync_arches

        if dep_type == DependencyType.BUILD_DEPENDS:
            return [
                arch
                for arch in self.options.architectures
                if arch in archs and arch not in oos
            ]

        # first try the all buildarch
        checkarchs = list(self._all_buildarch)
        # then try the architectures where this source has arch specific
        # binaries (in the order of the architecture config file)
        checkarchs.extend(
            arch
            for arch in self.options.architectures
            if arch in archs and arch not in checkarchs
        )
        # then try all other architectures
        checkarchs.extend(
            arch for arch in self.options.architectures if arch not in checkarchs
        )

        # and drop OUTOFSYNC_ARCHES
        return [arch for arch in checkarchs if arch not in oos]

    def _add_info_for_arch(
        self,
        arch: str,
        excuses_info: dict[str, list[str]],
        blockers: dict[str, set[BinaryPackageId]],
        results: dict[str, BuildDepResult],
        dep_type: DependencyType,
        target_suite: TargetSuite,
        source_suite: Suite,
        excuse: "Excuse",
        verdict: PolicyVerdict,
    ) -> PolicyVerdict:
        """Fold one architecture's collected results into the excuse.

        Adds blocker packages as excuse dependencies, downgrades the verdict
        to REJECTED_PERMANENTLY on a FAILED result, and emits the collected
        per-arch messages.  Returns the (possibly worsened) verdict.
        """
        if arch in blockers:
            packages = blockers[arch]

            # for the solving packages, update the excuse to add the dependencies
            for p in packages:
                if arch not in self.options.break_arches:
                    spec = DependencySpec(dep_type, arch)
                    excuse.add_package_depends(spec, {p})

        if arch in results and results[arch] == BuildDepResult.FAILED:
            verdict = PolicyVerdict.worst_of(
                verdict, PolicyVerdict.REJECTED_PERMANENTLY
            )

        if arch in excuses_info:
            for excuse_text in excuses_info[arch]:
                if verdict.is_rejected:
                    excuse.add_verdict_info(verdict, excuse_text)
                else:
                    excuse.addinfo(excuse_text)

        return verdict

    def _check_build_deps(
        self,
        deps: str,
        dep_type: DependencyType,
        build_deps_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
        get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers,
    ) -> PolicyVerdict:
        """Check one raw dependency field (comma-separated relations).

        Per architecture, every relation must be satisfiable either in the
        target suite (OK), by source-suite packages (DEPENDS — recorded as
        blockers), or not at all (FAILED).  For -Indep, a single arch with
        the best result is enough and is the only one reported.
        """
        verdict = PolicyVerdict.PASS
        any_arch_ok = dep_type == DependencyType.BUILD_DEPENDS_INDEP

        britney = self.britney

        # local copies for better performance
        parse_src_depends = apt_pkg.parse_src_depends

        source_name = item.package
        source_suite = item.suite
        target_suite = self.suite_info.target_suite
        binaries_s = source_suite.binaries
        provides_s = source_suite.provides_table
        binaries_t = target_suite.binaries
        provides_t = target_suite.provides_table
        unsat_bd: dict[str, list[str]] = {}
        relevant_archs: set[str] = {
            binary.architecture
            for binary in filter_out_faux(source_data_srcdist.binaries)
            if britney.all_binaries[binary].architecture != "all"
        }

        excuses_info: dict[str, list[str]] = defaultdict(list)
        blockers: dict[str, set[BinaryPackageId]] = defaultdict(set)
        arch_results = {}
        result_archs = defaultdict(list)
        bestresult = BuildDepResult.FAILED
        check_archs = self._get_check_archs(relevant_archs, dep_type)
        if not check_archs:
            # when the arch list is empty, we check the b-d on any arch, instead of all archs
            # this happens for Build-Depens on a source package that only produces arch: all binaries
            any_arch_ok = True
            check_archs = self._get_check_archs(
                self.options.architectures, DependencyType.BUILD_DEPENDS_INDEP
            )

        for arch in check_archs:
            # retrieve the binary package from the specified suite and arch
            binaries_s_a = binaries_s[arch]
            provides_s_a = provides_s[arch]
            binaries_t_a = binaries_t[arch]
            provides_t_a = provides_t[arch]
            arch_results[arch] = BuildDepResult.OK
            # for every dependency block (formed as conjunction of disjunction)
            for block_txt in deps.split(","):
                block_list = parse_src_depends(block_txt, False, arch)
                # Unlike regular dependencies, some clauses of the Build-Depends(-Arch|-Indep) can be
                # filtered out by (e.g.) architecture restrictions. We need to cope with this while
                # keeping block_txt and block aligned.
                if not block_list:
                    # Relation is not relevant for this architecture.
                    continue
                block = block_list[0]
                # if the block is satisfied in the target suite, then skip the block
                if get_dependency_solvers(
                    block, binaries_t_a, provides_t_a, build_depends=True
                ):
                    # Satisfied in the target suite; all ok.
                    continue

                # check if the block can be satisfied in the source suite, and list the solving packages
                packages = get_dependency_solvers(
                    block, binaries_s_a, provides_s_a, build_depends=True
                )
                sources = sorted(p.source for p in packages)

                # if the dependency can be satisfied by the same source package, skip the block:
                # obviously both binary packages will enter the target suite together
                if source_name in sources:
                    continue

                # if no package can satisfy the dependency, add this information to the excuse
                if not packages:
                    excuses_info[arch].append(
                        "%s unsatisfiable %s on %s: %s"
                        % (source_name, dep_type, arch, block_txt.strip())
                    )
                    if arch not in unsat_bd:
                        unsat_bd[arch] = []
                    unsat_bd[arch].append(block_txt.strip())
                    arch_results[arch] = BuildDepResult.FAILED
                    continue

                blockers[arch].update(p.pkg_id for p in packages)
                if arch_results[arch] < BuildDepResult.DEPENDS:
                    arch_results[arch] = BuildDepResult.DEPENDS

            if any_arch_ok:
                # Keep the numerically smallest (best) result seen so far;
                # an OK arch ends the search immediately.
                if arch_results[arch] < bestresult:
                    bestresult = arch_results[arch]
                result_archs[arch_results[arch]].append(arch)
                if bestresult == BuildDepResult.OK:
                    # we found an architecture where the b-deps-indep are
                    # satisfied in the target suite, so we can stop
                    break

        if any_arch_ok:
            # Report only the first arch that achieved the best result.
            arch = result_archs[bestresult][0]
            excuse.add_detailed_info(f"Checking {dep_type.get_description()} on {arch}")
            key = "check-%s-on-arch" % dep_type.get_reason()
            build_deps_info[key] = arch
            verdict = self._add_info_for_arch(
                arch,
                excuses_info,
                blockers,
                arch_results,
                dep_type,
                target_suite,
                source_suite,
                excuse,
                verdict,
            )

        else:
            for arch in check_archs:
                verdict = self._add_info_for_arch(
                    arch,
                    excuses_info,
                    blockers,
                    arch_results,
                    dep_type,
                    target_suite,
                    source_suite,
                    excuse,
                    verdict,
                )

        if unsat_bd:
            build_deps_info["unsatisfiable-arch-build-depends"] = unsat_bd

        return verdict
class BuiltUsingPolicy(AbstractBasePolicy):
    """Built-Using policy

    Binaries that incorporate (part of) another source package must list these
    sources under 'Built-Using'.

    This policy checks if the corresponding sources are available in the
    target suite. If they are not, but they are candidates for migration, a
    dependency is added.

    If the binary incorporates a newer version of a source, that is not (yet)
    a candidate, we don't want to accept that binary. A rebuild later in the
    primary suite wouldn't fix the issue, because that would incorporate the
    newer version again.

    If the binary incorporates an older version of the source, a newer version
    will be accepted as a replacement. We assume that this can be fixed by
    rebuilding the binary at some point during the development cycle.

    Requiring exact version of the source would not be useful in practice. A
    newer upload of that source wouldn't be blocked by this policy, so the
    built-using would be outdated anyway.

    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "built-using",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )

    def initialise(self, britney: "Britney") -> None:
        # No policy-specific state to set up; delegate to the base class.
        super().initialise(britney)

    def apply_srcarch_policy_impl(
        self,
        build_deps_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check the Built-Using entries of this item's binaries on one arch.

        A Built-Using entry is satisfied by the target suite, by the item's
        own source suite, or (for additional source suites) by the primary
        source suite — in that order.  Unsatisfiable entries reject the item
        unless ``arch`` is a break arch.
        """
        verdict = PolicyVerdict.PASS

        source_suite = item.suite
        target_suite = self.suite_info.target_suite
        binaries_s = source_suite.binaries

        def check_bu_in_suite(
            bu_source: str, bu_version: str, source_suite: Suite
        ) -> bool:
            # Closure: reads `pkg_name` from the enclosing per-binary loop
            # below, and `arch`/`excuse` from the method scope.
            found = False
            if bu_source not in source_suite.sources:
                return found
            s_source = source_suite.sources[bu_source]
            s_ver = s_source.version
            # A suite version >= the Built-Using version satisfies the entry
            # (see the class docstring for the rationale).
            if apt_pkg.version_compare(s_ver, bu_version) >= 0:
                found = True
                dep = PackageId(bu_source, s_ver, "source")
                if arch in self.options.break_arches:
                    excuse.add_detailed_info(
                        "Ignoring Built-Using for %s/%s on %s"
                        % (pkg_name, arch, dep.uvname)
                    )
                else:
                    spec = DependencySpec(DependencyType.BUILT_USING, arch)
                    excuse.add_package_depends(spec, {dep})
                    excuse.add_detailed_info(
                        f"{pkg_name}/{arch} has Built-Using on {dep.uvname}"
                    )

            return found

        for pkg_id in sorted(
            x
            for x in filter_out_faux(source_data_srcdist.binaries)
            if x.architecture == arch
        ):
            pkg_name = pkg_id.package_name

            # retrieve the testing (if present) and unstable corresponding binary packages
            binary_s = binaries_s[arch][pkg_name]

            for bu in binary_s.builtusing:
                bu_source = bu[0]
                bu_version = bu[1]
                found = False
                if bu_source in target_suite.sources:
                    t_source = target_suite.sources[bu_source]
                    t_ver = t_source.version
                    if apt_pkg.version_compare(t_ver, bu_version) >= 0:
                        found = True

                if not found:
                    found = check_bu_in_suite(bu_source, bu_version, source_suite)

                if not found and source_suite.suite_class.is_additional_source:
                    found = check_bu_in_suite(
                        bu_source, bu_version, self.suite_info.primary_source_suite
                    )

                if not found:
                    if arch in self.options.break_arches:
                        excuse.add_detailed_info(
                            "Ignoring unsatisfiable Built-Using for %s/%s on %s %s"
                            % (pkg_name, arch, bu_source, bu_version)
                        )
                    else:
                        verdict = PolicyVerdict.worst_of(
                            verdict, PolicyVerdict.REJECTED_PERMANENTLY
                        )
                        excuse.add_verdict_info(
                            verdict,
                            "%s/%s has unsatisfiable Built-Using on %s %s"
                            % (pkg_name, arch, bu_source, bu_version),
                        )

        return verdict
1729class BlockPolicy(AbstractBasePolicy):
1730 BLOCK_HINT_REGEX = re.compile("^(un)?(block-?.*)$")
1732 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
1733 super().__init__(
1734 "block",
1735 options,
1736 suite_info,
1737 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
1738 )
1739 self._blockall: dict[str | None, Hint] = {}
1741 def initialise(self, britney: "Britney") -> None:
1742 super().initialise(britney)
1743 assert self.hints is not None
1744 for hint in self.hints.search(type="block-all"):
1745 self._blockall[hint.package] = hint
1747 self._key_packages = []
1748 if "key" in self._blockall:
1749 self._key_packages = self._read_key_packages()
1751 def _read_key_packages(self) -> list[str]:
1752 """Read the list of key packages
1754 The file contains data in the yaml format :
1756 - reason: <something>
1757 source: <package>
1759 The method returns a list of all key packages.
1760 """
1761 filename = os.path.join(self.state_dir, "key_packages.yaml")
1762 self.logger.info("Loading key packages from %s", filename)
1763 if os.path.exists(filename): 1763 ↛ 1768line 1763 didn't jump to line 1768 because the condition on line 1763 was always true
1764 with open(filename) as f:
1765 data = yaml.safe_load(f)
1766 key_packages = [item["source"] for item in data]
1767 else:
1768 self.logger.error(
1769 "Britney was asked to block key packages, "
1770 + "but no key_packages.yaml file was found."
1771 )
1772 sys.exit(1)
1774 return key_packages
1776 def register_hints(self, hint_parser: HintParser) -> None:
1777 # block related hints are currently defined in hint.py
1778 pass
1780 def _check_blocked(
1781 self, item: "MigrationItem", arch: str, version: str, excuse: "Excuse"
1782 ) -> PolicyVerdict:
1783 verdict = PolicyVerdict.PASS
1784 blocked = {}
1785 unblocked = {}
1786 block_info = {}
1787 source_suite = item.suite
1788 suite_name = source_suite.name
1789 src = item.package
1790 is_primary = source_suite.suite_class == SuiteClass.PRIMARY_SOURCE_SUITE
1792 tooltip = (
1793 "please contact %s-release if update is needed" % self.options.distribution
1794 )
1796 assert self.hints is not None
1797 shints = self.hints.search(package=src)
1798 mismatches = False
1799 r = self.BLOCK_HINT_REGEX
1800 for hint in shints:
1801 m = r.match(hint.type)
1802 if m:
1803 if m.group(1) == "un":
1804 assert hint.suite is not None
1805 if (
1806 hint.version != version
1807 or hint.suite.name != suite_name
1808 or (hint.architecture != arch and hint.architecture != "source")
1809 ):
1810 self.logger.info(
1811 "hint mismatch: %s %s %s", version, arch, suite_name
1812 )
1813 mismatches = True
1814 else:
1815 unblocked[m.group(2)] = hint.user
1816 excuse.add_hint(hint)
1817 else:
1818 # block(-*) hint: only accepts a source, so this will
1819 # always match
1820 blocked[m.group(2)] = hint.user
1821 excuse.add_hint(hint)
1823 if "block" not in blocked and is_primary:
1824 # if there is a specific block hint for this package, we don't
1825 # check for the general hints
1827 if self.options.distribution == "debian": 1827 ↛ 1834line 1827 didn't jump to line 1834 because the condition on line 1827 was always true
1828 url = "https://release.debian.org/testing/freeze_policy.html"
1829 tooltip = (
1830 'Follow the <a href="%s">freeze policy</a> when applying for an unblock'
1831 % url
1832 )
1834 if "source" in self._blockall:
1835 blocked["block"] = self._blockall["source"].user
1836 excuse.add_hint(self._blockall["source"])
1837 elif (
1838 "new-source" in self._blockall
1839 and src not in self.suite_info.target_suite.sources
1840 ):
1841 blocked["block"] = self._blockall["new-source"].user
1842 excuse.add_hint(self._blockall["new-source"])
1843 # no tooltip: new sources will probably not be accepted anyway
1844 block_info["block"] = "blocked by {}: is not in {}".format(
1845 self._blockall["new-source"].user,
1846 self.suite_info.target_suite.name,
1847 )
1848 elif "key" in self._blockall and src in self._key_packages:
1849 blocked["block"] = self._blockall["key"].user
1850 excuse.add_hint(self._blockall["key"])
1851 block_info["block"] = "blocked by {}: is a key package ({})".format(
1852 self._blockall["key"].user,
1853 tooltip,
1854 )
1855 elif "no-autopkgtest" in self._blockall:
1856 if excuse.autopkgtest_results == {"PASS"}:
1857 if not blocked: 1857 ↛ 1883line 1857 didn't jump to line 1883 because the condition on line 1857 was always true
1858 excuse.addinfo("not blocked: has successful autopkgtest")
1859 else:
1860 blocked["block"] = self._blockall["no-autopkgtest"].user
1861 excuse.add_hint(self._blockall["no-autopkgtest"])
1862 if not excuse.autopkgtest_results:
1863 block_info["block"] = (
1864 "blocked by %s: does not have autopkgtest (%s)"
1865 % (
1866 self._blockall["no-autopkgtest"].user,
1867 tooltip,
1868 )
1869 )
1870 else:
1871 block_info["block"] = (
1872 "blocked by %s: autopkgtest not fully successful (%s)"
1873 % (
1874 self._blockall["no-autopkgtest"].user,
1875 tooltip,
1876 )
1877 )
1879 elif not is_primary:
1880 blocked["block"] = suite_name
1881 excuse.needs_approval = True
1883 for block_cmd in blocked:
1884 unblock_cmd = "un" + block_cmd
1885 if block_cmd in unblocked:
1886 if is_primary or block_cmd == "block-udeb":
1887 excuse.addinfo(
1888 "Ignoring %s request by %s, due to %s request by %s"
1889 % (
1890 block_cmd,
1891 blocked[block_cmd],
1892 unblock_cmd,
1893 unblocked[block_cmd],
1894 )
1895 )
1896 else:
1897 excuse.addinfo("Approved by %s" % (unblocked[block_cmd]))
1898 else:
1899 verdict = PolicyVerdict.REJECTED_NEEDS_APPROVAL
1900 if is_primary or block_cmd == "block-udeb":
1901 # redirect people to d-i RM for udeb things:
1902 if block_cmd == "block-udeb":
1903 tooltip = "please contact the d-i release manager if an update is needed"
1904 if block_cmd in block_info:
1905 info = block_info[block_cmd]
1906 else:
1907 info = (
1908 "Not touching package due to {} request by {} ({})".format(
1909 block_cmd,
1910 blocked[block_cmd],
1911 tooltip,
1912 )
1913 )
1914 excuse.add_verdict_info(verdict, info)
1915 else:
1916 excuse.add_verdict_info(verdict, "NEEDS APPROVAL BY RM")
1917 excuse.addreason("block")
1918 if mismatches:
1919 excuse.add_detailed_info(
1920 "Some hints for %s do not match this item" % src
1921 )
1922 return verdict
    def apply_src_policy_impl(
        self,
        block_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Apply the block/unblock hint check to a whole-source migration.

        Delegates to ``_check_blocked`` using the pseudo-architecture
        ``"source"`` and the version from the source suite, so the hints are
        matched against the source upload rather than a single architecture.
        ``block_info`` is unused here; ``_check_blocked`` records its own
        findings on the excuse (defined earlier in this file).
        """
        return self._check_blocked(item, "source", source_data_srcdist.version, excuse)
    def apply_srcarch_policy_impl(
        self,
        block_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Apply the block/unblock hint check to a single architecture.

        Same delegation as :meth:`apply_src_policy_impl`, but scoped to
        ``arch`` so per-arch items (e.g. binNMUs) are matched against
        arch-specific (un)block hints.  ``block_info`` is unused here.
        """
        return self._check_blocked(item, arch, source_data_srcdist.version, excuse)
class BuiltOnBuilddPolicy(AbstractBasePolicy):
    """Policy requiring binaries to be built on official build daemons.

    Reads per-binary signer information from ``signers.json`` in STATE_DIR
    and rejects items whose binaries were signed by a non-buildd key,
    with exceptions for non-main components and (hinted) arch:all
    maintainer uploads.  Runs once per architecture on every source suite.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "builtonbuildd",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )
        # "signerinfo" is populated in initialise() from signers.json:
        # nested mapping pkg_name -> version -> arch -> {"buildd": ..., "uid": ...}
        # (shape as consumed in apply_srcarch_policy_impl below)
        self._builtonbuildd: dict[str, Any] = {
            "signerinfo": None,
        }

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the hint that whitelists maintainer arch:all uploads."""
        hint_parser.register_hint_type(
            HintType(
                "allow-archall-maintainer-upload",
                versioned=HintAnnotate.FORBIDDEN,
            )
        )

    def initialise(self, britney: "Britney") -> None:
        """Load the signer database; requires STATE_DIR to be configured."""
        super().initialise(britney)
        try:
            filename_signerinfo = os.path.join(self.state_dir, "signers.json")
        except AttributeError as e:  # pragma: no cover
            raise RuntimeError(
                "Please set STATE_DIR in the britney configuration"
            ) from e
        self._builtonbuildd["signerinfo"] = self._read_signerinfo(filename_signerinfo)

    def apply_srcarch_policy_impl(
        self,
        buildd_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check that every binary of this item on ``arch`` was buildd-built.

        Records the signer uid per package-architecture in
        ``buildd_info["signed-by"]`` (shared across the per-arch runs of this
        policy) so messages are only emitted once per pkg_arch.
        """
        verdict = PolicyVerdict.PASS
        signers = self._builtonbuildd["signerinfo"]

        if "signed-by" not in buildd_info:
            buildd_info["signed-by"] = {}

        source_suite = item.suite

        # horrible hard-coding, but currently, we don't keep track of the
        # component when loading the packages files
        component = "main"
        # we use the source component, because a binary in contrib can
        # belong to a source in main
        section = source_data_srcdist.section
        if section.find("/") > -1:
            component = section.split("/")[0]

        packages_s_a = source_suite.binaries[arch]
        assert self.hints is not None

        for pkg_id in sorted(
            x
            for x in filter_out_faux(source_data_srcdist.binaries)
            if x.architecture == arch
        ):
            pkg_name = pkg_id.package_name
            binary_u = packages_s_a[pkg_name]
            pkg_arch = binary_u.architecture

            # skip cruft binaries built by an older version of the source
            if binary_u.source_version != source_data_srcdist.version:
                continue

            if item.architecture != "source" and pkg_arch == "all":
                # we don't care about the existing arch: all binaries when
                # checking a binNMU item, because the arch: all binaries won't
                # migrate anyway
                continue

            signer = None
            uid = None
            uidinfo = ""
            buildd_ok = False
            failure_verdict = PolicyVerdict.REJECTED_PERMANENTLY
            try:
                # any missing level of the nested lookup lands in KeyError
                # below, which downgrades the failure to "cannot determine"
                signer = signers[pkg_name][pkg_id.version][pkg_arch]
                if signer["buildd"]:
                    buildd_ok = True
                uid = signer["uid"]
                uidinfo = f"arch {pkg_arch} binaries uploaded by {uid}"
            except KeyError:
                self.logger.info(
                    "signer info for %s %s (%s) on %s not found "
                    % (pkg_name, binary_u.version, pkg_arch, arch)
                )
                uidinfo = "upload info for arch %s binaries not found" % (pkg_arch)
                failure_verdict = PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT
            if not buildd_ok:
                if component != "main":
                    # non-main components are exempt from the buildd requirement
                    # NOTE(review): the `not buildd_ok` re-check below is
                    # redundant (we are already inside `if not buildd_ok`)
                    if not buildd_ok and pkg_arch not in buildd_info["signed-by"]:
                        excuse.add_detailed_info(
                            f"{uidinfo}, but package in {component}"
                        )
                    buildd_ok = True
                elif pkg_arch == "all":
                    # arch:all maintainer uploads can be whitelisted by hint
                    allow_hints = self.hints.search(
                        "allow-archall-maintainer-upload", package=item.package
                    )
                    if allow_hints:
                        buildd_ok = True
                        verdict = PolicyVerdict.worst_of(
                            verdict, PolicyVerdict.PASS_HINTED
                        )
                        if pkg_arch not in buildd_info["signed-by"]:
                            excuse.addinfo(
                                "%s, but whitelisted by %s"
                                % (uidinfo, allow_hints[0].user)
                            )
            if not buildd_ok:
                verdict = failure_verdict
                if pkg_arch not in buildd_info["signed-by"]:
                    if pkg_arch == "all":
                        uidinfo += (
                            ", a new source-only upload is needed to allow migration"
                        )
                    excuse.add_verdict_info(
                        verdict, "Not built on buildd: %s" % (uidinfo)
                    )

            # sanity check: a pkg_arch should not be signed by two different
            # uids within one item; log (but don't fail) if it happens
            if (
                pkg_arch in buildd_info["signed-by"]
                and buildd_info["signed-by"][pkg_arch] != uid
            ):
                self.logger.info(
                    "signer mismatch for %s (%s %s) on %s: %s, while %s already listed"
                    % (
                        pkg_name,
                        binary_u.source,
                        binary_u.source_version,
                        pkg_arch,
                        uid,
                        buildd_info["signed-by"][pkg_arch],
                    )
                )

            buildd_info["signed-by"][pkg_arch] = uid

        return verdict

    def _read_signerinfo(self, filename: str) -> dict[str, Any]:
        """Load signer info from ``filename``; empty file yields empty dict."""
        signerinfo: dict[str, Any] = {}
        self.logger.info("Loading signer info from %s", filename)
        with open(filename) as fd:
            # tolerate an empty placeholder file
            if os.fstat(fd.fileno()).st_size < 1:
                return signerinfo
            signerinfo = json.load(fd)

        return signerinfo
class ImplicitDependencyPolicy(AbstractBasePolicy):
    """Implicit Dependency policy

    Upgrading a package pkg-a can break the installability of a package pkg-b.
    A newer version (or the removal) of pkg-b might fix the issue. In that
    case, pkg-a has an 'implicit dependency' on pkg-b, because pkg-a can only
    migrate if pkg-b also migrates.

    This policy tries to discover a few common cases, and adds the relevant
    info to the excuses. If another item is needed to fix the
    uninstallability, a dependency is added. If no newer item can fix it, this
    excuse will be blocked.

    Note that the migration step will check the installability of every
    package, so this policy doesn't need to handle every corner case. It
    must, however, make sure that no excuse is unnecessarily blocked.

    Some cases that should be detected by this policy:

    * pkg-a is upgraded from 1.0-1 to 2.0-1, while
      pkg-b has "Depends: pkg-a (<< 2.0)"
      This typically happens if pkg-b has a strict dependency on pkg-a because
      it uses some non-stable internal interface (examples are glibc,
      binutils, python3-defaults, ...)

    * pkg-a is upgraded from 1.0-1 to 2.0-1, and
      pkg-a 1.0-1 has "Provides: provides-1",
      pkg-a 2.0-1 has "Provides: provides-2",
      pkg-b has "Depends: provides-1"
      This typically happens when pkg-a has an interface that changes between
      versions, and a virtual package is used to identify the version of this
      interface (e.g. perl-api-x.y)

    """

    # populated in initialise() from the Britney instance
    _pkg_universe: "BinaryPackageUniverse"
    _all_binaries: dict["BinaryPackageId", "BinaryPackage"]
    _allow_uninst: dict[str, set[str | None]]
    _nobreakall_arches: list[str]

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "implicit-deps",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )

    def initialise(self, britney: "Britney") -> None:
        """Cache references to the package universe and arch options."""
        super().initialise(britney)
        self._pkg_universe = britney.pkg_universe
        self._all_binaries = britney.all_binaries
        self._smooth_updates = britney.options.smooth_updates
        self._nobreakall_arches = self.options.nobreakall_arches
        self._new_arches = self.options.new_arches
        self._break_arches = self.options.break_arches
        self._allow_uninst = britney.allow_uninst
        self._outofsync_arches = self.options.outofsync_arches

    def can_be_removed(self, pkg: BinaryPackage) -> bool:
        """Return True if ``pkg`` is a plausible candidate for removal.

        Used to decide whether breaking ``pkg`` is acceptable because its
        removal is (probably) going to happen anyway.
        """
        src = pkg.source
        target_suite = self.suite_info.target_suite

        # TODO these conditions shouldn't be hardcoded here
        # ideally, we would be able to look up excuses to see if the removal
        # is in there, but in the current flow, this policy is called before
        # all possible excuses exist, so there is no list for us to check

        if src not in self.suite_info.primary_source_suite.sources:
            # source for pkg not in unstable: candidate for removal
            return True

        source_t = target_suite.sources[src]
        assert self.hints is not None
        for hint in self.hints.search("remove", package=src, version=source_t.version):
            # removal hint for the source in testing: candidate for removal
            return True

        if target_suite.is_cruft(pkg):
            # if pkg is cruft in testing, removal will be tried
            return True

        # the case were the newer version of the source no longer includes the
        # binary (or includes a cruft version of the binary) will be handled
        # separately (in that case there might be an implicit dependency on
        # the newer source)

        return False

    def should_skip_rdep(
        self, pkg: BinaryPackage, source_name: str, myarch: str
    ) -> bool:
        """Return True when breaking reverse-dependency ``pkg`` is harmless.

        Covers the cases where the rdep won't actually become uninstallable,
        or where becoming uninstallable is explicitly tolerated.
        """
        target_suite = self.suite_info.target_suite

        if not target_suite.is_pkg_in_the_suite(pkg.pkg_id):
            # it is not in the target suite, migration cannot break anything
            return True

        if pkg.source == source_name:
            # if it is built from the same source, it will be upgraded
            # with the source
            return True

        if self.can_be_removed(pkg):
            # could potentially be removed, so if that happens, it won't be
            # broken
            return True

        if pkg.architecture == "all" and myarch not in self._nobreakall_arches:
            # arch all on non nobreakarch is allowed to become uninstallable
            return True

        if pkg.pkg_id.package_name in self._allow_uninst[myarch]:
            # there is a hint to allow this binary to become uninstallable
            return True

        if not target_suite.is_installable(pkg.pkg_id):
            # it is already uninstallable in the target suite, migration
            # cannot break anything
            return True

        return False

    def breaks_installability(
        self,
        pkg_id_t: BinaryPackageId,
        pkg_id_s: BinaryPackageId | None,
        pkg_to_check: BinaryPackageId,
    ) -> bool:
        """
        Check if upgrading pkg_id_t to pkg_id_s breaks the installability of
        pkg_to_check.

        To check if removing pkg_id_t breaks pkg_to_check, set pkg_id_s to
        None.
        """

        pkg_universe = self._pkg_universe
        negative_deps = pkg_universe.negative_dependencies_of(pkg_to_check)

        # each `dep` is one dependency clause (a set of alternatives)
        for dep in pkg_universe.dependencies_of(pkg_to_check):
            if pkg_id_t not in dep:
                # this depends doesn't have pkg_id_t as alternative, so
                # upgrading pkg_id_t cannot break this dependency clause
                continue

            # We check all the alternatives for this dependency, to find one
            # that can satisfy it when pkg_id_t is upgraded to pkg_id_s
            found_alternative = False
            for d in dep:
                if d in negative_deps:
                    # If this alternative dependency conflicts with
                    # pkg_to_check, it cannot be used to satisfy the
                    # dependency.
                    # This commonly happens when breaks are added to pkg_id_s.
                    continue

                if d.package_name != pkg_id_t.package_name:
                    # a binary different from pkg_id_t can satisfy the dep, so
                    # upgrading pkg_id_t won't break this dependency
                    found_alternative = True
                    break

                if d != pkg_id_s:
                    # We want to know the impact of the upgrade of
                    # pkg_id_t to pkg_id_s. If pkg_id_s migrates to the
                    # target suite, any other version of this binary will
                    # not be there, so it cannot satisfy this dependency.
                    # This includes pkg_id_t, but also other versions.
                    continue

                # pkg_id_s can satisfy the dep
                found_alternative = True

            if not found_alternative:
                return True
        return False

    def check_upgrade(
        self,
        pkg_id_t: BinaryPackageId,
        pkg_id_s: BinaryPackageId | None,
        source_name: str,
        myarch: str,
        broken_binaries: set[str],
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check the impact of upgrading (or removing) one binary.

        For every reverse dependency that would break, either add an implicit
        dependency on a newer version that fixes it, or reject the excuse and
        record the broken rdep in ``broken_binaries`` (mutated in place).
        """
        verdict = PolicyVerdict.PASS

        pkg_universe = self._pkg_universe
        all_binaries = self._all_binaries

        # check all rdeps of the package in testing
        rdeps_t = pkg_universe.reverse_dependencies_of(pkg_id_t)

        for rdep_pkg in sorted(rdeps_t):
            rdep_p = all_binaries[rdep_pkg]

            # check some cases where the rdep won't become uninstallable, or
            # where we don't care if it does
            if self.should_skip_rdep(rdep_p, source_name, myarch):
                continue

            if not self.breaks_installability(pkg_id_t, pkg_id_s, rdep_pkg):
                # if upgrading pkg_id_t to pkg_id_s doesn't break rdep_pkg,
                # there is no implicit dependency
                continue

            # The upgrade breaks the installability of the rdep. We need to
            # find out if there is a newer version of the rdep that solves the
            # uninstallability. If that is the case, there is an implicit
            # dependency. If not, the upgrade will fail.

            # check source versions
            newer_versions = find_newer_binaries(
                self.suite_info, rdep_p, add_source_for_dropped_bin=True
            )
            good_newer_versions = set()
            for npkg, suite in newer_versions:
                if npkg.architecture == "source":
                    # When a newer version of the source package doesn't have
                    # the binary, we get the source as 'newer version'. In
                    # this case, the binary will not be uninstallable if the
                    # newer source migrates, because it is no longer there.
                    good_newer_versions.add(npkg)
                    continue
                assert isinstance(npkg, BinaryPackageId)
                if not self.breaks_installability(pkg_id_t, pkg_id_s, npkg):
                    good_newer_versions.add(npkg)

            if good_newer_versions:
                spec = DependencySpec(DependencyType.IMPLICIT_DEPENDENCY, myarch)
                excuse.add_package_depends(spec, good_newer_versions)
            else:
                # no good newer versions: no possible solution
                broken_binaries.add(rdep_pkg.name)
                if pkg_id_s:
                    action = "migrating {} to {}".format(
                        pkg_id_s.name,
                        self.suite_info.target_suite.name,
                    )
                else:
                    action = "removing {} from {}".format(
                        pkg_id_t.name,
                        self.suite_info.target_suite.name,
                    )
                # rdep_pkg[0] is the package name component of the id tuple
                if rdep_pkg[0].endswith("-faux-build-depends"):
                    name = rdep_pkg[0].replace("-faux-build-depends", "")
                    info = f'{action} makes Build-Depends of src:<a href="#{name}">{name}</a> uninstallable'
                else:
                    info = '{0} makes <a href="#{1}">{1}</a> uninstallable'.format(
                        action, rdep_pkg.name
                    )
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                excuse.add_verdict_info(verdict, info)

        return verdict

    def apply_srcarch_policy_impl(
        self,
        implicit_dep_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Check all binaries of the item on ``arch`` for implicit deps.

        Aggregates broken binaries across the per-arch runs into
        ``implicit_dep_info["implicit-deps"]["broken-binaries"]``.
        """
        verdict = PolicyVerdict.PASS

        if not source_data_tdist:
            # this item is not currently in testing: no implicit dependency
            return verdict

        if excuse.hasreason("missingbuild"):
            # if the build is missing, the policy would treat this as if the
            # binaries would be removed, which would give incorrect (and
            # confusing) info
            info = "missing build, not checking implicit dependencies on %s" % (arch)
            excuse.add_detailed_info(info)
            return verdict

        source_suite = item.suite
        source_name = item.package
        target_suite = self.suite_info.target_suite
        all_binaries = self._all_binaries

        # we check all binaries for this excuse that are currently in testing
        relevant_binaries = [
            x
            for x in source_data_tdist.binaries
            if (arch == "source" or x.architecture == arch)
            and x.package_name in target_suite.binaries[x.architecture]
            and x.architecture not in self._new_arches
            and x.architecture not in self._break_arches
            and x.architecture not in self._outofsync_arches
        ]

        broken_binaries: set[str] = set()

        assert self.hints is not None
        for pkg_id_t in sorted(relevant_binaries):
            mypkg = pkg_id_t.package_name
            myarch = pkg_id_t.architecture
            binaries_t_a = target_suite.binaries[myarch]
            binaries_s_a = source_suite.binaries[myarch]

            if target_suite.is_cruft(all_binaries[pkg_id_t]):
                # this binary is cruft in testing: it will stay around as long
                # as necessary to satisfy dependencies, so we don't need to
                # care
                continue

            # determine pkg_id_s: the binary this one would be upgraded to
            # (None means the binary disappears with the migration)
            if mypkg in binaries_s_a:
                mybin = binaries_s_a[mypkg]
                pkg_id_s: Optional["BinaryPackageId"] = mybin.pkg_id
                if mybin.source != source_name:
                    # hijack: this is too complicated to check, so we ignore
                    # it (the migration code will check the installability
                    # later anyway)
                    pass
                elif mybin.source_version != source_data_srcdist.version:
                    # cruft in source suite: pretend the binary doesn't exist
                    pkg_id_s = None
                elif pkg_id_t == pkg_id_s:
                    # same binary (probably arch: all from a binNMU):
                    # 'upgrading' doesn't change anything, for this binary, so
                    # it won't break anything
                    continue
            else:
                pkg_id_s = None

            if not pkg_id_s and is_smooth_update_allowed(
                binaries_t_a[mypkg], self._smooth_updates, self.hints
            ):
                # the binary isn't in the new version (or is cruft there), and
                # smooth updates are allowed: the binary can stay around if
                # that is necessary to satisfy dependencies, so we don't need
                # to check it
                continue

            if (
                not pkg_id_s
                and source_data_tdist.version == source_data_srcdist.version
                and source_suite.suite_class == SuiteClass.ADDITIONAL_SOURCE_SUITE
                and binaries_t_a[mypkg].architecture == "all"
            ):
                # we're very probably migrating a binNMU built in tpu where the arch:all
                # binaries were not copied to it as that's not needed. This policy could
                # needlessly block.
                continue

            v = self.check_upgrade(
                pkg_id_t, pkg_id_s, source_name, myarch, broken_binaries, excuse
            )
            verdict = PolicyVerdict.worst_of(verdict, v)

        # each arch is processed separately, so if we already have info from
        # other archs, we need to merge the info from this arch
        broken_old = set()
        if "implicit-deps" not in implicit_dep_info:
            implicit_dep_info["implicit-deps"] = {}
        else:
            broken_old = set(implicit_dep_info["implicit-deps"]["broken-binaries"])

        implicit_dep_info["implicit-deps"]["broken-binaries"] = sorted(
            broken_old | broken_binaries
        )

        return verdict
class ReverseRemovalPolicy(AbstractBasePolicy):
    """Block sources that would keep a to-be-removed package alive.

    For every ``remove`` hint, this policy computes the reverse
    (build-)dependency tree of the hinted source's binaries in the source
    suites.  Any source that (transitively) depends on those binaries and
    is not already in the target suite is blocked from migrating, unless an
    ``ignore-reverse-remove`` hint overrides the block.
    """

    # "source/version" -> uvnames of the remove hints causing the block;
    # computed once in initialise()
    _block_src_for_rm_hint: dict[str, set[str]]

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "reverseremoval",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
        )

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the hint that overrides a reverse-removal block."""
        hint_parser.register_hint_type(HintType("ignore-reverse-remove"))

    def initialise(self, britney: "Britney") -> None:
        """Precompute which sources are blocked by pending remove hints."""
        super().initialise(britney)

        pkg_universe = britney.pkg_universe
        source_suites = britney.suite_info.source_suites
        target_suite = britney.suite_info.target_suite

        # Build set of the sources of reverse (Build-) Depends
        assert self.hints is not None
        hints = self.hints.search("remove")

        rev_bin: dict[BinaryPackageId, set[str]] = defaultdict(set)
        for hint in hints:
            for item in hint.packages:
                # I think we don't need to look at the target suite
                for src_suite in source_suites:
                    try:
                        # Explicitly not running filter_out_faux here
                        my_bins = set(src_suite.sources[item.uvname].binaries)
                    except KeyError:
                        continue
                    # expands my_bins in place to the full reverse tree
                    compute_reverse_tree(pkg_universe, my_bins)
                    for this_bin in my_bins:
                        # defaultdict creates the set on first access
                        # (previously a redundant setdefault(..., set()))
                        rev_bin[this_bin].add(item.uvname)

        rev_src: dict[str, set[str]] = defaultdict(set)
        for bin_pkg, reasons in rev_bin.items():
            # If the pkg is in the target suite, there's nothing this
            # policy wants to do.
            if target_suite.is_pkg_in_the_suite(bin_pkg):
                continue
            that_bin = britney.all_binaries[bin_pkg]
            bin_src = that_bin.source + "/" + that_bin.source_version
            rev_src[bin_src].update(reasons)
        self._block_src_for_rm_hint = rev_src

    def apply_src_policy_impl(
        self,
        rev_remove_info: dict[str, Any],
        item: MigrationItem,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Reject the item if it would strand a hinted removal.

        Passes (hinted) when an ``ignore-reverse-remove`` hint matches the
        item's uvname and version.
        """
        verdict = PolicyVerdict.PASS

        if item.name in self._block_src_for_rm_hint:
            reason = ", ".join(sorted(self._block_src_for_rm_hint[item.name]))
            assert self.hints is not None
            ignore_hints = self.hints.search(
                "ignore-reverse-remove", package=item.uvname, version=item.version
            )
            excuse.addreason("reverseremoval")
            if ignore_hints:
                excuse.addreason("ignore-reverse-remove")
                excuse.addinfo(
                    "Should block migration because of remove hint for %s, but forced by %s"
                    % (reason, ignore_hints[0].user)
                )
                verdict = PolicyVerdict.PASS_HINTED
            else:
                excuse.addinfo("Remove hint for (transitive) dependency: %s" % reason)
                verdict = PolicyVerdict.REJECTED_PERMANENTLY

        return verdict
class ReproduciblePolicy(AbstractBasePolicy):
    """Policy gating migration on reproducible-builds test results.

    Reads ``reproducible.json`` from STATE_DIR, compares each source's
    reproducibility status between the primary source suite and the target
    suite, and rejects regressions (or, when configured with a penalty,
    only penalises them).  Runs once per architecture; only architectures
    listed in ``repro_arches`` are judged.
    """

    def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
        super().__init__(
            "reproducible",
            options,
            suite_info,
            {SuiteClass.PRIMARY_SOURCE_SUITE},
            ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
        )
        # per-suite results, filled in initialise():
        # "source"/"target" -> arch -> package -> result record
        self._reproducible: dict[str, Any] = {
            "source": {},
            "target": {},
        }

        # Default values for this policy's options
        parse_option(options, "repro_success_bounty", default=0, to_int=True)
        parse_option(options, "repro_regression_penalty", default=0, to_int=True)
        parse_option(options, "repro_url")
        parse_option(options, "repro_retry_url")
        parse_option(options, "repro_components")

    def register_hints(self, hint_parser: HintParser) -> None:
        """Register the hint that waives a reproducibility rejection."""
        hint_parser.register_hint_type(
            HintType("ignore-reproducible", architectured=HintAnnotate.OPTIONAL)
        )

    def initialise(self, britney: "Britney") -> None:
        """Load reproducibility results; requires STATE_DIR to be set."""
        super().initialise(britney)
        source_suite = self.suite_info.primary_source_suite
        target_suite = self.suite_info.target_suite
        try:
            filename = os.path.join(self.state_dir, "reproducible.json")
        except AttributeError as e:  # pragma: no cover
            raise RuntimeError(
                "Please set STATE_DIR in the britney configuration"
            ) from e

        # match results by either suite name or codename
        self._reproducible = self._read_repro_status(
            filename,
            source={source_suite.name, source_suite.codename},
            target={target_suite.name, target_suite.codename},
        )

    def apply_srcarch_policy_impl(
        self,
        reproducible_info: dict[str, Any],
        item: MigrationItem,
        arch: str,
        source_data_tdist: SourcePackage | None,
        source_data_srcdist: SourcePackage,
        excuse: "Excuse",
    ) -> PolicyVerdict:
        """Judge the item's reproducibility status on ``arch``.

        Accumulates human-readable results in ``reproducible_info`` and
        returns the verdict for this architecture.
        """
        verdict = PolicyVerdict.PASS

        # we don't want to apply this policy (yet) on binNMUs
        if item.architecture != "source":
            return verdict

        # we're not supposed to judge on this arch
        if arch not in self.options.repro_arches:
            return verdict

        # bail out if this arch has no packages for this source (not build
        # here)
        if arch not in excuse.packages:
            return verdict

        # horrible hard-coding, but currently, we don't keep track of the
        # component when loading the packages files
        component = "main"
        if "/" in (section := source_data_srcdist.section):
            component = section.split("/")[0]

        if (
            self.options.repro_components
            and component not in self.options.repro_components
        ):
            return verdict

        source_name = item.package
        try:
            tar_res = self._reproducible["target"][arch]
            src_res = self._reproducible["source"][arch]
        except KeyError:
            # no data for this arch at all: treat as temporary failure
            verdict = PolicyVerdict.REJECTED_TEMPORARILY
            msg = "No reproducible data available at all for %s" % arch
            excuse.add_verdict_info(verdict, msg)
            return verdict

        # derive the reference status in the target suite
        if source_data_tdist is None:
            target_suite_state = "new"
        elif source_name not in tar_res:
            target_suite_state = "unknown"
        elif tar_res[source_name]["version"] == source_data_tdist.version:
            target_suite_state = tar_res[source_name]["status"]
        else:
            # result is for a different version than what is in testing
            target_suite_state = "stale"

        if source_name in src_res and src_res[source_name]["version"] == item.version:
            source_suite_state = src_res[source_name]["status"]
        else:
            source_suite_state = "unknown"

        # status of ['E404', 'FTBFS', 'FTBR', 'NFU', 'blacklisted', 'depwait',
        # 'reproducible', 'timeout'] with ['new', 'stale', 'unknown']
        wait_states = ("E404", "depwait", "stale", "timeout", "unknown")
        no_build_states = ("FTBFS", "NFU", "blacklisted")

        # if this package doesn't build on this architecture, we don't need to
        # judge it
        # FTBFS: Fails to build from source on r-b infra
        # NFU: the package explicitly doesn't support building on this arch
        # blacklisted: per package per arch per suite
        if source_suite_state in no_build_states:
            return verdict
        # Assume depwait in the source suite only are intermittent (might not
        # be true, e.g. with new build depends)
        if source_suite_state == target_suite_state and target_suite_state == "depwait":
            return verdict

        if self.options.repro_url:
            url = self.options.repro_url.format(package=quote(source_name), arch=arch)
            url_html = ' - <a href="%s">info</a>' % url
            if self.options.repro_retry_url:
                url_html += (
                    ' <a href="%s">♻ </a>'
                    % self.options.repro_retry_url.format(
                        package=quote(source_name), arch=arch
                    )
                )
            # When run on multiple archs, the last one "wins"
            reproducible_info["reproducible-test-url"] = url
        else:
            url = None
            url_html = ""

        # decide the verdict from (source_suite_state, target_suite_state);
        # every branch below sets `msg` or returns/raises
        eligible_for_bounty = False
        if source_suite_state == "reproducible":
            verdict = PolicyVerdict.PASS
            msg = f"Reproducible on {arch}{url_html}"
            reproducible_info.setdefault("test-results", []).append(
                "reproducible on %s" % arch
            )
            eligible_for_bounty = True
        elif source_suite_state == "FTBR":
            if target_suite_state == "new":
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                msg = f"New but not reproducible on {arch}{url_html}"
                reproducible_info.setdefault("test-results", []).append(
                    "new but not reproducible on %s" % arch
                )
            elif target_suite_state in wait_states:
                verdict = PolicyVerdict.REJECTED_TEMPORARILY
                msg = "Waiting for reproducibility reference results on {}{}".format(
                    arch,
                    url_html,
                )
                reproducible_info.setdefault("test-results", []).append(
                    "waiting-for-reference-results on %s" % arch
                )
            elif target_suite_state == "reproducible":
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                msg = f"Reproducibility regression on {arch}{url_html}"
                reproducible_info.setdefault("test-results", []).append(
                    "regression on %s" % arch
                )
            elif target_suite_state == "FTBR":
                verdict = PolicyVerdict.PASS
                msg = "Ignoring non-reproducibility on {} (not a regression){}".format(
                    arch,
                    url_html,
                )
                reproducible_info.setdefault("test-results", []).append(
                    "not reproducible on %s" % arch
                )
            else:
                # reference is in a no-build state while the source FTBRs
                # NOTE(review): message grammar looks off ("but not
                # reproducibility") — confirm before changing, tools may
                # match on it
                verdict = PolicyVerdict.REJECTED_PERMANENTLY
                msg = "No reference result, but not reproducibility on {}{}".format(
                    arch,
                    url_html,
                )
                reproducible_info.setdefault("test-results", []).append(
                    f"reference {target_suite_state} on {arch}"
                )
        elif source_suite_state in wait_states:
            verdict = PolicyVerdict.REJECTED_TEMPORARILY
            msg = f"Waiting for reproducibility test results on {arch}{url_html}"
            reproducible_info.setdefault("test-results", []).append(
                "waiting-for-test-results on %s" % arch
            )
        else:
            raise KeyError("Unhandled reproducibility state %s" % source_suite_state)

        if verdict.is_rejected:
            assert self.hints is not None
            # a hint may match either the generic "source" arch or this arch;
            # only the first matching hint per hint_arch is recorded
            for hint_arch in ("source", arch):
                for ignore_hint in self.hints.search(
                    "ignore-reproducible",
                    package=source_name,
                    version=source_data_srcdist.version,
                    architecture=hint_arch,
                ):
                    verdict = PolicyVerdict.PASS_HINTED
                    reproducible_info.setdefault("ignored-reproducible", {}).setdefault(
                        arch, {}
                    ).setdefault("issued-by", []).append(ignore_hint.user)
                    excuse.addinfo(
                        "Ignoring reproducibility issue on %s as requested "
                        "by %s" % (arch, ignore_hint.user)
                    )
                    break

        if self.options.repro_success_bounty and eligible_for_bounty:
            excuse.add_bounty("reproducibility", self.options.repro_success_bounty)

        if self.options.repro_regression_penalty and verdict in {
            PolicyVerdict.REJECTED_PERMANENTLY,
            PolicyVerdict.REJECTED_TEMPORARILY,
        }:
            if self.options.repro_regression_penalty > 0:
                excuse.add_penalty(
                    "reproducibility", self.options.repro_regression_penalty
                )
            # In case we give penalties instead of blocking, we must always pass
            verdict = PolicyVerdict.PASS

        if verdict.is_rejected:
            excuse.add_verdict_info(verdict, msg)
        else:
            excuse.addinfo(msg)

        return verdict

    def _read_repro_status(
        self, filename: str, source: set[str], target: set[str]
    ) -> dict[str, dict[str, str]]:
        """Load results from ``filename`` into the source/target buckets.

        ``source``/``target`` are suite name+codename sets used to classify
        each result record.  An empty file yields the (empty) defaults.
        """
        summary = self._reproducible
        self.logger.info("Loading reproducibility report from %s", filename)
        with open(filename) as fd:
            # tolerate an empty placeholder file
            if os.fstat(fd.fileno()).st_size < 1:
                return summary
            data = json.load(fd)

        for result in data:
            # a record can land in both buckets if the suites overlap
            if result["suite"] in source:
                summary["source"].setdefault(result["architecture"], {})[
                    result["package"]
                ] = result
            if result["suite"] in target:
                summary["target"].setdefault(result["architecture"], {})[
                    result["package"]
                ] = result

        return summary