Coverage for nova/rpc.py: 96%
148 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-17 15:08 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-17 15:08 +0000
1# Copyright 2013 Red Hat, Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
15import functools
17from oslo_log import log as logging
18import oslo_messaging as messaging
19from oslo_messaging.rpc import dispatcher
20from oslo_serialization import jsonutils
21from oslo_service import periodic_task
22from oslo_utils import importutils
24import nova.conf
25import nova.context
26import nova.exception
27from nova.i18n import _
29__all__ = [
30 'init',
31 'cleanup',
32 'set_defaults',
33 'add_extra_exmods',
34 'clear_extra_exmods',
35 'get_allowed_exmods',
36 'RequestContextSerializer',
37 'get_client',
38 'get_server',
39 'get_notifier',
40]
42profiler = importutils.try_import("osprofiler.profiler")
45CONF = nova.conf.CONF
47LOG = logging.getLogger(__name__)
49# TODO(stephenfin): These should be private
50TRANSPORT = None
51LEGACY_NOTIFIER = None
52NOTIFICATION_TRANSPORT = None
53NOTIFIER = None
55# NOTE(danms): If rpc_response_timeout is over this value (per-call or
56# globally), we will enable heartbeating
57HEARTBEAT_THRESHOLD = 60
59ALLOWED_EXMODS = [
60 nova.exception.__name__,
61]
62EXTRA_EXMODS = []
65def init(conf):
66 global TRANSPORT, NOTIFICATION_TRANSPORT, LEGACY_NOTIFIER, NOTIFIER
67 exmods = get_allowed_exmods()
68 TRANSPORT = create_transport(get_transport_url())
69 NOTIFICATION_TRANSPORT = messaging.get_notification_transport(
70 conf, allowed_remote_exmods=exmods)
71 serializer = RequestContextSerializer(JsonPayloadSerializer())
72 if conf.notifications.notification_format == 'unversioned':
73 LEGACY_NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
74 serializer=serializer)
75 NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
76 serializer=serializer, driver='noop')
77 elif conf.notifications.notification_format == 'both':
78 LEGACY_NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
79 serializer=serializer)
80 NOTIFIER = messaging.Notifier(
81 NOTIFICATION_TRANSPORT,
82 serializer=serializer,
83 topics=conf.notifications.versioned_notifications_topics)
84 else:
85 LEGACY_NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
86 serializer=serializer,
87 driver='noop')
88 NOTIFIER = messaging.Notifier(
89 NOTIFICATION_TRANSPORT,
90 serializer=serializer,
91 topics=conf.notifications.versioned_notifications_topics)
94def cleanup():
95 global TRANSPORT, NOTIFICATION_TRANSPORT, LEGACY_NOTIFIER, NOTIFIER
96 assert TRANSPORT is not None
97 assert NOTIFICATION_TRANSPORT is not None
98 assert LEGACY_NOTIFIER is not None
99 assert NOTIFIER is not None
100 TRANSPORT.cleanup()
101 NOTIFICATION_TRANSPORT.cleanup()
102 TRANSPORT = NOTIFICATION_TRANSPORT = LEGACY_NOTIFIER = NOTIFIER = None
105def set_defaults(control_exchange):
106 messaging.set_transport_defaults(control_exchange)
109def add_extra_exmods(*args):
110 EXTRA_EXMODS.extend(args)
113def clear_extra_exmods():
114 del EXTRA_EXMODS[:]
117def get_allowed_exmods():
118 return ALLOWED_EXMODS + EXTRA_EXMODS
121class JsonPayloadSerializer(messaging.NoOpSerializer):
123 @staticmethod
124 def fallback(obj):
125 """Serializer fallback
127 This method is used to serialize an object which jsonutils.to_primitive
128 does not otherwise know how to handle.
130 This is mostly only needed in tests because of the use of the nova
131 CheatingSerializer fixture which keeps some non-serializable fields
132 on the RequestContext, like db_connection.
133 """
134 if isinstance(obj, nova.context.RequestContext):
135 # This matches RequestContextSerializer.serialize_context().
136 return obj.to_dict()
137 # The default fallback in jsonutils.to_primitive() is str.
138 return str(obj)
140 def serialize_entity(self, context, entity):
141 return jsonutils.to_primitive(entity, convert_instances=True,
142 fallback=self.fallback)
145class RequestContextSerializer(messaging.Serializer):
147 def __init__(self, base):
148 self._base = base
150 def serialize_entity(self, context, entity):
151 if not self._base:
152 return entity
153 return self._base.serialize_entity(context, entity)
155 def deserialize_entity(self, context, entity):
156 if not self._base:
157 return entity
158 return self._base.deserialize_entity(context, entity)
160 def serialize_context(self, context):
161 return context.to_dict()
163 def deserialize_context(self, context):
164 return nova.context.RequestContext.from_dict(context)
167class ProfilerRequestContextSerializer(RequestContextSerializer):
168 def serialize_context(self, context):
169 _context = super(ProfilerRequestContextSerializer,
170 self).serialize_context(context)
172 prof = profiler.get()
173 if prof:
174 # FIXME(DinaBelova): we'll add profiler.get_info() method
175 # to extract this info -> we'll need to update these lines
176 trace_info = {
177 "hmac_key": prof.hmac_key,
178 "base_id": prof.get_base_id(),
179 "parent_id": prof.get_id()
180 }
181 _context.update({"trace_info": trace_info})
183 return _context
185 def deserialize_context(self, context):
186 trace_info = context.pop("trace_info", None)
187 if trace_info: 187 ↛ 190line 187 didn't jump to line 190 because the condition on line 187 was always true
188 profiler.init(**trace_info)
190 return super(ProfilerRequestContextSerializer,
191 self).deserialize_context(context)
194def get_transport_url(url_str=None):
195 return messaging.TransportURL.parse(CONF, url_str)
198def get_client(target, version_cap=None, serializer=None,
199 call_monitor_timeout=None):
200 assert TRANSPORT is not None
202 if profiler:
203 serializer = ProfilerRequestContextSerializer(serializer)
204 else:
205 serializer = RequestContextSerializer(serializer)
207 return messaging.get_rpc_client(TRANSPORT, target,
208 version_cap=version_cap, serializer=serializer,
209 call_monitor_timeout=call_monitor_timeout)
212def get_server(target, endpoints, serializer=None):
213 assert TRANSPORT is not None
215 if profiler:
216 serializer = ProfilerRequestContextSerializer(serializer)
217 else:
218 serializer = RequestContextSerializer(serializer)
219 access_policy = dispatcher.DefaultRPCAccessPolicy
220 return messaging.get_rpc_server(TRANSPORT,
221 target,
222 endpoints,
223 executor='eventlet',
224 serializer=serializer,
225 access_policy=access_policy)
228def get_notifier(service, host=None):
229 assert LEGACY_NOTIFIER is not None
230 publisher_id = '%s.%s' % (service, host or CONF.host)
231 return LegacyValidatingNotifier(
232 LEGACY_NOTIFIER.prepare(publisher_id=publisher_id))
235def get_versioned_notifier(publisher_id):
236 assert NOTIFIER is not None
237 return NOTIFIER.prepare(publisher_id=publisher_id)
240def if_notifications_enabled(f):
241 """Calls decorated method only if versioned notifications are enabled."""
242 @functools.wraps(f)
243 def wrapped(*args, **kwargs):
244 if (NOTIFIER.is_enabled() and
245 CONF.notifications.notification_format in ('both',
246 'versioned')):
247 return f(*args, **kwargs)
248 else:
249 return None
250 return wrapped
253def create_transport(url):
254 exmods = get_allowed_exmods()
255 return messaging.get_rpc_transport(CONF,
256 url=url,
257 allowed_remote_exmods=exmods)
260class LegacyValidatingNotifier(object):
261 """Wraps an oslo.messaging Notifier and checks for allowed event_types."""
263 # If true an exception is thrown if the event_type is not allowed, if false
264 # then only a WARNING is logged
265 fatal = False
267 # This list contains the already existing therefore allowed legacy
268 # notification event_types. New items shall not be added to the list as
269 # Nova does not allow new legacy notifications any more. This list will be
270 # removed when all the notification is transformed to versioned
271 # notifications.
272 allowed_legacy_notification_event_types = [
273 'aggregate.addhost.end',
274 'aggregate.addhost.start',
275 'aggregate.create.end',
276 'aggregate.create.start',
277 'aggregate.delete.end',
278 'aggregate.delete.start',
279 'aggregate.removehost.end',
280 'aggregate.removehost.start',
281 'aggregate.updatemetadata.end',
282 'aggregate.updatemetadata.start',
283 'aggregate.updateprop.end',
284 'aggregate.updateprop.start',
285 'compute.instance.create.end',
286 'compute.instance.create.error',
287 'compute.instance.create_ip.end',
288 'compute.instance.create_ip.start',
289 'compute.instance.create.start',
290 'compute.instance.delete.end',
291 'compute.instance.delete_ip.end',
292 'compute.instance.delete_ip.start',
293 'compute.instance.delete.start',
294 'compute.instance.evacuate',
295 'compute.instance.exists',
296 'compute.instance.finish_resize.end',
297 'compute.instance.finish_resize.start',
298 'compute.instance.live.migration.abort.start',
299 'compute.instance.live.migration.abort.end',
300 'compute.instance.live.migration.force.complete.start',
301 'compute.instance.live.migration.force.complete.end',
302 'compute.instance.live_migration.post.dest.end',
303 'compute.instance.live_migration.post.dest.start',
304 'compute.instance.live_migration._post.end',
305 'compute.instance.live_migration._post.start',
306 'compute.instance.live_migration.pre.end',
307 'compute.instance.live_migration.pre.start',
308 'compute.instance.live_migration.rollback.dest.end',
309 'compute.instance.live_migration.rollback.dest.start',
310 'compute.instance.live_migration._rollback.end',
311 'compute.instance.live_migration._rollback.start',
312 'compute.instance.pause.end',
313 'compute.instance.pause.start',
314 'compute.instance.power_off.end',
315 'compute.instance.power_off.start',
316 'compute.instance.power_on.end',
317 'compute.instance.power_on.start',
318 'compute.instance.reboot.end',
319 'compute.instance.reboot.error',
320 'compute.instance.reboot.start',
321 'compute.instance.rebuild.end',
322 'compute.instance.rebuild.error',
323 'compute.instance.rebuild.scheduled',
324 'compute.instance.rebuild.start',
325 'compute.instance.rescue.end',
326 'compute.instance.rescue.start',
327 'compute.instance.resize.confirm.end',
328 'compute.instance.resize.confirm.start',
329 'compute.instance.resize.end',
330 'compute.instance.resize.error',
331 'compute.instance.resize.prep.end',
332 'compute.instance.resize.prep.start',
333 'compute.instance.resize.revert.end',
334 'compute.instance.resize.revert.start',
335 'compute.instance.resize.start',
336 'compute.instance.restore.end',
337 'compute.instance.restore.start',
338 'compute.instance.resume.end',
339 'compute.instance.resume.start',
340 'compute.instance.shelve.end',
341 'compute.instance.shelve_offload.end',
342 'compute.instance.shelve_offload.start',
343 'compute.instance.shelve.start',
344 'compute.instance.shutdown.end',
345 'compute.instance.shutdown.start',
346 'compute.instance.snapshot.end',
347 'compute.instance.snapshot.start',
348 'compute.instance.soft_delete.end',
349 'compute.instance.soft_delete.start',
350 'compute.instance.suspend.end',
351 'compute.instance.suspend.start',
352 'compute.instance.trigger_crash_dump.end',
353 'compute.instance.trigger_crash_dump.start',
354 'compute.instance.unpause.end',
355 'compute.instance.unpause.start',
356 'compute.instance.unrescue.end',
357 'compute.instance.unrescue.start',
358 'compute.instance.unshelve.start',
359 'compute.instance.unshelve.end',
360 'compute.instance.update',
361 'compute.instance.volume.attach',
362 'compute.instance.volume.detach',
363 'compute.libvirt.error',
364 'compute.metrics.update',
365 'compute_task.build_instances',
366 'compute_task.migrate_server',
367 'compute_task.rebuild_server',
368 'HostAPI.power_action.end',
369 'HostAPI.power_action.start',
370 'HostAPI.set_enabled.end',
371 'HostAPI.set_enabled.start',
372 'HostAPI.set_maintenance.end',
373 'HostAPI.set_maintenance.start',
374 'keypair.create.start',
375 'keypair.create.end',
376 'keypair.delete.start',
377 'keypair.delete.end',
378 'keypair.import.start',
379 'keypair.import.end',
380 'network.floating_ip.allocate',
381 'network.floating_ip.associate',
382 'network.floating_ip.deallocate',
383 'network.floating_ip.disassociate',
384 'scheduler.select_destinations.end',
385 'scheduler.select_destinations.start',
386 'servergroup.addmember',
387 'servergroup.create',
388 'servergroup.delete',
389 'volume.usage',
390 ]
392 message = _('%(event_type)s is not a versioned notification and not '
393 'whitelisted. See ./doc/source/reference/notifications.rst')
395 def __init__(self, notifier):
396 self.notifier = notifier
397 for priority in ['debug', 'info', 'warn', 'error', 'critical']:
398 setattr(self, priority,
399 functools.partial(self._notify, priority))
401 def _is_wrap_exception_notification(self, payload):
402 # nova.exception_wrapper.wrap_exception decorator emits notification
403 # where the event_type is the name of the decorated function. This
404 # is used in many places but it will be converted to versioned
405 # notification in one run by updating the decorator so it is pointless
406 # to white list all the function names here we white list the
407 # notification itself detected by the special payload keys.
408 return {'exception', 'args'} == set(payload.keys())
410 def _notify(self, priority, ctxt, event_type, payload):
411 if (event_type not in self.allowed_legacy_notification_event_types and 411 ↛ 413line 411 didn't jump to line 413 because the condition on line 411 was never true
412 not self._is_wrap_exception_notification(payload)):
413 if self.fatal:
414 raise AssertionError(self.message % {'event_type': event_type})
415 else:
416 LOG.warning(self.message, {'event_type': event_type})
418 getattr(self.notifier, priority)(ctxt, event_type, payload)
421class ClientRouter(periodic_task.PeriodicTasks):
422 """Creates RPC clients that honor the context's RPC transport
423 or provides a default.
424 """
426 def __init__(self, default_client):
427 super(ClientRouter, self).__init__(CONF)
428 self.default_client = default_client
429 self.target = default_client.target
430 self.version_cap = default_client.version_cap
431 self.serializer = default_client.serializer
433 def client(self, context):
434 transport = context.mq_connection
435 if transport:
436 cmt = self.default_client.call_monitor_timeout
437 return messaging.get_rpc_client(transport, self.target,
438 version_cap=self.version_cap,
439 serializer=self.serializer,
440 call_monitor_timeout=cmt)
441 else:
442 return self.default_client