Coverage for nova/rpc.py: 96%

148 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-24 11:16 +0000

1# Copyright 2013 Red Hat, Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); you may 

4# not use this file except in compliance with the License. You may obtain 

5# a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

12# License for the specific language governing permissions and limitations 

13# under the License. 

14 

15import functools 

16 

17from oslo_log import log as logging 

18import oslo_messaging as messaging 

19from oslo_messaging.rpc import dispatcher 

20from oslo_serialization import jsonutils 

21from oslo_service import periodic_task 

22from oslo_utils import importutils 

23 

24import nova.conf 

25import nova.context 

26import nova.exception 

27from nova.i18n import _ 

28 

# Names exported by ``from nova.rpc import *``.
__all__ = [
    'init',
    'cleanup',
    'set_defaults',
    'add_extra_exmods',
    'clear_extra_exmods',
    'get_allowed_exmods',
    'RequestContextSerializer',
    'get_client',
    'get_server',
    'get_notifier',
]

# osprofiler is an optional dependency. NOTE(review): try_import presumably
# yields None when the module is missing; get_client()/get_server() test
# ``if profiler`` before using it.
profiler = importutils.try_import("osprofiler.profiler")

CONF = nova.conf.CONF

LOG = logging.getLogger(__name__)

# Module-level RPC state, populated by init() and reset by cleanup().
# TODO(stephenfin): These should be private
TRANSPORT = LEGACY_NOTIFIER = NOTIFICATION_TRANSPORT = NOTIFIER = None

# NOTE(danms): If rpc_response_timeout is over this value (per-call or
# globally), we will enable heartbeating
HEARTBEAT_THRESHOLD = 60

# Exception modules allowed by default; EXTRA_EXMODS is extended at runtime
# through add_extra_exmods() and emptied by clear_extra_exmods().
ALLOWED_EXMODS = [nova.exception.__name__]
EXTRA_EXMODS = []

63 

64 

def init(conf):
    """Create the module-level transports and notifiers.

    The RPC transport is built with create_transport(); the notification
    transport and both notifiers are built from ``conf``. Which notifier is
    real and which one uses the 'noop' driver is decided by
    ``conf.notifications.notification_format``:

    * 'unversioned': legacy notifier active, versioned notifier is noop
    * 'both': both notifiers active
    * any other value: legacy notifier is noop, versioned notifier active

    :param conf: configuration object handed to oslo.messaging and read for
        its ``notifications`` options
    """
    global TRANSPORT, NOTIFICATION_TRANSPORT, LEGACY_NOTIFIER, NOTIFIER

    allowed = get_allowed_exmods()
    TRANSPORT = create_transport(get_transport_url())
    NOTIFICATION_TRANSPORT = messaging.get_notification_transport(
        conf, allowed_remote_exmods=allowed)
    ctxt_serializer = RequestContextSerializer(JsonPayloadSerializer())

    notification_format = conf.notifications.notification_format
    emit_legacy = notification_format in ('unversioned', 'both')
    emit_versioned = notification_format != 'unversioned'

    # The legacy notifier is always created first, then the versioned one.
    if emit_legacy:
        LEGACY_NOTIFIER = messaging.Notifier(
            NOTIFICATION_TRANSPORT, serializer=ctxt_serializer)
    else:
        LEGACY_NOTIFIER = messaging.Notifier(
            NOTIFICATION_TRANSPORT, serializer=ctxt_serializer,
            driver='noop')

    if emit_versioned:
        NOTIFIER = messaging.Notifier(
            NOTIFICATION_TRANSPORT,
            serializer=ctxt_serializer,
            topics=conf.notifications.versioned_notifications_topics)
    else:
        NOTIFIER = messaging.Notifier(
            NOTIFICATION_TRANSPORT, serializer=ctxt_serializer,
            driver='noop')

92 

93 

def cleanup():
    """Clean up both transports and reset the module-level RPC state.

    :raises AssertionError: if any of the four module globals is still None
    """
    global TRANSPORT, NOTIFICATION_TRANSPORT, LEGACY_NOTIFIER, NOTIFIER

    for handle in (TRANSPORT, NOTIFICATION_TRANSPORT,
                   LEGACY_NOTIFIER, NOTIFIER):
        assert handle is not None

    TRANSPORT.cleanup()
    NOTIFICATION_TRANSPORT.cleanup()

    TRANSPORT = None
    NOTIFICATION_TRANSPORT = None
    LEGACY_NOTIFIER = None
    NOTIFIER = None

103 

104 

def set_defaults(control_exchange):
    """Pass ``control_exchange`` to oslo.messaging's transport defaults."""
    messaging.set_transport_defaults(control_exchange)

107 

108 

def add_extra_exmods(*args):
    """Append every positional argument to the EXTRA_EXMODS list."""
    EXTRA_EXMODS.extend(args)

111 

112 

def clear_extra_exmods():
    """Empty the EXTRA_EXMODS list in place."""
    EXTRA_EXMODS.clear()

115 

116 

def get_allowed_exmods():
    """Return a new list: ALLOWED_EXMODS followed by EXTRA_EXMODS."""
    combined = list(ALLOWED_EXMODS)
    combined.extend(EXTRA_EXMODS)
    return combined

119 

120 

class JsonPayloadSerializer(messaging.NoOpSerializer):
    """Serializer that turns entities into primitives via jsonutils."""

    @staticmethod
    def fallback(obj):
        """Convert an object jsonutils.to_primitive cannot handle itself.

        This is mostly only needed in tests because of the use of the nova
        CheatingSerializer fixture which keeps some non-serializable fields
        on the RequestContext, like db_connection.

        :param obj: the object to convert
        :returns: ``obj.to_dict()`` for a RequestContext, else ``str(obj)``
        """
        if not isinstance(obj, nova.context.RequestContext):
            # The default fallback in jsonutils.to_primitive() is str.
            return str(obj)
        # This matches RequestContextSerializer.serialize_context().
        return obj.to_dict()

    def serialize_entity(self, context, entity):
        """Return ``entity`` converted to primitives.

        ``context`` is accepted for interface compatibility and not used.
        """
        return jsonutils.to_primitive(
            entity,
            convert_instances=True,
            fallback=self.fallback,
        )

143 

144 

class RequestContextSerializer(messaging.Serializer):
    """Serializer for nova RequestContext, delegating entities to a base.

    Entity (de)serialization is forwarded to the wrapped ``base``
    serializer when one is set; otherwise entities pass through unchanged.
    """

    def __init__(self, base):
        """Store the wrapped entity serializer (may be falsy, e.g. None)."""
        self._base = base

    def serialize_entity(self, context, entity):
        """Serialize ``entity`` through the base serializer, if any."""
        if self._base:
            return self._base.serialize_entity(context, entity)
        return entity

    def deserialize_entity(self, context, entity):
        """Deserialize ``entity`` through the base serializer, if any."""
        if self._base:
            return self._base.deserialize_entity(context, entity)
        return entity

    def serialize_context(self, context):
        """Return the context's ``to_dict()`` representation."""
        return context.to_dict()

    def deserialize_context(self, context):
        """Rebuild a nova RequestContext from its dict form."""
        return nova.context.RequestContext.from_dict(context)

165 

166 

class ProfilerRequestContextSerializer(RequestContextSerializer):
    """RequestContextSerializer that also carries osprofiler trace info."""

    def serialize_context(self, context):
        """Serialize the context, adding 'trace_info' when profiling.

        'trace_info' is only added when ``profiler.get()`` returns a truthy
        profiler object.
        """
        serialized = super().serialize_context(context)

        active = profiler.get()
        if not active:
            return serialized

        # FIXME(DinaBelova): we'll add profiler.get_info() method
        # to extract this info -> we'll need to update these lines
        serialized.update(trace_info={
            "hmac_key": active.hmac_key,
            "base_id": active.get_base_id(),
            "parent_id": active.get_id(),
        })
        return serialized

    def deserialize_context(self, context):
        """Deserialize the context, initialising the profiler if traced.

        The 'trace_info' key is removed from ``context`` (the passed dict is
        mutated) before the remainder is handed to the parent class.
        """
        trace_info = context.pop("trace_info", None)
        if trace_info:
            profiler.init(**trace_info)

        return super().deserialize_context(context)

192 

193 

def get_transport_url(url_str=None):
    """Parse ``url_str`` into a TransportURL using the global CONF.

    :param url_str: optional URL string, passed through to
        ``messaging.TransportURL.parse``
    """
    return messaging.TransportURL.parse(CONF, url_str)

196 

197 

def get_client(target, version_cap=None, serializer=None,
               call_monitor_timeout=None):
    """Build an RPC client on the module-level TRANSPORT.

    The given ``serializer`` is wrapped in a request-context serializer;
    the profiler-aware variant is used when osprofiler was imported.

    :param target: messaging target for the client
    :param version_cap: optional version cap passed to the client
    :param serializer: optional entity serializer to wrap
    :param call_monitor_timeout: optional value passed to the client
    :raises AssertionError: if TRANSPORT has not been initialised
    """
    assert TRANSPORT is not None

    wrapper_cls = (ProfilerRequestContextSerializer if profiler
                   else RequestContextSerializer)
    serializer = wrapper_cls(serializer)

    return messaging.get_rpc_client(
        TRANSPORT,
        target,
        version_cap=version_cap,
        serializer=serializer,
        call_monitor_timeout=call_monitor_timeout)

210 

211 

def get_server(target, endpoints, serializer=None):
    """Build an RPC server on the module-level TRANSPORT.

    The server uses the 'eventlet' executor and the default RPC access
    policy. ``serializer`` is wrapped in a request-context serializer; the
    profiler-aware variant is used when osprofiler was imported.

    :param target: messaging target for the server
    :param endpoints: endpoint objects passed to the server
    :param serializer: optional entity serializer to wrap
    :raises AssertionError: if TRANSPORT has not been initialised
    """
    assert TRANSPORT is not None

    wrapper_cls = (ProfilerRequestContextSerializer if profiler
                   else RequestContextSerializer)
    serializer = wrapper_cls(serializer)

    return messaging.get_rpc_server(
        TRANSPORT,
        target,
        endpoints,
        executor='eventlet',
        serializer=serializer,
        access_policy=dispatcher.DefaultRPCAccessPolicy)

226 

227 

def get_notifier(service, host=None):
    """Return a validating legacy notifier for ``service`` on ``host``.

    The publisher id is "<service>.<host>", with ``CONF.host`` used when
    ``host`` is falsy.

    :raises AssertionError: if LEGACY_NOTIFIER has not been initialised
    """
    assert LEGACY_NOTIFIER is not None
    publisher_id = '%s.%s' % (service, host or CONF.host)
    prepared = LEGACY_NOTIFIER.prepare(publisher_id=publisher_id)
    return LegacyValidatingNotifier(prepared)

233 

234 

def get_versioned_notifier(publisher_id):
    """Return the versioned NOTIFIER prepared with ``publisher_id``.

    :raises AssertionError: if NOTIFIER has not been initialised
    """
    assert NOTIFIER is not None
    return NOTIFIER.prepare(publisher_id=publisher_id)

238 

239 

def if_notifications_enabled(f):
    """Calls decorated method only if versioned notifications are enabled.

    The wrapped callable runs only when ``NOTIFIER.is_enabled()`` is truthy
    and the configured notification format is 'both' or 'versioned';
    otherwise the wrapper returns None without calling it.
    """
    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        # Check the notifier first, exactly as a short-circuiting ``and``.
        if not NOTIFIER.is_enabled():
            return None
        if CONF.notifications.notification_format not in ('both',
                                                          'versioned'):
            return None
        return f(*args, **kwargs)
    return wrapped

251 

252 

def create_transport(url):
    """Create an RPC transport for ``url`` using the global CONF.

    The current allowed exception modules (see get_allowed_exmods()) are
    passed as ``allowed_remote_exmods``.
    """
    return messaging.get_rpc_transport(
        CONF, url=url, allowed_remote_exmods=get_allowed_exmods())

258 

259 

class LegacyValidatingNotifier:
    """Wraps an oslo.messaging Notifier and checks for allowed event_types."""

    # When True, a disallowed event_type raises AssertionError; when False
    # only a WARNING is logged and the notification is still emitted.
    fatal = False

    # This list contains the already existing therefore allowed legacy
    # notification event_types. New items shall not be added to the list as
    # Nova does not allow new legacy notifications any more. This list will be
    # removed when all the notification is transformed to versioned
    # notifications.
    allowed_legacy_notification_event_types = [
        'aggregate.addhost.end',
        'aggregate.addhost.start',
        'aggregate.create.end',
        'aggregate.create.start',
        'aggregate.delete.end',
        'aggregate.delete.start',
        'aggregate.removehost.end',
        'aggregate.removehost.start',
        'aggregate.updatemetadata.end',
        'aggregate.updatemetadata.start',
        'aggregate.updateprop.end',
        'aggregate.updateprop.start',
        'compute.instance.create.end',
        'compute.instance.create.error',
        'compute.instance.create_ip.end',
        'compute.instance.create_ip.start',
        'compute.instance.create.start',
        'compute.instance.delete.end',
        'compute.instance.delete_ip.end',
        'compute.instance.delete_ip.start',
        'compute.instance.delete.start',
        'compute.instance.evacuate',
        'compute.instance.exists',
        'compute.instance.finish_resize.end',
        'compute.instance.finish_resize.start',
        'compute.instance.live.migration.abort.start',
        'compute.instance.live.migration.abort.end',
        'compute.instance.live.migration.force.complete.start',
        'compute.instance.live.migration.force.complete.end',
        'compute.instance.live_migration.post.dest.end',
        'compute.instance.live_migration.post.dest.start',
        'compute.instance.live_migration._post.end',
        'compute.instance.live_migration._post.start',
        'compute.instance.live_migration.pre.end',
        'compute.instance.live_migration.pre.start',
        'compute.instance.live_migration.rollback.dest.end',
        'compute.instance.live_migration.rollback.dest.start',
        'compute.instance.live_migration._rollback.end',
        'compute.instance.live_migration._rollback.start',
        'compute.instance.pause.end',
        'compute.instance.pause.start',
        'compute.instance.power_off.end',
        'compute.instance.power_off.start',
        'compute.instance.power_on.end',
        'compute.instance.power_on.start',
        'compute.instance.reboot.end',
        'compute.instance.reboot.error',
        'compute.instance.reboot.start',
        'compute.instance.rebuild.end',
        'compute.instance.rebuild.error',
        'compute.instance.rebuild.scheduled',
        'compute.instance.rebuild.start',
        'compute.instance.rescue.end',
        'compute.instance.rescue.start',
        'compute.instance.resize.confirm.end',
        'compute.instance.resize.confirm.start',
        'compute.instance.resize.end',
        'compute.instance.resize.error',
        'compute.instance.resize.prep.end',
        'compute.instance.resize.prep.start',
        'compute.instance.resize.revert.end',
        'compute.instance.resize.revert.start',
        'compute.instance.resize.start',
        'compute.instance.restore.end',
        'compute.instance.restore.start',
        'compute.instance.resume.end',
        'compute.instance.resume.start',
        'compute.instance.shelve.end',
        'compute.instance.shelve_offload.end',
        'compute.instance.shelve_offload.start',
        'compute.instance.shelve.start',
        'compute.instance.shutdown.end',
        'compute.instance.shutdown.start',
        'compute.instance.snapshot.end',
        'compute.instance.snapshot.start',
        'compute.instance.soft_delete.end',
        'compute.instance.soft_delete.start',
        'compute.instance.suspend.end',
        'compute.instance.suspend.start',
        'compute.instance.trigger_crash_dump.end',
        'compute.instance.trigger_crash_dump.start',
        'compute.instance.unpause.end',
        'compute.instance.unpause.start',
        'compute.instance.unrescue.end',
        'compute.instance.unrescue.start',
        'compute.instance.unshelve.start',
        'compute.instance.unshelve.end',
        'compute.instance.update',
        'compute.instance.volume.attach',
        'compute.instance.volume.detach',
        'compute.libvirt.error',
        'compute.metrics.update',
        'compute_task.build_instances',
        'compute_task.migrate_server',
        'compute_task.rebuild_server',
        'HostAPI.power_action.end',
        'HostAPI.power_action.start',
        'HostAPI.set_enabled.end',
        'HostAPI.set_enabled.start',
        'HostAPI.set_maintenance.end',
        'HostAPI.set_maintenance.start',
        'keypair.create.start',
        'keypair.create.end',
        'keypair.delete.start',
        'keypair.delete.end',
        'keypair.import.start',
        'keypair.import.end',
        'network.floating_ip.allocate',
        'network.floating_ip.associate',
        'network.floating_ip.deallocate',
        'network.floating_ip.disassociate',
        'scheduler.select_destinations.end',
        'scheduler.select_destinations.start',
        'servergroup.addmember',
        'servergroup.create',
        'servergroup.delete',
        'volume.usage',
    ]

    # Template used both for the AssertionError and for the warning log.
    message = _('%(event_type)s is not a versioned notification and not '
                'whitelisted. See ./doc/source/reference/notifications.rst')

    def __init__(self, notifier):
        """Wrap ``notifier`` and expose one method per priority level.

        Each of debug/info/warn/error/critical becomes an instance
        attribute bound to _notify() with that priority pre-filled.
        """
        self.notifier = notifier
        for level in ('debug', 'info', 'warn', 'error', 'critical'):
            emit = functools.partial(self._notify, level)
            setattr(self, level, emit)

    def _is_wrap_exception_notification(self, payload):
        """Return True when the payload keys are exactly exception/args."""
        # nova.exception_wrapper.wrap_exception decorator emits notification
        # where the event_type is the name of the decorated function. This
        # is used in many places but it will be converted to versioned
        # notification in one run by updating the decorator so it is pointless
        # to white list all the function names here we white list the
        # notification itself detected by the special payload keys.
        return set(payload.keys()) == {'exception', 'args'}

    def _notify(self, priority, ctxt, event_type, payload):
        """Validate ``event_type`` and forward to the wrapped notifier.

        :raises AssertionError: when the event type is not allowed and
            ``fatal`` is set
        """
        permitted = (
            event_type in self.allowed_legacy_notification_event_types or
            self._is_wrap_exception_notification(payload))
        if not permitted:
            if self.fatal:
                raise AssertionError(self.message % {'event_type': event_type})
            LOG.warning(self.message, {'event_type': event_type})

        emit = getattr(self.notifier, priority)
        emit(ctxt, event_type, payload)

419 

420 

class ClientRouter(periodic_task.PeriodicTasks):
    """Creates RPC clients that honor the context's RPC transport
    or provides a default.
    """

    def __init__(self, default_client):
        """Remember the default client and copy its connection settings.

        :param default_client: RPC client returned when a context carries
            no transport of its own; its target, version cap and serializer
            are reused for per-context clients
        """
        super().__init__(CONF)
        self.default_client = default_client
        self.target = default_client.target
        self.version_cap = default_client.version_cap
        self.serializer = default_client.serializer

    def client(self, context):
        """Return an RPC client appropriate for ``context``.

        A new client is built on ``context.mq_connection`` when that is
        truthy; otherwise the default client is returned.
        """
        transport = context.mq_connection
        if not transport:
            return self.default_client

        cmt = self.default_client.call_monitor_timeout
        return messaging.get_rpc_client(
            transport,
            self.target,
            version_cap=self.version_cap,
            serializer=self.serializer,
            call_monitor_timeout=cmt)