Coverage for nova/virt/netutils.py: 95%
175 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-24 11:16 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-24 11:16 +0000
1# Copyright 2010 United States Government as represented by the
2# Administrator of the National Aeronautics and Space Administration.
3# All Rights Reserved.
4# Copyright (c) 2010 Citrix Systems, Inc.
5# Copyright 2013 IBM Corp.
6#
7# Licensed under the Apache License, Version 2.0 (the "License"); you may
8# not use this file except in compliance with the License. You may obtain
9# a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16# License for the specific language governing permissions and limitations
17# under the License.
20"""Network-related utilities for supporting libvirt connection code."""
22import os
24import jinja2
25import netaddr
26from oslo_utils import strutils
28import nova.conf
29from nova.network import model
31CONF = nova.conf.CONF
def get_net_and_mask(cidr):
    """Split a CIDR string into its address and netmask.

    :param cidr: Network in CIDR notation, parsed with
                 ``netaddr.IPNetwork``.
    :returns: Tuple ``(ip, netmask)`` holding the string forms of the
              parsed network's ``ip`` and ``netmask`` attributes.
    """
    parsed = netaddr.IPNetwork(cidr)
    ip_text = str(parsed.ip)
    mask_text = str(parsed.netmask)
    return ip_text, mask_text
def get_net_and_prefixlen(cidr):
    """Split a CIDR string into its address and prefix length.

    :param cidr: Network in CIDR notation, parsed with
                 ``netaddr.IPNetwork``.
    :returns: Tuple ``(ip, prefixlen)`` of strings.
    """
    net = netaddr.IPNetwork(cidr)
    # Read the public ``prefixlen`` property instead of the private
    # ``_prefixlen`` attribute so this does not depend on netaddr internals.
    return str(net.ip), str(net.prefixlen)
def get_ip_version(cidr):
    """Return the IP version of a CIDR string as an ``int``.

    :param cidr: Network in CIDR notation, parsed with
                 ``netaddr.IPNetwork``.
    :returns: ``int`` conversion of the parsed network's ``version``.
    """
    return int(netaddr.IPNetwork(cidr).version)
def _get_first_network(network, version):
    """Return the first subnet of ``network`` with the given IP version.

    :param network: Mapping with a ``'subnets'`` entry; each subnet is a
                    mapping with a ``'version'`` entry.
    :param version: IP version to look for.
    :returns: The first matching subnet, or ``None`` when no subnet has
              that version.
    """
    # A generator keeps the search lazy: there can be many subnets and only
    # the first match is needed. The ``None`` default makes the "no match"
    # result explicit instead of swallowing StopIteration.
    matching = (subnet for subnet in network['subnets']
                if subnet['version'] == version)
    return next(matching, None)
def get_injected_network_template(network_info, template=None,
                                  libvirt_virt_type=None):
    """Render the injected-network template for ``network_info``.

    One template entry is produced per VIF whose network is flagged as
    injected and whose first IPv4/IPv6 subnet has no ``dhcp_server`` meta.

    :param network_info: Iterable of VIFs describing the network metadata.
    :param template: Path to the interfaces template file; falls back to
                     ``CONF.injected_network_template`` when not given.
    :param libvirt_virt_type: The Libvirt `virt_type`, will be `None` for
                              other hypervisors.
    :returns: The rendered template text, or ``None`` when there is no
              network info, no template, or no interface to inject.
    """
    template = template or CONF.injected_network_template
    if not network_info or not template:
        return None

    interfaces = []
    use_ipv6 = False
    index = -1

    for vif in network_info:
        network = vif['network']
        if not network or not network['subnets']:
            continue

        # The template only supports a single subnet per interface, so
        # only the first subnet of each IP version is considered.
        first_v4 = _get_first_network(network, 4)
        first_v6 = _get_first_network(network, 6)

        # The index advances for every VIF that has subnets, including the
        # ones skipped below.
        index += 1

        if not network.get_meta('injected'):
            continue

        # Defaults for every key the template receives; key order matches
        # the order the values are filled in below.
        entry = {
            'name': 'eth%d' % index,
            'hwaddress': vif.get('address'),
            'address': None,
            'netmask': None,
            'gateway': '',
            'broadcast': None,
            'dns': None,
            'routes': [],
            'address_v6': None,
            'gateway_v6': '',
            'netmask_v6': None,
            'dns_v6': None,
        }

        if first_v4:
            if first_v4.get_meta('dhcp_server') is not None:
                continue

            if first_v4['ips']:
                fixed_ip = first_v4['ips'][0]
                entry['address'] = fixed_ip['address']
                entry['netmask'] = model.get_netmask(fixed_ip, first_v4)
                if first_v4['gateway']:
                    entry['gateway'] = first_v4['gateway']['address']
                entry['broadcast'] = str(first_v4.as_netaddr().broadcast)
                entry['dns'] = ' '.join(
                    [server['address'] for server in first_v4['dns']])
                for route_ref in first_v4['routes']:
                    route_net, route_mask = get_net_and_mask(
                        route_ref['cidr'])
                    entry['routes'].append({
                        'gateway': str(route_ref['gateway']['address']),
                        'cidr': str(route_ref['cidr']),
                        'network': route_net,
                        'netmask': route_mask,
                    })

        if first_v6:
            # A DHCP-served IPv6 subnet drops the whole interface, even if
            # the IPv4 details were already collected above.
            if first_v6.get_meta('dhcp_server') is not None:
                continue

            if first_v6['ips']:
                use_ipv6 = True
                fixed_ip6 = first_v6['ips'][0]
                entry['address_v6'] = fixed_ip6['address']
                entry['netmask_v6'] = model.get_netmask(fixed_ip6, first_v6)
                if first_v6['gateway']:
                    entry['gateway_v6'] = first_v6['gateway']['address']
                entry['dns_v6'] = ' '.join(
                    [server['address'] for server in first_v6['dns']])

        interfaces.append(entry)

    if not interfaces:
        return None

    template_dir, template_name = os.path.split(template)
    jinja_env = jinja2.Environment(  # nosec
        loader=jinja2.FileSystemLoader(template_dir),  # nosec
        trim_blocks=True)
    context = {'interfaces': interfaces,
               'use_ipv6': use_ipv6,
               'libvirt_virt_type': libvirt_virt_type}
    return jinja_env.get_template(template_name).render(context)
def get_network_metadata(network_info):
    """Gets a more complete representation of the instance network information.

    This data is exposed as network_data.json in the metadata service and
    the config drive.

    :param network_info: Iterable of VIFs describing the network metadata.
    :returns: A dict with ``"links"``, ``"networks"`` and ``"services"``
              lists, or ``None`` when ``network_info`` is empty.
    """
    if not network_info:
        return None

    # VIFs, physical NICs, or VLANs. Physical NICs will have type 'phy'.
    links = []
    # IPv4 or IPv6 networks
    networks = []
    # Non-network bound services, such as DNS
    services = []
    ifc_num = -1
    net_num = -1

    for vif in network_info:
        if not vif.get('network') or not vif['network'].get('subnets'):
            continue

        network = vif['network']
        # Only the first IPv4 and first IPv6 subnet on the network are
        # supported, a limitation that also exists in the network template.
        subnet_v4 = _get_first_network(network, 4)
        subnet_v6 = _get_first_network(network, 6)

        ifc_num += 1
        link = None

        # Get the VIF or physical NIC data
        if subnet_v4 or subnet_v6:
            link = _get_eth_link(vif, ifc_num)
            links.append(link)

        # Add IPv4 and IPv6 networks if they exist
        for version, subnet in ((4, subnet_v4), (6, subnet_v6)):
            if not (subnet and subnet.get('ips')):
                continue
            net_num += 1
            networks.append(
                _get_nets(vif, subnet, version, net_num, link['id']))
            # Append one entry at a time so a DNS server repeated within
            # the same subnet is also de-duplicated, not only servers
            # already recorded from earlier subnets.
            for dns in _get_dns_services(subnet):
                if dns not in services:
                    services.append(dns)

    return {
        "links": links,
        "networks": networks,
        "services": services
    }
def get_ec2_ip_info(network_info):
    """Collect the fixed and floating IP addresses of an instance.

    :param network_info: A ``model.NetworkInfo`` instance, or a value that
                         ``model.NetworkInfo.hydrate`` can convert to one.
    :returns: A dict with the keys ``'fixed_ips'`` (version 4 fixed
              addresses), ``'fixed_ip6s'`` (version 6 fixed addresses)
              and ``'floating_ips'``.
    """
    if isinstance(network_info, model.NetworkInfo):
        nw_info = network_info
    else:
        nw_info = model.NetworkInfo.hydrate(network_info)

    fixed = nw_info.fixed_ips()

    def _addresses(version):
        # Addresses of the fixed IPs that have the requested IP version.
        return [ip['address'] for ip in fixed if ip['version'] == version]

    return {
        'fixed_ips': _addresses(4),
        'fixed_ip6s': _addresses(6),
        'floating_ips': [ip['address'] for ip in nw_info.floating_ips()],
    }
def _get_eth_link(vif, ifc_num):
    """Build the link description for a VIF or physical NIC.

    :param vif: Neutron VIF
    :param ifc_num: Interface index used to generate the link id when the
                    VIF's 'devname' isn't defined.
    :return: A dict with 'id', 'vif_id', 'type', 'mtu' and
             'ethernet_mac_address' as keys
    """
    vif_type = vif.get('type')
    # Only the legacy exposed VIF types are reported as-is; everything else
    # is reported as 'phy', since "ethernet" can be confusing.
    is_legacy_type = vif_type in model.LEGACY_EXPOSED_VIF_TYPES

    return {
        'id': vif.get('devname') or 'interface%d' % ifc_num,
        'vif_id': vif['id'],
        'type': vif_type if is_legacy_type else 'phy',
        'mtu': _get_link_mtu(vif),
        'ethernet_mac_address': vif.get('address'),
    }
def _get_link_mtu(vif):
    """Return the MTU to report for the VIF's link.

    :param vif: Mapping whose ``'network'`` entry has ``'subnets'`` (each
                with a ``'meta'`` mapping) and its own ``'meta'`` mapping.
    :returns: ``None`` when any subnet has a truthy ``'dhcp_server'`` meta
              value, otherwise the network's ``'mtu'`` meta value (which
              may itself be ``None``).
    """
    network = vif['network']
    served_by_dhcp = any(
        subnet['meta'].get('dhcp_server') for subnet in network['subnets'])
    return None if served_by_dhcp else network['meta'].get('mtu')
def _get_nets(vif, subnet, version, net_num, link_id):
    """Describe one network of the given VIF and subnet.

    :param vif: Neutron VIF
    :param subnet: Neutron subnet
    :param version: IP version as an int, either 4 or 6
    :param net_num: Network index for generating name of each network
    :param link_id: Arbitrary identifier for the link the networks are
                    attached to
    :returns: A dict describing the network. For a DHCP subnet without an
              'ipv6_address_mode' it holds only 'id', 'type', 'link' and
              'network_id'; otherwise it also carries the address,
              netmask, routes and DNS services.
    """
    address_mode = subnet.get_meta('ipv6_address_mode')
    uses_dhcp = address_mode is None and (
        subnet.get_meta('dhcp_server') is not None or
        subnet.get_meta('enable_dhcp'))
    if uses_dhcp:
        return {
            'id': 'network%d' % net_num,
            'type': 'ipv%d_dhcp' % version,
            'link': link_id,
            'network_id': vif['network']['id']
        }

    type_suffix = '' if address_mode is None else '_%s' % address_mode

    first_ip = subnet['ips'][0]
    if version == 4:
        netmask = model.get_netmask(first_ip, subnet)
    elif version == 6:
        netmask = str(subnet.as_netaddr().netmask)

    details = {
        'id': 'network%d' % net_num,
        'type': 'ipv%d%s' % (version, type_suffix),
        'link': link_id,
        'ip_address': first_ip['address'],
        'netmask': netmask,
        'routes': _get_default_route(version, subnet),
        'network_id': vif['network']['id']
    }

    # Add any additional routes beyond the default route
    for extra in subnet['routes']:
        destination = netaddr.IPNetwork(extra['cidr'])
        details['routes'].append({
            'network': str(destination.network),
            'netmask': str(destination.netmask),
            'gateway': extra['gateway']['address']
        })

    details['services'] = _get_dns_services(subnet)

    return details
def _get_default_route(version, subnet):
    """Get a default route for a network

    :param version: IP version as an int, either 4 or 6
    :param subnet: Neutron subnet
    :returns: A list holding a single default-route dict with 'network',
              'netmask' and 'gateway' keys, or an empty list when the
              subnet has no gateway address or the version is not 4 or 6.
    """
    gateway = subnet.get('gateway')
    gateway_address = gateway.get('address') if gateway else None
    if not gateway_address:
        return []

    # The all-zero address is used as both network and netmask of the
    # default route for each IP version.
    any_address = {4: '0.0.0.0', 6: '::'}.get(version)
    if any_address is None:
        # Always return a list: callers append further routes to the
        # result, so an implicit None would break them.
        return []

    return [{
        'network': any_address,
        'netmask': any_address,
        'gateway': gateway_address
    }]
def _get_dns_services(subnet):
    """Get the DNS servers for the subnet.

    :param subnet: Mapping whose optional ``'dns'`` entry is a list of
                   mappings with an ``'address'`` entry.
    :returns: A list of ``{'type': 'dns', 'address': ...}`` dicts, one per
              DNS entry; empty when the subnet has no DNS entries.
    """
    dns_servers = subnet.get('dns')
    if not dns_servers:
        return []
    return [dict(type='dns', address=server.get('address'))
            for server in dns_servers]
def get_cached_vifs_with_vlan(network_info):
    """Map the MAC address of each VLAN-tagged VIF to its VLAN.

    :param network_info: Iterable of VIF mappings, or ``None``.
    :returns: A dict of ``{vif['address']: vif['details']['vlan']}`` for
              every VIF with a truthy 'vlan' in its details; empty when
              ``network_info`` is ``None``.
    """
    if network_info is None:
        return {}

    vlan_by_mac = {}
    for vif in network_info:
        # ``or {}`` also covers a VIF whose 'details' is present but None,
        # which a plain ``vif.get('details', {})`` would not.
        vlan = (vif.get('details') or {}).get('vlan')
        if vlan:
            vlan_by_mac[vif['address']] = vlan
    return vlan_by_mac
def get_cached_vifs_with_trusted(network_info):
    """Map the MAC address of each VIF with a profile to its trusted flag.

    :param network_info: Iterable of VIF mappings, or ``None``.
    :returns: A dict keyed by ``vif['address']`` whose values are the
              profile's 'trusted' entry (default ``'False'``) converted
              with ``strutils.bool_from_string``. VIFs without a truthy
              'profile' are left out; empty when ``network_info`` is
              ``None``.
    """
    trusted_by_mac = {}
    if network_info is None:
        return trusted_by_mac

    for vif in network_info:
        if not vif.get('profile'):
            continue
        mac = vif['address']
        raw_flag = vif['profile'].get('trusted', 'False')
        trusted_by_mac[mac] = strutils.bool_from_string(raw_flag)
    return trusted_by_mac