Coverage for britney2/excusefinder.py: 92%

344 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-03-23 07:34 +0000

1import logging 

2import optparse 

3from itertools import chain 

4from typing import TYPE_CHECKING, Any, Optional, cast 

5from collections.abc import Iterable 

6from urllib.parse import quote 

7 

8import apt_pkg 

9 

10from britney2 import BinaryPackage, BinaryPackageId, PackageId, Suites 

11from britney2.excuse import Excuse 

12from britney2.migrationitem import MigrationItem, MigrationItemFactory 

13from britney2.policies import PolicyVerdict 

14from britney2.utils import find_smooth_updateable_binaries, invalidate_excuses 

15 

16if TYPE_CHECKING: 16 ↛ 17line 16 didn't jump to line 17, because the condition on line 16 was never true

17 from .hints import HintCollection 

18 from .installability.universe import BinaryPackageUniverse 

19 from .policies.policy import PolicyEngine 

20 

21 

22class ExcuseFinder(object): 

23 

24 def __init__( 

25 self, 

26 options: optparse.Values, 

27 suite_info: Suites, 

28 all_binaries: dict[BinaryPackageId, BinaryPackage], 

29 pkg_universe: "BinaryPackageUniverse", 

30 policy_engine: "PolicyEngine", 

31 mi_factory: MigrationItemFactory, 

32 hints: "HintCollection", 

33 ) -> None: 

34 logger_name = ".".join((self.__class__.__module__, self.__class__.__name__)) 

35 self.logger = logging.getLogger(logger_name) 

36 self.options = options 

37 self.suite_info = suite_info 

38 self.all_binaries = all_binaries 

39 self.pkg_universe = pkg_universe 

40 self._policy_engine = policy_engine 

41 self._migration_item_factory = mi_factory 

42 self.hints = hints 

43 self.excuses: dict[str, Excuse] = {} 

44 

45 def _get_build_link( 

46 self, arch: str, src: str, ver: str, label: Optional[str] = None 

47 ) -> str: 

48 """Return a link to the build logs, labelled 'arch' per default""" 

49 if label is None: 

50 label = arch 

51 if self.options.build_url: 

52 url = self.options.build_url.format( 

53 arch=arch, source=quote(src), version=quote(ver) 

54 ) 

55 return '<a href="%s" target="_blank">%s</a>' % (url, label) 

56 else: 

57 return label 

58 

59 def _should_remove_source(self, item: MigrationItem) -> bool: 

60 """Check if a source package should be removed from testing 

61 

62 This method checks if a source package should be removed from the 

63 target suite; this happens if the source package is not 

64 present in the primary source suite anymore. 

65 

66 It returns True if the package can be removed, False otherwise. 

67 In the former case, a new excuse is appended to the object 

68 attribute excuses. 

69 """ 

70 if hasattr(self.options, "partial_source"): 70 ↛ 71line 70 didn't jump to line 71, because the condition on line 70 was never true

71 return False 

72 # if the source package is available in unstable, then do nothing 

73 source_suite = self.suite_info.primary_source_suite 

74 pkg = item.package 

75 if pkg in source_suite.sources: 75 ↛ 76line 75 didn't jump to line 76, because the condition on line 75 was never true

76 return False 

77 # otherwise, add a new excuse for its removal 

78 src = item.suite.sources[pkg] 

79 excuse = Excuse(item) 

80 excuse.addinfo("Package not in %s, will try to remove" % source_suite.name) 

81 excuse.set_vers(src.version, None) 

82 if src.maintainer: 

83 excuse.set_maint(src.maintainer) 

84 if src.section: 84 ↛ 88line 84 didn't jump to line 88, because the condition on line 84 was never false

85 excuse.set_section(src.section) 

86 

87 # if the package is blocked, skip it 

88 for hint in self.hints.search("block", package=pkg, removal=True): 

89 excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY 

90 excuse.add_verdict_info( 

91 excuse.policy_verdict, 

92 "Not touching package, as requested by %s " 

93 "(contact %s-release if update is needed)" 

94 % (hint.user, self.options.distribution), 

95 ) 

96 excuse.addreason("block") 

97 self.excuses[excuse.name] = excuse 

98 return False 

99 

100 excuse.policy_verdict = PolicyVerdict.PASS 

101 self.excuses[excuse.name] = excuse 

102 return True 

103 

    def _should_upgrade_srcarch(self, item: MigrationItem) -> bool:
        """Check if a set of binary packages should be upgraded

        This method checks if the binary packages produced by the source
        package on the given architecture should be upgraded; this can
        happen also if the migration is a binary-NMU for the given arch.

        It returns False if the given packages don't need to be upgraded,
        True otherwise. In the former case, a new excuse is appended to
        the object attribute excuses.
        """
        # retrieve the source packages for testing and suite

        target_suite = self.suite_info.target_suite
        source_suite = item.suite
        src = item.package
        arch = item.architecture
        source_t = target_suite.sources[src]
        source_u = source_suite.sources[src]

        # binary-only migration: source version stays the same on both sides
        excuse = Excuse(item)
        excuse.set_vers(source_t.version, source_t.version)
        if source_u.maintainer:
            excuse.set_maint(source_u.maintainer)
        if source_u.section:
            excuse.set_section(source_u.section)

        # if there is a `remove' hint and the requested version is the same as the
        # version in testing, then stop here and return False
        # (as a side effect, a removal may generate such excuses for both the source
        # package and its binary packages on each architecture)
        for hint in self.hints.search("remove", package=src, version=source_t.version):
            excuse.add_hint(hint)
            excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse.add_verdict_info(
                excuse.policy_verdict, "Removal request by %s" % (hint.user)
            )
            excuse.add_verdict_info(
                excuse.policy_verdict, "Trying to remove package, not update it"
            )
            self.excuses[excuse.name] = excuse
            return False

        # the starting point is that there is nothing wrong and nothing worth doing
        anywrongver = False
        anyworthdoing = False

        packages_t_a = target_suite.binaries[arch]
        packages_s_a = source_suite.binaries[arch]

        wrong_verdict = PolicyVerdict.REJECTED_PERMANENTLY

        # for every binary package produced by this source in unstable for this architecture
        for pkg_id in sorted(x for x in source_u.binaries if x.architecture == arch):
            pkg_name = pkg_id.package_name
            # TODO filter binaries based on checks below?
            excuse.add_package(pkg_id)

            # retrieve the testing (if present) and unstable corresponding binary packages
            binary_t = packages_t_a[pkg_name] if pkg_name in packages_t_a else None
            binary_u = packages_s_a[pkg_name]

            # this is the source version for the new binary package
            pkgsv = binary_u.source_version

            # if the new binary package is architecture-independent, then skip it
            if binary_u.architecture == "all":
                if pkg_id not in source_t.binaries:
                    # only add a note if the arch:all does not match the expected version
                    excuse.add_detailed_info(
                        "Ignoring %s %s (from %s) as it is arch: all"
                        % (pkg_name, binary_u.version, pkgsv)
                    )
                continue

            # if the new binary package is not from the same source as the testing one, then skip it
            # this implies that this binary migration is part of a source migration
            if source_u.version == pkgsv and source_t.version != pkgsv:
                anywrongver = True
                excuse.add_verdict_info(
                    wrong_verdict,
                    "From wrong source: %s %s (%s not %s)"
                    % (pkg_name, binary_u.version, pkgsv, source_t.version),
                )
                continue

            # cruft in unstable: built by a source version that matches neither side
            if source_u.version != pkgsv and source_t.version != pkgsv:
                if self.options.ignore_cruft:
                    excuse.add_detailed_info(
                        "Old cruft: %s %s (but ignoring cruft, so nevermind)"
                        % (pkg_name, pkgsv)
                    )
                else:
                    anywrongver = True
                    excuse.add_verdict_info(
                        wrong_verdict, "Old cruft: %s %s" % (pkg_name, pkgsv)
                    )
                continue

            # if the source package has been updated in unstable and this is a binary migration, skip it
            # (the binaries are now out-of-date)
            if source_t.version == pkgsv and source_t.version != source_u.version:
                anywrongver = True
                excuse.add_verdict_info(
                    wrong_verdict,
                    "From wrong source: %s %s (%s not %s)"
                    % (pkg_name, binary_u.version, pkgsv, source_u.version),
                )
                continue

            # if the binary is not present in testing, then it is a new binary;
            # in this case, there is something worth doing
            if not binary_t:
                excuse.add_detailed_info(
                    "New binary: %s (%s)" % (pkg_name, binary_u.version)
                )
                anyworthdoing = True
                continue

            # at this point, the binary package is present in testing, so we can compare
            # the versions of the packages ...
            vcompare = apt_pkg.version_compare(binary_t.version, binary_u.version)

            # ... if updating would mean downgrading, then stop here: there is something wrong
            if vcompare > 0:
                anywrongver = True
                excuse.add_verdict_info(
                    wrong_verdict,
                    "Not downgrading: %s (%s to %s)"
                    % (pkg_name, binary_t.version, binary_u.version),
                )
                break
            # ... if updating would mean upgrading, then there is something worth doing
            elif vcompare < 0:
                excuse.add_detailed_info(
                    "Updated binary: %s (%s to %s)"
                    % (pkg_name, binary_t.version, binary_u.version)
                )
                anyworthdoing = True

        srcv = source_u.version
        same_source = source_t.version == srcv
        primary_source_suite = self.suite_info.primary_source_suite
        is_primary_source = source_suite == primary_source_suite

        # if there is nothing wrong and there is something worth doing or the source
        # package is not fake, then check what packages should be removed
        if not anywrongver and (anyworthdoing or not source_u.is_fakesrc):
            # we want to remove binaries that are no longer produced by the
            # new source, but there are some special cases:
            # - if this is binary-only (same_source) and not from the primary
            #   source, we don't do any removals:
            #   binNMUs in *pu on some architectures would otherwise result in
            #   the removal of binaries on other architectures
            # - for the primary source, smooth binaries in the target suite
            #   are not considered for removal
            if not same_source or is_primary_source:
                smoothbins = set()
                if is_primary_source:
                    binaries_t = target_suite.binaries
                    possible_smooth_updates = [
                        p for p in source_t.binaries if p.architecture == arch
                    ]
                    smoothbins = find_smooth_updateable_binaries(
                        possible_smooth_updates,
                        source_u,
                        self.pkg_universe,
                        target_suite,
                        binaries_t,
                        source_suite.binaries,
                        cast(frozenset["BinaryPackageId"], frozenset()),
                        self.options.smooth_updates,
                        self.hints,
                    )

                # for every binary package produced by this source in testing for this architecture
                for pkg_id in sorted(
                    x for x in source_t.binaries if x.architecture == arch
                ):
                    pkg = pkg_id.package_name
                    # if the package is architecture-independent, then ignore it
                    tpkg_data = packages_t_a[pkg]
                    if tpkg_data.architecture == "all":
                        if pkg_id not in source_u.binaries:
                            # only add a note if the arch:all does not match the expected version
                            excuse.add_detailed_info(
                                "Ignoring removal of %s as it is arch: all" % (pkg)
                            )
                        continue
                    # if the package is not produced by the new source package, then remove it from testing
                    if pkg not in packages_s_a:
                        excuse.add_detailed_info(
                            "Removed binary: %s %s" % (pkg, tpkg_data.version)
                        )
                        # the removed binary is only interesting if this is a binary-only migration,
                        # as otherwise the updated source will already cause the binary packages
                        # to be updated
                        if same_source and pkg_id not in smoothbins:
                            # Special-case, if the binary is a candidate for a smooth update, we do not consider
                            # it "interesting" on its own. This case happens quite often with smooth updatable
                            # packages, where the old binary "survives" a full run because it still has
                            # reverse dependencies.
                            anyworthdoing = True

        if not anyworthdoing and not (
            self.options.archall_inconsistency_allowed and excuse.detailed_info
        ):
            # nothing worth doing, we don't add an excuse to the list, we just return false
            return False

        if not anyworthdoing:
            # This source has binary differences between the target and source
            # suite, but we're not going to upgrade them. Part of the purpose
            # of options.archall_inconsistency_allowed is to log the excuse
            # with a temporary failure such that the administrators can take
            # action as they wish.
            excuse.policy_verdict = PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT
            excuse.addreason("everything-ignored")

        else:
            # there is something worth doing
            # we assume that this package will be ok, if not invalidated below
            excuse.policy_verdict = PolicyVerdict.PASS

        # if there is something wrong, reject this package
        if anywrongver:
            excuse.policy_verdict = wrong_verdict

        self._policy_engine.apply_srcarch_policies(
            item, arch, source_t, source_u, excuse
        )

        self.excuses[excuse.name] = excuse
        return excuse.is_valid

339 

    def _should_upgrade_src(self, item: MigrationItem) -> bool:
        """Check if source package should be upgraded

        This method checks if a source package should be upgraded. The analysis
        is performed for the source package specified by the `src' parameter,
        for the distribution `source_suite'.

        It returns False if the given package doesn't need to be upgraded,
        True otherwise. In the former case, a new excuse is appended to
        the object attribute excuses.
        """

        src = item.package
        source_suite = item.suite
        suite_name = source_suite.name
        source_u = source_suite.sources[src]
        if source_u.is_fakesrc:
            # it is a fake package created to satisfy Britney implementation details; silently ignore it
            return False

        target_suite = self.suite_info.target_suite
        # retrieve the source packages for testing (if available) and suite
        if src in target_suite.sources:
            source_t = target_suite.sources[src]
            # if testing and unstable have the same version, then this is a candidate for binary-NMUs only
            if apt_pkg.version_compare(source_t.version, source_u.version) == 0:
                return False
        else:
            source_t = None

        excuse = Excuse(item)
        excuse.set_vers(source_t and source_t.version or None, source_u.version)
        if source_u.maintainer:
            excuse.set_maint(source_u.maintainer)
        if source_u.section:
            excuse.set_section(source_u.section)
        excuse.add_package(PackageId(src, source_u.version, "source"))

        # if the version in unstable is older, then stop here with a warning in the excuse and return False
        if source_t and apt_pkg.version_compare(source_u.version, source_t.version) < 0:
            excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse.add_verdict_info(
                excuse.policy_verdict,
                "ALERT: %s is newer in the target suite (%s %s)"
                % (src, source_t.version, source_u.version),
            )
            self.excuses[excuse.name] = excuse
            excuse.addreason("newerintesting")
            return False

        # the starting point is that we will update the candidate
        excuse.policy_verdict = PolicyVerdict.PASS

        # if there is a `remove' hint and the requested version is the same as the
        # version in testing, then stop here and return False
        for hint in self.hints.search("remove", package=src):
            if (
                source_t
                and source_t.version == hint.version
                or source_u.version == hint.version
            ):
                excuse.add_hint(hint)
                excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
                excuse.add_verdict_info(
                    excuse.policy_verdict, "Removal request by %s" % (hint.user)
                )
                excuse.add_verdict_info(
                    excuse.policy_verdict, "Trying to remove package, not update it"
                )
                break

        all_binaries = self.all_binaries

        # at this point, we check the status of the builds on all the supported architectures
        # to catch the out-of-date ones
        archs_to_consider = list(self.options.architectures)
        archs_to_consider.append("all")
        for arch in archs_to_consider:
            # maps out-of-date source version -> set of binary package names
            oodbins: dict[str, set[str]] = {}
            uptodatebins = False
            # for every binary package produced by this source in the suite for this architecture
            if arch == "all":
                consider_binaries: Iterable[BinaryPackageId] = source_u.binaries
            else:
                # Will also include arch:all for the given architecture (they are filtered out
                # below)
                consider_binaries = sorted(
                    x for x in source_u.binaries if x.architecture == arch
                )
            for pkg_id in consider_binaries:
                pkg = pkg_id.package_name

                # retrieve the binary package and its source version
                binary_u = all_binaries[pkg_id]
                pkgsv = binary_u.source_version

                # arch:all packages are treated separately from arch:arch
                if binary_u.architecture != arch:
                    continue

                # TODO filter binaries based on checks below?
                excuse.add_package(pkg_id)

                # if it wasn't built by the same source, it is out-of-date
                # if there is at least one binary on this arch which is
                # up-to-date, there is a build on this arch
                if source_u.version != pkgsv or pkg_id.architecture == "faux":
                    if pkgsv not in oodbins:
                        oodbins[pkgsv] = set()
                    oodbins[pkgsv].add(pkg)
                    if pkg_id.architecture != "faux":
                        excuse.add_old_binary(pkg, pkgsv)
                    continue
                else:
                    uptodatebins = True

            # if there are out-of-date packages, warn about them in the excuse and set excuse.is_valid
            # to False to block the update; if the architecture where the package is out-of-date is
            # in the `outofsync_arches' list, then do not block the update
            if oodbins:
                oodtxt = ""
                for v in sorted(oodbins):
                    if oodtxt:
                        oodtxt = oodtxt + "; "
                    oodtxt = oodtxt + "%s (from %s)" % (
                        ", ".join(sorted(oodbins[v])),
                        self._get_build_link(arch, src, v, label=v),
                    )

                if uptodatebins:
                    text = "old binaries left on %s: %s" % (
                        self._get_build_link(arch, src, source_u.version),
                        oodtxt,
                    )
                else:
                    text = "missing build on %s" % (
                        self._get_build_link(arch, src, source_u.version)
                    )

                if arch in self.options.outofsync_arches:
                    # out-of-sync architectures never block a migration
                    text = text + " (but %s isn't keeping up, so nevermind)" % (arch)
                    if not uptodatebins:
                        excuse.missing_build_on_ood_arch(arch)
                else:
                    if uptodatebins:
                        if self.options.ignore_cruft:
                            text = text + " (but ignoring cruft, so nevermind)"
                            excuse.add_detailed_info(text)
                        else:
                            excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
                            excuse.addreason("cruft")
                            excuse.add_verdict_info(excuse.policy_verdict, text)
                    else:
                        # a missing build may still appear later, so the
                        # rejection is not necessarily permanent
                        excuse.policy_verdict = (
                            PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT
                        )
                        excuse.missing_build_on_arch(arch)
                        excuse.addreason("missingbuild")
                        excuse.add_verdict_info(excuse.policy_verdict, text)
                        if excuse.old_binaries:
                            excuse.add_detailed_info(
                                "old binaries on %s: %s" % (arch, oodtxt)
                            )

        # if the source package has no binaries, set is_valid to False to block the update
        # NOTE(review): x[2] indexes into the BinaryPackageId tuple; presumably
        # equivalent to x.architecture as used elsewhere in this file — confirm
        if not {x for x in source_u.binaries if x[2] != "faux"}:
            excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse.add_verdict_info(
                excuse.policy_verdict, "%s has no binaries on any arch" % src
            )
            excuse.addreason("no-binaries")

        self._policy_engine.apply_src_policies(item, source_t, source_u, excuse)

        if source_suite.suite_class.is_additional_source and source_t:
            # o-o-d(ish) checks for (t-)p-u
            # This only makes sense if the package is actually in testing.
            for arch in self.options.architectures:
                # if the package in testing has no binaries on this
                # architecture, it can't be out-of-date
                if not any(
                    x
                    for x in source_t.binaries
                    if x.architecture == arch and all_binaries[x].architecture != "all"
                ):
                    continue

                # if the (t-)p-u package has produced any binaries on
                # this architecture then we assume it's ok. this allows for
                # uploads to (t-)p-u which intentionally drop binary
                # packages
                if any(
                    x
                    for x in source_suite.binaries[arch].values()
                    if x.source == src
                    and x.source_version == source_u.version
                    and x.architecture != "all"
                ):
                    continue

                # TODO: Find a way to avoid hardcoding pu/stable relation.
                # NOTE(review): `base` is assigned but not read afterwards in
                # this method — possibly left over from an older message format
                if suite_name == "pu":
                    base = "stable"
                else:
                    base = target_suite.name
                text = "Not yet built on %s (relative to target suite)" % (
                    self._get_build_link(arch, src, source_u.version)
                )

                if arch in self.options.outofsync_arches:
                    text = text + " (but %s isn't keeping up, so never mind)" % (arch)
                    excuse.missing_build_on_ood_arch(arch)
                    excuse.addinfo(text)
                else:
                    excuse.policy_verdict = (
                        PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT
                    )
                    excuse.missing_build_on_arch(arch)
                    excuse.addreason("missingbuild")
                    excuse.add_verdict_info(excuse.policy_verdict, text)

        # check if there is a `force' hint for this package, which allows it to go in even if it is not updateable
        forces = self.hints.search("force", package=src, version=source_u.version)
        if forces:
            # force() updates the final verdict for us
            changed_state = excuse.force()
            if changed_state:
                excuse.addinfo("Should ignore, but forced by %s" % (forces[0].user))

        self.excuses[excuse.name] = excuse
        return excuse.is_valid

571 

    def _compute_excuses_and_initial_actionable_items(self) -> set[MigrationItem]:
        """Build excuses for every potential migration and return the actionable items.

        Three passes are made: (1) sources present in the target suite but
        gone from the primary source suite become removal candidates,
        (2) sources in any source suite become upgrade (or per-arch binary
        migration) candidates, and (3) `remove' hints generate explicit
        removal items.  Excuses are recorded in self.excuses as a side
        effect; the returned set contains the items that passed the
        initial checks.
        """
        # list of local methods and variables (for better performance)
        excuses = self.excuses
        suite_info = self.suite_info
        pri_source_suite = suite_info.primary_source_suite
        architectures = self.options.architectures
        should_remove_source = self._should_remove_source
        should_upgrade_srcarch = self._should_upgrade_srcarch
        should_upgrade_src = self._should_upgrade_src

        sources_ps = pri_source_suite.sources
        sources_t = suite_info.target_suite.sources

        # this set will contain the MigrationItems which are valid candidates
        # (removals are MigrationItems with is_removal=True)
        actionable_items: set[MigrationItem] = set()
        actionable_items_add = actionable_items.add  # Every . in a loop slows it down

        # for every source package in testing, check if it should be removed
        for pkg in sources_t:
            if pkg not in sources_ps:
                src_t = sources_t[pkg]
                item = MigrationItem(
                    package=pkg,
                    version=src_t.version,
                    suite=suite_info.target_suite,
                    is_removal=True,
                )
                if should_remove_source(item):
                    actionable_items_add(item)

        # for every source package in the source suites, check if it should be upgraded
        for suite in chain((pri_source_suite, *suite_info.additional_source_suites)):
            sources_s = suite.sources
            for pkg in sources_s:
                src_s_data = sources_s[pkg]
                if src_s_data.is_fakesrc:
                    continue
                src_t_data = sources_t.get(pkg)

                if (
                    src_t_data is None
                    or apt_pkg.version_compare(src_s_data.version, src_t_data.version)
                    != 0
                ):
                    item = MigrationItem(
                        package=pkg, version=src_s_data.version, suite=suite
                    )
                    # check if the source package should be upgraded
                    if should_upgrade_src(item):
                        actionable_items_add(item)
                else:
                    # package has same version in source and target suite; check if any of the
                    # binaries have changed on the various architectures
                    for arch in architectures:
                        item = MigrationItem(
                            package=pkg,
                            version=src_s_data.version,
                            architecture=arch,
                            suite=suite,
                        )
                        if should_upgrade_srcarch(item):
                            actionable_items_add(item)

        # process the `remove' hints, if the given package is not yet in actionable_items
        for hint in self.hints["remove"]:
            src_r = hint.package
            # hinted package must exist in the target suite to be removable
            if src_r not in sources_t:
                continue

            existing_items = set(x for x in actionable_items if x.package == src_r)
            if existing_items:
                self.logger.info(
                    "removal hint '%s' ignored due to existing item(s) %s"
                    % (hint, [i.name for i in existing_items])
                )
                continue

            tsrcv = sources_t[src_r].version
            item = MigrationItem(
                package=src_r,
                version=tsrcv,
                suite=suite_info.target_suite,
                is_removal=True,
            )

            # check if the version specified in the hint is the same as the considered package
            if tsrcv != hint.version:
                continue

            # add the removal of the package to actionable_items and build a new excuse
            excuse = Excuse(item)
            excuse.set_vers(tsrcv, None)
            excuse.addinfo("Removal request by %s" % (hint.user))
            # if the removal of the package is blocked, skip it
            blocked = False
            for blockhint in self.hints.search("block", package=src_r, removal=True):
                excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
                excuse.add_verdict_info(
                    excuse.policy_verdict,
                    "Not removing package, due to block hint by %s "
                    "(contact %s-release if update is needed)"
                    % (blockhint.user, self.options.distribution),
                )
                excuse.addreason("block")
                blocked = True

            if blocked:
                excuses[excuse.name] = excuse
                continue

            actionable_items_add(item)
            excuse.addinfo("Package is broken, will try to remove")
            excuse.add_hint(hint)
            # Using "PASS" here as "Created by a hint" != "accepted due to hint". In a future
            # where there might be policy checks on removals, it would make sense to distinguish
            # those two states. Not sure that future will ever be.
            excuse.policy_verdict = PolicyVerdict.PASS
            excuses[excuse.name] = excuse

        return actionable_items

693 

694 def find_actionable_excuses(self) -> tuple[dict[str, Excuse], set[MigrationItem]]: 

695 excuses = self.excuses 

696 actionable_items = self._compute_excuses_and_initial_actionable_items() 

697 valid = {x.name for x in actionable_items} 

698 

699 # extract the not considered packages, which are in the excuses but not in upgrade_me 

700 unconsidered = {ename for ename in excuses if ename not in valid} 

701 invalidated: set[str] = set() 

702 

703 invalidate_excuses(excuses, valid, unconsidered, invalidated) 

704 

705 # check that the list of actionable items matches the list of valid 

706 # excuses 

707 assert_sets_equal(valid, {x for x in excuses if excuses[x].is_valid}) 

708 

709 # check that the rdeps for all invalid excuses were invalidated 

710 assert_sets_equal(invalidated, {x for x in excuses if not excuses[x].is_valid}) 

711 

712 actionable_items = {x for x in actionable_items if x.name in valid} 

713 return excuses, actionable_items 

714 

715 

def assert_sets_equal(a: Any, b: Any) -> None:
    """Raise AssertionError when the two sets differ, reporting both differences."""
    if a == b:
        return
    raise AssertionError(f"sets not equal a-b {a - b} b-a {b - a}")