Coverage for britney2/policies/policy.py: 91%
1328 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-04-19 18:02 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-04-19 18:02 +0000
1import json
2import logging
3import optparse
4import os
5import re
6import sys
7import time
8from abc import ABC, abstractmethod
9from collections import defaultdict
10from collections.abc import Callable, Container
11from enum import IntEnum, unique
12from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar, cast
13from urllib.parse import quote
15import apt_pkg
16import yaml
18from britney2 import (
19 BinaryPackage,
20 BinaryPackageId,
21 DependencyType,
22 PackageId,
23 SourcePackage,
24 Suite,
25 SuiteClass,
26 Suites,
27 TargetSuite,
28)
29from britney2.excusedeps import DependencySpec
30from britney2.hints import (
31 Hint,
32 HintAnnotate,
33 HintCollection,
34 HintParser,
35 HintType,
36 PolicyHintParserProto,
37)
38from britney2.inputs.suiteloader import SuiteContentLoader
39from britney2.migrationitem import MigrationItem, MigrationItemFactory
40from britney2.policies import ApplySrcPolicy, PolicyVerdict
41from britney2.utils import (
42 GetDependencySolversProto,
43 binaries_from_source_version,
44 compute_reverse_tree,
45 filter_out_faux,
46 find_newer_binaries,
47 get_dependency_solvers,
48 is_smooth_update_allowed,
49 parse_option,
50)
52if TYPE_CHECKING: 52 ↛ 53line 52 didn't jump to line 53 because the condition on line 52 was never true
53 from ..britney import Britney
54 from ..excuse import Excuse
55 from ..installability.universe import BinaryPackageUniverse
58class PolicyLoadRequest:
59 __slots__ = ("_options_name", "_default_value", "_policy_constructor")
61 def __init__(
62 self,
63 policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"],
64 options_name: str | None,
65 default_value: bool,
66 ) -> None:
67 self._policy_constructor = policy_constructor
68 self._options_name = options_name
69 self._default_value = default_value
71 def is_enabled(self, options: optparse.Values) -> bool:
72 if self._options_name is None:
73 assert self._default_value
74 return True
75 actual_value = getattr(options, self._options_name, None)
76 if actual_value is None:
77 return self._default_value
78 return actual_value.lower() in ("yes", "y", "true", "t")
80 def load(self, options: optparse.Values, suite_info: Suites) -> "BasePolicy":
81 return self._policy_constructor(options, suite_info)
83 @classmethod
84 def always_load(
85 cls, policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"]
86 ) -> "PolicyLoadRequest":
87 return cls(policy_constructor, None, True)
89 @classmethod
90 def conditionally_load(
91 cls,
92 policy_constructor: Callable[[optparse.Values, Suites], "BasePolicy"],
93 option_name: str,
94 default_value: bool,
95 ) -> "PolicyLoadRequest":
96 return cls(policy_constructor, option_name, default_value)
99class PolicyEngine:
100 def __init__(self) -> None:
101 self._policies: list["BasePolicy"] = []
103 def add_policy(self, policy: "BasePolicy") -> None:
104 self._policies.append(policy)
106 def load_policies(
107 self,
108 options: optparse.Values,
109 suite_info: Suites,
110 policy_load_requests: list[PolicyLoadRequest],
111 ) -> None:
112 for policy_load_request in policy_load_requests:
113 if policy_load_request.is_enabled(options):
114 self.add_policy(policy_load_request.load(options, suite_info))
116 def register_policy_hints(self, hint_parser: HintParser) -> None:
117 for policy in self._policies:
118 policy.register_hints(hint_parser)
120 def initialise(self, britney: "Britney", hints: HintCollection) -> None:
121 for policy in self._policies:
122 policy.hints = hints
123 policy.initialise(britney)
125 def save_state(self, britney: "Britney") -> None:
126 for policy in self._policies:
127 policy.save_state(britney)
129 def apply_src_policies(
130 self,
131 source_t: SourcePackage | None,
132 source_u: SourcePackage,
133 excuse: "Excuse",
134 ) -> None:
135 excuse_verdict = excuse.policy_verdict
136 source_suite = excuse.item.suite
137 suite_class = source_suite.suite_class
138 for policy in self._policies:
139 pinfo: dict[str, Any] = {}
140 policy_verdict = PolicyVerdict.NOT_APPLICABLE
141 if suite_class in policy.applicable_suites:
142 if policy.src_policy.run_arch:
143 for arch in policy.options.architectures:
144 v = policy.apply_srcarch_policy_impl(
145 pinfo, arch, source_t, source_u, excuse
146 )
147 policy_verdict = PolicyVerdict.worst_of(policy_verdict, v)
148 if policy.src_policy.run_src:
149 v = policy.apply_src_policy_impl(pinfo, source_t, source_u, excuse)
150 policy_verdict = PolicyVerdict.worst_of(policy_verdict, v)
151 # The base policy provides this field, so the subclass should leave it blank
152 assert "verdict" not in pinfo
153 if policy_verdict != PolicyVerdict.NOT_APPLICABLE:
154 excuse.policy_info[policy.policy_id] = pinfo
155 pinfo["verdict"] = policy_verdict.name
156 excuse_verdict = PolicyVerdict.worst_of(policy_verdict, excuse_verdict)
157 excuse.policy_verdict = excuse_verdict
159 def apply_srcarch_policies(
160 self,
161 arch: str,
162 source_t: SourcePackage | None,
163 source_u: SourcePackage,
164 excuse: "Excuse",
165 ) -> None:
166 excuse_verdict = excuse.policy_verdict
167 source_suite = excuse.item.suite
168 suite_class = source_suite.suite_class
169 for policy in self._policies:
170 pinfo: dict[str, Any] = {}
171 if suite_class in policy.applicable_suites:
172 policy_verdict = policy.apply_srcarch_policy_impl(
173 pinfo, arch, source_t, source_u, excuse
174 )
175 excuse_verdict = PolicyVerdict.worst_of(policy_verdict, excuse_verdict)
176 # The base policy provides this field, so the subclass should leave it blank
177 assert "verdict" not in pinfo
178 if policy_verdict != PolicyVerdict.NOT_APPLICABLE:
179 excuse.policy_info[policy.policy_id] = pinfo
180 pinfo["verdict"] = policy_verdict.name
181 excuse.policy_verdict = excuse_verdict
184class BasePolicy(ABC):
185 britney: "Britney"
186 policy_id: str
187 hints: HintCollection | None
188 applicable_suites: set[SuiteClass]
189 src_policy: ApplySrcPolicy
190 options: optparse.Values
191 suite_info: Suites
193 def __init__(
194 self,
195 options: optparse.Values,
196 suite_info: Suites,
197 ) -> None:
198 """The BasePolicy constructor
200 :param options: The options member of Britney with all the
201 config values.
202 """
204 @property
205 @abstractmethod
206 def state_dir(self) -> str: ... 206 ↛ exitline 206 didn't return from function 'state_dir' because
208 def register_hints(self, hint_parser: HintParser) -> None: # pragma: no cover
209 """Register new hints that this policy accepts
211 :param hint_parser: (see HintParser.register_hint_type)
212 """
214 def initialise(self, britney: "Britney") -> None: # pragma: no cover
215 """Called once to make the policy initialise any data structures
217 This is useful for e.g. parsing files or other "heavy do-once" work.
219 :param britney: This is the instance of the "Britney" class.
220 """
221 self.britney = britney
223 def save_state(self, britney: "Britney") -> None: # pragma: no cover
224 """Called once at the end of the run to make the policy save any persistent data
226 Note this will *not* be called for "dry-runs" as such runs should not change
227 the state.
229 :param britney: This is the instance of the "Britney" class.
230 """
232 def apply_src_policy_impl(
233 self,
234 policy_info: dict[str, Any],
235 source_data_tdist: SourcePackage | None,
236 source_data_srcdist: SourcePackage,
237 excuse: "Excuse",
238 ) -> PolicyVerdict: # pragma: no cover
239 """Apply a policy on a given source migration
241 Britney will call this method on a given source package, when
242 Britney is considering to migrate it from the given source
243 suite to the target suite. The policy will then evaluate the
244 the migration and then return a verdict.
246 :param policy_info: A dictionary of all policy results. The
247 policy can add a value stored in a key related to its name.
248 (e.g. policy_info['age'] = {...}). This will go directly into
249 the "excuses.yaml" output.
251 :param source_data_tdist: Information about the source package
252 in the target distribution (e.g. "testing"). This is the
253 data structure in source_suite.sources[source_name]
255 :param source_data_srcdist: Information about the source
256 package in the source distribution (e.g. "unstable" or "tpu").
257 This is the data structure in target_suite.sources[source_name]
259 :return: A Policy Verdict (e.g. PolicyVerdict.PASS)
260 """
261 return PolicyVerdict.NOT_APPLICABLE
263 def apply_srcarch_policy_impl(
264 self,
265 policy_info: dict[str, Any],
266 arch: str,
267 source_data_tdist: SourcePackage | None,
268 source_data_srcdist: SourcePackage,
269 excuse: "Excuse",
270 ) -> PolicyVerdict:
271 """Apply a policy on a given binary migration
273 Britney will call this method on binaries from a given source package
274 on a given architecture, when Britney is considering to migrate them
275 from the given source suite to the target suite. The policy will then
276 evaluate the migration and then return a verdict.
278 :param policy_info: A dictionary of all policy results. The
279 policy can add a value stored in a key related to its name.
280 (e.g. policy_info['age'] = {...}). This will go directly into
281 the "excuses.yaml" output.
283 :param arch: The architecture the item is applied to. This is mostly
284 relevant for policies where src_policy is not ApplySrcPolicy.RUN_SRC
285 (as that is the only case where arch can differ from item.architecture)
287 :param source_data_tdist: Information about the source package
288 in the target distribution (e.g. "testing"). This is the
289 data structure in source_suite.sources[source_name]
291 :param source_data_srcdist: Information about the source
292 package in the source distribution (e.g. "unstable" or "tpu").
293 This is the data structure in target_suite.sources[source_name]
295 :return: A Policy Verdict (e.g. PolicyVerdict.PASS)
296 """
297 # if the policy doesn't implement this function, assume it's OK
298 return PolicyVerdict.NOT_APPLICABLE
301class AbstractBasePolicy(BasePolicy):
302 """
303 A shared abstract class for building BasePolicy objects.
305 tests/test_policy.py:initialize_policy() needs to be able to build BasePolicy
306 objects with just a two-item constructor, while all other uses of BasePolicy-
307 derived objects need the 5-item constructor. So AbstractBasePolicy was split
308 out to document this.
309 """
311 def __init__(
312 self,
313 policy_id: str,
314 options: optparse.Values,
315 suite_info: Suites,
316 applicable_suites: set[SuiteClass],
317 src_policy: ApplySrcPolicy = ApplySrcPolicy.RUN_SRC,
318 ) -> None:
319 """Concrete initializer.
321 :param policy_id: Identifies the policy. It will
322 determine the key used for the excuses.yaml etc.
324 :param options: The options member of Britney with all the
325 config values.
327 :param applicable_suites: Where this policy applies.
328 """
329 self.policy_id = policy_id
330 self.options = options
331 self.suite_info = suite_info
332 self.applicable_suites = applicable_suites
333 self.src_policy = src_policy
334 self.hints: HintCollection | None = None
335 logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
336 self.logger = logging.getLogger(logger_name)
338 @property
339 def state_dir(self) -> str:
340 return cast(str, self.options.state_dir)
343_T = TypeVar("_T")
346class SimplePolicyHint(Hint, Generic[_T]):
347 def __init__(
348 self,
349 user: str,
350 hint_type: HintType,
351 policy_parameter: _T,
352 packages: list[MigrationItem],
353 ) -> None:
354 super().__init__(user, hint_type, packages)
355 self._policy_parameter = policy_parameter
357 def __eq__(self, other: Any) -> bool:
358 if self.type != other.type or self._policy_parameter != other._policy_parameter:
359 return False
360 return super().__eq__(other)
362 def str(self) -> str:
363 return "{} {} {}".format(
364 self._type,
365 str(self._policy_parameter),
366 " ".join(x.name for x in self._packages),
367 )
370class AgeDayHint(SimplePolicyHint[int]):
371 @property
372 def days(self) -> int:
373 return self._policy_parameter
376class IgnoreRCBugHint(SimplePolicyHint[frozenset[str]]):
377 @property
378 def ignored_rcbugs(self) -> frozenset[str]:
379 return self._policy_parameter
382def simple_policy_hint_parser_function(
383 class_name: Callable[[str, HintType, _T, list[MigrationItem]], Hint],
384 converter: Callable[[str], _T],
385) -> PolicyHintParserProto:
386 def f(
387 mi_factory: MigrationItemFactory,
388 hints: HintCollection,
389 who: str,
390 hint_type: HintType,
391 *args: str,
392 ) -> None:
393 policy_parameter = args[0]
394 args = args[1:]
395 for item in mi_factory.parse_items(*args):
396 hints.add_hint(
397 class_name(who, hint_type, converter(policy_parameter), [item])
398 )
400 return f
403class AgePolicy(AbstractBasePolicy):
404 """Configurable Aging policy for source migrations
406 The AgePolicy will let packages stay in the source suite for a pre-defined
407 amount of days before letting migrate (based on their urgency, if any).
409 The AgePolicy's decision is influenced by the following:
411 State files:
412 * ${STATE_DIR}/age-policy-urgencies: File containing urgencies for source
413 packages. Note that urgencies are "sticky" and the most "urgent" urgency
414 will be used (i.e. the one with lowest age-requirements).
415 - This file needs to be updated externally, if the policy should take
416 urgencies into consideration. If empty (or not updated), the policy
417 will simply use the default urgency (see the "Config" section below)
418 - In Debian, these values are taken from the .changes file, but that is
419 not a requirement for Britney.
420 * ${STATE_DIR}/age-policy-dates: File containing the age of all source
421 packages.
422 - The policy will automatically update this file.
423 Config:
424 * DEFAULT_URGENCY: Name of the urgency used for packages without an urgency
425 (or for unknown urgencies). Will also be used to set the "minimum"
426 aging requirements for packages not in the target suite.
427 * MINDAYS_<URGENCY>: The age-requirements in days for packages with the
428 given urgency.
429 - Commonly used urgencies are: low, medium, high, emergency, critical
430 Hints:
431 * urgent <source>/<version>: Disregard the age requirements for a given
432 source/version.
433 * age-days X <source>/<version>: Set the age requirements for a given
434 source/version to X days. Note that X can exceed the highest
435 age-requirement normally given.
437 """
439 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
440 super().__init__("age", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE})
441 self._min_days = self._generate_mindays_table()
442 self._min_days_default = 0
443 # britney's "day" begins at 7pm (we want aging to occur in the 22:00Z run and we run Britney 2-4 times a day)
444 # NB: _date_now is used in tests
445 time_now = time.time()
446 if hasattr(self.options, "fake_runtime"):
447 time_now = int(self.options.fake_runtime)
448 self.logger.info("overriding runtime with fake_runtime %d" % time_now)
450 self._date_now = int(((time_now / (60 * 60)) - 19) / 24)
451 self._dates: dict[str, tuple[str, int]] = {}
452 self._urgencies: dict[str, str] = {}
453 self._default_urgency: str = self.options.default_urgency
454 self._penalty_immune_urgencies: frozenset[str] = frozenset()
455 if hasattr(self.options, "no_penalties"):
456 self._penalty_immune_urgencies = frozenset(
457 x.strip() for x in self.options.no_penalties.split()
458 )
459 self._bounty_min_age: int | None = None # initialised later
461 def _generate_mindays_table(self) -> dict[str, int]:
462 mindays: dict[str, int] = {}
463 for k in dir(self.options):
464 if not k.startswith("mindays_"):
465 continue
466 v = getattr(self.options, k)
467 try:
468 as_days = int(v)
469 except ValueError:
470 raise ValueError(
471 "Unable to parse "
472 + k
473 + " as a number of days. Must be 0 or a positive integer"
474 )
475 if as_days < 0: 475 ↛ 476line 475 didn't jump to line 476 because the condition on line 475 was never true
476 raise ValueError(
477 "The value of " + k + " must be zero or a positive integer"
478 )
479 mindays[k.split("_")[1]] = as_days
480 return mindays
482 def register_hints(self, hint_parser: HintParser) -> None:
483 hint_parser.register_hint_type(
484 HintType(
485 "age-days",
486 simple_policy_hint_parser_function(AgeDayHint, int),
487 min_args=2,
488 )
489 )
490 hint_parser.register_hint_type(HintType("urgent"))
492 def initialise(self, britney: "Britney") -> None:
493 super().initialise(britney)
494 self._read_dates_file()
495 self._read_urgencies_file()
496 if self._default_urgency not in self._min_days: # pragma: no cover
497 raise ValueError(
498 "Missing age-requirement for default urgency (MINDAYS_%s)"
499 % self._default_urgency
500 )
501 self._min_days_default = self._min_days[self._default_urgency]
502 try:
503 self._bounty_min_age = int(self.options.bounty_min_age)
504 except ValueError: 504 ↛ 505line 504 didn't jump to line 505 because the exception caught by line 504 didn't happen
505 if self.options.bounty_min_age in self._min_days:
506 self._bounty_min_age = self._min_days[self.options.bounty_min_age]
507 else: # pragma: no cover
508 raise ValueError(
509 "Please fix BOUNTY_MIN_AGE in the britney configuration"
510 )
511 except AttributeError:
512 # The option wasn't defined in the configuration
513 self._bounty_min_age = 0
515 def save_state(self, britney: "Britney") -> None:
516 super().save_state(britney)
517 self._write_dates_file()
519 def apply_src_policy_impl(
520 self,
521 age_info: dict[str, Any],
522 source_data_tdist: SourcePackage | None,
523 source_data_srcdist: SourcePackage,
524 excuse: "Excuse",
525 ) -> PolicyVerdict:
526 # retrieve the urgency for the upload, ignoring it if this is a NEW package
527 # (not present in the target suite)
528 source_name = excuse.item.package
529 urgency = self._urgencies.get(source_name, self._default_urgency)
531 if urgency not in self._min_days: 531 ↛ 532line 531 didn't jump to line 532 because the condition on line 531 was never true
532 age_info["unknown-urgency"] = urgency
533 urgency = self._default_urgency
535 if not source_data_tdist:
536 if self._min_days[urgency] < self._min_days_default:
537 age_info["urgency-reduced"] = {
538 "from": urgency,
539 "to": self._default_urgency,
540 }
541 urgency = self._default_urgency
543 if source_name not in self._dates:
544 self._dates[source_name] = (source_data_srcdist.version, self._date_now)
545 elif self._dates[source_name][0] != source_data_srcdist.version:
546 self._dates[source_name] = (source_data_srcdist.version, self._date_now)
548 days_old = self._date_now - self._dates[source_name][1]
549 min_days = self._min_days[urgency]
550 for bounty in excuse.bounty:
551 if excuse.bounty[bounty]: 551 ↛ 550line 551 didn't jump to line 550 because the condition on line 551 was always true
552 self.logger.info(
553 "Applying bounty for %s granted by %s: %d days",
554 source_name,
555 bounty,
556 excuse.bounty[bounty],
557 )
558 excuse.addinfo(
559 "Required age reduced by %d days because of %s"
560 % (excuse.bounty[bounty], bounty)
561 )
562 assert excuse.bounty[bounty] > 0, "negative bounties shouldn't happen"
563 min_days -= excuse.bounty[bounty]
564 if urgency not in self._penalty_immune_urgencies:
565 for penalty in excuse.penalty:
566 if excuse.penalty[penalty]: 566 ↛ 565line 566 didn't jump to line 565 because the condition on line 566 was always true
567 self.logger.info(
568 "Applying penalty for %s given by %s: %d days",
569 source_name,
570 penalty,
571 excuse.penalty[penalty],
572 )
573 excuse.addinfo(
574 "Required age increased by %d days because of %s"
575 % (excuse.penalty[penalty], penalty)
576 )
577 assert (
578 excuse.penalty[penalty] > 0
579 ), "negative penalties should be handled earlier"
580 min_days += excuse.penalty[penalty]
582 assert self._bounty_min_age is not None
583 # the age in BOUNTY_MIN_AGE can be higher than the one associated with
584 # the real urgency, so don't forget to take it into account
585 bounty_min_age = min(self._bounty_min_age, self._min_days[urgency])
586 if min_days < bounty_min_age:
587 min_days = bounty_min_age
588 excuse.addinfo(
589 "Required age is not allowed to drop below %d days" % min_days
590 )
592 age_info["current-age"] = days_old
594 assert self.hints is not None
595 for age_days_hint in cast(
596 "list[AgeDayHint]",
597 self.hints.search(
598 "age-days", package=source_name, version=source_data_srcdist.version
599 ),
600 ):
601 new_req = age_days_hint.days
602 age_info["age-requirement-reduced"] = {
603 "new-requirement": new_req,
604 "changed-by": age_days_hint.user,
605 }
606 if "original-age-requirement" not in age_info: 606 ↛ 608line 606 didn't jump to line 608 because the condition on line 606 was always true
607 age_info["original-age-requirement"] = min_days
608 min_days = new_req
610 age_info["age-requirement"] = min_days
611 res = PolicyVerdict.PASS
613 if days_old < min_days:
614 urgent_hints = self.hints.search(
615 "urgent", package=source_name, version=source_data_srcdist.version
616 )
617 if urgent_hints:
618 age_info["age-requirement-reduced"] = {
619 "new-requirement": 0,
620 "changed-by": urgent_hints[0].user,
621 }
622 res = PolicyVerdict.PASS_HINTED
623 else:
624 res = PolicyVerdict.REJECTED_TEMPORARILY
626 # update excuse
627 age_hint = age_info.get("age-requirement-reduced", None)
628 age_min_req = age_info["age-requirement"]
629 if age_hint:
630 new_req = age_hint["new-requirement"]
631 who = age_hint["changed-by"]
632 if new_req:
633 excuse.addinfo(
634 "Overriding age needed from %d days to %d by %s"
635 % (age_min_req, new_req, who)
636 )
637 age_min_req = new_req
638 else:
639 excuse.addinfo("Too young, but urgency pushed by %s" % who)
640 age_min_req = 0
641 excuse.setdaysold(age_info["current-age"], age_min_req)
643 if age_min_req == 0:
644 excuse.addinfo("%d days old" % days_old)
645 elif days_old < age_min_req:
646 excuse.add_verdict_info(
647 res, "Too young, only %d of %d days old" % (days_old, age_min_req)
648 )
649 else:
650 excuse.addinfo("%d days old (needed %d days)" % (days_old, age_min_req))
652 return res
654 def _read_dates_file(self) -> None:
655 """Parse the dates file"""
656 dates = self._dates
657 fallback_filename = os.path.join(self.suite_info.target_suite.path, "Dates")
658 using_new_name = False
659 try:
660 filename = os.path.join(self.state_dir, "age-policy-dates")
661 if not os.path.exists(filename) and os.path.exists(fallback_filename): 661 ↛ 662line 661 didn't jump to line 662 because the condition on line 661 was never true
662 filename = fallback_filename
663 else:
664 using_new_name = True
665 except AttributeError:
666 if os.path.exists(fallback_filename):
667 filename = fallback_filename
668 else:
669 raise RuntimeError("Please set STATE_DIR in the britney configuration")
671 try:
672 with open(filename, encoding="utf-8") as fd:
673 for line in fd:
674 if line.startswith("#"):
675 # Ignore comment lines (mostly used for tests)
676 continue
677 # <source> <version> <date>)
678 ln = line.split()
679 if len(ln) != 3: # pragma: no cover
680 continue
681 try:
682 dates[ln[0]] = (ln[1], int(ln[2]))
683 except ValueError: # pragma: no cover
684 pass
685 except FileNotFoundError:
686 if not using_new_name: 686 ↛ 688line 686 didn't jump to line 688 because the condition on line 686 was never true
687 # If we using the legacy name, then just give up
688 raise
689 self.logger.info("%s does not appear to exist. Creating it", filename)
690 with open(filename, mode="x", encoding="utf-8"):
691 pass
693 def _read_urgencies_file(self) -> None:
694 urgencies = self._urgencies
695 min_days_default = self._min_days_default
696 fallback_filename = os.path.join(self.suite_info.target_suite.path, "Urgency")
697 try:
698 filename = os.path.join(self.state_dir, "age-policy-urgencies")
699 if not os.path.exists(filename) and os.path.exists(fallback_filename): 699 ↛ 700line 699 didn't jump to line 700 because the condition on line 699 was never true
700 filename = fallback_filename
701 except AttributeError:
702 filename = fallback_filename
704 sources_s = self.suite_info.primary_source_suite.sources
705 sources_t = self.suite_info.target_suite.sources
707 with open(filename, errors="surrogateescape", encoding="ascii") as fd:
708 for line in fd:
709 if line.startswith("#"):
710 # Ignore comment lines (mostly used for tests)
711 continue
712 # <source> <version> <urgency>
713 ln = line.split()
714 if len(ln) != 3: 714 ↛ 715line 714 didn't jump to line 715 because the condition on line 714 was never true
715 continue
717 # read the minimum days associated with the urgencies
718 urgency_old = urgencies.get(ln[0], None)
719 mindays_old = self._min_days.get(urgency_old, 1000) # type: ignore[arg-type]
720 mindays_new = self._min_days.get(ln[2], min_days_default)
722 # if the new urgency is lower (so the min days are higher), do nothing
723 if mindays_old <= mindays_new:
724 continue
726 # if the package exists in the target suite and it is more recent, do nothing
727 tsrcv = sources_t.get(ln[0], None)
728 if tsrcv and apt_pkg.version_compare(tsrcv.version, ln[1]) >= 0:
729 continue
731 # if the package doesn't exist in the primary source suite or it is older, do nothing
732 usrcv = sources_s.get(ln[0], None)
733 if not usrcv or apt_pkg.version_compare(usrcv.version, ln[1]) < 0: 733 ↛ 734line 733 didn't jump to line 734 because the condition on line 733 was never true
734 continue
736 # update the urgency for the package
737 urgencies[ln[0]] = ln[2]
739 def _write_dates_file(self) -> None:
740 dates = self._dates
741 try:
742 directory = self.state_dir
743 basename = "age-policy-dates"
744 old_file = os.path.join(self.suite_info.target_suite.path, "Dates")
745 except AttributeError:
746 directory = self.suite_info.target_suite.path
747 basename = "Dates"
748 old_file = None
749 filename = os.path.join(directory, basename)
750 filename_tmp = os.path.join(directory, "%s_new" % basename)
751 with open(filename_tmp, "w", encoding="utf-8") as fd:
752 for pkg in sorted(dates):
753 version, date = dates[pkg]
754 fd.write("%s %s %d\n" % (pkg, version, date))
755 os.rename(filename_tmp, filename)
756 if old_file is not None and os.path.exists(old_file): 756 ↛ 757line 756 didn't jump to line 757 because the condition on line 756 was never true
757 self.logger.info("Removing old age-policy-dates file %s", old_file)
758 os.unlink(old_file)
761class RCBugPolicy(AbstractBasePolicy):
762 """RC bug regression policy for source migrations
764 The RCBugPolicy will read provided list of RC bugs and block any
765 source upload that would introduce a *new* RC bug in the target
766 suite.
768 The RCBugPolicy's decision is influenced by the following:
770 State files:
771 * ${STATE_DIR}/rc-bugs-${SUITE_NAME}: File containing RC bugs for packages in
772 the given suite (one for both primary source suite and the target sutie is
773 needed).
774 - These files need to be updated externally.
775 """
777 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
778 super().__init__(
779 "rc-bugs", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
780 )
781 self._bugs_source: dict[str, set[str]] | None = None
782 self._bugs_target: dict[str, set[str]] | None = None
784 def register_hints(self, hint_parser: HintParser) -> None:
785 f = simple_policy_hint_parser_function(
786 IgnoreRCBugHint, lambda x: frozenset(x.split(","))
787 )
788 hint_parser.register_hint_type(HintType("ignore-rc-bugs", f, min_args=2))
790 def initialise(self, britney: "Britney") -> None:
791 super().initialise(britney)
792 source_suite = self.suite_info.primary_source_suite
793 target_suite = self.suite_info.target_suite
794 fallback_unstable = os.path.join(source_suite.path, "BugsV")
795 fallback_testing = os.path.join(target_suite.path, "BugsV")
796 try:
797 filename_unstable = os.path.join(
798 self.state_dir, "rc-bugs-%s" % source_suite.name
799 )
800 filename_testing = os.path.join(
801 self.state_dir, "rc-bugs-%s" % target_suite.name
802 )
803 if ( 803 ↛ 809line 803 didn't jump to line 809
804 not os.path.exists(filename_unstable)
805 and not os.path.exists(filename_testing)
806 and os.path.exists(fallback_unstable)
807 and os.path.exists(fallback_testing)
808 ):
809 filename_unstable = fallback_unstable
810 filename_testing = fallback_testing
811 except AttributeError:
812 filename_unstable = fallback_unstable
813 filename_testing = fallback_testing
814 self._bugs_source = self._read_bugs(filename_unstable)
815 self._bugs_target = self._read_bugs(filename_testing)
817 def apply_src_policy_impl(
818 self,
819 rcbugs_info: dict[str, Any],
820 source_data_tdist: SourcePackage | None,
821 source_data_srcdist: SourcePackage,
822 excuse: "Excuse",
823 ) -> PolicyVerdict:
824 assert self._bugs_source is not None # for type checking
825 assert self._bugs_target is not None # for type checking
826 bugs_t = set()
827 bugs_s = set()
828 source_name = excuse.item.package
829 binaries_s = {x[0] for x in source_data_srcdist.binaries}
830 try:
831 binaries_t = {x[0] for x in source_data_tdist.binaries} # type: ignore[union-attr]
832 except AttributeError:
833 binaries_t = set()
835 src_key = f"src:{source_name}"
836 if source_data_tdist and src_key in self._bugs_target:
837 bugs_t.update(self._bugs_target[src_key])
838 if src_key in self._bugs_source:
839 bugs_s.update(self._bugs_source[src_key])
841 for pkg in binaries_s:
842 if pkg in self._bugs_source:
843 bugs_s |= self._bugs_source[pkg]
844 for pkg in binaries_t:
845 if pkg in self._bugs_target:
846 bugs_t |= self._bugs_target[pkg]
848 # The bts seems to support filing source bugs against a binary of the
849 # same name if that binary isn't built by any source. An example is bug
850 # 820347 against Package: juce (in the live-2016-04-11 test). Add those
851 # bugs too.
852 if (
853 source_name not in (binaries_s | binaries_t)
854 and source_name
855 not in {
856 x.package_name
857 for x in self.suite_info.primary_source_suite.all_binaries_in_suite.keys()
858 }
859 and source_name
860 not in {
861 x.package_name
862 for x in self.suite_info.target_suite.all_binaries_in_suite.keys()
863 }
864 ):
865 if source_name in self._bugs_source:
866 bugs_s |= self._bugs_source[source_name]
867 if source_name in self._bugs_target: 867 ↛ 868line 867 didn't jump to line 868 because the condition on line 867 was never true
868 bugs_t |= self._bugs_target[source_name]
870 # If a package is not in the target suite, it has no RC bugs per
871 # definition. Unfortunately, it seems that the live-data is
872 # not always accurate (e.g. live-2011-12-13 suggests that
873 # obdgpslogger had the same bug in testing and unstable,
874 # but obdgpslogger was not in testing at that time).
875 # - For the curious, obdgpslogger was removed on that day
876 # and the BTS probably had not caught up with that fact.
877 # (https://tracker.debian.org/news/415935)
878 assert not bugs_t or source_data_tdist, (
879 "%s had bugs in the target suite but is not present" % source_name
880 )
882 verdict = PolicyVerdict.PASS
884 assert self.hints is not None
885 for ignore_hint in cast(
886 list[IgnoreRCBugHint],
887 self.hints.search(
888 "ignore-rc-bugs",
889 package=source_name,
890 version=source_data_srcdist.version,
891 ),
892 ):
893 ignored_bugs = ignore_hint.ignored_rcbugs
895 # Only handle one hint for now
896 if "ignored-bugs" in rcbugs_info:
897 self.logger.info(
898 "Ignoring ignore-rc-bugs hint from %s on %s due to another hint from %s",
899 ignore_hint.user,
900 source_name,
901 rcbugs_info["ignored-bugs"]["issued-by"],
902 )
903 continue
904 if not ignored_bugs.isdisjoint(bugs_s): 904 ↛ 913line 904 didn't jump to line 913 because the condition on line 904 was always true
905 bugs_s -= ignored_bugs
906 bugs_t -= ignored_bugs
907 rcbugs_info["ignored-bugs"] = {
908 "bugs": sorted(ignored_bugs),
909 "issued-by": ignore_hint.user,
910 }
911 verdict = PolicyVerdict.PASS_HINTED
912 else:
913 self.logger.info(
914 "Ignoring ignore-rc-bugs hint from %s on %s as none of %s affect the package",
915 ignore_hint.user,
916 source_name,
917 str(ignored_bugs),
918 )
920 rcbugs_info["shared-bugs"] = sorted(bugs_s & bugs_t)
921 rcbugs_info["unique-source-bugs"] = sorted(bugs_s - bugs_t)
922 rcbugs_info["unique-target-bugs"] = sorted(bugs_t - bugs_s)
924 # update excuse
925 new_bugs = rcbugs_info["unique-source-bugs"]
926 old_bugs = rcbugs_info["unique-target-bugs"]
927 excuse.setbugs(old_bugs, new_bugs)
929 if new_bugs:
930 verdict = PolicyVerdict.REJECTED_PERMANENTLY
931 excuse.add_verdict_info(
932 verdict,
933 "Updating %s would introduce bugs in %s: %s"
934 % (
935 source_name,
936 self.suite_info.target_suite.name,
937 ", ".join(
938 [
939 '<a href="https://bugs.debian.org/%s">#%s</a>'
940 % (quote(a), a)
941 for a in new_bugs
942 ]
943 ),
944 ),
945 )
947 if old_bugs:
948 excuse.addinfo(
949 "Updating %s will fix bugs in %s: %s"
950 % (
951 source_name,
952 self.suite_info.target_suite.name,
953 ", ".join(
954 [
955 '<a href="https://bugs.debian.org/%s">#%s</a>'
956 % (quote(a), a)
957 for a in old_bugs
958 ]
959 ),
960 )
961 )
963 return verdict
965 def _read_bugs(self, filename: str) -> dict[str, set[str]]:
966 """Read the release critical bug summary from the specified file
968 The file contains rows with the format:
970 <package-name> <bug number>[,<bug number>...]
972 The method returns a dictionary where the key is the binary package
973 name and the value is the list of open RC bugs for it.
974 """
975 bugs: dict[str, set[str]] = {}
976 self.logger.info("Loading RC bugs data from %s", filename)
977 with open(filename, encoding="ascii") as f:
978 for line in f:
979 ln = line.split()
980 if len(ln) != 2: # pragma: no cover
981 self.logger.warning("Malformed line found in line %s", line)
982 continue
983 pkg = ln[0]
984 if pkg not in bugs:
985 bugs[pkg] = set()
986 bugs[pkg].update(ln[1].split(","))
987 return bugs
990class PiupartsPolicy(AbstractBasePolicy):
991 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
992 super().__init__(
993 "piuparts", options, suite_info, {SuiteClass.PRIMARY_SOURCE_SUITE}
994 )
995 self._piuparts_source: dict[str, tuple[str, str]] | None = None
996 self._piuparts_target: dict[str, tuple[str, str]] | None = None
998 def register_hints(self, hint_parser: HintParser) -> None:
999 hint_parser.register_hint_type(HintType("ignore-piuparts"))
1001 def initialise(self, britney: "Britney") -> None:
1002 super().initialise(britney)
1003 source_suite = self.suite_info.primary_source_suite
1004 target_suite = self.suite_info.target_suite
1005 try:
1006 filename_unstable = os.path.join(
1007 self.state_dir, "piuparts-summary-%s.json" % source_suite.name
1008 )
1009 filename_testing = os.path.join(
1010 self.state_dir, "piuparts-summary-%s.json" % target_suite.name
1011 )
1012 except AttributeError as e: # pragma: no cover
1013 raise RuntimeError(
1014 "Please set STATE_DIR in the britney configuration"
1015 ) from e
1016 self._piuparts_source = self._read_piuparts_summary(
1017 filename_unstable, keep_url=True
1018 )
1019 self._piuparts_target = self._read_piuparts_summary(
1020 filename_testing, keep_url=False
1021 )
1023 def apply_src_policy_impl(
1024 self,
1025 piuparts_info: dict[str, Any],
1026 source_data_tdist: SourcePackage | None,
1027 source_data_srcdist: SourcePackage,
1028 excuse: "Excuse",
1029 ) -> PolicyVerdict:
1030 assert self._piuparts_source is not None # for type checking
1031 assert self._piuparts_target is not None # for type checking
1032 source_name = excuse.item.package
1034 if source_name in self._piuparts_target:
1035 testing_state = self._piuparts_target[source_name][0]
1036 else:
1037 testing_state = "X"
1038 url: str | None
1039 if source_name in self._piuparts_source:
1040 unstable_state, url = self._piuparts_source[source_name]
1041 else:
1042 unstable_state = "X"
1043 url = None
1044 url_html = "(no link yet)"
1045 if url is not None:
1046 url_html = '<a href="{0}">{0}</a>'.format(url)
1048 match unstable_state:
1049 case "P":
1050 # Not a regression
1051 msg = f"Piuparts tested OK - {url_html}"
1052 result = PolicyVerdict.PASS
1053 piuparts_info["test-results"] = "pass"
1054 case "F" if testing_state != "F":
1055 piuparts_info["test-results"] = "regression"
1056 msg = f"Piuparts regression - {url_html}"
1057 result = PolicyVerdict.REJECTED_PERMANENTLY
1058 case "F":
1059 piuparts_info["test-results"] = "failed"
1060 msg = f"Piuparts failure (not a regression) - {url_html}"
1061 result = PolicyVerdict.PASS
1062 case "W":
1063 msg = f"Piuparts check waiting for test results - {url_html}"
1064 result = PolicyVerdict.REJECTED_TEMPORARILY
1065 piuparts_info["test-results"] = "waiting-for-test-results"
1066 case _:
1067 msg = f"Piuparts can't test {source_name} (not a blocker) - {url_html}"
1068 piuparts_info["test-results"] = "cannot-be-tested"
1069 result = PolicyVerdict.PASS
1071 if url is not None:
1072 piuparts_info["piuparts-test-url"] = url
1073 if result.is_rejected:
1074 excuse.add_verdict_info(result, msg)
1075 else:
1076 excuse.addinfo(msg)
1078 if result.is_rejected:
1079 assert self.hints is not None
1080 for ignore_hint in self.hints.search(
1081 "ignore-piuparts",
1082 package=source_name,
1083 version=source_data_srcdist.version,
1084 ):
1085 piuparts_info["ignored-piuparts"] = {"issued-by": ignore_hint.user}
1086 result = PolicyVerdict.PASS_HINTED
1087 excuse.addinfo(
1088 f"Piuparts issue ignored as requested by {ignore_hint.user}"
1089 )
1090 break
1092 return result
1094 def _read_piuparts_summary(
1095 self, filename: str, keep_url: bool = True
1096 ) -> dict[str, tuple[str, str]]:
1097 summary: dict[str, tuple[str, str]] = {}
1098 self.logger.info("Loading piuparts report from %s", filename)
1099 with open(filename) as fd: 1099 ↛ exitline 1099 didn't return from function '_read_piuparts_summary' because the return on line 1101 wasn't executed
1100 if os.fstat(fd.fileno()).st_size < 1: 1100 ↛ 1101line 1100 didn't jump to line 1101 because the condition on line 1100 was never true
1101 return summary
1102 data = json.load(fd)
1103 try:
1104 if (
1105 data["_id"] != "Piuparts Package Test Results Summary"
1106 or data["_version"] != "1.0"
1107 ): # pragma: no cover
1108 raise ValueError(
1109 f"Piuparts results in {filename} does not have the correct ID or version"
1110 )
1111 except KeyError as e: # pragma: no cover
1112 raise ValueError(
1113 f"Piuparts results in {filename} is missing id or version field"
1114 ) from e
1115 for source, suite_data in data["packages"].items():
1116 if len(suite_data) != 1: # pragma: no cover
1117 raise ValueError(
1118 f"Piuparts results in {filename}, the source {source} does not have "
1119 "exactly one result set"
1120 )
1121 item = next(iter(suite_data.values()))
1122 state, _, url = item
1123 if not keep_url:
1124 url = None
1125 summary[source] = (state, url)
1127 return summary
1130class DependsPolicy(AbstractBasePolicy):
1131 pkg_universe: "BinaryPackageUniverse"
1132 broken_packages: frozenset["BinaryPackageId"]
1133 all_binaries: dict["BinaryPackageId", "BinaryPackage"]
1134 allow_uninst: dict[str, set[str | None]]
1136 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
1137 super().__init__(
1138 "depends",
1139 options,
1140 suite_info,
1141 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
1142 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
1143 )
1144 self.nobreakall_arches = None
1145 self.new_arches = None
1146 self.break_arches = None
1148 def initialise(self, britney: "Britney") -> None:
1149 super().initialise(britney)
1150 self.pkg_universe = britney.pkg_universe
1151 self.broken_packages = self.pkg_universe.broken_packages
1152 self.all_binaries = britney.all_binaries
1153 self.nobreakall_arches = self.options.nobreakall_arches
1154 self.new_arches = self.options.new_arches
1155 self.break_arches = self.options.break_arches
1156 self.allow_uninst = britney.allow_uninst
1158 def apply_srcarch_policy_impl(
1159 self,
1160 deps_info: dict[str, Any],
1161 arch: str,
1162 source_data_tdist: SourcePackage | None,
1163 source_data_srcdist: SourcePackage,
1164 excuse: "Excuse",
1165 ) -> PolicyVerdict:
1166 verdict = PolicyVerdict.PASS
1168 assert self.break_arches is not None
1169 assert self.new_arches is not None
1170 if arch in self.break_arches or arch in self.new_arches:
1171 # we don't check these in the policy (TODO - for now?)
1172 return verdict
1174 item = excuse.item
1175 source_suite = item.suite
1176 target_suite = self.suite_info.target_suite
1178 packages_s_a = source_suite.binaries[arch]
1179 packages_t_a = target_suite.binaries[arch]
1181 my_bins = sorted(filter_out_faux(excuse.packages[arch]))
1183 arch_all_installable = set()
1184 arch_arch_installable = set()
1185 consider_it_regression = True
1187 for pkg_id in my_bins:
1188 pkg_name = pkg_id.package_name
1189 binary_u = packages_s_a[pkg_name]
1190 pkg_arch = binary_u.architecture
1192 # in some cases, we want to track the uninstallability of a
1193 # package (because the autopkgtest policy uses this), but we still
1194 # want to allow the package to be uninstallable
1195 skip_dep_check = False
1197 if binary_u.source_version != source_data_srcdist.version:
1198 # don't check cruft in unstable
1199 continue
1201 if item.architecture != "source" and pkg_arch == "all":
1202 # we don't care about the existing arch: all binaries when
1203 # checking a binNMU item, because the arch: all binaries won't
1204 # migrate anyway
1205 skip_dep_check = True
1207 if pkg_arch == "all" and arch not in self.nobreakall_arches:
1208 skip_dep_check = True
1210 if pkg_name in self.allow_uninst[arch]: 1210 ↛ 1213line 1210 didn't jump to line 1213 because the condition on line 1210 was never true
1211 # this binary is allowed to become uninstallable, so we don't
1212 # need to check anything
1213 skip_dep_check = True
1215 if pkg_name in packages_t_a:
1216 oldbin = packages_t_a[pkg_name]
1217 if not target_suite.is_installable(oldbin.pkg_id):
1218 # as the current binary in testing is already
1219 # uninstallable, the newer version is allowed to be
1220 # uninstallable as well, so we don't need to check
1221 # anything
1222 skip_dep_check = True
1223 consider_it_regression = False
1225 if pkg_id in self.broken_packages:
1226 if pkg_arch == "all":
1227 arch_all_installable.add(False)
1228 else:
1229 arch_arch_installable.add(False)
1230 # dependencies can't be satisfied by all the known binaries -
1231 # this certainly won't work...
1232 excuse.add_unsatisfiable_on_arch(arch)
1233 if skip_dep_check:
1234 # ...but if the binary is allowed to become uninstallable,
1235 # we don't care
1236 # we still want the binary to be listed as uninstallable,
1237 continue
1238 verdict = PolicyVerdict.REJECTED_PERMANENTLY
1239 if pkg_name.endswith("-faux-build-depends"): 1239 ↛ 1240line 1239 didn't jump to line 1240 because the condition on line 1239 was never true
1240 name = pkg_name.replace("-faux-build-depends", "")
1241 excuse.add_verdict_info(
1242 verdict,
1243 f"src:{name} has unsatisfiable build dependency",
1244 )
1245 else:
1246 excuse.add_verdict_info(
1247 verdict, f"{pkg_name}/{arch} has unsatisfiable dependency"
1248 )
1249 excuse.addreason("depends")
1250 else:
1251 if pkg_arch == "all":
1252 arch_all_installable.add(True)
1253 else:
1254 arch_arch_installable.add(True)
1256 if skip_dep_check:
1257 continue
1259 deps = self.pkg_universe.dependencies_of(pkg_id)
1261 for dep in deps:
1262 # dep is a list of packages, each of which satisfy the
1263 # dependency
1265 if dep == frozenset():
1266 continue
1267 is_ok = False
1268 needed_for_dep = set()
1270 for alternative in dep:
1271 if target_suite.is_pkg_in_the_suite(alternative):
1272 # dep can be satisfied in testing - ok
1273 is_ok = True
1274 elif alternative in my_bins:
1275 # can be satisfied by binary from same item: will be
1276 # ok if item migrates
1277 is_ok = True
1278 else:
1279 needed_for_dep.add(alternative)
1281 if not is_ok:
1282 spec = DependencySpec(DependencyType.DEPENDS, arch)
1283 excuse.add_package_depends(spec, needed_for_dep)
1285 # The autopkgtest policy needs delicate trade offs for
1286 # non-installability. The current choice (considering source
1287 # migration and only binaries built by the version of the
1288 # source):
1289 #
1290 # * Run autopkgtest if all arch:$arch binaries are installable
1291 # (but some or all arch:all binaries are not)
1292 #
1293 # * Don't schedule nor wait for not installable arch:all only package
1294 # on ! NOBREAKALL_ARCHES
1295 #
1296 # * Run autopkgtest if installability isn't a regression (there are (or
1297 # rather, should) not be a lot of packages in this state, and most
1298 # likely they'll just fail quickly)
1299 #
1300 # * Don't schedule, but wait otherwise
1301 if arch_arch_installable == {True} and False in arch_all_installable:
1302 deps_info.setdefault("autopkgtest_run_anyways", []).append(arch)
1303 elif (
1304 arch not in self.nobreakall_arches
1305 and arch_arch_installable == set()
1306 and False in arch_all_installable
1307 ):
1308 deps_info.setdefault("arch_all_not_installable", []).append(arch)
1309 elif not consider_it_regression:
1310 deps_info.setdefault("autopkgtest_run_anyways", []).append(arch)
1312 return verdict
1315@unique
1316class BuildDepResult(IntEnum):
1317 # relation is satisfied in target
1318 OK = 1
1319 # relation can be satisfied by other packages in source
1320 DEPENDS = 2
1321 # relation cannot be satisfied
1322 FAILED = 3
1325class BuildDependsPolicy(AbstractBasePolicy):
1327 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
1328 super().__init__(
1329 "build-depends",
1330 options,
1331 suite_info,
1332 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
1333 )
1334 self._all_buildarch: list[str] = []
1336 parse_option(options, "all_buildarch")
1338 def initialise(self, britney: "Britney") -> None:
1339 super().initialise(britney)
1340 if self.options.all_buildarch:
1341 self._all_buildarch = SuiteContentLoader.config_str_as_list(
1342 self.options.all_buildarch, []
1343 )
1345 def apply_src_policy_impl(
1346 self,
1347 build_deps_info: dict[str, Any],
1348 source_data_tdist: SourcePackage | None,
1349 source_data_srcdist: SourcePackage,
1350 excuse: "Excuse",
1351 get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers,
1352 ) -> PolicyVerdict:
1353 verdict = PolicyVerdict.PASS
1355 # analyze the dependency fields (if present)
1356 if deps := source_data_srcdist.build_deps_arch:
1357 v = self._check_build_deps(
1358 deps,
1359 DependencyType.BUILD_DEPENDS,
1360 build_deps_info,
1361 source_data_tdist,
1362 source_data_srcdist,
1363 excuse,
1364 get_dependency_solvers=get_dependency_solvers,
1365 )
1366 verdict = PolicyVerdict.worst_of(verdict, v)
1368 if ideps := source_data_srcdist.build_deps_indep:
1369 v = self._check_build_deps(
1370 ideps,
1371 DependencyType.BUILD_DEPENDS_INDEP,
1372 build_deps_info,
1373 source_data_tdist,
1374 source_data_srcdist,
1375 excuse,
1376 get_dependency_solvers=get_dependency_solvers,
1377 )
1378 verdict = PolicyVerdict.worst_of(verdict, v)
1380 return verdict
1382 def _get_check_archs(
1383 self, archs: Container[str], dep_type: DependencyType
1384 ) -> list[str]:
1385 oos = self.options.outofsync_arches
1387 if dep_type == DependencyType.BUILD_DEPENDS:
1388 return [
1389 arch
1390 for arch in self.options.architectures
1391 if arch in archs and arch not in oos
1392 ]
1394 # first try the all buildarch
1395 checkarchs = list(self._all_buildarch)
1396 # then try the architectures where this source has arch specific
1397 # binaries (in the order of the architecture config file)
1398 checkarchs.extend(
1399 arch
1400 for arch in self.options.architectures
1401 if arch in archs and arch not in checkarchs
1402 )
1403 # then try all other architectures
1404 checkarchs.extend(
1405 arch for arch in self.options.architectures if arch not in checkarchs
1406 )
1408 # and drop OUTOFSYNC_ARCHES
1409 return [arch for arch in checkarchs if arch not in oos]
1411 def _add_info_for_arch(
1412 self,
1413 arch: str,
1414 excuses_info: dict[str, list[str]],
1415 blockers: dict[str, set[BinaryPackageId]],
1416 results: dict[str, BuildDepResult],
1417 dep_type: DependencyType,
1418 target_suite: TargetSuite,
1419 source_suite: Suite,
1420 excuse: "Excuse",
1421 verdict: PolicyVerdict,
1422 ) -> PolicyVerdict:
1423 if arch in blockers:
1424 packages = blockers[arch]
1426 # for the solving packages, update the excuse to add the dependencies
1427 for p in packages:
1428 if arch not in self.options.break_arches: 1428 ↛ 1427line 1428 didn't jump to line 1427 because the condition on line 1428 was always true
1429 spec = DependencySpec(dep_type, arch)
1430 excuse.add_package_depends(spec, {p})
1432 if arch in results and results[arch] == BuildDepResult.FAILED:
1433 verdict = PolicyVerdict.worst_of(
1434 verdict, PolicyVerdict.REJECTED_PERMANENTLY
1435 )
1437 if arch in excuses_info:
1438 for excuse_text in excuses_info[arch]:
1439 if verdict.is_rejected: 1439 ↛ 1442line 1439 didn't jump to line 1442 because the condition on line 1439 was always true
1440 excuse.add_verdict_info(verdict, excuse_text)
1441 else:
1442 excuse.addinfo(excuse_text)
1444 return verdict
1446 def _check_build_deps(
1447 self,
1448 deps: str,
1449 dep_type: DependencyType,
1450 build_deps_info: dict[str, Any],
1451 source_data_tdist: SourcePackage | None,
1452 source_data_srcdist: SourcePackage,
1453 excuse: "Excuse",
1454 get_dependency_solvers: GetDependencySolversProto = get_dependency_solvers,
1455 ) -> PolicyVerdict:
1456 verdict = PolicyVerdict.PASS
1457 any_arch_ok = dep_type == DependencyType.BUILD_DEPENDS_INDEP
1459 britney = self.britney
1461 # local copies for better performance
1462 parse_src_depends = apt_pkg.parse_src_depends
1464 source_name = excuse.item.package
1465 source_suite = excuse.item.suite
1466 target_suite = self.suite_info.target_suite
1467 binaries_s = source_suite.binaries
1468 provides_s = source_suite.provides_table
1469 binaries_t = target_suite.binaries
1470 provides_t = target_suite.provides_table
1471 unsat_bd: dict[str, list[str]] = {}
1472 relevant_archs: set[str] = {
1473 binary.architecture
1474 for binary in filter_out_faux(source_data_srcdist.binaries)
1475 if britney.all_binaries[binary].architecture != "all"
1476 }
1478 excuses_info: dict[str, list[str]] = defaultdict(list)
1479 blockers: dict[str, set[BinaryPackageId]] = defaultdict(set)
1480 arch_results = {}
1481 result_archs = defaultdict(list)
1482 bestresult = BuildDepResult.FAILED
1483 check_archs = self._get_check_archs(relevant_archs, dep_type)
1484 if not check_archs:
1485 # when the arch list is empty, we check the b-d on any arch, instead of all archs
1486 # this happens for Build-Depens on a source package that only produces arch: all binaries
1487 any_arch_ok = True
1488 check_archs = self._get_check_archs(
1489 self.options.architectures, DependencyType.BUILD_DEPENDS_INDEP
1490 )
1492 for arch in check_archs:
1493 # retrieve the binary package from the specified suite and arch
1494 binaries_s_a = binaries_s[arch]
1495 provides_s_a = provides_s[arch]
1496 binaries_t_a = binaries_t[arch]
1497 provides_t_a = provides_t[arch]
1498 arch_results[arch] = BuildDepResult.OK
1499 # for every dependency block (formed as conjunction of disjunction)
1500 for block_txt in deps.split(","):
1501 block_list = parse_src_depends(block_txt, False, arch)
1502 # Unlike regular dependencies, some clauses of the Build-Depends(-Arch|-Indep) can be
1503 # filtered out by (e.g.) architecture restrictions. We need to cope with this while
1504 # keeping block_txt and block aligned.
1505 if not block_list:
1506 # Relation is not relevant for this architecture.
1507 continue
1508 block = block_list[0]
1509 # if the block is satisfied in the target suite, then skip the block
1510 if get_dependency_solvers(
1511 block, binaries_t_a, provides_t_a, build_depends=True
1512 ):
1513 # Satisfied in the target suite; all ok.
1514 continue
1516 # check if the block can be satisfied in the source suite, and list the solving packages
1517 packages = get_dependency_solvers(
1518 block, binaries_s_a, provides_s_a, build_depends=True
1519 )
1520 sources = sorted(p.source for p in packages)
1522 # if the dependency can be satisfied by the same source package, skip the block:
1523 # obviously both binary packages will enter the target suite together
1524 if source_name in sources: 1524 ↛ 1525line 1524 didn't jump to line 1525 because the condition on line 1524 was never true
1525 continue
1527 # if no package can satisfy the dependency, add this information to the excuse
1528 if not packages:
1529 excuses_info[arch].append(
1530 "%s unsatisfiable %s on %s: %s"
1531 % (source_name, dep_type, arch, block_txt.strip())
1532 )
1533 if arch not in unsat_bd: 1533 ↛ 1535line 1533 didn't jump to line 1535 because the condition on line 1533 was always true
1534 unsat_bd[arch] = []
1535 unsat_bd[arch].append(block_txt.strip())
1536 arch_results[arch] = BuildDepResult.FAILED
1537 continue
1539 blockers[arch].update(p.pkg_id for p in packages)
1540 if arch_results[arch] < BuildDepResult.DEPENDS:
1541 arch_results[arch] = BuildDepResult.DEPENDS
1543 if any_arch_ok:
1544 if arch_results[arch] < bestresult:
1545 bestresult = arch_results[arch]
1546 result_archs[arch_results[arch]].append(arch)
1547 if bestresult == BuildDepResult.OK:
1548 # we found an architecture where the b-deps-indep are
1549 # satisfied in the target suite, so we can stop
1550 break
1552 if any_arch_ok:
1553 arch = result_archs[bestresult][0]
1554 excuse.add_detailed_info(f"Checking {dep_type.get_description()} on {arch}")
1555 key = "check-%s-on-arch" % dep_type.get_reason()
1556 build_deps_info[key] = arch
1557 verdict = self._add_info_for_arch(
1558 arch,
1559 excuses_info,
1560 blockers,
1561 arch_results,
1562 dep_type,
1563 target_suite,
1564 source_suite,
1565 excuse,
1566 verdict,
1567 )
1569 else:
1570 for arch in check_archs:
1571 verdict = self._add_info_for_arch(
1572 arch,
1573 excuses_info,
1574 blockers,
1575 arch_results,
1576 dep_type,
1577 target_suite,
1578 source_suite,
1579 excuse,
1580 verdict,
1581 )
1583 if unsat_bd:
1584 build_deps_info["unsatisfiable-arch-build-depends"] = unsat_bd
1586 return verdict
1589class BuiltUsingPolicy(AbstractBasePolicy):
1590 """Built-Using policy
1592 Binaries that incorporate (part of) another source package must list these
1593 sources under 'Built-Using'.
1595 This policy checks if the corresponding sources are available in the
1596 target suite. If they are not, but they are candidates for migration, a
1597 dependency is added.
1599 If the binary incorporates a newer version of a source, that is not (yet)
1600 a candidate, we don't want to accept that binary. A rebuild later in the
1601 primary suite wouldn't fix the issue, because that would incorporate the
1602 newer version again.
1604 If the binary incorporates an older version of the source, a newer version
1605 will be accepted as a replacement. We assume that this can be fixed by
1606 rebuilding the binary at some point during the development cycle.
1608 Requiring exact version of the source would not be useful in practice. A
1609 newer upload of that source wouldn't be blocked by this policy, so the
1610 built-using would be outdated anyway.
1612 """
1614 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
1615 super().__init__(
1616 "built-using",
1617 options,
1618 suite_info,
1619 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
1620 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
1621 )
1623 def initialise(self, britney: "Britney") -> None:
1624 super().initialise(britney)
1626 def apply_srcarch_policy_impl(
1627 self,
1628 build_deps_info: dict[str, Any],
1629 arch: str,
1630 source_data_tdist: SourcePackage | None,
1631 source_data_srcdist: SourcePackage,
1632 excuse: "Excuse",
1633 ) -> PolicyVerdict:
1634 verdict = PolicyVerdict.PASS
1636 source_suite = excuse.item.suite
1637 target_suite = self.suite_info.target_suite
1638 binaries_s = source_suite.binaries
1640 def check_bu_in_suite(
1641 bu_source: str, bu_version: str, source_suite: Suite
1642 ) -> bool:
1643 found = False
1644 if bu_source not in source_suite.sources:
1645 return found
1646 s_source = source_suite.sources[bu_source]
1647 s_ver = s_source.version
1648 if apt_pkg.version_compare(s_ver, bu_version) >= 0:
1649 found = True
1650 dep = PackageId(bu_source, s_ver, "source")
1651 if arch in self.options.break_arches:
1652 excuse.add_detailed_info(
1653 "Ignoring Built-Using for %s/%s on %s"
1654 % (pkg_name, arch, dep.uvname)
1655 )
1656 else:
1657 spec = DependencySpec(DependencyType.BUILT_USING, arch)
1658 excuse.add_package_depends(spec, {dep})
1659 excuse.add_detailed_info(
1660 f"{pkg_name}/{arch} has Built-Using on {dep.uvname}"
1661 )
1663 return found
1665 for pkg_id in sorted(
1666 x
1667 for x in filter_out_faux(source_data_srcdist.binaries)
1668 if x.architecture == arch
1669 ):
1670 pkg_name = pkg_id.package_name
1672 # retrieve the testing (if present) and unstable corresponding binary packages
1673 binary_s = binaries_s[arch][pkg_name]
1675 for bu in binary_s.builtusing:
1676 bu_source = bu[0]
1677 bu_version = bu[1]
1678 found = False
1679 if bu_source in target_suite.sources:
1680 t_source = target_suite.sources[bu_source]
1681 t_ver = t_source.version
1682 if apt_pkg.version_compare(t_ver, bu_version) >= 0:
1683 found = True
1685 if not found:
1686 found = check_bu_in_suite(bu_source, bu_version, source_suite)
1688 if not found and source_suite.suite_class.is_additional_source:
1689 found = check_bu_in_suite(
1690 bu_source, bu_version, self.suite_info.primary_source_suite
1691 )
1693 if not found:
1694 if arch in self.options.break_arches:
1695 excuse.add_detailed_info(
1696 "Ignoring unsatisfiable Built-Using for %s/%s on %s %s"
1697 % (pkg_name, arch, bu_source, bu_version)
1698 )
1699 else:
1700 verdict = PolicyVerdict.worst_of(
1701 verdict, PolicyVerdict.REJECTED_PERMANENTLY
1702 )
1703 excuse.add_verdict_info(
1704 verdict,
1705 "%s/%s has unsatisfiable Built-Using on %s %s"
1706 % (pkg_name, arch, bu_source, bu_version),
1707 )
1709 return verdict
1712class BlockPolicy(AbstractBasePolicy):
1713 BLOCK_HINT_REGEX = re.compile("^(un)?(block-?.*)$")
1715 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
1716 super().__init__(
1717 "block",
1718 options,
1719 suite_info,
1720 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
1721 )
1722 self._blockall: dict[str | None, Hint] = {}
1724 def initialise(self, britney: "Britney") -> None:
1725 super().initialise(britney)
1726 assert self.hints is not None
1727 for hint in self.hints.search(type="block-all"):
1728 self._blockall[hint.package] = hint
1730 self._key_packages = []
1731 if "key" in self._blockall:
1732 self._key_packages = self._read_key_packages()
1734 def _read_key_packages(self) -> list[str]:
1735 """Read the list of key packages
1737 The file contains data in the yaml format :
1739 - reason: <something>
1740 source: <package>
1742 The method returns a list of all key packages.
1743 """
1744 filename = os.path.join(self.state_dir, "key_packages.yaml")
1745 self.logger.info("Loading key packages from %s", filename)
1746 if os.path.exists(filename): 1746 ↛ 1751line 1746 didn't jump to line 1751 because the condition on line 1746 was always true
1747 with open(filename) as f:
1748 data = yaml.safe_load(f)
1749 key_packages = [item["source"] for item in data]
1750 else:
1751 self.logger.error(
1752 "Britney was asked to block key packages, "
1753 + "but no key_packages.yaml file was found."
1754 )
1755 sys.exit(1)
1757 return key_packages
1759 def register_hints(self, hint_parser: HintParser) -> None:
1760 # block related hints are currently defined in hint.py
1761 pass
1763 def _check_blocked(
1764 self, arch: str, version: str, excuse: "Excuse"
1765 ) -> PolicyVerdict:
1766 verdict = PolicyVerdict.PASS
1767 blocked = {}
1768 unblocked = {}
1769 block_info = {}
1770 source_suite = excuse.item.suite
1771 suite_name = source_suite.name
1772 src = excuse.item.package
1773 is_primary = source_suite.suite_class == SuiteClass.PRIMARY_SOURCE_SUITE
1775 tooltip = (
1776 "please contact %s-release if update is needed" % self.options.distribution
1777 )
1779 assert self.hints is not None
1780 shints = self.hints.search(package=src)
1781 mismatches = False
1782 r = self.BLOCK_HINT_REGEX
1783 for hint in shints:
1784 m = r.match(hint.type)
1785 if m:
1786 if m.group(1) == "un":
1787 assert hint.suite is not None
1788 if (
1789 hint.version != version
1790 or hint.suite.name != suite_name
1791 or (hint.architecture != arch and hint.architecture != "source")
1792 ):
1793 self.logger.info(
1794 "hint mismatch: %s %s %s", version, arch, suite_name
1795 )
1796 mismatches = True
1797 else:
1798 unblocked[m.group(2)] = hint.user
1799 excuse.add_hint(hint)
1800 else:
1801 # block(-*) hint: only accepts a source, so this will
1802 # always match
1803 blocked[m.group(2)] = hint.user
1804 excuse.add_hint(hint)
1806 if "block" not in blocked and is_primary:
1807 # if there is a specific block hint for this package, we don't
1808 # check for the general hints
1810 if self.options.distribution == "debian": 1810 ↛ 1817line 1810 didn't jump to line 1817 because the condition on line 1810 was always true
1811 url = "https://release.debian.org/testing/freeze_policy.html"
1812 tooltip = (
1813 'Follow the <a href="%s">freeze policy</a> when applying for an unblock'
1814 % url
1815 )
1817 if "source" in self._blockall:
1818 blocked["block"] = self._blockall["source"].user
1819 excuse.add_hint(self._blockall["source"])
1820 elif (
1821 "new-source" in self._blockall
1822 and src not in self.suite_info.target_suite.sources
1823 ):
1824 blocked["block"] = self._blockall["new-source"].user
1825 excuse.add_hint(self._blockall["new-source"])
1826 # no tooltip: new sources will probably not be accepted anyway
1827 block_info["block"] = "blocked by {}: is not in {}".format(
1828 self._blockall["new-source"].user,
1829 self.suite_info.target_suite.name,
1830 )
1831 elif "key" in self._blockall and src in self._key_packages:
1832 blocked["block"] = self._blockall["key"].user
1833 excuse.add_hint(self._blockall["key"])
1834 block_info["block"] = "blocked by {}: is a key package ({})".format(
1835 self._blockall["key"].user,
1836 tooltip,
1837 )
1838 elif "no-autopkgtest" in self._blockall:
1839 if excuse.autopkgtest_results == {"PASS"}:
1840 if not blocked: 1840 ↛ 1866line 1840 didn't jump to line 1866 because the condition on line 1840 was always true
1841 excuse.addinfo("not blocked: has successful autopkgtest")
1842 else:
1843 blocked["block"] = self._blockall["no-autopkgtest"].user
1844 excuse.add_hint(self._blockall["no-autopkgtest"])
1845 if not excuse.autopkgtest_results:
1846 block_info["block"] = (
1847 "blocked by %s: does not have autopkgtest (%s)"
1848 % (
1849 self._blockall["no-autopkgtest"].user,
1850 tooltip,
1851 )
1852 )
1853 else:
1854 block_info["block"] = (
1855 "blocked by %s: autopkgtest not fully successful (%s)"
1856 % (
1857 self._blockall["no-autopkgtest"].user,
1858 tooltip,
1859 )
1860 )
1862 elif not is_primary:
1863 blocked["block"] = suite_name
1864 excuse.needs_approval = True
1866 for block_cmd in blocked:
1867 unblock_cmd = "un" + block_cmd
1868 if block_cmd in unblocked:
1869 if is_primary or block_cmd == "block-udeb":
1870 excuse.addinfo(
1871 "Ignoring %s request by %s, due to %s request by %s"
1872 % (
1873 block_cmd,
1874 blocked[block_cmd],
1875 unblock_cmd,
1876 unblocked[block_cmd],
1877 )
1878 )
1879 else:
1880 excuse.addinfo("Approved by %s" % (unblocked[block_cmd]))
1881 else:
1882 verdict = PolicyVerdict.REJECTED_NEEDS_APPROVAL
1883 if is_primary or block_cmd == "block-udeb":
1884 # redirect people to d-i RM for udeb things:
1885 if block_cmd == "block-udeb":
1886 tooltip = "please contact the d-i release manager if an update is needed"
1887 if block_cmd in block_info:
1888 info = block_info[block_cmd]
1889 else:
1890 info = (
1891 "Not touching package due to {} request by {} ({})".format(
1892 block_cmd,
1893 blocked[block_cmd],
1894 tooltip,
1895 )
1896 )
1897 excuse.add_verdict_info(verdict, info)
1898 else:
1899 excuse.add_verdict_info(verdict, "NEEDS APPROVAL BY RM")
1900 excuse.addreason("block")
1901 if mismatches:
1902 excuse.add_detailed_info(
1903 "Some hints for %s do not match this item" % src
1904 )
1905 return verdict
1907 def apply_src_policy_impl(
1908 self,
1909 block_info: dict[str, Any],
1910 source_data_tdist: SourcePackage | None,
1911 source_data_srcdist: SourcePackage,
1912 excuse: "Excuse",
1913 ) -> PolicyVerdict:
1914 return self._check_blocked("source", source_data_srcdist.version, excuse)
1916 def apply_srcarch_policy_impl(
1917 self,
1918 block_info: dict[str, Any],
1919 arch: str,
1920 source_data_tdist: SourcePackage | None,
1921 source_data_srcdist: SourcePackage,
1922 excuse: "Excuse",
1923 ) -> PolicyVerdict:
1924 return self._check_blocked(arch, source_data_srcdist.version, excuse)
1927class BuiltOnBuilddPolicy(AbstractBasePolicy):
1929 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
1930 super().__init__(
1931 "builtonbuildd",
1932 options,
1933 suite_info,
1934 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
1935 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
1936 )
1937 self._builtonbuildd: dict[str, Any] = {
1938 "signerinfo": None,
1939 }
1941 def register_hints(self, hint_parser: HintParser) -> None:
1942 hint_parser.register_hint_type(
1943 HintType(
1944 "allow-archall-maintainer-upload",
1945 versioned=HintAnnotate.FORBIDDEN,
1946 )
1947 )
1949 def initialise(self, britney: "Britney") -> None:
1950 super().initialise(britney)
1951 try:
1952 filename_signerinfo = os.path.join(self.state_dir, "signers.json")
1953 except AttributeError as e: # pragma: no cover
1954 raise RuntimeError(
1955 "Please set STATE_DIR in the britney configuration"
1956 ) from e
1957 self._builtonbuildd["signerinfo"] = self._read_signerinfo(filename_signerinfo)
1959 def apply_srcarch_policy_impl(
1960 self,
1961 buildd_info: dict[str, Any],
1962 arch: str,
1963 source_data_tdist: SourcePackage | None,
1964 source_data_srcdist: SourcePackage,
1965 excuse: "Excuse",
1966 ) -> PolicyVerdict:
1967 verdict = PolicyVerdict.PASS
1968 signers = self._builtonbuildd["signerinfo"]
1970 if "signed-by" not in buildd_info:
1971 buildd_info["signed-by"] = {}
1973 item = excuse.item
1974 source_suite = item.suite
1976 # horrible hard-coding, but currently, we don't keep track of the
1977 # component when loading the packages files
1978 component = "main"
1979 # we use the source component, because a binary in contrib can
1980 # belong to a source in main
1981 section = source_data_srcdist.section
1982 if section.find("/") > -1:
1983 component = section.split("/")[0]
1985 packages_s_a = source_suite.binaries[arch]
1986 assert self.hints is not None
1988 for pkg_id in sorted(
1989 x
1990 for x in filter_out_faux(source_data_srcdist.binaries)
1991 if x.architecture == arch
1992 ):
1993 pkg_name = pkg_id.package_name
1994 binary_u = packages_s_a[pkg_name]
1995 pkg_arch = binary_u.architecture
1997 if binary_u.source_version != source_data_srcdist.version: 1997 ↛ 1998line 1997 didn't jump to line 1998 because the condition on line 1997 was never true
1998 continue
2000 if item.architecture != "source" and pkg_arch == "all":
2001 # we don't care about the existing arch: all binaries when
2002 # checking a binNMU item, because the arch: all binaries won't
2003 # migrate anyway
2004 continue
2006 signer = None
2007 uid = None
2008 uidinfo = ""
2009 buildd_ok = False
2010 failure_verdict = PolicyVerdict.REJECTED_PERMANENTLY
2011 try:
2012 signer = signers[pkg_name][pkg_id.version][pkg_arch]
2013 if signer["buildd"]:
2014 buildd_ok = True
2015 uid = signer["uid"]
2016 uidinfo = f"arch {pkg_arch} binaries uploaded by {uid}"
2017 except KeyError:
2018 self.logger.info(
2019 "signer info for %s %s (%s) on %s not found "
2020 % (pkg_name, binary_u.version, pkg_arch, arch)
2021 )
2022 uidinfo = "upload info for arch %s binaries not found" % (pkg_arch)
2023 failure_verdict = PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT
2024 if not buildd_ok:
2025 if component != "main":
2026 if not buildd_ok and pkg_arch not in buildd_info["signed-by"]: 2026 ↛ 2030line 2026 didn't jump to line 2030 because the condition on line 2026 was always true
2027 excuse.add_detailed_info(
2028 f"{uidinfo}, but package in {component}"
2029 )
2030 buildd_ok = True
2031 elif pkg_arch == "all":
2032 allow_hints = self.hints.search(
2033 "allow-archall-maintainer-upload", package=item.package
2034 )
2035 if allow_hints:
2036 buildd_ok = True
2037 verdict = PolicyVerdict.worst_of(
2038 verdict, PolicyVerdict.PASS_HINTED
2039 )
2040 if pkg_arch not in buildd_info["signed-by"]:
2041 excuse.addinfo(
2042 "%s, but whitelisted by %s"
2043 % (uidinfo, allow_hints[0].user)
2044 )
2045 if not buildd_ok:
2046 verdict = failure_verdict
2047 if pkg_arch not in buildd_info["signed-by"]:
2048 if pkg_arch == "all":
2049 uidinfo += (
2050 ", a new source-only upload is needed to allow migration"
2051 )
2052 excuse.add_verdict_info(
2053 verdict, "Not built on buildd: %s" % (uidinfo)
2054 )
2056 if ( 2056 ↛ 2060line 2056 didn't jump to line 2060
2057 pkg_arch in buildd_info["signed-by"]
2058 and buildd_info["signed-by"][pkg_arch] != uid
2059 ):
2060 self.logger.info(
2061 "signer mismatch for %s (%s %s) on %s: %s, while %s already listed"
2062 % (
2063 pkg_name,
2064 binary_u.source,
2065 binary_u.source_version,
2066 pkg_arch,
2067 uid,
2068 buildd_info["signed-by"][pkg_arch],
2069 )
2070 )
2072 buildd_info["signed-by"][pkg_arch] = uid
2074 return verdict
2076 def _read_signerinfo(self, filename: str) -> dict[str, Any]:
2077 signerinfo: dict[str, Any] = {}
2078 self.logger.info("Loading signer info from %s", filename)
2079 with open(filename) as fd: 2079 ↛ exitline 2079 didn't return from function '_read_signerinfo' because the return on line 2081 wasn't executed
2080 if os.fstat(fd.fileno()).st_size < 1: 2080 ↛ 2081line 2080 didn't jump to line 2081 because the condition on line 2080 was never true
2081 return signerinfo
2082 signerinfo = json.load(fd)
2084 return signerinfo
2087class ImplicitDependencyPolicy(AbstractBasePolicy):
2088 """Implicit Dependency policy
2090 Upgrading a package pkg-a can break the installability of a package pkg-b.
2091 A newer version (or the removal) of pkg-b might fix the issue. In that
2092 case, pkg-a has an 'implicit dependency' on pkg-b, because pkg-a can only
2093 migrate if pkg-b also migrates.
2095 This policy tries to discover a few common cases, and adds the relevant
2096 info to the excuses. If another item is needed to fix the
2097 uninstallability, a dependency is added. If no newer item can fix it, this
2098 excuse will be blocked.
2100 Note that the migration step will check the installability of every
2101 package, so this policy doesn't need to handle every corner case. It
2102 must, however, make sure that no excuse is unnecessarily blocked.
2104 Some cases that should be detected by this policy:
2106 * pkg-a is upgraded from 1.0-1 to 2.0-1, while
2107 pkg-b has "Depends: pkg-a (<< 2.0)"
2108 This typically happens if pkg-b has a strict dependency on pkg-a because
2109 it uses some non-stable internal interface (examples are glibc,
2110 binutils, python3-defaults, ...)
2112 * pkg-a is upgraded from 1.0-1 to 2.0-1, and
2113 pkg-a 1.0-1 has "Provides: provides-1",
2114 pkg-a 2.0-1 has "Provides: provides-2",
2115 pkg-b has "Depends: provides-1"
2116 This typically happens when pkg-a has an interface that changes between
2117 versions, and a virtual package is used to identify the version of this
2118 interface (e.g. perl-api-x.y)
2120 """
2122 _pkg_universe: "BinaryPackageUniverse"
2123 _all_binaries: dict["BinaryPackageId", "BinaryPackage"]
2124 _allow_uninst: dict[str, set[str | None]]
2125 _nobreakall_arches: list[str]
2127 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
2128 super().__init__(
2129 "implicit-deps",
2130 options,
2131 suite_info,
2132 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
2133 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
2134 )
2136 def initialise(self, britney: "Britney") -> None:
2137 super().initialise(britney)
2138 self._pkg_universe = britney.pkg_universe
2139 self._all_binaries = britney.all_binaries
2140 self._smooth_updates = britney.options.smooth_updates
2141 self._nobreakall_arches = self.options.nobreakall_arches
2142 self._new_arches = self.options.new_arches
2143 self._break_arches = self.options.break_arches
2144 self._allow_uninst = britney.allow_uninst
2145 self._outofsync_arches = self.options.outofsync_arches
2147 def can_be_removed(self, pkg: BinaryPackage) -> bool:
2148 src = pkg.source
2149 target_suite = self.suite_info.target_suite
2151 # TODO these conditions shouldn't be hardcoded here
2152 # ideally, we would be able to look up excuses to see if the removal
2153 # is in there, but in the current flow, this policy is called before
2154 # all possible excuses exist, so there is no list for us to check
2156 if src not in self.suite_info.primary_source_suite.sources:
2157 # source for pkg not in unstable: candidate for removal
2158 return True
2160 source_t = target_suite.sources[src]
2161 assert self.hints is not None
2162 for hint in self.hints.search("remove", package=src, version=source_t.version):
2163 # removal hint for the source in testing: candidate for removal
2164 return True
2166 if target_suite.is_cruft(pkg):
2167 # if pkg is cruft in testing, removal will be tried
2168 return True
2170 # the case were the newer version of the source no longer includes the
2171 # binary (or includes a cruft version of the binary) will be handled
2172 # separately (in that case there might be an implicit dependency on
2173 # the newer source)
2175 return False
2177 def should_skip_rdep(
2178 self, pkg: BinaryPackage, source_name: str, myarch: str
2179 ) -> bool:
2180 target_suite = self.suite_info.target_suite
2182 if not target_suite.is_pkg_in_the_suite(pkg.pkg_id):
2183 # it is not in the target suite, migration cannot break anything
2184 return True
2186 if pkg.source == source_name:
2187 # if it is built from the same source, it will be upgraded
2188 # with the source
2189 return True
2191 if self.can_be_removed(pkg):
2192 # could potentially be removed, so if that happens, it won't be
2193 # broken
2194 return True
2196 if pkg.architecture == "all" and myarch not in self._nobreakall_arches:
2197 # arch all on non nobreakarch is allowed to become uninstallable
2198 return True
2200 if pkg.pkg_id.package_name in self._allow_uninst[myarch]:
2201 # there is a hint to allow this binary to become uninstallable
2202 return True
2204 if not target_suite.is_installable(pkg.pkg_id):
2205 # it is already uninstallable in the target suite, migration
2206 # cannot break anything
2207 return True
2209 return False
2211 def breaks_installability(
2212 self,
2213 pkg_id_t: BinaryPackageId,
2214 pkg_id_s: BinaryPackageId | None,
2215 pkg_to_check: BinaryPackageId,
2216 ) -> bool:
2217 """
2218 Check if upgrading pkg_id_t to pkg_id_s breaks the installability of
2219 pkg_to_check.
2221 To check if removing pkg_id_t breaks pkg_to_check, set pkg_id_s to
2222 None.
2223 """
2225 pkg_universe = self._pkg_universe
2226 negative_deps = pkg_universe.negative_dependencies_of(pkg_to_check)
2228 for dep in pkg_universe.dependencies_of(pkg_to_check):
2229 if pkg_id_t not in dep:
2230 # this depends doesn't have pkg_id_t as alternative, so
2231 # upgrading pkg_id_t cannot break this dependency clause
2232 continue
2234 # We check all the alternatives for this dependency, to find one
2235 # that can satisfy it when pkg_id_t is upgraded to pkg_id_s
2236 found_alternative = False
2237 for d in dep:
2238 if d in negative_deps:
2239 # If this alternative dependency conflicts with
2240 # pkg_to_check, it cannot be used to satisfy the
2241 # dependency.
2242 # This commonly happens when breaks are added to pkg_id_s.
2243 continue
2245 if d.package_name != pkg_id_t.package_name:
2246 # a binary different from pkg_id_t can satisfy the dep, so
2247 # upgrading pkg_id_t won't break this dependency
2248 found_alternative = True
2249 break
2251 if d != pkg_id_s:
2252 # We want to know the impact of the upgrade of
2253 # pkg_id_t to pkg_id_s. If pkg_id_s migrates to the
2254 # target suite, any other version of this binary will
2255 # not be there, so it cannot satisfy this dependency.
2256 # This includes pkg_id_t, but also other versions.
2257 continue
2259 # pkg_id_s can satisfy the dep
2260 found_alternative = True
2262 if not found_alternative:
2263 return True
2264 return False
2266 def check_upgrade(
2267 self,
2268 pkg_id_t: BinaryPackageId,
2269 pkg_id_s: BinaryPackageId | None,
2270 source_name: str,
2271 myarch: str,
2272 broken_binaries: set[str],
2273 excuse: "Excuse",
2274 ) -> PolicyVerdict:
2275 verdict = PolicyVerdict.PASS
2277 pkg_universe = self._pkg_universe
2278 all_binaries = self._all_binaries
2280 # check all rdeps of the package in testing
2281 rdeps_t = pkg_universe.reverse_dependencies_of(pkg_id_t)
2283 for rdep_pkg in sorted(rdeps_t):
2284 rdep_p = all_binaries[rdep_pkg]
2286 # check some cases where the rdep won't become uninstallable, or
2287 # where we don't care if it does
2288 if self.should_skip_rdep(rdep_p, source_name, myarch):
2289 continue
2291 if not self.breaks_installability(pkg_id_t, pkg_id_s, rdep_pkg):
2292 # if upgrading pkg_id_t to pkg_id_s doesn't break rdep_pkg,
2293 # there is no implicit dependency
2294 continue
2296 # The upgrade breaks the installability of the rdep. We need to
2297 # find out if there is a newer version of the rdep that solves the
2298 # uninstallability. If that is the case, there is an implicit
2299 # dependency. If not, the upgrade will fail.
2301 # check source versions
2302 newer_versions = find_newer_binaries(
2303 self.suite_info, rdep_p, add_source_for_dropped_bin=True
2304 )
2305 good_newer_versions = set()
2306 for npkg, suite in newer_versions:
2307 if npkg.architecture == "source":
2308 # When a newer version of the source package doesn't have
2309 # the binary, we get the source as 'newer version'. In
2310 # this case, the binary will not be uninstallable if the
2311 # newer source migrates, because it is no longer there.
2312 good_newer_versions.add(npkg)
2313 continue
2314 assert isinstance(npkg, BinaryPackageId)
2315 if not self.breaks_installability(pkg_id_t, pkg_id_s, npkg):
2316 good_newer_versions.add(npkg)
2318 if good_newer_versions:
2319 spec = DependencySpec(DependencyType.IMPLICIT_DEPENDENCY, myarch)
2320 excuse.add_package_depends(spec, good_newer_versions)
2321 else:
2322 # no good newer versions: no possible solution
2323 broken_binaries.add(rdep_pkg.name)
2324 if pkg_id_s:
2325 action = "migrating {} to {}".format(
2326 pkg_id_s.name,
2327 self.suite_info.target_suite.name,
2328 )
2329 else:
2330 action = "removing {} from {}".format(
2331 pkg_id_t.name,
2332 self.suite_info.target_suite.name,
2333 )
2334 if rdep_pkg[0].endswith("-faux-build-depends"):
2335 name = rdep_pkg[0].replace("-faux-build-depends", "")
2336 info = f'{action} makes Build-Depends of src:<a href="#{name}">{name}</a> uninstallable'
2337 else:
2338 info = '{0} makes <a href="#{1}">{1}</a> uninstallable'.format(
2339 action, rdep_pkg.name
2340 )
2341 verdict = PolicyVerdict.REJECTED_PERMANENTLY
2342 excuse.add_verdict_info(verdict, info)
2344 return verdict
2346 def apply_srcarch_policy_impl(
2347 self,
2348 implicit_dep_info: dict[str, Any],
2349 arch: str,
2350 source_data_tdist: SourcePackage | None,
2351 source_data_srcdist: SourcePackage,
2352 excuse: "Excuse",
2353 ) -> PolicyVerdict:
2354 verdict = PolicyVerdict.PASS
2356 if not source_data_tdist:
2357 # this item is not currently in testing: no implicit dependency
2358 return verdict
2360 if excuse.hasreason("missingbuild"):
2361 # if the build is missing, the policy would treat this as if the
2362 # binaries would be removed, which would give incorrect (and
2363 # confusing) info
2364 info = "missing build, not checking implicit dependencies on %s" % (arch)
2365 excuse.add_detailed_info(info)
2366 return verdict
2368 source_suite = excuse.item.suite
2369 source_name = excuse.item.package
2370 target_suite = self.suite_info.target_suite
2371 all_binaries = self._all_binaries
2373 # we check all binaries for this excuse that are currently in testing
2374 relevant_binaries = [
2375 x
2376 for x in source_data_tdist.binaries
2377 if (arch == "source" or x.architecture == arch)
2378 and x.package_name in target_suite.binaries[x.architecture]
2379 and x.architecture not in self._new_arches
2380 and x.architecture not in self._break_arches
2381 and x.architecture not in self._outofsync_arches
2382 ]
2384 broken_binaries: set[str] = set()
2386 assert self.hints is not None
2387 for pkg_id_t in sorted(relevant_binaries):
2388 mypkg = pkg_id_t.package_name
2389 myarch = pkg_id_t.architecture
2390 binaries_t_a = target_suite.binaries[myarch]
2391 binaries_s_a = source_suite.binaries[myarch]
2393 if target_suite.is_cruft(all_binaries[pkg_id_t]):
2394 # this binary is cruft in testing: it will stay around as long
2395 # as necessary to satisfy dependencies, so we don't need to
2396 # care
2397 continue
2399 if mypkg in binaries_s_a:
2400 mybin = binaries_s_a[mypkg]
2401 pkg_id_s: Optional["BinaryPackageId"] = mybin.pkg_id
2402 if mybin.source != source_name:
2403 # hijack: this is too complicated to check, so we ignore
2404 # it (the migration code will check the installability
2405 # later anyway)
2406 pass
2407 elif mybin.source_version != source_data_srcdist.version:
2408 # cruft in source suite: pretend the binary doesn't exist
2409 pkg_id_s = None
2410 elif pkg_id_t == pkg_id_s:
2411 # same binary (probably arch: all from a binNMU):
2412 # 'upgrading' doesn't change anything, for this binary, so
2413 # it won't break anything
2414 continue
2415 else:
2416 pkg_id_s = None
2418 if not pkg_id_s and is_smooth_update_allowed(
2419 binaries_t_a[mypkg], self._smooth_updates, self.hints
2420 ):
2421 # the binary isn't in the new version (or is cruft there), and
2422 # smooth updates are allowed: the binary can stay around if
2423 # that is necessary to satisfy dependencies, so we don't need
2424 # to check it
2425 continue
2427 if (
2428 not pkg_id_s
2429 and source_data_tdist.version == source_data_srcdist.version
2430 and source_suite.suite_class == SuiteClass.ADDITIONAL_SOURCE_SUITE
2431 and binaries_t_a[mypkg].architecture == "all"
2432 ):
2433 # we're very probably migrating a binNMU built in tpu where the arch:all
2434 # binaries were not copied to it as that's not needed. This policy could
2435 # needlessly block.
2436 continue
2438 v = self.check_upgrade(
2439 pkg_id_t, pkg_id_s, source_name, myarch, broken_binaries, excuse
2440 )
2441 verdict = PolicyVerdict.worst_of(verdict, v)
2443 # each arch is processed separately, so if we already have info from
2444 # other archs, we need to merge the info from this arch
2445 broken_old = set(implicit_dep_info.get("broken-binaries", []))
2446 implicit_dep_info["broken-binaries"] = sorted(broken_old | broken_binaries)
2448 return verdict
2451class ReverseRemovalPolicy(AbstractBasePolicy):
2452 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
2453 super().__init__(
2454 "reverseremoval",
2455 options,
2456 suite_info,
2457 {SuiteClass.PRIMARY_SOURCE_SUITE, SuiteClass.ADDITIONAL_SOURCE_SUITE},
2458 )
2460 def register_hints(self, hint_parser: HintParser) -> None:
2461 hint_parser.register_hint_type(HintType("ignore-reverse-remove"))
2463 def initialise(self, britney: "Britney") -> None:
2464 super().initialise(britney)
2466 pkg_universe = britney.pkg_universe
2467 source_suites = britney.suite_info.source_suites
2468 target_suite = britney.suite_info.target_suite
2470 # Build set of the sources of reverse (Build-) Depends
2471 assert self.hints is not None
2472 hints = self.hints.search("remove")
2474 rev_bin: dict[BinaryPackageId, set[str]] = defaultdict(set)
2475 for hint in hints:
2476 for item in hint.packages:
2477 # I think we don't need to look at the target suite
2478 for src_suite in source_suites:
2479 try:
2480 # Explicitly not running filter_out_faux here
2481 my_bins = set(src_suite.sources[item.uvname].binaries)
2482 except KeyError:
2483 continue
2484 compute_reverse_tree(pkg_universe, my_bins)
2485 for this_bin in my_bins:
2486 rev_bin.setdefault(this_bin, set()).add(item.uvname)
2488 rev_src: dict[str, set[str]] = defaultdict(set)
2489 for bin_pkg, reasons in rev_bin.items():
2490 # If the pkg is in the target suite, there's nothing this
2491 # policy wants to do.
2492 if target_suite.is_pkg_in_the_suite(bin_pkg):
2493 continue
2494 that_bin = britney.all_binaries[bin_pkg]
2495 bin_src = that_bin.source + "/" + that_bin.source_version
2496 rev_src.setdefault(bin_src, set()).update(reasons)
2497 self._block_src_for_rm_hint = rev_src
2499 def apply_src_policy_impl(
2500 self,
2501 rev_remove_info: dict[str, Any],
2502 source_data_tdist: SourcePackage | None,
2503 source_data_srcdist: SourcePackage,
2504 excuse: "Excuse",
2505 ) -> PolicyVerdict:
2506 verdict = PolicyVerdict.PASS
2508 item = excuse.item
2509 if item.name in self._block_src_for_rm_hint:
2510 reason = ", ".join(sorted(self._block_src_for_rm_hint[item.name]))
2511 assert self.hints is not None
2512 ignore_hints = self.hints.search(
2513 "ignore-reverse-remove", package=item.uvname, version=item.version
2514 )
2515 excuse.addreason("reverseremoval")
2516 if ignore_hints:
2517 excuse.addreason("ignore-reverse-remove")
2518 excuse.addinfo(
2519 "Should block migration because of remove hint for %s, but forced by %s"
2520 % (reason, ignore_hints[0].user)
2521 )
2522 verdict = PolicyVerdict.PASS_HINTED
2523 else:
2524 verdict = PolicyVerdict.REJECTED_PERMANENTLY
2525 excuse.add_verdict_info(
2526 verdict, "Remove hint for (transitive) dependency: %s" % reason
2527 )
2529 return verdict
2532class ReproduciblePolicy(AbstractBasePolicy):
2533 def __init__(self, options: optparse.Values, suite_info: Suites) -> None:
2534 super().__init__(
2535 "reproducible",
2536 options,
2537 suite_info,
2538 {SuiteClass.PRIMARY_SOURCE_SUITE},
2539 ApplySrcPolicy.RUN_ON_EVERY_ARCH_ONLY,
2540 )
2541 self._reproducible: dict[str, Any] = {}
2543 # Default values for this policy's options
2544 parse_option(options, "repro_success_bounty", default=0, to_int=True)
2545 parse_option(options, "repro_regression_penalty", default=0, to_int=True)
2546 parse_option(options, "repro_log_url")
2547 parse_option(options, "repro_url")
2548 parse_option(options, "repro_retry_url")
2549 parse_option(options, "repro_components")
2551 def register_hints(self, hint_parser: HintParser) -> None:
2552 hint_parser.register_hint_type(
2553 HintType(
2554 "ignore-reproducible-src",
2555 versioned=HintAnnotate.OPTIONAL,
2556 architectured=HintAnnotate.OPTIONAL,
2557 )
2558 )
2559 hint_parser.register_hint_type(
2560 HintType(
2561 "ignore-reproducible",
2562 versioned=HintAnnotate.OPTIONAL,
2563 architectured=HintAnnotate.OPTIONAL,
2564 )
2565 )
2567 def initialise(self, britney: "Britney") -> None:
2568 super().initialise(britney)
2569 summary = self._reproducible
2571 assert hasattr(
2572 self, "state_dir"
2573 ), "Please set STATE_DIR in the britney configuration"
2574 assert (
2575 self.options.repro_components
2576 ), "Please set REPRO_COMPONENTS in the britney configuration"
2577 for file in os.listdir(self.state_dir):
2578 if not file.startswith("reproducible-"): 2578 ↛ 2579line 2578 didn't jump to line 2579 because the condition on line 2578 was never true
2579 continue
2580 filename = os.path.join(self.state_dir, file)
2582 self.logger.info("Loading reproducibility report from %s", filename)
2583 with open(filename) as fd: 2583 ↛ 2577line 2583 didn't jump to line 2577 because the continue on line 2585 wasn't executed
2584 if os.fstat(fd.fileno()).st_size < 1: 2584 ↛ 2585line 2584 didn't jump to line 2585 because the condition on line 2584 was never true
2585 continue
2586 data = json.load(fd)
2588 for result in data["records"]:
2589 if result["component"] in self.options.repro_components: 2589 ↛ 2588line 2589 didn't jump to line 2588 because the condition on line 2589 was always true
2590 summary.setdefault(result["architecture"], {})[
2591 (result["name"], result["version"])
2592 ] = result
2594 def _create_link_to_log(
2595 self, arch: str, src: str, failed_bpids: set[BinaryPackageId]
2596 ) -> str:
2597 link = ""
2598 for bpid in sorted(failed_bpids):
2599 link += f"{bpid[0]}, "
2600 link = link.strip(", ") # remove end
2602 if self.options.repro_log_url: 2602 ↛ 2611line 2602 didn't jump to line 2611 because the condition on line 2602 was always true
2603 # log should be the same for all binaries, using the last bpid
2604 try:
2605 log = self._reproducible[arch][(bpid[0], bpid[1])]["build_id"]
2606 except KeyError:
2607 log = self._reproducible["all"][(bpid[0], bpid[1])]["build_id"]
2608 url_log = self.options.repro_log_url.format(package=src, arch=arch, log=log)
2609 return f': <a href="{url_log}">{link}</a>'
2610 else:
2611 return ": " + link
2613 def apply_srcarch_policy_impl(
2614 self,
2615 policy_info: dict[str, Any],
2616 arch: str,
2617 source_data_tdist: SourcePackage | None,
2618 source_data_srcdist: SourcePackage,
2619 excuse: "Excuse",
2620 ) -> PolicyVerdict:
2621 verdict = PolicyVerdict.PASS
2622 eligible_for_bounty = False
2624 # we don't want to apply this policy (yet) on binNMUs
2625 if excuse.item.architecture != "source": 2625 ↛ 2626line 2625 didn't jump to line 2626 because the condition on line 2625 was never true
2626 return verdict
2628 # we're not supposed to judge on this arch
2629 if arch not in self.options.repro_arches: 2629 ↛ 2630line 2629 didn't jump to line 2630 because the condition on line 2629 was never true
2630 return verdict
2632 # bail out if this arch has no packages for this source (not build
2633 # here)
2634 if arch not in excuse.packages: 2634 ↛ 2635line 2634 didn't jump to line 2635 because the condition on line 2634 was never true
2635 return verdict
2637 # horrible hard-coding, but currently, we don't keep track of the
2638 # component when loading the packages files
2639 component = "main"
2640 if "/" in (section := source_data_srcdist.section): 2640 ↛ 2641line 2640 didn't jump to line 2641 because the condition on line 2640 was never true
2641 component = section.split("/")[0]
2643 if ( 2643 ↛ 2647line 2643 didn't jump to line 2647
2644 self.options.repro_components
2645 and component not in self.options.repro_components.split()
2646 ):
2647 return verdict
2649 source_name = excuse.item.package
2651 if self.options.repro_url: 2651 ↛ 2652line 2651 didn't jump to line 2652 because the condition on line 2651 was never true
2652 url = self.options.repro_url.format(package=quote(source_name), arch=arch)
2653 url_html = ' - <a href="%s">info</a>' % url
2654 if self.options.repro_retry_url:
2655 url_html += (
2656 ' <a href="%s">♻ </a>'
2657 % self.options.repro_retry_url.format(
2658 package=quote(source_name), arch=arch
2659 )
2660 )
2661 # When run on multiple archs, the last one "wins"
2662 policy_info["status-url"] = url
2663 else:
2664 url = None
2665 url_html = ""
2667 try:
2668 repro = self._reproducible[arch].copy()
2669 except KeyError:
2670 verdict = PolicyVerdict.REJECTED_TEMPORARILY
2671 msg = f"No reproducibility data available at all for {arch}"
2672 excuse.add_verdict_info(verdict, msg)
2673 return verdict
2674 try:
2675 repro |= self._reproducible["all"]
2676 except KeyError:
2677 verdict = PolicyVerdict.REJECTED_TEMPORARILY
2678 msg = "No reproducibility data available at all for arch:all"
2679 excuse.add_verdict_info(verdict, msg)
2680 return verdict
2682 # skip/delay policy until both arch:arch and arch:all builds are done
2683 if (arch or "all") in excuse.missing_builds: 2683 ↛ 2684line 2683 didn't jump to line 2684 because the condition on line 2683 was never true
2684 self.logger.debug(
2685 f"{excuse.name} not built for {arch} or all, skipping reproducible policy",
2686 )
2687 verdict = PolicyVerdict.REJECTED_TEMPORARILY
2688 excuse.add_verdict_info(
2689 verdict,
2690 f"Reproducibility check deferred on {arch}: missing builds",
2691 )
2692 return verdict
2694 source_suite_state = "not-unknown"
2695 failed_bpids: set[BinaryPackageId] = set()
2696 # The states should either be GOOD/BAD for all binaries, UNKNOWN for all
2697 # binaries, or missing for all binaries, but let's not assume that.
2698 for bpid in binaries_from_source_version(source_data_srcdist, self.suite_info):
2699 if bpid[2] not in ("all", arch): 2699 ↛ 2700line 2699 didn't jump to line 2700 because the condition on line 2699 was never true
2700 continue
2701 try:
2702 state = repro[(bpid[0], bpid[1])]["status"]
2703 assert state in [
2704 "BAD",
2705 "FAIL",
2706 "GOOD",
2707 "UNKNOWN",
2708 None,
2709 ], f"Unexpected reproducible state {state}"
2710 self.logger.debug(f"repro data for {bpid}: {state}")
2711 if state == "BAD":
2712 failed_bpids.add(bpid)
2713 # not changing source_suite_state here on purpose
2714 elif state is None or state in ("FAIL", "UNKNOWN"): 2714 ↛ 2715line 2714 didn't jump to line 2715 because the condition on line 2714 was never true
2715 source_suite_state = "unknown"
2716 except KeyError:
2717 self.logger.debug(f"No repro data found for {bpid}")
2718 source_suite_state = "unknown"
2719 break
2721 if source_suite_state == "not-unknown":
2722 source_suite_state = "known"
2724 excuse_info = []
2725 if source_suite_state == "unknown":
2726 verdict = PolicyVerdict.REJECTED_TEMPORARILY
2727 excuse_info.append(
2728 f"Reproducibility check waiting for results on {arch}{url_html}"
2729 )
2730 policy_info.setdefault("state", {}).setdefault(arch, "unavailable")
2731 elif failed_bpids:
2732 ignored_bpids: set[BinaryPackageId] = set()
2733 if source_data_tdist is None: 2733 ↛ 2734line 2733 didn't jump to line 2734 because the condition on line 2733 was never true
2734 target_suite_state = "new"
2735 else:
2736 target_suite_state = "reproducible"
2737 for bpid in list(failed_bpids):
2738 pkg_name = bpid[0]
2739 for bpid_t in filter_out_faux(source_data_tdist.binaries):
2740 if bpid_t[2] not in ("all", arch): 2740 ↛ 2741line 2740 didn't jump to line 2741 because the condition on line 2740 was never true
2741 continue
2742 if pkg_name != bpid_t[0]: 2742 ↛ 2743line 2742 didn't jump to line 2743 because the condition on line 2742 was never true
2743 continue
2744 try:
2745 state = repro[(pkg_name, bpid_t[1])]["status"]
2746 assert state in [
2747 "BAD",
2748 "FAIL",
2749 "GOOD",
2750 "UNKNOWN",
2751 None,
2752 ], f"Unexpected reproducible state {state}"
2753 self.logger.debug(
2754 f"testing repro data for {bpid_t}: {state}"
2755 )
2756 if state == "BAD":
2757 ignored_bpids.add(bpid)
2758 elif state is None or state in ("FAIL", "UNKNOWN"): 2758 ↛ 2759line 2758 didn't jump to line 2759 because the condition on line 2758 was never true
2759 target_suite_state = "unknown"
2760 except KeyError:
2761 self.logger.debug(
2762 f"No testing repro data found for {bpid_t}"
2763 )
2764 # This shouldn't happen as for the past migration
2765 # to have been allowed, there should be data.
2766 target_suite_state = "unknown"
2767 break
2769 # Reminder: code here is part of the non-reproducibile source-suite branch
2770 if target_suite_state == "new": 2770 ↛ 2771line 2770 didn't jump to line 2771 because the condition on line 2770 was never true
2771 verdict = PolicyVerdict.REJECTED_PERMANENTLY
2772 excuse_info.append(
2773 f"New but not reproduced on {arch}{url_html}"
2774 f"{self._create_link_to_log(arch, source_name, failed_bpids)}"
2775 )
2776 policy_info.setdefault("state", {}).setdefault(
2777 arch, "new but not reproducible"
2778 )
2779 elif target_suite_state == "unknown": 2779 ↛ 2781line 2779 didn't jump to line 2781 because the condition on line 2779 was never true
2780 # Shouldn't happen after initial bootstrap once blocking
2781 verdict = PolicyVerdict.REJECTED_TEMPORARILY
2782 excuse_info.append(
2783 f"Reproducibility check failed and now waiting for reference "
2784 f"results on {arch}{url_html}"
2785 )
2786 policy_info.setdefault("state", {}).setdefault(
2787 arch, "waiting for reference"
2788 )
2789 elif len(failed_bpids - ignored_bpids) == 0:
2790 verdict = PolicyVerdict.REJECTED_PERMANENTLY
2791 excuse_info.append(
2792 f"Not reproduced on {arch} (not a regression)"
2793 f"{self._create_link_to_log(arch, source_name, failed_bpids)}"
2794 )
2795 policy_info.setdefault("state", {}).setdefault(arch, "not reproducible")
2796 else:
2797 verdict = PolicyVerdict.REJECTED_PERMANENTLY
2798 excuse_info.append(
2799 f"Reproducibility regression on {arch}"
2800 f"{self._create_link_to_log(arch, source_name, failed_bpids - ignored_bpids)}"
2801 )
2802 policy_info.setdefault("state", {}).setdefault(arch, "regression")
2804 # non-reproducible source-suite cases are handled above, so here we
2805 # handle the last of the source-suite cases
2806 else:
2807 excuse_info.append(f"Reproduced on {arch}{url_html}")
2808 policy_info.setdefault("state", {}).setdefault(arch, "reproducible")
2809 eligible_for_bounty = True
2811 if verdict.is_rejected:
2812 assert self.hints is not None
2813 for hint_arch in ("source", arch):
2814 ignore_hints = self.hints.search(
2815 "ignore-reproducible-src",
2816 package=source_name,
2817 version=source_data_srcdist.version,
2818 architecture=hint_arch,
2819 )
2820 # one hint is enough, take the first one encountered
2821 if ignore_hints:
2822 verdict = PolicyVerdict.PASS_HINTED
2823 policy_info.setdefault("hints", {}).setdefault(arch, []).append(
2824 ignore_hints[0].user + ": " + str(ignore_hints[0])
2825 )
2826 if hint_arch == arch: 2826 ↛ 2829line 2826 didn't jump to line 2829 because the condition on line 2826 was always true
2827 on_arch = f" on {arch}"
2828 else:
2829 on_arch = ""
2830 excuse_info.append(
2831 f"Reproducibility issues ignored for src:{ignore_hints[0].package}"
2832 f"{on_arch} as requested by {ignore_hints[0].user}"
2833 )
2834 break
2836 if verdict.is_rejected:
2837 all_hints = []
2838 all_hinted = True
2839 any_hinted = False
2841 if source_suite_state == "known":
2842 check_bpids = failed_bpids
2843 else:
2844 # Let's not wait for results if all binaries have a hint
2845 check_bpids = filter_out_faux(source_data_srcdist.binaries)
2846 missed_bpids = set()
2848 for bpid in check_bpids:
2849 bpid_hints = self.hints.search(
2850 "ignore-reproducible",
2851 package=bpid[0],
2852 version=bpid[1],
2853 architecture=bpid[2],
2854 )
2855 if bpid_hints:
2856 # one hint per binary is enough
2857 all_hints.append(bpid_hints[0])
2858 any_hinted = True
2859 self.logger.debug(
2860 f"repro: hint found for {source_name}: {bpid}"
2861 )
2862 else:
2863 missed_bpids.add(bpid)
2864 all_hinted = False
2866 if all_hinted:
2867 verdict = PolicyVerdict.PASS_HINTED
2868 for hint in all_hints:
2869 policy_info.setdefault("hints", {}).setdefault(arch, []).append(
2870 hint.user + ": " + str(hint)
2871 )
2872 # TODO: we're going to print this for arch:all binaries on each arch
2873 excuse_info.append(
2874 f"Reproducibility issues ignored for {hint.package} on {arch} as "
2875 f"requested by {hint.user}"
2876 )
2877 elif any_hinted: 2877 ↛ 2878line 2877 didn't jump to line 2878 because the condition on line 2877 was never true
2878 self.logger.info(
2879 f"repro: binary hints for {source_name} ignored as they don't "
2880 f"cover these binaries {missed_bpids}"
2881 )
2883 if self.options.repro_success_bounty and eligible_for_bounty: 2883 ↛ 2884line 2883 didn't jump to line 2884 because the condition on line 2883 was never true
2884 excuse.add_bounty("reproducibility", self.options.repro_success_bounty)
2886 if verdict.is_rejected and self.options.repro_regression_penalty: 2886 ↛ 2888line 2886 didn't jump to line 2888 because the condition on line 2886 was never true
2887 # With a non-zero penalty, we shouldn't block on this policy
2888 verdict = PolicyVerdict.PASS
2889 if self.options.repro_regression_penalty > 0:
2890 excuse.add_penalty(
2891 "reproducibility", self.options.repro_regression_penalty
2892 )
2894 for msg in excuse_info:
2895 if verdict.is_rejected:
2896 excuse.add_verdict_info(verdict, msg)
2897 else:
2898 excuse.addinfo(msg)
2900 return verdict