Coverage for britney2/utils.py: 91%

469 statements  

coverage.py v6.5.0, created at 2025-03-23 07:34 +0000

1# -*- coding: utf-8 -*- 

2 

3# Refactored parts from britney.py, which is/was: 

4# Copyright (C) 2001-2008 Anthony Towns <ajt@debian.org> 

5# Andreas Barth <aba@debian.org> 

6# Fabio Tranchitella <kobold@debian.org> 

7# Copyright (C) 2010-2012 Adam D. Barratt <adsb@debian.org> 

8# Copyright (C) 2012 Niels Thykier <niels@thykier.net> 

9# 

10# New portions 

11# Copyright (C) 2013 Adam D. Barratt <adsb@debian.org> 

12 

13# This program is free software; you can redistribute it and/or modify 

14# it under the terms of the GNU General Public License as published by 

15# the Free Software Foundation; either version 2 of the License, or 

16# (at your option) any later version. 

17 

18# This program is distributed in the hope that it will be useful, 

19# but WITHOUT ANY WARRANTY; without even the implied warranty of 

20# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

21# GNU General Public License for more details. 

22 

23 

24import errno 

25import logging 

26import optparse 

27import os 

28import sys 

29import time 

30from collections import defaultdict 

31from collections.abc import Callable, Container, MutableSet, Iterable, Iterator, Mapping 

32from datetime import datetime 

33from functools import partial 

34from itertools import chain, filterfalse 

35from typing import ( 

36 IO, 

37 TYPE_CHECKING, 

38 Any, 

39 Literal, 

40 Optional, 

41 Protocol, 

42 TypeVar, 

43 Union, 

44 cast, 

45 overload, 

46) 

47 

48import apt_pkg 

49import yaml 

50 

51from britney2 import ( 

52 BinaryPackage, 

53 BinaryPackageId, 

54 PackageId, 

55 SourcePackage, 

56 Suite, 

57 SuiteClass, 

58 Suites, 

59 TargetSuite, 

60) 

61from britney2.excusedeps import DependencyState, ImpossibleDependencyState 

62from britney2.policies import PolicyVerdict 

63 

64if TYPE_CHECKING: 64 ↛ 66

65 

66 from _typeshed import SupportsRichComparisonT 

67 from apt_pkg import TagSection 

68 

69 from .excuse import Excuse 

70 from .hints import HintCollection 

71 from .installability.universe import BinaryPackageUniverse 

72 from .migrationitem import MigrationItem, MigrationItemFactory 

73 

74_T = TypeVar("_T") 

75 

76 

77class MigrationConstraintException(Exception): 

78 pass 

79 

80 

81@overload 

82def ifilter_except( 82 ↛ exit

83 container: Container[_T], iterable: Literal[None] = None 

84) -> "partial[filterfalse[_T]]": ... 

85 

86 

87@overload 

88def ifilter_except( 88 ↛ exit

89 container: Container[_T], iterable: Iterable[_T] 

90) -> "filterfalse[_T]": ... 

91 

92 

93def ifilter_except( 

94 container: Container[_T], iterable: Optional[Iterable[_T]] = None 

95) -> Union["filterfalse[_T]", "partial[filterfalse[_T]]"]: 

96 """Filter out elements in container 

97 

98 If given an iterable it returns a filtered iterator, otherwise it 

99 returns a function to generate filtered iterators. The latter is 

100 useful if the same filter has to be (re-)used on multiple 

101 iterators that are not known beforehand. 

102 """ 

103 if iterable is not None: 103 ↛ 104

104 return filterfalse(container.__contains__, iterable) 

105 return cast( 

106 "partial[filterfalse[_T]]", partial(filterfalse, container.__contains__) 

107 ) 

108 

109 
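# Illustrative sketch (not part of the module; names below are hypothetical):
# ifilter_except drops elements that are in the container, and without an
# iterable it returns a reusable filter factory.
#
#   skip = {"b", "c"}
#   list(ifilter_except(skip, ["a", "b", "c", "d"]))  # -> ["a", "d"]
#   keep_unseen = ifilter_except(skip)                # reusable factory
#   list(keep_unseen(["c", "e"]))                     # -> ["e"]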

110@overload 

111def ifilter_only(container: Container[_T], iterable: Iterable[_T]) -> "filter[_T]": ... 111 ↛ exit

112 

113 

114@overload 

115def ifilter_only(container: Container[_T], iterable: None) -> "partial[filter[_T]]": ... 115 ↛ exit

116 

117 

118def ifilter_only( 

119 container: Container[_T], iterable: Optional[Iterable[_T]] = None 

120) -> Union["filter[_T]", "partial[filter[_T]]"]: 

121 """Filter out elements which are not in the container 

122 

123 If given an iterable it returns a filtered iterator, otherwise it 

124 returns a function to generate filtered iterators. The latter is 

125 useful if the same filter has to be (re-)used on multiple 

126 iterators that are not known beforehand. 

127 """ 

128 if iterable is not None: 128 ↛ 130

129 return filter(container.__contains__, iterable) 

130 return partial(filter, container.__contains__) 

131 

132 

133# iter_except is from the "itertools" recipe 

134def iter_except( 

135 func: Callable[[], _T], 

136 exception: type[BaseException] | tuple[type[BaseException], ...], 

137 first: Any = None, 

138) -> Iterator[_T]: # pragma: no cover - itertools recipe function 

139 """Call a function repeatedly until an exception is raised. 

140 

141 Converts a call-until-exception interface to an iterator interface. 

142 Like __builtin__.iter(func, sentinel) but uses an exception instead 

143 of a sentinel to end the loop. 

144 

145 Examples: 

146 bsddbiter = iter_except(db.next, bsddb.error, db.first) 

147 heapiter = iter_except(functools.partial(heappop, h), IndexError) 

148 dictiter = iter_except(d.popitem, KeyError) 

149 dequeiter = iter_except(d.popleft, IndexError) 

150 queueiter = iter_except(q.get_nowait, Queue.Empty) 

151 setiter = iter_except(s.pop, KeyError) 

152 

153 """ 

154 try: 

155 if first is not None: 

156 yield first() 

157 while 1: 

158 yield func() 

159 except exception: 

160 pass 

161 

162 

163def log_and_format_old_libraries( 

164 logger: logging.Logger, libs: list["MigrationItem"] 

165) -> None: 

166 """Format and log old libraries in a table (no header)""" 

167 libraries: dict[str, list[str]] = {} 

168 for i in libs: 

169 pkg = i.package 

170 if pkg in libraries: 

171 libraries[pkg].append(i.architecture) 

172 else: 

173 libraries[pkg] = [i.architecture] 

174 

175 for lib in sorted(libraries): 

176 logger.info(" %s: %s", lib, " ".join(libraries[lib])) 

177 

178 

179def compute_reverse_tree( 

180 pkg_universe: "BinaryPackageUniverse", affected: set[BinaryPackageId] 

181) -> None: 

182 """Calculate the full reverse dependency tree for a set of packages 

183 

184 This method computes the full reverse dependency tree for a given 

185 set of packages. The first argument is an instance of the 

186 BinaryPackageUniverse and the second argument is a set of BinaryPackageId. 

187 

188 The set of affected packages will be updated in place and must 

189 therefore be mutable. 

190 """ 

191 remain = list(affected) 

192 while remain: 

193 pkg_id = remain.pop() 

194 new_pkg_ids = pkg_universe.reverse_dependencies_of(pkg_id) - affected 

195 affected.update(new_pkg_ids) 

196 remain.extend(new_pkg_ids) 

197 

198 
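# Illustrative sketch of the in-place closure in compute_reverse_tree,
# assuming a hypothetical stub universe that only offers
# reverse_dependencies_of and plain strings in place of BinaryPackageIds:
#
#   class StubUniverse:
#       def __init__(self, rdeps):
#           self._rdeps = rdeps
#       def reverse_dependencies_of(self, pkg_id):
#           return self._rdeps.get(pkg_id, set())
#
#   universe = StubUniverse({"libfoo": {"app"}, "app": {"meta"}})
#   affected = {"libfoo"}
#   compute_reverse_tree(universe, affected)
#   # affected is now {"libfoo", "app", "meta"}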

199def add_transitive_dependencies_flatten( 

200 pkg_universe: "BinaryPackageUniverse", initial_set: MutableSet[BinaryPackageId] 

201) -> None: 

202 """Find and include all transitive dependencies 

203 

204 This method updates the initial_set parameter to include all transitive 

205 dependencies. The first argument is an instance of the BinaryPackageUniverse 

206 and the second argument is a set of BinaryPackageId. 

207 

208 The set of initial packages will be updated in place and must 

209 therefore be mutable. 

210 """ 

211 remain = list(initial_set) 

212 while remain: 

213 pkg_id = remain.pop() 

214 new_pkg_ids = { 

215 x 

216 for x in chain.from_iterable(pkg_universe.dependencies_of(pkg_id)) 

217 if x not in initial_set 

218 } 

219 initial_set |= new_pkg_ids 

220 remain.extend(new_pkg_ids) 

221 

222 

223def write_nuninst(filename: str, nuninst: dict[str, set[str]]) -> None: 

224 """Write the non-installable report 

225 

226 Write the non-installable report derived from "nuninst" to the 

227 file denoted by "filename". 

228 """ 

229 with open(filename, "w", encoding="utf-8") as f: 

230 # Having two fields with (almost) identical dates seems a bit 

231 # redundant. 

232 f.write( 

233 "Built on: " 

234 + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time())) 

235 + "\n" 

236 ) 

237 f.write( 

238 "Last update: " 

239 + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time())) 

240 + "\n\n" 

241 ) 

242 for k in nuninst: 

243 f.write("%s: %s\n" % (k, " ".join(nuninst[k]))) 

244 

245 

246def read_nuninst(filename: str, architectures: set[str]) -> dict[str, set[str]]: 

247 """Read the non-installable report 

248 

249 Read the non-installable report from the file denoted by 

250 "filename" and return it. Only architectures in "architectures" 

251 will be included in the report. 

252 """ 

253 nuninst: dict[str, set[str]] = {} 

254 with open(filename, encoding="ascii") as f: 

255 for r in f: 

256 if ":" not in r: 

257 continue 

258 arch, packages = r.strip().split(":", 1) 

259 if arch.split("+", 1)[0] in architectures: 

260 nuninst[arch] = set(packages.split()) 

261 return nuninst 

262 

263 
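# Round-trip sketch of the on-disk format ("arch: pkg ..." per line; the
# "Built on"/"Last update" header lines are skipped on read because they do
# not name a known architecture). The filename below is hypothetical:
#
#   write_nuninst("nuninst.txt", {"amd64": {"pkg1", "pkg2"}})
#   read_nuninst("nuninst.txt", {"amd64"})  # -> {"amd64": {"pkg1", "pkg2"}}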

264def newly_uninst( 

265 nuold: dict[str, set[str]], nunew: dict[str, set[str]] 

266) -> dict[str, list[str]]: 

267 """Return a nuninst statistic with only new uninstallable packages 

268 

269 This method subtracts the uninstallable packages of the statistic 

270 "nuold" from the statistic "nunew". 

271 

272 It returns a dictionary with the architectures as keys and the list 

273 of uninstallable packages as values. If there are no regressions 

274 on a given architecture, then the architecture will be omitted in 

275 the result. Accordingly, if none of the architectures have 

276 regressions, an empty dictionary is returned. 

277 """ 

278 res: dict[str, list[str]] = {} 

279 for arch in ifilter_only(nunew, nuold): 

280 arch_nuninst = [x for x in nunew[arch] if x not in nuold[arch]] 

281 # Leave res empty if there are no newly uninst packages 

282 if arch_nuninst: 

283 res[arch] = arch_nuninst 

284 return res 

285 

286 
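# Small sketch: only regressions are reported, and architectures missing
# from either statistic are ignored.
#
#   newly_uninst({"amd64": {"a"}, "i386": set()},
#                {"amd64": {"a", "b"}})  # -> {"amd64": ["b"]}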

287def format_and_log_uninst( 

288 logger: logging.Logger, 

289 architectures: Iterable[str], 

290 nuninst: Mapping[str, Iterable[str]], 

291 *, 

292 loglevel: int = logging.INFO, 

293) -> None: 

294 """Emits the uninstallable packages to the log 

295 

296 An example of the output string is: 

297 * i386: broken-pkg1, broken-pkg2 

298 

299 Note that if there are no uninstallable packages, nothing is emitted. 

300 """ 

301 for arch in architectures: 

302 if arch in nuninst and nuninst[arch]: 

303 msg = " * %s: %s" % (arch, ", ".join(sorted(nuninst[arch]))) 

304 logger.log(loglevel, msg) 

305 

306 

307class Sorted(Protocol): 

308 def __call__( 308 ↛ exit

309 self, 

310 iterable: Iterable["SupportsRichComparisonT"], 

311 /, 

312 *, 

313 key: None = None, 

314 reverse: bool = False, 

315 ) -> list["SupportsRichComparisonT"]: ... 

316 

317 

318def write_heidi( 

319 filename: str, 

320 target_suite: TargetSuite, 

321 *, 

322 outofsync_arches: frozenset[str] = frozenset(), 

323 sorted: Sorted = sorted, 

324) -> None: 

325 """Write the output HeidiResult 

326 

327 This method writes the output for Heidi, which contains all the 

328 binary packages and the source packages in the form: 

329 

330 <pkg-name> <pkg-version> <pkg-architecture> <pkg-section> 

331 <src-name> <src-version> source <src-section> 

332 

333 The file is written as "filename" using the sources and packages 

334 from the "target_suite" parameter. 

335 

336 outofsync_arches: If given, it is a set of architectures marked 

337 as "out of sync". The output file may exclude some out of date 

338 arch:all packages for those architectures to reduce the noise. 

339 

340 The "X=X" parameters are optimizations to avoid "load global" in 

341 the loops. 

342 """ 

343 sources_t = target_suite.sources 

344 packages_t = target_suite.binaries 

345 

346 with open(filename, "w", encoding="ascii") as f: 

347 

348 # write binary packages 

349 for arch in sorted(packages_t): 

350 binaries = packages_t[arch] 

351 for pkg_name in sorted(binaries): 

352 pkg = binaries[pkg_name] 

353 pkgv = pkg.version 

354 pkgarch = pkg.architecture or "all" 

355 pkgsec = pkg.section or "faux" 

356 if pkgsec == "faux" or pkgsec.endswith("/faux"): 

357 # Faux package; not really a part of testing 

358 continue 

359 if ( 359 ↛ 371

360 pkg.source_version 

361 and pkgarch == "all" 

362 and pkg.source_version != sources_t[pkg.source].version 

363 and arch in outofsync_arches 

364 ): 

365 # when architectures are marked as "outofsync", their binary 

366 # versions may be lower than those of the associated 

367 # source package in testing. the binary package list for 

368 # such architectures will include arch:all packages 

369 # matching those older versions, but we only want the 

370 # newer arch:all in testing 

371 continue 

372 f.write("%s %s %s %s\n" % (pkg_name, pkgv, pkgarch, pkgsec)) 

373 

374 # write sources 

375 for src_name in sorted(sources_t): 

376 src = sources_t[src_name] 

377 srcv = src.version 

378 srcsec = src.section or "unknown" 

379 if srcsec == "faux" or srcsec.endswith("/faux"): 

380 # Faux package; not really a part of testing 

381 continue 

382 f.write("%s %s source %s\n" % (src_name, srcv, srcsec)) 

383 

384 

385def write_heidi_delta(filename: str, all_selected: list["MigrationItem"]) -> None: 

386 """Write the output delta 

387 

388 This method writes the packages to be upgraded, in the form: 

389 <src-name> <src-version> 

390 or (if the source is to be removed): 

391 -<src-name> <src-version> 

392 

393 The order corresponds to that shown in update_output. 

394 """ 

395 with open(filename, "w", encoding="ascii") as fd: 

396 

397 fd.write("#HeidiDelta\n") 

398 

399 for item in all_selected: 

400 prefix = "" 

401 

402 if item.is_removal: 

403 prefix = "-" 

404 

405 if item.architecture == "source": 

406 fd.write("%s%s %s\n" % (prefix, item.package, item.version)) 

407 else: 

408 fd.write( 

409 "%s%s %s %s\n" 

410 % (prefix, item.package, item.version, item.architecture) 

411 ) 

412 

413 

414class Opener(Protocol): 

415 def __call__( 415 ↛ exit

416 self, file: str, mode: Literal["wt"], encoding: Literal["utf-8"] 

417 ) -> IO[Any]: ... 

418 

419 

420def write_excuses( 

421 excuses: Union[dict[str, "Excuse"], dict[PackageId, "Excuse"]], 

422 dest_file: str, 

423 output_format: Literal["yaml", "legacy-html"] = "yaml", 

424) -> None: 

425 """Write the excuses to dest_file 

426 

427 Writes a list of excuses in a specified output_format to the 

428 path denoted by dest_file. The output_format can either be "yaml" 

429 or "legacy-html". 

430 """ 

431 excuselist = sorted(excuses.values(), key=lambda x: x.sortkey()) 

432 if output_format == "yaml": 

433 os.makedirs(os.path.dirname(dest_file), exist_ok=True) 

434 opener: Opener = open # type: ignore[assignment] 

435 if dest_file.endswith(".xz"): 435 ↛ 436

436 import lzma 

437 

438 opener = lzma.open # type: ignore[assignment] 

439 elif dest_file.endswith(".gz"): 439 ↛ 440

440 import gzip 

441 

442 opener = gzip.open # type: ignore[assignment] 

443 with opener(dest_file, "wt", encoding="utf-8") as f: 

444 edatalist = [e.excusedata(excuses) for e in excuselist] 

445 excusesdata = { 

446 "sources": edatalist, 

447 "generated-date": datetime.utcnow(), 

448 } 

449 f.write( 

450 yaml.dump(excusesdata, default_flow_style=False, allow_unicode=True) 

451 ) 

452 elif output_format == "legacy-html": 

453 with open(dest_file, "w", encoding="utf-8") as f: 

454 f.write( 

455 '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/REC-html40/strict.dtd">\n' 

456 ) 

457 f.write("<html><head><title>excuses...</title>") 

458 f.write( 

459 '<meta http-equiv="Content-Type" content="text/html;charset=utf-8"></head><body>\n' 

460 ) 

461 f.write( 

462 "<p>Generated: " 

463 + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time())) 

464 + "</p>\n" 

465 ) 

466 f.write("<ul>\n") 

467 for e in excuselist: 

468 f.write("<li>%s" % e.html(excuses)) 

469 f.write("</ul></body></html>\n") 

470 else: # pragma: no cover 

471 raise ValueError('Output format must be either "yaml" or "legacy-html"') 

472 

473 

474def old_libraries( 

475 mi_factory: "MigrationItemFactory", 

476 suite_info: Suites, 

477 outofsync_arches: Iterable[str] = frozenset(), 

478) -> list["MigrationItem"]: 

479 """Detect old libraries left in the target suite for smooth transitions 

480 

481 This method detects old libraries which are in the target suite but no 

482 longer built from the source package: they are still there because 

483 other packages still depend on them, but they should be removed as 

484 soon as possible. 

485 

486 For "outofsync" architectures, outdated binaries are allowed to be in 

487 the target suite, so they are only added to the removal list if they 

488 are no longer in the (primary) source suite. 

489 """ 

490 sources_t = suite_info.target_suite.sources 

491 binaries_t = suite_info.target_suite.binaries 

492 binaries_s = suite_info.primary_source_suite.binaries 

493 removals = [] 

494 for arch in binaries_t: 

495 for pkg_name in binaries_t[arch]: 

496 pkg = binaries_t[arch][pkg_name] 

497 if sources_t[pkg.source].version != pkg.source_version and ( 

498 arch not in outofsync_arches or pkg_name not in binaries_s[arch] 

499 ): 

500 removals.append(mi_factory.generate_removal_for_cruft_item(pkg.pkg_id)) 

501 return removals 

502 

503 

504def is_nuninst_asgood_generous( 

505 constraints: dict[str, list[str]], 

506 allow_uninst: dict[str, set[Optional[str]]], 

507 architectures: list[str], 

508 old: dict[str, set[str]], 

509 new: dict[str, set[str]], 

510 break_arches: set[str] = cast(set[str], frozenset()), 

511) -> bool: 

512 """Compares the nuninst counters and constraints to see if they improved 

513 

514 Given a list of architectures, the previous and the current nuninst 

515 counters, this function determines if the current nuninst counter 

516 is better than the previous one. Optionally it also accepts a set 

517 of "break_arches"; the nuninst counter for any architecture listed 

518 in this set is completely ignored. 

519 

520 If the nuninst counters are equal or better, then the constraints 

521 are checked for regressions (ignoring break_arches). 

522 

523 Returns True if the new nuninst counter is better than the 

524 previous and there are no constraint regressions (ignoring break_arches). 

525 Returns False otherwise. 

526 

527 """ 

528 diff = 0 

529 for arch in architectures: 

530 if arch in break_arches: 

531 continue 

532 diff = diff + ( 

533 len(new[arch] - allow_uninst[arch]) - len(old[arch] - allow_uninst[arch]) 

534 ) 

535 if diff > 0: 

536 return False 

537 must_be_installable = constraints["keep-installable"] 

538 for arch in architectures: 

539 if arch in break_arches: 

540 continue 

541 regression = new[arch] - old[arch] 

542 if not regression.isdisjoint(must_be_installable): 542 ↛ 543

543 return False 

544 return True 

545 

546 
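# Sketch of the "generous" comparison: a regression on one architecture can
# be offset by an improvement on another, because only the summed delta is
# checked before the constraints pass.
#
#   old = {"amd64": {"a", "b"}, "i386": set()}
#   new = {"amd64": {"a"}, "i386": {"c"}}
#   is_nuninst_asgood_generous(
#       {"keep-installable": []},          # constraints
#       {"amd64": set(), "i386": set()},   # allow_uninst
#       ["amd64", "i386"], old, new)       # -> True (delta is -1 + 1 = 0)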

547def clone_nuninst( 

548 nuninst: dict[str, set[str]], 

549 *, 

550 packages_s: Optional[dict[str, dict[str, BinaryPackage]]] = None, 

551 architectures: Optional[Iterable[str]] = None, 

552) -> dict[str, set[str]]: 

553 """Completely or selectively deep clone nuninst 

554 

555 Given a nuninst table, the package table for a given suite and 

556 a list of architectures, this function will clone the nuninst 

557 table. Only the listed architectures will be deep cloned - 

558 the rest will only be shallow cloned. When packages_s is given, 

559 packages not listed in packages_s will be pruned from the clone 

560 (if packages_s is omitted, the per architecture nuninst is cloned 

561 as-is). 

562 """ 

563 clone = nuninst.copy() 

564 if architectures is None: 564 ↛ 565

565 return clone 

566 if packages_s is not None: 

567 for arch in architectures: 

568 clone[arch] = set(x for x in nuninst[arch] if x in packages_s[arch]) 

569 clone[arch + "+all"] = set( 

570 x for x in nuninst[arch + "+all"] if x in packages_s[arch] 

571 ) 

572 else: 

573 for arch in architectures: 

574 clone[arch] = set(nuninst[arch]) 

575 clone[arch + "+all"] = set(nuninst[arch + "+all"]) 

576 return clone 

577 

578 
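# Sketch of the selective deep clone: only the listed architectures (and
# their "+all" variants) get fresh sets, the rest stay shared.
#
#   nuninst = {"amd64": {"a"}, "amd64+all": {"a"},
#              "i386": {"b"}, "i386+all": {"b"}}
#   clone = clone_nuninst(nuninst, architectures=["amd64"])
#   clone["amd64"] is nuninst["amd64"]  # -> False (deep cloned)
#   clone["i386"] is nuninst["i386"]    # -> True (shallow)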

579def test_installability( 

580 target_suite: TargetSuite, 

581 pkg_name: str, 

582 pkg_id: BinaryPackageId, 

583 broken: set[str], 

584 nuninst_arch: Optional[set[str]], 

585) -> Literal[-1, 0, 1]: 

586 """Test for installability of a package on an architecture 

587 

588 (pkg_name, pkg_version, pkg_arch) is the package to check. 

589 

590 broken is the set of broken packages. If the package changes 

591 installability (e.g. goes from uninstallable to installable), 

592 broken will be updated accordingly. 

593 

594 If nuninst_arch is not None, then it is also updated in the same 

595 way as broken is. 

596 """ 

597 c: Literal[-1, 0, 1] = 0 

598 r = target_suite.is_installable(pkg_id) 

599 if not r: 

600 # not installable 

601 if pkg_name not in broken: 

602 # regression 

603 broken.add(pkg_name) 

604 c = -1 

605 if nuninst_arch is not None and pkg_name not in nuninst_arch: 

606 nuninst_arch.add(pkg_name) 

607 else: 

608 if pkg_name in broken: 

609 # Improvement 

610 broken.remove(pkg_name) 

611 c = 1 

612 if nuninst_arch is not None and pkg_name in nuninst_arch: 

613 nuninst_arch.remove(pkg_name) 

614 return c 

615 

616 

617def check_installability( 

618 target_suite: TargetSuite, 

619 binaries: dict[str, dict[str, BinaryPackage]], 

620 arch: str, 

621 updates: set[BinaryPackageId], 

622 check_archall: bool, 

623 nuninst: dict[str, set[str]], 

624) -> None: 

625 broken = nuninst[arch + "+all"] 

626 packages_t_a = binaries[arch] 

627 

628 for pkg_id in (x for x in updates if x.architecture == arch): 

629 name, version, parch = pkg_id 

630 if name not in packages_t_a: 

631 continue 

632 pkgdata = packages_t_a[name] 

633 if version != pkgdata.version: 

634 # Not the version in testing right now, ignore 

635 continue 

636 actual_arch = pkgdata.architecture 

637 nuninst_arch = None 

638 # only check arch:all packages if requested 

639 if check_archall or actual_arch != "all": 

640 nuninst_arch = nuninst[parch] 

641 elif actual_arch == "all": 641 ↛ 643

642 nuninst[parch].discard(name) 

643 test_installability(target_suite, name, pkg_id, broken, nuninst_arch) 

644 

645 

646def possibly_compressed( 

647 path: str, *, permitted_compressions: Optional[list[str]] = None 

648) -> str: 

649 """Find and select a (possibly compressed) variant of a path 

650 

651 If the given path exists, it is returned; otherwise the compressed variants are tried in order. 

652 

653 :param path: The base path. 

654 :param permitted_compressions: Alternative extensions to look for. Defaults to "gz" and "xz". 

655 :return: The path given possibly with one of the permitted extensions. 

656 :raises FileNotFoundError: if the path is not found 

657 """ 

658 if os.path.exists(path): 658 ↛ 660

659 return path 

660 if permitted_compressions is None: 

661 permitted_compressions = ["gz", "xz"] 

662 for ext in permitted_compressions: 

663 cpath = "%s.%s" % (path, ext) 

664 if os.path.exists(cpath): 

665 return cpath 

666 raise FileNotFoundError( 

667 errno.ENOENT, os.strerror(errno.ENOENT), path 

668 ) # pragma: no cover 

669 

670 
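# Lookup-order sketch (the path is hypothetical): the plain path wins, then
# the permitted extensions are tried in order ("gz" then "xz" by default).
#
#   possibly_compressed("/srv/mirror/Sources")
#   # -> "/srv/mirror/Sources" if present, else "/srv/mirror/Sources.gz",
#   #    else "/srv/mirror/Sources.xz", else FileNotFoundError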

671def create_provides_map( 

672 packages: dict[str, BinaryPackage], 

673) -> dict[str, set[tuple[str, str]]]: 

674 """Create a provides map from a map of binary package names to their BinaryPackage objects 

675 

676 :param packages: A dict mapping binary package names to their BinaryPackage object 

677 :return: A provides map 

678 """ 

679 # create provides 

680 provides = defaultdict(set) 

681 

682 for pkg, dpkg in packages.items(): 

683 # register virtual packages and real packages that provide 

684 # them 

685 for provided_pkg, provided_version, _ in dpkg.provides: 

686 provides[provided_pkg].add((pkg, provided_version)) 

687 

688 return provides 

689 

690 
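# Sketch assuming a BinaryPackage-like object "pkg" whose .provides is a
# list of (name, version, op) tuples, as produced by parse_provides:
#
#   pkg.provides = [("mail-transport-agent", "", "")]
#   create_provides_map({"postfix": pkg})
#   # -> {"mail-transport-agent": {("postfix", "")}}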

691def read_release_file(suite_dir: str) -> "TagSection[str]": 

692 """Parses a given "Release" file 

693 

694 :param suite_dir: The directory to the suite 

695 :return: A dict of the first (and only) paragraph in a Release file 

696 """ 

697 release_file = os.path.join(suite_dir, "Release") 

698 with open(release_file) as fd: 

699 tag_file = iter(apt_pkg.TagFile(fd)) 

700 result = next(tag_file) 

701 if next(tag_file, None) is not None: # pragma: no cover 

702 raise TypeError("%s has more than one paragraph" % release_file) 

703 return result 

704 

705 

706def read_sources_file( 

707 filename: str, 

708 sources: Optional[dict[str, SourcePackage]] = None, 

709 add_faux: bool = True, 

710 intern: Callable[[str], str] = sys.intern, 

711) -> dict[str, SourcePackage]: 

712 """Parse a single Sources file into a hash 

713 

714 Parse a single Sources file into a dict mapping a source package 

715 name to a SourcePackage object. If there are multiple entries 

716 for the same source package, then the highest versioned entry 

717 (that is not marked as "Extra-Source-Only") is the one 

718 kept in the dict. 

719 

720 :param filename: Path to the Sources file. Can be compressed by any algorithm supported by apt_pkg.TagFile 

721 :param sources: Optional dict to add the packages to. If given, this is also the value returned. 

722 :param add_faux: Add a faux arch:all binary for each source that claims it has arch:all 

723 :param intern: Internal optimisation / implementation detail to avoid Python's "LOAD_GLOBAL" instruction in a loop 

724 :return: mapping from source package names to SourcePackage objects 

725 """ 

726 if sources is None: 

727 sources = {} 

728 

729 tag_file = apt_pkg.TagFile(filename) 

730 get_field = tag_file.section.get 

731 step = tag_file.step 

732 

733 while step(): 

734 if get_field("Extra-Source-Only", "no") == "yes": 

735 # Ignore sources only referenced by Built-Using 

736 continue 

737 pkg = get_field("Package") 

738 ver = get_field("Version") 

739 # There may be multiple versions of the source package 

740 # (in unstable) if some architectures have out-of-date 

741 # binaries. We only ever consider the source with the 

742 # largest version for migration. 

743 if pkg in sources and apt_pkg.version_compare(sources[pkg].version, ver) > 0: 

744 continue 

745 maint = get_field("Maintainer") 

746 if maint: 746 ↛ 748

747 maint = intern(maint.strip()) 

748 section = get_field("Section") 

749 if section: 749 ↛ 752

750 section = intern(section.strip()) 

751 build_deps_arch: Optional[str] 

752 build_deps_arch = ", ".join( 

753 x 

754 for x in (get_field("Build-Depends"), get_field("Build-Depends-Arch")) 

755 if x is not None 

756 ) 

757 if build_deps_arch != "": 

758 build_deps_arch = sys.intern(build_deps_arch) 

759 else: 

760 build_deps_arch = None 

761 build_deps_indep = get_field("Build-Depends-Indep") 

762 if build_deps_indep is not None: 

763 build_deps_indep = sys.intern(build_deps_indep) 

764 

765 # Adding arch:all packages to the list of binaries already to be able 

766 # to check for them later. Helps mitigate bug 887060 and is the 

767 # (partial?) answer to bug 1064428. 

768 binaries: set[BinaryPackageId] = set() 

769 if add_faux and "all" in get_field("Architecture", "").split(): 

770 # the value "faux" in arch:faux is used elsewhere, so keep in sync 

771 pkg_id = BinaryPackageId(pkg + "-faux", intern("0~~~~"), intern("faux")) 

772 binaries.add(pkg_id) 

773 

774 sources[intern(pkg)] = SourcePackage( 

775 intern(pkg), 

776 intern(ver), 

777 section, 

778 binaries, 

779 maint, 

780 False, 

781 build_deps_arch, 

782 build_deps_indep, 

783 get_field("Testsuite", "").split(), 

784 get_field("Testsuite-Triggers", "").replace(",", "").split(), 

785 ) 

786 return sources 

787 

788 
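# Sketch of the mapping: a Sources stanza such as
#
#   Package: foo
#   Version: 1.0-1
#   Section: devel
#   Architecture: any all
#
# yields sources["foo"] as a SourcePackage with version "1.0-1"; the "all"
# in Architecture adds the faux binary
# BinaryPackageId("foo-faux", "0~~~~", "faux") when add_faux is true.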

789def _check_and_update_packages( 

790 packages: list[BinaryPackage], 

791 package: BinaryPackage, 

792 archqual: Optional[str], 

793 build_depends: bool, 

794) -> None: 

795 """Helper for get_dependency_solvers 

796 

797 This method updates the list of packages with a given package if that 

798 package is a valid (Build-)Depends. 

799 

800 :param packages: the list of solver packages, updated in place 

801 :param archqual: Architecture qualifier 

802 :param build_depends: If True, check if the "package" parameter is valid as a build-dependency. 

803 """ 

804 

805 # See also bug #971739 and #1059929 

806 if archqual is None: 

807 packages.append(package) 

808 elif archqual == "native" and build_depends: 

809 # Multi-arch handling for build-dependencies 

810 # - :native is ok always 

811 packages.append(package) 

812 elif archqual == "any" and package.multi_arch == "allowed": 

813 # Multi-arch handling for both build-dependencies and regular dependencies 

814 # - :any is ok iff the target has "M-A: allowed" 

815 packages.append(package) 

816 

817 

818class GetDependencySolversProto(Protocol): 

819 def __call__( 819 ↛ exit

820 self, 

821 block: list[tuple[str, str, str]], 

822 binaries_s_a: dict[str, BinaryPackage], 

823 provides_s_a: dict[str, set[tuple[str, str]]], 

824 *, 

825 build_depends: bool = False, 

826 empty_set: Any = frozenset(), 

827 ) -> list[BinaryPackage]: ... 

828 

829 

830def get_dependency_solvers( 

831 block: list[tuple[str, str, str]], 

832 binaries_s_a: dict[str, BinaryPackage], 

833 provides_s_a: dict[str, set[tuple[str, str]]], 

834 *, 

835 build_depends: bool = False, 

836 empty_set: Any = frozenset(), 

837) -> list[BinaryPackage]: 

838 """Find the packages which satisfy a dependency block 

839 

840 This method returns the list of packages which satisfy a dependency 

841 block (as returned by apt_pkg.parse_depends) in a package table 

842 for a given suite and architecture (a la self.binaries[suite][arch]) 

843 

844 It can also handle build-dependency relations if the named parameter 

845 "build_depends" is set to True. In this case, block should be based 

846 on the return value from apt_pkg.parse_src_depends. 

847 

848 :param block: The dependency block as parsed by apt_pkg.parse_depends (or apt_pkg.parse_src_depends 

849 if the "build_depends" is True) 

850 :param binaries_s_a: Mapping of package names to the relevant BinaryPackage 

851 :param provides_s_a: Mapping of package names to their providers (as generated by parse_provides) 

852 :param build_depends: If True, treat the "block" parameter as a build-dependency relation rather than 

853 a regular dependency relation. 

854 :param empty_set: Internal implementation detail / optimisation 

855 :return: the BinaryPackage objects solving the relation 

856 """ 

857 packages: list[BinaryPackage] = [] 

858 

859 # for every package, version and operation in the block 

860 for name, version, op in block: 

861 if ":" in name: 

862 name, archqual = name.split(":", 1) 

863 else: 

864 archqual = None 

865 

866 # look for the package in unstable 

867 if name in binaries_s_a: 

868 package = binaries_s_a[name] 

869 # check the versioned dependency and architecture qualifier 

870 # (if present) 

871 if (op == "" and version == "") or apt_pkg.check_dep( 

872 package.version, op, version 

873 ): 

874 _check_and_update_packages(packages, package, archqual, build_depends) 

875 

876 # look for the package in the virtual packages list and loop on them 

877 for prov, prov_version in provides_s_a.get(name, empty_set): 

878 assert prov in binaries_s_a 

879 package = binaries_s_a[prov] 

880 # See Policy Manual §7.5 

881 if (op == "" and version == "") or ( 

882 prov_version != "" and apt_pkg.check_dep(prov_version, op, version) 

883 ): 

884 _check_and_update_packages(packages, package, archqual, build_depends) 

885 

886 return packages 

887 

888 
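# Sketch of the expected "block" shape: one or-clause from
# apt_pkg.parse_depends is a list of (name, version, op) tuples, e.g.
#
#   apt_pkg.parse_depends("foo (>= 1.0) | bar", False)[0]
#   # -> [("foo", "1.0", ">="), ("bar", "", "")]
#
# The solvers are the entries of binaries_s_a (or their providers from
# provides_s_a) that satisfy any alternative.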

889def invalidate_excuses( 

890 excuses: dict[str, "Excuse"], 

891 valid: set[str], 

892 invalid: set[str], 

893 invalidated: set[str], 

894) -> None: 

895 """Invalidate impossible excuses 

896 

897 This method invalidates the impossible excuses, which depend 

898 on invalid excuses. The two parameters contain the sets of 

899 `valid' and `invalid' excuses. 

900 """ 

901 # make a list of all packages (source and binary) that are present in the 

902 # excuses we have 

903 excuses_packages: dict[Union[PackageId, BinaryPackageId], set[str]] = defaultdict( 

904 set 

905 ) 

906 for exc in excuses.values(): 

907 for arch in exc.packages: 

908 for pkg_arch_id in exc.packages[arch]: 

909 # note that the same package can be in multiple excuses 

910 # eg. when unstable and TPU have the same packages 

911 excuses_packages[pkg_arch_id].add(exc.name) 

912 

913 # create dependencies between excuses based on packages 

914 excuses_rdeps = defaultdict(set) 

915 for exc in excuses.values(): 

916 # Note that excuses_rdeps is only populated by dependencies generated 

917 # based on packages below. There are currently no dependencies between 

918 # excuses that are added directly, so this is ok. 

919 

920 for pkg_dep in exc.depends_packages: 

921 # set of excuses, each of which can satisfy this specific 

922 # dependency 

923 # if there is a dependency on a package for which no 

924 # excuses exist (e.g. a cruft binary), the set will 

925 # contain an ImpossibleDependencyState 

926 dep_exc: set[Union[str, DependencyState]] = set() 

927 for pkg_dep_id in cast(set[BinaryPackageId], pkg_dep.deps): 

928 pkg_excuses = excuses_packages[pkg_dep_id] 

929 # if the dependency isn't found, we get an empty set 

930 if pkg_excuses == frozenset(): 

931 imp_dep = ImpossibleDependencyState( 

932 PolicyVerdict.REJECTED_PERMANENTLY, "%s" % (pkg_dep_id.name) 

933 ) 

934 dep_exc.add(imp_dep) 

935 

936 else: 

937 dep_exc |= pkg_excuses 

938 for e in pkg_excuses: 

939 excuses_rdeps[e].add(exc.name) 

940 if not exc.add_dependency(dep_exc, pkg_dep.spec): 

941 valid.discard(exc.name) 

942 invalid.add(exc.name) 

943 

944 # loop on the invalid excuses 

945 # Convert invalid to a list for deterministic results 

946 invalid2 = sorted(invalid) 

947 for ename in iter_except(invalid2.pop, IndexError): 

948 invalidated.add(ename) 

949 # if there is no reverse dependency, skip the item 

950 if ename not in excuses_rdeps: 

951 continue 

952 

953 rdep_verdict = PolicyVerdict.REJECTED_WAITING_FOR_ANOTHER_ITEM 

954 if excuses[ename].policy_verdict.is_blocked: 

955 rdep_verdict = PolicyVerdict.REJECTED_BLOCKED_BY_ANOTHER_ITEM 

956 

957 # loop on the reverse dependencies 

958 for x in sorted(excuses_rdeps[ename]): 

959 exc = excuses[x] 

960 # if the item is valid and it is not marked as `forced', then we 

961 # invalidate this specific dependency 

962 if x in valid and not exc.forced: 

963 # mark this specific dependency as invalid 

964 still_valid = exc.invalidate_dependency(ename, rdep_verdict) 

965 

966 # if there are no alternatives left for this dependency, 

967 # invalidate the excuse 

968 if not still_valid: 

969 valid.discard(x) 

970 invalid2.append(x) 

971 

972 

973def compile_nuninst( 

974 target_suite: TargetSuite, architectures: list[str], nobreakall_arches: list[str] 

975) -> dict[str, set[str]]: 

976 """Compile a nuninst dict from the current testing 

977 

978 :param target_suite: The target suite 

979 :param architectures: Which architectures to check 

980 :param nobreakall_arches: Architectures on which arch:all packages must be installable 

981 """ 

982 nuninst: dict[str, set[str]] = {} 

983 binaries_t = target_suite.binaries 

984 

985 # for all the architectures 

986 for arch in architectures: 

987 # if it is in the nobreakall ones, check arch-independent packages too 

988 check_archall = arch in nobreakall_arches 

989 

990 # check all the packages for this architecture 

991 nuninst[arch] = set() 

992 packages_t_a = binaries_t[arch] 

993 for pkg_name, pkg_data in packages_t_a.items(): 

994 r = target_suite.is_installable(pkg_data.pkg_id) 

995 if not r: 

996 nuninst[arch].add(pkg_name) 

997 

998 # if they are not required, remove architecture-independent packages 

999 nuninst[arch + "+all"] = nuninst[arch].copy() 

1000 if not check_archall: 

1001 for pkg_name in nuninst[arch + "+all"]: 

1002 pkg_data = packages_t_a[pkg_name] 

1003 if pkg_data.architecture == "all": 

1004 nuninst[arch].remove(pkg_name) 

1005 

1006 return nuninst 

1007 

1008 

1009def is_smooth_update_allowed( 

1010 binary: BinaryPackage, smooth_updates: list[str], hints: "HintCollection" 

1011) -> bool: 

1012 if "ALL" in smooth_updates: 1012 ↛ 1013

1013 return True 

1014 section = binary.section.split("/")[-1] 

1015 if section in smooth_updates: 

1016 return True 

1017 if hints.search( 

1018 "allow-smooth-update", package=binary.source, version=binary.source_version 

1019 ): 

1020 # note that this needs to match the source version *IN TESTING* 

1021 return True 

1022 return False 

1023 

1024 

1025def find_smooth_updateable_binaries( 

1026 binaries_to_check: list[BinaryPackageId], 

1027 source_data: SourcePackage, 

1028 pkg_universe: "BinaryPackageUniverse", 

1029 target_suite: TargetSuite, 

1030 binaries_t: dict[str, dict[str, BinaryPackage]], 

1031 binaries_s: dict[str, dict[str, BinaryPackage]], 

1032 removals: Union[set[BinaryPackageId], frozenset[BinaryPackageId]], 

1033 smooth_updates: list[str], 

1034 hints: "HintCollection", 

1035) -> set[BinaryPackageId]: 

1036 check: set[BinaryPackageId] = set() 

1037 smoothbins: set[BinaryPackageId] = set() 

1038 

1039 for check_pkg_id in binaries_to_check: 

1040 binary, _, parch = check_pkg_id 

1041 

1042 cruftbins: set[BinaryPackageId] = set() 

1043 

1044 # Not a candidate for smooth update (newer non-cruft version in unstable) 

1045 if binary in binaries_s[parch]: 

1046 if binaries_s[parch][binary].source_version == source_data.version: 

1047 continue 

1048 cruftbins.add(binaries_s[parch][binary].pkg_id) 

1049 

1050 # Maybe a candidate (cruft or removed binary): check if config allows us to smooth update it. 

1051 if is_smooth_update_allowed(binaries_t[parch][binary], smooth_updates, hints): 

1052 # if the package has reverse-dependencies which are 

1053 # built from other sources, it's a valid candidate for 

1054 # a smooth update. if not, it may still be a valid 

1055 # candidate if one of its r-deps is itself a candidate, 

1056 # so note it for checking later 

1057 rdeps = set(pkg_universe.reverse_dependencies_of(check_pkg_id)) 

1058 # We ignore all binaries listed in "removals" as we 

1059 # assume they will leave at the same time as the 

1060 # given package. 

1061 rdeps.difference_update(removals, binaries_to_check) 

1062 

1063 smooth_update_it = False 

1064 if target_suite.any_of_these_are_in_the_suite(rdeps): 

1065 combined = set(smoothbins) 

1066 combined.add(check_pkg_id) 

1067 for rdep in rdeps: 

1068 # each dependency clause has a set of possible 

1069 # alternatives that can satisfy that dependency. 

1070 # if any of them is outside the set of smoothbins, the 

1071 # dependency can be satisfied even if this binary was 

1072 # removed, so there is no need to keep it around for a 

1073 # smooth update 

1074 # if not, only this binary can satisfy the dependency, so 

1075 # we should keep it around until the rdep is no longer in 

1076 # testing 

1077 for dep_clause in pkg_universe.dependencies_of(rdep): 

1078 # filter out cruft binaries from unstable, because 

1079 # they will not be added to the set of packages that 

1080 # will be migrated 

1081 if dep_clause - cruftbins <= combined: 

1082 smooth_update_it = True 

1083 break 

1084 

1085 if smooth_update_it: 

1086 smoothbins = combined 

1087 else: 

1088 check.add(check_pkg_id) 

1089 

1090 # check whether we should perform a smooth update for 

1091 # packages which are candidates but do not have r-deps 

1092 # outside of the current source 

1093 while 1: 

1094 found_any = False 

1095 for candidate_pkg_id in check: 

1096 rdeps = pkg_universe.reverse_dependencies_of(candidate_pkg_id) 

1097 if not rdeps.isdisjoint(smoothbins): 

1098 smoothbins.add(candidate_pkg_id) 

1099 found_any = True 

1100 if not found_any: 

1101 break 

1102 check = {x for x in check if x not in smoothbins} 

1103 

1104 return smoothbins 

1105 

1106 

1107def find_newer_binaries( 

1108 suite_info: Suites, pkg: BinaryPackage, add_source_for_dropped_bin: bool = False 

1109) -> list[tuple[PackageId, Suite]]: 

1110 """ 

1111 Find newer binaries for pkg in any of the source suites. 

1112 

1113 :param pkg: BinaryPackage (is assumed to be in the target suite) 

1114 

1115 :param add_source_for_dropped_bin: If True, newer versions of the 

1116 source of pkg will be added if they don't have the binary pkg 

1117 

1118 :return: the newer binaries (or sources) and their suites 

1119 """ 

1120 source = pkg.source 

1121 newer_versions: list[tuple[PackageId, Suite]] = [] 

1122 for suite in suite_info: 

1123 if suite.suite_class == SuiteClass.TARGET_SUITE: 

1124 continue 

1125 

1126 suite_binaries_on_arch = suite.binaries.get(pkg.pkg_id.architecture) 

1127 if not suite_binaries_on_arch: 1127 ↛ 1128

1128 continue 

1129 

1130 newerbin = None 

1131 if pkg.pkg_id.package_name in suite_binaries_on_arch: 

1132 newerbin = suite_binaries_on_arch[pkg.pkg_id.package_name] 

1133 if suite.is_cruft(newerbin): 

1134 # We pretend the cruft binary doesn't exist. 

1135 # We handle this as if the source didn't have the binary 

1136 # (see below) 

1137 newerbin = None 

1138 elif apt_pkg.version_compare(newerbin.version, pkg.version) <= 0: 

1139 continue 

1140 else: 

1141 if source not in suite.sources: 

1142 # bin and source not in suite: no newer version 

1143 continue 

1144 

1145 if not newerbin: 

1146 if not add_source_for_dropped_bin: 1146 ↛ 1147

1147 continue 

1148 # We only get here if there is a newer version of the source, 

1149 # which doesn't have the binary anymore (either it doesn't 

1150 # exist, or it's cruft and we pretend it doesn't exist). 

1151 # Add the new source instead. 

1152 nsrc = suite.sources[source] 

1153 n_id = PackageId(source, nsrc.version, "source") 

1154 overs = pkg.source_version 

1155 if apt_pkg.version_compare(nsrc.version, overs) <= 0: 

1156 continue 

1157 else: 

1158 n_id = newerbin.pkg_id 

1159 

1160 newer_versions.append((n_id, suite)) 

1161 

1162 return newer_versions 

1163 

1164 

1165def parse_provides( 

1166 provides_raw: str, 

1167 pkg_id: Optional[BinaryPackageId] = None, 

1168 logger: Optional[logging.Logger] = None, 

1169) -> list[tuple[str, str, str]]: 

1170 parts = apt_pkg.parse_depends(provides_raw, False) 

1171 nprov = [] 

1172 for or_clause in parts: 

1173 if len(or_clause) != 1: # pragma: no cover 

1174 if logger is not None: 

1175 msg = "Ignoring invalid provides in %s: Alternatives [%s]" 

1176 logger.warning(msg, str(pkg_id), str(or_clause)) 

1177 continue 

1178 for part in or_clause: 

1179 provided, provided_version, op = part 

1180 if op != "" and op != "=": # pragma: no cover 

1181 if logger is not None: 

1182 msg = "Ignoring invalid provides in %s: %s (%s %s)" 

1183 logger.warning(msg, str(pkg_id), provided, op, provided_version) 

1184 continue 

1185 provided = sys.intern(provided) 

1186 provided_version = sys.intern(provided_version) 

1187 part = (provided, provided_version, sys.intern(op)) 

1188 nprov.append(part) 

1189 return nprov 

1190 

1191 
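# Small sketch: unversioned provides keep an empty version and operator,
# while versioned provides must use "=".
#
#   parse_provides("mail-transport-agent, foo (= 1.0)")
#   # -> [("mail-transport-agent", "", ""), ("foo", "1.0", "=")]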

1192def parse_builtusing( 

1193 builtusing_raw: str, 

1194 pkg_id: Optional[BinaryPackageId] = None, 

1195 logger: Optional[logging.Logger] = None, 

1196) -> list[tuple[str, str]]: 

1197 parts = apt_pkg.parse_depends(builtusing_raw, False) 

1198 nbu = [] 

1199 for or_clause in parts: 

1200 if len(or_clause) != 1: # pragma: no cover 

1201 if logger is not None: 

1202 msg = "Ignoring invalid builtusing in %s: Alternatives [%s]" 

1203 logger.warning(msg, str(pkg_id), str(or_clause)) 

1204 continue 

1205 for part in or_clause: 

1206 bu, bu_version, op = part 

1207 if op != "=": # pragma: no cover 

1208 if logger is not None: 

1209 msg = "Ignoring invalid builtusing in %s: %s (%s %s)" 

1210 logger.warning(msg, str(pkg_id), bu, op, bu_version) 

1211 continue 

1212 bu = sys.intern(bu) 

1213 bu_version = sys.intern(bu_version) 

1214 nbu.append((bu, bu_version)) 

1215 return nbu 

1216 

1217 

1218def parse_option( 

1219 options: "optparse.Values", 

1220 option_name: str, 

1221 default: Optional[Any] = None, 

1222 to_bool: bool = False, 

1223 to_int: bool = False, 

1224 day_to_sec: bool = False, 

1225) -> None: 

1226 """Ensure the option exists and has a sane value 

1227 

1228 :param options: optparse.Values object holding the options 

1229 

1230 :param option_name: string with the name of the option 

1231 

1232 :param default: the default value for the option 

1233 

1234 :param to_int: convert the input to int (empty input defaults to sys.maxsize) 

1235 

1236 :param to_bool: convert the input to bool 

1237 

1238 :param day_to_sec: convert the input from days to seconds (implies to_int=True) 

1239 """ 

1240 value = getattr(options, option_name, default) 

1241 

1242 # Option was provided with no value (or default is '') so pick up the default 

1243 if value == "": 

1244 value = default 

1245 

1246 if (to_int or day_to_sec) and value in (None, ""): 

1247 value = sys.maxsize 

1248 

1249 if day_to_sec: 

1250 value = int(float(value) * 24 * 60 * 60) # type: ignore[arg-type] 

1251 

1252 if to_int: 

1253 value = int(value) # type: ignore[arg-type] 

1254 

1255 if to_bool: 

1256 if value and ( 

1257 isinstance(value, bool) or value.lower() in ("yes", "y", "true", "t", "1") 

1258 ): 

1259 value = True 

1260 else: 

1261 value = False 

1262 

1263 setattr(options, option_name, value)
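# Usage sketch, assuming an optparse.Values holder and a hypothetical
# option name:
#
#   opts = optparse.Values()
#   opts.min_age = "5"
#   parse_option(opts, "min_age", day_to_sec=True)
#   opts.min_age  # -> 432000 (5 days in seconds)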