Coverage for britney2/utils.py: 91%

464 statements  

coverage.py v7.6.0, created at 2026-01-29 17:21 +0000

1# Refactored parts from britney.py, which is/was: 

2# Copyright (C) 2001-2008 Anthony Towns <ajt@debian.org> 

3# Andreas Barth <aba@debian.org> 

4# Fabio Tranchitella <kobold@debian.org> 

5# Copyright (C) 2010-2012 Adam D. Barratt <adsb@debian.org> 

6# Copyright (C) 2012 Niels Thykier <niels@thykier.net> 

7# 

8# New portions 

9# Copyright (C) 2013 Adam D. Barratt <adsb@debian.org> 

10 

11# This program is free software; you can redistribute it and/or modify 

12# it under the terms of the GNU General Public License as published by 

13# the Free Software Foundation; either version 2 of the License, or 

14# (at your option) any later version. 

15 

16# This program is distributed in the hope that it will be useful, 

17# but WITHOUT ANY WARRANTY; without even the implied warranty of 

18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19# GNU General Public License for more details. 

20 

21 

22import errno 

23import logging 

24import optparse 

25import os 

26import sys 

27import time 

28from collections import defaultdict 

29from collections.abc import Callable, Container, Iterable, Iterator, Mapping, MutableSet 

30from datetime import UTC, datetime 

31from functools import partial 

32from itertools import chain, filterfalse 

33from typing import ( 

34 IO, 

35 TYPE_CHECKING, 

36 Any, 

37 Literal, 

38 Protocol, 

39 TypeVar, 

40 Union, 

41 cast, 

42 overload, 

43) 

44 

45import apt_pkg 

46import yaml 

47 

48from britney2 import ( 

49 BinaryPackage, 

50 BinaryPackageId, 

51 PackageId, 

52 SourcePackage, 

53 Suite, 

54 SuiteClass, 

55 Suites, 

56 TargetSuite, 

57) 

58from britney2.excusedeps import DependencyState, ImpossibleDependencyState 

59from britney2.policies import PolicyVerdict 

60 

61if TYPE_CHECKING: 61 ↛ 63 (line 61 didn't jump to line 63 because the condition on line 61 was never true) 

62 

63 from _typeshed import SupportsRichComparisonT 

64 from apt_pkg import TagSection 

65 

66 from .excuse import Excuse 

67 from .hints import HintCollection 

68 from .installability.universe import BinaryPackageUniverse 

69 from .migrationitem import MigrationItem, MigrationItemFactory 

70 

71_T = TypeVar("_T") 

72 

73 

74class MigrationConstraintException(Exception): 

75 pass 

76 

77 

78@overload 

79def ifilter_except( 79 ↛ exit (line 79 didn't jump to the function exit) 

80 container: Container[_T], iterable: Literal[None] = None 

81) -> "partial[filterfalse[_T]]": ... 

82 

83 

84@overload 

85def ifilter_except( 85 ↛ exit (line 85 didn't jump to the function exit) 

86 container: Container[_T], iterable: Iterable[_T] 

87) -> "filterfalse[_T]": ... 

88 

89 

90def ifilter_except( 

91 container: Container[_T], iterable: Iterable[_T] | None = None 

92) -> Union["filterfalse[_T]", "partial[filterfalse[_T]]"]: 

93 """Filter out elements in container 

94 

95 If given an iterable it returns a filtered iterator, otherwise it 

96 returns a function to generate filtered iterators. The latter is 

97 useful if the same filter has to be (re-)used on multiple 

98 iterators that are not known beforehand. 

99 """ 

100 if iterable is not None: 100 ↛ 101 (line 100 didn't jump to line 101 because the condition on line 100 was never true) 

101 return filterfalse(container.__contains__, iterable) 

102 return cast( 

103 "partial[filterfalse[_T]]", partial(filterfalse, container.__contains__) 

104 ) 

105 

106 

107@overload 

108def ifilter_only(container: Container[_T], iterable: Iterable[_T]) -> "filter[_T]": ... 108 ↛ exit (line 108 didn't return from function 'ifilter_only') 

109 

110 

111@overload 

112def ifilter_only(container: Container[_T], iterable: None) -> "partial[filter[_T]]": ... 112 ↛ exit (line 112 didn't return from function 'ifilter_only') 

113 

114 

115def ifilter_only( 

116 container: Container[_T], iterable: Iterable[_T] | None = None 

117) -> Union["filter[_T]", "partial[filter[_T]]"]: 

118 """Filter out elements in which are not in container 

119 

120 If given an iterable it returns a filtered iterator, otherwise it 

121 returns a function to generate filtered iterators. The latter is 

122 useful if the same filter has to be (re-)used on multiple 

123 iterators that are not known beforehand. 

124 """ 

125 if iterable is not None: 125 ↛ 127 (line 125 didn't jump to line 127 because the condition on line 125 was always true) 

126 return filter(container.__contains__, iterable) 

127 return partial(filter, container.__contains__) 

128 
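A minimal usage sketch of the two filter helpers above (the values are illustrative, not from the original source):

    blocked = {"foo", "bar"}
    list(ifilter_except(blocked, ["foo", "baz"]))  # -> ["baz"]: drops members of blocked
    list(ifilter_only(blocked, ["foo", "baz"]))    # -> ["foo"]: keeps members of blocked
    only_blocked = ifilter_only(blocked)           # no iterable: returns a reusable filter factory
    list(only_blocked(["bar", "qux"]))             # -> ["bar"]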

129 

130# iter_except is from the "itertools" recipe 

131def iter_except( 

132 func: Callable[[], _T], 

133 exception: type[BaseException] | tuple[type[BaseException], ...], 

134 first: Any = None, 

135) -> Iterator[_T]: # pragma: no cover - itertools recipe function 

136 """Call a function repeatedly until an exception is raised. 

137 

138 Converts a call-until-exception interface to an iterator interface. 

139 Like __builtin__.iter(func, sentinel) but uses an exception instead 

140 of a sentinel to end the loop. 

141 

142 Examples: 

143 bsddbiter = iter_except(db.next, bsddb.error, db.first) 

144 heapiter = iter_except(functools.partial(heappop, h), IndexError) 

145 dictiter = iter_except(d.popitem, KeyError) 

146 dequeiter = iter_except(d.popleft, IndexError) 

147 queueiter = iter_except(q.get_nowait, Queue.Empty) 

148 setiter = iter_except(s.pop, KeyError) 

149 

150 """ 

151 try: 

152 if first is not None: 

153 yield first() 

154 while 1: 

155 yield func() 

156 except exception: 

157 pass 

158 

159 

160def log_and_format_old_libraries( 

161 logger: logging.Logger, libs: list["MigrationItem"] 

162) -> None: 

163 """Format and log old libraries in a table (no header)""" 

164 libraries: dict[str, list[str]] = {} 

165 for i in libs: 

166 pkg = i.package 

167 if pkg in libraries: 

168 libraries[pkg].append(i.architecture) 

169 else: 

170 libraries[pkg] = [i.architecture] 

171 

172 for lib in sorted(libraries): 

173 logger.info(" %s: %s", lib, " ".join(libraries[lib])) 

174 

175 

176def compute_reverse_tree( 

177 pkg_universe: "BinaryPackageUniverse", affected: set[BinaryPackageId] 

178) -> None: 

179 """Calculate the full dependency tree for a set of packages 

180 

181 This method computes the full reverse dependency tree for a given set of 

182 packages. The first argument is an instance of the BinaryPackageUniverse 

183 and the second argument is a set of BinaryPackageId. 

184 

185 The set of affected packages will be updated in place and must 

186 therefore be mutable. 

187 """ 

188 remain = list(affected) 

189 while remain: 

190 pkg_id = remain.pop() 

191 new_pkg_ids = pkg_universe.reverse_dependencies_of(pkg_id) - affected 

192 affected.update(new_pkg_ids) 

193 remain.extend(new_pkg_ids) 

194 
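A sketch of how the worklist above expands the affected set in place. StubUniverse and the plain string ids are hypothetical stand-ins for a real BinaryPackageUniverse and BinaryPackageId values:

    class StubUniverse:  # hypothetical stand-in for BinaryPackageUniverse
        def __init__(self, rdeps):
            self._rdeps = rdeps

        def reverse_dependencies_of(self, pkg_id):
            return self._rdeps.get(pkg_id, set())

    universe = StubUniverse({"libfoo": {"app1"}, "app1": {"app2"}})
    affected = {"libfoo"}
    compute_reverse_tree(universe, affected)
    assert affected == {"libfoo", "app1", "app2"}  # closure computed in place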

195 

196def add_transitive_dependencies_flatten( 

197 pkg_universe: "BinaryPackageUniverse", initial_set: MutableSet[BinaryPackageId] 

198) -> None: 

199 """Find and include all transitive dependencies 

200 

201 This method updates the initial_set parameter to include all transitive 

202 dependencies. The first argument is an instance of the BinaryPackageUniverse 

203 and the second argument is a set of BinaryPackageId. 

204 

205 The set of initial packages will be updated in place and must 

206 therefore be mutable. 

207 """ 

208 remain = list(initial_set) 

209 while remain: 

210 pkg_id = remain.pop() 

211 new_pkg_ids = { 

212 x 

213 for x in chain.from_iterable(pkg_universe.dependencies_of(pkg_id)) 

214 if x not in initial_set 

215 } 

216 initial_set |= new_pkg_ids 

217 remain.extend(new_pkg_ids) 

218 

219 

220def write_nuninst(filename: str, nuninst: dict[str, set[str]]) -> None: 

221 """Write the non-installable report 

222 

223 Write the non-installable report derived from "nuninst" to the 

224 file denoted by "filename". 

225 """ 

226 with open(filename, "w", encoding="utf-8") as f: 

227 # Having two fields with (almost) identical dates seems a bit 

228 # redundant. 

229 f.write( 

230 "Built on: " 

231 + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time())) 

232 + "\n" 

233 ) 

234 f.write( 

235 "Last update: " 

236 + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time())) 

237 + "\n\n" 

238 ) 

239 for k in nuninst: 

240 f.write("{}: {}\n".format(k, " ".join(nuninst[k]))) 

241 

242 

243def read_nuninst(filename: str, architectures: set[str]) -> dict[str, set[str]]: 

244 """Read the non-installable report 

245 

246 Read the non-installable report from the file denoted by 

247 "filename" and return it. Only architectures in "architectures" 

248 will be included in the report. 

249 """ 

250 nuninst: dict[str, set[str]] = {} 

251 with open(filename, encoding="ascii") as f: 

252 for r in f: 

253 if ":" not in r: 

254 continue 

255 arch, packages = r.strip().split(":", 1) 

256 if arch.split("+", 1)[0] in architectures: 

257 nuninst[arch] = set(packages.split()) 

258 return nuninst 

259 
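A sketch of the on-disk format that write_nuninst() emits and read_nuninst() parses back (file contents are hypothetical):

    # nuninst.txt:
    #     Built on: 2026.01.29 17:21:00 +0000
    #     Last update: 2026.01.29 17:21:00 +0000
    #
    #     amd64: broken-pkg1 broken-pkg2
    #     amd64+all: broken-pkg1
    read_nuninst("nuninst.txt", {"amd64"})
    # -> {"amd64": {"broken-pkg1", "broken-pkg2"}, "amd64+all": {"broken-pkg1"}}
    # The header lines are skipped because "Built on" / "Last update" are not
    # listed architectures; "amd64+all" matches via its "amd64" prefix.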

260 

261def newly_uninst( 

262 nuold: dict[str, set[str]], nunew: dict[str, set[str]] 

263) -> dict[str, list[str]]: 

264 """Return a nuninst statistic with only new uninstallable packages 

265 

266 This method subtracts the uninstallable packages of the statistic 

267 "nunew" from the statistic "nuold". 

268 

269 It returns a dictionary with the architectures as keys and the list 

270 of uninstallable packages as values. If there are no regressions 

271 on a given architecture, then the architecture will be omitted from 

272 the result. Accordingly, if none of the architectures have 

273 regressions, an empty dictionary is returned. 

274 """ 

275 res: dict[str, list[str]] = {} 

276 for arch in ifilter_only(nunew, nuold): 

277 arch_nuninst = [x for x in nunew[arch] if x not in nuold[arch]] 

278 # Leave res empty if there are no newly uninst packages 

279 if arch_nuninst: 

280 res[arch] = arch_nuninst 

281 return res 

282 
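A worked example with toy counters (illustrative only):

    old = {"amd64": {"pkg-a"}, "i386": {"pkg-b"}}
    new = {"amd64": {"pkg-a", "pkg-c"}, "i386": {"pkg-b"}}
    newly_uninst(old, new)  # -> {"amd64": ["pkg-c"]}
    # i386 has no regression, so it is omitted from the result.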

283 

284def format_and_log_uninst( 

285 logger: logging.Logger, 

286 architectures: Iterable[str], 

287 nuninst: Mapping[str, Iterable[str]], 

288 *, 

289 loglevel: int = logging.INFO, 

290) -> None: 

291 """Emits the uninstallable packages to the log 

292 

293 An example of the output string is: 

294 * i386: broken-pkg1, broken-pkg2 

295 

296 Note that if there are no uninstallable packages, then nothing is emitted. 

297 """ 

298 for arch in architectures: 

299 if arch in nuninst and nuninst[arch]: 

300 msg = " * {}: {}".format(arch, ", ".join(sorted(nuninst[arch]))) 

301 logger.log(loglevel, msg) 

302 

303 

304class Sorted(Protocol): 

305 def __call__( 305 ↛ exit (line 305 didn't jump to the function exit) 

306 self, 

307 iterable: Iterable["SupportsRichComparisonT"], 

308 /, 

309 *, 

310 key: None = None, 

311 reverse: bool = False, 

312 ) -> list["SupportsRichComparisonT"]: ... 

313 

314 

315def write_heidi( 

316 filename: str, 

317 target_suite: TargetSuite, 

318 *, 

319 outofsync_arches: frozenset[str] = frozenset(), 

320 sorted: Sorted = sorted, 

321) -> None: 

322 """Write the output HeidiResult 

323 

324 This method writes the output for Heidi, which contains all the 

325 binary packages and the source packages in the form: 

326 

327 <pkg-name> <pkg-version> <pkg-architecture> <pkg-section> 

328 <src-name> <src-version> source <src-section> 

329 

330 The file is written as "filename" using the sources and packages 

331 from the "target_suite" parameter. 

332 

333 outofsync_arches: If given, it is a set of architectures marked 

334 as "out of sync". The output file may exclude some out of date 

335 arch:all packages for those architectures to reduce the noise. 

336 

337 The "X=X" parameters are optimizations to avoid "load global" in 

338 the loops. 

339 """ 

340 sources_t = target_suite.sources 

341 packages_t = target_suite.binaries 

342 

343 with open(filename, "w", encoding="ascii") as f: 

344 

345 # write binary packages 

346 for arch in sorted(packages_t): 

347 binaries = packages_t[arch] 

348 for pkg_name in sorted(binaries): 

349 pkg = binaries[pkg_name] 

350 pkgv = pkg.version 

351 pkgarch = pkg.architecture or "all" 

352 pkgsec = pkg.section or "faux" 

353 if pkgsec == "faux" or pkgsec.endswith("/faux"): 

354 # Faux package; not really a part of testing 

355 continue 

356 if ( 356 ↛ 368 (line 356 didn't jump to line 368) 

357 pkg.source_version 

358 and pkgarch == "all" 

359 and pkg.source_version != sources_t[pkg.source].version 

360 and arch in outofsync_arches 

361 ): 

362 # when architectures are marked as "outofsync", their binary 

363 # versions may be lower than those of the associated 

364 # source package in testing. the binary package list for 

365 # such architectures will include arch:all packages 

366 # matching those older versions, but we only want the 

367 # newer arch:all in testing 

368 continue 

369 f.write(f"{pkg_name} {pkgv} {pkgarch} {pkgsec}\n") 

370 

371 # write sources 

372 for src_name in sorted(sources_t): 

373 src = sources_t[src_name] 

374 srcv = src.version 

375 srcsec = src.section or "unknown" 

376 if srcsec == "faux" or srcsec.endswith("/faux"): 

377 # Faux package; not really a part of testing 

378 continue 

379 f.write(f"{src_name} {srcv} source {srcsec}\n") 

380 

381 

382def write_heidi_delta(filename: str, all_selected: list["MigrationItem"]) -> None: 

383 """Write the output delta 

384 

385 This method writes the packages to be upgraded, in the form: 

386 <src-name> <src-version> 

387 or (if the source is to be removed): 

388 -<src-name> <src-version> 

389 

390 The order corresponds to that shown in update_output. 

391 """ 

392 with open(filename, "w", encoding="ascii") as fd: 

393 

394 fd.write("#HeidiDelta\n") 

395 

396 for item in all_selected: 

397 prefix = "" 

398 

399 if item.is_removal: 

400 prefix = "-" 

401 

402 if item.architecture == "source": 

403 fd.write(f"{prefix}{item.package} {item.version}\n") 

404 else: 

405 fd.write( 

406 "%s%s %s %s\n" 

407 % (prefix, item.package, item.version, item.architecture) 

408 ) 

409 

410 

411class Opener(Protocol): 

412 def __call__( 412 ↛ exit (line 412 didn't jump to the function exit) 

413 self, file: str, mode: Literal["wt"], encoding: Literal["utf-8"] 

414 ) -> IO[Any]: ... 

415 

416 

417def write_excuses( 

418 excuses: dict[str, "Excuse"] | dict[PackageId, "Excuse"], 

419 dest_file: str, 

420 output_format: Literal["yaml", "legacy-html"] = "yaml", 

421) -> None: 

422 """Write the excuses to dest_file 

423 

424 Writes a list of excuses in a specified output_format to the 

425 path denoted by dest_file. The output_format can either be "yaml" 

426 or "legacy-html". 

427 """ 

428 excuselist = sorted(excuses.values(), key=lambda x: x.sortkey()) 

429 if output_format == "yaml": 

430 os.makedirs(os.path.dirname(dest_file), exist_ok=True) 

431 opener: Opener = open # type: ignore[assignment] 

432 if dest_file.endswith(".xz"): 432 ↛ 433 (line 432 didn't jump to line 433 because the condition on line 432 was never true) 

433 import lzma 

434 

435 opener = lzma.open # type: ignore[assignment] 

436 elif dest_file.endswith(".gz"): 436 ↛ 437 (line 436 didn't jump to line 437 because the condition on line 436 was never true) 

437 import gzip 

438 

439 opener = gzip.open # type: ignore[assignment] 

440 with opener(dest_file, "wt", encoding="utf-8") as f: 

441 edatalist = [e.excusedata(excuses) for e in excuselist] 

442 excusesdata = { 

443 "sources": edatalist, 

444 "generated-date": datetime.now(UTC), 

445 } 

446 f.write( 

447 yaml.dump(excusesdata, default_flow_style=False, allow_unicode=True) 

448 ) 

449 elif output_format == "legacy-html": 

450 with open(dest_file, "w", encoding="utf-8") as f: 

451 f.write( 

452 '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/REC-html40/strict.dtd">\n' 

453 ) 

454 f.write("<html><head><title>excuses...</title>") 

455 f.write( 

456 '<meta http-equiv="Content-Type" content="text/html;charset=utf-8"></head><body>\n' 

457 ) 

458 f.write( 

459 "<p>Generated: " 

460 + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time())) 

461 + "</p>\n" 

462 ) 

463 f.write("<ul>\n") 

464 for e in excuselist: 

465 f.write("<li>%s" % e.html(excuses)) 

466 f.write("</ul></body></html>\n") 

467 else: # pragma: no cover 

468 raise ValueError('Output format must be either "yaml" or "legacy-html"') 

469 

470 

471def old_libraries( 

472 mi_factory: "MigrationItemFactory", 

473 suite_info: Suites, 

474 outofsync_arches: Iterable[str] = frozenset(), 

475) -> list["MigrationItem"]: 

476 """Detect old libraries left in the target suite for smooth transitions 

477 

478 This method detects old libraries which are in the target suite but no 

479 longer built from the source package: they are still there because 

480 other packages still depend on them, but they should be removed as 

481 soon as possible. 

482 

483 For "outofsync" architectures, outdated binaries are allowed to be in 

484 the target suite, so they are only added to the removal list if they 

485 are no longer in the (primary) source suite. 

486 """ 

487 sources_t = suite_info.target_suite.sources 

488 binaries_t = suite_info.target_suite.binaries 

489 binaries_s = suite_info.primary_source_suite.binaries 

490 removals = [] 

491 for arch in binaries_t: 

492 for pkg_name in binaries_t[arch]: 

493 pkg = binaries_t[arch][pkg_name] 

494 if sources_t[pkg.source].version != pkg.source_version and ( 

495 arch not in outofsync_arches or pkg_name not in binaries_s[arch] 

496 ): 

497 removals.append(mi_factory.generate_removal_for_cruft_item(pkg.pkg_id)) 

498 return removals 

499 

500 

501def is_nuninst_asgood_generous( 

502 constraints: dict[str, list[str]], 

503 allow_uninst: dict[str, set[str | None]], 

504 architectures: list[str], 

505 old: dict[str, set[str]], 

506 new: dict[str, set[str]], 

507 break_arches: set[str] = cast(set[str], frozenset()), 

508) -> bool: 

509 """Compares the nuninst counters and constraints to see if they improved 

510 

511 Given a list of architectures, the previous and the current nuninst 

512 counters, this function determines if the current nuninst counter 

513 is better than the previous one. Optionally it also accepts a set 

514 of "break_arches", the nuninst counter for any architecture listed 

515 in this set is completely ignored. 

516 

517 If the nuninst counters are equal or better, then the constraints 

518 are checked for regressions (ignoring break_arches). 

519 

520 Returns True if the new nuninst counter is better than the 

521 previous and there are no constraint regressions (ignoring break_arches). 

522 Returns False otherwise. 

523 

524 """ 

525 diff = 0 

526 for arch in architectures: 

527 if arch in break_arches: 

528 continue 

529 diff = diff + ( 

530 len(new[arch] - allow_uninst[arch]) - len(old[arch] - allow_uninst[arch]) 

531 ) 

532 if diff > 0: 

533 return False 

534 must_be_installable = constraints["keep-installable"] 

535 for arch in architectures: 

536 if arch in break_arches: 

537 continue 

538 regression = new[arch] - old[arch] 

539 if not regression.isdisjoint(must_be_installable): 539 ↛ 540 (line 539 didn't jump to line 540 because the condition on line 539 was never true) 

540 return False 

541 return True 

542 
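A worked example with toy data (illustrative; real callers pass britney's configured constraints and counters):

    constraints = {"keep-installable": ["essential-pkg"]}
    allow_uninst = {"amd64": set()}
    old = {"amd64": {"pkg-a", "pkg-b"}}
    new = {"amd64": {"pkg-a"}}
    is_nuninst_asgood_generous(constraints, allow_uninst, ["amd64"], old, new)
    # -> True: the counter improved (2 -> 1) and "essential-pkg" did not regress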

543 

544def clone_nuninst( 

545 nuninst: dict[str, set[str]], 

546 *, 

547 packages_s: dict[str, dict[str, BinaryPackage]] | None = None, 

548 architectures: Iterable[str] | None = None, 

549) -> dict[str, set[str]]: 

550 """Completely or Selectively deep clone nuninst 

551 

552 Given nuninst table, the package table for a given suite and 

553 a list of architectures, this function will clone the nuninst 

554 table. Only the listed architectures will be deep cloned - 

555 the rest will only be shallow cloned. When packages_s is given, 

556 packages not listed in packages_s will be pruned from the clone 

557 (if packages_s is omitted, the per architecture nuninst is cloned 

558 as-is) 

559 """ 

560 clone = nuninst.copy() 

561 if architectures is None: 561 ↛ 562 (line 561 didn't jump to line 562 because the condition on line 561 was never true) 

562 return clone 

563 if packages_s is not None: 

564 for arch in architectures: 

565 clone[arch] = {x for x in nuninst[arch] if x in packages_s[arch]} 

566 clone[arch + "+all"] = { 

567 x for x in nuninst[arch + "+all"] if x in packages_s[arch] 

568 } 

569 else: 

570 for arch in architectures: 

571 clone[arch] = set(nuninst[arch]) 

572 clone[arch + "+all"] = set(nuninst[arch + "+all"]) 

573 return clone 

574 
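A sketch of the pruning behaviour (the bare object() stands in for a real BinaryPackage; only package-name membership matters here):

    nuninst = {"amd64": {"pkg-a", "pkg-b"}, "amd64+all": {"pkg-a"}}
    packages_s = {"amd64": {"pkg-a": object()}}  # pkg-b no longer exists in the suite
    clone_nuninst(nuninst, packages_s=packages_s, architectures=["amd64"])
    # -> {"amd64": {"pkg-a"}, "amd64+all": {"pkg-a"}}: pkg-b was pruned from the clone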

575 

576def test_installability( 

577 target_suite: TargetSuite, 

578 pkg_name: str, 

579 pkg_id: BinaryPackageId, 

580 broken: set[str], 

581 nuninst_arch: set[str] | None, 

582) -> None: 

583 """Test for installability of a package on an architecture 

584 

585 (pkg_name, pkg_version, pkg_arch) is the package to check. 

586 

587 broken is the set of broken packages. If the package changes 

588 installability (e.g. goes from uninstallable to installable), 

589 broken will be updated accordingly. 

590 

591 If nuninst_arch is not None then it is also updated in the same 

592 way as broken is. 

593 """ 

594 if not target_suite.is_installable(pkg_id): 

595 # if pkg_name not in broken: regression else: already broken 

596 broken.add(pkg_name) 

597 if nuninst_arch is not None: 

598 nuninst_arch.add(pkg_name) 

599 else: 

600 # if pkg_name in broken: # improvement else: already not broken 

601 broken.discard(pkg_name) 

602 if nuninst_arch is not None: 

603 nuninst_arch.discard(pkg_name) 

604 

605 

606def check_installability( 

607 target_suite: TargetSuite, 

608 binaries: dict[str, dict[str, BinaryPackage]], 

609 arch: str, 

610 updates: set[BinaryPackageId], 

611 check_archall: bool, 

612 nuninst: dict[str, set[str]], 

613) -> None: 

614 broken = nuninst[arch + "+all"] 

615 packages_t_a = binaries[arch] 

616 

617 for pkg_id in (x for x in updates if x.architecture == arch): 

618 name, version, parch = pkg_id 

619 if name not in packages_t_a: 

620 continue 

621 pkgdata = packages_t_a[name] 

622 if version != pkgdata.version: 

623 # Not the version in testing right now, ignore 

624 continue 

625 actual_arch = pkgdata.architecture 

626 nuninst_arch = None 

627 # only check arch:all packages if requested 

628 if check_archall or actual_arch != "all": 

629 nuninst_arch = nuninst[parch] 

630 elif actual_arch == "all": 630 ↛ 632 (line 630 didn't jump to line 632 because the condition on line 630 was always true) 

631 nuninst[parch].discard(name) 

632 test_installability(target_suite, name, pkg_id, broken, nuninst_arch) 

633 

634 

635def possibly_compressed( 

636 path: str, *, permitted_compressions: list[str] | None = None 

637) -> str: 

638 """Find and select a (possibly compressed) variant of a path 

639 

640 If the given path exists, it will be returned. 

641 

642 :param path: The base path. 

643 :param permitted_compressions: Alternative extensions to look for. Defaults to "gz" and "xz". 

644 :return: The path given possibly with one of the permitted extensions. 

645 :raises FileNotFoundError: if the path is not found 

646 """ 

647 if os.path.exists(path): 647 ↛ 649 (line 647 didn't jump to line 649 because the condition on line 647 was always true) 

648 return path 

649 if permitted_compressions is None: 

650 permitted_compressions = ["gz", "xz"] 

651 for ext in permitted_compressions: 

652 cpath = f"{path}.{ext}" 

653 if os.path.exists(cpath): 

654 return cpath 

655 raise FileNotFoundError( 

656 errno.ENOENT, os.strerror(errno.ENOENT), path 

657 ) # pragma: no cover 

658 
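A sketch of the lookup order (the paths are hypothetical):

    # With only "/srv/mirror/Sources.gz" on disk:
    possibly_compressed("/srv/mirror/Sources")  # -> "/srv/mirror/Sources.gz"
    possibly_compressed("/srv/mirror/Sources", permitted_compressions=["xz"])
    # -> raises FileNotFoundError: neither the plain path nor "Sources.xz" exists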

659 

660def create_provides_map( 

661 packages: dict[str, BinaryPackage], 

662) -> dict[str, set[tuple[str, str]]]: 

663 """Create a provides map from a map binary package names and their BinaryPackage objects 

664 

665 :param packages: A dict mapping binary package names to their BinaryPackage object 

666 :return: A provides map 

667 """ 

668 # create provides 

669 provides = defaultdict(set) 

670 

671 for pkg, dpkg in packages.items(): 

672 # register virtual packages and real packages that provide 

673 # them 

674 for provided_pkg, provided_version, _ in dpkg.provides: 

675 provides[provided_pkg].add((pkg, provided_version)) 

676 

677 return provides 

678 
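A worked sketch; SimpleNamespace stands in for BinaryPackage, which only needs a .provides attribute here:

    from types import SimpleNamespace

    packages = {
        "postfix": SimpleNamespace(provides=[("mail-transport-agent", "", "")]),
        "exim4": SimpleNamespace(provides=[("mail-transport-agent", "", "")]),
    }
    create_provides_map(packages)
    # -> {"mail-transport-agent": {("postfix", ""), ("exim4", "")}}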

679 

680def read_release_file(suite_dir: str) -> "TagSection[str]": 

681 """Parses a given "Release" file 

682 

683 :param suite_dir: The directory to the suite 

684 :return: A dict of the first (and only) paragraph in a Release file 

685 """ 

686 release_file = os.path.join(suite_dir, "Release") 

687 with open(release_file) as fd: 

688 tag_file = iter(apt_pkg.TagFile(fd)) 

689 result = next(tag_file) 

690 if next(tag_file, None) is not None: # pragma: no cover 

691 raise TypeError("%s has more than one paragraph" % release_file) 

692 return result 

693 

694 

695def read_sources_file( 

696 filename: str, 

697 sources: dict[str, SourcePackage] | None = None, 

698 add_faux: bool = True, 

699 intern: Callable[[str], str] = sys.intern, 

700) -> dict[str, SourcePackage]: 

701 """Parse a single Sources file into a hash 

702 

703 Parse a single Sources file into a dict mapping a source package 

704 name to a SourcePackage object. If there are multiple source 

705 packages with the same name, then the highest versioned source 

706 package (that is not marked as "Extra-Source-Only") is the 

707 version kept in the dict. 

708 

709 :param filename: Path to the Sources file. Can be compressed by any algorithm supported by apt_pkg.TagFile 

710 :param sources: Optional dict to add the packages to. If given, this is also the value returned. 

711 :param add_faux: Add a faux arch:all binary for each source that claims it has arch:all 

712 :param intern: Internal optimisation / implementation detail to avoid python's "LOAD_GLOBAL" instruction in a loop 

713 :return: mapping from names to a source package 

714 """ 

715 if sources is None: 

716 sources = {} 

717 

718 tag_file = apt_pkg.TagFile(filename) 

719 get_field = tag_file.section.get 

720 step = tag_file.step 

721 

722 while step(): 

723 if get_field("Extra-Source-Only", "no") == "yes": 

724 # Ignore sources only referenced by Built-Using 

725 continue 

726 pkg = get_field("Package") 

727 ver = get_field("Version") 

728 # There may be multiple versions of the source package 

729 # (in unstable) if some architectures have out-of-date 

730 # binaries. We only ever consider the source with the 

731 # largest version for migration. 

732 if pkg in sources and apt_pkg.version_compare(sources[pkg].version, ver) > 0: 

733 continue 

734 maint = get_field("Maintainer") 

735 if maint: 735 ↛ 737 (line 735 didn't jump to line 737 because the condition on line 735 was always true) 

736 maint = intern(maint.strip()) 

737 section = get_field("Section") 

738 if section: 738 ↛ 741 (line 738 didn't jump to line 741 because the condition on line 738 was always true) 

739 section = intern(section.strip()) 

740 build_deps_arch: str | None 

741 build_deps_arch = ", ".join( 

742 x 

743 for x in (get_field("Build-Depends"), get_field("Build-Depends-Arch")) 

744 if x is not None 

745 ) 

746 if build_deps_arch != "": 

747 build_deps_arch = sys.intern(build_deps_arch) 

748 else: 

749 build_deps_arch = None 

750 build_deps_indep = get_field("Build-Depends-Indep") 

751 if build_deps_indep is not None: 

752 build_deps_indep = sys.intern(build_deps_indep) 

753 

754 # Adding arch:all packages to the list of binaries already to be able 

755 # to check for them later. Helps mitigate bug 887060 and is the 

756 # (partial?) answer to bug 1064428. 

757 binaries: set[BinaryPackageId] = set() 

758 if add_faux and "all" in get_field("Architecture", "").split(): 

759 # the value "faux" in arch:faux is used elsewhere, so keep in sync 

760 pkg_id = BinaryPackageId(pkg + "-faux", intern("0~~~~"), intern("faux")) 

761 binaries.add(pkg_id) 

762 

763 sources[intern(pkg)] = SourcePackage( 

764 intern(pkg), 

765 intern(ver), 

766 section, 

767 binaries, 

768 maint, 

769 False, 

770 build_deps_arch, 

771 build_deps_indep, 

772 get_field("Testsuite", "").split(), 

773 get_field("Testsuite-Triggers", "").replace(",", "").split(), 

774 ) 

775 return sources 

776 

777 

778def _check_and_update_packages( 

779 packages: list[BinaryPackage], 

780 package: BinaryPackage, 

781 archqual: str | None, 

782 build_depends: bool, 

783) -> None: 

784 """Helper for get_dependency_solvers 

785 

786 This method updates the list of packages with a given package if that 

787 package is a valid (Build-)Depends. 

788 

789 :param packages: which packages are to be updated 

790 :param archqual: Architecture qualifier 

791 :param build_depends: If True, check if the "package" parameter is valid as a build-dependency. 

792 """ 

793 

794 # See also bug #971739 and #1059929 

795 if archqual is None: 

796 packages.append(package) 

797 elif archqual == "native" and build_depends: 

798 # Multi-arch handling for build-dependencies 

799 # - :native is ok always 

800 packages.append(package) 

801 elif archqual == "any" and package.multi_arch == "allowed": 

802 # Multi-arch handling for both build-dependencies and regular dependencies 

803 # - :any is ok iff the target has "M-A: allowed" 

804 packages.append(package) 

805 

806 

807class GetDependencySolversProto(Protocol): 

808 def __call__( 808 ↛ exit (line 808 didn't jump to the function exit) 

809 self, 

810 block: list[tuple[str, str, str]], 

811 binaries_s_a: dict[str, BinaryPackage], 

812 provides_s_a: dict[str, set[tuple[str, str]]], 

813 *, 

814 build_depends: bool = False, 

815 empty_set: Any = frozenset(), 

816 ) -> list[BinaryPackage]: ... 

817 

818 

819def get_dependency_solvers( 

820 block: list[tuple[str, str, str]], 

821 binaries_s_a: dict[str, BinaryPackage], 

822 provides_s_a: dict[str, set[tuple[str, str]]], 

823 *, 

824 build_depends: bool = False, 

825 empty_set: Any = frozenset(), 

826) -> list[BinaryPackage]: 

827 """Find the packages which satisfy a dependency block 

828 

829 This method returns the list of packages which satisfy a dependency 

830 block (as returned by apt_pkg.parse_depends) in a package table 

831 for a given suite and architecture (a la self.binaries[suite][arch]) 

832 

833 It can also handle build-dependency relations if the named parameter 

834 "build_depends" is set to True. In this case, block should be based 

835 on the return value from apt_pkg.parse_src_depends. 

836 

837 :param block: The dependency block as parsed by apt_pkg.parse_depends (or apt_pkg.parse_src_depends 

838 if the "build_depends" is True) 

839 :param binaries_s_a: Mapping of package names to the relevant BinaryPackage 

840 :param provides_s_a: Mapping of package names to their providers (as generated by parse_provides) 

841 :param build_depends: If True, treat the "block" parameter as a build-dependency relation rather than 

842 a regular dependency relation. 

843 :param empty_set: Internal implementation detail / optimisation 

844 :return: package names solving the relation 

845 """ 

846 packages: list[BinaryPackage] = [] 

847 

848 # for every package, version and operation in the block 

849 for name, version, op in block: 

850 if ":" in name: 

851 name, archqual = name.split(":", 1) 

852 else: 

853 archqual = None 

854 

855 # look for the package in unstable 

856 if name in binaries_s_a: 

857 package = binaries_s_a[name] 

858 # check the versioned dependency and architecture qualifier 

859 # (if present) 

860 if (op == "" and version == "") or apt_pkg.check_dep( 

861 package.version, op, version 

862 ): 

863 _check_and_update_packages(packages, package, archqual, build_depends) 

864 

865 # look for the package in the virtual packages list and loop on them 

866 for prov, prov_version in provides_s_a.get(name, empty_set): 

867 assert prov in binaries_s_a 

868 package = binaries_s_a[prov] 

869 # See Policy Manual §7.5 

870 if (op == "" and version == "") or ( 

871 prov_version != "" and apt_pkg.check_dep(prov_version, op, version) 

872 ): 

873 _check_and_update_packages(packages, package, archqual, build_depends) 

874 

875 return packages 

876 
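A minimal sketch of the expected block shape; real callers pass the suite's binary and provides tables instead of the empty dicts used here:

    import apt_pkg

    apt_pkg.init()
    # parse_depends returns a list of or-groups; a block is one such or-group
    block = apt_pkg.parse_depends("mail-transport-agent | postfix (>= 3.0)", False)[0]
    get_dependency_solvers(block, {}, {})  # -> []: nothing can solve it in an empty suite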

877 

878def invalidate_excuses( 

879 excuses: dict[str, "Excuse"], 

880 valid: set[str], 

881 invalid: set[str], 

882 invalidated: set[str], 

883) -> None: 

884 """Invalidate impossible excuses 

885 

886 This method invalidates the impossible excuses, which depend 

887 on invalid excuses. The two parameters contain the sets of 

888 `valid' and `invalid' excuses. 

889 """ 

890 # make a list of all packages (source and binary) that are present in the 

891 # excuses we have 

892 excuses_packages: dict[PackageId | BinaryPackageId, set[str]] = defaultdict(set) 

893 for exc in excuses.values(): 

894 for arch in exc.packages: 

895 for pkg_arch_id in exc.packages[arch]: 

896 # note that the same package can be in multiple excuses 

897 # eg. when unstable and TPU have the same packages 

898 excuses_packages[pkg_arch_id].add(exc.name) 

899 

900 # create dependencies between excuses based on packages 

901 excuses_rdeps = defaultdict(set) 

902 for exc in excuses.values(): 

903 # Note that excuses_rdeps is only populated by dependencies generated 

904 # based on packages below. There are currently no dependencies between 

905 # excuses that are added directly, so this is ok. 

906 

907 for pkg_dep in exc.depends_packages: 

908 # set of excuses, each of which can satisfy this specific 

909 # dependency 

910 # if there is a dependency on a package for which no 

911 # excuses exist (e.g. a cruft binary), the set will 

912 # contain an ImpossibleDependencyState 

913 dep_exc: set[str | DependencyState] = set() 

914 for pkg_dep_id in cast(set[BinaryPackageId], pkg_dep.deps): 

915 pkg_excuses = excuses_packages[pkg_dep_id] 

916 # if the dependency isn't found, we get an empty set 

917 if pkg_excuses == frozenset(): 

918 imp_dep = ImpossibleDependencyState( 

919 PolicyVerdict.REJECTED_PERMANENTLY, "%s" % (pkg_dep_id.name) 

920 ) 

921 dep_exc.add(imp_dep) 

922 

923 else: 

924 dep_exc |= pkg_excuses 

925 for e in pkg_excuses: 

926 excuses_rdeps[e].add(exc.name) 

927 if not exc.add_dependency(dep_exc, pkg_dep.spec): 

928 valid.discard(exc.name) 

929 invalid.add(exc.name) 

930 

931 # loop on the invalid excuses 

932 # Convert invalid to a list for deterministic results 

933 invalid2 = sorted(invalid) 

934 for ename in iter_except(invalid2.pop, IndexError): 

935 invalidated.add(ename) 

936 # if there is no reverse dependency, skip the item 

937 if ename not in excuses_rdeps: 

938 continue 

939 

940 rdep_verdict = PolicyVerdict.REJECTED_WAITING_FOR_ANOTHER_ITEM 

941 if excuses[ename].policy_verdict.is_blocked: 

942 rdep_verdict = PolicyVerdict.REJECTED_BLOCKED_BY_ANOTHER_ITEM 

943 

944 # loop on the reverse dependencies 

945 for x in sorted(excuses_rdeps[ename]): 

946 exc = excuses[x] 

947 # if the item is valid and it is not marked as `forced', then we 

948 # invalidate this specific dependency 

949 if x in valid and not exc.forced: 

950 # mark this specific dependency as invalid 

951 still_valid = exc.invalidate_dependency(ename, rdep_verdict) 

952 

953 # if there are no alternatives left for this dependency, 

954 # invalidate the excuse 

955 if not still_valid: 

956 valid.discard(x) 

957 invalid2.append(x) 

958 

959 

960def compile_nuninst( 

961 target_suite: TargetSuite, architectures: list[str], nobreakall_arches: list[str] 

962) -> dict[str, set[str]]: 

963 """Compile a nuninst dict from the current testing 

964 

965 :param target_suite: The target suite 

966 :param architectures: Which architectures to check 

967 :param nobreakall_arches: Architectures on which arch:all packages must be installable 

968 """ 

969 nuninst: dict[str, set[str]] = {} 

970 binaries_t = target_suite.binaries 

971 

972 # for all the architectures 

973 for arch in architectures: 

974 # if it is in the nobreakall ones, check arch-independent packages too 

975 check_archall = arch in nobreakall_arches 

976 

977 # check all the packages for this architecture 

978 nuninst[arch] = set() 

979 packages_t_a = binaries_t[arch] 

980 for pkg_name, pkg_data in packages_t_a.items(): 

981 r = target_suite.is_installable(pkg_data.pkg_id) 

982 if not r: 

983 nuninst[arch].add(pkg_name) 

984 

985 # if they are not required, remove architecture-independent packages 

986 nuninst[arch + "+all"] = nuninst[arch].copy() 

987 if not check_archall: 

988 for pkg_name in nuninst[arch + "+all"]: 

989 pkg_data = packages_t_a[pkg_name] 

990 if pkg_data.architecture == "all": 

991 nuninst[arch].remove(pkg_name) 

992 

993 return nuninst 

994 

995 

996def is_smooth_update_allowed( 

997 binary: BinaryPackage, smooth_updates: list[str], hints: "HintCollection" 

998) -> bool: 

999 if "ALL" in smooth_updates: 999 ↛ 1000line 999 didn't jump to line 1000 because the condition on line 999 was never true

1000 return True 

1001 section = binary.section.split("/")[-1] 

1002 if section in smooth_updates: 

1003 return True 

1004 if hints.search( 

1005 "allow-smooth-update", package=binary.source, version=binary.source_version 

1006 ): 

1007 # note that this needs to match the source version *IN TESTING* 

1008 return True 

1009 return False 

1010 

1011 

1012def find_smooth_updateable_binaries( 

1013 binaries_to_check: list[BinaryPackageId], 

1014 source_data: SourcePackage, 

1015 pkg_universe: "BinaryPackageUniverse", 

1016 target_suite: TargetSuite, 

1017 binaries_t: dict[str, dict[str, BinaryPackage]], 

1018 binaries_s: dict[str, dict[str, BinaryPackage]], 

1019 removals: set[BinaryPackageId] | frozenset[BinaryPackageId], 

1020 smooth_updates: list[str], 

1021 hints: "HintCollection", 

1022) -> set[BinaryPackageId]: 

1023 check: set[BinaryPackageId] = set() 

1024 smoothbins: set[BinaryPackageId] = set() 

1025 

1026 for check_pkg_id in binaries_to_check: 

1027 binary, _, parch = check_pkg_id 

1028 

1029 cruftbins: set[BinaryPackageId] = set() 

1030 

1031 # Not a candidate for smooth update (newer non-cruft version in unstable) 

1032 if binary in binaries_s[parch]: 

1033 if binaries_s[parch][binary].source_version == source_data.version: 

1034 continue 

1035 cruftbins.add(binaries_s[parch][binary].pkg_id) 

1036 

1037 # Maybe a candidate (cruft or removed binary): check if config allows us to smooth update it. 

1038 if is_smooth_update_allowed(binaries_t[parch][binary], smooth_updates, hints): 

1039 # if the package has reverse-dependencies which are 

1040 # built from other sources, it's a valid candidate for 

1041 # a smooth update. if not, it may still be a valid 

1042 # candidate if one of its r-deps is itself a candidate, 

1043 # so note it for checking later 

1044 rdeps = set(pkg_universe.reverse_dependencies_of(check_pkg_id)) 

1045 # We ignore all binaries listed in "removals" as we 

1046 # assume they will leave at the same time as the 

1047 # given package. 

1048 rdeps.difference_update(removals, binaries_to_check) 

1049 

1050 smooth_update_it = False 

1051 if target_suite.any_of_these_are_in_the_suite(rdeps): 

1052 combined = set(smoothbins) 

1053 combined.add(check_pkg_id) 

1054 for rdep in rdeps: 

1055 # each dependency clause has a set of possible 

1056 # alternatives that can satisfy that dependency. 

1057 # if any of them is outside the set of smoothbins, the 

1058 # dependency can be satisfied even if this binary was 

1059 # removed, so there is no need to keep it around for a 

1060 # smooth update 

1061 # if not, only this binary can satisfy the dependency, so 

1062 # we should keep it around until the rdep is no longer in 

1063 # testing 

1064 for dep_clause in pkg_universe.dependencies_of(rdep): 

1065 # filter out cruft binaries from unstable, because 

1066 # they will not be added to the set of packages that 

1067 # will be migrated 

1068 if dep_clause - cruftbins <= combined: 

1069 smooth_update_it = True 

1070 break 

1071 

1072 if smooth_update_it: 

1073 smoothbins = combined 

1074 else: 

1075 check.add(check_pkg_id) 

1076 

1077 # check whether we should perform a smooth update for 

1078 # packages which are candidates but do not have r-deps 

1079 # outside of the current source 

1080 while 1: 

1081 found_any = False 

1082 for candidate_pkg_id in check: 

1083 rdeps = pkg_universe.reverse_dependencies_of(candidate_pkg_id) 

1084 if not rdeps.isdisjoint(smoothbins): 

1085 smoothbins.add(candidate_pkg_id) 

1086 found_any = True 

1087 if not found_any: 

1088 break 

1089 check = {x for x in check if x not in smoothbins} 

1090 

1091 return smoothbins 

1092 

1093 

1094def find_newer_binaries( 

1095 suite_info: Suites, pkg: BinaryPackage, add_source_for_dropped_bin: bool = False 

1096) -> list[tuple[PackageId, Suite]]: 

1097 """ 

1098 Find newer binaries for pkg in any of the source suites. 

1099 

1100 :param pkg: BinaryPackage (is assumed to be in the target suite) 

1101 

1102 :param add_source_for_dropped_bin: If True, newer versions of the 

1103 source of pkg will be added if they don't have the binary pkg 

1104 

1105 :return: the newer binaries (or sources) and their suites 

1106 """ 

1107 source = pkg.source 

1108 newer_versions: list[tuple[PackageId, Suite]] = [] 

1109 for suite in suite_info: 

1110 if suite.suite_class == SuiteClass.TARGET_SUITE: 

1111 continue 

1112 

1113 suite_binaries_on_arch = suite.binaries.get(pkg.pkg_id.architecture) 

1114 if not suite_binaries_on_arch: 1114 ↛ 1115 (line 1114 didn't jump to line 1115 because the condition on line 1114 was never true) 

1115 continue 

1116 

1117 newerbin = None 

1118 if pkg.pkg_id.package_name in suite_binaries_on_arch: 

1119 newerbin = suite_binaries_on_arch[pkg.pkg_id.package_name] 

1120 if suite.is_cruft(newerbin): 

1121 # We pretend the cruft binary doesn't exist. 

1122 # We handle this as if the source didn't have the binary 

1123 # (see below) 

1124 newerbin = None 

1125 elif apt_pkg.version_compare(newerbin.version, pkg.version) <= 0: 

1126 continue 

1127 else: 

1128 if source not in suite.sources: 

1129 # bin and source not in suite: no newer version 

1130 continue 

1131 

1132 if not newerbin: 

1133 if not add_source_for_dropped_bin: 1133 ↛ 1134 (line 1133 didn't jump to line 1134 because the condition on line 1133 was never true) 

1134 continue 

1135 # We only get here if there is a newer version of the source, 

1136 # which doesn't have the binary anymore (either it doesn't 

1137 # exist, or it's cruft and we pretend it doesn't exist). 

1138 # Add the new source instead. 

1139 nsrc = suite.sources[source] 

1140 n_id = PackageId(source, nsrc.version, "source") 

1141 overs = pkg.source_version 

1142 if apt_pkg.version_compare(nsrc.version, overs) <= 0: 

1143 continue 

1144 else: 

1145 n_id = newerbin.pkg_id 

1146 

1147 newer_versions.append((n_id, suite)) 

1148 

1149 return newer_versions 

1150 

1151 

1152def parse_provides( 

1153 provides_raw: str, 

1154 pkg_id: BinaryPackageId | None = None, 

1155 logger: logging.Logger | None = None, 

1156) -> list[tuple[str, str, str]]: 

1157 parts = apt_pkg.parse_depends(provides_raw, False) 

1158 nprov = [] 

1159 for or_clause in parts: 

1160 if len(or_clause) != 1: # pragma: no cover 

1161 if logger is not None: 

1162 msg = "Ignoring invalid provides in %s: Alternatives [%s]" 

1163 logger.warning(msg, str(pkg_id), str(or_clause)) 

1164 continue 

1165 for part in or_clause: 

1166 provided, provided_version, op = part 

1167 if op != "" and op != "=": # pragma: no cover 

1168 if logger is not None: 

1169 msg = "Ignoring invalid provides in %s: %s (%s %s)" 

1170 logger.warning(msg, str(pkg_id), provided, op, provided_version) 

1171 continue 

1172 provided = sys.intern(provided) 

1173 provided_version = sys.intern(provided_version) 

1174 part = (provided, provided_version, sys.intern(op)) 

1175 nprov.append(part) 

1176 return nprov 

1177 
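A sketch of the accepted input (after apt_pkg.init(); the package names are illustrative):

    parse_provides("mail-transport-agent, foo (= 1.0)")
    # -> [("mail-transport-agent", "", ""), ("foo", "1.0", "=")]
    # Unversioned provides keep an empty version/op; only "=" is accepted as an operator.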

1178 

1179def parse_builtusing( 

1180 builtusing_raw: str, 

1181 pkg_id: BinaryPackageId | None = None, 

1182 logger: logging.Logger | None = None, 

1183) -> list[tuple[str, str]]: 

1184 parts = apt_pkg.parse_depends(builtusing_raw, False) 

1185 nbu = [] 

1186 for or_clause in parts: 

1187 if len(or_clause) != 1: # pragma: no cover 

1188 if logger is not None: 

1189 msg = "Ignoring invalid builtusing in %s: Alternatives [%s]" 

1190 logger.warning(msg, str(pkg_id), str(or_clause)) 

1191 continue 

1192 for part in or_clause: 

1193 bu, bu_version, op = part 

1194 if op != "=": # pragma: no cover 

1195 if logger is not None: 

1196 msg = "Ignoring invalid builtusing in %s: %s (%s %s)" 

1197 logger.warning(msg, str(pkg_id), bu, op, bu_version) 

1198 continue 

1199 bu = sys.intern(bu) 

1200 bu_version = sys.intern(bu_version) 

1201 nbu.append((bu, bu_version)) 

1202 return nbu 

1203 

1204 

1205def parse_option( 

1206 options: "optparse.Values", 

1207 option_name: str, 

1208 default: Any | None = None, 

1209 to_bool: bool = False, 

1210 to_int: bool = False, 

1211 day_to_sec: bool = False, 

1212) -> None: 

1213 """Ensure the option exist and has a sane value 

1214 

1215 :param options: optparse.Values holding the options 

1216 

1217 :param option_name: string with the name of the option 

1218 

1219 :param default: the default value for the option 

1220 

1221 :param to_int: convert the input to int (defaults to sys.maxsize) 

1222 

1223 :param to_bool: convert the input to bool 

1224 

1225 :param day_to_sec: convert the input from days to seconds (implies to_int=True) 

1226 """ 

1227 value = getattr(options, option_name, default) 

1228 

1229 # Option was provided with no value (or default is '') so pick up the default 

1230 if value == "": 

1231 value = default 

1232 

1233 if (to_int or day_to_sec) and value in (None, ""): 

1234 value = sys.maxsize 

1235 

1236 if day_to_sec: 

1237 value = int(float(value) * 24 * 60 * 60) # type: ignore[arg-type] 

1238 

1239 if to_int: 

1240 value = int(value) # type: ignore[arg-type] 

1241 

1242 if to_bool: 

1243 if value and ( 

1244 isinstance(value, bool) or value.lower() in ("yes", "y", "true", "t", "1") 

1245 ): 

1246 value = True 

1247 else: 

1248 value = False 

1249 

1250 setattr(options, option_name, value) 

1251 
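A sketch with hypothetical option names:

    import optparse

    opts = optparse.Values({"min_age": "5", "verbose": "yes"})
    parse_option(opts, "min_age", day_to_sec=True)
    parse_option(opts, "verbose", to_bool=True)
    opts.min_age  # -> 432000 (5 days in seconds)
    opts.verbose  # -> True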

1252 

1253def filter_out_faux(binaries: Iterable[BinaryPackageId]) -> set[BinaryPackageId]: 

1254 """Returns a set without faux packages""" 

1255 

1256 return { 

1257 pkg for pkg in binaries if not pkg.package_name.endswith("-faux-build-depends") 

1258 }
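A small sketch (the versions are illustrative):

    real = BinaryPackageId("libfoo1", "1.0-1", "amd64")
    faux = BinaryPackageId("bar-faux-build-depends", "0~~~~", "faux")
    filter_out_faux([real, faux])  # -> {real}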