Coverage for nova/db/main/api.py: 93%

2135 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-17 15:08 +0000

1# Copyright (c) 2011 X.commerce, a business unit of eBay Inc. 

2# Copyright 2010 United States Government as represented by the 

3# Administrator of the National Aeronautics and Space Administration. 

4# All Rights Reserved. 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); you may 

7# not use this file except in compliance with the License. You may obtain 

8# a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

15# License for the specific language governing permissions and limitations 

16# under the License. 

17 

18"""Implementation of SQLAlchemy backend.""" 

19 

20import collections 

21import copy 

22import datetime 

23import functools 

24import inspect 

25import traceback 

26 

27from oslo_db import api as oslo_db_api 

28from oslo_db import exception as db_exc 

29from oslo_db.sqlalchemy import enginefacade 

30from oslo_db.sqlalchemy import update_match 

31from oslo_db.sqlalchemy import utils as sqlalchemyutils 

32from oslo_log import log as logging 

33from oslo_utils import excutils 

34from oslo_utils import importutils 

35from oslo_utils import timeutils 

36from oslo_utils import uuidutils 

37import sqlalchemy as sa 

38from sqlalchemy import exc as sqla_exc 

39from sqlalchemy import orm 

40from sqlalchemy import schema 

41from sqlalchemy import sql 

42from sqlalchemy.sql import expression 

43from sqlalchemy.sql import func 

44 

45from nova import block_device 

46from nova.compute import task_states 

47from nova.compute import vm_states 

48import nova.conf 

49import nova.context 

50from nova.db.main import models 

51from nova.db import utils as db_utils 

52from nova.db.utils import require_context 

53from nova import exception 

54from nova.i18n import _ 

55from nova import safe_utils 

56 

# osprofiler is an optional dependency; this is None when it is not
# installed, in which case SQL tracing is simply unavailable (see configure()).
profiler_sqlalchemy = importutils.try_import('osprofiler.sqlalchemy')

CONF = nova.conf.CONF
LOG = logging.getLogger(__name__)

# When True, _check_db_access() raises DBNotAllowed on every decorated DB
# call; used to fence off processes that must not talk to the database.
DISABLE_DB_ACCESS = False

# Module-global enginefacade transaction context. A per-cell manager stored
# on the RequestContext (db_connection) takes precedence over this one; see
# get_context_manager().
context_manager = enginefacade.transaction_context()

65 

66 

67def _get_db_conf(conf_group, connection=None): 

68 kw = dict(conf_group.items()) 

69 if connection is not None: 

70 kw['connection'] = connection 

71 return kw 

72 

73 

74def _context_manager_from_context(context): 

75 if context: 

76 try: 

77 return context.db_connection 

78 except AttributeError: 

79 pass 

80 

81 

def _joinedload_all(lead_entity, column):
    """Translate a dotted relationship path into chained joinedloads.

    For example, resolve the following::

        _joinedload_all(models.SecurityGroup, 'instances.info_cache')

    to:

        orm.joinedload(
            models.SecurityGroup.instances
        ).joinedload(
            Instance.info_cache
        )
    """
    first, *rest = column.split('.')
    relationship_attr = getattr(lead_entity, first)
    loader = orm.joinedload(relationship_attr)
    for name in rest:
        # Hop to the class on the far side of the previous relationship.
        next_entity = relationship_attr.entity.class_
        relationship_attr = getattr(next_entity, name)
        loader = loader.joinedload(relationship_attr)
    return loader

106 

107 

def configure(conf):
    """Configure the global context manager and optional SQL tracing.

    :param conf: Config object providing the [database] and [profiler] groups
    """
    context_manager.configure(**_get_db_conf(conf.database))

    tracing_enabled = (
        profiler_sqlalchemy and
        CONF.profiler.enabled and
        CONF.profiler.trace_sqlalchemy
    )
    if tracing_enabled:
        context_manager.append_on_engine_create(
            lambda eng: profiler_sqlalchemy.add_tracing(sa, eng, "db"))

118 

119 

def create_context_manager(connection=None):
    """Create a database context manager object for a cell database connection.

    :param connection: The database connection string
    :returns: A freshly configured enginefacade transaction context
    """
    ctxt_mgr = enginefacade.transaction_context()
    options = _get_db_conf(CONF.database, connection=connection)
    ctxt_mgr.configure(**options)
    return ctxt_mgr

128 

129 

def get_context_manager(context):
    """Get a database context manager object.

    :param context: The request context that can contain a context manager
    :returns: The context's own manager if set, else the module-global one
    """
    mgr = _context_manager_from_context(context)
    return mgr if mgr else context_manager

136 

137 

def get_engine(use_slave=False, context=None):
    """Get a database engine object.

    :param use_slave: Whether to use the slave connection
    :param context: The request context that can contain a context manager
    :returns: The reader engine when use_slave is set, else the writer engine
    """
    ctxt_mgr = get_context_manager(context)
    facade = ctxt_mgr.reader if use_slave else ctxt_mgr.writer
    return facade.get_engine()

148 

149 

# Prefix used to form the name of a shadow table from its live counterpart.
_SHADOW_TABLE_PREFIX = 'shadow_'
# Name of the quota class holding the default quota values.
_DEFAULT_QUOTA_NAME = 'default'
# Quota resource names accounted per project rather than per user.
PER_PROJECT_QUOTAS = ['fixed_ips', 'floating_ips', 'networks']

153 

154 

def select_db_reader_mode(f):
    """Decorator to select synchronous or asynchronous reader mode.

    The kwarg argument 'use_slave' defines reader mode. Asynchronous reader
    will be used if 'use_slave' is True and synchronous reader otherwise.
    If 'use_slave' is not specified default value 'False' will be used.

    Wrapped function must have a context in the arguments.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        # Resolve the actual call arguments of the innermost function so we
        # can locate 'context' and 'use_slave' regardless of how they were
        # passed.
        inner = safe_utils.get_wrapped_function(f)
        call_args = inspect.getcallargs(inner, *args, **kwargs)

        context = call_args['context']
        mgr = get_context_manager(context)
        if call_args.get('use_slave', False):
            mode = mgr.async_
        else:
            mode = mgr.reader

        with mode.using(context):
            return f(*args, **kwargs)

    wrapper.__signature__ = inspect.signature(f)
    return wrapper

182 

183 

def _check_db_access():
    """Raise DBNotAllowed if database access is globally disabled."""
    if not DISABLE_DB_ACCESS:
        return
    service_name = 'nova-compute'
    stacktrace = ''.join(traceback.format_stack())
    # Log the full stack so the offending call site can be identified.
    LOG.error(
        'No DB access allowed in %(service_name)s: %(stacktrace)s',
        {'service_name': service_name, 'stacktrace': stacktrace})
    raise exception.DBNotAllowed(binary=service_name)

193 

194 

def pick_context_manager_writer(f):
    """Decorator to use a writer db context manager.

    The db context manager will be picked from the RequestContext.

    Wrapped function must have a RequestContext in the arguments.
    """
    @functools.wraps(f)
    def wrapped(context, *args, **kwargs):
        _check_db_access()
        mgr = get_context_manager(context)
        with mgr.writer.using(context):
            return f(context, *args, **kwargs)
    wrapped.__signature__ = inspect.signature(f)
    return wrapped

210 

211 

def pick_context_manager_reader(f):
    """Decorator to use a reader db context manager.

    The db context manager will be picked from the RequestContext.

    Wrapped function must have a RequestContext in the arguments.
    """
    @functools.wraps(f)
    def wrapped(context, *args, **kwargs):
        _check_db_access()
        mgr = get_context_manager(context)
        with mgr.reader.using(context):
            return f(context, *args, **kwargs)
    wrapped.__signature__ = inspect.signature(f)
    return wrapped

227 

228 

def pick_context_manager_reader_allow_async(f):
    """Decorator to use a reader.allow_async db context manager.

    The db context manager will be picked from the RequestContext.

    Wrapped function must have a RequestContext in the arguments.
    """
    @functools.wraps(f)
    def wrapped(context, *args, **kwargs):
        _check_db_access()
        mgr = get_context_manager(context)
        with mgr.reader.allow_async.using(context):
            return f(context, *args, **kwargs)
    wrapped.__signature__ = inspect.signature(f)
    return wrapped

244 

245 

def model_query(
    context, model, args=None, read_deleted=None, project_only=False,
):
    """Query helper that accounts for context's `read_deleted` field.

    :param context: The request context that can contain a context manager
    :param model: Model to query. Must be a subclass of ModelBase.
    :param args: Arguments to query. If None - model is used.
    :param read_deleted: If not None, overrides context's read_deleted field.
        Permitted values are 'no', which does not return deleted values;
        'only', which only returns deleted values; and 'yes', which does not
        filter deleted values.
    :param project_only: If set and context is user-type, then restrict
        query to match the context's project_id. If set to 'allow_none',
        restriction includes project_id = None.
    :raises: ValueError for an unrecognized read_deleted value
    """
    if read_deleted is None:
        read_deleted = context.read_deleted

    if read_deleted == 'no':
        query_kwargs = {'deleted': False}
    elif read_deleted == 'only':
        query_kwargs = {'deleted': True}
    elif read_deleted == 'yes':
        query_kwargs = {}
    else:
        raise ValueError(_("Unrecognized read_deleted value '%s'")
                         % read_deleted)

    query = sqlalchemyutils.model_query(
        model, context.session, args, **query_kwargs)

    # We can't use oslo.db model_query's project_id here, as it doesn't allow
    # us to return both our projects and unowned projects.
    if nova.context.is_user_context(context) and project_only:
        if project_only == 'allow_none':
            query = query.filter(sql.or_(
                model.project_id == context.project_id,
                model.project_id == sql.null(),
            ))
        else:
            query = query.filter_by(project_id=context.project_id)

    return query

292 

293 

def convert_objects_related_datetimes(values, *datetime_keys):
    """Normalize datetime fields in *values* to naive datetime objects.

    :param values: Dict (mutated in place) whose datetime fields to convert
    :param datetime_keys: Keys to convert; defaults to created_at,
        deleted_at and updated_at when none are given
    :returns: The same *values* dict
    """
    keys = datetime_keys or ('created_at', 'deleted_at', 'updated_at')

    for key in keys:
        if key not in values or not values[key]:
            continue
        value = values[key]
        if isinstance(value, str):
            try:
                value = timeutils.parse_strtime(value)
            except ValueError:
                # parse_strtime cannot handle offset forms such as
                # '2015-05-28T19:59:38+00:00'; fall back to ISO 8601 parsing.
                value = timeutils.parse_isotime(value)
        # NOTE(danms): Strip UTC timezones from datetimes, since they're
        # stored that way in the database
        values[key] = value.replace(tzinfo=None)
    return values

311 

312 

313################### 

314 

315 

def constraint(**conditions):
    """Return a constraint object suitable for use with some updates.

    Each keyword maps a model attribute name to a condition object
    (see equal_any / not_equal).
    """
    return Constraint(conditions)

319 

320 

def equal_any(*values):
    """Return an equality condition object suitable for use in a constraint.

    Equal_any conditions require that a model object's attribute equal any
    one of the given values.
    """
    return EqualityCondition(values)

328 

329 

def not_equal(*values):
    """Return an inequality condition object suitable for use in a constraint.

    Not_equal conditions require that a model object's attribute differs from
    all of the given values.
    """
    return InequalityCondition(values)

337 

338 

class Constraint(object):
    """A set of per-attribute conditions a query must satisfy."""

    def __init__(self, conditions):
        # Mapping of model attribute name -> condition object with a
        # clauses(field) method.
        self.conditions = conditions

    def apply(self, model, query):
        """Return *query* narrowed by every clause of every condition."""
        for attr_name, condition in self.conditions.items():
            field = getattr(model, attr_name)
            for clause in condition.clauses(field):
                query = query.filter(clause)
        return query

349 

350 

class EqualityCondition(object):
    """Condition requiring a field to equal any one of a set of values."""

    def __init__(self, values):
        self.values = values

    def clauses(self, field):
        # method signature requires us to return an iterable even if for OR
        # operator this will actually be a single clause
        comparisons = [field == value for value in self.values]
        return [sql.or_(*comparisons)]

360 

361 

class InequalityCondition(object):
    """Condition requiring a field to differ from all given values."""

    def __init__(self, values):
        self.values = values

    def clauses(self, field):
        return [field != v for v in self.values]

369 

370 

371################### 

372 

373 

@pick_context_manager_writer
def service_destroy(context, service_id):
    """Destroy the service or raise if it does not exist."""
    # Raises ServiceNotFound if the id is unknown.
    service = service_get(context, service_id)

    model_query(context, models.Service).filter_by(
        id=service_id,
    ).soft_delete(synchronize_session=False)

    if service.binary == 'nova-compute':
        # TODO(sbauza): Remove the service_id filter in a later release
        # once we are sure that all compute nodes report the host field
        model_query(context, models.ComputeNode).filter(
            sql.or_(
                models.ComputeNode.service_id == service_id,
                models.ComputeNode.host == service['host'],
            )
        ).soft_delete(synchronize_session=False)

391 

392 

@pick_context_manager_reader
def service_get(context, service_id):
    """Get a service or raise if it does not exist."""
    result = model_query(
        context, models.Service).filter_by(id=service_id).first()
    if not result:
        raise exception.ServiceNotFound(service_id=service_id)
    return result

403 

404 

@pick_context_manager_reader
def service_get_by_uuid(context, service_uuid):
    """Get a service by it's uuid or raise ServiceNotFound if it does not
    exist.
    """
    result = model_query(
        context, models.Service).filter_by(uuid=service_uuid).first()
    if not result:
        raise exception.ServiceNotFound(service_id=service_uuid)
    return result

417 

418 

@pick_context_manager_reader_allow_async
def service_get_minimum_version(context, binaries):
    """Get the minimum service version in the database.

    :param binaries: Iterable of service binary names to look up
    :returns: Dict mapping binary name to its minimum version
    """
    min_versions = (
        context.session.query(
            models.Service.binary,
            func.min(models.Service.version))
        .filter(models.Service.binary.in_(binaries))
        .filter(models.Service.deleted == 0)
        .filter(models.Service.forced_down == sql.false())
        .group_by(models.Service.binary)
    )
    return dict(min_versions)

430 

431 

@pick_context_manager_reader
def service_get_all(context, disabled=None):
    """Get all services, optionally filtered by the disabled flag."""
    query = model_query(context, models.Service)
    if disabled is None:
        return query.all()
    return query.filter_by(disabled=disabled).all()

441 

442 

@pick_context_manager_reader
def service_get_all_by_topic(context, topic):
    """Get all enabled services for a given topic."""
    return model_query(
        context, models.Service, read_deleted="no",
    ).filter_by(disabled=False, topic=topic).all()

450 

451 

@pick_context_manager_reader
def service_get_by_host_and_topic(context, host, topic):
    """Get a service by hostname and topic it listens to."""
    return model_query(
        context, models.Service, read_deleted="no",
    ).filter_by(disabled=False, host=host, topic=topic).first()

460 

461 

@pick_context_manager_reader
def service_get_all_by_binary(context, binary, include_disabled=False):
    """Get services for a given binary.

    Includes disabled services if 'include_disabled' parameter is True
    """
    query = model_query(context, models.Service).filter_by(binary=binary)
    if include_disabled:
        return query.all()
    return query.filter_by(disabled=False).all()

472 

473 

@pick_context_manager_reader
def service_get_all_computes_by_hv_type(context, hv_type,
                                        include_disabled=False):
    """Get all compute services for a given hypervisor type.

    Includes disabled services if 'include_disabled' parameter is True.
    """
    query = model_query(
        context, models.Service, read_deleted="no",
    ).filter_by(binary='nova-compute')
    if not include_disabled:
        query = query.filter_by(disabled=False)
    # TODO: join through the host column rather than service_id since
    # compute nodes report the host field.
    query = query.join(
        models.ComputeNode,
        models.Service.host == models.ComputeNode.host,
    ).filter(models.ComputeNode.hypervisor_type == hv_type).distinct()
    return query.all()

490 

491 

@pick_context_manager_reader
def service_get_by_host_and_binary(context, host, binary):
    """Get a service by hostname and binary.

    :raises: HostBinaryNotFound if no matching service exists
    """
    result = model_query(
        context, models.Service, read_deleted="no",
    ).filter_by(host=host, binary=binary).first()
    if not result:
        raise exception.HostBinaryNotFound(host=host, binary=binary)
    return result

504 

505 

@pick_context_manager_reader
def service_get_all_by_host(context, host):
    """Get all services for a given host."""
    return model_query(
        context, models.Service, read_deleted="no",
    ).filter_by(host=host).all()

512 

513 

@pick_context_manager_reader_allow_async
def service_get_by_compute_host(context, host):
    """Get the service entry for a given compute host.

    Returns the service entry joined with the compute_node entry.

    :raises: ComputeHostNotFound if no nova-compute service exists on *host*
    """
    result = model_query(
        context, models.Service, read_deleted="no",
    ).filter_by(host=host, binary='nova-compute').first()
    if not result:
        raise exception.ComputeHostNotFound(host=host)
    return result

529 

530 

@pick_context_manager_writer
def service_create(context, values):
    """Create a service from the values dictionary.

    :raises: ServiceBinaryExists or ServiceTopicExists when a duplicate
        service record already exists for the host
    """
    service_ref = models.Service()
    service_ref.update(values)
    # We only auto-disable nova-compute services since those are the only
    # ones that can be enabled using the os-services REST API and they are
    # the only ones where being disabled means anything. It does
    # not make sense to be able to disable non-compute services like
    # nova-scheduler or nova-osapi_compute since that does nothing.
    is_compute = values.get('binary') == 'nova-compute'
    if is_compute and not CONF.enable_new_services:
        service_ref.disabled = True
        service_ref.disabled_reason = _(
            "New compute service disabled due to config option.")
    try:
        service_ref.save(context.session)
    except db_exc.DBDuplicateEntry as e:
        if 'binary' in e.columns:
            raise exception.ServiceBinaryExists(
                host=values.get('host'), binary=values.get('binary'))
        raise exception.ServiceTopicExists(
            host=values.get('host'), topic=values.get('topic'))
    return service_ref

554 

555 

@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def service_update(context, service_id, values):
    """Set the given properties on a service and update it.

    :raises: NotFound if service does not exist.
    """
    service_ref = service_get(context, service_id)
    # Only servicegroup.drivers.db.DbDriver._report_state() updates
    # 'report_count', so if that value changes then store the timestamp
    # as the last time we got a state report.
    if 'report_count' in values:
        report_count_grew = values['report_count'] > service_ref.report_count
        if report_count_grew:
            service_ref.last_seen_up = timeutils.utcnow()
    service_ref.update(values)
    return service_ref

573 

574 

575################### 

576 

577 

def _compute_node_select(context, filters=None, limit=None, marker=None):
    """Build a SELECT statement over the compute_nodes table.

    :param context: The request context; its read_deleted field decides
        whether soft-deleted rows are excluded
    :param filters: Optional dict of filters; recognized keys are
        'compute_id', 'service_id', 'host', 'hypervisor_hostname'
        (equality matches) and 'mapped' (matches rows whose mapped value is
        strictly less than the given one)
    :param limit: Optional maximum number of rows to select
    :param marker: Optional compute node id; only rows with id greater than
        the marker are selected (keyset pagination)
    :returns: A SQLAlchemy Select object ordered by id
    :raises: MarkerNotFound if the marker id does not exist
    """
    if filters is None:
        filters = {}

    cn_tbl = models.ComputeNode.__table__.alias('cn')
    select = sa.select(cn_tbl)

    if context.read_deleted == "no":
        select = select.where(cn_tbl.c.deleted == 0)
    if "compute_id" in filters:
        select = select.where(cn_tbl.c.id == filters["compute_id"])
    if "service_id" in filters:
        select = select.where(cn_tbl.c.service_id == filters["service_id"])
    if "host" in filters:
        select = select.where(cn_tbl.c.host == filters["host"])
    if "hypervisor_hostname" in filters:
        hyp_hostname = filters["hypervisor_hostname"]
        select = select.where(cn_tbl.c.hypervisor_hostname == hyp_hostname)
    if "mapped" in filters:
        # Strictly-less-than comparison: used to find nodes not yet mapped
        # up to a given mapping level.
        select = select.where(cn_tbl.c.mapped < filters['mapped'])
    if marker is not None:
        try:
            # Validate that the marker row exists before paginating past it.
            compute_node_get(context, marker)
        except exception.ComputeHostNotFound:
            raise exception.MarkerNotFound(marker=marker)
        select = select.where(cn_tbl.c.id > marker)
    if limit is not None:
        select = select.limit(limit)
    # Explicitly order by id, so we're not dependent on the native sort
    # order of the underlying DB.
    select = select.order_by(expression.asc("id"))
    return select

610 

611 

def _compute_node_fetchall(context, filters=None, limit=None, marker=None):
    """Execute a compute node SELECT and return the rows as plain dicts.

    :param context: The request context
    :param filters: Optional dict of filters (see _compute_node_select)
    :param limit: Optional maximum number of rows
    :param marker: Optional keyset-pagination marker id
    :returns: List of dicts, one per compute node row
    """
    select = _compute_node_select(context, filters, limit=limit, marker=marker)
    engine = get_engine(context=context)

    # The connection context manager closes the connection on exit, so the
    # explicit conn.close() that used to follow this block was redundant and
    # has been removed.
    with engine.connect() as conn, conn.begin():
        results = conn.execute(select).fetchall()

    # Callers expect dict-like objects, not SQLAlchemy RowProxy objects...
    results = [dict(r._mapping) for r in results]
    return results

623 

624 

@pick_context_manager_reader
def compute_node_get(context, compute_id):
    """Get a compute node by its id.

    :param context: The security context
    :param compute_id: ID of the compute node

    :returns: Dictionary-like object containing properties of the compute node
    :raises: ComputeHostNotFound if compute node with the given ID doesn't
        exist.
    """
    rows = _compute_node_fetchall(context, {"compute_id": compute_id})
    if not rows:
        raise exception.ComputeHostNotFound(host=compute_id)
    return rows[0]

640 

641 

# TODO(edleafe): remove once the compute node resource provider migration is
# complete, and this distinction is no longer necessary.
@pick_context_manager_reader
def compute_node_get_model(context, compute_id):
    """Get a compute node sqlalchemy model object by its id.

    :param context: The security context
    :param compute_id: ID of the compute node

    :returns: Sqlalchemy model object containing properties of the compute node
    :raises: ComputeHostNotFound if compute node with the given ID doesn't
        exist.
    """
    node = model_query(
        context, models.ComputeNode).filter_by(id=compute_id).first()
    if not node:
        raise exception.ComputeHostNotFound(host=compute_id)
    return node

661 

662 

@pick_context_manager_reader
def compute_nodes_get_by_service_id(context, service_id):
    """Get a list of compute nodes by their associated service id.

    :param context: The security context
    :param service_id: ID of the associated service

    :returns: List of dictionary-like objects, each containing properties of
        the compute node, including its corresponding service and statistics
    :raises: ServiceNotFound if service with the given ID doesn't exist.
    """
    rows = _compute_node_fetchall(context, {"service_id": service_id})
    if not rows:
        raise exception.ServiceNotFound(service_id=service_id)
    return rows

678 

679 

@pick_context_manager_reader
def compute_node_get_by_host_and_nodename(context, host, nodename):
    """Get a compute node by its associated host and nodename.

    :param context: The security context (admin)
    :param host: Name of the host
    :param nodename: Name of the node

    :returns: Dictionary-like object containing properties of the compute node,
        including its statistics
    :raises: ComputeHostNotFound if host with the given name doesn't exist.
    """
    filters = {"host": host, "hypervisor_hostname": nodename}
    rows = _compute_node_fetchall(context, filters)
    if not rows:
        raise exception.ComputeHostNotFound(host=host)
    return rows[0]

697 

698 

@pick_context_manager_reader
def compute_node_get_by_nodename(context, hypervisor_hostname):
    """Get a compute node by hypervisor_hostname.

    :param context: The security context (admin)
    :param hypervisor_hostname: Name of the node

    :returns: Dictionary-like object containing properties of the compute node,
        including its statistics
    :raises: ComputeHostNotFound if hypervisor_hostname with the given name
        doesn't exist.
    """
    rows = _compute_node_fetchall(
        context, {"hypervisor_hostname": hypervisor_hostname})
    if not rows:
        raise exception.ComputeHostNotFound(host=hypervisor_hostname)
    return rows[0]

716 

717 

@pick_context_manager_reader
def compute_node_get_all(context):
    """Get all compute nodes.

    :param context: The security context

    :returns: List of dictionaries each containing compute node properties
    """
    # No filters: select every (non-deleted, per context) compute node row.
    return _compute_node_fetchall(context)

727 

728 

@pick_context_manager_reader_allow_async
def compute_node_get_all_by_host(context, host):
    """Get all compute nodes by host name.

    :param context: The security context (admin)
    :param host: Name of the host

    :returns: List of dictionaries each containing compute node properties
    :raises: ComputeHostNotFound if the host has no compute nodes
    """
    rows = _compute_node_fetchall(context, {"host": host})
    if not rows:
        raise exception.ComputeHostNotFound(host=host)
    return rows

742 

743 

@pick_context_manager_reader
def compute_node_get_all_mapped_less_than(context, mapped_less_than):
    """Get all compute nodes with specific mapped values.

    :param context: The security context
    :param mapped_less_than: Get compute nodes with mapped less than this value

    :returns: List of dictionaries each containing compute node properties
    """
    return _compute_node_fetchall(context, {'mapped': mapped_less_than})

755 

756 

@pick_context_manager_reader
def compute_node_get_all_by_pagination(context, limit=None, marker=None):
    """Get all compute nodes by pagination.

    :param context: The security context
    :param limit: Maximum number of items to return
    :param marker: The last item of the previous page, the next results after
        this value will be returned

    :returns: List of dictionaries each containing compute node properties
    """
    return _compute_node_fetchall(context, limit=limit, marker=marker)

769 

770 

@pick_context_manager_reader
def compute_node_search_by_hypervisor(context, hypervisor_match):
    """Get all compute nodes by hypervisor hostname.

    :param context: The security context
    :param hypervisor_match: The hypervisor hostname (substring match)

    :returns: List of dictionary-like objects each containing compute node
        properties
    """
    hostname_col = models.ComputeNode.hypervisor_hostname
    query = model_query(context, models.ComputeNode)
    # SQL LIKE with the match embedded between wildcards.
    return query.filter(hostname_col.like('%%%s%%' % hypervisor_match)).all()

785 

786 

@pick_context_manager_writer
def _compute_node_create(context, values):
    """Create a compute node from the values dictionary.

    :param context: The security context
    :param values: Dictionary containing compute node properties

    :returns: Dictionary-like object containing the properties of the created
        node, including its corresponding service and statistics
    """
    # Normalize any stringified datetimes before persisting.
    convert_objects_related_datetimes(values)

    node = models.ComputeNode()
    node.update(values)
    node.save(context.session)
    return node

803 

804 

# NOTE(mgoddard): We avoid decorating this with @pick_context_manager_writer,
# so that we get a separate transaction in the exception handler. This avoids
# an error message about inactive DB sessions during a transaction rollback.
# See https://bugs.launchpad.net/nova/+bug/1853159.
def compute_node_create(context, values):
    """Creates a new ComputeNode and populates the capacity fields
    with the most recent data. Will restore a soft deleted compute node if a
    UUID has been explicitly requested.

    :param context: The security context
    :param values: Dictionary of compute node properties; may include 'uuid'
    :returns: The created (or restored) compute node model object
    :raises: DBDuplicateEntry if a duplicate exists and could not be restored
    """
    try:
        compute_node_ref = _compute_node_create(context, values)
    except db_exc.DBDuplicateEntry:
        with excutils.save_and_reraise_exception(logger=LOG) as err_ctx:
            # Check to see if we have a (soft) deleted ComputeNode with the
            # same UUID and if so just update it and mark as no longer (soft)
            # deleted. See bug 1839560 for details.
            if 'uuid' in values:
                # Get a fresh context for a new DB session and allow it to
                # get a deleted record.
                ctxt = nova.context.get_admin_context(read_deleted='yes')
                compute_node_ref = _compute_node_get_and_update_deleted(
                    ctxt, values)
                # If we didn't get anything back we failed to find the node
                # by uuid and update it so re-raise the DBDuplicateEntry.
                if compute_node_ref:
                    # Restored successfully: suppress the duplicate error.
                    err_ctx.reraise = False

    return compute_node_ref

833 

834 

@pick_context_manager_writer
def _compute_node_get_and_update_deleted(context, values):
    """Find a compute node by uuid, update and un-delete it.

    This is a special case from the ``compute_node_create`` method which
    needs to be separate to get a new Session.

    This method will update the ComputeNode, if found, to have deleted=0 and
    deleted_at=None values.

    :param context: request auth context which should be able to read deleted
        records
    :param values: values used to update the ComputeNode record - must include
        uuid
    :return: updated ComputeNode sqlalchemy model object if successfully found
        and updated, None otherwise
    """
    node = model_query(
        context, models.ComputeNode).filter_by(uuid=values['uuid']).first()
    if not node:
        return None
    # Update with the provided values but un-soft-delete.
    update_values = copy.deepcopy(values)
    update_values['deleted'] = 0
    update_values['deleted_at'] = None
    return compute_node_update(context, node.id, update_values)

860 

861 

@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def compute_node_update(context, compute_id, values):
    """Set the given properties on a compute node and update it.

    :param context: The security context
    :param compute_id: ID of the compute node
    :param values: Dictionary containing compute node properties to be updated

    :returns: Dictionary-like object containing the properties of the updated
        compute node, including its corresponding service and statistics
    :raises: ComputeHostNotFound if compute node with the given ID doesn't
        exist.
    """
    node = compute_node_get_model(context, compute_id)
    # Always update this, even if there's going to be no other
    # changes in data. This ensures that we invalidate the
    # scheduler cache of compute node data in case of races.
    values['updated_at'] = timeutils.utcnow()
    convert_objects_related_datetimes(values)
    node.update(values)
    return node

885 

886 

@pick_context_manager_writer
def compute_node_delete(context, compute_id, constraint=None):
    """Delete a compute node from the database.

    :param context: The security context
    :param compute_id: ID of the compute node
    :param constraint: a constraint object

    :raises: ComputeHostNotFound if compute node with the given ID doesn't
        exist.
    :raises: ConstraintNotMet if a constraint was specified and it was not met.
    """
    query = model_query(context, models.ComputeNode).filter_by(id=compute_id)
    if constraint is not None:
        query = constraint.apply(models.ComputeNode, query)

    if query.soft_delete(synchronize_session=False):
        return

    # Nothing was soft-deleted; either the compute node is gone or the
    # constraint was not satisfied. Re-reading the node lets the missing
    # node case raise ComputeHostNotFound; otherwise report the
    # constraint failure.
    compute_node_get(context, compute_id)
    raise exception.ConstraintNotMet()

915 

916 

@pick_context_manager_reader
def compute_node_statistics(context):
    """Get aggregate statistics over all compute nodes.

    Sums resource columns (vcpus, memory, disk, workload, ...) over every
    compute node whose nova-compute service is enabled and not deleted.

    :param context: The security context

    :returns: Dictionary containing compute node characteristics summed up
        over all the compute nodes, e.g. 'vcpus', 'free_ram_mb' etc.
    """
    engine = get_engine(context=context)
    services_tbl = models.Service.__table__

    inner_sel = _compute_node_select(context).alias('inner_sel')

    # TODO(sbauza): Remove the service_id filter in a later release
    # once we are sure that all compute nodes report the host field
    # Join each compute node to its service, matching on either the host
    # field or the legacy service_id column, and keeping only enabled,
    # non-deleted nova-compute services.
    j = sa.join(
        inner_sel, services_tbl,
        sql.and_(
            sql.or_(
                inner_sel.c.host == services_tbl.c.host,
                inner_sel.c.service_id == services_tbl.c.id
            ),
            services_tbl.c.disabled == sql.false(),
            services_tbl.c.binary == 'nova-compute',
            services_tbl.c.deleted == 0
        )
    )

    # NOTE(jaypipes): This COALESCE() stuff is temporary while the data
    # migration to the new resource providers inventories and allocations
    # tables is completed.
    agg_cols = [
        func.count().label('count'),
        sql.func.sum(
            inner_sel.c.vcpus
        ).label('vcpus'),
        sql.func.sum(
            inner_sel.c.memory_mb
        ).label('memory_mb'),
        sql.func.sum(
            inner_sel.c.local_gb
        ).label('local_gb'),
        sql.func.sum(
            inner_sel.c.vcpus_used
        ).label('vcpus_used'),
        sql.func.sum(
            inner_sel.c.memory_mb_used
        ).label('memory_mb_used'),
        sql.func.sum(
            inner_sel.c.local_gb_used
        ).label('local_gb_used'),
        sql.func.sum(
            inner_sel.c.free_ram_mb
        ).label('free_ram_mb'),
        sql.func.sum(
            inner_sel.c.free_disk_gb
        ).label('free_disk_gb'),
        sql.func.sum(
            inner_sel.c.current_workload
        ).label('current_workload'),
        sql.func.sum(
            inner_sel.c.running_vms
        ).label('running_vms'),
        sql.func.sum(
            inner_sel.c.disk_available_least
        ).label('disk_available_least'),
    ]
    select = sql.select(*agg_cols).select_from(j)

    with engine.connect() as conn, conn.begin():
        results = conn.execute(select).fetchone()

    # Build a dict of the info--making no assumptions about result
    fields = ('count', 'vcpus', 'memory_mb', 'local_gb', 'vcpus_used',
              'memory_mb_used', 'local_gb_used', 'free_ram_mb', 'free_disk_gb',
              'current_workload', 'running_vms', 'disk_available_least')
    # SUM() over zero rows yields NULL; coerce those to 0.
    results = {field: int(results[idx] or 0)
               for idx, field in enumerate(fields)}
    return results

997 

998 

999################### 

1000 

1001 

@pick_context_manager_writer
def certificate_create(context, values):
    """Create a certificate from the values dictionary."""
    cert = models.Certificate()
    for key, value in values.items():
        cert[key] = value
    cert.save(context.session)
    return cert

1010 

1011 

@pick_context_manager_reader
def certificate_get_all_by_project(context, project_id):
    """Get all certificates for a project."""
    query = model_query(context, models.Certificate, read_deleted="no")
    return query.filter_by(project_id=project_id).all()

1018 

1019 

@pick_context_manager_reader
def certificate_get_all_by_user(context, user_id):
    """Get all certificates for a user."""
    query = model_query(context, models.Certificate, read_deleted="no")
    return query.filter_by(user_id=user_id).all()

1026 

1027 

@pick_context_manager_reader
def certificate_get_all_by_user_and_project(context, user_id, project_id):
    """Get all certificates for a user and project."""
    query = model_query(context, models.Certificate, read_deleted="no")
    return query.filter_by(
        user_id=user_id, project_id=project_id).all()

1035 

1036 

1037################### 

1038 

1039 

@require_context
@pick_context_manager_writer
def virtual_interface_create(context, values):
    """Create a new virtual interface record.

    :param values: Dict containing column values.
    :raises: VirtualInterfaceCreateException if the insert fails with a
        database error.
    """
    vif = models.VirtualInterface()
    vif.update(values)
    try:
        vif.save(context.session)
    except db_exc.DBError:
        LOG.exception("VIF creation failed with a database error.")
        raise exception.VirtualInterfaceCreateException()
    return vif

1056 

1057 

def _virtual_interface_query(context):
    # Base query over virtual interfaces, excluding soft-deleted rows.
    return model_query(
        context, models.VirtualInterface, read_deleted="no")

1060 

1061 

@require_context
@pick_context_manager_writer
def virtual_interface_update(context, vif_uuid, values):
    """Update a virtual interface record in the database.

    :param vif_uuid: UUID of the virtual interface to update
    :param values: dict of column values to apply
    :returns: the updated VirtualInterface model
    """
    # NOTE: the previous docstring claimed this "creates" a record; it
    # looks up an existing interface by uuid and applies the updates.
    vif_ref = virtual_interface_get_by_uuid(context, vif_uuid)
    vif_ref.update(values)
    vif_ref.save(context.session)
    return vif_ref

1070 

1071 

@require_context
@pick_context_manager_reader
def virtual_interface_get(context, vif_id):
    """Get a virtual interface by ID.

    :param vif_id: ID of the virtual interface.
    :returns: the VirtualInterface model, or None if not found.
    """
    return _virtual_interface_query(context).filter_by(id=vif_id).first()

1083 

1084 

@require_context
@pick_context_manager_reader
def virtual_interface_get_by_address(context, address):
    """Get a virtual interface by address.

    :param address: The address of the interface you're looking to get.
    :returns: the VirtualInterface model, or None if not found.
    :raises: InvalidIpAddressError if the database rejects the address.
    """
    try:
        return _virtual_interface_query(context).filter_by(
            address=address).first()
    except db_exc.DBError:
        msg = _("Invalid virtual interface address %s in request") % address
        LOG.warning(msg)
        raise exception.InvalidIpAddressError(msg)

1101 

1102 

@require_context
@pick_context_manager_reader
def virtual_interface_get_by_uuid(context, vif_uuid):
    """Get a virtual interface by UUID.

    :param vif_uuid: The uuid of the interface you're looking to get
    :returns: the VirtualInterface model, or None if not found.
    """
    return _virtual_interface_query(context).filter_by(
        uuid=vif_uuid).first()

1114 

1115 

@require_context
@pick_context_manager_reader_allow_async
def virtual_interface_get_by_instance(context, instance_uuid):
    """Gets all virtual interfaces for instance.

    :param instance_uuid: UUID of the instance to filter on.
    :returns: list of VirtualInterface models, oldest first.
    """
    query = _virtual_interface_query(context).filter_by(
        instance_uuid=instance_uuid)
    # Stable ordering: creation time first, id as tie-breaker.
    return query.order_by(
        expression.asc("created_at"), expression.asc("id")).all()

1128 

1129 

@require_context
@pick_context_manager_reader
def virtual_interface_get_by_instance_and_network(context, instance_uuid,
                                                  network_id):
    """Get all virtual interface for instance that's associated with
    network.
    """
    return _virtual_interface_query(context).filter_by(
        instance_uuid=instance_uuid).filter_by(
        network_id=network_id).first()

1142 

1143 

@require_context
@pick_context_manager_writer
def virtual_interface_delete_by_instance(context, instance_uuid):
    """Delete virtual interface records associated with instance.

    :param instance_uuid: UUID of the instance to filter on.
    """
    query = _virtual_interface_query(context)
    query.filter_by(instance_uuid=instance_uuid).soft_delete()

1154 

1155 

@require_context
@pick_context_manager_writer
def virtual_interface_delete(context, id):
    """Delete a virtual interface records.

    :param id: ID of the interface.
    """
    query = _virtual_interface_query(context)
    query.filter_by(id=id).soft_delete()

1166 

1167 

@require_context
@pick_context_manager_reader
def virtual_interface_get_all(context):
    """Get all virtual interface records."""
    return _virtual_interface_query(context).all()

1174 

1175 

1176################### 

1177 

1178 

def _metadata_refs(metadata_dict, meta_class):
    """Convert a metadata dict into a list of meta_class rows.

    Each (key, value) pair becomes one meta_class instance with its
    'key' and 'value' items set. A falsy metadata_dict yields [].
    """
    if not metadata_dict:
        return []
    refs = []
    for key, value in metadata_dict.items():
        ref = meta_class()
        ref['key'] = key
        ref['value'] = value
        refs.append(ref)
    return refs

1188 

1189 

def _validate_unique_server_name(context, name):
    """Raise InstanceExists if name collides within the configured scope.

    The scope comes from CONF.osapi_compute_unique_server_name_scope:
    unset means no check, 'project' checks within the request's project,
    'global' checks across all instances; any other value is ignored.
    Hostname comparison is case-insensitive.
    """
    scope = CONF.osapi_compute_unique_server_name_scope
    if not scope:
        return

    lowername = name.lower()
    base_query = model_query(
        context, models.Instance, read_deleted='no',
    ).filter(func.lower(models.Instance.hostname) == lowername)

    if scope == 'project':
        count = base_query.filter_by(project_id=context.project_id).count()
    elif scope == 'global':
        count = base_query.count()
    else:
        return

    if count > 0:
        raise exception.InstanceExists(name=lowername)

1211 

1212 

def _handle_objects_related_type_conversions(values):
    """Make sure that certain things in values (which may have come from
    an objects.instance.Instance object) are in suitable form for the
    database.
    """
    # NOTE(danms): Make sure IP addresses are passed as strings to
    # the database engine
    for key in ('access_ip_v4', 'access_ip_v6'):
        value = values.get(key)
        if value is not None:
            values[key] = str(value)

    convert_objects_related_datetimes(
        values,
        'created_at', 'deleted_at', 'updated_at',
        'launched_at', 'terminated_at')

1227 

1228 

def _check_instance_exists_in_project(context, instance_uuid):
    # Raise InstanceNotFound unless a non-deleted instance with this uuid
    # is visible within the request's project.
    found = model_query(
        context, models.Instance, read_deleted="no", project_only=True,
    ).filter_by(uuid=instance_uuid).first()
    if not found:
        raise exception.InstanceNotFound(instance_id=instance_uuid)

1234 

1235 

@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def instance_create(context, values):
    """Create an instance from the values dictionary.

    :param context: Request context object
    :param values: Dict containing column values.
    :returns: the new Instance model with info_cache, extra and
        security_groups populated, and fault set to None
    """

    default_group = security_group_ensure_default(context)

    # Work on a copy so the caller's dict is not mutated by the pops and
    # conversions below.
    values = values.copy()
    values['metadata'] = _metadata_refs(
        values.get('metadata'), models.InstanceMetadata)

    values['system_metadata'] = _metadata_refs(
        values.get('system_metadata'), models.InstanceSystemMetadata)
    _handle_objects_related_type_conversions(values)

    instance_ref = models.Instance()
    if not values.get('uuid'):
        values['uuid'] = uuidutils.generate_uuid()
    instance_ref['info_cache'] = models.InstanceInfoCache()
    info_cache = values.pop('info_cache', None)
    if info_cache is not None:
        instance_ref['info_cache'].update(info_cache)
    security_groups = values.pop('security_groups', [])
    # Seed the extra record with explicit NULLs so the columns exist even
    # when the caller supplies no extras.
    instance_ref['extra'] = models.InstanceExtra()
    instance_ref['extra'].update(
        {'numa_topology': None,
         'pci_requests': None,
         'vcpu_model': None,
         'trusted_certs': None,
         'resources': None,
         })
    instance_ref['extra'].update(values.pop('extra', {}))
    instance_ref.update(values)

    # Gather the security groups for the instance
    sg_models = []
    if 'default' in security_groups:
        sg_models.append(default_group)
        # Generate a new list, so we don't modify the original
        security_groups = [x for x in security_groups if x != 'default']
    if security_groups:
        sg_models.extend(_security_group_get_by_names(
            context, security_groups))

    if 'hostname' in values:
        _validate_unique_server_name(context, values['hostname'])
    instance_ref.security_groups = sg_models
    context.session.add(instance_ref)

    # create the instance uuid to ec2_id mapping entry for instance
    ec2_instance_create(context, instance_ref['uuid'])

    # Parity with the return value of instance_get_all_by_filters_sort()
    # Obviously a newly-created instance record can't already have a fault
    # record because of the FK constraint, so this is fine.
    instance_ref.fault = None

    return instance_ref

1299 

1300 

@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def instance_destroy(
    context, instance_uuid, constraint=None, hard_delete=False,
):
    """Destroy the instance or raise if it does not exist.

    :param context: request context object
    :param instance_uuid: uuid of the instance to delete
    :param constraint: a constraint object
    :param hard_delete: when set to True, removes all records related to the
        instance
    :returns: the instance model as it was read before deletion
    :raises: InvalidUUID if instance_uuid is not a valid uuid
    :raises: InstanceNotFound if the instance does not exist
    :raises: ConstraintNotMet if a constraint was given and was not met
    """
    if uuidutils.is_uuid_like(instance_uuid):
        instance_ref = _instance_get_by_uuid(context, instance_uuid)
    else:
        raise exception.InvalidUUID(uuid=instance_uuid)

    query = model_query(context, models.Instance).\
        filter_by(uuid=instance_uuid)
    if constraint is not None:
        query = constraint.apply(models.Instance, query)
    # Either in hard or soft delete, we soft delete the instance first
    # to make sure that the constraints were met.
    count = query.soft_delete()
    if count == 0:
        # The failure to soft delete could be due to one of two things:
        # 1) A racing request has deleted the instance out from under us
        # 2) A constraint was not met
        # Try to read the instance back once more and let it raise
        # InstanceNotFound if 1) happened. This will give the caller an error
        # that more accurately reflects the reason for the failure.
        _instance_get_by_uuid(context, instance_uuid)
        # Else, raise ConstraintNotMet if 2) happened.
        raise exception.ConstraintNotMet()

    models_to_delete = [
        models.SecurityGroupInstanceAssociation, models.InstanceInfoCache,
        models.InstanceMetadata, models.InstanceFault, models.InstanceExtra,
        models.InstanceSystemMetadata, models.BlockDeviceMapping,
        models.Migration, models.VirtualInterface
    ]

    # For most referenced models we filter by the instance_uuid column, but for
    # these models we filter by the uuid column.
    filtered_by_uuid = [models.InstanceIdMapping]

    for model in models_to_delete + filtered_by_uuid:
        key = 'instance_uuid' if model not in filtered_by_uuid else 'uuid'
        filter_ = {key: instance_uuid}
        if hard_delete:
            # We need to read any soft-deleted related records to make sure
            # and clean those up as well otherwise we can fail with ForeignKey
            # constraint errors when hard deleting the instance.
            model_query(context, model, read_deleted='yes').filter_by(
                **filter_).delete()
        else:
            model_query(context, model).filter_by(**filter_).soft_delete()

    # NOTE(snikitin): We can't use model_query here, because there is no
    # column 'deleted' in 'tags' or 'console_auth_tokens' tables.
    context.session.query(models.Tag).filter_by(
        resource_id=instance_uuid).delete()
    context.session.query(models.ConsoleAuthToken).filter_by(
        instance_uuid=instance_uuid).delete()
    # NOTE(cfriesen): We intentionally do not soft-delete entries in the
    # instance_actions or instance_actions_events tables because they
    # can be used by operators to find out what actions were performed on a
    # deleted instance. Both of these tables are special-cased in
    # _archive_deleted_rows_for_table().
    if hard_delete:
        # NOTE(ttsiousts): In case of hard delete, we need to remove the
        # instance actions too since instance_uuid is a foreign key and
        # for this we need to delete the corresponding InstanceActionEvents
        actions = context.session.query(models.InstanceAction).filter_by(
            instance_uuid=instance_uuid).all()
        for action in actions:
            context.session.query(models.InstanceActionEvent).filter_by(
                action_id=action.id).delete()
        context.session.query(models.InstanceAction).filter_by(
            instance_uuid=instance_uuid).delete()
        # NOTE(ttsiouts): The instance is the last thing to be deleted in
        # order to respect all constraints
        context.session.query(models.Instance).filter_by(
            uuid=instance_uuid).delete()

    return instance_ref

1389 

1390 

@require_context
@pick_context_manager_reader_allow_async
def instance_get_by_uuid(context, uuid, columns_to_join=None):
    """Get an instance or raise if it does not exist."""
    return _instance_get_by_uuid(
        context, uuid, columns_to_join=columns_to_join)

1397 

1398 

def _instance_get_by_uuid(context, uuid, columns_to_join=None):
    # Look up a single instance by uuid; raise InstanceNotFound if absent.
    query = _build_instance_get(context, columns_to_join=columns_to_join)
    result = query.filter_by(uuid=uuid).first()
    if result:
        return result
    raise exception.InstanceNotFound(instance_id=uuid)

1408 

1409 

@require_context
@pick_context_manager_reader
def instance_get(context, instance_id, columns_to_join=None):
    """Get an instance or raise if it does not exist."""
    try:
        result = _build_instance_get(
            context, columns_to_join=columns_to_join,
        ).filter_by(id=instance_id).first()
    except db_exc.DBError:
        # NOTE(sdague): catch all in case the db engine chokes on the
        # id because it's too long of an int to store.
        LOG.warning("Invalid instance id %s in request", instance_id)
        raise exception.InvalidID(id=instance_id)

    if not result:
        raise exception.InstanceNotFound(instance_id=instance_id)
    return result

1427 

1428 

def _build_instance_get(context, columns_to_join=None):
    """Build the base query for fetching instances.

    security_groups (with their rules) and info_cache are always eagerly
    joined. Additional relationships can be requested via columns_to_join;
    'extra.<name>' entries undefer the named column on the joined
    InstanceExtra row.

    :param context: request context (applies project scoping)
    :param columns_to_join: relationship names to eagerly load; defaults
        to ['metadata', 'system_metadata']
    :returns: a Query over models.Instance, ordered by id
    """
    query = model_query(
        context, models.Instance, project_only=True,
    ).options(
        orm.joinedload(
            models.Instance.security_groups
        ).joinedload(models.SecurityGroup.rules)
    ).options(orm.joinedload(models.Instance.info_cache))
    if columns_to_join is None:
        columns_to_join = ['metadata', 'system_metadata']
    for column in columns_to_join:
        if column in ['info_cache', 'security_groups']:
            # Already always joined above
            continue
        if 'extra.' in column:
            column_ref = getattr(models.InstanceExtra, column.split('.')[1])
            query = query.options(
                orm.joinedload(models.Instance.extra).undefer(column_ref)
            )
        elif column in ['metadata', 'system_metadata']:
            # NOTE(melwitt): We use subqueryload() instead of joinedload() for
            # metadata and system_metadata because of the one-to-many
            # relationship of the data. Directly joining these columns can
            # result in a large number of additional rows being queried if an
            # instance has a large number of (system_)metadata items, resulting
            # in a large data transfer. Instead, the subqueryload() will
            # perform additional queries to obtain metadata and system_metadata
            # for the instance.
            column_ref = getattr(models.Instance, column)
            query = query.options(orm.subqueryload(column_ref))
        else:
            column_ref = getattr(models.Instance, column)
            query = query.options(orm.joinedload(column_ref))
    # NOTE(alaski) Stop lazy loading of columns not needed.
    for column in ['metadata', 'system_metadata']:
        if column not in columns_to_join:
            column_ref = getattr(models.Instance, column)
            query = query.options(orm.noload(column_ref))
    # NOTE(melwitt): We need to use order_by(<unique column>) so that the
    # additional queries emitted by subqueryload() include the same ordering as
    # used by the parent query.
    # https://docs.sqlalchemy.org/en/13/orm/loading_relationships.html#the-importance-of-ordering
    return query.order_by(models.Instance.id)

1472 

1473 

def _instances_fill_metadata(context, instances, manual_joins=None):
    """Selectively fill instances with manually-joined metadata. Note that
    instance will be converted to a dict.

    :param context: security context
    :param instances: list of instances to fill
    :param manual_joins: list of tables to manually join (can be any
                         combination of 'metadata' and 'system_metadata' or
                         None to take the default of both)
    :returns: list of dicts, one per input instance, with 'metadata',
        'system_metadata' and 'fault' keys filled in (and 'pci_devices'
        when requested)
    """
    uuids = [inst['uuid'] for inst in instances]

    if manual_joins is None:
        manual_joins = ['metadata', 'system_metadata']

    # Group each multi-valued table's rows by owning instance uuid so the
    # fill loop below is a dict lookup rather than a per-instance query.
    meta = collections.defaultdict(list)
    if 'metadata' in manual_joins:
        for row in _instance_metadata_get_multi(context, uuids):
            meta[row['instance_uuid']].append(row)

    sys_meta = collections.defaultdict(list)
    if 'system_metadata' in manual_joins:
        for row in _instance_system_metadata_get_multi(context, uuids):
            sys_meta[row['instance_uuid']].append(row)

    pcidevs = collections.defaultdict(list)
    if 'pci_devices' in manual_joins:
        for row in _instance_pcidevs_get_multi(context, uuids):
            pcidevs[row['instance_uuid']].append(row)

    if 'fault' in manual_joins:
        faults = instance_fault_get_by_instance_uuids(context, uuids,
                                                      latest=True)
    else:
        faults = {}

    filled_instances = []
    for inst in instances:
        inst = dict(inst)
        inst['system_metadata'] = sys_meta[inst['uuid']]
        inst['metadata'] = meta[inst['uuid']]
        if 'pci_devices' in manual_joins:
            inst['pci_devices'] = pcidevs[inst['uuid']]
        # Only the most recent fault (if any) is attached.
        inst_faults = faults.get(inst['uuid'])
        inst['fault'] = inst_faults and inst_faults[0] or None
        filled_instances.append(inst)

    return filled_instances

1522 

1523 

@require_context
@pick_context_manager_reader
def instances_fill_metadata(context, instances, manual_joins=None):
    """Selectively fill instances with manually-joined metadata.

    See _instances_fill_metadata(). This is only for use as a standalone
    operation in its own transaction.
    """
    return _instances_fill_metadata(
        context, instances, manual_joins=manual_joins)

1534 

1535 

def _manual_join_columns(columns_to_join):
    """Separate manually joined columns from columns_to_join

    If columns_to_join contains 'metadata', 'system_metadata', 'fault', or
    'pci_devices' those columns are removed from columns_to_join and added
    to a manual_joins list to be used with the _instances_fill_metadata method.

    The columns_to_join formal parameter is copied and not modified, the return
    tuple has the modified columns_to_join list to be used with joinedload in
    a model query.

    :param:columns_to_join: List of columns to join in a model query.
    :return: tuple of (manual_joins, columns_to_join)
    """
    manual_joins = []
    columns_to_join_new = copy.copy(columns_to_join)
    for special in ('metadata', 'system_metadata', 'pci_devices', 'fault'):
        try:
            columns_to_join_new.remove(special)
        except ValueError:
            continue
        manual_joins.append(special)
    return manual_joins, columns_to_join_new

1557 

1558 

@require_context
@pick_context_manager_reader
def instance_get_all(context, columns_to_join=None):
    """Get all instances."""
    if columns_to_join is None:
        manual_joins = ['metadata', 'system_metadata']
        columns_to_join_new = ['info_cache', 'security_groups']
    else:
        manual_joins, columns_to_join_new = (
            _manual_join_columns(columns_to_join))

    query = model_query(context, models.Instance)
    for column in columns_to_join_new:
        query = query.options(
            orm.joinedload(getattr(models.Instance, column)))

    # Non-admin contexts only see their own instances.
    if not context.is_admin:
        if context.project_id:
            query = query.filter_by(project_id=context.project_id)
        else:
            query = query.filter_by(user_id=context.user_id)

    return _instances_fill_metadata(context, query.all(), manual_joins)

1581 

1582 

@require_context
@pick_context_manager_reader_allow_async
def instance_get_all_by_filters(
    context, filters, sort_key='created_at', sort_dir='desc', limit=None,
    marker=None, columns_to_join=None,
):
    """Get all instances matching all filters sorted by the primary key.

    See instance_get_all_by_filters_sort for more information.
    """
    # Delegate to the multi-key sort API, wrapping the single sort
    # key/direction into one-element lists.
    return instance_get_all_by_filters_sort(
        context, filters,
        limit=limit,
        marker=marker,
        columns_to_join=columns_to_join,
        sort_keys=[sort_key],
        sort_dirs=[sort_dir])

1600 

1601 

def _get_query_nova_resource_by_changes_time(query, filters, model_object):
    """Filter resources by changes-since or changes-before.

    Special keys are used to tweak the query further::

    | 'changes-since' - only return resources updated after
    | 'changes-before' - only return resources updated before

    Return query results.

    :param query: query to apply filters to.
    :param filters: dictionary of filters with regex values.
    :param model_object: object of the operation target.
    """
    if not filters:
        return query

    updated_at = getattr(model_object, 'updated_at')
    since = filters.get('changes-since')
    if since:
        query = query.filter(updated_at >= timeutils.normalize_time(since))
    before = filters.get('changes-before')
    if before:
        query = query.filter(updated_at <= timeutils.normalize_time(before))
    return query

1626 

1627 

1628@require_context 

1629@pick_context_manager_reader_allow_async 

1630def instance_get_all_by_filters_sort(context, filters, limit=None, marker=None, 

1631 columns_to_join=None, sort_keys=None, 

1632 sort_dirs=None): 

1633 """Get all instances that match all filters sorted by the given keys. 

1634 

1635 Deleted instances will be returned by default, unless there's a filter that 

1636 says otherwise. 

1637 

1638 Depending on the name of a filter, matching for that filter is 

1639 performed using either exact matching or as regular expression 

1640 matching. Exact matching is applied for the following filters:: 

1641 

1642 | ['project_id', 'user_id', 'image_ref', 

1643 | 'vm_state', 'instance_type_id', 'uuid', 

1644 | 'metadata', 'host', 'system_metadata', 'locked', 'hidden'] 

1645 

1646 Hidden instances will *not* be returned by default, unless there's a 

1647 filter that says otherwise. 

1648 

1649 A third type of filter (also using exact matching), filters 

1650 based on instance metadata tags when supplied under a special 

1651 key named 'filter':: 

1652 

1653 | filters = { 

1654 | 'filter': [ 

1655 | {'name': 'tag-key', 'value': '<metakey>'}, 

1656 | {'name': 'tag-value', 'value': '<metaval>'}, 

1657 | {'name': 'tag:<metakey>', 'value': '<metaval>'} 

1658 | ] 

1659 | } 

1660 

1661 Special keys are used to tweak the query further:: 

1662 

1663 | 'changes-since' - only return instances updated after 

1664 | 'changes-before' - only return instances updated before 

1665 | 'deleted' - only return (or exclude) deleted instances 

1666 | 'soft_deleted' - modify behavior of 'deleted' to either 

1667 | include or exclude instances whose 

1668 | vm_state is SOFT_DELETED. 

1669 

1670 A fourth type of filter (also using exact matching), filters 

1671 based on instance tags (not metadata tags). There are two types 

1672 of these tags: 

1673 

1674 `tags` -- One or more strings that will be used to filter results 

1675 in an AND expression: T1 AND T2 

1676 

1677 `tags-any` -- One or more strings that will be used to filter results in 

1678 an OR expression: T1 OR T2 

1679 

1680 `not-tags` -- One or more strings that will be used to filter results in 

1681 an NOT AND expression: NOT (T1 AND T2) 

1682 

1683 `not-tags-any` -- One or more strings that will be used to filter results 

1684 in an NOT OR expression: NOT (T1 OR T2) 

1685 

1686 Tags should be represented as list:: 

1687 

1688 | filters = { 

1689 | 'tags': [some-tag, some-another-tag], 

1690 | 'tags-any: [some-any-tag, some-another-any-tag], 

1691 | 'not-tags: [some-not-tag, some-another-not-tag], 

1692 | 'not-tags-any: [some-not-any-tag, some-another-not-any-tag] 

1693 | } 

1694 """ 

1695 # NOTE(mriedem): If the limit is 0 there is no point in even going 

1696 # to the database since nothing is going to be returned anyway. 

1697 if limit == 0: 

1698 return [] 

1699 

1700 sort_keys, sort_dirs = db_utils.process_sort_params( 

1701 sort_keys, sort_dirs, default_dir='desc') 

1702 

1703 if columns_to_join is None: 

1704 columns_to_join_new = ['info_cache', 'security_groups'] 

1705 manual_joins = ['metadata', 'system_metadata'] 

1706 else: 

1707 manual_joins, columns_to_join_new = ( 

1708 _manual_join_columns(columns_to_join)) 

1709 

1710 query_prefix = context.session.query(models.Instance) 

1711 for column in columns_to_join_new: 

1712 if 'extra.' in column: 

1713 column_ref = getattr(models.InstanceExtra, column.split('.')[1]) 

1714 query_prefix = query_prefix.options( 

1715 orm.joinedload(models.Instance.extra).undefer(column_ref) 

1716 ) 

1717 else: 

1718 column_ref = getattr(models.Instance, column) 

1719 query_prefix = query_prefix.options(orm.joinedload(column_ref)) 

1720 

1721 # Note: order_by is done in the sqlalchemy.utils.py paginate_query(), 

1722 # no need to do it here as well 

1723 

1724 # Make a copy of the filters dictionary to use going forward, as we'll 

1725 # be modifying it and we shouldn't affect the caller's use of it. 

1726 filters = copy.deepcopy(filters) 

1727 

1728 model_object = models.Instance 

1729 query_prefix = _get_query_nova_resource_by_changes_time( 

1730 query_prefix, filters, model_object, 

1731 ) 

1732 

1733 if 'deleted' in filters: 

1734 # Instances can be soft or hard deleted and the query needs to 

1735 # include or exclude both 

1736 deleted = filters.pop('deleted') 

1737 if deleted: 

1738 if filters.pop('soft_deleted', True): 

1739 delete = sql.or_( 

1740 models.Instance.deleted == models.Instance.id, 

1741 models.Instance.vm_state == vm_states.SOFT_DELETED 

1742 ) 

1743 query_prefix = query_prefix.filter(delete) 

1744 else: 

1745 query_prefix = query_prefix.\ 

1746 filter(models.Instance.deleted == models.Instance.id) 

1747 else: 

1748 query_prefix = query_prefix.filter_by(deleted=0) 

1749 if not filters.pop('soft_deleted', False): 

1750 # It would be better to have vm_state not be nullable 

1751 # but until then we test it explicitly as a workaround. 

1752 not_soft_deleted = sql.or_( 

1753 models.Instance.vm_state != vm_states.SOFT_DELETED, 

1754 models.Instance.vm_state == sql.null() 

1755 ) 

1756 query_prefix = query_prefix.filter(not_soft_deleted) 

1757 

1758 if 'cleaned' in filters: 

1759 cleaned = 1 if filters.pop('cleaned') else 0 

1760 query_prefix = query_prefix.filter(models.Instance.cleaned == cleaned) 

1761 

1762 if 'tags' in filters: 

1763 tags = filters.pop('tags') 

1764 # We build a JOIN ladder expression for each tag, JOIN'ing 

1765 # the first tag to the instances table, and each subsequent 

1766 # tag to the last JOIN'd tags table 

1767 first_tag = tags.pop(0) 

1768 query_prefix = query_prefix.join(models.Instance.tags) 

1769 query_prefix = query_prefix.filter(models.Tag.tag == first_tag) 

1770 

1771 for tag in tags: 

1772 tag_alias = orm.aliased(models.Tag) 

1773 query_prefix = query_prefix.join(tag_alias, 

1774 models.Instance.tags) 

1775 query_prefix = query_prefix.filter(tag_alias.tag == tag) 

1776 

1777 if 'tags-any' in filters: 

1778 tags = filters.pop('tags-any') 

1779 tag_alias = orm.aliased(models.Tag) 

1780 query_prefix = query_prefix.join(tag_alias, models.Instance.tags) 

1781 query_prefix = query_prefix.filter(tag_alias.tag.in_(tags)) 

1782 

1783 if 'not-tags' in filters: 

1784 tags = filters.pop('not-tags') 

1785 first_tag = tags.pop(0) 

1786 subq = query_prefix.session.query(models.Tag.resource_id) 

1787 subq = subq.join(models.Instance.tags) 

1788 subq = subq.filter(models.Tag.tag == first_tag) 

1789 

1790 for tag in tags: 

1791 tag_alias = orm.aliased(models.Tag) 

1792 subq = subq.join(tag_alias, models.Instance.tags) 

1793 subq = subq.filter(tag_alias.tag == tag) 

1794 

1795 query_prefix = query_prefix.filter(~models.Instance.uuid.in_(subq)) 

1796 

1797 if 'not-tags-any' in filters: 

1798 tags = filters.pop('not-tags-any') 

1799 query_prefix = query_prefix.filter(~models.Instance.tags.any( 

1800 models.Tag.tag.in_(tags))) 

1801 

1802 if not context.is_admin: 

1803 # If we're not admin context, add appropriate filter.. 

1804 if context.project_id: 1804 ↛ 1807line 1804 didn't jump to line 1807 because the condition on line 1804 was always true

1805 filters['project_id'] = context.project_id 

1806 else: 

1807 filters['user_id'] = context.user_id 

1808 

1809 if filters.pop('hidden', False): 

1810 query_prefix = query_prefix.filter( 

1811 models.Instance.hidden == sql.true()) 

1812 else: 

1813 # If the query should not include hidden instances, then 

1814 # filter instances with hidden=False or hidden=NULL because 

1815 # older records may have no value set. 

1816 query_prefix = query_prefix.filter(sql.or_( 

1817 models.Instance.hidden == sql.false(), 

1818 models.Instance.hidden == sql.null())) 

1819 

1820 # Filters for exact matches that we can do along with the SQL query... 

1821 # For other filters that don't match this, we will do regexp matching 

1822 exact_match_filter_names = ['project_id', 'user_id', 'image_ref', 

1823 'vm_state', 'instance_type_id', 'uuid', 

1824 'metadata', 'host', 'task_state', 

1825 'system_metadata', 'locked', 'hidden'] 

1826 

1827 # Filter the query 

1828 query_prefix = _exact_instance_filter(query_prefix, 

1829 filters, exact_match_filter_names) 

1830 if query_prefix is None: 

1831 return [] 

1832 query_prefix = _regex_instance_filter(query_prefix, filters) 

1833 

1834 # paginate query 

1835 if marker is not None: 

1836 try: 

1837 marker = _instance_get_by_uuid( 

1838 context.elevated(read_deleted='yes'), marker, 

1839 ) 

1840 except exception.InstanceNotFound: 

1841 raise exception.MarkerNotFound(marker=marker) 

1842 try: 

1843 query_prefix = sqlalchemyutils.paginate_query( 

1844 query_prefix, 

1845 models.Instance, 

1846 limit, 

1847 sort_keys, 

1848 marker=marker, 

1849 sort_dirs=sort_dirs, 

1850 ) 

1851 except db_exc.InvalidSortKey: 

1852 raise exception.InvalidSortKey() 

1853 

1854 instances = query_prefix.all() 

1855 

1856 return _instances_fill_metadata(context, instances, manual_joins) 

1857 

1858 

@require_context
@pick_context_manager_reader_allow_async
def instance_get_by_sort_filters(context, sort_keys, sort_dirs, values):
    """Get the UUID of the first instance in a sort order.

    Attempt to get a single instance based on a combination of sort
    keys, directions and filter values. This is used to try to find a
    marker instance when we don't have a marker uuid.

    :returns: The UUID of the instance that matched, if any.
    """
    return _model_get_uuid_by_sort_filters(
        context, models.Instance, sort_keys, sort_dirs, values)

1874 

1875 

def _model_get_uuid_by_sort_filters(context, model, sort_keys, sort_dirs,
                                    values):
    """Return the uuid of the first row of ``model`` in a sort order.

    This locates a marker row when only its sort position (keys,
    directions and the marker's column values) is known, not its uuid.

    :param context: request context with an active database session
    :param model: model class to query; must have a ``uuid`` column and a
        column for each entry in ``sort_keys``
    :param sort_keys: list of attribute names to sort on
    :param sort_dirs: list of 'asc'/'desc' strings, parallel to sort_keys
    :param values: marker column values, parallel to sort_keys
    :returns: the matching uuid, or None if no row matched
    """
    query = context.session.query(model.uuid)

    # NOTE(danms): Below is a re-implementation of our
    # oslo_db.sqlalchemy.utils.paginate_query() utility. We can't use that
    # directly because it does not return the marker and we need it to.
    # The below is basically the same algorithm, stripped down to just what
    # we need, and augmented with the filter criteria required for us to
    # get back the instance that would correspond to our query.

    # This is our position in sort_keys,sort_dirs,values for the loop below
    key_index = 0

    # We build a list of criteria to apply to the query, which looks
    # approximately like this (assuming all ascending):
    #
    # OR(row.key1 > val1,
    #    AND(row.key1 == val1, row.key2 > val2),
    #    AND(row.key1 == val1, row.key2 == val2, row.key3 >= val3),
    # )
    #
    # The final key is compared with the "or equal" variant so that
    # a complete match instance is still returned.
    criteria = []

    for skey, sdir, val in zip(sort_keys, sort_dirs, values):
        # Apply ordering to our query for the key, direction we're processing
        if sdir == 'desc':
            query = query.order_by(expression.desc(getattr(model, skey)))
        else:
            query = query.order_by(expression.asc(getattr(model, skey)))

        # Build a list of equivalence requirements on keys we've already
        # processed through the loop. In other words, if we're adding
        # key2 > val2, make sure that key1 == val1
        crit_attrs = []
        for equal_attr in range(0, key_index):
            crit_attrs.append(
                (getattr(model, sort_keys[equal_attr]) == values[equal_attr]))

        model_attr = getattr(model, skey)
        # Compare boolean columns as integers so the marker value matches
        # the column representation consistently across backends.
        if isinstance(model_attr.type, sa.Boolean):
            model_attr = expression.cast(model_attr, sa.Integer)
            val = int(val)

        if skey == sort_keys[-1]:
            # If we are the last key, then we should use or-equal to
            # allow a complete match to be returned
            if sdir == 'asc':
                crit = (model_attr >= val)
            else:
                crit = (model_attr <= val)
        else:
            # If we're not the last key, then strict greater or less than
            # so we order strictly.
            if sdir == 'asc':
                crit = (model_attr > val)
            else:
                crit = (model_attr < val)

        # AND together all the above
        crit_attrs.append(crit)
        criteria.append(sql.and_(*crit_attrs))
        key_index += 1

    # OR together all the ANDs
    query = query.filter(sql.or_(*criteria))

    # We can't raise InstanceNotFound because we don't have a uuid to
    # be looking for, so just return nothing if no match.
    result = query.limit(1).first()
    if result:
        # We're querying for a single column, which means we get back a
        # tuple of one thing. Strip that out and just return the uuid
        # for our caller.
        return result[0]
    else:
        return result

1955 

1956 

1957def _db_connection_type(db_connection): 

1958 """Returns a lowercase symbol for the db type. 

1959 

1960 This is useful when we need to change what we are doing per DB 

1961 (like handling regexes). In a CellsV2 world it probably needs to 

1962 do something better than use the database configuration string. 

1963 """ 

1964 

1965 db_string = db_connection.split(':')[0].split('+')[0] 

1966 return db_string.lower() 

1967 

1968 

1969def _safe_regex_mysql(raw_string): 

1970 """Make regex safe to mysql. 

1971 

1972 Certain items like '|' are interpreted raw by mysql REGEX. If you 

1973 search for a single | then you trigger an error because it's 

1974 expecting content on either side. 

1975 

1976 For consistency sake we escape all '|'. This does mean we wouldn't 

1977 support something like foo|bar to match completely different 

1978 things, however, one can argue putting such complicated regex into 

1979 name search probably means you are doing this wrong. 

1980 """ 

1981 return raw_string.replace('|', '\\|') 

1982 

1983 

def _get_regexp_ops(connection):
    """Return safety filter and db opts for regex."""
    db_type = _db_connection_type(connection)
    # Backends without a known regex operator fall back to LIKE; only
    # MySQL requires its regex values to be escaped.
    operators = {
        'postgresql': '~',
        'mysql': 'REGEXP',
        'sqlite': 'REGEXP',
    }
    escapers = {
        'mysql': _safe_regex_mysql,
    }
    return (escapers.get(db_type, lambda x: x),
            operators.get(db_type, 'LIKE'))

1998 

1999 

def _regex_instance_filter(query, filters):
    """Applies regular expression filtering to an Instance query.

    Returns the updated query.

    :param query: query to apply filters to
    :param filters: dictionary of filters with regex values
    """
    model = models.Instance
    # The regex operator and any escaping required depend on the backing
    # database, detected from the configured connection string.
    safe_regex_filter, db_regexp_op = _get_regexp_ops(CONF.database.connection)
    for filter_name in filters:
        try:
            column_attr = getattr(model, filter_name)
        except AttributeError:
            # Not an attribute of the model; skip rather than fail.
            continue
        if 'property' == type(column_attr).__name__:
            # Python properties are not real columns and cannot be filtered.
            continue
        filter_val = filters[filter_name]
        # Sometimes the REGEX filter value is not a string
        if not isinstance(filter_val, str):
            filter_val = str(filter_val)
        if db_regexp_op == 'LIKE':
            # Fallback for databases with no regex operator: substring match.
            query = query.filter(column_attr.op(db_regexp_op)(
                u'%' + filter_val + u'%'))
        else:
            filter_val = safe_regex_filter(filter_val)
            query = query.filter(column_attr.op(db_regexp_op)(
                filter_val))
    return query

2030 

2031 

def _exact_instance_filter(query, filters, legal_keys):
    """Applies exact match filtering to an Instance query.

    Returns the updated query. Modifies filters argument to remove
    filters consumed.

    :param query: query to apply filters to
    :param filters: dictionary of filters; values that are lists,
                    tuples, sets, or frozensets cause an 'IN' test to
                    be performed, while exact matching ('==' operator)
                    is used for other values
    :param legal_keys: list of keys to apply exact filtering to
    :returns: the updated query, or None when an empty collection value
              guarantees an empty result set (caller short-circuits)
    """

    filter_dict = {}
    model = models.Instance

    # Walk through all the keys
    for key in legal_keys:
        # Skip ones we're not filtering on
        if key not in filters:
            continue

        # OK, filtering on this key; what value do we search for?
        value = filters.pop(key)

        if key in ('metadata', 'system_metadata'):
            # Metadata lives in association tables, so match key/value
            # pairs via EXISTS subqueries rather than simple equality.
            column_attr = getattr(model, key)
            if isinstance(value, list):
                for item in value:
                    for k, v in item.items():
                        query = query.filter(column_attr.any(key=k))
                        query = query.filter(column_attr.any(value=v))

            else:
                for k, v in value.items():
                    query = query.filter(column_attr.any(key=k))
                    query = query.filter(column_attr.any(value=v))
        elif isinstance(value, (list, tuple, set, frozenset)):
            if not value:
                return None  # empty IN-predicate; short circuit
            # Looking for values in a list; apply to query directly
            column_attr = getattr(model, key)
            query = query.filter(column_attr.in_(value))
        else:
            # OK, simple exact match; save for later
            filter_dict[key] = value

    # Apply simple exact matches
    if filter_dict:
        query = query.filter(*[getattr(models.Instance, k) == v
                               for k, v in filter_dict.items()])
    return query

2085 

2086 

@require_context
@pick_context_manager_reader_allow_async
def instance_get_active_by_window_joined(context, begin, end=None,
                                         project_id=None, host=None,
                                         columns_to_join=None, limit=None,
                                         marker=None):
    """Get instances and joins active during a certain time window.

    Specifying a project_id will filter for a certain project.
    Specifying a host will filter for instances on a given compute host.

    :param begin: instances not terminated before this time are included
    :param end: if set, only instances launched before this time are included
    :param columns_to_join: relationships to load eagerly; defaults to
        info_cache and security_groups, with metadata filled in manually
    :param limit: maximum number of instances to return
    :param marker: uuid of the last instance on the previous page
    :raises: MarkerNotFound if ``marker`` does not identify an instance
    """
    query = context.session.query(models.Instance)

    if columns_to_join is None:
        columns_to_join_new = ['info_cache', 'security_groups']
        manual_joins = ['metadata', 'system_metadata']
    else:
        manual_joins, columns_to_join_new = (
            _manual_join_columns(columns_to_join))

    for column in columns_to_join_new:
        if 'extra.' in column:
            # Join the extra record and undefer just the requested column.
            column_ref = getattr(models.InstanceExtra, column.split('.')[1])
            query = query.options(
                orm.joinedload(models.Instance.extra).undefer(column_ref)
            )
        else:
            column_ref = getattr(models.Instance, column)
            query = query.options(orm.joinedload(column_ref))

    # "Active in the window": still running, or terminated after it began.
    query = query.filter(sql.or_(
        models.Instance.terminated_at == sql.null(),
        models.Instance.terminated_at > begin))
    if end:
        query = query.filter(models.Instance.launched_at < end)
    if project_id:
        query = query.filter_by(project_id=project_id)
    if host:
        query = query.filter_by(host=host)

    if marker is not None:
        try:
            # read_deleted='yes' so a deleted marker instance still resolves.
            marker = _instance_get_by_uuid(
                context.elevated(read_deleted='yes'), marker)
        except exception.InstanceNotFound:
            raise exception.MarkerNotFound(marker=marker)

    query = sqlalchemyutils.paginate_query(
        query, models.Instance, limit, ['project_id', 'uuid'], marker=marker,
    )
    instances = query.all()

    return _instances_fill_metadata(context, instances, manual_joins)

2140 

2141 

def _instance_get_all_query(context, project_only=False, joins=None):
    # Default to the relationships callers almost always need when no
    # explicit join list is supplied.
    joins = ['info_cache', 'security_groups'] if joins is None else joins

    query = model_query(context, models.Instance, project_only=project_only)
    for column in joins:
        if 'extra.' in column:
            # 'extra.<name>': join the extra record and undefer that column.
            extra_attr = getattr(models.InstanceExtra, column.split('.')[1])
            query = query.options(
                orm.joinedload(models.Instance.extra).undefer(extra_attr))
        else:
            query = query.options(
                orm.joinedload(getattr(models.Instance, column)))
    return query

2161 

2162 

@pick_context_manager_reader_allow_async
def instance_get_all_by_host(context, host, columns_to_join=None):
    """Get all instances belonging to a host."""
    instances = _instance_get_all_query(
        context, joins=columns_to_join).filter_by(host=host).all()
    return _instances_fill_metadata(
        context, instances, manual_joins=columns_to_join)

2173 

2174 

def _instance_get_all_uuids_by_hosts(context, hosts):
    # Use a Core select: we only need (host, uuid) pairs, not full
    # Instance ORM objects.
    itbl = models.Instance.__table__
    # Match only rows whose 'deleted' column still holds its declared
    # default (i.e. not soft-deleted).
    default_deleted_value = itbl.c.deleted.default.arg
    sel = sql.select(itbl.c.host, itbl.c.uuid)
    sel = sel.where(sql.and_(
        itbl.c.deleted == default_deleted_value,
        # Expanding bindparam lets the IN list be supplied at execute time.
        itbl.c.host.in_(sa.bindparam('hosts', expanding=True))))

    # group the instance UUIDs by hostname
    res = collections.defaultdict(list)
    for rec in context.session.execute(sel, {'hosts': hosts}).fetchall():
        res[rec[0]].append(rec[1])
    return res

2188 

2189 

@pick_context_manager_reader
def instance_get_all_uuids_by_hosts(context, hosts):
    """Get a dict, keyed by hostname, of a list of the instance UUIDs on the
    host for each supplied hostname, not Instance model objects.

    The dict is a defaultdict of list, thus inspecting the dict for a host not
    in the dict will return an empty list not a KeyError.
    """
    # Thin public wrapper; the query logic lives in the private helper.
    return _instance_get_all_uuids_by_hosts(context, hosts)

2199 

2200 

@pick_context_manager_reader
def instance_get_all_by_host_and_node(
    context, host, node, columns_to_join=None,
):
    """Get all instances belonging to a node."""
    manual_joins = []
    if columns_to_join is not None:
        # Metadata tables are filled in manually afterwards rather than
        # joined in SQL, so split them out of the requested join list.
        candidates = ('system_metadata', 'metadata')
        manual_joins = [col for col in columns_to_join if col in candidates]
        columns_to_join = list(set(columns_to_join) - set(candidates))
    query = _instance_get_all_query(context, joins=columns_to_join)
    instances = query.filter_by(host=host, node=node).all()
    return _instances_fill_metadata(
        context,
        instances,
        manual_joins=manual_joins,
    )

2221 

2222 

@pick_context_manager_reader
def instance_get_all_by_host_and_not_type(context, host, type_id=None):
    """Get all instances belonging to a host with a different type_id."""
    query = _instance_get_all_query(context).filter_by(host=host)
    query = query.filter(models.Instance.instance_type_id != type_id)
    return _instances_fill_metadata(context, query.all())

2232 

2233 

# NOTE(hanlind): This method can be removed as conductor RPC API moves to v2.0.
@pick_context_manager_reader
def instance_get_all_hung_in_rebooting(context, reboot_window):
    """Get all instances stuck in a rebooting state."""
    # Anything still REBOOTING whose last update predates the cutoff is
    # considered hung.
    cutoff = timeutils.utcnow() - datetime.timedelta(seconds=reboot_window)

    # NOTE(danms): this is only used in the _poll_rebooting_instances()
    # call in compute/manager, so we can avoid the metadata lookups
    # explicitly
    query = model_query(context, models.Instance)
    query = query.filter(models.Instance.updated_at <= cutoff)
    query = query.filter_by(task_state=task_states.REBOOTING)
    return _instances_fill_metadata(context, query.all(), manual_joins=[])

2252 

2253 

def _retry_instance_update():
    """Wrap with oslo_db_api.wrap_db_retry, and also retry on
    UnknownInstanceUpdateConflict.
    """
    def _is_retryable(exc):
        # Deadlocks are handled by retry_on_deadlock; this adds the
        # repeatable-read conflict case raised by _instance_update.
        return isinstance(exc, (exception.UnknownInstanceUpdateConflict,))

    return oslo_db_api.wrap_db_retry(
        max_retries=5, retry_on_deadlock=True,
        exception_checker=_is_retryable)

2262 

2263 

@require_context
@_retry_instance_update()
@pick_context_manager_writer
def instance_update(context, instance_uuid, values, expected=None):
    """Set the given properties on an instance and update it.

    :raises: NotFound if instance does not exist.
    """
    return _instance_update(context, instance_uuid, values,
                            expected=expected)

2273 

2274 

@require_context
@_retry_instance_update()
@pick_context_manager_writer
def instance_update_and_get_original(context, instance_uuid, values,
                                     columns_to_join=None, expected=None):
    """Set the given properties on an instance and update it.

    Return a shallow copy of the original instance reference, as well as the
    updated one.

    If "expected_task_state" exists in values, the update can only happen
    when the task state before update matches expected_task_state. Otherwise
    a UnexpectedTaskStateError is thrown.

    :param context: request context object
    :param instance_uuid: instance uuid
    :param values: dict containing column values
    :returns: a tuple of the form (old_instance_ref, new_instance_ref)
    :raises: NotFound if instance does not exist.
    """
    instance_ref = _instance_get_by_uuid(context, instance_uuid,
                                         columns_to_join=columns_to_join)
    # Take the shallow copy before running the update so the returned
    # "old" reference reflects the state read above.
    pre_update_ref = copy.copy(instance_ref)
    updated_ref = _instance_update(context, instance_uuid, values, expected,
                                   original=instance_ref)
    return pre_update_ref, updated_ref

2299 

2300 

2301# NOTE(danms): This updates the instance's metadata list in-place and in 

2302# the database to avoid stale data and refresh issues. It assumes the 

2303# delete=True behavior of instance_metadata_update(...) 

2304def _instance_metadata_update_in_place(context, instance, metadata_type, model, 

2305 metadata): 

2306 metadata = dict(metadata) 

2307 to_delete = [] 

2308 for keyvalue in instance[metadata_type]: 

2309 key = keyvalue['key'] 

2310 if key in metadata: 

2311 keyvalue['value'] = metadata.pop(key) 

2312 elif key not in metadata: 2312 ↛ 2308line 2312 didn't jump to line 2308 because the condition on line 2312 was always true

2313 to_delete.append(keyvalue) 

2314 

2315 # NOTE: we have to hard_delete here otherwise we will get more than one 

2316 # system_metadata record when we read deleted for an instance; 

2317 # regular metadata doesn't have the same problem because we don't 

2318 # allow reading deleted regular metadata anywhere. 

2319 if metadata_type == 'system_metadata': 

2320 for condemned in to_delete: 

2321 context.session.delete(condemned) 

2322 instance[metadata_type].remove(condemned) 

2323 else: 

2324 for condemned in to_delete: 

2325 condemned.soft_delete(context.session) 

2326 

2327 for key, value in metadata.items(): 

2328 newitem = model() 

2329 newitem.update({'key': key, 'value': value, 

2330 'instance_uuid': instance['uuid']}) 

2331 context.session.add(newitem) 

2332 instance[metadata_type].append(newitem) 

2333 

2334 

def _instance_update(context, instance_uuid, values, expected, original=None):
    """Apply ``values`` to an instance row, honoring expected-state checks.

    :param context: request context with an active database session
    :param instance_uuid: uuid of the instance to update
    :param values: dict of column values; 'expected_task_state' and
        'expected_vm_state' entries are treated as preconditions rather
        than updates, and 'metadata'/'system_metadata' are applied
        separately in place
    :param expected: dict of column -> expected value(s) preconditions,
        or None
    :param original: previously-read instance ref, if the caller already
        has one (avoids a redundant re-read on conflict)
    :returns: the updated instance reference
    :raises: InvalidUUID, UnknownInstanceUpdateConflict,
        UnexpectedDeletingTaskStateError, UnexpectedTaskStateError,
        InstanceUpdateConflict
    """
    if not uuidutils.is_uuid_like(instance_uuid):
        raise exception.InvalidUUID(uuid=instance_uuid)

    # NOTE(mdbooth): We pop values from this dict below, so we copy it here to
    # ensure there are no side effects for the caller or if we retry the
    # function due to a db conflict.
    updates = copy.copy(values)

    if expected is None:
        expected = {}
    else:
        # Coerce all single values to singleton lists
        expected = {k: [None] if v is None else sqlalchemyutils.to_list(v)
                    for (k, v) in expected.items()}

    # Extract 'expected_' values from values dict, as these aren't actually
    # updates
    for field in ('task_state', 'vm_state'):
        expected_field = 'expected_%s' % field
        if expected_field in updates:
            value = updates.pop(expected_field, None)
            # Coerce all single values to singleton lists
            if value is None:
                expected[field] = [None]
            else:
                expected[field] = sqlalchemyutils.to_list(value)

    # Values which need to be updated separately
    metadata = updates.pop('metadata', None)
    system_metadata = updates.pop('system_metadata', None)

    _handle_objects_related_type_conversions(updates)

    # Hostname is potentially unique, but this is enforced in code rather
    # than the DB. The query below races, but the number of users of
    # osapi_compute_unique_server_name_scope is small, and a robust fix
    # will be complex. This is intentionally left as is for the moment.
    if 'hostname' in updates:
        _validate_unique_server_name(context, updates['hostname'])

    # 'compare' carries the expected values; update_on_match only applies
    # 'updates' to a row that still matches them.
    compare = models.Instance(uuid=instance_uuid, **expected)
    try:
        instance_ref = model_query(context, models.Instance,
                                   project_only=True).\
            update_on_match(compare, 'uuid', updates)
    except update_match.NoRowsMatched:
        # Update failed. Try to find why and raise a specific error.

        # We should get here only because our expected values were not current
        # when update_on_match executed. Having failed, we now have a hint that
        # the values are out of date and should check them.

        # This code is made more complex because we are using repeatable reads.
        # If we have previously read the original instance in the current
        # transaction, reading it again will return the same data, even though
        # the above update failed because it has changed: it is not possible to
        # determine what has changed in this transaction. In this case we raise
        # UnknownInstanceUpdateConflict, which will cause the operation to be
        # retried in a new transaction.

        # Because of the above, if we have previously read the instance in the
        # current transaction it will have been passed as 'original', and there
        # is no point refreshing it. If we have not previously read the
        # instance, we can fetch it here and we will get fresh data.
        if original is None:
            original = _instance_get_by_uuid(context, instance_uuid)

        conflicts_expected = {}
        conflicts_actual = {}
        for (field, expected_values) in expected.items():
            actual = original[field]
            if actual not in expected_values:
                conflicts_expected[field] = expected_values
                conflicts_actual[field] = actual

        # Exception properties
        exc_props = {
            'instance_uuid': instance_uuid,
            'expected': conflicts_expected,
            'actual': conflicts_actual
        }

        # There was a conflict, but something (probably the MySQL read view,
        # but possibly an exceptionally unlikely second race) is preventing us
        # from seeing what it is. When we go round again we'll get a fresh
        # transaction and a fresh read view.
        if len(conflicts_actual) == 0:
            raise exception.UnknownInstanceUpdateConflict(**exc_props)

        # Task state gets special handling for convenience. We raise the
        # specific error UnexpectedDeletingTaskStateError or
        # UnexpectedTaskStateError as appropriate
        if 'task_state' in conflicts_actual:
            conflict_task_state = conflicts_actual['task_state']
            if conflict_task_state == task_states.DELETING:
                exc = exception.UnexpectedDeletingTaskStateError
            else:
                exc = exception.UnexpectedTaskStateError

        # Everything else is an InstanceUpdateConflict
        else:
            exc = exception.InstanceUpdateConflict

        raise exc(**exc_props)

    if metadata is not None:
        _instance_metadata_update_in_place(context, instance_ref,
                                           'metadata',
                                           models.InstanceMetadata,
                                           metadata)

    if system_metadata is not None:
        _instance_metadata_update_in_place(context, instance_ref,
                                           'system_metadata',
                                           models.InstanceSystemMetadata,
                                           system_metadata)

    return instance_ref

2454 

2455 

@pick_context_manager_writer
def instance_add_security_group(context, instance_uuid, security_group_id):
    """Associate the given security group with the given instance."""
    association = models.SecurityGroupInstanceAssociation()
    association.update({
        'instance_uuid': instance_uuid,
        'security_group_id': security_group_id,
    })
    association.save(context.session)

2463 

2464 

@require_context
@pick_context_manager_writer
def instance_remove_security_group(context, instance_uuid, security_group_id):
    """Disassociate the given security group from the given instance."""
    query = model_query(context, models.SecurityGroupInstanceAssociation)
    query = query.filter_by(instance_uuid=instance_uuid,
                            security_group_id=security_group_id)
    query.soft_delete()

2473 

2474 

2475################### 

2476 

2477 

@require_context
@pick_context_manager_reader
def instance_info_cache_get(context, instance_uuid):
    """Gets an instance info cache from the table.

    :param instance_uuid: = uuid of the info cache's instance
    """
    query = model_query(context, models.InstanceInfoCache)
    return query.filter_by(instance_uuid=instance_uuid).first()

2488 

2489 

@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def instance_info_cache_update(context, instance_uuid, values):
    """Update an instance info cache record in the table.

    :param instance_uuid: = uuid of info cache's instance
    :param values: = dict containing column values to update
    :returns: the updated (or newly created) InstanceInfoCache record
    :raises: InstanceInfoCacheNotFound if the existing record is deleted
    """
    convert_objects_related_datetimes(values)

    info_cache = model_query(context, models.InstanceInfoCache).\
        filter_by(instance_uuid=instance_uuid).\
        first()
    needs_create = False
    if info_cache and info_cache['deleted']:
        raise exception.InstanceInfoCacheNotFound(
            instance_uuid=instance_uuid)
    elif not info_cache:
        # NOTE(tr3buchet): just in case someone blows away an instance's
        # cache entry, re-create it.
        values['instance_uuid'] = instance_uuid
        info_cache = models.InstanceInfoCache(**values)
        needs_create = True

    try:
        # Use a savepoint so a duplicate-entry failure below does not
        # abort the caller's outer transaction.
        with get_context_manager(context).writer.savepoint.using(context):
            if needs_create:
                info_cache.save(context.session)
            else:
                info_cache.update(values)
    except db_exc.DBDuplicateEntry:
        # NOTE(sirp): Possible race if two greenthreads attempt to
        # recreate the instance cache entry at the same time. First one
        # wins.
        pass

    return info_cache

2528 

2529 

@require_context
@pick_context_manager_writer
def instance_info_cache_delete(context, instance_uuid):
    """Soft-delete the existing instance_info_cache record.

    :param instance_uuid: = uuid of the instance tied to the cache record
    """
    query = model_query(context, models.InstanceInfoCache)
    query.filter_by(instance_uuid=instance_uuid).soft_delete()

2540 

2541 

2542################### 

2543 

2544 

def _instance_extra_create(context, values):
    # Build a fresh InstanceExtra row from the caller's values and
    # persist it in the current session.
    extra_ref = models.InstanceExtra()
    extra_ref.update(values)
    extra_ref.save(context.session)
    return extra_ref

2550 

2551 

@pick_context_manager_writer
def instance_extra_update_by_uuid(context, instance_uuid, updates):
    """Update the instance extra record by instance uuid

    :param instance_uuid: UUID of the instance tied to the record
    :param updates: A dict of updates to apply
    :returns: the number of rows updated (1 when a row had to be created)
    """
    rows_updated = (
        model_query(context, models.InstanceExtra)
        .filter_by(instance_uuid=instance_uuid)
        .update(updates)
    )
    if rows_updated:
        return rows_updated

    # No extra record exists yet for this instance; create one that
    # carries the requested updates.
    LOG.debug("Created instance_extra for %s", instance_uuid)
    create_values = dict(updates, instance_uuid=instance_uuid)
    _instance_extra_create(context, create_values)
    return 1

2569 

2570 

@pick_context_manager_reader
def instance_extra_get_by_instance_uuid(
    context, instance_uuid, columns=None,
):
    """Get the instance extra record

    :param instance_uuid: UUID of the instance tied to the topology record
    :param columns: A list of the columns to load, or None for 'all of them'
    """
    if columns is None:
        # Default set of deferred columns callers commonly need.
        columns = ['numa_topology', 'pci_requests', 'flavor', 'vcpu_model',
                   'trusted_certs', 'resources', 'migration_context']

    query = model_query(context, models.InstanceExtra).filter_by(
        instance_uuid=instance_uuid,
    )
    # Force-load the requested deferred columns in the same query.
    for name in columns:
        query = query.options(
            orm.undefer(getattr(models.InstanceExtra, name)))

    return query.first()

2591 

2592 

2593################### 

2594 

2595 

@require_context
@pick_context_manager_reader
def quota_get(context, project_id, resource, user_id=None):
    """Retrieve a quota or raise if it does not exist."""
    # Per-user quotas are stored in a separate table from per-project ones.
    model = models.ProjectUserQuota if user_id else models.Quota
    query = (
        model_query(context, model)
        .filter_by(project_id=project_id)
        .filter_by(resource=resource)
    )
    if user_id:
        query = query.filter_by(user_id=user_id)

    result = query.first()
    if result:
        return result

    # Report the missing quota at the scope that was asked for.
    if user_id:
        raise exception.ProjectUserQuotaNotFound(project_id=project_id,
                                                 user_id=user_id)
    raise exception.ProjectQuotaNotFound(project_id=project_id)

2616 

2617 

@require_context
@pick_context_manager_reader
def quota_get_all_by_project_and_user(context, project_id, user_id):
    """Retrieve all quotas associated with a given project and user."""
    rows = (
        model_query(context, models.ProjectUserQuota,
                    (models.ProjectUserQuota.resource,
                     models.ProjectUserQuota.hard_limit))
        .filter_by(project_id=project_id)
        .filter_by(user_id=user_id)
        .all()
    )

    # Map each resource name to its hard limit, keyed by the identifiers.
    result = {'project_id': project_id, 'user_id': user_id}
    result.update((row.resource, row.hard_limit) for row in rows)
    return result

2634 

2635 

@require_context
@pick_context_manager_reader
def quota_get_all_by_project(context, project_id):
    """Retrieve all quotas associated with a given project."""
    rows = (
        model_query(context, models.Quota, read_deleted="no")
        .filter_by(project_id=project_id)
        .all()
    )

    # Resource name -> hard limit, plus the project identifier.
    result = {'project_id': project_id}
    result.update((row.resource, row.hard_limit) for row in rows)
    return result

2649 

2650 

@require_context
@pick_context_manager_reader
def quota_get_all(context, project_id):
    """Retrieve all user quotas associated with a given project."""
    return (
        model_query(context, models.ProjectUserQuota)
        .filter_by(project_id=project_id)
        .all()
    )

2660 

2661 

def quota_get_per_project_resources():
    """Return the names of resources whose quotas are tracked per-project
    rather than per-user.
    """
    return PER_PROJECT_QUOTAS

2667 

2668 

@pick_context_manager_writer
def quota_create(context, project_id, resource, limit, user_id=None):
    """Create a quota for the given project and resource.

    :raises exception.QuotaExists: if an identical quota already exists
    """
    # A user-scoped row only makes sense for resources that are not
    # tracked purely per-project.
    per_user = user_id and resource not in PER_PROJECT_QUOTAS

    values = {'project_id': project_id,
              'resource': resource,
              'hard_limit': limit}
    if per_user:
        quota_ref = models.ProjectUserQuota()
        values['user_id'] = user_id
    else:
        quota_ref = models.Quota()
    quota_ref.update(values)

    try:
        quota_ref.save(context.session)
    except db_exc.DBDuplicateEntry:
        raise exception.QuotaExists(project_id=project_id, resource=resource)
    return quota_ref

2684 

2685 

@pick_context_manager_writer
def quota_update(context, project_id, resource, limit, user_id=None):
    """Update a quota or raise if it does not exist."""
    per_user = user_id and resource not in PER_PROJECT_QUOTAS
    model = models.ProjectUserQuota if per_user else models.Quota

    query = (
        model_query(context, model)
        .filter_by(project_id=project_id)
        .filter_by(resource=resource)
    )
    if per_user:
        query = query.filter_by(user_id=user_id)

    if query.update({'hard_limit': limit}):
        return

    # Nothing matched: report the missing quota at the right scope.
    if per_user:
        raise exception.ProjectUserQuotaNotFound(project_id=project_id,
                                                 user_id=user_id)
    raise exception.ProjectQuotaNotFound(project_id=project_id)

2704 

2705 

2706################### 

2707 

2708 

@require_context
@pick_context_manager_reader
def quota_class_get(context, class_name, resource):
    """Retrieve a quota class or raise if it does not exist."""
    result = (
        model_query(context, models.QuotaClass, read_deleted="no")
        .filter_by(class_name=class_name)
        .filter_by(resource=resource)
        .first()
    )
    if result is None:
        raise exception.QuotaClassNotFound(class_name=class_name)
    return result

2722 

2723 

@pick_context_manager_reader
def quota_class_get_default(context):
    """Retrieve all default quotas."""
    rows = (
        model_query(context, models.QuotaClass, read_deleted="no")
        .filter_by(class_name=_DEFAULT_QUOTA_NAME)
        .all()
    )

    # Resource name -> hard limit for the default quota class.
    result = {'class_name': _DEFAULT_QUOTA_NAME}
    result.update((row.resource, row.hard_limit) for row in rows)
    return result

2736 

2737 

@require_context
@pick_context_manager_reader
def quota_class_get_all_by_name(context, class_name):
    """Retrieve all quotas associated with a given quota class."""
    rows = (
        model_query(context, models.QuotaClass, read_deleted="no")
        .filter_by(class_name=class_name)
        .all()
    )

    # Resource name -> hard limit for the requested class.
    result = {'class_name': class_name}
    result.update((row.resource, row.hard_limit) for row in rows)
    return result

2751 

2752 

@pick_context_manager_writer
def quota_class_create(context, class_name, resource, limit):
    """Create a quota class for the given name and resource."""
    quota_class_ref = models.QuotaClass()
    quota_class_ref.update({'class_name': class_name,
                            'resource': resource,
                            'hard_limit': limit})
    quota_class_ref.save(context.session)
    return quota_class_ref

2762 

2763 

@pick_context_manager_writer
def quota_class_update(context, class_name, resource, limit):
    """Update a quota class or raise if it does not exist."""
    rows_updated = (
        model_query(context, models.QuotaClass, read_deleted="no")
        .filter_by(class_name=class_name)
        .filter_by(resource=resource)
        .update({'hard_limit': limit})
    )
    if not rows_updated:
        raise exception.QuotaClassNotFound(class_name=class_name)

2774 

2775 

2776################### 

2777 

2778 

@pick_context_manager_writer
def quota_destroy_all_by_project_and_user(context, project_id, user_id):
    """Soft-delete all quotas associated with a given project and user."""
    query = model_query(context, models.ProjectUserQuota, read_deleted="no")
    query = query.filter_by(project_id=project_id).filter_by(user_id=user_id)
    query.soft_delete(synchronize_session=False)

2786 

2787 

@pick_context_manager_writer
def quota_destroy_all_by_project(context, project_id):
    """Soft-delete all quotas associated with a given project."""
    # Both the per-project and the per-user tables must be cleared.
    for model in (models.Quota, models.ProjectUserQuota):
        model_query(context, model, read_deleted="no").filter_by(
            project_id=project_id,
        ).soft_delete(synchronize_session=False)

2798 

2799 

2800################### 

2801 

2802 

def _block_device_mapping_get_query(context, columns_to_join=None):
    # Base BDM query, optionally eager-loading the named relationships.
    query = model_query(context, models.BlockDeviceMapping)
    for name in (columns_to_join or []):
        query = query.options(
            orm.joinedload(getattr(models.BlockDeviceMapping, name)))
    return query

2814 

2815 

def _scrub_empty_str_values(dct, keys_to_scrub):
    """Drop every key in sequence ``keys_to_scrub`` from ``dct`` whose
    value is the empty string. Missing keys are ignored.
    """
    for key in keys_to_scrub:
        # dict.get keeps this a single lookup; absent keys return None,
        # which never compares equal to ''.
        if dct.get(key, None) == '':
            del dct[key]

2823 

2824 

def _from_legacy_values(values, legacy, allow_updates=False):
    # Non-legacy input is already in the new BDM format - pass through.
    if not legacy:
        return values
    # Legacy input: safe partial updates may pass through unchanged,
    # everything else is converted to the new format.
    if allow_updates and block_device.is_safe_for_update(values):
        return values
    return block_device.BlockDeviceDict.from_legacy(values)

2833 

2834 

def _set_or_validate_uuid(values):
    """Ensure ``values['uuid']`` holds a valid UUID.

    Generates a fresh UUID when the key is missing or blank; otherwise
    validates the supplied value.

    :raises exception.InvalidUUID: if a non-UUID-like value was supplied
    """
    uuid = values.get('uuid')
    if uuid:
        if not uuidutils.is_uuid_like(uuid):
            raise exception.InvalidUUID(uuid=uuid)
    else:
        values['uuid'] = uuidutils.generate_uuid()

2846 

2847 

@require_context
@pick_context_manager_writer
def block_device_mapping_create(context, values, legacy=True):
    """Create an entry of block device mapping."""
    # Normalize the incoming values before building the row.
    _scrub_empty_str_values(values, ['volume_size'])
    values = _from_legacy_values(values, legacy)
    convert_objects_related_datetimes(values)
    _set_or_validate_uuid(values)

    bdm_ref = models.BlockDeviceMapping()
    bdm_ref.update(values)
    bdm_ref.save(context.session)
    return bdm_ref

2862 

2863 

@require_context
@pick_context_manager_writer
def block_device_mapping_update(context, bdm_id, values, legacy=True):
    """Update an entry of block device mapping."""
    _scrub_empty_str_values(values, ['volume_size'])
    values = _from_legacy_values(values, legacy, allow_updates=True)
    convert_objects_related_datetimes(values)

    bdm_query = _block_device_mapping_get_query(context).filter_by(id=bdm_id)
    bdm_query.update(values)
    # Re-fetch so the caller sees the updated row.
    return bdm_query.first()

2875 

2876 

@pick_context_manager_writer
def block_device_mapping_update_or_create(context, values, legacy=True):
    """Update an entry of block device mapping.

    If not existed, create a new entry
    """
    # TODO(mdbooth): Remove this method entirely. Callers should know whether
    # they require update or create, and call the appropriate method.

    _scrub_empty_str_values(values, ['volume_size'])
    values = _from_legacy_values(values, legacy, allow_updates=True)
    convert_objects_related_datetimes(values)

    # NOTE(xqueralt,danms): Only update a BDM when device_name or
    # uuid was provided. Prefer the uuid, if available, but fall
    # back to device_name if no uuid is provided, which can happen
    # for BDMs created before we had a uuid. We allow empty device
    # names so they will be set later by the manager.
    bdm = None
    if 'uuid' in values:
        bdm = _block_device_mapping_get_query(context).filter_by(
            instance_uuid=values['instance_uuid'],
            uuid=values['uuid'],
        ).one_or_none()

    if bdm is None and values['device_name']:
        bdm = _block_device_mapping_get_query(context).filter_by(
            instance_uuid=values['instance_uuid'],
            device_name=values['device_name'],
        ).first()

    if bdm is not None:
        bdm.update(values)
    else:
        # Either the device_name or uuid doesn't exist in the database yet,
        # or neither was provided. Both cases mean creating a new BDM.
        _set_or_validate_uuid(values)
        bdm = models.BlockDeviceMapping(**values)
        bdm.save(context.session)

    # NOTE(xqueralt): Prevent from having multiple swap devices for the
    # same instance. This will delete all the existing ones.
    if block_device.new_format_is_swap(values):
        stale_swap = _block_device_mapping_get_query(context).filter_by(
            instance_uuid=values['instance_uuid'],
            source_type='blank',
            guest_format='swap',
        ).filter(models.BlockDeviceMapping.id != bdm.id)
        stale_swap.soft_delete()

    return bdm

2925 

2926 

@require_context
@pick_context_manager_reader_allow_async
def block_device_mapping_get_all_by_instance_uuids(context, instance_uuids):
    """Get all block device mapping belonging to a list of instances."""
    # An empty uuid list can never match anything - skip the query.
    if not instance_uuids:
        return []
    query = _block_device_mapping_get_query(context)
    return query.filter(
        models.BlockDeviceMapping.instance_uuid.in_(instance_uuids)).all()

2935 

2936 

@require_context
@pick_context_manager_reader_allow_async
def block_device_mapping_get_all_by_instance(context, instance_uuid):
    """Get all block device mapping belonging to an instance."""
    query = _block_device_mapping_get_query(context)
    return query.filter_by(instance_uuid=instance_uuid).all()

2944 

2945 

@require_context
@pick_context_manager_reader
def block_device_mapping_get_all_by_volume_id(
    context, volume_id, columns_to_join=None,
):
    """Get block device mapping for a given volume."""
    query = _block_device_mapping_get_query(
        context, columns_to_join=columns_to_join)
    return query.filter_by(volume_id=volume_id).all()

2956 

2957 

@require_context
@pick_context_manager_reader
def block_device_mapping_get_by_instance_and_volume_id(
    context, volume_id, instance_uuid, columns_to_join=None,
):
    """Get block device mapping for a given volume ID and instance UUID."""
    query = _block_device_mapping_get_query(
        context, columns_to_join=columns_to_join)
    return (
        query
        .filter_by(volume_id=volume_id)
        .filter_by(instance_uuid=instance_uuid)
        .first()
    )

2969 

2970 

@require_context
@pick_context_manager_writer
def block_device_mapping_destroy(context, bdm_id):
    """Soft-delete the block device mapping with the given id."""
    query = _block_device_mapping_get_query(context)
    query.filter_by(id=bdm_id).soft_delete()

2978 

2979 

@require_context
@pick_context_manager_writer
def block_device_mapping_destroy_by_instance_and_volume(
    context, instance_uuid, volume_id,
):
    """Soft-delete the block device mapping matching instance and volume."""
    query = _block_device_mapping_get_query(context)
    query.filter_by(
        instance_uuid=instance_uuid,
    ).filter_by(
        volume_id=volume_id,
    ).soft_delete()

2990 

2991 

@require_context
@pick_context_manager_writer
def block_device_mapping_destroy_by_instance_and_device(
    context, instance_uuid, device_name,
):
    """Soft-delete the block device mapping matching instance and device."""
    query = _block_device_mapping_get_query(context)
    query.filter_by(
        instance_uuid=instance_uuid,
    ).filter_by(
        device_name=device_name,
    ).soft_delete()

3002 

3003 

3004################### 

3005 

3006 

@require_context
@pick_context_manager_writer
def security_group_create(context, values):
    """Create a new security group.

    :raises exception.SecurityGroupExists: if a group with the same name
        already exists for the project
    """
    security_group_ref = models.SecurityGroup()
    # FIXME(devcamcar): Unless I do this, rules fails with lazy load exception
    # once save() is called. This will get cleaned up in next orm pass.
    security_group_ref.rules = []
    security_group_ref.update(values)
    try:
        # A savepoint keeps the surrounding transaction usable when the
        # insert hits the unique-name constraint.
        with get_context_manager(context).writer.savepoint.using(context):
            security_group_ref.save(context.session)
    except db_exc.DBDuplicateEntry:
        raise exception.SecurityGroupExists(
            project_id=values['project_id'],
            security_group_name=values['name'])
    return security_group_ref

3024 

3025 

def _security_group_get_query(context, read_deleted=None,
                              project_only=False, join_rules=True):
    # Base security group query; optionally eager-load the rules and,
    # for each rule, the group it grants access to.
    query = model_query(
        context,
        models.SecurityGroup,
        read_deleted=read_deleted,
        project_only=project_only,
    )
    if join_rules:
        rules_loader = orm.joinedload(models.SecurityGroup.rules)
        query = query.options(
            rules_loader.joinedload(
                models.SecurityGroupIngressRule.grantee_group))
    return query

3041 

3042 

def _security_group_get_by_names(context, group_names):
    """Get security group models for a project by a list of names.
    Raise SecurityGroupNotFoundForProject for a name not found.
    """
    sg_models = (
        _security_group_get_query(context, read_deleted="no",
                                  join_rules=False)
        .filter_by(project_id=context.project_id)
        .filter(models.SecurityGroup.name.in_(group_names))
        .all()
    )
    if len(sg_models) == len(group_names):
        return sg_models

    # At least one name is missing; report the first one in input order.
    found_names = {model.name for model in sg_models}
    for name in group_names:
        if name not in found_names:
            raise exception.SecurityGroupNotFoundForProject(
                project_id=context.project_id, security_group_id=name)
    # Not Reached

3061 

3062 

@require_context
@pick_context_manager_reader
def security_group_get_all(context):
    """Return every security group, with rules eager-loaded."""
    return _security_group_get_query(context).all()

3068 

3069 

@require_context
@pick_context_manager_reader
def security_group_get(context, security_group_id, columns_to_join=None):
    """Get security group by its ID.

    :raises exception.SecurityGroupNotFound: if no matching group exists
    """
    # 'rules' gets special eager-load handling inside the base query;
    # strip it from the generic join list.
    join_rules = columns_to_join and 'rules' in columns_to_join
    if join_rules:
        columns_to_join.remove('rules')

    query = _security_group_get_query(
        context, project_only=True, join_rules=join_rules,
    ).filter_by(id=security_group_id)

    for column in (columns_to_join or []):
        query = query.options(_joinedload_all(models.SecurityGroup, column))

    result = query.first()
    if not result:
        raise exception.SecurityGroupNotFound(
            security_group_id=security_group_id)

    return result

3092 

3093 

@require_context
@pick_context_manager_reader
def security_group_get_by_name(context, project_id, group_name):
    """Returns a security group with the specified name from a project.

    :raises exception.SecurityGroupNotFoundForProject: if no group with
        that name exists in the project
    """
    # Eager-load the member instances plus the rules and their grantee
    # groups in a single round trip.
    instances_opt = orm.joinedload(models.SecurityGroup.instances)
    rules_opt = orm.joinedload(
        models.SecurityGroup.rules
    ).joinedload(models.SecurityGroupIngressRule.grantee_group)

    query = (
        _security_group_get_query(context, read_deleted="no",
                                  join_rules=False)
        .filter_by(project_id=project_id)
        .filter_by(name=group_name)
        .options(instances_opt)
        .options(rules_opt)
    )

    result = query.first()
    if not result:
        raise exception.SecurityGroupNotFoundForProject(
            project_id=project_id, security_group_id=group_name,
        )

    return result

3119 

3120 

@require_context
@pick_context_manager_reader
def security_group_get_by_project(context, project_id):
    """Get all security groups belonging to a project."""
    query = _security_group_get_query(context, read_deleted="no")
    return query.filter_by(project_id=project_id).all()

3128 

3129 

@require_context
@pick_context_manager_reader
def security_group_get_by_instance(context, instance_uuid):
    """Get security groups to which the instance is assigned."""
    return (
        _security_group_get_query(context, read_deleted="no")
        .join(models.SecurityGroup.instances)
        .filter_by(uuid=instance_uuid)
        .all()
    )

3138 

3139 

@require_context
@pick_context_manager_reader
def security_group_in_use(context, group_id):
    """Indicates if a security group is currently in use."""
    # The group is in use when any of its instance associations still
    # points at a not-deleted instance.
    associations = (
        model_query(context, models.SecurityGroupInstanceAssociation,
                    read_deleted="no")
        .filter_by(security_group_id=group_id)
        .all()
    )
    return any(
        model_query(context, models.Instance, read_deleted="no")
        .filter_by(uuid=assoc.instance_uuid)
        .count()
        for assoc in associations
    )

3160 

3161 

@require_context
@pick_context_manager_writer
def security_group_update(context, security_group_id, values):
    """Update a security group.

    :raises exception.SecurityGroupNotFound: if the group does not exist
    :raises exception.SecurityGroupExists: if the update collides with an
        existing group name in the project
    """
    security_group_ref = model_query(context, models.SecurityGroup).filter_by(
        id=security_group_id,
    ).first()

    if not security_group_ref:
        raise exception.SecurityGroupNotFound(
            security_group_id=security_group_id)

    security_group_ref.update(values)
    # Capture identifiers before save() so a duplicate-entry error can be
    # reported with the values actually being written.
    name = security_group_ref['name']
    project_id = security_group_ref['project_id']
    try:
        security_group_ref.save(context.session)
    except db_exc.DBDuplicateEntry:
        raise exception.SecurityGroupExists(
            project_id=project_id,
            security_group_name=name)
    return security_group_ref

3184 

3185 

def security_group_ensure_default(context):
    """Ensure default security group exists for a project_id.

    Returns a tuple with the first element being a bool indicating
    if the default security group previously existed. Second
    element is the dict used to create the default security group.
    """
    # NOTE(rpodolyaka): create the default security group, if it doesn't
    # exist. This must be done in a separate transaction, so that
    # this one is not aborted in case a concurrent one succeeds first
    # and the unique constraint for security group names is violated
    # by a concurrent INSERT
    try:
        with get_context_manager(context).writer.independent.using(context):
            return _security_group_ensure_default(context)
    except exception.SecurityGroupExists:
        # NOTE(rpodolyaka): a concurrent transaction has succeeded first,
        # suppress the error and proceed
        return security_group_get_by_name(context, context.project_id,
                                          'default')

3207 

3208 

@pick_context_manager_writer
def _security_group_ensure_default(context):
    # Look up the project's 'default' group; create it on first use.
    try:
        return _security_group_get_by_names(context, ['default'])[0]
    except exception.NotFound:
        values = {'name': 'default',
                  'description': 'default',
                  'user_id': context.user_id,
                  'project_id': context.project_id}
        return security_group_create(context, values)

3220 

3221 

@require_context
@pick_context_manager_writer
def security_group_destroy(context, security_group_id):
    """Soft-delete a security group and everything referencing it."""
    # The group itself...
    model_query(context, models.SecurityGroup).filter_by(
        id=security_group_id,
    ).soft_delete()
    # ...its instance associations...
    model_query(context, models.SecurityGroupInstanceAssociation).filter_by(
        security_group_id=security_group_id,
    ).soft_delete()
    # ...and any ingress rules that grant from or belong to it.
    model_query(context, models.SecurityGroupIngressRule).filter_by(
        group_id=security_group_id,
    ).soft_delete()
    model_query(context, models.SecurityGroupIngressRule).filter_by(
        parent_group_id=security_group_id,
    ).soft_delete()

3238 

3239 

3240################### 

3241 

3242 

@pick_context_manager_writer
def migration_create(context, values):
    """Create a migration record from the given values."""
    migration_ref = models.Migration()
    migration_ref.update(values)
    migration_ref.save(context.session)
    return migration_ref

3250 

3251 

@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def migration_update(context, migration_id, values):
    """Update a migration record; the writer context flushes the change.

    :raises exception.MigrationNotFound: if no migration has that id
    """
    migration = migration_get(context, migration_id)
    migration.update(values)
    return migration

3260 

3261 

@pick_context_manager_reader
def migration_get(context, migration_id):
    """Finds a migration by the ID, including soft-deleted rows.

    :raises exception.MigrationNotFound: if no migration has that id
    """
    migration = (
        model_query(context, models.Migration, read_deleted="yes")
        .filter_by(id=migration_id)
        .first()
    )
    if migration is None:
        raise exception.MigrationNotFound(migration_id=migration_id)
    return migration

3273 

3274 

@pick_context_manager_reader
def migration_get_by_uuid(context, migration_uuid):
    """Finds a migration by the migration UUID, including soft-deleted rows.

    :raises exception.MigrationNotFound: if no migration has that uuid
    """
    migration = (
        model_query(context, models.Migration, read_deleted="yes")
        .filter_by(uuid=migration_uuid)
        .first()
    )
    if migration is None:
        raise exception.MigrationNotFound(migration_id=migration_uuid)
    return migration

3286 

3287 

@pick_context_manager_reader
def migration_get_by_id_and_instance(context, migration_id, instance_uuid):
    """Finds a migration by the migration ID and the instance UUID.

    :raises exception.MigrationNotFoundForInstance: if no match exists
    """
    migration = (
        model_query(context, models.Migration)
        .filter_by(id=migration_id)
        .filter_by(instance_uuid=instance_uuid)
        .first()
    )
    if migration is None:
        raise exception.MigrationNotFoundForInstance(
            migration_id=migration_id, instance_id=instance_uuid)
    return migration

3301 

3302 

@pick_context_manager_reader
def migration_get_by_instance_and_status(context, instance_uuid, status):
    """Finds a migration by the instance UUID it's migrating and status.

    :raises exception.MigrationNotFoundByStatus: if no match exists
    """
    migration = (
        model_query(context, models.Migration, read_deleted="yes")
        .filter_by(instance_uuid=instance_uuid)
        .filter_by(status=status)
        .first()
    )
    if migration is None:
        raise exception.MigrationNotFoundByStatus(instance_id=instance_uuid,
                                                  status=status)
    return migration

3316 

3317 

@pick_context_manager_reader_allow_async
def migration_get_unconfirmed_by_dest_compute(
    context, confirm_window, dest_compute,
):
    """Finds all unconfirmed migrations within the confirmation window for
    a specific destination compute host.
    """
    # Anything last updated before this cutoff is past its window.
    cutoff = timeutils.utcnow() - datetime.timedelta(seconds=confirm_window)

    return (
        model_query(context, models.Migration, read_deleted="yes")
        .filter(models.Migration.updated_at <= cutoff)
        .filter_by(status="finished")
        .filter_by(dest_compute=dest_compute)
        .all()
    )

3333 

3334 

@pick_context_manager_reader
def migration_get_in_progress_by_host_and_node(context, host, node):
    """Finds all migrations for the given host + node that are not yet
    confirmed or reverted.
    """
    # TODO(mriedem): Tracking what various code flows set for
    # migration status is nutty, since it happens all over the place
    # and several of the statuses are redundant (done and completed).
    # We need to define these in an enum somewhere and just update
    # that one central place that defines what "in progress" means.
    # NOTE(mriedem): The 'finished' status is not in this list because
    # 'finished' means a resize is finished on the destination host
    # and the instance is in VERIFY_RESIZE state, so the end state
    # for a resize is actually 'confirmed' or 'reverted'.
    terminal_statuses = [
        'confirmed',
        'reverted',
        'error',
        'failed',
        'completed',
        'cancelled',
        'done',
    ]
    on_source = sql.and_(
        models.Migration.source_compute == host,
        models.Migration.source_node == node,
    )
    on_dest = sql.and_(
        models.Migration.dest_compute == host,
        models.Migration.dest_node == node,
    )
    return (
        model_query(context, models.Migration)
        .filter(sql.or_(on_source, on_dest))
        .filter(~models.Migration.status.in_(terminal_statuses))
        .options(
            orm.joinedload(
                models.Migration.instance
            ).joinedload(models.Instance.system_metadata)
        )
        .all()
    )

3379 

3380 

@pick_context_manager_reader
def migration_get_in_progress_by_instance(context, instance_uuid,
                                          migration_type=None):
    """Finds all migrations of an instance in progress."""
    # TODO(Shaohe Feng) we should share the in-progress list.
    # TODO(Shaohe Feng) will also summarize all status to a new
    # MigrationStatus class.
    in_progress_statuses = ['queued', 'preparing', 'running',
                            'post-migrating']
    query = (
        model_query(context, models.Migration)
        .filter_by(instance_uuid=instance_uuid)
        .filter(models.Migration.status.in_(in_progress_statuses))
    )
    if migration_type:
        query = query.filter(
            models.Migration.migration_type == migration_type)

    return query.all()

3397 

3398 

@pick_context_manager_reader
def migration_get_all_by_filters(context, filters,
                                 sort_keys=None, sort_dirs=None,
                                 limit=None, marker=None):
    """Finds all migrations using the provided filters.

    :param filters: dict of optional filters; recognized keys are 'uuid',
        'status', 'host', 'source_compute', 'node', 'migration_type',
        'hidden', 'instance_uuid', 'user_id', 'project_id', plus the
        changes-since/changes-before keys handled by
        _get_query_nova_resource_by_changes_time.
    :param sort_keys: list of column names to sort on
    :param sort_dirs: list of directions matching sort_keys
    :param limit: maximum number of rows to return (0 short-circuits to [])
    :param marker: migration uuid marking the end of the previous page
    :raises: MarkerNotFound if the marker uuid matches no migration
    """
    if limit == 0:
        return []

    query = model_query(context, models.Migration)
    if "uuid" in filters:
        # The uuid filter is here for the MigrationLister and multi-cell
        # paging support in the compute API.
        uuid = filters["uuid"]
        # Accept a single uuid string or a list of uuids.
        uuid = [uuid] if isinstance(uuid, str) else uuid
        query = query.filter(models.Migration.uuid.in_(uuid))

    model_object = models.Migration
    query = _get_query_nova_resource_by_changes_time(query,
                                                     filters,
                                                     model_object)

    if "status" in filters:
        status = filters["status"]
        status = [status] if isinstance(status, str) else status
        query = query.filter(models.Migration.status.in_(status))
    if "host" in filters:
        # 'host' matches either side of the migration; it takes precedence
        # over 'source_compute' below.
        host = filters["host"]
        query = query.filter(sql.or_(
            models.Migration.source_compute == host,
            models.Migration.dest_compute == host))
    elif "source_compute" in filters:
        host = filters['source_compute']
        query = query.filter(models.Migration.source_compute == host)
    if "node" in filters:
        node = filters['node']
        query = query.filter(sql.or_(
            models.Migration.source_node == node,
            models.Migration.dest_node == node))
    if "migration_type" in filters:
        migtype = filters["migration_type"]
        query = query.filter(models.Migration.migration_type == migtype)
    if "hidden" in filters:
        hidden = filters["hidden"]
        query = query.filter(models.Migration.hidden == hidden)
    if "instance_uuid" in filters:
        instance_uuid = filters["instance_uuid"]
        query = query.filter(models.Migration.instance_uuid == instance_uuid)
    if 'user_id' in filters:
        user_id = filters['user_id']
        query = query.filter(models.Migration.user_id == user_id)
    if 'project_id' in filters:
        project_id = filters['project_id']
        query = query.filter(models.Migration.project_id == project_id)

    if marker:
        # Resolve the marker uuid to a full row for paginate_query.
        try:
            marker = migration_get_by_uuid(context, marker)
        except exception.MigrationNotFound:
            raise exception.MarkerNotFound(marker=marker)
    if limit or marker or sort_keys or sort_dirs:
        # Default sort by desc(['created_at', 'id'])
        sort_keys, sort_dirs = db_utils.process_sort_params(
            sort_keys, sort_dirs, default_dir='desc')
        return sqlalchemyutils.paginate_query(query,
                                              models.Migration,
                                              limit=limit,
                                              sort_keys=sort_keys,
                                              marker=marker,
                                              sort_dirs=sort_dirs).all()
    else:
        return query.all()

3470 

3471 

@require_context
@pick_context_manager_reader_allow_async
def migration_get_by_sort_filters(context, sort_keys, sort_dirs, values):
    """Get the uuid of the first migration in a sort order.

    Return the first migration (uuid) of the set where each column value
    is greater than or equal to the matching one in @values, for each key
    in @sort_keys. This is used to try to find a marker migration when we
    don't have a marker uuid.

    :returns: A UUID of the migration that matched.
    """
    return _model_get_uuid_by_sort_filters(
        context, models.Migration, sort_keys, sort_dirs, values)

3487 

3488 

@pick_context_manager_writer
def migration_migrate_to_uuid(context, max_count):
    """Online data migration: backfill the 'uuid' column for up to
    ``max_count`` migration records that do not have one yet.

    :param max_count: maximum number of records to process in this pass
    :returns: tuple of (found, done) counts; always equal here (see note
        below).
    """
    # Avoid circular import
    from nova import objects

    db_migrations = model_query(context, models.Migration).filter_by(
        uuid=None).limit(max_count).all()

    done = 0
    for db_migration in db_migrations:
        # Hydrating the object via _from_db_object() presumably generates
        # and persists the missing uuid as a side effect -- TODO confirm
        # against objects.Migration._from_db_object.
        mig = objects.Migration(context)
        mig._from_db_object(context, mig, db_migration)
        done += 1

    # We don't have any situation where we can (detectably) not
    # migrate a thing, so report anything that matched as "completed".
    return done, done

3506 

3507 

@pick_context_manager_reader
def migration_get_in_progress_and_error_by_host_and_node(context, host, node):
    """Finds all in progress migrations and error migrations for the given
    host and node.
    """
    # Terminal statuses; 'error' is deliberately absent from this list so
    # that errored migrations remain in the result set.
    finished_statuses = [
        'confirmed',
        'reverted',
        'failed',
        'completed',
        'cancelled',
        'done',
    ]
    on_source = sql.and_(
        models.Migration.source_compute == host,
        models.Migration.source_node == node,
    )
    on_dest = sql.and_(
        models.Migration.dest_compute == host,
        models.Migration.dest_node == node,
    )
    query = model_query(context, models.Migration)
    query = query.filter(sql.or_(on_source, on_dest))
    query = query.filter(~models.Migration.status.in_(finished_statuses))
    # Eagerly load each migration's instance and its system metadata to
    # avoid per-row lazy loads in callers.
    query = query.options(
        orm.joinedload(models.Migration.instance)
        .joinedload(models.Instance.system_metadata))
    return query.all()

3539 

3540 

3541######################## 

3542# User-provided metadata 

3543 

def _instance_metadata_get_multi(context, instance_uuids):
    """Build a query for the user metadata rows of several instances.

    Returns an empty list (not a query) when no UUIDs are supplied.
    """
    if not instance_uuids:
        return []
    query = model_query(context, models.InstanceMetadata)
    return query.filter(
        models.InstanceMetadata.instance_uuid.in_(instance_uuids))

3549 

3550 

def _instance_metadata_get_query(context, instance_uuid):
    """Build a query for the non-deleted metadata rows of one instance."""
    query = model_query(context, models.InstanceMetadata, read_deleted="no")
    return query.filter_by(instance_uuid=instance_uuid)

3554 

3555 

@require_context
@pick_context_manager_reader
def instance_metadata_get(context, instance_uuid):
    """Get all metadata for an instance as a key/value dict."""
    metadata = {}
    for row in _instance_metadata_get_query(context, instance_uuid).all():
        metadata[row['key']] = row['value']
    return metadata

3562 

3563 

@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def instance_metadata_delete(context, instance_uuid, key):
    """Soft-delete the metadata item with the given key for an instance."""
    query = _instance_metadata_get_query(context, instance_uuid)
    query.filter_by(key=key).soft_delete()

3572 

3573 

@require_context
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def instance_metadata_update(context, instance_uuid, metadata, delete):
    """Update metadata if it exists, otherwise create it.

    :param instance_uuid: UUID of the instance whose metadata is updated
    :param metadata: dict of key/value pairs to store
    :param delete: when True, soft-delete any existing keys that are not
        present in *metadata*
    :returns: the *metadata* dict that was passed in
    """
    all_keys = metadata.keys()
    if delete:
        # Drop keys that are no longer part of the requested metadata.
        _instance_metadata_get_query(context, instance_uuid).\
            filter(~models.InstanceMetadata.key.in_(all_keys)).\
            soft_delete(synchronize_session=False)

    already_existing_keys = []
    meta_refs = _instance_metadata_get_query(context, instance_uuid).\
        filter(models.InstanceMetadata.key.in_(all_keys)).\
        all()

    # Update the rows that already exist in place.
    for meta_ref in meta_refs:
        already_existing_keys.append(meta_ref.key)
        meta_ref.update({"value": metadata[meta_ref.key]})

    # Insert fresh rows for keys not present in the database yet.
    new_keys = set(all_keys) - set(already_existing_keys)
    for key in new_keys:
        meta_ref = models.InstanceMetadata()
        meta_ref.update({"key": key, "value": metadata[key],
                         "instance_uuid": instance_uuid})
        context.session.add(meta_ref)

    return metadata

3602 

3603 

3604####################### 

3605# System-owned metadata 

3606 

3607 

def _instance_system_metadata_get_multi(context, instance_uuids):
    """Build a query for the system metadata rows of several instances.

    Deleted rows are included; returns an empty list (not a query) when
    no UUIDs are supplied.
    """
    if not instance_uuids:
        return []
    query = model_query(context, models.InstanceSystemMetadata,
                        read_deleted='yes')
    return query.filter(
        models.InstanceSystemMetadata.instance_uuid.in_(instance_uuids))

3614 

3615 

def _instance_system_metadata_get_query(context, instance_uuid):
    """Build a query for the system metadata rows of one instance."""
    query = model_query(context, models.InstanceSystemMetadata)
    return query.filter_by(instance_uuid=instance_uuid)

3619 

3620 

@require_context
@pick_context_manager_reader
def instance_system_metadata_get(context, instance_uuid):
    """Get all system metadata for an instance as a key/value dict."""
    query = _instance_system_metadata_get_query(context, instance_uuid)
    return {row['key']: row['value'] for row in query.all()}

3627 

3628 

@require_context
@pick_context_manager_writer
def instance_system_metadata_update(context, instance_uuid, metadata, delete):
    """Update metadata if it exists, otherwise create it.

    :param instance_uuid: UUID of the instance whose system metadata is
        updated
    :param metadata: dict of key/value pairs to store
    :param delete: when True, soft-delete any existing keys that are not
        present in *metadata*
    :returns: the *metadata* dict that was passed in
    """
    all_keys = metadata.keys()
    if delete:
        # Drop keys that are no longer part of the requested metadata.
        _instance_system_metadata_get_query(context, instance_uuid).\
            filter(~models.InstanceSystemMetadata.key.in_(all_keys)).\
            soft_delete(synchronize_session=False)

    already_existing_keys = []
    meta_refs = _instance_system_metadata_get_query(context, instance_uuid).\
        filter(models.InstanceSystemMetadata.key.in_(all_keys)).\
        all()

    # Update the rows that already exist in place.
    for meta_ref in meta_refs:
        already_existing_keys.append(meta_ref.key)
        meta_ref.update({"value": metadata[meta_ref.key]})

    # Insert fresh rows for keys not present in the database yet.
    new_keys = set(all_keys) - set(already_existing_keys)
    for key in new_keys:
        meta_ref = models.InstanceSystemMetadata()
        meta_ref.update({"key": key, "value": metadata[key],
                         "instance_uuid": instance_uuid})
        context.session.add(meta_ref)

    return metadata

3656 

3657 

3658#################### 

3659 

3660 

@require_context
@pick_context_manager_reader
def vol_get_usage_by_time(context, begin):
    """Return volumes usage that have been updated after a specified time."""
    usage = models.VolumeUsage
    # A row qualifies when either refresh timestamp is unset (NULL) or is
    # newer than the requested start time.
    updated_since = sql.or_(
        usage.tot_last_refreshed == sql.null(),
        usage.tot_last_refreshed > begin,
        usage.curr_last_refreshed == sql.null(),
        usage.curr_last_refreshed > begin,
    )
    query = model_query(context, usage, read_deleted="yes")
    return query.filter(updated_since).all()

3672 

3673 

@require_context
@pick_context_manager_writer
def vol_usage_update(
    context, id, rd_req, rd_bytes, wr_req, wr_bytes,
    instance_id, project_id, user_id, availability_zone,
    update_totals=False,
):
    """Update cached volume usage for a volume

    Creates new record if needed.

    :param id: the volume id the usage belongs to
    :param rd_req/rd_bytes/wr_req/wr_bytes: current counters as reported
        by the hypervisor
    :param update_totals: when True, fold the counters into the running
        totals and reset the "current" counters to zero
    :returns: the updated or newly created VolumeUsage row
    """

    refreshed = timeutils.utcnow()

    values = {}
    # NOTE(dricco): We will be mostly updating current usage records vs
    # updating total or creating records. Optimize accordingly.
    if not update_totals:
        values = {'curr_last_refreshed': refreshed,
                  'curr_reads': rd_req,
                  'curr_read_bytes': rd_bytes,
                  'curr_writes': wr_req,
                  'curr_write_bytes': wr_bytes,
                  'instance_uuid': instance_id,
                  'project_id': project_id,
                  'user_id': user_id,
                  'availability_zone': availability_zone}
    else:
        # Totals are expressed as SQL column expressions so the addition
        # happens atomically in the database, not in Python.
        values = {'tot_last_refreshed': refreshed,
                  'tot_reads': models.VolumeUsage.tot_reads + rd_req,
                  'tot_read_bytes': models.VolumeUsage.tot_read_bytes +
                                    rd_bytes,
                  'tot_writes': models.VolumeUsage.tot_writes + wr_req,
                  'tot_write_bytes': models.VolumeUsage.tot_write_bytes +
                                     wr_bytes,
                  'curr_reads': 0,
                  'curr_read_bytes': 0,
                  'curr_writes': 0,
                  'curr_write_bytes': 0,
                  'instance_uuid': instance_id,
                  'project_id': project_id,
                  'user_id': user_id,
                  'availability_zone': availability_zone}

    current_usage = model_query(context, models.VolumeUsage,
                                read_deleted="yes").\
        filter_by(volume_id=id).\
        first()
    if current_usage:
        # Counters lower than what we have recorded mean the counters were
        # reset (instance reboot/crash); roll the old "current" values into
        # the totals so no usage is lost.
        if (rd_req < current_usage['curr_reads'] or
            rd_bytes < current_usage['curr_read_bytes'] or
            wr_req < current_usage['curr_writes'] or
            wr_bytes < current_usage['curr_write_bytes']):
            LOG.info("Volume(%s) has lower stats then what is in "
                     "the database. Instance must have been rebooted "
                     "or crashed. Updating totals.", id)
            if not update_totals:
                values['tot_reads'] = (models.VolumeUsage.tot_reads +
                                       current_usage['curr_reads'])
                values['tot_read_bytes'] = (
                    models.VolumeUsage.tot_read_bytes +
                    current_usage['curr_read_bytes'])
                values['tot_writes'] = (models.VolumeUsage.tot_writes +
                                        current_usage['curr_writes'])
                values['tot_write_bytes'] = (
                    models.VolumeUsage.tot_write_bytes +
                    current_usage['curr_write_bytes'])
            else:
                # When also updating totals, include the new counters in
                # the same atomic addition.
                values['tot_reads'] = (models.VolumeUsage.tot_reads +
                                       current_usage['curr_reads'] +
                                       rd_req)
                values['tot_read_bytes'] = (
                    models.VolumeUsage.tot_read_bytes +
                    current_usage['curr_read_bytes'] + rd_bytes)
                values['tot_writes'] = (models.VolumeUsage.tot_writes +
                                        current_usage['curr_writes'] +
                                        wr_req)
                values['tot_write_bytes'] = (
                    models.VolumeUsage.tot_write_bytes +
                    current_usage['curr_write_bytes'] + wr_bytes)

        current_usage.update(values)
        current_usage.save(context.session)
        # Re-read the row so the SQL expressions above are resolved to
        # concrete values for the caller.
        context.session.refresh(current_usage)
        return current_usage

    # No existing record: create one from scratch.
    vol_usage = models.VolumeUsage()
    vol_usage.volume_id = id
    vol_usage.instance_uuid = instance_id
    vol_usage.project_id = project_id
    vol_usage.user_id = user_id
    vol_usage.availability_zone = availability_zone

    if not update_totals:
        vol_usage.curr_last_refreshed = refreshed
        vol_usage.curr_reads = rd_req
        vol_usage.curr_read_bytes = rd_bytes
        vol_usage.curr_writes = wr_req
        vol_usage.curr_write_bytes = wr_bytes
    else:
        vol_usage.tot_last_refreshed = refreshed
        vol_usage.tot_reads = rd_req
        vol_usage.tot_read_bytes = rd_bytes
        vol_usage.tot_writes = wr_req
        vol_usage.tot_write_bytes = wr_bytes

    vol_usage.save(context.session)

    return vol_usage

3783 

3784 

3785#################### 

3786 

3787 

@pick_context_manager_reader
def s3_image_get(context, image_id):
    """Find local s3 image represented by the provided id.

    :raises: ImageNotFound if no row matches.
    """
    query = model_query(context, models.S3Image, read_deleted="yes")
    result = query.filter_by(id=image_id).first()
    if not result:
        raise exception.ImageNotFound(image_id=image_id)
    return result

3799 

3800 

@pick_context_manager_reader
def s3_image_get_by_uuid(context, image_uuid):
    """Find local s3 image represented by the provided uuid.

    :raises: ImageNotFound if no row matches.
    """
    query = model_query(context, models.S3Image, read_deleted="yes")
    result = query.filter_by(uuid=image_uuid).first()
    if not result:
        raise exception.ImageNotFound(image_id=image_uuid)
    return result

3812 

3813 

@pick_context_manager_writer
def s3_image_create(context, image_uuid):
    """Create local s3 image represented by provided uuid.

    :returns: the created S3Image row
    :raises: oslo_db DBError wrapping whatever went wrong during save
    """
    try:
        s3_image_ref = models.S3Image()
        s3_image_ref.update({'uuid': image_uuid})
        s3_image_ref.save(context.session)
    except Exception as e:
        # NOTE(review): deliberately broad -- every failure (including
        # duplicate uuids) is rewrapped so callers only need to handle
        # DBError.
        raise db_exc.DBError(e)

    return s3_image_ref

3825 

3826 

3827#################### 

3828 

3829 

@pick_context_manager_writer
def instance_fault_create(context, values):
    """Create a new instance fault record and return it as a plain dict."""
    fault = models.InstanceFault()
    fault.update(values)
    fault.save(context.session)
    return dict(fault)

3837 

3838 

@pick_context_manager_reader
def instance_fault_get_by_instance_uuids(
    context, instance_uuids, latest=False,
):
    """Get all instance faults for the provided instance_uuids.

    :param instance_uuids: List of UUIDs of instances to grab faults for
    :param latest: Optional boolean indicating we should only return the latest
        fault for the instance
    :returns: dict mapping each requested instance uuid to a (possibly
        empty) list of fault dicts
    """
    if not instance_uuids:
        return {}

    faults_tbl = models.InstanceFault.__table__
    # NOTE(rpodolyaka): filtering by instance_uuids is performed in both
    # code branches below for the sake of a better query plan. On change,
    # make sure to update the other one as well.
    query = model_query(context, models.InstanceFault,
                        [faults_tbl],
                        read_deleted='no')

    if latest:
        # NOTE(jaypipes): We join instance_faults to a derived table of the
        # latest faults per instance UUID. The SQL produced below looks like
        # this:
        #
        # SELECT instance_faults.*
        # FROM instance_faults
        # JOIN (
        #   SELECT instance_uuid, MAX(id) AS max_id
        #   FROM instance_faults
        #   WHERE instance_uuid IN ( ... )
        #        AND deleted = 0
        #   GROUP BY instance_uuid
        # ) AS latest_faults
        #   ON instance_faults.id = latest_faults.max_id;
        latest_faults = model_query(
            context, models.InstanceFault,
            [faults_tbl.c.instance_uuid,
             sql.func.max(faults_tbl.c.id).label('max_id')],
            read_deleted='no'
        ).filter(
            faults_tbl.c.instance_uuid.in_(instance_uuids)
        ).group_by(
            faults_tbl.c.instance_uuid
        ).subquery(name="latest_faults")

        query = query.join(latest_faults,
                           faults_tbl.c.id == latest_faults.c.max_id)
    else:
        # All faults, newest first (descending id).
        query = query.filter(
            models.InstanceFault.instance_uuid.in_(instance_uuids)
        ).order_by(expression.desc("id"))

    # Pre-seed the output so instances with no faults map to empty lists.
    output = {}
    for instance_uuid in instance_uuids:
        output[instance_uuid] = []

    for row in query:
        output[row.instance_uuid].append(row._asdict())

    return output

3901 

3902 

3903################## 

3904 

3905 

@pick_context_manager_writer
def action_start(context, values):
    """Start an action for an instance."""
    # Normalize datetime-like values before persisting.
    convert_objects_related_datetimes(values, 'start_time', 'updated_at')
    action = models.InstanceAction()
    action.update(values)
    action.save(context.session)
    return action

3914 

3915 

@pick_context_manager_writer
def action_finish(context, values):
    """Finish an action for an instance.

    :raises: InstanceActionNotFound if no action matches the
        (instance_uuid, request_id) pair in *values*
    :returns: the updated InstanceAction row
    """
    convert_objects_related_datetimes(values, 'start_time', 'finish_time',
                                      'updated_at')
    query = model_query(context, models.InstanceAction).\
        filter_by(instance_uuid=values['instance_uuid']).\
        filter_by(request_id=values['request_id'])
    # update() returns the number of matched rows; exactly one is expected
    # for a given (instance_uuid, request_id) pair.
    if query.update(values) != 1:
        raise exception.InstanceActionNotFound(
            request_id=values['request_id'],
            instance_uuid=values['instance_uuid'])
    return query.one()

3929 

3930 

@pick_context_manager_reader
def actions_get(context, instance_uuid, limit=None, marker=None,
                filters=None):
    """Get all instance actions for the provided instance and filters.

    :param limit: maximum number of actions to return; 0 short-circuits
        to an empty list
    :param marker: request_id of the last action on the previous page
    :param filters: optional changes-since/changes-before filters handled
        by _get_query_nova_resource_by_changes_time
    :raises: MarkerNotFound if *marker* matches no action
    """
    if limit == 0:
        return []

    # Newest actions first, with id as the tie-breaker.
    sort_keys = ['created_at', 'id']
    sort_dirs = ['desc', 'desc']

    query_prefix = model_query(context, models.InstanceAction).\
        filter_by(instance_uuid=instance_uuid)

    model_object = models.InstanceAction
    query_prefix = _get_query_nova_resource_by_changes_time(query_prefix,
                                                            filters,
                                                            model_object)

    if marker is not None:
        # Resolve the request_id marker to a full row for paginate_query.
        marker = action_get_by_request_id(context, instance_uuid, marker)
        if not marker:
            raise exception.MarkerNotFound(marker=marker)
    actions = sqlalchemyutils.paginate_query(query_prefix,
                                             models.InstanceAction, limit,
                                             sort_keys, marker=marker,
                                             sort_dirs=sort_dirs).all()
    return actions

3958 

3959 

@pick_context_manager_reader
def action_get_by_request_id(context, instance_uuid, request_id):
    """Get the action by request_id and given instance."""
    return _action_get_by_request_id(context, instance_uuid, request_id)

3965 

3966 

def _action_get_by_request_id(context, instance_uuid, request_id):
    """Return the newest action for the instance/request pair, or None."""
    query = model_query(context, models.InstanceAction)
    query = query.filter_by(instance_uuid=instance_uuid)
    query = query.filter_by(request_id=request_id)
    query = query.order_by(expression.desc("created_at"),
                           expression.desc("id"))
    return query.first()

3974 

3975 

def _action_get_last_created_by_instance_uuid(context, instance_uuid):
    """Return the most recently created action for an instance, or None."""
    query = model_query(context, models.InstanceAction)
    query = query.filter_by(instance_uuid=instance_uuid)
    query = query.order_by(expression.desc("created_at"),
                           expression.desc("id"))
    return query.first()

3982 

3983 

@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def action_event_start(context, values):
    """Start an event on an instance action.

    :param values: dict with at least 'instance_uuid', 'request_id' and
        'start_time'
    :raises: InstanceActionNotFound if no matching action can be located
    :returns: the created InstanceActionEvent row
    """
    convert_objects_related_datetimes(values, 'start_time')
    action = _action_get_by_request_id(context, values['instance_uuid'],
                                       values['request_id'])
    # When nova-compute restarts, the context is generated again in
    # init_host workflow, the request_id was different with the request_id
    # recorded in InstanceAction, so we can't get the original record
    # according to request_id. Try to get the last created action so that
    # init_instance can continue to finish the recovery action, like:
    # powering_off, unpausing, and so on.
    update_action = True
    if not action and not context.project_id:
        action = _action_get_last_created_by_instance_uuid(
            context, values['instance_uuid'])
        # If we couldn't find an action by the request_id, we don't want to
        # update this action since it likely represents an inactive action.
        update_action = False

    if not action:
        raise exception.InstanceActionNotFound(
            request_id=values['request_id'],
            instance_uuid=values['instance_uuid'])

    # Link the new event to the action we found.
    values['action_id'] = action['id']

    event_ref = models.InstanceActionEvent()
    event_ref.update(values)
    context.session.add(event_ref)

    # Update action updated_at.
    if update_action:
        action.update({'updated_at': values['start_time']})
        action.save(context.session)

    return event_ref

4022 

4023 

# NOTE: We need the retry_on_deadlock decorator for cases like resize where
# a lot of events are happening at once between multiple hosts trying to
# update the same action record in a small time window.
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
@pick_context_manager_writer
def action_event_finish(context, values):
    """Finish an event on an instance action.

    :param values: dict with at least 'instance_uuid', 'request_id',
        'event', 'finish_time' and 'result'
    :raises: InstanceActionNotFound if no matching action can be located
    :raises: InstanceActionEventNotFound if the action has no event with
        the given name
    :returns: the updated InstanceActionEvent row
    """
    convert_objects_related_datetimes(values, 'start_time', 'finish_time')
    action = _action_get_by_request_id(context, values['instance_uuid'],
                                       values['request_id'])
    # When nova-compute restarts, the context is generated again in
    # init_host workflow, the request_id was different with the request_id
    # recorded in InstanceAction, so we can't get the original record
    # according to request_id. Try to get the last created action so that
    # init_instance can continue to finish the recovery action, like:
    # powering_off, unpausing, and so on.
    update_action = True
    if not action and not context.project_id:
        action = _action_get_last_created_by_instance_uuid(
            context, values['instance_uuid'])
        # If we couldn't find an action by the request_id, we don't want to
        # update this action since it likely represents an inactive action.
        update_action = False

    if not action:
        raise exception.InstanceActionNotFound(
            request_id=values['request_id'],
            instance_uuid=values['instance_uuid'])

    event_ref = model_query(context, models.InstanceActionEvent).\
        filter_by(action_id=action['id']).\
        filter_by(event=values['event']).\
        first()

    if not event_ref:
        raise exception.InstanceActionEventNotFound(action_id=action['id'],
                                                    event=values['event'])
    event_ref.update(values)

    # A failed event marks the whole action as errored.
    if values['result'].lower() == 'error':
        action.update({'message': 'Error'})

    # Update action updated_at.
    if update_action:
        action.update({'updated_at': values['finish_time']})
        action.save(context.session)

    return event_ref

4072 

4073 

@pick_context_manager_reader
def action_events_get(context, action_id):
    """Get the events by action id, newest first."""
    query = model_query(context, models.InstanceActionEvent)
    query = query.filter_by(action_id=action_id)
    query = query.order_by(expression.desc("created_at"),
                           expression.desc("id"))
    return query.all()

4083 

4084 

@pick_context_manager_reader
def action_event_get_by_id(context, action_id, event_id):
    """Return a single action event by action and event id, or None."""
    query = model_query(context, models.InstanceActionEvent)
    query = query.filter_by(action_id=action_id)
    query = query.filter_by(id=event_id)
    return query.first()

4093 

4094 

4095################## 

4096 

4097 

@require_context
@pick_context_manager_writer
def ec2_instance_create(context, instance_uuid, id=None):
    """Create the EC2 ID to instance UUID mapping on demand."""
    mapping = models.InstanceIdMapping()
    mapping.update({'uuid': instance_uuid})
    if id is not None:
        # Allow callers to pin a specific integer EC2 id.
        mapping.update({'id': id})

    mapping.save(context.session)
    return mapping

4110 

4111 

@require_context
@pick_context_manager_reader
def ec2_instance_get_by_uuid(context, instance_uuid):
    """Get the instance_id_mappings row for the given instance UUID.

    NOTE(review): the lookup here is by instance *UUID* (the historical
    docstring claimed "through EC2 ID"); use ec2_instance_get_by_id() to
    look up by the EC2 integer id.

    :raises: InstanceNotFound if no mapping exists for the UUID
    """
    result = _ec2_instance_get_query(context).\
        filter_by(uuid=instance_uuid).\
        first()

    if not result:
        raise exception.InstanceNotFound(instance_id=instance_uuid)

    return result

4124 

4125 

@require_context
@pick_context_manager_reader
def ec2_instance_get_by_id(context, instance_id):
    """Return the instance_id_mappings row for an EC2 integer id.

    :raises: InstanceNotFound if no mapping exists.
    """
    query = _ec2_instance_get_query(context).filter_by(id=instance_id)
    result = query.first()
    if not result:
        raise exception.InstanceNotFound(instance_id=instance_id)
    return result

4137 

4138 

@require_context
@pick_context_manager_reader
def get_instance_uuid_by_ec2_id(context, ec2_id):
    """Get UUID through EC2 ID from instance_id_mappings table."""
    mapping = ec2_instance_get_by_id(context, ec2_id)
    return mapping['uuid']

4145 

4146 

def _ec2_instance_get_query(context):
    """Base query over instance_id_mappings, including deleted rows."""
    return model_query(
        context, models.InstanceIdMapping, read_deleted='yes')

4149 

4150 

4151################## 

4152 

4153 

def _task_log_get_query(context, task_name, period_beginning,
                        period_ending, host=None, state=None):
    """Build the base task_log query for a task and time period.

    :param host: optional host filter
    :param state: optional state filter (e.g. 'RUNNING', 'DONE')
    """
    values = {'period_beginning': period_beginning,
              'period_ending': period_ending}
    # Normalize the period bounds into datetimes the DB layer accepts.
    values = convert_objects_related_datetimes(values, *values.keys())

    query = model_query(context, models.TaskLog).\
        filter_by(task_name=task_name).\
        filter_by(period_beginning=values['period_beginning']).\
        filter_by(period_ending=values['period_ending'])
    if host is not None:
        query = query.filter_by(host=host)
    if state is not None:
        query = query.filter_by(state=state)
    return query

4169 

4170 

@pick_context_manager_reader
def task_log_get(context, task_name, period_beginning, period_ending, host,
                 state=None):
    """Return the first matching task log entry, or None."""
    query = _task_log_get_query(context, task_name, period_beginning,
                                period_ending, host, state)
    return query.first()

4176 

4177 

@pick_context_manager_reader
def task_log_get_all(context, task_name, period_beginning, period_ending,
                     host=None, state=None):
    """Return all matching task log entries."""
    query = _task_log_get_query(context, task_name, period_beginning,
                                period_ending, host, state)
    return query.all()

4183 

4184 

@pick_context_manager_writer
def task_log_begin_task(
    context, task_name, period_beginning, period_ending, host, task_items=None,
    message=None,
):
    """Mark a task as started for a given host/time period."""
    # Normalize the period bounds into datetimes the DB layer accepts.
    values = {'period_beginning': period_beginning,
              'period_ending': period_ending}
    values = convert_objects_related_datetimes(values, *values.keys())

    task_log = models.TaskLog()
    task_log.task_name = task_name
    task_log.period_beginning = values['period_beginning']
    task_log.period_ending = values['period_ending']
    task_log.host = host
    task_log.state = "RUNNING"
    if message:
        task_log.message = message
    if task_items:
        task_log.task_items = task_items

    try:
        task_log.save(context.session)
    except db_exc.DBDuplicateEntry:
        # A row for this task/period/host already exists, i.e. the task is
        # already running.
        raise exception.TaskAlreadyRunning(task_name=task_name, host=host)

4209 

4210 

@pick_context_manager_writer
def task_log_end_task(
    context, task_name, period_beginning, period_ending, host, errors,
    message=None,
):
    """Mark a task as complete for a given host/time period."""
    values = {'state': "DONE", 'errors': errors}
    if message:
        values["message"] = message

    updated = _task_log_get_query(context, task_name, period_beginning,
                                  period_ending, host).update(values)
    if not updated:
        # It's not running!
        raise exception.TaskNotRunning(task_name=task_name, host=host)

4226 

4227 

4228################## 

4229 

4230 

def _get_tables_with_fk_to_table(table):
    """Get a list of tables that refer to the given table by foreign key (FK).

    :param table: Table object (parent) for which to find references by FK

    :returns: A list of Table objects that refer to the specified table by FK
        (a table appears once per referencing FK, matching the historical
        behavior)
    """
    return [
        t
        for t in models.BASE.metadata.tables.values()
        for fk in t.foreign_keys
        if fk.references(table)
    ]

4244 

4245 

def _get_fk_stmts(metadata, conn, table, column, records):
    """Find records related to this table by foreign key (FK) and create and
    return insert/delete statements for them.

    Logic is: find the tables that reference the table passed to this method
    and walk the tree of references by FK. As child records are found, prepend
    them to deques to execute later in a single database transaction (to avoid
    orphaning related records if any one insert/delete fails or the archive
    process is otherwise interrupted).

    :param metadata: Metadata object to use to construct a shadow Table object
    :param conn: Connection object to use to select records related by FK
    :param table: Table object (parent) for which to find references by FK
    :param column: Column object (parent) to use to select records related by
        FK
    :param records: A list of records (column values) to use to select records
        related by FK

    :returns: tuple of (insert statements, delete statements) for records
        related by FK to insert into shadow tables and delete from main tables
    """
    inserts = collections.deque()
    deletes = collections.deque()
    fk_tables = _get_tables_with_fk_to_table(table)
    # NOTE(review): _get_tables_with_fk_to_table appends a table once per FK
    # it has referencing `table`, so fk_tables may contain duplicates when a
    # child table has more than one FK to the parent -- confirm re-processing
    # such a table is harmless.
    for fk_table in fk_tables:
        # Create the shadow table for the referencing table.
        fk_shadow_tablename = _SHADOW_TABLE_PREFIX + fk_table.name
        try:
            with conn.begin():
                fk_shadow_table = schema.Table(
                    fk_shadow_tablename, metadata, autoload_with=conn,
                )
        except sqla_exc.NoSuchTableError:
            # No corresponding shadow table; skip it.
            continue

        # TODO(stephenfin): Drop this when we drop the table
        if fk_table.name == "dns_domains":
            # We have one table (dns_domains) where the key is called
            # "domain" rather than "id"
            fk_column = fk_table.c.domain
        else:
            fk_column = fk_table.c.id

        for fk in fk_table.foreign_keys:
            if table != fk.column.table:
                # if the foreign key doesn't actually point to the table we're
                # archiving entries from then it's not relevant; trying to
                # resolve this would result in a cartesian product
                continue

            # We need to find the records in the referring (child) table that
            # correspond to the records in our (parent) table so we can
            # archive them.

            # First, select the column in the parent referenced by the child
            # table that corresponds to the parent table records that were
            # passed in.
            # Example: table = 'instances' and fk_table = 'instance_extra'
            # fk.parent = instance_extra.instance_uuid
            # fk.column = instances.uuid
            # SELECT instances.uuid FROM instances, instance_extra
            #     WHERE instance_extra.instance_uuid = instances.uuid
            #     AND instance.id IN (<ids>)
            # We need the instance uuids for the <ids> in order to
            # look up the matching instance_extra records.
            select = sql.select(fk.column).where(
                sql.and_(fk.parent == fk.column, column.in_(records))
            )
            with conn.begin():
                rows = conn.execute(select).fetchall()
            p_records = [r[0] for r in rows]
            # Then, select rows in the child table that correspond to the
            # parent table records that were passed in.
            # Example: table = 'instances' and fk_table = 'instance_extra'
            # fk.parent = instance_extra.instance_uuid
            # fk.column = instances.uuid
            # SELECT instance_extra.id FROM instance_extra, instances
            #     WHERE instance_extra.instance_uuid = instances.uuid
            #     AND instances.uuid IN (<uuids>)
            # We will get the instance_extra ids we need to archive
            # them.
            fk_select = sql.select(fk_column).where(
                sql.and_(fk.parent == fk.column, fk.column.in_(p_records))
            )
            with conn.begin():
                fk_rows = conn.execute(fk_select).fetchall()
            fk_records = [r[0] for r in fk_rows]
            if fk_records:
                # If we found any records in the child table, create shadow
                # table insert statements for them and prepend them to the
                # deque.
                fk_columns = [c.name for c in fk_table.c]
                fk_insert = fk_shadow_table.insert().from_select(
                    fk_columns,
                    sql.select(fk_table).where(fk_column.in_(fk_records))
                ).inline()
                inserts.appendleft(fk_insert)
                # Create main table delete statements and prepend them to the
                # deque.
                fk_delete = fk_table.delete().where(fk_column.in_(fk_records))
                deletes.appendleft(fk_delete)

            # Repeat for any possible nested child tables.
            # NOTE(review): deque.extendleft inserts elements in reverse
            # iteration order -- confirm this preserves the intended
            # children-before-parents execution order for grandchild rows.
            i, d = _get_fk_stmts(metadata, conn, fk_table, fk_column, fk_records)
            inserts.extendleft(i)
            deletes.extendleft(d)

    return inserts, deletes

4355 

4356 

def _archive_deleted_rows_for_table(
    metadata, engine, tablename, max_rows, before, task_log,
):
    """Move up to max_rows rows from one tables to the corresponding
    shadow table.

    Will also follow FK constraints and archive all referring rows.
    Example: archiving a record from the 'instances' table will also archive
    the 'instance_extra' record before archiving the 'instances' record.

    :param metadata: MetaData object used to autoload the shadow table
    :param engine: Engine used to open the connection for all queries here
    :param tablename: Name of the main (production) table to archive from
    :param max_rows: Cap on the number of rows moved in one transaction;
        may be None (no cap)
    :param before: Optional datetime; only rows deleted/updated before this
        are archived
    :param task_log: Whether task_log rows (never soft-deleted) are eligible

    :returns: 3-item tuple:

        - number of rows archived
        - list of UUIDs of instances that were archived
        - number of extra rows archived (due to FK constraints)
          dict of {tablename: rows_archived}
    """
    conn = engine.connect()
    # NOTE(tdurakov): table metadata should be received
    # from models, not db tables. Default value specified by SoftDeleteMixin
    # is known only by models, not DB layer.
    # IMPORTANT: please do not change source of metadata information for table.
    table = models.BASE.metadata.tables[tablename]

    shadow_tablename = _SHADOW_TABLE_PREFIX + tablename
    rows_archived = 0
    deleted_instance_uuids = []
    try:
        with conn.begin():
            shadow_table = schema.Table(
                shadow_tablename, metadata, autoload_with=conn,
            )
    except sqla_exc.NoSuchTableError:
        # No corresponding shadow table; skip it.
        conn.close()
        return rows_archived, deleted_instance_uuids, {}

    # TODO(stephenfin): Drop this when we drop the table
    if tablename == "dns_domains":
        # We have one table (dns_domains) where the key is called
        # "domain" rather than "id"
        column = table.c.domain
    else:
        column = table.c.id

    deleted_column = table.c.deleted
    columns = [c.name for c in table.c]

    # Rows are considered archivable when 'deleted' differs from the model's
    # default (i.e. the row has been soft-deleted).
    select = sql.select(column).where(
        deleted_column != deleted_column.default.arg
    )

    if tablename == "task_log" and task_log:
        # task_log table records are never deleted by anything, so we won't
        # base our select statement on the 'deleted' column status.
        select = sql.select(column)

    if before:
        if tablename != "task_log":
            select = select.where(table.c.deleted_at < before)
        elif task_log:
            # task_log table records are never deleted by anything, so we
            # won't base our select statement on the 'deleted_at' column
            # status.
            select = select.where(table.c.updated_at < before)

    select = select.order_by(column).limit(max_rows)
    with conn.begin():
        rows = conn.execute(select).fetchall()

    # This is a list of IDs of rows that should be archived from this table,
    # limited to a length of max_rows.
    records = [r[0] for r in rows]

    # We will archive deleted rows for this table and also generate insert and
    # delete statements for extra rows we may archive by following FK
    # relationships. Because we are iterating over the sorted_tables (list of
    # Table objects sorted in order of foreign key dependency), new inserts
    # and deletes ("leaves") will be added to the fronts of the deques created
    # in _get_fk_stmts. This way, we make sure we delete child table records
    # before we delete their parent table records.

    # Keep track of any extra tablenames to number of rows that we archive by
    # following FK relationships.
    #
    # extras = {tablename: number_of_extra_rows_archived}
    extras = collections.defaultdict(int)

    if not records:
        # Nothing to archive, so return.
        # NOTE(review): this early return (and any unexpected exception below)
        # skips conn.close(); the connection is only closed on the normal and
        # NoSuchTableError paths -- confirm callers tolerate the leak.
        return rows_archived, deleted_instance_uuids, extras

    # Keep track of how many rows we accumulate for the insert+delete database
    # transaction and cap it as soon as it is >= max_rows. Because we will
    # archive all child rows of a parent row along with the parent at the same
    # time, we end up with extra rows to archive in addition to len(records).
    num_rows_in_batch = 0
    # The sequence of query statements we will execute in a batch. These are
    # ordered: [child1, child1, parent1, child2, child2, child2, parent2, ...]
    # Parent + child "trees" are kept together to avoid FK constraint
    # violations.
    statements_in_batch = []
    # The list of records in the batch. This is used for collecting deleted
    # instance UUIDs in the case of the 'instances' table.
    records_in_batch = []

    # (melwitt): We will gather rows related by foreign key relationship for
    # each deleted row, one at a time. We do it this way to keep track of and
    # limit the total number of rows that will be archived in a single
    # database transaction. In a large scale database with potentially
    # hundreds of thousands of deleted rows, if we don't limit the size of the
    # transaction based on max_rows, we can get into a situation where we get
    # stuck not able to make much progress. The value of max_rows has to be 1)
    # small enough to not exceed the database's max packet size limit or
    # timeout with a deadlock but 2) large enough to make progress in an
    # environment with a constant high volume of create and delete traffic.
    # By archiving each parent + child rows tree one at a time, we can ensure
    # meaningful progress can be made while allowing the caller to predictably
    # control the size of the database transaction with max_rows.
    for record in records:
        # Walk FK relationships and add insert/delete statements for rows that
        # refer to this table via FK constraints. fk_inserts and fk_deletes
        # will be prepended to by _get_fk_stmts if referring rows are found by
        # FK constraints.
        fk_inserts, fk_deletes = _get_fk_stmts(
            metadata, conn, table, column, [record])
        statements_in_batch.extend(fk_inserts + fk_deletes)
        # statement to add parent row to shadow table
        insert = shadow_table.insert().from_select(
            columns, sql.select(table).where(column.in_([record]))).inline()
        statements_in_batch.append(insert)
        # statement to remove parent row from main table
        delete = table.delete().where(column.in_([record]))
        statements_in_batch.append(delete)

        records_in_batch.append(record)

        # Check whether we have a full batch >= max_rows. Rows are counted as
        # the number of rows that will be moved in the database transaction.
        # So each insert+delete pair represents one row that will be moved.
        # 1 parent + its fks
        num_rows_in_batch += 1 + len(fk_inserts)

        if max_rows is not None and num_rows_in_batch >= max_rows:
            break

    # NOTE(tssurya): In order to facilitate the deletion of records from
    # instance_mappings, request_specs and instance_group_member tables in the
    # nova_api DB, the rows of deleted instances from the instances table are
    # stored prior to their deletion. Basically the uuids of the archived
    # instances are queried and returned.
    if tablename == "instances":
        query_select = sql.select(table.c.uuid).where(
            table.c.id.in_(records_in_batch))
        with conn.begin():
            rows = conn.execute(query_select).fetchall()
        # deleted_instance_uuids = ['uuid1', 'uuid2', ...]
        deleted_instance_uuids = [r[0] for r in rows]

    try:
        # Group the insert and delete in a transaction.
        with conn.begin():
            for statement in statements_in_batch:
                result = conn.execute(statement)
                result_tablename = statement.table.name
                # Add to archived row counts if not a shadow table.
                if not result_tablename.startswith(_SHADOW_TABLE_PREFIX):
                    if result_tablename == tablename:
                        # Number of tablename (parent) rows archived.
                        rows_archived += result.rowcount
                    else:
                        # Number(s) of child rows archived.
                        extras[result_tablename] += result.rowcount

    except db_exc.DBReferenceError as ex:
        # A foreign key constraint keeps us from deleting some of these rows
        # until we clean up a dependent table. Just skip this table for now;
        # we'll come back to it later.
        LOG.warning("IntegrityError detected when archiving table "
                    "%(tablename)s: %(error)s",
                    {'tablename': tablename, 'error': str(ex)})

    conn.close()

    return rows_archived, deleted_instance_uuids, extras

4541 

4542 

def archive_deleted_rows(context=None, max_rows=None, before=None,
                         task_log=False):
    """Move up to max_rows rows from production tables to the corresponding
    shadow tables.

    :param context: nova.context.RequestContext for database access
    :param max_rows: Maximum number of rows to archive (required)
    :param before: optional datetime which when specified filters the records
        to only archive those records deleted before the given date
    :param task_log: Optional for whether to archive task_log table records
    :returns: 3-item tuple:

        - dict that maps table name to number of rows archived from that
          table, for example::

            {
                'instances': 5,
                'block_device_mapping': 5,
                'pci_devices': 2,
            }
        - list of UUIDs of instances that were archived
        - total number of rows that were archived
    """
    table_to_rows_archived = collections.defaultdict(int)
    deleted_instance_uuids = []
    total_rows_archived = 0
    meta = sa.MetaData()
    engine = get_engine(use_slave=True, context=context)
    meta.reflect(bind=engine)
    # Get the sorted list of tables in order of foreign key dependency.
    # Process the parent tables and find their dependent records in order to
    # archive the related records in a single database transactions. The goal
    # is to avoid a situation where, for example, an 'instances' table record
    # is missing its corresponding 'instance_extra' record due to running the
    # archive_deleted_rows command with max_rows.
    for table in meta.sorted_tables:
        tablename = table.name
        rows_archived = 0
        # skip the special alembic_version version table and any shadow tables
        if (
            tablename == 'alembic_version' or
            tablename.startswith(_SHADOW_TABLE_PREFIX)
        ):
            continue

        # skip the tables that we've since removed the models for
        if tablename in models.REMOVED_TABLES:
            continue

        # NOTE(review): despite the default of None, max_rows is effectively
        # required (per the docstring) -- the subtraction below raises
        # TypeError when it is None.
        rows_archived, _deleted_instance_uuids, extras = (
            _archive_deleted_rows_for_table(
                meta, engine, tablename,
                max_rows=max_rows - total_rows_archived,
                before=before,
                task_log=task_log))
        total_rows_archived += rows_archived
        if tablename == 'instances':
            deleted_instance_uuids = _deleted_instance_uuids
        # Only report results for tables that had updates.
        if rows_archived:
            table_to_rows_archived[tablename] = rows_archived
        # NOTE(review): this loop rebinds the outer 'tablename' loop variable;
        # harmless today because tablename is reassigned at the top of each
        # iteration, but easy to trip over when editing.
        for tablename, extra_rows_archived in extras.items():
            table_to_rows_archived[tablename] += extra_rows_archived
            total_rows_archived += extra_rows_archived
        if total_rows_archived >= max_rows:
            break
    return table_to_rows_archived, deleted_instance_uuids, total_rows_archived

4610 

4611 

def _purgeable_tables(metadata):
    """Return the shadow tables eligible for purging.

    Only tables whose name carries the shadow prefix qualify; the alembic
    bookkeeping table is explicitly excluded.
    """
    candidates = []
    for tbl in metadata.sorted_tables:
        if tbl.name == 'alembic_version':
            continue
        if not tbl.name.startswith(_SHADOW_TABLE_PREFIX):
            continue
        candidates.append(tbl)
    return candidates

4619 

4620 

def purge_shadow_tables(context, before_date, status_fn=None):
    """Delete rows from every shadow table, reporting progress via status_fn.

    When ``before_date`` is given, only rows whose best-available timestamp
    column predates it are removed; otherwise each shadow table is emptied.
    Returns the total number of rows deleted.
    """
    engine = get_engine(context=context)
    conn = engine.connect()
    metadata = sa.MetaData()
    metadata.reflect(bind=engine)
    total_deleted = 0

    if status_fn is None:
        # Default to a no-op reporter so callers can omit status_fn.
        def status_fn(msg):
            return None

    # Some things never get formally deleted, and thus deleted_at
    # is never set. So, prefer specific timestamp columns here
    # for those special cases.
    overrides = {
        'shadow_instance_actions': 'created_at',
        'shadow_instance_actions_events': 'created_at',
        'shadow_task_log': 'updated_at',
    }

    for shadow in _purgeable_tables(metadata):
        # Work out which timestamp column (if any) bounds the purge.
        if before_date is None:
            col = None
        elif shadow.name in overrides:
            col = getattr(shadow.c, overrides[shadow.name])
        elif hasattr(shadow.c, 'deleted_at'):
            col = shadow.c.deleted_at
        elif hasattr(shadow.c, 'updated_at'):
            col = shadow.c.updated_at
        elif hasattr(shadow.c, 'created_at'):
            col = shadow.c.created_at
        else:
            status_fn(_('Unable to purge table %(table)s because it '
                        'has no timestamp column') % {
                            'table': shadow.name})
            continue

        if col is None:
            delete = shadow.delete()
        else:
            delete = shadow.delete().where(col < before_date)

        with conn.begin():
            result = conn.execute(delete)
        if result.rowcount > 0:
            status_fn(_('Deleted %(rows)i rows from %(table)s based on '
                        'timestamp column %(col)s') % {
                            'rows': result.rowcount,
                            'table': shadow.name,
                            'col': '(n/a)' if col is None else col.name})
        total_deleted += result.rowcount

    conn.close()

    return total_deleted

4675 

4676 

4677#################### 

4678 

4679 

@pick_context_manager_reader
def pci_device_get_by_addr(context, node_id, dev_addr):
    """Fetch a single PCI device by compute node and device address."""
    device = (
        model_query(context, models.PciDevice)
        .filter_by(compute_node_id=node_id)
        .filter_by(address=dev_addr)
        .first()
    )
    if not device:
        raise exception.PciDeviceNotFound(node_id=node_id, address=dev_addr)
    return device

4690 

4691 

@pick_context_manager_reader
def pci_device_get_by_id(context, id):
    """Fetch a single PCI device by its primary key."""
    device = (
        model_query(context, models.PciDevice)
        .filter_by(id=id)
        .first()
    )
    if not device:
        raise exception.PciDeviceNotFoundById(id=id)
    return device

4701 

4702 

@pick_context_manager_reader
def pci_device_get_all_by_node(context, node_id):
    """List every PCI device record belonging to one compute node."""
    node_devices = model_query(context, models.PciDevice).filter_by(
        compute_node_id=node_id)
    return node_devices.all()

4709 

4710 

@pick_context_manager_reader
def pci_device_get_all_by_parent_addr(context, node_id, parent_addr):
    """List the PCI devices on a node that share one parent address."""
    return (
        model_query(context, models.PciDevice)
        .filter_by(compute_node_id=node_id)
        .filter_by(parent_addr=parent_addr)
        .all()
    )

4718 

4719 

@require_context
@pick_context_manager_reader
def pci_device_get_all_by_instance_uuid(context, instance_uuid):
    """List the PCI devices currently allocated to an instance."""
    return (
        model_query(context, models.PciDevice)
        .filter_by(status='allocated')
        .filter_by(instance_uuid=instance_uuid)
        .all()
    )

4728 

4729 

@pick_context_manager_reader
def _instance_pcidevs_get_multi(context, instance_uuids):
    """Return a query of allocated PCI devices for the given instances.

    Returns an empty list (not a query) when no UUIDs are supplied.
    """
    if not instance_uuids:
        return []
    allocated = model_query(context, models.PciDevice).filter_by(
        status='allocated')
    return allocated.filter(
        models.PciDevice.instance_uuid.in_(instance_uuids))

4737 

4738 

@pick_context_manager_writer
def pci_device_destroy(context, node_id, address):
    """Soft-delete a PCI device record; raise if nothing matched."""
    deleted_count = (
        model_query(context, models.PciDevice)
        .filter_by(compute_node_id=node_id)
        .filter_by(address=address)
        .soft_delete()
    )
    if not deleted_count:
        raise exception.PciDeviceNotFound(node_id=node_id, address=address)

4748 

4749 

@pick_context_manager_writer
def pci_device_update(context, node_id, address, values):
    """Update a PCI device record, inserting one when none matches."""
    dev_query = (
        model_query(context, models.PciDevice, read_deleted="no")
        .filter_by(compute_node_id=node_id)
        .filter_by(address=address)
    )
    updated = dev_query.update(values)
    if updated == 0:
        # Nothing matched the filters, so stage a brand new record instead.
        new_device = models.PciDevice()
        new_device.update(values)
        context.session.add(new_device)
    return dev_query.one()

4761 

4762 

4763#################### 

4764 

4765 

@pick_context_manager_writer
def instance_tag_add(context, instance_uuid, tag):
    """Attach a single tag to an instance, ignoring duplicates."""
    new_tag = models.Tag()
    new_tag.resource_id = instance_uuid
    new_tag.tag = tag

    try:
        _check_instance_exists_in_project(context, instance_uuid)
        # A savepoint scopes a duplicate-entry failure to this insert
        # instead of the caller's enclosing transaction.
        with get_context_manager(context).writer.savepoint.using(context):
            context.session.add(new_tag)
    except db_exc.DBDuplicateEntry:
        # The (resource_id, tag) pair already exists; treat the re-add as
        # a no-op rather than an error.
        pass

    return new_tag

4782 

4783 

@pick_context_manager_writer
def instance_tag_set(context, instance_uuid, tags):
    """Replace the instance's tag set with exactly the given tags."""
    _check_instance_exists_in_project(context, instance_uuid)

    # Diff the requested tags against what is currently stored so we only
    # touch the rows that actually change.
    current_rows = context.session.query(models.Tag.tag).filter_by(
        resource_id=instance_uuid).all()
    current = {row.tag for row in current_rows}
    desired = set(tags)

    stale = current - desired
    missing = desired - current

    if stale:
        context.session.query(models.Tag).filter_by(
            resource_id=instance_uuid,
        ).filter(
            models.Tag.tag.in_(stale),
        ).delete(synchronize_session=False)

    if missing:
        new_rows = [
            {'resource_id': instance_uuid, 'tag': t} for t in missing]
        context.session.execute(models.Tag.__table__.insert(), new_rows)

    return context.session.query(models.Tag).filter_by(
        resource_id=instance_uuid).all()

4810 

4811 

@pick_context_manager_reader
def instance_tag_get_by_instance_uuid(context, instance_uuid):
    """Return every tag attached to the given instance."""
    _check_instance_exists_in_project(context, instance_uuid)
    instance_tags = context.session.query(models.Tag).filter_by(
        resource_id=instance_uuid)
    return instance_tags.all()

4818 

4819 

@pick_context_manager_writer
def instance_tag_delete(context, instance_uuid, tag):
    """Remove one tag from the instance, raising if it was not present."""
    _check_instance_exists_in_project(context, instance_uuid)
    deleted_count = context.session.query(models.Tag).filter_by(
        resource_id=instance_uuid, tag=tag).delete()

    if not deleted_count:
        raise exception.InstanceTagNotFound(instance_id=instance_uuid,
                                            tag=tag)

4830 

4831 

@pick_context_manager_writer
def instance_tag_delete_all(context, instance_uuid):
    """Remove every tag attached to the instance."""
    _check_instance_exists_in_project(context, instance_uuid)
    instance_tags = context.session.query(models.Tag).filter_by(
        resource_id=instance_uuid)
    instance_tags.delete()

4838 

4839 

@pick_context_manager_reader
def instance_tag_exists(context, instance_uuid, tag):
    """Return whether the given tag is attached to the instance."""
    _check_instance_exists_in_project(context, instance_uuid)
    # Build an EXISTS subquery rather than fetching the row itself.
    match = context.session.query(models.Tag).filter_by(
        resource_id=instance_uuid, tag=tag)
    return context.session.query(match.exists()).scalar()

4847 

4848 

4849#################### 

4850 

4851 

@pick_context_manager_writer
def console_auth_token_create(context, values):
    """Persist a new console authorization token record."""
    instance_uuid = values.get('instance_uuid')
    _check_instance_exists_in_project(context, instance_uuid)
    token = models.ConsoleAuthToken()
    token.update(values)
    context.session.add(token)
    return token

4861 

4862 

@pick_context_manager_reader
def console_auth_token_get_valid(context, token_hash, instance_uuid=None):
    """Fetch an unexpired console authorization by token_hash.

    A token whose 'expires' timestamp has passed is treated as if it does
    not exist and is never returned.

    When instance_uuid is given, the token must additionally belong to that
    instance (and the instance must exist in the project); otherwise only
    the expiry check applies.
    """
    if instance_uuid is not None:
        _check_instance_exists_in_project(context, instance_uuid)
    candidates = context.session.query(models.ConsoleAuthToken).filter_by(
        token_hash=token_hash)
    if instance_uuid is not None:
        candidates = candidates.filter_by(instance_uuid=instance_uuid)
    now = timeutils.utcnow_ts()
    return candidates.filter(
        models.ConsoleAuthToken.expires > now).first()

4885 

4886 

@pick_context_manager_writer
def console_auth_token_destroy_all_by_instance(context, instance_uuid):
    """Delete every console authorization tied to one instance."""
    instance_tokens = context.session.query(
        models.ConsoleAuthToken).filter_by(instance_uuid=instance_uuid)
    instance_tokens.delete()

4892 

4893 

@pick_context_manager_writer
def console_auth_token_destroy_expired(context):
    """Garbage-collect console authorizations whose expiry has passed.

    An authorization is expired once the timestamp in its 'expires'
    column is not in the future.
    """
    now = timeutils.utcnow_ts()
    context.session.query(models.ConsoleAuthToken).filter(
        models.ConsoleAuthToken.expires <= now).delete()

4904 

4905 

@pick_context_manager_writer
def console_auth_token_destroy_expired_by_host(context, host):
    """Garbage-collect a host's console authorizations that have expired.

    An authorization is expired once the timestamp in its 'expires'
    column is not in the future; only rows for the given host are removed.
    """
    now = timeutils.utcnow_ts()
    host_tokens = context.session.query(models.ConsoleAuthToken).filter_by(
        host=host)
    host_tokens.filter(models.ConsoleAuthToken.expires <= now).delete()

4918 

4919 

4920#################### 

4921 

4922 

@require_context
@pick_context_manager_reader
def share_mapping_get_all(context):
    """Return every share_mapping record."""
    mappings = context.session.query(models.ShareMapping)
    return mappings.all()

4928 

4929 

@require_context
@pick_context_manager_reader
def share_mapping_get_by_share_id(context, share_id):
    """Return the share_mapping records attached to one share."""
    return (
        context.session.query(models.ShareMapping)
        .filter_by(share_id=share_id)
        .all()
    )

4936 

4937 

@require_context
@pick_context_manager_reader
def share_mapping_get_by_instance_uuid(context, instance_uuid):
    """Return the share_mapping records attached to one instance."""
    return (
        context.session.query(models.ShareMapping)
        .filter_by(instance_uuid=instance_uuid)
        .all()
    )

4944 

4945 

@require_context
@pick_context_manager_reader
def share_mapping_get_by_instance_uuid_and_share_id(
        context, instance_uuid, share_id):
    """Return the share_mapping row for one (instance, share) pair, if any."""
    return (
        context.session.query(models.ShareMapping)
        .filter_by(instance_uuid=instance_uuid, share_id=share_id)
        .first()
    )

4953 

4954 

@require_context
@pick_context_manager_writer
def share_mapping_delete_by_instance_uuid_and_share_id(
        context, instance_uuid, share_id):
    """Delete the share_mapping row for one (instance, share) pair."""
    pair_rows = context.session.query(models.ShareMapping).filter_by(
        instance_uuid=instance_uuid, share_id=share_id)
    pair_rows.delete()

4962 

4963 

@require_context
@pick_context_manager_writer
def share_mapping_update(
    context, uuid, instance_uuid, share_id, status, tag, export_location,
    share_proto
):
    """Create or update the share_mapping for one (instance, share) pair.

    An existing row is updated in place; otherwise a new row is created.
    """
    mapping = share_mapping_get_by_instance_uuid_and_share_id(
        context, instance_uuid, share_id)

    is_new = not mapping
    if is_new:
        # No existing row for this pair: start a fresh one and fill in
        # its identifying fields.
        mapping = models.ShareMapping()
        mapping.uuid = uuid
        mapping.instance_uuid = instance_uuid
        mapping.share_id = share_id

    # Fields (re)written on both the create and update paths.
    mapping.status = status
    mapping.tag = tag
    mapping.export_location = export_location
    mapping.share_proto = share_proto
    mapping.save(context.session)

    if not is_new:
        # Re-read the persisted row so the returned object reflects the
        # flushed update.
        context.session.refresh(mapping)

    return mapping