Coverage for nova/pci/utils.py: 95%

119 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-17 15:08 +0000

1# Copyright (c) 2013 Intel, Inc. 

2# Copyright (c) 2012 OpenStack Foundation 

3# All Rights Reserved. 

4# 

5# Licensed under the Apache License, Version 2.0 (the "License"); you may 

6# not use this file except in compliance with the License. You may obtain 

7# a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

14# License for the specific language governing permissions and limitations 

15# under the License. 

16 

17import glob 

18import os 

19import re 

20import typing as ty 

21 

22from oslo_log import log as logging 

23 

24from nova import exception 

25 

26if ty.TYPE_CHECKING: 26 ↛ 28line 26 didn't jump to line 28 because the condition on line 26 was never true

27 # avoid circular import 

28 from nova.pci import stats 

29 

30LOG = logging.getLogger(__name__) 

31 

32PCI_VENDOR_PATTERN = "^(hex{4})$".replace("hex", r"[\da-fA-F]") 

33_PCI_ADDRESS_PATTERN = ("^(hex{4}):(hex{2}):(hex{2}).(oct{1})$". 

34 replace("hex", r"[\da-fA-F]"). 

35 replace("oct", "[0-7]")) 

36_PCI_ADDRESS_REGEX = re.compile(_PCI_ADDRESS_PATTERN) 

37_SRIOV_TOTALVFS = "sriov_totalvfs" 

38 

39 

40def pci_device_prop_match( 

41 pci_dev: 'stats.Pool', specs: ty.List[ty.Dict[str, str]], 

42) -> bool: 

43 """Check if the pci_dev meet spec requirement 

44 

45 Specs is a list of PCI device property requirements. 

46 An example of device requirement that the PCI should be either: 

47 a) Device with vendor_id as 0x8086 and product_id as 0x8259, or 

48 b) Device with vendor_id as 0x10de and product_id as 0x10d8: 

49 

50 [{"vendor_id":"8086", "product_id":"8259"}, 

51 {"vendor_id":"10de", "product_id":"10d8", 

52 "capabilities_network": ["rx", "tx", "tso", "gso"]}] 

53 

54 """ 

55 

56 def _matching_devices(spec: ty.Dict[str, str]) -> bool: 

57 for k, v in spec.items(): 

58 pci_dev_v = pci_dev.get(k) 

59 if isinstance(v, list) and isinstance(pci_dev_v, list): 

60 if not all(x in pci_dev.get(k) for x in v): 

61 return False 

62 else: 

63 # We don't need to check case for tags in order to avoid any 

64 # mismatch with the tags provided by users for port 

65 # binding profile and the ones configured by operators 

66 # with pci whitelist option. 

67 if isinstance(v, str): 

68 v = v.lower() 

69 if isinstance(pci_dev_v, str): 

70 pci_dev_v = pci_dev_v.lower() 

71 if pci_dev_v != v: 

72 return False 

73 return True 

74 

75 return any(_matching_devices(spec) for spec in specs) 

76 

77 

78def parse_address(address: str) -> ty.Sequence[str]: 

79 """Parse a PCI address. 

80 

81 Returns (domain, bus, slot, function) from PCI address that is stored in 

82 PciDevice DB table. 

83 """ 

84 m = _PCI_ADDRESS_REGEX.match(address) 

85 if not m: 

86 raise exception.PciDeviceWrongAddressFormat(address=address) 

87 return m.groups() 

88 

89 

90def get_pci_address_fields(pci_addr: str) -> ty.Tuple[str, str, str, str]: 

91 """Parse a fully-specified PCI device address. 

92 

93 Does not validate that the components are valid hex or wildcard values. 

94 

95 :param pci_addr: A string of the form "<domain>:<bus>:<slot>.<function>". 

96 :return: A 4-tuple of strings ("<domain>", "<bus>", "<slot>", "<function>") 

97 """ 

98 dbs, sep, func = pci_addr.partition('.') 

99 domain, bus, slot = dbs.split(':') 

100 return domain, bus, slot, func 

101 

102 

103def get_pci_address(domain: str, bus: str, slot: str, func: str) -> str: 

104 """Assembles PCI address components into a fully-specified PCI address. 

105 

106 Does not validate that the components are valid hex or wildcard values. 

107 

108 :param domain, bus, slot, func: Hex or wildcard strings. 

109 :return: A string of the form "<domain>:<bus>:<slot>.<function>". 

110 """ 

111 return '%s:%s:%s.%s' % (domain, bus, slot, func) 

112 

113 

114def get_function_by_ifname(ifname: str) -> ty.Tuple[ty.Optional[str], bool]: 

115 """Given the device name, returns the PCI address of a device 

116 and returns True if the address is in a physical function. 

117 """ 

118 dev_path = "/sys/class/net/%s/device" % ifname 

119 sriov_totalvfs = 0 

120 if os.path.isdir(dev_path): 

121 try: 

122 # sriov_totalvfs contains the maximum possible VFs for this PF 

123 with open(os.path.join(dev_path, _SRIOV_TOTALVFS)) as fd: 

124 sriov_totalvfs = int(fd.read()) 

125 return (os.readlink(dev_path).strip("./"), 

126 sriov_totalvfs > 0) 

127 except (IOError, ValueError): 

128 return os.readlink(dev_path).strip("./"), False 

129 return None, False 

130 

131 

132def is_physical_function( 

133 domain: str, bus: str, slot: str, function: str, 

134) -> bool: 

135 dev_path = "/sys/bus/pci/devices/%(d)s:%(b)s:%(s)s.%(f)s/" % { 

136 "d": domain, "b": bus, "s": slot, "f": function} 

137 if os.path.isdir(dev_path): 

138 try: 

139 with open(dev_path + _SRIOV_TOTALVFS) as fd: 

140 sriov_totalvfs = int(fd.read()) 

141 return sriov_totalvfs > 0 

142 except (IOError, ValueError): 

143 pass 

144 return False 

145 

146 

147def _get_sysfs_netdev_path(pci_addr: str, pf_interface: bool) -> str: 

148 """Get the sysfs path based on the PCI address of the device. 

149 

150 Assumes a networking device - will not check for the existence of the path. 

151 """ 

152 if pf_interface: 

153 return "/sys/bus/pci/devices/%s/physfn/net" % pci_addr 

154 return "/sys/bus/pci/devices/%s/net" % pci_addr 

155 

156 

157def get_ifname_by_pci_address( 

158 pci_addr: str, pf_interface: bool = False, 

159) -> str: 

160 """Get the interface name based on a VF's pci address. 

161 

162 The returned interface name is either the parent PF's or that of the VF 

163 itself based on the argument of pf_interface. 

164 """ 

165 dev_path = _get_sysfs_netdev_path(pci_addr, pf_interface) 

166 try: 

167 dev_info = os.listdir(dev_path) 

168 return dev_info.pop() 

169 except Exception: 

170 raise exception.PciDeviceNotFoundById(id=pci_addr) 

171 

172 

173def get_mac_by_pci_address(pci_addr: str, pf_interface: bool = False) -> str: 

174 """Get the MAC address of the nic based on its PCI address. 

175 

176 Raises PciDeviceNotFoundById in case the pci device is not a NIC 

177 """ 

178 dev_path = _get_sysfs_netdev_path(pci_addr, pf_interface) 

179 if_name = get_ifname_by_pci_address(pci_addr, pf_interface) 

180 addr_file = os.path.join(dev_path, if_name, 'address') 

181 

182 try: 

183 with open(addr_file) as f: 

184 mac = next(f).strip() 

185 return mac 

186 except (IOError, StopIteration) as e: 

187 LOG.warning("Could not find the expected sysfs file for " 

188 "determining the MAC address of the PCI device " 

189 "%(addr)s. May not be a NIC. Error: %(e)s", 

190 {'addr': pci_addr, 'e': e}) 

191 raise exception.PciDeviceNotFoundById(id=pci_addr) 

192 

193 

194def get_vf_num_by_pci_address(pci_addr: str) -> int: 

195 """Get the VF number based on a VF's pci address 

196 

197 A VF is associated with an VF number, which ip link command uses to 

198 configure it. This number can be obtained from the PCI device filesystem. 

199 """ 

200 VIRTFN_RE = re.compile(r"virtfn(\d+)") 

201 virtfns_path = "/sys/bus/pci/devices/%s/physfn/virtfn*" % (pci_addr) 

202 vf_num = None 

203 

204 for vf_path in glob.iglob(virtfns_path): 

205 if re.search(pci_addr, os.readlink(vf_path)): 

206 t = VIRTFN_RE.search(vf_path) 

207 if t: 207 ↛ 204line 207 didn't jump to line 204 because the condition on line 207 was always true

208 vf_num = t.group(1) 

209 break 

210 else: 

211 raise exception.PciDeviceNotFoundById(id=pci_addr) 

212 

213 return int(vf_num) 

214 

215 

216def get_vf_product_id_by_pf_addr(pci_addr: str) -> str: 

217 """Get the VF product ID for a given PF. 

218 

219 "Product ID" or Device ID in the PCIe spec terms for a PF is 

220 possible to retrieve via the VF Device ID field present as of 

221 SR-IOV 1.0 in the "3.3.11. VF Device ID (1Ah)" section. It is 

222 described as a field that "contains the Device ID that should 

223 be presented for every VF to the SI". 

224 

225 It is available as of Linux kernel 4.15, commit 

226 7dfca15276fc3f18411a2b2182704fa1222bcb60 

227 

228 :param pci_addr: A string of the form "<domain>:<bus>:<slot>.<function>". 

229 :return: A string containing a product ID of a VF corresponding to the PF. 

230 """ 

231 sriov_vf_device_path = f"/sys/bus/pci/devices/{pci_addr}/sriov_vf_device" 

232 try: 

233 with open(sriov_vf_device_path) as f: 

234 vf_product_id = f.readline().strip() 

235 except IOError as e: 

236 LOG.warning( 

237 "Could not find the expected sysfs file for " 

238 "determining the VF product ID of a PCI VF by PF" 

239 "with addr %(addr)s. May not be a PF. Error: %(e)s", 

240 {"addr": pci_addr, "e": e}, 

241 ) 

242 raise exception.PciDeviceNotFoundById(id=pci_addr) 

243 if not vf_product_id: 

244 raise ValueError("sriov_vf_device file does not contain" 

245 " a VF product ID") 

246 return vf_product_id 

247 

248 

249def get_pci_ids_by_pci_addr(pci_addr: str) -> ty.Tuple[str, ...]: 

250 """Get the product ID and vendor ID for a given PCI device. 

251 

252 :param pci_addr: A string of the form "<domain>:<bus>:<slot>.<function>". 

253 :return: A list containing a vendor and product ids. 

254 """ 

255 id_prefix = f"/sys/bus/pci/devices/{pci_addr}" 

256 ids: ty.List[str] = [] 

257 for id_name in ("vendor", "product"): 

258 try: 

259 with open(os.path.join(id_prefix, id_name)) as f: 

260 id_value = f.readline() 

261 if not id_value: 

262 raise ValueError(f"{id_name} file does not contain" 

263 " a valid value") 

264 ids.append(id_value.strip().replace("0x", "")) 

265 except IOError as e: 

266 LOG.warning( 

267 "Could not find the expected sysfs file for " 

268 f"determining the {id_name} ID of a PCI device " 

269 "with addr %(addr)s. Error: %(e)s", 

270 {"addr": pci_addr, "e": e}, 

271 ) 

272 raise exception.PciDeviceNotFoundById(id=pci_addr) 

273 return tuple(ids)