Coverage for nova/crypto.py: 92%
142 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-17 15:08 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-17 15:08 +0000
1# Copyright 2010 United States Government as represented by the
2# Administrator of the National Aeronautics and Space Administration.
3# All Rights Reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
17"""Wrappers around standard crypto data elements.
19Includes root and intermediate CAs, SSH key_pairs and x509 certificates.
20"""
22import base64
23import binascii
24import hashlib
25import io
26import os
27import typing as ty
29from castellan.common import exception as castellan_exception
30from castellan.common.objects import passphrase
31from castellan import key_manager
32from cryptography.hazmat import backends
33from cryptography.hazmat.primitives.asymmetric import padding
34from cryptography.hazmat.primitives import hashes
35from cryptography.hazmat.primitives import serialization
36from cryptography import x509
37from oslo_concurrency import processutils
38from oslo_log import log as logging
39from oslo_serialization import base64 as oslo_base64
40import paramiko
42import nova.conf
43from nova import context as nova_context
44from nova import exception
45from nova.i18n import _
46from nova import objects
47from nova import utils
48from nova.virt import block_device as driver_block_device
# Module-level logger.
LOG = logging.getLogger(__name__)

# Global nova configuration object.
CONF = nova.conf.CONF

# Cached key manager API object; filled in lazily by _get_key_manager().
_KEYMGR = None

# Number of random bytes drawn (before base64 encoding) for a vTPM secret.
_VTPM_SECRET_BYTE_LENGTH = 384

# Number of random bytes drawn (before base64 encoding) for an ephemeral
# encryption secret.
_EPHEMERAL_ENCRYPTION_SECRET_BYTE_LENGTH = 64
def _get_key_manager():
    """Return the shared key manager API object, creating it on first use.

    The object is built from the global ``CONF`` and cached in the
    module-level ``_KEYMGR`` so that later calls reuse the same instance.
    """
    global _KEYMGR
    if _KEYMGR is not None:
        return _KEYMGR
    _KEYMGR = key_manager.API(configuration=CONF)
    return _KEYMGR
def generate_fingerprint(public_key: str) -> str:
    """Return the colon-separated MD5 fingerprint of an SSH public key.

    :param public_key: Public key in OpenSSH one-line format, i.e.
        ``<key type> <base64 key data> [comment]``.
    :return: The MD5 hex digest of the decoded key data, written as
        two-character groups joined by ``:``.
    :raise: exception.InvalidKeypair if anything goes wrong while
        validating, decoding or hashing the key.
    """
    try:
        pub_bytes = public_key.encode('utf-8')
        # Test that the given public_key string is a proper ssh key. The
        # returned object is unused since pyca/cryptography does not have a
        # fingerprint method.
        serialization.load_ssh_public_key(
            pub_bytes, backends.default_backend())
        # Split on any run of whitespace rather than on a single space:
        # with split(' ') a key whose fields are separated by repeated
        # spaces (or a tab) yields an empty second field, which decodes to
        # b'' and silently produces the MD5 of empty input instead of the
        # fingerprint of the key.
        pub_data = base64.b64decode(public_key.split()[1])
        raw_fp = hashlib.md5(pub_data, usedforsecurity=False).hexdigest()
        return ':'.join(a + b for a, b in zip(raw_fp[::2], raw_fp[1::2]))
    except Exception:
        # Any failure is reported uniformly as an invalid keypair.
        raise exception.InvalidKeypair(
            reason=_('failed to generate fingerprint'))
def generate_x509_fingerprint(pem_key: ty.Union[bytes, str]) -> str:
    """Return the colon-separated SHA-1 fingerprint of a PEM certificate.

    :param pem_key: PEM-encoded X509 certificate; a ``str`` is encoded to
        UTF-8 before parsing.
    :return: The SHA-1 fingerprint as hex, in two-character groups joined
        by ``:``.
    :raise: exception.InvalidKeypair if a ValueError, TypeError or
        binascii.Error occurs while encoding, loading or hashing.
    """
    try:
        pem_bytes = (
            pem_key.encode('utf-8') if isinstance(pem_key, str) else pem_key)
        certificate = x509.load_pem_x509_certificate(
            pem_bytes, backends.default_backend())
        digest = certificate.fingerprint(hashes.SHA1())
        hex_digest = binascii.hexlify(digest).decode('ascii')
        # Walk the hex string two characters at a time: one group per byte.
        groups = [
            hex_digest[pos:pos + 2] for pos in range(0, len(hex_digest), 2)]
        return ':'.join(groups)
    except (ValueError, TypeError, binascii.Error) as ex:
        raise exception.InvalidKeypair(
            reason=_('failed to generate X509 fingerprint. '
                     'Error message: %s') % ex)
def generate_key_pair(bits: int = 2048) -> ty.Tuple[str, str, str]:
    """Generate a new RSA SSH key pair.

    :param bits: Key size passed to ``paramiko.RSAKey.generate``.
    :return: A tuple of (private key text as written by paramiko,
        public key line ``<name> <base64> Generated-by-Nova``,
        fingerprint of that public key).
    """
    rsa_key = paramiko.RSAKey.generate(bits)

    # paramiko writes the private key to a file-like object, so capture it
    # in an in-memory text buffer.
    private_buf = io.StringIO()
    rsa_key.write_private_key(private_buf)

    public_key = '%s %s Generated-by-Nova' % (
        rsa_key.get_name(), rsa_key.get_base64())
    return (
        private_buf.getvalue(),
        public_key,
        generate_fingerprint(public_key),
    )
def ssh_encrypt_text(ssh_public_key: str, text: ty.Union[str, bytes]) -> bytes:
    """Encrypt ``text`` with an SSH public key using PKCS#1 v1.5 padding.

    :param ssh_public_key: Public key in OpenSSH text format.
    :param text: Data to encrypt; a ``str`` is encoded to UTF-8 first,
        ``bytes`` are used as given.
    :return: The value returned by the loaded key's ``encrypt`` call.
    :raise: exception.EncryptionFailure if loading the key or encrypting
        fails for any reason; the reason carries the original message.
    """
    payload = text.encode('utf-8') if isinstance(text, str) else text
    try:
        loaded_key = serialization.load_ssh_public_key(
            ssh_public_key.encode('utf-8'), backends.default_backend())
        return loaded_key.encrypt(payload, padding.PKCS1v15())
    except Exception as exc:
        raise exception.EncryptionFailure(reason=str(exc))
def generate_winrm_x509_cert(
    user_id: str,
    bits: int = 2048
) -> ty.Tuple[str, str, str]:
    """Generate a self-signed client certificate for WinRM-style auth.

    Runs ``openssl req`` to create the certificate and key in a temporary
    directory, then ``openssl pkcs12 -export`` (with an empty password) to
    bundle them.

    :param user_id: Used for the certificate subject CN and, with
        ``@localhost`` appended, for the UPN written to the openssl config.
    :param bits: RSA key size handed to ``openssl req -newkey``.
    :return: A tuple of (base64 text of the PKCS#12 output, certificate
        text from ``openssl req``, X509 fingerprint of that certificate).
    """
    subject = '/CN=%s' % user_id
    upn = '%s@localhost' % user_id

    with utils.tempdir() as workdir:
        key_path = os.path.abspath(os.path.join(workdir, 'temp.key'))
        conf_path = os.path.abspath(os.path.join(workdir, 'temp.conf'))

        _create_x509_openssl_config(conf_path, upn)

        req_cmd = (
            'openssl', 'req', '-x509', '-nodes', '-days', '3650',
            '-config', conf_path, '-newkey', 'rsa:%s' % bits,
            '-outform', 'PEM', '-keyout', key_path, '-subj', subject,
            '-extensions', 'v3_req_client',
        )
        cert_bytes, _stderr = processutils.execute(*req_cmd, binary=True)
        certificate = cert_bytes.decode('utf-8')

        # Feed the certificate on stdin; the key is read from key_path.
        pkcs12_cmd = (
            'openssl', 'pkcs12', '-export', '-inkey', key_path, '-password',
            'pass:',
        )
        pkcs12_bytes, _stderr = processutils.execute(
            *pkcs12_cmd, process_input=cert_bytes, binary=True)

        private_key = base64.b64encode(pkcs12_bytes).decode('ascii')
        fingerprint = generate_x509_fingerprint(certificate)

    return (private_key, certificate, fingerprint)
def _create_x509_openssl_config(conffile: str, upn: str) -> None:
    """Write a minimal openssl config with a client-auth extension section.

    The file defines an empty ``req_distinguished_name`` section and a
    ``v3_req_client`` section that sets ``extendedKeyUsage = clientAuth``
    and a ``subjectAltName`` otherName entry (OID 1.3.6.1.4.1.311.20.2.3)
    carrying ``upn`` as a UTF8 string.

    :param conffile: Path of the file to create or overwrite.
    :param upn: Value substituted into the subjectAltName entry.
    """
    content = ("distinguished_name = req_distinguished_name\n"
               "[req_distinguished_name]\n"
               "[v3_req_client]\n"
               "extendedKeyUsage = clientAuth\n"
               "subjectAltName = otherName:"
               "1.3.6.1.4.1.311.20.2.3;UTF8:%s\n")

    # The value is declared to openssl as UTF8, so write the file as UTF-8
    # explicitly instead of relying on the locale's default encoding, which
    # may be unable to represent (or may mis-encode) a non-ASCII upn.
    with open(conffile, 'w', encoding='utf-8') as conf_fh:
        conf_fh.write(content % upn)
def ensure_vtpm_secret(
    context: nova_context.RequestContext,
    instance: 'objects.Instance',
) -> ty.Tuple[str, str]:
    """Fetch the instance's vTPM secret from the key manager, or create one.

    If the instance's system_metadata has no ``vtpm_secret_uuid``, a new
    random secret is generated, stored in the key manager, recorded in
    system_metadata under ``vtpm_secret_uuid`` and the instance is saved.
    Otherwise the recorded secret is looked up in the key manager.

    :param context: Nova auth context.
    :param instance: Instance object.
    :return: A tuple comprising (secret_uuid, passphrase).
        NOTE(review): on the creation path the passphrase is the value
        returned by ``base64.b64encode`` (bytes) - confirm this matches
        what callers expect from the annotated return type.
    :raise: castellan_exception.ManagedObjectNotFoundError if a
        vtpm_secret_uuid is present in the instance's system metadata but
        the key manager lookup reports it as not found.
    """
    key_mgr = _get_key_manager()
    recorded_uuid = instance.system_metadata.get('vtpm_secret_uuid')

    if recorded_uuid is None:
        # The instance has no vtpm_secret_uuid. Create a new secret and
        # register it with the key manager.
        new_secret = base64.b64encode(os.urandom(_VTPM_SECRET_BYTE_LENGTH))
        # Castellan ManagedObject
        managed_obj = passphrase.Passphrase(
            new_secret, name="vTPM secret for instance %s" % instance.uuid)
        new_uuid = key_mgr.store(context, managed_obj)
        LOG.debug("Created vTPM secret with UUID %s",
                  new_uuid, instance=instance)

        instance.system_metadata['vtpm_secret_uuid'] = new_uuid
        instance.save()
        return new_uuid, new_secret

    # A UUID is recorded: try to retrieve the secret from the key manager.
    try:
        stored = key_mgr.get(context, recorded_uuid)
        LOG.debug(
            "Found existing vTPM secret with UUID %s.",
            recorded_uuid, instance=instance)
        return stored.id, stored.get_encoded()
    except castellan_exception.ManagedObjectNotFoundError:
        LOG.warning(
            "Despite being set on the instance, failed to find a vTPM "
            "secret with UUID %s. This should only happen if the secret "
            "was manually deleted from the key manager service. Your vTPM "
            "is likely to be unrecoverable.",
            recorded_uuid, instance=instance)
        raise
def delete_vtpm_secret(
    context: nova_context.RequestContext,
    instance: 'objects.Instance',
):
    """Destroy the key manager secret backing an instance's emulated TPM.

    Does nothing when the instance's system_metadata has no (or an empty)
    ``vtpm_secret_uuid``. A secret the key manager reports as not found is
    treated as already deleted. In both remaining cases the
    ``vtpm_secret_uuid`` entry is removed from system_metadata and the
    instance is saved.

    :param context: Nova auth context.
    :param instance: Instance object.
    :return: None
    :raise: Exceptions from the key manager other than
        castellan_exception.ManagedObjectNotFoundError are not caught here.
    """
    secret_uuid = instance.system_metadata.get('vtpm_secret_uuid')
    if not secret_uuid:
        # Nothing recorded for this instance, so nothing to delete.
        return

    key_mgr = _get_key_manager()
    try:
        key_mgr.delete(context, secret_uuid)
    except castellan_exception.ManagedObjectNotFoundError:
        LOG.debug("vTPM secret with UUID %s already deleted or never existed.",
                  secret_uuid, instance=instance)
    else:
        LOG.debug("Deleted vTPM secret with UUID %s",
                  secret_uuid, instance=instance)

    # Clear the reference whether or not the secret still existed.
    del instance.system_metadata['vtpm_secret_uuid']
    instance.save()
def create_encryption_secret(
    context: nova_context.RequestContext,
    instance: 'objects.Instance',
    driver_bdm: 'driver_block_device.DriverBlockDevice',
    for_detail: ty.Optional[str] = None,
) -> ty.Tuple[str, str]:
    """Create an ephemeral encryption passphrase and store it.

    :param context: Nova auth context.
    :param instance: Instance object; used for the default secret name and
        as logging context.
    :param driver_bdm: Block device whose ``uuid`` is used in the default
        secret name.
    :param for_detail: Optional text describing what the secret is for.
        When None, ``instance <uuid> BDM <uuid>`` is used.
    :return: A tuple comprising (secret_uuid, passphrase), where
        secret_uuid is whatever the key manager's ``store`` returned and
        passphrase is the base64 text of the random bytes.
    """
    # Use oslo.serialization to encode some random data as passphrase
    secret = oslo_base64.encode_as_text(
        os.urandom(_EPHEMERAL_ENCRYPTION_SECRET_BYTE_LENGTH))
    if for_detail is None:
        for_detail = f"instance {instance.uuid} BDM {driver_bdm['uuid']}"
    secret_name = f'Ephemeral encryption secret for {for_detail}'
    cmo = passphrase.Passphrase(secret, name=secret_name)
    key_mgr = _get_key_manager()
    secret_uuid = key_mgr.store(context, cmo)
    # Pass values as logging arguments so interpolation is deferred until
    # the record is actually emitted, like the other log calls in this
    # module.
    LOG.debug(
        'Created "%s" with UUID %s', secret_name, secret_uuid,
        instance=instance
    )
    return secret_uuid, secret
def get_encryption_secret(
    context: nova_context.RequestContext,
    secret_uuid: str,
) -> ty.Optional[str]:
    """Look up an encryption secret in the key manager.

    :param context: Nova auth context.
    :param secret_uuid: UUID of the secret to retrieve.
    :return: The result of ``get_encoded()`` on the retrieved object, or
        None if the key manager reports the secret as not found.
    """
    key_mgr = _get_key_manager()
    try:
        key = key_mgr.get(context, secret_uuid)
        # Lazy %-style arguments: formatting only happens if the record is
        # emitted.
        LOG.debug("Retrieved secret with UUID %s", secret_uuid)
        return key.get_encoded()
    except castellan_exception.ManagedObjectNotFoundError:
        LOG.debug("Encryption secret with UUID %s was not found.",
                  secret_uuid)
        return None
def delete_encryption_secret(
    context: nova_context.RequestContext,
    instance_uuid: str,
    secret_uuid: str,
) -> None:
    """Delete an encryption secret from the key manager.

    A secret the key manager reports as not found is treated as already
    deleted, so the call is a no-op in that case.

    :param context: Nova auth context.
    :param instance_uuid: UUID of the owning instance; only used as
        logging context.
    :param secret_uuid: UUID of the secret to delete.
    :return: None
    """
    key_mgr = _get_key_manager()
    try:
        key_mgr.delete(context, secret_uuid)
        # Lazy %-style arguments: formatting only happens if the record is
        # emitted.
        LOG.debug("Deleted secret with UUID %s", secret_uuid,
                  instance_uuid=instance_uuid)
    except castellan_exception.ManagedObjectNotFoundError:
        LOG.debug("Encryption secret with UUID %s already deleted "
                  "or never existed.", secret_uuid,
                  instance_uuid=instance_uuid)