Coverage for britney2/__init__.py: 96%

178 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-04-18 20:48 +0000

1import logging 

2from collections import namedtuple 

3from enum import Enum, unique 

4from typing import Iterable, NamedTuple, Optional 

5 

6 

7class DependencyType(Enum): 

8 DEPENDS = ('Depends', 'depends', 'dependency') 

9 # BUILD_DEPENDS includes BUILD_DEPENDS_ARCH 

10 BUILD_DEPENDS = ('Build-Depends(-Arch)', 'build-depends', 'build-dependency') 

11 BUILD_DEPENDS_INDEP = ('Build-Depends-Indep', 'build-depends-indep', 'build-dependency (indep)') 

12 BUILT_USING = ('Built-Using', 'built-using', 'built-using') 

13 # Pseudo dependency where Breaks/Conflicts effectively become a inverted dependency. E.g. 

14 # p Depends on q plus q/2 breaks p/1 implies that p/2 must migrate before q/2 can migrate 

15 # (or they go at the same time). 

16 # - can also happen with version ranges 

17 IMPLICIT_DEPENDENCY = ('Implicit dependency', 'implicit-dependency', 'implicit-dependency') 

18 

19 def __str__(self) -> str: 

20 return self.value[0] 

21 

22 def get_reason(self) -> str: 

23 return self.value[1] 

24 

25 def get_description(self) -> str: 

26 return self.value[2] 

27 

28 

29@unique 

30class SuiteClass(Enum): 

31 

32 TARGET_SUITE = (False, False) 

33 PRIMARY_SOURCE_SUITE = (True, True) 

34 ADDITIONAL_SOURCE_SUITE = (True, False) 

35 

36 @property 

37 def is_source(self) -> bool: 

38 return self.value[0] 

39 

40 @property 

41 def is_target(self) -> bool: 

42 return not self.is_source 

43 

44 @property 

45 def is_primary_source(self) -> bool: 

46 return self is SuiteClass.PRIMARY_SOURCE_SUITE 

47 

48 @property 

49 def is_additional_source(self) -> bool: 

50 return self is SuiteClass.ADDITIONAL_SOURCE_SUITE 

51 

52 

53class Suite(object): 

54 

55 def __init__(self, suite_class: SuiteClass, name: str, path: str, suite_short_name: Optional[str] = None): 

56 self.suite_class = suite_class 

57 self.name = name 

58 self.codename = name 

59 self.path = path 

60 self.suite_short_name = suite_short_name if suite_short_name else '' 

61 self.sources: dict[str, SourcePackage] = {} 

62 self._binaries: dict[str, dict[str, BinaryPackage]] = {} 

63 self.provides_table: dict[str, dict[str, set[tuple[str, str]]]] = {} 

64 self._all_binaries_in_suite: Optional[dict[BinaryPackageId, BinaryPackage]] = None 

65 

66 @property 

67 def excuses_suffix(self) -> str: 

68 return self.suite_short_name 

69 

70 @property 

71 def binaries(self) -> dict[str, dict[str, "BinaryPackage"]]: 

72 # TODO some callers modify this structure, which doesn't invalidate 

73 # the self._all_binaries_in_suite cache 

74 return self._binaries 

75 

76 @binaries.setter 

77 def binaries(self, binaries: dict[str, dict[str, "BinaryPackage"]]) -> None: 

78 self._binaries = binaries 

79 self._all_binaries_in_suite = None 

80 

81 @property 

82 def all_binaries_in_suite(self) -> dict["BinaryPackageId", "BinaryPackage"]: 

83 if not self._all_binaries_in_suite: 

84 self._all_binaries_in_suite = \ 

85 {x.pkg_id: x for a in self._binaries for x in self._binaries[a].values()} 

86 return self._all_binaries_in_suite 

87 

88 def any_of_these_are_in_the_suite(self, pkgs: Iterable["BinaryPackageId"]) -> bool: 

89 """Test if at least one package of a given set is in the suite 

90 

91 :param pkgs: An iterable of BinaryPackageId 

92 :return: True if any of the packages in pkgs are currently in the suite 

93 """ 

94 return not self.all_binaries_in_suite.keys().isdisjoint(pkgs) 

95 

96 def is_pkg_in_the_suite(self, pkg_id: "BinaryPackageId") -> bool: 

97 """Test if the package of is in testing 

98 

99 :param pkg_id: A BinaryPackageId 

100 :return: True if the pkg is currently in the suite 

101 """ 

102 return pkg_id in self.all_binaries_in_suite 

103 

104 def which_of_these_are_in_the_suite(self, pkgs: Iterable["BinaryPackageId"]) -> Iterable["BinaryPackageId"]: 

105 """Iterate over all packages that are in the suite 

106 

107 :param pkgs: An iterable of package ids 

108 :return: An iterable of package ids that are in the suite 

109 """ 

110 yield from (x for x in pkgs if x in self.all_binaries_in_suite) 

111 

112 def is_cruft(self, pkg: "BinaryPackage") -> bool: 

113 """Check if the package is cruft in the suite 

114 

115 :param pkg: BinaryPackage to check 

116 Note that this package is assumed to be in the suite 

117 """ 

118 newest_src_in_suite = self.sources[pkg.source] 

119 return pkg.source_version != newest_src_in_suite.version 

120 

121 

122class TargetSuite(Suite): 

123 

124 def __init__(self, *args, **kwargs): 

125 super().__init__(*args, **kwargs) 

126 self.inst_tester = None 

127 logger_name = ".".join((self.__class__.__module__, self.__class__.__name__)) 

128 self._logger = logging.getLogger(logger_name) 

129 

130 def is_installable(self, pkg_id: "BinaryPackageId") -> bool: 

131 """Determine whether the given package can be installed in the suite 

132 

133 :param pkg_id: A BinaryPackageId 

134 :return: True if the pkg is currently installable in the suite 

135 """ 

136 return self.inst_tester.is_installable(pkg_id) 

137 

138 def add_binary(self, pkg_id: "BinaryPackageId") -> None: 

139 """Add a binary package to the suite 

140 

141 If the package is not known, this method will throw an 

142 KeyError. 

143 

144 :param pkg_id The id of the package 

145 """ 

146 

147 # TODO The calling code currently manually updates the contents of 

148 # target_suite.binaries when this is called. It would probably make 

149 # more sense to do that here instead 

150 self.inst_tester.add_binary(pkg_id) 

151 self._all_binaries_in_suite = None 

152 

153 def remove_binary(self, pkg_id: "BinaryPackageId") -> None: 

154 """Remove a binary from the suite 

155 

156 :param pkg_id The id of the package 

157 If the package is not known, this method will throw an 

158 KeyError. 

159 """ 

160 

161 # TODO The calling code currently manually updates the contents of 

162 # target_suite.binaries when this is called. It would probably make 

163 # more sense to do that here instead 

164 self.inst_tester.remove_binary(pkg_id) 

165 self._all_binaries_in_suite = None 

166 

167 def check_suite_source_pkg_consistency(self, comment: str) -> None: 

168 sources_t = self.sources 

169 binaries_t = self.binaries 

170 logger = self._logger 

171 issues_found = False 

172 

173 logger.info("check_target_suite_source_pkg_consistency %s", comment) 

174 

175 for arch in binaries_t: 

176 for pkg_name in binaries_t[arch]: 

177 pkg = binaries_t[arch][pkg_name] 

178 src = pkg.source 

179 

180 if src not in sources_t: # pragma: no cover 

181 issues_found = True 

182 logger.error("inconsistency found (%s): src %s not in target, target has pkg %s with source %s" % ( 

183 comment, src, pkg_name, src)) 

184 

185 for src in sources_t: 

186 source_data = sources_t[src] 

187 for pkg_id in source_data.binaries: 

188 binary, _, parch = pkg_id 

189 if binary not in binaries_t[parch]: # pragma: no cover 

190 issues_found = True 

191 logger.error("inconsistency found (%s): binary %s from source %s not in binaries_t[%s]" % ( 

192 comment, binary, src, parch)) 

193 

194 if issues_found: # pragma: no cover 

195 raise AssertionError("inconsistencies found in target suite") 

196 

197 

198class Suites(object): 

199 

200 def __init__(self, target_suite: TargetSuite, source_suites: list[Suite]): 

201 self._suites: dict[str, Suite] = {} 

202 self._by_name_or_alias: dict[str, Suite] = {} 

203 self.target_suite = target_suite 

204 self.source_suites = source_suites 

205 self._suites[target_suite.name] = target_suite 

206 self._by_name_or_alias[target_suite.name] = target_suite 

207 if target_suite.suite_short_name: 207 ↛ 208line 207 didn't jump to line 208, because the condition on line 207 was never true

208 self._by_name_or_alias[target_suite.suite_short_name] = target_suite 

209 for suite in source_suites: 

210 self._suites[suite.name] = suite 

211 self._by_name_or_alias[suite.name] = suite 

212 if suite.suite_short_name: 

213 self._by_name_or_alias[suite.suite_short_name] = suite 

214 

215 @property 

216 def primary_source_suite(self) -> Suite: 

217 return self.source_suites[0] 

218 

219 @property 

220 def by_name_or_alias(self): 

221 return self._by_name_or_alias 

222 

223 @property 

224 def additional_source_suites(self) -> list[Suite]: 

225 return self.source_suites[1:] 

226 

227 def __getitem__(self, item: str) -> Suite: 

228 return self._suites[item] 

229 

230 def __len__(self) -> int: 

231 return len(self.source_suites) + 1 

232 

233 def __contains__(self, item) -> bool: 

234 return item in self._suites 

235 

236 def __iter__(self): 

237 # Sources first (as we will rely on this for loading data in the old live-data tests) 

238 yield from self.source_suites 

239 yield self.target_suite 

240 

241 

242class SourcePackage(object): 

243 

244 __slots__ = ['source', 'version', 'section', 'binaries', 'maintainer', 'is_fakesrc', 'build_deps_arch', 

245 'build_deps_indep', 'testsuite', 'testsuite_triggers'] 

246 

247 def __init__(self, source: str, version: str, section: str, binaries: set["BinaryPackageId"], 

248 maintainer: Optional[str], is_fakesrc: bool, build_deps_arch: Optional[str], 

249 build_deps_indep: Optional[str], testsuite: list[str], testsuite_triggers: list[str]): 

250 self.source = source 

251 self.version = version 

252 self.section = section 

253 self.binaries = binaries 

254 self.maintainer = maintainer 

255 self.is_fakesrc = is_fakesrc 

256 self.build_deps_arch = build_deps_arch 

257 self.build_deps_indep = build_deps_indep 

258 self.testsuite = testsuite 

259 self.testsuite_triggers = testsuite_triggers 

260 

261 def __getitem__(self, item): 

262 return getattr(self, self.__slots__[item]) 

263 

264 

265class PackageId(namedtuple( 

266 'PackageId', 

267 [ 

268 'package_name', 

269 'version', 

270 'architecture', 

271 ])): 

272 """Represent a source or binary package""" 

273 

274 def __init__(self, package_name, version, architecture): 

275 assert self.architecture != 'all', "all not allowed for PackageId (%s)" % (self.name) 

276 

277 def __repr__(self) -> str: 

278 return ('PID(%s)' % (self.name)) 

279 

280 @property 

281 def name(self) -> str: 

282 if self.architecture == "source": 282 ↛ 283line 282 didn't jump to line 283, because the condition on line 282 was never true

283 return ('%s/%s' % (self.package_name, self.version)) 

284 else: 

285 return ('%s/%s/%s' % (self.package_name, self.version, self.architecture)) 

286 

287 @property 

288 def uvname(self) -> str: 

289 if self.architecture == "source": 289 ↛ 292line 289 didn't jump to line 292, because the condition on line 289 was never false

290 return ('%s' % (self.package_name)) 

291 else: 

292 return ('%s/%s' % (self.package_name, self.architecture)) 

293 

294 

295class BinaryPackageId(PackageId): 

296 """Represent a binary package""" 

297 

298 def __init__(self, package_name, version, architecture): 

299 assert self.architecture != 'source', "Source not allowed for BinaryPackageId (%s)" % (self.name) 

300 super().__init__(package_name, version, architecture) 

301 

302 def __repr__(self): 

303 return ('BPID(%s)' % (self.name)) 

304 

305 

306class BinaryPackage(NamedTuple): 

307 version: str 

308 section: str 

309 source: str 

310 source_version: str 

311 architecture: str 

312 multi_arch: Optional[str] 

313 depends: Optional[str] 

314 conflicts: Optional[str] 

315 provides: list[tuple[str, str, str]] 

316 is_essential: bool 

317 pkg_id: BinaryPackageId 

318 builtusing: list[tuple[str, str]]