Coverage for nova/virt/libvirt/volume/lightos.py: 85%
26 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-24 11:16 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-24 11:16 +0000
1# Copyright (C) 2016-2020 Lightbits Labs Ltd.
2# Copyright (C) 2020 Intel Corporation
3# All Rights Reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
17import nova.conf
18from nova import utils
19from nova.virt.libvirt.volume import volume as libvirt_volume
20from os_brick import initiator
21from os_brick.initiator import connector
22from oslo_log import log as logging
# Module-level logger, named after this module via __name__.
25LOG = logging.getLogger(__name__)
# Shortcut to nova's configuration object; read below for the
# [libvirt] num_nvme_discover_tries option.
26CONF = nova.conf.CONF
class LibvirtLightOSVolumeDriver(libvirt_volume.LibvirtVolumeDriver):
    """Driver to attach NVMe volumes to libvirt.

    Connect, disconnect and extend operations are delegated to the
    os-brick LightOS connector created in ``__init__``.
    """
    VERSION = '2.3.12'

    def __init__(self, connection):
        """Initialise the base driver and build the os-brick connector.

        :param connection: passed through unchanged to the parent
            ``LibvirtVolumeDriver`` constructor.
        """
        super(LibvirtLightOSVolumeDriver, self).__init__(connection)
        # The number of device scan attempts is operator-configurable via
        # the [libvirt] num_nvme_discover_tries option.
        self.connector = connector.InitiatorConnector.factory(
            initiator.LIGHTOS,
            root_helper=utils.get_root_helper(),
            device_scan_attempts=CONF.libvirt.num_nvme_discover_tries)

    def connect_volume(self, connection_info, instance):
        """Attach the volume through os-brick and record its device path.

        The ``path`` entry of the connector's result is stored in
        ``connection_info['data']['device_path']``; ``connection_info`` is
        therefore mutated in place.

        :param connection_info: mapping whose ``'data'`` entry is handed to
            the connector.
        :param instance: not used by this method.
        """
        device_info = self.connector.connect_volume(connection_info['data'])
        LOG.debug("Connecting NVMe volume with device_info %s", device_info)
        connection_info['data']['device_path'] = device_info['path']

    def disconnect_volume(self, connection_info, instance, force=False):
        """Detach the volume from the instance.

        :param connection_info: mapping whose ``'data'`` entry is handed to
            the connector.
        :param instance: forwarded to the parent driver's
            ``disconnect_volume``.
        :param force: forwarded to both the connector and the parent
            driver.
        """
        LOG.debug("Disconnecting NVMe disk. instance:%s, volume_id:%s",
                  connection_info.get("instance", ""),
                  connection_info.get("volume_id", ""))
        # NOTE(review): the second positional argument is presumably the
        # connector's device_info, which is not tracked here - confirm
        # against the os-brick connector interface.
        self.connector.disconnect_volume(
            connection_info['data'], None, force=force)
        super(LibvirtLightOSVolumeDriver, self).disconnect_volume(
            connection_info, instance, force=force)

    def extend_volume(self, connection_info, instance, requested_size):
        """Extend the volume.

        :param connection_info: mapping whose ``'data'`` entry is handed to
            the connector.
        :param instance: not used by this method.
        :param requested_size: not used by this method; the size reported
            by the connector is returned instead.
        :returns: the value returned by the connector's ``extend_volume``.
        """
        # The two adjacent literals previously had no separating space and
        # rendered as "...LightOS Volume.instance:...".
        LOG.debug("calling os-brick to extend LightOS Volume. "
                  "instance:%s, volume_id:%s",
                  connection_info.get("instance", ""),
                  connection_info.get("volume_id", ""))
        new_size = self.connector.extend_volume(connection_info['data'])
        # Use .get() so that a missing 'device_path' cannot turn a purely
        # informational log line into a KeyError after a successful extend.
        LOG.debug("Extend LightOS Volume %s; new_size=%s",
                  connection_info['data'].get('device_path'), new_size)
        return new_size