Coverage for britney2/installability/tester.py: 98%
341 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-29 17:21 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-01-29 17:21 +0000
1# Copyright (C) 2012 Niels Thykier <niels@thykier.net>
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation; either version 2 of the License, or
6# (at your option) any later version.
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU General Public License for more details.
13import logging
14from builtins import frozenset as _frozenset
15from collections import defaultdict
16from collections.abc import Callable, Iterable, Iterator, MutableSet, Sized
17from functools import partial
18from itertools import chain, filterfalse
19from typing import (
20 TYPE_CHECKING,
21 Any,
22 Literal,
23 Optional,
24 cast,
25)
27from britney2.utils import add_transitive_dependencies_flatten, iter_except
29if TYPE_CHECKING: 29 ↛ 30line 29 didn't jump to line 30 because the condition on line 29 was never true
30 from .. import BinaryPackageId
31 from .universe import BinaryPackageUniverse
34class InstallabilityTester:
35 def __init__(
36 self, universe: "BinaryPackageUniverse", suite_contents: set["BinaryPackageId"]
37 ) -> None:
38 """Create a new installability tester
40 suite_contents is a (mutable) set of package ids that determines
41 which of the packages in universe are currently in the suite.
43 Package id: (pkg_name, pkg_version, pkg_arch)
44 - NB: arch:all packages are "re-mapped" to given architecture.
45 (simplifies caches and dependency checking)
46 """
48 self._universe = universe
49 # FIXME: Move this field to TargetSuite
50 self._suite_contents = suite_contents
51 self._stats = InstallabilityStats()
52 logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
53 self.logger = logging.getLogger(logger_name)
55 # Cache of packages known to be broken - we deliberately do not
56 # include "broken" in it.
57 self._cache_broken: set["BinaryPackageId"] = set()
58 # Cache of packages known to be installable
59 self._cache_inst: set["BinaryPackageId"] = set()
60 # Per "arch" cache of the "minimal" (possibly incomplete)
61 # pseudo-essential set. This includes all the packages that
62 # are essential and packages that will always follow.
63 #
64 # It may not be a complete essential set, since alternatives
65 # are not always resolved. Noticeably cases like "awk" may be
66 # left out (since it could be either gawk, mawk or
67 # original-awk) unless something in this sets depends strictly
68 # on one of them
69 self._cache_ess: dict[
70 str,
71 tuple[
72 frozenset["BinaryPackageId"],
73 frozenset["BinaryPackageId"],
74 frozenset[frozenset["BinaryPackageId"]],
75 ],
76 ] = {}
78 essential_w_transitive_deps: MutableSet["BinaryPackageId"] = set(
79 universe.essential_packages
80 )
81 add_transitive_dependencies_flatten(universe, essential_w_transitive_deps)
82 self._cache_essential_transitive_dependencies = essential_w_transitive_deps
84 def compute_installability(self) -> None:
85 """Computes the installability of all the packages in the suite
87 This method computes the installability of all packages in
88 the suite and caches the result. This has the advantage of
89 making "is_installable" queries very fast for all packages
90 in the suite.
91 """
93 universe = self._universe
94 check_inst = self._check_inst
95 cbroken = self._cache_broken
96 cache_inst = self._cache_inst
97 suite_contents = self._suite_contents
98 tcopy = [x for x in suite_contents]
99 for t in filterfalse(cache_inst.__contains__, tcopy):
100 if t in cbroken: 100 ↛ 101line 100 didn't jump to line 101 because the condition on line 100 was never true
101 continue
102 res = check_inst(t)
103 if t in universe.equivalent_packages:
104 eqv = (
105 x for x in universe.packages_equivalent_to(t) if x in suite_contents
106 )
107 if res:
108 cache_inst.update(eqv)
109 else:
110 eqv_set = frozenset(eqv)
111 suite_contents -= eqv_set
112 cbroken |= eqv_set
114 @property
115 def stats(self) -> "InstallabilityStats":
116 return self._stats
118 def any_of_these_are_in_the_suite(self, pkgs: Iterable["BinaryPackageId"]) -> bool:
119 """Test if at least one package of a given set is in the suite
121 :param pkgs: A set of package ids (as defined in the constructor)
122 :return: True if any of the packages in pkgs are currently in the suite
123 """
124 return not self._suite_contents.isdisjoint(pkgs)
126 def is_pkg_in_the_suite(self, pkg_id: "BinaryPackageId") -> bool:
127 """Test if the package of is in the suite
129 :param pkg_id: A package id (as defined in the constructor)
130 :return: True if the pkg is currently in the suite
131 """
132 return pkg_id in self._suite_contents
134 def which_of_these_are_in_the_suite(
135 self, pkgs: Iterable["BinaryPackageId"]
136 ) -> Iterator["BinaryPackageId"]:
137 """Iterate over all packages that are in the suite
139 :param pkgs: An iterable of package ids
140 :return: An iterable of package ids that are in the suite
141 """
142 yield from (x for x in pkgs if x in self._suite_contents)
144 def add_binary(self, pkg_id: "BinaryPackageId") -> Literal[True]:
145 """Add a binary package to the suite
147 If the package is not known, this method will throw an
148 KeyError.
149 """
151 if pkg_id not in self._universe: # pragma: no cover
152 raise KeyError(str(pkg_id))
154 if pkg_id in self._universe.broken_packages:
155 self._suite_contents.add(pkg_id)
156 elif pkg_id not in self._suite_contents: 156 ↛ 173line 156 didn't jump to line 173 because the condition on line 156 was always true
157 self._suite_contents.add(pkg_id)
158 if self._cache_inst:
159 self._stats.cache_drops += 1
160 self._cache_inst = set()
161 if self._cache_broken:
162 # Re-add broken packages as some of them may now be installable
163 self._suite_contents |= self._cache_broken
164 self._cache_broken = set()
165 if (
166 pkg_id in self._cache_essential_transitive_dependencies
167 and pkg_id.architecture in self._cache_ess
168 ):
169 # Adds new possibly pseudo-essential => "pseudo-essential" set needs to be
170 # recomputed
171 del self._cache_ess[pkg_id.architecture]
173 return True
175 def remove_binary(self, pkg_id: "BinaryPackageId") -> Literal[True]:
176 """Remove a binary from the suite
178 :param pkg_id: The id of the package
179 :raises KeyError: if the package is not known
180 """
182 if pkg_id not in self._universe: # pragma: no cover
183 raise KeyError(str(pkg_id))
185 self._cache_broken.discard(pkg_id)
187 if pkg_id in self._suite_contents:
188 self._suite_contents.remove(pkg_id)
189 if pkg_id.architecture in self._cache_ess:
190 (start, ess_never, ess_choices) = self._cache_ess[pkg_id.architecture]
191 if pkg_id in start or any(
192 [pkg_id in choices for choices in ess_choices]
193 ):
194 # Removes a package from the "pseudo-essential set"
195 del self._cache_ess[pkg_id.architecture]
197 if not self._universe.reverse_dependencies_of(pkg_id):
198 # no reverse relations - safe
199 return True
200 if (
201 pkg_id not in self._universe.broken_packages
202 and pkg_id in self._cache_inst
203 ):
204 # It is in our cache (and not guaranteed to be broken) - throw out the cache
205 self._cache_inst = set()
206 self._stats.cache_drops += 1
208 return True
210 def is_installable(self, pkg_id: "BinaryPackageId") -> bool:
211 """Test if a package is installable in this package set
213 The package is assumed to be in the suite and only packages in
214 the suite can be used to satisfy relations.
216 :param pkg_id: The id of the package
217 Returns True iff the package is installable.
218 Returns False otherwise.
219 """
221 self._stats.is_installable_calls += 1
223 if pkg_id not in self._universe: # pragma: no cover
224 raise KeyError(str(pkg_id))
226 if (
227 pkg_id not in self._suite_contents
228 or pkg_id in self._universe.broken_packages
229 ):
230 self._stats.cache_hits += 1
231 return False
233 if pkg_id in self._cache_inst:
234 self._stats.cache_hits += 1
235 return True
237 self._stats.cache_misses += 1
238 return self._check_inst(pkg_id)
240 def _check_inst(
241 self,
242 t: "BinaryPackageId",
243 musts: set["BinaryPackageId"] | None = None,
244 never: set["BinaryPackageId"] | None = None,
245 choices: set[frozenset["BinaryPackageId"]] | None = None,
246 ) -> bool:
247 # See the explanation of musts, never and choices below.
248 stats = self._stats
249 universe = self._universe
250 suite_contents = self._suite_contents
251 cbroken = self._cache_broken
253 # Our installability verdict - start with "yes" and change if
254 # prove otherwise.
255 verdict = True
257 # set of packages that must be installed with this package
258 if musts is None:
259 musts = set()
260 musts.add(t)
261 # set of packages we can *never* choose (e.g. due to conflicts)
262 if never is None:
263 never = set()
264 # set of relations were we have a choice, but where we have not
265 # committed ourselves yet. Hopefully some choices may be taken
266 # for us (if one of the alternatives appear in "musts")
267 if choices is None:
268 choices = set()
270 # The subset of musts we haven't checked yet.
271 check = [t]
273 if len(musts) == 1:
274 # Include the essential packages in the suite as a starting point.
275 if t.architecture not in self._cache_ess:
276 # The minimal essential set cache is not present -
277 # compute it now.
278 (start, ess_never, ess_choices) = self._get_min_pseudo_ess_set(
279 t.architecture
280 )
281 else:
282 (start, ess_never, ess_choices) = self._cache_ess[t.architecture]
284 if t in ess_never:
285 # t conflicts with something in the essential set or the essential
286 # set conflicts with t - either way, t is f***ed
287 cbroken.add(t)
288 suite_contents.remove(t)
289 stats.conflicts_essential += 1
290 return False
291 musts.update(start)
292 never.update(ess_never)
293 choices.update(ess_choices)
295 # curry check_loop
296 check_loop = partial(
297 self._check_loop, universe, suite_contents, stats, musts, never, cbroken
298 )
300 # Useful things to remember:
301 #
302 # * musts and never are disjointed at all times
303 # - if not, t cannot be installable. Either t, or one of
304 # its dependencies conflict with t or one of its (other)
305 # dependencies.
306 #
307 # * choices should generally be avoided as much as possible.
308 # - picking a bad choice requires backtracking
309 # - sometimes musts/never will eventually "solve" the choice.
310 #
311 # * check never includes choices (these are always in choices)
312 #
313 # * A package is installable if never and musts are disjointed
314 # and both check and choices are empty.
315 # - exception: resolve_choices may determine the installability
316 # of t via recursion (calls _check_inst). In this case
317 # check and choices are not (always) empty.
319 def _prune_choices(
320 rebuild: set[frozenset["BinaryPackageId"]],
321 len: Callable[[Sized], int] = len,
322 ) -> bool:
323 """Picks a choice from choices and updates rebuild.
325 Prunes the choices and updates "rebuild" to reflect the
326 pruned choices.
328 Returns True if t is installable (determined via recursion).
329 Returns False if a choice was picked and added to check.
330 Returns None if t is uninstallable (no choice can be picked).
332 NB: If this returns False, choices should be replaced by
333 rebuild.
334 """
336 assert musts is not None
337 assert choices is not None
338 assert never is not None
339 # We already satisfied/chosen at least one of the literals
340 # in the choice, so the choice is gone
341 for choice in filter(musts.isdisjoint, choices):
342 # cbroken is needed here because (in theory) it could
343 # have changed since the choice was discovered and it
344 # is smaller than suite_contents (so presumably faster)
345 remain = choice - never - cbroken
347 if len(remain) == 1:
348 # the choice was reduced to one package we haven't checked - check that
349 check.extend(remain)
350 musts.update(remain)
351 stats.choice_presolved += 1
352 continue
354 if not remain:
355 # all alternatives would violate the conflicts or are uninstallable
356 # => package is not installable
357 stats.choice_presolved += 1
358 return False
360 # The choice is still deferred
361 rebuild.add(frozenset(remain))
363 return True
365 # END _prune_choices
367 while check:
368 if not check_loop(choices, check):
369 verdict = False
370 break
372 if choices:
373 rebuild: set[frozenset["BinaryPackageId"]] = set()
375 if not _prune_choices(rebuild):
376 verdict = False
377 break
379 if not check and rebuild:
380 # We have to "guess" now, which is always fun, but not cheap. We
381 # stop guessing:
382 # - once we run out of choices to make (obviously), OR
383 # - if one of the choices exhaust all but one option
384 if self.resolve_choices(check, musts, never, rebuild):
385 # The recursive call have already updated the
386 # cache so there is not point in doing it again.
387 return True
388 choices = rebuild
390 if verdict:
391 # if t is installable, then so are all packages in musts
392 self._cache_inst.update(musts)
393 stats.solved_installable += 1
394 else:
395 stats.solved_uninstallable += 1
397 return verdict
399 def resolve_choices(
400 self,
401 check: list["BinaryPackageId"],
402 musts: set["BinaryPackageId"],
403 never: set["BinaryPackageId"],
404 choices: set[frozenset["BinaryPackageId"]],
405 ) -> bool:
406 universe = self._universe
407 suite_contents = self._suite_contents
408 stats = self._stats
409 cbroken = self._cache_broken
411 while choices:
412 choice_options = choices.pop()
414 choice = iter(choice_options)
415 last = next(choice) # pick one to go last
416 solved = False
417 for p in choice:
418 musts_copy = musts.copy()
419 never_tmp: set["BinaryPackageId"] = set()
420 choices_tmp: set[frozenset["BinaryPackageId"]] = set()
421 check_tmp = [p]
422 # _check_loop assumes that "musts" is up to date
423 musts_copy.add(p)
424 if not self._check_loop(
425 universe,
426 suite_contents,
427 stats,
428 musts_copy,
429 never_tmp,
430 cbroken,
431 choices_tmp,
432 check_tmp,
433 ):
434 # p cannot be chosen/is broken (unlikely, but ...)
435 continue
437 # Test if we can pick p without any consequences.
438 # - when we can, we avoid a backtrack point.
439 if never_tmp <= never and choices_tmp <= choices:
440 # we can pick p without picking up new conflicts
441 # or unresolved choices. Therefore we commit to
442 # using p.
443 musts.update(musts_copy)
444 stats.choice_resolved_without_restore_point += 1
445 solved = True
446 break
448 if not musts.isdisjoint(never_tmp):
449 # If we pick p, we will definitely end up making
450 # t uninstallable, so p is a no-go.
451 continue
453 stats.backtrace_restore_point_created += 1
454 # We are not sure that p is safe, setup a backtrack
455 # point and recurse.
456 never_tmp |= never
457 choices_tmp |= choices
458 if self._check_inst(p, musts_copy, never_tmp, choices_tmp):
459 # Success, p was a valid choice and made it all
460 # installable
461 return True
463 # If we get here, we failed to find something that
464 # would satisfy choice (without breaking the
465 # installability of t). This means p cannot be used
466 # to satisfy the dependencies, so pretend to conflict
467 # with it - hopefully it will reduce future choices.
468 never.add(p)
469 stats.backtrace_restore_point_used += 1
471 if not solved:
472 # Optimization for the last case; avoid the recursive call
473 # and just assume the last will lead to a solution. If it
474 # doesn't there is no solution and if it does, we don't
475 # have to back-track anyway.
476 check.append(last)
477 musts.add(last)
478 stats.backtrace_last_option += 1
479 return False
480 return False
482 def _check_loop(
483 self,
484 universe: "BinaryPackageUniverse",
485 suite_contents: set["BinaryPackageId"],
486 stats: "InstallabilityStats",
487 musts: set["BinaryPackageId"],
488 never: set["BinaryPackageId"],
489 cbroken: set["BinaryPackageId"],
490 choices: set[frozenset["BinaryPackageId"]],
491 check: list["BinaryPackageId"],
492 len: Callable[[Sized], int] = len,
493 frozenset: Callable[..., Any] = frozenset,
494 ) -> bool:
495 """Finds all guaranteed dependencies via "check".
497 If it returns False, t is not installable. If it returns True
498 then "check" is exhausted. If "choices" are empty and this
499 returns True, then t is installable.
500 """
501 # Local variables for faster access...
502 not_satisfied: partial[filter["BinaryPackageId"]] = partial(
503 filter, musts.isdisjoint
504 )
506 # While we have guaranteed dependencies (in check), examine all
507 # of them.
508 for cur in iter_except(check.pop, IndexError):
509 relations = universe.relations_of(cur)
511 if relations.negative_dependencies:
512 # Conflicts?
513 if cur in never:
514 # cur adds a (reverse) conflict, so check if cur
515 # is in never.
516 #
517 # - there is a window where two conflicting
518 # packages can be in check. Example "A" depends
519 # on "B" and "C". If "B" conflicts with "C",
520 # then both "B" and "C" could end in "check".
521 return False
522 # We must install cur for the package to be installable,
523 # so "obviously" we can never choose any of its conflicts
524 never.update(relations.negative_dependencies & suite_contents)
526 # depgroup can be satisfied by picking something that is
527 # already in musts - lets pick that (again). :)
528 for depgroup in cast(
529 set[_frozenset["BinaryPackageId"]],
530 not_satisfied(relations.dependencies),
531 ):
532 # Of all the packages listed in the relation remove those that
533 # are either:
534 # - not in the suite
535 # - known to be broken (by cache)
536 # - in never
537 candidates = (depgroup & suite_contents) - never
539 if not candidates:
540 # We got no candidates to satisfy it - this
541 # package cannot be installed with the current
542 # (version of the) suite
543 if cur not in cbroken and depgroup.isdisjoint(never):
544 # cur's dependency cannot be satisfied even if never was empty.
545 # This means that cur itself is broken (as well).
546 cbroken.add(cur)
547 suite_contents.remove(cur)
548 return False
549 if len(candidates) == 1:
550 # only one possible solution to this choice and we
551 # haven't seen it before
552 check.extend(candidates)
553 musts.update(candidates)
554 else:
555 possible_eqv = {
556 x for x in candidates if x in universe.equivalent_packages
557 }
558 if len(possible_eqv) > 1:
559 # Exploit equivalency to reduce the number of
560 # candidates if possible. Basically, this
561 # code maps "similar" candidates into a single
562 # candidate that will give a identical result
563 # to any other candidate it eliminates.
564 #
565 # See InstallabilityTesterBuilder's
566 # _build_eqv_packages_table method for more
567 # information on how this works.
568 new_cand = {x for x in candidates if x not in possible_eqv}
569 stats.eqv_table_times_used += 1
571 for chosen in iter_except(possible_eqv.pop, KeyError):
572 new_cand.add(chosen)
573 possible_eqv -= universe.packages_equivalent_to(chosen)
574 stats.eqv_table_total_number_of_alternatives_eliminated += len(
575 candidates
576 ) - len(new_cand)
577 if len(new_cand) == 1:
578 check.extend(new_cand)
579 musts.update(new_cand)
580 stats.eqv_table_reduced_to_one += 1
581 continue
582 elif len(candidates) == len(new_cand):
583 stats.eqv_table_reduced_by_zero += 1
585 candidates = frozenset(new_cand)
586 else:
587 # Candidates have to be a frozenset to be added to choices
588 candidates = frozenset(candidates)
589 # defer this choice till later
590 choices.add(candidates)
591 return True
593 def _get_min_pseudo_ess_set(self, arch: str) -> tuple[
594 frozenset["BinaryPackageId"],
595 frozenset["BinaryPackageId"],
596 frozenset[frozenset["BinaryPackageId"]],
597 ]:
598 if arch not in self._cache_ess: 598 ↛ 659line 598 didn't jump to line 659 because the condition on line 598 was always true
599 # The minimal essential set cache is not present -
600 # compute it now.
601 suite_contents = self._suite_contents
602 cbroken = self._cache_broken
603 universe = self._universe
604 stats = self._stats
606 ess_base = [
607 x
608 for x in self._universe.essential_packages
609 if x.architecture == arch and x in suite_contents
610 ]
611 start = set(ess_base)
612 ess_never: set["BinaryPackageId"] = set()
613 ess_choices: set[frozenset["BinaryPackageId"]] = set()
614 not_satisfied: partial[filter["BinaryPackageId"]] = partial(
615 filter, start.isdisjoint
616 )
618 while ess_base:
619 self._check_loop(
620 universe,
621 suite_contents,
622 stats,
623 start,
624 ess_never,
625 cbroken,
626 ess_choices,
627 ess_base,
628 )
629 if ess_choices:
630 # Try to break choices where possible
631 nchoice = set()
632 for choice in cast(
633 set[frozenset["BinaryPackageId"]], not_satisfied(ess_choices)
634 ):
635 b = False
636 for c in choice:
637 relations = universe.relations_of(c)
638 if (
639 relations.negative_dependencies <= ess_never
640 and not any(not_satisfied(relations.dependencies))
641 ):
642 ess_base.append(c)
643 b = True
644 break
645 if not b:
646 nchoice.add(choice)
647 ess_choices = nchoice
648 else:
649 break
651 for x in start:
652 ess_never.update(universe.negative_dependencies_of(x))
653 self._cache_ess[arch] = (
654 frozenset(start),
655 frozenset(ess_never),
656 frozenset(ess_choices),
657 )
659 return self._cache_ess[arch]
661 def compute_stats(self) -> dict[str, "ArchStats"]:
662 universe = self._universe
663 graph_stats: dict[str, ArchStats] = defaultdict(ArchStats)
664 seen_eqv: dict[str, set["BinaryPackageId"]] = defaultdict(set)
666 for pkg in universe:
667 (pkg_name, pkg_version, pkg_arch) = pkg
668 relations = universe.relations_of(pkg)
669 arch_stats = graph_stats[pkg_arch]
671 arch_stats.nodes += 1
673 if pkg in universe.equivalent_packages and pkg not in seen_eqv[pkg_arch]:
674 eqv = [
675 e
676 for e in universe.packages_equivalent_to(pkg)
677 if e.architecture == pkg_arch
678 ]
679 arch_stats.eqv_nodes += len(eqv)
681 arch_stats.add_dep_edges(relations.dependencies)
682 arch_stats.add_con_edges(relations.negative_dependencies)
684 for stat in graph_stats.values():
685 stat.compute_all()
687 return graph_stats
690class InstallabilityStats:
691 def __init__(self) -> None:
692 self.cache_hits = 0
693 self.cache_misses = 0
694 self.cache_drops = 0
695 self.backtrace_restore_point_created = 0
696 self.backtrace_restore_point_used = 0
697 self.backtrace_last_option = 0
698 self.choice_presolved = 0
699 self.choice_resolved_without_restore_point = 0
700 self.is_installable_calls = 0
701 self.solved_installable = 0
702 self.solved_uninstallable = 0
703 self.conflicts_essential = 0
704 self.eqv_table_times_used = 0
705 self.eqv_table_reduced_to_one = 0
706 self.eqv_table_reduced_by_zero = 0
707 self.eqv_table_total_number_of_alternatives_eliminated = 0
709 def stats(self) -> list[str]:
710 formats = [
711 "Requests - is_installable: {is_installable_calls}",
712 "Cache - hits: {cache_hits}, misses: {cache_misses}, drops: {cache_drops}",
713 "Choices - pre-solved: {choice_presolved}, No RP: {choice_resolved_without_restore_point}",
714 "Backtrace - RP created: {backtrace_restore_point_created}, RP used: {backtrace_restore_point_used}, reached last option: {backtrace_last_option}", # nopep8
715 "Solved - installable: {solved_installable}, uninstallable: {solved_uninstallable}, conflicts essential: {conflicts_essential}", # nopep8
716 "Eqv - times used: {eqv_table_times_used}, perfect reductions: {eqv_table_reduced_to_one}, failed reductions: {eqv_table_reduced_by_zero}, total no. of alternatives pruned: {eqv_table_total_number_of_alternatives_eliminated}", # nopep8
717 ]
718 return [x.format(**self.__dict__) for x in formats]
721class ArchStats:
722 def __init__(self) -> None:
723 self.nodes = 0
724 self.eqv_nodes = 0
725 self.dep_edges: list[frozenset[frozenset["BinaryPackageId"]]] = []
726 self.con_edges: list[frozenset["BinaryPackageId"]] = []
727 self.stats: dict[str, dict[str, int | float]] = defaultdict(
728 lambda: defaultdict(int)
729 )
731 def stat(self, statname: str) -> dict[str, int | float]:
732 return self.stats[statname]
734 def stat_summary(self) -> list[str]:
735 text = []
736 values: list[Any] | tuple[Any, ...]
737 for statname in [
738 "nodes",
739 "dependency-clauses",
740 "dependency-clause-alternatives",
741 "negative-dependency-clauses",
742 ]:
743 stat = self.stats[statname]
744 if statname != "nodes":
745 format_str = "%s, max: %d, min: %d, median: %d, average: %f (%d/%d)"
746 values = [
747 statname,
748 stat["max"],
749 stat["min"],
750 stat["median"],
751 stat["average"],
752 stat["sum"],
753 stat["size"],
754 ]
755 if "average-per-node" in stat:
756 format_str += ", average-per-node: %f"
757 values.append(stat["average-per-node"])
758 else:
759 format_str = "nodes: %d, eqv-nodes: %d"
760 values = (self.nodes, self.eqv_nodes)
761 text.append(format_str % tuple(values))
762 return text
764 def add_dep_edges(self, edges: frozenset[frozenset["BinaryPackageId"]]) -> None:
765 self.dep_edges.append(edges)
767 def add_con_edges(self, edges: frozenset["BinaryPackageId"]) -> None:
768 self.con_edges.append(edges)
770 def _list_stats(
771 self, stat_name: str, sorted_list: list[int], average_per_node: bool = False
772 ) -> None:
773 if sorted_list:
774 stats = self.stats[stat_name]
775 stats["max"] = sorted_list[-1]
776 stats["min"] = sorted_list[0]
777 stats["sum"] = sum(sorted_list)
778 stats["size"] = len(sorted_list)
779 stats["average"] = float(stats["sum"]) / len(sorted_list)
780 stats["median"] = sorted_list[len(sorted_list) // 2]
781 if average_per_node:
782 stats["average-per-node"] = float(stats["sum"]) / self.nodes
784 def compute_all(self) -> None:
785 dep_edges = self.dep_edges
786 con_edges = self.con_edges
787 sorted_no_dep_edges = sorted(len(x) for x in dep_edges)
788 sorted_size_dep_edges = sorted(len(x) for x in chain.from_iterable(dep_edges))
789 sorted_no_con_edges = sorted(len(x) for x in con_edges)
790 self._list_stats("dependency-clauses", sorted_no_dep_edges)
791 self._list_stats(
792 "dependency-clause-alternatives",
793 sorted_size_dep_edges,
794 average_per_node=True,
795 )
796 self._list_stats("negative-dependency-clauses", sorted_no_con_edges)