Coverage for britney2/installability/tester.py: 98%
341 statements
« prev ^ index » next coverage.py v6.5.0, created at 2025-03-23 07:34 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2025-03-23 07:34 +0000
1# -*- coding: utf-8 -*-
3# Copyright (C) 2012 Niels Thykier <niels@thykier.net>
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13# GNU General Public License for more details.
15import logging
16from builtins import frozenset as _frozenset
17from collections import defaultdict
18from collections.abc import Callable, Iterable, Iterator, MutableSet, Sized
19from functools import partial
20from itertools import chain, filterfalse
21from typing import (
22 TYPE_CHECKING,
23 Any,
24 Literal,
25 Optional,
26 Union,
27 cast,
28)
30from britney2.utils import add_transitive_dependencies_flatten, iter_except
32if TYPE_CHECKING: 32 ↛ 33line 32 didn't jump to line 33, because the condition on line 32 was never true
33 from .. import BinaryPackageId
34 from .universe import BinaryPackageUniverse
class InstallabilityTester(object):
    """Answer "is this binary package installable?" for a mutable suite.

    The tester works on a package universe (relations between all known
    packages) and a mutable set of package ids that are currently in the
    suite.  Results are cached in a set of known-installable and a set of
    known-broken packages; adding or removing binaries invalidates those
    caches as needed.
    """

    def __init__(
        self, universe: "BinaryPackageUniverse", suite_contents: set["BinaryPackageId"]
    ) -> None:
        """Create a new installability tester

        suite_contents is a (mutable) set of package ids that determines
        which of the packages in universe are currently in the suite.

        Package id: (pkg_name, pkg_version, pkg_arch)
          - NB: arch:all packages are "re-mapped" to given architecture.
            (simplifies caches and dependency checking)
        """

        self._universe = universe
        # FIXME: Move this field to TargetSuite
        self._suite_contents = suite_contents
        self._stats = InstallabilityStats()
        logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
        self.logger = logging.getLogger(logger_name)

        # Cache of packages known to be broken - we deliberately do not
        # include "broken" in it.  See _optimize for more info.
        self._cache_broken: set["BinaryPackageId"] = set()
        # Cache of packages known to be installable
        self._cache_inst: set["BinaryPackageId"] = set()
        # Per "arch" cache of the "minimal" (possibly incomplete)
        # pseudo-essential set.  This includes all the packages that
        # are essential and packages that will always follow.
        #
        # It may not be a complete essential set, since alternatives
        # are not always resolved.  Noticeably cases like "awk" may be
        # left out (since it could be either gawk, mawk or
        # original-awk) unless something in this sets depends strictly
        # on one of them
        #
        # Each value is a (musts, never, choices) triple of frozensets.
        self._cache_ess: dict[
            str,
            tuple[
                frozenset["BinaryPackageId"],
                frozenset["BinaryPackageId"],
                frozenset[frozenset["BinaryPackageId"]],
            ],
        ] = {}

        essential_w_transitive_deps: MutableSet["BinaryPackageId"] = set(
            universe.essential_packages
        )
        add_transitive_dependencies_flatten(universe, essential_w_transitive_deps)
        self._cache_essential_transitive_dependencies = essential_w_transitive_deps

    def compute_installability(self) -> None:
        """Computes the installability of all the packages in the suite

        This method computes the installability of all packages in
        the suite and caches the result.  This has the advantage of
        making "is_installable" queries very fast for all packages
        in the suite.
        """

        universe = self._universe
        check_inst = self._check_inst
        cbroken = self._cache_broken
        cache_inst = self._cache_inst
        suite_contents = self._suite_contents
        # Iterate over a snapshot: _check_inst (and the code below) may
        # remove broken packages from suite_contents while we loop.
        tcopy = list(suite_contents)
        for t in filterfalse(cache_inst.__contains__, tcopy):
            if t in cbroken:
                continue
            res = check_inst(t)
            if t in universe.equivalent_packages:
                # Equivalent packages share the verdict of t; only those
                # currently in the suite are affected.
                eqv = (
                    x for x in universe.packages_equivalent_to(t) if x in suite_contents
                )
                if res:
                    cache_inst.update(eqv)
                else:
                    eqv_set = frozenset(eqv)
                    # In-place updates: both names alias the instance's sets.
                    suite_contents -= eqv_set
                    cbroken |= eqv_set

    @property
    def stats(self) -> "InstallabilityStats":
        """The counters collected by this tester."""
        return self._stats

    def any_of_these_are_in_the_suite(self, pkgs: Iterable["BinaryPackageId"]) -> bool:
        """Test if at least one package of a given set is in the suite

        :param pkgs: A set of package ids (as defined in the constructor)
        :return: True if any of the packages in pkgs are currently in the suite
        """
        return not self._suite_contents.isdisjoint(pkgs)

    def is_pkg_in_the_suite(self, pkg_id: "BinaryPackageId") -> bool:
        """Test if the package is in the suite

        :param pkg_id: A package id (as defined in the constructor)
        :return: True if the pkg is currently in the suite
        """
        return pkg_id in self._suite_contents

    def which_of_these_are_in_the_suite(
        self, pkgs: Iterable["BinaryPackageId"]
    ) -> Iterator["BinaryPackageId"]:
        """Iterate over all packages that are in the suite

        :param pkgs: An iterable of package ids
        :return: An iterable of package ids that are in the suite
        """
        yield from (x for x in pkgs if x in self._suite_contents)

    def add_binary(self, pkg_id: "BinaryPackageId") -> Literal[True]:
        """Add a binary package to the suite

        :param pkg_id: The id of the package
        :return: Always True
        :raises KeyError: if the package is not known
        """

        if pkg_id not in self._universe:  # pragma: no cover
            raise KeyError(str(pkg_id))

        if pkg_id in self._universe.broken_packages:
            # Packages the universe lists as broken are only recorded as
            # suite members; the caches are left untouched.
            self._suite_contents.add(pkg_id)
        elif pkg_id not in self._suite_contents:
            self._suite_contents.add(pkg_id)
            if self._cache_inst:
                self._stats.cache_drops += 1
            self._cache_inst = set()
            if self._cache_broken:
                # Re-add broken packages as some of them may now be installable
                self._suite_contents |= self._cache_broken
                self._cache_broken = set()
            if (
                pkg_id in self._cache_essential_transitive_dependencies
                and pkg_id.architecture in self._cache_ess
            ):
                # Adds new possibly pseudo-essential => "pseudo-essential" set needs to be
                # recomputed
                del self._cache_ess[pkg_id.architecture]

        return True

    def remove_binary(self, pkg_id: "BinaryPackageId") -> Literal[True]:
        """Remove a binary from the suite

        :param pkg_id: The id of the package
        :return: Always True
        :raises KeyError: if the package is not known
        """

        if pkg_id not in self._universe:  # pragma: no cover
            raise KeyError(str(pkg_id))

        self._cache_broken.discard(pkg_id)

        if pkg_id in self._suite_contents:
            self._suite_contents.remove(pkg_id)
            if pkg_id.architecture in self._cache_ess:
                (start, _, ess_choices) = self._cache_ess[pkg_id.architecture]
                if pkg_id in start or any(
                    pkg_id in choices for choices in ess_choices
                ):
                    # Removes a package from the "pseudo-essential set"
                    del self._cache_ess[pkg_id.architecture]

            if not self._universe.reverse_dependencies_of(pkg_id):
                # no reverse relations - safe
                return True
            if (
                pkg_id not in self._universe.broken_packages
                and pkg_id in self._cache_inst
            ):
                # It is in our cache (and not guaranteed to be broken) - throw out the cache
                self._cache_inst = set()
                self._stats.cache_drops += 1

        return True

    def is_installable(self, pkg_id: "BinaryPackageId") -> bool:
        """Test if a package is installable in this package set

        The package is assumed to be in the suite and only packages in
        the suite can be used to satisfy relations.

        :param pkg_id: The id of the package
        :return: True iff the package is installable; False otherwise
          (including when the package is not in the suite or is listed
          as broken by the universe)
        :raises KeyError: if the package is not known
        """

        self._stats.is_installable_calls += 1

        if pkg_id not in self._universe:  # pragma: no cover
            raise KeyError(str(pkg_id))

        if (
            pkg_id not in self._suite_contents
            or pkg_id in self._universe.broken_packages
        ):
            self._stats.cache_hits += 1
            return False

        if pkg_id in self._cache_inst:
            self._stats.cache_hits += 1
            return True

        self._stats.cache_misses += 1
        return self._check_inst(pkg_id)

    def _check_inst(
        self,
        t: "BinaryPackageId",
        musts: Optional[set["BinaryPackageId"]] = None,
        never: Optional[set["BinaryPackageId"]] = None,
        choices: Optional[set[frozenset["BinaryPackageId"]]] = None,
    ) -> bool:
        """Compute (without consulting the installable cache) whether t is installable.

        :param t: The package to test
        :param musts: Packages that must be installed together with t.  A
          fresh set is used when omitted; t is always added to it.
        :param never: Packages that can never be chosen (e.g. conflicts).
        :param choices: Dependency clauses with several candidates that have
          not been committed to yet.
        :return: True if t is installable, False otherwise

        All three sets are mutated.  On success every package in musts is
        added to the installable cache.  Packages found to be broken may be
        added to the broken cache and removed from the suite contents.
        """
        # See the explanation of musts, never and choices below.
        stats = self._stats
        universe = self._universe
        suite_contents = self._suite_contents
        cbroken = self._cache_broken

        # Our installability verdict - start with "yes" and change if
        # prove otherwise.
        verdict = True

        # set of packages that must be installed with this package
        if musts is None:
            musts = set()
        musts.add(t)
        # set of packages we can *never* choose (e.g. due to conflicts)
        if never is None:
            never = set()
        # set of relations were we have a choice, but where we have not
        # committed ourselves yet.  Hopefully some choices may be taken
        # for us (if one of the alternatives appear in "musts")
        if choices is None:
            choices = set()

        # The subset of musts we haven't checked yet.
        check = [t]

        if len(musts) == 1:
            # Include the essential packages in the suite as a starting point.
            if t.architecture not in self._cache_ess:
                # The minimal essential set cache is not present -
                # compute it now.
                (start, ess_never, ess_choices) = self._get_min_pseudo_ess_set(
                    t.architecture
                )
            else:
                (start, ess_never, ess_choices) = self._cache_ess[t.architecture]

            if t in ess_never:
                # t conflicts with something in the essential set or the essential
                # set conflicts with t - either way, t is f***ed
                cbroken.add(t)
                suite_contents.remove(t)
                stats.conflicts_essential += 1
                return False
            musts.update(start)
            never.update(ess_never)
            choices.update(ess_choices)

        # curry check_loop
        check_loop = partial(
            self._check_loop, universe, suite_contents, stats, musts, never, cbroken
        )

        # Useful things to remember:
        #
        # * musts and never are disjointed at all times
        #   - if not, t cannot be installable.  Either t, or one of
        #     its dependencies conflict with t or one of its (other)
        #     dependencies.
        #
        # * choices should generally be avoided as much as possible.
        #   - picking a bad choice requires backtracking
        #   - sometimes musts/never will eventually "solve" the choice.
        #
        # * check never includes choices (these are always in choices)
        #
        # * A package is installable if never and musts are disjointed
        #   and both check and choices are empty.
        #   - exception: resolve_choices may determine the installability
        #     of t via recursion (calls _check_inst).  In this case
        #     check and choices are not (always) empty.

        def _prune_choices(
            rebuild: set[frozenset["BinaryPackageId"]],
            len: Callable[[Sized], int] = len,
        ) -> bool:
            """Prune the pending choices and record the survivors in rebuild.

            Choices already satisfied by musts are dropped.  A choice
            reduced to a single remaining candidate is committed (the
            candidate is added to check and musts).  Choices that still
            have several candidates are added to "rebuild".

            Returns False if some choice has no remaining candidate,
            i.e. t is uninstallable.
            Returns True otherwise.

            NB: If this returns True, choices should be replaced by
            rebuild.
            """

            assert musts is not None
            assert choices is not None
            assert never is not None
            # We already satisfied/chosen at least one of the literals
            # in the choice, so the choice is gone
            for choice in filter(musts.isdisjoint, choices):
                # cbroken is needed here because (in theory) it could
                # have changed since the choice was discovered and it
                # is smaller than suite_contents (so presumably faster)
                remain = choice - never - cbroken

                if len(remain) == 1:
                    # the choice was reduced to one package we haven't checked - check that
                    check.extend(remain)
                    musts.update(remain)
                    stats.choice_presolved += 1
                    continue

                if not remain:
                    # all alternatives would violate the conflicts or are uninstallable
                    # => package is not installable
                    stats.choice_presolved += 1
                    return False

                # The choice is still deferred
                rebuild.add(frozenset(remain))

            return True

        # END _prune_choices

        while check:
            if not check_loop(choices, check):
                verdict = False
                break

            if choices:
                rebuild: set[frozenset["BinaryPackageId"]] = set()

                if not _prune_choices(rebuild):
                    verdict = False
                    break

                if not check and rebuild:
                    # We have to "guess" now, which is always fun, but not cheap.  We
                    # stop guessing:
                    # - once we run out of choices to make (obviously), OR
                    # - if one of the choices exhaust all but one option
                    if self.resolve_choices(check, musts, never, rebuild):
                        # The recursive call have already updated the
                        # cache so there is not point in doing it again.
                        return True
                choices = rebuild

        if verdict:
            # if t is installable, then so are all packages in musts
            self._cache_inst.update(musts)
            stats.solved_installable += 1
        else:
            stats.solved_uninstallable += 1

        return verdict

    def resolve_choices(
        self,
        check: list["BinaryPackageId"],
        musts: set["BinaryPackageId"],
        never: set["BinaryPackageId"],
        choices: set[frozenset["BinaryPackageId"]],
    ) -> bool:
        """Try to settle deferred choices by trying out their alternatives.

        :param check: Work list of the caller; the last alternative of an
          unsolved choice is appended to it.
        :param musts: Packages that must be installed; updated in place.
        :param never: Packages that may not be chosen; alternatives that
          failed a recursive check are added to it.
        :param choices: The deferred choices; consumed (popped) in place.
        :return: True only if a recursive _check_inst call proved that
          everything is installable.  False in every other case - the
          caller then has to continue with whatever was put into check.
        """
        universe = self._universe
        suite_contents = self._suite_contents
        stats = self._stats
        cbroken = self._cache_broken

        while choices:
            choice_options = choices.pop()

            choice = iter(choice_options)
            last = next(choice)  # pick one to go last
            solved = False
            for p in choice:
                musts_copy = musts.copy()
                never_tmp: set["BinaryPackageId"] = set()
                choices_tmp: set[frozenset["BinaryPackageId"]] = set()
                check_tmp = [p]
                # _check_loop assumes that "musts" is up to date
                musts_copy.add(p)
                if not self._check_loop(
                    universe,
                    suite_contents,
                    stats,
                    musts_copy,
                    never_tmp,
                    cbroken,
                    choices_tmp,
                    check_tmp,
                ):
                    # p cannot be chosen/is broken (unlikely, but ...)
                    continue

                # Test if we can pick p without any consequences.
                # - when we can, we avoid a backtrack point.
                if never_tmp <= never and choices_tmp <= choices:
                    # we can pick p without picking up new conflicts
                    # or unresolved choices.  Therefore we commit to
                    # using p.
                    musts.update(musts_copy)
                    stats.choice_resolved_without_restore_point += 1
                    solved = True
                    break

                if not musts.isdisjoint(never_tmp):
                    # If we pick p, we will definitely end up making
                    # t uninstallable, so p is a no-go.
                    continue

                stats.backtrace_restore_point_created += 1
                # We are not sure that p is safe, setup a backtrack
                # point and recurse.
                never_tmp |= never
                choices_tmp |= choices
                if self._check_inst(p, musts_copy, never_tmp, choices_tmp):
                    # Success, p was a valid choice and made it all
                    # installable
                    return True

                # If we get here, we failed to find something that
                # would satisfy choice (without breaking the
                # installability of t).  This means p cannot be used
                # to satisfy the dependencies, so pretend to conflict
                # with it - hopefully it will reduce future choices.
                never.add(p)
                stats.backtrace_restore_point_used += 1

            if not solved:
                # Optimization for the last case; avoid the recursive call
                # and just assume the last will lead to a solution.  If it
                # doesn't there is no solution and if it does, we don't
                # have to back-track anyway.
                check.append(last)
                musts.add(last)
                stats.backtrace_last_option += 1
                return False
        return False

    def _check_loop(
        self,
        universe: "BinaryPackageUniverse",
        suite_contents: set["BinaryPackageId"],
        stats: "InstallabilityStats",
        musts: set["BinaryPackageId"],
        never: set["BinaryPackageId"],
        cbroken: set["BinaryPackageId"],
        choices: set[frozenset["BinaryPackageId"]],
        check: list["BinaryPackageId"],
        len: Callable[[Sized], int] = len,
        frozenset: Callable[..., Any] = frozenset,
    ) -> bool:
        """Finds all guaranteed dependencies via "check".

        If it returns False, t is not installable.  If it returns True
        then "check" is exhausted.  If "choices" are empty and this
        returns True, then t is installable.

        musts, never, choices and check are updated in place.  A package
        whose dependency cannot be satisfied regardless of "never" is added
        to cbroken and removed from suite_contents.  The len and frozenset
        parameters default to the builtins of the same name.
        """
        # Local variables for faster access...
        not_satisfied: partial[filter["BinaryPackageId"]] = partial(
            filter, musts.isdisjoint
        )

        # While we have guaranteed dependencies (in check), examine all
        # of them.
        for cur in iter_except(check.pop, IndexError):
            relations = universe.relations_of(cur)

            if relations.negative_dependencies:
                # Conflicts?
                if cur in never:
                    # cur adds a (reverse) conflict, so check if cur
                    # is in never.
                    #
                    # - there is a window where two conflicting
                    #   packages can be in check.  Example "A" depends
                    #   on "B" and "C".  If "B" conflicts with "C",
                    #   then both "B" and "C" could end in "check".
                    return False
                # We must install cur for the package to be installable,
                # so "obviously" we can never choose any of its conflicts
                never.update(relations.negative_dependencies & suite_contents)

            # depgroup can be satisfied by picking something that is
            # already in musts - lets pick that (again).  :)
            for depgroup in cast(
                set[_frozenset["BinaryPackageId"]],
                not_satisfied(relations.dependencies),
            ):
                # Of all the packages listed in the relation remove those that
                # are either:
                #  - not in the suite
                #  - known to be broken (by cache)
                #  - in never
                candidates = (depgroup & suite_contents) - never

                if not candidates:
                    # We got no candidates to satisfy it - this
                    # package cannot be installed with the current
                    # (version of the) suite
                    if cur not in cbroken and depgroup.isdisjoint(never):
                        # cur's dependency cannot be satisfied even if never was empty.
                        # This means that cur itself is broken (as well).
                        cbroken.add(cur)
                        suite_contents.remove(cur)
                    return False
                if len(candidates) == 1:
                    # only one possible solution to this choice and we
                    # haven't seen it before
                    check.extend(candidates)
                    musts.update(candidates)
                else:
                    possible_eqv = set(
                        x for x in candidates if x in universe.equivalent_packages
                    )
                    if len(possible_eqv) > 1:
                        # Exploit equivalency to reduce the number of
                        # candidates if possible.  Basically, this
                        # code maps "similar" candidates into a single
                        # candidate that will give a identical result
                        # to any other candidate it eliminates.
                        #
                        # See InstallabilityTesterBuilder's
                        # _build_eqv_packages_table method for more
                        # information on how this works.
                        new_cand = set(x for x in candidates if x not in possible_eqv)
                        stats.eqv_table_times_used += 1

                        for chosen in iter_except(possible_eqv.pop, KeyError):
                            new_cand.add(chosen)
                            possible_eqv -= universe.packages_equivalent_to(chosen)
                        stats.eqv_table_total_number_of_alternatives_eliminated += len(
                            candidates
                        ) - len(new_cand)
                        if len(new_cand) == 1:
                            check.extend(new_cand)
                            musts.update(new_cand)
                            stats.eqv_table_reduced_to_one += 1
                            continue
                        elif len(candidates) == len(new_cand):
                            stats.eqv_table_reduced_by_zero += 1

                        candidates = frozenset(new_cand)
                    else:
                        # Candidates have to be a frozenset to be added to choices
                        candidates = frozenset(candidates)
                    # defer this choice till later
                    choices.add(candidates)
        return True

    def _get_min_pseudo_ess_set(self, arch: str) -> tuple[
        frozenset["BinaryPackageId"],
        frozenset["BinaryPackageId"],
        frozenset[frozenset["BinaryPackageId"]],
    ]:
        """Return the cached (musts, never, choices) triple for arch.

        The triple is computed from the essential packages of that
        architecture that are in the suite and stored in the per-arch
        cache if it is not present yet.
        """
        if arch not in self._cache_ess:
            # The minimal essential set cache is not present -
            # compute it now.
            suite_contents = self._suite_contents
            cbroken = self._cache_broken
            universe = self._universe
            stats = self._stats

            ess_base = [
                x
                for x in self._universe.essential_packages
                if x.architecture == arch and x in suite_contents
            ]
            start = set(ess_base)
            ess_never: set["BinaryPackageId"] = set()
            ess_choices: set[frozenset["BinaryPackageId"]] = set()
            not_satisfied: partial[filter["BinaryPackageId"]] = partial(
                filter, start.isdisjoint
            )

            while ess_base:
                # The verdict of _check_loop is deliberately not used here;
                # only its updates to start/ess_never/ess_choices matter.
                self._check_loop(
                    universe,
                    suite_contents,
                    stats,
                    start,
                    ess_never,
                    cbroken,
                    ess_choices,
                    ess_base,
                )
                if ess_choices:
                    # Try to break choices where possible
                    nchoice = set()
                    for choice in cast(
                        set[frozenset["BinaryPackageId"]], not_satisfied(ess_choices)
                    ):
                        b = False
                        for c in choice:
                            relations = universe.relations_of(c)
                            if (
                                relations.negative_dependencies <= ess_never
                                and not any(not_satisfied(relations.dependencies))
                            ):
                                # c brings no new conflicts and no unsatisfied
                                # dependencies - queue it for checking.
                                ess_base.append(c)
                                b = True
                                break
                        if not b:
                            nchoice.add(choice)
                    ess_choices = nchoice
                else:
                    break

            for x in start:
                ess_never.update(universe.negative_dependencies_of(x))
            self._cache_ess[arch] = (
                frozenset(start),
                frozenset(ess_never),
                frozenset(ess_choices),
            )

        return self._cache_ess[arch]

    def compute_stats(self) -> dict[str, "ArchStats"]:
        """Collect per-architecture statistics about the package graph.

        :return: A mapping from architecture to an ArchStats on which
          compute_all() has already been called.
        """
        universe = self._universe
        graph_stats: dict[str, ArchStats] = defaultdict(ArchStats)
        # Per architecture: members of equivalence classes already counted.
        seen_eqv: dict[str, set["BinaryPackageId"]] = defaultdict(set)

        for pkg in universe:
            (_, _, pkg_arch) = pkg
            relations = universe.relations_of(pkg)
            arch_stats = graph_stats[pkg_arch]

            arch_stats.nodes += 1

            if pkg in universe.equivalent_packages and pkg not in seen_eqv[pkg_arch]:
                eqv = [
                    e
                    for e in universe.packages_equivalent_to(pkg)
                    if e.architecture == pkg_arch
                ]
                arch_stats.eqv_nodes += len(eqv)
                # Remember the members so that each equivalence class is
                # counted once rather than once per member.
                seen_eqv[pkg_arch].update(eqv)

            arch_stats.add_dep_edges(relations.dependencies)
            arch_stats.add_con_edges(relations.negative_dependencies)

        for stat in graph_stats.values():
            stat.compute_all()

        return graph_stats
class InstallabilityStats(object):
    """Plain counters describing the work done by an installability tester."""

    def __init__(self) -> None:
        """Start with every counter at zero."""
        # Requests
        self.is_installable_calls = 0
        # Cache
        self.cache_hits = 0
        self.cache_misses = 0
        self.cache_drops = 0
        # Choices
        self.choice_presolved = 0
        self.choice_resolved_without_restore_point = 0
        # Backtracking
        self.backtrace_restore_point_created = 0
        self.backtrace_restore_point_used = 0
        self.backtrace_last_option = 0
        # Verdicts
        self.solved_installable = 0
        self.solved_uninstallable = 0
        self.conflicts_essential = 0
        # Equivalence table
        self.eqv_table_times_used = 0
        self.eqv_table_reduced_to_one = 0
        self.eqv_table_reduced_by_zero = 0
        self.eqv_table_total_number_of_alternatives_eliminated = 0

    def stats(self) -> list[str]:
        """Render the counters as a list of human-readable lines."""
        templates = (
            "Requests - is_installable: {is_installable_calls}",
            "Cache - hits: {cache_hits}, misses: {cache_misses}, drops: {cache_drops}",
            "Choices - pre-solved: {choice_presolved}, No RP: {choice_resolved_without_restore_point}",
            "Backtrace - RP created: {backtrace_restore_point_created}, RP used: {backtrace_restore_point_used}, reached last option: {backtrace_last_option}",  # nopep8
            "Solved - installable: {solved_installable}, uninstallable: {solved_uninstallable}, conflicts essential: {conflicts_essential}",  # nopep8
            "Eqv - times used: {eqv_table_times_used}, perfect reductions: {eqv_table_reduced_to_one}, failed reductions: {eqv_table_reduced_by_zero}, total no. of alternatives pruned: {eqv_table_total_number_of_alternatives_eliminated}",  # nopep8
        )
        # Each template pulls its values straight from the instance attributes.
        counters = vars(self)
        lines = []
        for template in templates:
            lines.append(template.format_map(counters))
        return lines
class ArchStats(object):
    """Accumulates graph statistics (nodes and edges) for one architecture.

    Callers bump ``nodes``/``eqv_nodes`` directly, register edges with
    add_dep_edges()/add_con_edges() and finally call compute_all() to fill
    the ``stats`` table that stat() and stat_summary() read from.
    """

    def __init__(self) -> None:
        """Start with empty counters, edge lists and stats table."""
        self.nodes = 0
        self.eqv_nodes = 0
        # One entry per add_dep_edges() call: a set of dependency clauses,
        # each clause being a set of alternatives.
        self.dep_edges: list[frozenset[frozenset["BinaryPackageId"]]] = []
        # One entry per add_con_edges() call.
        self.con_edges: list[frozenset["BinaryPackageId"]] = []
        # stat name -> {"max", "min", "sum", ...}; missing values read as 0.
        self.stats: dict[str, dict[str, Union[int, float]]] = defaultdict(
            lambda: defaultdict(int)
        )

    def stat(self, statname: str) -> dict[str, Union[int, float]]:
        """Return the value table of the named statistic (created on demand)."""
        return self.stats[statname]

    def stat_summary(self) -> list[str]:
        """Render the node counts and the three edge statistics as text lines."""
        text = []
        values: Union[list[Any], tuple[Any, ...]]
        for statname in [
            "nodes",
            "dependency-clauses",
            "dependency-clause-alternatives",
            "negative-dependency-clauses",
        ]:
            stat = self.stats[statname]
            if statname != "nodes":
                format_str = "%s, max: %d, min: %d, median: %d, average: %f (%d/%d)"
                values = [
                    statname,
                    stat["max"],
                    stat["min"],
                    stat["median"],
                    stat["average"],
                    stat["sum"],
                    stat["size"],
                ]
                # Only present when _list_stats was asked for it (and could
                # compute it).
                if "average-per-node" in stat:
                    format_str += ", average-per-node: %f"
                    values.append(stat["average-per-node"])
            else:
                format_str = "nodes: %d, eqv-nodes: %d"
                values = (self.nodes, self.eqv_nodes)
            text.append(format_str % tuple(values))
        return text

    def add_dep_edges(self, edges: frozenset[frozenset["BinaryPackageId"]]) -> None:
        """Register the dependency clauses of one package."""
        self.dep_edges.append(edges)

    def add_con_edges(self, edges: frozenset["BinaryPackageId"]) -> None:
        """Register the negative dependencies of one package."""
        self.con_edges.append(edges)

    def _list_stats(
        self, stat_name: str, sorted_list: list[int], average_per_node: bool = False
    ) -> None:
        """Store max/min/sum/size/average/median of an ascending list.

        Nothing is stored for an empty list.  "median" is the element at
        index len // 2.  "average-per-node" is only stored when requested
        and when at least one node has been counted, which avoids a
        division by zero.
        """
        if not sorted_list:
            return
        size = len(sorted_list)
        total = sum(sorted_list)
        stats = self.stats[stat_name]
        stats["max"] = sorted_list[-1]
        stats["min"] = sorted_list[0]
        stats["sum"] = total
        stats["size"] = size
        stats["average"] = float(total) / size
        stats["median"] = sorted_list[size // 2]
        if average_per_node and self.nodes:
            stats["average-per-node"] = float(total) / self.nodes

    def compute_all(self) -> None:
        """Compute the three edge statistics from the registered edges."""
        dep_edges = self.dep_edges
        con_edges = self.con_edges
        # Number of dependency clauses per package.
        sorted_no_dep_edges = sorted(len(x) for x in dep_edges)
        # Number of alternatives per dependency clause.
        sorted_size_dep_edges = sorted(len(x) for x in chain.from_iterable(dep_edges))
        # Number of negative dependencies per package.
        sorted_no_con_edges = sorted(len(x) for x in con_edges)
        self._list_stats("dependency-clauses", sorted_no_dep_edges)
        self._list_stats(
            "dependency-clause-alternatives",
            sorted_size_dep_edges,
            average_per_node=True,
        )
        self._list_stats("negative-dependency-clauses", sorted_no_con_edges)