Coverage for britney2/excuse.py: 96%

326 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-04-18 20:48 +0000

1# -*- coding: utf-8 -*- 

2 

3# Copyright (C) 2001-2004 Anthony Towns <ajt@debian.org> 

4# Andreas Barth <aba@debian.org> 

5# Fabio Tranchitella <kobold@debian.org> 

6 

7# This program is free software; you can redistribute it and/or modify 

8# it under the terms of the GNU General Public License as published by 

9# the Free Software Foundation; either version 2 of the License, or 

10# (at your option) any later version. 

11 

12# This program is distributed in the hope that it will be useful, 

13# but WITHOUT ANY WARRANTY; without even the implied warranty of 

14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

15# GNU General Public License for more details. 

16 

17from collections import defaultdict 

18import re 

19from typing import Optional 

20 

21from britney2 import DependencyType 

22from britney2.migrationitem import MigrationItem 

23from britney2.excusedeps import DependencySpec, DependencyState, ImpossibleDependencyState 

24from britney2.policies import PolicyVerdict 

25 

26VERDICT2DESC = { 

27 PolicyVerdict.PASS: 

28 'Will attempt migration (Any information below is purely informational)', 

29 PolicyVerdict.PASS_HINTED: 

30 'Will attempt migration due to a hint (Any information below is purely informational)', 

31 PolicyVerdict.REJECTED_TEMPORARILY: 

32 'Waiting for test results or another package, or too young (no action required now - check later)', 

33 PolicyVerdict.REJECTED_WAITING_FOR_ANOTHER_ITEM: 

34 'Waiting for another item to be ready to migrate (no action required now - check later)', 

35 PolicyVerdict.REJECTED_BLOCKED_BY_ANOTHER_ITEM: 

36 'BLOCKED: Cannot migrate due to another item, which is blocked (please check which dependencies are stuck)', 

37 PolicyVerdict.REJECTED_NEEDS_APPROVAL: 

38 'BLOCKED: Needs an approval (either due to a freeze, the source suite or a manual hint)', 

39 PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT: 

40 'BLOCKED: Maybe temporary, maybe blocked but Britney is missing information (check below)', 

41 PolicyVerdict.REJECTED_PERMANENTLY: 

42 'BLOCKED: Rejected/violates migration policy/introduces a regression', 

43} 

44 

45 

46class ExcuseDependency(object): 

47 """Object to represent a specific dependency of an excuse on a package 

48 (source or binary) or on other excuses""" 

49 

50 def __init__(self, spec: DependencySpec, depstates: list[DependencyState]): 

51 """ 

52 :param: spec: DependencySpec 

53 :param: depstates: list of DependencyState, each of which can satisfy 

54 the dependency 

55 """ 

56 self.spec = spec 

57 self.depstates = depstates 

58 

59 @property 

60 def deptype(self): 

61 return self.spec.deptype 

62 

63 @property 

64 def valid(self) -> bool: 

65 if {d for d in self.depstates if d.valid}: 

66 return True 

67 else: 

68 return False 

69 

70 @property 

71 def deps(self): 

72 return {d.dep for d in self.depstates} 

73 

74 @property 

75 def possible(self) -> bool: 

76 if {d for d in self.depstates if d.possible}: 

77 return True 

78 else: 

79 return False 

80 

81 @property 

82 def first_dep(self): 

83 """return the first valid dependency, if there is one, otherwise the 

84 first possible one 

85 

86 return None if there are only impossible dependencies 

87 """ 

88 first = None 

89 for d in self.depstates: 

90 if d.valid: 

91 return d.dep 

92 elif d.possible and not first: 

93 first = d.dep 

94 return first 

95 

96 @property 

97 def first_impossible_dep(self): 

98 """return the first impossible dependency, if there is one""" 

99 first = None 

100 for d in self.depstates: 100 ↛ 104line 100 didn't jump to line 104, because the loop on line 100 didn't complete

101 if not d.possible: 101 ↛ 100line 101 didn't jump to line 100, because the condition on line 101 was never false

102 assert isinstance(d, ImpossibleDependencyState) # for type checking 

103 return d.desc 

104 return first 

105 

106 @property 

107 def verdict(self) -> PolicyVerdict: 

108 return min({d.verdict for d in self.depstates}) 

109 

110 def invalidate(self, excuse, verdict: PolicyVerdict) -> bool: 

111 """invalidate the dependencies on a specific excuse 

112 

113 :param excuse: the excuse which is no longer valid 

114 :param verdict: the PolicyVerdict causing the invalidation 

115 """ 

116 valid_alternative_left = False 

117 for ds in self.depstates: 

118 if ds.dep == excuse: 

119 ds.invalidate(verdict) 

120 elif ds.valid: 

121 valid_alternative_left = True 

122 

123 return valid_alternative_left 

124 

125 

126class Excuse(object): 

127 """Excuse class 

128 

129 This class represents an update excuse, which is a detailed explanation 

130 of why a package can or cannot be updated in the testing distribution from 

131 a newer package in another distribution (like for example unstable). 

132 

133 The main purpose of the excuses is to be written in an HTML file which 

134 will be published over HTTP. The maintainers will be able to parse it 

135 manually or automatically to find the explanation of why their packages 

136 have been updated or not. 

137 """ 

138 

139 # @var reemail 

140 # Regular expression for removing the email address 

141 reemail = re.compile(r" *<.*?>") 

142 

143 def __init__(self, migrationitem: MigrationItem): 

144 """Class constructor 

145 

146 This method initializes the excuse with the specified name and 

147 the default values. 

148 """ 

149 self.item = migrationitem 

150 self.ver = ("-", "-") 

151 self.maint = None 

152 self.daysold: Optional[int] = None 

153 self.mindays = None 

154 self.section = None 

155 self._is_valid = False 

156 self.needs_approval = False 

157 self.hints = [] 

158 self.forced = False 

159 self._policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY 

160 

161 self.all_deps: list[ExcuseDependency] = [] 

162 self.unsatisfiable_on_archs: list[str] = [] 

163 self.unsat_deps = defaultdict(set) 

164 self.newbugs = set() 

165 self.oldbugs = set() 

166 self.reason = {} 

167 self.htmlline = [] 

168 self.missing_builds = set() 

169 self.missing_builds_ood_arch = set() 

170 self.old_binaries = defaultdict(set) 

171 self.policy_info = {} 

172 self.verdict_info = defaultdict(list) 

173 self.infoline = [] 

174 self.detailed_info = [] 

175 self.dep_info_rendered = False 

176 

177 # packages (source and binary) that will migrate to testing if the 

178 # item from this excuse migrates 

179 self.packages = defaultdict(set) 

180 

181 # list of ExcuseDependency, with dependencies on packages 

182 self.depends_packages = [] 

183 # contains all PackageIds in any over the sets above 

184 self.depends_packages_flattened = set() 

185 

186 self.bounty = {} 

187 self.penalty = {} 

188 

189 # messenger from AutopkgtestPolicy to BlockPolicy 

190 self.autopkgtest_results = None 

191 

192 def sortkey(self) -> tuple[int, str]: 

193 if self.daysold is None: 

194 return (-1, self.uvname) 

195 return (self.daysold, self.uvname) 

196 

197 @property 

198 def name(self) -> str: 

199 return self.item.name 

200 

201 @property 

202 def uvname(self) -> str: 

203 return self.item.uvname 

204 

205 @property 

206 def source(self) -> str: 

207 return self.item.package 

208 

209 @property 

210 def is_valid(self) -> bool: 

211 return False if self._policy_verdict.is_rejected else True 

212 

213 @property 

214 def policy_verdict(self) -> PolicyVerdict: 

215 return self._policy_verdict 

216 

217 @policy_verdict.setter 

218 def policy_verdict(self, value): 

219 if value.is_rejected and self.forced: 219 ↛ 222line 219 didn't jump to line 222, because the condition on line 219 was never true

220 # By virtue of being forced, the item was hinted to 

221 # undo the rejection 

222 value = PolicyVerdict.PASS_HINTED 

223 self._policy_verdict = value 

224 

225 def set_vers(self, tver, uver): 

226 """Set the versions of the item from target and source suite""" 

227 if tver and uver: 

228 self.ver = (tver, uver) 

229 elif tver: 

230 self.ver = (tver, self.ver[1]) 

231 elif uver: 231 ↛ exitline 231 didn't return from function 'set_vers', because the condition on line 231 was never false

232 self.ver = (self.ver[0], uver) 

233 

234 def set_maint(self, maint): 

235 """Set the package maintainer's name""" 

236 self.maint = self.reemail.sub("", maint) 

237 

238 def set_section(self, section): 

239 """Set the section of the package""" 

240 self.section = section 

241 

242 def add_dependency(self, dep, spec: DependencySpec): 

243 """Add a dependency of type deptype 

244 

245 :param dep: set with names of excuses, each of which satisfies the dep 

246 :param spec: DependencySpec 

247 

248 """ 

249 

250 assert dep != frozenset(), "%s: Adding empty list of dependencies" % self.name 

251 

252 deps = [] 

253 try: 

254 # Casting to a sorted list makes excuses more 

255 # deterministic, but fails if the list has more than one 

256 # element *and* at least one DependencyState 

257 dep = sorted(dep) 

258 except TypeError: 

259 pass 

260 for d in dep: 

261 if isinstance(d, DependencyState): 

262 deps.append(d) 

263 else: 

264 deps.append(DependencyState(d)) 

265 ed = ExcuseDependency(spec, deps) 

266 self.all_deps.append(ed) 

267 if not ed.valid: 

268 self.do_invalidate(ed) 

269 return ed.valid 

270 

271 def get_deps(self): 

272 # the autohinter uses the excuses data to query dependencies between 

273 # excuses. For now, we keep the current behaviour by just returning 

274 # the data that was in the old deps set 

275 """ Get the dependencies of type DEPENDS """ 

276 deps = set() 

277 for dep in [d for d in self.all_deps if d.deptype == DependencyType.DEPENDS]: 

278 # add the first valid dependency 

279 for d in dep.depstates: 279 ↛ 277line 279 didn't jump to line 277, because the loop on line 279 didn't complete

280 if d.valid: 280 ↛ 279line 280 didn't jump to line 279, because the condition on line 280 was never false

281 deps.add(d.dep) 

282 break 

283 return deps 

284 

285 def add_unsatisfiable_on_arch(self, arch: str) -> None: 

286 """Add an arch that has unsatisfiable dependencies""" 

287 if arch not in self.unsatisfiable_on_archs: 

288 self.unsatisfiable_on_archs.append(arch) 

289 

290 def add_unsatisfiable_dep(self, signature, arch): 

291 """Add an unsatisfiable dependency""" 

292 self.unsat_deps[arch].add(signature) 

293 

294 def do_invalidate(self, dep: ExcuseDependency) -> None: 

295 """ 

296 param: dep: ExcuseDependency 

297 """ 

298 self.addreason(dep.deptype.get_reason()) 

299 self.policy_verdict = PolicyVerdict.worst_of(self.policy_verdict, dep.verdict) 

300 

301 def invalidate_dependency(self, name, verdict): 

302 """Invalidate dependency""" 

303 invalidate = False 

304 

305 for dep in self.all_deps: 

306 if not dep.invalidate(name, verdict): 

307 invalidate = True 

308 self.do_invalidate(dep) 

309 

310 return not invalidate 

311 

312 def setdaysold(self, daysold, mindays) -> None: 

313 """Set the number of days from the upload and the minimum number of days for the update""" 

314 self.daysold = daysold 

315 self.mindays = mindays 

316 

317 def force(self) -> bool: 

318 """Add force hint""" 

319 self.forced = True 

320 if self._policy_verdict.is_rejected: 

321 self._policy_verdict = PolicyVerdict.PASS_HINTED 

322 return True 

323 return False 

324 

325 def addinfo(self, note): 

326 """Add a note in HTML""" 

327 self.infoline.append(note) 

328 

329 def add_verdict_info(self, verdict, note): 

330 """Add a note to info about this verdict level""" 

331 self.verdict_info[verdict].append(note) 

332 

333 def add_detailed_info(self, note): 

334 """Add a note to detailed info""" 

335 self.detailed_info.append(note) 

336 

337 def missing_build_on_arch(self, arch): 

338 """Note that the item is missing a build on a given architecture""" 

339 self.missing_builds.add(arch) 

340 

341 def missing_build_on_ood_arch(self, arch): 

342 """Note that the item is missing a build on a given "out of date" architecture""" 

343 self.missing_builds_ood_arch.add(arch) 

344 

345 def add_old_binary(self, binary, from_source_version): 

346 """Denote than an old binary ("cruft") is available from a previous source version""" 

347 self.old_binaries[from_source_version].add(binary) 

348 

349 def add_hint(self, hint): 

350 self.hints.append(hint) 

351 

352 def add_package(self, pkg_id): 

353 self.packages[pkg_id.architecture].add(pkg_id) 

354 

355 def add_package_depends(self, spec: DependencySpec, depends) -> None: 

356 """Add dependency on a package (source or binary) 

357 

358 :param spec: DependencySpec 

359 :param depends: set of PackageIds (source or binary), each of which can satisfy the dependency 

360 """ 

361 

362 assert depends != frozenset(), "%s: Adding empty list of package dependencies" % self.name 

363 

364 # we use DependencyState for consistency with excuse dependencies, but 

365 # package dependencies are never invalidated, they are used to add 

366 # excuse dependencies (in invalidate_excuses()), and these are 

367 # (potentially) invalidated 

368 ed = ExcuseDependency(spec, [DependencyState(d) for d in depends]) 

369 self.depends_packages.append(ed) 

370 self.depends_packages_flattened |= depends 

371 

372 def _format_verdict_summary(self) -> str: 

373 verdict = self._policy_verdict 

374 if verdict in VERDICT2DESC: 374 ↛ 376line 374 didn't jump to line 376, because the condition on line 374 was never false

375 return VERDICT2DESC[verdict] 

376 return "UNKNOWN: Missing description for {0} - Please file a bug against Britney".format(verdict.name) 

377 

378 def _render_dep_issues(self, excuses) -> None: 

379 if self.dep_info_rendered: 

380 return 

381 

382 dep_issues = defaultdict(set) 

383 for d in self.all_deps: 

384 info = "" 

385 if not d.possible: 

386 desc = d.first_impossible_dep 

387 info = "Impossible %s: %s -> %s" % (d.deptype, self.uvname, desc) 

388 else: 

389 dep = d.first_dep 

390 duv = excuses[dep].uvname 

391 # Make sure we link to package names 

392 duv_src = duv.split("/")[0] 

393 verdict = excuses[dep].policy_verdict 

394 if not d.valid or verdict in (PolicyVerdict.REJECTED_NEEDS_APPROVAL, 

395 PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT, 

396 PolicyVerdict.REJECTED_PERMANENTLY): 

397 info = "%s: %s <a href=\"#%s\">%s</a> (not considered)" % (d.deptype, self.uvname, duv_src, duv) 

398 if not d.valid: 

399 dep_issues[d.verdict].add("Invalidated by %s" % d.deptype.get_description()) 

400 else: 

401 info = "%s: %s <a href=\"#%s\">%s</a>" % (d.deptype, self.uvname, duv_src, duv) 

402 dep_issues[d.verdict].add(info) 

403 

404 seen = set() 

405 for v in sorted(dep_issues.keys(), reverse=True): 

406 for i in sorted(dep_issues[v]): 

407 if i not in seen: 407 ↛ 406line 407 didn't jump to line 406, because the condition on line 407 was never false

408 self.add_verdict_info(v, i) 

409 seen.add(i) 

410 

411 self.dep_info_rendered = True 

412 

413 def html(self, excuses) -> str: 

414 """Render the excuse in HTML""" 

415 res = "<a id=\"%s\" name=\"%s\">%s</a> (%s to %s)\n<ul>\n" % \ 

416 (self.uvname, self.uvname, self.uvname, self.ver[0], self.ver[1]) 

417 info = self._text(excuses) 

418 indented = False 

419 for line in info: 

420 stripped_this_line = False 

421 if line.startswith("∙ ∙ "): 

422 line = line[4:] 

423 stripped_this_line = True 

424 if not indented and stripped_this_line: 

425 res += "<ul>\n" 

426 indented = True 

427 elif indented and not stripped_this_line: 

428 res += "</ul>\n" 

429 indented = False 

430 res += "<li>%s\n" % line 

431 if indented: 

432 res += "</ul>\n" 

433 res = res + "</ul>\n" 

434 return res 

435 

436 def setbugs(self, oldbugs, newbugs): 

437 """"Set the list of old and new bugs""" 

438 self.newbugs.update(newbugs) 

439 self.oldbugs.update(oldbugs) 

440 

441 def addreason(self, reason) -> None: 

442 """"adding reason""" 

443 self.reason[reason] = 1 

444 

445 def hasreason(self, reason) -> bool: 

446 return reason in self.reason 

447 

448 def _text(self, excuses): 

449 """Render the excuse in text""" 

450 self._render_dep_issues(excuses) 

451 res = [] 

452 res.append( 

453 "Migration status for %s (%s to %s): %s" % 

454 (self.uvname, self.ver[0], self.ver[1], self._format_verdict_summary())) 

455 if not self.is_valid: 

456 res.append("Issues preventing migration:") 

457 for v in sorted(self.verdict_info.keys(), reverse=True): 

458 for x in self.verdict_info[v]: 

459 res.append("∙ ∙ " + x + "") 

460 if self.infoline: 

461 res.append("Additional info:") 

462 for x in self.infoline: 

463 res.append("∙ ∙ " + x + "") 

464 if self.htmlline: 464 ↛ 465line 464 didn't jump to line 465, because the condition on line 464 was never true

465 res.append("Legacy info:") 

466 for x in self.htmlline: 

467 res.append("∙ ∙ " + x + "") 

468 return res 

469 

470 def excusedata(self, excuses): 

471 """Render the excuse in as key-value data""" 

472 excusedata = {} 

473 excusedata["excuses"] = self._text(excuses) 

474 excusedata["item-name"] = self.uvname 

475 excusedata["source"] = self.source 

476 excusedata["migration-policy-verdict"] = self._policy_verdict.name 

477 excusedata["old-version"] = self.ver[0] 

478 excusedata["new-version"] = self.ver[1] 

479 if self.maint: 

480 excusedata['maintainer'] = self.maint 

481 if self.section and self.section.find("/") > -1: 

482 excusedata['component'] = self.section.split('/')[0] 

483 if self.policy_info: 

484 excusedata['policy_info'] = self.policy_info 

485 if self.missing_builds or self.missing_builds_ood_arch: 

486 excusedata['missing-builds'] = { 

487 'on-architectures': sorted(self.missing_builds), 

488 'on-unimportant-architectures': sorted(self.missing_builds_ood_arch), 

489 } 

490 if {d for d in self.all_deps if not d.valid and d.possible}: 

491 excusedata['invalidated-by-other-package'] = True 

492 if self.all_deps \ 

493 or self.unsat_deps: 

494 excusedata['dependencies'] = dep_data = {} 

495 

496 migrate_after = set(d.first_dep for d in self.all_deps if d.valid) 

497 blocked_by = set(d.first_dep for d in self.all_deps 

498 if not d.valid and d.possible) 

499 

500 def sorted_uvnames(deps): 

501 return sorted(excuses[d].uvname for d in deps) 

502 

503 if blocked_by: 

504 dep_data['blocked-by'] = sorted_uvnames(blocked_by) 

505 if migrate_after: 

506 dep_data['migrate-after'] = sorted_uvnames(migrate_after) 

507 if self.unsat_deps: 507 ↛ 508line 507 didn't jump to line 508, because the condition on line 507 was never true

508 dep_data['unsatisfiable-dependencies'] = {x: sorted(self.unsat_deps[x]) for x in self.unsat_deps} 

509 if self.needs_approval: 

510 status = 'not-approved' 

511 if any(h.type == 'unblock' for h in self.hints): 

512 status = 'approved' 

513 excusedata['manual-approval-status'] = status 

514 if self.hints: 

515 hint_info = [{ 

516 'hint-type': h.type, 

517 'hint-from': h.user, 

518 } for h in self.hints] 

519 

520 excusedata['hints'] = hint_info 

521 if self.old_binaries: 

522 excusedata['old-binaries'] = {x: sorted(self.old_binaries[x]) for x in self.old_binaries} 

523 if self.forced: 

524 excusedata["forced-reason"] = sorted(list(self.reason.keys())) 

525 excusedata["reason"] = [] 

526 else: 

527 excusedata["reason"] = sorted(list(self.reason.keys())) 

528 excusedata["is-candidate"] = self.is_valid 

529 if self.detailed_info: 

530 di = [] 

531 for x in self.detailed_info: 

532 di.append("" + x + "") 

533 excusedata["detailed-info"] = di 

534 return excusedata 

535 

536 def add_bounty(self, policy, bounty) -> None: 

537 """"adding bounty""" 

538 self.bounty[policy] = bounty 

539 

540 def add_penalty(self, policy, penalty) -> None: 

541 """"adding penalty""" 

542 self.penalty[policy] = penalty