Coverage for britney2/installability/tester.py: 98%

341 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-03-23 07:34 +0000

1# -*- coding: utf-8 -*- 

2 

3# Copyright (C) 2012 Niels Thykier <niels@thykier.net> 

4 

5# This program is free software; you can redistribute it and/or modify 

6# it under the terms of the GNU General Public License as published by 

7# the Free Software Foundation; either version 2 of the License, or 

8# (at your option) any later version. 

9 

10# This program is distributed in the hope that it will be useful, 

11# but WITHOUT ANY WARRANTY; without even the implied warranty of 

12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

13# GNU General Public License for more details. 

14 

15import logging 

16from builtins import frozenset as _frozenset 

17from collections import defaultdict 

18from collections.abc import Callable, Iterable, Iterator, MutableSet, Sized 

19from functools import partial 

20from itertools import chain, filterfalse 

21from typing import ( 

22 TYPE_CHECKING, 

23 Any, 

24 Literal, 

25 Optional, 

26 Union, 

27 cast, 

28) 

29 

30from britney2.utils import add_transitive_dependencies_flatten, iter_except 

31 

32if TYPE_CHECKING: 32 ↛ 33line 32 didn't jump to line 33, because the condition on line 32 was never true

33 from .. import BinaryPackageId 

34 from .universe import BinaryPackageUniverse 

35 

36 

class InstallabilityTester(object):
    """Answer installability queries for binary packages in a suite.

    The tester works on a package universe plus a mutable set describing
    which packages are currently in the suite.  It keeps caches of
    packages already proven installable or broken, and a per-architecture
    cache of the (minimal) pseudo-essential set.
    """

    def __init__(
        self, universe: "BinaryPackageUniverse", suite_contents: set["BinaryPackageId"]
    ) -> None:
        """Create a new installability tester

        suite_contents is a (mutable) set of package ids that determines
        which of the packages in universe are currently in the suite.
        The tester mutates this set (e.g. packages found to be broken
        are removed from it).

        Package id: (pkg_name, pkg_version, pkg_arch)
          - NB: arch:all packages are "re-mapped" to given architecture.
            (simplifies caches and dependency checking)
        """

        self._universe = universe
        # FIXME: Move this field to TargetSuite
        self._suite_contents = suite_contents
        self._stats = InstallabilityStats()
        logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
        self.logger = logging.getLogger(logger_name)

        # Cache of packages known to be broken - we deliberately do not
        # include "broken" in it.  See _optimize for more info.
        self._cache_broken: set["BinaryPackageId"] = set()
        # Cache of packages known to be installable
        self._cache_inst: set["BinaryPackageId"] = set()
        # Per "arch" cache of the "minimal" (possibly incomplete)
        # pseudo-essential set.  This includes all the packages that
        # are essential and packages that will always follow.
        #
        # It may not be a complete essential set, since alternatives
        # are not always resolved.  Noticeably cases like "awk" may be
        # left out (since it could be either gawk, mawk or
        # original-awk) unless something in this sets depends strictly
        # on one of them
        #
        # Each value is a (musts, never, choices) triple.
        self._cache_ess: dict[
            str,
            tuple[
                frozenset["BinaryPackageId"],
                frozenset["BinaryPackageId"],
                frozenset[frozenset["BinaryPackageId"]],
            ],
        ] = {}

        # Essential packages plus everything the helper adds as their
        # transitive dependencies; used by add_binary to decide whether
        # the pseudo-essential cache must be invalidated.
        essential_w_transitive_deps: MutableSet["BinaryPackageId"] = set(
            universe.essential_packages
        )
        add_transitive_dependencies_flatten(universe, essential_w_transitive_deps)
        self._cache_essential_transitive_dependencies = essential_w_transitive_deps

    def compute_installability(self) -> None:
        """Computes the installability of all the packages in the suite

        This method computes the installability of all packages in
        the suite and caches the result.  This has the advantage of
        making "is_installable" queries very fast for all packages
        in the suite.
        """

        universe = self._universe
        check_inst = self._check_inst
        cbroken = self._cache_broken
        cache_inst = self._cache_inst
        suite_contents = self._suite_contents
        # Iterate over a snapshot: _check_inst removes packages it proves
        # broken from suite_contents while we are looping.
        tcopy = list(suite_contents)
        for t in filterfalse(cache_inst.__contains__, tcopy):
            if t in cbroken:
                continue
            res = check_inst(t)
            if t in universe.equivalent_packages:
                # Propagate the verdict to the equivalent packages that
                # are (still) in the suite.
                eqv = (
                    x for x in universe.packages_equivalent_to(t) if x in suite_contents
                )
                if res:
                    cache_inst.update(eqv)
                else:
                    eqv_set = frozenset(eqv)
                    suite_contents -= eqv_set
                    cbroken |= eqv_set

    @property
    def stats(self) -> "InstallabilityStats":
        """The statistics counters maintained by this tester"""
        return self._stats

    def any_of_these_are_in_the_suite(self, pkgs: Iterable["BinaryPackageId"]) -> bool:
        """Test if at least one package of a given set is in the suite

        :param pkgs: A set of package ids (as defined in the constructor)
        :return: True if any of the packages in pkgs are currently in the suite
        """
        return not self._suite_contents.isdisjoint(pkgs)

    def is_pkg_in_the_suite(self, pkg_id: "BinaryPackageId") -> bool:
        """Test if the package is in the suite

        :param pkg_id: A package id (as defined in the constructor)
        :return: True if the pkg is currently in the suite
        """
        return pkg_id in self._suite_contents

    def which_of_these_are_in_the_suite(
        self, pkgs: Iterable["BinaryPackageId"]
    ) -> Iterator["BinaryPackageId"]:
        """Iterate over all packages that are in the suite

        :param pkgs: An iterable of package ids
        :return: An iterator of the package ids from pkgs that are in the suite
        """
        for pkg_id in pkgs:
            if pkg_id in self._suite_contents:
                yield pkg_id

    def add_binary(self, pkg_id: "BinaryPackageId") -> Literal[True]:
        """Add a binary package to the suite

        :param pkg_id: The id of the package
        :return: Always True
        :raises KeyError: if the package is not known
        """

        if pkg_id not in self._universe:  # pragma: no cover
            raise KeyError(str(pkg_id))

        if pkg_id in self._universe.broken_packages:
            # Packages the universe lists as broken never enter the
            # caches, so nothing needs to be invalidated.
            self._suite_contents.add(pkg_id)
        elif pkg_id not in self._suite_contents:
            self._suite_contents.add(pkg_id)
            if self._cache_inst:
                self._stats.cache_drops += 1
                self._cache_inst = set()
            if self._cache_broken:
                # Re-add broken packages as some of them may now be installable
                self._suite_contents |= self._cache_broken
                self._cache_broken = set()
            if (
                pkg_id in self._cache_essential_transitive_dependencies
                and pkg_id.architecture in self._cache_ess
            ):
                # Adds new possibly pseudo-essential => "pseudo-essential" set needs to be
                # recomputed
                del self._cache_ess[pkg_id.architecture]

        return True

    def remove_binary(self, pkg_id: "BinaryPackageId") -> Literal[True]:
        """Remove a binary from the suite

        :param pkg_id: The id of the package
        :return: Always True
        :raises KeyError: if the package is not known
        """

        if pkg_id not in self._universe:  # pragma: no cover
            raise KeyError(str(pkg_id))

        self._cache_broken.discard(pkg_id)

        if pkg_id in self._suite_contents:
            self._suite_contents.remove(pkg_id)
            if pkg_id.architecture in self._cache_ess:
                start, _, ess_choices = self._cache_ess[pkg_id.architecture]
                if pkg_id in start or any(
                    pkg_id in choices for choices in ess_choices
                ):
                    # Removes a package from the "pseudo-essential set"
                    del self._cache_ess[pkg_id.architecture]

            if not self._universe.reverse_dependencies_of(pkg_id):
                # no reverse relations - safe
                return True
            if (
                pkg_id not in self._universe.broken_packages
                and pkg_id in self._cache_inst
            ):
                # It is in our cache (and not guaranteed to be broken) - throw out the cache
                self._cache_inst = set()
                self._stats.cache_drops += 1

        return True

    def is_installable(self, pkg_id: "BinaryPackageId") -> bool:
        """Test if a package is installable in this package set

        The package is assumed to be in the suite and only packages in
        the suite can be used to satisfy relations.

        :param pkg_id: The id of the package
        :return: True iff the package is installable, False otherwise
          (including when the package is not in the suite or is listed
          as broken by the universe)
        :raises KeyError: if the package is not known
        """

        self._stats.is_installable_calls += 1

        if pkg_id not in self._universe:  # pragma: no cover
            raise KeyError(str(pkg_id))

        if (
            pkg_id not in self._suite_contents
            or pkg_id in self._universe.broken_packages
        ):
            self._stats.cache_hits += 1
            return False

        if pkg_id in self._cache_inst:
            self._stats.cache_hits += 1
            return True

        self._stats.cache_misses += 1
        return self._check_inst(pkg_id)

    def _check_inst(
        self,
        t: "BinaryPackageId",
        musts: Optional[set["BinaryPackageId"]] = None,
        never: Optional[set["BinaryPackageId"]] = None,
        choices: Optional[set[frozenset["BinaryPackageId"]]] = None,
    ) -> bool:
        """Determine whether t is installable and update the caches

        musts, never and choices are left as None for a top-level query;
        resolve_choices passes them when it recurses from a backtrack
        point.  See the explanation of each of them below.

        :return: True if t was found to be installable, otherwise False
        """
        stats = self._stats
        universe = self._universe
        suite_contents = self._suite_contents
        cbroken = self._cache_broken

        # Our installability verdict - start with "yes" and change if
        # prove otherwise.
        verdict = True

        # set of packages that must be installed with this package
        if musts is None:
            musts = set()
            musts.add(t)
        # set of packages we can *never* choose (e.g. due to conflicts)
        if never is None:
            never = set()
        # set of relations were we have a choice, but where we have not
        # committed ourselves yet.  Hopefully some choices may be taken
        # for us (if one of the alternatives appear in "musts")
        if choices is None:
            choices = set()

        # The subset of musts we haven't checked yet.
        check = [t]

        if len(musts) == 1:
            # Include the essential packages in the suite as a starting point.
            if t.architecture not in self._cache_ess:
                # The minimal essential set cache is not present -
                # compute it now.
                (start, ess_never, ess_choices) = self._get_min_pseudo_ess_set(
                    t.architecture
                )
            else:
                (start, ess_never, ess_choices) = self._cache_ess[t.architecture]

            if t in ess_never:
                # t conflicts with something in the essential set or the essential
                # set conflicts with t - either way, t is f***ed
                cbroken.add(t)
                suite_contents.remove(t)
                stats.conflicts_essential += 1
                return False
            musts.update(start)
            never.update(ess_never)
            choices.update(ess_choices)

        # curry check_loop
        check_loop = partial(
            self._check_loop, universe, suite_contents, stats, musts, never, cbroken
        )

        # Useful things to remember:
        #
        # * musts and never are disjointed at all times
        #   - if not, t cannot be installable.  Either t, or one of
        #     its dependencies conflict with t or one of its (other)
        #     dependencies.
        #
        # * choices should generally be avoided as much as possible.
        #   - picking a bad choice requires backtracking
        #   - sometimes musts/never will eventually "solve" the choice.
        #
        # * check never includes choices (these are always in choices)
        #
        # * A package is installable if never and musts are disjointed
        #   and both check and choices are empty.
        #   - exception: resolve_choices may determine the installability
        #     of t via recursion (calls _check_inst).  In this case
        #     check and choices are not (always) empty.

        def _prune_choices(
            rebuild: set[frozenset["BinaryPackageId"]],
            len: Callable[[Sized], int] = len,
        ) -> bool:
            """Prunes the pending choices and updates rebuild.

            Choices already satisfied by musts are dropped.  A choice
            reduced to exactly one alternative is committed (added to
            check and musts).  Every choice that is still open is added
            to "rebuild" in its reduced form.

            Returns True if no choice was found to be unsatisfiable.
            Returns False if some choice has no alternative left (i.e.
            t is uninstallable).

            NB: If this returns True, choices should be replaced by
            rebuild.
            """

            assert musts is not None
            assert choices is not None
            assert never is not None
            # We already satisfied/chosen at least one of the literals
            # in the choice, so the choice is gone
            for choice in filter(musts.isdisjoint, choices):
                # cbroken is needed here because (in theory) it could
                # have changed since the choice was discovered and it
                # is smaller than suite_contents (so presumably faster)
                remain = choice - never - cbroken

                if len(remain) == 1:
                    # the choice was reduced to one package we haven't checked - check that
                    check.extend(remain)
                    musts.update(remain)
                    stats.choice_presolved += 1
                    continue

                if not remain:
                    # all alternatives would violate the conflicts or are uninstallable
                    # => package is not installable
                    stats.choice_presolved += 1
                    return False

                # The choice is still deferred
                rebuild.add(frozenset(remain))

            return True

        # END _prune_choices

        while check:
            if not check_loop(choices, check):
                verdict = False
                break

            if choices:
                rebuild: set[frozenset["BinaryPackageId"]] = set()

                if not _prune_choices(rebuild):
                    verdict = False
                    break

                if not check and rebuild:
                    # We have to "guess" now, which is always fun, but not cheap. We
                    # stop guessing:
                    # - once we run out of choices to make (obviously), OR
                    # - if one of the choices exhaust all but one option
                    if self.resolve_choices(check, musts, never, rebuild):
                        # The recursive call have already updated the
                        # cache so there is not point in doing it again.
                        return True
                choices = rebuild

        if verdict:
            # if t is installable, then so are all packages in musts
            self._cache_inst.update(musts)
            stats.solved_installable += 1
        else:
            stats.solved_uninstallable += 1

        return verdict

    def resolve_choices(
        self,
        check: list["BinaryPackageId"],
        musts: set["BinaryPackageId"],
        never: set["BinaryPackageId"],
        choices: set[frozenset["BinaryPackageId"]],
    ) -> bool:
        """Try to settle the deferred choices by guessing

        check, musts, never and choices are the working sets of the
        calling _check_inst and are updated in place.

        :return: True if a recursive _check_inst call succeeded for one
          of the alternatives.  False otherwise; in that case the caller
          has to continue with the updated check/musts/never/choices.
        """
        universe = self._universe
        suite_contents = self._suite_contents
        stats = self._stats
        cbroken = self._cache_broken

        while choices:
            choice_options = choices.pop()

            choice = iter(choice_options)
            last = next(choice)  # pick one to go last
            solved = False
            for p in choice:
                musts_copy = musts.copy()
                never_tmp: set["BinaryPackageId"] = set()
                choices_tmp: set[frozenset["BinaryPackageId"]] = set()
                check_tmp = [p]
                # _check_loop assumes that "musts" is up to date
                musts_copy.add(p)
                if not self._check_loop(
                    universe,
                    suite_contents,
                    stats,
                    musts_copy,
                    never_tmp,
                    cbroken,
                    choices_tmp,
                    check_tmp,
                ):
                    # p cannot be chosen/is broken (unlikely, but ...)
                    continue

                # Test if we can pick p without any consequences.
                # - when we can, we avoid a backtrack point.
                if never_tmp <= never and choices_tmp <= choices:
                    # we can pick p without picking up new conflicts
                    # or unresolved choices.  Therefore we commit to
                    # using p.
                    musts.update(musts_copy)
                    stats.choice_resolved_without_restore_point += 1
                    solved = True
                    break

                if not musts.isdisjoint(never_tmp):
                    # If we pick p, we will definitely end up making
                    # t uninstallable, so p is a no-go.
                    continue

                stats.backtrace_restore_point_created += 1
                # We are not sure that p is safe, setup a backtrack
                # point and recurse.
                never_tmp |= never
                choices_tmp |= choices
                if self._check_inst(p, musts_copy, never_tmp, choices_tmp):
                    # Success, p was a valid choice and made it all
                    # installable
                    return True

                # If we get here, we failed to find something that
                # would satisfy choice (without breaking the
                # installability of t).  This means p cannot be used
                # to satisfy the dependencies, so pretend to conflict
                # with it - hopefully it will reduce future choices.
                never.add(p)
                stats.backtrace_restore_point_used += 1

            if not solved:
                # Optimization for the last case; avoid the recursive call
                # and just assume the last will lead to a solution.  If it
                # doesn't there is no solution and if it does, we don't
                # have to back-track anyway.
                check.append(last)
                musts.add(last)
                stats.backtrace_last_option += 1
                return False
        return False

    def _check_loop(
        self,
        universe: "BinaryPackageUniverse",
        suite_contents: set["BinaryPackageId"],
        stats: "InstallabilityStats",
        musts: set["BinaryPackageId"],
        never: set["BinaryPackageId"],
        cbroken: set["BinaryPackageId"],
        choices: set[frozenset["BinaryPackageId"]],
        check: list["BinaryPackageId"],
        len: Callable[[Sized], int] = len,
        frozenset: Callable[..., Any] = frozenset,
    ) -> bool:
        """Finds all guaranteed dependencies via "check".

        If it returns False, t is not installable.  If it returns True
        then "check" is exhausted.  If "choices" are empty and this
        returns True, then t is installable.

        musts, never, choices, check, cbroken and suite_contents are all
        updated in place.
        """
        # Local variables for faster access...
        not_satisfied: partial[filter["BinaryPackageId"]] = partial(
            filter, musts.isdisjoint
        )

        # While we have guaranteed dependencies (in check), examine all
        # of them.
        for cur in iter_except(check.pop, IndexError):
            relations = universe.relations_of(cur)

            if relations.negative_dependencies:
                # Conflicts?
                if cur in never:
                    # cur adds a (reverse) conflict, so check if cur
                    # is in never.
                    #
                    # - there is a window where two conflicting
                    #   packages can be in check.  Example "A" depends
                    #   on "B" and "C".  If "B" conflicts with "C",
                    #   then both "B" and "C" could end in "check".
                    return False
                # We must install cur for the package to be installable,
                # so "obviously" we can never choose any of its conflicts
                never.update(relations.negative_dependencies & suite_contents)

            # depgroup can be satisfied by picking something that is
            # already in musts - lets pick that (again).  :)
            for depgroup in cast(
                set[_frozenset["BinaryPackageId"]],
                not_satisfied(relations.dependencies),
            ):
                # Of all the packages listed in the relation remove those that
                # are either:
                #  - not in the suite
                #  - known to be broken (by cache)
                #  - in never
                candidates = (depgroup & suite_contents) - never

                if not candidates:
                    # We got no candidates to satisfy it - this
                    # package cannot be installed with the current
                    # (version of the) suite
                    if cur not in cbroken and depgroup.isdisjoint(never):
                        # cur's dependency cannot be satisfied even if never was empty.
                        # This means that cur itself is broken (as well).
                        cbroken.add(cur)
                        suite_contents.remove(cur)
                    return False
                if len(candidates) == 1:
                    # only one possible solution to this choice and we
                    # haven't seen it before
                    check.extend(candidates)
                    musts.update(candidates)
                else:
                    possible_eqv = set(
                        x for x in candidates if x in universe.equivalent_packages
                    )
                    if len(possible_eqv) > 1:
                        # Exploit equivalency to reduce the number of
                        # candidates if possible.  Basically, this
                        # code maps "similar" candidates into a single
                        # candidate that will give a identical result
                        # to any other candidate it eliminates.
                        #
                        # See InstallabilityTesterBuilder's
                        # _build_eqv_packages_table method for more
                        # information on how this works.
                        new_cand = set(x for x in candidates if x not in possible_eqv)
                        stats.eqv_table_times_used += 1

                        for chosen in iter_except(possible_eqv.pop, KeyError):
                            new_cand.add(chosen)
                            possible_eqv -= universe.packages_equivalent_to(chosen)
                        stats.eqv_table_total_number_of_alternatives_eliminated += len(
                            candidates
                        ) - len(new_cand)
                        if len(new_cand) == 1:
                            check.extend(new_cand)
                            musts.update(new_cand)
                            stats.eqv_table_reduced_to_one += 1
                            continue
                        elif len(candidates) == len(new_cand):
                            stats.eqv_table_reduced_by_zero += 1

                        candidates = frozenset(new_cand)
                    else:
                        # Candidates have to be a frozenset to be added to choices
                        candidates = frozenset(candidates)
                    # defer this choice till later
                    choices.add(candidates)
        return True

    def _get_min_pseudo_ess_set(self, arch: str) -> tuple[
        frozenset["BinaryPackageId"],
        frozenset["BinaryPackageId"],
        frozenset[frozenset["BinaryPackageId"]],
    ]:
        """Return the cached pseudo-essential set for arch, computing it if needed

        :param arch: The architecture to compute the set for
        :return: A (musts, never, choices) triple of frozensets as stored
          in the per-architecture cache
        """
        if arch not in self._cache_ess:
            # The minimal essential set cache is not present -
            # compute it now.
            suite_contents = self._suite_contents
            cbroken = self._cache_broken
            universe = self._universe
            stats = self._stats

            ess_base = [
                x
                for x in self._universe.essential_packages
                if x.architecture == arch and x in suite_contents
            ]
            start = set(ess_base)
            ess_never: set["BinaryPackageId"] = set()
            ess_choices: set[frozenset["BinaryPackageId"]] = set()
            not_satisfied: partial[filter["BinaryPackageId"]] = partial(
                filter, start.isdisjoint
            )

            while ess_base:
                # The return value is ignored on purpose here; only the
                # side effects on start/ess_never/ess_choices are used.
                self._check_loop(
                    universe,
                    suite_contents,
                    stats,
                    start,
                    ess_never,
                    cbroken,
                    ess_choices,
                    ess_base,
                )
                if ess_choices:
                    # Try to break choices where possible
                    nchoice = set()
                    for choice in cast(
                        set[frozenset["BinaryPackageId"]], not_satisfied(ess_choices)
                    ):
                        b = False
                        for c in choice:
                            relations = universe.relations_of(c)
                            if (
                                relations.negative_dependencies <= ess_never
                                and not any(not_satisfied(relations.dependencies))
                            ):
                                # c brings no new conflicts and all of its
                                # dependencies are already satisfied
                                ess_base.append(c)
                                b = True
                                break
                        if not b:
                            nchoice.add(choice)
                    ess_choices = nchoice
                else:
                    break

            for x in start:
                ess_never.update(universe.negative_dependencies_of(x))
            self._cache_ess[arch] = (
                frozenset(start),
                frozenset(ess_never),
                frozenset(ess_choices),
            )

        return self._cache_ess[arch]

    def compute_stats(self) -> dict[str, "ArchStats"]:
        """Compute per-architecture statistics about the package graph

        :return: A mapping from architecture to its (already computed)
          ArchStats
        """
        universe = self._universe
        graph_stats: dict[str, ArchStats] = defaultdict(ArchStats)
        seen_eqv: dict[str, set["BinaryPackageId"]] = defaultdict(set)

        for pkg in universe:
            pkg_arch = pkg.architecture
            relations = universe.relations_of(pkg)
            arch_stats = graph_stats[pkg_arch]

            arch_stats.nodes += 1

            seen = seen_eqv[pkg_arch]
            if pkg in universe.equivalent_packages and pkg not in seen:
                eqv = [
                    e
                    for e in universe.packages_equivalent_to(pkg)
                    if e.architecture == pkg_arch
                ]
                arch_stats.eqv_nodes += len(eqv)
                # Remember the whole equivalence class so it is counted
                # once, not once per member.
                seen.update(eqv)
                seen.add(pkg)

            arch_stats.add_dep_edges(relations.dependencies)
            arch_stats.add_con_edges(relations.negative_dependencies)

        for stat in graph_stats.values():
            stat.compute_all()

        return graph_stats

691 

692 

class InstallabilityStats:
    """Counters recording the work done while answering installability queries."""

    def __init__(self) -> None:
        """Start every counter at zero"""
        # Cache behaviour
        self.cache_hits = self.cache_misses = self.cache_drops = 0
        # Backtracking ("RP" = restore point)
        self.backtrace_restore_point_created = 0
        self.backtrace_restore_point_used = 0
        self.backtrace_last_option = 0
        # Choice handling
        self.choice_presolved = 0
        self.choice_resolved_without_restore_point = 0
        # Requests and verdicts
        self.is_installable_calls = 0
        self.solved_installable = self.solved_uninstallable = 0
        self.conflicts_essential = 0
        # Use of the equivalence table
        self.eqv_table_times_used = 0
        self.eqv_table_reduced_to_one = 0
        self.eqv_table_reduced_by_zero = 0
        self.eqv_table_total_number_of_alternatives_eliminated = 0

    def stats(self) -> list[str]:
        """Render the counters as a list of human readable lines

        :return: One formatted string per group of counters
        """
        counters = vars(self)
        templates = (
            "Requests - is_installable: {is_installable_calls}",
            "Cache - hits: {cache_hits}, misses: {cache_misses}, drops: {cache_drops}",
            "Choices - pre-solved: {choice_presolved}, No RP: {choice_resolved_without_restore_point}",
            "Backtrace - RP created: {backtrace_restore_point_created}, RP used: {backtrace_restore_point_used}, reached last option: {backtrace_last_option}",  # nopep8
            "Solved - installable: {solved_installable}, uninstallable: {solved_uninstallable}, conflicts essential: {conflicts_essential}",  # nopep8
            "Eqv - times used: {eqv_table_times_used}, perfect reductions: {eqv_table_reduced_to_one}, failed reductions: {eqv_table_reduced_by_zero}, total no. of alternatives pruned: {eqv_table_total_number_of_alternatives_eliminated}",  # nopep8
        )
        return [template.format_map(counters) for template in templates]

722 

723 

class ArchStats:
    """Accumulates node/edge information and derives summary statistics from it."""

    def __init__(self) -> None:
        """Create an empty statistics container"""
        self.nodes = 0
        self.eqv_nodes = 0
        # One entry per add_dep_edges / add_con_edges call
        self.dep_edges: list[frozenset[frozenset["BinaryPackageId"]]] = []
        self.con_edges: list[frozenset["BinaryPackageId"]] = []
        # stat name -> {"max": ..., "min": ..., ...}; missing values read as 0
        self.stats: dict[str, dict[str, Union[int, float]]] = defaultdict(
            lambda: defaultdict(int)
        )

    def stat(self, statname: str) -> dict[str, Union[int, float]]:
        """Return the value table recorded under statname"""
        return self.stats[statname]

    def stat_summary(self) -> list[str]:
        """Format the statistics as a list of human readable lines

        :return: One line for the node counts followed by one line per
          clause statistic
        """
        lines = []
        for statname in (
            "nodes",
            "dependency-clauses",
            "dependency-clause-alternatives",
            "negative-dependency-clauses",
        ):
            stat = self.stats[statname]
            if statname == "nodes":
                lines.append("nodes: %d, eqv-nodes: %d" % (self.nodes, self.eqv_nodes))
                continue

            template = "%s, max: %d, min: %d, median: %d, average: %f (%d/%d)"
            fields: list[Any] = [
                statname,
                stat["max"],
                stat["min"],
                stat["median"],
                stat["average"],
                stat["sum"],
                stat["size"],
            ]
            if "average-per-node" in stat:
                template += ", average-per-node: %f"
                fields.append(stat["average-per-node"])
            lines.append(template % tuple(fields))
        return lines

    def add_dep_edges(self, edges: frozenset[frozenset["BinaryPackageId"]]) -> None:
        """Record the dependency clauses of one node"""
        self.dep_edges.append(edges)

    def add_con_edges(self, edges: frozenset["BinaryPackageId"]) -> None:
        """Record the negative dependencies of one node"""
        self.con_edges.append(edges)

    def _list_stats(
        self, stat_name: str, sorted_list: list[int], average_per_node: bool = False
    ) -> None:
        """Store max/min/sum/size/average/median of sorted_list under stat_name

        Nothing is stored when sorted_list is empty.  The list is used
        as-is: the first and last elements are taken as min and max.
        """
        if not sorted_list:
            return

        count = len(sorted_list)
        total = sum(sorted_list)
        table = self.stats[stat_name]
        table["max"] = sorted_list[-1]
        table["min"] = sorted_list[0]
        table["sum"] = total
        table["size"] = count
        table["average"] = float(total) / count
        table["median"] = sorted_list[count // 2]
        if average_per_node:
            table["average-per-node"] = float(total) / self.nodes

    def compute_all(self) -> None:
        """Derive all clause statistics from the recorded edges"""
        clause_counts = sorted(map(len, self.dep_edges))
        alternative_counts = sorted(map(len, chain.from_iterable(self.dep_edges)))
        conflict_counts = sorted(map(len, self.con_edges))

        self._list_stats("dependency-clauses", clause_counts)
        self._list_stats(
            "dependency-clause-alternatives",
            alternative_counts,
            average_per_node=True,
        )
        self._list_stats("negative-dependency-clauses", conflict_counts)

798 ) 

799 self._list_stats("negative-dependency-clauses", sorted_no_con_edges)