Coverage for britney2/utils.py: 92%

496 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2026-06-17 09:00 +0000

1# Refactored parts from britney.py, which is/was: 

2# Copyright (C) 2001-2008 Anthony Towns <ajt@debian.org> 

3# Andreas Barth <aba@debian.org> 

4# Fabio Tranchitella <kobold@debian.org> 

5# Copyright (C) 2010-2012 Adam D. Barratt <adsb@debian.org> 

6# Copyright (C) 2012 Niels Thykier <niels@thykier.net> 

7# 

8# New portions 

9# Copyright (C) 2013 Adam D. Barratt <adsb@debian.org> 

10 

11# This program is free software; you can redistribute it and/or modify 

12# it under the terms of the GNU General Public License as published by 

13# the Free Software Foundation; either version 2 of the License, or 

14# (at your option) any later version. 

15 

16# This program is distributed in the hope that it will be useful, 

17# but WITHOUT ANY WARRANTY; without even the implied warranty of 

18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19# GNU General Public License for more details. 

20 

21 

22import errno 

23import logging 

24import optparse 

25import os 

26import sys 

27import time 

28from collections import defaultdict 

29from collections.abc import ( 

30 Callable, 

31 Container, 

32 Generator, 

33 Iterable, 

34 Iterator, 

35 Mapping, 

36 MutableSet, 

37) 

38from datetime import UTC, datetime 

39from enum import Enum, StrEnum 

40from functools import partial 

41from itertools import chain, filterfalse 

42from typing import ( 

43 IO, 

44 TYPE_CHECKING, 

45 Any, 

46 Literal, 

47 Protocol, 

48 TypeVar, 

49 Union, 

50 cast, 

51 overload, 

52) 

53 

54import apt_pkg 

55import yaml 

56 

57from britney2 import ( 

58 BinaryPackage, 

59 BinaryPackageId, 

60 MultiArch, 

61 PackageId, 

62 SourcePackage, 

63 Suite, 

64 SuiteClass, 

65 Suites, 

66 TargetSuite, 

67) 

68from britney2.excuse import Excuse 

69from britney2.excusedeps import DependencyState, ImpossibleDependencyState 

70from britney2.policies import PolicyVerdict 

71 

72if TYPE_CHECKING: 72 ↛ 74line 72 didn't jump to line 74 because the condition on line 72 was never true

73 

74 from _typeshed import SupportsRichComparisonT 

75 from apt_pkg import TagSection 

76 

77 from .hints import HintCollection 

78 from .installability.universe import BinaryPackageUniverse 

79 from .migrationitem import MigrationItem, MigrationItemFactory 

80 

81_T = TypeVar("_T") 

82 

83 

class MigrationConstraintException(Exception):
    """Marker exception for migration constraint problems

    Adds no behaviour on top of Exception.  The code raising it is not
    part of this section of the file.
    """

86 

87 

@overload
def ifilter_except(
    container: Container[_T], iterable: Literal[None] = None
) -> "partial[filterfalse[_T]]": ...


@overload
def ifilter_except(
    container: Container[_T], iterable: Iterable[_T]
) -> "filterfalse[_T]": ...


def ifilter_except(
    container: Container[_T], iterable: Iterable[_T] | None = None
) -> Union["filterfalse[_T]", "partial[filterfalse[_T]]"]:
    """Drop the elements that are in container

    With an iterable, an iterator over the elements of that iterable
    which are NOT in container is returned.  Without one, a callable is
    returned that builds such an iterator for whatever iterable it is
    later given; handy when one filter is applied to several iterables
    that are not known up front.
    """
    is_member = container.__contains__
    if iterable is None:
        # No iterable yet: hand back a reusable filter factory.
        return cast("partial[filterfalse[_T]]", partial(filterfalse, is_member))
    return filterfalse(is_member, iterable)

115 

116 

@overload
def ifilter_only(container: Container[_T], iterable: Iterable[_T]) -> "filter[_T]": ...


# The default mirrors the implementation (and ifilter_except), so that the
# one-argument call ifilter_only(container) matches an overload.
@overload
def ifilter_only(
    container: Container[_T], iterable: Literal[None] = None
) -> "partial[filter[_T]]": ...


def ifilter_only(
    container: Container[_T], iterable: Iterable[_T] | None = None
) -> Union["filter[_T]", "partial[filter[_T]]"]:
    """Filter out elements which are not in container

    If given an iterable it returns a filtered iterator, otherwise it
    returns a function to generate filtered iterators.  The latter is
    useful if the same filter has to be (re-)used on multiple
    iterators that are not known beforehand.

    :param container: Elements are kept only if "element in container".
    :param iterable: The elements to filter (optional).
    :return: A filter iterator, or a partial producing one when no
      iterable was given.
    """
    if iterable is not None:
        return filter(container.__contains__, iterable)
    # cast() is a runtime no-op; it only narrows the type of the partial,
    # the same way ifilter_except does.
    return cast("partial[filter[_T]]", partial(filter, container.__contains__))

138 

139 

# iter_except is from the "itertools" recipe
def iter_except(
    func: Callable[[], _T],
    exception: type[BaseException] | tuple[type[BaseException], ...],
    first: Any = None,
) -> Iterator[_T]:  # pragma: no cover - itertools recipe function
    """Turn a call-until-exception interface into an iterator

    Yields the result of first() (only when "first" is given) and then
    the results of repeated func() calls.  The iteration ends silently
    as soon as one of those calls raises "exception".

    Examples:
        heapiter = iter_except(functools.partial(heappop, h), IndexError)
        dictiter = iter_except(d.popitem, KeyError)
        dequeiter = iter_except(d.popleft, IndexError)
        setiter = iter_except(s.pop, KeyError)
    """
    try:
        if first is not None:
            yield first()
        while True:
            yield func()
    except exception:
        # The expected exception is the end-of-data signal.
        return

168 

169 

def log_and_format_old_libraries(
    logger: logging.Logger, libs: list["MigrationItem"]
) -> None:
    """Log old libraries as a table without header

    The items are grouped by package name.  One INFO line is logged per
    package (sorted by name) listing its architectures in the order
    they appear in "libs".
    """
    archs_by_pkg: dict[str, list[str]] = {}
    for item in libs:
        archs_by_pkg.setdefault(item.package, []).append(item.architecture)

    for name, archs in sorted(archs_by_pkg.items()):
        logger.info(" %s: %s", name, " ".join(archs))

184 

185 

def compute_reverse_tree(
    pkg_universe: "BinaryPackageUniverse", affected: set[BinaryPackageId]
) -> None:
    """Extend a set of packages with all their transitive reverse dependencies

    Nothing is returned: "affected" is updated in place (and must
    therefore be mutable) until it also contains every package reported
    by pkg_universe.reverse_dependencies_of() for any of its members.
    """
    pending = list(affected)
    while pending:
        rdeps = pkg_universe.reverse_dependencies_of(pending.pop())
        # Only packages not seen before need to be visited themselves.
        unseen = rdeps - affected
        affected.update(unseen)
        pending.extend(unseen)

204 

205 

def add_transitive_dependencies_flatten(
    pkg_universe: "BinaryPackageUniverse", initial_set: MutableSet[BinaryPackageId]
) -> None:
    """Extend a set of packages with all their transitive dependencies

    Nothing is returned: "initial_set" is updated in place (and must
    therefore be mutable).  pkg_universe.dependencies_of() yields groups
    of package ids; every member of every group is added, i.e. the
    groups are flattened.
    """
    worklist = list(initial_set)
    while worklist:
        unseen = set()
        for clause in pkg_universe.dependencies_of(worklist.pop()):
            for dep_id in clause:
                if dep_id not in initial_set:
                    unseen.add(dep_id)
        initial_set |= unseen
        worklist.extend(unseen)

228 

229 

def write_nuninst(filename: str, nuninst: dict[str, set[str]]) -> None:
    """Write the non-installable report

    Write the non-installable report derived from "nuninst" to the
    file denoted by "filename".

    The file starts with a "Built on:" and a "Last update:" line, both
    carrying the same UTC timestamp, followed by an empty line and one
    "<key>: <pkg> <pkg> ..." line per entry of "nuninst".

    :param filename: Path of the file to write (UTF-8 encoded).
    :param nuninst: Mapping of nuninst keys to uninstallable package names.
    """
    # Format the timestamp once, so the two (redundant) header fields
    # cannot disagree when the clock ticks between two separate reads.
    timestamp = time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime())
    with open(filename, "w", encoding="utf-8") as f:
        f.write(f"Built on: {timestamp}\n")
        f.write(f"Last update: {timestamp}\n\n")
        for key, packages in nuninst.items():
            # Sets have no stable order; sorting makes the file reproducible.
            f.write("{}: {}\n".format(key, " ".join(sorted(packages))))

251 

252 

def read_nuninst(filename: str, architectures: set[str]) -> dict[str, set[str]]:
    """Read the non-installable report

    Parse the report stored in "filename" and return it as a mapping
    from key to a set of package names.  Lines without a ":" are
    skipped.  A line is kept only if the part of its key before the
    first "+" is listed in "architectures".
    """
    report: dict[str, set[str]] = {}
    with open(filename, encoding="utf-8") as fd:
        for line in fd:
            if ":" not in line:
                continue
            key, _, package_list = line.strip().partition(":")
            base_arch = key.partition("+")[0]
            if base_arch in architectures:
                report[key] = set(package_list.split())
    return report

269 

270 

def newly_uninst(
    nuold: dict[str, set[str]], nunew: dict[str, set[str]]
) -> dict[str, list[str]]:
    """Return a nuninst statistic with only the newly uninstallable packages

    For every architecture present in both statistics, the packages
    listed in "nunew" but not in "nuold" are collected.

    The result maps architectures to the list of such packages.
    Architectures without regressions are omitted, so an empty
    dictionary is returned if nothing regressed.
    """
    res: dict[str, list[str]] = {}
    for arch, previously_broken in nuold.items():
        if arch not in nunew:
            continue
        regressions = [pkg for pkg in nunew[arch] if pkg not in previously_broken]
        # Leave the architecture out if there are no newly uninst packages
        if regressions:
            res[arch] = regressions
    return res

292 

293 

def format_and_log_uninst(
    logger: logging.Logger,
    architectures: Iterable[str],
    nuninst: Mapping[str, Iterable[str]],
    *,
    loglevel: int = logging.INFO,
) -> None:
    """Log the uninstallable packages per architecture

    One line is logged (at "loglevel") per architecture, with the
    package names sorted, for example:
       * i386: broken-pkg1, broken-pkg2

    Architectures that are missing from "nuninst" or have no
    uninstallable packages produce no output.
    """
    for arch in architectures:
        broken = nuninst.get(arch)
        if not broken:
            continue
        logger.log(loglevel, " * {}: {}".format(arch, ", ".join(sorted(broken))))

312 

313 

class Sorted(Protocol):
    """Call signature of a sorted()-like function used without a key"""

    def __call__(
        self,
        iterable: Iterable["SupportsRichComparisonT"],
        /,
        *,
        key: None = None,
        reverse: bool = False,
    ) -> list["SupportsRichComparisonT"]: ...

323 

324 

def write_heidi(
    filename: str,
    target_suite: TargetSuite,
    *,
    outofsync_arches: frozenset[str] = frozenset(),
    sorted: Sorted = sorted,
) -> None:
    """Write the HeidiResult output file

    All binary packages of "target_suite" (per architecture, sorted) and
    then all of its source packages (sorted) are written to "filename",
    one per line, in the form:

       <pkg-name> <pkg-version> <pkg-architecture> <pkg-section>
       <src-name> <src-version> source <src-section>

    Entries whose section is "faux" or ends with "/faux" are left out;
    a binary without a section is treated as faux.

    outofsync_arches: If given, it is a set of architectures marked
    as "out of sync".  For those, arch:all binaries whose source version
    differs from the source version in the target suite are omitted to
    reduce the noise.

    The "sorted" parameter is an optimization to avoid "load global" in
    the loops.
    """
    sources_t = target_suite.sources
    packages_t = target_suite.binaries

    def _is_faux(section: str) -> bool:
        # Faux packages are not really a part of testing
        return section == "faux" or section.endswith("/faux")

    with open(filename, "w", encoding="ascii") as f:
        # binary packages first
        for arch in sorted(packages_t):
            binaries = packages_t[arch]
            for pkg_name in sorted(binaries):
                pkg = binaries[pkg_name]
                pkgarch = pkg.architecture or "all"
                pkgsec = pkg.section or "faux"
                if _is_faux(pkgsec):
                    continue
                if (
                    pkg.source_version
                    and pkgarch == "all"
                    and pkg.source_version != sources_t[pkg.source].version
                    and arch in outofsync_arches
                ):
                    # On "outofsync" architectures binaries may lag behind
                    # the source in testing, so the package list can still
                    # contain arch:all packages of the older version.  Only
                    # the newer arch:all package is wanted in the output.
                    continue
                f.write(f"{pkg_name} {pkg.version} {pkgarch} {pkgsec}\n")

        # then the sources
        for src_name in sorted(sources_t):
            src = sources_t[src_name]
            srcsec = src.section or "unknown"
            if _is_faux(srcsec):
                continue
            f.write(f"{src_name} {src.version} source {srcsec}\n")

390 

391 

def write_heidi_delta(filename: str, all_selected: list["MigrationItem"]) -> None:
    """Write the output delta

    After a "#HeidiDelta" header line, one line is written per item of
    "all_selected" (in the given order):
      <name> <version>                  for a source item
      <name> <version> <architecture>   for any other item
    The line is prefixed with "-" if the item is a removal.
    """
    with open(filename, "w", encoding="ascii") as fd:
        fd.write("#HeidiDelta\n")
        for item in all_selected:
            sign = "-" if item.is_removal else ""
            fields = [f"{sign}{item.package}", f"{item.version}"]
            if item.architecture != "source":
                fields.append(f"{item.architecture}")
            fd.write(" ".join(fields) + "\n")

419 

420 

class Opener(Protocol):
    """Call signature of an open()-like function writing UTF-8 text"""

    def __call__(
        self, file: str, mode: Literal["wt"], encoding: Literal["utf-8"]
    ) -> IO[Any]: ...

425 

426 

class ExcusesOutputFormat(Enum):
    """Output formats understood by write_excuses"""

    YAML = 0
    LEGACY_HTML = 1

430 

431 

def write_excuses(
    excuses: dict[str, "Excuse"] | dict[PackageId, "Excuse"],
    dest_file: str,
    output_format: ExcusesOutputFormat = ExcusesOutputFormat.YAML,
) -> None:
    """Write the excuses to dest_file

    Writes the excuses (ordered by their sortkey()) in the requested
    output_format to the path denoted by dest_file.  The data is first
    written to "<dest_file>.new", which then replaces dest_file.

    :param excuses: The excuses to write.
    :param dest_file: Destination path.  For YAML output, a ".xz" or ".gz"
      suffix selects lzma or gzip compression, and the parent directory
      is created if needed.
    :param output_format: ExcusesOutputFormat.YAML or
      ExcusesOutputFormat.LEGACY_HTML.
    :raises ValueError: if output_format is neither of the two.
    """
    excuselist = sorted(excuses.values(), key=lambda x: x.sortkey())
    if output_format is ExcusesOutputFormat.YAML:
        # use custom representer to avoid creation of the full list with all excuse data before starting the serialization
        def represent_yaml_excuse(dumper: yaml.Dumper, data: Excuse) -> yaml.Node:
            return dumper.represent_data(data.excusedata(excuses))

        yaml.add_representer(Excuse, represent_yaml_excuse)
        yaml.Dumper.add_multi_representer(
            StrEnum, yaml.representer.Representer.represent_str
        )

        # dirname() is "" for a bare file name and os.makedirs("") raises
        # FileNotFoundError; there is nothing to create in that case.
        dest_dir = os.path.dirname(dest_file)
        if dest_dir:
            os.makedirs(dest_dir, exist_ok=True)
        opener: Opener = open  # type: ignore[assignment]
        if dest_file.endswith(".xz"):
            import lzma

            opener = lzma.open  # type: ignore[assignment]
        elif dest_file.endswith(".gz"):
            import gzip

            opener = gzip.open  # type: ignore[assignment]
        with opener(f"{dest_file}.new", "wt", encoding="utf-8") as f:
            excusesdata = {
                "sources": excuselist,
                "generated-date": datetime.now(UTC),
            }
            yaml.dump(
                excusesdata, stream=f, default_flow_style=False, allow_unicode=True
            )
        os.replace(f"{dest_file}.new", dest_file)
    elif output_format is ExcusesOutputFormat.LEGACY_HTML:
        with open(f"{dest_file}.new", "w", encoding="utf-8") as f:
            f.write(
                '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/REC-html40/strict.dtd">\n'
            )
            f.write("<html><head><title>excuses...</title>")
            f.write(
                '<meta http-equiv="Content-Type" content="text/html;charset=utf-8"></head><body>\n'
            )
            f.write(
                "<p>Generated: "
                + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time()))
                + "</p>\n"
            )
            f.write("<ul>\n")
            for e in excuselist:
                f.write("<li>%s" % e.html(excuses))
            f.write("</ul></body></html>\n")
        os.replace(f"{dest_file}.new", dest_file)
    else:  # pragma: no cover
        raise ValueError('Output format must be either "YAML" or "LEGACY_HTML"')

494 

495 

def old_libraries(
    mi_factory: "MigrationItemFactory",
    suite_info: Suites,
    outofsync_arches: Iterable[str] = frozenset(),
) -> list["MigrationItem"]:
    """Detect old libraries left in the target suite for smooth transitions

    A binary in the target suite is an old library when its source
    version differs from the version of its source package in the target
    suite, i.e. it is no longer built from that source.  Such binaries
    are only kept because other packages still depend on them and should
    be removed as soon as possible.

    On "outofsync" architectures outdated binaries are tolerated in the
    target suite; there a binary is only reported if it is no longer in
    the (primary) source suite.

    :return: The removal items generated by mi_factory for these binaries.
    """
    sources_t = suite_info.target_suite.sources
    binaries_t = suite_info.target_suite.binaries
    binaries_s = suite_info.primary_source_suite.binaries
    removals = []
    for arch, arch_binaries in binaries_t.items():
        for pkg_name, pkg in arch_binaries.items():
            if sources_t[pkg.source].version == pkg.source_version:
                # Still built from the current source.
                continue
            if arch in outofsync_arches and pkg_name in binaries_s[arch]:
                # Tolerated: out-of-sync arch and still in the source suite.
                continue
            removals.append(mi_factory.generate_removal_for_cruft_item(pkg.pkg_id))
    return removals

524 

525 

526def is_nuninst_asgood_generous( 

527 constraints: dict[str, list[str]], 

528 allow_uninst: dict[str, set[str | None]], 

529 architectures: list[str], 

530 old: dict[str, set[str]], 

531 new: dict[str, set[str]], 

532 break_arches: set[str] = cast(set[str], frozenset()), 

533) -> bool: 

534 """Compares the nuninst counters and constraints to see if they improved 

535 

536 Given a list of architectures, the previous and the current nuninst 

537 counters, this function determines if the current nuninst counter 

538 is better than the previous one. Optionally it also accepts a set 

539 of "break_arches", the nuninst counter for any architecture listed 

540 in this set are completely ignored. 

541 

542 If the nuninst counters are equal or better, then the constraints 

543 are checked for regressions (ignoring break_arches). 

544 

545 Returns True if the new nuninst counter is better than the 

546 previous and there are no constraint regressions (ignoring Break-archs). 

547 Returns False otherwise. 

548 

549 """ 

550 diff = 0 

551 for arch in architectures: 

552 if arch in break_arches: 

553 continue 

554 diff = diff + ( 

555 len(new[arch] - allow_uninst[arch]) - len(old[arch] - allow_uninst[arch]) 

556 ) 

557 if diff > 0: 

558 return False 

559 must_be_installable = constraints["keep-installable"] 

560 for arch in architectures: 

561 if arch in break_arches: 

562 continue 

563 regression = new[arch] - old[arch] 

564 if not regression.isdisjoint(must_be_installable): 564 ↛ 565line 564 didn't jump to line 565 because the condition on line 564 was never true

565 return False 

566 return True 

567 

568 

def clone_nuninst(
    nuninst: dict[str, set[str]],
    *,
    packages_s: dict[str, dict[str, BinaryPackage]] | None = None,
    architectures: Iterable[str] | None = None,
) -> dict[str, set[str]]:
    """Completely or selectively deep clone nuninst

    The returned dict is a copy of "nuninst".  For each architecture in
    "architectures", the sets stored under "<arch>" and "<arch>+all" are
    replaced by new sets; all other entries (and every entry when
    "architectures" is omitted) still share their set with "nuninst".

    When "packages_s" is given, the new sets only keep the packages
    that are also in packages_s[arch]; otherwise they are plain copies.
    """
    clone = nuninst.copy()
    if architectures is None:
        return clone
    for arch in architectures:
        arch_all = arch + "+all"
        if packages_s is None:
            clone[arch] = set(nuninst[arch])
            clone[arch_all] = set(nuninst[arch_all])
        else:
            clone[arch] = {pkg for pkg in nuninst[arch] if pkg in packages_s[arch]}
            clone[arch_all] = {
                pkg for pkg in nuninst[arch_all] if pkg in packages_s[arch]
            }
    return clone

599 

600 

def test_installability(
    target_suite: TargetSuite,
    pkg_name: str,
    pkg_id: BinaryPackageId,
    broken: set[str],
    nuninst_arch: set[str] | None,
) -> None:
    """Test a package for installability and record the result

    "pkg_id" is checked with target_suite.is_installable().  If it is
    not installable, "pkg_name" is added to the set "broken"; otherwise
    it is removed from it (if present).

    If nuninst_arch is not None, it is updated in the same way as
    broken.
    """
    tracked = [broken] if nuninst_arch is None else [broken, nuninst_arch]
    if target_suite.is_installable(pkg_id):
        # improvement, or it was not broken to begin with
        for uninst in tracked:
            uninst.discard(pkg_name)
    else:
        # regression, or it was already broken
        for uninst in tracked:
            uninst.add(pkg_name)

629 

630 

def check_installability(
    target_suite: TargetSuite,
    binaries: dict[str, dict[str, BinaryPackage]],
    arch: str,
    updates: set[BinaryPackageId],
    check_archall: bool,
    nuninst: dict[str, set[str]],
) -> None:
    """Re-test the updated packages of one architecture

    Every package id in "updates" for "arch" is passed to
    test_installability, provided binaries[arch] contains that package
    in exactly that version.  The results are recorded in
    nuninst[arch + "+all"], and in nuninst[arch] as well, except for
    arch:all packages when "check_archall" is false: those are dropped
    from nuninst[arch] instead.
    """
    broken = nuninst[arch + "+all"]
    packages_t_a = binaries[arch]

    for pkg_id in updates:
        if pkg_id.architecture != arch:
            continue
        name = pkg_id.package_name
        if name not in packages_t_a:
            continue
        pkgdata = packages_t_a[name]
        if pkg_id.version != pkgdata.version:
            # Not the version in testing right now, ignore
            continue
        if check_archall or pkgdata.architecture != "all":
            nuninst_arch = nuninst[pkg_id.architecture]
        else:
            # arch:all package that is only checked if requested
            nuninst[pkg_id.architecture].discard(name)
            nuninst_arch = None
        test_installability(target_suite, name, pkg_id, broken, nuninst_arch)

658 

659 

660def possibly_compressed( 

661 path: str, *, permitted_compressions: list[str] | None = None 

662) -> str: 

663 """Find and select a (possibly compressed) variant of a path 

664 

665 If the given path exists, it will be returned 

666 

667 :param path: The base path. 

668 :param permitted_compressions: Alternative extensions to look for. Defaults to "gz" and "xz". 

669 :return: The path given possibly with one of the permitted extensions. 

670 :raises FileNotFoundError: if the path is not found 

671 """ 

672 if os.path.exists(path): 672 ↛ 674line 672 didn't jump to line 674 because the condition on line 672 was always true

673 return path 

674 if permitted_compressions is None: 

675 permitted_compressions = ["gz", "xz"] 

676 for ext in permitted_compressions: 

677 cpath = f"{path}.{ext}" 

678 if os.path.exists(cpath): 

679 return cpath 

680 raise FileNotFoundError( 

681 errno.ENOENT, os.strerror(errno.ENOENT), path 

682 ) # pragma: no cover 

683 

684 

685def create_provides_map( 

686 packages: dict[str, BinaryPackage], 

687) -> dict[str, set[tuple[str, str]]]: 

688 """Create a provides map from a map binary package names and their BinaryPackage objects 

689 

690 :param packages: A dict mapping binary package names to their BinaryPackage object 

691 :return: A provides map 

692 """ 

693 # create provides 

694 provides = defaultdict(set) 

695 

696 for pkg, dpkg in packages.items(): 

697 if dpkg.provides is None: 

698 continue 

699 # register virtual packages and real packages that provide 

700 # them 

701 for provided_pkg, provided_version, _ in dpkg.provides: 

702 provides[provided_pkg].add((pkg, provided_version)) 

703 

704 return provides 

705 

706 

def read_release_file(suite_dir: str) -> "TagSection[str]":
    """Parses a given "Release" file

    :param suite_dir: The directory to the suite
    :return: The first (and only) paragraph of the Release file
    :raises TypeError: if the file has more than one paragraph
    """
    release_file = os.path.join(suite_dir, "Release")
    # Explicit encoding: without it open() falls back to the locale's
    # encoding, unlike every other file opened in this module.
    with open(release_file, encoding="utf-8") as fd:
        tag_file = iter(apt_pkg.TagFile(fd))
        result = next(tag_file)
        if next(tag_file, None) is not None:  # pragma: no cover
            raise TypeError("%s has more than one paragraph" % release_file)
    return result

720 

721 

def read_sources_file(
    filename: str,
    sources: dict[str, SourcePackage] | None = None,
    add_faux: bool = True,
    sources_target_suite: dict[str, SourcePackage] | None = None,
    intern: Callable[[str], str] = sys.intern,
) -> dict[str, SourcePackage]:
    """Parse a single Sources file into a dict

    Parse a single Sources file into a dict mapping a source package
    name to a SourcePackage object.  If there are multiple source
    packages with the same name, then the highest versioned source
    package (that is not marked as "Extra-Source-Only") is the
    version kept in the dict.

    :param filename: Path to the Sources file.  Can be compressed by any algorithm supported by apt_pkg.TagFile
    :param sources: Optional dict to add the packages to.  If given, this is also the value returned.
    :param add_faux: Add a faux arch:all binary for each source that claims it has arch:all
    :param sources_target_suite: SourcePackages loaded from the target suite for memory optimizations
    :param intern: Internal optimisation / implementation detail to avoid python's "LOAD_GLOBAL" instruction in a loop
    :return: mapping from names to a source package
    """
    if sources is None:
        sources = {}
    if sources_target_suite is None:
        sources_target_suite = {}

    tag_file = apt_pkg.TagFile(filename)
    # Bound once: step() moves tag_file.section to the next paragraph.
    get_field = tag_file.section.get
    step = tag_file.step

    while step():
        if get_field("Extra-Source-Only", "no") == "yes":
            # Ignore sources only referenced by Built-Using
            continue
        pkg = get_field("Package")
        ver = get_field("Version")
        # There may be multiple versions of the source package
        # (in unstable) if some architectures have out-of-date
        # binaries.  We only ever consider the source with the
        # largest version for migration.
        if pkg in sources and apt_pkg.version_compare(sources[pkg].version, ver) > 0:
            continue
        maint = get_field("Maintainer")
        if maint:
            maint = intern(maint.strip())
        section = get_field("Section")
        if section:
            section = intern(section.strip())
        # Build-Depends and Build-Depends-Arch are merged into one relation.
        build_deps_arch: str | None
        build_deps_arch = ", ".join(
            x
            for x in (get_field("Build-Depends"), get_field("Build-Depends-Arch"))
            if x is not None
        )
        if build_deps_arch != "":
            # Use the "intern" parameter like the other fields do, instead
            # of going through the global sys.intern.
            build_deps_arch = intern(build_deps_arch)
        else:
            build_deps_arch = None
        build_deps_indep = get_field("Build-Depends-Indep")
        if build_deps_indep is not None:
            build_deps_indep = intern(build_deps_indep)

        # Adding arch:all packages to the list of binaries already to be able
        # to check for them later.  Helps mitigate bug 887060 and is the
        # (partial?) answer to bug 1064428.
        binaries: set[BinaryPackageId] = set()
        if add_faux and "all" in get_field("Architecture", "").split():
            # the value "faux" in arch:faux is used elsewhere, so keep in sync
            pkg_id = BinaryPackageId(f"{pkg}-faux", intern("0~~~~"), intern("faux"))
            binaries.add(pkg_id)

        pkg = intern(pkg)
        ver = intern(ver)
        sources[pkg] = srcpkg = SourcePackage(
            pkg,
            ver,
            section,
            binaries,
            maint,
            False,
            build_deps_arch,
            build_deps_indep,
            get_field("Testsuite", "").split(),
            get_field("Testsuite-Triggers", "").replace(",", "").split(),
        )

        if (
            srcpkg_target := sources_target_suite.get(pkg, None)
        ) is not None and srcpkg_target.version == ver:
            # If the source package exists and the version is the same, reuse the already stored data.
            # Note that the binaries field may be different if cruft packages are involved.
            srcpkg.build_deps_arch = srcpkg_target.build_deps_arch
            srcpkg.build_deps_indep = srcpkg_target.build_deps_indep
            srcpkg.testsuite = srcpkg_target.testsuite
            srcpkg.testsuite_triggers = srcpkg_target.testsuite_triggers
    return sources

819 

820 

def _check_and_update_packages(
    packages: list[BinaryPackage],
    package: BinaryPackage,
    archqual: str | None,
    build_depends: bool,
) -> None:
    """Helper for get_dependency_solvers

    Appends "package" to "packages" if it is acceptable for the given
    architecture qualifier of a (Build-)Depends relation.

    :param packages: The list to append to
    :param package: The candidate package
    :param archqual: Architecture qualifier of the relation (None if it has none)
    :param build_depends: If True, the relation is a build-dependency.
    """
    # See also bug #971739 and #1059929
    acceptable = (
        # no qualifier at all
        archqual is None
        # build-dependencies: ":native" is always ok
        or (archqual == "native" and build_depends)
        # any kind of dependency: ":any" is ok iff the target has "M-A: allowed"
        or (archqual == "any" and package.multi_arch is MultiArch.ALLOWED)
    )
    if acceptable:
        packages.append(package)

848 

849 

class GetDependencySolversProto(Protocol):
    """Call signature shared with get_dependency_solvers"""

    def __call__(
        self,
        block: list[tuple[str, str, str]],
        binaries_s_a: dict[str, BinaryPackage],
        provides_s_a: dict[str, set[tuple[str, str]]],
        *,
        build_depends: bool = False,
        empty_set: Any = frozenset(),
    ) -> list[BinaryPackage]: ...

860 

861 

def get_dependency_solvers(
    block: list[tuple[str, str, str]],
    binaries_s_a: dict[str, BinaryPackage],
    provides_s_a: dict[str, set[tuple[str, str]]],
    *,
    build_depends: bool = False,
    empty_set: Any = frozenset(),
) -> list[BinaryPackage]:
    """Find the packages which satisfy a dependency block

    Given one dependency block (a list of alternatives as returned by
    apt_pkg.parse_depends) and the package table of one suite and
    architecture (a la self.binaries[suite][arch]), collect every package
    from that table which satisfies one of the alternatives, either as the
    named package itself or as a provider of it.

    Build-dependency relations are handled as well when "build_depends" is
    set to True.  In that case, block should be based on the return value
    from apt_pkg.parse_src_depends.

    :param block: The dependency block as parsed by apt_pkg.parse_depends (or apt_pkg.parse_src_depends
      if the "build_depends" is True)
    :param binaries_s_a: Mapping of package names to the relevant BinaryPackage
    :param provides_s_a: Mapping of package names to their providers (as generated by parse_provides)
    :param build_depends: If True, treat the "block" parameter as a build-dependency relation rather than
      a regular dependency relation.
    :param empty_set: Internal implementation detail / optimisation
    :return: the packages solving the relation
    """
    solvers: list[BinaryPackage] = []

    # one iteration per alternative (name, version, operation) of the block
    for qualified_name, version, op in block:
        # split off the architecture qualifier ("name:qualifier"), if any
        name, colon, qualifier = qualified_name.partition(":")
        archqual = qualifier if colon else None
        unversioned = op == "" and version == ""

        # a real package of that name in the given package table
        if name in binaries_s_a:
            candidate = binaries_s_a[name]
            # check the versioned dependency here; the architecture
            # qualifier (if present) is checked by the helper
            if unversioned or apt_pkg.check_dep(candidate.version, op, version):
                _check_and_update_packages(solvers, candidate, archqual, build_depends)

        # the packages providing that name as a virtual package
        for provider_name, provided_version in provides_s_a.get(name, empty_set):
            assert provider_name in binaries_s_a
            provider = binaries_s_a[provider_name]
            # See Policy Manual §7.5: an unversioned Provides can only
            # satisfy an unversioned relation
            if unversioned or (
                provided_version != ""
                and apt_pkg.check_dep(provided_version, op, version)
            ):
                _check_and_update_packages(solvers, provider, archqual, build_depends)

    return solvers

919 

920 

def invalidate_excuses(
    excuses: dict[str, "Excuse"],
    valid: set[str],
    invalid: set[str],
    invalidated: set[str],
) -> None:
    """Invalidate impossible excuses

    Excuses which depend on invalid excuses are invalidated themselves.
    This happens in two steps:

    1. The package dependencies of every excuse are turned into
       dependencies between excuses (registered with
       Excuse.add_dependency).  An excuse for which add_dependency
       returns a false value is moved from "valid" to "invalid".
    2. The invalid excuses are processed one at a time.  Each one is
       recorded in "invalidated", and the dependency on it is invalidated
       in every reverse dependency that is still valid and not forced.  A
       reverse dependency left without alternatives is removed from
       "valid" and processed in turn.

    :param excuses: the excuses, by name
    :param valid: names of the valid excuses (updated in place)
    :param invalid: names of the invalid excuses (updated in place)
    :param invalidated: receives the name of every processed invalid excuse
    """
    # map every package (source and binary) present in the excuses we have
    # to the names of the excuses containing it
    excuses_by_package: dict[PackageId | BinaryPackageId, set[str]] = defaultdict(set)
    for excuse in excuses.values():
        for arch in excuse.packages:
            for pkg_id in excuse.packages[arch]:
                # the same package can be in multiple excuses,
                # eg. when unstable and TPU have the same packages
                excuses_by_package[pkg_id].add(excuse.name)

    # create dependencies between excuses based on packages
    #
    # Note that reverse_deps is only populated by the dependencies
    # generated from packages here. There are currently no dependencies
    # between excuses that are added directly, so this is ok.
    reverse_deps = defaultdict(set)
    for excuse in excuses.values():
        for pkg_dep in excuse.depends_packages:
            # the excuses which can each satisfy this specific dependency;
            # a package for which no excuse exists (e.g. a cruft binary)
            # is represented by an ImpossibleDependencyState
            alternatives: set[str | DependencyState] = set()
            for dep_id in cast(set[BinaryPackageId], pkg_dep.deps):
                # an unknown package yields an empty set here
                providers = excuses_by_package[dep_id]
                if not providers:
                    alternatives.add(
                        ImpossibleDependencyState(
                            PolicyVerdict.REJECTED_PERMANENTLY, "%s" % (dep_id.name)
                        )
                    )
                    continue

                alternatives |= providers
                for provider in providers:
                    reverse_deps[provider].add(excuse.name)

            if not excuse.add_dependency(alternatives, pkg_dep.spec):
                valid.discard(excuse.name)
                invalid.add(excuse.name)

    # work through the invalid excuses; a sorted list is used for
    # deterministic results, and it grows while it is being consumed
    pending = sorted(invalid)
    for ename in iter_except(pending.pop, IndexError):
        invalidated.add(ename)
        # nothing depends on this excuse
        if ename not in reverse_deps:
            continue

        rdep_verdict = (
            PolicyVerdict.REJECTED_BLOCKED_BY_ANOTHER_ITEM
            if excuses[ename].policy_verdict.is_blocked
            else PolicyVerdict.REJECTED_WAITING_FOR_ANOTHER_ITEM
        )

        for rdep_name in sorted(reverse_deps[ename]):
            rdep = excuses[rdep_name]
            # only valid items which are not marked as `forced' are affected
            if rdep_name not in valid or rdep.forced:
                continue

            # mark this specific dependency as invalid; if there are no
            # alternatives left for it, the excuse itself becomes invalid
            if not rdep.invalidate_dependency(ename, rdep_verdict):
                valid.discard(rdep_name)
                pending.append(rdep_name)

1001 

1002 

def compile_nuninst(
    target_suite: TargetSuite, architectures: list[str], nobreakall_arches: list[str]
) -> dict[str, set[str]]:
    """Compile a nuninst dict from the current testing

    :param target_suite: The target suite
    :param architectures: Which architectures to check
    :param nobreakall_arches: Which architectures where arch:all packages must be installable
    :return: two entries per architecture: "<arch>+all" has the names of all
      uninstallable packages, "<arch>" has the same names, minus the
      architecture-independent packages unless the architecture is in
      nobreakall_arches
    """
    nuninst: dict[str, set[str]] = {}
    binaries_t = target_suite.binaries

    for arch in architectures:
        packages_t_a = binaries_t[arch]

        # every package of this architecture which is not installable
        uninstallable = {
            name
            for name, data in packages_t_a.items()
            if not target_suite.is_installable(data.pkg_id)
        }
        nuninst[arch] = uninstallable
        nuninst[arch + "+all"] = uninstallable.copy()

        # arch:all packages only count on the nobreakall architectures
        if arch not in nobreakall_arches:
            nuninst[arch] = {
                name
                for name in uninstallable
                if packages_t_a[name].architecture != "all"
            }

    return nuninst

1037 

1038 

def is_smooth_update_allowed(
    binary: BinaryPackage, smooth_updates: list[str], hints: "HintCollection"
) -> bool:
    """Check whether a binary may be kept around for a smooth update

    This is allowed if smooth_updates contains "ALL" or the section of the
    binary, or if there is an "allow-smooth-update" hint for the source
    and source version of the binary.

    :param binary: the binary package to check
    :param smooth_updates: the sections with smooth updates enabled (or "ALL")
    :param hints: the hints to search for an allow-smooth-update hint
    """
    if "ALL" in smooth_updates:
        return True

    # only what follows the last "/" of the section is compared
    section = binary.section.rpartition("/")[2]

    # note that the hint needs to match the source version *IN TESTING*
    return section in smooth_updates or hints.has_hint(
        "allow-smooth-update", package=binary.source, version=binary.source_version
    )

1051 

1052 

def find_smooth_updateable_binaries(
    binaries_to_check: list[BinaryPackageId],
    source_data: SourcePackage,
    pkg_universe: "BinaryPackageUniverse",
    target_suite: TargetSuite,
    binaries_t: dict[str, dict[str, BinaryPackage]],
    binaries_s: dict[str, dict[str, BinaryPackage]],
    removals: set[BinaryPackageId] | frozenset[BinaryPackageId],
    smooth_updates: list[str],
    hints: "HintCollection",
) -> set[BinaryPackageId]:
    """Select the binaries which should be kept for a smooth update

    :param binaries_to_check: the candidate binaries
    :param source_data: the source package; a candidate whose binary in
      binaries_s is built from this source version is skipped
    :param pkg_universe: used to look up dependencies and reverse dependencies
    :param target_suite: used to check whether any reverse dependency is in
      the suite
    :param binaries_t: binary packages by architecture and name; the
      candidates are looked up here
    :param binaries_s: binary packages by architecture and name; used to
      detect newer non-cruft versions and cruft
    :param removals: binaries to ignore as reverse dependencies, as they are
      assumed to leave together with the given package
    :param smooth_updates: passed on to is_smooth_update_allowed
    :param hints: passed on to is_smooth_update_allowed
    :return: the binaries selected for a smooth update
    """
    smoothbins: set[BinaryPackageId] = set()
    # allowed candidates which did not qualify on their own; they are
    # looked at again once the first pass is complete
    undecided: set[BinaryPackageId] = set()

    binaries_to_check_set = set(binaries_to_check)
    for check_pkg_id in binaries_to_check:
        binary = check_pkg_id.package_name
        parch = check_pkg_id.architecture

        cruftbins: set[BinaryPackageId] = set()

        # Not a candidate for a smooth update if there is a newer non-cruft
        # version in unstable
        if binary in binaries_s[parch]:
            pkg_s = binaries_s[parch][binary]
            if pkg_s.source_version == source_data.version:
                continue
            cruftbins.add(pkg_s.pkg_id)

        # Maybe a candidate (cruft or removed binary): the configuration
        # has to allow a smooth update for it.
        if not is_smooth_update_allowed(binaries_t[parch][binary], smooth_updates, hints):
            continue

        # A package with reverse dependencies built from other sources is
        # a valid candidate.  Without those it can still become one if one
        # of its r-deps is itself a candidate, which is checked later.
        #
        # Binaries listed in "removals" are ignored, as we assume they will
        # leave at the same time as the given package.
        rdeps = {
            x
            for x in pkg_universe.reverse_dependencies_of(check_pkg_id)
            if not (x in removals or x in binaries_to_check_set)
        }

        smooth_update_it = False
        if target_suite.any_of_these_are_in_the_suite(rdeps):
            for rdep in rdeps:
                # Every dependency clause is a set of alternatives.  A
                # clause with an alternative other than this binary and
                # the smoothbins stays satisfiable without this binary.
                # Otherwise this binary is needed until the rdep is no
                # longer in testing.
                #
                # Cruft binaries from unstable are not counted as
                # alternatives, because they will not be added to the set
                # of packages that will be migrated.
                if any(
                    all(
                        x in smoothbins or x == check_pkg_id
                        for x in dep_clause
                        if x not in cruftbins
                    )
                    for dep_clause in pkg_universe.dependencies_of(rdep)
                ):
                    smoothbins.add(check_pkg_id)
                    smooth_update_it = True

        if not smooth_update_it:
            undecided.add(check_pkg_id)

    # A candidate without r-deps outside of the current source still gets a
    # smooth update if one of its reverse dependencies does.  Repeat until
    # a full pass adds nothing new.
    while True:
        found_any = False
        for candidate_pkg_id in undecided:
            candidate_rdeps = pkg_universe.reverse_dependencies_of(candidate_pkg_id)
            if not candidate_rdeps.isdisjoint(smoothbins):
                smoothbins.add(candidate_pkg_id)
                found_any = True
        if not found_any:
            break
        undecided = undecided.difference(smoothbins)

    return smoothbins

1139 

1140 

def find_newer_binaries(
    suite_info: Suites, pkg: BinaryPackage, add_source_for_dropped_bin: bool = False
) -> Generator[tuple[PackageId, Suite], None, None]:
    """
    Find newer binaries for pkg in any of the source suites.

    :param suite_info: the suites to search; the target suite is skipped

    :param pkg: BinaryPackage (is assumed to be in the target suite)

    :param add_source_for_dropped_bin: If True, newer versions of the
      source of pkg will be added if they don't have the binary pkg

    :return: the newer binaries (or sources) and their suites
    """
    source = pkg.source
    pkg_id = pkg.pkg_id
    for suite in suite_info:
        if suite.suite_class is SuiteClass.TARGET_SUITE:
            continue

        suite_binaries_on_arch = suite.binaries.get(pkg_id.architecture)
        if not suite_binaries_on_arch:
            continue

        newerbin = None
        if pkg_id.package_name in suite_binaries_on_arch:
            candidate = suite_binaries_on_arch[pkg_id.package_name]
            if not suite.is_cruft(candidate):
                if apt_pkg.version_compare(candidate.version, pkg.version) <= 0:
                    continue
                newerbin = candidate
            # else: we pretend the cruft binary doesn't exist and handle
            # this as if the source didn't have the binary (see below)
        elif source not in suite.sources:
            # bin and source not in suite: no newer version
            continue

        if newerbin:
            yield (newerbin.pkg_id, suite)
            continue

        if not add_source_for_dropped_bin:
            continue

        # The source is in the suite without the binary (either the binary
        # doesn't exist, or it's cruft and we pretend it doesn't exist).
        # Add the source instead, provided that it is newer.
        nsrc = suite.sources[source]
        if apt_pkg.version_compare(nsrc.version, pkg.source_version) <= 0:
            continue

        yield (PackageId(source, nsrc.version, "source"), suite)

1194 

1195 

def parse_provides(
    provides_raw: str,
    pkg_id: BinaryPackageId | None = None,
    logger: logging.Logger | None = None,
) -> list[tuple[str, str, str]]:
    """Parse a Provides field into (name, version, operator) tuples

    Entries with alternatives, and entries with an operator other than ""
    or "=", are ignored; a warning is logged for them if a logger is given.
    The strings of the returned tuples are interned.

    :param provides_raw: the raw value of the field
    :param pkg_id: the package the field belongs to (only used for logging)
    :param logger: where to log the warnings for ignored entries, if at all
    :return: one tuple for every accepted entry
    """
    nprov = []
    for or_clause in apt_pkg.parse_depends(provides_raw, False):
        if len(or_clause) != 1:  # pragma: no cover
            if logger is not None:
                logger.warning(
                    "Ignoring invalid provides in %s: Alternatives [%s]",
                    str(pkg_id),
                    str(or_clause),
                )
            continue

        # exactly one entry is left at this point
        provided, provided_version, op = or_clause[0]
        if op not in ("", "="):  # pragma: no cover
            if logger is not None:
                logger.warning(
                    "Ignoring invalid provides in %s: %s (%s %s)",
                    str(pkg_id),
                    provided,
                    op,
                    provided_version,
                )
            continue

        nprov.append(
            (sys.intern(provided), sys.intern(provided_version), sys.intern(op))
        )
    return nprov

1221 

1222 

def parse_builtusing(
    builtusing_raw: str,
    pkg_id: BinaryPackageId | None = None,
    logger: logging.Logger | None = None,
) -> list[tuple[str, str]]:
    """Parse a Built-Using field into (name, version) tuples

    Entries with alternatives, and entries with an operator other than
    "=", are ignored; a warning is logged for them if a logger is given.
    The strings of the returned tuples are interned.

    :param builtusing_raw: the raw value of the field
    :param pkg_id: the package the field belongs to (only used for logging)
    :param logger: where to log the warnings for ignored entries, if at all
    :return: one tuple for every accepted entry
    """
    nbu = []
    for or_clause in apt_pkg.parse_depends(builtusing_raw, False):
        if len(or_clause) != 1:  # pragma: no cover
            if logger is not None:
                logger.warning(
                    "Ignoring invalid builtusing in %s: Alternatives [%s]",
                    str(pkg_id),
                    str(or_clause),
                )
            continue

        # exactly one entry is left at this point
        bu, bu_version, op = or_clause[0]
        if op != "=":  # pragma: no cover
            if logger is not None:
                logger.warning(
                    "Ignoring invalid builtusing in %s: %s (%s %s)",
                    str(pkg_id),
                    bu,
                    op,
                    bu_version,
                )
            continue

        nbu.append((sys.intern(bu), sys.intern(bu_version)))
    return nbu

1247 

1248 

def parse_option(
    options: "optparse.Values",
    option_name: str,
    default: Any | None = None,
    to_bool: bool = False,
    to_int: bool = False,
    day_to_sec: bool = False,
) -> None:
    """Ensure the option exist and has a sane value

    The (converted) value is stored back on "options" under the same name.

    :param options: object holding the options as attributes

    :param option_name: string with the name of the option

    :param default: the default value for the option; used if the option
      is missing or set to the empty string

    :param to_int: convert the input to int (defaults to sys.maxsize)

    :param to_bool: convert the input to bool; True only for a true bool
      or (case-insensitively) one of "yes", "y", "true", "t" and "1"

    :param day_to_sec: convert the input from days to seconds (implies to_int=True)
    """
    result = getattr(options, option_name, default)

    # Option was provided with no value (or default is '') so pick up the default
    if result == "":
        result = default

    # numeric options without any value are treated as "unlimited"
    wants_number = to_int or day_to_sec
    if wants_number and result in (None, ""):
        result = sys.maxsize

    if day_to_sec:
        result = int(float(result) * 24 * 60 * 60)  # type: ignore[arg-type]

    if to_int:
        result = int(result)  # type: ignore[arg-type]

    if to_bool:
        result = bool(result) and (
            isinstance(result, bool)
            or result.lower() in ("yes", "y", "true", "t", "1")
        )

    setattr(options, option_name, result)

1295 

1296 

def filter_out_faux_gen(
    binaries: Iterable[BinaryPackageId],
) -> Generator[BinaryPackageId, None, None]:
    """Generator for packages without faux packages

    A package is a faux package if its name ends in "-faux-build-depends".
    The other packages are yielded in their original order.
    """
    faux_suffix = "-faux-build-depends"
    for candidate in binaries:
        if candidate.package_name.endswith(faux_suffix):
            continue
        yield candidate

1305 

1306 

def filter_out_faux(binaries: Iterable[BinaryPackageId]) -> set[BinaryPackageId]:
    """Returns a set without faux packages

    This is the result of filter_out_faux_gen, collected into a set.
    """
    return set(filter_out_faux_gen(binaries))

1311 

1312 

def binaries_from_source_version(
    source_data: SourcePackage, suite_info: Suites
) -> tuple[set[BinaryPackageId], str]:
    """Returns a set of real bid with only packages from this source version

    Only the first source suite that has the source is looked at.  The
    binaries whose source version in that suite differs from the version of
    source_data are left out, and so are the faux packages.

    :return: the remaining binaries and the name of the suite.  If no source
      suite has the source, all non-faux binaries are returned together with
      the name of the last source suite.
    """
    binaries = source_data.binaries.copy()

    # We don't know from which suite the source version comes
    for suite in suite_info.source_suites:
        if source_data.source not in suite.sources:
            continue

        # But if it's there, we assume it will have all the associated binaries
        other_version = {
            bid
            for bid in binaries
            if suite.all_binaries_in_suite[bid].source_version != source_data.version
        }
        binaries.difference_update(other_version)
        break

    return filter_out_faux(binaries), suite.name

1332 

1333 

def get_component(section: str) -> str:
    """Returns the component based on the section

    The component is what precedes the first "/" of the section; a section
    without a "/" belongs to "main".
    """
    # horrible hard-coding, but currently, we don't keep track of the component
    # when loading the packages files, but let's centralize it here
    prefix, slash, _ = section.partition("/")
    return prefix if slash else "main"