Coverage for nova/cmd/policy.py: 96%

69 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-17 15:08 +0000

1# Copyright 2016 Cloudbase Solutions Srl 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16""" 

17 CLI interface for nova policy rule commands. 

18""" 

19 

20import functools 

21import os 

22import sys 

23 

24from oslo_config import cfg 

25 

26from nova.cmd import common as cmd_common 

27import nova.conf 

28from nova import config 

29from nova import context as nova_context 

30from nova.db.main import api as db 

31from nova import exception 

32from nova.i18n import _ 

33from nova import policies 

34from nova import version 

35 

36CONF = nova.conf.CONF 

37 

38 

# Command-line options describing the identity to test policies for.
# Each default is read from the matching OS_* environment variable at import
# time; os.environ.get() yields None when the variable is unset.
cli_opts = [
    cfg.ListOpt('os-roles',
                default=os.environ.get('OS_ROLES'),
                metavar='<auth-roles>',
                help=_('Defaults to env[OS_ROLES].')),
    cfg.StrOpt('os-user-id',
               default=os.environ.get('OS_USER_ID'),
               metavar='<auth-user-id>',
               help=_('Defaults to env[OS_USER_ID].')),
    cfg.StrOpt('os-tenant-id',
               default=os.environ.get('OS_TENANT_ID'),
               metavar='<auth-tenant-id>',
               help=_('Defaults to env[OS_TENANT_ID].')),
]

56 

57 

class PolicyCommands(object):
    """Commands for policy rules."""

    # Keys accepted in ``--target key=value`` arguments. Referenced while the
    # class body is being evaluated to build the --target help text below.
    _ACCEPTABLE_TARGETS = [
        'project_id', 'user_id', 'quota_class', 'availability_zone',
        'instance_id']

    @cmd_common.args(
        '--api-name', dest='api_name', metavar='<name>',
        help=(
            'Return only the passing policy rules containing the given API '
            'name. If unspecified, all passing policy rules will be returned.'
        ),
    )
    @cmd_common.args(
        '--target', nargs='+', dest='target', metavar='<target>',
        help=(
            "The target(s) against which the policy rule authorization will "
            "be tested. The available targets are: %s. When 'instance_id' is "
            "used, the other targets will be overwritten. If unspecified, the "
            "given user will be considered the target." % ', '.join(
                _ACCEPTABLE_TARGETS
            )
        ),
    )
    def check(self, api_name=None, target=None):
        """Prints all passing policy rules for the given user.

        :param api_name: optional substring; only rules whose name contains
            it are considered. ``None`` is treated as the empty string, which
            matches every rule name.
        :param target: optional list of ``key=value`` strings, see
            :meth:`_get_target`.
        :returns: 0 if at least one rule passed (their names are printed, one
            per line), otherwise 1 after printing a notice.
        """
        context = self._get_context()
        api_name = api_name or ''
        target = self._get_target(context, target)

        allowed_operations = self._filter_rules(context, api_name, target)

        if allowed_operations:
            print('\n'.join(allowed_operations))
            return 0
        else:
            print('No rules matched or allowed')
            return 1

    def _get_context(self):
        """Build a RequestContext from the os-roles, os-user-id and
        os-tenant-id configuration options.
        """
        return nova_context.RequestContext(
            roles=CONF.os_roles,
            user_id=CONF.os_user_id,
            project_id=CONF.os_tenant_id)

    def _get_target(self, context, target):
        """Processes and validates the CLI given target and adapts it for
        policy authorization.

        :param context: the request context of the user being checked (not
            used for the lookup itself; the instance is fetched with an admin
            context).
        :param target: list of ``key=value`` strings, or a falsy value.
        :returns: None if the given target is None or empty, otherwise
            returns a proper authorization target.
        :raises ValueError: if an item in the given target is not of the form
            ``key=value``.
        :raises nova.exception.InvalidAttribute: if a key in the given target
            is not an acceptable one.
        :raises nova.exception.InstanceNotFound: if 'instance_id' is given,
            and there is no instance matching the id.
        """
        if not target:
            return None

        new_target = {}
        for t in target:
            # Split on the first '=' only, so that values which themselves
            # contain '=' are kept intact instead of failing to unpack.
            key, sep, value = t.partition('=')
            if not sep:
                raise ValueError(
                    _("Invalid target '%s': expected <key>=<value>") % t)
            if key not in self._ACCEPTABLE_TARGETS:
                raise exception.InvalidAttribute(attr=key)
            new_target[key] = value

        # if the target is an instance_id, return an instance instead.
        instance_id = new_target.get('instance_id')
        if instance_id:
            admin_ctxt = nova_context.get_admin_context()
            instance = db.instance_get_by_uuid(admin_ctxt, instance_id)
            # Any other targets given on the command line are discarded here.
            new_target = {'user_id': instance['user_id'],
                          'project_id': instance['project_id']}

        return new_target

    def _filter_rules(self, context, api_name, target):
        """Return the names of the rules that contain ``api_name`` and that
        ``context`` is allowed to perform against ``target``.
        """
        all_rules = policies.list_rules()
        # fatal=False is passed so that a rule the context fails is filtered
        # out by the truthiness of the result.
        return [rule.name for rule in all_rules if api_name in rule.name and
                context.can(rule.name, target, fatal=False)]

139 

140 

# Maps each command category name to the class implementing its actions.
CATEGORIES = {'policy': PolicyCommands}


# Parser-setup callback with this module's categories pre-bound.
add_command_parsers = functools.partial(
    cmd_common.add_command_parsers,
    categories=CATEGORIES,
)


# The sub-command option; its sub-parsers are populated by the handler above.
category_opt = cfg.SubCommandOpt(
    'category',
    handler=add_command_parsers,
    title='Command categories',
    help='Available categories',
)

154 

155 

def main():
    """Parse options and call the appropriate class/method.

    Registers the CLI options, parses ``sys.argv`` and dispatches on the
    selected category.

    :returns: 0 for the "version" and "bash-completion" categories, the
        return value of the selected action otherwise, or 1 if the action
        lookup or the action itself raised an exception.
    """
    CONF.register_cli_opts(cli_opts)
    CONF.register_cli_opt(category_opt)
    config.parse_args(sys.argv)

    selected = CONF.category.name

    if selected == "version":
        print(version.version_string_with_package())
        return 0
    if selected == "bash-completion":
        cmd_common.print_bash_completion(CATEGORIES)
        return 0

    # Top-level boundary: any failure is reported on stdout and turned into
    # a non-zero return value instead of a traceback.
    try:
        action, action_args, action_kwargs = cmd_common.get_action_fn()
        return action(*action_args, **action_kwargs)
    except Exception as err:
        print(_("error: %s") % err)
        return 1