Coverage for nova/objects/keypair.py: 83%
111 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-17 15:08 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-17 15:08 +0000
1# Copyright 2013 IBM Corp.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
15from oslo_db import exception as db_exc
16from oslo_db.sqlalchemy import utils as sqlalchemyutils
17from oslo_log import log as logging
18from oslo_utils import versionutils
20from nova.db.api import api as api_db_api
21from nova.db.api import models as api_models
22from nova import exception
23from nova import objects
24from nova.objects import base
25from nova.objects import fields
# Recognised values for the keypair 'type' field.
KEYPAIR_TYPE_SSH = 'ssh'
KEYPAIR_TYPE_X509 = 'x509'

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
@api_db_api.context_manager.reader
def _get_from_db(context, user_id, name=None, limit=None, marker=None):
    """Look up keypairs owned by ``user_id``.

    When ``name`` is given, the single matching row is returned and
    ``limit``/``marker`` are not consulted. Otherwise a list of the user's
    keypairs is returned, paginated with ``name`` as the sort key.

    :param context: request context providing ``context.session``
    :param user_id: owner of the keypairs to look up
    :param name: optional keypair name selecting a single row
    :param limit: optional page size handed to ``paginate_query``
    :param marker: optional name of the keypair preceding the page
    :returns: one keypair row when ``name`` is set, else a list of rows
    :raises KeypairNotFound: ``name`` was given but no row matched
    :raises MarkerNotFound: ``marker`` was given but no row matched
    """
    keypair_model = api_models.KeyPair
    user_keypairs = context.session.query(keypair_model).filter(
        keypair_model.user_id == user_id)

    # Single-row lookup: short-circuits before any pagination handling.
    if name is not None:
        match = user_keypairs.filter(keypair_model.name == name).first()
        if match:
            return match
        raise exception.KeypairNotFound(user_id=user_id, name=name)

    # The marker is supplied as a keypair name; resolve it to the row
    # that paginate_query expects.
    marker_row = None
    if marker is not None:
        marker_lookup = context.session.query(keypair_model).filter(
            keypair_model.name == marker)
        marker_row = marker_lookup.filter(
            keypair_model.user_id == user_id).first()
        if not marker_row:
            raise exception.MarkerNotFound(marker=marker)

    paged = sqlalchemyutils.paginate_query(
        user_keypairs, keypair_model, limit, ['name'], marker=marker_row)
    return paged.all()
@api_db_api.context_manager.reader
def _get_count_from_db(context, user_id):
    """Return the number of keypair rows whose ``user_id`` matches."""
    owned = context.session.query(api_models.KeyPair).filter(
        api_models.KeyPair.user_id == user_id)
    return owned.count()
@api_db_api.context_manager.writer
def _create_in_db(context, values):
    """Persist a new keypair row populated from ``values``.

    :param context: request context providing ``context.session``
    :param values: mapping of column values; must contain ``'name'`` for
        the duplicate-entry error path
    :returns: the saved ``api_models.KeyPair`` row
    :raises KeyPairExists: the save hit a duplicate-entry error
    """
    new_row = api_models.KeyPair()
    new_row.update(values)
    try:
        new_row.save(context.session)
    except db_exc.DBDuplicateEntry:
        # Translate the DB-layer duplicate error into the nova exception.
        raise exception.KeyPairExists(key_name=values['name'])
    else:
        return new_row
@api_db_api.context_manager.writer
def _destroy_in_db(context, user_id, name):
    """Delete the keypair identified by ``user_id`` and ``name``.

    :raises KeypairNotFound: the delete reported a falsy result, i.e. no
        row matched
    """
    matching = context.session.query(api_models.KeyPair).filter_by(
        user_id=user_id).filter_by(name=name)
    deleted_rows = matching.delete()
    if not deleted_rows:
        raise exception.KeypairNotFound(user_id=user_id, name=name)
# TODO(berrange): Remove NovaObjectDictCompat
@base.NovaObjectRegistry.register
class KeyPair(base.NovaPersistentObject, base.NovaObject,
              base.NovaObjectDictCompat):
    """Versioned object wrapping a single keypair row."""

    # Version 1.0: Initial version
    # Version 1.1: String attributes updated to support unicode
    # Version 1.2: Added keypair type
    # Version 1.3: Name field is non-null
    # Version 1.4: Add localonly flag to get_by_name()
    VERSION = '1.4'

    fields = {
        'id': fields.IntegerField(),
        'name': fields.StringField(nullable=False),
        'user_id': fields.StringField(nullable=True),
        'fingerprint': fields.StringField(nullable=True),
        'public_key': fields.StringField(nullable=True),
        'type': fields.StringField(nullable=False),
    }

    def obj_make_compatible(self, primitive, target_version):
        """Drop the 'type' key from ``primitive`` for targets below 1.2."""
        super(KeyPair, self).obj_make_compatible(primitive, target_version)
        version_tuple = versionutils.convert_version_to_tuple(target_version)
        if 'type' in primitive and version_tuple < (1, 2):
            del primitive['type']

    @staticmethod
    def _from_db_object(context, keypair, db_keypair):
        """Copy every declared field from ``db_keypair`` onto ``keypair``.

        Fields listed in the fallback table take the fallback value when
        the DB row lacks the attribute. The object's context is then set
        and its change tracking reset.

        :returns: the populated ``keypair``
        """
        fallbacks = {
            'deleted': False,
            'deleted_at': None,
        }
        for field_name in keypair.fields:
            use_fallback = (field_name in fallbacks and
                            not hasattr(db_keypair, field_name))
            keypair[field_name] = (fallbacks[field_name] if use_fallback
                                   else db_keypair[field_name])
        keypair._context = context
        keypair.obj_reset_changes()
        return keypair

    @staticmethod
    def _get_from_db(context, user_id, name):
        """Delegate to the module-level ``_get_from_db`` by name."""
        return _get_from_db(context, user_id, name=name)

    @staticmethod
    def _destroy_in_db(context, user_id, name):
        """Delegate to the module-level ``_destroy_in_db``."""
        return _destroy_in_db(context, user_id, name)

    @staticmethod
    def _create_in_db(context, values):
        """Delegate to the module-level ``_create_in_db``."""
        return _create_in_db(context, values)

    # TODO(stephenfin): Remove the 'localonly' parameter in v2.0
    @base.remotable_classmethod
    def get_by_name(cls, context, user_id, name, localonly=False):
        """Return the ``KeyPair`` named ``name`` owned by ``user_id``.

        :raises KeypairNotFound: always when ``localonly`` is true;
            otherwise propagated from the DB lookup
        """
        if localonly:
            # There is no longer a "local" (main) table for keypairs, so this
            # will always return nothing now
            raise exception.KeypairNotFound(user_id=user_id, name=name)

        row = cls._get_from_db(context, user_id, name)
        return cls._from_db_object(context, cls(), row)

    @base.remotable_classmethod
    def destroy_by_name(cls, context, user_id, name):
        """Delete the keypair named ``name`` owned by ``user_id``."""
        cls._destroy_in_db(context, user_id, name)

    @base.remotable
    def create(self):
        """Persist this keypair; refuse if 'id' is already set.

        :raises ObjectActionError: the 'id' attribute is already set
        """
        already_created = self.obj_attr_is_set('id')
        if already_created:
            raise exception.ObjectActionError(
                action='create', reason='already created',
            )

        self._create()

    def _create(self):
        """Write the pending changes to the DB and reload from the row."""
        pending = self.obj_get_changes()
        row = self._create_in_db(self._context, pending)
        self._from_db_object(self._context, self, row)

    @base.remotable
    def destroy(self):
        """Delete the DB row matching this object's user_id and name."""
        self._destroy_in_db(self._context, self.user_id, self.name)
@base.NovaObjectRegistry.register
class KeyPairList(base.ObjectListBase, base.NovaObject):
    """Versioned list object holding ``KeyPair`` objects."""

    # Version 1.0: Initial version
    #              KeyPair <= version 1.1
    # Version 1.1: KeyPair <= version 1.2
    # Version 1.2: KeyPair <= version 1.3
    # Version 1.3: Add new parameters 'limit' and 'marker' to get_by_user()
    VERSION = '1.3'

    fields = {
        'objects': fields.ListOfObjectsField('KeyPair'),
    }

    @staticmethod
    def _get_from_db(context, user_id, limit, marker):
        """Delegate to the module-level ``_get_from_db`` without a name."""
        return _get_from_db(context, user_id, limit=limit, marker=marker)

    @staticmethod
    def _get_count_from_db(context, user_id):
        """Delegate to the module-level ``_get_count_from_db``."""
        return _get_count_from_db(context, user_id)

    @base.remotable_classmethod
    def get_by_user(cls, context, user_id, limit=None, marker=None):
        """Return a ``KeyPairList`` built from the user's keypair rows.

        ``limit`` and ``marker`` are passed through to the DB lookup.
        """
        rows = cls._get_from_db(
            context, user_id, limit=limit, marker=marker)
        empty_list = cls(context)
        return base.obj_make_list(
            context, empty_list, objects.KeyPair, rows,
        )

    @base.remotable_classmethod
    def get_count_by_user(cls, context, user_id):
        """Return the number of keypairs stored for ``user_id``."""
        return cls._get_count_from_db(context, user_id)