Coverage for britney2/policies/policy.py: 91%
1331 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-29 17:21 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-29 17:21 +0000
1import json
2import logging
3import optparse
4import os
5import re
6import sys
7import time
8from abc import ABC, abstractmethod
9from collections import defaultdict
10from collections.abc import Callable, Container
11from enum import IntEnum, unique
12from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar, cast
13from urllib.parse import quote
15import apt_pkg
16import yaml
18from britney2 import (
19 BinaryPackage,
20 BinaryPackageId,
21 DependencyType,
22 PackageId,
23 SourcePackage,
24 Suite,
25 SuiteClass,
26 Suites,
27 TargetSuite,
28)
29from britney2.excusedeps import DependencySpec
30from britney2.hints import (
31 Hint,
32 HintAnnotate,
33 HintCollection,
34 HintParser,
35 HintType,
36 PolicyHintParserProto,
37)
38from britney2.inputs.suiteloader import SuiteContentLoader
39from britney2.migrationitem import MigrationItem, MigrationItemFactory
40from britney2.policies import ApplySrcPolicy, PolicyVerdict
41from britney2.utils import (
42 GetDependencySolversProto,
43 compute_reverse_tree,
44 filter_out_faux,
45 find_newer_binaries,
46 get_dependency_solvers,
47 is_smooth_update_allowed,
48 parse_option,
49)
51if TYPE_CHECKING: 51 ↛ 52line 51 didn't jump to line 52 because the condition on line 51 was never true
52 from ..britney import Britney
53 from ..excuse import Excuse
54 from ..installability.universe import BinaryPackageUniverse
57class PolicyLoadRequest:
58 __slots__ = ("_options_name", "_default_value", "_policy_constructor")
60 def __init__(
61 self,
62 policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"],
63 options_name: str | None,
64 default_value: bool,
65 ) -> None:
66 self._policy_constructor = policy_constructor
67 self._options_name = options_name
68 self._default_value = default_value
70 def is_enabled(self, options: optparse.Values) -> bool:
71 if self._options_name is None:
72 assert self._default_value
73 return True
74 actual_value = getattr(options, self._options_name, None)
75 if actual_value is None:
76 return self._default_value
77 return actual_value.lower() in ("yes", "y", "true", "t")
79 def load(self, options: optparse.Values, suite_info: Suites) -> "BasePolicy":
80 return self._policy_constructor(options, suite_info)
82 @classmethod
83 def always_load(
84 cls, policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"]
85 ) -> "PolicyLoadRequest":
86 return cls(policy_constructor, None, True)
88 @classmethod
89 def conditionally_load(
90 cls,
91 policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"],
92 option_name: str,
93 default_value: bool,
94 ) -> "PolicyLoadRequest":
95 return cls(policy_constructor, option_name, default_value)
98class PolicyEngine:
99 def __init__(self) -> None:
100 self._policies: list["BasePolicy"] = []
102 def add_policy(self, policy: "BasePolicy") -> None:
103 self._policies.append(policy)
105 def load_policies(
106 self,
107 options: optparse.Values,
108 suite_info: Suites,
109 policy_load_requests: list[PolicyLoadRequest],
110 ) -> None:
111 for policy_load_request in policy_load_requests:
112 if policy_load_request.is_enabled(options):
113 self.add_policy(policy_load_request.load(options, suite_info))
115 def register_policy_hints(self, hint_parser: HintParser) -> None:
116 for policy in self._policies:
117 policy.register_hints(hint_parser)
119 def initialise(self, britney: "Britney", hints: HintCollection) -> None:
120 for policy in self._policies:
121 policy.hints = hints
122 policy.initialise(britney)
124 def save_state(self, britney: "Britney") -> None:
125 for policy in self._policies:
126 policy.save_state(britney)
128 def apply_src_policies(
129 self,
130 source_t: SourcePackage | None,
131 source_u: SourcePackage,
132 excuse: "Excuse",
133 ) -> None:
134 excuse_verdict = excuse.policy_verdict
135 source_suite = excuse.item.suite
136 suite_class = source_suite.suite_class
137 for policy in self._policies:
138 pinfo: dict[str, Any] = {}
139 policy_verdict = PolicyVerdict.NOT_APPLICABLE
140 if suite_class in policy.applicable_suites:
141 if policy.src_policy.run_arch:
142 for arch in policy.options.architectures:
143 v = policy.apply_srcarch_policy_impl(
144 pinfo, arch, source_t, source_u, excuse
145 )
146 policy_verdict = PolicyVerdict.worst_of(policy_verdict, v)
147 if policy.src_policy.run_src:
148 v = policy.apply_src_policy_impl(pinfo, source_t, source_u, excuse)
149 policy_verdict = PolicyVerdict.worst_of(policy_verdict, v)
150 # The base policy provides this field, so the subclass should leave it blank
151 assert "verdict" not in pinfo
152 if policy_verdict != PolicyVerdict.NOT_APPLICABLE:
153 excuse.policy_info[policy.policy_id] = pinfo
154 pinfo["verdict"] = policy_verdict.name
155 excuse_verdict = PolicyVerdict.worst_of(policy_verdict, excuse_verdict)
156 excuse.policy_verdict = excuse_verdict
158 def apply_srcarch_policies(
159 self,
160 arch: str,
161 source_t: SourcePackage | None,
162 source_u: SourcePackage,
163 excuse: "Excuse",
164 ) -> None:
165 excuse_verdict = excuse.policy_verdict
166 source_suite = excuse.item.suite
167 suite_class = source_suite.suite_class
168 for policy in self._policies:
169 pinfo: dict[str, Any] = {}
170 if suite_class in policy.applicable_suites:
171 policy_verdict = policy.apply_srcarch_policy_impl(
172 pinfo, arch, source_t, source_u, excuse
173 )
174 excuse_verdict = PolicyVerdict.worst_of(policy_verdict, excuse_verdict)
175 # The base policy provides this field, so the subclass should leave it blank
176 assert "verdict" not in pinfo
177 if policy_verdict != PolicyVerdict.NOT_APPLICABLE:
178 excuse.policy_info[policy.policy_id] = pinfo
179 pinfo["verdict"] = policy_verdict.name
180 excuse.policy_verdict = excuse_verdict
183class BasePolicy(ABC):
184 britney: "Britney"
185 policy_id: str
186 hints: HintCollection | None
187 applicable_suites: set[SuiteClass]
188 src_policy: ApplySrcPolicy
189 options: optparse.Values
190 suite_info: Suites
192 def __init__(
193 self,
194 options: optparse.Values,
195 suite_info: Suites,
196 ) -> None:
197 """The BasePolicy constructor
199 :param options: The options member of Britney with all the
200 config values.
201 """
203 @property
204 @abstractmethod
205 def state_dir(self) -> str: ... 205 ↛ exitline 205 didn't return from function 'state_dir' because
207 def register_hints(self, hint_parser: HintParser) -> None: # pragma: no cover
208 """Register new hints that this policy accepts
210 :param hint_parser: (see HintParser.register_hint_type)
211 """
213 def initialise(self, britney: "Britney") -> None: # pragma: no cover
214 """Called once to make the policy initialise any data structures
216 This is useful for e.g. parsing files or other "heavy do-once" work.
218 :param britney: This is the instance of the "Britney" class.
219 """
220 self.britney = britney
222 def save_state(self, britney: "Britney") -> None: # pragma: no cover
223 """Called once at the end of the run to make the policy save any persistent data
225 Note this will *not* be called for "dry-runs" as such runs should not change
226 the state.
228 :param britney: This is the instance of the "Britney" class.
229 """
231 def apply_src_policy_impl(
232 self,
233 policy_info: dict[str, Any],
234 source_data_tdist: SourcePackage | None,
235 source_data_srcdist: SourcePackage,
236 excuse: "Excuse",
237 ) -> PolicyVerdict: # pragma: no cover
238 """Apply a policy on a given source migration
240 Britney will call this method on a given source package, when
241 Britney is considering to migrate it from the given source
242 suite to the target suite. The policy will then evaluate the
243 the migration and then return a verdict.
245 :param policy_info: A dictionary of all policy results. The
246 policy can add a value stored in a key related to its name.
247 (e.g. policy_info['age'] = {...}). This will go directly into
248 the "excuses.yaml" output.
250 :param source_data_tdist: Information about the source package
251 in the target distribution (e.g. "testing"). This is the
252 data structure in source_suite.sources[source_name]
254 :param source_data_srcdist: Information about the source
255 package in the source distribution (e.g. "unstable" or "tpu").
256 This is the data structure in target_suite.sources[source_name]
258 :return: A Policy Verdict (e.g. PolicyVerdict.PASS)
259 """
260 return PolicyVerdict.NOT_APPLICABLE
262 def apply_srcarch_policy_impl(
263 self,
264 policy_info: dict[str, Any],
265 arch: str,
266 source_data_tdist: SourcePackage | None,
267 source_data_srcdist: SourcePackage,
268 excuse: "Excuse",
269 ) -> PolicyVerdict:
270 """Apply a policy on a given binary migration
272 Britney will call this method on binaries from a given source package
273 on a given architecture, when Britney is considering to migrate them
274 from the given source suite to the target suite. The policy will then
275 evaluate the migration and then return a verdict.
277 :param policy_info: A dictionary of all policy results. The
278 policy can add a value stored in a key related to its name.
279 (e.g. policy_info['age'] = {...}). This will go directly into
280 the "excuses.yaml" output.
282 :param arch: The architecture the item is applied to. This is mostly
283 relevant for policies where src_policy is not ApplySrcPolicy.RUN_SRC
284 (as that is the only case where arch can differ from item.architecture)
286 :param source_data_tdist: Information about the source package
287 in the target distribution (e.g. "testing"). This is the
288 data structure in source_suite.sources[source_name]
290 :param source_data_srcdist: Information about the source
291 package in the source distribution (e.g. "unstable" or "tpu").
292 This is the data structure in target_suite.sources[source_name]
294 :return: A Policy Verdict (e.g. PolicyVerdict.PASS)
295 """
296 # if the policy doesn't implement this function, assume it's OK
297 return PolicyVerdict.NOT_APPLICABLE
300class AbstractBasePolicy(BasePolicy):
301 """
302 A shared abstract class for building BasePolicy objects.
304 tests/test_policy.py:initialize_policy() needs to be able to build BasePolicy
305 objects with just a two-item constructor, while all other uses of BasePolicy-
306 derived objects need the 5-item constructor. So AbstractBasePolicy was split
307 out to document this.
308 """
310 def __init__(
311 self,
312 policy_id: str,
313 options: optparse.Values,
314 suite_info: Suites,
315 applicable_suites: set[SuiteClass],
316 src_policy: ApplySrcPolicy = ApplySrcPolicy.RUN_SRC,
317 ) -> None:
318 """Concrete initializer.
320 :param policy_id: Identifies the policy. It will
321 determine the key used for the excuses.yaml etc.
323 :param options: The options member of Britney with all the
324 config values.
326 :param applicable_suites: Where this policy applies.
327 """
328 self.policy_id = policy_id
329 self.options = options
330 self.suite_info = suite_info
331 self.applicable_suites = applicable_suites
332 self.src_policy = src_policy
333 self.hints: HintCollection | None = None
334 logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
335 self.logger = logging.getLogger(logger_name)
337 @property
338 def state_dir(self) -> str:
339 return cast(str, self.options.state_dir)
342_T = TypeVar("_T")
345class SimplePolicyHint(Hint, Generic[_T]):
346 def __init__(
347 self,
348 user: str,
349 hint_type: HintType,
350 policy_parameter: _T,
351 packages: list[MigrationItem],
352 ) -> None:
353 super().__init__(user, hint_type, packages)
354 self._policy_parameter = policy_parameter
356 def __eq__(self, other: Any) -> bool:
357 if self.type != other.type or self._policy_parameter != other._policy_parameter:
358 return False
359 return super().__eq__(other)
361 def str(self) -> str:
362 return "{} {} {}".format(
363 self._type,
364 str(self._policy_parameter),
365 " ".join(x.name for x in self._packages),
366 )
369class AgeDayHint(SimplePolicyHint[int]):
370 @property
371 def days(self) -> int:
372 return self._policy_parameter
375class IgnoreRCBugHint(SimplePolicyHint[frozenset[str]]):
376 @property
377 def ignored_rcbugs(self) -> frozenset[str]:
378 return self._policy_parameter
381def simple_policy_hint_parser_function(
382 class_name: Callable[[str, HintType, _T, list[MigrationItem]], Hint],
383 converter: Callable[[str], _T],
384) -> PolicyHintParserProto:
385 def f(
386 mi_factory: MigrationItemFactory,
387 hints: HintCollection,
388 who: str,
389 hint_type: HintType,
390 *args: str,
391 ) -> None:
392 policy_parameter = args[0]
393 args = args[1:]
394 for item in mi_factory.parse_items(*args):
395 hints.add_hint(
396 class_name(who, hint_type, converter(policy_parameter), [item])
397 )
399 return f
402class AgePolicy(AbstractBasePolicy):
403 """Configurable Aging policy for source migrations
405 The AgePolicy will let packages stay in the source suite for a pre-defined
406 amount of days before letting migrate (based on their urgency, if any).
408 The AgePolicy's decision is influenced by the following:
410 State files:
411 * ${STATE_DIR}/age-policy-urgencies: File containing urgencies for source
412 packages. Note that urgencies are "sticky" and the most "urgent" urgency
413 will be used (i.e. the one with lowest age-requirements).
414 - This file needs to be updated externally, if the policy should take
415 urgencies into consideration. If empty (or not updated), the policy
416 will simply use the default urgency (see the "Config" section below)
417 - In Debian, these values are taken from the .changes file, but that is
418 not a requirement for Britney.
419 * ${STATE_DIR}/age-policy-dates: File containing the age of all source
420 packages.
421 - The policy will automatically update this file.
422 Config:
423 * DEFAULT_URGENCY: Name of the urgency used for packages without an urgency
424 (or for unknown urgencies). Will also be used to set the "minimum"
425 aging requirements for packages not in the target suite.
426 * MINDAYS_<URGENCY>: The age-requirements in days for packages with the
427 given urgency.
428 - Commonly used urgencies are: low, medium, high, emergency, critical
429 Hints:
430 * urgent <source>/<version>: Disregard the age requirements for a given
431 source/version.
432 * age-days X <source>/<version>: Set the age requirements for a given
433 source/version to X days. Note that X can exceed the highest
434 age-requirement normally given.
436 """
438 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
439 super().__init__("age", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE})
440 self._min_days = self._generate_mindays_table()
441 self._min_days_default = 0
442 # britney's "day" begins at 7pm (we want aging to occur in the 22:00Z run and we run Britney 2-4 times a day)
443 # NB: _date_now is used in tests
444 time_now = time.time()
445 if hasattr(self.options, "fake_runtime"):
446 time_now = int(self.options.fake_runtime)
447 self.logger.info("overriding runtime with fake_runtime %d" % time_now)
449 self._date_now = int(((time_now / (60 * 60)) - 19) / 24)
450 self._dates: dict[str, tuple[str, int]] = {}
451 self._urgencies: dict[str, str] = {}
452 self._default_urgency: str = self.options.default_urgency
453 self._penalty_immune_urgencies: frozenset[str] = frozenset()
454 if hasattr(self.options, "no_penalties"):
455 self._penalty_immune_urgencies = frozenset(
456 x.strip() for x in self.options.no_penalties.split()
457 )
458 self._bounty_min_age: int | None = None # initialised later
460 def _generate_mindays_table(self) -> dict[str, int]:
461 mindays: dict[str, int] = {}
462 for k in dir(self.options):
463 if not k.startswith("mindays_"):
464 continue
465 v = getattr(self.options, k)
466 try:
467 as_days = int(v)
468 except ValueError:
469 raise ValueError(
470 "Unable to parse "
471 + k
472 + " as a number of days. Must be 0 or a positive integer"
473 )
474 if as_days < 0: 474 ↛ 475line 474 didn't jump to line 475 because the condition on line 474 was never true
475 raise ValueError(
476 "The value of " + k + " must be zero or a positive integer"
477 )
478 mindays[k.split("_")[1]] = as_days
479 return mindays
481 def register_hints(self, hint_parser: HintParser) -> None:
482 hint_parser.register_hint_type(
483 HintType(
484 "age-days",
485 simple_policy_hint_parser_function(AgeDayHint, int),
486 min_args=2,
487 )
488 )
489 hint_parser.register_hint_type(HintType("urgent"))
491 def initialise(self, britney: "Britney") -> None:
492 super().initialise(britney)
493 self._read_dates_file()
494 self._read_urgencies_file()
495 if self._default_urgency not in self._min_days: # pragma: no cover
496 raise ValueError(
497 "Missing age-requirement for default urgency (MINDAYS_%s)"
498 % self._default_urgency
499 )
500 self._min_days_default = self._min_days[self._default_urgency]
501 try:
502 self._bounty_min_age = int(self.options.bounty_min_age)
503 except ValueError: 503 ↛ 504line 503 didn't jump to line 504 because the exception caught by line 503 didn't happen
504 if self.options.bounty_min_age in self._min_days:
505 self._bounty_min_age = self._min_days[self.options.bounty_min_age]
506 else: # pragma: no cover
507 raise ValueError(
508 "Please fix BOUNTY_MIN_AGE in the britney configuration"
509 )
510 except AttributeError:
511 # The option wasn't defined in the configuration
512 self._bounty_min_age = 0
514 def save_state(self, britney: "Britney") -> None:
515 super().save_state(britney)
516 self._write_dates_file()
518 def apply_src_policy_impl(
519 self,
520 age_info: dict[str, Any],
521 source_data_tdist: SourcePackage | None,
522 source_data_srcdist: SourcePackage,
523 excuse: "Excuse",
524 ) -> PolicyVerdict:
525 # retrieve the urgency for the upload, ignoring it if this is a NEW package
526 # (not present in the target suite)
527 source_name = excuse.item.package
528 urgency = self._urgencies.get(source_name, self._default_urgency)
530 if urgency not in self._min_days: 530 ↛ 531line 530 didn't jump to line 531 because the condition on line 530 was never true
531 age_info["unknown-urgency"] = urgency
532 urgency = self._default_urgency
534 if not source_data_tdist:
535 if self._min_days[urgency] < self._min_days_default:
536 age_info["urgency-reduced"] = {
537 "from": urgency,
538 "to": self._default_urgency,
539 }
540 urgency = self._default_urgency
542 if source_name not in self._dates:
543 self._dates[source_name] = (source_data_srcdist.version, self._date_now)
544 elif self._dates[source_name][0] != source_data_srcdist.version:
545 self._dates[source_name] = (source_data_srcdist.version, self._date_now)
547 days_old = self._date_now - self._dates[source_name][1]
548 min_days = self._min_days[urgency]
549 for bounty in excuse.bounty:
550 if excuse.bounty[bounty]: 550 ↛ 549line 550 didn't jump to line 549 because the condition on line 550 was always true
551 self.logger.info(
552 "Applying bounty for %s granted by %s: %d days",
553 source_name,
554 bounty,
555 excuse.bounty[bounty],
556 )
557 excuse.addinfo(
558 "Required age reduced by %d days because of %s"
559 % (excuse.bounty[bounty], bounty)
560 )
561 assert excuse.bounty[bounty] > 0, "negative bounties shouldn't happen"
562 min_days -= excuse.bounty[bounty]
563 if urgency not in self._penalty_immune_urgencies:
564 for penalty in excuse.penalty:
565 if excuse.penalty[penalty]: 565 ↛ 564line 565 didn't jump to line 564 because the condition on line 565 was always true
566 self.logger.info(
567 "Applying penalty for %s given by %s: %d days",
568 source_name,
569 penalty,
570 excuse.penalty[penalty],
571 )
572 excuse.addinfo(
573 "Required age increased by %d days because of %s"
574 % (excuse.penalty[penalty], penalty)
575 )
576 assert (
577 excuse.penalty[penalty] > 0
578 ), "negative penalties should be handled earlier"
579 min_days += excuse.penalty[penalty]
581 assert self._bounty_min_age is not None
582 # the age in BOUNTY_MIN_AGE can be higher than the one associated with
583 # the real urgency, so don't forget to take it into account
584 bounty_min_age = min(self._bounty_min_age, self._min_days[urgency])
585 if min_days < bounty_min_age:
586 min_days = bounty_min_age
587 excuse.addinfo(
588 "Required age is not allowed to drop below %d days" % min_days
589 )
591 age_info["current-age"] = days_old
593 assert self.hints is not None
594 for age_days_hint in cast(
595 "list[AgeDayHint]",
596 self.hints.search(
597 "age-days", package=source_name, version=source_data_srcdist.version
598 ),
599 ):
600 new_req = age_days_hint.days
601 age_info["age-requirement-reduced"] = {
602 "new-requirement": new_req,
603 "changed-by": age_days_hint.user,
604 }
605 if "original-age-requirement" not in age_info: 605 ↛ 607line 605 didn't jump to line 607 because the condition on line 605 was always true
606 age_info["original-age-requirement"] = min_days
607 min_days = new_req
609 age_info["age-requirement"] = min_days
610 res = PolicyVerdict.PASS
612 if days_old < min_days:
613 urgent_hints = self.hints.search(
614 "urgent", package=source_name, version=source_data_srcdist.version
615 )
616 if urgent_hints:
617 age_info["age-requirement-reduced"] = {
618 "new-requirement": 0,
619 "changed-by": urgent_hints[0].user,
620 }
621 res = PolicyVerdict.PASS_HINTED
622 else:
623 res = PolicyVerdict.REJECTED_TEMPORARILY
625 # update excuse
626 age_hint = age_info.get("age-requirement-reduced", None)
627 age_min_req = age_info["age-requirement"]
628 if age_hint:
629 new_req = age_hint["new-requirement"]
630 who = age_hint["changed-by"]
631 if new_req:
632 excuse.addinfo(
633 "Overriding age needed from %d days to %d by %s"
634 % (age_min_req, new_req, who)
635 )
636 age_min_req = new_req
637 else:
638 excuse.addinfo("Too young, but urgency pushed by %s" % who)
639 age_min_req = 0
640 excuse.setdaysold(age_info["current-age"], age_min_req)
642 if age_min_req == 0:
643 excuse.addinfo("%d days old" % days_old)
644 elif days_old < age_min_req:
645 excuse.add_verdict_info(
646 res, "Too young, only %d of %d days old" % (days_old, age_min_req)
647 )
648 else:
649 excuse.addinfo("%d days old (needed %d days)" % (days_old, age_min_req))
651 return res
653 def _read_dates_file(self) -> None:
654 """Parse the dates file"""
655 dates = self._dates
656 fallback_filename = os.path.join(self.suite_info.target_suite.path, "Dates")
657 using_new_name = False
658 try:
659 filename = os.path.join(self.state_dir, "age-policy-dates")
660 if not os.path.exists(filename) and os.path.exists(fallback_filename): 660 ↛ 661line 660 didn't jump to line 661 because the condition on line 660 was never true
661 filename = fallback_filename
662 else:
663 using_new_name = True
664 except AttributeError:
665 if os.path.exists(fallback_filename):
666 filename = fallback_filename
667 else:
668 raise RuntimeError("Please set STATE_DIR in the britney configuration")
670 try:
671 with open(filename, encoding="utf-8") as fd:
672 for line in fd:
673 if line.startswith("#"):
674 # Ignore comment lines (mostly used for tests)
675 continue
676 # <source> <version> <date>)
677 ln = line.split()
678 if len(ln) != 3: # pragma: no cover
679 continue
680 try:
681 dates[ln[0]] = (ln[1], int(ln[2]))
682 except ValueError: # pragma: no cover
683 pass
684 except FileNotFoundError:
685 if not using_new_name: 685 ↛ 687line 685 didn't jump to line 687 because the condition on line 685 was never true
686 # If we using the legacy name, then just give up
687 raise
688 self.logger.info("%s does not appear to exist. Creating it", filename)
689 with open(filename, mode="x", encoding="utf-8"):
690 pass
692 def _read_urgencies_file(self) -> None:
693 urgencies = self._urgencies
694 min_days_default = self._min_days_default
695 fallback_filename = os.path.join(self.suite_info.target_suite.path, "Urgency")
696 try:
697 filename = os.path.join(self.state_dir, "age-policy-urgencies")
698 if not os.path.exists(filename) and os.path.exists(fallback_filename): 698 ↛ 699line 698 didn't jump to line 699 because the condition on line 698 was never true
699 filename = fallback_filename
700 except AttributeError:
701 filename = fallback_filename
703 sources_s = self.suite_info.primary_source_suite.sources
704 sources_t = self.suite_info.target_suite.sources
706 with open(filename, errors="surrogateescape", encoding="ascii") as fd:
707 for line in fd:
708 if line.startswith("#"):
709 # Ignore comment lines (mostly used for tests)
710 continue
711 # <source> <version> <urgency>
712 ln = line.split()
713 if len(ln) != 3: 713 ↛ 714line 713 didn't jump to line 714 because the condition on line 713 was never true
714 continue
716 # read the minimum days associated with the urgencies
717 urgency_old = urgencies.get(ln[0], None)
718 mindays_old = self._min_days.get(urgency_old, 1000) # type: ignore[arg-type]
719 mindays_new = self._min_days.get(ln[2], min_days_default)
721 # if the new urgency is lower (so the min days are higher), do nothing
722 if mindays_old <= mindays_new:
723 continue
725 # if the package exists in the target suite and it is more recent, do nothing
726 tsrcv = sources_t.get(ln[0], None)
727 if tsrcv and apt_pkg.version_compare(tsrcv.version, ln[1]) >= 0:
728 continue
730 # if the package doesn't exist in the primary source suite or it is older, do nothing
731 usrcv = sources_s.get(ln[0], None)
732 if not usrcv or apt_pkg.version_compare(usrcv.version, ln[1]) < 0: 732 ↛ 733line 732 didn't jump to line 733 because the condition on line 732 was never true
733 continue
735 # update the urgency for the package
736 urgencies[ln[0]] = ln[2]
738 def _write_dates_file(self) -> None:
739 dates = self._dates
740 try:
741 directory = self.state_dir
742 basename = "age-policy-dates"
743 old_file = os.path.join(self.suite_info.target_suite.path, "Dates")
744 except AttributeError:
745 directory = self.suite_info.target_suite.path
746 basename = "Dates"
747 old_file = None
748 filename = os.path.join(directory, basename)
749 filename_tmp = os.path.join(directory, "%s_new" % basename)
750 with open(filename_tmp, "w", encoding="utf-8") as fd:
751 for pkg in sorted(dates):
752 version, date = dates[pkg]
753 fd.write("%s %s %d\n" % (pkg, version, date))
754 os.rename(filename_tmp, filename)
755 if old_file is not None and os.path.exists(old_file): 755 ↛ 756line 755 didn't jump to line 756 because the condition on line 755 was never true
756 self.logger.info("Removing old age-policy-dates file %s", old_file)
757 os.unlink(old_file)
760class RCBugPolicy(AbstractBasePolicy):
761 """RC bug regression policy for source migrations
763 The RCBugPolicy will read provided list of RC bugs and block any
764 source upload that would introduce a *new* RC bug in the target
765 suite.
767 The RCBugPolicy's decision is influenced by the following:
769 State files:
770 * ${STATE_DIR}/rc-bugs-${SUITE_NAME}: File containing RC bugs for packages in
771 the given suite (one for both primary source suite and the target sutie is
772 needed).
773 - These files need to be updated externally.
774 """
776 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
777 super().__init__(
778 "rc-bugs", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
779 )
780 self._bugs_source: dict[str, set[str]] | None = None
781 self._bugs_target: dict[str, set[str]] | None = None
783 def register_hints(self, hint_parser: HintParser) -> None:
784 f = simple_policy_hint_parser_function(
785 IgnoreRCBugHint, lambda x: frozenset(x.split(","))
786 )
787 hint_parser.register_hint_type(HintType("ignore-rc-bugs", f, min_args=2))
789 def initialise(self, britney: "Britney") -> None:
790 super().initialise(britney)
791 source_suite = self.suite_info.primary_source_suite
792 target_suite = self.suite_info.target_suite
793 fallback_unstable = os.path.join(source_suite.path, "BugsV")
794 fallback_testing = os.path.join(target_suite.path, "BugsV")
795 try:
796 filename_unstable = os.path.join(
797 self.state_dir, "rc-bugs-%s" % source_suite.name
798 )
799 filename_testing = os.path.join(
800 self.state_dir, "rc-bugs-%s" % target_suite.name
801 )
802 if ( 802 ↛ 808line 802 didn't jump to line 808
803 not os.path.exists(filename_unstable)
804 and not os.path.exists(filename_testing)
805 and os.path.exists(fallback_unstable)
806 and os.path.exists(fallback_testing)
807 ):
808 filename_unstable = fallback_unstable
809 filename_testing = fallback_testing
810 except AttributeError:
811 filename_unstable = fallback_unstable
812 filename_testing = fallback_testing
813 self._bugs_source = self._read_bugs(filename_unstable)
814 self._bugs_target = self._read_bugs(filename_testing)
816 def apply_src_policy_impl(
817 self,
818 rcbugs_info: dict[str, Any],
819 source_data_tdist: SourcePackage | None,
820 source_data_srcdist: SourcePackage,
821 excuse: "Excuse",
822 ) -> PolicyVerdict:
823 assert self._bugs_source is not None # for type checking
824 assert self._bugs_target is not None # for type checking
825 bugs_t = set()
826 bugs_s = set()
827 source_name = excuse.item.package
828 binaries_s = {x[0] for x in source_data_srcdist.binaries}
829 try:
830 binaries_t = {x[0] for x in source_data_tdist.binaries} # type: ignore[union-attr]
831 except AttributeError:
832 binaries_t = set()
834 src_key = f"src:{source_name}"
835 if source_data_tdist and src_key in self._bugs_target:
836 bugs_t.update(self._bugs_target[src_key])
837 if src_key in self._bugs_source:
838 bugs_s.update(self._bugs_source[src_key])
840 for pkg in binaries_s:
841 if pkg in self._bugs_source:
842 bugs_s |= self._bugs_source[pkg]
843 for pkg in binaries_t:
844 if pkg in self._bugs_target:
845 bugs_t |= self._bugs_target[pkg]
847 # The bts seems to support filing source bugs against a binary of the
848 # same name if that binary isn't built by any source. An example is bug
849 # 820347 against Package: juce (in the live-2016-04-11 test). Add those
850 # bugs too.
851 if (
852 source_name not in (binaries_s | binaries_t)
853 and source_name
854 not in {
855 x.package_name
856 for x in self.suite_info.primary_source_suite.all_binaries_in_suite.keys()
857 }
858 and source_name
859 not in {
860 x.package_name
861 for x in self.suite_info.target_suite.all_binaries_in_suite.keys()
862 }
863 ):
864 if source_name in self._bugs_source:
865 bugs_s |= self._bugs_source[source_name]
866 if source_name in self._bugs_target: 866 ↛ 867line 866 didn't jump to line 867 because the condition on line 866 was never true
867 bugs_t |= self._bugs_target[source_name]
869 # If a package is not in the target suite, it has no RC bugs per
870 # definition. Unfortunately, it seems that the live-data is
871 # not always accurate (e.g. live-2011-12-13 suggests that
872 # obdgpslogger had the same bug in testing and unstable,
873 # but obdgpslogger was not in testing at that time).
874 # - For the curious, obdgpslogger was removed on that day
875 # and the BTS probably had not caught up with that fact.
876 # (https://tracker.debian.org/news/415935)
877 assert not bugs_t or source_data_tdist, (
878 "%s had bugs in the target suite but is not present" % source_name
879 )
881 verdict = PolicyVerdict.PASS
883 assert self.hints is not None
884 for ignore_hint in cast(
885 list[IgnoreRCBugHint],
886 self.hints.search(
887 "ignore-rc-bugs",
888 package=source_name,
889 version=source_data_srcdist.version,
890 ),
891 ):
892 ignored_bugs = ignore_hint.ignored_rcbugs
894 # Only handle one hint for now
895 if "ignored-bugs" in rcbugs_info:
896 self.logger.info(
897 "Ignoring ignore-rc-bugs hint from %s on %s due to another hint from %s",
898 ignore_hint.user,
899 source_name,
900 rcbugs_info["ignored-bugs"]["issued-by"],
901 )
902 continue
903 if not ignored_bugs.isdisjoint(bugs_s): 903 ↛ 912line 903 didn't jump to line 912 because the condition on line 903 was always true
904 bugs_s -= ignored_bugs
905 bugs_t -= ignored_bugs
906 rcbugs_info["ignored-bugs"] = {
907 "bugs": sorted(ignored_bugs),
908 "issued-by": ignore_hint.user,
909 }
910 verdict = PolicyVerdict.PASS_HINTED
911 else:
912 self.logger.info(
913 "Ignoring ignore-rc-bugs hint from %s on %s as none of %s affect the package",
914 ignore_hint.user,
915 source_name,
916 str(ignored_bugs),
917 )
919 rcbugs_info["shared-bugs"] = sorted(bugs_s & bugs_t)
920 rcbugs_info["unique-source-bugs"] = sorted(bugs_s - bugs_t)
921 rcbugs_info["unique-target-bugs"] = sorted(bugs_t - bugs_s)
923 # update excuse
924 new_bugs = rcbugs_info["unique-source-bugs"]
925 old_bugs = rcbugs_info["unique-target-bugs"]
926 excuse.setbugs(old_bugs, new_bugs)
928 if new_bugs:
929 verdict = PolicyVerdict.REJECTED_PERMANENTLY
930 excuse.add_verdict_info(
931 verdict,
932 "Updating %s would introduce bugs in %s: %s"
933 % (
934 source_name,
935 self.suite_info.target_suite.name,
936 ", ".join(
937 [
938 '<a href="https://bugs.debian.org/%s">#%s</a>'
939 % (quote(a), a)
940 for a in new_bugs
941 ]
942 ),
943 ),
944 )
946 if old_bugs:
947 excuse.addinfo(
948 "Updating %s will fix bugs in %s: %s"
949 % (
950 source_name,
951 self.suite_info.target_suite.name,
952 ", ".join(
953 [
954 '<a href="https://bugs.debian.org/%s">#%s</a>'
955 % (quote(a), a)
956 for a in old_bugs
957 ]
958 ),
959 )
960 )
962 return verdict
964 def _read_bugs(self, filename: str) -> dict[str, set[str]]:
965 """Read the release critical bug summary from the specified file
967 The file contains rows with the format:
969 <package-name> <bug number>[,<bug number>...]
971 The method returns a dictionary where the key is the binary package
972 name and the value is the list of open RC bugs for it.
973 """
974 bugs: dict[str, set[str]] = {}
975 self.logger.info("Loading RC bugs data from %s", filename)
976 with open(filename, encoding="ascii") as f:
977 for line in f:
978 ln = line.split()
979 if len(ln) != 2: # pragma: no cover
980 self.logger.warning("Malformed line found in line %s", line)
981 continue
982 pkg = ln[0]
983 if pkg not in bugs:
984 bugs[pkg] = set()
985 bugs[pkg].update(ln[1].split(","))
986 return bugs
989class PiupartsPolicy(AbstractBasePolicy):
990 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
991 super().__init__(
992 "piuparts", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
993 )
994 self._piuparts_source: dict[str, tuple[str, str]] | None = None
995 self._piuparts_target: dict[str, tuple[str, str]] | None = None
997 def register_hints(self, hint_parser: HintParser) -> None:
998 hint_parser.register_hint_type(HintType("ignore-piuparts"))
1000 def initialise(self, britney: "Britney") -> None:
1001 super().initialise(britney)
1002 source_suite = self.suite_info.primary_source_suite
1003 target_suite = self.suite_info.target_suite
1004 try:
1005 filename_unstable = os.path.join(
1006 self.state_dir, "piuparts-summary-%s.json" % source_suite.name
1007 )
1008 filename_testing = os.path.join(
1009 self.state_dir, "piuparts-summary-%s.json" % target_suite.name
1010 )
1011 except AttributeError as e: # pragma: no cover
1012 raise RuntimeError(
1013 "Please set STATE_DIR in the britney configuration"
1014 ) from e
1015 self._piuparts_source = self._read_piuparts_summary(
1016 filename_unstable, keep_url=True
1017 )
1018 self._piuparts_target = self._read_piuparts_summary(
1019 filename_testing, keep_url=False
1020 )
1022 def apply_src_policy_impl(
1023 self,
1024 piuparts_info: dict[str, Any],
1025 source_data_tdist: SourcePackage | None,
1026 source_data_srcdist: SourcePackage,
1027 excuse: "Excuse",
1028 ) -> PolicyVerdict:
1029 assert self._piuparts_source is not None # for type checking
1030 assert self._piuparts_target is not None # for type checking
1031 source_name = excuse.item.package
1033 if source_name in self._piuparts_target:
1034 testing_state = self._piuparts_target[source_name][0]
1035 else:
1036 testing_state = "X"
1037 url: str | None
1038 if source_name in self._piuparts_source:
1039 unstable_state, url = self._piuparts_source[source_name]
1040 else:
1041 unstable_state = "X"
1042 url = None
1043 url_html = "(no link yet)"
1044 if url is not None:
1045 url_html = '<a href="{0}">{0}</a>'.format(url)
1047 match unstable_state:
1048 case "P":
1049 # Not a regression
1050 msg = f"Piuparts tested OK - {url_html}"
1051 result = PolicyVerdict.PASS
1052 piuparts_info["test-results"] = "pass"
1053 case "F" if testing_state != "F":
1054 piuparts_info["test-results"] = "regression"
1055 msg = f"Piuparts regression - {url_html}"
1056 result = PolicyVerdict.REJECTED_PERMANENTLY
1057 case "F":
1058 piuparts_info["test-results"] = "failed"
1059 msg = f"Piuparts failure (not a regression) - {url_html}"
1060 result = PolicyVerdict.PASS
1061 case "W":
1062 msg = f"Piuparts check waiting for test results - {url_html}"
1063 result = PolicyVerdict.REJECTED_TEMPORARILY
1064 piuparts_info["test-results"] = "waiting-for-test-results"
1065 case _:
1066 msg = f"Piuparts can't test {source_name} (not a blocker) - {url_html}"
1067 piuparts_info["test-results"] = "cannot-be-tested"
1068 result = PolicyVerdict.PASS
1070 if url is not None:
1071 piuparts_info["piuparts-test-url"] = url
1072 if result.is_rejected:
1073 excuse.add_verdict_info(result, msg)
1074 else:
1075 excuse.addinfo(msg)
1077 if result.is_rejected:
1078 assert self.hints is not None
1079 for ignore_hint in self.hints.search(
1080 "ignore-piuparts",
1081 package=source_name,
1082 version=source_data_srcdist.version,
1083 ):
1084 piuparts_info["ignored-piuparts"] = {"issued-by": ignore_hint.user}
1085 result = PolicyVerdict.PASS_HINTED
1086 excuse.addinfo(
1087 f"Piuparts issue ignored as requested by {ignore_hint.user}"
1088 )
1089 break
1091 return result
1093 def _read_piuparts_summary(
1094 self, filename: str, keep_url: bool = True
1095 ) -> dict[str, tuple[str, str]]:
1096 summary: dict[str, tuple[str, str]] = {}
1097 self.logger.info("Loading piuparts report from %s", filename)
1098 with open(filename) as fd: 1098 ↛ exitline 1098 didn't return from function '_read_piuparts_summary' because the return on line 1100 wasn't executed
1099 if os.fstat(fd.fileno()).st_size < 1: 1099 ↛ 1100line 1099 didn't jump to line 1100 because the condition on line 1099 was never true
1100 return summary
1101 data = json.load(fd)
1102 try:
1103 if (
1104 data["_id"] != "Piuparts Package Test Results Summary"
1105 or data["_version"] != "1.0"
1106 ): # pragma: no cover
1107 raise ValueError(
1108 f"Piuparts results in {filename} does not have the correct ID or version"
1109 )
1110 except KeyError as e: # pragma: no cover
1111 raise ValueError(
1112 f"Piuparts results in {filename} is missing id or version field"
1113 ) from e
1114 for source, suite_data in data["packages"].items():
1115 if len(suite_data) != 1: # pragma: no cover
1116 raise ValueError(
1117 f"Piuparts results in {filename}, the source {source} does not have "
1118 "exactly one result set"
1119 )
1120 item = next(iter(suite_data.values()))
1121 state, _, url = item
1122 if not keep_url:
1123 url = None
1124 summary[source] = (state, url)
1126 return summary
1129class DependsPolicy(AbstractBasePolicy):
1130 pkg_universe: "BinaryPackageUniverse"
1131 broken_packages: frozenset["BinaryPackageId"]
1132 all_binaries: dict["BinaryPackageId", "BinaryPackage"]
1133 allow_uninst: dict[str, set[str | None]]
1135 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
1136 super().__init__(
1137 "depends",
1138 options,
1139 suite_info,
1140 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
1141 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
1142 )
1143 self.nobreakall_arches = None
1144 self.new_arches = None
1145 self.break_arches = None
1147 def initialise(self, britney: "Britney") -> None:
1148 super().initialise(britney)
1149 self.pkg_universe = britney.pkg_universe
1150 self.broken_packages = self.pkg_universe.broken_packages
1151 self.all_binaries = britney.all_binaries
1152 self.nobreakall_arches = self.options.nobreakall_arches
1153 self.new_arches = self.options.new_arches
1154 self.break_arches = self.options.break_arches
1155 self.allow_uninst = britney.allow_uninst
1157 def apply_srcarch_policy_impl(
1158 self,
1159 deps_info: dict[str, Any],
1160 arch: str,
1161 source_data_tdist: SourcePackage | None,
1162 source_data_srcdist: SourcePackage,
1163 excuse: "Excuse",
1164 ) -> PolicyVerdict:
1165 verdict = PolicyVerdict.PASS
1167 assert self.break_arches is not None
1168 assert self.new_arches is not None
1169 if arch in self.break_arches or arch in self.new_arches:
1170 # we don't check these in the policy (TODO - for now?)
1171 return verdict
1173 item = excuse.item
1174 source_suite = item.suite
1175 target_suite = self.suite_info.target_suite
1177 packages_s_a = source_suite.binaries[arch]
1178 packages_t_a = target_suite.binaries[arch]
1180 my_bins = sorted(filter_out_faux(excuse.packages[arch]))
1182 arch_all_installable = set()
1183 arch_arch_installable = set()
1184 consider_it_regression = True
1186 for pkg_id in my_bins:
1187 pkg_name = pkg_id.package_name
1188 binary_u = packages_s_a[pkg_name]
1189 pkg_arch = binary_u.architecture
1191 # in some cases, we want to track the uninstallability of a
1192 # package (because the autopkgtest policy uses this), but we still
1193 # want to allow the package to be uninstallable
1194 skip_dep_check = False
1196 if binary_u.source_version != source_data_srcdist.version:
1197 # don't check cruft in unstable
1198 continue
1200 if item.architecture != "source" and pkg_arch == "all":
1201 # we don't care about the existing arch: all binaries when
1202 # checking a binNMU item, because the arch: all binaries won't
1203 # migrate anyway
1204 skip_dep_check = True
1206 if pkg_arch == "all" and arch not in self.nobreakall_arches:
1207 skip_dep_check = True
1209 if pkg_name in self.allow_uninst[arch]: 1209 ↛ 1212line 1209 didn't jump to line 1212 because the condition on line 1209 was never true
1210 # this binary is allowed to become uninstallable, so we don't
1211 # need to check anything
1212 skip_dep_check = True
1214 if pkg_name in packages_t_a:
1215 oldbin = packages_t_a[pkg_name]
1216 if not target_suite.is_installable(oldbin.pkg_id):
1217 # as the current binary in testing is already
1218 # uninstallable, the newer version is allowed to be
1219 # uninstallable as well, so we don't need to check
1220 # anything
1221 skip_dep_check = True
1222 consider_it_regression = False
1224 if pkg_id in self.broken_packages:
1225 if pkg_arch == "all":
1226 arch_all_installable.add(False)
1227 else:
1228 arch_arch_installable.add(False)
1229 # dependencies can't be satisfied by all the known binaries -
1230 # this certainly won't work...
1231 excuse.add_unsatisfiable_on_arch(arch)
1232 if skip_dep_check:
1233 # ...but if the binary is allowed to become uninstallable,
1234 # we don't care
1235 # we still want the binary to be listed as uninstallable,
1236 continue
1237 verdict = PolicyVerdict.REJECTED_PERMANENTLY
1238 if pkg_name.endswith("-faux-build-depends"): 1238 ↛ 1239line 1238 didn't jump to line 1239 because the condition on line 1238 was never true
1239 name = pkg_name.replace("-faux-build-depends", "")
1240 excuse.add_verdict_info(
1241 verdict,
1242 f"src:{name} has unsatisfiable build dependency",
1243 )
1244 else:
1245 excuse.add_verdict_info(
1246 verdict, f"{pkg_name}/{arch} has unsatisfiable dependency"
1247 )
1248 excuse.addreason("depends")
1249 else:
1250 if pkg_arch == "all":
1251 arch_all_installable.add(True)
1252 else:
1253 arch_arch_installable.add(True)
1255 if skip_dep_check:
1256 continue
1258 deps = self.pkg_universe.dependencies_of(pkg_id)
1260 for dep in deps:
1261 # dep is a list of packages, each of which satisfy the
1262 # dependency
1264 if dep == frozenset():
1265 continue
1266 is_ok = False
1267 needed_for_dep = set()
1269 for alternative in dep:
1270 if target_suite.is_pkg_in_the_suite(alternative):
1271 # dep can be satisfied in testing - ok
1272 is_ok = True
1273 elif alternative in my_bins:
1274 # can be satisfied by binary from same item: will be
1275 # ok if item migrates
1276 is_ok = True
1277 else:
1278 needed_for_dep.add(alternative)
1280 if not is_ok:
1281 spec = DependencySpec(DependencyType.DEPENDS, arch)
1282 excuse.add_package_depends(spec, needed_for_dep)
1284 # The autopkgtest policy needs delicate trade offs for
1285 # non-installability. The current choice (considering source
1286 # migration and only binaries built by the version of the
1287 # source):
1288 #
1289 # * Run autopkgtest if all arch:$arch binaries are installable
1290 # (but some or all arch:all binaries are not)
1291 #
1292 # * Don't schedule nor wait for not installable arch:all only package
1293 # on ! NOBREAKALL_ARCHES
1294 #
1295 # * Run autopkgtest if installability isn't a regression (there are (or
1296 # rather, should) not be a lot of packages in this state, and most
1297 # likely they'll just fail quickly)
1298 #
1299 # * Don't schedule, but wait otherwise
1300 if arch_arch_installable == {True} and False in arch_all_installable:
1301 deps_info.setdefault("autopkgtest_run_anyways", []).append(arch)
1302 elif (
1303 arch not in self.nobreakall_arches
1304 and arch_arch_installable == set()
1305 and False in arch_all_installable
1306 ):
1307 deps_info.setdefault("arch_all_not_installable", []).append(arch)
1308 elif not consider_it_regression:
1309 deps_info.setdefault("autopkgtest_run_anyways", []).append(arch)
1311 return verdict
1314@unique
1315class BuildDepResult(IntEnum):
1316 # relation is satisfied in target
1317 OK = 1
1318 # relation can be satisfied by other packages in source
1319 DEPENDS = 2
1320 # relation cannot be satisfied
1321 FAILED = 3
1324class BuildDependsPolicy(AbstractBasePolicy):
1326 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
1327 super().__init__(
1328 "build-depends",
1329 options,
1330 suite_info,
1331 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
1332 )
1333 self._all_buildarch: list[str] = []
1335 parse_option(options, "all_buildarch")
1337 def initialise(self, britney: "Britney") -> None:
1338 super().initialise(britney)
1339 if self.options.all_buildarch:
1340 self._all_buildarch = SuiteContentLoader.config_str_as_list(
1341 self.options.all_buildarch, []
1342 )
1344 def apply_src_policy_impl(
1345 self,
1346 build_deps_info: dict[str, Any],
1347 source_data_tdist: SourcePackage | None,
1348 source_data_srcdist: SourcePackage,
1349 excuse: "Excuse",
1350 get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers,
1351 ) -> PolicyVerdict:
1352 verdict = PolicyVerdict.PASS
1354 # analyze the dependency fields (if present)
1355 if deps := source_data_srcdist.build_deps_arch:
1356 v = self._check_build_deps(
1357 deps,
1358 DependencyType.BUILD_DEPENDS,
1359 build_deps_info,
1360 source_data_tdist,
1361 source_data_srcdist,
1362 excuse,
1363 get_dependency_solvers=get_dependency_solvers,
1364 )
1365 verdict = PolicyVerdict.worst_of(verdict, v)
1367 if ideps := source_data_srcdist.build_deps_indep:
1368 v = self._check_build_deps(
1369 ideps,
1370 DependencyType.BUILD_DEPENDS_INDEP,
1371 build_deps_info,
1372 source_data_tdist,
1373 source_data_srcdist,
1374 excuse,
1375 get_dependency_solvers=get_dependency_solvers,
1376 )
1377 verdict = PolicyVerdict.worst_of(verdict, v)
1379 return verdict
1381 def _get_check_archs(
1382 self, archs: Container[str], dep_type: DependencyType
1383 ) -> list[str]:
1384 oos = self.options.outofsync_arches
1386 if dep_type == DependencyType.BUILD_DEPENDS:
1387 return [
1388 arch
1389 for arch in self.options.architectures
1390 if arch in archs and arch not in oos
1391 ]
1393 # first try the all buildarch
1394 checkarchs = list(self._all_buildarch)
1395 # then try the architectures where this source has arch specific
1396 # binaries (in the order of the architecture config file)
1397 checkarchs.extend(
1398 arch
1399 for arch in self.options.architectures
1400 if arch in archs and arch not in checkarchs
1401 )
1402 # then try all other architectures
1403 checkarchs.extend(
1404 arch for arch in self.options.architectures if arch not in checkarchs
1405 )
1407 # and drop OUTOFSYNC_ARCHES
1408 return [arch for arch in checkarchs if arch not in oos]
1410 def _add_info_for_arch(
1411 self,
1412 arch: str,
1413 excuses_info: dict[str, list[str]],
1414 blockers: dict[str, set[BinaryPackageId]],
1415 results: dict[str, BuildDepResult],
1416 dep_type: DependencyType,
1417 target_suite: TargetSuite,
1418 source_suite: Suite,
1419 excuse: "Excuse",
1420 verdict: PolicyVerdict,
1421 ) -> PolicyVerdict:
1422 if arch in blockers:
1423 packages = blockers[arch]
1425 # for the solving packages, update the excuse to add the dependencies
1426 for p in packages:
1427 if arch not in self.options.break_arches: 1427 ↛ 1426line 1427 didn't jump to line 1426 because the condition on line 1427 was always true
1428 spec = DependencySpec(dep_type, arch)
1429 excuse.add_package_depends(spec, {p})
1431 if arch in results and results[arch] == BuildDepResult.FAILED:
1432 verdict = PolicyVerdict.worst_of(
1433 verdict, PolicyVerdict.REJECTED_PERMANENTLY
1434 )
1436 if arch in excuses_info:
1437 for excuse_text in excuses_info[arch]:
1438 if verdict.is_rejected: 1438 ↛ 1441line 1438 didn't jump to line 1441 because the condition on line 1438 was always true
1439 excuse.add_verdict_info(verdict, excuse_text)
1440 else:
1441 excuse.addinfo(excuse_text)
1443 return verdict
1445 def _check_build_deps(
1446 self,
1447 deps: str,
1448 dep_type: DependencyType,
1449 build_deps_info: dict[str, Any],
1450 source_data_tdist: SourcePackage | None,
1451 source_data_srcdist: SourcePackage,
1452 excuse: "Excuse",
1453 get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers,
1454 ) -> PolicyVerdict:
1455 verdict = PolicyVerdict.PASS
1456 any_arch_ok = dep_type == DependencyType.BUILD_DEPENDS_INDEP
1458 britney = self.britney
1460 # local copies for better performance
1461 parse_src_depends = apt_pkg.parse_src_depends
1463 source_name = excuse.item.package
1464 source_suite = excuse.item.suite
1465 target_suite = self.suite_info.target_suite
1466 binaries_s = source_suite.binaries
1467 provides_s = source_suite.provides_table
1468 binaries_t = target_suite.binaries
1469 provides_t = target_suite.provides_table
1470 unsat_bd: dict[str, list[str]] = {}
1471 relevant_archs: set[str] = {
1472 binary.architecture
1473 for binary in filter_out_faux(source_data_srcdist.binaries)
1474 if britney.all_binaries[binary].architecture != "all"
1475 }
1477 excuses_info: dict[str, list[str]] = defaultdict(list)
1478 blockers: dict[str, set[BinaryPackageId]] = defaultdict(set)
1479 arch_results = {}
1480 result_archs = defaultdict(list)
1481 bestresult = BuildDepResult.FAILED
1482 check_archs = self._get_check_archs(relevant_archs, dep_type)
1483 if not check_archs:
1484 # when the arch list is empty, we check the b-d on any arch, instead of all archs
1485 # this happens for Build-Depens on a source package that only produces arch: all binaries
1486 any_arch_ok = True
1487 check_archs = self._get_check_archs(
1488 self.options.architectures, DependencyType.BUILD_DEPENDS_INDEP
1489 )
1491 for arch in check_archs:
1492 # retrieve the binary package from the specified suite and arch
1493 binaries_s_a = binaries_s[arch]
1494 provides_s_a = provides_s[arch]
1495 binaries_t_a = binaries_t[arch]
1496 provides_t_a = provides_t[arch]
1497 arch_results[arch] = BuildDepResult.OK
1498 # for every dependency block (formed as conjunction of disjunction)
1499 for block_txt in deps.split(","):
1500 block_list = parse_src_depends(block_txt, False, arch)
1501 # Unlike regular dependencies, some clauses of the Build-Depends(-Arch|-Indep) can be
1502 # filtered out by (e.g.) architecture restrictions. We need to cope with this while
1503 # keeping block_txt and block aligned.
1504 if not block_list:
1505 # Relation is not relevant for this architecture.
1506 continue
1507 block = block_list[0]
1508 # if the block is satisfied in the target suite, then skip the block
1509 if get_dependency_solvers(
1510 block, binaries_t_a, provides_t_a, build_depends=True
1511 ):
1512 # Satisfied in the target suite; all ok.
1513 continue
1515 # check if the block can be satisfied in the source suite, and list the solving packages
1516 packages = get_dependency_solvers(
1517 block, binaries_s_a, provides_s_a, build_depends=True
1518 )
1519 sources = sorted(p.source for p in packages)
1521 # if the dependency can be satisfied by the same source package, skip the block:
1522 # obviously both binary packages will enter the target suite together
1523 if source_name in sources: 1523 ↛ 1524line 1523 didn't jump to line 1524 because the condition on line 1523 was never true
1524 continue
1526 # if no package can satisfy the dependency, add this information to the excuse
1527 if not packages:
1528 excuses_info[arch].append(
1529 "%s unsatisfiable %s on %s: %s"
1530 % (source_name, dep_type, arch, block_txt.strip())
1531 )
1532 if arch not in unsat_bd: 1532 ↛ 1534line 1532 didn't jump to line 1534 because the condition on line 1532 was always true
1533 unsat_bd[arch] = []
1534 unsat_bd[arch].append(block_txt.strip())
1535 arch_results[arch] = BuildDepResult.FAILED
1536 continue
1538 blockers[arch].update(p.pkg_id for p in packages)
1539 if arch_results[arch] < BuildDepResult.DEPENDS:
1540 arch_results[arch] = BuildDepResult.DEPENDS
1542 if any_arch_ok:
1543 if arch_results[arch] < bestresult:
1544 bestresult = arch_results[arch]
1545 result_archs[arch_results[arch]].append(arch)
1546 if bestresult == BuildDepResult.OK:
1547 # we found an architecture where the b-deps-indep are
1548 # satisfied in the target suite, so we can stop
1549 break
1551 if any_arch_ok:
1552 arch = result_archs[bestresult][0]
1553 excuse.add_detailed_info(f"Checking {dep_type.get_description()} on {arch}")
1554 key = "check-%s-on-arch" % dep_type.get_reason()
1555 build_deps_info[key] = arch
1556 verdict = self._add_info_for_arch(
1557 arch,
1558 excuses_info,
1559 blockers,
1560 arch_results,
1561 dep_type,
1562 target_suite,
1563 source_suite,
1564 excuse,
1565 verdict,
1566 )
1568 else:
1569 for arch in check_archs:
1570 verdict = self._add_info_for_arch(
1571 arch,
1572 excuses_info,
1573 blockers,
1574 arch_results,
1575 dep_type,
1576 target_suite,
1577 source_suite,
1578 excuse,
1579 verdict,
1580 )
1582 if unsat_bd:
1583 build_deps_info["unsatisfiable-arch-build-depends"] = unsat_bd
1585 return verdict
1588class BuiltUsingPolicy(AbstractBasePolicy):
1589 """Built-Using policy
1591 Binaries that incorporate (part of) another source package must list these
1592 sources under 'Built-Using'.
1594 This policy checks if the corresponding sources are available in the
1595 target suite. If they are not, but they are candidates for migration, a
1596 dependency is added.
1598 If the binary incorporates a newer version of a source, that is not (yet)
1599 a candidate, we don't want to accept that binary. A rebuild later in the
1600 primary suite wouldn't fix the issue, because that would incorporate the
1601 newer version again.
1603 If the binary incorporates an older version of the source, a newer version
1604 will be accepted as a replacement. We assume that this can be fixed by
1605 rebuilding the binary at some point during the development cycle.
1607 Requiring exact version of the source would not be useful in practice. A
1608 newer upload of that source wouldn't be blocked by this policy, so the
1609 built-using would be outdated anyway.
1611 """
1613 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
1614 super().__init__(
1615 "built-using",
1616 options,
1617 suite_info,
1618 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
1619 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
1620 )
1622 def initialise(self, britney: "Britney") -> None:
1623 super().initialise(britney)
1625 def apply_srcarch_policy_impl(
1626 self,
1627 build_deps_info: dict[str, Any],
1628 arch: str,
1629 source_data_tdist: SourcePackage | None,
1630 source_data_srcdist: SourcePackage,
1631 excuse: "Excuse",
1632 ) -> PolicyVerdict:
1633 verdict = PolicyVerdict.PASS
1635 source_suite = excuse.item.suite
1636 target_suite = self.suite_info.target_suite
1637 binaries_s = source_suite.binaries
1639 def check_bu_in_suite(
1640 bu_source: str, bu_version: str, source_suite: Suite
1641 ) -> bool:
1642 found = False
1643 if bu_source not in source_suite.sources:
1644 return found
1645 s_source = source_suite.sources[bu_source]
1646 s_ver = s_source.version
1647 if apt_pkg.version_compare(s_ver, bu_version) >= 0:
1648 found = True
1649 dep = PackageId(bu_source, s_ver, "source")
1650 if arch in self.options.break_arches:
1651 excuse.add_detailed_info(
1652 "Ignoring Built-Using for %s/%s on %s"
1653 % (pkg_name, arch, dep.uvname)
1654 )
1655 else:
1656 spec = DependencySpec(DependencyType.BUILT_USING, arch)
1657 excuse.add_package_depends(spec, {dep})
1658 excuse.add_detailed_info(
1659 f"{pkg_name}/{arch} has Built-Using on {dep.uvname}"
1660 )
1662 return found
1664 for pkg_id in sorted(
1665 x
1666 for x in filter_out_faux(source_data_srcdist.binaries)
1667 if x.architecture == arch
1668 ):
1669 pkg_name = pkg_id.package_name
1671 # retrieve the testing (if present) and unstable corresponding binary packages
1672 binary_s = binaries_s[arch][pkg_name]
1674 for bu in binary_s.builtusing:
1675 bu_source = bu[0]
1676 bu_version = bu[1]
1677 found = False
1678 if bu_source in target_suite.sources:
1679 t_source = target_suite.sources[bu_source]
1680 t_ver = t_source.version
1681 if apt_pkg.version_compare(t_ver, bu_version) >= 0:
1682 found = True
1684 if not found:
1685 found = check_bu_in_suite(bu_source, bu_version, source_suite)
1687 if not found and source_suite.suite_class.is_additional_source:
1688 found = check_bu_in_suite(
1689 bu_source, bu_version, self.suite_info.primary_source_suite
1690 )
1692 if not found:
1693 if arch in self.options.break_arches:
1694 excuse.add_detailed_info(
1695 "Ignoring unsatisfiable Built-Using for %s/%s on %s %s"
1696 % (pkg_name, arch, bu_source, bu_version)
1697 )
1698 else:
1699 verdict = PolicyVerdict.worst_of(
1700 verdict, PolicyVerdict.REJECTED_PERMANENTLY
1701 )
1702 excuse.add_verdict_info(
1703 verdict,
1704 "%s/%s has unsatisfiable Built-Using on %s %s"
1705 % (pkg_name, arch, bu_source, bu_version),
1706 )
1708 return verdict
1711class BlockPolicy(AbstractBasePolicy):
1712 BLOCK_HINT_REGEX = re.compile("^(un)?(block-?.*)$")
1714 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
1715 super().__init__(
1716 "block",
1717 options,
1718 suite_info,
1719 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
1720 )
1721 self._blockall: dict[str | None, Hint] = {}
1723 def initialise(self, britney: "Britney") -> None:
1724 super().initialise(britney)
1725 assert self.hints is not None
1726 for hint in self.hints.search(type="block-all"):
1727 self._blockall[hint.package] = hint
1729 self._key_packages = []
1730 if "key" in self._blockall:
1731 self._key_packages = self._read_key_packages()
1733 def _read_key_packages(self) -> list[str]:
1734 """Read the list of key packages
1736 The file contains data in the yaml format :
1738 - reason: <something>
1739 source: <package>
1741 The method returns a list of all key packages.
1742 """
1743 filename = os.path.join(self.state_dir, "key_packages.yaml")
1744 self.logger.info("Loading key packages from %s", filename)
1745 if os.path.exists(filename): 1745 ↛ 1750line 1745 didn't jump to line 1750 because the condition on line 1745 was always true
1746 with open(filename) as f:
1747 data = yaml.safe_load(f)
1748 key_packages = [item["source"] for item in data]
1749 else:
1750 self.logger.error(
1751 "Britney was asked to block key packages, "
1752 + "but no key_packages.yaml file was found."
1753 )
1754 sys.exit(1)
1756 return key_packages
1758 def register_hints(self, hint_parser: HintParser) -> None:
1759 # block related hints are currently defined in hint.py
1760 pass
1762 def _check_blocked(
1763 self, arch: str, version: str, excuse: "Excuse"
1764 ) -> PolicyVerdict:
1765 verdict = PolicyVerdict.PASS
1766 blocked = {}
1767 unblocked = {}
1768 block_info = {}
1769 source_suite = excuse.item.suite
1770 suite_name = source_suite.name
1771 src = excuse.item.package
1772 is_primary = source_suite.suite_class == SuiteClass.PRIMARY_SOURCE_SUITE
1774 tooltip = (
1775 "please contact %s-release if update is needed" % self.options.distribution
1776 )
1778 assert self.hints is not None
1779 shints = self.hints.search(package=src)
1780 mismatches = False
1781 r = self.BLOCK_HINT_REGEX
1782 for hint in shints:
1783 m = r.match(hint.type)
1784 if m:
1785 if m.group(1) == "un":
1786 assert hint.suite is not None
1787 if (
1788 hint.version != version
1789 or hint.suite.name != suite_name
1790 or (hint.architecture != arch and hint.architecture != "source")
1791 ):
1792 self.logger.info(
1793 "hint mismatch: %s %s %s", version, arch, suite_name
1794 )
1795 mismatches = True
1796 else:
1797 unblocked[m.group(2)] = hint.user
1798 excuse.add_hint(hint)
1799 else:
1800 # block(-*) hint: only accepts a source, so this will
1801 # always match
1802 blocked[m.group(2)] = hint.user
1803 excuse.add_hint(hint)
1805 if "block" not in blocked and is_primary:
1806 # if there is a specific block hint for this package, we don't
1807 # check for the general hints
1809 if self.options.distribution == "debian": 1809 ↛ 1816line 1809 didn't jump to line 1816 because the condition on line 1809 was always true
1810 url = "https://release.debian.org/testing/freeze_policy.html"
1811 tooltip = (
1812 'Follow the <a href="%s">freeze policy</a> when applying for an unblock'
1813 % url
1814 )
1816 if "source" in self._blockall:
1817 blocked["block"] = self._blockall["source"].user
1818 excuse.add_hint(self._blockall["source"])
1819 elif (
1820 "new-source" in self._blockall
1821 and src not in self.suite_info.target_suite.sources
1822 ):
1823 blocked["block"] = self._blockall["new-source"].user
1824 excuse.add_hint(self._blockall["new-source"])
1825 # no tooltip: new sources will probably not be accepted anyway
1826 block_info["block"] = "blocked by {}: is not in {}".format(
1827 self._blockall["new-source"].user,
1828 self.suite_info.target_suite.name,
1829 )
1830 elif "key" in self._blockall and src in self._key_packages:
1831 blocked["block"] = self._blockall["key"].user
1832 excuse.add_hint(self._blockall["key"])
1833 block_info["block"] = "blocked by {}: is a key package ({})".format(
1834 self._blockall["key"].user,
1835 tooltip,
1836 )
1837 elif "no-autopkgtest" in self._blockall:
1838 if excuse.autopkgtest_results == {"PASS"}:
1839 if not blocked: 1839 ↛ 1865line 1839 didn't jump to line 1865 because the condition on line 1839 was always true
1840 excuse.addinfo("not blocked: has successful autopkgtest")
1841 else:
1842 blocked["block"] = self._blockall["no-autopkgtest"].user
1843 excuse.add_hint(self._blockall["no-autopkgtest"])
1844 if not excuse.autopkgtest_results:
1845 block_info["block"] = (
1846 "blocked by %s: does not have autopkgtest (%s)"
1847 % (
1848 self._blockall["no-autopkgtest"].user,
1849 tooltip,
1850 )
1851 )
1852 else:
1853 block_info["block"] = (
1854 "blocked by %s: autopkgtest not fully successful (%s)"
1855 % (
1856 self._blockall["no-autopkgtest"].user,
1857 tooltip,
1858 )
1859 )
1861 elif not is_primary:
1862 blocked["block"] = suite_name
1863 excuse.needs_approval = True
1865 for block_cmd in blocked:
1866 unblock_cmd = "un" + block_cmd
1867 if block_cmd in unblocked:
1868 if is_primary or block_cmd == "block-udeb":
1869 excuse.addinfo(
1870 "Ignoring %s request by %s, due to %s request by %s"
1871 % (
1872 block_cmd,
1873 blocked[block_cmd],
1874 unblock_cmd,
1875 unblocked[block_cmd],
1876 )
1877 )
1878 else:
1879 excuse.addinfo("Approved by %s" % (unblocked[block_cmd]))
1880 else:
1881 verdict = PolicyVerdict.REJECTED_NEEDS_APPROVAL
1882 if is_primary or block_cmd == "block-udeb":
1883 # redirect people to d-i RM for udeb things:
1884 if block_cmd == "block-udeb":
1885 tooltip = "please contact the d-i release manager if an update is needed"
1886 if block_cmd in block_info:
1887 info = block_info[block_cmd]
1888 else:
1889 info = (
1890 "Not touching package due to {} request by {} ({})".format(
1891 block_cmd,
1892 blocked[block_cmd],
1893 tooltip,
1894 )
1895 )
1896 excuse.add_verdict_info(verdict, info)
1897 else:
1898 excuse.add_verdict_info(verdict, "NEEDS APPROVAL BY RM")
1899 excuse.addreason("block")
1900 if mismatches:
1901 excuse.add_detailed_info(
1902 "Some hints for %s do not match this item" % src
1903 )
1904 return verdict
1906 def apply_src_policy_impl(
1907 self,
1908 block_info: dict[str, Any],
1909 source_data_tdist: SourcePackage | None,
1910 source_data_srcdist: SourcePackage,
1911 excuse: "Excuse",
1912 ) -> PolicyVerdict:
1913 return self._check_blocked("source", source_data_srcdist.version, excuse)
1915 def apply_srcarch_policy_impl(
1916 self,
1917 block_info: dict[str, Any],
1918 arch: str,
1919 source_data_tdist: SourcePackage | None,
1920 source_data_srcdist: SourcePackage,
1921 excuse: "Excuse",
1922 ) -> PolicyVerdict:
1923 return self._check_blocked(arch, source_data_srcdist.version, excuse)
1926class BuiltOnBuilddPolicy(AbstractBasePolicy):
1928 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
1929 super().__init__(
1930 "builtonbuildd",
1931 options,
1932 suite_info,
1933 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
1934 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
1935 )
1936 self._builtonbuildd: dict[str, Any] = {
1937 "signerinfo": None,
1938 }
1940 def register_hints(self, hint_parser: HintParser) -> None:
1941 hint_parser.register_hint_type(
1942 HintType(
1943 "allow-archall-maintainer-upload",
1944 versioned=HintAnnotate.FORBIDDEN,
1945 )
1946 )
1948 def initialise(self, britney: "Britney") -> None:
1949 super().initialise(britney)
1950 try:
1951 filename_signerinfo = os.path.join(self.state_dir, "signers.json")
1952 except AttributeError as e: # pragma: no cover
1953 raise RuntimeError(
1954 "Please set STATE_DIR in the britney configuration"
1955 ) from e
1956 self._builtonbuildd["signerinfo"] = self._read_signerinfo(filename_signerinfo)
1958 def apply_srcarch_policy_impl(
1959 self,
1960 buildd_info: dict[str, Any],
1961 arch: str,
1962 source_data_tdist: SourcePackage | None,
1963 source_data_srcdist: SourcePackage,
1964 excuse: "Excuse",
1965 ) -> PolicyVerdict:
1966 verdict = PolicyVerdict.PASS
1967 signers = self._builtonbuildd["signerinfo"]
1969 if "signed-by" not in buildd_info:
1970 buildd_info["signed-by"] = {}
1972 item = excuse.item
1973 source_suite = item.suite
1975 # horrible hard-coding, but currently, we don't keep track of the
1976 # component when loading the packages files
1977 component = "main"
1978 # we use the source component, because a binary in contrib can
1979 # belong to a source in main
1980 section = source_data_srcdist.section
1981 if section.find("/") > -1:
1982 component = section.split("/")[0]
1984 packages_s_a = source_suite.binaries[arch]
1985 assert self.hints is not None
1987 for pkg_id in sorted(
1988 x
1989 for x in filter_out_faux(source_data_srcdist.binaries)
1990 if x.architecture == arch
1991 ):
1992 pkg_name = pkg_id.package_name
1993 binary_u = packages_s_a[pkg_name]
1994 pkg_arch = binary_u.architecture
1996 if binary_u.source_version != source_data_srcdist.version: 1996 ↛ 1997line 1996 didn't jump to line 1997 because the condition on line 1996 was never true
1997 continue
1999 if item.architecture != "source" and pkg_arch == "all":
2000 # we don't care about the existing arch: all binaries when
2001 # checking a binNMU item, because the arch: all binaries won't
2002 # migrate anyway
2003 continue
2005 signer = None
2006 uid = None
2007 uidinfo = ""
2008 buildd_ok = False
2009 failure_verdict = PolicyVerdict.REJECTED_PERMANENTLY
2010 try:
2011 signer = signers[pkg_name][pkg_id.version][pkg_arch]
2012 if signer["buildd"]:
2013 buildd_ok = True
2014 uid = signer["uid"]
2015 uidinfo = f"arch {pkg_arch} binaries uploaded by {uid}"
2016 except KeyError:
2017 self.logger.info(
2018 "signer info for %s %s (%s) on %s not found "
2019 % (pkg_name, binary_u.version, pkg_arch, arch)
2020 )
2021 uidinfo = "upload info for arch %s binaries not found" % (pkg_arch)
2022 failure_verdict = PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT
2023 if not buildd_ok:
2024 if component != "main":
2025 if not buildd_ok and pkg_arch not in buildd_info["signed-by"]: 2025 ↛ 2029line 2025 didn't jump to line 2029 because the condition on line 2025 was always true
2026 excuse.add_detailed_info(
2027 f"{uidinfo}, but package in {component}"
2028 )
2029 buildd_ok = True
2030 elif pkg_arch == "all":
2031 allow_hints = self.hints.search(
2032 "allow-archall-maintainer-upload", package=item.package
2033 )
2034 if allow_hints:
2035 buildd_ok = True
2036 verdict = PolicyVerdict.worst_of(
2037 verdict, PolicyVerdict.PASS_HINTED
2038 )
2039 if pkg_arch not in buildd_info["signed-by"]:
2040 excuse.addinfo(
2041 "%s, but whitelisted by %s"
2042 % (uidinfo, allow_hints[0].user)
2043 )
2044 if not buildd_ok:
2045 verdict = failure_verdict
2046 if pkg_arch not in buildd_info["signed-by"]:
2047 if pkg_arch == "all":
2048 uidinfo += (
2049 ", a new source-only upload is needed to allow migration"
2050 )
2051 excuse.add_verdict_info(
2052 verdict, "Not built on buildd: %s" % (uidinfo)
2053 )
2055 if ( 2055 ↛ 2059line 2055 didn't jump to line 2059
2056 pkg_arch in buildd_info["signed-by"]
2057 and buildd_info["signed-by"][pkg_arch] != uid
2058 ):
2059 self.logger.info(
2060 "signer mismatch for %s (%s %s) on %s: %s, while %s already listed"
2061 % (
2062 pkg_name,
2063 binary_u.source,
2064 binary_u.source_version,
2065 pkg_arch,
2066 uid,
2067 buildd_info["signed-by"][pkg_arch],
2068 )
2069 )
2071 buildd_info["signed-by"][pkg_arch] = uid
2073 return verdict
2075 def _read_signerinfo(self, filename: str) -> dict[str, Any]:
2076 signerinfo: dict[str, Any] = {}
2077 self.logger.info("Loading signer info from %s", filename)
2078 with open(filename) as fd: 2078 ↛ exitline 2078 didn't return from function '_read_signerinfo' because the return on line 2080 wasn't executed
2079 if os.fstat(fd.fileno()).st_size < 1: 2079 ↛ 2080line 2079 didn't jump to line 2080 because the condition on line 2079 was never true
2080 return signerinfo
2081 signerinfo = json.load(fd)
2083 return signerinfo
2086class ImplicitDependencyPolicy(AbstractBasePolicy):
2087 """Implicit Dependency policy
2089 Upgrading a package pkg-a can break the installability of a package pkg-b.
2090 A newer version (or the removal) of pkg-b might fix the issue. In that
2091 case, pkg-a has an 'implicit dependency' on pkg-b, because pkg-a can only
2092 migrate if pkg-b also migrates.
2094 This policy tries to discover a few common cases, and adds the relevant
2095 info to the excuses. If another item is needed to fix the
2096 uninstallability, a dependency is added. If no newer item can fix it, this
2097 excuse will be blocked.
2099 Note that the migration step will check the installability of every
2100 package, so this policy doesn't need to handle every corner case. It
2101 must, however, make sure that no excuse is unnecessarily blocked.
2103 Some cases that should be detected by this policy:
2105 * pkg-a is upgraded from 1.0-1 to 2.0-1, while
2106 pkg-b has "Depends: pkg-a (<< 2.0)"
2107 This typically happens if pkg-b has a strict dependency on pkg-a because
2108 it uses some non-stable internal interface (examples are glibc,
2109 binutils, python3-defaults, ...)
2111 * pkg-a is upgraded from 1.0-1 to 2.0-1, and
2112 pkg-a 1.0-1 has "Provides: provides-1",
2113 pkg-a 2.0-1 has "Provides: provides-2",
2114 pkg-b has "Depends: provides-1"
2115 This typically happens when pkg-a has an interface that changes between
2116 versions, and a virtual package is used to identify the version of this
2117 interface (e.g. perl-api-x.y)
2119 """
2121 _pkg_universe: "BinaryPackageUniverse"
2122 _all_binaries: dict["BinaryPackageId", "BinaryPackage"]
2123 _allow_uninst: dict[str, set[str | None]]
2124 _nobreakall_arches: list[str]
2126 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
2127 super().__init__(
2128 "implicit-deps",
2129 options,
2130 suite_info,
2131 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
2132 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
2133 )
2135 def initialise(self, britney: "Britney") -> None:
2136 super().initialise(britney)
2137 self._pkg_universe = britney.pkg_universe
2138 self._all_binaries = britney.all_binaries
2139 self._smooth_updates = britney.options.smooth_updates
2140 self._nobreakall_arches = self.options.nobreakall_arches
2141 self._new_arches = self.options.new_arches
2142 self._break_arches = self.options.break_arches
2143 self._allow_uninst = britney.allow_uninst
2144 self._outofsync_arches = self.options.outofsync_arches
2146 def can_be_removed(self, pkg: BinaryPackage) -> bool:
2147 src = pkg.source
2148 target_suite = self.suite_info.target_suite
2150 # TODO these conditions shouldn't be hardcoded here
2151 # ideally, we would be able to look up excuses to see if the removal
2152 # is in there, but in the current flow, this policy is called before
2153 # all possible excuses exist, so there is no list for us to check
2155 if src not in self.suite_info.primary_source_suite.sources:
2156 # source for pkg not in unstable: candidate for removal
2157 return True
2159 source_t = target_suite.sources[src]
2160 assert self.hints is not None
2161 for hint in self.hints.search("remove", package=src, version=source_t.version):
2162 # removal hint for the source in testing: candidate for removal
2163 return True
2165 if target_suite.is_cruft(pkg):
2166 # if pkg is cruft in testing, removal will be tried
2167 return True
2169 # the case were the newer version of the source no longer includes the
2170 # binary (or includes a cruft version of the binary) will be handled
2171 # separately (in that case there might be an implicit dependency on
2172 # the newer source)
2174 return False
2176 def should_skip_rdep(
2177 self, pkg: BinaryPackage, source_name: str, myarch: str
2178 ) -> bool:
2179 target_suite = self.suite_info.target_suite
2181 if not target_suite.is_pkg_in_the_suite(pkg.pkg_id):
2182 # it is not in the target suite, migration cannot break anything
2183 return True
2185 if pkg.source == source_name:
2186 # if it is built from the same source, it will be upgraded
2187 # with the source
2188 return True
2190 if self.can_be_removed(pkg):
2191 # could potentially be removed, so if that happens, it won't be
2192 # broken
2193 return True
2195 if pkg.architecture == "all" and myarch not in self._nobreakall_arches:
2196 # arch all on non nobreakarch is allowed to become uninstallable
2197 return True
2199 if pkg.pkg_id.package_name in self._allow_uninst[myarch]:
2200 # there is a hint to allow this binary to become uninstallable
2201 return True
2203 if not target_suite.is_installable(pkg.pkg_id):
2204 # it is already uninstallable in the target suite, migration
2205 # cannot break anything
2206 return True
2208 return False
2210 def breaks_installability(
2211 self,
2212 pkg_id_t: BinaryPackageId,
2213 pkg_id_s: BinaryPackageId | None,
2214 pkg_to_check: BinaryPackageId,
2215 ) -> bool:
2216 """
2217 Check if upgrading pkg_id_t to pkg_id_s breaks the installability of
2218 pkg_to_check.
2220 To check if removing pkg_id_t breaks pkg_to_check, set pkg_id_s to
2221 None.
2222 """
2224 pkg_universe = self._pkg_universe
2225 negative_deps = pkg_universe.negative_dependencies_of(pkg_to_check)
2227 for dep in pkg_universe.dependencies_of(pkg_to_check):
2228 if pkg_id_t not in dep:
2229 # this depends doesn't have pkg_id_t as alternative, so
2230 # upgrading pkg_id_t cannot break this dependency clause
2231 continue
2233 # We check all the alternatives for this dependency, to find one
2234 # that can satisfy it when pkg_id_t is upgraded to pkg_id_s
2235 found_alternative = False
2236 for d in dep:
2237 if d in negative_deps:
2238 # If this alternative dependency conflicts with
2239 # pkg_to_check, it cannot be used to satisfy the
2240 # dependency.
2241 # This commonly happens when breaks are added to pkg_id_s.
2242 continue
2244 if d.package_name != pkg_id_t.package_name:
2245 # a binary different from pkg_id_t can satisfy the dep, so
2246 # upgrading pkg_id_t won't break this dependency
2247 found_alternative = True
2248 break
2250 if d != pkg_id_s:
2251 # We want to know the impact of the upgrade of
2252 # pkg_id_t to pkg_id_s. If pkg_id_s migrates to the
2253 # target suite, any other version of this binary will
2254 # not be there, so it cannot satisfy this dependency.
2255 # This includes pkg_id_t, but also other versions.
2256 continue
2258 # pkg_id_s can satisfy the dep
2259 found_alternative = True
2261 if not found_alternative:
2262 return True
2263 return False
2265 def check_upgrade(
2266 self,
2267 pkg_id_t: BinaryPackageId,
2268 pkg_id_s: BinaryPackageId | None,
2269 source_name: str,
2270 myarch: str,
2271 broken_binaries: set[str],
2272 excuse: "Excuse",
2273 ) -> PolicyVerdict:
2274 verdict = PolicyVerdict.PASS
2276 pkg_universe = self._pkg_universe
2277 all_binaries = self._all_binaries
2279 # check all rdeps of the package in testing
2280 rdeps_t = pkg_universe.reverse_dependencies_of(pkg_id_t)
2282 for rdep_pkg in sorted(rdeps_t):
2283 rdep_p = all_binaries[rdep_pkg]
2285 # check some cases where the rdep won't become uninstallable, or
2286 # where we don't care if it does
2287 if self.should_skip_rdep(rdep_p, source_name, myarch):
2288 continue
2290 if not self.breaks_installability(pkg_id_t, pkg_id_s, rdep_pkg):
2291 # if upgrading pkg_id_t to pkg_id_s doesn't break rdep_pkg,
2292 # there is no implicit dependency
2293 continue
2295 # The upgrade breaks the installability of the rdep. We need to
2296 # find out if there is a newer version of the rdep that solves the
2297 # uninstallability. If that is the case, there is an implicit
2298 # dependency. If not, the upgrade will fail.
2300 # check source versions
2301 newer_versions = find_newer_binaries(
2302 self.suite_info, rdep_p, add_source_for_dropped_bin=True
2303 )
2304 good_newer_versions = set()
2305 for npkg, suite in newer_versions:
2306 if npkg.architecture == "source":
2307 # When a newer version of the source package doesn't have
2308 # the binary, we get the source as 'newer version'. In
2309 # this case, the binary will not be uninstallable if the
2310 # newer source migrates, because it is no longer there.
2311 good_newer_versions.add(npkg)
2312 continue
2313 assert isinstance(npkg, BinaryPackageId)
2314 if not self.breaks_installability(pkg_id_t, pkg_id_s, npkg):
2315 good_newer_versions.add(npkg)
2317 if good_newer_versions:
2318 spec = DependencySpec(DependencyType.IMPLICIT_DEPENDENCY, myarch)
2319 excuse.add_package_depends(spec, good_newer_versions)
2320 else:
2321 # no good newer versions: no possible solution
2322 broken_binaries.add(rdep_pkg.name)
2323 if pkg_id_s:
2324 action = "migrating {} to {}".format(
2325 pkg_id_s.name,
2326 self.suite_info.target_suite.name,
2327 )
2328 else:
2329 action = "removing {} from {}".format(
2330 pkg_id_t.name,
2331 self.suite_info.target_suite.name,
2332 )
2333 if rdep_pkg[0].endswith("-faux-build-depends"):
2334 name = rdep_pkg[0].replace("-faux-build-depends", "")
2335 info = f'{action} makes Build-Depends of src:<a href="#{name}">{name}</a> uninstallable'
2336 else:
2337 info = '{0} makes <a href="#{1}">{1}</a> uninstallable'.format(
2338 action, rdep_pkg.name
2339 )
2340 verdict = PolicyVerdict.REJECTED_PERMANENTLY
2341 excuse.add_verdict_info(verdict, info)
2343 return verdict
2345 def apply_srcarch_policy_impl(
2346 self,
2347 implicit_dep_info: dict[str, Any],
2348 arch: str,
2349 source_data_tdist: SourcePackage | None,
2350 source_data_srcdist: SourcePackage,
2351 excuse: "Excuse",
2352 ) -> PolicyVerdict:
2353 verdict = PolicyVerdict.PASS
2355 if not source_data_tdist:
2356 # this item is not currently in testing: no implicit dependency
2357 return verdict
2359 if excuse.hasreason("missingbuild"):
2360 # if the build is missing, the policy would treat this as if the
2361 # binaries would be removed, which would give incorrect (and
2362 # confusing) info
2363 info = "missing build, not checking implicit dependencies on %s" % (arch)
2364 excuse.add_detailed_info(info)
2365 return verdict
2367 source_suite = excuse.item.suite
2368 source_name = excuse.item.package
2369 target_suite = self.suite_info.target_suite
2370 all_binaries = self._all_binaries
2372 # we check all binaries for this excuse that are currently in testing
2373 relevant_binaries = [
2374 x
2375 for x in source_data_tdist.binaries
2376 if (arch == "source" or x.architecture == arch)
2377 and x.package_name in target_suite.binaries[x.architecture]
2378 and x.architecture not in self._new_arches
2379 and x.architecture not in self._break_arches
2380 and x.architecture not in self._outofsync_arches
2381 ]
2383 broken_binaries: set[str] = set()
2385 assert self.hints is not None
2386 for pkg_id_t in sorted(relevant_binaries):
2387 mypkg = pkg_id_t.package_name
2388 myarch = pkg_id_t.architecture
2389 binaries_t_a = target_suite.binaries[myarch]
2390 binaries_s_a = source_suite.binaries[myarch]
2392 if target_suite.is_cruft(all_binaries[pkg_id_t]):
2393 # this binary is cruft in testing: it will stay around as long
2394 # as necessary to satisfy dependencies, so we don't need to
2395 # care
2396 continue
2398 if mypkg in binaries_s_a:
2399 mybin = binaries_s_a[mypkg]
2400 pkg_id_s: Optional["BinaryPackageId"] = mybin.pkg_id
2401 if mybin.source != source_name:
2402 # hijack: this is too complicated to check, so we ignore
2403 # it (the migration code will check the installability
2404 # later anyway)
2405 pass
2406 elif mybin.source_version != source_data_srcdist.version:
2407 # cruft in source suite: pretend the binary doesn't exist
2408 pkg_id_s = None
2409 elif pkg_id_t == pkg_id_s:
2410 # same binary (probably arch: all from a binNMU):
2411 # 'upgrading' doesn't change anything, for this binary, so
2412 # it won't break anything
2413 continue
2414 else:
2415 pkg_id_s = None
2417 if not pkg_id_s and is_smooth_update_allowed(
2418 binaries_t_a[mypkg], self._smooth_updates, self.hints
2419 ):
2420 # the binary isn't in the new version (or is cruft there), and
2421 # smooth updates are allowed: the binary can stay around if
2422 # that is necessary to satisfy dependencies, so we don't need
2423 # to check it
2424 continue
2426 if (
2427 not pkg_id_s
2428 and source_data_tdist.version == source_data_srcdist.version
2429 and source_suite.suite_class == SuiteClass.ADDITIONAL_SOURCE_SUITE
2430 and binaries_t_a[mypkg].architecture == "all"
2431 ):
2432 # we're very probably migrating a binNMU built in tpu where the arch:all
2433 # binaries were not copied to it as that's not needed. This policy could
2434 # needlessly block.
2435 continue
2437 v = self.check_upgrade(
2438 pkg_id_t, pkg_id_s, source_name, myarch, broken_binaries, excuse
2439 )
2440 verdict = PolicyVerdict.worst_of(verdict, v)
2442 # each arch is processed separately, so if we already have info from
2443 # other archs, we need to merge the info from this arch
2444 broken_old = set()
2445 if "implicit-deps" not in implicit_dep_info:
2446 implicit_dep_info["implicit-deps"] = {}
2447 else:
2448 broken_old = set(implicit_dep_info["implicit-deps"]["broken-binaries"])
2450 implicit_dep_info["implicit-deps"]["broken-binaries"] = sorted(
2451 broken_old | broken_binaries
2452 )
2454 return verdict
2457class ReverseRemovalPolicy(AbstractBasePolicy):
2458 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
2459 super().__init__(
2460 "reverseremoval",
2461 options,
2462 suite_info,
2463 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
2464 )
2466 def register_hints(self, hint_parser: HintParser) -> None:
2467 hint_parser.register_hint_type(HintType("ignore-reverse-remove"))
2469 def initialise(self, britney: "Britney") -> None:
2470 super().initialise(britney)
2472 pkg_universe = britney.pkg_universe
2473 source_suites = britney.suite_info.source_suites
2474 target_suite = britney.suite_info.target_suite
2476 # Build set of the sources of reverse (Build-) Depends
2477 assert self.hints is not None
2478 hints = self.hints.search("remove")
2480 rev_bin: dict[BinaryPackageId, set[str]] = defaultdict(set)
2481 for hint in hints:
2482 for item in hint.packages:
2483 # I think we don't need to look at the target suite
2484 for src_suite in source_suites:
2485 try:
2486 # Explicitly not running filter_out_faux here
2487 my_bins = set(src_suite.sources[item.uvname].binaries)
2488 except KeyError:
2489 continue
2490 compute_reverse_tree(pkg_universe, my_bins)
2491 for this_bin in my_bins:
2492 rev_bin.setdefault(this_bin, set()).add(item.uvname)
2494 rev_src: dict[str, set[str]] = defaultdict(set)
2495 for bin_pkg, reasons in rev_bin.items():
2496 # If the pkg is in the target suite, there's nothing this
2497 # policy wants to do.
2498 if target_suite.is_pkg_in_the_suite(bin_pkg):
2499 continue
2500 that_bin = britney.all_binaries[bin_pkg]
2501 bin_src = that_bin.source + "/" + that_bin.source_version
2502 rev_src.setdefault(bin_src, set()).update(reasons)
2503 self._block_src_for_rm_hint = rev_src
2505 def apply_src_policy_impl(
2506 self,
2507 rev_remove_info: dict[str, Any],
2508 source_data_tdist: SourcePackage | None,
2509 source_data_srcdist: SourcePackage,
2510 excuse: "Excuse",
2511 ) -> PolicyVerdict:
2512 verdict = PolicyVerdict.PASS
2514 item = excuse.item
2515 if item.name in self._block_src_for_rm_hint:
2516 reason = ", ".join(sorted(self._block_src_for_rm_hint[item.name]))
2517 assert self.hints is not None
2518 ignore_hints = self.hints.search(
2519 "ignore-reverse-remove", package=item.uvname, version=item.version
2520 )
2521 excuse.addreason("reverseremoval")
2522 if ignore_hints:
2523 excuse.addreason("ignore-reverse-remove")
2524 excuse.addinfo(
2525 "Should block migration because of remove hint for %s, but forced by %s"
2526 % (reason, ignore_hints[0].user)
2527 )
2528 verdict = PolicyVerdict.PASS_HINTED
2529 else:
2530 excuse.addinfo("Remove hint for (transitive) dependency: %s" % reason)
2531 verdict = PolicyVerdict.REJECTED_PERMANENTLY
2533 return verdict
2536class ReproduciblePolicy(AbstractBasePolicy):
2537 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
2538 super().__init__(
2539 "reproducible",
2540 options,
2541 suite_info,
2542 {SuiteClass.PRIMARY_SOURCE_SUITE},
2543 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
2544 )
2545 self._reproducible: dict[str, Any] = {}
2547 # Default values for this policy's options
2548 parse_option(options, "repro_success_bounty", default=0, to_int=True)
2549 parse_option(options, "repro_regression_penalty", default=0, to_int=True)
2550 parse_option(options, "repro_log_url")
2551 parse_option(options, "repro_url")
2552 parse_option(options, "repro_retry_url")
2553 parse_option(options, "repro_components")
2555 def register_hints(self, hint_parser: HintParser) -> None:
2556 hint_parser.register_hint_type(
2557 HintType(
2558 "ignore-reproducible-src",
2559 versioned=HintAnnotate.OPTIONAL,
2560 architectured=HintAnnotate.OPTIONAL,
2561 )
2562 )
2563 hint_parser.register_hint_type(
2564 HintType(
2565 "ignore-reproducible",
2566 versioned=HintAnnotate.OPTIONAL,
2567 architectured=HintAnnotate.OPTIONAL,
2568 )
2569 )
2571 def initialise(self, britney: "Britney") -> None:
2572 super().initialise(britney)
2573 summary = self._reproducible
2575 assert hasattr(
2576 self, "state_dir"
2577 ), "Please set STATE_DIR in the britney configuration"
2578 assert (
2579 self.options.repro_components
2580 ), "Please set REPRO_COMPONENTS in the britney configuration"
2581 for file in os.listdir(self.state_dir):
2582 if not file.startswith("reproducible-"): 2582 ↛ 2583line 2582 didn't jump to line 2583 because the condition on line 2582 was never true
2583 continue
2584 filename = os.path.join(self.state_dir, file)
2586 self.logger.info("Loading reproducibility report from %s", filename)
2587 with open(filename) as fd: 2587 ↛ 2581line 2587 didn't jump to line 2581 because the continue on line 2589 wasn't executed
2588 if os.fstat(fd.fileno()).st_size < 1: 2588 ↛ 2589line 2588 didn't jump to line 2589 because the condition on line 2588 was never true
2589 continue
2590 data = json.load(fd)
2592 for result in data["records"]:
2593 if result["component"] in self.options.repro_components: 2593 ↛ 2592line 2593 didn't jump to line 2592 because the condition on line 2593 was always true
2594 summary.setdefault(result["architecture"], {})[
2595 (result["name"], result["version"])
2596 ] = result
2598 def _create_link_to_log(self, arch: str, failed_bpids: set[BinaryPackageId]) -> str:
2599 link = ""
2600 for bpid in sorted(failed_bpids):
2601 link += f"{bpid[0]}, "
2602 link = link.strip(", ") # remove end
2604 if self.options.repro_log_url: 2604 ↛ 2615line 2604 didn't jump to line 2615 because the condition on line 2604 was always true
2605 # log should be the same for all binaries, using the last bpid
2606 try:
2607 log = self._reproducible[arch][(bpid[0], bpid[1])]["build_id"]
2608 except KeyError:
2609 log = self._reproducible["all"][(bpid[0], bpid[1])]["build_id"]
2610 url_log = self.options.repro_log_url.format(
2611 package=quote(bpid[0]), arch=arch, log=log
2612 )
2613 return f': <a href="{url_log}">{link}</a>'
2614 else:
2615 return ": " + link
2617 def apply_srcarch_policy_impl(
2618 self,
2619 policy_info: dict[str, Any],
2620 arch: str,
2621 source_data_tdist: SourcePackage | None,
2622 source_data_srcdist: SourcePackage,
2623 excuse: "Excuse",
2624 ) -> PolicyVerdict:
2625 verdict = PolicyVerdict.PASS
2626 eligible_for_bounty = False
2628 # we don't want to apply this policy (yet) on binNMUs
2629 if excuse.item.architecture != "source": 2629 ↛ 2630line 2629 didn't jump to line 2630 because the condition on line 2629 was never true
2630 return verdict
2632 # we're not supposed to judge on this arch
2633 if arch not in self.options.repro_arches: 2633 ↛ 2634line 2633 didn't jump to line 2634 because the condition on line 2633 was never true
2634 return verdict
2636 # bail out if this arch has no packages for this source (not build
2637 # here)
2638 if arch not in excuse.packages: 2638 ↛ 2639line 2638 didn't jump to line 2639 because the condition on line 2638 was never true
2639 return verdict
2641 # horrible hard-coding, but currently, we don't keep track of the
2642 # component when loading the packages files
2643 component = "main"
2644 if "/" in (section := source_data_srcdist.section): 2644 ↛ 2645line 2644 didn't jump to line 2645 because the condition on line 2644 was never true
2645 component = section.split("/")[0]
2647 if ( 2647 ↛ 2651line 2647 didn't jump to line 2651
2648 self.options.repro_components
2649 and component not in self.options.repro_components.split()
2650 ):
2651 return verdict
2653 source_name = excuse.item.package
2655 if self.options.repro_url: 2655 ↛ 2656line 2655 didn't jump to line 2656 because the condition on line 2655 was never true
2656 url = self.options.repro_url.format(package=quote(source_name), arch=arch)
2657 url_html = ' - <a href="%s">info</a>' % url
2658 if self.options.repro_retry_url:
2659 url_html += (
2660 ' <a href="%s">♻ </a>'
2661 % self.options.repro_retry_url.format(
2662 package=quote(source_name), arch=arch
2663 )
2664 )
2665 # When run on multiple archs, the last one "wins"
2666 policy_info["status-url"] = url
2667 else:
2668 url = None
2669 url_html = ""
2671 try:
2672 repro = self._reproducible[arch].copy()
2673 except KeyError:
2674 verdict = PolicyVerdict.REJECTED_TEMPORARILY
2675 msg = f"No reproducibility data available at all for {arch}"
2676 excuse.add_verdict_info(verdict, msg)
2677 return verdict
2678 try:
2679 repro |= self._reproducible["all"]
2680 except KeyError:
2681 verdict = PolicyVerdict.REJECTED_TEMPORARILY
2682 msg = "No reproducibility data available at all for arch:all"
2683 excuse.add_verdict_info(verdict, msg)
2684 return verdict
2686 # skip/delay policy until both arch:arch and arch:all builds are done
2687 if (arch or "all") in excuse.missing_builds: 2687 ↛ 2688line 2687 didn't jump to line 2688 because the condition on line 2687 was never true
2688 self.logger.debug(
2689 f"{excuse.name} not built for {arch} or all, skipping reproducible policy",
2690 )
2691 verdict = PolicyVerdict.REJECTED_TEMPORARILY
2692 excuse.add_verdict_info(
2693 verdict,
2694 f"Reproducibility check deferred on {arch}: missing builds",
2695 )
2696 return verdict
2698 source_suite_state = "not-unknown"
2699 failed_bpids: set[BinaryPackageId] = set()
2700 # The states should either be GOOD/BAD for all binaries, UNKNOWN for all
2701 # binaries, or missing for all binaries, but let's not assume that.
2702 for bpid in filter_out_faux(source_data_srcdist.binaries):
2703 if bpid[2] not in ("all", arch): 2703 ↛ 2704line 2703 didn't jump to line 2704 because the condition on line 2703 was never true
2704 continue
2705 try:
2706 state = repro[(bpid[0], bpid[1])]["status"]
2707 assert state in [
2708 "BAD",
2709 "FAIL",
2710 "GOOD",
2711 "UNKNOWN",
2712 None,
2713 ], f"Unexpected reproducible state {state}"
2714 self.logger.debug(f"repro data for {bpid}: {state}")
2715 if state == "BAD":
2716 failed_bpids.add(bpid)
2717 # not changing source_suite_state here on purpose
2718 elif state is None or state in ("FAIL", "UNKNOWN"): 2718 ↛ 2719line 2718 didn't jump to line 2719 because the condition on line 2718 was never true
2719 source_suite_state = "unknown"
2720 except KeyError:
2721 self.logger.debug(f"No repro data found for {bpid}")
2722 source_suite_state = "unknown"
2723 break
2725 if source_suite_state == "not-unknown":
2726 source_suite_state = "known"
2728 excuse_info = []
2729 if source_suite_state == "unknown":
2730 verdict = PolicyVerdict.REJECTED_TEMPORARILY
2731 excuse_info.append(
2732 f"Reproducibility check waiting for results on {arch}{url_html}"
2733 )
2734 policy_info.setdefault("state", {}).setdefault(arch, "unavailable")
2735 elif failed_bpids:
2736 ignored_bpids: set[BinaryPackageId] = set()
2737 if source_data_tdist is None: 2737 ↛ 2738line 2737 didn't jump to line 2738 because the condition on line 2737 was never true
2738 target_suite_state = "new"
2739 else:
2740 target_suite_state = "reproducible"
2741 for bpid in list(failed_bpids):
2742 pkg_name = bpid[0]
2743 for bpid_t in filter_out_faux(source_data_tdist.binaries):
2744 if bpid_t[2] not in ("all", arch): 2744 ↛ 2745line 2744 didn't jump to line 2745 because the condition on line 2744 was never true
2745 continue
2746 if pkg_name != bpid_t[0]: 2746 ↛ 2747line 2746 didn't jump to line 2747 because the condition on line 2746 was never true
2747 continue
2748 try:
2749 state = repro[(pkg_name, bpid_t[1])]["status"]
2750 assert state in [
2751 "BAD",
2752 "FAIL",
2753 "GOOD",
2754 "UNKNOWN",
2755 None,
2756 ], f"Unexpected reproducible state {state}"
2757 self.logger.debug(
2758 f"testing repro data for {bpid_t}: {state}"
2759 )
2760 if state == "BAD":
2761 ignored_bpids.add(bpid)
2762 elif state is None or state in ("FAIL", "UNKNOWN"): 2762 ↛ 2763line 2762 didn't jump to line 2763 because the condition on line 2762 was never true
2763 target_suite_state = "unknown"
2764 except KeyError:
2765 self.logger.debug(
2766 f"No testing repro data found for {bpid_t}"
2767 )
2768 # This shouldn't happen as for the past migration
2769 # to have been allowed, there should be data.
2770 target_suite_state = "unknown"
2771 break
2773 # Reminder: code here is part of the non-reproducibile source-suite branch
2774 if target_suite_state == "new": 2774 ↛ 2775line 2774 didn't jump to line 2775 because the condition on line 2774 was never true
2775 verdict = PolicyVerdict.REJECTED_PERMANENTLY
2776 excuse_info.append(
2777 f"New but not reproducible on {arch}{url_html}"
2778 f"{self._create_link_to_log(arch, failed_bpids)}"
2779 )
2780 policy_info.setdefault("state", {}).setdefault(
2781 arch, "new but not reproducible"
2782 )
2783 elif target_suite_state == "unknown": 2783 ↛ 2785line 2783 didn't jump to line 2785 because the condition on line 2783 was never true
2784 # Shouldn't happen after initial bootstrap once blocking
2785 verdict = PolicyVerdict.REJECTED_TEMPORARILY
2786 excuse_info.append(
2787 f"Reproducibility check failed and now waiting for reference "
2788 f"results on {arch}{url_html}"
2789 )
2790 policy_info.setdefault("state", {}).setdefault(
2791 arch, "waiting for reference"
2792 )
2793 elif len(failed_bpids - ignored_bpids) == 0:
2794 verdict = PolicyVerdict.REJECTED_PERMANENTLY
2795 excuse_info.append(
2796 f"Not reproducible on {arch} (not a regression)"
2797 f"{self._create_link_to_log(arch, failed_bpids)}"
2798 )
2799 policy_info.setdefault("state", {}).setdefault(arch, "not reproducible")
2800 else:
2801 verdict = PolicyVerdict.REJECTED_PERMANENTLY
2802 excuse_info.append(
2803 f"Reproducibility regression on {arch}"
2804 f"{self._create_link_to_log(arch, failed_bpids - ignored_bpids)}"
2805 )
2806 policy_info.setdefault("state", {}).setdefault(arch, "regression")
2808 # non-reproducible source-suite cases are handled above, so here we
2809 # handle the last of the source-suite cases
2810 else:
2811 excuse_info.append(f"Reproducible on {arch}{url_html}")
2812 policy_info.setdefault("state", {}).setdefault(arch, "reproducible")
2813 eligible_for_bounty = True
2815 if verdict.is_rejected:
2816 assert self.hints is not None
2817 for hint_arch in ("source", arch):
2818 ignore_hints = self.hints.search(
2819 "ignore-reproducible-src",
2820 package=source_name,
2821 version=source_data_srcdist.version,
2822 architecture=hint_arch,
2823 )
2824 # one hint is enough, take the first one encountered
2825 if ignore_hints:
2826 verdict = PolicyVerdict.PASS_HINTED
2827 policy_info.setdefault("hints", {}).setdefault(arch, []).append(
2828 ignore_hints[0].user + ": " + str(ignore_hints[0])
2829 )
2830 if hint_arch == arch: 2830 ↛ 2833line 2830 didn't jump to line 2833 because the condition on line 2830 was always true
2831 on_arch = f" on {arch}"
2832 else:
2833 on_arch = ""
2834 excuse_info.append(
2835 f"Reproducibility issues ignored for src:{ignore_hints[0].package}"
2836 f"{on_arch} as requested by {ignore_hints[0].user}"
2837 )
2838 break
2840 if verdict.is_rejected:
2841 all_hints = []
2842 all_hinted = True
2843 any_hinted = False
2845 if source_suite_state == "known":
2846 check_bpids = failed_bpids
2847 else:
2848 # Let's not wait for results if all binaries have a hint
2849 check_bpids = filter_out_faux(source_data_srcdist.binaries)
2850 missed_bpids = set()
2852 for bpid in check_bpids:
2853 bpid_hints = self.hints.search(
2854 "ignore-reproducible",
2855 package=bpid[0],
2856 version=bpid[1],
2857 architecture=bpid[2],
2858 )
2859 if bpid_hints:
2860 # one hint per binary is enough
2861 all_hints.append(bpid_hints[0])
2862 any_hinted = True
2863 self.logger.debug(
2864 f"repro: hint found for {source_name}: {bpid}"
2865 )
2866 else:
2867 missed_bpids.add(bpid)
2868 all_hinted = False
2870 if all_hinted:
2871 verdict = PolicyVerdict.PASS_HINTED
2872 for hint in all_hints:
2873 policy_info.setdefault("hints", {}).setdefault(arch, []).append(
2874 hint.user + ": " + str(hint)
2875 )
2876 # TODO: we're going to print this for arch:all binaries on each arch
2877 excuse_info.append(
2878 f"Reproducibility issues ignored for {hint.package} on {arch} as "
2879 f"requested by {hint.user}"
2880 )
2881 elif any_hinted: 2881 ↛ 2882line 2881 didn't jump to line 2882 because the condition on line 2881 was never true
2882 self.logger.info(
2883 f"repro: binary hints for {source_name} ignored as they don't "
2884 f"cover these binaries {missed_bpids}"
2885 )
2887 if self.options.repro_success_bounty and eligible_for_bounty: 2887 ↛ 2888line 2887 didn't jump to line 2888 because the condition on line 2887 was never true
2888 excuse.add_bounty("reproducibility", self.options.repro_success_bounty)
2890 if verdict.is_rejected and self.options.repro_regression_penalty: 2890 ↛ 2892line 2890 didn't jump to line 2892 because the condition on line 2890 was never true
2891 # With a non-zero penalty, we shouldn't block on this policy
2892 verdict = PolicyVerdict.PASS
2893 if self.options.repro_regression_penalty > 0:
2894 excuse.add_penalty(
2895 "reproducibility", self.options.repro_regression_penalty
2896 )
2898 for msg in excuse_info:
2899 if verdict.is_rejected:
2900 excuse.add_verdict_info(verdict, msg)
2901 else:
2902 excuse.addinfo(msg)
2904 return verdict