Coverage for britney2/hints.py: 93%
195 statements
« prev ^ index » next coverage.py v7.6.0, created at 2026-06-17 09:00 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2026-06-17 09:00 +0000
1# Copyright (C) 2013 Adam D. Barratt <adsb@debian.org>
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation; either version 2 of the License, or
6# (at your option) any later version.
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU General Public License for more details.
13import logging
14from collections.abc import Generator, Iterable
15from dataclasses import dataclass
16from enum import Enum, unique
17from itertools import chain
18from typing import TYPE_CHECKING, Any, Optional, Protocol
20if TYPE_CHECKING: 20 ↛ 21line 20 didn't jump to line 21 because the condition on line 20 was never true
21 from . import Suite
22 from .migrationitem import MigrationItem, MigrationItemFactory
25class MalformedHintException(Exception):
26 pass
29@unique
30class HintAnnotate(Enum):
31 REQUIRED = 1
32 FORBIDDEN = 2
33 OPTIONAL = 3
36class HintCollection:
37 def __init__(self) -> None:
38 self._hints: dict[str, list["Hint"]] = {}
40 @property
41 def is_empty(self) -> bool:
42 return not self._hints
44 def __getitem__(self, type: str) -> list["Hint"]:
45 """Get hints per type"""
46 return self._hints[type] if type in self._hints else []
48 def _search(
49 self,
50 type: str | None = None,
51 *,
52 package: str | None = None,
53 version: str | None = None,
54 architecture: str | None = None,
55 suite: Optional["Suite"] = None,
56 removal: bool | None = None,
57 ) -> Generator["Hint", None, None]:
58 for itype in (type,) if type is not None else self._hints.keys():
59 if itype not in self._hints:
60 continue
62 for hint in self._hints[itype]:
63 if (
64 hint.active
65 and (package is None or package == hint.packages[0].package)
66 and (
67 version is None
68 or hint.packages[0].version is None
69 or version == hint.packages[0].version
70 )
71 and (
72 architecture is None
73 or hint.packages[0].architecture == "source"
74 or architecture == hint.packages[0].architecture
75 )
76 and (suite is None or suite == hint.packages[0].suite)
77 and (removal is None or removal == hint.packages[0].is_removal)
78 ):
79 yield hint
81 def search(
82 self,
83 type: str | None = None,
84 *,
85 package: str | None = None,
86 version: str | None = None,
87 architecture: str | None = None,
88 suite: Optional["Suite"] = None,
89 removal: bool | None = None,
90 ) -> list["Hint"]:
91 return [
92 hint
93 for hint in self._search(
94 type,
95 package=package,
96 version=version,
97 architecture=architecture,
98 suite=suite,
99 removal=removal,
100 )
101 ]
103 def search_first(
104 self,
105 type: str | None = None,
106 *,
107 package: str | None = None,
108 version: str | None = None,
109 architecture: str | None = None,
110 suite: Optional["Suite"] = None,
111 removal: bool | None = None,
112 ) -> "Hint | None":
113 for hint in self._search(
114 type,
115 package=package,
116 version=version,
117 architecture=architecture,
118 suite=suite,
119 removal=removal,
120 ):
121 return hint
122 else:
123 return None
125 def has_hint(
126 self,
127 type: str | None = None,
128 *,
129 package: str | None = None,
130 version: str | None = None,
131 architecture: str | None = None,
132 suite: Optional["Suite"] = None,
133 removal: bool | None = None,
134 ) -> bool:
135 return (
136 self.search_first(
137 type,
138 package=package,
139 version=version,
140 architecture=architecture,
141 suite=suite,
142 removal=removal,
143 )
144 is not None
145 )
147 def add_hint(self, hint: "Hint") -> None:
148 if hint.type not in self._hints:
149 self._hints[hint.type] = [hint]
150 else:
151 self._hints[hint.type].append(hint)
153 def remove_inactive_hints(self) -> None:
154 for hint_type, hints in self._hints.items():
155 self._hints[hint_type] = [hint for hint in hints if hint.active]
158class PolicyHintParserProto(Protocol):
159 def __call__( 159 ↛ exitline 159 didn't jump to the function exit
160 self,
161 mi_factory: "MigrationItemFactory",
162 hints: HintCollection,
163 who: str,
164 hint_type: "HintType",
165 /,
166 *args: str,
167 ) -> None: ...
170def split_into_one_hint_per_package(
171 mi_factory: "MigrationItemFactory",
172 hints: HintCollection,
173 who: str,
174 hint_type: "HintType",
175 /,
176 *args: str,
177) -> None:
178 for item in mi_factory.parse_items(*args):
179 hints.add_hint(Hint(who, hint_type, [item]))
182def single_hint_taking_list_of_packages(
183 mi_factory: "MigrationItemFactory",
184 hints: HintCollection,
185 who: str,
186 hint_type: "HintType",
187 /,
188 *args: str,
189) -> None:
190 hints.add_hint(Hint(who, hint_type, mi_factory.parse_items(*args)))
193@dataclass(frozen=True, slots=True)
194class HintType:
195 hint_name: str
196 parser_function: PolicyHintParserProto = split_into_one_hint_per_package
197 min_args: int = 1
198 versioned: HintAnnotate = HintAnnotate.REQUIRED
199 architectured: HintAnnotate = HintAnnotate.FORBIDDEN
201 def __post_init__(
202 self,
203 ) -> None:
204 assert (
205 self.min_args > 0 or self.hint_name == "remark"
206 ), "min_args must be at least 1"
209class Hint:
210 def __init__(
211 self,
212 user: str,
213 hint_type: HintType,
214 packages: list["MigrationItem"],
215 ) -> None:
216 self._user = user
217 self._active = True
218 self._type = hint_type
219 self._packages = packages
221 self.check()
223 def check(self) -> None:
224 for package in self.packages:
225 if (
226 self._type.versioned is HintAnnotate.FORBIDDEN
227 and package.version is not None
228 ):
229 raise MalformedHintException(
230 f'"{self.type}" needs unversioned packages, got "{package}"'
231 )
232 elif (
233 self._type.versioned is HintAnnotate.REQUIRED
234 and package.version is None
235 ):
236 raise MalformedHintException(
237 f'"{self.type}" needs versioned packages, got "{package}"'
238 )
239 if (
240 self._type.architectured is HintAnnotate.REQUIRED
241 and package.architecture == "source"
242 ):
243 raise MalformedHintException(
244 f'"{self.type}" needs to be architecture specific, got {package}'
245 )
246 elif (
247 self._type.architectured is HintAnnotate.FORBIDDEN
248 and package.architecture != "source"
249 ):
250 raise MalformedHintException(
251 f'"{self.type}" must not be architecture specific, got {package}'
252 )
254 def set_active(self, active: bool) -> None:
255 self._active = active
257 def __str__(self) -> str:
258 if self._type.versioned is HintAnnotate.FORBIDDEN: 258 ↛ 259line 258 didn't jump to line 259 because the condition on line 258 was never true
259 return " ".join(chain((self.type,), (x.uvname for x in self._packages)))
260 else:
261 return " ".join(chain((self.type,), (x.name for x in self._packages)))
263 def __eq__(self, other: Any) -> bool:
264 if self.type != other.type: 264 ↛ 265line 264 didn't jump to line 265 because the condition on line 264 was never true
265 return False
266 else:
267 # we can't use sets, because unversioned items cannot be hashed
268 return sorted(self.packages) == sorted(other.packages)
270 @property
271 def type(self) -> str:
272 return self._type.hint_name
274 @property
275 def packages(self) -> list["MigrationItem"]:
276 return self._packages
278 @property
279 def active(self) -> bool:
280 return self._active
282 @property
283 def user(self) -> str:
284 return self._user
286 @property
287 def package(self) -> str | None:
288 if self.packages: 288 ↛ 292line 288 didn't jump to line 292 because the condition on line 288 was always true
289 assert len(self.packages) == 1, self.packages
290 return self.packages[0].package
291 else:
292 return None
294 @property
295 def version(self) -> str | None:
296 if self.packages: 296 ↛ 300line 296 didn't jump to line 300 because the condition on line 296 was always true
297 assert len(self.packages) == 1, self.packages
298 return self.packages[0].version
299 else:
300 return None
302 @property
303 def architecture(self) -> str | None:
304 if self.packages: 304 ↛ 308line 304 didn't jump to line 308 because the condition on line 304 was always true
305 assert len(self.packages) == 1, self.packages
306 return self.packages[0].architecture
307 else:
308 return None
310 @property
311 def suite(self) -> Optional["Suite"]:
312 if self.packages: 312 ↛ 316line 312 didn't jump to line 316 because the condition on line 312 was always true
313 assert len(self.packages) == 1, self.packages
314 return self.packages[0].suite
315 else:
316 return None
319class HintParser:
320 def __init__(self, mi_factory: "MigrationItemFactory") -> None:
321 logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
322 self.logger = logging.getLogger(logger_name)
323 self.hints = HintCollection()
324 self.mi_factory = mi_factory
326 self._aliases: dict[str, str] = {
327 "approve": "unblock",
328 }
330 FORBIDDEN = HintAnnotate.FORBIDDEN
331 OPTIONAL = HintAnnotate.OPTIONAL
332 self._hint_set: set[HintType] = set()
333 # Migration grouping hints
334 self.register_hint_type(HintType("remark", lambda *x: None, 0)) 334 ↛ exitline 334 didn't run the lambda on line 334
335 # Easy needs at least 2 to make sense
336 self.register_hint_type(
337 HintType(
338 "easy", single_hint_taking_list_of_packages, 2, architectured=OPTIONAL
339 )
340 )
341 self.register_hint_type(
342 HintType(
343 "force-hint",
344 single_hint_taking_list_of_packages,
345 architectured=OPTIONAL,
346 )
347 )
348 self.register_hint_type(
349 HintType(
350 "hint", single_hint_taking_list_of_packages, architectured=OPTIONAL
351 )
352 )
353 # Block / freeze related hints
354 self.register_hint_type(HintType("block", versioned=FORBIDDEN))
355 self.register_hint_type(HintType("block-all", versioned=FORBIDDEN))
356 self.register_hint_type(HintType("block-udeb", versioned=FORBIDDEN))
357 self.register_hint_type(HintType("unblock", architectured=OPTIONAL))
358 self.register_hint_type(HintType("unblock-udeb", architectured=OPTIONAL))
359 # Other
360 self.register_hint_type(HintType("remove"))
361 self.register_hint_type(HintType("force", architectured=OPTIONAL))
362 self.register_hint_type(
363 HintType("allow-uninst", versioned=FORBIDDEN, architectured=OPTIONAL)
364 )
365 self.register_hint_type(HintType("allow-smooth-update"))
367 @property
368 def registered_hint_names(self) -> set[str]:
369 """A set of all known hint names (and aliases thereof)"""
370 return set(chain((x.hint_name for x in self._hint_set), self._aliases.keys()))
372 @property
373 def registered_hints(self) -> set[HintType]:
374 """A set of all known hints"""
375 return self._hint_set
377 def register_hint_type(
378 self,
379 hint_type: HintType,
380 *,
381 aliases: Iterable[str] | None = None,
382 ) -> None:
383 """Register a new hint that is supported by the parser
385 This registers a new hint that can be parsed by the hint parser. All hints are single words with a
386 space-separated list of arguments (on a single line). The hint parser will do some basic processing,
387 the permission checking and minor validation on the hint before passing it on to the parser function
388 given.
390 The parser_function will receive the following arguments:
391 * A hint collection
392 * Identifier of the entity providing the hint
393 * The hint_name (aliases will be mapped to the hint_name)
394 * Zero or more string arguments for the hint (so the function needs to use *args)
396 The parser_function will then have to process the arguments and call the hint collection's "add_hint"
397 as needed. Example implementations include "split_into_one_hint_per_package", which is used by almost
398 all policy hints.
400 :param hint_type: The hint
401 :param aliases: An optional iterable of aliases to the hint (use only for backwards compatibility)
402 """
403 hint_name = hint_type.hint_name
404 assert (
405 hint_name not in self._aliases
406 ), f"The hint type {hint_name} is already registered as an alias of {self._aliases[hint_name]}"
407 assert (
408 hint_name not in self.registered_hint_names
409 ), f"The hint type {hint_name} is already registered"
410 self._hint_set.add(hint_type)
411 if aliases:
412 for alias in aliases:
413 self._aliases[alias] = hint_name
415 def parse_hints(
416 self,
417 who: str,
418 permitted_hints: str | list[str],
419 filename: str,
420 lines: Iterable[str],
421 ) -> None:
422 reg_hints = self.registered_hints
423 reg_hint_names = self.registered_hint_names
424 hints = self.hints
425 aliases = self._aliases
426 mi_factory = self.mi_factory
427 for line_no, line in enumerate(lines, 1):
428 line = line.strip()
429 if line == "" or line.startswith("#"):
430 continue
431 ln = line.split()
432 hint_name = ln.pop(0)
433 if hint_name in aliases:
434 hint_name = aliases[hint_name]
435 if hint_name == "finished":
436 break
437 if (
438 hint_name not in reg_hint_names
439 ): # this includes aliases, but those are checked before
440 self.logger.warning(
441 "Unknown hint found in %s (line %d): '%s'", filename, line_no, line
442 )
443 continue
444 if hint_name not in permitted_hints and "ALL" not in permitted_hints:
445 reason = "The hint is not a part of the permitted hints for " + who
446 self.logger.info(
447 'Ignoring "%s" hint from %s found in %s (line %d): %s',
448 hint_name,
449 who,
450 filename,
451 line_no,
452 reason,
453 )
454 continue
455 hint_type = [x for x in reg_hints if x.hint_name == hint_name][0]
456 min_args = hint_type.min_args
457 if len(ln) < min_args:
458 self.logger.warning(
459 "Malformed hint found in %s (line %d): Needs at least %d argument(s), got %d",
460 filename,
461 line_no,
462 min_args,
463 len(ln),
464 )
465 continue
466 try:
467 hint_type.parser_function(mi_factory, hints, who, hint_type, *ln)
468 except MalformedHintException as e:
469 self.logger.warning(
470 'Malformed hint found in %s (line %d): "%s"',
471 filename,
472 line_no,
473 e.args[0],
474 )
475 continue