Coverage for britney2/hints.py: 93%
192 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-10-17 17:32 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-10-17 17:32 +0000
1# Copyright (C) 2013 Adam D. Barratt <adsb@debian.org>
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by
5# the Free Software Foundation; either version 2 of the License, or
6# (at your option) any later version.
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU General Public License for more details.
13import logging
14from collections.abc import Callable, Iterable
15from enum import Enum, unique
16from itertools import chain
17from typing import TYPE_CHECKING, Any, Optional, Protocol, Union
19if TYPE_CHECKING: 19 ↛ 20line 19 didn't jump to line 20 because the condition on line 19 was never true
20 from . import Suite
21 from .migrationitem import MigrationItem, MigrationItemFactory
class MalformedHintException(Exception):
    """Raised when a hint does not satisfy the constraints of its hint type."""
@unique
class HintAnnotate(Enum):
    """Whether a qualifier (version or architecture) must, must not, or may be given."""

    # The qualifier has to be present on every item of the hint.
    REQUIRED = 1
    # The qualifier must be absent from every item of the hint.
    FORBIDDEN = 2
    # Both forms are accepted.
    OPTIONAL = 3
35class HintCollection:
36 def __init__(self) -> None:
37 self._hints: list["Hint"] = []
39 @property
40 def is_empty(self) -> bool:
41 return not self._hints
43 def __getitem__(self, type: str | None = None) -> list["Hint"]:
44 return self.search(type)
46 def search(
47 self,
48 type: str | None = None,
49 onlyactive: bool = True,
50 package: str | None = None,
51 version: str | None = None,
52 architecture: str | None = None,
53 suite: Optional["Suite"] = None,
54 removal: bool | None = None,
55 ) -> list["Hint"]:
56 return [
57 hint
58 for hint in self._hints
59 if (type is None or type == hint.type)
60 and (hint.active or not onlyactive)
61 and (package is None or package == hint.packages[0].package)
62 and (
63 version is None
64 or hint.packages[0].version is None
65 or version == hint.packages[0].version
66 )
67 and (
68 architecture is None
69 or hint.packages[0].architecture == "source"
70 or architecture == hint.packages[0].architecture
71 )
72 and (suite is None or suite == hint.packages[0].suite)
73 and (removal is None or removal == hint.packages[0].is_removal)
74 ]
76 def add_hint(self, hint: "Hint") -> None:
77 self._hints.append(hint)
class PolicyHintParserProto(Protocol):
    """Call signature of the functions a hint type uses to parse its arguments."""

    def __call__(
        self,
        mi_factory: "MigrationItemFactory",
        hints: HintCollection,
        who: str,
        hint_type: "HintType",
        /,
        *args: str,
    ) -> None:
        """Process the arguments of one hint line.

        :param mi_factory: Factory used to turn the arguments into migration items.
        :param hints: Collection that resulting hints are added to.
        :param who: Identifier of the entity providing the hint.
        :param hint_type: The type of the hint being parsed.
        :param args: The remaining words of the hint line.
        """
def split_into_one_hint_per_package(
    mi_factory: "MigrationItemFactory",
    hints: HintCollection,
    who: str,
    hint_type: "HintType",
    /,
    *args: str,
) -> None:
    """Add one separate single-item hint to ``hints`` for every parsed argument.

    :param mi_factory: Factory whose ``parse_items`` turns ``args`` into items.
    :param hints: Collection receiving the new hints.
    :param who: Identifier of the entity providing the hint.
    :param hint_type: Type given to every hint created.
    :param args: The arguments of the hint line.
    """
    parsed_items = mi_factory.parse_items(*args)
    for migration_item in parsed_items:
        # Each hint carries exactly one item.
        single_item_hint = Hint(who, hint_type, [migration_item])
        hints.add_hint(single_item_hint)
def single_hint_taking_list_of_packages(
    mi_factory: "MigrationItemFactory",
    hints: HintCollection,
    who: str,
    hint_type: "HintType",
    /,
    *args: str,
) -> None:
    """Add a single hint to ``hints`` that covers all parsed arguments together.

    :param mi_factory: Factory whose ``parse_items`` turns ``args`` into items.
    :param hints: Collection receiving the new hint.
    :param who: Identifier of the entity providing the hint.
    :param hint_type: Type given to the hint created.
    :param args: The arguments of the hint line.
    """
    all_items = mi_factory.parse_items(*args)
    combined_hint = Hint(who, hint_type, all_items)
    hints.add_hint(combined_hint)
class HintType:
    """Read-only description of one kind of hint.

    It bundles the hint's name, the function that parses its arguments, the
    minimum number of arguments, and whether its items must / may / must not
    carry a version and an architecture.
    """

    def __init__(
        self,
        hint_name: str,
        parser_function: PolicyHintParserProto = split_into_one_hint_per_package,
        min_args: int = 1,
        versioned: HintAnnotate = HintAnnotate.REQUIRED,
        architectured: HintAnnotate = HintAnnotate.FORBIDDEN,
    ) -> None:
        # "remark" is the only hint name allowed to require no arguments.
        assert min_args > 0 or hint_name == "remark", "min_args must be at least 1"
        self._hint_name = hint_name
        self._parser_function = parser_function
        self._min_args = min_args
        self._versioned = versioned
        self._architectured = architectured

    @property
    def hint_name(self) -> str:
        """The name of the hint."""
        return self._hint_name

    @property
    def min_args(self) -> int:
        """The minimum number of arguments the hint takes."""
        return self._min_args

    @property
    def parser_function(self) -> PolicyHintParserProto:
        """The function that parses the hint's arguments."""
        return self._parser_function

    @property
    def versioned(self) -> HintAnnotate:
        """Whether the hint's items need, may have, or must not have a version."""
        return self._versioned

    @property
    def architectured(self) -> HintAnnotate:
        """Whether the hint's items need, may have, or must not have an architecture."""
        return self._architectured
class Hint:
    """A single hint: who issued it, its type, and the items it applies to.

    The items are validated against the hint type's version and architecture
    requirements when the hint is constructed (see :meth:`check`).  Because the
    class defines ``__eq__`` without ``__hash__``, instances are not hashable.
    """

    def __init__(
        self,
        user: str,
        hint_type: HintType,
        packages: list["MigrationItem"],
    ) -> None:
        """Create an active hint and validate it.

        :param user: Identifier of the entity providing the hint.
        :param hint_type: The type of the hint.
        :param packages: The items the hint applies to.
        :raises MalformedHintException: If an item violates the hint type's
          version or architecture requirements.
        """
        self._user = user
        self._active = True
        self._type = hint_type
        self._packages = packages

        self.check()

    def check(self) -> None:
        """Validate every item against the hint type's requirements.

        :raises MalformedHintException: If an item has a version although the
          type forbids one (or lacks one although it is required), or is
          architecture specific although the type forbids that (or is a
          "source" item although an architecture is required).
        """
        versioned = self._type.versioned
        architectured = self._type.architectured
        for package in self.packages:
            if versioned == HintAnnotate.FORBIDDEN and package.version is not None:
                raise MalformedHintException(
                    f'"{self.type}" needs unversioned packages, got "{package}"'
                )
            elif versioned == HintAnnotate.REQUIRED and package.version is None:
                raise MalformedHintException(
                    f'"{self.type}" needs versioned packages, got "{package}"'
                )
            # An item whose architecture is "source" is not architecture specific.
            if (
                architectured == HintAnnotate.REQUIRED
                and package.architecture == "source"
            ):
                raise MalformedHintException(
                    f'"{self.type}" needs to be architecture specific, got {package}'
                )
            elif (
                architectured == HintAnnotate.FORBIDDEN
                and package.architecture != "source"
            ):
                raise MalformedHintException(
                    f'"{self.type}" must not be architecture specific, got {package}'
                )

    def set_active(self, active: bool) -> None:
        """Mark the hint as active or inactive."""
        self._active = active

    def __str__(self) -> str:
        """Return the hint type followed by its space-separated items.

        Items are rendered with ``uvname`` when the type forbids versions and
        with ``name`` otherwise.
        """
        if self._type.versioned == HintAnnotate.FORBIDDEN:
            names = (x.uvname for x in self._packages)
        else:
            names = (x.name for x in self._packages)
        return "{} {}".format(self.type, " ".join(names))

    def __eq__(self, other: Any) -> bool:
        """Two hints are equal when they have the same type and the same items.

        Comparing with anything that is not a ``Hint`` yields ``NotImplemented``
        so that Python falls back to its default handling (unequal) instead of
        failing with an AttributeError.
        """
        if not isinstance(other, Hint):
            return NotImplemented
        if self.type != other.type:
            return False
        # we can't use sets, because unversioned items cannot be hashed
        return sorted(self.packages) == sorted(other.packages)

    @property
    def type(self) -> str:
        """The name of the hint's type."""
        return self._type.hint_name

    @property
    def packages(self) -> list["MigrationItem"]:
        """The items the hint applies to."""
        return self._packages

    @property
    def active(self) -> bool:
        """Whether the hint is currently active."""
        return self._active

    @property
    def user(self) -> str:
        """Identifier of the entity that provided the hint."""
        return self._user

    def _sole_item(self) -> Optional["MigrationItem"]:
        """Return the hint's only item, or None when the hint has no items.

        The single-item accessors below only make sense for hints carrying at
        most one item; this is asserted.
        """
        if not self.packages:
            return None
        assert len(self.packages) == 1, self.packages
        return self.packages[0]

    @property
    def package(self) -> str | None:
        """Package name of the hint's only item (None if there are no items)."""
        item = self._sole_item()
        return None if item is None else item.package

    @property
    def version(self) -> str | None:
        """Version of the hint's only item (None if there are no items)."""
        item = self._sole_item()
        return None if item is None else item.version

    @property
    def architecture(self) -> str | None:
        """Architecture of the hint's only item (None if there are no items)."""
        item = self._sole_item()
        return None if item is None else item.architecture

    @property
    def suite(self) -> Optional["Suite"]:
        """Suite of the hint's only item (None if there are no items)."""
        item = self._sole_item()
        return None if item is None else item.suite
class HintParser:
    """Parses hint lines into :class:`Hint` objects stored in ``self.hints``.

    The parser knows a set of registered hint types (plus aliases for them) and
    dispatches each recognised, permitted line to the parser function of its
    hint type.
    """

    def __init__(self, mi_factory: "MigrationItemFactory") -> None:
        """Set up the parser and register the built-in hint types.

        :param mi_factory: Factory handed to the parser functions so they can
          turn hint arguments into migration items.
        """
        logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
        self.logger = logging.getLogger(logger_name)
        self.hints = HintCollection()
        self.mi_factory = mi_factory

        # Maps an alias to the name of the hint type it stands for.
        self._aliases: dict[str, str] = {
            "approve": "unblock",
        }

        FORBIDDEN = HintAnnotate.FORBIDDEN
        OPTIONAL = HintAnnotate.OPTIONAL
        self._hint_set: set[HintType] = set()
        # Migration grouping hints
        self.register_hint_type(HintType("remark", lambda *x: None, 0))
        # Easy needs at least 2 to make sense
        self.register_hint_type(
            HintType(
                "easy", single_hint_taking_list_of_packages, 2, architectured=OPTIONAL
            )
        )
        self.register_hint_type(
            HintType(
                "force-hint",
                single_hint_taking_list_of_packages,
                architectured=OPTIONAL,
            )
        )
        self.register_hint_type(
            HintType(
                "hint", single_hint_taking_list_of_packages, architectured=OPTIONAL
            )
        )
        # Block / freeze related hints
        self.register_hint_type(HintType("block", versioned=FORBIDDEN))
        self.register_hint_type(HintType("block-all", versioned=FORBIDDEN))
        self.register_hint_type(HintType("block-udeb", versioned=FORBIDDEN))
        self.register_hint_type(HintType("unblock", architectured=OPTIONAL))
        self.register_hint_type(HintType("unblock-udeb", architectured=OPTIONAL))
        # Other
        self.register_hint_type(HintType("remove"))
        self.register_hint_type(HintType("force", architectured=OPTIONAL))
        self.register_hint_type(
            HintType("allow-uninst", versioned=FORBIDDEN, architectured=OPTIONAL)
        )
        self.register_hint_type(HintType("allow-smooth-update"))

    @property
    def registered_hint_names(self) -> set[str]:
        """A set of all known hint names (and aliases thereof)"""
        return {x.hint_name for x in self._hint_set} | set(self._aliases)

    @property
    def registered_hints(self) -> set[HintType]:
        """A set of all known hints"""
        return self._hint_set

    def register_hint_type(
        self,
        hint_type: HintType,
        *,
        aliases: Iterable[str] | None = None,
    ) -> None:
        """Register a new hint that is supported by the parser

        This registers a new hint that can be parsed by the hint parser.  All hints are single words with a
        space-separated list of arguments (on a single line).  The hint parser will do some basic processing,
        the permission checking and minor validation on the hint before passing it on to the parser function
        of the hint type.

        The parser_function will receive the following positional arguments:
          * The migration item factory of the parser
          * A hint collection
          * Identifier of the entity providing the hint
          * The hint type (aliases will be mapped to the hint type they stand for)
          * Zero or more string arguments for the hint (so the function needs to use *args)

        The parser_function will then have to process the arguments and call the hint collection's "add_hint"
        as needed.  Example implementations include "split_into_one_hint_per_package", which is used by almost
        all policy hints.

        :param hint_type: The hint
        :param aliases: An optional iterable of aliases to the hint (use only for backwards compatibility)
        """
        hint_name = hint_type.hint_name
        assert (
            hint_name not in self._aliases
        ), f"The hint type {hint_name} is already registered as an alias of {self._aliases[hint_name]}"
        assert (
            hint_name not in self.registered_hint_names
        ), f"The hint type {hint_name} is already registered"
        self._hint_set.add(hint_type)
        if aliases:
            for alias in aliases:
                self._aliases[alias] = hint_name

    def parse_hints(
        self,
        who: str,
        permitted_hints: str | list[str],
        filename: str,
        lines: Iterable[str],
    ) -> None:
        """Parse hint lines and add the resulting hints to ``self.hints``.

        Blank lines and lines starting with "#" are skipped.  A line whose
        first word is "finished" stops the parsing.  Unknown hints, hints that
        are not permitted, hints with too few arguments and malformed hints are
        logged and skipped; they do not abort the parsing.

        :param who: Identifier of the entity providing the hints.
        :param permitted_hints: Names of the hints ``who`` may use; the name
          "ALL" permits every hint.  A plain string is read as a
          whitespace-separated list of names.
        :param filename: Name of the source of the lines (used in log messages).
        :param lines: The hint lines to parse.
        """
        # ``name in some_str`` is a substring test, so a bare string would let
        # "unblock" also permit "block".  Normalise to a set of whole names.
        if isinstance(permitted_hints, str):
            permitted = frozenset(permitted_hints.split())
        else:
            permitted = frozenset(permitted_hints)
        everything_permitted = "ALL" in permitted

        # Built once, so each line is a dictionary lookup instead of a scan
        # over all registered hint types.
        hint_types = {x.hint_name: x for x in self.registered_hints}
        hints = self.hints
        aliases = self._aliases
        mi_factory = self.mi_factory

        for line_no, raw_line in enumerate(lines, start=1):
            line = raw_line.strip()
            if line == "" or line.startswith("#"):
                continue
            hint_name, *hint_args = line.split()
            hint_name = aliases.get(hint_name, hint_name)
            if hint_name == "finished":
                break
            hint_type = hint_types.get(hint_name)
            if hint_type is None:
                # Also covers an alias that resolves to a name for which no
                # hint type is registered.
                self.logger.warning(
                    "Unknown hint found in %s (line %d): '%s'", filename, line_no, line
                )
                continue
            if not everything_permitted and hint_name not in permitted:
                reason = "The hint is not a part of the permitted hints for " + who
                self.logger.info(
                    'Ignoring "%s" hint from %s found in %s (line %d): %s',
                    hint_name,
                    who,
                    filename,
                    line_no,
                    reason,
                )
                continue
            min_args = hint_type.min_args
            if len(hint_args) < min_args:
                self.logger.warning(
                    "Malformed hint found in %s (line %d): Needs at least %d argument(s), got %d",
                    filename,
                    line_no,
                    min_args,
                    len(hint_args),
                )
                continue
            try:
                hint_type.parser_function(
                    mi_factory, hints, who, hint_type, *hint_args
                )
            except MalformedHintException as e:
                self.logger.warning(
                    'Malformed hint found in %s (line %d): "%s"',
                    filename,
                    line_no,
                    e.args[0],
                )