Coverage for nova/virt/libvirt/volume/nvme.py: 84%

25 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-17 15:08 +0000

1# Licensed under the Apache License, Version 2.0 (the "License"); you may 

2# not use this file except in compliance with the License. You may obtain 

3# a copy of the License at 

4# 

5# http://www.apache.org/licenses/LICENSE-2.0 

6# 

7# Unless required by applicable law or agreed to in writing, software 

8# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

9# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

10# License for the specific language governing permissions and limitations 

11# under the License. 

12 

13import nova.conf 

14from nova import utils 

15from nova.virt.libvirt.volume import volume as libvirt_volume 

16 

17from os_brick import initiator 

18from os_brick.initiator import connector 

19 

20from oslo_log import log as logging 

21 

22LOG = logging.getLogger(__name__) 

23 

24CONF = nova.conf.CONF 

25 

26 

27class LibvirtNVMEVolumeDriver(libvirt_volume.LibvirtVolumeDriver): 

28 """Driver to attach NVMe volumes to libvirt.""" 

29 

30 def __init__(self, connection): 

31 super(LibvirtNVMEVolumeDriver, 

32 self).__init__(connection) 

33 

34 self.connector = connector.InitiatorConnector.factory( 

35 initiator.NVME, utils.get_root_helper(), 

36 use_multipath=CONF.libvirt.volume_use_multipath, 

37 device_scan_attempts=CONF.libvirt.num_nvme_discover_tries, 

38 enforce_multipath=CONF.libvirt.volume_enforce_multipath) 

39 

40 def connect_volume(self, connection_info, instance): 

41 

42 device_info = self.connector.connect_volume( 

43 connection_info['data']) 

44 LOG.debug("Connecting NVMe volume with device_info %s", 

45 device_info, instance=instance) 

46 

47 connection_info['data']['device_path'] = device_info['path'] 

48 

49 def disconnect_volume(self, connection_info, instance, force=False): 

50 """Detach the volume from the instance.""" 

51 LOG.debug("Disconnecting NVMe disk", instance=instance) 

52 self.connector.disconnect_volume( 

53 connection_info['data'], None, force=force) 

54 super(LibvirtNVMEVolumeDriver, 

55 self).disconnect_volume(connection_info, instance, force=force) 

56 

57 def extend_volume(self, connection_info, instance, requested_size): 

58 """Extend the volume.""" 

59 LOG.debug("calling os-brick to extend NVMe Volume", instance=instance) 

60 new_size = self.connector.extend_volume(connection_info['data']) 

61 LOG.debug("Extend NVMe Volume %s; new_size=%s", 

62 connection_info['data']['device_path'], 

63 new_size, instance=instance) 

64 return new_size