Coverage for britney2/excuse.py: 95%

332 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-10-17 17:32 +0000

1# Copyright (C) 2001-2004 Anthony Towns <ajt@debian.org> 

2# Andreas Barth <aba@debian.org> 

3# Fabio Tranchitella <kobold@debian.org> 

4 

5# This program is free software; you can redistribute it and/or modify 

6# it under the terms of the GNU General Public License as published by 

7# the Free Software Foundation; either version 2 of the License, or 

8# (at your option) any later version. 

9 

10# This program is distributed in the hope that it will be useful, 

11# but WITHOUT ANY WARRANTY; without even the implied warranty of 

12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

13# GNU General Public License for more details. 

14 

15import re 

16from collections import defaultdict 

17from typing import TYPE_CHECKING, Any, Optional, Union, cast 

18 

19from britney2 import BinaryPackageId, DependencyType, PackageId 

20from britney2.excusedeps import ( 

21 DependencySpec, 

22 DependencyState, 

23 ImpossibleDependencyState, 

24) 

25from britney2.migrationitem import MigrationItem 

26from britney2.policies import PolicyVerdict 

27 

28if TYPE_CHECKING: 28 ↛ 29line 28 didn't jump to line 29 because the condition on line 28 was never true

29 from .hints import Hint 

30 

31ExcusesType = Union[dict[PackageId, "Excuse"], dict[str, "Excuse"]] 

32 

33 

34VERDICT2DESC = { 

35 PolicyVerdict.PASS: "Will attempt migration (Any information below is purely informational)", 

36 PolicyVerdict.PASS_HINTED: "Will attempt migration due to a hint (Any information below is purely informational)", 

37 PolicyVerdict.REJECTED_TEMPORARILY: ( 

38 "Waiting for test results or another package, or too young (no action " 

39 "required now - check later)" 

40 ), 

41 PolicyVerdict.REJECTED_WAITING_FOR_ANOTHER_ITEM: ( 

42 "Waiting for another item to be ready to migrate (no action required now " 

43 "- check later)" 

44 ), 

45 PolicyVerdict.REJECTED_BLOCKED_BY_ANOTHER_ITEM: ( 

46 "BLOCKED: Cannot migrate due to another item, which is blocked (please " 

47 "check which dependencies are stuck)" 

48 ), 

49 PolicyVerdict.REJECTED_NEEDS_APPROVAL: ( 

50 "BLOCKED: Needs an approval (either due to a freeze, the source suite or " 

51 "a manual hint)" 

52 ), 

53 PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT: ( 

54 "BLOCKED: Maybe temporary, maybe blocked but Britney is missing information " 

55 "(check below)" 

56 ), 

57 PolicyVerdict.REJECTED_PERMANENTLY: "BLOCKED: Rejected/violates migration policy/introduces a regression", 

58} 

59 

60 

61class ExcuseDependency: 

62 """Object to represent a specific dependency of an excuse on a package 

63 (source or binary) or on other excuses""" 

64 

65 def __init__(self, spec: DependencySpec, depstates: list[DependencyState]) -> None: 

66 """ 

67 :param: spec: DependencySpec 

68 :param: depstates: list of DependencyState, each of which can satisfy 

69 the dependency 

70 """ 

71 self.spec = spec 

72 self.depstates = depstates 

73 

74 @property 

75 def deptype(self) -> DependencyType: 

76 return self.spec.deptype 

77 

78 @property 

79 def valid(self) -> bool: 

80 if {d for d in self.depstates if d.valid}: 

81 return True 

82 else: 

83 return False 

84 

85 @property 

86 def deps(self) -> set[str | PackageId | None]: 

87 return {d.dep for d in self.depstates} 

88 

89 @property 

90 def possible(self) -> bool: 

91 if {d for d in self.depstates if d.possible}: 

92 return True 

93 else: 

94 return False 

95 

96 @property 

97 def first_dep(self) -> str | PackageId | None: 

98 """return the first valid dependency, if there is one, otherwise the 

99 first possible one 

100 

101 return None if there are only impossible dependencies 

102 """ 

103 first = None 

104 for d in self.depstates: 

105 if d.valid: 

106 return d.dep 

107 elif d.possible and not first: 

108 first = d.dep 

109 return first 

110 

111 @property 

112 def first_impossible_dep(self) -> str | None: 

113 """return the first impossible dependency, if there is one""" 

114 first = None 

115 for d in self.depstates: 115 ↛ 119line 115 didn't jump to line 119 because the loop on line 115 didn't complete

116 if not d.possible: 116 ↛ 115line 116 didn't jump to line 115 because the condition on line 116 was always true

117 assert isinstance(d, ImpossibleDependencyState) # for type checking 

118 return d.desc 

119 return first 

120 

121 @property 

122 def verdict(self) -> PolicyVerdict: 

123 return min({d.verdict for d in self.depstates}) 

124 

125 def invalidate(self, excuse: str, verdict: PolicyVerdict) -> bool: 

126 """invalidate the dependencies on a specific excuse 

127 

128 :param excuse: the excuse which is no longer valid 

129 :param verdict: the PolicyVerdict causing the invalidation 

130 """ 

131 valid_alternative_left = False 

132 for ds in self.depstates: 

133 if ds.dep == excuse: 

134 ds.invalidate(verdict) 

135 elif ds.valid: 

136 valid_alternative_left = True 

137 

138 return valid_alternative_left 

139 

140 

141class Excuse: 

142 """Excuse class 

143 

144 This class represents an update excuse, which is a detailed explanation 

145 of why a package can or cannot be updated in the testing distribution from 

146 a newer package in another distribution (like for example unstable). 

147 

148 The main purpose of the excuses is to be written in an HTML file which 

149 will be published over HTTP. The maintainers will be able to parse it 

150 manually or automatically to find the explanation of why their packages 

151 have been updated or not. 

152 """ 

153 

154 # @var reemail 

155 # Regular expression for removing the email address 

156 reemail = re.compile(r" *<.*?>") 

157 

158 def __init__(self, migrationitem: str | MigrationItem) -> None: 

159 """Class constructor 

160 

161 This method initializes the excuse with the specified name and 

162 the default values. 

163 """ 

164 self.item = migrationitem 

165 self.ver = ("-", "-") 

166 self.maint: str | None = None 

167 self.daysold: int | None = None 

168 self.mindays: int | None = None 

169 self.section: str | None = None 

170 self._is_valid = False 

171 self.needs_approval = False 

172 self.hints: list["Hint"] = [] 

173 self.forced = False 

174 self._policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY 

175 

176 self.all_deps: list[ExcuseDependency] = [] 

177 self.unsatisfiable_on_archs: list[str] = [] 

178 self.unsat_deps: dict[str, set[str]] = defaultdict(set) 

179 self.newbugs: set[str] = set() 

180 self.oldbugs: set[str] = set() 

181 self.reason: dict[str, int] = {} 

182 self.htmlline: list[str] = [] 

183 self.missing_builds: set[str] = set() 

184 self.missing_builds_ood_arch: set[str] = set() 

185 self.old_binaries: dict[str, set[str]] = defaultdict(set) 

186 self.policy_info: dict[str, dict[str, Any]] = {} 

187 self.verdict_info: dict[PolicyVerdict, list[str]] = defaultdict(list) 

188 self.infoline: list[str] = [] 

189 self.detailed_info: list[str] = [] 

190 self.dep_info_rendered: bool = False 

191 

192 # packages (source and binary) that will migrate to testing if the 

193 # item from this excuse migrates 

194 self.packages: dict[str, set[PackageId]] = defaultdict(set) 

195 

196 # list of ExcuseDependency, with dependencies on packages 

197 self.depends_packages: list[ExcuseDependency] = [] 

198 # contains all PackageIds in any over the sets above 

199 self.depends_packages_flattened: set[PackageId] = set() 

200 

201 self.bounty: dict[str, int] = {} 

202 self.penalty: dict[str, int] = {} 

203 

204 # messenger from AutopkgtestPolicy to BlockPolicy 

205 self.autopkgtest_results: set[str] | None = None 

206 

207 def sortkey(self) -> tuple[int, str]: 

208 if self.daysold is None: 

209 return (-1, self.uvname) 

210 return (self.daysold, self.uvname) 

211 

212 @property 

213 def name(self) -> str: 

214 assert isinstance(self.item, MigrationItem) 

215 return self.item.name 

216 

217 @property 

218 def uvname(self) -> str: 

219 assert isinstance(self.item, MigrationItem) 

220 return self.item.uvname 

221 

222 @property 

223 def source(self) -> str: 

224 assert isinstance(self.item, MigrationItem) 

225 return self.item.package 

226 

227 @property 

228 def is_valid(self) -> bool: 

229 return False if self._policy_verdict.is_rejected else True 

230 

231 @property 

232 def policy_verdict(self) -> PolicyVerdict: 

233 return self._policy_verdict 

234 

235 @policy_verdict.setter 

236 def policy_verdict(self, value: PolicyVerdict) -> None: 

237 if value.is_rejected and self.forced: 237 ↛ 240line 237 didn't jump to line 240 because the condition on line 237 was never true

238 # By virtue of being forced, the item was hinted to 

239 # undo the rejection 

240 value = PolicyVerdict.PASS_HINTED 

241 self._policy_verdict = value 

242 

243 def set_vers(self, tver: str | None, uver: str | None) -> None: 

244 """Set the versions of the item from target and source suite""" 

245 if tver and uver: 

246 self.ver = (tver, uver) 

247 elif tver: 

248 self.ver = (tver, self.ver[1]) 

249 elif uver: 249 ↛ exitline 249 didn't return from function 'set_vers' because the condition on line 249 was always true

250 self.ver = (self.ver[0], uver) 

251 

252 def set_maint(self, maint: str) -> None: 

253 """Set the package maintainer's name""" 

254 self.maint = self.reemail.sub("", maint) 

255 

256 def set_section(self, section: str) -> None: 

257 """Set the section of the package""" 

258 self.section = section 

259 

260 def add_dependency( 

261 self, dep: set[str | DependencyState], spec: DependencySpec 

262 ) -> bool: 

263 """Add a dependency of type deptype 

264 

265 :param dep: set with names of excuses, each of which satisfies the dep 

266 :param spec: DependencySpec 

267 

268 """ 

269 

270 assert dep != frozenset(), "%s: Adding empty list of dependencies" % self.name 

271 

272 deps: list[DependencyState] = [] 

273 try: 

274 # Casting to a sorted list makes excuses more 

275 # deterministic, but fails if the list has more than one 

276 # element *and* at least one DependencyState 

277 dep = sorted(dep) # type: ignore[type-var,assignment] 

278 except TypeError: 

279 pass 

280 for d in dep: 

281 if isinstance(d, DependencyState): 

282 deps.append(d) 

283 else: 

284 deps.append(DependencyState(d)) 

285 ed = ExcuseDependency(spec, deps) 

286 self.all_deps.append(ed) 

287 if not ed.valid: 

288 self.do_invalidate(ed) 

289 return ed.valid 

290 

291 def get_deps(self) -> set[str]: 

292 # the autohinter uses the excuses data to query dependencies between 

293 # excuses. For now, we keep the current behaviour by just returning 

294 # the data that was in the old deps set 

295 """Get the dependencies of type DEPENDS""" 

296 deps: set[str] = set() 

297 for dep in [d for d in self.all_deps if d.deptype == DependencyType.DEPENDS]: 

298 # add the first valid dependency 

299 for d in dep.depstates: 299 ↛ 297line 299 didn't jump to line 297 because the loop on line 299 didn't complete

300 if d.valid: 300 ↛ 299line 300 didn't jump to line 299 because the condition on line 300 was always true

301 deps.add(cast(str, d.dep)) 

302 break 

303 return deps 

304 

305 def add_unsatisfiable_on_arch(self, arch: str) -> None: 

306 """Add an arch that has unsatisfiable dependencies""" 

307 if arch not in self.unsatisfiable_on_archs: 307 ↛ exitline 307 didn't return from function 'add_unsatisfiable_on_arch' because the condition on line 307 was always true

308 self.unsatisfiable_on_archs.append(arch) 

309 

310 def add_unsatisfiable_dep(self, signature: str, arch: str) -> None: 

311 """Add an unsatisfiable dependency""" 

312 self.unsat_deps[arch].add(signature) 

313 

314 def do_invalidate(self, dep: ExcuseDependency) -> None: 

315 """ 

316 param: dep: ExcuseDependency 

317 """ 

318 self.addreason(dep.deptype.get_reason()) 

319 self.policy_verdict = PolicyVerdict.worst_of(self.policy_verdict, dep.verdict) 

320 

321 def invalidate_dependency(self, name: str, verdict: PolicyVerdict) -> bool: 

322 """Invalidate dependency""" 

323 invalidate = False 

324 

325 for dep in self.all_deps: 

326 if not dep.invalidate(name, verdict): 

327 invalidate = True 

328 self.do_invalidate(dep) 

329 

330 return not invalidate 

331 

332 def setdaysold(self, daysold: int, mindays: int) -> None: 

333 """Set the number of days from the upload and the minimum number of days for the update""" 

334 self.daysold = daysold 

335 self.mindays = mindays 

336 

337 def force(self) -> bool: 

338 """Add force hint""" 

339 self.forced = True 

340 if self._policy_verdict.is_rejected: 

341 self._policy_verdict = PolicyVerdict.PASS_HINTED 

342 return True 

343 return False 

344 

345 def addinfo(self, note: str) -> None: 

346 """Add a note in HTML""" 

347 self.infoline.append(note) 

348 

349 def add_verdict_info(self, verdict: PolicyVerdict, note: str) -> None: 

350 """Add a note to info about this verdict level""" 

351 self.verdict_info[verdict].append(note) 

352 

353 def add_detailed_info(self, note: str) -> None: 

354 """Add a note to detailed info""" 

355 self.detailed_info.append(note) 

356 

357 def missing_build_on_arch(self, arch: str) -> None: 

358 """Note that the item is missing a build on a given architecture""" 

359 self.missing_builds.add(arch) 

360 

361 def missing_build_on_ood_arch(self, arch: str) -> None: 

362 """Note that the item is missing a build on a given "out of date" architecture""" 

363 self.missing_builds_ood_arch.add(arch) 

364 

365 def add_old_binary(self, binary: str, from_source_version: str) -> None: 

366 """Denote than an old binary ("cruft") is available from a previous source version""" 

367 self.old_binaries[from_source_version].add(binary) 

368 

369 def add_hint(self, hint: "Hint") -> None: 

370 self.hints.append(hint) 

371 

372 def add_package(self, pkg_id: PackageId) -> None: 

373 self.packages[pkg_id.architecture].add(pkg_id) 

374 

375 def add_package_depends( 

376 self, 

377 spec: DependencySpec, 

378 depends: set[PackageId] | set[BinaryPackageId], 

379 ) -> None: 

380 """Add dependency on a package (source or binary) 

381 

382 :param spec: DependencySpec 

383 :param depends: set of PackageIds (source or binary), each of which can satisfy the dependency 

384 """ 

385 

386 assert depends != cast(set[PackageId], frozenset()), ( 

387 "%s: Adding empty list of package dependencies" % self.name 

388 ) 

389 

390 # we use DependencyState for consistency with excuse dependencies, but 

391 # package dependencies are never invalidated, they are used to add 

392 # excuse dependencies (in invalidate_excuses()), and these are 

393 # (potentially) invalidated 

394 ed = ExcuseDependency(spec, [DependencyState(d) for d in depends]) 

395 self.depends_packages.append(ed) 

396 self.depends_packages_flattened |= depends 

397 

398 def _format_verdict_summary(self) -> str: 

399 verdict = self._policy_verdict 

400 if verdict in VERDICT2DESC: 400 ↛ 402line 400 didn't jump to line 402 because the condition on line 400 was always true

401 return VERDICT2DESC[verdict] 

402 return ( 

403 f"UNKNOWN: Missing description for {verdict.name} - " 

404 "Please file a bug against Britney" 

405 ) 

406 

407 def _render_dep_issues(self, excuses: ExcusesType) -> None: 

408 if self.dep_info_rendered: 

409 return 

410 

411 dep_issues = defaultdict(set) 

412 for d in self.all_deps: 

413 info = "" 

414 if not d.possible: 

415 desc = d.first_impossible_dep 

416 info = f"Impossible {d.deptype}: {self.uvname} -> {desc}" 

417 else: 

418 dep = d.first_dep 

419 duv = excuses[dep].uvname # type: ignore[index] 

420 # Make sure we link to package names 

421 duv_src = duv.split("/")[0] 

422 verdict = excuses[dep].policy_verdict # type: ignore[index] 

423 if not d.valid or verdict in ( 

424 PolicyVerdict.REJECTED_NEEDS_APPROVAL, 

425 PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT, 

426 PolicyVerdict.REJECTED_PERMANENTLY, 

427 ): 

428 info = ( 

429 f"{d.deptype}: {self.uvname} " 

430 f'<a href="#{duv_src}">{duv}</a> ' 

431 "(not considered)" 

432 ) 

433 if not d.valid: 

434 dep_issues[d.verdict].add( 

435 "Invalidated by %s" % d.deptype.get_description() 

436 ) 

437 else: 

438 info = f'{d.deptype}: {self.uvname} <a href="#{duv_src}">{duv}</a>' 

439 dep_issues[d.verdict].add(info) 

440 

441 seen = set() 

442 for v in sorted(dep_issues.keys(), reverse=True): 

443 for i in sorted(dep_issues[v]): 

444 if i not in seen: 444 ↛ 443line 444 didn't jump to line 443 because the condition on line 444 was always true

445 self.add_verdict_info(v, i) 

446 seen.add(i) 

447 

448 self.dep_info_rendered = True 

449 

450 def html(self, excuses: ExcusesType) -> str: 

451 """Render the excuse in HTML""" 

452 res = '<a id="{0}" name="{0}">{0}</a> ({1} to {2})\n<ul>\n'.format( 

453 self.uvname, 

454 self.ver[0], 

455 self.ver[1], 

456 ) 

457 info = self._text(excuses) 

458 indented = False 

459 for line in info: 

460 stripped_this_line = False 

461 if line.startswith("∙ ∙ "): 

462 line = line[4:] 

463 stripped_this_line = True 

464 if not indented and stripped_this_line: 

465 res += "<ul>\n" 

466 indented = True 

467 elif indented and not stripped_this_line: 

468 res += "</ul>\n" 

469 indented = False 

470 res += "<li>%s\n" % line 

471 if indented: 

472 res += "</ul>\n" 

473 res = res + "</ul>\n" 

474 return res 

475 

476 def setbugs(self, oldbugs: list[str], newbugs: list[str]) -> None: 

477 """ "Set the list of old and new bugs""" 

478 self.newbugs.update(newbugs) 

479 self.oldbugs.update(oldbugs) 

480 

481 def addreason(self, reason: str) -> None: 

482 """ "adding reason""" 

483 self.reason[reason] = 1 

484 

485 def hasreason(self, reason: str) -> bool: 

486 return reason in self.reason 

487 

488 def _text(self, excuses: ExcusesType) -> list[str]: 

489 """Render the excuse in text""" 

490 self._render_dep_issues(excuses) 

491 res = [] 

492 res.append( 

493 "Migration status for %s (%s to %s): %s" 

494 % (self.uvname, self.ver[0], self.ver[1], self._format_verdict_summary()) 

495 ) 

496 if not self.is_valid: 

497 res.append("Issues preventing migration:") 

498 for v in sorted(self.verdict_info.keys(), reverse=True): 

499 for x in self.verdict_info[v]: 

500 res.append("∙ ∙ " + x + "") 

501 if self.infoline: 

502 res.append("Additional info (not blocking or delaying):") 

503 for x in self.infoline: 

504 res.append("∙ ∙ " + x + "") 

505 if self.htmlline: 505 ↛ 506line 505 didn't jump to line 506 because the condition on line 505 was never true

506 res.append("Legacy info:") 

507 for x in self.htmlline: 

508 res.append("∙ ∙ " + x + "") 

509 return res 

510 

511 def excusedata(self, excuses: ExcusesType) -> dict[str, str]: 

512 """Render the excuse in as key-value data""" 

513 excusedata: dict[str, Any] = {} 

514 excusedata["excuses"] = self._text(excuses) 

515 excusedata["item-name"] = self.uvname 

516 excusedata["source"] = self.source 

517 excusedata["migration-policy-verdict"] = self._policy_verdict.name 

518 excusedata["old-version"] = self.ver[0] 

519 excusedata["new-version"] = self.ver[1] 

520 if self.maint: 

521 excusedata["maintainer"] = self.maint 

522 if self.section and self.section.find("/") > -1: 

523 excusedata["component"] = self.section.split("/")[0] 

524 if self.policy_info: 

525 excusedata["policy_info"] = self.policy_info 

526 if self.missing_builds or self.missing_builds_ood_arch: 

527 excusedata["missing-builds"] = { 

528 "on-architectures": sorted(self.missing_builds), 

529 "on-unimportant-architectures": sorted(self.missing_builds_ood_arch), 

530 } 

531 if {d for d in self.all_deps if not d.valid and d.possible}: 

532 excusedata["invalidated-by-other-package"] = True 

533 if self.all_deps or self.unsat_deps: 

534 dep_data: dict[str, Any] 

535 excusedata["dependencies"] = dep_data = {} 

536 

537 migrate_after = {d.first_dep for d in self.all_deps if d.valid} 

538 blocked_by = { 

539 d.first_dep for d in self.all_deps if not d.valid and d.possible 

540 } 

541 

542 def sorted_uvnames(deps: set[str | PackageId | None]) -> list[str]: 

543 return sorted(excuses[d].uvname for d in deps if d is not None) # type: ignore[index] 

544 

545 if blocked_by: 

546 dep_data["blocked-by"] = sorted_uvnames(blocked_by) 

547 if migrate_after: 

548 dep_data["migrate-after"] = sorted_uvnames(migrate_after) 

549 if self.unsat_deps: 549 ↛ 550line 549 didn't jump to line 550 because the condition on line 549 was never true

550 dep_data["unsatisfiable-dependencies"] = { 

551 x: sorted(self.unsat_deps[x]) for x in self.unsat_deps 

552 } 

553 if self.needs_approval: 

554 status = "not-approved" 

555 if any(h.type == "unblock" for h in self.hints): 

556 status = "approved" 

557 excusedata["manual-approval-status"] = status 

558 if self.hints: 

559 hint_info = [ 

560 { 

561 "hint-type": h.type, 

562 "hint-from": h.user, 

563 } 

564 for h in self.hints 

565 ] 

566 

567 excusedata["hints"] = hint_info 

568 if self.old_binaries: 

569 excusedata["old-binaries"] = { 

570 x: sorted(self.old_binaries[x]) for x in self.old_binaries 

571 } 

572 if self.forced: 

573 excusedata["forced-reason"] = sorted(list(self.reason.keys())) 

574 excusedata["reason"] = [] 

575 else: 

576 excusedata["reason"] = sorted(list(self.reason.keys())) 

577 excusedata["is-candidate"] = self.is_valid 

578 if self.detailed_info: 

579 di = [] 

580 for x in self.detailed_info: 

581 di.append("" + x + "") 

582 excusedata["detailed-info"] = di 

583 return excusedata 

584 

585 def add_bounty(self, policy: str, bounty: int) -> None: 

586 """ "adding bounty""" 

587 self.bounty[policy] = bounty 

588 

589 def add_penalty(self, policy: str, penalty: int) -> None: 

590 """ "adding penalty""" 

591 self.penalty[policy] = penalty