Coverage for britney2/utils.py: 90%

469 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-10-17 17:32 +0000

1# Refactored parts from britney.py, which is/was: 

2# Copyright (C) 2001-2008 Anthony Towns <ajt@debian.org> 

3# Andreas Barth <aba@debian.org> 

4# Fabio Tranchitella <kobold@debian.org> 

5# Copyright (C) 2010-2012 Adam D. Barratt <adsb@debian.org> 

6# Copyright (C) 2012 Niels Thykier <niels@thykier.net> 

7# 

8# New portions 

9# Copyright (C) 2013 Adam D. Barratt <adsb@debian.org> 

10 

11# This program is free software; you can redistribute it and/or modify 

12# it under the terms of the GNU General Public License as published by 

13# the Free Software Foundation; either version 2 of the License, or 

14# (at your option) any later version. 

15 

16# This program is distributed in the hope that it will be useful, 

17# but WITHOUT ANY WARRANTY; without even the implied warranty of 

18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19# GNU General Public License for more details. 

20 

21 

22import errno 

23import logging 

24import optparse 

25import os 

26import sys 

27import time 

28from collections import defaultdict 

29from collections.abc import Callable, Container, Iterable, Iterator, Mapping, MutableSet 

30from datetime import UTC, datetime 

31from functools import partial 

32from itertools import chain, filterfalse 

33from typing import ( 

34 IO, 

35 TYPE_CHECKING, 

36 Any, 

37 Literal, 

38 Optional, 

39 Protocol, 

40 TypeVar, 

41 Union, 

42 cast, 

43 overload, 

44) 

45 

46import apt_pkg 

47import yaml 

48 

49from britney2 import ( 

50 BinaryPackage, 

51 BinaryPackageId, 

52 PackageId, 

53 SourcePackage, 

54 Suite, 

55 SuiteClass, 

56 Suites, 

57 TargetSuite, 

58) 

59from britney2.excusedeps import DependencyState, ImpossibleDependencyState 

60from britney2.policies import PolicyVerdict 

61 

62if TYPE_CHECKING: 62 ↛ 64line 62 didn't jump to line 64 because the condition on line 62 was never true

63 

64 from _typeshed import SupportsRichComparisonT 

65 from apt_pkg import TagSection 

66 

67 from .excuse import Excuse 

68 from .hints import HintCollection 

69 from .installability.universe import BinaryPackageUniverse 

70 from .migrationitem import MigrationItem, MigrationItemFactory 

71 

72_T = TypeVar("_T") 

73 

74 

class MigrationConstraintException(Exception):
    """Plain ``Exception`` subclass carrying no extra state or behaviour.

    NOTE(review): presumably signals a violated migration constraint;
    nothing in this part of the file raises it - confirm against callers.
    """

77 

78 

@overload
def ifilter_except(
    container: Container[_T], iterable: Literal[None] = None
) -> "partial[filterfalse[_T]]": ...


@overload
def ifilter_except(
    container: Container[_T], iterable: Iterable[_T]
) -> "filterfalse[_T]": ...


def ifilter_except(
    container: Container[_T], iterable: Iterable[_T] | None = None
) -> Union["filterfalse[_T]", "partial[filterfalse[_T]]"]:
    """Drop every element that is a member of container

    With an iterable, a lazy iterator over the elements of that iterable
    which are *not* in container is returned.  Without one, a reusable
    callable is returned instead; calling it with an iterable yields the
    same kind of filtered iterator.  The callable form is handy when one
    filter must be applied to several iterables that are not known yet.
    """
    is_member = container.__contains__
    if iterable is None:
        # No iterable yet: hand back the filter with its predicate bound.
        return cast("partial[filterfalse[_T]]", partial(filterfalse, is_member))
    return filterfalse(is_member, iterable)

106 

107 

@overload
def ifilter_only(container: Container[_T], iterable: Iterable[_T]) -> "filter[_T]": ...


@overload
def ifilter_only(
    container: Container[_T], iterable: Literal[None] = None
) -> "partial[filter[_T]]": ...


def ifilter_only(
    container: Container[_T], iterable: Iterable[_T] | None = None
) -> Union["filter[_T]", "partial[filter[_T]]"]:
    """Keep only the elements that are in container

    If given an iterable it returns a filtered iterator, otherwise it
    returns a function to generate filtered iterators.  The latter is
    useful if the same filter has to be (re-)used on multiple
    iterators that are not known beforehand.

    The overload for the "no iterable" case carries the same ``None``
    default as the implementation (mirroring ifilter_except), so the
    one-argument call is visible to type checkers too.
    """
    if iterable is not None:
        return filter(container.__contains__, iterable)
    return cast("partial[filter[_T]]", partial(filter, container.__contains__))

129 

130 

# iter_except is from the "itertools" recipe
def iter_except(
    func: Callable[[], _T],
    exception: type[BaseException] | tuple[type[BaseException], ...],
    first: Any = None,
) -> Iterator[_T]:  # pragma: no cover - itertools recipe function
    """Yield the results of calling func until it raises exception.

    Turns a "call until it raises" interface into an iterator, much like
    the two-argument form of iter() but ending on an exception rather
    than on a sentinel value.  If first is given, it is called once and
    its result is yielded before the first call to func; an exception of
    the given type raised by first also ends the iteration.

    Examples:
        heapiter = iter_except(functools.partial(heappop, h), IndexError)
        dictiter = iter_except(d.popitem, KeyError)
        dequeiter = iter_except(d.popleft, IndexError)
        setiter = iter_except(s.pop, KeyError)

    """
    try:
        if first is not None:
            yield first()
        while True:
            yield func()
    except exception:
        # The expected exception simply marks the end of the stream.
        return

159 

160 

def log_and_format_old_libraries(
    logger: logging.Logger, libs: list["MigrationItem"]
) -> None:
    """Log old libraries as one "package: architectures" line each

    The items are grouped by package name; the architectures of a package
    are listed in the order they appear in libs.  One INFO line is emitted
    per package, in sorted package-name order, and no header is written.
    """
    arches_by_pkg: dict[str, list[str]] = {}
    for item in libs:
        arches_by_pkg.setdefault(item.package, []).append(item.architecture)

    for name in sorted(arches_by_pkg):
        logger.info(" %s: %s", name, " ".join(arches_by_pkg[name]))

175 

176 

def compute_reverse_tree(
    pkg_universe: "BinaryPackageUniverse", affected: set[BinaryPackageId]
) -> None:
    """Extend a set of packages with all their transitive reverse dependencies

    Starting from the packages in affected, the universe is asked
    repeatedly for reverse dependencies (reverse_dependencies_of) until no
    new package turns up.

    Nothing is returned: affected is updated in place and must therefore
    be mutable.

    :param pkg_universe: An instance of the BinaryPackageUniverse
    :param affected: The set of BinaryPackageId to start from (and to extend)
    """
    pending = list(affected)
    while pending:
        current = pending.pop()
        # Only packages not seen before need to be visited themselves.
        discovered = pkg_universe.reverse_dependencies_of(current) - affected
        affected |= discovered
        pending += discovered

195 

196 

def add_transitive_dependencies_flatten(
    pkg_universe: "BinaryPackageUniverse", initial_set: MutableSet[BinaryPackageId]
) -> None:
    """Extend a set of packages with all their transitive dependencies

    For each package the universe's dependencies_of() result is flattened
    (every member of every returned group counts) and any package not
    yet in initial_set is added and visited in turn.

    Nothing is returned: initial_set is updated in place and must
    therefore be mutable.

    :param pkg_universe: An instance of the BinaryPackageUniverse
    :param initial_set: The set of BinaryPackageId to start from (and to extend)
    """
    pending = list(initial_set)
    while pending:
        current = pending.pop()
        fresh = set(chain.from_iterable(pkg_universe.dependencies_of(current)))
        fresh.difference_update(initial_set)
        initial_set |= fresh
        pending.extend(fresh)

219 

220 

def write_nuninst(filename: str, nuninst: dict[str, set[str]]) -> None:
    """Write the non-installable report

    Write the non-installable report derived from "nuninst" to the
    file denoted by "filename" (UTF-8 encoded).

    The file starts with a "Built on:" and a "Last update:" line, followed
    by a blank line and one "<key>: <packages>" line per entry in nuninst.
    Package names are written in sorted order so that the output does not
    depend on set iteration order.

    :param filename: Path of the file to (over)write
    :param nuninst: Mapping of architecture keys to uninstallable package names
    """
    # Having two fields with identical dates seems a bit redundant, but
    # both are part of the format; take the time once so they always agree.
    stamp = time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime())
    with open(filename, "w", encoding="utf-8") as f:
        f.write(f"Built on: {stamp}\n")
        f.write(f"Last update: {stamp}\n\n")
        for arch, pkgs in nuninst.items():
            f.write("{}: {}\n".format(arch, " ".join(sorted(pkgs))))

242 

243 

def read_nuninst(filename: str, architectures: set[str]) -> dict[str, set[str]]:
    """Read the non-installable report

    Read the non-installable report from the file denoted by
    "filename" and return it.  Only architectures in "architectures"
    will be included in the report.

    Lines without a ":" are ignored.  For the others, the text before the
    first ":" is the key; a key is kept when the part before its first
    "+" (e.g. "amd64" for "amd64+all") is in "architectures".  The file
    is decoded as UTF-8, matching what write_nuninst produces.

    :param filename: Path of the report to read
    :param architectures: The architectures to include
    :return: Mapping of the kept keys to their sets of package names
    """
    nuninst: dict[str, set[str]] = {}
    with open(filename, encoding="utf-8") as f:
        for line in f:
            arch, sep, packages = line.strip().partition(":")
            if not sep:
                continue
            if arch.partition("+")[0] in architectures:
                nuninst[arch] = set(packages.split())
    return nuninst

260 

261 

def newly_uninst(
    nuold: dict[str, set[str]], nunew: dict[str, set[str]]
) -> dict[str, list[str]]:
    """Return a nuninst statistic with only new uninstallable packages

    For every architecture present in both statistics, this method
    collects the packages that are uninstallable in "nunew" but were not
    uninstallable in "nuold" (i.e. "nunew" minus "nuold").

    It returns a dictionary with the architectures as keys and the list
    of newly uninstallable packages as values.  If there are no
    regressions on a given architecture, then the architecture will be
    omitted in the result.  Accordingly, if none of the architectures have
    regressions an empty dictionary is returned.  Architectures that
    appear in only one of the two statistics are ignored.
    """
    res: dict[str, list[str]] = {}
    for arch, old_pkgs in nuold.items():
        if arch not in nunew:
            continue
        arch_nuninst = [x for x in nunew[arch] if x not in old_pkgs]
        # Leave res empty if there are no newly uninst packages
        if arch_nuninst:
            res[arch] = arch_nuninst
    return res

283 

284 

def format_and_log_uninst(
    logger: logging.Logger,
    architectures: Iterable[str],
    nuninst: Mapping[str, Iterable[str]],
    *,
    loglevel: int = logging.INFO,
) -> None:
    """Log the uninstallable packages, one line per architecture

    A line looks like:
      * i386: broken-pkg1, broken-pkg2

    Package names are sorted within a line.  Architectures that are
    missing from nuninst, or that have no uninstallable packages, produce
    no output at all.

    :param logger: The logger to emit to
    :param architectures: The architectures to report, in output order
    :param nuninst: Mapping of architectures to uninstallable package names
    :param loglevel: The level to log at (defaults to logging.INFO)
    """
    for arch in architectures:
        broken = nuninst.get(arch)
        if not broken:
            continue
        names = ", ".join(sorted(broken))
        logger.log(loglevel, f" * {arch}: {names}")

303 

304 

class Sorted(Protocol):
    """Call signature of a sorted()-like callable used without a key.

    Used as the type of write_heidi's "sorted" parameter.
    """

    def __call__(
        self,
        iterable: Iterable["SupportsRichComparisonT"],
        /,
        *,
        key: None = None,
        reverse: bool = False,
    ) -> list["SupportsRichComparisonT"]: ...

314 

315 

def write_heidi(
    filename: str,
    target_suite: TargetSuite,
    *,
    outofsync_arches: frozenset[str] = frozenset(),
    sorted: Sorted = sorted,
) -> None:
    """Write the output HeidiResult

    The file lists every binary package followed by every source package
    of the target suite, one per line, in the form:

    <pkg-name> <pkg-version> <pkg-architecture> <pkg-section>
    <src-name> <src-version> source <src-section>

    Binaries are ordered by architecture and then by name, sources by
    name.  A binary without an architecture is written as "all"; a source
    without a section is written with section "unknown".  Packages whose
    section is "faux" or ends in "/faux" (and binaries with no section)
    are skipped as they are not really a part of the suite.

    :param filename: Path of the file to write (ASCII encoded)
    :param target_suite: The suite providing "sources" and "binaries"
    :param outofsync_arches: If given, it is a set of architectures marked
      as "out of sync".  On those architectures arch:all binaries whose
      source version differs from the source in the target suite are
      left out to reduce the noise.
    :param sorted: Optimisation to avoid a "load global" in the loops
    """

    def _is_faux(section: str) -> bool:
        return section.rpartition("/")[2] == "faux"

    sources_t = target_suite.sources
    packages_t = target_suite.binaries

    with open(filename, "w", encoding="ascii") as out:

        # write binary packages
        for arch in sorted(packages_t):
            arch_binaries = packages_t[arch]
            for pkg_name in sorted(arch_binaries):
                pkg = arch_binaries[pkg_name]
                pkgv = pkg.version
                pkgarch = pkg.architecture or "all"
                pkgsec = pkg.section or "faux"
                if _is_faux(pkgsec):
                    # Faux package; not really a part of testing
                    continue
                # when architectures are marked as "outofsync", their binary
                # versions may be lower than those of the associated
                # source package in testing. the binary package list for
                # such architectures will include arch:all packages
                # matching those older versions, but we only want the
                # newer arch:all in testing
                outdated_arch_all = (
                    pkg.source_version
                    and pkgarch == "all"
                    and pkg.source_version != sources_t[pkg.source].version
                    and arch in outofsync_arches
                )
                if not outdated_arch_all:
                    out.write(f"{pkg_name} {pkgv} {pkgarch} {pkgsec}\n")

        # write sources
        for src_name in sorted(sources_t):
            src = sources_t[src_name]
            srcv = src.version
            srcsec = src.section or "unknown"
            if _is_faux(srcsec):
                # Faux package; not really a part of testing
                continue
            out.write(f"{src_name} {srcv} source {srcsec}\n")

381 

382 

def write_heidi_delta(filename: str, all_selected: list["MigrationItem"]) -> None:
    """Write the output delta

    The file starts with a "#HeidiDelta" line and then has one line per
    item of all_selected, in the given order.  For source items the form is:
    <src-name> <src-version>
    and for any other architecture:
    <pkg-name> <pkg-version> <pkg-architecture>
    Removals are marked by prefixing the line with "-".

    :param filename: Path of the file to write (ASCII encoded)
    :param all_selected: The migration items to list
    """
    with open(filename, "w", encoding="ascii") as out:
        out.write("#HeidiDelta\n")

        for item in all_selected:
            sign = "-" if item.is_removal else ""
            line = f"{sign}{item.package} {item.version}"
            if item.architecture != "source":
                # Binary items additionally carry their architecture.
                line = f"{line} {item.architecture}"
            out.write(line + "\n")

410 

411 

class Opener(Protocol):
    """Call signature shared by the open()-like callables used by write_excuses.

    Describes exactly the way write_excuses invokes open, lzma.open and
    gzip.open: a path, text write mode and UTF-8 encoding.
    """

    def __call__(
        self, file: str, mode: Literal["wt"], encoding: Literal["utf-8"]
    ) -> IO[Any]: ...

416 

417 

418def write_excuses( 

419 excuses: dict[str, "Excuse"] | dict[PackageId, "Excuse"], 

420 dest_file: str, 

421 output_format: Literal["yaml", "legacy-html"] = "yaml", 

422) -> None: 

423 """Write the excuses to dest_file 

424 

425 Writes a list of excuses in a specified output_format to the 

426 path denoted by dest_file. The output_format can either be "yaml" 

427 or "legacy-html". 

428 """ 

429 excuselist = sorted(excuses.values(), key=lambda x: x.sortkey()) 

430 if output_format == "yaml": 

431 os.makedirs(os.path.dirname(dest_file), exist_ok=True) 

432 opener: Opener = open # type: ignore[assignment] 

433 if dest_file.endswith(".xz"): 433 ↛ 434line 433 didn't jump to line 434 because the condition on line 433 was never true

434 import lzma 

435 

436 opener = lzma.open # type: ignore[assignment] 

437 elif dest_file.endswith(".gz"): 437 ↛ 438line 437 didn't jump to line 438 because the condition on line 437 was never true

438 import gzip 

439 

440 opener = gzip.open # type: ignore[assignment] 

441 with opener(dest_file, "wt", encoding="utf-8") as f: 

442 edatalist = [e.excusedata(excuses) for e in excuselist] 

443 excusesdata = { 

444 "sources": edatalist, 

445 "generated-date": datetime.now(UTC), 

446 } 

447 f.write( 

448 yaml.dump(excusesdata, default_flow_style=False, allow_unicode=True) 

449 ) 

450 elif output_format == "legacy-html": 

451 with open(dest_file, "w", encoding="utf-8") as f: 

452 f.write( 

453 '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/REC-html40/strict.dtd">\n' 

454 ) 

455 f.write("<html><head><title>excuses...</title>") 

456 f.write( 

457 '<meta http-equiv="Content-Type" content="text/html;charset=utf-8"></head><body>\n' 

458 ) 

459 f.write( 

460 "<p>Generated: " 

461 + time.strftime("%Y.%m.%d %H:%M:%S %z", time.gmtime(time.time())) 

462 + "</p>\n" 

463 ) 

464 f.write("<ul>\n") 

465 for e in excuselist: 

466 f.write("<li>%s" % e.html(excuses)) 

467 f.write("</ul></body></html>\n") 

468 else: # pragma: no cover 

469 raise ValueError('Output format must be either "yaml or "legacy-html"') 

470 

471 

def old_libraries(
    mi_factory: "MigrationItemFactory",
    suite_info: Suites,
    outofsync_arches: Iterable[str] = frozenset(),
) -> list["MigrationItem"]:
    """Detect old libraries left in the target suite for smooth transitions

    A binary in the target suite counts as an old library when its
    source_version differs from the version of its source package in the
    target suite, i.e. it is no longer built from that source.  Such
    binaries are presumably only still there because other packages
    depend on them, and should be removed as soon as possible.

    For "outofsync" architectures, outdated binaries are allowed to be in
    the target suite, so they are only added to the removal list if they
    are no longer in the (primary) source suite.

    :param mi_factory: Factory used to create the removal items
    :param suite_info: The suites (target suite and primary source suite are used)
    :param outofsync_arches: Architectures marked as "out of sync"
    :return: A list of removal items, one per old library found
    """
    sources_t = suite_info.target_suite.sources
    binaries_t = suite_info.target_suite.binaries
    binaries_s = suite_info.primary_source_suite.binaries
    removals = []
    for arch, arch_binaries in binaries_t.items():
        for pkg_name, pkg in arch_binaries.items():
            if sources_t[pkg.source].version == pkg.source_version:
                # Built from the current source; not cruft.
                continue
            if arch in outofsync_arches and pkg_name in binaries_s[arch]:
                # Tolerated: out-of-sync arch and still in the source suite.
                continue
            removals.append(mi_factory.generate_removal_for_cruft_item(pkg.pkg_id))
    return removals

500 

501 

def is_nuninst_asgood_generous(
    constraints: dict[str, list[str]],
    allow_uninst: dict[str, set[str | None]],
    architectures: list[str],
    old: dict[str, set[str]],
    new: dict[str, set[str]],
    break_arches: set[str] = cast(set[str], frozenset()),
) -> bool:
    """Check that the new nuninst counters and constraints did not regress

    The number of uninstallable packages (not counting those listed in
    allow_uninst) is summed over all architectures for both "old" and
    "new".  Architectures listed in "break_arches" are ignored completely.

    If the new total is larger than the old one, the result is False.
    Otherwise the "keep-installable" constraint is checked: if any of the
    packages listed there became uninstallable on an architecture (again
    ignoring break_arches), the result is False as well.

    :return: True if the new total is equal to or lower than the old one
      and there are no constraint regressions; False otherwise.
    """
    relevant = [arch for arch in architectures if arch not in break_arches]

    diff = sum(
        len(new[arch] - allow_uninst[arch]) - len(old[arch] - allow_uninst[arch])
        for arch in relevant
    )
    if diff > 0:
        return False

    must_be_installable = constraints["keep-installable"]
    return all(
        (new[arch] - old[arch]).isdisjoint(must_be_installable) for arch in relevant
    )

543 

544 

def clone_nuninst(
    nuninst: dict[str, set[str]],
    *,
    packages_s: dict[str, dict[str, BinaryPackage]] | None = None,
    architectures: Iterable[str] | None = None,
) -> dict[str, set[str]]:
    """Completely or selectively deep clone nuninst

    The returned table is always a new dict.  For each architecture in
    "architectures", the entries for "<arch>" and "<arch>+all" are
    replaced by new sets; all other entries keep referring to the very
    same sets as the original (shallow clone).  Without "architectures"
    the whole table is only shallow cloned.

    When packages_s is given, packages that are not listed in
    packages_s[arch] are pruned from the cloned sets; if it is omitted,
    the per architecture sets are copied as-is.

    :param nuninst: The table to clone
    :param packages_s: Optional package table used for pruning
    :param architectures: The architectures to deep clone
    :return: The clone
    """
    clone = nuninst.copy()
    if architectures is None:
        return clone

    def _copy(key: str, arch: str) -> set[str]:
        if packages_s is None:
            return set(nuninst[key])
        return {name for name in nuninst[key] if name in packages_s[arch]}

    for arch in architectures:
        clone[arch] = _copy(arch, arch)
        clone[arch + "+all"] = _copy(arch + "+all", arch)
    return clone

575 

576 

def test_installability(
    target_suite: TargetSuite,
    pkg_name: str,
    pkg_id: BinaryPackageId,
    broken: set[str],
    nuninst_arch: set[str] | None,
) -> Literal[-1, 0, 1]:
    """Test for installability of a package on an architecture

    pkg_id identifies the package to check (via
    target_suite.is_installable) and pkg_name is the name recorded in
    the sets.

    broken is the set of broken packages.  If the package changes
    installability (e.g. goes from uninstallable to installable),
    broken will be updated accordingly.

    If nuninst_arch is not None then it is also updated in the same
    way as broken is.

    :return: -1 if the package became uninstallable (regression), 1 if it
      became installable (improvement) and 0 if its state is unchanged
      with respect to broken.
    """
    installable = target_suite.is_installable(pkg_id)
    was_broken = pkg_name in broken
    outcome: Literal[-1, 0, 1]
    if installable:
        outcome = 1 if was_broken else 0
        broken.discard(pkg_name)
        if nuninst_arch is not None:
            nuninst_arch.discard(pkg_name)
    else:
        outcome = 0 if was_broken else -1
        broken.add(pkg_name)
        if nuninst_arch is not None:
            nuninst_arch.add(pkg_name)
    return outcome

613 

614 

def check_installability(
    target_suite: TargetSuite,
    binaries: dict[str, dict[str, BinaryPackage]],
    arch: str,
    updates: set[BinaryPackageId],
    check_archall: bool,
    nuninst: dict[str, set[str]],
) -> None:
    """Re-test the installability of updated packages on one architecture

    Every package id in updates that belongs to arch and that is the
    version currently present in binaries[arch] is passed to
    test_installability.  nuninst[arch + "+all"] is always updated with
    the result.  nuninst[arch] is updated as well, except for arch:all
    packages when check_archall is false: those are merely dropped from
    nuninst[arch].

    :param target_suite: The suite to test installability in
    :param binaries: The package table (architecture -> name -> package)
    :param arch: The architecture to check
    :param updates: The package ids that may have changed installability
    :param check_archall: Whether arch:all packages count for nuninst[arch]
    :param nuninst: The nuninst table to update in place
    """
    broken = nuninst[arch + "+all"]
    packages_t_a = binaries[arch]

    for pkg_id in updates:
        if pkg_id.architecture != arch:
            continue
        name, version, parch = pkg_id
        if name not in packages_t_a:
            continue
        pkgdata = packages_t_a[name]
        if version != pkgdata.version:
            # Not the version in testing right now, ignore
            continue
        # only check arch:all packages if requested
        if check_archall or pkgdata.architecture != "all":
            nuninst_arch = nuninst[parch]
        else:
            # arch:all package that must not count for this architecture
            nuninst[parch].discard(name)
            nuninst_arch = None
        test_installability(target_suite, name, pkg_id, broken, nuninst_arch)

642 

643 

def possibly_compressed(
    path: str, *, permitted_compressions: list[str] | None = None
) -> str:
    """Find and select a (possibly compressed) variant of a path

    The given path itself is preferred when it exists.  Otherwise the
    path with each permitted extension appended ("<path>.<ext>") is tried
    in order and the first one that exists is returned.

    :param path: The base path.
    :param permitted_compressions: Alternative extensions to look for.  Defaults to "gz" and "xz".
    :return: The path given possibly with one of the permitted extensions.
    :raises FileNotFoundError: if the path is not found
    """
    if os.path.exists(path):
        return path
    extensions = ["gz", "xz"] if permitted_compressions is None else permitted_compressions
    for candidate in (f"{path}.{ext}" for ext in extensions):
        if os.path.exists(candidate):
            return candidate
    raise FileNotFoundError(
        errno.ENOENT, os.strerror(errno.ENOENT), path
    )  # pragma: no cover

667 

668 

def create_provides_map(
    packages: dict[str, BinaryPackage],
) -> dict[str, set[tuple[str, str]]]:
    """Create a provides map from a map of binary package names to their BinaryPackage objects

    Each entry of a package's "provides" contributes a
    (providing package name, provided version) pair to the set stored
    under the provided name; the third element of the entry is not used.

    :param packages: A dict mapping binary package names to their BinaryPackage object
    :return: A provides map (a defaultdict, so unknown names map to an empty set)
    """
    provides: defaultdict[str, set[tuple[str, str]]] = defaultdict(set)

    for provider_name, provider in packages.items():
        # register virtual packages and real packages that provide them
        for provided_name, provided_version, _ in provider.provides:
            provides[provided_name].add((provider_name, provided_version))

    return provides

687 

688 

def read_release_file(suite_dir: str) -> "TagSection[str]":
    """Parses the "Release" file of a suite

    The file "<suite_dir>/Release" is read with apt_pkg.TagFile and must
    consist of exactly one paragraph.

    :param suite_dir: The directory to the suite
    :return: The first (and only) paragraph of the Release file
    :raises TypeError: if the file has more than one paragraph
    """
    release_file = os.path.join(suite_dir, "Release")
    with open(release_file) as fd:
        paragraphs = iter(apt_pkg.TagFile(fd))
        first = next(paragraphs)
        # A second paragraph means this is not a well-formed Release file.
        extra = next(paragraphs, None)
        if extra is not None:  # pragma: no cover
            raise TypeError("%s has more than one paragraph" % release_file)
    return first

702 

703 

def read_sources_file(
    filename: str,
    sources: dict[str, SourcePackage] | None = None,
    add_faux: bool = True,
    intern: Callable[[str], str] = sys.intern,
) -> dict[str, SourcePackage]:
    """Parse a single Sources file into a hash

    Parse a single Sources file into a dict mapping a source package
    name to a SourcePackage object.  If there are multiple source
    packages with the same name, then the highest versioned source
    package (that is not marked as "Extra-Source-Only") is the
    version kept in the dict.

    "Build-Depends" and "Build-Depends-Arch" are combined into a single
    ", "-separated architecture-dependent build-dependency string.

    :param filename: Path to the Sources file. Can be compressed by any algorithm supported by apt_pkg.TagFile
    :param sources: Optional dict to add the packages to. If given, this is also the value returned.
    :param add_faux: Add a faux arch:all binary for each source that claims it has arch:all
    :param intern: Internal optimisation / implementation detail to avoid python's "LOAD_GLOBAL" instruction in a loop
    :return: mapping from names to a source package
    """
    if sources is None:
        sources = {}

    tag_file = apt_pkg.TagFile(filename)
    get_field = tag_file.section.get
    step = tag_file.step

    while step():
        if get_field("Extra-Source-Only", "no") == "yes":
            # Ignore sources only referenced by Built-Using
            continue
        pkg = get_field("Package")
        ver = get_field("Version")
        # There may be multiple versions of the source package
        # (in unstable) if some architectures have out-of-date
        # binaries. We only ever consider the source with the
        # largest version for migration.
        if pkg in sources and apt_pkg.version_compare(sources[pkg].version, ver) > 0:
            continue
        maint = get_field("Maintainer")
        if maint:
            maint = intern(maint.strip())
        section = get_field("Section")
        if section:
            section = intern(section.strip())

        arch_parts = [
            x
            for x in (get_field("Build-Depends"), get_field("Build-Depends-Arch"))
            if x is not None
        ]
        joined_arch_deps = ", ".join(arch_parts)
        build_deps_arch: str | None = (
            intern(joined_arch_deps) if joined_arch_deps != "" else None
        )
        build_deps_indep = get_field("Build-Depends-Indep")
        if build_deps_indep is not None:
            build_deps_indep = intern(build_deps_indep)

        # Intern the name once; it is used as key, in the SourcePackage
        # and as base of the faux binary name.
        pkg = intern(pkg)

        # Adding arch:all packages to the list of binaries already to be able
        # to check for them later. Helps mitigate bug 887060 and is the
        # (partial?) answer to bug 1064428.
        binaries: set[BinaryPackageId] = set()
        if add_faux and "all" in get_field("Architecture", "").split():
            # the value "faux" in arch:faux is used elsewhere, so keep in sync
            pkg_id = BinaryPackageId(pkg + "-faux", intern("0~~~~"), intern("faux"))
            binaries.add(pkg_id)

        sources[pkg] = SourcePackage(
            pkg,
            intern(ver),
            section,
            binaries,
            maint,
            False,
            build_deps_arch,
            build_deps_indep,
            get_field("Testsuite", "").split(),
            get_field("Testsuite-Triggers", "").replace(",", "").split(),
        )
    return sources

785 

786 

def _check_and_update_packages(
    packages: list[BinaryPackage],
    package: BinaryPackage,
    archqual: str | None,
    build_depends: bool,
) -> None:
    """Helper for get_dependency_solvers

    Appends "package" to "packages" if it is a valid solver for a
    (Build-)Depends with the given architecture qualifier:

    - no qualifier: always accepted
    - ":native": accepted for build-dependencies only
    - ":any": accepted iff the package has "Multi-Arch: allowed"

    Any other combination leaves "packages" untouched.

    :param packages: which packages are to be updated
    :param package: The candidate to add
    :param archqual: Architecture qualifier
    :param build_depends: If True, check if the "package" parameter is valid as a build-dependency.
    """

    # See also bug #971739 and #1059929
    accepted = (
        archqual is None
        or (archqual == "native" and build_depends)
        or (archqual == "any" and package.multi_arch == "allowed")
    )
    if accepted:
        packages.append(package)

814 

815 

class GetDependencySolversProto(Protocol):
    """Call signature of get_dependency_solvers.

    Lets code accept any callable with that exact signature.
    """

    def __call__(
        self,
        block: list[tuple[str, str, str]],
        binaries_s_a: dict[str, BinaryPackage],
        provides_s_a: dict[str, set[tuple[str, str]]],
        *,
        build_depends: bool = False,
        empty_set: Any = frozenset(),
    ) -> list[BinaryPackage]: ...

826 

827 

def get_dependency_solvers(
    block: list[tuple[str, str, str]],
    binaries_s_a: dict[str, BinaryPackage],
    provides_s_a: dict[str, set[tuple[str, str]]],
    *,
    build_depends: bool = False,
    empty_set: Any = frozenset(),
) -> list[BinaryPackage]:
    """Find the packages which satisfy a dependency block

    Every alternative of the block (as returned by apt_pkg.parse_depends)
    is looked up in a package table for a given suite and architecture,
    both as a real package and through the provides map.  A candidate is
    kept when it satisfies the version constraint (if any) and the
    architecture qualifier (if any) of the alternative.  A provider only
    satisfies a versioned relation if it provides a version itself.

    It can also handle build-dependency relations if the named parameter
    "build_depends" is set to True.  In this case, block should be based
    on the return value from apt_pkg.parse_src_depends.

    :param block: The dependency block as parsed by apt_pkg.parse_depends (or apt_pkg.parse_src_depends
      if the "build_depends" is True)
    :param binaries_s_a: Mapping of package names to the relevant BinaryPackage
    :param provides_s_a: Mapping of package names to their providers (name and provided version)
    :param build_depends: If True, treat the "block" parameter as a build-dependency relation rather than
      a regular dependency relation.
    :param empty_set: Internal implementation detail / optimisation
    :return: the BinaryPackage objects solving the relation
    """
    packages: list[BinaryPackage] = []

    # for every package, version and operation in the block
    for name, version, op in block:
        # Split off an architecture qualifier ("pkg:any", "pkg:native", ...)
        name, colon, qualifier = name.partition(":")
        archqual = qualifier if colon else None
        unversioned = op == "" and version == ""

        # look for the package itself in the package table
        if name in binaries_s_a:
            package = binaries_s_a[name]
            # check the versioned dependency and architecture qualifier
            # (if present)
            if unversioned or apt_pkg.check_dep(package.version, op, version):
                _check_and_update_packages(packages, package, archqual, build_depends)

        # look for the package in the virtual packages list and loop on them
        for prov, prov_version in provides_s_a.get(name, empty_set):
            assert prov in binaries_s_a
            package = binaries_s_a[prov]
            # See Policy Manual §7.5
            versioned_match = prov_version != "" and apt_pkg.check_dep(
                prov_version, op, version
            )
            if unversioned or versioned_match:
                _check_and_update_packages(packages, package, archqual, build_depends)

    return packages

885 

886 

def invalidate_excuses(
    excuses: dict[str, "Excuse"],
    valid: set[str],
    invalid: set[str],
    invalidated: set[str],
) -> None:
    """Invalidate the excuses that depend on invalid excuses

    The package-level dependencies of every excuse are first translated
    into dependencies between excuses.  Invalidity is then propagated:
    each invalid excuse invalidates the matching dependency of the valid,
    non-forced excuses depending on it, and an excuse left without any
    alternative for a dependency becomes invalid in turn.

    :param excuses: Mapping of excuse names to excuses
    :param valid: Names of the valid excuses; updated in place
    :param invalid: Names of the invalid excuses; excuses whose
      dependencies cannot be registered are added to it
    :param invalidated: Receives the name of every excuse processed as
      invalid
    """
    # index every package (source and binary) by the excuses containing it;
    # the same package can be in multiple excuses, eg. when unstable and
    # TPU have the same packages
    excuses_by_package: dict[PackageId | BinaryPackageId, set[str]] = defaultdict(set)
    for excuse in excuses.values():
        for pkg_ids in excuse.packages.values():
            for pkg_id in pkg_ids:
                excuses_by_package[pkg_id].add(excuse.name)

    # derive the dependencies between excuses from the package dependencies.
    # Note that reverse_deps is only populated from these; there are
    # currently no dependencies between excuses that are added directly,
    # so this is ok.
    reverse_deps = defaultdict(set)
    for excuse in excuses.values():
        for pkg_dep in excuse.depends_packages:
            # the excuses which can each satisfy this specific dependency;
            # a package without any excuse (e.g. a cruft binary) is
            # represented by an ImpossibleDependencyState instead
            alternatives: set[str | DependencyState] = set()
            for dep_id in cast(set[BinaryPackageId], pkg_dep.deps):
                providers = excuses_by_package[dep_id]
                if not providers:
                    alternatives.add(
                        ImpossibleDependencyState(
                            PolicyVerdict.REJECTED_PERMANENTLY, "%s" % (dep_id.name)
                        )
                    )
                    continue
                alternatives |= providers
                for provider in providers:
                    reverse_deps[provider].add(excuse.name)
            if not excuse.add_dependency(alternatives, pkg_dep.spec):
                valid.discard(excuse.name)
                invalid.add(excuse.name)

    # work through the invalid excuses, starting from a sorted list for
    # deterministic results; newly invalidated excuses are queued as we go
    pending = sorted(invalid)
    for ename in iter_except(pending.pop, IndexError):
        invalidated.add(ename)
        # nothing depends on this excuse
        if ename not in reverse_deps:
            continue

        rdep_verdict = (
            PolicyVerdict.REJECTED_BLOCKED_BY_ANOTHER_ITEM
            if excuses[ename].policy_verdict.is_blocked
            else PolicyVerdict.REJECTED_WAITING_FOR_ANOTHER_ITEM
        )

        for rdep_name in sorted(reverse_deps[ename]):
            rdep = excuses[rdep_name]
            # only valid excuses which are not marked as `forced' are affected
            if rdep_name not in valid or rdep.forced:
                continue
            # mark this specific dependency as invalid; if there are no
            # alternatives left for it, the excuse itself becomes invalid
            if not rdep.invalidate_dependency(ename, rdep_verdict):
                valid.discard(rdep_name)
                pending.append(rdep_name)

967 

968 

def compile_nuninst(
    target_suite: TargetSuite, architectures: list[str], nobreakall_arches: list[str]
) -> dict[str, set[str]]:
    """Build the nuninst dict for the current state of the target suite

    For every architecture two entries are produced: "<arch>+all" holds
    the names of all packages the target suite reports as not installable,
    while "<arch>" leaves out the architecture-independent ones unless the
    architecture is listed in nobreakall_arches.

    :param target_suite: The target suite
    :param architectures: Which architectures to check
    :param nobreakall_arches: Which architectures where arch:all packages must be installable
    :return: Mapping of "<arch>" and "<arch>+all" to sets of package names
    """
    nuninst: dict[str, set[str]] = {}
    binaries_t = target_suite.binaries

    for arch in architectures:
        packages_t_a = binaries_t[arch]

        # every package on this architecture that cannot be installed
        uninstallable = {
            pkg_name
            for pkg_name, pkg_data in packages_t_a.items()
            if not target_suite.is_installable(pkg_data.pkg_id)
        }

        if arch in nobreakall_arches:
            # arch-independent packages count on this architecture too
            counted = set(uninstallable)
        else:
            counted = {
                pkg_name
                for pkg_name in uninstallable
                if packages_t_a[pkg_name].architecture != "all"
            }

        nuninst[arch] = counted
        nuninst[arch + "+all"] = uninstallable

    return nuninst

1003 

1004 

def is_smooth_update_allowed(
    binary: BinaryPackage, smooth_updates: list[str], hints: "HintCollection"
) -> bool:
    """Tell whether the configuration permits a smooth update of a binary

    A smooth update is allowed when smooth_updates contains "ALL" or the
    last component of the binary's section, or when an
    "allow-smooth-update" hint is found for the binary's source and
    source version.

    :param binary: The binary package to check
    :param smooth_updates: The sections allowing smooth updates (or "ALL")
    :param hints: The hints to search for an "allow-smooth-update" hint
    :return: True if the binary may be smooth updated
    """
    if "ALL" in smooth_updates or binary.section.rpartition("/")[2] in smooth_updates:
        return True

    # note that this needs to match the source version *IN TESTING*
    matching_hints = hints.search(
        "allow-smooth-update", package=binary.source, version=binary.source_version
    )
    return bool(matching_hints)

1019 

1020 

def find_smooth_updateable_binaries(
    binaries_to_check: list[BinaryPackageId],
    source_data: SourcePackage,
    pkg_universe: "BinaryPackageUniverse",
    target_suite: TargetSuite,
    binaries_t: dict[str, dict[str, BinaryPackage]],
    binaries_s: dict[str, dict[str, BinaryPackage]],
    removals: set[BinaryPackageId] | frozenset[BinaryPackageId],
    smooth_updates: list[str],
    hints: "HintCollection",
) -> set[BinaryPackageId]:
    """Find the binaries that should be kept around as smooth updates

    :param binaries_to_check: The ids of the binaries to consider
    :param source_data: The source package; binaries found in binaries_s
      with its version are never candidates
    :param pkg_universe: Used to look up dependencies and reverse
      dependencies of binaries
    :param target_suite: The target suite
    :param binaries_t: Per architecture mapping of package names to the
      BinaryPackage handed to is_smooth_update_allowed
    :param binaries_s: Per architecture mapping of package names to the
      BinaryPackage checked against the version of source_data
    :param removals: Binaries assumed to leave together with the checked
      ones; they are ignored as reverse dependencies
    :param smooth_updates: The sections allowing smooth updates (or "ALL")
    :param hints: The hints to search for "allow-smooth-update" hints
    :return: The ids of the binaries selected for a smooth update
    """
    check: set[BinaryPackageId] = set()
    smoothbins: set[BinaryPackageId] = set()

    for check_pkg_id in binaries_to_check:
        binary, _, parch = check_pkg_id

        cruftbins: set[BinaryPackageId] = set()

        # Not a candidate for smooth update (newer non-cruft version in unstable)
        if binary in binaries_s[parch]:
            pkg_s = binaries_s[parch][binary]
            if pkg_s.source_version == source_data.version:
                continue
            cruftbins.add(pkg_s.pkg_id)

        # Maybe a candidate (cruft or removed binary): check if config allows us to smooth update it.
        if not is_smooth_update_allowed(binaries_t[parch][binary], smooth_updates, hints):
            continue

        # if the package has reverse-dependencies which are
        # built from other sources, it's a valid candidate for
        # a smooth update.  if not, it may still be a valid
        # candidate if one of its r-deps is itself a candidate,
        # so note it for checking later
        rdeps = set(pkg_universe.reverse_dependencies_of(check_pkg_id))
        # We ignore all binaries listed in "removals" as we
        # assume they will leave at the same time as the
        # given package.
        rdeps.difference_update(removals, binaries_to_check)

        if target_suite.any_of_these_are_in_the_suite(rdeps):
            combined = smoothbins | {check_pkg_id}
            # each dependency clause has a set of possible
            # alternatives that can satisfy that dependency.
            # if any of them is outside the set of smoothbins, the
            # dependency can be satisfied even if this binary was
            # removed, so there is no need to keep it around for a
            # smooth update
            # if not, only this binary can satisfy the dependency, so
            # we should keep it around until the rdep is no longer in
            # testing
            #
            # cruft binaries from unstable are filtered out of the clause,
            # because they will not be added to the set of packages that
            # will be migrated.  any() stops at the first such clause
            # instead of scanning the remaining reverse dependencies.
            if any(
                dep_clause - cruftbins <= combined
                for rdep in rdeps
                for dep_clause in pkg_universe.dependencies_of(rdep)
            ):
                smoothbins = combined
                continue

        check.add(check_pkg_id)

    # check whether we should perform a smooth update for
    # packages which are candidates but do not have r-deps
    # outside of the current source
    while True:
        found_any = False
        for candidate_pkg_id in check:
            rdeps = pkg_universe.reverse_dependencies_of(candidate_pkg_id)
            if not rdeps.isdisjoint(smoothbins):
                smoothbins.add(candidate_pkg_id)
                found_any = True
        if not found_any:
            break
        # drop the candidates promoted in this pass before trying again
        check -= smoothbins

    return smoothbins

1101 

1102 

def find_newer_binaries(
    suite_info: Suites, pkg: BinaryPackage, add_source_for_dropped_bin: bool = False
) -> list[tuple[PackageId, Suite]]:
    """
    Find newer binaries for pkg in any of the source suites.

    :param suite_info: the suites to search; suites of class
                       SuiteClass.TARGET_SUITE are skipped

    :param pkg: BinaryPackage (is assumed to be in the target suite)

    :param add_source_for_dropped_bin: If True, newer versions of the
                       source of pkg will be added if they don't have the binary pkg

    :return: the newer binaries (or sources) and their suites
    """
    source = pkg.source
    newer_versions: list[tuple[PackageId, Suite]] = []
    for suite in suite_info:
        if suite.suite_class == SuiteClass.TARGET_SUITE:
            continue

        suite_binaries_on_arch = suite.binaries.get(pkg.pkg_id.architecture)
        if not suite_binaries_on_arch:
            continue

        newerbin = None
        if pkg.pkg_id.package_name in suite_binaries_on_arch:
            newerbin = suite_binaries_on_arch[pkg.pkg_id.package_name]
            if suite.is_cruft(newerbin):
                # We pretend the cruft binary doesn't exist.
                # We handle this as if the source didn't have the binary
                # (see below)
                newerbin = None
            elif apt_pkg.version_compare(newerbin.version, pkg.version) <= 0:
                continue

        if newerbin:
            n_id = newerbin.pkg_id
        else:
            if not add_source_for_dropped_bin:
                continue
            # The binary is gone from this suite (either it doesn't exist,
            # or it's cruft and we pretend it doesn't exist), so fall back
            # to the source.  The source may be missing as well - also when
            # only a cruft binary is left - in which case there is no newer
            # version in this suite.
            nsrc = suite.sources.get(source)
            if nsrc is None:
                continue
            # Only a strictly newer version of the source counts.
            if apt_pkg.version_compare(nsrc.version, pkg.source_version) <= 0:
                continue
            # Add the new source instead.
            n_id = PackageId(source, nsrc.version, "source")

        newer_versions.append((n_id, suite))

    return newer_versions

1159 

1160 

def parse_provides(
    provides_raw: str,
    pkg_id: BinaryPackageId | None = None,
    logger: logging.Logger | None = None,
) -> list[tuple[str, str, str]]:
    """Parse a raw Provides value into (name, version, operator) tuples

    Entries with alternatives, and entries using an operator other than
    "" or "=", are skipped (with a warning when a logger is given).  The
    strings of the returned tuples are interned.

    :param provides_raw: The raw value, parsed with apt_pkg.parse_depends
    :param pkg_id: Only used to identify the package in warnings
    :param logger: If given, receives a warning for every skipped entry
    :return: The accepted provides
    """
    accepted: list[tuple[str, str, str]] = []
    for or_clause in apt_pkg.parse_depends(provides_raw, False):
        if len(or_clause) != 1:  # pragma: no cover
            if logger is not None:
                logger.warning(
                    "Ignoring invalid provides in %s: Alternatives [%s]",
                    str(pkg_id),
                    str(or_clause),
                )
            continue

        # exactly one alternative is left at this point
        provided, provided_version, op = or_clause[0]
        if op not in ("", "="):  # pragma: no cover
            if logger is not None:
                logger.warning(
                    "Ignoring invalid provides in %s: %s (%s %s)",
                    str(pkg_id),
                    provided,
                    op,
                    provided_version,
                )
            continue

        accepted.append(
            (sys.intern(provided), sys.intern(provided_version), sys.intern(op))
        )
    return accepted

1186 

1187 

def parse_builtusing(
    builtusing_raw: str,
    pkg_id: BinaryPackageId | None = None,
    logger: logging.Logger | None = None,
) -> list[tuple[str, str]]:
    """Parse a raw Built-Using value into (name, version) pairs

    Entries with alternatives, and entries not using the "=" operator,
    are skipped (with a warning when a logger is given).  The strings of
    the returned pairs are interned.

    :param builtusing_raw: The raw value, parsed with apt_pkg.parse_depends
    :param pkg_id: Only used to identify the package in warnings
    :param logger: If given, receives a warning for every skipped entry
    :return: The accepted (name, version) pairs
    """
    accepted: list[tuple[str, str]] = []
    for or_clause in apt_pkg.parse_depends(builtusing_raw, False):
        if len(or_clause) != 1:  # pragma: no cover
            if logger is not None:
                logger.warning(
                    "Ignoring invalid builtusing in %s: Alternatives [%s]",
                    str(pkg_id),
                    str(or_clause),
                )
            continue

        # exactly one alternative is left at this point
        bu, bu_version, op = or_clause[0]
        if op != "=":  # pragma: no cover
            if logger is not None:
                logger.warning(
                    "Ignoring invalid builtusing in %s: %s (%s %s)",
                    str(pkg_id),
                    bu,
                    op,
                    bu_version,
                )
            continue

        accepted.append((sys.intern(bu), sys.intern(bu_version)))
    return accepted

1212 

1213 

1214def parse_option( 

1215 options: "optparse.Values", 

1216 option_name: str, 

1217 default: Any | None = None, 

1218 to_bool: bool = False, 

1219 to_int: bool = False, 

1220 day_to_sec: bool = False, 

1221) -> None: 

1222 """Ensure the option exist and has a sane value 

1223 

1224 :param options: dict with options 

1225 

1226 :param option_name: string with the name of the option 

1227 

1228 :param default: the default value for the option 

1229 

1230 :param to_int: convert the input to int (defaults to sys.maxsize) 

1231 

1232 :param to_bool: convert the input to bool 

1233 

1234 :param day_to_sec: convert the input from days to seconds (implies to_int=True) 

1235 """ 

1236 value = getattr(options, option_name, default) 

1237 

1238 # Option was provided with no value (or default is '') so pick up the default 

1239 if value == "": 

1240 value = default 

1241 

1242 if (to_int or day_to_sec) and value in (None, ""): 

1243 value = sys.maxsize 

1244 

1245 if day_to_sec: 

1246 value = int(float(value) * 24 * 60 * 60) # type: ignore[arg-type] 

1247 

1248 if to_int: 

1249 value = int(value) # type: ignore[arg-type] 

1250 

1251 if to_bool: 

1252 if value and ( 

1253 isinstance(value, bool) or value.lower() in ("yes", "y", "true", "t", "1") 

1254 ): 

1255 value = True 

1256 else: 

1257 value = False 

1258 

1259 setattr(options, option_name, value)