Coverage for nova/virt/netutils.py: 95%

175 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-24 11:16 +0000

1# Copyright 2010 United States Government as represented by the 

2# Administrator of the National Aeronautics and Space Administration. 

3# All Rights Reserved. 

4# Copyright (c) 2010 Citrix Systems, Inc. 

5# Copyright 2013 IBM Corp. 

6# 

7# Licensed under the Apache License, Version 2.0 (the "License"); you may 

8# not use this file except in compliance with the License. You may obtain 

9# a copy of the License at 

10# 

11# http://www.apache.org/licenses/LICENSE-2.0 

12# 

13# Unless required by applicable law or agreed to in writing, software 

14# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

15# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

16# License for the specific language governing permissions and limitations 

17# under the License. 

18 

19 

20"""Network-related utilities for supporting libvirt connection code.""" 

21 

22import os 

23 

24import jinja2 

25import netaddr 

26from oslo_utils import strutils 

27 

28import nova.conf 

29from nova.network import model 

30 

# Nova's configuration object; this module reads
# ``CONF.injected_network_template`` from it when no template path is given.
CONF = nova.conf.CONF

32 

33 

def get_net_and_mask(cidr):
    """Split a CIDR into its address and dotted netmask.

    :param cidr: Anything accepted by ``netaddr.IPNetwork``.
    :return: A ``(ip, netmask)`` tuple of strings.
    """
    parsed = netaddr.IPNetwork(cidr)
    ip_text = str(parsed.ip)
    netmask_text = str(parsed.netmask)
    return ip_text, netmask_text

37 

38 

def get_net_and_prefixlen(cidr):
    """Split a CIDR into its address and prefix length.

    :param cidr: Anything accepted by ``netaddr.IPNetwork``.
    :return: A ``(ip, prefixlen)`` tuple of strings.
    """
    net = netaddr.IPNetwork(cidr)
    # Use the public ``prefixlen`` property rather than reaching into the
    # private ``_prefixlen`` attribute of the netaddr object.
    return str(net.ip), str(net.prefixlen)

42 

43 

def get_ip_version(cidr):
    """Return the IP version of a CIDR as an int.

    :param cidr: Anything accepted by ``netaddr.IPNetwork``.
    """
    return int(netaddr.IPNetwork(cidr).version)

47 

48 

49def _get_first_network(network, version): 

50 # Using a generator expression with a next() call for the first element 

51 # of a list since we don't want to evaluate the whole list as we can 

52 # have a lot of subnets 

53 try: 

54 return next(i for i in network['subnets'] 

55 if i['version'] == version) 

56 except StopIteration: 

57 pass 

58 

59 

def get_injected_network_template(network_info, template=None,
                                  libvirt_virt_type=None):
    """Render the interfaces template for the given network_info.

    :param network_info: Iterable of VIFs describing the instance networks.
    :param template: Path to the interfaces template file; falls back to
                     ``CONF.injected_network_template`` when not given.
    :param libvirt_virt_type: Passed through to the template as
                              ``libvirt_virt_type``.
    :return: The rendered template, or None when there is no network_info,
             no template, or no interface to inject.
    """
    if not template:
        template = CONF.injected_network_template

    if not network_info or not template:
        return None

    interfaces = []
    use_ipv6 = False
    # Index used for the 'ethN' name. It advances for every VIF that has
    # subnets, including ones skipped later on.
    next_ifc_index = 0

    for vif in network_info:
        network = vif['network']
        if not (network and network['subnets']):
            continue

        # NOTE(bnemec): The template only supports a single subnet per
        # interface and I'm not sure how/if that can be fixed, so this
        # code only takes the first subnet of the appropriate type.
        subnet_v4 = _get_first_network(network, 4)
        subnet_v6 = _get_first_network(network, 6)

        ifc_index = next_ifc_index
        next_ifc_index += 1

        if not network.get_meta('injected'):
            continue

        # Start from the "nothing configured" values and fill in whatever
        # the v4 / v6 subnets provide.
        entry = {
            'name': 'eth%d' % ifc_index,
            'hwaddress': vif.get('address'),
            'address': None,
            'netmask': None,
            'gateway': '',
            'broadcast': None,
            'dns': None,
            'routes': [],
            'address_v6': None,
            'gateway_v6': '',
            'netmask_v6': None,
            'dns_v6': None,
        }

        if subnet_v4:
            # A subnet with a DHCP server drops the whole interface.
            if subnet_v4.get_meta('dhcp_server') is not None:
                continue

            if subnet_v4['ips']:
                first_ip = subnet_v4['ips'][0]
                entry['address'] = first_ip['address']
                entry['netmask'] = model.get_netmask(first_ip, subnet_v4)
                if subnet_v4['gateway']:
                    entry['gateway'] = subnet_v4['gateway']['address']
                entry['broadcast'] = str(subnet_v4.as_netaddr().broadcast)
                entry['dns'] = ' '.join(
                    server['address'] for server in subnet_v4['dns'])
                for route_ref in subnet_v4['routes']:
                    route_net, route_mask = get_net_and_mask(
                        route_ref['cidr'])
                    entry['routes'].append({
                        'gateway': str(route_ref['gateway']['address']),
                        'cidr': str(route_ref['cidr']),
                        'network': route_net,
                        'netmask': route_mask,
                    })

        if subnet_v6:
            if subnet_v6.get_meta('dhcp_server') is not None:
                continue

            if subnet_v6['ips']:
                use_ipv6 = True
                first_ip_v6 = subnet_v6['ips'][0]
                entry['address_v6'] = first_ip_v6['address']
                entry['netmask_v6'] = model.get_netmask(first_ip_v6,
                                                        subnet_v6)
                if subnet_v6['gateway']:
                    entry['gateway_v6'] = subnet_v6['gateway']['address']
                entry['dns_v6'] = ' '.join(
                    server['address'] for server in subnet_v6['dns'])

        interfaces.append(entry)

    if not interfaces:
        return None

    tmpl_dir, tmpl_name = os.path.split(template)
    jinja_env = jinja2.Environment(  # nosec
        loader=jinja2.FileSystemLoader(tmpl_dir),  # nosec
        trim_blocks=True)
    context = {'interfaces': interfaces,
               'use_ipv6': use_ipv6,
               'libvirt_virt_type': libvirt_virt_type}
    return jinja_env.get_template(tmpl_name).render(context)

166 

167 

def get_network_metadata(network_info):
    """Build the links / networks / services view of the instance networks.

    This data is exposed as network_data.json in the metadata service and
    the config drive.

    :param network_info: Iterable of VIFs describing the instance networks.
    :return: A dict with 'links', 'networks' and 'services' lists, or None
             when network_info is empty.
    """
    if not network_info:
        return None

    # VIFs, physical NICs, or VLANs. Physical NICs will have type 'phy'.
    link_list = []
    # IPv4 or IPv6 networks
    network_list = []
    # Non-network bound services, such as DNS
    service_list = []
    ifc_num = -1
    net_num = -1

    for vif in network_info:
        network = vif.get('network')
        if not (network and network.get('subnets')):
            continue

        # NOTE(JoshNang) currently, only supports the first IPv4 and first
        # IPv6 subnet on network, a limitation that also exists in the
        # network template.
        first_subnets = [(ip_version, _get_first_network(network, ip_version))
                         for ip_version in (4, 6)]

        ifc_num += 1
        link = None

        # Get the VIF or physical NIC data
        if any(subnet for _version, subnet in first_subnets):
            link = _get_eth_link(vif, ifc_num)
            link_list.append(link)

        # Add IPv4 and IPv6 networks if they exist
        for ip_version, subnet in first_subnets:
            if not (subnet and subnet.get('ips')):
                continue
            net_num += 1
            network_list.append(
                _get_nets(vif, subnet, ip_version, net_num, link['id']))
            unseen = [dns for dns in _get_dns_services(subnet)
                      if dns not in service_list]
            service_list.extend(unseen)

    return {
        "links": link_list,
        "networks": network_list,
        "services": service_list
    }

225 

226 

def get_ec2_ip_info(network_info):
    """Collect fixed IPv4, fixed IPv6 and floating addresses.

    :param network_info: A ``model.NetworkInfo``, or data that
                         ``model.NetworkInfo.hydrate`` can turn into one.
    :return: A dict with 'fixed_ips', 'fixed_ip6s' and 'floating_ips'
             address lists.
    """
    if not isinstance(network_info, model.NetworkInfo):
        network_info = model.NetworkInfo.hydrate(network_info)

    all_fixed = network_info.fixed_ips()
    v4_addresses = [entry['address'] for entry in all_fixed
                    if entry['version'] == 4]
    v6_addresses = [entry['address'] for entry in all_fixed
                    if entry['version'] == 6]
    floating_addresses = [entry['address']
                          for entry in network_info.floating_ips()]

    return {
        'fixed_ips': v4_addresses,
        'fixed_ip6s': v6_addresses,
        'floating_ips': floating_addresses,
    }

241 

242 

def _get_eth_link(vif, ifc_num):
    """Build the link entry for a VIF or physical NIC.

    :param vif: The VIF to describe.
    :param ifc_num: Interface index used to generate the link id when the
                    VIF has no 'devname'.
    :return: A dict with 'id', 'vif_id', 'type', 'mtu' and
             'ethernet_mac_address' as keys
    """
    link_id = vif.get('devname') or 'interface%d' % ifc_num

    # Use 'phy' for physical links. Ethernet can be confusing
    vif_type = vif.get('type')
    if vif_type not in model.LEGACY_EXPOSED_VIF_TYPES:
        vif_type = 'phy'

    return {
        'id': link_id,
        'vif_id': vif['id'],
        'type': vif_type,
        'mtu': _get_link_mtu(vif),
        'ethernet_mac_address': vif.get('address'),
    }

270 

271 

272def _get_link_mtu(vif): 

273 for subnet in vif['network']['subnets']: 

274 if subnet['meta'].get('dhcp_server'): 

275 return None 

276 return vif['network']['meta'].get('mtu') 

277 

278 

def _get_nets(vif, subnet, version, net_num, link_id):
    """Build the network entry for one subnet of a VIF.

    :param vif: The VIF the subnet belongs to.
    :param subnet: The subnet to describe.
    :param version: IP version as an int, either 4 or 6.
    :param net_num: Network index used to generate the entry id.
    :param link_id: Identifier of the link the network is attached to.
    :return: A dict describing the network. DHCP-configured subnets get a
             short entry without address, routes or services.
    """
    v6_address_mode = subnet.get_meta('ipv6_address_mode')
    if v6_address_mode is not None:
        type_suffix = '_%s' % v6_address_mode
    else:
        type_suffix = ''
        # The DHCP shortcut only applies when no IPv6 address mode is set.
        if (subnet.get_meta('dhcp_server') is not None or
                subnet.get_meta('enable_dhcp')):
            return {
                'id': 'network%d' % net_num,
                'type': 'ipv%d_dhcp' % version,
                'link': link_id,
                'network_id': vif['network']['id']
            }

    first_ip = subnet['ips'][0]
    ip_address = first_ip['address']
    if version == 4:
        netmask = model.get_netmask(first_ip, subnet)
    elif version == 6:
        netmask = str(subnet.as_netaddr().netmask)

    entry = {
        'id': 'network%d' % net_num,
        'type': 'ipv%d%s' % (version, type_suffix),
        'link': link_id,
        'ip_address': ip_address,
        'netmask': netmask,
        'routes': _get_default_route(version, subnet),
        'network_id': vif['network']['id']
    }

    # Add any additional routes beyond the default route
    extra_routes = []
    for route in subnet['routes']:
        route_net = netaddr.IPNetwork(route['cidr'])
        extra_routes.append({
            'network': str(route_net.network),
            'netmask': str(route_net.netmask),
            'gateway': route['gateway']['address']
        })
    entry['routes'].extend(extra_routes)

    entry['services'] = _get_dns_services(subnet)

    return entry

332 

333 

334def _get_default_route(version, subnet): 

335 """Get a default route for a network 

336 

337 :param version: IP version as an int, either '4' or '6' 

338 :param subnet: Neutron subnet 

339 """ 

340 if subnet.get('gateway') and subnet['gateway'].get('address'): 

341 gateway = subnet['gateway']['address'] 

342 else: 

343 return [] 

344 

345 if version == 4: 

346 return [{ 

347 'network': '0.0.0.0', 

348 'netmask': '0.0.0.0', 

349 'gateway': gateway 

350 }] 

351 elif version == 6: 351 ↛ exitline 351 didn't return from function '_get_default_route' because the condition on line 351 was always true

352 return [{ 

353 'network': '::', 

354 'netmask': '::', 

355 'gateway': gateway 

356 }] 

357 

358 

359def _get_dns_services(subnet): 

360 """Get the DNS servers for the subnet.""" 

361 services = [] 

362 if not subnet.get('dns'): 

363 return services 

364 return [{'type': 'dns', 'address': ip.get('address')} 

365 for ip in subnet['dns']] 

366 

367 

def get_cached_vifs_with_vlan(network_info):
    """Map the MAC address of every VLAN-tagged VIF to its VLAN.

    :param network_info: Iterable of VIFs, or None.
    :return: A dict of ``{mac_address: vlan}`` for the VIFs whose
             'details' carry a truthy 'vlan'; empty when network_info is
             None.
    """
    if network_info is None:
        return {}
    # ``vif.get('details', {})`` only covers a missing key; a VIF whose
    # 'details' is explicitly None would raise AttributeError. Treat it as
    # "no details", the same way the trusted lookup treats a None profile.
    return {vif['address']: vif['details']['vlan'] for vif in network_info
            if (vif.get('details') or {}).get('vlan')}

376 

377 

def get_cached_vifs_with_trusted(network_info):
    """Map the MAC address of every VIF with a profile to its trusted flag.

    :param network_info: Iterable of VIFs, or None.
    :return: A dict of ``{mac_address: bool}`` built from the profile's
             'trusted' value (taken as 'False' when absent); VIFs without
             a profile are left out. Empty when network_info is None.
    """
    trusted_by_mac = {}
    if network_info is None:
        return trusted_by_mac

    for vif in network_info:
        if not vif.get('profile'):
            continue
        mac = vif['address']
        raw_flag = vif['profile'].get('trusted', 'False')
        trusted_by_mac[mac] = strutils.bool_from_string(raw_flag)
    return trusted_by_mac

384 if vif.get('profile')}