Coverage for nova/cmd/policy.py: 96%
69 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-24 11:16 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-24 11:16 +0000
1# Copyright 2016 Cloudbase Solutions Srl
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16"""
17 CLI interface for nova policy rule commands.
18"""
20import functools
21import os
22import sys
24from oslo_config import cfg
26from nova.cmd import common as cmd_common
27import nova.conf
28from nova import config
29from nova import context as nova_context
30from nova.db.main import api as db
31from nova import exception
32from nova.i18n import _
33from nova import policies
34from nova import version
# Global nova configuration object; the CLI options below are registered on
# it by main().
CONF = nova.conf.CONF


# Identity of the user whose policy permissions are being checked. Each
# option falls back to the matching OS_* environment variable, read once at
# import time.
cli_opts = [
    cfg.ListOpt('os-roles',
                metavar='<auth-roles>',
                default=os.environ.get('OS_ROLES'),
                help=_('Defaults to env[OS_ROLES].')),
    cfg.StrOpt('os-user-id',
               metavar='<auth-user-id>',
               default=os.environ.get('OS_USER_ID'),
               help=_('Defaults to env[OS_USER_ID].')),
    cfg.StrOpt('os-tenant-id',
               metavar='<auth-tenant-id>',
               default=os.environ.get('OS_TENANT_ID'),
               help=_('Defaults to env[OS_TENANT_ID].')),
]
class PolicyCommands(object):
    """Commands for policy rules."""

    # Keys accepted in ``--target <key>=<value>`` arguments.
    _ACCEPTABLE_TARGETS = [
        'project_id', 'user_id', 'quota_class', 'availability_zone',
        'instance_id']

    @cmd_common.args(
        '--api-name', dest='api_name', metavar='<name>',
        help=(
            'Return only the passing policy rules containing the given API '
            'name. If unspecified, all passing policy rules will be returned.'
        ),
    )
    @cmd_common.args(
        '--target', nargs='+', dest='target', metavar='<target>',
        help=(
            "The target(s) against which the policy rule authorization will "
            "be tested. The available targets are: %s. When 'instance_id' is "
            "used, the other targets will be overwritten. If unspecified, the "
            "given user will be considered the target." % ', '.join(
                _ACCEPTABLE_TARGETS
            )
        ),
    )
    def check(self, api_name=None, target=None):
        """Print all passing policy rules for the given user.

        :param api_name: only rules whose name contains this string are
            considered; None (or empty) matches every rule.
        :param target: list of ``<key>=<value>`` strings describing the
            authorization target, or None.
        :returns: 0 if at least one rule passed (their names are printed,
            one per line), otherwise 1.
        """
        context = self._get_context()
        # An empty string is a substring of every rule name, so it acts as
        # "no filter" in _filter_rules().
        api_name = api_name or ''
        target = self._get_target(context, target)

        allowed_operations = self._filter_rules(context, api_name, target)

        if allowed_operations:
            print('\n'.join(allowed_operations))
            return 0
        else:
            print('No rules matched or allowed')
            return 1

    def _get_context(self):
        """Build a request context from the os-roles, os-user-id and
        os-tenant-id CLI options.
        """
        return nova_context.RequestContext(
            roles=CONF.os_roles,
            user_id=CONF.os_user_id,
            project_id=CONF.os_tenant_id)

    def _get_target(self, context, target):
        """Process and validate the CLI given target and adapt it for
        policy authorization.

        :param context: the request context of the user being checked
            (currently unused by this method).
        :param target: list of ``<key>=<value>`` strings, or None.
        :returns: None if the given target is None or empty, otherwise
            returns a proper authorization target.
        :raises ValueError: if an entry is not of the form
            ``<key>=<value>``.
        :raises nova.exception.InvalidAttribute: if a key in the given target
            is not acceptable.
        :raises nova.exception.InstanceNotFound: if 'instance_id' is given, and
            no instance matches the id.
        """
        if not target:
            return None

        new_target = {}
        for t in target:
            # Split on the first '=' only, so a value may itself contain '='.
            key, sep, value = t.partition('=')
            if not sep:
                # Report the malformed entry explicitly rather than letting
                # an opaque tuple-unpacking error reach the user.
                raise ValueError(
                    _("Invalid target '%s': expected <key>=<value>") % t)
            if key not in self._ACCEPTABLE_TARGETS:
                raise exception.InvalidAttribute(attr=key)
            new_target[key] = value

        # if the target is an instance_id, return an instance instead.
        instance_id = new_target.get('instance_id')
        if instance_id:
            admin_ctxt = nova_context.get_admin_context()
            instance = db.instance_get_by_uuid(admin_ctxt, instance_id)
            # The instance's owner replaces every other target that was given.
            new_target = {'user_id': instance['user_id'],
                          'project_id': instance['project_id']}

        return new_target

    def _filter_rules(self, context, api_name, target):
        """Return the names of the rules that the context passes.

        A rule is kept when ``api_name`` is a substring of its name and
        ``context.can()`` (called with ``fatal=False``) returns a truthy
        value for it against ``target``.
        """
        all_rules = policies.list_rules()
        return [rule.name for rule in all_rules if api_name in rule.name and
                context.can(rule.name, target, fatal=False)]
# Maps each CLI category name to the class implementing its commands.
CATEGORIES = dict(policy=PolicyCommands)


# Sub-parser builder with this command's categories already bound.
add_command_parsers = functools.partial(
    cmd_common.add_command_parsers, categories=CATEGORIES)


# Sub-command option whose parsers are populated by add_command_parsers.
category_opt = cfg.SubCommandOpt(
    'category',
    title='Command categories',
    help='Available categories',
    handler=add_command_parsers,
)
def main():
    """Entry point: parse the command line and run the selected action.

    :returns: 0 for the ``version`` and ``bash-completion`` categories,
        the selected action's own return value otherwise, or 1 if
        resolving or running the action raised an exception.
    """
    CONF.register_cli_opts(cli_opts)
    CONF.register_cli_opt(category_opt)
    config.parse_args(sys.argv)

    category = CONF.category.name

    if category == "version":
        print(version.version_string_with_package())
        return 0

    if category == "bash-completion":
        cmd_common.print_bash_completion(CATEGORIES)
        return 0

    # Top-level boundary: any failure while resolving or running the action
    # is reported to the user and turned into a non-zero exit status.
    try:
        action, action_args, action_kwargs = cmd_common.get_action_fn()
        return action(*action_args, **action_kwargs)
    except Exception as ex:
        print(_("error: %s") % ex)
        return 1