Coverage for nova/policy.py: 93%

92 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-17 15:08 +0000

1# Copyright (c) 2011 OpenStack Foundation 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16"""Policy Engine For Nova.""" 

17import copy 

18import re 

19 

20from oslo_config import cfg 

21from oslo_log import log as logging 

22from oslo_policy import policy 

23from oslo_utils import excutils 

24 

25 

26from nova import exception 

27from nova import policies 

28 

29 

CONF = cfg.CONF
LOG = logging.getLogger(__name__)
# Module-wide policy enforcer. It starts out unset; init() creates it on
# demand and reset() clears it again.
_ENFORCER = None
# Resources which support user based policy enforcement. No deprecation
# warning is sent for rules of these resources.
USER_BASED_RESOURCES = ['os-keypairs']
# oslo_policy reads the policy configuration file again when the file is
# changed at runtime, so the previously loaded rules are kept in
# saved_file_rules and compared with the new rules to determine whether the
# rules were updated.
saved_file_rules = []
# Matches '%(name)s' style substitution keys and captures the key name.
KEY_EXPR = re.compile(r'%\((\w+)\)s')

42 

43 

def reset():
    """Clear and discard the module-level enforcer, if one exists.

    Nothing happens when no enforcer has been created yet.
    """
    global _ENFORCER
    if not _ENFORCER:
        return
    _ENFORCER.clear()
    _ENFORCER = None

49 

50 

def init(policy_file=None, rules=None, default_rule=None, use_conf=True,
         suppress_deprecation_warnings=False):
    """Init an Enforcer class.

    The module-level enforcer is only created when none exists yet, so the
    arguments below are considered just in the first instantiation.

    :param policy_file: Custom policy file to use, if none is specified,
                        the policy file configured in ``CONF`` will be used.
    :param rules: Default dictionary / Rules to use.
    :param default_rule: Default rule to use, the configured default rule
                         will be used if none is specified.
    :param use_conf: Whether to load rules from config file.
    :param suppress_deprecation_warnings: Whether to suppress the
                                          deprecation warnings.
    """
    global _ENFORCER
    global saved_file_rules

    if not _ENFORCER:
        _ENFORCER = policy.Enforcer(
            CONF,
            policy_file=policy_file,
            rules=rules,
            default_rule=default_rule,
            use_conf=use_conf)
        # NOTE(gmann): Explicitly disable the warnings for policies
        # changing their default check_str. During policy-defaults-refresh
        # work, all the policy defaults have been changed and warning for
        # each policy started filling the logs limit for various tool.
        # Once we move to new defaults only world then we can enable these
        # warning again.
        _ENFORCER.suppress_default_change_warnings = True
        if suppress_deprecation_warnings:
            _ENFORCER.suppress_deprecation_warnings = True
        register_rules(_ENFORCER)
        _ENFORCER.load_rules()

    # Only the rules which are loaded from file may be changed.
    current_file_rules = _serialize_rules(_ENFORCER.file_rules)

    # Check whether the rules were updated at runtime; warn about the new
    # rule set once and remember it for the next comparison.
    if saved_file_rules != current_file_rules:
        _warning_for_deprecated_user_based_rules(current_file_rules)
        saved_file_rules = copy.deepcopy(current_file_rules)

96 

97 

def _serialize_rules(rules):
    """Serialize all the Rule objects as strings for comparing rule lists.

    :param rules: mapping of rule name to rule object.
    :return: list of ``(rule_name, str(rule))`` tuples sorted by rule name.
    """
    return sorted(
        ((rule_name, str(rule)) for rule_name, rule in rules.items()),
        key=lambda item: item[0])

105 

106 

def _warning_for_deprecated_user_based_rules(rules):
    """Warn when user based policy enforcement is used in a rule which
    doesn't support it.

    :param rules: iterable of ``(rule_name, rule_string)`` pairs.
    """
    for rule in rules:
        name = rule[0]
        # Skip the warning for the resources which support user based
        # policy enforcement.
        if any(resource in name for resource in USER_BASED_RESOURCES):
            continue
        if 'user_id' in KEY_EXPR.findall(rule[1]):
            LOG.warning(
                "The user_id attribute isn't supported in the rule '%s'. "
                "All the user_id based policy enforcement will be removed in "
                "the future.",
                name
            )

124 

125 

def set_rules(rules, overwrite=True, use_conf=False):
    """Set rules based on the provided dict of rules.

    The enforcer is initialised first, always with ``use_conf=False``; the
    ``use_conf`` argument is only forwarded to the enforcer's ``set_rules``.

    :param rules: New rules to use. It should be an instance of dict.
    :param overwrite: Whether to overwrite current rules or update them
                      with the new rules.
    :param use_conf: Whether to reload rules from config file.
    """
    init(use_conf=False)
    _ENFORCER.set_rules(rules, overwrite, use_conf)

137 

138 

def authorize(context, action, target=None, do_raise=True, exc=None):
    """Verifies that the action is valid on the target in this context.

    :param context: nova context
    :param action: string representing the action to be checked
        this should be colon separated for clarity.
        i.e. ``compute:create_instance``,
        ``compute:attach_volume``,
        ``volume:attach_volume``
    :param target: dictionary representing the object of the action
        for object creation this should be a dictionary representing the
        location of the object e.g. ``{'project_id': instance.project_id}``
        If None, then this default target will be considered:
        ``{'project_id': context.project_id, 'user_id': context.user_id}``
    :param do_raise: if True (the default), raises PolicyNotAuthorized;
        if False, returns False
    :param exc: Class of the exception to raise if the check fails.
        It is handed to the enforcer together with ``action=action``
        as a keyword argument. If not specified,
        :class:`PolicyNotAuthorized` will be used.

    :raises nova.exception.PolicyNotAuthorized: if verification fails
        and do_raise is True. Or if 'exc' is specified it will raise an
        exception of that type. An ``InvalidScope`` raised by the enforcer
        is always converted into this exception.

    :return: returns a non-False value (not necessarily "True") if
        authorized, and the exact value False if not authorized and
        do_raise is False.
    """
    init()
    if not exc:
        exc = exception.PolicyNotAuthorized

    # Legacy fallback for empty target from context.can()
    # should be removed once we improve testing and scope checks
    if target is None:
        target = default_target(context)

    try:
        result = _ENFORCER.authorize(action, target, context,
                                     do_raise=do_raise, exc=exc, action=action)
    except policy.PolicyNotRegistered:
        # Log with traceback, then let the original exception propagate.
        with excutils.save_and_reraise_exception():
            LOG.exception('Policy not registered')
    except policy.InvalidScope:
        LOG.debug('Policy check for %(action)s failed with scope check '
                  '%(credentials)s',
                  {'action': action,
                   'credentials': context.to_policy_values()})
        raise exc(action=action)
    except Exception:
        # Log the credentials, then let the original exception propagate.
        with excutils.save_and_reraise_exception():
            LOG.debug('Policy check for %(action)s failed with credentials '
                      '%(credentials)s',
                      {'action': action,
                       'credentials': context.to_policy_values()})
    return result

197 

198 

def default_target(context):
    """Build the default policy target from the request context.

    :param context: object exposing ``project_id`` and ``user_id``.
    :return: dict with the context's ``project_id`` and ``user_id``.
    """
    return {'project_id': context.project_id, 'user_id': context.user_id}

201 

202 

def check_is_admin(context):
    """Whether or not roles contains 'admin' role according to policy setting.

    :param context: nova context
    :return: the result of authorizing the ``context_is_admin`` rule
        against the default target built from the context.
    """
    init()
    # the target is user-self
    target = default_target(context)
    return _ENFORCER.authorize('context_is_admin', target, context)

212 

213 

@policy.register('is_admin')
class IsAdminCheck(policy.Check):
    """An explicit check for is_admin."""

    def __init__(self, kind, match):
        """Initialize the check.

        :param kind: the kind of check, passed through to the base class.
        :param match: string compared case-insensitively with ``'true'``
            to get the expected ``is_admin`` value.
        """
        self.expected = (match.lower() == 'true')

        # The base class receives the normalised 'True' / 'False' string.
        super().__init__(kind, str(self.expected))

    def __call__(self, target, creds, enforcer):
        """Determine whether is_admin matches the requested value."""
        return creds['is_admin'] == self.expected

229 

230 

def get_rules():
    """Return the rules of the module-level enforcer.

    :return: ``_ENFORCER.rules``, or None when no enforcer has been
        created yet.
    """
    if _ENFORCER:
        return _ENFORCER.rules
    return None

234 

235 

def register_rules(enforcer):
    """Register the rules returned by ``policies.list_rules()`` as defaults.

    :param enforcer: enforcer object exposing ``register_defaults``.
    """
    enforcer.register_defaults(policies.list_rules())

238 

239 

def get_enforcer():
    """Return the module-level enforcer, initialising it if needed.

    This method is used by oslopolicy CLI scripts in order to generate policy
    files from overrides on disk and defaults in code.
    """
    # Set up the configuration with no command line arguments first.
    cfg.CONF([], project='nova')
    init()
    return _ENFORCER

246 

247 

def verify_deprecated_policy(old_policy, new_policy, default_rule, context):
    """Check the rule of the deprecated policy action

    If the current rule of the deprecated policy action is set to a non-default
    value, then a warning message is logged stating that the new policy
    action should be used to dictate permissions as the old policy action is
    being deprecated.

    :param old_policy: policy action that is being deprecated
    :param new_policy: policy action that is replacing old_policy
    :param default_rule: the old_policy action default rule value
    :param context: the nova context
    :return: True if the old policy differs from its default (in which case
        the warning was logged and ``context.can(old_policy)`` was called),
        False otherwise.
    """
    # Without an enforcer there is no current rule to compare against.
    current_rule = str(_ENFORCER.rules[old_policy]) if _ENFORCER else None

    if current_rule == default_rule:
        return False

    LOG.warning("Start using the new action '%(new_policy)s'. "
                "The existing action '%(old_policy)s' is being deprecated "
                "and will be removed in future release.",
                {'new_policy': new_policy, 'old_policy': old_policy})
    context.can(old_policy)
    return True