Coverage for nova/pci/devspec.py: 97%

204 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-17 15:08 +0000

1# 

2# Licensed under the Apache License, Version 2.0 (the "License"); you may 

3# not use this file except in compliance with the License. You may obtain 

4# a copy of the License at 

5# 

6# http://www.apache.org/licenses/LICENSE-2.0 

7# 

8# Unless required by applicable law or agreed to in writing, software 

9# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

10# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

11# License for the specific language governing permissions and limitations 

12# under the License. 

13 

14import abc 

15import copy 

16import re 

17import string 

18import typing as ty 

19 

20import nova.conf 

21from nova import exception 

22from nova.i18n import _ 

23from nova import objects 

24from nova.pci.request import PCI_REMOTE_MANAGED_TAG 

25from nova.pci import utils 

26from oslo_log import log as logging 

27from oslo_utils import strutils 

28 

# Largest value accepted for each numeric PCI identifier / address field.
# The limits are inclusive: validation only rejects values strictly greater
# than these.
MAX_VENDOR_ID = 0xFFFF
MAX_PRODUCT_ID = 0xFFFF
MAX_FUNC = 0x7
MAX_DOMAIN = 0xFFFF
MAX_BUS = 0xFF
MAX_SLOT = 0x1F
# Wildcard markers: ANY is used by glob-style specs and for vendor/product
# IDs, REGEX_ANY is the default pattern for regex-style address fields.
ANY = '*'
REGEX_ANY = '.*'

LOG = logging.getLogger(__name__)
CONF = nova.conf.CONF

# The "address" of a device spec is either a string (glob syntax or a
# specific PCI address) or a dict of per-field values (regex syntax).
PCISpecAddressType = ty.Union[ty.Dict[str, str], str]

42 

43 

class PciAddressSpec(metaclass=abc.ABCMeta):
    """Abstract base class for all PCI address spec styles.

    Holds the four PCI address fields (domain, bus, slot, func) as strings
    and provides the validation helper shared by the concrete spec classes
    that check the address fields of the pci.device_spec.
    """

    def __init__(self, pci_addr: str) -> None:
        # The base initialiser does not parse ``pci_addr``; it only provides
        # empty defaults for the four address fields.
        self.domain = ''
        self.bus = ''
        self.slot = ''
        self.func = ''

    @abc.abstractmethod
    def match(self, pci_addr):
        """Return whether ``pci_addr`` is matched by this spec."""
        pass

    def is_single_address(self) -> bool:
        """Tell whether the spec names exactly one PCI address.

        :returns: True when every character of every address field is a
            hexadecimal digit, i.e. no field contains a wildcard or regex
            character. Empty fields do not make the result False.
        """
        fields = (self.domain, self.bus, self.slot, self.func)
        return all(
            char in string.hexdigits for field in fields for char in field)

    def _set_pci_dev_info(
        self, prop: str, maxval: int, hex_value: str
    ) -> None:
        """Validate the hex attribute ``prop`` and normalise it in place.

        The attribute is left untouched when it is the ANY wildcard.
        Otherwise it is parsed as a base-16 number, range checked and
        written back formatted with ``hex_value``.

        :param prop: name of the instance attribute to validate
        :param maxval: largest allowed value (inclusive)
        :param hex_value: %-style format for the normalised value,
            e.g. ``'%04x'``
        :raises PciConfigInvalidSpec: if the attribute is not a hex string,
            is negative, or is greater than ``maxval``
        """
        raw = getattr(self, prop)
        if raw == ANY:
            return
        try:
            value = int(raw, 16)
        except (TypeError, ValueError):
            # TypeError covers non-string values (e.g. a number coming from
            # a JSON config), which int() refuses when a base is given.
            raise exception.PciConfigInvalidSpec(
                reason=_("property %(property)s ('%(attr)s') does not parse "
                         "as a hex number.") % {'property': prop, 'attr': raw})
        if value < 0:
            # int() accepts a leading '-', but no PCI field can be negative
            # and formatting it would store a bogus value such as '-001'.
            raise exception.PciConfigInvalidSpec(
                reason=_("property %(property)s (%(attr)s) must not be "
                         "negative.") % {'property': prop, 'attr': raw})
        if value > maxval:
            raise exception.PciConfigInvalidSpec(
                reason=_("property %(property)s (%(attr)s) is greater than "
                         "the maximum allowable value (%(max)X).") %
                {'property': prop, 'attr': raw, 'max': maxval})
        setattr(self, prop, hex_value % value)

85 

86 

class PhysicalPciAddress(PciAddressSpec):
    """Address fields of one fully-qualified PCI device.

    The fields are validated against the per-field maxima and stored in
    their zero-padded lower-case hex form.
    """

    def __init__(self, pci_addr: PCISpecAddressType) -> None:
        """Parse and validate ``pci_addr``.

        :param pci_addr: either a dict with the keys ``domain``, ``bus``,
            ``slot`` and ``function``, or a value understood by
            ``utils.get_pci_address_fields``
        :raises PciDeviceWrongAddressFormat: if a dict key is missing or a
            ValueError is raised while parsing
        """
        try:
            # TODO(stephenfin): Is this ever actually a string?
            if not isinstance(pci_addr, dict):
                fields = utils.get_pci_address_fields(pci_addr)
                self.domain, self.bus, self.slot, self.func = fields
            else:
                self.domain = pci_addr['domain']
                self.bus = pci_addr['bus']
                self.slot = pci_addr['slot']
                self.func = pci_addr['function']
            # The validation order is kept (func first) because it decides
            # which field is reported when several are invalid.
            for prop, maxval, fmt in (
                ('func', MAX_FUNC, '%1x'),
                ('domain', MAX_DOMAIN, '%04x'),
                ('bus', MAX_BUS, '%02x'),
                ('slot', MAX_SLOT, '%02x'),
            ):
                self._set_pci_dev_info(prop, maxval, fmt)
        except (KeyError, ValueError):
            raise exception.PciDeviceWrongAddressFormat(address=pci_addr)

    def match(self, phys_pci_addr: PciAddressSpec) -> bool:
        """Return True only if all four fields equal those of the argument."""
        mine = (self.domain, self.bus, self.slot, self.func)
        theirs = (
            phys_pci_addr.domain,
            phys_pci_addr.bus,
            phys_pci_addr.slot,
            phys_pci_addr.func,
        )
        return mine == theirs

    def __str__(self):
        """Render the address as ``domain:bus:slot.func``."""
        return '%s:%s:%s.%s' % (self.domain, self.bus, self.slot, self.func)

123 

124 

class PciAddressGlobSpec(PciAddressSpec):
    """Address fields given in glob style.

    Validates the address fields of a glob-style address, accepts the ANY
    wildcard in any field and inserts wildcards where a field is left
    blank or left off.
    """

    def __init__(self, pci_addr: str) -> None:
        """Parse a ``[[domain:]bus:]slot[.func]`` style glob address.

        :param pci_addr: the address string from the device spec
        :raises PciDeviceWrongAddressFormat: if the part before the first
            '.' has more than three ':'-separated fields
        """
        # Everything defaults to a wildcard until a field is provided.
        self.domain = self.bus = self.slot = self.func = ANY

        dbs, _sep, func = pci_addr.partition('.')
        if func:
            self.func = func.strip()
            self._set_pci_dev_info('func', MAX_FUNC, '%01x')
        if not dbs:
            return

        given = dbs.split(':')
        missing = 3 - len(given)
        if missing < 0:
            raise exception.PciDeviceWrongAddressFormat(address=pci_addr)
        # If we got a partial address like ":00.", we need to turn this
        # into a domain of ANY, a bus of ANY, and a slot of 00. This code
        # allows the address bus and/or domain to be left off
        padded = [ANY] * missing + given
        self.domain, self.bus, self.slot = [
            part.strip() or ANY for part in padded]
        for prop, maxval, fmt in (
            ('domain', MAX_DOMAIN, '%04x'),
            ('bus', MAX_BUS, '%02x'),
            ('slot', MAX_SLOT, '%02x'),
        ):
            self._set_pci_dev_info(prop, maxval, fmt)

    def match(self, phys_pci_addr: PciAddressSpec) -> bool:
        """Return True if every field is ANY or equals the given field."""
        wanted = (self.domain, self.bus, self.slot, self.func)
        actual = (
            phys_pci_addr.domain,
            phys_pci_addr.bus,
            phys_pci_addr.slot,
            phys_pci_addr.func,
        )
        return all(w in (ANY, a) for w, a in zip(wanted, actual))

165 

166 

class PciAddressRegexSpec(PciAddressSpec):
    """Address fields given in regex style.

    Each of the PCI address attributes is a regular expression; a field
    missing from the spec defaults to REGEX_ANY. All patterns are compiled
    on construction so an invalid one is reported immediately.
    """

    def __init__(self, pci_addr: dict) -> None:
        """Read and compile the per-field patterns.

        :param pci_addr: dict with the optional keys ``domain``, ``bus``,
            ``slot`` and ``function``
        :raises PciDeviceWrongAddressFormat: if a pattern is not a valid
            regular expression or is not a string at all
        """
        try:
            self.domain = pci_addr.get('domain', REGEX_ANY)
            self.bus = pci_addr.get('bus', REGEX_ANY)
            self.slot = pci_addr.get('slot', REGEX_ANY)
            self.func = pci_addr.get('function', REGEX_ANY)
            self.domain_regex = re.compile(self.domain)
            self.bus_regex = re.compile(self.bus)
            self.slot_regex = re.compile(self.slot)
            self.func_regex = re.compile(self.func)
        except (re.error, TypeError):
            # re.compile raises TypeError, not re.error, for a non-string
            # pattern (e.g. a number in the JSON config); report it as a
            # malformed address as well instead of leaking the TypeError.
            raise exception.PciDeviceWrongAddressFormat(address=pci_addr)

    def match(self, phys_pci_addr: PciAddressSpec) -> bool:
        """Return True if every pattern matches the corresponding field.

        ``re.match`` semantics apply: each pattern is anchored at the start
        of the field only.
        """
        pairs = (
            (self.domain_regex, phys_pci_addr.domain),
            (self.bus_regex, phys_pci_addr.bus),
            (self.slot_regex, phys_pci_addr.slot),
            (self.func_regex, phys_pci_addr.func),
        )
        return all(regex.match(value) for regex, value in pairs)

196 

197 

class WhitelistPciAddress(object):
    """Manages the address fields of the whitelist.

    Wraps the address part of one pci.device_spec entry, picking the
    address spec class that fits the configured syntax and validating the
    address fields.
    Example configs:

        | [pci]
        | device_spec = {"address":"*:0a:00.*",
        |                "physical_network":"physnet1"}
        | device_spec = {"address": {"domain": ".*",
                                     "bus": "02",
                                     "slot": "01",
                                     "function": "[0-2]"},
                         "physical_network":"net1"}
        | device_spec = {"vendor_id":"1137","product_id":"0071"}

    """

    def __init__(
        self, pci_addr: PCISpecAddressType, is_physical_function: bool
    ) -> None:
        self.is_physical_function = is_physical_function
        self._init_address_fields(pci_addr)

    def _check_physical_function(self) -> None:
        """Update ``is_physical_function`` for single-address specs.

        Specs containing wildcards are left as they are.
        """
        spec = self.pci_address_spec
        if not spec.is_single_address():
            return
        self.is_physical_function = utils.is_physical_function(
            spec.domain, spec.bus, spec.slot, spec.func)

    def _init_address_fields(self, pci_addr: PCISpecAddressType) -> None:
        """Build ``pci_address_spec`` from ``pci_addr``.

        :raises PciDeviceWrongAddressFormat: if the spec is not flagged as
            a physical function and ``pci_addr`` is neither a str nor a
            dict
        """
        self.pci_address_spec: PciAddressSpec
        if self.is_physical_function:
            self.pci_address_spec = PhysicalPciAddress(pci_addr)
            return

        if isinstance(pci_addr, str):
            self.pci_address_spec = PciAddressGlobSpec(pci_addr)
        elif isinstance(pci_addr, dict):
            self.pci_address_spec = PciAddressRegexSpec(pci_addr)
        else:
            raise exception.PciDeviceWrongAddressFormat(address=pci_addr)
        self._check_physical_function()

    def match(self, pci_addr: str, pci_phys_addr: ty.Optional[str]) -> bool:
        """Match a device to this PciAddress.

        Assume this is called with a ``pci_addr`` and ``pci_phys_addr``
        reported by libvirt. No attempt is made to verify if ``pci_addr`` is a
        VF of ``pci_phys_addr``.

        :param pci_addr: PCI address of the device to match.
        :param pci_phys_addr: PCI address of the parent of the device to match
            (or None if the device is not a VF).
        """
        # Try to match on the parent PCI address if the PciDeviceSpec is a
        # PF (sriov is available) and the device to match is a VF. This
        # makes it possible to specify the PCI address of a PF in the
        # pci.device_spec to match any of its VFs' PCI addresses.
        if self.is_physical_function and pci_phys_addr:
            parent = PhysicalPciAddress(pci_phys_addr)
            if self.pci_address_spec.match(parent):
                return True

        # Try to match on the device PCI address only.
        return self.pci_address_spec.match(PhysicalPciAddress(pci_addr))

269 

270 

class PciDeviceSpec(PciAddressSpec):
    """One parsed entry of the pci.device_spec configuration option.

    Splits the entry into the matching criteria (vendor ID, product ID and
    either an address or a device name) and the remaining tags, validates
    them, and matches PCI devices against the criteria. It derives from
    PciAddressSpec to reuse ``_set_pci_dev_info`` for the vendor and product
    IDs.
    """

    def __init__(self, dev_spec: ty.Dict[str, str]) -> None:
        # stored for better error reporting
        self.dev_spec_conf = copy.deepcopy(dev_spec)
        # the non tag fields (i.e. address, devname) will be removed by
        # _init_dev_details
        # NOTE: this is the caller's dict, not a copy, so those fields are
        # removed from the caller's object too.
        self.tags = dev_spec
        self._init_dev_details()

    def _address_obj(self) -> ty.Optional[WhitelistPciAddress]:
        """Return the address object to match devices against.

        :returns: ``self.address`` when no device name is configured;
            otherwise an address looked up from the device name, or None if
            that lookup yields no address.
        """
        if not self.dev_name:
            # use self.address
            return self.address

        address_str, pf = utils.get_function_by_ifname(self.dev_name)
        if not address_str:
            return None
        # Note(moshele): In this case we always passing a string
        # of the PF pci address
        return WhitelistPciAddress(address_str, pf)

    def _init_dev_details(self) -> None:
        """Extract and validate the non-tag fields and normalise the tags.

        :raises PciConfigInvalidSpec: if an ID or a boolean tag is invalid,
            or one_time_use is requested without pci.report_in_placement
        :raises PciDeviceInvalidDeviceName: if both an address and a device
            name are given
        """
        self.vendor_id = self.tags.pop("vendor_id", ANY)
        self.product_id = self.tags.pop("product_id", ANY)
        self.dev_name = self.tags.pop("devname", None)
        self.address: ty.Optional[WhitelistPciAddress] = None
        # Note(moshele): The address attribute can be a string or a dict.
        # For glob syntax or specific pci it is a string and for regex syntax
        # it is a dict. The WhitelistPciAddress class handles both types.
        address = self.tags.pop("address", None)

        # Both IDs are stripped so that a padded wildcard such as " * " is
        # recognised as ANY for the product ID just as for the vendor ID.
        self.vendor_id = self.vendor_id.strip()
        self.product_id = self.product_id.strip()
        self._set_pci_dev_info('vendor_id', MAX_VENDOR_ID, '%04x')
        self._set_pci_dev_info('product_id', MAX_PRODUCT_ID, '%04x')

        if address and self.dev_name:
            raise exception.PciDeviceInvalidDeviceName()

        if not self.dev_name:
            self.address = WhitelistPciAddress(address or '*:*:*.*', False)

        address_obj = self._address_obj()
        self._remote_managed = strutils.bool_from_string(
            self.tags.get(PCI_REMOTE_MANAGED_TAG))

        for tag in ("managed", "live_migratable", "one_time_use"):
            self._normalize_device_spec_tag(tag)

        # Validate that one_time_use=true is not set on devices where we
        # cannot support proper reservation protection.
        if (self.tags.get('one_time_use') == 'true' and
                not CONF.pci.report_in_placement):
            raise exception.PciConfigInvalidSpec(
                reason=_('one_time_use=true requires '
                         'pci.report_in_placement to be enabled'))

        if self._remote_managed:
            self._validate_remote_managed(address_obj)

    def _validate_remote_managed(
        self, address_obj: ty.Optional[WhitelistPciAddress]
    ) -> None:
        """Reject remote_managed specs that cannot be supported.

        PFs with remote_managed tags are explicitly not supported. If they
        are tagged as such by mistake in the whitelist Nova will
        raise an exception. The reason for excluding PFs is the lack of a way
        for an instance to access the control plane at the remote side (e.g.
        on a DPU) for managing the PF representor corresponding to the PF.

        :param address_obj: the address object of this spec, or None
        :raises PciDeviceRemoteManagedNotPresent: if there is no address
        :raises PciDeviceInvalidPFRemoteManaged: if the spec selects the PF
        :raises PciConfigInvalidSpec: if the IDs match neither the PF nor
            its VFs
        """
        if address_obj is None:
            # Note that this will happen if a netdev was specified in the
            # whitelist but it is not actually present on a system - in
            # this case Nova is not able to look up an address by
            # a netdev name.
            raise exception.PciDeviceRemoteManagedNotPresent()
        if not address_obj.is_physical_function:
            return

        # Build the address from the fields instead of calling str() on the
        # spec: a glob or regex spec can be flagged as a PF as well, and
        # only PhysicalPciAddress defines __str__, so str() would yield the
        # default object repr for the other spec classes.
        spec = address_obj.pci_address_spec
        pf_addr = f'{spec.domain}:{spec.bus}:{spec.slot}.{spec.func}'
        vf_product_id = utils.get_vf_product_id_by_pf_addr(pf_addr)
        # VF vendor IDs have to match the corresponding PF vendor IDs
        # per the SR-IOV spec so we use it for matching here.
        pf_vendor_id, pf_product_id = utils.get_pci_ids_by_pci_addr(pf_addr)
        vendor_matches = self.vendor_id in (pf_vendor_id, ANY)

        # Check the actual vendor ID and VF product ID of an assumed
        # VF (based on the actual PF). The VF product ID must match
        # the actual one if this is a VF device spec.
        if self.product_id == vf_product_id and vendor_matches:
            return
        if self.product_id in (pf_product_id, ANY) and vendor_matches:
            raise exception.PciDeviceInvalidPFRemoteManaged(
                address_obj.pci_address_spec)
        # The specified product and vendor IDs of what is supposed
        # to be a VF corresponding to the PF PCI address do not
        # match the actual ones for this PF. This means that the
        # whitelist is invalid.
        raise exception.PciConfigInvalidSpec(
            reason=_('the specified VF vendor ID %(vendor_id)s and'
                     ' product ID %(product_id)s do not match the'
                     ' expected VF IDs based on the corresponding'
                     ' PF identified by PCI address %(pf_addr)s') %
            {'vendor_id': self.vendor_id,
             'product_id': self.product_id,
             'pf_addr': pf_addr})

    def _ensure_remote_managed_dev_vpd_serial(
            self, dev_dict: ty.Dict[str, ty.Any]) -> bool:
        """Ensure the presence of a serial number field in PCI VPD.

        A card serial number extracted from PCI VPD is required to allow a
        networking backend to identify which remote host needs to program a
        given device. So if a device is tagged as remote_managed, it must
        have the card serial number or be filtered out.

        :returns: True if the spec is not remote_managed or the device has
            a non-empty card serial number, False otherwise
        """
        if not self._remote_managed:
            return True
        card_sn = dev_dict.get('capabilities', {}).get(
            'vpd', {}).get('card_serial_number')
        # None or empty card_serial_number should be filtered out. That would
        # mean either no serial number in the VPD (if present at all) or SN is
        # an empty string which is not useful for device identification.
        return bool(card_sn)

    def match(self, dev_dict: ty.Dict[str, ty.Any]) -> bool:
        """Return whether the device described by ``dev_dict`` matches.

        :param dev_dict: must contain ``vendor_id``, ``product_id`` and
            ``address``; ``parent_addr`` and ``capabilities`` are optional
        """
        address_obj: ty.Optional[WhitelistPciAddress] = self._address_obj()
        if not address_obj:
            return False

        return all([
            self.vendor_id in (ANY, dev_dict['vendor_id']),
            self.product_id in (ANY, dev_dict['product_id']),
            address_obj.match(dev_dict['address'],
                              dev_dict.get('parent_addr')),
            self._ensure_remote_managed_dev_vpd_serial(dev_dict),
        ])

    def match_pci_obj(self, pci_obj: 'objects.PciDevice') -> bool:
        """Match a PciDevice object by converting it to a device dict."""
        dev_dict = {
            'vendor_id': pci_obj.vendor_id,
            'product_id': pci_obj.product_id,
            'address': pci_obj.address,
            'parent_addr': pci_obj.parent_addr,
            'capabilities': {
                'vpd': {'card_serial_number': pci_obj.card_serial_number}}
        }
        return self.match(dev_dict)

    def get_tags(self) -> ty.Dict[str, str]:
        """Return the tags of this spec (the live dict, not a copy)."""
        return self.tags

    def _normalize_device_spec_tag(self, tag):
        """Rewrite a boolean-like tag to the string "true" or "false".

        A tag that is absent or None is left alone.

        :raises PciConfigInvalidSpec: if the value is not a valid boolean
        """
        value = self.tags.get(tag)
        if value is None:
            return
        try:
            enabled = strutils.bool_from_string(value, strict=True)
        except ValueError as e:
            raise exception.PciConfigInvalidSpec(
                reason=f"Cannot parse tag '{tag}': " + str(e)
            )
        self.tags[tag] = "true" if enabled else "false"

    def enhanced_pci_device_with_spec_tags(self, dev: ty.Dict[str, ty.Any]):
        """Copy the managed and live_migratable tags, if set, into ``dev``."""
        for tag in ("managed", "live_migratable"):
            tag_value = self.tags.get(tag)
            if tag_value is not None:
                dev[tag] = tag_value