Coverage for britney2/excusefinder.py: 89%

344 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-10-17 17:32 +0000

1import logging 

2import optparse 

3from collections.abc import Iterable 

4from itertools import chain 

5from typing import TYPE_CHECKING, Any, Optional, cast 

6from urllib.parse import quote 

7 

8import apt_pkg 

9 

10from britney2 import BinaryPackage, BinaryPackageId, PackageId, Suites 

11from britney2.excuse import Excuse 

12from britney2.migrationitem import MigrationItem, MigrationItemFactory 

13from britney2.policies import PolicyVerdict 

14from britney2.utils import find_smooth_updateable_binaries, invalidate_excuses 

15 

16if TYPE_CHECKING: 16 ↛ 17line 16 didn't jump to line 17 because the condition on line 16 was never true

17 from .hints import HintCollection 

18 from .installability.universe import BinaryPackageUniverse 

19 from .policies.policy import PolicyEngine 

20 

21 

class ExcuseFinder:
    """Builds the excuses for every package that might migrate.

    For each candidate migration item an Excuse object is created and stored
    in ``self.excuses`` (keyed by excuse name).  The public entry point is
    ``find_actionable_excuses``, which returns the excuses together with the
    set of actionable migration items.
    """

    def __init__(
        self,
        options: optparse.Values,
        suite_info: Suites,
        all_binaries: dict[BinaryPackageId, BinaryPackage],
        pkg_universe: "BinaryPackageUniverse",
        policy_engine: "PolicyEngine",
        mi_factory: MigrationItemFactory,
        hints: "HintCollection",
    ) -> None:
        # logger named "<module>.<class>" so log records identify this component
        logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
        self.logger = logging.getLogger(logger_name)
        self.options = options
        self.suite_info = suite_info
        self.all_binaries = all_binaries
        self.pkg_universe = pkg_universe
        self._policy_engine = policy_engine
        self._migration_item_factory = mi_factory
        self.hints = hints
        # excuse name -> Excuse; populated by the _should_* methods below
        self.excuses: dict[str, Excuse] = {}

44 

45 def _get_build_link( 

46 self, arch: str, src: str, ver: str, label: str | None = None 

47 ) -> str: 

48 """Return a link to the build logs, labelled 'arch' per default""" 

49 if label is None: 

50 label = arch 

51 if self.options.build_url: 51 ↛ 57line 51 didn't jump to line 57 because the condition on line 51 was always true

52 url = self.options.build_url.format( 

53 arch=arch, source=quote(src), version=quote(ver) 

54 ) 

55 return f'<a href="{url}" target="_blank">{label}</a>' 

56 else: 

57 return label 

58 

    def _should_remove_source(self, item: MigrationItem) -> bool:
        """Check if a source package should be removed from testing

        This method checks if a source package should be removed from the
        target suite; this happens if the source package is not
        present in the primary source suite anymore.

        It returns True if the package can be removed, False otherwise.
        In the former case, a new excuse is appended to the object
        attribute excuses.
        """
        # NOTE(review): with a partial source suite, absence from the source
        # suite does not imply the package was removed — never remove then
        # (TODO confirm exact semantics of options.partial_source)
        if hasattr(self.options, "partial_source"):
            return False
        # if the source package is available in unstable, then do nothing
        source_suite = self.suite_info.primary_source_suite
        pkg = item.package
        if pkg in source_suite.sources:
            return False
        # otherwise, add a new excuse for its removal
        src = item.suite.sources[pkg]
        excuse = Excuse(item)
        excuse.addinfo("Package not in %s, will try to remove" % source_suite.name)
        excuse.set_vers(src.version, None)
        if src.maintainer:
            excuse.set_maint(src.maintainer)
        if src.section:
            excuse.set_section(src.section)

        # if the package is blocked, skip it
        for hint in self.hints.search("block", package=pkg, removal=True):
            excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse.add_verdict_info(
                excuse.policy_verdict,
                "Not touching package, as requested by %s "
                "(contact %s-release if update is needed)"
                % (hint.user, self.options.distribution),
            )
            excuse.addreason("block")
            # record the rejected excuse so it shows up in the output
            self.excuses[excuse.name] = excuse
            return False

        excuse.policy_verdict = PolicyVerdict.PASS
        self.excuses[excuse.name] = excuse
        return True

103 

    def _should_upgrade_srcarch(self, item: MigrationItem) -> bool:
        """Check if a set of binary packages should be upgraded

        This method checks if the binary packages produced by the source
        package on the given architecture should be upgraded; this can
        happen also if the migration is a binary-NMU for the given arch.

        It returns False if the given packages don't need to be upgraded,
        True otherwise. In the former case, a new excuse is appended to
        the object attribute excuses.
        """
        # retrieve the source packages for testing and suite

        target_suite = self.suite_info.target_suite
        source_suite = item.suite
        src = item.package
        arch = item.architecture
        source_t = target_suite.sources[src]
        source_u = source_suite.sources[src]

        excuse = Excuse(item)
        # binary-only migration: source version is the same on both sides
        excuse.set_vers(source_t.version, source_t.version)
        if source_u.maintainer:
            excuse.set_maint(source_u.maintainer)
        if source_u.section:
            excuse.set_section(source_u.section)

        # if there is a `remove' hint and the requested version is the same as the
        # version in testing, then stop here and return False
        # (as a side effect, a removal may generate such excuses for both the source
        # package and its binary packages on each architecture)
        for hint in self.hints.search("remove", package=src, version=source_t.version):
            excuse.add_hint(hint)
            excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse.add_verdict_info(
                excuse.policy_verdict, "Removal request by %s" % (hint.user)
            )
            excuse.add_verdict_info(
                excuse.policy_verdict, "Trying to remove package, not update it"
            )
            self.excuses[excuse.name] = excuse
            return False

        # the starting point is that there is nothing wrong and nothing worth doing
        anywrongver = False
        anyworthdoing = False

        packages_t_a = target_suite.binaries[arch]
        packages_s_a = source_suite.binaries[arch]

        wrong_verdict = PolicyVerdict.REJECTED_PERMANENTLY

        # for every binary package produced by this source in unstable for this architecture
        for pkg_id in sorted(x for x in source_u.binaries if x.architecture == arch):
            pkg_name = pkg_id.package_name
            # TODO filter binaries based on checks below?
            excuse.add_package(pkg_id)

            # retrieve the testing (if present) and unstable corresponding binary packages
            binary_t = packages_t_a[pkg_name] if pkg_name in packages_t_a else None
            binary_u = packages_s_a[pkg_name]

            # this is the source version for the new binary package
            pkgsv = binary_u.source_version

            # if the new binary package is architecture-independent, then skip it
            if binary_u.architecture == "all":
                if pkg_id not in source_t.binaries:
                    # only add a note if the arch:all does not match the expected version
                    excuse.add_detailed_info(
                        "Ignoring %s %s (from %s) as it is arch: all"
                        % (pkg_name, binary_u.version, pkgsv)
                    )
                continue

            # if the new binary package is not from the same source as the testing one, then skip it
            # this implies that this binary migration is part of a source migration
            if source_u.version == pkgsv and source_t.version != pkgsv:
                anywrongver = True
                excuse.add_verdict_info(
                    wrong_verdict,
                    "From wrong source: %s %s (%s not %s)"
                    % (pkg_name, binary_u.version, pkgsv, source_t.version),
                )
                continue

            # cruft in unstable: built by a source version present in neither suite
            if source_u.version != pkgsv and source_t.version != pkgsv:
                if self.options.ignore_cruft:
                    excuse.add_detailed_info(
                        "Old cruft: %s %s (but ignoring cruft, so nevermind)"
                        % (pkg_name, pkgsv)
                    )
                else:
                    anywrongver = True
                    excuse.add_verdict_info(
                        wrong_verdict, f"Old cruft: {pkg_name} {pkgsv}"
                    )
                continue

            # if the source package has been updated in unstable and this is a binary migration, skip it
            # (the binaries are now out-of-date)
            if source_t.version == pkgsv and source_t.version != source_u.version:
                anywrongver = True
                excuse.add_verdict_info(
                    wrong_verdict,
                    "From wrong source: %s %s (%s not %s)"
                    % (pkg_name, binary_u.version, pkgsv, source_u.version),
                )
                continue

            # if the binary is not present in testing, then it is a new binary;
            # in this case, there is something worth doing
            if not binary_t:
                excuse.add_detailed_info(f"New binary: {pkg_name} ({binary_u.version})")
                anyworthdoing = True
                continue

            # at this point, the binary package is present in testing, so we can compare
            # the versions of the packages ...
            vcompare = apt_pkg.version_compare(binary_t.version, binary_u.version)

            # ... if updating would mean downgrading, then stop here: there is something wrong
            if vcompare > 0:
                anywrongver = True
                excuse.add_verdict_info(
                    wrong_verdict,
                    "Not downgrading: %s (%s to %s)"
                    % (pkg_name, binary_t.version, binary_u.version),
                )
                break
            # ... if updating would mean upgrading, then there is something worth doing
            elif vcompare < 0:
                excuse.add_detailed_info(
                    "Updated binary: %s (%s to %s)"
                    % (pkg_name, binary_t.version, binary_u.version)
                )
                anyworthdoing = True

        srcv = source_u.version
        same_source = source_t.version == srcv
        primary_source_suite = self.suite_info.primary_source_suite
        is_primary_source = source_suite == primary_source_suite

        # if there is nothing wrong and there is something worth doing or the source
        # package is not fake, then check what packages should be removed
        if not anywrongver and (anyworthdoing or not source_u.is_fakesrc):
            # we want to remove binaries that are no longer produced by the
            # new source, but there are some special cases:
            # - if this is binary-only (same_source) and not from the primary
            #   source, we don't do any removals:
            #   binNMUs in *pu on some architectures would otherwise result in
            #   the removal of binaries on other architectures
            # - for the primary source, smooth binaries in the target suite
            #   are not considered for removal
            if not same_source or is_primary_source:
                smoothbins = set()
                if is_primary_source:
                    binaries_t = target_suite.binaries
                    possible_smooth_updates = [
                        p for p in source_t.binaries if p.architecture == arch
                    ]
                    smoothbins = find_smooth_updateable_binaries(
                        possible_smooth_updates,
                        source_u,
                        self.pkg_universe,
                        target_suite,
                        binaries_t,
                        source_suite.binaries,
                        cast(frozenset["BinaryPackageId"], frozenset()),
                        self.options.smooth_updates,
                        self.hints,
                    )

                # for every binary package produced by this source in testing for this architecture
                for pkg_id in sorted(
                    x for x in source_t.binaries if x.architecture == arch
                ):
                    pkg = pkg_id.package_name
                    # if the package is architecture-independent, then ignore it
                    tpkg_data = packages_t_a[pkg]
                    if tpkg_data.architecture == "all":
                        if pkg_id not in source_u.binaries:
                            # only add a note if the arch:all does not match the expected version
                            excuse.add_detailed_info(
                                "Ignoring removal of %s as it is arch: all" % (pkg)
                            )
                        continue
                    # if the package is not produced by the new source package, then remove it from testing
                    if pkg not in packages_s_a:
                        excuse.add_detailed_info(
                            f"Removed binary: {pkg} {tpkg_data.version}"
                        )
                        # the removed binary is only interesting if this is a binary-only migration,
                        # as otherwise the updated source will already cause the binary packages
                        # to be updated
                        if same_source and pkg_id not in smoothbins:
                            # Special-case, if the binary is a candidate for a smooth update, we do not consider
                            # it "interesting" on its own. This case happens quite often with smooth updatable
                            # packages, where the old binary "survives" a full run because it still has
                            # reverse dependencies.
                            anyworthdoing = True

        if not anyworthdoing and not (
            self.options.archall_inconsistency_allowed and excuse.detailed_info
        ):
            # nothing worth doing, we don't add an excuse to the list, we just return false
            return False

        if not anyworthdoing:
            # This source has binary differences between the target and source
            # suite, but we're not going to upgrade them. Part of the purpose
            # of options.archall_inconsistency_allowed is to log the excuse
            # with a temporary failure such that the administrators can take
            # action as they wish.
            excuse.policy_verdict = PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT
            excuse.addreason("everything-ignored")

        else:
            # there is something worth doing
            # we assume that this package will be ok, if not invalidated below
            excuse.policy_verdict = PolicyVerdict.PASS

        # if there is something wrong, reject this package
        if anywrongver:
            excuse.policy_verdict = wrong_verdict

        self._policy_engine.apply_srcarch_policies(
            item, arch, source_t, source_u, excuse
        )

        self.excuses[excuse.name] = excuse
        return excuse.is_valid

337 

    def _should_upgrade_src(self, item: MigrationItem) -> bool:
        """Check if source package should be upgraded

        This method checks if a source package should be upgraded. The analysis
        is performed for the source package specified by the `src' parameter,
        for the distribution `source_suite'.

        It returns False if the given package doesn't need to be upgraded,
        True otherwise. In the former case, a new excuse is appended to
        the object attribute excuses.
        """

        src = item.package
        source_suite = item.suite
        suite_name = source_suite.name
        source_u = source_suite.sources[src]
        if source_u.is_fakesrc:
            # it is a fake package created to satisfy Britney implementation details; silently ignore it
            return False

        target_suite = self.suite_info.target_suite
        # retrieve the source packages for testing (if available) and suite
        if src in target_suite.sources:
            source_t = target_suite.sources[src]
            # if testing and unstable have the same version, then this is a candidate for binary-NMUs only
            if apt_pkg.version_compare(source_t.version, source_u.version) == 0:
                return False
        else:
            source_t = None

        excuse = Excuse(item)
        excuse.set_vers(source_t and source_t.version or None, source_u.version)
        if source_u.maintainer:
            excuse.set_maint(source_u.maintainer)
        if source_u.section:
            excuse.set_section(source_u.section)
        excuse.add_package(PackageId(src, source_u.version, "source"))

        # if the version in unstable is older, then stop here with a warning in the excuse and return False
        if source_t and apt_pkg.version_compare(source_u.version, source_t.version) < 0:
            excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse.add_verdict_info(
                excuse.policy_verdict,
                "ALERT: %s is newer in the target suite (%s %s)"
                % (src, source_t.version, source_u.version),
            )
            self.excuses[excuse.name] = excuse
            excuse.addreason("newerintesting")
            return False

        # the starting point is that we will update the candidate
        excuse.policy_verdict = PolicyVerdict.PASS

        # if there is a `remove' hint and the requested version is the same as the
        # version in testing, then stop here and return False
        for hint in self.hints.search("remove", package=src):
            if (
                source_t
                and source_t.version == hint.version
                or source_u.version == hint.version
            ):
                excuse.add_hint(hint)
                excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
                excuse.add_verdict_info(
                    excuse.policy_verdict, "Removal request by %s" % (hint.user)
                )
                excuse.add_verdict_info(
                    excuse.policy_verdict, "Trying to remove package, not update it"
                )
                break

        all_binaries = self.all_binaries

        # at this point, we check the status of the builds on all the supported architectures
        # to catch the out-of-date ones
        archs_to_consider = list(self.options.architectures)
        archs_to_consider.append("all")
        for arch in archs_to_consider:
            # out-of-date binaries, grouped by the (old) source version that built them
            oodbins: dict[str, set[str]] = {}
            uptodatebins = False
            # for every binary package produced by this source in the suite for this architecture
            if arch == "all":
                consider_binaries: Iterable[BinaryPackageId] = source_u.binaries
            else:
                # Will also include arch:all for the given architecture (they are filtered out
                # below)
                consider_binaries = sorted(
                    x for x in source_u.binaries if x.architecture == arch
                )
            for pkg_id in consider_binaries:
                pkg = pkg_id.package_name

                # retrieve the binary package and its source version
                binary_u = all_binaries[pkg_id]
                pkgsv = binary_u.source_version

                # arch:all packages are treated separately from arch:arch
                if binary_u.architecture != arch:
                    continue

                # TODO filter binaries based on checks below?
                excuse.add_package(pkg_id)

                # if it wasn't built by the same source, it is out-of-date
                # if there is at least one binary on this arch which is
                # up-to-date, there is a build on this arch
                if source_u.version != pkgsv or pkg_id.architecture == "faux":
                    if pkgsv not in oodbins:
                        oodbins[pkgsv] = set()
                    oodbins[pkgsv].add(pkg)
                    if pkg_id.architecture != "faux":
                        excuse.add_old_binary(pkg, pkgsv)
                    continue
                else:
                    uptodatebins = True

            # if there are out-of-date packages, warn about them in the excuse and set excuse.is_valid
            # to False to block the update; if the architecture where the package is out-of-date is
            # in the `outofsync_arches' list, then do not block the update
            if oodbins:
                oodtxt = ""
                for v in sorted(oodbins):
                    if oodtxt:
                        oodtxt = oodtxt + "; "
                    oodtxt = oodtxt + "{} (from {})".format(
                        ", ".join(sorted(oodbins[v])),
                        self._get_build_link(arch, src, v, label=v),
                    )

                if uptodatebins:
                    text = "old binaries left on {}: {}".format(
                        self._get_build_link(arch, src, source_u.version),
                        oodtxt,
                    )
                else:
                    text = "missing build on %s" % (
                        self._get_build_link(arch, src, source_u.version)
                    )

                if arch in self.options.outofsync_arches:
                    # this architecture is allowed to lag behind: warn, don't block
                    text = text + " (but %s isn't keeping up, so nevermind)" % (arch)
                    if not uptodatebins:
                        excuse.missing_build_on_ood_arch(arch)
                else:
                    if uptodatebins:
                        if self.options.ignore_cruft:
                            text = text + " (but ignoring cruft, so nevermind)"
                            excuse.add_detailed_info(text)
                        else:
                            excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
                            excuse.addreason("cruft")
                            excuse.add_verdict_info(excuse.policy_verdict, text)
                    else:
                        excuse.policy_verdict = (
                            PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT
                        )
                        excuse.missing_build_on_arch(arch)
                        excuse.addreason("missingbuild")
                        excuse.add_verdict_info(excuse.policy_verdict, text)
                        if excuse.old_binaries:
                            excuse.add_detailed_info(
                                f"old binaries on {arch}: {oodtxt}"
                            )

        # if the source package has no binaries, set is_valid to False to block the update
        if not {x for x in source_u.binaries if x[2] != "faux"}:
            excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
            excuse.add_verdict_info(
                excuse.policy_verdict, "%s has no binaries on any arch" % src
            )
            excuse.addreason("no-binaries")

        self._policy_engine.apply_src_policies(item, source_t, source_u, excuse)

        if source_suite.suite_class.is_additional_source and source_t:
            # o-o-d(ish) checks for (t-)p-u
            # This only makes sense if the package is actually in testing.
            for arch in self.options.architectures:
                # if the package in testing has no binaries on this
                # architecture, it can't be out-of-date
                if not any(
                    x
                    for x in source_t.binaries
                    if x.architecture == arch and all_binaries[x].architecture != "all"
                ):
                    continue

                # if the (t-)p-u package has produced any binaries on
                # this architecture then we assume it's ok. this allows for
                # uploads to (t-)p-u which intentionally drop binary
                # packages
                if any(
                    x
                    for x in source_suite.binaries[arch].values()
                    if x.source == src
                    and x.source_version == source_u.version
                    and x.architecture != "all"
                ):
                    continue

                # TODO: Find a way to avoid hardcoding pu/stable relation.
                # NOTE(review): `base` is computed below but never used — looks
                # like a leftover; confirm before removing.
                if suite_name == "pu":
                    base = "stable"
                else:
                    base = target_suite.name
                text = "Not yet built on %s (relative to target suite)" % (
                    self._get_build_link(arch, src, source_u.version)
                )

                if arch in self.options.outofsync_arches:
                    text = text + " (but %s isn't keeping up, so never mind)" % (arch)
                    excuse.missing_build_on_ood_arch(arch)
                    excuse.addinfo(text)
                else:
                    excuse.policy_verdict = (
                        PolicyVerdict.REJECTED_CANNOT_DETERMINE_IF_PERMANENT
                    )
                    excuse.missing_build_on_arch(arch)
                    excuse.addreason("missingbuild")
                    excuse.add_verdict_info(excuse.policy_verdict, text)

        # check if there is a `force' hint for this package, which allows it to go in even if it is not updateable
        forces = self.hints.search("force", package=src, version=source_u.version)
        if forces:
            # force() updates the final verdict for us
            changed_state = excuse.force()
            if changed_state:
                excuse.addinfo("Should ignore, but forced by %s" % (forces[0].user))

        self.excuses[excuse.name] = excuse
        return excuse.is_valid

569 

    def _compute_excuses_and_initial_actionable_items(self) -> set[MigrationItem]:
        """Build excuses for all candidates and return the actionable items.

        Walks the target suite (for removals), every source suite (for
        upgrades and binary-only migrations), and the `remove' hints,
        populating ``self.excuses`` along the way.
        """
        # list of local methods and variables (for better performance)
        excuses = self.excuses
        suite_info = self.suite_info
        pri_source_suite = suite_info.primary_source_suite
        architectures = self.options.architectures
        should_remove_source = self._should_remove_source
        should_upgrade_srcarch = self._should_upgrade_srcarch
        should_upgrade_src = self._should_upgrade_src

        sources_ps = pri_source_suite.sources
        sources_t = suite_info.target_suite.sources

        # this set will contain the packages which are valid candidates;
        # if a package is going to be removed, it will have a "-" prefix
        actionable_items: set[MigrationItem] = set()
        actionable_items_add = actionable_items.add  # Every . in a loop slows it down

        # for every source package in testing, check if it should be removed
        for pkg in sources_t:
            if pkg not in sources_ps:
                src_t = sources_t[pkg]
                item = MigrationItem(
                    package=pkg,
                    version=src_t.version,
                    suite=suite_info.target_suite,
                    is_removal=True,
                )
                if should_remove_source(item):
                    actionable_items_add(item)

        # for every source package in the source suites, check if it should be upgraded
        for suite in chain((pri_source_suite, *suite_info.additional_source_suites)):
            sources_s = suite.sources
            for pkg in sources_s:
                src_s_data = sources_s[pkg]
                if src_s_data.is_fakesrc:
                    continue
                src_t_data = sources_t.get(pkg)

                if (
                    src_t_data is None
                    or apt_pkg.version_compare(src_s_data.version, src_t_data.version)
                    != 0
                ):
                    item = MigrationItem(
                        package=pkg, version=src_s_data.version, suite=suite
                    )
                    # check if the source package should be upgraded
                    if should_upgrade_src(item):
                        actionable_items_add(item)
                else:
                    # package has same version in source and target suite; check if any of the
                    # binaries have changed on the various architectures
                    for arch in architectures:
                        item = MigrationItem(
                            package=pkg,
                            version=src_s_data.version,
                            architecture=arch,
                            suite=suite,
                        )
                        if should_upgrade_srcarch(item):
                            actionable_items_add(item)

        # process the `remove' hints, if the given package is not yet in actionable_items
        for hint in self.hints["remove"]:
            src_r = hint.package
            if src_r not in sources_t:
                continue

            existing_items = {x for x in actionable_items if x.package == src_r}
            if existing_items:
                self.logger.info(
                    "removal hint '%s' ignored due to existing item(s) %s"
                    % (hint, [i.name for i in existing_items])
                )
                continue

            tsrcv = sources_t[src_r].version
            item = MigrationItem(
                package=src_r,
                version=tsrcv,
                suite=suite_info.target_suite,
                is_removal=True,
            )

            # check if the version specified in the hint is the same as the considered package
            if tsrcv != hint.version:
                continue

            # add the removal of the package to actionable_items and build a new excuse
            excuse = Excuse(item)
            excuse.set_vers(tsrcv, None)
            excuse.addinfo("Removal request by %s" % (hint.user))
            # if the removal of the package is blocked, skip it
            blocked = False
            for blockhint in self.hints.search("block", package=src_r, removal=True):
                excuse.policy_verdict = PolicyVerdict.REJECTED_PERMANENTLY
                excuse.add_verdict_info(
                    excuse.policy_verdict,
                    "Not removing package, due to block hint by %s "
                    "(contact %s-release if update is needed)"
                    % (blockhint.user, self.options.distribution),
                )
                excuse.addreason("block")
                blocked = True

            if blocked:
                excuses[excuse.name] = excuse
                continue

            actionable_items_add(item)
            excuse.addinfo("Package is broken, will try to remove")
            excuse.add_hint(hint)
            # Using "PASS" here as "Created by a hint" != "accepted due to hint". In a future
            # where there might be policy checks on removals, it would make sense to distinguish
            # those two states. Not sure that future will ever be.
            excuse.policy_verdict = PolicyVerdict.PASS
            excuses[excuse.name] = excuse

        return actionable_items

691 

    def find_actionable_excuses(self) -> tuple[dict[str, Excuse], set[MigrationItem]]:
        """Compute all excuses and return (excuses, actionable items).

        After the initial computation, excuses that depend on invalid ones
        are invalidated transitively, and the actionable item set is reduced
        to the items whose excuses remained valid.
        """
        excuses = self.excuses
        actionable_items = self._compute_excuses_and_initial_actionable_items()
        valid = {x.name for x in actionable_items}

        # extract the not considered packages, which are in the excuses but not in upgrade_me
        unconsidered = {ename for ename in excuses if ename not in valid}
        invalidated: set[str] = set()

        # mutates `valid`, `unconsidered` and `invalidated` in place
        invalidate_excuses(excuses, valid, unconsidered, invalidated)

        # check that the list of actionable items matches the list of valid
        # excuses
        assert_sets_equal(valid, {x for x in excuses if excuses[x].is_valid})

        # check that the rdeps for all invalid excuses were invalidated
        assert_sets_equal(invalidated, {x for x in excuses if not excuses[x].is_valid})

        actionable_items = {x for x in actionable_items if x.name in valid}
        return excuses, actionable_items

712 

713 

def assert_sets_equal(a: Any, b: Any) -> None:
    """Raise AssertionError if *a* and *b* differ, reporting both differences."""
    if a == b:
        return
    raise AssertionError(f"sets not equal a-b {a - b} b-a {b - a}")