Coverage for nova/api/openstack/compute/keypairs.py: 100%
151 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-17 15:08 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-17 15:08 +0000
1# Copyright 2011 OpenStack Foundation
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16"""Keypair management extension."""
18import webob
19import webob.exc
21from nova.api.openstack import api_version_request
22from nova.api.openstack import common
23from nova.api.openstack.compute.schemas import keypairs
24from nova.api.openstack.compute.views import keypairs as keypairs_view
25from nova.api.openstack import wsgi
26from nova.api import validation
27from nova.compute import api as compute_api
28from nova import exception
29from nova.objects import keypair as keypair_obj
30from nova.policies import keypairs as kp_policies
33class KeypairController(wsgi.Controller):
35 """Keypair API controller for the OpenStack API."""
37 _view_builder_class = keypairs_view.ViewBuilder
39 def __init__(self):
40 super(KeypairController, self).__init__()
41 self.api = compute_api.KeypairAPI()
43 @wsgi.Controller.api_version("2.10")
44 @wsgi.response(201)
45 @wsgi.expected_errors((400, 403, 409))
46 @validation.schema(keypairs.create_v210, "2.10", "2.91")
47 @validation.schema(keypairs.create_v292, "2.92")
48 def create(self, req, body):
49 """Create or import keypair.
51 Keypair generations are allowed until version 2.91.
52 Afterwards, only imports are allowed.
54 A policy check restricts users from creating keys for other users
56 params: keypair object with:
57 name (required) - string
58 public_key (optional or required if >=2.92) - string
59 type (optional) - string
60 user_id (optional) - string
61 """
62 # handle optional user-id for admin only
63 user_id = body['keypair'].get('user_id')
64 return self._create(req, body, key_type=True, user_id=user_id)
66 @wsgi.Controller.api_version("2.2", "2.9") # noqa
67 @wsgi.response(201)
68 @wsgi.expected_errors((400, 403, 409))
69 @validation.schema(keypairs.create_v22)
70 def create(self, req, body): # noqa
71 """Create or import keypair.
73 Sending name will generate a key and return private_key
74 and fingerprint.
76 Keypair will have the type ssh or x509, specified by type.
78 You can send a public_key to add an existing ssh/x509 key.
80 params: keypair object with:
81 name (required) - string
82 public_key (optional) - string
83 type (optional) - string
84 """
85 return self._create(req, body, key_type=True)
87 @wsgi.Controller.api_version("2.1", "2.1") # noqa
88 @wsgi.expected_errors((400, 403, 409))
89 @validation.schema(keypairs.create_v20, "2.0", "2.0")
90 @validation.schema(keypairs.create, "2.1", "2.1")
91 def create(self, req, body): # noqa
92 """Create or import keypair.
94 Sending name will generate a key and return private_key
95 and fingerprint.
97 You can send a public_key to add an existing ssh key.
99 params: keypair object with:
100 name (required) - string
101 public_key (optional) - string
102 """
103 return self._create(req, body)
105 def _create(self, req, body, user_id=None, key_type=False):
106 context = req.environ['nova.context']
107 params = body['keypair']
108 name = common.normalize_name(params['name'])
109 key_type_value = params.get('type', keypair_obj.KEYPAIR_TYPE_SSH)
110 user_id = user_id or context.user_id
111 context.can(kp_policies.POLICY_ROOT % 'create',
112 target={'user_id': user_id})
114 return_priv_key = False
115 try:
116 if 'public_key' in params:
117 keypair = self.api.import_key_pair(
118 context, user_id, name, params['public_key'],
119 key_type_value)
120 else:
121 # public_key is a required field starting with 2.92 so this
122 # generation should only happen with older versions.
123 keypair, private_key = self.api.create_key_pair(
124 context, user_id, name, key_type_value)
125 keypair['private_key'] = private_key
126 return_priv_key = True
127 except exception.KeypairLimitExceeded as e:
128 raise webob.exc.HTTPForbidden(explanation=str(e))
129 except exception.InvalidKeypair as exc:
130 raise webob.exc.HTTPBadRequest(explanation=exc.format_message())
131 except exception.KeyPairExists as exc:
132 raise webob.exc.HTTPConflict(explanation=exc.format_message())
134 return self._view_builder.create(keypair,
135 private_key=return_priv_key,
136 key_type=key_type)
138 @wsgi.Controller.api_version("2.1", "2.1")
139 @validation.query_schema(keypairs.delete_query_schema_v20)
140 @wsgi.response(202)
141 @wsgi.expected_errors(404)
142 def delete(self, req, id):
143 self._delete(req, id)
145 @wsgi.Controller.api_version("2.2", "2.9") # noqa
146 @validation.query_schema(keypairs.delete_query_schema_v20)
147 @wsgi.response(204)
148 @wsgi.expected_errors(404)
149 def delete(self, req, id): # noqa
150 self._delete(req, id)
152 @wsgi.Controller.api_version("2.10") # noqa
153 @validation.query_schema(keypairs.delete_query_schema_v275, '2.75')
154 @validation.query_schema(keypairs.delete_query_schema_v210, '2.10', '2.74')
155 @wsgi.response(204)
156 @wsgi.expected_errors(404)
157 def delete(self, req, id): # noqa
158 # handle optional user-id for admin only
159 user_id = self._get_user_id(req)
160 self._delete(req, id, user_id=user_id)
162 def _delete(self, req, id, user_id=None):
163 """Delete a keypair with a given name."""
164 context = req.environ['nova.context']
165 # handle optional user-id for admin only
166 user_id = user_id or context.user_id
167 context.can(kp_policies.POLICY_ROOT % 'delete',
168 target={'user_id': user_id})
169 try:
170 self.api.delete_key_pair(context, user_id, id)
171 except exception.KeypairNotFound as exc:
172 raise webob.exc.HTTPNotFound(explanation=exc.format_message())
174 def _get_user_id(self, req):
175 if 'user_id' in req.GET.keys():
176 user_id = req.GET.getall('user_id')[0]
177 return user_id
179 @wsgi.Controller.api_version("2.10")
180 @validation.query_schema(keypairs.show_query_schema_v210, '2.10', '2.74')
181 @validation.query_schema(keypairs.show_query_schema_v275, '2.75')
182 @wsgi.expected_errors(404)
183 def show(self, req, id):
184 # handle optional user-id for admin only
185 user_id = self._get_user_id(req)
186 return self._show(req, id, key_type=True, user_id=user_id)
188 @wsgi.Controller.api_version("2.2", "2.9") # noqa
189 @validation.query_schema(keypairs.show_query_schema_v20)
190 @wsgi.expected_errors(404)
191 def show(self, req, id): # noqa
192 return self._show(req, id, key_type=True)
194 @wsgi.Controller.api_version("2.1", "2.1") # noqa
195 @validation.query_schema(keypairs.show_query_schema_v20)
196 @wsgi.expected_errors(404)
197 def show(self, req, id): # noqa
198 return self._show(req, id)
200 def _show(self, req, id, key_type=False, user_id=None):
201 """Return data for the given key name."""
202 context = req.environ['nova.context']
203 user_id = user_id or context.user_id
204 context.can(kp_policies.POLICY_ROOT % 'show',
205 target={'user_id': user_id})
207 try:
208 keypair = self.api.get_key_pair(context, user_id, id)
209 except exception.KeypairNotFound as exc:
210 raise webob.exc.HTTPNotFound(explanation=exc.format_message())
211 return self._view_builder.show(keypair, key_type=key_type)
213 @wsgi.Controller.api_version("2.35")
214 @validation.query_schema(keypairs.index_query_schema_v275, '2.75')
215 @validation.query_schema(keypairs.index_query_schema_v235, '2.35', '2.74')
216 @wsgi.expected_errors(400)
217 def index(self, req):
218 user_id = self._get_user_id(req)
219 return self._index(req, key_type=True, user_id=user_id, links=True)
221 @wsgi.Controller.api_version("2.10", "2.34") # noqa
222 @validation.query_schema(keypairs.index_query_schema_v210)
223 @wsgi.expected_errors(())
224 def index(self, req): # noqa
225 # handle optional user-id for admin only
226 user_id = self._get_user_id(req)
227 return self._index(req, key_type=True, user_id=user_id)
229 @wsgi.Controller.api_version("2.2", "2.9") # noqa
230 @validation.query_schema(keypairs.index_query_schema_v20)
231 @wsgi.expected_errors(())
232 def index(self, req): # noqa
233 return self._index(req, key_type=True)
235 @wsgi.Controller.api_version("2.1", "2.1") # noqa
236 @validation.query_schema(keypairs.index_query_schema_v20)
237 @wsgi.expected_errors(())
238 def index(self, req): # noqa
239 return self._index(req)
241 def _index(self, req, key_type=False, user_id=None, links=False):
242 """List of keypairs for a user."""
243 context = req.environ['nova.context']
244 user_id = user_id or context.user_id
245 context.can(kp_policies.POLICY_ROOT % 'index',
246 target={'user_id': user_id})
248 if api_version_request.is_supported(req, min_version='2.35'):
249 limit, marker = common.get_limit_and_marker(req)
250 else:
251 limit = marker = None
253 try:
254 key_pairs = self.api.get_key_pairs(
255 context, user_id, limit=limit, marker=marker)
256 except exception.MarkerNotFound as e:
257 raise webob.exc.HTTPBadRequest(explanation=e.format_message())
259 return self._view_builder.index(req, key_pairs, key_type=key_type,
260 links=links)