Coverage for nova/api/openstack/compute/keypairs.py: 100%

151 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-24 11:16 +0000

1# Copyright 2011 OpenStack Foundation 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16"""Keypair management extension.""" 

17 

18import webob 

19import webob.exc 

20 

21from nova.api.openstack import api_version_request 

22from nova.api.openstack import common 

23from nova.api.openstack.compute.schemas import keypairs 

24from nova.api.openstack.compute.views import keypairs as keypairs_view 

25from nova.api.openstack import wsgi 

26from nova.api import validation 

27from nova.compute import api as compute_api 

28from nova import exception 

29from nova.objects import keypair as keypair_obj 

30from nova.policies import keypairs as kp_policies 

31 

32 

33class KeypairController(wsgi.Controller): 

34 

35 """Keypair API controller for the OpenStack API.""" 

36 

37 _view_builder_class = keypairs_view.ViewBuilder 

38 

39 def __init__(self): 

40 super(KeypairController, self).__init__() 

41 self.api = compute_api.KeypairAPI() 

42 

43 @wsgi.Controller.api_version("2.10") 

44 @wsgi.response(201) 

45 @wsgi.expected_errors((400, 403, 409)) 

46 @validation.schema(keypairs.create_v210, "2.10", "2.91") 

47 @validation.schema(keypairs.create_v292, "2.92") 

48 def create(self, req, body): 

49 """Create or import keypair. 

50 

51 Keypair generations are allowed until version 2.91. 

52 Afterwards, only imports are allowed. 

53 

54 A policy check restricts users from creating keys for other users 

55 

56 params: keypair object with: 

57 name (required) - string 

58 public_key (optional or required if >=2.92) - string 

59 type (optional) - string 

60 user_id (optional) - string 

61 """ 

62 # handle optional user-id for admin only 

63 user_id = body['keypair'].get('user_id') 

64 return self._create(req, body, key_type=True, user_id=user_id) 

65 

66 @wsgi.Controller.api_version("2.2", "2.9") # noqa 

67 @wsgi.response(201) 

68 @wsgi.expected_errors((400, 403, 409)) 

69 @validation.schema(keypairs.create_v22) 

70 def create(self, req, body): # noqa 

71 """Create or import keypair. 

72 

73 Sending name will generate a key and return private_key 

74 and fingerprint. 

75 

76 Keypair will have the type ssh or x509, specified by type. 

77 

78 You can send a public_key to add an existing ssh/x509 key. 

79 

80 params: keypair object with: 

81 name (required) - string 

82 public_key (optional) - string 

83 type (optional) - string 

84 """ 

85 return self._create(req, body, key_type=True) 

86 

87 @wsgi.Controller.api_version("2.1", "2.1") # noqa 

88 @wsgi.expected_errors((400, 403, 409)) 

89 @validation.schema(keypairs.create_v20, "2.0", "2.0") 

90 @validation.schema(keypairs.create, "2.1", "2.1") 

91 def create(self, req, body): # noqa 

92 """Create or import keypair. 

93 

94 Sending name will generate a key and return private_key 

95 and fingerprint. 

96 

97 You can send a public_key to add an existing ssh key. 

98 

99 params: keypair object with: 

100 name (required) - string 

101 public_key (optional) - string 

102 """ 

103 return self._create(req, body) 

104 

105 def _create(self, req, body, user_id=None, key_type=False): 

106 context = req.environ['nova.context'] 

107 params = body['keypair'] 

108 name = common.normalize_name(params['name']) 

109 key_type_value = params.get('type', keypair_obj.KEYPAIR_TYPE_SSH) 

110 user_id = user_id or context.user_id 

111 context.can(kp_policies.POLICY_ROOT % 'create', 

112 target={'user_id': user_id}) 

113 

114 return_priv_key = False 

115 try: 

116 if 'public_key' in params: 

117 keypair = self.api.import_key_pair( 

118 context, user_id, name, params['public_key'], 

119 key_type_value) 

120 else: 

121 # public_key is a required field starting with 2.92 so this 

122 # generation should only happen with older versions. 

123 keypair, private_key = self.api.create_key_pair( 

124 context, user_id, name, key_type_value) 

125 keypair['private_key'] = private_key 

126 return_priv_key = True 

127 except exception.KeypairLimitExceeded as e: 

128 raise webob.exc.HTTPForbidden(explanation=str(e)) 

129 except exception.InvalidKeypair as exc: 

130 raise webob.exc.HTTPBadRequest(explanation=exc.format_message()) 

131 except exception.KeyPairExists as exc: 

132 raise webob.exc.HTTPConflict(explanation=exc.format_message()) 

133 

134 return self._view_builder.create(keypair, 

135 private_key=return_priv_key, 

136 key_type=key_type) 

137 

138 @wsgi.Controller.api_version("2.1", "2.1") 

139 @validation.query_schema(keypairs.delete_query_schema_v20) 

140 @wsgi.response(202) 

141 @wsgi.expected_errors(404) 

142 def delete(self, req, id): 

143 self._delete(req, id) 

144 

145 @wsgi.Controller.api_version("2.2", "2.9") # noqa 

146 @validation.query_schema(keypairs.delete_query_schema_v20) 

147 @wsgi.response(204) 

148 @wsgi.expected_errors(404) 

149 def delete(self, req, id): # noqa 

150 self._delete(req, id) 

151 

152 @wsgi.Controller.api_version("2.10") # noqa 

153 @validation.query_schema(keypairs.delete_query_schema_v275, '2.75') 

154 @validation.query_schema(keypairs.delete_query_schema_v210, '2.10', '2.74') 

155 @wsgi.response(204) 

156 @wsgi.expected_errors(404) 

157 def delete(self, req, id): # noqa 

158 # handle optional user-id for admin only 

159 user_id = self._get_user_id(req) 

160 self._delete(req, id, user_id=user_id) 

161 

162 def _delete(self, req, id, user_id=None): 

163 """Delete a keypair with a given name.""" 

164 context = req.environ['nova.context'] 

165 # handle optional user-id for admin only 

166 user_id = user_id or context.user_id 

167 context.can(kp_policies.POLICY_ROOT % 'delete', 

168 target={'user_id': user_id}) 

169 try: 

170 self.api.delete_key_pair(context, user_id, id) 

171 except exception.KeypairNotFound as exc: 

172 raise webob.exc.HTTPNotFound(explanation=exc.format_message()) 

173 

174 def _get_user_id(self, req): 

175 if 'user_id' in req.GET.keys(): 

176 user_id = req.GET.getall('user_id')[0] 

177 return user_id 

178 

179 @wsgi.Controller.api_version("2.10") 

180 @validation.query_schema(keypairs.show_query_schema_v210, '2.10', '2.74') 

181 @validation.query_schema(keypairs.show_query_schema_v275, '2.75') 

182 @wsgi.expected_errors(404) 

183 def show(self, req, id): 

184 # handle optional user-id for admin only 

185 user_id = self._get_user_id(req) 

186 return self._show(req, id, key_type=True, user_id=user_id) 

187 

188 @wsgi.Controller.api_version("2.2", "2.9") # noqa 

189 @validation.query_schema(keypairs.show_query_schema_v20) 

190 @wsgi.expected_errors(404) 

191 def show(self, req, id): # noqa 

192 return self._show(req, id, key_type=True) 

193 

194 @wsgi.Controller.api_version("2.1", "2.1") # noqa 

195 @validation.query_schema(keypairs.show_query_schema_v20) 

196 @wsgi.expected_errors(404) 

197 def show(self, req, id): # noqa 

198 return self._show(req, id) 

199 

200 def _show(self, req, id, key_type=False, user_id=None): 

201 """Return data for the given key name.""" 

202 context = req.environ['nova.context'] 

203 user_id = user_id or context.user_id 

204 context.can(kp_policies.POLICY_ROOT % 'show', 

205 target={'user_id': user_id}) 

206 

207 try: 

208 keypair = self.api.get_key_pair(context, user_id, id) 

209 except exception.KeypairNotFound as exc: 

210 raise webob.exc.HTTPNotFound(explanation=exc.format_message()) 

211 return self._view_builder.show(keypair, key_type=key_type) 

212 

213 @wsgi.Controller.api_version("2.35") 

214 @validation.query_schema(keypairs.index_query_schema_v275, '2.75') 

215 @validation.query_schema(keypairs.index_query_schema_v235, '2.35', '2.74') 

216 @wsgi.expected_errors(400) 

217 def index(self, req): 

218 user_id = self._get_user_id(req) 

219 return self._index(req, key_type=True, user_id=user_id, links=True) 

220 

221 @wsgi.Controller.api_version("2.10", "2.34") # noqa 

222 @validation.query_schema(keypairs.index_query_schema_v210) 

223 @wsgi.expected_errors(()) 

224 def index(self, req): # noqa 

225 # handle optional user-id for admin only 

226 user_id = self._get_user_id(req) 

227 return self._index(req, key_type=True, user_id=user_id) 

228 

229 @wsgi.Controller.api_version("2.2", "2.9") # noqa 

230 @validation.query_schema(keypairs.index_query_schema_v20) 

231 @wsgi.expected_errors(()) 

232 def index(self, req): # noqa 

233 return self._index(req, key_type=True) 

234 

235 @wsgi.Controller.api_version("2.1", "2.1") # noqa 

236 @validation.query_schema(keypairs.index_query_schema_v20) 

237 @wsgi.expected_errors(()) 

238 def index(self, req): # noqa 

239 return self._index(req) 

240 

241 def _index(self, req, key_type=False, user_id=None, links=False): 

242 """List of keypairs for a user.""" 

243 context = req.environ['nova.context'] 

244 user_id = user_id or context.user_id 

245 context.can(kp_policies.POLICY_ROOT % 'index', 

246 target={'user_id': user_id}) 

247 

248 if api_version_request.is_supported(req, min_version='2.35'): 

249 limit, marker = common.get_limit_and_marker(req) 

250 else: 

251 limit = marker = None 

252 

253 try: 

254 key_pairs = self.api.get_key_pairs( 

255 context, user_id, limit=limit, marker=marker) 

256 except exception.MarkerNotFound as e: 

257 raise webob.exc.HTTPBadRequest(explanation=e.format_message()) 

258 

259 return self._view_builder.index(req, key_pairs, key_type=key_type, 

260 links=links)