Coverage for nova/objects/keypair.py: 83%

111 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-24 11:16 +0000

1# Copyright 2013 IBM Corp. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); you may 

4# not use this file except in compliance with the License. You may obtain 

5# a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

12# License for the specific language governing permissions and limitations 

13# under the License. 

14 

15from oslo_db import exception as db_exc 

16from oslo_db.sqlalchemy import utils as sqlalchemyutils 

17from oslo_log import log as logging 

18from oslo_utils import versionutils 

19 

20from nova.db.api import api as api_db_api 

21from nova.db.api import models as api_models 

22from nova import exception 

23from nova import objects 

24from nova.objects import base 

25from nova.objects import fields 

26 

27KEYPAIR_TYPE_SSH = 'ssh' 

28KEYPAIR_TYPE_X509 = 'x509' 

29LOG = logging.getLogger(__name__) 

30 

31 

@api_db_api.context_manager.reader
def _get_from_db(context, user_id, name=None, limit=None, marker=None):
    """Look up keypair rows belonging to ``user_id``.

    The return shape depends on ``name``:

    * ``name`` given: return the single matching row, or raise
      ``KeypairNotFound`` when there is none. ``limit`` and ``marker``
      are not consulted on this path.
    * ``name`` omitted: return a list of the user's rows, paginated via
      ``paginate_query`` with ``['name']`` as the sort keys.

    :param context: request context; its ``session`` is used for queries
    :param user_id: owner whose keypairs are queried
    :param name: optional keypair name for a single-row lookup
    :param limit: optional page size handed to ``paginate_query``
    :param marker: optional keypair name identifying the pagination marker
    :raises: KeypairNotFound if ``name`` is given and no row matches
    :raises: MarkerNotFound if ``marker`` is given and no row matches
    """
    model = api_models.KeyPair
    user_query = context.session.query(model).filter(
        model.user_id == user_id)

    if name is not None:
        match = user_query.filter(model.name == name).first()
        if match:
            return match
        raise exception.KeypairNotFound(user_id=user_id, name=name)

    marker_row = None
    if marker is not None:
        # The marker is a keypair name; resolve it to the user's own row
        # so it can be handed to the pagination helper.
        marker_row = (
            context.session.query(model)
            .filter(model.name == marker)
            .filter(model.user_id == user_id)
            .first()
        )
        if not marker_row:
            raise exception.MarkerNotFound(marker=marker)

    paginated = sqlalchemyutils.paginate_query(
        user_query, model, limit, ['name'], marker=marker_row)

    return paginated.all()

55 

56 

@api_db_api.context_manager.reader
def _get_count_from_db(context, user_id):
    """Return how many keypair rows belong to ``user_id``.

    :param context: request context; its ``session`` is used for the query
    :param user_id: owner whose keypairs are counted
    """
    owned_by_user = context.session.query(api_models.KeyPair).filter(
        api_models.KeyPair.user_id == user_id)
    return owned_by_user.count()

62 

63 

@api_db_api.context_manager.writer
def _create_in_db(context, values):
    """Persist a new keypair row populated from ``values``.

    :param context: request context; its ``session`` is used for the save
    :param values: mapping of column values; ``values['name']`` is read
                   when reporting a duplicate
    :returns: the saved ``api_models.KeyPair`` row
    :raises: KeyPairExists if the save hits a duplicate-entry error
    """
    new_row = api_models.KeyPair()
    new_row.update(values)
    try:
        new_row.save(context.session)
    except db_exc.DBDuplicateEntry:
        # Translate the DB-layer duplicate error into the nova exception.
        raise exception.KeyPairExists(key_name=values['name'])
    return new_row

73 

74 

@api_db_api.context_manager.writer
def _destroy_in_db(context, user_id, name):
    """Delete the keypair row matching ``user_id`` and ``name``.

    :param context: request context; its ``session`` is used for the delete
    :param user_id: owner of the keypair
    :param name: name of the keypair to remove
    :raises: KeypairNotFound if the delete reports a falsy result
             (i.e. nothing was removed)
    """
    deleted_rows = (
        context.session.query(api_models.KeyPair)
        .filter_by(user_id=user_id)
        .filter_by(name=name)
        .delete()
    )
    if not deleted_rows:
        raise exception.KeypairNotFound(user_id=user_id, name=name)

83 

84 

# TODO(berrange): Remove NovaObjectDictCompat
@base.NovaObjectRegistry.register
class KeyPair(base.NovaPersistentObject, base.NovaObject,
              base.NovaObjectDictCompat):
    """Versioned object wrapping a single keypair database row."""

    # Version 1.0: Initial version
    # Version 1.1: String attributes updated to support unicode
    # Version 1.2: Added keypair type
    # Version 1.3: Name field is non-null
    # Version 1.4: Add localonly flag to get_by_name()
    VERSION = '1.4'

    fields = {
        'id': fields.IntegerField(),
        'name': fields.StringField(nullable=False),
        'user_id': fields.StringField(nullable=True),
        'fingerprint': fields.StringField(nullable=True),
        'public_key': fields.StringField(nullable=True),
        'type': fields.StringField(nullable=False),
    }

    def obj_make_compatible(self, primitive, target_version):
        """Strip fields from ``primitive`` that ``target_version`` lacks.

        The ``type`` key is removed for targets older than 1.2.
        """
        super().obj_make_compatible(primitive, target_version)
        version_tuple = versionutils.convert_version_to_tuple(target_version)
        predates_type_field = version_tuple < (1, 2)
        if predates_type_field and 'type' in primitive:
            del primitive['type']

    @staticmethod
    def _from_db_object(context, keypair, db_keypair):
        """Copy every declared field from ``db_keypair`` onto ``keypair``.

        For ``deleted`` / ``deleted_at``, a fixed fallback value is used
        when the database row has no such attribute.

        :returns: the populated ``keypair``, with its context set and its
                  change tracking reset
        """
        fallbacks = {'deleted': False,
                     'deleted_at': None}
        for field_name in keypair.fields:
            use_fallback = (field_name in fallbacks and
                            not hasattr(db_keypair, field_name))
            keypair[field_name] = (
                fallbacks[field_name] if use_fallback
                else db_keypair[field_name]
            )
        keypair._context = context
        keypair.obj_reset_changes()
        return keypair

    @staticmethod
    def _get_from_db(context, user_id, name):
        """Delegate to the module-level ``_get_from_db`` for one name."""
        return _get_from_db(context, user_id, name=name)

    @staticmethod
    def _destroy_in_db(context, user_id, name):
        """Delegate to the module-level ``_destroy_in_db``."""
        return _destroy_in_db(context, user_id, name)

    @staticmethod
    def _create_in_db(context, values):
        """Delegate to the module-level ``_create_in_db``."""
        return _create_in_db(context, values)

    # TODO(stephenfin): Remove the 'localonly' parameter in v2.0
    @base.remotable_classmethod
    def get_by_name(cls, context, user_id, name, localonly=False):
        """Return the ``KeyPair`` named ``name`` owned by ``user_id``.

        :raises: KeypairNotFound unconditionally when ``localonly`` is
                 truthy, or when the lookup finds no matching row
        """
        if localonly:
            # There is no longer a "local" (main) table for keypairs, so this
            # will always return nothing now
            raise exception.KeypairNotFound(user_id=user_id, name=name)

        row = cls._get_from_db(context, user_id, name)
        blank = cls()
        return cls._from_db_object(context, blank, row)

    @base.remotable_classmethod
    def destroy_by_name(cls, context, user_id, name):
        """Delete the keypair named ``name`` owned by ``user_id``."""
        cls._destroy_in_db(context, user_id, name)

    @base.remotable
    def create(self):
        """Persist this keypair; refuse if ``id`` is already set.

        :raises: ObjectActionError if the ``id`` attribute is set
        """
        already_created = self.obj_attr_is_set('id')
        if already_created:
            raise exception.ObjectActionError(
                action='create', reason='already created',
            )

        self._create()

    def _create(self):
        """Save the pending changes and reload this object from the row."""
        pending = self.obj_get_changes()
        saved_row = self._create_in_db(self._context, pending)
        self._from_db_object(self._context, self, saved_row)

    @base.remotable
    def destroy(self):
        """Delete the row identified by this object's user_id and name."""
        self._destroy_in_db(self._context, self.user_id, self.name)

168 

169 

@base.NovaObjectRegistry.register
class KeyPairList(base.ObjectListBase, base.NovaObject):
    """Versioned list object holding ``KeyPair`` objects."""

    # Version 1.0: Initial version
    #              KeyPair <= version 1.1
    # Version 1.1: KeyPair <= version 1.2
    # Version 1.2: KeyPair <= version 1.3
    # Version 1.3: Add new parameters 'limit' and 'marker' to get_by_user()
    VERSION = '1.3'

    fields = {
        'objects': fields.ListOfObjectsField('KeyPair'),
    }

    @staticmethod
    def _get_from_db(context, user_id, limit, marker):
        """Delegate to the module-level ``_get_from_db`` for a listing."""
        return _get_from_db(context, user_id, limit=limit, marker=marker)

    @staticmethod
    def _get_count_from_db(context, user_id):
        """Delegate to the module-level ``_get_count_from_db``."""
        return _get_count_from_db(context, user_id)

    @base.remotable_classmethod
    def get_by_user(cls, context, user_id, limit=None, marker=None):
        """Return a ``KeyPairList`` of the keypairs owned by ``user_id``.

        :param limit: optional page size forwarded to the DB lookup
        :param marker: optional pagination marker forwarded to the DB lookup
        """
        rows = cls._get_from_db(
            context, user_id, limit=limit, marker=marker)

        empty_list = cls(context)
        return base.obj_make_list(
            context, empty_list, objects.KeyPair, rows,
        )

    @base.remotable_classmethod
    def get_count_by_user(cls, context, user_id):
        """Return the number of keypairs owned by ``user_id``."""
        return cls._get_count_from_db(context, user_id)