Coverage for nova/service.py: 83%

168 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-24 11:16 +0000

1# Copyright 2010 United States Government as represented by the 

2# Administrator of the National Aeronautics and Space Administration. 

3# Copyright 2011 Justin Santa Barbara 

4# All Rights Reserved. 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); you may 

7# not use this file except in compliance with the License. You may obtain 

8# a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

15# License for the specific language governing permissions and limitations 

16# under the License. 

17 

18"""Generic Node base class for all workers that run on hosts.""" 

19 

20import os 

21import os.path 

22import random 

23import sys 

24 

25from oslo_log import log as logging 

26import oslo_messaging as messaging 

27from oslo_service import service 

28from oslo_utils import importutils 

29 

30from nova import baserpc 

31from nova import conductor 

32import nova.conf 

33from nova import context 

34from nova import debugger 

35from nova import exception 

36from nova.i18n import _ 

37from nova import objects 

38from nova.objects import base as objects_base 

39from nova.objects import service as service_obj 

40from nova import rpc 

41from nova import servicegroup 

42from nova import utils 

43from nova import version 

44 

# osprofiler is an optional dependency: try_import returns None when the
# package is not installed, and setup_profiler() then degrades to a no-op.
osprofiler = importutils.try_import("osprofiler")
osprofiler_initializer = importutils.try_import("osprofiler.initializer")

CONF = nova.conf.CONF
LOG = logging.getLogger(__name__)

# Maps a service binary name to the fully-qualified manager class that
# Service.create() loads when no explicit manager is supplied.
SERVICE_MANAGERS = {
    'nova-compute': 'nova.compute.manager.ComputeManager',
    'nova-conductor': 'nova.conductor.manager.ConductorManager',
    'nova-scheduler': 'nova.scheduler.manager.SchedulerManager',
}

56 

57 

def _create_service_ref(this_service, context):
    """Create and persist a new Service record for this worker.

    :param this_service: the Service instance being registered; its host,
        binary and topic are copied onto the new record
    :param context: request context used for the database create
    :returns: the newly created objects.Service record
    :raises: exception.ServiceTopicExists / exception.ServiceBinaryExists
        if a record for this host/binary already exists (Service.start()
        catches these to handle the race with sibling workers)
    """
    # Named service_ref (not 'service') to avoid shadowing the module-level
    # 'from oslo_service import service' import.
    service_ref = objects.Service(context)
    service_ref.host = this_service.host
    service_ref.binary = this_service.binary
    service_ref.topic = this_service.topic
    service_ref.report_count = 0
    service_ref.create()
    return service_ref

66 

67 

def _update_service_ref(service):
    """Bump a stored service record to the current SERVICE_VERSION.

    No-op when the record already carries the current version; otherwise
    the version change is logged and the record saved.
    """
    current = service_obj.SERVICE_VERSION
    if service.version == current:
        return
    LOG.info('Updating service version for %(binary)s on '
             '%(host)s from %(old)i to %(new)i',
             {'binary': service.binary,
              'host': service.host,
              'old': service.version,
              'new': current})
    service.version = current
    service.save()

78 

79 

def setup_profiler(binary, host):
    """Initialize OSProfiler for this service when available and enabled.

    Does nothing if the osprofiler package is not installed or profiling
    is disabled in configuration.
    """
    if not (osprofiler and CONF.profiler.enabled):
        return
    admin_context = context.get_admin_context()
    osprofiler.initializer.init_from_conf(
        conf=CONF,
        context=admin_context.to_dict(),
        project="nova",
        service=binary,
        host=host)
    LOG.info("OSProfiler is enabled.")

89 

90 

class Service(service.Service):
    """Service object for binaries running on hosts.

    A service takes a manager and enables rpc by listening to queues based
    on topic. It also periodically runs tasks on the manager and reports
    its state to the database services table.
    """

    def __init__(self, host, binary, topic, manager, report_interval=None,
                 periodic_enable=None, periodic_fuzzy_delay=None,
                 periodic_interval_max=None, *args, **kwargs):
        """Set up the service and instantiate its manager.

        :param host: hostname this service runs on
        :param binary: service binary name, e.g. 'nova-compute'
        :param topic: RPC topic the service serves
        :param manager: fully-qualified class name of the manager to load
        :param report_interval: seconds between service state reports
        :param periodic_enable: whether to run periodic tasks
        :param periodic_fuzzy_delay: upper bound for the random initial
            delay before the first periodic task run
        :param periodic_interval_max: max time to wait between periodic runs

        Extra ``*args``/``**kwargs`` are forwarded to the manager class.
        """
        super(Service, self).__init__()
        self.host = host
        self.binary = binary
        self.topic = topic
        self.manager_class_name = manager
        self.servicegroup_api = servicegroup.API()
        manager_class = importutils.import_class(self.manager_class_name)
        # When objects use remote indirection (RPC through the conductor),
        # block here until the conductor is reachable; object operations in
        # the manager would fail otherwise.
        if objects_base.NovaObject.indirection_api:
            conductor_api = conductor.API()
            conductor_api.wait_until_ready(context.get_admin_context())
        self.manager = manager_class(host=self.host, *args, **kwargs)
        # Created lazily in start(); stop() tolerates it being unset.
        self.rpcserver = None
        self.report_interval = report_interval
        self.periodic_enable = periodic_enable
        self.periodic_fuzzy_delay = periodic_fuzzy_delay
        self.periodic_interval_max = periodic_interval_max
        self.saved_args, self.saved_kwargs = args, kwargs
        self.backdoor_port = None
        setup_profiler(binary, self.host)

    def __repr__(self):
        """Return a debug representation naming host, binary and manager."""
        return "<%(cls_name)s: host=%(host)s, binary=%(binary)s, " \
               "manager_class_name=%(manager)s>" % {
                   'cls_name': self.__class__.__name__,
                   'host': self.host,
                   'binary': self.binary,
                   'manager': self.manager_class_name
               }

    def start(self):
        """Start the service.

        This includes starting an RPC service, initializing
        periodic tasks, etc.
        """
        # NOTE(melwitt): Clear the cell cache holding database transaction
        # context manager objects. We do this to ensure we create new internal
        # oslo.db locks to avoid a situation where a child process receives an
        # already locked oslo.db lock when it is forked. When a child process
        # inherits a locked oslo.db lock, database accesses through that
        # transaction context manager will never be able to acquire the lock
        # and requests will fail with CellTimeout errors.
        # See https://bugs.python.org/issue6721 for more information.
        # With python 3.7, it would be possible for oslo.db to make use of the
        # os.register_at_fork() method to reinitialize its lock. Until we
        # require python 3.7 as a minimum version, we must handle the situation
        # outside of oslo.db.
        context.CELL_CACHE = {}

        verstr = version.version_string_with_package()
        LOG.info('Starting %(topic)s node (version %(version)s)',
                 {'topic': self.topic, 'version': verstr})
        # Fail fast (sys.exit) if basic configuration such as the temporary
        # directory is unusable.
        self.basic_config_check()
        ctxt = context.get_admin_context()
        self.service_ref = objects.Service.get_by_host_and_binary(
            ctxt, self.host, self.binary)
        self.manager.init_host(self.service_ref)
        self.model_disconnected = False
        if self.service_ref:
            # Existing record: make sure its version field is current.
            _update_service_ref(self.service_ref)

        else:
            try:
                self.service_ref = _create_service_ref(self, ctxt)
            except (exception.ServiceTopicExists,
                    exception.ServiceBinaryExists):
                # NOTE(danms): If we race to create a record with a sibling
                # worker, don't fail here.
                self.service_ref = objects.Service.get_by_host_and_binary(
                    ctxt, self.host, self.binary)

        self.manager.pre_start_hook(self.service_ref)

        if self.backdoor_port is not None:
            self.manager.backdoor_port = self.backdoor_port

        LOG.debug("Creating RPC server for service %s", self.topic)

        target = messaging.Target(topic=self.topic, server=self.host)

        # The manager itself plus the base RPC API are always exposed;
        # managers may contribute extra endpoints.
        endpoints = [
            self.manager,
            baserpc.BaseRPCAPI(self.manager.service_name, self.backdoor_port)
        ]
        endpoints.extend(self.manager.additional_endpoints)

        serializer = objects_base.NovaObjectSerializer()

        self.rpcserver = rpc.get_server(target, endpoints, serializer)
        self.rpcserver.start()

        self.manager.post_start_hook()

        LOG.debug("Join ServiceGroup membership for this service %s",
                  self.topic)
        # Add service to the ServiceGroup membership group.
        self.servicegroup_api.join(self.host, self.topic, self)

        if self.periodic_enable:
            if self.periodic_fuzzy_delay:
                # Randomize the first run so sibling workers do not fire
                # their periodic tasks at the same instant.
                initial_delay = random.randint(0, self.periodic_fuzzy_delay)
            else:
                initial_delay = None

            self.tg.add_dynamic_timer(self.periodic_tasks,
                                      initial_delay=initial_delay,
                                      periodic_interval_max=
                                      self.periodic_interval_max)

    def __getattr__(self, key):
        # Delegate unknown attribute lookups to the manager. Use
        # __dict__.get (not self.manager) so lookups made before 'manager'
        # is assigned in __init__ do not recurse into __getattr__ again.
        manager = self.__dict__.get('manager', None)
        return getattr(manager, key)

    @classmethod
    def create(cls, host=None, binary=None, topic=None, manager=None,
               report_interval=None, periodic_enable=None,
               periodic_fuzzy_delay=None, periodic_interval_max=None):
        """Instantiates class and passes back application object.

        :param host: defaults to CONF.host
        :param binary: defaults to basename of executable
        :param topic: defaults to bin_name - 'nova-' part
        :param manager: defaults to CONF.<topic>_manager
        :param report_interval: defaults to CONF.report_interval
        :param periodic_enable: defaults to CONF.periodic_enable
        :param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay
        :param periodic_interval_max: if set, the max time to wait between runs

        """
        if not host:
            host = CONF.host
        if not binary:
            binary = os.path.basename(sys.argv[0])
        if not topic:
            # e.g. 'nova-compute' -> 'compute'
            topic = binary.rpartition('nova-')[2]
        if not manager:
            manager = SERVICE_MANAGERS.get(binary)
        if report_interval is None:
            report_interval = CONF.report_interval
        if periodic_enable is None:
            periodic_enable = CONF.periodic_enable
        if periodic_fuzzy_delay is None:
            periodic_fuzzy_delay = CONF.periodic_fuzzy_delay

        debugger.init()

        # NOTE: this local name shadows the module-level 'service_obj'
        # import; harmless here since the module is not used below.
        service_obj = cls(host, binary, topic, manager,
                          report_interval=report_interval,
                          periodic_enable=periodic_enable,
                          periodic_fuzzy_delay=periodic_fuzzy_delay,
                          periodic_interval_max=periodic_interval_max)

        # NOTE(gibi): This have to be after the service object creation as
        # that is the point where we can safely use the RPC to the conductor.
        # E.g. the Service.__init__ actually waits for the conductor to start
        # up before it allows the service to be created. The
        # raise_if_old_compute() depends on the RPC to be up and does not
        # implement its own retry mechanism to connect to the conductor.
        try:
            utils.raise_if_old_compute()
        except exception.TooOldComputeService as e:
            if CONF.workarounds.disable_compute_service_check_for_ffu:
                # Operator explicitly opted out of the check (fast-forward
                # upgrade workaround); log instead of failing.
                LOG.warning(str(e))
            else:
                raise

        return service_obj

    def kill(self):
        """Destroy the service object in the datastore.

        NOTE: Although this method is not used anywhere else than tests, it is
        convenient to have it here, so the tests might easily and in clean way
        stop and remove the service_ref.

        """
        self.stop()
        try:
            self.service_ref.destroy()
        except exception.NotFound:
            LOG.warning('Service killed that has no database entry')

    def stop(self):
        """stop the service and clean up."""
        # Best-effort RPC server shutdown: rpcserver may be None when
        # stop() runs before start() completed.
        try:
            self.rpcserver.stop()
            self.rpcserver.wait()
        except Exception:
            pass

        try:
            self.manager.cleanup_host()
        except Exception:
            # Log but continue shutting down; cleanup failures must not
            # prevent the base class stop from running.
            LOG.exception('Service error occurred during cleanup_host')
            pass

        super(Service, self).stop()

    def periodic_tasks(self, raise_on_error=False):
        """Tasks to be run at a periodic interval."""
        ctxt = context.get_admin_context()
        return self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)

    def basic_config_check(self):
        """Perform basic config checks before starting processing."""
        # Make sure the tempdir exists and is writable
        try:
            with utils.tempdir():
                pass
        except Exception as e:
            LOG.error('Temporary directory is invalid: %s', e)
            sys.exit(1)

    def reset(self):
        """reset the service."""
        self.manager.reset()
        # Reset the cell cache that holds database transaction context managers
        context.CELL_CACHE = {}

320 

321 

def process_launcher():
    """Return a ProcessLauncher configured to mutate config on restart."""
    launcher = service.ProcessLauncher(CONF, restart_method='mutate')
    return launcher

324 

325 

# NOTE(vish): the global launcher is to maintain the existing
#             functionality of calling service.serve +
#             service.wait
# Module-global singleton: set once by serve(), consumed by wait().
_launcher = None

330 

331 

def serve(server, workers=None):
    """Launch *server* through the module-global launcher.

    :param server: the service object to launch
    :param workers: optional worker count forwarded to the launcher
    :raises RuntimeError: when called more than once per process
    """
    global _launcher
    if _launcher:
        raise RuntimeError(_('serve() can only be called once'))

    _launcher = service.launch(
        CONF, server, workers=workers, restart_method='mutate')

339 

340 

def wait():
    """Block until the launcher started by serve() has finished."""
    _launcher.wait()