Coverage for nova/pci/utils.py: 95%
119 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-17 15:08 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-17 15:08 +0000
1# Copyright (c) 2013 Intel, Inc.
2# Copyright (c) 2012 OpenStack Foundation
3# All Rights Reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
17import glob
18import os
19import re
20import typing as ty
22from oslo_log import log as logging
24from nova import exception
26if ty.TYPE_CHECKING: 26 ↛ 28line 26 didn't jump to line 28 because the condition on line 26 was never true
27 # avoid circular import
28 from nova.pci import stats
30LOG = logging.getLogger(__name__)
# Matches exactly four hexadecimal digits (a PCI vendor/product style ID).
PCI_VENDOR_PATTERN = "^(hex{4})$".replace("hex", r"[\da-fA-F]")
# Fully-specified PCI address: <domain>:<bus>:<slot>.<function>.
# The separator in front of the function digit is escaped so that only a
# literal '.' is accepted there; an unescaped '.' would match any character.
_PCI_ADDRESS_PATTERN = (r"^(hex{4}):(hex{2}):(hex{2})\.(oct{1})$".
                        replace("hex", r"[\da-fA-F]").
                        replace("oct", "[0-7]"))
_PCI_ADDRESS_REGEX = re.compile(_PCI_ADDRESS_PATTERN)
# Name of the sysfs attribute holding the maximum number of VFs of a PF.
_SRIOV_TOTALVFS = "sriov_totalvfs"
def pci_device_prop_match(
    pci_dev: 'stats.Pool', specs: ty.List[ty.Dict[str, str]],
) -> bool:
    """Tell whether a PCI device satisfies at least one of the given specs.

    ``specs`` is a list of property requirements; the device matches when
    every key of any single spec matches the device. For example, to accept
    either
     a) a device with vendor_id 0x8086 and product_id 0x8259, or
     b) a device with vendor_id 0x10de and product_id 0x10d8 that also has
        the listed network capabilities:

    [{"vendor_id":"8086", "product_id":"8259"},
     {"vendor_id":"10de", "product_id":"10d8",
      "capabilities_network": ["rx", "tx", "tso", "gso"]}]

    :param pci_dev: Mapping-like object queried with ``.get(key)``.
    :param specs: List of requirement dicts.
    :return: True if any spec is fully satisfied, False otherwise (including
        when ``specs`` is empty).
    """

    def _fold_case(value):
        # String values are compared case-insensitively so that tags given
        # by users in a port binding profile line up with the ones operators
        # configured in the PCI whitelist option.
        return value.lower() if isinstance(value, str) else value

    def _spec_satisfied(spec: ty.Dict[str, str]) -> bool:
        for key, wanted in spec.items():
            actual = pci_dev.get(key)
            if isinstance(wanted, list) and isinstance(actual, list):
                # List against list: every requested item must be present.
                if any(item not in actual for item in wanted):
                    return False
                continue
            if _fold_case(actual) != _fold_case(wanted):
                return False
        return True

    for candidate in specs:
        if _spec_satisfied(candidate):
            return True
    return False
def parse_address(address: str) -> ty.Sequence[str]:
    """Split a PCI address into its components.

    The address is expected in the form stored in the PciDevice DB table
    and is checked against the module's compiled PCI address pattern.

    :param address: PCI address string.
    :return: The captured groups (domain, bus, slot, function).
    :raises PciDeviceWrongAddressFormat: if the address does not match.
    """
    matched = _PCI_ADDRESS_REGEX.match(address)
    if matched:
        return matched.groups()
    raise exception.PciDeviceWrongAddressFormat(address=address)
def get_pci_address_fields(pci_addr: str) -> ty.Tuple[str, str, str, str]:
    """Break a fully-specified PCI device address into its four fields.

    No check is made that the fields are valid hex or wildcard values.

    :param pci_addr: A string of the form "<domain>:<bus>:<slot>.<function>".
    :return: A 4-tuple of strings ("<domain>", "<bus>", "<slot>", "<function>")
    :raises ValueError: if the part before the first '.' does not consist of
        exactly three ':'-separated fields.
    """
    # Everything after the first '.' is the function; it is '' when the
    # address contains no '.' at all.
    location, _dot, function = pci_addr.partition('.')
    fields = location.split(':')
    domain, bus, slot = fields
    return (domain, bus, slot, function)
def get_pci_address(domain: str, bus: str, slot: str, func: str) -> str:
    """Join PCI address components into a fully-specified PCI address.

    No check is made that the components are valid hex or wildcard values.

    :param domain, bus, slot, func: Hex or wildcard strings.
    :return: A string of the form "<domain>:<bus>:<slot>.<function>".
    """
    components = (domain, bus, slot, func)
    return '%s:%s:%s.%s' % components
def get_function_by_ifname(ifname: str) -> ty.Tuple[ty.Optional[str], bool]:
    """Look up the PCI address behind a network interface name.

    :param ifname: Network device name looked up under /sys/class/net.
    :return: A tuple (address, is_pf). ``address`` is the target of the
        interface's ``device`` link with leading/trailing '.' and '/'
        characters stripped, or None when the interface has no ``device``
        directory. ``is_pf`` is True only when the sriov_totalvfs value
        could be read and is greater than zero.
    """
    device_link = "/sys/class/net/%s/device" % ifname
    if not os.path.isdir(device_link):
        return None, False

    try:
        # sriov_totalvfs contains the maximum possible VFs for this PF
        with open(os.path.join(device_link, _SRIOV_TOTALVFS)) as handle:
            max_vfs = int(handle.read())
        address = os.readlink(device_link).strip("./")
        return address, max_vfs > 0
    except (IOError, ValueError):
        # Attribute missing or unparsable: report the address, but not as PF.
        return os.readlink(device_link).strip("./"), False
def is_physical_function(
    domain: str, bus: str, slot: str, function: str,
) -> bool:
    """Report whether the PCI device at the given address is a PF.

    :param domain, bus, slot, function: Components of the PCI address.
    :return: True when the device directory exists under
        /sys/bus/pci/devices and its sriov_totalvfs value is greater than
        zero; False when the directory is missing or the value cannot be
        read or parsed.
    """
    sysfs_dir = "/sys/bus/pci/devices/%(d)s:%(b)s:%(s)s.%(f)s/" % {
        "d": domain, "b": bus, "s": slot, "f": function}
    if not os.path.isdir(sysfs_dir):
        return False

    try:
        with open(sysfs_dir + _SRIOV_TOTALVFS) as fd:
            return int(fd.read()) > 0
    except (IOError, ValueError):
        return False
def _get_sysfs_netdev_path(pci_addr: str, pf_interface: bool) -> str:
    """Build the sysfs ``net`` directory path for a PCI device.

    The device is assumed to be a networking device; the returned path is
    not checked for existence.

    :param pci_addr: PCI address of the device.
    :param pf_interface: When true, go through the device's ``physfn`` entry
        instead of addressing the device's own ``net`` directory.
    :return: The sysfs path as a string.
    """
    template = (
        "/sys/bus/pci/devices/%s/physfn/net" if pf_interface
        else "/sys/bus/pci/devices/%s/net"
    )
    return template % pci_addr
def get_ifname_by_pci_address(
    pci_addr: str, pf_interface: bool = False,
) -> str:
    """Return the network interface name for a VF's PCI address.

    Depending on ``pf_interface`` the name is that of the parent PF or of
    the VF itself.

    :param pci_addr: PCI address of the VF.
    :param pf_interface: Look up the parent PF's interface when true.
    :return: The last entry returned by listing the sysfs net directory.
    :raises PciDeviceNotFoundById: if the directory cannot be listed or is
        empty (any exception is converted).
    """
    netdev_dir = _get_sysfs_netdev_path(pci_addr, pf_interface)
    try:
        entries = os.listdir(netdev_dir)
        return entries[-1]
    except Exception:
        raise exception.PciDeviceNotFoundById(id=pci_addr)
def get_mac_by_pci_address(pci_addr: str, pf_interface: bool = False) -> str:
    """Read the MAC address of a NIC identified by its PCI address.

    :param pci_addr: PCI address of the device.
    :param pf_interface: Use the parent PF's interface when true.
    :return: The first line of the interface's sysfs ``address`` file,
        stripped of surrounding whitespace.
    :raises PciDeviceNotFoundById: if the pci device is not a NIC, i.e. the
        address file cannot be opened or is empty.
    """
    netdev_dir = _get_sysfs_netdev_path(pci_addr, pf_interface)
    interface = get_ifname_by_pci_address(pci_addr, pf_interface)
    mac_file = os.path.join(netdev_dir, interface, 'address')

    try:
        with open(mac_file) as f:
            # next() raises StopIteration on an empty file.
            first_line = next(f)
    except (IOError, StopIteration) as e:
        LOG.warning("Could not find the expected sysfs file for "
                    "determining the MAC address of the PCI device "
                    "%(addr)s. May not be a NIC. Error: %(e)s",
                    {'addr': pci_addr, 'e': e})
        raise exception.PciDeviceNotFoundById(id=pci_addr)
    return first_line.strip()
def get_vf_num_by_pci_address(pci_addr: str) -> int:
    """Get the VF number based on a VF's pci address

    A VF is associated with an VF number, which ip link command uses to
    configure it. This number can be obtained from the PCI device filesystem.

    :param pci_addr: PCI address of the VF.
    :return: The number N of the first ``virtfnN`` link under the VF's
        ``physfn`` directory whose target contains ``pci_addr``.
    :raises PciDeviceNotFoundById: if no such link is found.
    """
    virtfn_re = re.compile(r"virtfn(\d+)")
    virtfns_path = "/sys/bus/pci/devices/%s/physfn/virtfn*" % (pci_addr)

    for vf_path in glob.iglob(virtfns_path):
        # The address is literal text, not a pattern: use a substring test
        # so that '.' in the address cannot match an arbitrary character.
        if pci_addr not in os.readlink(vf_path):
            continue
        matched = virtfn_re.search(vf_path)
        if matched:
            return int(matched.group(1))

    raise exception.PciDeviceNotFoundById(id=pci_addr)
def get_vf_product_id_by_pf_addr(pci_addr: str) -> str:
    """Get the VF product ID for a given PF.

    "Product ID" or Device ID in the PCIe spec terms for a PF is
    possible to retrieve via the VF Device ID field present as of
    SR-IOV 1.0 in the "3.3.11. VF Device ID (1Ah)" section. It is
    described as a field that "contains the Device ID that should
    be presented for every VF to the SI".

    It is available as of Linux kernel 4.15, commit
    7dfca15276fc3f18411a2b2182704fa1222bcb60

    :param pci_addr: A string of the form "<domain>:<bus>:<slot>.<function>".
    :return: A string containing a product ID of a VF corresponding to the PF.
    :raises PciDeviceNotFoundById: if the ``sriov_vf_device`` sysfs file
        cannot be read.
    :raises ValueError: if the first line of that file is empty once
        stripped.
    """
    sriov_vf_device_path = f"/sys/bus/pci/devices/{pci_addr}/sriov_vf_device"
    try:
        with open(sriov_vf_device_path) as f:
            vf_product_id = f.readline().strip()
    except IOError as e:
        # Note the trailing space after "PF": the adjacent literals are
        # concatenated, and without it the message reads "by PFwith addr".
        LOG.warning(
            "Could not find the expected sysfs file for "
            "determining the VF product ID of a PCI VF by PF "
            "with addr %(addr)s. May not be a PF. Error: %(e)s",
            {"addr": pci_addr, "e": e},
        )
        raise exception.PciDeviceNotFoundById(id=pci_addr)
    if not vf_product_id:
        raise ValueError("sriov_vf_device file does not contain"
                         " a VF product ID")
    return vf_product_id
def get_pci_ids_by_pci_addr(pci_addr: str) -> ty.Tuple[str, ...]:
    """Get the vendor ID and product ID for a given PCI device.

    :param pci_addr: A string of the form "<domain>:<bus>:<slot>.<function>".
    :return: A tuple ``(vendor_id, product_id)`` of strings with any "0x"
        removed.
    :raises PciDeviceNotFoundById: if one of the sysfs ID files cannot be
        read.
    :raises ValueError: if the first line of an ID file is empty once
        stripped.
    """
    id_prefix = f"/sys/bus/pci/devices/{pci_addr}"
    # (name used in messages, sysfs attribute file). PCI sysfs publishes the
    # Device ID -- what this module calls the product ID -- in a file named
    # "device"; there is no "product" attribute for PCI devices.
    id_files = (("vendor", "vendor"), ("product", "device"))
    ids: ty.List[str] = []
    for id_name, sysfs_attr in id_files:
        try:
            with open(os.path.join(id_prefix, sysfs_attr)) as f:
                # Strip before validating so a blank line is rejected rather
                # than recorded as an empty ID.
                id_value = f.readline().strip()
        except IOError as e:
            LOG.warning(
                "Could not find the expected sysfs file for "
                "determining the %(id_name)s ID of a PCI device "
                "with addr %(addr)s. Error: %(e)s",
                {"id_name": id_name, "addr": pci_addr, "e": e},
            )
            raise exception.PciDeviceNotFoundById(id=pci_addr)
        if not id_value:
            raise ValueError(f"{id_name} file does not contain"
                             " a valid value")
        ids.append(id_value.replace("0x", ""))
    return tuple(ids)