Coverage for nova/crypto.py: 92%

142 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-24 11:16 +0000

1# Copyright 2010 United States Government as represented by the 

2# Administrator of the National Aeronautics and Space Administration. 

3# All Rights Reserved. 

4# 

5# Licensed under the Apache License, Version 2.0 (the "License"); you may 

6# not use this file except in compliance with the License. You may obtain 

7# a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

14# License for the specific language governing permissions and limitations 

15# under the License. 

16 

17"""Wrappers around standard crypto data elements. 

18 

19Includes root and intermediate CAs, SSH key_pairs and x509 certificates. 

20""" 

21 

22import base64 

23import binascii 

24import hashlib 

25import io 

26import os 

27import typing as ty 

28 

29from castellan.common import exception as castellan_exception 

30from castellan.common.objects import passphrase 

31from castellan import key_manager 

32from cryptography.hazmat import backends 

33from cryptography.hazmat.primitives.asymmetric import padding 

34from cryptography.hazmat.primitives import hashes 

35from cryptography.hazmat.primitives import serialization 

36from cryptography import x509 

37from oslo_concurrency import processutils 

38from oslo_log import log as logging 

39from oslo_serialization import base64 as oslo_base64 

40import paramiko 

41 

42import nova.conf 

43from nova import context as nova_context 

44from nova import exception 

45from nova.i18n import _ 

46from nova import objects 

47from nova import utils 

48from nova.virt import block_device as driver_block_device 

49 

50 

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Handle to nova's global configuration object.
CONF = nova.conf.CONF

# Cached key manager API object; filled in lazily by _get_key_manager().
_KEYMGR = None

# Number of random bytes drawn for a new vTPM secret (before base64 encoding).
_VTPM_SECRET_BYTE_LENGTH = 384

# Number of random bytes drawn for a new ephemeral encryption secret (before
# base64 encoding).
_EPHEMERAL_ENCRYPTION_SECRET_BYTE_LENGTH = 64

60 

61 

def _get_key_manager():
    """Return the shared key manager API object, creating it on first use.

    The object is built from ``CONF`` and cached in the module-level
    ``_KEYMGR`` so that later calls reuse the same instance.
    """
    global _KEYMGR
    if _KEYMGR is not None:
        return _KEYMGR
    _KEYMGR = key_manager.API(configuration=CONF)
    return _KEYMGR

67 

68 

def generate_fingerprint(public_key: str) -> str:
    """Return the colon-separated MD5 fingerprint of an SSH public key.

    :param public_key: Public key text. The second space-separated field is
        taken as the base64-encoded key data.
    :return: The MD5 hex digest of the decoded key data, written as
        colon-separated two-character groups.
    :raise: exception.InvalidKeypair if any step fails.
    """
    try:
        # Loading the key is purely a validity check: pyca/cryptography has
        # no fingerprint method, so the parsed object is thrown away.
        serialization.load_ssh_public_key(
            public_key.encode('utf-8'), backends.default_backend())
        key_blob = base64.b64decode(public_key.split(' ')[1])
        digest = hashlib.md5(key_blob, usedforsecurity=False).hexdigest()
        octets = [digest[i:i + 2] for i in range(0, len(digest), 2)]
        return ':'.join(octets)
    except Exception:
        raise exception.InvalidKeypair(
            reason=_('failed to generate fingerprint'))

83 

84 

def generate_x509_fingerprint(pem_key: ty.Union[bytes, str]) -> str:
    """Return the colon-separated SHA-1 fingerprint of a PEM certificate.

    :param pem_key: PEM-encoded X509 certificate; a ``str`` is encoded to
        UTF-8 before parsing.
    :return: The hex SHA-1 fingerprint written as colon-separated
        two-character groups.
    :raise: exception.InvalidKeypair if parsing or fingerprinting raises
        ValueError, TypeError or binascii.Error.
    """
    try:
        pem_bytes = (
            pem_key.encode('utf-8') if isinstance(pem_key, str) else pem_key)
        certificate = x509.load_pem_x509_certificate(
            pem_bytes, backends.default_backend())
        sha1_digest = certificate.fingerprint(hashes.SHA1())
        hex_fp = binascii.hexlify(sha1_digest).decode('ascii')
        return ':'.join(
            hex_fp[i:i + 2] for i in range(0, len(hex_fp), 2))
    except (ValueError, TypeError, binascii.Error) as ex:
        raise exception.InvalidKeypair(
            reason=_('failed to generate X509 fingerprint. '
                     'Error message: %s') % ex)

99 

100 

def generate_key_pair(bits: int = 2048) -> ty.Tuple[str, str, str]:
    """Generate a new RSA SSH key pair.

    :param bits: Key size passed to ``paramiko.RSAKey.generate``.
    :return: A tuple of (private key text, public key line, fingerprint of
        the public key line).
    """
    rsa_key = paramiko.RSAKey.generate(bits)

    # paramiko writes the private key to a file-like object, so capture it
    # in an in-memory text buffer.
    pem_buffer = io.StringIO()
    rsa_key.write_private_key(pem_buffer)

    public_key = '%s %s Generated-by-Nova' % (
        rsa_key.get_name(), rsa_key.get_base64())
    return (
        pem_buffer.getvalue(),
        public_key,
        generate_fingerprint(public_key),
    )

109 

110 

def ssh_encrypt_text(ssh_public_key: str, text: ty.Union[str, bytes]) -> bytes:
    """Encrypt text with an ssh public key.

    If text is a Unicode string, encode it to UTF-8.

    :param ssh_public_key: Public key text, loaded as an SSH public key.
    :param text: The payload to encrypt.
    :return: The ciphertext produced with PKCS#1 v1.5 padding.
    :raise: exception.EncryptionFailure if loading the key or encrypting
        fails; the reason carries the underlying error text.
    """
    payload = text.encode('utf-8') if isinstance(text, str) else text
    try:
        loaded_key = serialization.load_ssh_public_key(
            ssh_public_key.encode('utf-8'), backends.default_backend())
        return loaded_key.encrypt(payload, padding.PKCS1v15())
    except Exception as exc:
        raise exception.EncryptionFailure(reason=str(exc))

125 

126 

def generate_winrm_x509_cert(
    user_id: str,
    bits: int = 2048
) -> ty.Tuple[str, str, str]:
    """Generate a cert for passwordless auth for user in project.

    Runs ``openssl`` twice inside a temporary directory: first ``req -x509``
    to produce a certificate and key, then ``pkcs12 -export`` (with an empty
    export password) fed with that certificate and key file.

    :param user_id: Used for the certificate subject (``/CN=<user_id>``) and
        for the ``<user_id>@localhost`` UPN written to the openssl config.
    :param bits: RSA key size handed to ``openssl req -newkey``.
    :return: A tuple of (base64-encoded PKCS#12 output, PEM certificate
        text, certificate fingerprint).
    """
    subject = '/CN=%s' % user_id
    upn = '%s@localhost' % user_id

    with utils.tempdir() as tmpdir:
        keyfile = os.path.abspath(os.path.join(tmpdir, 'temp.key'))
        conffile = os.path.abspath(os.path.join(tmpdir, 'temp.conf'))

        _create_x509_openssl_config(conffile, upn)

        req_args = [
            'openssl', 'req', '-x509', '-nodes', '-days', '3650',
            '-config', conffile, '-newkey', 'rsa:%s' % bits,
            '-outform', 'PEM', '-keyout', keyfile, '-subj', subject,
            '-extensions', 'v3_req_client',
        ]
        cert_pem, _stderr = processutils.execute(*req_args, binary=True)
        certificate = cert_pem.decode('utf-8')

        # The certificate bytes are piped in on stdin; the key is read from
        # the file written by the previous command.
        pkcs12_args = [
            'openssl', 'pkcs12', '-export', '-inkey', keyfile, '-password',
            'pass:',
        ]
        pkcs12_blob, _stderr = processutils.execute(
            *pkcs12_args, process_input=cert_pem, binary=True)

        private_key = base64.b64encode(pkcs12_blob).decode('ascii')
        fingerprint = generate_x509_fingerprint(certificate)

    return (private_key, certificate, fingerprint)

158 

159 

160def _create_x509_openssl_config(conffile: str, upn: str): 

161 content = ("distinguished_name = req_distinguished_name\n" 

162 "[req_distinguished_name]\n" 

163 "[v3_req_client]\n" 

164 "extendedKeyUsage = clientAuth\n" 

165 "subjectAltName = otherName:""1.3.6.1.4.1.311.20.2.3;UTF8:%s\n") 

166 

167 with open(conffile, 'w') as file: 

168 file.write(content % upn) 

169 

170 

def ensure_vtpm_secret(
    context: nova_context.RequestContext,
    instance: 'objects.Instance',
) -> ty.Tuple[str, str]:
    """Retrieve or create the key manager secret for an instance's emulated
    TPM.

    If the instance's system_metadata has no ``vtpm_secret_uuid``, a new
    random secret is generated, stored in the key manager, its UUID is saved
    to system_metadata under ``vtpm_secret_uuid`` and the instance is saved.
    Otherwise the existing secret is fetched from the key manager.

    :param context: Nova auth context.
    :param instance: Instance object.
    :return: A tuple comprising (secret_uuid, passphrase).
    :raise: castellan_exception.ManagedObjectNotFoundError if a
        vtpm_secret_uuid was present in the instance's system metadata but
        could not be found in the key manager service. Other errors from the
        key manager are not caught here.
    """
    key_mgr = _get_key_manager()
    existing_uuid = instance.system_metadata.get('vtpm_secret_uuid')

    if existing_uuid is None:
        # The instance has no vtpm_secret_uuid: create a new secret and
        # register it with the key manager.
        # NOTE(review): b64encode returns bytes although the return
        # annotation says str - confirm what callers expect.
        new_secret = base64.b64encode(os.urandom(_VTPM_SECRET_BYTE_LENGTH))
        # Castellan ManagedObject
        managed_obj = passphrase.Passphrase(
            new_secret, name="vTPM secret for instance %s" % instance.uuid)
        new_uuid = key_mgr.store(context, managed_obj)
        LOG.debug("Created vTPM secret with UUID %s",
                  new_uuid, instance=instance)

        instance.system_metadata['vtpm_secret_uuid'] = new_uuid
        instance.save()
        return new_uuid, new_secret

    # A UUID is recorded on the instance: try to retrieve the secret from
    # the key manager.
    try:
        found = key_mgr.get(context, existing_uuid)
        LOG.debug(
            "Found existing vTPM secret with UUID %s.",
            existing_uuid, instance=instance)
        return found.id, found.get_encoded()
    except castellan_exception.ManagedObjectNotFoundError:
        LOG.warning(
            "Despite being set on the instance, failed to find a vTPM "
            "secret with UUID %s. This should only happen if the secret "
            "was manually deleted from the key manager service. Your vTPM "
            "is likely to be unrecoverable.",
            existing_uuid, instance=instance)
        raise

223 

224 

def delete_vtpm_secret(
    context: nova_context.RequestContext,
    instance: 'objects.Instance',
):
    """Destroy the key manager secret for an instance's emulated TPM.

    This operation is idempotent: if the instance has no ``vtpm_secret_uuid``
    in its system_metadata it is a no-op, and a secret the key manager
    reports as not found is treated as already deleted.

    When a UUID was recorded, the ``vtpm_secret_uuid`` member of the
    instance's system_metadata is removed and the instance is saved as a
    side effect of this method.

    :param context: Nova auth context.
    :param instance: Instance object.
    :return: None
    :raise: Errors from the key manager other than
        castellan_exception.ManagedObjectNotFoundError are not caught here.
    """
    secret_uuid = instance.system_metadata.get('vtpm_secret_uuid')
    if secret_uuid:
        key_mgr = _get_key_manager()
        try:
            key_mgr.delete(context, secret_uuid)
            LOG.debug("Deleted vTPM secret with UUID %s",
                      secret_uuid, instance=instance)
        except castellan_exception.ManagedObjectNotFoundError:
            LOG.debug(
                "vTPM secret with UUID %s already deleted or never existed.",
                secret_uuid, instance=instance)

        # Forget the UUID whether the delete succeeded or the secret was
        # already gone.
        del instance.system_metadata['vtpm_secret_uuid']
        instance.save()

259 

260 

def create_encryption_secret(
    context: nova_context.RequestContext,
    instance: 'objects.Instance',
    driver_bdm: 'driver_block_device.DriverBlockDevice',
    for_detail: ty.Optional[str] = None,
):
    """Create a random passphrase and store it in the key manager.

    :param context: Nova auth context.
    :param instance: Instance object; its uuid is used in the default secret
        name and it is attached to the log record.
    :param driver_bdm: Block device mapping; its ``'uuid'`` entry is used in
        the default secret name.
    :param for_detail: Optional text appended to the secret's name. When
        None, a description built from the instance and BDM UUIDs is used.
    :return: A tuple comprising (secret_uuid, secret).
    """
    # Use oslo.serialization to encode some random data as passphrase
    random_bytes = os.urandom(_EPHEMERAL_ENCRYPTION_SECRET_BYTE_LENGTH)
    secret = oslo_base64.encode_as_text(random_bytes)

    if for_detail is None:
        for_detail = f"instance {instance.uuid} BDM {driver_bdm['uuid']}"
    secret_name = f'Ephemeral encryption secret for {for_detail}'

    managed_obj = passphrase.Passphrase(secret, name=secret_name)
    secret_uuid = _get_key_manager().store(context, managed_obj)
    LOG.debug(
        f'Created "{secret_name}" with UUID {secret_uuid}',
        instance=instance
    )
    return secret_uuid, secret

281 

282 

def get_encryption_secret(
    context: nova_context.RequestContext,
    secret_uuid: str,
) -> ty.Optional[str]:
    """Fetch an encryption secret from the key manager.

    :param context: Nova auth context.
    :param secret_uuid: UUID of the secret to look up.
    :return: The value of the managed object's ``get_encoded()``, or None if
        the key manager reports the secret as not found.
    """
    manager = _get_key_manager()
    try:
        managed_obj = manager.get(context, secret_uuid)
        LOG.debug(f"Retrieved secret with UUID {secret_uuid}")
        return managed_obj.get_encoded()
    except castellan_exception.ManagedObjectNotFoundError:
        # A missing secret is reported to the caller as None, not an error.
        LOG.debug(f"Encryption secret with UUID {secret_uuid} was not found.")
        return None

295 

296 

def delete_encryption_secret(
    context: nova_context.RequestContext,
    instance_uuid: str,
    secret_uuid: str,
):
    """Delete an encryption secret from the key manager.

    A secret the key manager reports as not found is logged and otherwise
    ignored, so repeating the call is harmless.

    :param context: Nova auth context.
    :param instance_uuid: UUID of the owning instance; only passed to the
        log calls.
    :param secret_uuid: UUID of the secret to delete.
    :return: None
    """
    manager = _get_key_manager()
    try:
        manager.delete(context, secret_uuid)
        LOG.debug(f"Deleted secret with UUID {secret_uuid}",
                  instance_uuid=instance_uuid)
    except castellan_exception.ManagedObjectNotFoundError:
        LOG.debug(f"Encryption secret with UUID {secret_uuid} already deleted "
                  "or never existed.", instance_uuid=instance_uuid)