Coverage for britney2/utils.py: 90%

462 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-10-30 09:44 +0000

1# Refactored parts from britney.py, which is/was: 

2# Copyright (C) 2001-2008 Anthony Towns <ajt@debian.org> 

3# Andreas Barth <aba@debian.org> 

4# Fabio Tranchitella <kobold@debian.org> 

5# Copyright (C) 2010-2012 Adam D. Barratt <adsb@debian.org> 

6# Copyright (C) 2012 Niels Thykier <niels@thykier.net> 

7# 

8# New portions 

9# Copyright (C) 2013 Adam D. Barratt <adsb@debian.org> 

10 

11# This program is free software; you can redistribute it and/or modify 

12# it under the terms of the GNU General Public License as published by 

13# the Free Software Foundation; either version 2 of the License, or 

14# (at your option) any later version. 

15 

16# This program is distributed in the hope that it will be useful, 

17# but WITHOUT ANY WARRANTY; without even the implied warranty of 

18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19# GNU General Public License for more details. 

20 

21 

22import errno 

23import logging 

24import optparse 

25import os 

26import sys 

27import time 

28from collections import defaultdict 

29from collections.abc import Callable, Container, Iterable, Iterator, Mapping, MutableSet 

30from datetime import UTC, datetime 

31from functools import partial 

32from itertools import chain, filterfalse 

33from typing import ( 

34 IO, 

35 TYPE_CHECKING, 

36 Any, 

37 Literal, 

38 Optional, 

39 Protocol, 

40 TypeVar, 

41 Union, 

42 cast, 

43 overload, 

44) 

45 

46import apt_pkg 

47import yaml 

48 

49from britney2 import ( 

50 BinaryPackage, 

51 BinaryPackageId, 

52 PackageId, 

53 SourcePackage, 

54 Suite, 

55 SuiteClass, 

56 Suites, 

57 TargetSuite, 

58) 

59from britney2.excusedeps import DependencyState, ImpossibleDependencyState 

60from britney2.policies import PolicyVerdict 

61 

62if TYPE_CHECKING: 62 ↛ 64line 62 didn't jump to line 64 because the condition on line 62 was never true

63 

64 from _typeshed import SupportsRichComparisonT 

65 from apt_pkg import TagSection 

66 

67 from .excuse import Excuse 

68 from .hints import HintCollection 

69 from .installability.universe import BinaryPackageUniverse 

70 from .migrationitem import MigrationItem, MigrationItemFactory 

71 

72_T = TypeVar("_T") 

73 

74 

class MigrationConstraintException(Exception):
    """Exception type dedicated to migration-constraint problems.

    It adds nothing on top of Exception; it only provides a distinct
    type to raise and catch.
    """

77 

78 

@overload
def ifilter_except(
    container: Container[_T], iterable: Literal[None] = None
) -> "partial[filterfalse[_T]]": ...


@overload
def ifilter_except(
    container: Container[_T], iterable: Iterable[_T]
) -> "filterfalse[_T]": ...


def ifilter_except(
    container: Container[_T], iterable: Iterable[_T] | None = None
) -> Union["filterfalse[_T]", "partial[filterfalse[_T]]"]:
    """Drop the elements that are members of container

    With an iterable, a lazy iterator over the elements of that
    iterable which are *not* in container is returned.  Without one,
    a callable is returned instead; calling it with an iterable gives
    the same kind of filtered iterator.  The callable form is handy
    when one filter must be applied to several iterables that are not
    available yet.

    :param container: The elements to exclude
    :param iterable: The elements to filter (optional)
    :return: A filtered iterator, or a factory for filtered iterators
    """
    is_excluded = container.__contains__
    if iterable is None:
        # Nothing to filter yet: hand back a reusable filter factory
        return cast("partial[filterfalse[_T]]", partial(filterfalse, is_excluded))
    return filterfalse(is_excluded, iterable)

106 

107 

@overload
def ifilter_only(container: Container[_T], iterable: Iterable[_T]) -> "filter[_T]": ...


@overload
def ifilter_only(
    container: Container[_T], iterable: Literal[None] = None
) -> "partial[filter[_T]]": ...


def ifilter_only(
    container: Container[_T], iterable: Iterable[_T] | None = None
) -> Union["filter[_T]", "partial[filter[_T]]"]:
    """Filter out elements which are not in container

    If given an iterable it returns a filtered iterator, otherwise it
    returns a function to generate filtered iterators.  The latter is
    useful if the same filter has to be (re-)used on multiple
    iterators that are not known beforehand.

    The second overload declares the same default for "iterable" as the
    implementation, so that the one-argument call form type-checks.

    :param container: The elements to keep
    :param iterable: The elements to filter (optional)
    :return: A filtered iterator, or a factory for filtered iterators
    """
    if iterable is not None:
        return filter(container.__contains__, iterable)
    return partial(filter, container.__contains__)

129 

130 

131# iter_except is from the "itertools" recipe 

def iter_except(
    func: Callable[[], _T],
    exception: type[BaseException] | tuple[type[BaseException], ...],
    first: Any = None,
) -> Iterator[_T]:  # pragma: no cover - itertools recipe function
    """Turn a call-until-exception interface into an iterator.

    The generator yields the result of calling func over and over, and
    finishes as soon as one of the given exception types is raised.  It
    is the exception-based counterpart of iter(func, sentinel).

    If first is given, it is called once and its result is yielded
    before the first call to func.

    Examples:
        heapiter = iter_except(functools.partial(heappop, h), IndexError)
        dictiter = iter_except(d.popitem, KeyError)
        dequeiter = iter_except(d.popleft, IndexError)
        setiter = iter_except(s.pop, KeyError)

    """
    try:
        if first is not None:
            yield first()
        while True:
            yield func()
    except exception:
        # The agreed "no more data" signal: end the generator quietly
        return

159 

160 

def log_and_format_old_libraries(
    logger: logging.Logger, libs: list["MigrationItem"]
) -> None:
    """Log the old libraries as a table without header

    One INFO line is emitted per package (in sorted package order),
    listing the architectures of its items in the order they appear
    in libs.

    :param logger: The logger receiving the table
    :param libs: The items to report; "package" and "architecture" are read
    """
    arches_by_package: dict[str, list[str]] = {}
    for item in libs:
        arches_by_package.setdefault(item.package, []).append(item.architecture)

    for package, arches in sorted(arches_by_package.items()):
        logger.info(" %s: %s", package, " ".join(arches))

175 

176 

def compute_reverse_tree(
    pkg_universe: "BinaryPackageUniverse", affected: set[BinaryPackageId]
) -> None:
    """Extend a set of packages with all their reverse dependencies

    Starting from the packages in "affected", the reverse dependencies
    reported by the universe are followed transitively until no new
    package shows up.  Nothing is returned: the result is accumulated
    in "affected" itself, which therefore must be mutable.

    :param pkg_universe: An instance of the BinaryPackageUniverse
    :param affected: The set of BinaryPackageId to extend in place
    """
    pending = list(affected)
    while pending:
        current = pending.pop()
        # Only packages not seen before need to be visited
        discovered = pkg_universe.reverse_dependencies_of(current) - affected
        pending.extend(discovered)
        affected.update(discovered)

195 

196 

def add_transitive_dependencies_flatten(
    pkg_universe: "BinaryPackageUniverse", initial_set: MutableSet[BinaryPackageId]
) -> None:
    """Extend a set of packages with all their transitive dependencies

    Every package listed in any of the groups returned by
    pkg_universe.dependencies_of() is added, and the search continues
    from the newly added packages until nothing new is found.  The
    result is accumulated in "initial_set" itself, which therefore must
    be mutable.

    :param pkg_universe: An instance of the BinaryPackageUniverse
    :param initial_set: The set of BinaryPackageId to extend in place
    """
    pending = list(initial_set)
    while pending:
        current = pending.pop()
        fresh: set[BinaryPackageId] = set()
        for group in pkg_universe.dependencies_of(current):
            fresh.update(dep for dep in group if dep not in initial_set)
        initial_set |= fresh
        pending.extend(fresh)

219 

220 

def write_nuninst(filename: str, nuninst: dict[str, set[str]]) -> None:
    """Write the non-installable report

    Write the non-installable report derived from "nuninst" to the
    file denoted by "filename".  The file starts with a "Built on" and
    a "Last update" header carrying the same UTC timestamp, followed by
    one "<key>: <pkg> <pkg> ..." line per entry of nuninst.

    :param filename: Path of the file to (over)write
    :param nuninst: Mapping of keys (e.g. architectures) to uninstallable packages
    """
    # Format the timestamp once, so the two header fields cannot disagree
    # when the clock ticks between the writes.
    timestamp = time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime())
    with open(filename, "w", encoding="utf-8") as f:
        # Having two fields with identical dates seems a bit redundant.
        f.write(f"Built on: {timestamp}\n")
        f.write(f"Last update: {timestamp}\n\n")
        for key, packages in nuninst.items():
            # Sets have no stable iteration order; sort for reproducible output
            f.write("{}: {}\n".format(key, " ".join(sorted(packages))))

242 

243 

def read_nuninst(filename: str, architectures: set[str]) -> dict[str, set[str]]:
    """Read the non-installable report

    Parse the report stored in "filename" and return it as a mapping.
    Lines without a ":" are ignored.  For the others, the text before
    the first ":" is the key and the whitespace separated words after
    it are the packages.  A key is only kept when the part in front of
    its first "+" is listed in "architectures".

    :param filename: Path of the report to read
    :param architectures: The architectures to include
    :return: Mapping of the accepted keys to their set of packages
    """
    report: dict[str, set[str]] = {}
    with open(filename, encoding="ascii") as f:
        for line in f:
            key, colon, packages = line.strip().partition(":")
            if not colon:
                continue
            base_arch = key.partition("+")[0]
            if base_arch in architectures:
                report[key] = set(packages.split())
    return report

260 

261 

def newly_uninst(
    nuold: dict[str, set[str]], nunew: dict[str, set[str]]
) -> dict[str, list[str]]:
    """Return a nuninst statistic with only new uninstallable packages

    For every architecture present in both statistics, the packages
    listed in "nunew" but not in "nuold" are collected.

    :param nuold: The previous statistic
    :param nunew: The current statistic
    :return: Mapping of architectures to the list of packages that became
      uninstallable.  Architectures without regressions are omitted, so
      an empty dictionary means there are no regressions at all.
    """
    res: dict[str, list[str]] = {}
    for arch in nuold:
        if arch not in nunew:
            continue
        previously_broken = nuold[arch]
        regressions = [pkg for pkg in nunew[arch] if pkg not in previously_broken]
        # Leave the architecture out when nothing regressed
        if regressions:
            res[arch] = regressions
    return res

283 

284 

def format_and_log_uninst(
    logger: logging.Logger,
    architectures: Iterable[str],
    nuninst: Mapping[str, Iterable[str]],
    *,
    loglevel: int = logging.INFO,
) -> None:
    """Emit the uninstallable packages to the log

    One message is logged per architecture that has at least one
    uninstallable package, with the packages sorted, e.g.:
      * i386: broken-pkg1, broken-pkg2

    Architectures that are missing from nuninst, or have no
    uninstallable packages, produce no output.

    :param logger: The logger receiving the messages
    :param architectures: The architectures to report, in output order
    :param nuninst: Mapping of architectures to uninstallable packages
    :param loglevel: The level used for the messages
    """
    for arch in architectures:
        broken = nuninst.get(arch)
        if not broken:
            continue
        logger.log(loglevel, " * {}: {}".format(arch, ", ".join(sorted(broken))))

303 

304 

class Sorted(Protocol):
    """Structural type of a "sorted"-like callable.

    Used to annotate the "sorted" parameter of write_heidi, whose
    default value is the builtin sorted().
    """

    def __call__(
        self,
        iterable: Iterable["SupportsRichComparisonT"],
        /,
        *,
        key: None = None,
        reverse: bool = False,
    ) -> list["SupportsRichComparisonT"]: ...

314 

315 

def write_heidi(
    filename: str,
    target_suite: TargetSuite,
    *,
    outofsync_arches: frozenset[str] = frozenset(),
    sorted: Sorted = sorted,
) -> None:
    """Write the output HeidiResult

    This method writes the output for Heidi, which contains all the
    binary packages followed by all the source packages of the target
    suite, one per line, in the form:

    <pkg-name> <pkg-version> <pkg-architecture> <pkg-section>
    <src-name> <src-version> source <src-section>

    Entries whose section is "faux" (or ends with "/faux") are left
    out.  A binary without a section counts as "faux"; a source without
    a section is written with the section "unknown".

    :param filename: Path of the file to (over)write
    :param target_suite: Provides the sources and binaries to dump
    :param outofsync_arches: If given, it is a set of architectures marked
      as "out of sync".  On those architectures, arch:all binaries whose
      source version differs from the source in the target suite are
      excluded to reduce the noise.
    :param sorted: Optimisation to avoid "load global" in the loops
    """
    sources_t = target_suite.sources
    packages_t = target_suite.binaries

    def is_faux(section: str) -> bool:
        # Faux package; not really a part of testing
        return section == "faux" or section.endswith("/faux")

    with open(filename, "w", encoding="ascii") as f:
        # write binary packages
        for arch in sorted(packages_t):
            arch_binaries = packages_t[arch]
            for pkg_name in sorted(arch_binaries):
                pkg = arch_binaries[pkg_name]
                pkgarch = pkg.architecture or "all"
                pkgsec = pkg.section or "faux"
                if is_faux(pkgsec):
                    continue
                # when architectures are marked as "outofsync", their binary
                # versions may be lower than those of the associated
                # source package in testing.  the binary package list for
                # such architectures will include arch:all packages
                # matching those older versions, but we only want the
                # newer arch:all in testing
                is_outdated_archall = (
                    pkg.source_version
                    and pkgarch == "all"
                    and pkg.source_version != sources_t[pkg.source].version
                    and arch in outofsync_arches
                )
                if is_outdated_archall:
                    continue
                f.write(f"{pkg_name} {pkg.version} {pkgarch} {pkgsec}\n")

        # write sources
        for src_name in sorted(sources_t):
            src = sources_t[src_name]
            srcsec = src.section or "unknown"
            if is_faux(srcsec):
                continue
            f.write(f"{src_name} {src.version} source {srcsec}\n")

381 

382 

def write_heidi_delta(filename: str, all_selected: list["MigrationItem"]) -> None:
    """Write the output delta

    After a "#HeidiDelta" header, one line is written per item, in the
    order given.  Source items are written as:
    <src-name> <src-version>
    and all other items as:
    <pkg-name> <pkg-version> <pkg-architecture>
    Removals are marked by a leading "-" on the line.

    :param filename: Path of the file to (over)write
    :param all_selected: The items making up the delta
    """
    with open(filename, "w", encoding="ascii") as fd:
        fd.write("#HeidiDelta\n")
        for item in all_selected:
            sign = "-" if item.is_removal else ""
            line = f"{sign}{item.package} {item.version}"
            if item.architecture != "source":
                # binary items additionally carry their architecture
                line += f" {item.architecture}"
            fd.write(line + "\n")

410 

411 

class Opener(Protocol):
    """Structural type of the "open"-like callables used by write_excuses.

    write_excuses assigns open, lzma.open or gzip.open to a variable of
    this type and calls it with the mode "wt" and the encoding "utf-8".
    """

    def __call__(
        self, file: str, mode: Literal["wt"], encoding: Literal["utf-8"]
    ) -> IO[Any]: ...

416 

417 

def write_excuses(
    excuses: dict[str, "Excuse"] | dict[PackageId, "Excuse"],
    dest_file: str,
    output_format: Literal["yaml", "legacy-html"] = "yaml",
) -> None:
    """Write the excuses to dest_file

    Writes a list of excuses (ordered by their sortkey()) in a specified
    output_format to the path denoted by dest_file.

    For "yaml", the directory of dest_file is created when missing and
    the output is compressed if dest_file ends with ".xz" or ".gz".

    :param excuses: The excuses to write
    :param dest_file: Path of the file to (over)write
    :param output_format: Either "yaml" or "legacy-html"
    :raises ValueError: if output_format is neither "yaml" nor "legacy-html"
    """
    excuselist = sorted(excuses.values(), key=lambda x: x.sortkey())
    if output_format == "yaml":
        # A bare file name has an empty dirname, and os.makedirs("") raises
        # FileNotFoundError - only create the directory when there is one.
        dest_dir = os.path.dirname(dest_file)
        if dest_dir:
            os.makedirs(dest_dir, exist_ok=True)
        opener: Opener = open  # type: ignore[assignment]
        if dest_file.endswith(".xz"):
            import lzma

            opener = lzma.open  # type: ignore[assignment]
        elif dest_file.endswith(".gz"):
            import gzip

            opener = gzip.open  # type: ignore[assignment]
        with opener(dest_file, "wt", encoding="utf-8") as f:
            edatalist = [e.excusedata(excuses) for e in excuselist]
            excusesdata = {
                "sources": edatalist,
                "generated-date": datetime.now(UTC),
            }
            f.write(
                yaml.dump(excusesdata, default_flow_style=False, allow_unicode=True)
            )
    elif output_format == "legacy-html":
        with open(dest_file, "w", encoding="utf-8") as f:
            f.write(
                '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/REC-html40/strict.dtd">\n'
            )
            f.write("<html><head><title>excuses...</title>")
            f.write(
                '<meta http-equiv="Content-Type" content="text/html;charset=utf-8"></head><body>\n'
            )
            f.write(
                "<p>Generated: "
                + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time()))
                + "</p>\n"
            )
            f.write("<ul>\n")
            for e in excuselist:
                f.write("<li>%s" % e.html(excuses))
            f.write("</ul></body></html>\n")
    else:  # pragma: no cover
        raise ValueError('Output format must be either "yaml" or "legacy-html"')

470 

471 

def old_libraries(
    mi_factory: "MigrationItemFactory",
    suite_info: Suites,
    outofsync_arches: Iterable[str] = frozenset(),
) -> list["MigrationItem"]:
    """Detect old libraries left in the target suite for smooth transitions

    A binary in the target suite is an old library when its source
    version differs from the version of its source package in the
    target suite, i.e. it is no longer built from that source.  Such
    binaries are still there because other packages depend on them, but
    they should be removed as soon as possible.

    For "outofsync" architectures, outdated binaries are allowed to be in
    the target suite, so they are only reported if they are no longer in
    the (primary) source suite.

    :param mi_factory: Used to create the removal items
    :param suite_info: Provides the target suite and the primary source suite
    :param outofsync_arches: The architectures marked as "out of sync"
    :return: The removal items for the old libraries
    """
    target = suite_info.target_suite
    sources_t = target.sources
    binaries_t = target.binaries
    binaries_s = suite_info.primary_source_suite.binaries
    removals = []
    for arch, arch_binaries in binaries_t.items():
        for pkg_name, pkg in arch_binaries.items():
            if sources_t[pkg.source].version == pkg.source_version:
                # still built from the current source
                continue
            if arch in outofsync_arches and pkg_name in binaries_s[arch]:
                # tolerated on an out of sync architecture
                continue
            removals.append(mi_factory.generate_removal_for_cruft_item(pkg.pkg_id))
    return removals

500 

501 

502def is_nuninst_asgood_generous( 

503 constraints: dict[str, list[str]], 

504 allow_uninst: dict[str, set[str | None]], 

505 architectures: list[str], 

506 old: dict[str, set[str]], 

507 new: dict[str, set[str]], 

508 break_arches: set[str] = cast(set[str], frozenset()), 

509) -> bool: 

510 """Compares the nuninst counters and constraints to see if they improved 

511 

512 Given a list of architectures, the previous and the current nuninst 

513 counters, this function determines if the current nuninst counter 

514 is better than the previous one. Optionally it also accepts a set 

515 of "break_arches", the nuninst counter for any architecture listed 

516 in this set are completely ignored. 

517 

518 If the nuninst counters are equal or better, then the constraints 

519 are checked for regressions (ignoring break_arches). 

520 

521 Returns True if the new nuninst counter is better than the 

522 previous and there are no constraint regressions (ignoring Break-archs). 

523 Returns False otherwise. 

524 

525 """ 

526 diff = 0 

527 for arch in architectures: 

528 if arch in break_arches: 

529 continue 

530 diff = diff + ( 

531 len(new[arch] - allow_uninst[arch]) - len(old[arch] - allow_uninst[arch]) 

532 ) 

533 if diff > 0: 533 ↛ 534line 533 didn't jump to line 534 because the condition on line 533 was never true

534 return False 

535 must_be_installable = constraints["keep-installable"] 

536 for arch in architectures: 

537 if arch in break_arches: 

538 continue 

539 regression = new[arch] - old[arch] 

540 if not regression.isdisjoint(must_be_installable): 540 ↛ 541line 540 didn't jump to line 541 because the condition on line 540 was never true

541 return False 

542 return True 

543 

544 

def clone_nuninst(
    nuninst: dict[str, set[str]],
    *,
    packages_s: dict[str, dict[str, BinaryPackage]] | None = None,
    architectures: Iterable[str] | None = None,
) -> dict[str, set[str]]:
    """Completely or selectively clone nuninst

    The returned table is a shallow copy of nuninst in which, for every
    listed architecture, the "<arch>" and "<arch>+all" entries are
    replaced by fresh sets.  All other entries keep sharing their sets
    with the original.  Without "architectures", the result is only a
    shallow copy.

    :param nuninst: The table to clone
    :param packages_s: If given, only packages present in packages_s[arch]
      are kept in the cloned sets (otherwise they are copied as-is)
    :param architectures: The architectures to deep clone
    :return: The cloned table
    """
    clone = nuninst.copy()
    if architectures is None:
        return clone

    for arch in architectures:
        arch_all = arch + "+all"
        if packages_s is None:
            clone[arch] = set(nuninst[arch])
            clone[arch_all] = set(nuninst[arch_all])
        else:
            # prune the packages unknown to the given package table
            clone[arch] = {pkg for pkg in nuninst[arch] if pkg in packages_s[arch]}
            clone[arch_all] = {
                pkg for pkg in nuninst[arch_all] if pkg in packages_s[arch]
            }
    return clone

575 

576 

def test_installability(
    target_suite: TargetSuite,
    pkg_name: str,
    pkg_id: BinaryPackageId,
    broken: set[str],
    nuninst_arch: set[str] | None,
) -> None:
    """Test for installability of a package and record the outcome

    The target suite is asked whether pkg_id is installable.  If not,
    pkg_name is added to "broken"; otherwise it is removed from it.
    "broken" thus follows any change in installability (e.g. a package
    going from uninstallable to installable).

    :param target_suite: The suite answering the installability question
    :param pkg_name: The name recorded in the sets
    :param pkg_id: The package to test
    :param broken: The set of broken packages; updated in place
    :param nuninst_arch: If not None, updated in the same way as broken
    """
    tracked = [broken] if nuninst_arch is None else [broken, nuninst_arch]
    if target_suite.is_installable(pkg_id):
        # an improvement, or the package was fine all along
        for names in tracked:
            names.discard(pkg_name)
    else:
        # a regression, or the package was broken already
        for names in tracked:
            names.add(pkg_name)

605 

606 

def check_installability(
    target_suite: TargetSuite,
    binaries: dict[str, dict[str, BinaryPackage]],
    arch: str,
    updates: set[BinaryPackageId],
    check_archall: bool,
    nuninst: dict[str, set[str]],
) -> None:
    """Re-test the installability of updated packages on an architecture

    Every package in "updates" for the given architecture is tested,
    provided "binaries" holds exactly that version of it.  The outcome
    is always recorded in nuninst["<arch>+all"].  It is also recorded in
    the nuninst entry of the package's architecture, except for arch:all
    packages when check_archall is False: those are only removed from
    that entry.

    :param target_suite: The suite answering the installability question
    :param binaries: Per architecture package tables
    :param arch: The architecture to check
    :param updates: The packages that may have changed installability
    :param check_archall: Whether arch:all packages count for the architecture
    :param nuninst: The nuninst table; updated in place
    """
    broken = nuninst[arch + "+all"]
    packages_t_a = binaries[arch]

    for pkg_id in updates:
        if pkg_id.architecture != arch:
            continue
        name, version, parch = pkg_id
        if name not in packages_t_a:
            continue
        pkgdata = packages_t_a[name]
        if pkgdata.version != version:
            # Not the version in testing right now, ignore
            continue
        if check_archall or pkgdata.architecture != "all":
            nuninst_arch = nuninst[parch]
        else:
            # arch:all packages are only checked if requested
            nuninst[parch].discard(name)
            nuninst_arch = None
        test_installability(target_suite, name, pkg_id, broken, nuninst_arch)

634 

635 

636def possibly_compressed( 

637 path: str, *, permitted_compressions: list[str] | None = None 

638) -> str: 

639 """Find and select a (possibly compressed) variant of a path 

640 

641 If the given path exists, it will be returned 

642 

643 :param path: The base path. 

644 :param permitted_compressions: Alternative extensions to look for. Defaults to "gz" and "xz". 

645 :return: The path given possibly with one of the permitted extensions. 

646 :raises FileNotFoundError: if the path is not found 

647 """ 

648 if os.path.exists(path): 648 ↛ 650line 648 didn't jump to line 650 because the condition on line 648 was always true

649 return path 

650 if permitted_compressions is None: 

651 permitted_compressions = ["gz", "xz"] 

652 for ext in permitted_compressions: 

653 cpath = f"{path}.{ext}" 

654 if os.path.exists(cpath): 

655 return cpath 

656 raise FileNotFoundError( 

657 errno.ENOENT, os.strerror(errno.ENOENT), path 

658 ) # pragma: no cover 

659 

660 

def create_provides_map(
    packages: dict[str, BinaryPackage],
) -> dict[str, set[tuple[str, str]]]:
    """Create a provides map from a map of binary package names to BinaryPackage objects

    :param packages: A dict mapping binary package names to their BinaryPackage object
    :return: A provides map: for each provided name, the set of
      (providing package name, provided version) pairs
    """
    provides_map: defaultdict[str, set[tuple[str, str]]] = defaultdict(set)

    for provider_name, provider in packages.items():
        # register each provided name together with the real package
        # that provides it
        for entry in provider.provides:
            provided_name, provided_version, _ = entry
            provides_map[provided_name].add((provider_name, provided_version))

    return provides_map

679 

680 

def read_release_file(suite_dir: str) -> "TagSection[str]":
    """Parse the "Release" file of a suite

    :param suite_dir: The directory of the suite
    :return: The first (and only) paragraph of the Release file
    :raises TypeError: if the file has more than one paragraph
    """
    release_file = os.path.join(suite_dir, "Release")
    with open(release_file) as fd:
        paragraphs = iter(apt_pkg.TagFile(fd))
        first_paragraph = next(paragraphs)
        extra_paragraph = next(paragraphs, None)
        if extra_paragraph is not None:  # pragma: no cover
            raise TypeError("%s has more than one paragraph" % release_file)
    return first_paragraph

694 

695 

def read_sources_file(
    filename: str,
    sources: dict[str, SourcePackage] | None = None,
    add_faux: bool = True,
    intern: Callable[[str], str] = sys.intern,
) -> dict[str, SourcePackage]:
    """Parse a single Sources file into a hash

    Parse a single Sources file into a dict mapping a source package
    name to a SourcePackage object.  If there are multiple source
    packages with the same name, then the highest versioned source
    package (that is not marked as "Extra-Source-Only") is the
    version kept in the dict.

    :param filename: Path to the Sources file. Can be compressed by any algorithm supported by apt_pkg.TagFile
    :param sources: Optional dict to add the packages to. If given, this is also the value returned.
    :param add_faux: Add a faux arch:all binary for each source that claims it has arch:all
    :param intern: Internal optimisation / implementation detail to avoid python's "LOAD_GLOBAL" instruction in a loop
    :return: mapping from names to a source package
    """
    if sources is None:
        sources = {}

    tag_file = apt_pkg.TagFile(filename)
    get_field = tag_file.section.get
    step = tag_file.step

    while step():
        if get_field("Extra-Source-Only", "no") == "yes":
            # Ignore sources only referenced by Built-Using
            continue
        pkg = get_field("Package")
        ver = get_field("Version")
        # There may be multiple versions of the source package
        # (in unstable) if some architectures have out-of-date
        # binaries.  We only ever consider the source with the
        # largest version for migration.
        if pkg in sources and apt_pkg.version_compare(sources[pkg].version, ver) > 0:
            continue

        maint = get_field("Maintainer")
        maint = intern(maint.strip()) if maint else maint
        section = get_field("Section")
        section = intern(section.strip()) if section else section

        # Build-Depends and Build-Depends-Arch are merged into one relation
        present_build_deps = (
            field
            for field in (get_field("Build-Depends"), get_field("Build-Depends-Arch"))
            if field is not None
        )
        joined_build_deps = ", ".join(present_build_deps)
        build_deps_arch: str | None = (
            sys.intern(joined_build_deps) if joined_build_deps != "" else None
        )
        build_deps_indep = get_field("Build-Depends-Indep")
        if build_deps_indep is not None:
            build_deps_indep = sys.intern(build_deps_indep)

        # Adding arch:all packages to the list of binaries already to be able
        # to check for them later.  Helps mitigate bug 887060 and is the
        # (partial?) answer to bug 1064428.
        binaries: set[BinaryPackageId] = set()
        if add_faux and "all" in get_field("Architecture", "").split():
            # the value "faux" in arch:faux is used elsewhere, so keep in sync
            binaries.add(
                BinaryPackageId(pkg + "-faux", intern("0~~~~"), intern("faux"))
            )

        testsuite = get_field("Testsuite", "").split()
        testsuite_triggers = get_field("Testsuite-Triggers", "").replace(",", "").split()
        sources[intern(pkg)] = SourcePackage(
            intern(pkg),
            intern(ver),
            section,
            binaries,
            maint,
            False,
            build_deps_arch,
            build_deps_indep,
            testsuite,
            testsuite_triggers,
        )
    return sources

777 

778 

def _check_and_update_packages(
    packages: list[BinaryPackage],
    package: BinaryPackage,
    archqual: str | None,
    build_depends: bool,
) -> None:
    """Helper for get_dependency_solvers

    Appends "package" to "packages" if the architecture qualifier of
    the relation permits it as a (Build-)Depends.

    :param packages: The list of packages to update
    :param package: The candidate package
    :param archqual: Architecture qualifier
    :param build_depends: If True, check if the "package" parameter is valid as a build-dependency.
    """

    # See also bug #971739 and #1059929
    qualifier_permits = (
        # no qualifier at all
        archqual is None
        # Multi-arch handling for build-dependencies
        # - :native is ok always
        or (archqual == "native" and build_depends)
        # Multi-arch handling for both build-dependencies and regular dependencies
        # - :any is ok iff the target has "M-A: allowed"
        or (archqual == "any" and package.multi_arch == "allowed")
    )
    if qualifier_permits:
        packages.append(package)

806 

807 

class GetDependencySolversProto(Protocol):
    """Structural type of callables with the signature of get_dependency_solvers."""

    def __call__(
        self,
        block: list[tuple[str, str, str]],
        binaries_s_a: dict[str, BinaryPackage],
        provides_s_a: dict[str, set[tuple[str, str]]],
        *,
        build_depends: bool = False,
        empty_set: Any = frozenset(),
    ) -> list[BinaryPackage]: ...

818 

819 

def get_dependency_solvers(
    block: list[tuple[str, str, str]],
    binaries_s_a: dict[str, BinaryPackage],
    provides_s_a: dict[str, set[tuple[str, str]]],
    *,
    build_depends: bool = False,
    empty_set: Any = frozenset(),
) -> list[BinaryPackage]:
    """Find the packages which satisfy a dependency block

    This method returns the list of packages which satisfy a dependency
    block (as returned by apt_pkg.parse_depends) in a package table
    for a given suite and architecture (a la self.binaries[suite][arch])

    Both the package named by an alternative and the packages providing
    that name are considered.  A package may be listed more than once
    if it satisfies the block in more than one way.

    It can also handle build-dependency relations if the named parameter
    "build_depends" is set to True.  In this case, block should be based
    on the return value from apt_pkg.parse_src_depends.

    :param block: The dependency block as parsed by apt_pkg.parse_depends (or apt_pkg.parse_src_depends
      if the "build_depends" is True)
    :param binaries_s_a: Mapping of package names to the relevant BinaryPackage
    :param provides_s_a: Mapping of package names to their providers (as generated by parse_provides)
    :param build_depends: If True, treat the "block" parameter as a build-dependency relation rather than
      a regular dependency relation.
    :param empty_set: Internal implementation detail / optimisation
    :return: The BinaryPackage objects solving the relation
    """
    solvers: list[BinaryPackage] = []

    # for every package, version and operation in the block
    for name, version, op in block:
        # split an optional architecture qualifier ("pkg:qual") off the name
        name, colon, qualifier = name.partition(":")
        archqual = qualifier if colon else None
        unversioned = op == "" and version == ""

        # look for the package itself
        if name in binaries_s_a:
            package = binaries_s_a[name]
            # check the versioned dependency and architecture qualifier
            # (if present)
            if unversioned or apt_pkg.check_dep(package.version, op, version):
                _check_and_update_packages(solvers, package, archqual, build_depends)

        # look for the package in the virtual packages list and loop on them
        for prov, prov_version in provides_s_a.get(name, empty_set):
            assert prov in binaries_s_a
            package = binaries_s_a[prov]
            # See Policy Manual §7.5
            if unversioned or (
                prov_version != "" and apt_pkg.check_dep(prov_version, op, version)
            ):
                _check_and_update_packages(solvers, package, archqual, build_depends)

    return solvers

877 

878 

def invalidate_excuses(
    excuses: dict[str, "Excuse"],
    valid: set[str],
    invalid: set[str],
    invalidated: set[str],
) -> None:
    """Invalidate impossible excuses

    Registers the package-level dependencies between excuses and then
    invalidates every excuse that depends on an invalid excuse without a
    remaining alternative.

    :param excuses: Mapping of excuse names to excuses
    :param valid: Names of the valid excuses; names are discarded from it as
                  excuses get invalidated (updated in place)
    :param invalid: Names of the invalid excuses; excuses whose dependencies
                    cannot be added are inserted (updated in place)
    :param invalidated: Receives the name of every excuse processed as
                        invalid (updated in place)
    """
    # index of all packages (source and binary) present in the excuses:
    # package id -> names of the excuses containing it
    providers: dict[PackageId | BinaryPackageId, set[str]] = defaultdict(set)
    for excuse in excuses.values():
        for arch in excuse.packages:
            for pkg_id in excuse.packages[arch]:
                # the same package can be in multiple excuses,
                # eg. when unstable and TPU have the same packages
                providers[pkg_id].add(excuse.name)

    # excuse name -> names of the excuses depending on it.  Only populated
    # from the package dependencies handled below; there are currently no
    # dependencies between excuses that are added directly, so this is ok.
    excuses_rdeps = defaultdict(set)
    for excuse in excuses.values():
        for pkg_dep in excuse.depends_packages:
            # everything that can satisfy this specific dependency: excuse
            # names, plus an ImpossibleDependencyState for each package that
            # no excuse contains (e.g. a cruft binary)
            alternatives: set[str | DependencyState] = set()
            for dep_id in cast(set[BinaryPackageId], pkg_dep.deps):
                dep_excuses = providers[dep_id]
                if not dep_excuses:
                    # unknown package: the defaultdict gave us an empty set
                    alternatives.add(
                        ImpossibleDependencyState(
                            PolicyVerdict.REJECTED_PERMANENTLY, "%s" % (dep_id.name)
                        )
                    )
                    continue

                alternatives.update(dep_excuses)
                for provider in dep_excuses:
                    excuses_rdeps[provider].add(excuse.name)

            if not excuse.add_dependency(alternatives, pkg_dep.spec):
                valid.discard(excuse.name)
                invalid.add(excuse.name)

    # work through the invalid excuses; a sorted list is used (instead of
    # the set) for deterministic results
    worklist = sorted(invalid)
    for ename in iter_except(worklist.pop, IndexError):
        invalidated.add(ename)
        # nothing depends on this excuse
        if ename not in excuses_rdeps:
            continue

        rdep_verdict = (
            PolicyVerdict.REJECTED_BLOCKED_BY_ANOTHER_ITEM
            if excuses[ename].policy_verdict.is_blocked
            else PolicyVerdict.REJECTED_WAITING_FOR_ANOTHER_ITEM
        )

        for rdep_name in sorted(excuses_rdeps[ename]):
            rdep = excuses[rdep_name]
            # only valid excuses that are not marked as `forced' are affected
            if rdep_name not in valid or rdep.forced:
                continue

            # mark this specific dependency as invalid; when no alternative
            # is left for it, the depending excuse becomes invalid as well
            # and is queued for processing
            if not rdep.invalidate_dependency(ename, rdep_verdict):
                valid.discard(rdep_name)
                worklist.append(rdep_name)

959 

960 

def compile_nuninst(
    target_suite: TargetSuite, architectures: list[str], nobreakall_arches: list[str]
) -> dict[str, set[str]]:
    """Compile a nuninst dict from the current testing

    For each architecture the result has two entries: "<arch>+all" with the
    names of all packages that are not installable, and "<arch>" with the same
    names minus the arch:all packages, unless the architecture is listed in
    nobreakall_arches (then both entries have the same content).

    :param target_suite: The target suite
    :param architectures: Which architectures to check
    :param nobreakall_arches: Which architectures where arch:all packages must be installable
    :return: Mapping of "<arch>" and "<arch>+all" to sets of package names
    """
    nuninst: dict[str, set[str]] = {}
    binaries_t = target_suite.binaries

    for arch in architectures:
        packages_t_a = binaries_t[arch]

        # every package of this architecture that is not installable
        uninstallable = {
            pkg_name
            for pkg_name, pkg_data in packages_t_a.items()
            if not target_suite.is_installable(pkg_data.pkg_id)
        }

        if arch in nobreakall_arches:
            # arch-independent packages are checked too
            nuninst[arch] = set(uninstallable)
        else:
            # arch-independent packages are not required here
            nuninst[arch] = {
                pkg_name
                for pkg_name in uninstallable
                if packages_t_a[pkg_name].architecture != "all"
            }
        nuninst[arch + "+all"] = uninstallable

    return nuninst

995 

996 

def is_smooth_update_allowed(
    binary: BinaryPackage, smooth_updates: list[str], hints: "HintCollection"
) -> bool:
    """Tell whether the configuration or a hint allows smooth updating a binary

    :param binary: The binary package to check
    :param smooth_updates: Sections for which smooth updates are enabled;
                           the entry "ALL" enables them for every binary
    :param hints: Hints, searched for an "allow-smooth-update" hint on the
                  source and source version of the binary
    :return: True if any of the above allows the smooth update
    """
    # only the last component of the section (e.g. "libs" in "contrib/libs")
    # is compared with the configuration
    section = binary.section.rpartition("/")[2]
    if "ALL" in smooth_updates or section in smooth_updates:
        return True

    # note that this needs to match the source version *IN TESTING*
    return bool(
        hints.search(
            "allow-smooth-update", package=binary.source, version=binary.source_version
        )
    )

1011 

1012 

def find_smooth_updateable_binaries(
    binaries_to_check: list[BinaryPackageId],
    source_data: SourcePackage,
    pkg_universe: "BinaryPackageUniverse",
    target_suite: TargetSuite,
    binaries_t: dict[str, dict[str, BinaryPackage]],
    binaries_s: dict[str, dict[str, BinaryPackage]],
    removals: set[BinaryPackageId] | frozenset[BinaryPackageId],
    smooth_updates: list[str],
    hints: "HintCollection",
) -> set[BinaryPackageId]:
    """Select the binaries that should be kept around for a smooth update

    :param binaries_to_check: The candidate binaries
    :param source_data: The source package; binaries whose counterpart in
                        binaries_s is built from this version are skipped
    :param pkg_universe: Provides the dependencies and reverse dependencies
    :param target_suite: The target suite
    :param binaries_t: Binaries of the target suite (arch -> name -> package)
    :param binaries_s: Binaries of the source suite (arch -> name -> package)
    :param removals: Binaries assumed to leave together with the candidates
    :param smooth_updates: Smooth update configuration (see is_smooth_update_allowed)
    :param hints: Hints (see is_smooth_update_allowed)
    :return: The ids of the binaries selected for a smooth update
    """
    pending: set[BinaryPackageId] = set()
    smoothbins: set[BinaryPackageId] = set()

    for check_pkg_id in binaries_to_check:
        binary, _, parch = check_pkg_id
        binaries_s_a = binaries_s[parch]

        cruftbins: set[BinaryPackageId] = set()

        if binary in binaries_s_a:
            # Not a candidate for a smooth update: there is a newer
            # non-cruft version in the source suite
            if binaries_s_a[binary].source_version == source_data.version:
                continue
            cruftbins.add(binaries_s_a[binary].pkg_id)

        # Maybe a candidate (cruft or removed binary), provided the
        # configuration allows us to smooth update it.
        if not is_smooth_update_allowed(binaries_t[parch][binary], smooth_updates, hints):
            continue

        # If the package has reverse-dependencies which are built from other
        # sources, it is a valid candidate for a smooth update.  If not, it
        # may still be a valid candidate if one of its r-deps is itself a
        # candidate, so it is noted for checking later.
        rdeps = set(pkg_universe.reverse_dependencies_of(check_pkg_id))
        # Binaries listed in "removals" are ignored, as we assume they will
        # leave at the same time as the given package.
        rdeps.difference_update(removals, binaries_to_check)

        if not target_suite.any_of_these_are_in_the_suite(rdeps):
            pending.add(check_pkg_id)
            continue

        combined = smoothbins | {check_pkg_id}
        smooth_update_it = False
        for rdep in rdeps:
            # Each dependency clause has a set of possible alternatives that
            # can satisfy that dependency.  If any of them is outside the
            # set of smoothbins, the dependency can be satisfied even if
            # this binary was removed, so there is no need to keep it around
            # for a smooth update.  If not, only this binary can satisfy the
            # dependency, so we should keep it around until the rdep is no
            # longer in testing.
            # Cruft binaries from the source suite are filtered out, because
            # they will not be added to the set of packages that will be
            # migrated.
            if any(
                dep_clause - cruftbins <= combined
                for dep_clause in pkg_universe.dependencies_of(rdep)
            ):
                smooth_update_it = True

        if smooth_update_it:
            smoothbins = combined
        else:
            pending.add(check_pkg_id)

    # Check whether we should perform a smooth update for packages which are
    # candidates but do not have r-deps outside of the current source:
    # repeat until a full pass selects nothing new.
    progress = True
    while progress:
        progress = False
        for candidate_pkg_id in pending:
            candidate_rdeps = pkg_universe.reverse_dependencies_of(candidate_pkg_id)
            if not candidate_rdeps.isdisjoint(smoothbins):
                smoothbins.add(candidate_pkg_id)
                progress = True
        pending -= smoothbins

    return smoothbins

1093 

1094 

def find_newer_binaries(
    suite_info: Suites, pkg: BinaryPackage, add_source_for_dropped_bin: bool = False
) -> list[tuple[PackageId, Suite]]:
    """
    Find newer binaries for pkg in any of the source suites.

    :param suite_info: the suites to search; suites of class TARGET_SUITE
      are skipped

    :param pkg: BinaryPackage (is assumed to be in the target suite)

    :param add_source_for_dropped_bin: If True, newer versions of the
      source of pkg will be added if they don't have the binary pkg

    :return: the newer binaries (or sources) and their suites
    """
    source_name = pkg.source
    bin_name = pkg.pkg_id.package_name
    bin_arch = pkg.pkg_id.architecture

    found: list[tuple[PackageId, Suite]] = []
    for suite in suite_info:
        if suite.suite_class == SuiteClass.TARGET_SUITE:
            continue

        arch_binaries = suite.binaries.get(bin_arch)
        if not arch_binaries:
            continue

        newerbin = None
        if bin_name in arch_binaries:
            candidate = arch_binaries[bin_name]
            # A cruft binary is treated as if it didn't exist, i.e. as if
            # the source didn't have the binary (handled below).
            if not suite.is_cruft(candidate):
                if apt_pkg.version_compare(candidate.version, pkg.version) <= 0:
                    continue
                newerbin = candidate
        elif source_name not in suite.sources:
            # bin and source not in suite: no newer version
            continue

        if newerbin:
            n_id = newerbin.pkg_id
        elif not add_source_for_dropped_bin:
            continue
        else:
            # The source is in the suite but doesn't have the binary anymore
            # (either it doesn't exist, or it's cruft and we pretend it
            # doesn't exist).  Add the source instead, but only if it is
            # newer than the source version of pkg.
            nsrc = suite.sources[source_name]
            n_id = PackageId(source_name, nsrc.version, "source")
            if apt_pkg.version_compare(nsrc.version, pkg.source_version) <= 0:
                continue

        found.append((n_id, suite))

    return found

1151 

1152 

def parse_provides(
    provides_raw: str,
    pkg_id: BinaryPackageId | None = None,
    logger: logging.Logger | None = None,
) -> list[tuple[str, str, str]]:
    """Parse a raw Provides value into (name, version, operator) tuples

    Entries with alternatives, or with an operator other than "" or "=", are
    ignored; a warning is logged for them if a logger is given.

    :param provides_raw: The raw value, parsed with apt_pkg.parse_depends
    :param pkg_id: Only used in the warning messages
    :param logger: Optional logger for the warnings about ignored entries
    :return: The accepted entries, with all strings interned
    """
    accepted: list[tuple[str, str, str]] = []
    for or_clause in apt_pkg.parse_depends(provides_raw, False):
        if len(or_clause) != 1:  # pragma: no cover
            if logger is not None:
                logger.warning(
                    "Ignoring invalid provides in %s: Alternatives [%s]",
                    str(pkg_id),
                    str(or_clause),
                )
            continue

        # exactly one alternative at this point
        provided, provided_version, op = or_clause[0]
        if op not in ("", "="):  # pragma: no cover
            if logger is not None:
                logger.warning(
                    "Ignoring invalid provides in %s: %s (%s %s)",
                    str(pkg_id),
                    provided,
                    op,
                    provided_version,
                )
            continue

        accepted.append(
            (sys.intern(provided), sys.intern(provided_version), sys.intern(op))
        )
    return accepted

1178 

1179 

def parse_builtusing(
    builtusing_raw: str,
    pkg_id: BinaryPackageId | None = None,
    logger: logging.Logger | None = None,
) -> list[tuple[str, str]]:
    """Parse a raw Built-Using value into (name, version) tuples

    Entries with alternatives, or with an operator other than "=", are
    ignored; a warning is logged for them if a logger is given.

    :param builtusing_raw: The raw value, parsed with apt_pkg.parse_depends
    :param pkg_id: Only used in the warning messages
    :param logger: Optional logger for the warnings about ignored entries
    :return: The accepted entries, with all strings interned
    """
    accepted: list[tuple[str, str]] = []
    for or_clause in apt_pkg.parse_depends(builtusing_raw, False):
        if len(or_clause) != 1:  # pragma: no cover
            if logger is not None:
                logger.warning(
                    "Ignoring invalid builtusing in %s: Alternatives [%s]",
                    str(pkg_id),
                    str(or_clause),
                )
            continue

        # exactly one alternative at this point
        bu, bu_version, op = or_clause[0]
        if op != "=":  # pragma: no cover
            if logger is not None:
                logger.warning(
                    "Ignoring invalid builtusing in %s: %s (%s %s)",
                    str(pkg_id),
                    bu,
                    op,
                    bu_version,
                )
            continue

        accepted.append((sys.intern(bu), sys.intern(bu_version)))
    return accepted

1204 

1205 

1206def parse_option( 

1207 options: "optparse.Values", 

1208 option_name: str, 

1209 default: Any | None = None, 

1210 to_bool: bool = False, 

1211 to_int: bool = False, 

1212 day_to_sec: bool = False, 

1213) -> None: 

1214 """Ensure the option exist and has a sane value 

1215 

1216 :param options: dict with options 

1217 

1218 :param option_name: string with the name of the option 

1219 

1220 :param default: the default value for the option 

1221 

1222 :param to_int: convert the input to int (defaults to sys.maxsize) 

1223 

1224 :param to_bool: convert the input to bool 

1225 

1226 :param day_to_sec: convert the input from days to seconds (implies to_int=True) 

1227 """ 

1228 value = getattr(options, option_name, default) 

1229 

1230 # Option was provided with no value (or default is '') so pick up the default 

1231 if value == "": 

1232 value = default 

1233 

1234 if (to_int or day_to_sec) and value in (None, ""): 

1235 value = sys.maxsize 

1236 

1237 if day_to_sec: 

1238 value = int(float(value) * 24 * 60 * 60) # type: ignore[arg-type] 

1239 

1240 if to_int: 

1241 value = int(value) # type: ignore[arg-type] 

1242 

1243 if to_bool: 

1244 if value and ( 

1245 isinstance(value, bool) or value.lower() in ("yes", "y", "true", "t", "1") 

1246 ): 

1247 value = True 

1248 else: 

1249 value = False 

1250 

1251 setattr(options, option_name, value)