Coverage for britney2/excusefinder.py: 92%

347 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-06-17 09:00 +0000

1import logging 

2import optparse 

3from collections.abc import Iterable 

4from itertools import chain 

5from typing import TYPE_CHECKING, Any, cast 

6from urllib.parse import quote 

7 

8import apt_pkg 

9 

10from britney2 import BinaryPackage, BinaryPackageId, PackageId, Suites 

11from britney2.excuse import Excuse 

12from britney2.migrationitem import MigrationItem, MigrationItemFactory 

13from britney2.policies import PolicyVerdict 

14from britney2.utils import ( 

15 filter_out_faux_gen, 

16 find_smooth_updateable_binaries, 

17 invalidate_excuses, 

18) 

19 

20if TYPE_CHECKING: 20 ↛ 21line 20 didn't jump to line 21 because the condition on line 20 was never true

21 from .hints import HintCollection 

22 from .installability.universe import BinaryPackageUniverse 

23 from .policies.policy import PolicyEngine 

24 

25 

class ExcuseFinder:
    """Build the excuses for candidate migrations and find the actionable items.

    For every candidate (source removal, source upgrade, per-architecture
    binary upgrade, or hinted removal) an ``Excuse`` is created and stored in
    ``self.excuses`` keyed by the excuse name.  The public entry point is
    ``find_actionable_excuses``.
    """

    def __init__(
        self,
        options: optparse.Values,
        suite_info: Suites,
        all_binaries: dict[BinaryPackageId, BinaryPackage],
        pkg_universe: "BinaryPackageUniverse",
        policy_engine: "PolicyEngine",
        mi_factory: MigrationItemFactory,
        hints: "HintCollection",
    ) -> None:
        """Store the collaborators and start with an empty excuse table."""
        logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
        self.logger = logging.getLogger(logger_name)
        self.options = options
        self.suite_info = suite_info
        self.all_binaries = all_binaries
        self.pkg_universe = pkg_universe
        self._policy_engine = policy_engine
        self._migration_item_factory = mi_factory
        self.hints = hints
        # excuse name -> Excuse, filled in by the _should_* methods
        self.excuses: dict[str, Excuse] = {}

    def _get_build_link(
        self, arch: str, src: str, ver: str, label: str | None = None
    ) -> str:
        """Return a link to the build logs, labelled 'arch' per default

        When ``options.build_url`` is falsy, only the label is returned.
        The source name and version are URL-quoted before being substituted
        into the ``build_url`` template; the architecture is used verbatim.
        """
        if label is None:
            label = arch
        if self.options.build_url:
            url = self.options.build_url.format(
                arch=arch, source=quote(src), version=quote(ver)
            )
            return f'<a href="{url}" target="_blank">{label}</a>'
        else:
            return label

    def _should_remove_source(self, item: MigrationItem) -> bool:
        """Check if a source package should be removed from testing

        This method checks if a source package should be removed from the
        target suite; this happens if the source package is not
        present in the primary source suite anymore.

        It returns True if the package can be removed, False otherwise.
        An excuse is stored in the object attribute excuses both when the
        removal is accepted and when it is rejected by a block hint; no
        excuse is created when the options carry a ``partial_source``
        attribute or when the package is still in the primary source suite.
        """
        if hasattr(self.options, "partial_source"):
            return False
        # if the source package is available in unstable, then do nothing
        source_suite = self.suite_info.primary_source_suite
        pkg = item.package
        if pkg in source_suite.sources:
            return False
        # otherwise, add a new excuse for its removal
        src = item.suite.sources[pkg]
        excuse = Excuse(item)
        excuse.addinfo("Package not in %s, will try to remove" % source_suite.name)
        excuse.set_vers(src.version, None)
        if src.maintainer:
            excuse.set_maint(src.maintainer)
        if src.section:
            excuse.set_section(src.section)

        # if the package is blocked, skip it
        if (
            hint := self.hints.search_first("block", package=pkg, removal=True)
        ) is not None:
            excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse.add_verdict_info(
                excuse.policy_verdict,
                "Not touching package, as requested by %s "
                "(contact %s-release if update is needed)"
                % (hint.user, self.options.distribution),
            )
            excuse.addreason("block")
            self.excuses[excuse.name] = excuse
            return False

        excuse.policy_verdict = PolicyVerdict.PASS
        self.excuses[excuse.name] = excuse
        return True

    def _should_upgrade_srcarch(self, item: MigrationItem) -> bool:
        """Check if a set of binary packages should be upgraded

        This method checks if the binary packages produced by the source
        package on the given architecture should be upgraded; this can
        happen also if the migration is a binary-NMU for the given arch.

        It returns False if the given packages don't need to be upgraded,
        True otherwise.  An excuse is stored in the object attribute excuses
        unless there is nothing worth doing (and no arch:all inconsistency
        to report), in which case False is returned without an excuse.
        """
        # retrieve the source packages for testing and suite

        target_suite = self.suite_info.target_suite
        source_suite = item.suite
        src = item.package
        arch = item.architecture
        source_t = target_suite.sources[src]
        source_u = source_suite.sources[src]

        excuse = Excuse(item)
        excuse.set_vers(source_t.version, source_t.version)
        if source_u.maintainer:
            excuse.set_maint(source_u.maintainer)
        if source_u.section:
            excuse.set_section(source_u.section)

        # if there is a `remove' hint and the requested version is the same as the
        # version in testing, then stop here and return False
        # (as a side effect, a removal may generate such excuses for both the source
        # package and its binary packages on each architecture)
        if (
            hint := self.hints.search_first(
                "remove", package=src, version=source_t.version
            )
        ) is not None:
            excuse.add_hint(hint)
            excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse.add_verdict_info(
                excuse.policy_verdict, "Removal request by %s" % (hint.user)
            )
            excuse.add_verdict_info(
                excuse.policy_verdict, "Trying to remove package, not update it"
            )
            self.excuses[excuse.name] = excuse
            return False

        # the starting point is that there is nothing wrong and nothing worth doing
        anywrongver = False
        anyworthdoing = False

        packages_t_a = target_suite.binaries[arch]
        packages_s_a = source_suite.binaries[arch]

        wrong_verdict = PolicyVerdict.REJECTED_PERMANENTLY

        # for every binary package produced by this source in unstable for this architecture
        for pkg_id in filter_out_faux_gen(source_u.binaries):
            if pkg_id.architecture != arch:
                continue

            pkg_name = pkg_id.package_name
            # TODO filter binaries based on checks below?
            excuse.add_package(pkg_id)

            # retrieve the testing (if present) and unstable corresponding binary packages
            binary_t = packages_t_a[pkg_name] if pkg_name in packages_t_a else None
            binary_u = packages_s_a[pkg_name]

            # this is the source version for the new binary package
            pkgsv = binary_u.source_version

            # if the new binary package is architecture-independent, then skip it
            if binary_u.architecture == "all":
                if pkg_id not in source_t.binaries:
                    # only add a note if the arch:all does not match the expected version
                    excuse.add_detailed_info(
                        "Ignoring %s %s (from %s) as it is arch: all"
                        % (pkg_name, binary_u.version, pkgsv)
                    )
                continue

            # if the new binary package is not from the same source as the testing one, then skip it
            # this implies that this binary migration is part of a source migration
            if source_u.version == pkgsv and source_t.version != pkgsv:
                anywrongver = True
                excuse.add_verdict_info(
                    wrong_verdict,
                    "From wrong source: %s %s (%s not %s)"
                    % (pkg_name, binary_u.version, pkgsv, source_t.version),
                )
                continue

            # cruft in unstable
            if source_u.version != pkgsv and source_t.version != pkgsv:
                if self.options.ignore_cruft:
                    excuse.add_detailed_info(
                        "Old cruft: %s %s (but ignoring cruft, so nevermind)"
                        % (pkg_name, pkgsv)
                    )
                else:
                    anywrongver = True
                    excuse.add_verdict_info(
                        wrong_verdict, f"Old cruft: {pkg_name} {pkgsv}"
                    )
                continue

            # if the source package has been updated in unstable and this is a binary migration, skip it
            # (the binaries are now out-of-date)
            if source_t.version == pkgsv and source_t.version != source_u.version:
                anywrongver = True
                excuse.add_verdict_info(
                    wrong_verdict,
                    "From wrong source: %s %s (%s not %s)"
                    % (pkg_name, binary_u.version, pkgsv, source_u.version),
                )
                continue

            # if the binary is not present in testing, then it is a new binary;
            # in this case, there is something worth doing
            if not binary_t:
                excuse.add_detailed_info(f"New binary: {pkg_name} ({binary_u.version})")
                anyworthdoing = True
                continue

            # at this point, the binary package is present in testing, so we can compare
            # the versions of the packages ...
            vcompare = apt_pkg.version_compare(binary_t.version, binary_u.version)

            # ... if updating would mean downgrading, then stop here: there is something wrong
            if vcompare > 0:
                anywrongver = True
                excuse.add_verdict_info(
                    wrong_verdict,
                    "Not downgrading: %s (%s to %s)"
                    % (pkg_name, binary_t.version, binary_u.version),
                )
                break
            # ... if updating would mean upgrading, then there is something worth doing
            elif vcompare < 0:
                excuse.add_detailed_info(
                    "Updated binary: %s (%s to %s)"
                    % (pkg_name, binary_t.version, binary_u.version)
                )
                anyworthdoing = True

        srcv = source_u.version
        same_source = source_t.version == srcv
        primary_source_suite = self.suite_info.primary_source_suite
        is_primary_source = source_suite == primary_source_suite

        # if there is nothing wrong and there is something worth doing or the source
        # package is not fake, then check what packages should be removed
        if not anywrongver and (anyworthdoing or not source_u.is_fakesrc):
            # we want to remove binaries that are no longer produced by the
            # new source, but there are some special cases:
            # - if this is binary-only (same_source) and not from the primary
            #   source, we don't do any removals:
            #   binNMUs in *pu on some architectures would otherwise result in
            #   the removal of binaries on other architectures
            # - for the primary source, smooth binaries in the target suite
            #   are not considered for removal
            if not same_source or is_primary_source:
                smoothbins = set()
                if is_primary_source:
                    binaries_t = target_suite.binaries
                    possible_smooth_updates = [
                        p for p in source_t.binaries if p.architecture == arch
                    ]
                    smoothbins = find_smooth_updateable_binaries(
                        possible_smooth_updates,
                        source_u,
                        self.pkg_universe,
                        target_suite,
                        binaries_t,
                        source_suite.binaries,
                        cast(frozenset["BinaryPackageId"], frozenset()),
                        self.options.smooth_updates,
                        self.hints,
                    )

                # for every binary package produced by this source in testing for this architecture
                for pkg_id in sorted(
                    x for x in source_t.binaries if x.architecture == arch
                ):
                    pkg = pkg_id.package_name
                    # if the package is architecture-independent, then ignore it
                    tpkg_data = packages_t_a[pkg]
                    if tpkg_data.architecture == "all":
                        if pkg_id not in source_u.binaries:
                            # only add a note if the arch:all does not match the expected version
                            excuse.add_detailed_info(
                                "Ignoring removal of %s as it is arch: all" % (pkg)
                            )
                        continue
                    # if the package is not produced by the new source package, then remove it from testing
                    if pkg not in packages_s_a:
                        excuse.add_detailed_info(
                            f"Removed binary: {pkg} {tpkg_data.version}"
                        )
                        # the removed binary is only interesting if this is a binary-only migration,
                        # as otherwise the updated source will already cause the binary packages
                        # to be updated
                        if same_source and pkg_id not in smoothbins:
                            # Special-case, if the binary is a candidate for a smooth update, we do not consider
                            # it "interesting" on its own.  This case happens quite often with smooth updatable
                            # packages, where the old binary "survives" a full run because it still has
                            # reverse dependencies.
                            anyworthdoing = True

        if not anyworthdoing and not (
            self.options.archall_inconsistency_allowed and excuse.detailed_info
        ):
            # nothing worth doing, we don't add an excuse to the list, we just return false
            return False

        if not anyworthdoing:
            # This source has binary differences between the target and source
            # suite, but we're not going to upgrade them.  Part of the purpose
            # of options.archall_inconsistency_allowed is to log the excuse
            # with a temporary failure such that the administrators can take
            # action so they wish.
            excuse.policy_verdict = PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT
            excuse.addreason("everything-ignored")

        else:
            # there is something worth doing
            # we assume that this package will be ok, if not invalidated below
            excuse.policy_verdict = PolicyVerdict.PASS

        # if there is something wrong, reject this package
        if anywrongver:
            excuse.policy_verdict = wrong_verdict

        self._policy_engine.apply_srcarch_policies(arch, source_t, source_u, excuse)

        self.excuses[excuse.name] = excuse
        return excuse.is_valid

    def _should_upgrade_src(self, item: MigrationItem) -> bool:
        """Check if source package should be upgraded

        This method checks if the source package ``item.package`` from the
        suite ``item.suite`` should be upgraded in the target suite.

        It returns False if the given package doesn't need to be upgraded,
        True otherwise.  No excuse is created for fake sources or when the
        source and target versions compare equal; in every other case an
        excuse is stored in the object attribute excuses, whatever the
        return value.
        """

        src = item.package
        source_suite = item.suite
        source_u = source_suite.sources[src]
        if source_u.is_fakesrc:
            # it is a fake package created to satisfy Britney implementation details; silently ignore it
            return False

        target_suite = self.suite_info.target_suite
        # retrieve the source packages for testing (if available) and suite
        if src in target_suite.sources:
            source_t = target_suite.sources[src]
            # if testing and unstable have the same version, then this is a candidate for binary-NMUs only
            if apt_pkg.version_compare(source_t.version, source_u.version) == 0:
                return False
        else:
            source_t = None

        excuse = Excuse(item)
        excuse.set_vers(source_t and source_t.version or None, source_u.version)
        if source_u.maintainer:
            excuse.set_maint(source_u.maintainer)
        if source_u.section:
            excuse.set_section(source_u.section)
        excuse.add_package(PackageId(src, source_u.version, "source"))

        # if the version in unstable is older, then stop here with a warning in the excuse and return False
        if source_t and apt_pkg.version_compare(source_u.version, source_t.version) < 0:
            excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse.add_verdict_info(
                excuse.policy_verdict,
                "ALERT: %s is newer in the target suite (%s %s)"
                % (src, source_t.version, source_u.version),
            )
            self.excuses[excuse.name] = excuse
            excuse.addreason("newerintesting")
            return False

        # the starting point is that we will update the candidate
        excuse.policy_verdict = PolicyVerdict.PASS

        # if there is a `remove' hint whose version matches the version in testing
        # or the candidate version, reject the update (processing continues so the
        # excuse still collects the remaining information)
        for hint in self.hints.search("remove", package=src):
            if (
                source_t
                and source_t.version == hint.version
                or source_u.version == hint.version
            ):
                excuse.add_hint(hint)
                excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
                excuse.add_verdict_info(
                    excuse.policy_verdict, "Removal request by %s" % (hint.user)
                )
                excuse.add_verdict_info(
                    excuse.policy_verdict, "Trying to remove package, not update it"
                )
                break

        all_binaries = self.all_binaries

        # at this point, we check the status of the builds on all the supported architectures
        # to catch the out-of-date ones
        archs_to_consider = list(self.options.architectures)
        archs_to_consider.append("all")
        for arch in archs_to_consider:
            oodbins: dict[str, set[str]] = {}
            uptodatebins = False
            # for every binary package produced by this source in the suite for this architecture
            if arch == "all":
                consider_binaries: Iterable[BinaryPackageId] = source_u.binaries
            else:
                # Will also include arch:all for the given architecture (they are filtered out
                # below)
                consider_binaries = sorted(
                    x for x in source_u.binaries if x.architecture == arch
                )
            for pkg_id in consider_binaries:
                pkg = pkg_id.package_name

                # retrieve the binary package and its source version
                binary_u = all_binaries[pkg_id]
                pkgsv = binary_u.source_version

                # arch:all packages are treated separately from arch:arch
                if binary_u.architecture != arch:
                    continue

                # TODO filter binaries based on checks below?
                excuse.add_package(pkg_id)

                if pkg_id.package_name.endswith("-faux-build-depends"):
                    continue

                # if it wasn't built by the same source, it is out-of-date
                # if there is at least one binary on this arch which is
                # up-to-date, there is a build on this arch
                if source_u.version != pkgsv or pkg_id.architecture == "faux":
                    oodbins.setdefault(pkgsv, set()).add(pkg)
                    if pkg_id.architecture != "faux":
                        excuse.add_old_binary(pkg, pkgsv)
                    continue
                else:
                    uptodatebins = True

            # if there are out-of-date packages, warn about them in the excuse and set excuse.is_valid
            # to False to block the update; if the architecture where the package is out-of-date is
            # in the `outofsync_arches' list, then do not block the update
            if oodbins:
                # one "pkg1, pkg2 (from <version link>)" entry per stale source version
                oodtxt = "; ".join(
                    "{} (from {})".format(
                        ", ".join(sorted(oodbins[v])),
                        self._get_build_link(arch, src, v, label=v),
                    )
                    for v in sorted(oodbins)
                )

                if uptodatebins:
                    text = "Old binaries left on {}: {}".format(
                        self._get_build_link(arch, src, source_u.version),
                        oodtxt,
                    )
                else:
                    text = "Missing build on %s" % (
                        self._get_build_link(arch, src, source_u.version)
                    )

                if arch in self.options.outofsync_arches:
                    # NOTE(review): `text` is built here but never attached to the
                    # excuse on this path -- confirm whether that is intended.
                    text = f"{text} (but {arch} isn't keeping up, so nevermind)"
                    if not uptodatebins:
                        excuse.missing_build_on_ood_arch(arch)
                else:
                    if uptodatebins:
                        if self.options.ignore_cruft:
                            text = f"{text} (but ignoring cruft, so nevermind)"
                            excuse.add_detailed_info(text)
                        else:
                            excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
                            excuse.addreason("cruft")
                            excuse.add_verdict_info(excuse.policy_verdict, text)
                    else:
                        excuse.policy_verdict = (
                            PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT
                        )
                        excuse.missing_build_on_arch(arch)
                        excuse.addreason("missingbuild")
                        excuse.add_verdict_info(excuse.policy_verdict, text)
                        if excuse.old_binaries:
                            excuse.add_detailed_info(
                                f"old binaries on {arch}: {oodtxt}"
                            )

        # if the source package has no binaries, set is_valid to False to block the update
        if not any(
            x
            for x in filter_out_faux_gen(source_u.binaries)
            if x.architecture != "faux"
        ):
            excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse.add_verdict_info(
                excuse.policy_verdict, f"{src} has no binaries on any arch"
            )
            excuse.addreason("no-binaries")

        self._policy_engine.apply_src_policies(source_t, source_u, excuse)

        if source_suite.suite_class.is_additional_source and source_t:
            # o-o-d(ish) checks for (t-)p-u
            # This only makes sense if the package is actually in testing.
            for arch in self.options.architectures:
                # if the package in testing has no binaries on this
                # architecture, it can't be out-of-date
                if not any(
                    x
                    for x in source_t.binaries
                    if x.architecture == arch and all_binaries[x].architecture != "all"
                ):
                    continue

                # if the (t-)p-u package has produced any binaries on
                # this architecture then we assume it's ok. this allows for
                # uploads to (t-)p-u which intentionally drop binary
                # packages
                if any(
                    x
                    for x in source_suite.binaries[arch].values()
                    if x.source == src
                    and x.source_version == source_u.version
                    and x.architecture != "all"
                ):
                    continue

                text = "Not yet built on %s (relative to target suite)" % (
                    self._get_build_link(arch, src, source_u.version)
                )

                if arch in self.options.outofsync_arches:
                    # must be an f-string: a plain literal would emit the
                    # "{text}"/"{arch}" placeholders verbatim and drop the
                    # message built above
                    text = f"{text} (but {arch} isn't keeping up, so never mind)"
                    excuse.missing_build_on_ood_arch(arch)
                    excuse.addinfo(text)
                else:
                    excuse.policy_verdict = (
                        PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT
                    )
                    excuse.missing_build_on_arch(arch)
                    excuse.addreason("missingbuild")
                    excuse.add_verdict_info(excuse.policy_verdict, text)

        # check if there is a `force' hint for this package, which allows it to go in even if it is not updateable
        if (
            force_hint := self.hints.search_first(
                "force", package=src, version=source_u.version
            )
        ) is not None:
            # force() updates the final verdict for us
            changed_state = excuse.force()
            if changed_state:
                excuse.addinfo(f"Should ignore, but forced by {force_hint.user}")

        self.excuses[excuse.name] = excuse
        return excuse.is_valid

    def _compute_excuses_and_initial_actionable_items(self) -> set[MigrationItem]:
        """Create the excuses and return the items that passed their checks.

        Three passes are made: removals of sources that are in the target
        suite but not in the primary source suite, upgrades from every source
        suite (whole-source or per-architecture), and finally the `remove'
        hints.  Excuses are stored in ``self.excuses`` as a side effect.
        """
        # list of local methods and variables (for better performance)
        excuses = self.excuses
        suite_info = self.suite_info
        pri_source_suite = suite_info.primary_source_suite
        architectures = self.options.architectures
        should_remove_source = self._should_remove_source
        should_upgrade_srcarch = self._should_upgrade_srcarch
        should_upgrade_src = self._should_upgrade_src

        sources_ps = pri_source_suite.sources
        sources_t = suite_info.target_suite.sources

        # this set will contain the migration items which are valid candidates
        actionable_items: set[MigrationItem] = set()
        actionable_items_add = actionable_items.add  # Every . in a loop slows it down

        # for every source package in testing, check if it should be removed
        for pkg in sources_t:
            if pkg not in sources_ps:
                src_t = sources_t[pkg]
                item = MigrationItem(
                    package=pkg,
                    version=src_t.version,
                    suite=suite_info.target_suite,
                    is_removal=True,
                )
                if should_remove_source(item):
                    actionable_items_add(item)

        # for every source package in the source suites, check if it should be upgraded
        for suite in (pri_source_suite, *suite_info.additional_source_suites):
            sources_s = suite.sources
            for pkg in sources_s:
                src_s_data = sources_s[pkg]
                if src_s_data.is_fakesrc:
                    continue
                src_t_data = sources_t.get(pkg)

                if (
                    src_t_data is None
                    or apt_pkg.version_compare(src_s_data.version, src_t_data.version)
                    != 0
                ):
                    item = MigrationItem(
                        package=pkg, version=src_s_data.version, suite=suite
                    )
                    # check if the source package should be upgraded
                    if should_upgrade_src(item):
                        actionable_items_add(item)
                else:
                    # package has same version in source and target suite; check if any of the
                    # binaries have changed on the various architectures
                    for arch in architectures:
                        item = MigrationItem(
                            package=pkg,
                            version=src_s_data.version,
                            architecture=arch,
                            suite=suite,
                        )
                        if should_upgrade_srcarch(item):
                            actionable_items_add(item)

        # process the `remove' hints, if the given package is not yet in actionable_items
        for hint in self.hints["remove"]:
            src_r = hint.package
            if src_r not in sources_t:
                continue

            existing_items = {x for x in actionable_items if x.package == src_r}
            if existing_items:
                self.logger.info(
                    "removal hint '%s' ignored due to existing item(s) %s",
                    hint,
                    [i.name for i in existing_items],
                )
                continue

            tsrcv = sources_t[src_r].version
            item = MigrationItem(
                package=src_r,
                version=tsrcv,
                suite=suite_info.target_suite,
                is_removal=True,
            )

            # check if the version specified in the hint is the same as the considered package
            if tsrcv != hint.version:
                continue

            # add the removal of the package to actionable_items and build a new excuse
            excuse = Excuse(item)
            excuse.set_vers(tsrcv, None)
            excuse.addinfo("Removal request by %s" % (hint.user))
            # if the removal of the package is blocked, record the excuse and skip it
            if (
                blockhint := self.hints.search_first(
                    "block", package=src_r, removal=True
                )
            ) is not None:
                excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
                excuse.add_verdict_info(
                    excuse.policy_verdict,
                    "Not removing package, due to block hint by %s "
                    "(contact %s-release if update is needed)"
                    % (blockhint.user, self.options.distribution),
                )
                excuse.addreason("block")
                excuses[excuse.name] = excuse
                continue

            actionable_items_add(item)
            excuse.addinfo("Package is broken, will try to remove")
            excuse.add_hint(hint)
            # Using "PASS" here as "Created by a hint" != "accepted due to hint".  In a future
            # where there might be policy checks on removals, it would make sense to distinguish
            # those two states.  Not sure that future will ever be.
            excuse.policy_verdict = PolicyVerdict.PASS
            excuses[excuse.name] = excuse

        return actionable_items

    def find_actionable_excuses(self) -> tuple[dict[str, Excuse], set[MigrationItem]]:
        """Compute all excuses and return them with the still-valid items.

        After the excuses are computed, ``invalidate_excuses`` is run over
        them; the returned item set contains only the items whose name is
        still in the ``valid`` set afterwards.

        Raises AssertionError if the valid/invalidated name sets disagree
        with the ``is_valid`` state of the excuses.
        """
        excuses = self.excuses
        actionable_items = self._compute_excuses_and_initial_actionable_items()
        valid = {x.name for x in actionable_items}

        # extract the not considered packages, which are in the excuses but not
        # among the actionable items
        unconsidered = {ename for ename in excuses if ename not in valid}
        invalidated: set[str] = set()

        invalidate_excuses(excuses, valid, unconsidered, invalidated)

        # check that the list of actionable items matches the list of valid
        # excuses
        assert_sets_equal(valid, {x for x in excuses if excuses[x].is_valid})

        # check that the rdeps for all invalid excuses were invalidated
        assert_sets_equal(invalidated, {x for x in excuses if not excuses[x].is_valid})

        actionable_items = {x for x in actionable_items if x.name in valid}
        return excuses, actionable_items

737 

738 

def assert_sets_equal(a: Any, b: Any) -> None:
    """Raise AssertionError if ``a`` and ``b`` differ, reporting both differences.

    The message lists ``a - b`` and ``b - a``, so both arguments must support
    the ``-`` operator (as sets do) whenever they compare unequal.
    """
    if a != b:
        raise AssertionError(f"sets not equal a-b {a - b} b-a {b - a}")