Coverage for nova/objects/aggregate.py: 82%

313 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-24 11:16 +0000

1# Copyright 2013 IBM Corp. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); you may 

4# not use this file except in compliance with the License. You may obtain 

5# a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

12# License for the specific language governing permissions and limitations 

13# under the License. 

14 

15from oslo_db import exception as db_exc 

16from oslo_log import log as logging 

17from oslo_utils import excutils 

18from oslo_utils import uuidutils 

19from sqlalchemy import orm 

20 

21from nova.compute import utils as compute_utils 

22from nova.db.api import api as api_db_api 

23from nova.db.api import models as api_models 

24from nova import exception 

25from nova.i18n import _ 

26from nova import objects 

27from nova.objects import base 

28from nova.objects import fields 

29 

30LOG = logging.getLogger(__name__) 

31 

32DEPRECATED_FIELDS = ['deleted', 'deleted_at'] 

33 

34 

35@api_db_api.context_manager.reader 

36def _aggregate_get_from_db(context, aggregate_id): 

37 query = context.session.query(api_models.Aggregate).\ 

38 options(orm.joinedload(api_models.Aggregate._hosts)).\ 

39 options(orm.joinedload(api_models.Aggregate._metadata)) 

40 query = query.filter(api_models.Aggregate.id == aggregate_id) 

41 

42 aggregate = query.first() 

43 

44 if not aggregate: 

45 raise exception.AggregateNotFound(aggregate_id=aggregate_id) 

46 

47 return aggregate 

48 

49 

50@api_db_api.context_manager.reader 

51def _aggregate_get_from_db_by_uuid(context, aggregate_uuid): 

52 query = context.session.query(api_models.Aggregate).\ 

53 options(orm.joinedload(api_models.Aggregate._hosts)).\ 

54 options(orm.joinedload(api_models.Aggregate._metadata)) 

55 query = query.filter(api_models.Aggregate.uuid == aggregate_uuid) 

56 

57 aggregate = query.first() 

58 

59 if not aggregate: 

60 raise exception.AggregateNotFound(aggregate_id=aggregate_uuid) 

61 

62 return aggregate 

63 

64 

65def _host_add_to_db(context, aggregate_id, host): 

66 try: 

67 with api_db_api.context_manager.writer.using(context): 

68 # Check to see if the aggregate exists 

69 _aggregate_get_from_db(context, aggregate_id) 

70 

71 host_ref = api_models.AggregateHost() 

72 host_ref.update({"host": host, "aggregate_id": aggregate_id}) 

73 host_ref.save(context.session) 

74 return host_ref 

75 except db_exc.DBDuplicateEntry: 

76 raise exception.AggregateHostExists(host=host, 

77 aggregate_id=aggregate_id) 

78 

79 

80def _host_delete_from_db(context, aggregate_id, host): 

81 count = 0 

82 with api_db_api.context_manager.writer.using(context): 

83 # Check to see if the aggregate exists 

84 _aggregate_get_from_db(context, aggregate_id) 

85 

86 query = context.session.query(api_models.AggregateHost) 

87 query = query.filter(api_models.AggregateHost.aggregate_id == 

88 aggregate_id) 

89 count = query.filter_by(host=host).delete() 

90 

91 if count == 0: 91 ↛ 92line 91 didn't jump to line 92 because the condition on line 91 was never true

92 raise exception.AggregateHostNotFound(aggregate_id=aggregate_id, 

93 host=host) 

94 

95 

96def _metadata_add_to_db(context, aggregate_id, metadata, max_retries=10, 

97 set_delete=False): 

98 all_keys = metadata.keys() 

99 for attempt in range(max_retries): 99 ↛ exitline 99 didn't return from function '_metadata_add_to_db' because the loop on line 99 didn't complete

100 try: 

101 with api_db_api.context_manager.writer.using(context): 

102 query = context.session.query(api_models.AggregateMetadata).\ 

103 filter_by(aggregate_id=aggregate_id) 

104 

105 if set_delete: 105 ↛ 106line 105 didn't jump to line 106 because the condition on line 105 was never true

106 query.filter(~api_models.AggregateMetadata.key. 

107 in_(all_keys)).\ 

108 delete(synchronize_session=False) 

109 

110 already_existing_keys = set() 

111 if all_keys: 111 ↛ 126line 111 didn't jump to line 126 because the condition on line 111 was always true

112 query = query.filter( 

113 api_models.AggregateMetadata.key.in_(all_keys)) 

114 for meta_ref in query.all(): 

115 key = meta_ref.key 

116 try: 

117 meta_ref.update({"value": metadata[key]}) 

118 already_existing_keys.add(key) 

119 except KeyError: 

120 # NOTE(ratailor): When user tries updating 

121 # metadata using case-sensitive key, we get 

122 # KeyError. 

123 raise exception.AggregateMetadataKeyExists( 

124 aggregate_id=aggregate_id, key=key) 

125 

126 new_entries = [] 

127 for key, value in metadata.items(): 

128 if key in already_existing_keys: 

129 continue 

130 new_entries.append({"key": key, 

131 "value": value, 

132 "aggregate_id": aggregate_id}) 

133 if new_entries: 

134 context.session.execute( 

135 api_models.AggregateMetadata.__table__.insert(), 

136 new_entries) 

137 

138 return metadata 

139 except db_exc.DBDuplicateEntry: 

140 # a concurrent transaction has been committed, 

141 # try again unless this was the last attempt 

142 with excutils.save_and_reraise_exception() as ctxt: 

143 if attempt < max_retries - 1: 

144 ctxt.reraise = False 

145 else: 

146 msg = _("Add metadata failed for aggregate %(id)s " 

147 "after %(retries)s retries") % \ 

148 {"id": aggregate_id, "retries": max_retries} 

149 LOG.warning(msg) 

150 

151 

152@api_db_api.context_manager.writer 

153def _metadata_delete_from_db(context, aggregate_id, key): 

154 # Check to see if the aggregate exists 

155 _aggregate_get_from_db(context, aggregate_id) 

156 

157 query = context.session.query(api_models.AggregateMetadata) 

158 query = query.filter(api_models.AggregateMetadata.aggregate_id == 

159 aggregate_id) 

160 count = query.filter_by(key=key).delete() 

161 

162 if count == 0: 162 ↛ 163line 162 didn't jump to line 163 because the condition on line 162 was never true

163 raise exception.AggregateMetadataNotFound( 

164 aggregate_id=aggregate_id, metadata_key=key) 

165 

166 

167@api_db_api.context_manager.writer 

168def _aggregate_create_in_db(context, values, metadata=None): 

169 query = context.session.query(api_models.Aggregate) 

170 query = query.filter(api_models.Aggregate.name == values['name']) 

171 aggregate = query.first() 

172 

173 if not aggregate: 173 ↛ 182line 173 didn't jump to line 182 because the condition on line 173 was always true

174 aggregate = api_models.Aggregate() 

175 aggregate.update(values) 

176 aggregate.save(context.session) 

177 # We don't want these to be lazy loaded later. We know there is 

178 # nothing here since we just created this aggregate. 

179 aggregate._hosts = [] 

180 aggregate._metadata = [] 

181 else: 

182 raise exception.AggregateNameExists(aggregate_name=values['name']) 

183 if metadata: 

184 _metadata_add_to_db(context, aggregate.id, metadata) 

185 context.session.expire(aggregate, ['_metadata']) 

186 aggregate._metadata 

187 

188 return aggregate 

189 

190 

191@api_db_api.context_manager.writer 

192def _aggregate_delete_from_db(context, aggregate_id): 

193 # Delete Metadata first 

194 context.session.query(api_models.AggregateMetadata).\ 

195 filter_by(aggregate_id=aggregate_id).\ 

196 delete() 

197 

198 count = context.session.query(api_models.Aggregate).\ 

199 filter(api_models.Aggregate.id == aggregate_id).\ 

200 delete() 

201 

202 if count == 0: 202 ↛ 203line 202 didn't jump to line 203 because the condition on line 202 was never true

203 raise exception.AggregateNotFound(aggregate_id=aggregate_id) 

204 

205 

206@api_db_api.context_manager.writer 

207def _aggregate_update_to_db(context, aggregate_id, values): 

208 aggregate = _aggregate_get_from_db(context, aggregate_id) 

209 

210 set_delete = True 

211 if "availability_zone" in values: 211 ↛ 212line 211 didn't jump to line 212 because the condition on line 211 was never true

212 az = values.pop('availability_zone') 

213 if 'metadata' not in values: 

214 values['metadata'] = {'availability_zone': az} 

215 set_delete = False 

216 else: 

217 values['metadata']['availability_zone'] = az 

218 metadata = values.get('metadata') 

219 if metadata is not None: 219 ↛ 220line 219 didn't jump to line 220 because the condition on line 219 was never true

220 _metadata_add_to_db(context, aggregate_id, values.pop('metadata'), 

221 set_delete=set_delete) 

222 

223 aggregate.update(values) 

224 try: 

225 aggregate.save(context.session) 

226 except db_exc.DBDuplicateEntry: 

227 if 'name' in values: 

228 raise exception.AggregateNameExists( 

229 aggregate_name=values['name']) 

230 else: 

231 raise 

232 return _aggregate_get_from_db(context, aggregate_id) 

233 

234 

235@base.NovaObjectRegistry.register 

236class Aggregate(base.NovaPersistentObject, base.NovaObject): 

237 # Version 1.0: Initial version 

238 # Version 1.1: String attributes updated to support unicode 

239 # Version 1.2: Added uuid field 

240 # Version 1.3: Added get_by_uuid method 

241 VERSION = '1.3' 

242 

243 fields = { 

244 'id': fields.IntegerField(), 

245 'uuid': fields.UUIDField(nullable=False), 

246 'name': fields.StringField(), 

247 'hosts': fields.ListOfStringsField(nullable=True), 

248 'metadata': fields.DictOfStringsField(nullable=True), 

249 } 

250 

251 obj_extra_fields = ['availability_zone'] 

252 

253 @staticmethod 

254 def _from_db_object(context, aggregate, db_aggregate): 

255 for key in aggregate.fields: 

256 if key == 'metadata': 

257 db_key = 'metadetails' 

258 elif key in DEPRECATED_FIELDS and key not in db_aggregate: 

259 continue 

260 else: 

261 db_key = key 

262 setattr(aggregate, key, db_aggregate[db_key]) 

263 

264 # NOTE: This can be removed when we bump Aggregate to v2.0 

265 aggregate.deleted_at = None 

266 aggregate.deleted = False 

267 

268 aggregate._context = context 

269 aggregate.obj_reset_changes() 

270 

271 return aggregate 

272 

273 def _assert_no_hosts(self, action): 

274 if 'hosts' in self.obj_what_changed(): 

275 raise exception.ObjectActionError( 

276 action=action, 

277 reason='hosts updated inline') 

278 

279 @base.remotable_classmethod 

280 def get_by_id(cls, context, aggregate_id): 

281 db_aggregate = _aggregate_get_from_db(context, aggregate_id) 

282 return cls._from_db_object(context, cls(), db_aggregate) 

283 

284 @base.remotable_classmethod 

285 def get_by_uuid(cls, context, aggregate_uuid): 

286 db_aggregate = _aggregate_get_from_db_by_uuid(context, 

287 aggregate_uuid) 

288 return cls._from_db_object(context, cls(), db_aggregate) 

289 

290 @base.remotable 

291 def create(self): 

292 if self.obj_attr_is_set('id'): 

293 raise exception.ObjectActionError(action='create', 

294 reason='already created') 

295 

296 self._assert_no_hosts('create') 

297 updates = self.obj_get_changes() 

298 payload = dict(updates) 

299 if 'metadata' in updates: 

300 # NOTE(danms): For some reason the notification format is weird 

301 payload['meta_data'] = payload.pop('metadata') 

302 if 'uuid' not in updates: 

303 updates['uuid'] = uuidutils.generate_uuid() 

304 self.uuid = updates['uuid'] 

305 LOG.debug('Generated uuid %(uuid)s for aggregate', 

306 dict(uuid=updates['uuid'])) 

307 compute_utils.notify_about_aggregate_update(self._context, 

308 "create.start", 

309 payload) 

310 compute_utils.notify_about_aggregate_action( 

311 context=self._context, 

312 aggregate=self, 

313 action=fields.NotificationAction.CREATE, 

314 phase=fields.NotificationPhase.START) 

315 

316 metadata = updates.pop('metadata', None) 

317 db_aggregate = _aggregate_create_in_db(self._context, updates, 

318 metadata=metadata) 

319 self._from_db_object(self._context, self, db_aggregate) 

320 payload['aggregate_id'] = self.id 

321 compute_utils.notify_about_aggregate_update(self._context, 

322 "create.end", 

323 payload) 

324 compute_utils.notify_about_aggregate_action( 

325 context=self._context, 

326 aggregate=self, 

327 action=fields.NotificationAction.CREATE, 

328 phase=fields.NotificationPhase.END) 

329 

330 @base.remotable 

331 def save(self): 

332 self._assert_no_hosts('save') 

333 updates = self.obj_get_changes() 

334 

335 payload = {'aggregate_id': self.id} 

336 if 'metadata' in updates: 336 ↛ 337line 336 didn't jump to line 337 because the condition on line 336 was never true

337 payload['meta_data'] = updates['metadata'] 

338 compute_utils.notify_about_aggregate_update(self._context, 

339 "updateprop.start", 

340 payload) 

341 compute_utils.notify_about_aggregate_action( 

342 context=self._context, 

343 aggregate=self, 

344 action=fields.NotificationAction.UPDATE_PROP, 

345 phase=fields.NotificationPhase.START) 

346 updates.pop('id', None) 

347 db_aggregate = _aggregate_update_to_db(self._context, 

348 self.id, updates) 

349 compute_utils.notify_about_aggregate_update(self._context, 

350 "updateprop.end", 

351 payload) 

352 compute_utils.notify_about_aggregate_action( 

353 context=self._context, 

354 aggregate=self, 

355 action=fields.NotificationAction.UPDATE_PROP, 

356 phase=fields.NotificationPhase.END) 

357 self._from_db_object(self._context, self, db_aggregate) 

358 

359 @base.remotable 

360 def update_metadata(self, updates): 

361 payload = {'aggregate_id': self.id, 

362 'meta_data': updates} 

363 compute_utils.notify_about_aggregate_update(self._context, 

364 "updatemetadata.start", 

365 payload) 

366 compute_utils.notify_about_aggregate_action( 

367 context=self._context, 

368 aggregate=self, 

369 action=fields.NotificationAction.UPDATE_METADATA, 

370 phase=fields.NotificationPhase.START) 

371 to_add = {} 

372 for key, value in updates.items(): 

373 if value is None: 

374 try: 

375 _metadata_delete_from_db(self._context, self.id, key) 

376 except exception.AggregateMetadataNotFound: 

377 pass 

378 try: 

379 self.metadata.pop(key) 

380 except KeyError: 

381 pass 

382 else: 

383 to_add[key] = value 

384 self.metadata[key] = value 

385 _metadata_add_to_db(self._context, self.id, to_add) 

386 compute_utils.notify_about_aggregate_update(self._context, 

387 "updatemetadata.end", 

388 payload) 

389 compute_utils.notify_about_aggregate_action( 

390 context=self._context, 

391 aggregate=self, 

392 action=fields.NotificationAction.UPDATE_METADATA, 

393 phase=fields.NotificationPhase.END) 

394 self.obj_reset_changes(fields=['metadata']) 

395 

396 @base.remotable 

397 def destroy(self): 

398 _aggregate_delete_from_db(self._context, self.id) 

399 

400 @base.remotable 

401 def add_host(self, host): 

402 _host_add_to_db(self._context, self.id, host) 

403 

404 if self.hosts is None: 404 ↛ 405line 404 didn't jump to line 405 because the condition on line 404 was never true

405 self.hosts = [] 

406 self.hosts.append(host) 

407 self.obj_reset_changes(fields=['hosts']) 

408 

409 @base.remotable 

410 def delete_host(self, host): 

411 _host_delete_from_db(self._context, self.id, host) 

412 

413 self.hosts.remove(host) 

414 self.obj_reset_changes(fields=['hosts']) 

415 

416 @property 

417 def availability_zone(self): 

418 return self.metadata.get('availability_zone', None) 

419 

420 

421@api_db_api.context_manager.reader 

422def _get_all_from_db(context): 

423 query = context.session.query(api_models.Aggregate).\ 

424 options(orm.joinedload(api_models.Aggregate._hosts)).\ 

425 options(orm.joinedload(api_models.Aggregate._metadata)) 

426 

427 return query.all() 

428 

429 

430@api_db_api.context_manager.reader 

431def _get_by_host_from_db(context, host, key=None): 

432 query = context.session.query(api_models.Aggregate).\ 

433 options(orm.joinedload(api_models.Aggregate._hosts)).\ 

434 options(orm.joinedload(api_models.Aggregate._metadata)) 

435 query = query.join(api_models.Aggregate._hosts) 

436 query = query.filter(api_models.AggregateHost.host == host) 

437 

438 if key: 

439 query = query.join(api_models.Aggregate._metadata).filter( 

440 api_models.AggregateMetadata.key == key) 

441 

442 return query.all() 

443 

444 

445@api_db_api.context_manager.reader 

446def _get_by_metadata_from_db(context, key=None, value=None): 

447 assert key is not None or value is not None 

448 query = context.session.query(api_models.Aggregate) 

449 query = query.join(api_models.Aggregate._metadata) 

450 if key is not None: 450 ↛ 452line 450 didn't jump to line 452 because the condition on line 450 was always true

451 query = query.filter(api_models.AggregateMetadata.key == key) 

452 if value is not None: 452 ↛ 453line 452 didn't jump to line 453 because the condition on line 452 was never true

453 query = query.filter(api_models.AggregateMetadata.value == value) 

454 query = query.options( 

455 orm.contains_eager(api_models.Aggregate._metadata) 

456 ) 

457 query = query.options(orm.joinedload(api_models.Aggregate._hosts)) 

458 

459 return query.all() 

460 

461 

462@api_db_api.context_manager.reader 

463def _get_non_matching_by_metadata_keys_from_db(context, ignored_keys, 

464 key_prefix, value): 

465 """Filter aggregates based on non matching metadata. 

466 

467 Find aggregates with at least one ${key_prefix}*[=${value}] metadata where 

468 the metadata key are not in the ignored_keys list. 

469 

470 :return: Aggregates with any metadata entry: 

471 - whose key starts with `key_prefix`; and 

472 - whose value is `value` and 

473 - whose key is *not* in the `ignored_keys` list. 

474 """ 

475 

476 if not key_prefix: 

477 raise ValueError(_('key_prefix mandatory field.')) 

478 

479 query = context.session.query(api_models.Aggregate) 

480 query = query.join(api_models.Aggregate._metadata) 

481 query = query.filter(api_models.AggregateMetadata.value == value) 

482 query = query.filter(api_models.AggregateMetadata.key.like( 

483 key_prefix + '%')) 

484 if len(ignored_keys) > 0: 

485 query = query.filter( 

486 ~api_models.AggregateMetadata.key.in_(ignored_keys) 

487 ) 

488 

489 query = query.options( 

490 orm.contains_eager(api_models.Aggregate._metadata) 

491 ) 

492 query = query.options(orm.joinedload(api_models.Aggregate._hosts)) 

493 

494 return query.all() 

495 

496 

497@base.NovaObjectRegistry.register 

498class AggregateList(base.ObjectListBase, base.NovaObject): 

499 # Version 1.0: Initial version 

500 # Version 1.1: Added key argument to get_by_host() 

501 # Aggregate <= version 1.1 

502 # Version 1.2: Added get_by_metadata_key 

503 # Version 1.3: Added get_by_metadata 

504 VERSION = '1.3' 

505 

506 fields = { 

507 'objects': fields.ListOfObjectsField('Aggregate'), 

508 } 

509 

510 @classmethod 

511 def _filter_db_aggregates(cls, db_aggregates, hosts): 

512 if not isinstance(hosts, set): 

513 hosts = set(hosts) 

514 filtered_aggregates = [] 

515 for db_aggregate in db_aggregates: 

516 for host in db_aggregate['hosts']: 

517 if host in hosts: 

518 filtered_aggregates.append(db_aggregate) 

519 break 

520 return filtered_aggregates 

521 

522 @base.remotable_classmethod 

523 def get_all(cls, context): 

524 db_aggregates = _get_all_from_db(context) 

525 return base.obj_make_list(context, cls(context), objects.Aggregate, 

526 db_aggregates) 

527 

528 @base.remotable_classmethod 

529 def get_by_host(cls, context, host, key=None): 

530 db_aggregates = _get_by_host_from_db(context, host, key=key) 

531 return base.obj_make_list(context, cls(context), objects.Aggregate, 

532 db_aggregates) 

533 

534 @base.remotable_classmethod 

535 def get_by_metadata_key(cls, context, key, hosts=None): 

536 db_aggregates = _get_by_metadata_from_db(context, key=key) 

537 if hosts is not None: 

538 db_aggregates = cls._filter_db_aggregates(db_aggregates, hosts) 

539 return base.obj_make_list(context, cls(context), objects.Aggregate, 

540 db_aggregates) 

541 

542 @base.remotable_classmethod 

543 def get_by_metadata(cls, context, key=None, value=None): 

544 """Return aggregates with a metadata key set to value. 

545 

546 This returns a list of all aggregates that have a metadata key 

547 set to some value. If key is specified, then only values for 

548 that key will qualify. 

549 """ 

550 db_aggregates = _get_by_metadata_from_db(context, key=key, value=value) 

551 return base.obj_make_list(context, cls(context), objects.Aggregate, 

552 db_aggregates) 

553 

554 @classmethod 

555 def get_non_matching_by_metadata_keys(cls, context, ignored_keys, 

556 key_prefix, value): 

557 """Return aggregates that are not matching with metadata. 

558 

559 For example, we have aggregates with metadata as below: 

560 

561 'agg1' with trait:HW_CPU_X86_MMX="required" 

562 'agg2' with trait:HW_CPU_X86_SGX="required" 

563 'agg3' with trait:HW_CPU_X86_MMX="required" 

564 'agg3' with trait:HW_CPU_X86_SGX="required" 

565 

566 Assume below request: 

567 

568 aggregate_obj.AggregateList.get_non_matching_by_metadata_keys( 

569 self.context, 

570 ['trait:HW_CPU_X86_MMX'], 

571 'trait:', 

572 value='required') 

573 

574 It will return 'agg2' and 'agg3' as aggregates that are not matching 

575 with metadata. 

576 

577 :param context: The security context 

578 :param ignored_keys: List of keys to match with the aggregate metadata 

579 keys that starts with key_prefix. 

580 :param key_prefix: Only compares metadata keys that starts with the 

581 key_prefix 

582 :param value: Value of metadata 

583 

584 :returns: List of aggregates that doesn't match metadata keys that 

585 starts with key_prefix with the supplied keys. 

586 """ 

587 db_aggregates = _get_non_matching_by_metadata_keys_from_db( 

588 context, ignored_keys, key_prefix, value) 

589 return base.obj_make_list(context, objects.AggregateList(context), 

590 objects.Aggregate, db_aggregates)