Coverage for nova/objects/ec2.py: 89%
147 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-17 15:08 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-17 15:08 +0000
1# Copyright 2014 Red Hat Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
15import functools
17from oslo_utils import uuidutils
19from nova import cache_utils
20from nova.db.main import api as db
21from nova import exception
22from nova.objects import base
23from nova.objects import fields
26# NOTE(vish): cache mapping for one week
27_CACHE_TIME = 7 * 24 * 60 * 60
28_CACHE = None
31def memoize(func):
32 @functools.wraps(func)
33 def memoizer(context, reqid):
34 global _CACHE
35 if not _CACHE:
36 _CACHE = cache_utils.get_client(expiration_time=_CACHE_TIME)
37 key = "%s:%s" % (func.__name__, reqid)
38 key = str(key)
39 value = _CACHE.get(key)
40 if value is None:
41 value = func(context, reqid)
42 _CACHE.set(key, value)
43 return value
44 return memoizer
47def id_to_ec2_id(instance_id, template='i-%08x'):
48 """Convert an instance ID (int) to an ec2 ID (i-[base 16 number])."""
49 return template % int(instance_id)
52def id_to_ec2_inst_id(context, instance_id):
53 """Get or create an ec2 instance ID (i-[base 16 number]) from uuid."""
54 if instance_id is None: 54 ↛ 55line 54 didn't jump to line 55 because the condition on line 54 was never true
55 return None
56 elif uuidutils.is_uuid_like(instance_id): 56 ↛ 60line 56 didn't jump to line 60 because the condition on line 56 was always true
57 int_id = get_int_id_from_instance_uuid(context, instance_id)
58 return id_to_ec2_id(int_id)
59 else:
60 return id_to_ec2_id(instance_id)
63@memoize
64def get_int_id_from_instance_uuid(context, instance_uuid):
65 if instance_uuid is None: 65 ↛ 66line 65 didn't jump to line 66 because the condition on line 65 was never true
66 return
67 try:
68 imap = EC2InstanceMapping.get_by_uuid(context, instance_uuid)
69 return imap.id
70 except exception.NotFound:
71 imap = EC2InstanceMapping(context)
72 imap.uuid = instance_uuid
73 imap.create()
74 return imap.id
77def glance_id_to_ec2_id(context, glance_id, image_type='ami'):
78 image_id = glance_id_to_id(context, glance_id)
79 if image_id is None:
80 return
82 template = image_type + '-%08x'
83 return id_to_ec2_id(image_id, template=template)
86@memoize
87def glance_id_to_id(context, glance_id):
88 """Convert a glance id to an internal (db) id."""
89 if not glance_id:
90 return
92 try:
93 return S3ImageMapping.get_by_uuid(context, glance_id).id
94 except exception.NotFound:
95 s3imap = S3ImageMapping(context, uuid=glance_id)
96 s3imap.create()
97 return s3imap.id
100def glance_type_to_ec2_type(image_type):
101 """Converts to a three letter image type.
103 aki, kernel => aki
104 ari, ramdisk => ari
105 anything else => ami
107 """
108 if image_type == 'kernel':
109 return 'aki'
110 if image_type == 'ramdisk': 110 ↛ 112line 110 didn't jump to line 112 because the condition on line 110 was always true
111 return 'ari'
112 if image_type not in ['aki', 'ari']:
113 return 'ami'
114 return image_type
117@base.NovaObjectRegistry.register
118class EC2InstanceMapping(base.NovaPersistentObject, base.NovaObject):
119 # Version 1.0: Initial version
120 VERSION = '1.0'
122 fields = {
123 'id': fields.IntegerField(),
124 'uuid': fields.UUIDField(),
125 }
127 @staticmethod
128 def _from_db_object(context, imap, db_imap):
129 for field in imap.fields:
130 setattr(imap, field, db_imap[field])
131 imap._context = context
132 imap.obj_reset_changes()
133 return imap
135 @base.remotable
136 def create(self):
137 if self.obj_attr_is_set('id'): 137 ↛ 138line 137 didn't jump to line 138 because the condition on line 137 was never true
138 raise exception.ObjectActionError(action='create',
139 reason='already created')
140 db_imap = db.ec2_instance_create(self._context, self.uuid)
141 self._from_db_object(self._context, self, db_imap)
143 @base.remotable_classmethod
144 def get_by_uuid(cls, context, instance_uuid):
145 db_imap = db.ec2_instance_get_by_uuid(context, instance_uuid)
146 if db_imap: 146 ↛ exitline 146 didn't return from function 'get_by_uuid' because the condition on line 146 was always true
147 return cls._from_db_object(context, cls(), db_imap)
149 @base.remotable_classmethod
150 def get_by_id(cls, context, ec2_id):
151 db_imap = db.ec2_instance_get_by_id(context, ec2_id)
152 if db_imap: 152 ↛ exitline 152 didn't return from function 'get_by_id' because the condition on line 152 was always true
153 return cls._from_db_object(context, cls(), db_imap)
156@base.NovaObjectRegistry.register
157class S3ImageMapping(base.NovaPersistentObject, base.NovaObject):
158 # Version 1.0: Initial version
159 VERSION = '1.0'
161 fields = {
162 'id': fields.IntegerField(read_only=True),
163 'uuid': fields.UUIDField(),
164 }
166 @staticmethod
167 def _from_db_object(context, s3imap, db_s3imap):
168 for field in s3imap.fields:
169 setattr(s3imap, field, db_s3imap[field])
170 s3imap._context = context
171 s3imap.obj_reset_changes()
172 return s3imap
174 @base.remotable
175 def create(self):
176 if self.obj_attr_is_set('id'): 176 ↛ 177line 176 didn't jump to line 177 because the condition on line 176 was never true
177 raise exception.ObjectActionError(action='create',
178 reason='already created')
179 db_s3imap = db.s3_image_create(self._context, self.uuid)
180 self._from_db_object(self._context, self, db_s3imap)
182 @base.remotable_classmethod
183 def get_by_uuid(cls, context, s3_image_uuid):
184 db_s3imap = db.s3_image_get_by_uuid(context, s3_image_uuid)
185 if db_s3imap: 185 ↛ exitline 185 didn't return from function 'get_by_uuid' because the condition on line 185 was always true
186 return cls._from_db_object(context, cls(context), db_s3imap)
188 @base.remotable_classmethod
189 def get_by_id(cls, context, s3_id):
190 db_s3imap = db.s3_image_get(context, s3_id)
191 if db_s3imap: 191 ↛ exitline 191 didn't return from function 'get_by_id' because the condition on line 191 was always true
192 return cls._from_db_object(context, cls(context), db_s3imap)
195@base.NovaObjectRegistry.register
196class EC2Ids(base.NovaObject):
197 # Version 1.0: Initial version
198 VERSION = '1.0'
200 fields = {
201 'instance_id': fields.StringField(read_only=True),
202 'ami_id': fields.StringField(nullable=True, read_only=True),
203 'kernel_id': fields.StringField(nullable=True, read_only=True),
204 'ramdisk_id': fields.StringField(nullable=True, read_only=True),
205 }
207 @staticmethod
208 def _from_dict(ec2ids, dict_ec2ids):
209 for field in ec2ids.fields:
210 setattr(ec2ids, field, dict_ec2ids[field])
211 return ec2ids
213 @staticmethod
214 def _get_ec2_ids(context, instance):
215 ec2_ids = {}
217 ec2_ids['instance_id'] = id_to_ec2_inst_id(context, instance.uuid)
218 ec2_ids['ami_id'] = glance_id_to_ec2_id(context, instance.image_ref)
219 for image_type in ['kernel', 'ramdisk']:
220 image_id = getattr(instance, '%s_id' % image_type)
221 ec2_id = None
222 if image_id is not None:
223 ec2_image_type = glance_type_to_ec2_type(image_type)
224 ec2_id = glance_id_to_ec2_id(context, image_id, ec2_image_type)
225 ec2_ids['%s_id' % image_type] = ec2_id
227 return ec2_ids
229 @base.remotable_classmethod
230 def get_by_instance(cls, context, instance):
231 ec2_ids = cls._get_ec2_ids(context, instance)
232 return cls._from_dict(cls(context), ec2_ids)