Coverage for nova/objects/aggregate.py: 82%
313 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-17 15:08 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-17 15:08 +0000
1# Copyright 2013 IBM Corp.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
15from oslo_db import exception as db_exc
16from oslo_log import log as logging
17from oslo_utils import excutils
18from oslo_utils import uuidutils
19from sqlalchemy import orm
21from nova.compute import utils as compute_utils
22from nova.db.api import api as api_db_api
23from nova.db.api import models as api_models
24from nova import exception
25from nova.i18n import _
26from nova import objects
27from nova.objects import base
28from nova.objects import fields
# Module-level logger used by the helpers and objects in this file.
LOG = logging.getLogger(__name__)

# Field names that Aggregate._from_db_object() skips when they are absent
# from the database record it is given.
DEPRECATED_FIELDS = ['deleted', 'deleted_at']
@api_db_api.context_manager.reader
def _aggregate_get_from_db(context, aggregate_id):
    """Fetch one aggregate row by id, with hosts and metadata joined in.

    :param context: request context whose ``session`` runs the query
    :param aggregate_id: value compared against ``Aggregate.id``
    :returns: the first matching ``api_models.Aggregate`` row
    :raises: AggregateNotFound if the query yields no row
    """
    db_aggregate = (
        context.session.query(api_models.Aggregate)
        .options(orm.joinedload(api_models.Aggregate._hosts))
        .options(orm.joinedload(api_models.Aggregate._metadata))
        .filter(api_models.Aggregate.id == aggregate_id)
        .first()
    )

    if db_aggregate:
        return db_aggregate

    raise exception.AggregateNotFound(aggregate_id=aggregate_id)
@api_db_api.context_manager.reader
def _aggregate_get_from_db_by_uuid(context, aggregate_uuid):
    """Fetch one aggregate row by uuid, with hosts and metadata joined in.

    :param context: request context whose ``session`` runs the query
    :param aggregate_uuid: value compared against ``Aggregate.uuid``
    :returns: the first matching ``api_models.Aggregate`` row
    :raises: AggregateNotFound if the query yields no row; the uuid is
             reported through the exception's ``aggregate_id`` argument
    """
    db_aggregate = (
        context.session.query(api_models.Aggregate)
        .options(orm.joinedload(api_models.Aggregate._hosts))
        .options(orm.joinedload(api_models.Aggregate._metadata))
        .filter(api_models.Aggregate.uuid == aggregate_uuid)
        .first()
    )

    if db_aggregate:
        return db_aggregate

    raise exception.AggregateNotFound(aggregate_id=aggregate_uuid)
def _host_add_to_db(context, aggregate_id, host):
    """Store ``host`` as a member of the aggregate ``aggregate_id``.

    :param context: request context used for the writer transaction
    :param aggregate_id: id of the aggregate receiving the host
    :param host: host value stored on the new ``AggregateHost`` row
    :returns: the saved ``api_models.AggregateHost`` row
    :raises: AggregateNotFound if the aggregate lookup finds nothing
    :raises: AggregateHostExists if the database reports a duplicate entry
    """
    try:
        with api_db_api.context_manager.writer.using(context):
            # Existence check only: the lookup raises AggregateNotFound
            # when the aggregate is missing; its result is not needed.
            _aggregate_get_from_db(context, aggregate_id)

            membership = api_models.AggregateHost()
            membership.update(
                {"host": host, "aggregate_id": aggregate_id})
            membership.save(context.session)
            return membership
    except db_exc.DBDuplicateEntry:
        raise exception.AggregateHostExists(
            host=host, aggregate_id=aggregate_id)
def _host_delete_from_db(context, aggregate_id, host):
    """Remove ``host`` from the aggregate ``aggregate_id``.

    :param context: request context used for the writer transaction
    :param aggregate_id: id of the aggregate losing the host
    :param host: host value of the ``AggregateHost`` row(s) to delete
    :raises: AggregateNotFound if the aggregate lookup finds nothing
    :raises: AggregateHostNotFound if the delete matched no rows
    """
    with api_db_api.context_manager.writer.using(context):
        # Existence check only: raises AggregateNotFound when missing.
        _aggregate_get_from_db(context, aggregate_id)

        deleted_rows = (
            context.session.query(api_models.AggregateHost)
            .filter(api_models.AggregateHost.aggregate_id == aggregate_id)
            .filter_by(host=host)
            .delete()
        )

    if deleted_rows == 0:
        raise exception.AggregateHostNotFound(
            aggregate_id=aggregate_id, host=host)
def _metadata_add_to_db(context, aggregate_id, metadata, max_retries=10,
                        set_delete=False):
    """Create or update metadata entries for an aggregate.

    Keys already stored for the aggregate get their value updated; the
    remaining keys are inserted as new rows. The whole operation runs in a
    writer transaction and is attempted again when the database reports a
    duplicate entry.

    :param context: request context used for the writer transaction
    :param aggregate_id: id of the aggregate owning the metadata
    :param metadata: dict of key/value pairs to store
    :param max_retries: number of attempts made before giving up; if it is
                        zero or negative no attempt is made and ``None``
                        is returned
    :param set_delete: when true, stored keys that are not present in
                       ``metadata`` are deleted first
    :returns: ``metadata`` as passed in, once an attempt succeeds
    :raises: AggregateMetadataKeyExists if a stored key returned by the
             query cannot be looked up in ``metadata``
    :raises: DBDuplicateEntry if the final attempt still hits a duplicate
    """
    all_keys = metadata.keys()
    for attempt in range(max_retries):
        try:
            with api_db_api.context_manager.writer.using(context):
                query = context.session.query(
                    api_models.AggregateMetadata
                ).filter_by(aggregate_id=aggregate_id)

                if set_delete:
                    # Drop every stored key the caller did not supply.
                    query.filter(
                        ~api_models.AggregateMetadata.key.in_(all_keys)
                    ).delete(synchronize_session=False)

                already_existing_keys = set()
                if all_keys:
                    query = query.filter(
                        api_models.AggregateMetadata.key.in_(all_keys))
                    for meta_ref in query.all():
                        key = meta_ref.key
                        try:
                            meta_ref.update({"value": metadata[key]})
                            already_existing_keys.add(key)
                        except KeyError:
                            # NOTE(ratailor): When user tries updating
                            # metadata using case-sensitive key, we get
                            # KeyError.
                            raise exception.AggregateMetadataKeyExists(
                                aggregate_id=aggregate_id, key=key)

                # Only keys that were not updated above need a new row.
                new_entries = [
                    {"key": key,
                     "value": value,
                     "aggregate_id": aggregate_id}
                    for key, value in metadata.items()
                    if key not in already_existing_keys
                ]
                if new_entries:
                    context.session.execute(
                        api_models.AggregateMetadata.__table__.insert(),
                        new_entries)

                return metadata
        except db_exc.DBDuplicateEntry:
            # a concurrent transaction has been committed,
            # try again unless this was the last attempt
            with excutils.save_and_reraise_exception() as ctxt:
                if attempt < max_retries - 1:
                    ctxt.reraise = False
                else:
                    # Pass the arguments to the logger instead of
                    # formatting eagerly, and keep the log message
                    # untranslated.
                    LOG.warning(
                        "Add metadata failed for aggregate %(id)s "
                        "after %(retries)s retries",
                        {"id": aggregate_id, "retries": max_retries})
@api_db_api.context_manager.writer
def _metadata_delete_from_db(context, aggregate_id, key):
    """Delete the metadata entry ``key`` of the aggregate ``aggregate_id``.

    :param context: request context whose ``session`` runs the delete
    :param aggregate_id: id of the aggregate owning the metadata
    :param key: metadata key to delete
    :raises: AggregateNotFound if the aggregate lookup finds nothing
    :raises: AggregateMetadataNotFound if the delete matched no rows
    """
    # Existence check only: raises AggregateNotFound when missing.
    _aggregate_get_from_db(context, aggregate_id)

    deleted_rows = (
        context.session.query(api_models.AggregateMetadata)
        .filter(api_models.AggregateMetadata.aggregate_id == aggregate_id)
        .filter_by(key=key)
        .delete()
    )

    if deleted_rows == 0:
        raise exception.AggregateMetadataNotFound(
            aggregate_id=aggregate_id, metadata_key=key)
@api_db_api.context_manager.writer
def _aggregate_create_in_db(context, values, metadata=None):
    """Insert a new aggregate row, optionally with initial metadata.

    :param context: request context whose ``session`` performs the writes
    :param values: dict of column values; must contain ``'name'``
    :param metadata: optional dict of metadata stored for the new
                     aggregate; skipped when empty or ``None``
    :returns: the newly saved ``api_models.Aggregate`` row
    :raises: AggregateNameExists if an aggregate with ``values['name']``
             is already present
    """
    existing = (
        context.session.query(api_models.Aggregate)
        .filter(api_models.Aggregate.name == values['name'])
        .first()
    )
    if existing:
        raise exception.AggregateNameExists(aggregate_name=values['name'])

    db_aggregate = api_models.Aggregate()
    db_aggregate.update(values)
    db_aggregate.save(context.session)
    # The row was only just created, so it cannot have hosts or metadata
    # yet; assign empty lists so they are not lazy loaded later.
    db_aggregate._hosts = []
    db_aggregate._metadata = []

    if metadata:
        _metadata_add_to_db(context, db_aggregate.id, metadata)
        context.session.expire(db_aggregate, ['_metadata'])
        # Deliberate bare attribute access: touch the expired attribute
        # here, right after the metadata rows were written.
        db_aggregate._metadata

    return db_aggregate
@api_db_api.context_manager.writer
def _aggregate_delete_from_db(context, aggregate_id):
    """Delete an aggregate row together with its metadata rows.

    :param context: request context whose ``session`` runs the deletes
    :param aggregate_id: id of the aggregate to remove
    :raises: AggregateNotFound if no aggregate row was deleted
    """
    session = context.session

    # Metadata rows are removed before the aggregate row itself.
    session.query(api_models.AggregateMetadata).filter_by(
        aggregate_id=aggregate_id).delete()

    deleted_rows = (
        session.query(api_models.Aggregate)
        .filter(api_models.Aggregate.id == aggregate_id)
        .delete()
    )

    if deleted_rows == 0:
        raise exception.AggregateNotFound(aggregate_id=aggregate_id)
@api_db_api.context_manager.writer
def _aggregate_update_to_db(context, aggregate_id, values):
    """Apply ``values`` to an existing aggregate and return the fresh row.

    ``values`` is modified in place: ``'availability_zone'`` is moved into
    ``values['metadata']`` and a non-``None`` ``'metadata'`` entry is
    popped and written through ``_metadata_add_to_db``.

    :param context: request context whose ``session`` performs the writes
    :param aggregate_id: id of the aggregate to update
    :param values: dict of changes; may contain ``'availability_zone'``
                   and/or ``'metadata'`` besides plain column values
    :returns: the aggregate row as re-read after the update
    :raises: AggregateNotFound if the aggregate lookup finds nothing
    :raises: AggregateNameExists if saving hits a duplicate entry and
             ``'name'`` is among the changed values
    """
    db_aggregate = _aggregate_get_from_db(context, aggregate_id)

    # By default stored metadata keys missing from the update are deleted;
    # this is switched off when the metadata dict exists only to carry the
    # availability zone.
    set_delete = True
    if "availability_zone" in values:
        az = values.pop('availability_zone')
        if 'metadata' in values:
            values['metadata']['availability_zone'] = az
        else:
            values['metadata'] = {'availability_zone': az}
            set_delete = False

    if values.get('metadata') is not None:
        _metadata_add_to_db(context, aggregate_id, values.pop('metadata'),
                            set_delete=set_delete)

    db_aggregate.update(values)
    try:
        db_aggregate.save(context.session)
    except db_exc.DBDuplicateEntry:
        if 'name' not in values:
            raise
        raise exception.AggregateNameExists(
            aggregate_name=values['name'])
    return _aggregate_get_from_db(context, aggregate_id)
@base.NovaObjectRegistry.register
class Aggregate(base.NovaPersistentObject, base.NovaObject):
    """Nova object for one aggregate: id, uuid, name, hosts and metadata.

    Persistence goes through the module-level ``_*_db`` helpers, and the
    create/save/update_metadata operations emit start and end
    notifications through ``compute_utils``.
    """
    # Version 1.0: Initial version
    # Version 1.1: String attributes updated to support unicode
    # Version 1.2: Added uuid field
    # Version 1.3: Added get_by_uuid method
    VERSION = '1.3'

    fields = {
        'id': fields.IntegerField(),
        'uuid': fields.UUIDField(nullable=False),
        'name': fields.StringField(),
        'hosts': fields.ListOfStringsField(nullable=True),
        'metadata': fields.DictOfStringsField(nullable=True),
    }

    obj_extra_fields = ['availability_zone']

    @staticmethod
    def _from_db_object(context, aggregate, db_aggregate):
        """Copy a database record into ``aggregate`` and return it.

        :param context: context stored on the object as ``_context``
        :param aggregate: the ``Aggregate`` object to populate
        :param db_aggregate: record supporting ``[]`` and ``in`` lookups
        :returns: ``aggregate`` with its change tracking reset
        """
        for key in aggregate.fields:
            if key == 'metadata':
                # The record exposes the metadata under a different name.
                db_key = 'metadetails'
            elif key in DEPRECATED_FIELDS and key not in db_aggregate:
                continue
            else:
                db_key = key
            setattr(aggregate, key, db_aggregate[db_key])

        # NOTE: This can be removed when we bump Aggregate to v2.0
        aggregate.deleted_at = None
        aggregate.deleted = False

        aggregate._context = context
        aggregate.obj_reset_changes()

        return aggregate

    def _assert_no_hosts(self, action):
        """Reject ``action`` if ``hosts`` is among the pending changes.

        :param action: name of the action, reported in the error
        :raises: ObjectActionError if ``hosts`` was changed inline
        """
        if 'hosts' in self.obj_what_changed():
            raise exception.ObjectActionError(
                action=action,
                reason='hosts updated inline')

    @base.remotable_classmethod
    def get_by_id(cls, context, aggregate_id):
        """Return the aggregate with the given id.

        :raises: AggregateNotFound if there is no such aggregate
        """
        db_aggregate = _aggregate_get_from_db(context, aggregate_id)
        return cls._from_db_object(context, cls(), db_aggregate)

    @base.remotable_classmethod
    def get_by_uuid(cls, context, aggregate_uuid):
        """Return the aggregate with the given uuid.

        :raises: AggregateNotFound if there is no such aggregate
        """
        db_aggregate = _aggregate_get_from_db_by_uuid(context,
                                                      aggregate_uuid)
        return cls._from_db_object(context, cls(), db_aggregate)

    @base.remotable
    def create(self):
        """Persist this aggregate as a new record.

        A uuid is generated when none was set. Start and end notifications
        are emitted around the database write.

        :raises: ObjectActionError if ``id`` is already set or ``hosts``
                 was changed inline
        :raises: AggregateNameExists if the name is already in use
        """
        if self.obj_attr_is_set('id'):
            raise exception.ObjectActionError(action='create',
                                              reason='already created')

        self._assert_no_hosts('create')
        updates = self.obj_get_changes()
        # The payload is copied before a uuid is generated below, so a
        # generated uuid is not part of the notification payload.
        payload = dict(updates)
        if 'metadata' in updates:
            # NOTE(danms): For some reason the notification format is weird
            payload['meta_data'] = payload.pop('metadata')
        if 'uuid' not in updates:
            updates['uuid'] = uuidutils.generate_uuid()
            self.uuid = updates['uuid']
            LOG.debug('Generated uuid %(uuid)s for aggregate',
                      dict(uuid=updates['uuid']))
        compute_utils.notify_about_aggregate_update(self._context,
                                                    "create.start",
                                                    payload)
        compute_utils.notify_about_aggregate_action(
            context=self._context,
            aggregate=self,
            action=fields.NotificationAction.CREATE,
            phase=fields.NotificationPhase.START)

        metadata = updates.pop('metadata', None)
        db_aggregate = _aggregate_create_in_db(self._context, updates,
                                               metadata=metadata)
        self._from_db_object(self._context, self, db_aggregate)
        payload['aggregate_id'] = self.id
        compute_utils.notify_about_aggregate_update(self._context,
                                                    "create.end",
                                                    payload)
        compute_utils.notify_about_aggregate_action(
            context=self._context,
            aggregate=self,
            action=fields.NotificationAction.CREATE,
            phase=fields.NotificationPhase.END)

    @base.remotable
    def save(self):
        """Write pending changes to the database and refresh this object.

        Start and end notifications are emitted around the database write;
        the object is refreshed from the database after the end
        notifications.

        :raises: ObjectActionError if ``hosts`` was changed inline
        :raises: AggregateNameExists if the new name is already in use
        """
        self._assert_no_hosts('save')
        updates = self.obj_get_changes()

        payload = {'aggregate_id': self.id}
        if 'metadata' in updates:
            payload['meta_data'] = updates['metadata']
        compute_utils.notify_about_aggregate_update(self._context,
                                                    "updateprop.start",
                                                    payload)
        compute_utils.notify_about_aggregate_action(
            context=self._context,
            aggregate=self,
            action=fields.NotificationAction.UPDATE_PROP,
            phase=fields.NotificationPhase.START)
        updates.pop('id', None)
        db_aggregate = _aggregate_update_to_db(self._context,
                                               self.id, updates)
        compute_utils.notify_about_aggregate_update(self._context,
                                                    "updateprop.end",
                                                    payload)
        compute_utils.notify_about_aggregate_action(
            context=self._context,
            aggregate=self,
            action=fields.NotificationAction.UPDATE_PROP,
            phase=fields.NotificationPhase.END)
        self._from_db_object(self._context, self, db_aggregate)

    @base.remotable
    def update_metadata(self, updates):
        """Add, change or remove metadata entries.

        :param updates: dict of metadata changes; a value of ``None``
                        removes the key, any other value is stored
        """
        payload = {'aggregate_id': self.id,
                   'meta_data': updates}
        compute_utils.notify_about_aggregate_update(self._context,
                                                    "updatemetadata.start",
                                                    payload)
        compute_utils.notify_about_aggregate_action(
            context=self._context,
            aggregate=self,
            action=fields.NotificationAction.UPDATE_METADATA,
            phase=fields.NotificationPhase.START)
        to_add = {}
        for key, value in updates.items():
            if value is None:
                try:
                    _metadata_delete_from_db(self._context, self.id, key)
                except exception.AggregateMetadataNotFound:
                    # Removing a key that is not stored is not an error.
                    pass
                # Likewise tolerate the key being absent locally.
                self.metadata.pop(key, None)
            else:
                to_add[key] = value
                self.metadata[key] = value
        _metadata_add_to_db(self._context, self.id, to_add)
        compute_utils.notify_about_aggregate_update(self._context,
                                                    "updatemetadata.end",
                                                    payload)
        compute_utils.notify_about_aggregate_action(
            context=self._context,
            aggregate=self,
            action=fields.NotificationAction.UPDATE_METADATA,
            phase=fields.NotificationPhase.END)
        self.obj_reset_changes(fields=['metadata'])

    @base.remotable
    def destroy(self):
        """Delete this aggregate and its metadata from the database.

        :raises: AggregateNotFound if no aggregate row was deleted
        """
        _aggregate_delete_from_db(self._context, self.id)

    @base.remotable
    def add_host(self, host):
        """Add ``host`` to this aggregate in the database and locally.

        :raises: AggregateNotFound if the aggregate does not exist
        :raises: AggregateHostExists if the host is already a member
        """
        _host_add_to_db(self._context, self.id, host)

        if self.hosts is None:
            self.hosts = []
        self.hosts.append(host)
        self.obj_reset_changes(fields=['hosts'])

    @base.remotable
    def delete_host(self, host):
        """Remove ``host`` from this aggregate in the database and locally.

        :raises: AggregateNotFound if the aggregate does not exist
        :raises: AggregateHostNotFound if the host is not a member
        """
        _host_delete_from_db(self._context, self.id, host)

        # The database row is already gone here, so a local list that is
        # None or does not contain the host must not turn the successful
        # delete into an error (add_host tolerates a None list as well).
        if self.hosts is not None and host in self.hosts:
            self.hosts.remove(host)
        self.obj_reset_changes(fields=['hosts'])

    @property
    def availability_zone(self):
        """The ``availability_zone`` metadata value, or ``None`` if unset."""
        return self.metadata.get('availability_zone', None)
@api_db_api.context_manager.reader
def _get_all_from_db(context):
    """Return all aggregate rows with hosts and metadata joined in.

    :param context: request context whose ``session`` runs the query
    :returns: list of ``api_models.Aggregate`` rows
    """
    return (
        context.session.query(api_models.Aggregate)
        .options(orm.joinedload(api_models.Aggregate._hosts))
        .options(orm.joinedload(api_models.Aggregate._metadata))
        .all()
    )
@api_db_api.context_manager.reader
def _get_by_host_from_db(context, host, key=None):
    """Return the aggregate rows that contain ``host``.

    :param context: request context whose ``session`` runs the query
    :param host: host value that must be among the aggregate's hosts
    :param key: optional metadata key; when truthy, only aggregates that
                also have a metadata entry with this key are returned
    :returns: list of ``api_models.Aggregate`` rows
    """
    by_host = (
        context.session.query(api_models.Aggregate)
        .options(orm.joinedload(api_models.Aggregate._hosts))
        .options(orm.joinedload(api_models.Aggregate._metadata))
        .join(api_models.Aggregate._hosts)
        .filter(api_models.AggregateHost.host == host)
    )

    if not key:
        return by_host.all()

    return (
        by_host.join(api_models.Aggregate._metadata)
        .filter(api_models.AggregateMetadata.key == key)
        .all()
    )
@api_db_api.context_manager.reader
def _get_by_metadata_from_db(context, key=None, value=None):
    """Return aggregate rows having a metadata entry matching key/value.

    At least one of ``key`` and ``value`` must be given (enforced with an
    ``assert``). The given ones are combined, so an entry has to match
    both when both are supplied.

    :param context: request context whose ``session`` runs the query
    :param key: metadata key to match, or ``None`` to accept any key
    :param value: metadata value to match, or ``None`` to accept any value
    :returns: list of ``api_models.Aggregate`` rows
    """
    assert key is not None or value is not None

    criteria = []
    if key is not None:
        criteria.append(api_models.AggregateMetadata.key == key)
    if value is not None:
        criteria.append(api_models.AggregateMetadata.value == value)

    return (
        context.session.query(api_models.Aggregate)
        .join(api_models.Aggregate._metadata)
        .filter(*criteria)
        # `_metadata` is loaded from the join above, `_hosts` through its
        # own joined load.
        .options(orm.contains_eager(api_models.Aggregate._metadata))
        .options(orm.joinedload(api_models.Aggregate._hosts))
        .all()
    )
@api_db_api.context_manager.reader
def _get_non_matching_by_metadata_keys_from_db(context, ignored_keys,
                                               key_prefix, value):
    """Filter aggregates based on non matching metadata.

    Find aggregates with at least one ${key_prefix}*[=${value}] metadata where
    the metadata key are not in the ignored_keys list.

    :param context: request context whose ``session`` runs the query
    :param ignored_keys: metadata keys to exclude; may be empty or None
    :param key_prefix: literal prefix the metadata key must start with;
                       mandatory
    :param value: value the metadata entry must have
    :raises: ValueError if ``key_prefix`` is empty or None
    :return: Aggregates with any metadata entry:
        - whose key starts with `key_prefix`; and
        - whose value is `value` and
        - whose key is *not* in the `ignored_keys` list.
    """

    if not key_prefix:
        raise ValueError(_('key_prefix mandatory field.'))

    query = context.session.query(api_models.Aggregate)
    query = query.join(api_models.Aggregate._metadata)
    query = query.filter(api_models.AggregateMetadata.value == value)
    # autoescape makes '%' and '_' inside key_prefix match literally
    # instead of acting as LIKE wildcards.
    query = query.filter(api_models.AggregateMetadata.key.startswith(
        key_prefix, autoescape=True))
    if ignored_keys:
        query = query.filter(
            ~api_models.AggregateMetadata.key.in_(ignored_keys)
        )

    query = query.options(
        orm.contains_eager(api_models.Aggregate._metadata)
    )
    query = query.options(orm.joinedload(api_models.Aggregate._hosts))

    return query.all()
@base.NovaObjectRegistry.register
class AggregateList(base.ObjectListBase, base.NovaObject):
    """List object holding ``Aggregate`` objects, with lookup helpers."""
    # Version 1.0: Initial version
    # Version 1.1: Added key argument to get_by_host()
    #              Aggregate <= version 1.1
    # Version 1.2: Added get_by_metadata_key
    # Version 1.3: Added get_by_metadata
    VERSION = '1.3'

    fields = {
        'objects': fields.ListOfObjectsField('Aggregate'),
    }

    @classmethod
    def _filter_db_aggregates(cls, db_aggregates, hosts):
        """Keep the aggregates that contain at least one of ``hosts``.

        :param db_aggregates: iterable of records exposing ``['hosts']``
        :param hosts: iterable of hosts to look for
        :returns: list of the matching records, in their original order
        """
        wanted = hosts if isinstance(hosts, set) else set(hosts)
        return [
            candidate for candidate in db_aggregates
            if any(member in wanted for member in candidate['hosts'])
        ]

    @base.remotable_classmethod
    def get_all(cls, context):
        """Return a list of all aggregates."""
        rows = _get_all_from_db(context)
        return base.obj_make_list(
            context, cls(context), objects.Aggregate, rows)

    @base.remotable_classmethod
    def get_by_host(cls, context, host, key=None):
        """Return the aggregates containing ``host``.

        :param key: optional metadata key the aggregates must also have
        """
        rows = _get_by_host_from_db(context, host, key=key)
        return base.obj_make_list(
            context, cls(context), objects.Aggregate, rows)

    @base.remotable_classmethod
    def get_by_metadata_key(cls, context, key, hosts=None):
        """Return the aggregates that have the metadata key ``key``.

        :param hosts: optional iterable of hosts; when not ``None``, only
                      aggregates containing at least one of them are kept
        """
        rows = _get_by_metadata_from_db(context, key=key)
        if hosts is not None:
            rows = cls._filter_db_aggregates(rows, hosts)
        return base.obj_make_list(
            context, cls(context), objects.Aggregate, rows)

    @base.remotable_classmethod
    def get_by_metadata(cls, context, key=None, value=None):
        """Return aggregates with a metadata key set to value.

        The result lists every aggregate that has a metadata entry with
        the given value. When a key is given as well, only entries stored
        under that key are considered.
        """
        rows = _get_by_metadata_from_db(context, key=key, value=value)
        return base.obj_make_list(
            context, cls(context), objects.Aggregate, rows)

    @classmethod
    def get_non_matching_by_metadata_keys(cls, context, ignored_keys,
                                          key_prefix, value):
        """Return aggregates that are not matching with metadata.

        Example, given aggregates with the following metadata:

            'agg1' with trait:HW_CPU_X86_MMX="required"
            'agg2' with trait:HW_CPU_X86_SGX="required"
            'agg3' with trait:HW_CPU_X86_MMX="required"
            'agg3' with trait:HW_CPU_X86_SGX="required"

        and this request:

            aggregate_obj.AggregateList.get_non_matching_by_metadata_keys(
                self.context,
                ['trait:HW_CPU_X86_MMX'],
                'trait:',
                value='required')

        the result contains 'agg2' and 'agg3', the aggregates that are not
        matching with metadata.

        :param context: The security context
        :param ignored_keys: List of keys to match with the aggregate
                             metadata keys that start with key_prefix.
        :param key_prefix: Only metadata keys that start with the
                           key_prefix are compared
        :param value: Value of metadata

        :returns: List of aggregates that don't match metadata keys that
                  start with key_prefix with the supplied keys.
        """
        rows = _get_non_matching_by_metadata_keys_from_db(
            context, ignored_keys, key_prefix, value)
        return base.obj_make_list(
            context, objects.AggregateList(context), objects.Aggregate,
            rows)