Coverage for britney2/excusefinder.py: 92%
344 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-03-23 07:34 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-03-23 07:34 +0000
1import logging
2import optparse
3from itertools import chain
4from typing import TYPE_CHECKING, Any, Optional, cast
5from collections.abc import Iterable
6from urllib.parse import quote
8import apt_pkg
10from britney2 import BinaryPackage, BinaryPackageId, PackageId, Suites
11from britney2.excuse import Excuse
12from britney2.migrationitem import MigrationItem, MigrationItemFactory
13from britney2.policies import PolicyVerdict
14from britney2.utils import find_smooth_updateable_binaries, invalidate_excuses
16if TYPE_CHECKING: 16 ↛ 17line 16 didn't jump to line 17, because the condition on line 16 was never true
17 from .hints import HintCollection
18 from .installability.universe import BinaryPackageUniverse
19 from .policies.policy import PolicyEngine
22class ExcuseFinder(object):
24 def __init__(
25 self,
26 options: optparse.Values,
27 suite_info: Suites,
28 all_binaries: dict[BinaryPackageId, BinaryPackage],
29 pkg_universe: "BinaryPackageUniverse",
30 policy_engine: "PolicyEngine",
31 mi_factory: MigrationItemFactory,
32 hints: "HintCollection",
33 ) -> None:
34 logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
35 self.logger = logging.getLogger(logger_name)
36 self.options = options
37 self.suite_info = suite_info
38 self.all_binaries = all_binaries
39 self.pkg_universe = pkg_universe
40 self._policy_engine = policy_engine
41 self._migration_item_factory = mi_factory
42 self.hints = hints
43 self.excuses: dict[str, Excuse] = {}
45 def _get_build_link(
46 self, arch: str, src: str, ver: str, label: Optional[str] = None
47 ) -> str:
48 """Return a link to the build logs, labelled 'arch' per default"""
49 if label is None:
50 label = arch
51 if self.options.build_url:
52 url = self.options.build_url.format(
53 arch=arch, source=quote(src), version=quote(ver)
54 )
55 return '<a href="%s" target="_blank">%s</a>' % (url, label)
56 else:
57 return label
59 def _should_remove_source(self, item: MigrationItem) -> bool:
60 """Check if a source package should be removed from testing
62 This method checks if a source package should be removed from the
63 target suite; this happens if the source package is not
64 present in the primary source suite anymore.
66 It returns True if the package can be removed, False otherwise.
67 In the former case, a new excuse is appended to the object
68 attribute excuses.
69 """
70 if hasattr(self.options, "partial_source"): 70 ↛ 71line 70 didn't jump to line 71, because the condition on line 70 was never true
71 return False
72 # if the source package is available in unstable, then do nothing
73 source_suite = self.suite_info.primary_source_suite
74 pkg = item.package
75 if pkg in source_suite.sources: 75 ↛ 76line 75 didn't jump to line 76, because the condition on line 75 was never true
76 return False
77 # otherwise, add a new excuse for its removal
78 src = item.suite.sources[pkg]
79 excuse = Excuse(item)
80 excuse.addinfo("Package not in %s, will try to remove" % source_suite.name)
81 excuse.set_vers(src.version, None)
82 if src.maintainer:
83 excuse.set_maint(src.maintainer)
84 if src.section: 84 ↛ 88line 84 didn't jump to line 88, because the condition on line 84 was never false
85 excuse.set_section(src.section)
87 # if the package is blocked, skip it
88 for hint in self.hints.search("block", package=pkg, removal=True):
89 excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
90 excuse.add_verdict_info(
91 excuse.policy_verdict,
92 "Not touching package, as requested by %s "
93 "(contact %s-release if update is needed)"
94 % (hint.user, self.options.distribution),
95 )
96 excuse.addreason("block")
97 self.excuses[excuse.name] = excuse
98 return False
100 excuse.policy_verdict = PolicyVerdict.PASS
101 self.excuses[excuse.name] = excuse
102 return True
104 def _should_upgrade_srcarch(self, item: MigrationItem) -> bool:
105 """Check if a set of binary packages should be upgraded
107 This method checks if the binary packages produced by the source
108 package on the given architecture should be upgraded; this can
109 happen also if the migration is a binary-NMU for the given arch.
111 It returns False if the given packages don't need to be upgraded,
112 True otherwise. In the former case, a new excuse is appended to
113 the object attribute excuses.
114 """
115 # retrieve the source packages for testing and suite
117 target_suite = self.suite_info.target_suite
118 source_suite = item.suite
119 src = item.package
120 arch = item.architecture
121 source_t = target_suite.sources[src]
122 source_u = source_suite.sources[src]
124 excuse = Excuse(item)
125 excuse.set_vers(source_t.version, source_t.version)
126 if source_u.maintainer: 126 ↛ 128line 126 didn't jump to line 128, because the condition on line 126 was never false
127 excuse.set_maint(source_u.maintainer)
128 if source_u.section: 128 ↛ 135line 128 didn't jump to line 135, because the condition on line 128 was never false
129 excuse.set_section(source_u.section)
131 # if there is a `remove' hint and the requested version is the same as the
132 # version in testing, then stop here and return False
133 # (as a side effect, a removal may generate such excuses for both the source
134 # package and its binary packages on each architecture)
135 for hint in self.hints.search("remove", package=src, version=source_t.version):
136 excuse.add_hint(hint)
137 excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
138 excuse.add_verdict_info(
139 excuse.policy_verdict, "Removal request by %s" % (hint.user)
140 )
141 excuse.add_verdict_info(
142 excuse.policy_verdict, "Trying to remove package, not update it"
143 )
144 self.excuses[excuse.name] = excuse
145 return False
147 # the starting point is that there is nothing wrong and nothing worth doing
148 anywrongver = False
149 anyworthdoing = False
151 packages_t_a = target_suite.binaries[arch]
152 packages_s_a = source_suite.binaries[arch]
154 wrong_verdict = PolicyVerdict.REJECTED_PERMANENTLY
156 # for every binary package produced by this source in unstable for this architecture
157 for pkg_id in sorted(x for x in source_u.binaries if x.architecture == arch):
158 pkg_name = pkg_id.package_name
159 # TODO filter binaries based on checks below?
160 excuse.add_package(pkg_id)
162 # retrieve the testing (if present) and unstable corresponding binary packages
163 binary_t = packages_t_a[pkg_name] if pkg_name in packages_t_a else None
164 binary_u = packages_s_a[pkg_name]
166 # this is the source version for the new binary package
167 pkgsv = binary_u.source_version
169 # if the new binary package is architecture-independent, then skip it
170 if binary_u.architecture == "all":
171 if pkg_id not in source_t.binaries:
172 # only add a note if the arch:all does not match the expected version
173 excuse.add_detailed_info(
174 "Ignoring %s %s (from %s) as it is arch: all"
175 % (pkg_name, binary_u.version, pkgsv)
176 )
177 continue
179 # if the new binary package is not from the same source as the testing one, then skip it
180 # this implies that this binary migration is part of a source migration
181 if source_u.version == pkgsv and source_t.version != pkgsv: 181 ↛ 182line 181 didn't jump to line 182, because the condition on line 181 was never true
182 anywrongver = True
183 excuse.add_verdict_info(
184 wrong_verdict,
185 "From wrong source: %s %s (%s not %s)"
186 % (pkg_name, binary_u.version, pkgsv, source_t.version),
187 )
188 continue
190 # cruft in unstable
191 if source_u.version != pkgsv and source_t.version != pkgsv:
192 if self.options.ignore_cruft:
193 excuse.add_detailed_info(
194 "Old cruft: %s %s (but ignoring cruft, so nevermind)"
195 % (pkg_name, pkgsv)
196 )
197 else:
198 anywrongver = True
199 excuse.add_verdict_info(
200 wrong_verdict, "Old cruft: %s %s" % (pkg_name, pkgsv)
201 )
202 continue
204 # if the source package has been updated in unstable and this is a binary migration, skip it
205 # (the binaries are now out-of-date)
206 if source_t.version == pkgsv and source_t.version != source_u.version: 206 ↛ 207line 206 didn't jump to line 207, because the condition on line 206 was never true
207 anywrongver = True
208 excuse.add_verdict_info(
209 wrong_verdict,
210 "From wrong source: %s %s (%s not %s)"
211 % (pkg_name, binary_u.version, pkgsv, source_u.version),
212 )
213 continue
215 # if the binary is not present in testing, then it is a new binary;
216 # in this case, there is something worth doing
217 if not binary_t:
218 excuse.add_detailed_info(
219 "New binary: %s (%s)" % (pkg_name, binary_u.version)
220 )
221 anyworthdoing = True
222 continue
224 # at this point, the binary package is present in testing, so we can compare
225 # the versions of the packages ...
226 vcompare = apt_pkg.version_compare(binary_t.version, binary_u.version)
228 # ... if updating would mean downgrading, then stop here: there is something wrong
229 if vcompare > 0: 229 ↛ 230line 229 didn't jump to line 230, because the condition on line 229 was never true
230 anywrongver = True
231 excuse.add_verdict_info(
232 wrong_verdict,
233 "Not downgrading: %s (%s to %s)"
234 % (pkg_name, binary_t.version, binary_u.version),
235 )
236 break
237 # ... if updating would mean upgrading, then there is something worth doing
238 elif vcompare < 0:
239 excuse.add_detailed_info(
240 "Updated binary: %s (%s to %s)"
241 % (pkg_name, binary_t.version, binary_u.version)
242 )
243 anyworthdoing = True
245 srcv = source_u.version
246 same_source = source_t.version == srcv
247 primary_source_suite = self.suite_info.primary_source_suite
248 is_primary_source = source_suite == primary_source_suite
250 # if there is nothing wrong and there is something worth doing or the source
251 # package is not fake, then check what packages should be removed
252 if not anywrongver and (anyworthdoing or not source_u.is_fakesrc):
253 # we want to remove binaries that are no longer produced by the
254 # new source, but there are some special cases:
255 # - if this is binary-only (same_source) and not from the primary
256 # source, we don't do any removals:
257 # binNMUs in *pu on some architectures would otherwise result in
258 # the removal of binaries on other architectures
259 # - for the primary source, smooth binaries in the target suite
260 # are not considered for removal
261 if not same_source or is_primary_source:
262 smoothbins = set()
263 if is_primary_source: 263 ↛ 281line 263 didn't jump to line 281, because the condition on line 263 was never false
264 binaries_t = target_suite.binaries
265 possible_smooth_updates = [
266 p for p in source_t.binaries if p.architecture == arch
267 ]
268 smoothbins = find_smooth_updateable_binaries(
269 possible_smooth_updates,
270 source_u,
271 self.pkg_universe,
272 target_suite,
273 binaries_t,
274 source_suite.binaries,
275 cast(frozenset["BinaryPackageId"], frozenset()),
276 self.options.smooth_updates,
277 self.hints,
278 )
280 # for every binary package produced by this source in testing for this architecture
281 for pkg_id in sorted(
282 x for x in source_t.binaries if x.architecture == arch
283 ):
284 pkg = pkg_id.package_name
285 # if the package is architecture-independent, then ignore it
286 tpkg_data = packages_t_a[pkg]
287 if tpkg_data.architecture == "all":
288 if pkg_id not in source_u.binaries:
289 # only add a note if the arch:all does not match the expected version
290 excuse.add_detailed_info(
291 "Ignoring removal of %s as it is arch: all" % (pkg)
292 )
293 continue
294 # if the package is not produced by the new source package, then remove it from testing
295 if pkg not in packages_s_a:
296 excuse.add_detailed_info(
297 "Removed binary: %s %s" % (pkg, tpkg_data.version)
298 )
299 # the removed binary is only interesting if this is a binary-only migration,
300 # as otherwise the updated source will already cause the binary packages
301 # to be updated
302 if same_source and pkg_id not in smoothbins:
303 # Special-case, if the binary is a candidate for a smooth update, we do not consider
304 # it "interesting" on its own. This case happens quite often with smooth updatable
305 # packages, where the old binary "survives" a full run because it still has
306 # reverse dependencies.
307 anyworthdoing = True
309 if not anyworthdoing and not (
310 self.options.archall_inconsistency_allowed and excuse.detailed_info
311 ):
312 # nothing worth doing, we don't add an excuse to the list, we just return false
313 return False
315 if not anyworthdoing:
316 # This source has binary differences between the target and source
317 # suite, but we're not going to upgrade them. Part of the purpose
318 # of options.archall_inconsistency_allowed is to log the excuse
319 # with a temporary failure such that the administrators can take
320 # action so they wish.
321 excuse.policy_verdict = PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT
322 excuse.addreason("everything-ignored")
324 else:
325 # there is something worth doing
326 # we assume that this package will be ok, if not invalidated below
327 excuse.policy_verdict = PolicyVerdict.PASS
329 # if there is something something wrong, reject this package
330 if anywrongver:
331 excuse.policy_verdict = wrong_verdict
333 self._policy_engine.apply_srcarch_policies(
334 item, arch, source_t, source_u, excuse
335 )
337 self.excuses[excuse.name] = excuse
338 return excuse.is_valid
340 def _should_upgrade_src(self, item: MigrationItem) -> bool:
341 """Check if source package should be upgraded
343 This method checks if a source package should be upgraded. The analysis
344 is performed for the source package specified by the `src' parameter,
345 for the distribution `source_suite'.
347 It returns False if the given package doesn't need to be upgraded,
348 True otherwise. In the former case, a new excuse is appended to
349 the object attribute excuses.
350 """
352 src = item.package
353 source_suite = item.suite
354 suite_name = source_suite.name
355 source_u = source_suite.sources[src]
356 if source_u.is_fakesrc: 356 ↛ 358line 356 didn't jump to line 358, because the condition on line 356 was never true
357 # it is a fake package created to satisfy Britney implementation details; silently ignore it
358 return False
360 target_suite = self.suite_info.target_suite
361 # retrieve the source packages for testing (if available) and suite
362 if src in target_suite.sources:
363 source_t = target_suite.sources[src]
364 # if testing and unstable have the same version, then this is a candidate for binary-NMUs only
365 if apt_pkg.version_compare(source_t.version, source_u.version) == 0: 365 ↛ 366line 365 didn't jump to line 366, because the condition on line 365 was never true
366 return False
367 else:
368 source_t = None
370 excuse = Excuse(item)
371 excuse.set_vers(source_t and source_t.version or None, source_u.version)
372 if source_u.maintainer: 372 ↛ 374line 372 didn't jump to line 374, because the condition on line 372 was never false
373 excuse.set_maint(source_u.maintainer)
374 if source_u.section: 374 ↛ 376line 374 didn't jump to line 376, because the condition on line 374 was never false
375 excuse.set_section(source_u.section)
376 excuse.add_package(PackageId(src, source_u.version, "source"))
378 # if the version in unstable is older, then stop here with a warning in the excuse and return False
379 if source_t and apt_pkg.version_compare(source_u.version, source_t.version) < 0:
380 excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
381 excuse.add_verdict_info(
382 excuse.policy_verdict,
383 "ALERT: %s is newer in the target suite (%s %s)"
384 % (src, source_t.version, source_u.version),
385 )
386 self.excuses[excuse.name] = excuse
387 excuse.addreason("newerintesting")
388 return False
390 # the starting point is that we will update the candidate
391 excuse.policy_verdict = PolicyVerdict.PASS
393 # if there is a `remove' hint and the requested version is the same as the
394 # version in testing, then stop here and return False
395 for hint in self.hints.search("remove", package=src):
396 if ( 396 ↛ 395line 396 didn't jump to line 395
397 source_t
398 and source_t.version == hint.version
399 or source_u.version == hint.version
400 ):
401 excuse.add_hint(hint)
402 excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
403 excuse.add_verdict_info(
404 excuse.policy_verdict, "Removal request by %s" % (hint.user)
405 )
406 excuse.add_verdict_info(
407 excuse.policy_verdict, "Trying to remove package, not update it"
408 )
409 break
411 all_binaries = self.all_binaries
413 # at this point, we check the status of the builds on all the supported architectures
414 # to catch the out-of-date ones
415 archs_to_consider = list(self.options.architectures)
416 archs_to_consider.append("all")
417 for arch in archs_to_consider:
418 oodbins: dict[str, set[str]] = {}
419 uptodatebins = False
420 # for every binary package produced by this source in the suite for this architecture
421 if arch == "all":
422 consider_binaries: Iterable[BinaryPackageId] = source_u.binaries
423 else:
424 # Will also include arch:all for the given architecture (they are filtered out
425 # below)
426 consider_binaries = sorted(
427 x for x in source_u.binaries if x.architecture == arch
428 )
429 for pkg_id in consider_binaries:
430 pkg = pkg_id.package_name
432 # retrieve the binary package and its source version
433 binary_u = all_binaries[pkg_id]
434 pkgsv = binary_u.source_version
436 # arch:all packages are treated separately from arch:arch
437 if binary_u.architecture != arch:
438 continue
440 # TODO filter binaries based on checks below?
441 excuse.add_package(pkg_id)
443 # if it wasn't built by the same source, it is out-of-date
444 # if there is at least one binary on this arch which is
445 # up-to-date, there is a build on this arch
446 if source_u.version != pkgsv or pkg_id.architecture == "faux":
447 if pkgsv not in oodbins:
448 oodbins[pkgsv] = set()
449 oodbins[pkgsv].add(pkg)
450 if pkg_id.architecture != "faux":
451 excuse.add_old_binary(pkg, pkgsv)
452 continue
453 else:
454 uptodatebins = True
456 # if there are out-of-date packages, warn about them in the excuse and set excuse.is_valid
457 # to False to block the update; if the architecture where the package is out-of-date is
458 # in the `outofsync_arches' list, then do not block the update
459 if oodbins:
460 oodtxt = ""
461 for v in sorted(oodbins):
462 if oodtxt: 462 ↛ 463line 462 didn't jump to line 463, because the condition on line 462 was never true
463 oodtxt = oodtxt + "; "
464 oodtxt = oodtxt + "%s (from %s)" % (
465 ", ".join(sorted(oodbins[v])),
466 self._get_build_link(arch, src, v, label=v),
467 )
469 if uptodatebins:
470 text = "old binaries left on %s: %s" % (
471 self._get_build_link(arch, src, source_u.version),
472 oodtxt,
473 )
474 else:
475 text = "missing build on %s" % (
476 self._get_build_link(arch, src, source_u.version)
477 )
479 if arch in self.options.outofsync_arches:
480 text = text + " (but %s isn't keeping up, so nevermind)" % (arch)
481 if not uptodatebins: 481 ↛ 417line 481 didn't jump to line 417, because the condition on line 481 was never false
482 excuse.missing_build_on_ood_arch(arch)
483 else:
484 if uptodatebins:
485 if self.options.ignore_cruft:
486 text = text + " (but ignoring cruft, so nevermind)"
487 excuse.add_detailed_info(text)
488 else:
489 excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
490 excuse.addreason("cruft")
491 excuse.add_verdict_info(excuse.policy_verdict, text)
492 else:
493 excuse.policy_verdict = (
494 PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT
495 )
496 excuse.missing_build_on_arch(arch)
497 excuse.addreason("missingbuild")
498 excuse.add_verdict_info(excuse.policy_verdict, text)
499 if excuse.old_binaries:
500 excuse.add_detailed_info(
501 "old binaries on %s: %s" % (arch, oodtxt)
502 )
504 # if the source package has no binaries, set is_valid to False to block the update
505 if not {x for x in source_u.binaries if x[2] != "faux"}:
506 excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
507 excuse.add_verdict_info(
508 excuse.policy_verdict, "%s has no binaries on any arch" % src
509 )
510 excuse.addreason("no-binaries")
512 self._policy_engine.apply_src_policies(item, source_t, source_u, excuse)
514 if source_suite.suite_class.is_additional_source and source_t:
515 # o-o-d(ish) checks for (t-)p-u
516 # This only makes sense if the package is actually in testing.
517 for arch in self.options.architectures:
518 # if the package in testing has no binaries on this
519 # architecture, it can't be out-of-date
520 if not any(
521 x
522 for x in source_t.binaries
523 if x.architecture == arch and all_binaries[x].architecture != "all"
524 ):
525 continue
527 # if the (t-)p-u package has produced any binaries on
528 # this architecture then we assume it's ok. this allows for
529 # uploads to (t-)p-u which intentionally drop binary
530 # packages
531 if any(
532 x
533 for x in source_suite.binaries[arch].values()
534 if x.source == src
535 and x.source_version == source_u.version
536 and x.architecture != "all"
537 ):
538 continue
540 # TODO: Find a way to avoid hardcoding pu/stable relation.
541 if suite_name == "pu": 541 ↛ 542line 541 didn't jump to line 542, because the condition on line 541 was never true
542 base = "stable"
543 else:
544 base = target_suite.name
545 text = "Not yet built on %s (relative to target suite)" % (
546 self._get_build_link(arch, src, source_u.version)
547 )
549 if arch in self.options.outofsync_arches: 549 ↛ 550line 549 didn't jump to line 550, because the condition on line 549 was never true
550 text = text + " (but %s isn't keeping up, so never mind)" % (arch)
551 excuse.missing_build_on_ood_arch(arch)
552 excuse.addinfo(text)
553 else:
554 excuse.policy_verdict = (
555 PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT
556 )
557 excuse.missing_build_on_arch(arch)
558 excuse.addreason("missingbuild")
559 excuse.add_verdict_info(excuse.policy_verdict, text)
561 # check if there is a `force' hint for this package, which allows it to go in even if it is not updateable
562 forces = self.hints.search("force", package=src, version=source_u.version)
563 if forces:
564 # force() updates the final verdict for us
565 changed_state = excuse.force()
566 if changed_state:
567 excuse.addinfo("Should ignore, but forced by %s" % (forces[0].user))
569 self.excuses[excuse.name] = excuse
570 return excuse.is_valid
572 def _compute_excuses_and_initial_actionable_items(self) -> set[MigrationItem]:
573 # list of local methods and variables (for better performance)
574 excuses = self.excuses
575 suite_info = self.suite_info
576 pri_source_suite = suite_info.primary_source_suite
577 architectures = self.options.architectures
578 should_remove_source = self._should_remove_source
579 should_upgrade_srcarch = self._should_upgrade_srcarch
580 should_upgrade_src = self._should_upgrade_src
582 sources_ps = pri_source_suite.sources
583 sources_t = suite_info.target_suite.sources
585 # this set will contain the packages which are valid candidates;
586 # if a package is going to be removed, it will have a "-" prefix
587 actionable_items: set[MigrationItem] = set()
588 actionable_items_add = actionable_items.add # Every . in a loop slows it down
590 # for every source package in testing, check if it should be removed
591 for pkg in sources_t:
592 if pkg not in sources_ps:
593 src_t = sources_t[pkg]
594 item = MigrationItem(
595 package=pkg,
596 version=src_t.version,
597 suite=suite_info.target_suite,
598 is_removal=True,
599 )
600 if should_remove_source(item):
601 actionable_items_add(item)
603 # for every source package in the source suites, check if it should be upgraded
604 for suite in chain((pri_source_suite, *suite_info.additional_source_suites)):
605 sources_s = suite.sources
606 for pkg in sources_s:
607 src_s_data = sources_s[pkg]
608 if src_s_data.is_fakesrc:
609 continue
610 src_t_data = sources_t.get(pkg)
612 if (
613 src_t_data is None
614 or apt_pkg.version_compare(src_s_data.version, src_t_data.version)
615 != 0
616 ):
617 item = MigrationItem(
618 package=pkg, version=src_s_data.version, suite=suite
619 )
620 # check if the source package should be upgraded
621 if should_upgrade_src(item):
622 actionable_items_add(item)
623 else:
624 # package has same version in source and target suite; check if any of the
625 # binaries have changed on the various architectures
626 for arch in architectures:
627 item = MigrationItem(
628 package=pkg,
629 version=src_s_data.version,
630 architecture=arch,
631 suite=suite,
632 )
633 if should_upgrade_srcarch(item):
634 actionable_items_add(item)
636 # process the `remove' hints, if the given package is not yet in actionable_items
637 for hint in self.hints["remove"]:
638 src_r = hint.package
639 if src_r not in sources_t:
640 continue
642 existing_items = set(x for x in actionable_items if x.package == src_r)
643 if existing_items:
644 self.logger.info(
645 "removal hint '%s' ignored due to existing item(s) %s"
646 % (hint, [i.name for i in existing_items])
647 )
648 continue
650 tsrcv = sources_t[src_r].version
651 item = MigrationItem(
652 package=src_r,
653 version=tsrcv,
654 suite=suite_info.target_suite,
655 is_removal=True,
656 )
658 # check if the version specified in the hint is the same as the considered package
659 if tsrcv != hint.version: 659 ↛ 660line 659 didn't jump to line 660, because the condition on line 659 was never true
660 continue
662 # add the removal of the package to actionable_items and build a new excuse
663 excuse = Excuse(item)
664 excuse.set_vers(tsrcv, None)
665 excuse.addinfo("Removal request by %s" % (hint.user))
666 # if the removal of the package is blocked, skip it
667 blocked = False
668 for blockhint in self.hints.search("block", package=src_r, removal=True):
669 excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
670 excuse.add_verdict_info(
671 excuse.policy_verdict,
672 "Not removing package, due to block hint by %s "
673 "(contact %s-release if update is needed)"
674 % (blockhint.user, self.options.distribution),
675 )
676 excuse.addreason("block")
677 blocked = True
679 if blocked:
680 excuses[excuse.name] = excuse
681 continue
683 actionable_items_add(item)
684 excuse.addinfo("Package is broken, will try to remove")
685 excuse.add_hint(hint)
686 # Using "PASS" here as "Created by a hint" != "accepted due to hint". In a future
687 # where there might be policy checks on removals, it would make sense to distinguish
688 # those two states. Not sure that future will ever be.
689 excuse.policy_verdict = PolicyVerdict.PASS
690 excuses[excuse.name] = excuse
692 return actionable_items
694 def find_actionable_excuses(self) -> tuple[dict[str, Excuse], set[MigrationItem]]:
695 excuses = self.excuses
696 actionable_items = self._compute_excuses_and_initial_actionable_items()
697 valid = {x.name for x in actionable_items}
699 # extract the not considered packages, which are in the excuses but not in upgrade_me
700 unconsidered = {ename for ename in excuses if ename not in valid}
701 invalidated: set[str] = set()
703 invalidate_excuses(excuses, valid, unconsidered, invalidated)
705 # check that the list of actionable items matches the list of valid
706 # excuses
707 assert_sets_equal(valid, {x for x in excuses if excuses[x].is_valid})
709 # check that the rdeps for all invalid excuses were invalidated
710 assert_sets_equal(invalidated, {x for x in excuses if not excuses[x].is_valid})
712 actionable_items = {x for x in actionable_items if x.name in valid}
713 return excuses, actionable_items
716def assert_sets_equal(a: Any, b: Any) -> None:
717 if a != b: 717 ↛ 718line 717 didn't jump to line 718, because the condition on line 717 was never true
718 raise AssertionError("sets not equal a-b {} b-a {}".format(a - b, b - a))