Coverage for britney2/excuse.py: 95%

332 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-03-23 07:34 +0000

1# -*- coding: utf-8 -*- 

2 

3# Copyright (C) 2001-2004 Anthony Towns <ajt@debian.org> 

4# Andreas Barth <aba@debian.org> 

5# Fabio Tranchitella <kobold@debian.org> 

6 

7# This program is free software; you can redistribute it and/or modify 

8# it under the terms of the GNU General Public License as published by 

9# the Free Software Foundation; either version 2 of the License, or 

10# (at your option) any later version. 

11 

12# This program is distributed in the hope that it will be useful, 

13# but WITHOUT ANY WARRANTY; without even the implied warranty of 

14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

15# GNU General Public License for more details. 

16 

17import re 

18from collections import defaultdict 

19from typing import TYPE_CHECKING, Any, Optional, Union, cast 

20 

21from britney2 import BinaryPackageId, DependencyType, PackageId 

22from britney2.excusedeps import ( 

23 DependencySpec, 

24 DependencyState, 

25 ImpossibleDependencyState, 

26) 

27from britney2.migrationitem import MigrationItem 

28from britney2.policies import PolicyVerdict 

29 

30if TYPE_CHECKING: 30 ↛ 31line 30 didn't jump to line 31, because the condition on line 30 was never true

31 from .hints import Hint 

32 

33ExcusesType = Union[dict[PackageId, "Excuse"], dict[str, "Excuse"]] 

34 

35 

36VERDICT2DESC = { 

37 PolicyVerdict.PASS: "Will attempt migration (Any information below is purely informational)", 

38 PolicyVerdict.PASS_HINTED: "Will attempt migration due to a hint (Any information below is purely informational)", 

39 PolicyVerdict.REJECTED_TEMPORARILY: ( 

40 "Waiting for test results or another package, or too young (no action " 

41 "required now - check later)" 

42 ), 

43 PolicyVerdict.REJECTED_WAITING_FOR_ANOTHER_ITEM: ( 

44 "Waiting for another item to be ready to migrate (no action required now " 

45 "- check later)" 

46 ), 

47 PolicyVerdict.REJECTED_BLOCKED_BY_ANOTHER_ITEM: ( 

48 "BLOCKED: Cannot migrate due to another item, which is blocked (please " 

49 "check which dependencies are stuck)" 

50 ), 

51 PolicyVerdict.REJECTED_NEEDS_APPROVAL: ( 

52 "BLOCKED: Needs an approval (either due to a freeze, the source suite or " 

53 "a manual hint)" 

54 ), 

55 PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT: ( 

56 "BLOCKED: Maybe temporary, maybe blocked but Britney is missing information " 

57 "(check below)" 

58 ), 

59 PolicyVerdict.REJECTED_PERMANENTLY: "BLOCKED: Rejected/violates migration policy/introduces a regression", 

60} 

61 

62 

63class ExcuseDependency(object): 

64 """Object to represent a specific dependency of an excuse on a package 

65 (source or binary) or on other excuses""" 

66 

67 def __init__(self, spec: DependencySpec, depstates: list[DependencyState]) -> None: 

68 """ 

69 :param: spec: DependencySpec 

70 :param: depstates: list of DependencyState, each of which can satisfy 

71 the dependency 

72 """ 

73 self.spec = spec 

74 self.depstates = depstates 

75 

76 @property 

77 def deptype(self) -> DependencyType: 

78 return self.spec.deptype 

79 

80 @property 

81 def valid(self) -> bool: 

82 if {d for d in self.depstates if d.valid}: 

83 return True 

84 else: 

85 return False 

86 

87 @property 

88 def deps(self) -> set[Optional[Union[str, PackageId]]]: 

89 return {d.dep for d in self.depstates} 

90 

91 @property 

92 def possible(self) -> bool: 

93 if {d for d in self.depstates if d.possible}: 

94 return True 

95 else: 

96 return False 

97 

98 @property 

99 def first_dep(self) -> Optional[Union[str, PackageId]]: 

100 """return the first valid dependency, if there is one, otherwise the 

101 first possible one 

102 

103 return None if there are only impossible dependencies 

104 """ 

105 first = None 

106 for d in self.depstates: 

107 if d.valid: 

108 return d.dep 

109 elif d.possible and not first: 

110 first = d.dep 

111 return first 

112 

113 @property 

114 def first_impossible_dep(self) -> Optional[str]: 

115 """return the first impossible dependency, if there is one""" 

116 first = None 

117 for d in self.depstates: 117 ↛ 121line 117 didn't jump to line 121, because the loop on line 117 didn't complete

118 if not d.possible: 118 ↛ 117line 118 didn't jump to line 117, because the condition on line 118 was never false

119 assert isinstance(d, ImpossibleDependencyState) # for type checking 

120 return d.desc 

121 return first 

122 

123 @property 

124 def verdict(self) -> PolicyVerdict: 

125 return min({d.verdict for d in self.depstates}) 

126 

127 def invalidate(self, excuse: str, verdict: PolicyVerdict) -> bool: 

128 """invalidate the dependencies on a specific excuse 

129 

130 :param excuse: the excuse which is no longer valid 

131 :param verdict: the PolicyVerdict causing the invalidation 

132 """ 

133 valid_alternative_left = False 

134 for ds in self.depstates: 

135 if ds.dep == excuse: 

136 ds.invalidate(verdict) 

137 elif ds.valid: 

138 valid_alternative_left = True 

139 

140 return valid_alternative_left 

141 

142 

143class Excuse(object): 

144 """Excuse class 

145 

146 This class represents an update excuse, which is a detailed explanation 

147 of why a package can or cannot be updated in the testing distribution from 

148 a newer package in another distribution (like for example unstable). 

149 

150 The main purpose of the excuses is to be written in an HTML file which 

151 will be published over HTTP. The maintainers will be able to parse it 

152 manually or automatically to find the explanation of why their packages 

153 have been updated or not. 

154 """ 

155 

156 # @var reemail 

157 # Regular expression for removing the email address 

158 reemail = re.compile(r" *<.*?>") 

159 

160 def __init__(self, migrationitem: Union[str, MigrationItem]) -> None: 

161 """Class constructor 

162 

163 This method initializes the excuse with the specified name and 

164 the default values. 

165 """ 

166 self.item = migrationitem 

167 self.ver = ("-", "-") 

168 self.maint: Optional[str] = None 

169 self.daysold: Optional[int] = None 

170 self.mindays: Optional[int] = None 

171 self.section: Optional[str] = None 

172 self._is_valid = False 

173 self.needs_approval = False 

174 self.hints: list["Hint"] = [] 

175 self.forced = False 

176 self._policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY 

177 

178 self.all_deps: list[ExcuseDependency] = [] 

179 self.unsatisfiable_on_archs: list[str] = [] 

180 self.unsat_deps: dict[str, set[str]] = defaultdict(set) 

181 self.newbugs: set[str] = set() 

182 self.oldbugs: set[str] = set() 

183 self.reason: dict[str, int] = {} 

184 self.htmlline: list[str] = [] 

185 self.missing_builds: set[str] = set() 

186 self.missing_builds_ood_arch: set[str] = set() 

187 self.old_binaries: dict[str, set[str]] = defaultdict(set) 

188 self.policy_info: dict[str, dict[str, Any]] = {} 

189 self.verdict_info: dict[PolicyVerdict, list[str]] = defaultdict(list) 

190 self.infoline: list[str] = [] 

191 self.detailed_info: list[str] = [] 

192 self.dep_info_rendered: bool = False 

193 

194 # packages (source and binary) that will migrate to testing if the 

195 # item from this excuse migrates 

196 self.packages: dict[str, set[PackageId]] = defaultdict(set) 

197 

198 # list of ExcuseDependency, with dependencies on packages 

199 self.depends_packages: list[ExcuseDependency] = [] 

200 # contains all PackageIds in any over the sets above 

201 self.depends_packages_flattened: set[PackageId] = set() 

202 

203 self.bounty: dict[str, int] = {} 

204 self.penalty: dict[str, int] = {} 

205 

206 # messenger from AutopkgtestPolicy to BlockPolicy 

207 self.autopkgtest_results: Optional[set[str]] = None 

208 

209 def sortkey(self) -> tuple[int, str]: 

210 if self.daysold is None: 

211 return (-1, self.uvname) 

212 return (self.daysold, self.uvname) 

213 

214 @property 

215 def name(self) -> str: 

216 assert isinstance(self.item, MigrationItem) 

217 return self.item.name 

218 

219 @property 

220 def uvname(self) -> str: 

221 assert isinstance(self.item, MigrationItem) 

222 return self.item.uvname 

223 

224 @property 

225 def source(self) -> str: 

226 assert isinstance(self.item, MigrationItem) 

227 return self.item.package 

228 

229 @property 

230 def is_valid(self) -> bool: 

231 return False if self._policy_verdict.is_rejected else True 

232 

233 @property 

234 def policy_verdict(self) -> PolicyVerdict: 

235 return self._policy_verdict 

236 

237 @policy_verdict.setter 

238 def policy_verdict(self, value: PolicyVerdict) -> None: 

239 if value.is_rejected and self.forced: 239 ↛ 242line 239 didn't jump to line 242, because the condition on line 239 was never true

240 # By virtue of being forced, the item was hinted to 

241 # undo the rejection 

242 value = PolicyVerdict.PASS_HINTED 

243 self._policy_verdict = value 

244 

245 def set_vers(self, tver: Optional[str], uver: Optional[str]) -> None: 

246 """Set the versions of the item from target and source suite""" 

247 if tver and uver: 

248 self.ver = (tver, uver) 

249 elif tver: 

250 self.ver = (tver, self.ver[1]) 

251 elif uver: 251 ↛ exitline 251 didn't return from function 'set_vers', because the condition on line 251 was never false

252 self.ver = (self.ver[0], uver) 

253 

254 def set_maint(self, maint: str) -> None: 

255 """Set the package maintainer's name""" 

256 self.maint = self.reemail.sub("", maint) 

257 

258 def set_section(self, section: str) -> None: 

259 """Set the section of the package""" 

260 self.section = section 

261 

262 def add_dependency( 

263 self, dep: set[Union[str, DependencyState]], spec: DependencySpec 

264 ) -> bool: 

265 """Add a dependency of type deptype 

266 

267 :param dep: set with names of excuses, each of which satisfies the dep 

268 :param spec: DependencySpec 

269 

270 """ 

271 

272 assert dep != frozenset(), "%s: Adding empty list of dependencies" % self.name 

273 

274 deps: list[DependencyState] = [] 

275 try: 

276 # Casting to a sorted list makes excuses more 

277 # deterministic, but fails if the list has more than one 

278 # element *and* at least one DependencyState 

279 dep = sorted(dep) # type: ignore[type-var,assignment] 

280 except TypeError: 

281 pass 

282 for d in dep: 

283 if isinstance(d, DependencyState): 

284 deps.append(d) 

285 else: 

286 deps.append(DependencyState(d)) 

287 ed = ExcuseDependency(spec, deps) 

288 self.all_deps.append(ed) 

289 if not ed.valid: 

290 self.do_invalidate(ed) 

291 return ed.valid 

292 

293 def get_deps(self) -> set[str]: 

294 # the autohinter uses the excuses data to query dependencies between 

295 # excuses. For now, we keep the current behaviour by just returning 

296 # the data that was in the old deps set 

297 """Get the dependencies of type DEPENDS""" 

298 deps: set[str] = set() 

299 for dep in [d for d in self.all_deps if d.deptype == DependencyType.DEPENDS]: 

300 # add the first valid dependency 

301 for d in dep.depstates: 301 ↛ 299line 301 didn't jump to line 299, because the loop on line 301 didn't complete

302 if d.valid: 302 ↛ 301line 302 didn't jump to line 301, because the condition on line 302 was never false

303 deps.add(cast(str, d.dep)) 

304 break 

305 return deps 

306 

307 def add_unsatisfiable_on_arch(self, arch: str) -> None: 

308 """Add an arch that has unsatisfiable dependencies""" 

309 if arch not in self.unsatisfiable_on_archs: 

310 self.unsatisfiable_on_archs.append(arch) 

311 

312 def add_unsatisfiable_dep(self, signature: str, arch: str) -> None: 

313 """Add an unsatisfiable dependency""" 

314 self.unsat_deps[arch].add(signature) 

315 

316 def do_invalidate(self, dep: ExcuseDependency) -> None: 

317 """ 

318 param: dep: ExcuseDependency 

319 """ 

320 self.addreason(dep.deptype.get_reason()) 

321 self.policy_verdict = PolicyVerdict.worst_of(self.policy_verdict, dep.verdict) 

322 

323 def invalidate_dependency(self, name: str, verdict: PolicyVerdict) -> bool: 

324 """Invalidate dependency""" 

325 invalidate = False 

326 

327 for dep in self.all_deps: 

328 if not dep.invalidate(name, verdict): 

329 invalidate = True 

330 self.do_invalidate(dep) 

331 

332 return not invalidate 

333 

334 def setdaysold(self, daysold: int, mindays: int) -> None: 

335 """Set the number of days from the upload and the minimum number of days for the update""" 

336 self.daysold = daysold 

337 self.mindays = mindays 

338 

339 def force(self) -> bool: 

340 """Add force hint""" 

341 self.forced = True 

342 if self._policy_verdict.is_rejected: 

343 self._policy_verdict = PolicyVerdict.PASS_HINTED 

344 return True 

345 return False 

346 

347 def addinfo(self, note: str) -> None: 

348 """Add a note in HTML""" 

349 self.infoline.append(note) 

350 

351 def add_verdict_info(self, verdict: PolicyVerdict, note: str) -> None: 

352 """Add a note to info about this verdict level""" 

353 self.verdict_info[verdict].append(note) 

354 

355 def add_detailed_info(self, note: str) -> None: 

356 """Add a note to detailed info""" 

357 self.detailed_info.append(note) 

358 

359 def missing_build_on_arch(self, arch: str) -> None: 

360 """Note that the item is missing a build on a given architecture""" 

361 self.missing_builds.add(arch) 

362 

363 def missing_build_on_ood_arch(self, arch: str) -> None: 

364 """Note that the item is missing a build on a given "out of date" architecture""" 

365 self.missing_builds_ood_arch.add(arch) 

366 

367 def add_old_binary(self, binary: str, from_source_version: str) -> None: 

368 """Denote than an old binary ("cruft") is available from a previous source version""" 

369 self.old_binaries[from_source_version].add(binary) 

370 

371 def add_hint(self, hint: "Hint") -> None: 

372 self.hints.append(hint) 

373 

374 def add_package(self, pkg_id: PackageId) -> None: 

375 self.packages[pkg_id.architecture].add(pkg_id) 

376 

377 def add_package_depends( 

378 self, 

379 spec: DependencySpec, 

380 depends: Union[set[PackageId], set[BinaryPackageId]], 

381 ) -> None: 

382 """Add dependency on a package (source or binary) 

383 

384 :param spec: DependencySpec 

385 :param depends: set of PackageIds (source or binary), each of which can satisfy the dependency 

386 """ 

387 

388 assert depends != cast(set[PackageId], frozenset()), ( 

389 "%s: Adding empty list of package dependencies" % self.name 

390 ) 

391 

392 # we use DependencyState for consistency with excuse dependencies, but 

393 # package dependencies are never invalidated, they are used to add 

394 # excuse dependencies (in invalidate_excuses()), and these are 

395 # (potentially) invalidated 

396 ed = ExcuseDependency(spec, [DependencyState(d) for d in depends]) 

397 self.depends_packages.append(ed) 

398 self.depends_packages_flattened |= depends 

399 

400 def _format_verdict_summary(self) -> str: 

401 verdict = self._policy_verdict 

402 if verdict in VERDICT2DESC: 402 ↛ 404line 402 didn't jump to line 404, because the condition on line 402 was never false

403 return VERDICT2DESC[verdict] 

404 return "UNKNOWN: Missing description for {0} - Please file a bug against Britney".format( 

405 verdict.name 

406 ) 

407 

408 def _render_dep_issues(self, excuses: ExcusesType) -> None: 

409 if self.dep_info_rendered: 

410 return 

411 

412 dep_issues = defaultdict(set) 

413 for d in self.all_deps: 

414 info = "" 

415 if not d.possible: 

416 desc = d.first_impossible_dep 

417 info = "Impossible %s: %s -> %s" % (d.deptype, self.uvname, desc) 

418 else: 

419 dep = d.first_dep 

420 duv = excuses[dep].uvname # type: ignore[index] 

421 # Make sure we link to package names 

422 duv_src = duv.split("/")[0] 

423 verdict = excuses[dep].policy_verdict # type: ignore[index] 

424 if not d.valid or verdict in ( 

425 PolicyVerdict.REJECTED_NEEDS_APPROVAL, 

426 PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT, 

427 PolicyVerdict.REJECTED_PERMANENTLY, 

428 ): 

429 info = '%s: %s <a href="#%s">%s</a> (not considered)' % ( 

430 d.deptype, 

431 self.uvname, 

432 duv_src, 

433 duv, 

434 ) 

435 if not d.valid: 

436 dep_issues[d.verdict].add( 

437 "Invalidated by %s" % d.deptype.get_description() 

438 ) 

439 else: 

440 info = '%s: %s <a href="#%s">%s</a>' % ( 

441 d.deptype, 

442 self.uvname, 

443 duv_src, 

444 duv, 

445 ) 

446 dep_issues[d.verdict].add(info) 

447 

448 seen = set() 

449 for v in sorted(dep_issues.keys(), reverse=True): 

450 for i in sorted(dep_issues[v]): 

451 if i not in seen: 451 ↛ 450line 451 didn't jump to line 450, because the condition on line 451 was never false

452 self.add_verdict_info(v, i) 

453 seen.add(i) 

454 

455 self.dep_info_rendered = True 

456 

457 def html(self, excuses: ExcusesType) -> str: 

458 """Render the excuse in HTML""" 

459 res = '<a id="%s" name="%s">%s</a> (%s to %s)\n<ul>\n' % ( 

460 self.uvname, 

461 self.uvname, 

462 self.uvname, 

463 self.ver[0], 

464 self.ver[1], 

465 ) 

466 info = self._text(excuses) 

467 indented = False 

468 for line in info: 

469 stripped_this_line = False 

470 if line.startswith("∙ ∙ "): 

471 line = line[4:] 

472 stripped_this_line = True 

473 if not indented and stripped_this_line: 

474 res += "<ul>\n" 

475 indented = True 

476 elif indented and not stripped_this_line: 

477 res += "</ul>\n" 

478 indented = False 

479 res += "<li>%s\n" % line 

480 if indented: 

481 res += "</ul>\n" 

482 res = res + "</ul>\n" 

483 return res 

484 

485 def setbugs(self, oldbugs: list[str], newbugs: list[str]) -> None: 

486 """ "Set the list of old and new bugs""" 

487 self.newbugs.update(newbugs) 

488 self.oldbugs.update(oldbugs) 

489 

490 def addreason(self, reason: str) -> None: 

491 """ "adding reason""" 

492 self.reason[reason] = 1 

493 

494 def hasreason(self, reason: str) -> bool: 

495 return reason in self.reason 

496 

497 def _text(self, excuses: ExcusesType) -> list[str]: 

498 """Render the excuse in text""" 

499 self._render_dep_issues(excuses) 

500 res = [] 

501 res.append( 

502 "Migration status for %s (%s to %s): %s" 

503 % (self.uvname, self.ver[0], self.ver[1], self._format_verdict_summary()) 

504 ) 

505 if not self.is_valid: 

506 res.append("Issues preventing migration:") 

507 for v in sorted(self.verdict_info.keys(), reverse=True): 

508 for x in self.verdict_info[v]: 

509 res.append("∙ ∙ " + x + "") 

510 if self.infoline: 

511 res.append("Additional info:") 

512 for x in self.infoline: 

513 res.append("∙ ∙ " + x + "") 

514 if self.htmlline: 514 ↛ 515line 514 didn't jump to line 515, because the condition on line 514 was never true

515 res.append("Legacy info:") 

516 for x in self.htmlline: 

517 res.append("∙ ∙ " + x + "") 

518 return res 

519 

520 def excusedata(self, excuses: ExcusesType) -> dict[str, str]: 

521 """Render the excuse in as key-value data""" 

522 excusedata: dict[str, Any] = {} 

523 excusedata["excuses"] = self._text(excuses) 

524 excusedata["item-name"] = self.uvname 

525 excusedata["source"] = self.source 

526 excusedata["migration-policy-verdict"] = self._policy_verdict.name 

527 excusedata["old-version"] = self.ver[0] 

528 excusedata["new-version"] = self.ver[1] 

529 if self.maint: 

530 excusedata["maintainer"] = self.maint 

531 if self.section and self.section.find("/") > -1: 

532 excusedata["component"] = self.section.split("/")[0] 

533 if self.policy_info: 

534 excusedata["policy_info"] = self.policy_info 

535 if self.missing_builds or self.missing_builds_ood_arch: 

536 excusedata["missing-builds"] = { 

537 "on-architectures": sorted(self.missing_builds), 

538 "on-unimportant-architectures": sorted(self.missing_builds_ood_arch), 

539 } 

540 if {d for d in self.all_deps if not d.valid and d.possible}: 

541 excusedata["invalidated-by-other-package"] = True 

542 if self.all_deps or self.unsat_deps: 

543 dep_data: dict[str, Any] 

544 excusedata["dependencies"] = dep_data = {} 

545 

546 migrate_after = set(d.first_dep for d in self.all_deps if d.valid) 

547 blocked_by = set( 

548 d.first_dep for d in self.all_deps if not d.valid and d.possible 

549 ) 

550 

551 def sorted_uvnames(deps: set[Union[str, PackageId, None]]) -> list[str]: 

552 return sorted(excuses[d].uvname for d in deps if d is not None) # type: ignore[index] 

553 

554 if blocked_by: 

555 dep_data["blocked-by"] = sorted_uvnames(blocked_by) 

556 if migrate_after: 

557 dep_data["migrate-after"] = sorted_uvnames(migrate_after) 

558 if self.unsat_deps: 558 ↛ 559line 558 didn't jump to line 559, because the condition on line 558 was never true

559 dep_data["unsatisfiable-dependencies"] = { 

560 x: sorted(self.unsat_deps[x]) for x in self.unsat_deps 

561 } 

562 if self.needs_approval: 

563 status = "not-approved" 

564 if any(h.type == "unblock" for h in self.hints): 

565 status = "approved" 

566 excusedata["manual-approval-status"] = status 

567 if self.hints: 

568 hint_info = [ 

569 { 

570 "hint-type": h.type, 

571 "hint-from": h.user, 

572 } 

573 for h in self.hints 

574 ] 

575 

576 excusedata["hints"] = hint_info 

577 if self.old_binaries: 

578 excusedata["old-binaries"] = { 

579 x: sorted(self.old_binaries[x]) for x in self.old_binaries 

580 } 

581 if self.forced: 

582 excusedata["forced-reason"] = sorted(list(self.reason.keys())) 

583 excusedata["reason"] = [] 

584 else: 

585 excusedata["reason"] = sorted(list(self.reason.keys())) 

586 excusedata["is-candidate"] = self.is_valid 

587 if self.detailed_info: 

588 di = [] 

589 for x in self.detailed_info: 

590 di.append("" + x + "") 

591 excusedata["detailed-info"] = di 

592 return excusedata 

593 

594 def add_bounty(self, policy: str, bounty: int) -> None: 

595 """ "adding bounty""" 

596 self.bounty[policy] = bounty 

597 

598 def add_penalty(self, policy: str, penalty: int) -> None: 

599 """ "adding penalty""" 

600 self.penalty[policy] = penalty