Coverage for britney2/utils.py: 93%

447 statements  

coverage.py v6.5.0, created at 2024-04-18 20:48 +0000

1# -*- coding: utf-8 -*- 

2 

3# Refactored parts from britney.py, which is/was: 

4# Copyright (C) 2001-2008 Anthony Towns <ajt@debian.org> 

5# Andreas Barth <aba@debian.org> 

6# Fabio Tranchitella <kobold@debian.org> 

7# Copyright (C) 2010-2012 Adam D. Barratt <adsb@debian.org> 

8# Copyright (C) 2012 Niels Thykier <niels@thykier.net> 

9# 

10# New portions 

11# Copyright (C) 2013 Adam D. Barratt <adsb@debian.org> 

12 

13# This program is free software; you can redistribute it and/or modify 

14# it under the terms of the GNU General Public License as published by 

15# the Free Software Foundation; either version 2 of the License, or 

16# (at your option) any later version. 

17 

18# This program is distributed in the hope that it will be useful, 

19# but WITHOUT ANY WARRANTY; without even the implied warranty of 

20# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

21# GNU General Public License for more details. 

22 

23 

24import apt_pkg 

25import errno 

26import logging 

27import os 

28import sys 

29import time 

30from typing import Optional 

31from collections import defaultdict 

32from datetime import datetime 

33from functools import partial 

34from itertools import filterfalse, chain 

35 

36import yaml 

37 

38from britney2 import BinaryPackage, BinaryPackageId, PackageId, SourcePackage, SuiteClass 

39from britney2.excusedeps import ImpossibleDependencyState 

40from britney2.policies import PolicyVerdict 

41 

42 

43class MigrationConstraintException(Exception): 

44 pass 

45 

46 

47def ifilter_except(container, iterable=None): 

48 """Filter out elements in container 

49 

50 If given an iterable it returns a filtered iterator, otherwise it 

51 returns a function to generate filtered iterators. The latter is 

52 useful if the same filter has to be (re-)used on multiple 

53 iterators that are not known beforehand. 

54 """ 

55 if iterable is not None: 55 ↛ 56 (line 55 didn't jump to line 56, because the condition on line 55 was never true)

56 return filterfalse(container.__contains__, iterable) 

57 return partial(filterfalse, container.__contains__) 

58 

59 

60def ifilter_only(container, iterable=None): 

61 """Filter out elements in which are not in container 

62 

63 If given an iterable it returns a filtered iterator, otherwise it 

64 returns a function to generate filtered iterators. The latter is 

65 useful if the same filter has to be (re-)used on multiple 

66 iterators that are not known beforehand. 

67 """ 

68 if iterable is not None: 68 ↛ 70 (line 68 didn't jump to line 70, because the condition on line 68 was never false)

69 return filter(container.__contains__, iterable) 

70 return partial(filter, container.__contains__) 

71 
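# A minimal sketch of how these two helpers are meant to be used (hypothetical
# values, not part of the module):
#
#   >>> known = {'a', 'b'}
#   >>> list(ifilter_except(known, ['a', 'c']))
#   ['c']
#   >>> keep_known = ifilter_only(known)
#   >>> list(keep_known(['a', 'd']))
#   ['a']
#   >>> list(keep_known(['b', 'e']))
#   ['b']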

72 

73# iter_except is from the "itertools" recipe 

74def iter_except(func, exception, first=None): # pragma: no cover - itertools recipe function 

75 """ Call a function repeatedly until an exception is raised. 

76 

77 Converts a call-until-exception interface to an iterator interface. 

78 Like __builtin__.iter(func, sentinel) but uses an exception instead 

79 of a sentinel to end the loop. 

80 

81 Examples: 

82 bsddbiter = iter_except(db.next, bsddb.error, db.first) 

83 heapiter = iter_except(functools.partial(heappop, h), IndexError) 

84 dictiter = iter_except(d.popitem, KeyError) 

85 dequeiter = iter_except(d.popleft, IndexError) 

86 queueiter = iter_except(q.get_nowait, Queue.Empty) 

87 setiter = iter_except(s.pop, KeyError) 

88 

89 """ 

90 try: 

91 if first is not None: 

92 yield first() 

93 while 1: 

94 yield func() 

95 except exception: 

96 pass 

97 

98 

99def log_and_format_old_libraries(logger, libs): 

100 """Format and log old libraries in a table (no header)""" 

101 libraries = {} 

102 for i in libs: 

103 pkg = i.package 

104 if pkg in libraries: 

105 libraries[pkg].append(i.architecture) 

106 else: 

107 libraries[pkg] = [i.architecture] 

108 

109 for lib in sorted(libraries): 

110 logger.info(" %s: %s", lib, " ".join(libraries[lib])) 

111 

112 

113def compute_reverse_tree(pkg_universe, affected): 

114 """Calculate the full dependency tree for a set of packages 

115 

116 This method returns the full dependency tree for a given set of 

117 packages. The first argument is an instance of the BinaryPackageUniverse 

118 and the second argument is a set of BinaryPackageId. 

119 

120 The set of affected packages will be updated in place and must 

121 therefore be mutable. 

122 """ 

123 remain = list(affected) 

124 while remain: 

125 pkg_id = remain.pop() 

126 new_pkg_ids = pkg_universe.reverse_dependencies_of(pkg_id) - affected 

127 affected.update(new_pkg_ids) 

128 remain.extend(new_pkg_ids) 

129 return None 

130 
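# Illustration of the in-place contract (hypothetical universe): if B depends
# on A and C depends on B, then with affected = {A} a call to
# compute_reverse_tree(pkg_universe, affected) grows the set to {A, B, C},
# because reverse dependencies are followed transitively. The return value is
# always None; callers must use the mutated set.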

131 

132def add_transitive_dependencies_flatten(pkg_universe, initial_set): 

133 """Find and include all transitive dependencies 

134 

135 This method updates the initial_set parameter to include all transitive 

136 dependencies. The first argument is an instance of the BinaryPackageUniverse 

137 and the second argument is a set of BinaryPackageId. 

138 

139 The set of initial packages will be updated in place and must 

140 therefore be mutable. 

141 """ 

142 remain = list(initial_set) 

143 while remain: 

144 pkg_id = remain.pop() 

145 new_pkg_ids = [x for x in chain.from_iterable(pkg_universe.dependencies_of(pkg_id)) if x not in initial_set] 

146 initial_set.update(new_pkg_ids) 

147 remain.extend(new_pkg_ids) 

148 return None 

149 

150 

151def write_nuninst(filename, nuninst): 

152 """Write the non-installable report 

153 

154 Write the non-installable report derived from "nuninst" to the 

155 file denoted by "filename". 

156 """ 

157 with open(filename, 'w', encoding='utf-8') as f: 

158 # Having two fields with (almost) identical dates seems a bit 

159 # redundant. 

160 f.write("Built on: " + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time())) + "\n") 

161 f.write("Last update: " + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time())) + "\n\n") 

162 for k in nuninst: 

163 f.write("%s: %s\n" % (k, " ".join(nuninst[k]))) 

164 
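# The resulting report looks roughly like this (hypothetical content):
#
#   Built on: 2024.04.18 20:48 +0000
#   Last update: 2024.04.18 20:48 +0000
#
#   amd64: libfoo1 libbar2
#   amd64+all: libfoo1 libbar2 some-arch-all-pkg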

165 

166def read_nuninst(filename, architectures): 

167 """Read the non-installable report 

168 

169 Read the non-installable report from the file denoted by 

170 "filename" and return it. Only architectures in "architectures" 

171 will be included in the report. 

172 """ 

173 nuninst = {} 

174 with open(filename, encoding='ascii') as f: 

175 for r in f: 

176 if ":" not in r: 

177 continue 

178 arch, packages = r.strip().split(":", 1) 

179 if arch.split("+", 1)[0] in architectures: 

180 nuninst[arch] = set(packages.split()) 

181 return nuninst 

182 

183 

184def newly_uninst(nuold, nunew): 

185 """Return a nuninst statstic with only new uninstallable packages 

186 

187 This method subtracts the uninstallable packages of the statistic 

188 "nuold" from the statistic "nunew". 

189 

190 It returns a dictionary with the architectures as keys and the list 

191 of uninstallable packages as values. If there are no regressions 

192 on a given architecture, then the architecture will be omitted in 

193 the result. Accordingly, if none of the architectures have 

194 regressions an empty dictionary is returned. 

195 """ 

196 res = {} 

197 for arch in ifilter_only(nunew, nuold): 

198 arch_nuninst = [x for x in nunew[arch] if x not in nuold[arch]] 

199 # Leave res empty if there are no newly uninst packages 

200 if arch_nuninst: 

201 res[arch] = arch_nuninst 

202 return res 

203 
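# For example (hypothetical counters):
#   newly_uninst({'amd64': {'p'}, 'i386': set()},
#                {'amd64': {'p', 'q'}, 'i386': set()})
# returns {'amd64': ['q']}: only regressions are reported, and architectures
# without regressions (here i386) are omitted from the result.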

204 

205def format_and_log_uninst(logger, architectures, nuninst, *, loglevel=logging.INFO): 

206 """Emits the uninstallable packages to the log 

207 

208 An example of the output string is: 

209 * i386: broken-pkg1, broken-pkg2 

210 

211 Note that if there are no uninstallable packages, nothing is emitted. 

212 """ 

213 for arch in architectures: 

214 if arch in nuninst and nuninst[arch]: 

215 msg = " * %s: %s" % (arch, ", ".join(sorted(nuninst[arch]))) 

216 logger.log(loglevel, msg) 

217 

218 

219def write_heidi(filename, target_suite, *, outofsync_arches=frozenset(), sorted=sorted): 

220 """Write the output HeidiResult 

221 

222 This method writes the output for Heidi, which contains all the 

223 binary packages and the source packages in the form: 

224 

225 <pkg-name> <pkg-version> <pkg-architecture> <pkg-section> 

226 <src-name> <src-version> source <src-section> 

227 

228 The file is written as "filename" using the sources and packages 

229 from the "target_suite" parameter. 

230 

231 outofsync_arches: If given, it is a set of architectures marked 

232 as "out of sync". The output file may exclude some out of date 

233 arch:all packages for those architectures to reduce the noise. 

234 

235 The "X=X" parameters are optimizations to avoid "load global" in 

236 the loops. 

237 """ 

238 sources_t = target_suite.sources 

239 packages_t = target_suite.binaries 

240 

241 with open(filename, 'w', encoding='ascii') as f: 

242 

243 # write binary packages 

244 for arch in sorted(packages_t): 

245 binaries = packages_t[arch] 

246 for pkg_name in sorted(binaries): 

247 pkg = binaries[pkg_name] 

248 pkgv = pkg.version 

249 pkgarch = pkg.architecture or 'all' 

250 pkgsec = pkg.section or 'faux' 

251 if pkgsec == 'faux' or pkgsec.endswith('/faux'): 

252 # Faux package; not really a part of testing 

253 continue 

254 if pkg.source_version and pkgarch == 'all' and \ 254 ↛ 263 (line 254 didn't jump to line 263, because the condition on line 254 was never true)

255 pkg.source_version != sources_t[pkg.source].version and \ 

256 arch in outofsync_arches: 

257 # when architectures are marked as "outofsync", their binary 

258 # versions may be lower than those of the associated 

259 # source package in testing. the binary package list for 

260 # such architectures will include arch:all packages 

261 # matching those older versions, but we only want the 

262 # newer arch:all in testing 

263 continue 

264 f.write('%s %s %s %s\n' % (pkg_name, pkgv, pkgarch, pkgsec)) 

265 

266 # write sources 

267 for src_name in sorted(sources_t): 

268 src = sources_t[src_name] 

269 srcv = src.version 

270 srcsec = src.section or 'unknown' 

271 if srcsec == 'faux' or srcsec.endswith('/faux'): 

272 # Faux package; not really a part of testing 

273 continue 

274 f.write('%s %s source %s\n' % (src_name, srcv, srcsec)) 

275 

276 

277def write_heidi_delta(filename, all_selected): 

278 """Write the output delta 

279 

280 This method writes the packages to be upgraded, in the form: 

281 <src-name> <src-version> 

282 or (if the source is to be removed): 

283 -<src-name> <src-version> 

284 

285 The order corresponds to that shown in update_output. 

286 """ 

287 with open(filename, "w", encoding='ascii') as fd: 

288 

289 fd.write("#HeidiDelta\n") 

290 

291 for item in all_selected: 

292 prefix = "" 

293 

294 if item.is_removal: 

295 prefix = "-" 

296 

297 if item.architecture == 'source': 

298 fd.write('%s%s %s\n' % (prefix, item.package, item.version)) 

299 else: 

300 fd.write('%s%s %s %s\n' % (prefix, item.package, 

301 item.version, item.architecture)) 

302 

303 

304def write_excuses(excuses, dest_file, output_format="yaml"): 

305 """Write the excuses to dest_file 

306 

307 Writes a list of excuses in a specified output_format to the 

308 path denoted by dest_file. The output_format can either be "yaml" 

309 or "legacy-html". 

310 """ 

311 excuselist = sorted(excuses.values(), key=lambda x: x.sortkey()) 

312 if output_format == "yaml": 

313 os.makedirs(os.path.dirname(dest_file), exist_ok=True) 

314 opener = open 

315 if dest_file.endswith('.xz'): 315 ↛ 316 (line 315 didn't jump to line 316, because the condition on line 315 was never true)

316 import lzma 

317 opener = lzma.open 

318 elif dest_file.endswith('.gz'): 318 ↛ 319 (line 318 didn't jump to line 319, because the condition on line 318 was never true)

319 import gzip 

320 opener = gzip.open 

321 with opener(dest_file, 'wt', encoding='utf-8') as f: 

322 edatalist = [e.excusedata(excuses) for e in excuselist] 

323 excusesdata = { 

324 'sources': edatalist, 

325 'generated-date': datetime.utcnow(), 

326 } 

327 f.write(yaml.dump(excusesdata, default_flow_style=False, allow_unicode=True)) 

328 elif output_format == "legacy-html": 

329 with open(dest_file, 'w', encoding='utf-8') as f: 

330 f.write("<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.01//EN\" \"http://www.w3.org/TR/REC-html40/strict.dtd\">\n") 

331 f.write("<html><head><title>excuses...</title>") 

332 f.write("<meta http-equiv=\"Content-Type\" content=\"text/html;charset=utf-8\"></head><body>\n") 

333 f.write("<p>Generated: " + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time())) + "</p>\n") 

334 f.write("<ul>\n") 

335 for e in excuselist: 

336 f.write("<li>%s" % e.html(excuses)) 

337 f.write("</ul></body></html>\n") 

338 else: # pragma: no cover 

339 raise ValueError('Output format must be either "yaml" or "legacy-html"') 

340 

341 

342def old_libraries(mi_factory, suite_info, outofsync_arches=frozenset()): 

343 """Detect old libraries left in the target suite for smooth transitions 

344 

345 This method detects old libraries which are in the target suite but no 

346 longer built from the source package: they are still there because 

347 other packages still depend on them, but they should be removed as 

348 soon as possible. 

349 

350 For "outofsync" architectures, outdated binaries are allowed to be in 

351 the target suite, so they are only added to the removal list if they 

352 are no longer in the (primary) source suite. 

353 """ 

354 sources_t = suite_info.target_suite.sources 

355 binaries_t = suite_info.target_suite.binaries 

356 binaries_s = suite_info.primary_source_suite.binaries 

357 removals = [] 

358 for arch in binaries_t: 

359 for pkg_name in binaries_t[arch]: 

360 pkg = binaries_t[arch][pkg_name] 

361 if sources_t[pkg.source].version != pkg.source_version and \ 

362 (arch not in outofsync_arches or pkg_name not in binaries_s[arch]): 

363 removals.append(mi_factory.generate_removal_for_cruft_item(pkg.pkg_id)) 

364 return removals 

365 

366 

367def is_nuninst_asgood_generous(constraints, allow_uninst, architectures, old, new, break_arches=frozenset()): 

368 """Compares the nuninst counters and constraints to see if they improved 

369 

370 Given a list of architectures, the previous and the current nuninst 

371 counters, this function determines if the current nuninst counter 

372 is better than the previous one. Optionally it also accepts a set 

373 of "break_arches", the nuninst counter for any architecture listed 

374 in this set is completely ignored. 

375 

376 If the nuninst counters are equal or better, then the constraints 

377 are checked for regressions (ignoring break_arches). 

378 

379 Returns True if the new nuninst counter is better than the 

380 previous and there are no constraint regressions (ignoring Break-archs). 

381 Returns False otherwise. 

382 

383 """ 

384 diff = 0 

385 for arch in architectures: 

386 if arch in break_arches: 

387 continue 

388 diff = diff + \ 

389 (len(new[arch] - allow_uninst[arch]) 

390 - len(old[arch] - allow_uninst[arch])) 

391 if diff > 0: 

392 return False 

393 must_be_installable = constraints['keep-installable'] 

394 for arch in architectures: 

395 if arch in break_arches: 

396 continue 

397 regression = new[arch] - old[arch] 

398 if not regression.isdisjoint(must_be_installable): 398 ↛ 399 (line 398 didn't jump to line 399, because the condition on line 398 was never true)

399 return False 

400 return True 

401 
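# A small worked example (hypothetical values): with architectures=['amd64'],
# allow_uninst={'amd64': set()} and constraints={'keep-installable': []},
# moving from old={'amd64': {'p'}} to new={'amd64': {'p', 'q'}} returns False
# (one more uninstallable package), while new={'amd64': set()} returns True.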

402 

403def clone_nuninst(nuninst, *, packages_s=None, architectures=None): 

404 """Completely or Selectively deep clone nuninst 

405 

406 Given a nuninst table, the package table for a given suite and 

407 a list of architectures, this function will clone the nuninst 

408 table. Only the listed architectures will be deep cloned - 

409 the rest will only be shallow cloned. When packages_s is given, 

410 packages not listed in packages_s will be pruned from the clone 

411 (if packages_s is omitted, the per architecture nuninst is cloned 

412 as-is) 

413 """ 

414 clone = nuninst.copy() 

415 if architectures is None: 415 ↛ 416 (line 415 didn't jump to line 416, because the condition on line 415 was never true)

416 return clone 

417 if packages_s is not None: 

418 for arch in architectures: 

419 clone[arch] = set(x for x in nuninst[arch] if x in packages_s[arch]) 

420 clone[arch + "+all"] = set(x for x in nuninst[arch + "+all"] if x in packages_s[arch]) 

421 else: 

422 for arch in architectures: 

423 clone[arch] = set(nuninst[arch]) 

424 clone[arch + "+all"] = set(nuninst[arch + "+all"]) 

425 return clone 

426 
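# e.g. clone_nuninst(nuninst, architectures=['amd64']) deep-copies only the
# 'amd64' and 'amd64+all' entries, so they can be mutated without touching the
# original table; when packages_s is given, package names missing from
# packages_s['amd64'] are also pruned from the cloned sets.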

427 

428def test_installability(target_suite, pkg_name, pkg_id, broken, nuninst_arch): 

429 """Test for installability of a package on an architecture 

430 

431 pkg_id = (pkg_name, pkg_version, pkg_arch) is the package to check. 

432 

433 broken is the set of broken packages. If the package changes 

434 installability (e.g. goes from uninstallable to installable), 

435 broken will be updated accordingly. 

436 

437 If nuninst_arch is not None then it is also updated in the same 

438 way as broken is. 

439 """ 

440 c = 0 

441 r = target_suite.is_installable(pkg_id) 

442 if not r: 

443 # not installable 

444 if pkg_name not in broken: 

445 # regression 

446 broken.add(pkg_name) 

447 c = -1 

448 if nuninst_arch is not None and pkg_name not in nuninst_arch: 

449 nuninst_arch.add(pkg_name) 

450 else: 

451 if pkg_name in broken: 

452 # Improvement 

453 broken.remove(pkg_name) 

454 c = 1 

455 if nuninst_arch is not None and pkg_name in nuninst_arch: 

456 nuninst_arch.remove(pkg_name) 

457 return c 

458 

459 
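# check_installability() re-tests every updated binary on "arch" against the
# target suite and records the results in the nuninst counters in place;
# arch:all packages only affect the per-architecture counter when
# check_archall is set.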

460def check_installability(target_suite, binaries, arch, updates, check_archall, nuninst): 

461 broken = nuninst[arch + "+all"] 

462 packages_t_a = binaries[arch] 

463 

464 for pkg_id in (x for x in updates if x.architecture == arch): 

465 name, version, parch = pkg_id 

466 if name not in packages_t_a: 

467 continue 

468 pkgdata = packages_t_a[name] 

469 if version != pkgdata.version: 

470 # Not the version in testing right now, ignore 

471 continue 

472 actual_arch = pkgdata.architecture 

473 nuninst_arch = None 

474 # only check arch:all packages if requested 

475 if check_archall or actual_arch != 'all': 

476 nuninst_arch = nuninst[parch] 

477 elif actual_arch == 'all': 477 ↛ 479 (line 477 didn't jump to line 479, because the condition on line 477 was never false)

478 nuninst[parch].discard(name) 

479 test_installability(target_suite, name, pkg_id, broken, nuninst_arch) 

480 

481 

482def possibly_compressed(path, *, permitted_compressions=None): 

483 """Find and select a (possibly compressed) variant of a path 

484 

485 If the given path exists, it is returned; otherwise a variant with one of the permitted compression extensions is looked for. 

486 

487 :param path: The base path. 

488 :param permitted_compressions: An optional list of alternative extensions to look for. 

489 Defaults to "gz" and "xz". 

490 :returns: The path given, possibly with one of the permitted extensions appended. Will raise a 

491 FileNotFoundError if neither the path nor any compressed variant exists. 

492 """ 

493 if os.path.exists(path): 493 ↛ 495 (line 493 didn't jump to line 495, because the condition on line 493 was never false)

494 return path 

495 if permitted_compressions is None: 

496 permitted_compressions = ['gz', 'xz'] 

497 for ext in permitted_compressions: 

498 cpath = "%s.%s" % (path, ext) 

499 if os.path.exists(cpath): 

500 return cpath 

501 raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path) # pragma: no cover 

502 
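# e.g. possibly_compressed('/srv/data/Packages') returns the path itself when
# it exists; otherwise '/srv/data/Packages.gz' or '/srv/data/Packages.xz' is
# returned if present, and FileNotFoundError is raised when no variant exists.
# (The path here is purely illustrative.)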

503 

504def create_provides_map(packages: dict[str, BinaryPackage]) -> dict[str, set[tuple[str, str]]]: 

505 """Create a provides map from a map binary package names and their BinaryPackage objects 

506 

507 :param packages: A dict mapping binary package names to their BinaryPackage object 

508 :return: A provides map 

509 """ 

510 # create provides 

511 provides = defaultdict(set) 

512 

513 for pkg, dpkg in packages.items(): 

514 # register virtual packages and real packages that provide 

515 # them 

516 for provided_pkg, provided_version, _ in dpkg.provides: 

517 provides[provided_pkg].add((pkg, provided_version)) 

518 

519 return provides 

520 

521 

522def read_release_file(suite_dir): 

523 """Parses a given "Release" file 

524 

525 :param suite_dir: The directory to the suite 

526 :return: A dict of the first (and only) paragraph in a Release file 

527 """ 

528 release_file = os.path.join(suite_dir, 'Release') 

529 with open(release_file) as fd: 

530 tag_file = iter(apt_pkg.TagFile(fd)) 

531 result = next(tag_file) 

532 if next(tag_file, None) is not None: # pragma: no cover 

533 raise TypeError("%s has more than one paragraph" % release_file) 

534 return result 

535 

536 

537def read_sources_file(filename: str, 

538 sources: Optional[dict[str, SourcePackage]] = None, 

539 add_faux: bool = True, 

540 intern=sys.intern) -> dict[str, SourcePackage]: 

541 """Parse a single Sources file into a hash 

542 

543 Parse a single Sources file into a dict mapping a source package 

544 name to a SourcePackage object. If there are multiple entries 

545 for the same source package, then the highest versioned source 

546 package (that is not marked as "Extra-Source-Only") is the 

547 version kept in the dict. 

548 

549 :param filename: Path to the Sources file. Can be compressed by any algorithm supported by apt_pkg.TagFile 

550 :param sources: Optional dict to add the packages to. If given, this is also the value returned. 

551 :param add_faux: Add a faux arch:all binary for each source that claims it has arch:all 

552 :param intern: Internal optimisation / implementation detail to avoid python's "LOAD_GLOBAL" instruction in a loop 

553 :return: a dict mapping a source package name to its SourcePackage 

554 """ 

555 if sources is None: 

556 sources = {} 

557 

558 tag_file = apt_pkg.TagFile(filename) 

559 get_field = tag_file.section.get 

560 step = tag_file.step 

561 

562 while step(): 

563 if get_field('Extra-Source-Only', 'no') == 'yes': 

564 # Ignore sources only referenced by Built-Using 

565 continue 

566 pkg = get_field('Package') 

567 ver = get_field('Version') 

568 # There may be multiple versions of the source package 

569 # (in unstable) if some architectures have out-of-date 

570 # binaries. We only ever consider the source with the 

571 # largest version for migration. 

572 if pkg in sources and apt_pkg.version_compare(sources[pkg].version, ver) > 0: 

573 continue 

574 maint = get_field('Maintainer') 

575 if maint: 575 ↛ 577 (line 575 didn't jump to line 577, because the condition on line 575 was never false)

576 maint = intern(maint.strip()) 

577 section = get_field('Section') 

578 if section: 578 ↛ 581 (line 578 didn't jump to line 581, because the condition on line 578 was never false)

579 section = intern(section.strip()) 

580 build_deps_arch: Optional[str] 

581 build_deps_arch = ", ".join(x for x in (get_field('Build-Depends'), get_field('Build-Depends-Arch')) 

582 if x is not None) 

583 if build_deps_arch != '': 

584 build_deps_arch = sys.intern(build_deps_arch) 

585 else: 

586 build_deps_arch = None 

587 build_deps_indep = get_field('Build-Depends-Indep') 

588 if build_deps_indep is not None: 

589 build_deps_indep = sys.intern(build_deps_indep) 

590 

591 # Adding arch:all packages to the list of binaries already to be able 

592 # to check for them later. Helps mitigate bug 887060 and is the 

593 # (partial?) answer to bug 1064428. 

594 binaries = set() 

595 if add_faux and "all" in get_field('Architecture', '').split(): 

596 # the value "faux" in arch:faux is used elsewhere, so keep in sync 

597 pkg_id = BinaryPackageId(pkg + "-faux", intern("0~~~~"), intern("faux")) 

598 binaries.add(pkg_id) 

599 

600 sources[intern(pkg)] = SourcePackage(intern(pkg), 

601 intern(ver), 

602 section, 

603 binaries, 

604 maint, 

605 False, 

606 build_deps_arch, 

607 build_deps_indep, 

608 get_field('Testsuite', '').split(), 

609 get_field('Testsuite-Triggers', '').replace(',', '').split(), 

610 ) 

611 return sources 

612 

613 

614def _check_and_update_packages(packages, package, archqual, build_depends): 

615 """Helper for get_dependency_solvers 

616 

617 This method updates the list of packages with a given package if that 

618 package is a valid (Build-)Depends. 

619 

620 :param packages: list of packages to be updated 

621 :param package: a BinaryPackage 

622 :param archqual: None or string with an architecture qualifier 

623 :param build_depends: If True, check if the "package" parameter is valid as a build-dependency. 

624 """ 

625 

626 # See also bug #971739 and #1059929 

627 if archqual is None: 

628 packages.append(package) 

629 elif archqual == 'native' and build_depends: 

630 # Multi-arch handling for build-dependencies 

631 # - :native is ok always 

632 packages.append(package) 

633 elif archqual == 'any' and package.multi_arch == 'allowed': 

634 # Multi-arch handling for both build-dependencies and regular dependencies 

635 # - :any is ok iff the target has "M-A: allowed" 

636 packages.append(package) 

637 

638 

639def get_dependency_solvers(block, binaries_s_a, provides_s_a, *, build_depends=False, empty_set=frozenset()): 

640 """Find the packages which satisfy a dependency block 

641 

642 This method returns the list of packages which satisfy a dependency 

643 block (as returned by apt_pkg.parse_depends) in a package table 

644 for a given suite and architecture (a la self.binaries[suite][arch]) 

645 

646 It can also handle build-dependency relations if the named parameter 

647 "build_depends" is set to True. In this case, block should be based 

648 on the return value from apt_pkg.parse_src_depends. 

649 

650 :param block: The dependency block as parsed by apt_pkg.parse_depends (or apt_pkg.parse_src_depends 

651 if the "build_depends" is True) 

652 :param binaries_s_a: A dict mapping package names to the relevant BinaryPackage 

653 :param provides_s_a: A dict mapping package names to their providers (as generated by create_provides_map) 

654 :param build_depends: If True, treat the "block" parameter as a build-dependency relation rather than 

655 a regular dependency relation. 

656 :param empty_set: Internal implementation detail / optimisation 

657 :return: a list of BinaryPackage objects that satisfy the relation 

658 """ 

659 packages = [] 

660 

661 # for every package, version and operation in the block 

662 for name, version, op in block: 

663 if ":" in name: 

664 name, archqual = name.split(":", 1) 

665 else: 

666 archqual = None 

667 

668 # look for the package in unstable 

669 if name in binaries_s_a: 

670 package = binaries_s_a[name] 

671 # check the versioned dependency and architecture qualifier 

672 # (if present) 

673 if (op == '' and version == '') or apt_pkg.check_dep(package.version, op, version): 

674 _check_and_update_packages(packages, package, archqual, build_depends) 

675 

676 # look for the package in the virtual packages list and loop on them 

677 for prov, prov_version in provides_s_a.get(name, empty_set): 

678 assert prov in binaries_s_a 

679 package = binaries_s_a[prov] 

680 # See Policy Manual §7.5 

681 if (op == '' and version == '') or \ 

682 (prov_version != '' and apt_pkg.check_dep(prov_version, op, version)): 

683 _check_and_update_packages(packages, package, archqual, build_depends) 

684 

685 return packages 

686 

687 

688def invalidate_excuses(excuses, valid, invalid, invalidated): 

689 """Invalidate impossible excuses 

690 

691 This method invalidates the impossible excuses, which depend 

692 on invalid excuses. The `valid' and `invalid' parameters contain the sets of 

693 valid and invalid excuses; the names of newly invalidated excuses are added to `invalidated'. 

694 """ 

695 

696 # make a list of all packages (source and binary) that are present in the 

697 # excuses we have 

698 excuses_packages = defaultdict(set) 

699 for exc in excuses.values(): 

700 for arch in exc.packages: 

701 for pkg_id in exc.packages[arch]: 

702 # note that the same package can be in multiple excuses 

703 # eg. when unstable and TPU have the same packages 

704 excuses_packages[pkg_id].add(exc.name) 

705 

706 # create dependencies between excuses based on packages 

707 excuses_rdeps = defaultdict(set) 

708 for exc in excuses.values(): 

709 # Note that excuses_rdeps is only populated by dependencies generated 

710 # based on packages below. There are currently no dependencies between 

711 # excuses that are added directly, so this is ok. 

712 

713 for pkg_dep in exc.depends_packages: 

714 # set of excuses, each of which can satisfy this specific 

715 # dependency 

716 # if there is a dependency on a package for which no 

717 # excuses exist (e.g. a cruft binary), the set will 

718 # contain an ImpossibleDependencyState 

719 dep_exc = set() 

720 for pkg_id in pkg_dep.deps: 

721 pkg_excuses = excuses_packages[pkg_id] 

722 # if the dependency isn't found, we get an empty set 

723 if pkg_excuses == frozenset(): 

724 imp_dep = ImpossibleDependencyState( 

725 PolicyVerdict.REJECTED_PERMANENTLY, 

726 "%s" % (pkg_id.name)) 

727 dep_exc.add(imp_dep) 

728 

729 else: 

730 dep_exc |= pkg_excuses 

731 for e in pkg_excuses: 

732 excuses_rdeps[e].add(exc.name) 

733 if not exc.add_dependency(dep_exc, pkg_dep.spec): 

734 valid.discard(exc.name) 

735 invalid.add(exc.name) 

736 

737 # loop on the invalid excuses 

738 # Convert invalid to a list for deterministic results 

739 invalid = sorted(invalid) 

740 for ename in iter_except(invalid.pop, IndexError): 

741 invalidated.add(ename) 

742 # if there is no reverse dependency, skip the item 

743 if ename not in excuses_rdeps: 

744 continue 

745 

746 rdep_verdict = PolicyVerdict.REJECTED_WAITING_FOR_ANOTHER_ITEM 

747 if excuses[ename].policy_verdict.is_blocked: 

748 rdep_verdict = PolicyVerdict.REJECTED_BLOCKED_BY_ANOTHER_ITEM 

749 

750 # loop on the reverse dependencies 

751 for x in sorted(excuses_rdeps[ename]): 

752 exc = excuses[x] 

753 # if the item is valid and it is not marked as `forced', then we 

754 # invalidate this specific dependency 

755 if x in valid and not exc.forced: 

756 # mark this specific dependency as invalid 

757 still_valid = exc.invalidate_dependency(ename, rdep_verdict) 

758 

759 # if there are no alternatives left for this dependency, 

760 # invalidate the excuse 

761 if not still_valid: 

762 valid.discard(x) 

763 invalid.append(x) 

764 

765 

766def compile_nuninst(target_suite, architectures, nobreakall_arches): 

767 """Compile a nuninst dict from the current testing 

768 

769 :param target_suite: The target suite 

770 :param architectures: List of architectures 

771 :param nobreakall_arches: List of architectures where arch:all packages must be installable 

772 """ 

773 nuninst = {} 

774 binaries_t = target_suite.binaries 

775 

776 # for all the architectures 

777 for arch in architectures: 

778 # if it is in the nobreakall ones, check arch-independent packages too 

779 check_archall = arch in nobreakall_arches 

780 

781 # check all the packages for this architecture 

782 nuninst[arch] = set() 

783 packages_t_a = binaries_t[arch] 

784 for pkg_name, pkg_data in packages_t_a.items(): 

785 r = target_suite.is_installable(pkg_data.pkg_id) 

786 if not r: 

787 nuninst[arch].add(pkg_name) 

788 

789 # if they are not required, remove architecture-independent packages 

790 nuninst[arch + "+all"] = nuninst[arch].copy() 

791 if not check_archall: 

792 for pkg_name in nuninst[arch + "+all"]: 

793 pkg_data = packages_t_a[pkg_name] 

794 if pkg_data.architecture == 'all': 

795 nuninst[arch].remove(pkg_name) 

796 

797 return nuninst 

798 

799 

800def is_smooth_update_allowed(binary, smooth_updates, hints): 

801 if 'ALL' in smooth_updates: 801 ↛ 802 (line 801 didn't jump to line 802, because the condition on line 801 was never true)

802 return True 

803 section = binary.section.split('/')[-1] 

804 if section in smooth_updates: 

805 return True 

806 if hints.search('allow-smooth-update', package=binary.source, version=binary.source_version): 

807 # note that this needs to match the source version *IN TESTING* 

808 return True 

809 return False 

810 

811 
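# find_smooth_updateable_binaries() decides which of the binaries that would be
# removed by a source migration should instead stay in the target suite for a
# smooth update: a binary is kept when smooth updates are allowed for it and at
# least one reverse-dependency built from another source (or another
# smooth-updateable binary) still needs it.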

812def find_smooth_updateable_binaries(binaries_to_check, 

813 source_data, 

814 pkg_universe, 

815 target_suite, 

816 binaries_t, 

817 binaries_s, 

818 removals, 

819 smooth_updates, 

820 hints): 

821 check = set() 

822 smoothbins = set() 

823 

824 for pkg_id in binaries_to_check: 

825 binary, _, parch = pkg_id 

826 

827 cruftbins = set() 

828 

829 # Not a candidate for smooth update (newer non-cruft version in unstable) 

830 if binary in binaries_s[parch]: 

831 if binaries_s[parch][binary].source_version == source_data.version: 

832 continue 

833 cruftbins.add(binaries_s[parch][binary].pkg_id) 

834 

835 # Maybe a candidate (cruft or removed binary): check if config allows us to smooth update it. 

836 if is_smooth_update_allowed(binaries_t[parch][binary], smooth_updates, hints): 

837 # if the package has reverse-dependencies which are 

838 # built from other sources, it's a valid candidate for 

839 # a smooth update. if not, it may still be a valid 

840 # candidate if one of its r-deps is itself a candidate, 

841 # so note it for checking later 

842 rdeps = set(pkg_universe.reverse_dependencies_of(pkg_id)) 

843 # We ignore all binaries listed in "removals" as we 

844 # assume they will leave at the same time as the 

845 # given package. 

846 rdeps.difference_update(removals, binaries_to_check) 

847 

848 smooth_update_it = False 

849 if target_suite.any_of_these_are_in_the_suite(rdeps): 

850 combined = set(smoothbins) 

851 combined.add(pkg_id) 

852 for rdep in rdeps: 

853 # each dependency clause has a set of possible 

854 # alternatives that can satisfy that dependency. 

855 # if any of them is outside the set of smoothbins, the 

856 # dependency can be satisfied even if this binary was 

857 # removed, so there is no need to keep it around for a 

858 # smooth update 

859 # if not, only this binary can satisfy the dependency, so 

860 # we should keep it around until the rdep is no longer in 

861 # testing 

862 for dep_clause in pkg_universe.dependencies_of(rdep): 

863 # filter out cruft binaries from unstable, because 

864 # they will not be added to the set of packages that 

865 # will be migrated 

866 if dep_clause - cruftbins <= combined: 

867 smooth_update_it = True 

868 break 

869 

870 if smooth_update_it: 

871 smoothbins = combined 

872 else: 

873 check.add(pkg_id) 

874 

875 # check whether we should perform a smooth update for 

876 # packages which are candidates but do not have r-deps 

877 # outside of the current source 

878 while 1: 

879 found_any = False 

880 for pkg_id in check: 

881 rdeps = pkg_universe.reverse_dependencies_of(pkg_id) 

882 if not rdeps.isdisjoint(smoothbins): 

883 smoothbins.add(pkg_id) 

884 found_any = True 

885 if not found_any: 

886 break 

887 check = [x for x in check if x not in smoothbins] 

888 

889 return smoothbins 

890 

891 

892def find_newer_binaries(suite_info, pkg, add_source_for_dropped_bin=False): 

893 """ 

894 Find newer binaries for pkg in any of the source suites. 

895 

896 :param pkg: BinaryPackage (assumed to be in the target suite) 

897 

898 :param add_source_for_dropped_bin: If True, newer versions of the 

899 source of pkg will be added if they don't have the binary pkg 

900 

901 :return: list of (package id, source suite) tuples for the newer versions found 

902 """ 

903 

904 source = pkg.source 

905 newer_versions = [] 

906 for suite in suite_info: 

907 if suite.suite_class == SuiteClass.TARGET_SUITE: 

908 continue 

909 

910 suite_binaries_on_arch = suite.binaries.get(pkg.pkg_id.architecture) 

911 if not suite_binaries_on_arch: 911 ↛ 912 (line 911 didn't jump to line 912, because the condition on line 911 was never true)

912 continue 

913 

914 newerbin = None 

915 if pkg.pkg_id.package_name in suite_binaries_on_arch: 

916 newerbin = suite_binaries_on_arch[pkg.pkg_id.package_name] 

917 if suite.is_cruft(newerbin): 

918 # We pretend the cruft binary doesn't exist. 

919 # We handle this as if the source didn't have the binary 

920 # (see below) 

921 newerbin = None 

922 elif apt_pkg.version_compare(newerbin.version, pkg.version) <= 0: 

923 continue 

924 else: 

925 if source not in suite.sources: 

926 # bin and source not in suite: no newer version 

927 continue 

928 

929 if not newerbin: 

930 if not add_source_for_dropped_bin: 930 ↛ 931 (line 930 didn't jump to line 931, because the condition on line 930 was never true)

931 continue 

932 # We only get here if there is a newer version of the source, 

933 # which doesn't have the binary anymore (either it doesn't 

934 # exist, or it's cruft and we pretend it doesn't exist). 

935 # Add the new source instead. 

936 nsrc = suite.sources[source] 

937 n_id = PackageId(source, nsrc.version, "source") 

938 overs = pkg.source_version 

939 if apt_pkg.version_compare(nsrc.version, overs) <= 0: 

940 continue 

941 else: 

942 n_id = newerbin.pkg_id 

943 

944 newer_versions.append((n_id, suite)) 

945 

946 return newer_versions 

947 

948 

949def parse_provides(provides_raw: str, pkg_id=None, logger=None) -> list[tuple[str, str, str]]: 

950 parts = apt_pkg.parse_depends(provides_raw, False) 

951 nprov = [] 

952 for or_clause in parts: 

953 if len(or_clause) != 1: # pragma: no cover 

954 if logger is not None: 

955 msg = "Ignoring invalid provides in %s: Alternatives [%s]" 

956 logger.warning(msg, str(pkg_id), str(or_clause)) 

957 continue 

958 for part in or_clause: 

959 provided, provided_version, op = part 

960 if op != '' and op != '=': # pragma: no cover 

961 if logger is not None: 

962 msg = "Ignoring invalid provides in %s: %s (%s %s)" 

963 logger.warning(msg, str(pkg_id), provided, op, provided_version) 

964 continue 

965 provided = sys.intern(provided) 

966 provided_version = sys.intern(provided_version) 

967 part = (provided, provided_version, sys.intern(op)) 

968 nprov.append(part) 

969 return nprov 

970 
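# As an illustration (hypothetical field value),
#   parse_provides("mail-transport-agent, default-mta (= 1.0)")
# yields [('mail-transport-agent', '', ''), ('default-mta', '1.0', '=')];
# alternative clauses and relations other than "=" (or unversioned) are
# ignored, with a warning when a logger is supplied.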

971 

972def parse_builtusing(builtusing_raw, pkg_id=None, logger=None) -> list[tuple[str, str]]: 

973 parts = apt_pkg.parse_depends(builtusing_raw, False) 

974 nbu = [] 

975 for or_clause in parts: 

976 if len(or_clause) != 1: # pragma: no cover 

977 if logger is not None: 

978 msg = "Ignoring invalid builtusing in %s: Alternatives [%s]" 

979 logger.warning(msg, str(pkg_id), str(or_clause)) 

980 continue 

981 for part in or_clause: 

982 bu, bu_version, op = part 

983 if op != '=': # pragma: no cover 

984 if logger is not None: 

985 msg = "Ignoring invalid builtusing in %s: %s (%s %s)" 

986 logger.warning(msg, str(pkg_id), bu, op, bu_version) 

987 continue 

988 bu = sys.intern(bu) 

989 bu_version = sys.intern(bu_version) 

990 nbu.append((bu, bu_version)) 

991 return nbu 

992 

993 

994def parse_option(options, 

995 option_name, 

996 default=None, 

997 to_bool=False, 

998 to_int=False, 

999 day_to_sec=False): 

1000 """Ensure the option exist and has a sane value 

1001 

1002 :param options: dict with options 

1003 

1004 :param option_name: string with the name of the option 

1005 

1006 :param default: the default value for the option 

1007 

1008 :param to_int: convert the input to int (defaults to sys.maxsize) 

1009 

1010 :param to_bool: convert the input to bool 

1011 

1012 :param day_to_sec: convert the input from days to seconds (implies to_int=True) 

1013 """ 

1014 value = getattr(options, option_name, default) 

1015 

1016 # Option was provided with no value (or default is '') so pick up the default 

1017 if value == '': 

1018 value = default 

1019 

1020 if (to_int or day_to_sec) and value in (None, ''): 

1021 value = sys.maxsize 

1022 

1023 if day_to_sec: 

1024 value = int(float(value) * 24 * 60 * 60) 

1025 

1026 if to_int: 

1027 value = int(value) 

1028 

1029 if to_bool: 

1030 if value and (isinstance(value, bool) or value.lower() in ('yes', 'y', 'true', 't', '1')): 

1031 value = True 

1032 else: 

1033 value = False 

1034 

1035 setattr(options, option_name, value)
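# Typical uses (attribute names are illustrative): with options.mindays = '5',
# parse_option(options, 'mindays', day_to_sec=True) stores 432000 (5 days in
# seconds) back on the options object, and with options.adt_enable = 'yes',
# parse_option(options, 'adt_enable', to_bool=True) stores True.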