Coverage for britney2/excuse.py: 95%
332 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-03-23 07:34 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-03-23 07:34 +0000
1# -*- coding: utf-8 -*-
3# Copyright (C) 2001-2004 Anthony Towns <ajt@debian.org>
4# Andreas Barth <aba@debian.org>
5# Fabio Tranchitella <kobold@debian.org>
7# This program is free software; you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation; either version 2 of the License, or
10# (at your option) any later version.
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15# GNU General Public License for more details.
17import re
18from collections import defaultdict
19from typing import TYPE_CHECKING, Any, Optional, Union, cast
21from britney2 import BinaryPackageId, DependencyType, PackageId
22from britney2.excusedeps import (
23 DependencySpec,
24 DependencyState,
25 ImpossibleDependencyState,
26)
27from britney2.migrationitem import MigrationItem
28from britney2.policies import PolicyVerdict
30if TYPE_CHECKING: 30 ↛ 31line 30 didn't jump to line 31, because the condition on line 30 was never true
31 from .hints import Hint
33ExcusesType = Union[dict[PackageId, "Excuse"], dict[str, "Excuse"]]
36VERDICT2DESC = {
37 PolicyVerdict.PASS: "Will attempt migration (Any information below is purely informational)",
38 PolicyVerdict.PASS_HINTED: "Will attempt migration due to a hint (Any information below is purely informational)",
39 PolicyVerdict.REJECTED_TEMPORARILY: (
40 "Waiting for test results or another package, or too young (no action "
41 "required now - check later)"
42 ),
43 PolicyVerdict.REJECTED_WAITING_FOR_ANOTHER_ITEM: (
44 "Waiting for another item to be ready to migrate (no action required now "
45 "- check later)"
46 ),
47 PolicyVerdict.REJECTED_BLOCKED_BY_ANOTHER_ITEM: (
48 "BLOCKED: Cannot migrate due to another item, which is blocked (please "
49 "check which dependencies are stuck)"
50 ),
51 PolicyVerdict.REJECTED_NEEDS_APPROVAL: (
52 "BLOCKED: Needs an approval (either due to a freeze, the source suite or "
53 "a manual hint)"
54 ),
55 PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT: (
56 "BLOCKED: Maybe temporary, maybe blocked but Britney is missing information "
57 "(check below)"
58 ),
59 PolicyVerdict.REJECTED_PERMANENTLY: "BLOCKED: Rejected/violates migration policy/introduces a regression",
60}
63class ExcuseDependency(object):
64 """Object to represent a specific dependency of an excuse on a package
65 (source or binary) or on other excuses"""
67 def __init__(self, spec: DependencySpec, depstates: list[DependencyState]) -> None:
68 """
69 :param: spec: DependencySpec
70 :param: depstates: list of DependencyState, each of which can satisfy
71 the dependency
72 """
73 self.spec = spec
74 self.depstates = depstates
76 @property
77 def deptype(self) -> DependencyType:
78 return self.spec.deptype
80 @property
81 def valid(self) -> bool:
82 if {d for d in self.depstates if d.valid}:
83 return True
84 else:
85 return False
87 @property
88 def deps(self) -> set[Optional[Union[str, PackageId]]]:
89 return {d.dep for d in self.depstates}
91 @property
92 def possible(self) -> bool:
93 if {d for d in self.depstates if d.possible}:
94 return True
95 else:
96 return False
98 @property
99 def first_dep(self) -> Optional[Union[str, PackageId]]:
100 """return the first valid dependency, if there is one, otherwise the
101 first possible one
103 return None if there are only impossible dependencies
104 """
105 first = None
106 for d in self.depstates:
107 if d.valid:
108 return d.dep
109 elif d.possible and not first:
110 first = d.dep
111 return first
113 @property
114 def first_impossible_dep(self) -> Optional[str]:
115 """return the first impossible dependency, if there is one"""
116 first = None
117 for d in self.depstates: 117 ↛ 121line 117 didn't jump to line 121, because the loop on line 117 didn't complete
118 if not d.possible: 118 ↛ 117line 118 didn't jump to line 117, because the condition on line 118 was never false
119 assert isinstance(d, ImpossibleDependencyState) # for type checking
120 return d.desc
121 return first
123 @property
124 def verdict(self) -> PolicyVerdict:
125 return min({d.verdict for d in self.depstates})
127 def invalidate(self, excuse: str, verdict: PolicyVerdict) -> bool:
128 """invalidate the dependencies on a specific excuse
130 :param excuse: the excuse which is no longer valid
131 :param verdict: the PolicyVerdict causing the invalidation
132 """
133 valid_alternative_left = False
134 for ds in self.depstates:
135 if ds.dep == excuse:
136 ds.invalidate(verdict)
137 elif ds.valid:
138 valid_alternative_left = True
140 return valid_alternative_left
143class Excuse(object):
144 """Excuse class
146 This class represents an update excuse, which is a detailed explanation
147 of why a package can or cannot be updated in the testing distribution from
148 a newer package in another distribution (like for example unstable).
150 The main purpose of the excuses is to be written in an HTML file which
151 will be published over HTTP. The maintainers will be able to parse it
152 manually or automatically to find the explanation of why their packages
153 have been updated or not.
154 """
156 # @var reemail
157 # Regular expression for removing the email address
158 reemail = re.compile(r" *<.*?>")
160 def __init__(self, migrationitem: Union[str, MigrationItem]) -> None:
161 """Class constructor
163 This method initializes the excuse with the specified name and
164 the default values.
165 """
166 self.item = migrationitem
167 self.ver = ("-", "-")
168 self.maint: Optional[str] = None
169 self.daysold: Optional[int] = None
170 self.mindays: Optional[int] = None
171 self.section: Optional[str] = None
172 self._is_valid = False
173 self.needs_approval = False
174 self.hints: list["Hint"] = []
175 self.forced = False
176 self._policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
178 self.all_deps: list[ExcuseDependency] = []
179 self.unsatisfiable_on_archs: list[str] = []
180 self.unsat_deps: dict[str, set[str]] = defaultdict(set)
181 self.newbugs: set[str] = set()
182 self.oldbugs: set[str] = set()
183 self.reason: dict[str, int] = {}
184 self.htmlline: list[str] = []
185 self.missing_builds: set[str] = set()
186 self.missing_builds_ood_arch: set[str] = set()
187 self.old_binaries: dict[str, set[str]] = defaultdict(set)
188 self.policy_info: dict[str, dict[str, Any]] = {}
189 self.verdict_info: dict[PolicyVerdict, list[str]] = defaultdict(list)
190 self.infoline: list[str] = []
191 self.detailed_info: list[str] = []
192 self.dep_info_rendered: bool = False
194 # packages (source and binary) that will migrate to testing if the
195 # item from this excuse migrates
196 self.packages: dict[str, set[PackageId]] = defaultdict(set)
198 # list of ExcuseDependency, with dependencies on packages
199 self.depends_packages: list[ExcuseDependency] = []
200 # contains all PackageIds in any over the sets above
201 self.depends_packages_flattened: set[PackageId] = set()
203 self.bounty: dict[str, int] = {}
204 self.penalty: dict[str, int] = {}
206 # messenger from AutopkgtestPolicy to BlockPolicy
207 self.autopkgtest_results: Optional[set[str]] = None
209 def sortkey(self) -> tuple[int, str]:
210 if self.daysold is None:
211 return (-1, self.uvname)
212 return (self.daysold, self.uvname)
214 @property
215 def name(self) -> str:
216 assert isinstance(self.item, MigrationItem)
217 return self.item.name
219 @property
220 def uvname(self) -> str:
221 assert isinstance(self.item, MigrationItem)
222 return self.item.uvname
224 @property
225 def source(self) -> str:
226 assert isinstance(self.item, MigrationItem)
227 return self.item.package
229 @property
230 def is_valid(self) -> bool:
231 return False if self._policy_verdict.is_rejected else True
233 @property
234 def policy_verdict(self) -> PolicyVerdict:
235 return self._policy_verdict
237 @policy_verdict.setter
238 def policy_verdict(self, value: PolicyVerdict) -> None:
239 if value.is_rejected and self.forced: 239 ↛ 242line 239 didn't jump to line 242, because the condition on line 239 was never true
240 # By virtue of being forced, the item was hinted to
241 # undo the rejection
242 value = PolicyVerdict.PASS_HINTED
243 self._policy_verdict = value
245 def set_vers(self, tver: Optional[str], uver: Optional[str]) -> None:
246 """Set the versions of the item from target and source suite"""
247 if tver and uver:
248 self.ver = (tver, uver)
249 elif tver:
250 self.ver = (tver, self.ver[1])
251 elif uver: 251 ↛ exitline 251 didn't return from function 'set_vers', because the condition on line 251 was never false
252 self.ver = (self.ver[0], uver)
254 def set_maint(self, maint: str) -> None:
255 """Set the package maintainer's name"""
256 self.maint = self.reemail.sub("", maint)
258 def set_section(self, section: str) -> None:
259 """Set the section of the package"""
260 self.section = section
262 def add_dependency(
263 self, dep: set[Union[str, DependencyState]], spec: DependencySpec
264 ) -> bool:
265 """Add a dependency of type deptype
267 :param dep: set with names of excuses, each of which satisfies the dep
268 :param spec: DependencySpec
270 """
272 assert dep != frozenset(), "%s: Adding empty list of dependencies" % self.name
274 deps: list[DependencyState] = []
275 try:
276 # Casting to a sorted list makes excuses more
277 # deterministic, but fails if the list has more than one
278 # element *and* at least one DependencyState
279 dep = sorted(dep) # type: ignore[type-var,assignment]
280 except TypeError:
281 pass
282 for d in dep:
283 if isinstance(d, DependencyState):
284 deps.append(d)
285 else:
286 deps.append(DependencyState(d))
287 ed = ExcuseDependency(spec, deps)
288 self.all_deps.append(ed)
289 if not ed.valid:
290 self.do_invalidate(ed)
291 return ed.valid
293 def get_deps(self) -> set[str]:
294 # the autohinter uses the excuses data to query dependencies between
295 # excuses. For now, we keep the current behaviour by just returning
296 # the data that was in the old deps set
297 """Get the dependencies of type DEPENDS"""
298 deps: set[str] = set()
299 for dep in [d for d in self.all_deps if d.deptype == DependencyType.DEPENDS]:
300 # add the first valid dependency
301 for d in dep.depstates: 301 ↛ 299line 301 didn't jump to line 299, because the loop on line 301 didn't complete
302 if d.valid: 302 ↛ 301line 302 didn't jump to line 301, because the condition on line 302 was never false
303 deps.add(cast(str, d.dep))
304 break
305 return deps
307 def add_unsatisfiable_on_arch(self, arch: str) -> None:
308 """Add an arch that has unsatisfiable dependencies"""
309 if arch not in self.unsatisfiable_on_archs:
310 self.unsatisfiable_on_archs.append(arch)
312 def add_unsatisfiable_dep(self, signature: str, arch: str) -> None:
313 """Add an unsatisfiable dependency"""
314 self.unsat_deps[arch].add(signature)
316 def do_invalidate(self, dep: ExcuseDependency) -> None:
317 """
318 param: dep: ExcuseDependency
319 """
320 self.addreason(dep.deptype.get_reason())
321 self.policy_verdict = PolicyVerdict.worst_of(self.policy_verdict, dep.verdict)
323 def invalidate_dependency(self, name: str, verdict: PolicyVerdict) -> bool:
324 """Invalidate dependency"""
325 invalidate = False
327 for dep in self.all_deps:
328 if not dep.invalidate(name, verdict):
329 invalidate = True
330 self.do_invalidate(dep)
332 return not invalidate
334 def setdaysold(self, daysold: int, mindays: int) -> None:
335 """Set the number of days from the upload and the minimum number of days for the update"""
336 self.daysold = daysold
337 self.mindays = mindays
339 def force(self) -> bool:
340 """Add force hint"""
341 self.forced = True
342 if self._policy_verdict.is_rejected:
343 self._policy_verdict = PolicyVerdict.PASS_HINTED
344 return True
345 return False
347 def addinfo(self, note: str) -> None:
348 """Add a note in HTML"""
349 self.infoline.append(note)
351 def add_verdict_info(self, verdict: PolicyVerdict, note: str) -> None:
352 """Add a note to info about this verdict level"""
353 self.verdict_info[verdict].append(note)
355 def add_detailed_info(self, note: str) -> None:
356 """Add a note to detailed info"""
357 self.detailed_info.append(note)
359 def missing_build_on_arch(self, arch: str) -> None:
360 """Note that the item is missing a build on a given architecture"""
361 self.missing_builds.add(arch)
363 def missing_build_on_ood_arch(self, arch: str) -> None:
364 """Note that the item is missing a build on a given "out of date" architecture"""
365 self.missing_builds_ood_arch.add(arch)
367 def add_old_binary(self, binary: str, from_source_version: str) -> None:
368 """Denote than an old binary ("cruft") is available from a previous source version"""
369 self.old_binaries[from_source_version].add(binary)
371 def add_hint(self, hint: "Hint") -> None:
372 self.hints.append(hint)
374 def add_package(self, pkg_id: PackageId) -> None:
375 self.packages[pkg_id.architecture].add(pkg_id)
377 def add_package_depends(
378 self,
379 spec: DependencySpec,
380 depends: Union[set[PackageId], set[BinaryPackageId]],
381 ) -> None:
382 """Add dependency on a package (source or binary)
384 :param spec: DependencySpec
385 :param depends: set of PackageIds (source or binary), each of which can satisfy the dependency
386 """
388 assert depends != cast(set[PackageId], frozenset()), (
389 "%s: Adding empty list of package dependencies" % self.name
390 )
392 # we use DependencyState for consistency with excuse dependencies, but
393 # package dependencies are never invalidated, they are used to add
394 # excuse dependencies (in invalidate_excuses()), and these are
395 # (potentially) invalidated
396 ed = ExcuseDependency(spec, [DependencyState(d) for d in depends])
397 self.depends_packages.append(ed)
398 self.depends_packages_flattened |= depends
400 def _format_verdict_summary(self) -> str:
401 verdict = self._policy_verdict
402 if verdict in VERDICT2DESC: 402 ↛ 404line 402 didn't jump to line 404, because the condition on line 402 was never false
403 return VERDICT2DESC[verdict]
404 return "UNKNOWN: Missing description for {0} - Please file a bug against Britney".format(
405 verdict.name
406 )
408 def _render_dep_issues(self, excuses: ExcusesType) -> None:
409 if self.dep_info_rendered:
410 return
412 dep_issues = defaultdict(set)
413 for d in self.all_deps:
414 info = ""
415 if not d.possible:
416 desc = d.first_impossible_dep
417 info = "Impossible %s: %s -> %s" % (d.deptype, self.uvname, desc)
418 else:
419 dep = d.first_dep
420 duv = excuses[dep].uvname # type: ignore[index]
421 # Make sure we link to package names
422 duv_src = duv.split("/")[0]
423 verdict = excuses[dep].policy_verdict # type: ignore[index]
424 if not d.valid or verdict in (
425 PolicyVerdict.REJECTED_NEEDS_APPROVAL,
426 PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT,
427 PolicyVerdict.REJECTED_PERMANENTLY,
428 ):
429 info = '%s: %s <a href="#%s">%s</a> (not considered)' % (
430 d.deptype,
431 self.uvname,
432 duv_src,
433 duv,
434 )
435 if not d.valid:
436 dep_issues[d.verdict].add(
437 "Invalidated by %s" % d.deptype.get_description()
438 )
439 else:
440 info = '%s: %s <a href="#%s">%s</a>' % (
441 d.deptype,
442 self.uvname,
443 duv_src,
444 duv,
445 )
446 dep_issues[d.verdict].add(info)
448 seen = set()
449 for v in sorted(dep_issues.keys(), reverse=True):
450 for i in sorted(dep_issues[v]):
451 if i not in seen: 451 ↛ 450line 451 didn't jump to line 450, because the condition on line 451 was never false
452 self.add_verdict_info(v, i)
453 seen.add(i)
455 self.dep_info_rendered = True
457 def html(self, excuses: ExcusesType) -> str:
458 """Render the excuse in HTML"""
459 res = '<a id="%s" name="%s">%s</a> (%s to %s)\n<ul>\n' % (
460 self.uvname,
461 self.uvname,
462 self.uvname,
463 self.ver[0],
464 self.ver[1],
465 )
466 info = self._text(excuses)
467 indented = False
468 for line in info:
469 stripped_this_line = False
470 if line.startswith("∙ ∙ "):
471 line = line[4:]
472 stripped_this_line = True
473 if not indented and stripped_this_line:
474 res += "<ul>\n"
475 indented = True
476 elif indented and not stripped_this_line:
477 res += "</ul>\n"
478 indented = False
479 res += "<li>%s\n" % line
480 if indented:
481 res += "</ul>\n"
482 res = res + "</ul>\n"
483 return res
485 def setbugs(self, oldbugs: list[str], newbugs: list[str]) -> None:
486 """ "Set the list of old and new bugs"""
487 self.newbugs.update(newbugs)
488 self.oldbugs.update(oldbugs)
490 def addreason(self, reason: str) -> None:
491 """ "adding reason"""
492 self.reason[reason] = 1
494 def hasreason(self, reason: str) -> bool:
495 return reason in self.reason
497 def _text(self, excuses: ExcusesType) -> list[str]:
498 """Render the excuse in text"""
499 self._render_dep_issues(excuses)
500 res = []
501 res.append(
502 "Migration status for %s (%s to %s): %s"
503 % (self.uvname, self.ver[0], self.ver[1], self._format_verdict_summary())
504 )
505 if not self.is_valid:
506 res.append("Issues preventing migration:")
507 for v in sorted(self.verdict_info.keys(), reverse=True):
508 for x in self.verdict_info[v]:
509 res.append("∙ ∙ " + x + "")
510 if self.infoline:
511 res.append("Additional info:")
512 for x in self.infoline:
513 res.append("∙ ∙ " + x + "")
514 if self.htmlline: 514 ↛ 515line 514 didn't jump to line 515, because the condition on line 514 was never true
515 res.append("Legacy info:")
516 for x in self.htmlline:
517 res.append("∙ ∙ " + x + "")
518 return res
520 def excusedata(self, excuses: ExcusesType) -> dict[str, str]:
521 """Render the excuse in as key-value data"""
522 excusedata: dict[str, Any] = {}
523 excusedata["excuses"] = self._text(excuses)
524 excusedata["item-name"] = self.uvname
525 excusedata["source"] = self.source
526 excusedata["migration-policy-verdict"] = self._policy_verdict.name
527 excusedata["old-version"] = self.ver[0]
528 excusedata["new-version"] = self.ver[1]
529 if self.maint:
530 excusedata["maintainer"] = self.maint
531 if self.section and self.section.find("/") > -1:
532 excusedata["component"] = self.section.split("/")[0]
533 if self.policy_info:
534 excusedata["policy_info"] = self.policy_info
535 if self.missing_builds or self.missing_builds_ood_arch:
536 excusedata["missing-builds"] = {
537 "on-architectures": sorted(self.missing_builds),
538 "on-unimportant-architectures": sorted(self.missing_builds_ood_arch),
539 }
540 if {d for d in self.all_deps if not d.valid and d.possible}:
541 excusedata["invalidated-by-other-package"] = True
542 if self.all_deps or self.unsat_deps:
543 dep_data: dict[str, Any]
544 excusedata["dependencies"] = dep_data = {}
546 migrate_after = set(d.first_dep for d in self.all_deps if d.valid)
547 blocked_by = set(
548 d.first_dep for d in self.all_deps if not d.valid and d.possible
549 )
551 def sorted_uvnames(deps: set[Union[str, PackageId, None]]) -> list[str]:
552 return sorted(excuses[d].uvname for d in deps if d is not None) # type: ignore[index]
554 if blocked_by:
555 dep_data["blocked-by"] = sorted_uvnames(blocked_by)
556 if migrate_after:
557 dep_data["migrate-after"] = sorted_uvnames(migrate_after)
558 if self.unsat_deps: 558 ↛ 559line 558 didn't jump to line 559, because the condition on line 558 was never true
559 dep_data["unsatisfiable-dependencies"] = {
560 x: sorted(self.unsat_deps[x]) for x in self.unsat_deps
561 }
562 if self.needs_approval:
563 status = "not-approved"
564 if any(h.type == "unblock" for h in self.hints):
565 status = "approved"
566 excusedata["manual-approval-status"] = status
567 if self.hints:
568 hint_info = [
569 {
570 "hint-type": h.type,
571 "hint-from": h.user,
572 }
573 for h in self.hints
574 ]
576 excusedata["hints"] = hint_info
577 if self.old_binaries:
578 excusedata["old-binaries"] = {
579 x: sorted(self.old_binaries[x]) for x in self.old_binaries
580 }
581 if self.forced:
582 excusedata["forced-reason"] = sorted(list(self.reason.keys()))
583 excusedata["reason"] = []
584 else:
585 excusedata["reason"] = sorted(list(self.reason.keys()))
586 excusedata["is-candidate"] = self.is_valid
587 if self.detailed_info:
588 di = []
589 for x in self.detailed_info:
590 di.append("" + x + "")
591 excusedata["detailed-info"] = di
592 return excusedata
594 def add_bounty(self, policy: str, bounty: int) -> None:
595 """ "adding bounty"""
596 self.bounty[policy] = bounty
598 def add_penalty(self, policy: str, penalty: int) -> None:
599 """ "adding penalty"""
600 self.penalty[policy] = penalty