Coverage for britney2/hints.py: 91%

138 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-04-18 20:48 +0000

1# -*- coding: utf-8 -*- 

2 

3# Copyright (C) 2013 Adam D. Barratt <adsb@debian.org> 

4 

5# This program is free software; you can redistribute it and/or modify 

6# it under the terms of the GNU General Public License as published by 

7# the Free Software Foundation; either version 2 of the License, or 

8# (at your option) any later version. 

9 

10# This program is distributed in the hope that it will be useful, 

11# but WITHOUT ANY WARRANTY; without even the implied warranty of 

12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

13# GNU General Public License for more details. 

14 

15import logging 

16 

17from itertools import chain 

18 

19 

class MalformedHintException(Exception):
    """Raised when a hint fails validation (e.g. wrong versioned-ness of its packages)."""

22 

23 

class HintCollection(object):
    """An ordered, searchable container of hints.

    Hints are kept in insertion order.  Lookups compare against the *first*
    package of each hint only.
    """

    def __init__(self):
        self._hints = []

    @property
    def is_empty(self):
        """True when no hint has been added yet."""
        return not self._hints

    def __getitem__(self, type=None):
        """Shorthand for ``search(type)``: the active hints of the given type."""
        return self.search(type)

    def search(self, type=None, onlyactive=True, package=None,
               version=None, architecture=None, suite=None, removal=None):
        """Return the hints matching every criterion that is not None.

        :param type: Required hint type, or None to accept any type.
        :param onlyactive: When true (the default), inactive hints are skipped.
        :param package: Required ``package`` of the hint's first package.
        :param version: Required ``version`` of the hint's first package.
        :param architecture: Required ``architecture`` of the hint's first package.
        :param suite: Required ``suite`` of the hint's first package.
        :param removal: Required ``is_removal`` value of the hint's first package.
        :return: A new list with the matching hints, in insertion order.
        """
        matches = []
        for candidate in self._hints:
            if type is not None and type != candidate.type:
                continue
            if not candidate.active and onlyactive:
                continue
            # The remaining filters look at the first package only; it is
            # only accessed when the corresponding filter was requested.
            if package is not None and package != candidate.packages[0].package:
                continue
            if version is not None and version != candidate.packages[0].version:
                continue
            if architecture is not None and architecture != candidate.packages[0].architecture:
                continue
            if suite is not None and suite != candidate.packages[0].suite:
                continue
            if removal is not None and removal != candidate.packages[0].is_removal:
                continue
            matches.append(candidate)
        return matches

    def add_hint(self, hint):
        """Append *hint* to the collection."""
        self._hints.append(hint)

49 

50 

class Hint(object):
    """A single hint: a hint type together with the package item(s) it covers.

    The hint is validated on construction (see :meth:`check`).  Package items
    are expected to expose ``package``, ``version``, ``architecture`` and
    ``suite`` attributes, plus ``name`` / ``uvname`` for string rendering.
    """

    # Hint types whose packages must be given WITHOUT a version.  Every other
    # hint type requires each of its packages to carry a version.
    NO_VERSION = ['block', 'block-all', 'block-udeb', 'allow-archall-maintainer-upload', 'allow-uninst']

    # Hints define __eq__ but cannot be hashed (unversioned items are not
    # hashable).  Python already disables hashing when only __eq__ is defined;
    # this line merely makes that explicit.
    __hash__ = None

    def __init__(self, user, hint_type, packages):
        """Create and validate a hint.

        :param user: Identifier of whoever provided the hint.
        :param hint_type: The name of the hint (e.g. ``"block"``).
        :param packages: The package items the hint applies to.
        :raises MalformedHintException: if the packages do not satisfy the
          versioning requirement of *hint_type*.
        """
        self._user = user
        self._active = True
        self._type = hint_type
        self._packages = packages

        self.check()

    def check(self):
        """Verify that every package is (un)versioned as the hint type requires.

        :raises MalformedHintException: on the first offending package.
        """
        # TODO check if hint is allowed to specify architecture
        needs_unversioned = self.type in self.__class__.NO_VERSION
        for package in self.packages:
            if needs_unversioned:
                if package.version is not None:
                    raise MalformedHintException("\"%s\" needs unversioned packages, got \"%s\"" % (self.type, package))
            elif package.version is None:
                raise MalformedHintException("\"%s\" needs versioned packages, got \"%s\"" % (self.type, package))

    def set_active(self, active):
        """Mark the hint as active or inactive."""
        self._active = active

    def __str__(self):
        # Unversioned hint types are rendered with the unversioned item names.
        if self.type in self.__class__.NO_VERSION:
            return '%s %s' % (self._type, ' '.join(x.uvname for x in self._packages))
        else:
            return '%s %s' % (self._type, ' '.join(x.name for x in self._packages))

    def __eq__(self, other):
        """Two hints are equal when they share type and (unordered) packages.

        Comparison with a non-Hint returns NotImplemented, so ``hint == None``
        evaluates to False instead of raising AttributeError.
        """
        if not isinstance(other, Hint):
            return NotImplemented
        if self.type != other.type:
            return False
        # we can't use sets, because unversioned items cannot be hashed
        return sorted(self.packages) == sorted(other.packages)

    @property
    def type(self):
        return self._type

    @property
    def packages(self):
        return self._packages

    @property
    def active(self):
        return self._active

    @property
    def user(self):
        return self._user

    def _sole_package(self):
        """Return the hint's only package item, or None when it has none.

        The single-package accessors below only make sense for hints that
        cover exactly one package; anything else is a programming error.
        """
        if not self.packages:
            return None
        assert len(self.packages) == 1, self.packages
        return self.packages[0]

    @property
    def package(self):
        """``package`` of the hint's only item (None if there are no items)."""
        item = self._sole_package()
        return None if item is None else item.package

    @property
    def version(self):
        """``version`` of the hint's only item (None if there are no items)."""
        item = self._sole_package()
        return None if item is None else item.version

    @property
    def architecture(self):
        """``architecture`` of the hint's only item (None if there are no items)."""
        item = self._sole_package()
        return None if item is None else item.architecture

    @property
    def suite(self):
        """``suite`` of the hint's only item (None if there are no items)."""
        item = self._sole_package()
        return None if item is None else item.suite

135 

136 

def split_into_one_hint_per_package(mi_factory, hints, who, hint_name, *args):
    """Add one separate single-item hint to *hints* for every parsed argument.

    :param mi_factory: Object whose ``parse_items(*args)`` turns the raw
      arguments into items.
    :param hints: The collection receiving the hints (via ``add_hint``).
    :param who: Identifier of whoever provided the hint.
    :param hint_name: The name of the hint.
    :param args: The raw string arguments of the hint.
    """
    parsed_items = mi_factory.parse_items(*args)
    for parsed_item in parsed_items:
        single_item_hint = Hint(who, hint_name, [parsed_item])
        hints.add_hint(single_item_hint)

140 

141 

def single_hint_taking_list_of_packages(mi_factory, hints, who, hint_type, *args):
    """Add a single hint to *hints* that covers all parsed arguments together.

    :param mi_factory: Object whose ``parse_items(*args)`` turns the raw
      arguments into items.
    :param hints: The collection receiving the hint (via ``add_hint``).
    :param who: Identifier of whoever provided the hint.
    :param hint_type: The name of the hint.
    :param args: The raw string arguments of the hint.
    """
    all_items = mi_factory.parse_items(*args)
    combined_hint = Hint(who, hint_type, all_items)
    hints.add_hint(combined_hint)

144 

145 

class HintParser(object):
    """Parses hint lines and stores the resulting hints in ``self.hints``.

    Each known hint name maps to a ``(min_args, parser_function)`` pair; more
    hint types can be added with :meth:`register_hint_type`.
    """

    def __init__(self, mi_factory):
        """
        :param mi_factory: Passed as first argument to every hint parser
          function (the built-in ones call its ``parse_items`` method).
        """
        logger_name = ".".join((self.__class__.__module__, self.__class__.__name__))
        self.logger = logging.getLogger(logger_name)
        self.hints = HintCollection()
        self.mi_factory = mi_factory
        # hint name -> (minimum number of arguments, parser function)
        self._hint_table = {
            'remark': (0, lambda *x: None),

            # Migration grouping hints
            'easy': (2, single_hint_taking_list_of_packages),  # Easy needs at least 2 to make sense
            'force-hint': (1, single_hint_taking_list_of_packages),
            'hint': (1, single_hint_taking_list_of_packages),

            # Block / freeze related hints
            'block': (1, split_into_one_hint_per_package),
            'block-all': (1, split_into_one_hint_per_package),
            'block-udeb': (1, split_into_one_hint_per_package),
            'unblock': (1, split_into_one_hint_per_package),
            'unblock-udeb': (1, split_into_one_hint_per_package),

            # Other
            'remove': (1, split_into_one_hint_per_package),
            'force': (1, split_into_one_hint_per_package),
            'allow-uninst': (1, split_into_one_hint_per_package),
            'allow-smooth-update': (1, split_into_one_hint_per_package),
        }
        # alias -> canonical hint name
        self._aliases = {
            'approve': 'unblock',
        }

    @property
    def registered_hints(self):
        """A set of all known hints (and aliases thereof)"""
        return set(chain(self._hint_table, self._aliases))

    def register_hint_type(self, hint_name: str, parser_function, *, min_args=1, aliases=None) -> None:
        """Register a new hint that is supported by the parser

        This registers a new hint that can be parsed by the hint parser.  All hints are single words with a
        space-separated list of arguments (on a single line).  The hint parser will do some basic processing,
        the permission checking and minor validation on the hint before passing it on to the parser function
        given.

        The parser_function will receive the following arguments:
         * The factory object this parser was constructed with (``mi_factory``)
         * A hint collection
         * Identifier of the entity providing the hint
         * The hint_name (aliases will be mapped to the hint_name)
         * Zero or more string arguments for the hint (so the function needs to use *args)

        The parser_function will then have to process the arguments and call the hint collection's "add_hint"
        as needed.  Example implementations include "split_into_one_hint_per_package", which is used by almost
        all policy hints.

        Nothing is registered when validation fails.

        :param hint_name: The name of the hint
        :param parser_function: A function to add the hint
        :param min_args: An optional positive integer (non-zero) denoting the minimum number of arguments
          the hint requires.
        :param aliases: An optional iterable of aliases to the hint (use only for backwards compatibility).
          A single string is treated as one alias.
        :raises ValueError: if min_args is below 1, if hint_name is already known as a hint or an alias,
          or if one of the aliases is already known as a hint or an alias.
        """
        if min_args < 1:
            raise ValueError("min_args must be at least 1")
        if hint_name in self._hint_table:
            raise ValueError("The hint type %s is already registered" % hint_name)
        if hint_name in self._aliases:
            raise ValueError("The hint type %s is already registered as an alias of %s" % (
                hint_name, self._aliases[hint_name]))

        if not aliases:
            alias_names = ()
        elif isinstance(aliases, str):
            # A bare string would otherwise be iterated character by character.
            alias_names = (aliases,)
        else:
            alias_names = tuple(aliases)

        # Validate every alias before touching any state.  Aliases are resolved
        # before the hint table is consulted, so an alias that equals an
        # existing hint name would silently hijack that hint.
        for alias in alias_names:
            if alias in self._hint_table:
                raise ValueError("The alias %s is already registered as a hint type" % alias)
            if alias in self._aliases:
                raise ValueError("The alias %s is already registered as an alias of %s" % (
                    alias, self._aliases[alias]))

        self._hint_table[hint_name] = (min_args, parser_function)
        for alias in alias_names:
            self._aliases[alias] = hint_name

    def parse_hints(self, who, permitted_hints, filename, lines):
        """Parse hint lines and add the resulting hints to ``self.hints``.

        Blank lines and lines starting with ``#`` are skipped.  A ``finished``
        line stops the parsing; later lines are ignored.  Unknown hints, hints
        with too few arguments and hints rejected by their parser function are
        logged as warnings and skipped; hints that *who* is not permitted to
        use are logged at info level and skipped.

        :param who: Identifier of the entity providing the hints.
        :param permitted_hints: Container with the hint names *who* may use;
          the special entry ``'ALL'`` permits every hint.
        :param filename: Name of the source of the lines (only used in log messages).
        :param lines: An iterable of hint lines.
        """
        hint_table = self._hint_table
        hints = self.hints
        aliases = self._aliases
        mi_factory = self.mi_factory
        for line_no, raw_line in enumerate(lines, start=1):
            line = raw_line.strip()
            if line == "" or line.startswith('#'):
                continue
            ln = line.split()
            hint_name = ln[0]
            if hint_name in aliases:
                # Parser functions always see the canonical hint name.
                hint_name = aliases[hint_name]
                ln[0] = hint_name
            if hint_name == 'finished':
                break
            if hint_name not in hint_table:
                self.logger.warning("Unknown hint found in %s (line %d): '%s'", filename, line_no, line)
                continue
            if hint_name not in permitted_hints and 'ALL' not in permitted_hints:
                reason = 'The hint is not a part of the permitted hints for ' + who
                self.logger.info("Ignoring \"%s\" hint from %s found in %s (line %d): %s",
                                 hint_name, who, filename, line_no, reason)
                continue
            min_args, hint_parser_impl = hint_table[hint_name]
            if len(ln) - 1 < min_args:
                self.logger.warning("Malformed hint found in %s (line %d): Needs at least %d argument(s), got %d",
                                    filename, line_no, min_args, len(ln) - 1)
                continue
            try:
                # ln is [hint_name, arg1, arg2, ...]
                hint_parser_impl(mi_factory, hints, who, *ln)
            except MalformedHintException as e:
                self.logger.warning("Malformed hint found in %s (line %d): \"%s\"", filename, line_no, e.args[0])