Coverage for nova/service.py: 83%
168 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-24 11:16 +0000
1# Copyright 2010 United States Government as represented by the
2# Administrator of the National Aeronautics and Space Administration.
3# Copyright 2011 Justin Santa Barbara
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
18"""Generic Node base class for all workers that run on hosts."""
20import os
21import os.path
22import random
23import sys
25from oslo_log import log as logging
26import oslo_messaging as messaging
27from oslo_service import service
28from oslo_utils import importutils
30from nova import baserpc
31from nova import conductor
32import nova.conf
33from nova import context
34from nova import debugger
35from nova import exception
36from nova.i18n import _
37from nova import objects
38from nova.objects import base as objects_base
39from nova.objects import service as service_obj
40from nova import rpc
41from nova import servicegroup
42from nova import utils
43from nova import version
# osprofiler is an optional dependency: try_import() returns None when the
# library is not installed, and setup_profiler() below guards on that.
osprofiler = importutils.try_import("osprofiler")
osprofiler_initializer = importutils.try_import("osprofiler.initializer")

CONF = nova.conf.CONF

LOG = logging.getLogger(__name__)

# Maps a service binary name to the fully-qualified manager class path that
# Service.create() falls back to when no explicit manager is supplied.
SERVICE_MANAGERS = {
    'nova-compute': 'nova.compute.manager.ComputeManager',
    'nova-conductor': 'nova.conductor.manager.ConductorManager',
    'nova-scheduler': 'nova.scheduler.manager.SchedulerManager',
}
def _create_service_ref(this_service, context):
    """Create and persist a new Service record for *this_service*.

    Copies the host/binary/topic identity from *this_service*, starts the
    report_count at zero, and returns the created objects.Service.
    """
    svc = objects.Service(context)
    for attr in ('host', 'binary', 'topic'):
        setattr(svc, attr, getattr(this_service, attr))
    svc.report_count = 0
    svc.create()
    return svc
def _update_service_ref(service):
    """Sync an existing Service DB record to this release's service version.

    If the stored version differs from service_obj.SERVICE_VERSION, log the
    transition and save the new version; otherwise this is a no-op.
    """
    if service.version != service_obj.SERVICE_VERSION:
        LOG.info('Updating service version for %(binary)s on '
                 '%(host)s from %(old)i to %(new)i',
                 {'binary': service.binary,
                  'host': service.host,
                  'old': service.version,
                  'new': service_obj.SERVICE_VERSION})
        service.version = service_obj.SERVICE_VERSION
        service.save()
def setup_profiler(binary, host):
    """Initialize OSProfiler for this process when available and enabled.

    No-op if the osprofiler library is not installed (module attribute is
    None) or if CONF.profiler.enabled is False.

    :param binary: service binary name reported as the profiled service
    :param host: hostname reported to the profiler
    """
    if osprofiler and CONF.profiler.enabled:
        osprofiler.initializer.init_from_conf(
            conf=CONF,
            context=context.get_admin_context().to_dict(),
            project="nova",
            service=binary,
            host=host)
        LOG.info("OSProfiler is enabled.")
class Service(service.Service):
    """Service object for binaries running on hosts.

    A service takes a manager and enables rpc by listening to queues based
    on topic. It also periodically runs tasks on the manager and reports
    its state to the database services table.
    """

    def __init__(self, host, binary, topic, manager, report_interval=None,
                 periodic_enable=None, periodic_fuzzy_delay=None,
                 periodic_interval_max=None, *args, **kwargs):
        """Set up the service; extra args/kwargs are passed to the manager.

        :param host: hostname this service runs on
        :param binary: service binary name (e.g. 'nova-compute')
        :param topic: RPC topic to listen on
        :param manager: fully-qualified manager class path to instantiate
        :param report_interval: seconds between state reports
        :param periodic_enable: whether to run periodic tasks
        :param periodic_fuzzy_delay: max random initial delay (seconds) for
            the first periodic run
        :param periodic_interval_max: max time to wait between periodic runs
        """
        super(Service, self).__init__()
        self.host = host
        self.binary = binary
        self.topic = topic
        self.manager_class_name = manager
        self.servicegroup_api = servicegroup.API()
        manager_class = importutils.import_class(self.manager_class_name)
        if objects_base.NovaObject.indirection_api:
            # Object calls are routed over RPC to the conductor, so wait
            # for the conductor to be reachable before doing anything else.
            conductor_api = conductor.API()
            conductor_api.wait_until_ready(context.get_admin_context())
        self.manager = manager_class(host=self.host, *args, **kwargs)
        self.rpcserver = None
        self.report_interval = report_interval
        self.periodic_enable = periodic_enable
        self.periodic_fuzzy_delay = periodic_fuzzy_delay
        self.periodic_interval_max = periodic_interval_max
        self.saved_args, self.saved_kwargs = args, kwargs
        self.backdoor_port = None
        setup_profiler(binary, self.host)

    def __repr__(self):
        return "<%(cls_name)s: host=%(host)s, binary=%(binary)s, " \
               "manager_class_name=%(manager)s>" % {
                   'cls_name': self.__class__.__name__,
                   'host': self.host,
                   'binary': self.binary,
                   'manager': self.manager_class_name
               }

    def start(self):
        """Start the service.

        This includes starting an RPC service, initializing
        periodic tasks, etc.
        """
        # NOTE(melwitt): Clear the cell cache holding database transaction
        # context manager objects. We do this to ensure we create new internal
        # oslo.db locks to avoid a situation where a child process receives an
        # already locked oslo.db lock when it is forked. When a child process
        # inherits a locked oslo.db lock, database accesses through that
        # transaction context manager will never be able to acquire the lock
        # and requests will fail with CellTimeout errors.
        # See https://bugs.python.org/issue6721 for more information.
        # With python 3.7, it would be possible for oslo.db to make use of the
        # os.register_at_fork() method to reinitialize its lock. Until we
        # require python 3.7 as a minimum version, we must handle the
        # situation outside of oslo.db.
        context.CELL_CACHE = {}

        verstr = version.version_string_with_package()
        LOG.info('Starting %(topic)s node (version %(version)s)',
                 {'topic': self.topic, 'version': verstr})
        self.basic_config_check()
        ctxt = context.get_admin_context()
        self.service_ref = objects.Service.get_by_host_and_binary(
            ctxt, self.host, self.binary)
        self.manager.init_host(self.service_ref)
        self.model_disconnected = False
        if self.service_ref:
            _update_service_ref(self.service_ref)
        else:
            try:
                self.service_ref = _create_service_ref(self, ctxt)
            except (exception.ServiceTopicExists,
                    exception.ServiceBinaryExists):
                # NOTE(danms): If we race to create a record with a sibling
                # worker, don't fail here.
                self.service_ref = objects.Service.get_by_host_and_binary(
                    ctxt, self.host, self.binary)

        self.manager.pre_start_hook(self.service_ref)

        if self.backdoor_port is not None:
            self.manager.backdoor_port = self.backdoor_port

        LOG.debug("Creating RPC server for service %s", self.topic)

        target = messaging.Target(topic=self.topic, server=self.host)

        endpoints = [
            self.manager,
            baserpc.BaseRPCAPI(self.manager.service_name, self.backdoor_port)
        ]
        endpoints.extend(self.manager.additional_endpoints)

        serializer = objects_base.NovaObjectSerializer()

        self.rpcserver = rpc.get_server(target, endpoints, serializer)
        self.rpcserver.start()

        self.manager.post_start_hook()

        LOG.debug("Join ServiceGroup membership for this service %s",
                  self.topic)
        # Add service to the ServiceGroup membership group.
        self.servicegroup_api.join(self.host, self.topic, self)

        if self.periodic_enable:
            if self.periodic_fuzzy_delay:
                # Spread the first run of periodic tasks across workers so
                # they don't all fire at once.
                initial_delay = random.randint(0, self.periodic_fuzzy_delay)
            else:
                initial_delay = None

            self.tg.add_dynamic_timer(
                self.periodic_tasks,
                initial_delay=initial_delay,
                periodic_interval_max=self.periodic_interval_max)

    def __getattr__(self, key):
        # Delegate unknown attribute lookups to the manager (if set), so the
        # service transparently exposes manager methods.
        manager = self.__dict__.get('manager', None)
        return getattr(manager, key)

    @classmethod
    def create(cls, host=None, binary=None, topic=None, manager=None,
               report_interval=None, periodic_enable=None,
               periodic_fuzzy_delay=None, periodic_interval_max=None):
        """Instantiates class and passes back application object.

        :param host: defaults to CONF.host
        :param binary: defaults to basename of executable
        :param topic: defaults to bin_name - 'nova-' part
        :param manager: defaults to the SERVICE_MANAGERS entry for binary
        :param report_interval: defaults to CONF.report_interval
        :param periodic_enable: defaults to CONF.periodic_enable
        :param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay
        :param periodic_interval_max: if set, the max time to wait between
            runs
        """
        if not host:
            host = CONF.host
        if not binary:
            binary = os.path.basename(sys.argv[0])
        if not topic:
            topic = binary.rpartition('nova-')[2]
        if not manager:
            manager = SERVICE_MANAGERS.get(binary)
        if report_interval is None:
            report_interval = CONF.report_interval
        if periodic_enable is None:
            periodic_enable = CONF.periodic_enable
        if periodic_fuzzy_delay is None:
            periodic_fuzzy_delay = CONF.periodic_fuzzy_delay

        debugger.init()

        service_obj = cls(host, binary, topic, manager,
                          report_interval=report_interval,
                          periodic_enable=periodic_enable,
                          periodic_fuzzy_delay=periodic_fuzzy_delay,
                          periodic_interval_max=periodic_interval_max)

        # NOTE(gibi): This have to be after the service object creation as
        # that is the point where we can safely use the RPC to the conductor.
        # E.g. the Service.__init__ actually waits for the conductor to start
        # up before it allows the service to be created. The
        # raise_if_old_compute() depends on the RPC to be up and does not
        # implement its own retry mechanism to connect to the conductor.
        try:
            utils.raise_if_old_compute()
        except exception.TooOldComputeService as e:
            if CONF.workarounds.disable_compute_service_check_for_ffu:
                LOG.warning(str(e))
            else:
                raise

        return service_obj

    def kill(self):
        """Destroy the service object in the datastore.

        NOTE: Although this method is not used anywhere else than tests, it
        is convenient to have it here, so the tests might easily and in clean
        way stop and remove the service_ref.
        """
        self.stop()
        try:
            self.service_ref.destroy()
        except exception.NotFound:
            LOG.warning('Service killed that has no database entry')

    def stop(self):
        """stop the service and clean up."""
        try:
            self.rpcserver.stop()
            self.rpcserver.wait()
        except Exception:
            # Best-effort: the RPC server may never have been started
            # (self.rpcserver can still be None), so swallow any error.
            pass

        try:
            self.manager.cleanup_host()
        except Exception:
            LOG.exception('Service error occurred during cleanup_host')

        super(Service, self).stop()

    def periodic_tasks(self, raise_on_error=False):
        """Tasks to be run at a periodic interval."""
        ctxt = context.get_admin_context()
        return self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)

    def basic_config_check(self):
        """Perform basic config checks before starting processing."""
        # Make sure the tempdir exists and is writable
        try:
            with utils.tempdir():
                pass
        except Exception as e:
            LOG.error('Temporary directory is invalid: %s', e)
            sys.exit(1)

    def reset(self):
        """reset the service."""
        self.manager.reset()
        # Reset the cell cache that holds database transaction context
        # managers
        context.CELL_CACHE = {}
def process_launcher():
    """Build a ProcessLauncher whose restart strategy mutates config."""
    launcher = service.ProcessLauncher(CONF, restart_method='mutate')
    return launcher
# NOTE(vish): the global launcher is to maintain the existing
#             functionality of calling service.serve +
#             service.wait
# Module-level singleton: set once by serve(), consumed by wait().
_launcher = None
def serve(server, workers=None):
    """Launch *server* under the module-global launcher.

    May be called at most once per process; a second call raises
    RuntimeError.
    """
    global _launcher
    if _launcher:
        raise RuntimeError(_('serve() can only be called once'))
    _launcher = service.launch(
        CONF, server, workers=workers, restart_method='mutate')
def wait():
    """Block until the global launcher (set by serve()) has finished."""
    _launcher.wait()