Coverage for nova/pci/devspec.py: 97%
204 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-24 11:16 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-24 11:16 +0000
1#
2# Licensed under the Apache License, Version 2.0 (the "License"); you may
3# not use this file except in compliance with the License. You may obtain
4# a copy of the License at
5#
6# http://www.apache.org/licenses/LICENSE-2.0
7#
8# Unless required by applicable law or agreed to in writing, software
9# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
10# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
11# License for the specific language governing permissions and limitations
12# under the License.
14import abc
15import copy
16import re
17import string
18import typing as ty
20import nova.conf
21from nova import exception
22from nova.i18n import _
23from nova import objects
24from nova.pci.request import PCI_REMOTE_MANAGED_TAG
25from nova.pci import utils
26from oslo_log import log as logging
27from oslo_utils import strutils
# Upper bounds applied when validating the hexadecimal fields of a device
# spec (see PciAddressSpec._set_pci_dev_info).
MAX_VENDOR_ID = 0xFFFF
MAX_PRODUCT_ID = 0xFFFF
MAX_FUNC = 0x7
MAX_DOMAIN = 0xFFFF
MAX_BUS = 0xFF
MAX_SLOT = 0x1F
# Wildcard accepted for vendor/product IDs and in glob-style addresses.
ANY = '*'
# Pattern used for any address field omitted from a regex-style address.
REGEX_ANY = '.*'

LOG = logging.getLogger(__name__)
CONF = nova.conf.CONF

# An address is either a string (glob syntax or a specific PCI address) or a
# dict of per-field regexes.
PCISpecAddressType = ty.Union[ty.Dict[str, str], str]
class PciAddressSpec(metaclass=abc.ABCMeta):
    """Abstract class for all PCI address spec styles.

    This class checks the address fields of the pci.device_spec
    configuration option. Subclasses populate ``domain``, ``bus``, ``slot``
    and ``func`` and implement :meth:`match`.
    """

    def __init__(self, pci_addr: str) -> None:
        # The base class does not parse ``pci_addr``; it only provides empty
        # defaults for the four address fields.
        self.domain = ''
        self.bus = ''
        self.slot = ''
        self.func = ''

    @abc.abstractmethod
    def match(self, pci_addr):
        """Return whether ``pci_addr`` is covered by this spec."""
        pass

    def is_single_address(self) -> bool:
        """Return True if every character of every field is a hex digit.

        Any wildcard or other non-hex character (e.g. ``*``) makes this
        False. Empty fields are vacuously accepted.
        """
        fields = (self.domain, self.bus, self.slot, self.func)
        return all(
            char in string.hexdigits for field in fields for char in field)

    def _set_pci_dev_info(
        self, prop: str, maxval: int, hex_value: str
    ) -> None:
        """Validate and normalise the hexadecimal attribute named ``prop``.

        The attribute is left untouched when it is the ``ANY`` wildcard;
        otherwise it is parsed as base 16, range checked and written back
        formatted with ``hex_value``.

        :param prop: name of the attribute on ``self`` to validate.
        :param maxval: largest value the attribute may hold.
        :param hex_value: %-format string (e.g. ``'%04x'``) used to store
            the normalised value.
        :raises PciConfigInvalidSpec: if the attribute does not parse as a
            hex number, is negative, or is greater than ``maxval``.
        """
        a = getattr(self, prop)
        if a == ANY:
            return
        try:
            v = int(a, 16)
        except (TypeError, ValueError):
            # TypeError covers non-string values (e.g. a JSON number), which
            # int() refuses to parse with an explicit base.
            raise exception.PciConfigInvalidSpec(
                reason=_("property %(property)s ('%(attr)s') does not parse "
                         "as a hex number.") % {'property': prop, 'attr': a})
        if v < 0:
            # int() accepts a leading sign, so "-1" would otherwise slip
            # through the upper-bound check and be stored as e.g. "-001".
            raise exception.PciConfigInvalidSpec(
                reason=_("property %(property)s (%(attr)s) must not be "
                         "negative.") % {'property': prop, 'attr': a})
        if v > maxval:
            raise exception.PciConfigInvalidSpec(
                reason=_("property %(property)s (%(attr)s) is greater than "
                         "the maximum allowable value (%(max)X).") %
                {'property': prop, 'attr': a, 'max': maxval})
        setattr(self, prop, hex_value % v)
class PhysicalPciAddress(PciAddressSpec):
    """Address spec for one fully-qualified PCI address.

    The domain, bus, slot and function fields of a single PCI device are
    validated and normalised on construction.
    """

    def __init__(self, pci_addr: PCISpecAddressType) -> None:
        try:
            # TODO(stephenfin): Is this ever actually a string?
            if not isinstance(pci_addr, dict):
                (self.domain, self.bus, self.slot,
                 self.func) = utils.get_pci_address_fields(pci_addr)
            else:
                self.domain = pci_addr['domain']
                self.bus = pci_addr['bus']
                self.slot = pci_addr['slot']
                self.func = pci_addr['function']
            # The order is significant: the first invalid field is the one
            # that gets reported.
            checks = (
                ('func', MAX_FUNC, '%1x'),
                ('domain', MAX_DOMAIN, '%04x'),
                ('bus', MAX_BUS, '%02x'),
                ('slot', MAX_SLOT, '%02x'),
            )
            for field, limit, fmt in checks:
                self._set_pci_dev_info(field, limit, fmt)
        except (KeyError, ValueError):
            raise exception.PciDeviceWrongAddressFormat(address=pci_addr)

    def match(self, phys_pci_addr: PciAddressSpec) -> bool:
        """Return True when all four fields equal those of the other."""
        mine = (self.domain, self.bus, self.slot, self.func)
        theirs = (
            phys_pci_addr.domain,
            phys_pci_addr.bus,
            phys_pci_addr.slot,
            phys_pci_addr.func,
        )
        return mine == theirs

    def __str__(self):
        return '{}:{}:{}.{}'.format(
            self.domain, self.bus, self.slot, self.func)
class PciAddressGlobSpec(PciAddressSpec):
    """Manages the address fields with glob style.

    This class validates the address fields with glob style, checks for
    wildcards, and inserts wildcards where a field is left blank.
    """

    def __init__(self, pci_addr: str) -> None:
        """Parse a ``[[domain:]bus:]slot[.function]`` glob address.

        :param pci_addr: glob-style address; omitted or blank fields become
            the ``ANY`` wildcard.
        :raises PciDeviceWrongAddressFormat: if more than three
            colon-separated fields precede the function.
        :raises PciConfigInvalidSpec: if a non-wildcard field is not a valid
            hex number within its allowed range.
        """
        self.domain = ANY
        self.bus = ANY
        self.slot = ANY
        self.func = ANY

        # NOTE: the separator is deliberately not bound to "_", which is the
        # gettext function in this module.
        dbs, _sep, func = pci_addr.partition('.')
        if func:
            # A whitespace-only function is treated as blank (wildcard), the
            # same way blank domain/bus/slot fields are handled below.
            self.func = func.strip() or ANY
            self._set_pci_dev_info('func', MAX_FUNC, '%01x')
        if dbs:
            dbs_fields = dbs.split(':')
            if len(dbs_fields) > 3:
                raise exception.PciDeviceWrongAddressFormat(address=pci_addr)
            # If we got a partial address like ":00.", we need to turn this
            # into a domain of ANY, a bus of ANY, and a slot of 00. This code
            # allows the address bus and/or domain to be left off
            dbs_all = [ANY] * (3 - len(dbs_fields))
            dbs_all.extend(dbs_fields)
            dbs_checked = [s.strip() or ANY for s in dbs_all]
            self.domain, self.bus, self.slot = dbs_checked
            self._set_pci_dev_info('domain', MAX_DOMAIN, '%04x')
            self._set_pci_dev_info('bus', MAX_BUS, '%02x')
            self._set_pci_dev_info('slot', MAX_SLOT, '%02x')

    def match(self, phys_pci_addr: PciAddressSpec) -> bool:
        """Return True if every field is a wildcard or equals the other's."""
        conditions = [
            self.domain in (ANY, phys_pci_addr.domain),
            self.bus in (ANY, phys_pci_addr.bus),
            self.slot in (ANY, phys_pci_addr.slot),
            self.func in (ANY, phys_pci_addr.func)
        ]
        return all(conditions)
class PciAddressRegexSpec(PciAddressSpec):
    """Manages the address fields with regex style.

    This class validates the address fields with regex style. Each of the
    ``domain``, ``bus``, ``slot`` and ``function`` keys holds a regular
    expression; a missing key defaults to ``REGEX_ANY``.
    """

    def __init__(self, pci_addr: dict) -> None:
        """Compile the per-field regexes of ``pci_addr``.

        :param pci_addr: dict with optional ``domain``, ``bus``, ``slot``
            and ``function`` regex strings.
        :raises PciDeviceWrongAddressFormat: if any field is not a valid
            regular expression or is not a string at all.
        """
        try:
            self.domain = pci_addr.get('domain', REGEX_ANY)
            self.bus = pci_addr.get('bus', REGEX_ANY)
            self.slot = pci_addr.get('slot', REGEX_ANY)
            self.func = pci_addr.get('function', REGEX_ANY)
            self.domain_regex = re.compile(self.domain)
            self.bus_regex = re.compile(self.bus)
            self.slot_regex = re.compile(self.slot)
            self.func_regex = re.compile(self.func)
        except (re.error, TypeError):
            # re.compile() raises TypeError, not re.error, when handed a
            # non-string pattern such as a JSON number or null.
            raise exception.PciDeviceWrongAddressFormat(address=pci_addr)

    def match(self, phys_pci_addr: PciAddressSpec) -> bool:
        """Return True if every field regex matches the other's field.

        ``re.match`` is used, so each pattern is anchored at the start of
        the field only.
        """
        conditions = [
            bool(self.domain_regex.match(phys_pci_addr.domain)),
            bool(self.bus_regex.match(phys_pci_addr.bus)),
            bool(self.slot_regex.match(phys_pci_addr.slot)),
            bool(self.func_regex.match(phys_pci_addr.func))
        ]
        return all(conditions)
class WhitelistPciAddress(object):
    """Address part of one pci.device_spec entry.

    Validates the address given in the pci.device_spec configuration
    option and matches device addresses against it.

    Example configs:

    | [pci]
    | device_spec = {"address":"*:0a:00.*",
    |                "physical_network":"physnet1"}
    | device_spec = {"address": {"domain": ".*",
    |                            "bus": "02",
    |                            "slot": "01",
    |                            "function": "[0-2]"},
    |                "physical_network":"net1"}
    | device_spec = {"vendor_id":"1137","product_id":"0071"}

    """

    def __init__(
        self, pci_addr: PCISpecAddressType, is_physical_function: bool
    ) -> None:
        self.is_physical_function = is_physical_function
        self._init_address_fields(pci_addr)

    def _check_physical_function(self) -> None:
        """Refresh ``is_physical_function`` for a single, concrete address."""
        spec = self.pci_address_spec
        if not spec.is_single_address():
            return
        self.is_physical_function = utils.is_physical_function(
            spec.domain, spec.bus, spec.slot, spec.func)

    def _init_address_fields(self, pci_addr: PCISpecAddressType) -> None:
        """Build the address spec object that fits the type of ``pci_addr``."""
        self.pci_address_spec: PciAddressSpec
        if self.is_physical_function:
            self.pci_address_spec = PhysicalPciAddress(pci_addr)
            return

        # Strings use glob syntax, dicts use per-field regexes.
        if isinstance(pci_addr, str):
            self.pci_address_spec = PciAddressGlobSpec(pci_addr)
        elif isinstance(pci_addr, dict):
            self.pci_address_spec = PciAddressRegexSpec(pci_addr)
        else:
            raise exception.PciDeviceWrongAddressFormat(address=pci_addr)
        self._check_physical_function()

    def match(self, pci_addr: str, pci_phys_addr: ty.Optional[str]) -> bool:
        """Match a device to this PciAddress.

        Assume this is called with a ``pci_addr`` and ``pci_phys_addr``
        reported by libvirt. No attempt is made to verify if ``pci_addr`` is a
        VF of ``pci_phys_addr``.

        :param pci_addr: PCI address of the device to match.
        :param pci_phys_addr: PCI address of the parent of the device to match
            (or None if the device is not a VF).
        """
        spec = self.pci_address_spec

        # When this spec names a PF (sriov is available) and the device is a
        # VF, first try the parent address, so that a PF address in
        # pci.device_spec matches any of that PF's VFs.
        if (self.is_physical_function and pci_phys_addr and
                spec.match(PhysicalPciAddress(pci_phys_addr))):
            return True

        # Otherwise match on the device's own PCI address only.
        return spec.match(PhysicalPciAddress(pci_addr))
class PciDeviceSpec(PciAddressSpec):
    """One parsed entry of the pci.device_spec configuration option.

    The entry's ``vendor_id``, ``product_id``, ``devname`` and ``address``
    keys are validated and stored as attributes; every remaining key is kept
    as a tag in ``self.tags``.
    """

    def __init__(self, dev_spec: ty.Dict[str, str]) -> None:
        # stored for better error reporting
        self.dev_spec_conf = copy.deepcopy(dev_spec)
        # the non tag fields (i.e. address, devname) will be removed by
        # _init_dev_details
        # NOTE: ``dev_spec`` itself (not a copy) becomes ``self.tags``, so
        # the caller's dict loses those keys as well.
        self.tags = dev_spec
        self._init_dev_details()

    def _address_obj(self) -> ty.Optional[WhitelistPciAddress]:
        """Return the address object to match devices against.

        With a ``devname`` the address is looked up from the interface name
        on every call; None is returned when that lookup yields no address.
        Without a ``devname`` the address parsed from the spec is returned.
        """
        if not self.dev_name:
            return self.address

        address_str, pf = utils.get_function_by_ifname(self.dev_name)
        if not address_str:
            return None
        # Note(moshele): In this case we always passing a string
        # of the PF pci address
        return WhitelistPciAddress(address_str, pf)

    def _init_dev_details(self) -> None:
        """Pop and validate the non-tag fields, then normalise the tags.

        :raises PciConfigInvalidSpec: on an invalid vendor/product ID, an
            unparsable boolean tag, or ``one_time_use`` without
            ``pci.report_in_placement``.
        :raises PciDeviceInvalidDeviceName: if both ``address`` and
            ``devname`` are given.
        """
        self.vendor_id = self.tags.pop("vendor_id", ANY)
        self.product_id = self.tags.pop("product_id", ANY)
        self.dev_name = self.tags.pop("devname", None)
        self.address: ty.Optional[WhitelistPciAddress] = None
        # Note(moshele): The address attribute can be a string or a dict.
        # For glob syntax or specific pci it is a string and for regex syntax
        # it is a dict. The WhitelistPciAddress class handles both types.
        address = self.tags.pop("address", None)

        self.vendor_id = self.vendor_id.strip()
        self._set_pci_dev_info('vendor_id', MAX_VENDOR_ID, '%04x')
        # Strip the product ID too, so that a wildcard surrounded by
        # whitespace is recognised just like it is for the vendor ID.
        self.product_id = self.product_id.strip()
        self._set_pci_dev_info('product_id', MAX_PRODUCT_ID, '%04x')

        if address and self.dev_name:
            raise exception.PciDeviceInvalidDeviceName()

        if not self.dev_name:
            self.address = WhitelistPciAddress(address or '*:*:*.*', False)

        address_obj = self._address_obj()
        self._remote_managed = strutils.bool_from_string(
            self.tags.get(PCI_REMOTE_MANAGED_TAG))

        self._normalize_device_spec_tag("managed")
        self._normalize_device_spec_tag("live_migratable")
        self._normalize_device_spec_tag("one_time_use")

        if self.tags.get('one_time_use') == 'true':
            # Validate that one_time_use=true is not set on devices where we
            # cannot support proper reservation protection.
            if not CONF.pci.report_in_placement:
                raise exception.PciConfigInvalidSpec(
                    reason=_('one_time_use=true requires '
                             'pci.report_in_placement to be enabled'))

        if self._remote_managed:
            self._validate_remote_managed(address_obj)

    def _validate_remote_managed(
        self, address_obj: ty.Optional[WhitelistPciAddress]
    ) -> None:
        """Reject remote_managed specs that cannot be supported.

        PFs with remote_managed tags are explicitly not supported. If they
        are tagged as such by mistake in the whitelist Nova will raise an
        exception. The reason for excluding PFs is the lack of a way for an
        instance to access the control plane at the remote side (e.g. on a
        DPU) for managing the PF representor corresponding to the PF.

        :param address_obj: address object of the spec, or None if it could
            not be determined.
        :raises PciDeviceRemoteManagedNotPresent: if ``address_obj`` is None.
        :raises PciDeviceInvalidPFRemoteManaged: if the spec describes a PF.
        :raises PciConfigInvalidSpec: if the vendor/product IDs match neither
            the PF nor its VFs.
        """
        if address_obj is None:
            # Note that this will happen if a netdev was specified in the
            # whitelist but it is not actually present on a system - in
            # this case Nova is not able to look up an address by
            # a netdev name.
            raise exception.PciDeviceRemoteManagedNotPresent()
        if not address_obj.is_physical_function:
            return

        pf_addr = str(address_obj.pci_address_spec)
        vf_product_id = utils.get_vf_product_id_by_pf_addr(pf_addr)
        # VF vendor IDs have to match the corresponding PF vendor IDs
        # per the SR-IOV spec so we use it for matching here.
        pf_vendor_id, pf_product_id = utils.get_pci_ids_by_pci_addr(
            pf_addr)
        # Check the actual vendor ID and VF product ID of an assumed
        # VF (based on the actual PF). The VF product ID must match
        # the actual one if this is a VF device spec.
        if (self.product_id == vf_product_id and
                self.vendor_id in (pf_vendor_id, ANY)):
            return
        if (self.product_id in (pf_product_id, ANY) and
                self.vendor_id in (pf_vendor_id, ANY)):
            raise exception.PciDeviceInvalidPFRemoteManaged(
                address_obj.pci_address_spec)
        # The specified product and vendor IDs of what is supposed
        # to be a VF corresponding to the PF PCI address do not
        # match the actual ones for this PF. This means that the
        # whitelist is invalid.
        raise exception.PciConfigInvalidSpec(
            reason=_('the specified VF vendor ID %(vendor_id)s and'
                     ' product ID %(product_id)s do not match the'
                     ' expected VF IDs based on the corresponding'
                     ' PF identified by PCI address %(pf_addr)s') %
            {'vendor_id': self.vendor_id,
             'product_id': self.product_id,
             'pf_addr': pf_addr})

    def _ensure_remote_managed_dev_vpd_serial(
            self, dev_dict: ty.Dict[str, ty.Any]) -> bool:
        """Ensure the presence of a serial number field in PCI VPD.

        A card serial number extracted from PCI VPD is required to allow a
        networking backend to identify which remote host needs to program a
        given device. So if a device is tagged as remote_managed, it must
        have the card serial number or be filtered out.
        """
        if not self._remote_managed:
            return True
        card_sn = dev_dict.get('capabilities', {}).get(
            'vpd', {}).get('card_serial_number')
        # None or empty card_serial_number should be filtered out. That would
        # mean either no serial number in the VPD (if present at all) or SN is
        # an empty string which is not useful for device identification.
        return bool(card_sn)

    def match(self, dev_dict: ty.Dict[str, ty.Any]) -> bool:
        """Return True if the device described by ``dev_dict`` fits the spec.

        :param dev_dict: device description; ``vendor_id``, ``product_id``
            and ``address`` are required, ``parent_addr`` and
            ``capabilities`` are optional.
        """
        address_obj: ty.Optional[WhitelistPciAddress] = self._address_obj()
        if not address_obj:
            return False

        return all([
            self.vendor_id in (ANY, dev_dict['vendor_id']),
            self.product_id in (ANY, dev_dict['product_id']),
            address_obj.match(dev_dict['address'],
                dev_dict.get('parent_addr')),
            self._ensure_remote_managed_dev_vpd_serial(dev_dict),
        ])

    def match_pci_obj(self, pci_obj: 'objects.PciDevice') -> bool:
        """Match a PciDevice object by converting it to a device dict."""
        dev_dict = {
            'vendor_id': pci_obj.vendor_id,
            'product_id': pci_obj.product_id,
            'address': pci_obj.address,
            'parent_addr': pci_obj.parent_addr,
            'capabilities': {
                'vpd': {'card_serial_number': pci_obj.card_serial_number}}
        }
        return self.match(dev_dict)

    def get_tags(self) -> ty.Dict[str, str]:
        """Return the tags left in the spec after the non-tag fields."""
        return self.tags

    def _normalize_device_spec_tag(self, tag: str) -> None:
        """Rewrite the boolean tag ``tag`` as ``"true"`` or ``"false"``.

        A tag that is absent or None is left alone.

        :raises PciConfigInvalidSpec: if the value is not a valid boolean
            string.
        """
        if self.tags.get(tag, None) is not None:
            try:
                self.tags[tag] = (
                    "true" if strutils.bool_from_string(
                        self.tags.get(tag), strict=True) else "false")
            except ValueError as e:
                raise exception.PciConfigInvalidSpec(
                    reason=f"Cannot parse tag '{tag}': " + str(e)
                ) from e

    def enhanced_pci_device_with_spec_tags(
        self, dev: ty.Dict[str, ty.Any]
    ) -> None:
        """Copy the ``managed`` and ``live_migratable`` tags onto ``dev``.

        Only tags that are set on the spec are copied; ``dev`` is updated in
        place.
        """
        spec_tags = ["managed", "live_migratable"]
        for tag in spec_tags:
            tag_value = self.tags.get(tag)
            if tag_value is not None:
                dev.update({tag: tag_value})