Coverage for britney2/installability/tester.py: 98%
340 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-06-17 09:00 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-06-17 09:00 +0000
1# Copyright (C) 2012 Niels Thykier <niels@thykier.net>
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation; either version 2 of the License, or
6# (at your option) any later version.
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU General Public License for more details.
13import logging
14from collections import defaultdict
15from collections.abc import Iterable, Iterator, MutableSet
16from functools import partial
17from itertools import chain, filterfalse
18from typing import (
19 TYPE_CHECKING,
20 Any,
21 Literal,
22 Optional,
23 cast,
24)
26from britney2.utils import add_transitive_dependencies_flatten, iter_except
28if TYPE_CHECKING: 28 ↛ 29line 28 didn't jump to line 29 because the condition on line 28 was never true
29 from .. import BinaryPackageId
30 from .universe import BinaryPackageUniverse
33class InstallabilityTester:
34 def __init__(
35 self, universe: "BinaryPackageUniverse", suite_contents: set["BinaryPackageId"]
36 ) -> None:
37 """Create a new installability tester
39 suite_contents is a (mutable) set of package ids that determines
40 which of the packages in universe are currently in the suite.
42 Package id: (pkg_name, pkg_version, pkg_arch)
43 - NB: arch:all packages are "re-mapped" to given architecture.
44 (simplifies caches and dependency checking)
45 """
47 self._universe = universe
48 # FIXME: Move this field to TargetSuite
49 self._suite_contents = suite_contents
50 self._stats = InstallabilityStats()
51 logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
52 self.logger = logging.getLogger(logger_name)
54 # Cache of packages known to be broken - we deliberately do not
55 # include "broken" in it.
56 self._cache_broken: set["BinaryPackageId"] = set()
57 # Cache of packages known to be installable
58 self._cache_inst: set["BinaryPackageId"] = set()
59 # Per "arch" cache of the "minimal" (possibly incomplete)
60 # pseudo-essential set. This includes all the packages that
61 # are essential and packages that will always follow.
62 #
63 # It may not be a complete essential set, since alternatives
64 # are not always resolved. Noticeably cases like "awk" may be
65 # left out (since it could be either gawk, mawk or
66 # original-awk) unless something in this sets depends strictly
67 # on one of them
68 self._cache_ess: dict[
69 str,
70 tuple[
71 frozenset["BinaryPackageId"],
72 frozenset["BinaryPackageId"],
73 frozenset[frozenset["BinaryPackageId"]],
74 ],
75 ] = {}
77 essential_w_transitive_deps: MutableSet["BinaryPackageId"] = set(
78 universe.essential_packages
79 )
80 add_transitive_dependencies_flatten(universe, essential_w_transitive_deps)
81 self._cache_essential_transitive_dependencies = essential_w_transitive_deps
83 def compute_installability(self) -> None:
84 """Computes the installability of all the packages in the suite
86 This method computes the installability of all packages in
87 the suite and caches the result. This has the advantage of
88 making "is_installable" queries very fast for all packages
89 in the suite.
90 """
92 universe = self._universe
93 check_inst = self._check_inst
94 cbroken = self._cache_broken
95 cache_inst = self._cache_inst
96 suite_contents = self._suite_contents
97 tcopy = [x for x in suite_contents]
98 for t in filterfalse(cache_inst.__contains__, tcopy):
99 if t in cbroken: 99 ↛ 100line 99 didn't jump to line 100 because the condition on line 99 was never true
100 continue
101 res = check_inst(t)
102 if t in universe.equivalent_packages:
103 eqv = (
104 x for x in universe.packages_equivalent_to(t) if x in suite_contents
105 )
106 if res:
107 cache_inst.update(eqv)
108 else:
109 eqv_set = frozenset(eqv)
110 suite_contents -= eqv_set
111 cbroken |= eqv_set
113 @property
114 def stats(self) -> "InstallabilityStats":
115 return self._stats
117 def any_of_these_are_in_the_suite(self, pkgs: Iterable["BinaryPackageId"]) -> bool:
118 """Test if at least one package of a given set is in the suite
120 :param pkgs: A set of package ids (as defined in the constructor)
121 :return: True if any of the packages in pkgs are currently in the suite
122 """
123 return not self._suite_contents.isdisjoint(pkgs)
125 def is_pkg_in_the_suite(self, pkg_id: "BinaryPackageId") -> bool:
126 """Test if the package of is in the suite
128 :param pkg_id: A package id (as defined in the constructor)
129 :return: True if the pkg is currently in the suite
130 """
131 return pkg_id in self._suite_contents
133 def which_of_these_are_in_the_suite(
134 self, pkgs: Iterable["BinaryPackageId"]
135 ) -> Iterator["BinaryPackageId"]:
136 """Iterate over all packages that are in the suite
138 :param pkgs: An iterable of package ids
139 :return: An iterable of package ids that are in the suite
140 """
141 yield from (x for x in pkgs if x in self._suite_contents)
143 def add_binary(self, pkg_id: "BinaryPackageId") -> Literal[True]:
144 """Add a binary package to the suite
146 If the package is not known, this method will throw an
147 KeyError.
148 """
150 if pkg_id not in self._universe: # pragma: no cover
151 raise KeyError(str(pkg_id))
153 if pkg_id in self._universe.broken_packages:
154 self._suite_contents.add(pkg_id)
155 elif pkg_id not in self._suite_contents: 155 ↛ 172line 155 didn't jump to line 172 because the condition on line 155 was always true
156 self._suite_contents.add(pkg_id)
157 if self._cache_inst:
158 self._stats.cache_drops += 1
159 self._cache_inst = set()
160 if self._cache_broken:
161 # Re-add broken packages as some of them may now be installable
162 self._suite_contents |= self._cache_broken
163 self._cache_broken = set()
164 if (
165 pkg_id in self._cache_essential_transitive_dependencies
166 and pkg_id.architecture in self._cache_ess
167 ):
168 # Adds new possibly pseudo-essential => "pseudo-essential" set needs to be
169 # recomputed
170 del self._cache_ess[pkg_id.architecture]
172 return True
174 def remove_binary(self, pkg_id: "BinaryPackageId") -> Literal[True]:
175 """Remove a binary from the suite
177 :param pkg_id: The id of the package
178 :raises KeyError: if the package is not known
179 """
181 if pkg_id not in self._universe: # pragma: no cover
182 raise KeyError(str(pkg_id))
184 self._cache_broken.discard(pkg_id)
186 if pkg_id in self._suite_contents:
187 self._suite_contents.remove(pkg_id)
188 if pkg_id.architecture in self._cache_ess:
189 (start, ess_never, ess_choices) = self._cache_ess[pkg_id.architecture]
190 if pkg_id in start or any(pkg_id in choices for choices in ess_choices):
191 # Removes a package from the "pseudo-essential set"
192 del self._cache_ess[pkg_id.architecture]
194 if not self._universe.reverse_dependencies_of(pkg_id):
195 # no reverse relations - safe
196 return True
197 if (
198 pkg_id not in self._universe.broken_packages
199 and pkg_id in self._cache_inst
200 ):
201 # It is in our cache (and not guaranteed to be broken) - throw out the cache
202 self._cache_inst = set()
203 self._stats.cache_drops += 1
205 return True
207 def is_installable(self, pkg_id: "BinaryPackageId") -> bool:
208 """Test if a package is installable in this package set
210 The package is assumed to be in the suite and only packages in
211 the suite can be used to satisfy relations.
213 :param pkg_id: The id of the package
214 Returns True iff the package is installable.
215 Returns False otherwise.
216 """
218 self._stats.is_installable_calls += 1
220 if pkg_id not in self._universe: # pragma: no cover
221 raise KeyError(str(pkg_id))
223 if (
224 pkg_id not in self._suite_contents
225 or pkg_id in self._universe.broken_packages
226 ):
227 self._stats.cache_hits += 1
228 return False
230 if pkg_id in self._cache_inst:
231 self._stats.cache_hits += 1
232 return True
234 self._stats.cache_misses += 1
235 return self._check_inst(pkg_id)
237 def _check_inst(
238 self,
239 t: "BinaryPackageId",
240 musts: set["BinaryPackageId"] | None = None,
241 never: set["BinaryPackageId"] | None = None,
242 choices: set[frozenset["BinaryPackageId"]] | None = None,
243 ) -> bool:
244 # See the explanation of musts, never and choices below.
245 stats = self._stats
246 universe = self._universe
247 suite_contents = self._suite_contents
248 cbroken = self._cache_broken
250 # Our installability verdict - start with "yes" and change if
251 # prove otherwise.
252 verdict = True
254 # set of packages that must be installed with this package
255 if musts is None:
256 musts = set()
257 musts.add(t)
258 # set of packages we can *never* choose (e.g. due to conflicts)
259 if never is None:
260 never = set()
261 # set of relations were we have a choice, but where we have not
262 # committed ourselves yet. Hopefully some choices may be taken
263 # for us (if one of the alternatives appear in "musts")
264 if choices is None:
265 choices = set()
267 # The subset of musts we haven't checked yet.
268 check = [t]
270 if len(musts) == 1:
271 # Include the essential packages in the suite as a starting point.
272 if t.architecture not in self._cache_ess:
273 # The minimal essential set cache is not present -
274 # compute it now.
275 (start, ess_never, ess_choices) = self._get_min_pseudo_ess_set(
276 t.architecture
277 )
278 else:
279 (start, ess_never, ess_choices) = self._cache_ess[t.architecture]
281 if t in ess_never:
282 # t conflicts with something in the essential set or the essential
283 # set conflicts with t - either way, t is f***ed
284 cbroken.add(t)
285 suite_contents.remove(t)
286 stats.conflicts_essential += 1
287 return False
288 musts.update(start)
289 never.update(ess_never)
290 choices.update(ess_choices)
292 # curry check_loop
293 check_loop = partial(
294 self._check_loop, universe, suite_contents, stats, musts, never, cbroken
295 )
297 # Useful things to remember:
298 #
299 # * musts and never are disjointed at all times
300 # - if not, t cannot be installable. Either t, or one of
301 # its dependencies conflict with t or one of its (other)
302 # dependencies.
303 #
304 # * choices should generally be avoided as much as possible.
305 # - picking a bad choice requires backtracking
306 # - sometimes musts/never will eventually "solve" the choice.
307 #
308 # * check never includes choices (these are always in choices)
309 #
310 # * A package is installable if never and musts are disjointed
311 # and both check and choices are empty.
312 # - exception: resolve_choices may determine the installability
313 # of t via recursion (calls _check_inst). In this case
314 # check and choices are not (always) empty.
316 def _prune_choices(rebuild: set[frozenset["BinaryPackageId"]]) -> bool:
317 """Picks a choice from choices and updates rebuild.
319 Prunes the choices and updates "rebuild" to reflect the
320 pruned choices.
322 Returns True if t is installable (determined via recursion).
323 Returns False if a choice was picked and added to check.
324 Returns None if t is uninstallable (no choice can be picked).
326 NB: If this returns False, choices should be replaced by
327 rebuild.
328 """
330 assert musts is not None
331 assert choices is not None
332 assert never is not None
333 # We already satisfied/chosen at least one of the literals
334 # in the choice, so the choice is gone
335 for choice in filter(musts.isdisjoint, choices):
336 # cbroken is needed here because (in theory) it could
337 # have changed since the choice was discovered and it
338 # is smaller than suite_contents (so presumably faster)
339 remain = choice - never - cbroken
341 if len(remain) == 1:
342 # the choice was reduced to one package we haven't checked - check that
343 check.extend(remain)
344 musts.update(remain)
345 stats.choice_presolved += 1
346 continue
348 if not remain:
349 # all alternatives would violate the conflicts or are uninstallable
350 # => package is not installable
351 stats.choice_presolved += 1
352 return False
354 # The choice is still deferred
355 rebuild.add(frozenset(remain))
357 return True
359 # END _prune_choices
361 while check:
362 if not check_loop(choices, check):
363 verdict = False
364 break
366 if choices:
367 rebuild: set[frozenset["BinaryPackageId"]] = set()
369 if not _prune_choices(rebuild):
370 verdict = False
371 break
373 if not check and rebuild:
374 # We have to "guess" now, which is always fun, but not cheap. We
375 # stop guessing:
376 # - once we run out of choices to make (obviously), OR
377 # - if one of the choices exhaust all but one option
378 if self.resolve_choices(check, musts, never, rebuild):
379 # The recursive call have already updated the
380 # cache so there is not point in doing it again.
381 return True
382 choices = rebuild
384 if verdict:
385 # if t is installable, then so are all packages in musts
386 self._cache_inst.update(musts)
387 stats.solved_installable += 1
388 else:
389 stats.solved_uninstallable += 1
391 return verdict
393 def resolve_choices(
394 self,
395 check: list["BinaryPackageId"],
396 musts: set["BinaryPackageId"],
397 never: set["BinaryPackageId"],
398 choices: set[frozenset["BinaryPackageId"]],
399 ) -> bool:
400 universe = self._universe
401 suite_contents = self._suite_contents
402 stats = self._stats
403 cbroken = self._cache_broken
405 while choices:
406 choice_options = choices.pop()
408 choice = iter(choice_options)
409 last = next(choice) # pick one to go last
410 solved = False
411 for p in choice:
412 musts_copy = musts.copy()
413 never_tmp: set["BinaryPackageId"] = set()
414 choices_tmp: set[frozenset["BinaryPackageId"]] = set()
415 check_tmp = [p]
416 # _check_loop assumes that "musts" is up to date
417 musts_copy.add(p)
418 if not self._check_loop(
419 universe,
420 suite_contents,
421 stats,
422 musts_copy,
423 never_tmp,
424 cbroken,
425 choices_tmp,
426 check_tmp,
427 ):
428 # p cannot be chosen/is broken (unlikely, but ...)
429 continue
431 # Test if we can pick p without any consequences.
432 # - when we can, we avoid a backtrack point.
433 if never_tmp <= never and choices_tmp <= choices:
434 # we can pick p without picking up new conflicts
435 # or unresolved choices. Therefore we commit to
436 # using p.
437 musts.update(musts_copy)
438 stats.choice_resolved_without_restore_point += 1
439 solved = True
440 break
442 if not musts.isdisjoint(never_tmp):
443 # If we pick p, we will definitely end up making
444 # t uninstallable, so p is a no-go.
445 continue
447 stats.backtrace_restore_point_created += 1
448 # We are not sure that p is safe, setup a backtrack
449 # point and recurse.
450 never_tmp |= never
451 choices_tmp |= choices
452 if self._check_inst(p, musts_copy, never_tmp, choices_tmp):
453 # Success, p was a valid choice and made it all
454 # installable
455 return True
457 # If we get here, we failed to find something that
458 # would satisfy choice (without breaking the
459 # installability of t). This means p cannot be used
460 # to satisfy the dependencies, so pretend to conflict
461 # with it - hopefully it will reduce future choices.
462 never.add(p)
463 stats.backtrace_restore_point_used += 1
465 if not solved:
466 # Optimization for the last case; avoid the recursive call
467 # and just assume the last will lead to a solution. If it
468 # doesn't there is no solution and if it does, we don't
469 # have to back-track anyway.
470 check.append(last)
471 musts.add(last)
472 stats.backtrace_last_option += 1
473 return False
474 return False
476 def _check_loop(
477 self,
478 universe: "BinaryPackageUniverse",
479 suite_contents: set["BinaryPackageId"],
480 stats: "InstallabilityStats",
481 musts: set["BinaryPackageId"],
482 never: set["BinaryPackageId"],
483 cbroken: set["BinaryPackageId"],
484 choices: set[frozenset["BinaryPackageId"]],
485 check: list["BinaryPackageId"],
486 ) -> bool:
487 """Finds all guaranteed dependencies via "check".
489 If it returns False, t is not installable. If it returns True
490 then "check" is exhausted. If "choices" are empty and this
491 returns True, then t is installable.
492 """
493 # Local variables for faster access...
494 not_satisfied: partial[filter["BinaryPackageId"]] = partial(
495 filter, musts.isdisjoint
496 )
498 # While we have guaranteed dependencies (in check), examine all
499 # of them.
500 for cur in iter_except(check.pop, IndexError):
501 relations = universe.relations_of(cur)
503 if relations.negative_dependencies:
504 # Conflicts?
505 if cur in never:
506 # cur adds a (reverse) conflict, so check if cur
507 # is in never.
508 #
509 # - there is a window where two conflicting
510 # packages can be in check. Example "A" depends
511 # on "B" and "C". If "B" conflicts with "C",
512 # then both "B" and "C" could end in "check".
513 return False
514 # We must install cur for the package to be installable,
515 # so "obviously" we can never choose any of its conflicts
516 never.update(relations.negative_dependencies & suite_contents)
518 # depgroup can be satisfied by picking something that is
519 # already in musts - lets pick that (again). :)
520 for depgroup in cast(
521 set[frozenset["BinaryPackageId"]],
522 not_satisfied(relations.dependencies),
523 ):
524 # Of all the packages listed in the relation remove those that
525 # are either:
526 # - not in the suite
527 # - known to be broken (by cache)
528 # - in never
529 candidates = (depgroup & suite_contents) - never
531 if not candidates:
532 # We got no candidates to satisfy it - this
533 # package cannot be installed with the current
534 # (version of the) suite
535 if cur not in cbroken and depgroup.isdisjoint(never):
536 # cur's dependency cannot be satisfied even if never was empty.
537 # This means that cur itself is broken (as well).
538 cbroken.add(cur)
539 suite_contents.remove(cur)
540 return False
541 if len(candidates) == 1:
542 # only one possible solution to this choice and we
543 # haven't seen it before
544 check.extend(candidates)
545 musts.update(candidates)
546 else:
547 possible_eqv = {
548 x for x in candidates if x in universe.equivalent_packages
549 }
550 if len(possible_eqv) > 1:
551 # Exploit equivalency to reduce the number of
552 # candidates if possible. Basically, this
553 # code maps "similar" candidates into a single
554 # candidate that will give a identical result
555 # to any other candidate it eliminates.
556 #
557 # See InstallabilityTesterBuilder's
558 # _build_eqv_packages_table method for more
559 # information on how this works.
560 new_cand = {x for x in candidates if x not in possible_eqv}
561 stats.eqv_table_times_used += 1
563 for chosen in iter_except(possible_eqv.pop, KeyError):
564 new_cand.add(chosen)
565 possible_eqv -= universe.packages_equivalent_to(chosen)
566 stats.eqv_table_total_number_of_alternatives_eliminated += len(
567 candidates
568 ) - len(new_cand)
569 if len(new_cand) == 1:
570 check.extend(new_cand)
571 musts.update(new_cand)
572 stats.eqv_table_reduced_to_one += 1
573 continue
574 elif len(candidates) == len(new_cand):
575 stats.eqv_table_reduced_by_zero += 1
577 candidates = frozenset(new_cand)
578 else:
579 # Candidates have to be a frozenset to be added to choices
580 candidates = frozenset(candidates)
581 # defer this choice till later
582 choices.add(candidates)
583 return True
585 def _get_min_pseudo_ess_set(self, arch: str) -> tuple[
586 frozenset["BinaryPackageId"],
587 frozenset["BinaryPackageId"],
588 frozenset[frozenset["BinaryPackageId"]],
589 ]:
590 if arch not in self._cache_ess: 590 ↛ 651line 590 didn't jump to line 651 because the condition on line 590 was always true
591 # The minimal essential set cache is not present -
592 # compute it now.
593 suite_contents = self._suite_contents
594 cbroken = self._cache_broken
595 universe = self._universe
596 stats = self._stats
598 ess_base = [
599 x
600 for x in self._universe.essential_packages
601 if x.architecture == arch and x in suite_contents
602 ]
603 start = set(ess_base)
604 ess_never: set["BinaryPackageId"] = set()
605 ess_choices: set[frozenset["BinaryPackageId"]] = set()
606 not_satisfied: partial[filter["BinaryPackageId"]] = partial(
607 filter, start.isdisjoint
608 )
610 while ess_base:
611 self._check_loop(
612 universe,
613 suite_contents,
614 stats,
615 start,
616 ess_never,
617 cbroken,
618 ess_choices,
619 ess_base,
620 )
621 if ess_choices:
622 # Try to break choices where possible
623 nchoice = set()
624 for choice in cast(
625 set[frozenset["BinaryPackageId"]], not_satisfied(ess_choices)
626 ):
627 b = False
628 for c in choice:
629 relations = universe.relations_of(c)
630 if (
631 relations.negative_dependencies is None
632 or relations.negative_dependencies <= ess_never
633 ) and not any(not_satisfied(relations.dependencies)):
634 ess_base.append(c)
635 b = True
636 break
637 if not b:
638 nchoice.add(choice)
639 ess_choices = nchoice
640 else:
641 break
643 for x in start:
644 ess_never.update(universe.negative_dependencies_of(x))
645 self._cache_ess[arch] = (
646 frozenset(start),
647 frozenset(ess_never),
648 frozenset(ess_choices),
649 )
651 return self._cache_ess[arch]
653 def compute_stats(self) -> dict[str, "ArchStats"]:
654 universe = self._universe
655 graph_stats: dict[str, ArchStats] = defaultdict(ArchStats)
656 seen_eqv: dict[str, set["BinaryPackageId"]] = defaultdict(set)
658 for pkg in universe:
659 pkg_arch = pkg.architecture
660 relations = universe.relations_of(pkg)
661 arch_stats = graph_stats[pkg_arch]
663 arch_stats.nodes += 1
665 if pkg in universe.equivalent_packages and pkg not in seen_eqv[pkg_arch]:
666 arch_stats.eqv_nodes += sum(
667 1
668 for e in universe.packages_equivalent_to(pkg)
669 if e.architecture == pkg_arch
670 )
672 arch_stats.add_dep_edges(relations.dependencies)
673 if relations.negative_dependencies is not None:
674 arch_stats.add_con_edges(relations.negative_dependencies)
676 for stat in graph_stats.values():
677 stat.compute_all()
679 return graph_stats
class InstallabilityStats:
    """Plain counters describing the work done by an installability tester."""

    def __init__(self) -> None:
        """Start with every counter at zero"""
        # Requests and cache behaviour
        self.is_installable_calls = 0
        self.cache_hits = 0
        self.cache_misses = 0
        self.cache_drops = 0
        # Handling of choices and backtracking ("RP" = restore point)
        self.choice_presolved = 0
        self.choice_resolved_without_restore_point = 0
        self.backtrace_restore_point_created = 0
        self.backtrace_restore_point_used = 0
        self.backtrace_last_option = 0
        # Verdicts
        self.solved_installable = 0
        self.solved_uninstallable = 0
        self.conflicts_essential = 0
        # Use of the equivalence table
        self.eqv_table_times_used = 0
        self.eqv_table_reduced_to_one = 0
        self.eqv_table_reduced_by_zero = 0
        self.eqv_table_total_number_of_alternatives_eliminated = 0

    def stats(self) -> list[str]:
        """Render the counters as human readable lines

        :return: One line of text per group of counters
        """
        counters = vars(self)
        templates = (
            "Requests - is_installable: {is_installable_calls}",
            "Cache - hits: {cache_hits}, misses: {cache_misses}, drops: {cache_drops}",
            "Choices - pre-solved: {choice_presolved}, No RP: {choice_resolved_without_restore_point}",
            "Backtrace - RP created: {backtrace_restore_point_created}, RP used: {backtrace_restore_point_used}, reached last option: {backtrace_last_option}",  # nopep8
            "Solved - installable: {solved_installable}, uninstallable: {solved_uninstallable}, conflicts essential: {conflicts_essential}",  # nopep8
            "Eqv - times used: {eqv_table_times_used}, perfect reductions: {eqv_table_reduced_to_one}, failed reductions: {eqv_table_reduced_by_zero}, total no. of alternatives pruned: {eqv_table_total_number_of_alternatives_eliminated}",  # nopep8
        )
        return [template.format_map(counters) for template in templates]
713class ArchStats:
714 def __init__(self) -> None:
715 self.nodes = 0
716 self.eqv_nodes = 0
717 self.dep_edges: list[frozenset[frozenset["BinaryPackageId"]]] = []
718 self.con_edges: list[frozenset["BinaryPackageId"]] = []
719 self.stats: dict[str, dict[str, int | float]] = defaultdict(
720 lambda: defaultdict(int)
721 )
723 def stat(self, statname: str) -> dict[str, int | float]:
724 return self.stats[statname]
726 def stat_summary(self) -> list[str]:
727 text = []
728 values: list[Any] | tuple[Any, ...]
729 for statname in [
730 "nodes",
731 "dependency-clauses",
732 "dependency-clause-alternatives",
733 "negative-dependency-clauses",
734 ]:
735 stat = self.stats[statname]
736 if statname != "nodes":
737 format_str = "%s, max: %d, min: %d, median: %d, average: %f (%d/%d)"
738 values = [
739 statname,
740 stat["max"],
741 stat["min"],
742 stat["median"],
743 stat["average"],
744 stat["sum"],
745 stat["size"],
746 ]
747 if "average-per-node" in stat:
748 format_str += ", average-per-node: %f"
749 values.append(stat["average-per-node"])
750 else:
751 format_str = "nodes: %d, eqv-nodes: %d"
752 values = (self.nodes, self.eqv_nodes)
753 text.append(format_str % tuple(values))
754 return text
756 def add_dep_edges(self, edges: frozenset[frozenset["BinaryPackageId"]]) -> None:
757 self.dep_edges.append(edges)
759 def add_con_edges(self, edges: frozenset["BinaryPackageId"]) -> None:
760 self.con_edges.append(edges)
762 def _list_stats(
763 self, stat_name: str, sorted_list: list[int], average_per_node: bool = False
764 ) -> None:
765 if sorted_list:
766 stats = self.stats[stat_name]
767 stats["max"] = sorted_list[-1]
768 stats["min"] = sorted_list[0]
769 stats["sum"] = sum(sorted_list)
770 stats["size"] = len(sorted_list)
771 stats["average"] = float(stats["sum"]) / len(sorted_list)
772 stats["median"] = sorted_list[len(sorted_list) // 2]
773 if average_per_node:
774 stats["average-per-node"] = float(stats["sum"]) / self.nodes
776 def compute_all(self) -> None:
777 dep_edges = self.dep_edges
778 con_edges = self.con_edges
779 sorted_no_dep_edges = sorted(len(x) for x in dep_edges)
780 sorted_size_dep_edges = sorted(len(x) for x in chain.from_iterable(dep_edges))
781 sorted_no_con_edges = sorted(len(x) for x in con_edges)
782 self._list_stats("dependency-clauses", sorted_no_dep_edges)
783 self._list_stats(
784 "dependency-clause-alternatives",
785 sorted_size_dep_edges,
786 average_per_node=True,
787 )
788 self._list_stats("negative-dependency-clauses", sorted_no_con_edges)