Coverage for nova/api/openstack/urlmap.py: 88%

167 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-17 15:08 +0000

1# Copyright 2011 OpenStack Foundation 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16import re 

17from urllib import request as urllib2 

18 

19from oslo_log import log as logging 

20import paste.urlmap 

21 

22from nova.api.openstack import wsgi 

23 

24 

25LOG = logging.getLogger(__name__) 

26 

27 

28_quoted_string_re = r'"[^"\\]*(?:\\.[^"\\]*)*"' 

29_option_header_piece_re = re.compile(r';\s*([^\s;=]+|%s)\s*' 

30 r'(?:=\s*([^;]+|%s))?\s*' % 

31 (_quoted_string_re, _quoted_string_re)) 

32 

33 

def unquote_header_value(value):
    """Unquotes a header value.

    This does not use the real unquoting but what browsers are actually
    using for quoting: one pair of surrounding double quotes is removed and
    the content in between is returned untouched (no backslash unescaping).

    :param value: the header value to unquote; may be empty or ``None``.
    :return: the value without its surrounding quotes, or the value
             unchanged when it is not wrapped in a pair of double quotes.
    """
    # Require at least two characters: a lone '"' is both the first and the
    # last character, and must not be "unquoted" into an empty string.
    if value and len(value) >= 2 and value[0] == value[-1] == '"':
        # this is not the real unquoting, but fixing this so that the
        # RFC is met will result in bugs with internet explorer and
        # probably some other browsers as well.  IE for example is
        # uploading files with "C:\foo\bar.txt" as filename
        value = value[1:-1]
    return value

48 

49 

def parse_list_header(value):
    """Parse lists as described by RFC 2068 Section 2.

    In particular, parse comma-separated lists where the elements of
    the list may include quoted-strings.  A quoted-string could
    contain a comma.  A non-quoted string could have quotes in the
    middle.  Quotes are removed automatically after parsing.

    The return value is a standard :class:`list`:

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    :param value: a string with a list header.
    :return: :class:`list`
    """
    result = []
    for item in urllib2.parse_http_list(value):
        # Strip exactly one pair of surrounding quotes.  The length check
        # keeps a lone '"' intact, and stripping only once keeps any quotes
        # that are part of the element's content.
        if len(item) >= 2 and item[0] == item[-1] == '"':
            item = item[1:-1]
        result.append(item)
    return result

72 

73 

def parse_options_header(value):
    """Parse a ``Content-Type`` like header into a tuple with the content
    type and the options:

    >>> parse_options_header('Content-Type: text/html; mimetype=text/html')
    ('Content-Type:', {'mimetype': 'text/html'})

    An option given without ``=value`` is stored with the value ``None``.

    :param value: the header to parse.
    :return: (str, options); ``('', {})`` when the header is empty or
             contains no parsable piece.
    """
    def _tokenize(string):
        # Yield (key, value) for every ';'-introduced piece; value is None
        # when the piece has no '=value' part.
        for match in _option_header_piece_re.finditer(string):
            key, value = match.groups()
            key = unquote_header_value(key)
            if value is not None:
                value = unquote_header_value(value)
            yield key, value

    if not value:
        return '', {}

    # Prefix a ';' so the leading type is tokenized like any other piece.
    parts = _tokenize(';' + value)
    try:
        name = next(parts)[0]
    except StopIteration:
        # Non-empty input without a single token (e.g. ';' or whitespace):
        # treat it like an empty header instead of leaking StopIteration.
        return '', {}
    extra = dict(parts)
    return name, extra

99 

100 

class Accept(object):
    """Parsed form of an ``Accept``-style header.

    The header is split into list elements, and each element is parsed into
    a ``(content mask, params)`` pair kept in ``self._content_types``.
    """

    def __init__(self, value):
        """Parse the header string.

        :param value: the raw header value (may be an empty string).
        """
        self._content_types = [parse_options_header(v) for v in
                               parse_list_header(value)]

    def best_match(self, supported_content_types):
        """Pick the supported content type preferred by this header.

        :param supported_content_types: iterable of content type strings.
        :return: ``(content_type, params)`` of the winning entry, or
                 ``(None, {})`` when nothing matches.
        """
        # FIXME: Should we have a more sophisticated matching algorithm that
        # takes into account the version as well?
        best_quality = -1
        best_content_type = None
        best_params = {}
        best_match = '*/*'

        for content_type in supported_content_types:
            for content_mask, params in self._content_types:
                try:
                    quality = float(params.get('q', 1))
                except (TypeError, ValueError):
                    # ValueError: 'q' is not a number.  TypeError: 'q' was
                    # given without a value and was parsed as None.  Either
                    # way the entry is unusable, so skip it.
                    continue

                if quality < best_quality:
                    continue
                elif best_quality == quality:
                    # On a quality tie only a more specific mask (fewer
                    # wildcards) may replace the current best.
                    if best_match.count('*') <= content_mask.count('*'):
                        continue

                if self._match_mask(content_mask, content_type):
                    best_quality = quality
                    best_content_type = content_type
                    best_params = params
                    best_match = content_mask

        return best_content_type, best_params

    def _match_mask(self, mask, content_type):
        """Return True when ``content_type`` is covered by ``mask``.

        A mask without ``*`` must be equal to the content type, ``*/*``
        matches everything, and any other mask is compared on the major
        type only (the mask minus its last two characters).
        """
        if '*' not in mask:
            return content_type == mask
        if mask == '*/*':
            return True
        mask_major = mask[:-2]
        content_type_major = content_type.split('/', 1)[0]
        return content_type_major == mask_major

143 

144 

def urlmap_factory(loader, global_conf, **local_conf):
    """Build a :class:`URLMap` from configuration.

    ``not_found_app`` is taken from ``local_conf`` when present there
    (and removed from it), otherwise from ``global_conf``.  When set, it is
    resolved through ``loader.get_app``.  Every remaining ``local_conf``
    entry maps a path expression to an application name; each application
    is loaded and mounted at its parsed path.

    :param loader: object providing ``get_app(name, global_conf=...)``.
    :param global_conf: global configuration mapping.
    :param local_conf: path expression -> application name entries.
    :return: the populated :class:`URLMap`.
    """
    fallback = (local_conf.pop('not_found_app')
                if 'not_found_app' in local_conf
                else global_conf.get('not_found_app'))
    if fallback:
        fallback = loader.get_app(fallback, global_conf=global_conf)

    mapper = URLMap(not_found_app=fallback)
    for raw_path, app_name in local_conf.items():
        mount_point = paste.urlmap.parse_path_expression(raw_path)
        mapper[mount_point] = loader.get_app(app_name,
                                             global_conf=global_conf)
    return mapper

158 

159 

class URLMap(paste.urlmap.URLMap):
    """URL map that also selects the response MIME type and API version.

    ``self.applications``, ``self.normalize_url`` and
    ``self.not_found_application`` are not defined here; they are presumably
    provided by ``paste.urlmap.URLMap``.
    """

    def _match(self, host, port, path_info):
        """Find longest match for a given URL path.

        Returns ``(app, app_url)`` for the first entry of
        ``self.applications`` that matches, or ``(None, None)``.
        NOTE(review): "longest" relies on the order of ``self.applications``,
        which is maintained outside this class -- confirm.
        """
        for (domain, app_url), app in self.applications:
            if domain and domain != host and domain != host + ':' + port:
                continue
            # Rudimentary "wildcard" support:
            # By declaring a urlmap path ending in '/+', you're saying the
            # incoming path must start with everything up to and including the
            # '/' *and* have something after that as well.  For example, path
            # /foo/bar/+ will match /foo/bar/baz, but not /foo/bar/ or /foo/bar
            # NOTE(efried): This assumes we'll never need a path URI component
            # that legitimately starts with '+'.  (We could use a
            # more obscure character/sequence here in that case.)
            if app_url.endswith('/+'):
                # Must be requesting at least the path element (including /)
                if not path_info.startswith(app_url[:-1]):
                    continue
                # ...but also must be requesting something after that /
                if len(path_info) < len(app_url):
                    continue
                # Trim the /+ off the app_url to make it look "normal" for e.g.
                # proper splitting of SCRIPT_NAME and PATH_INFO.
                return app, app_url[:-2]
            # Normal (non-wildcarded) prefix match
            if (path_info == app_url or
                    path_info.startswith(app_url + '/')):
                return app, app_url

        return None, None

    def _set_script_name(self, app, app_url):
        """Wrap ``app`` so ``app_url`` is appended to ``SCRIPT_NAME``."""
        def wrap(environ, start_response):
            environ['SCRIPT_NAME'] += app_url
            return app(environ, start_response)

        return wrap

    def _munge_path(self, app, path_info, app_url):
        """Wrap ``app`` so ``app_url`` moves from PATH_INFO to SCRIPT_NAME."""
        def wrap(environ, start_response):
            environ['SCRIPT_NAME'] += app_url
            environ['PATH_INFO'] = path_info[len(app_url):]
            return app(environ, start_response)

        return wrap

    def _match_version(self, host, port, params):
        """Return the app mounted at ``/v<version>`` or None.

        ``params`` is a header option dict.  A ``version`` option given
        without a value is parsed as ``None``; it is ignored here because
        concatenating it to ``'/v'`` would raise ``TypeError``.
        """
        version = params.get('version')
        if version is None:
            return None
        app, app_url = self._match(host, port, '/v' + version)
        if app:
            app = self._set_script_name(app, app_url)
        return app

    def _path_strategy(self, host, port, path_info):
        """Check path suffix for MIME type and path prefix for API version."""
        mime_type = app = app_url = None

        parts = path_info.rsplit('.', 1)
        if len(parts) > 1:
            possible_type = 'application/' + parts[1]
            if possible_type in wsgi.get_supported_content_types():
                mime_type = possible_type

        parts = path_info.split('/')
        if len(parts) > 1:
            possible_app, possible_app_url = self._match(host, port, path_info)
            # Don't use prefix if it ends up matching default
            if possible_app and possible_app_url:
                app_url = possible_app_url
                app = self._munge_path(possible_app, path_info, app_url)

        return mime_type, app, app_url

    def _content_type_strategy(self, host, port, environ):
        """Check Content-Type header for API version."""
        params = parse_options_header(environ.get('CONTENT_TYPE', ''))[1]
        return self._match_version(host, port, params)

    def _accept_strategy(self, host, port, environ, supported_content_types):
        """Check Accept header for best matching MIME type and API version."""
        accept = Accept(environ.get('HTTP_ACCEPT', ''))

        # Find the best match in the Accept header
        mime_type, params = accept.best_match(supported_content_types)
        return mime_type, self._match_version(host, port, params)

    def __call__(self, environ, start_response):
        """WSGI entry point: pick the target app and response MIME type.

        Stores the chosen MIME type in ``environ['nova.best_content_type']``
        and calls the selected app, or the not-found application when no
        app matches.
        """
        host = environ.get('HTTP_HOST', environ.get('SERVER_NAME')).lower()
        if ':' in host:
            host, port = host.split(':', 1)
        else:
            if environ['wsgi.url_scheme'] == 'http':
                port = '80'
            else:
                port = '443'

        path_info = environ['PATH_INFO']
        path_info = self.normalize_url(path_info, False)[1]

        # The MIME type for the response is determined in one of two ways:
        # 1) URL path suffix (eg /servers/detail.json)
        # 2) Accept header (eg application/json;q=0.8, application/xml;q=0.2)

        # The API version is determined in one of three ways:
        # 1) URL path prefix (eg /v1.1/tenant/servers/detail)
        # 2) Content-Type header (eg application/json;version=1.1)
        # 3) Accept header (eg application/json;q=0.8;version=1.1)

        supported_content_types = list(wsgi.get_supported_content_types())

        mime_type, app, app_url = self._path_strategy(host, port, path_info)

        # Accept application/atom+xml for the index query of each API
        # version mount point as well as the root index
        if (app_url and app_url + '/' == path_info) or path_info == '/':
            supported_content_types.append('application/atom+xml')

        if not app:
            app = self._content_type_strategy(host, port, environ)

        if not mime_type or not app:
            possible_mime_type, possible_app = self._accept_strategy(
                host, port, environ, supported_content_types)
            if possible_mime_type and not mime_type:
                mime_type = possible_mime_type
            if possible_app and not app:
                app = possible_app

        if not mime_type:
            mime_type = 'application/json'

        if not app:
            # Didn't match a particular version, probably matches default
            app, app_url = self._match(host, port, path_info)
            if app:
                app = self._munge_path(app, path_info, app_url)

        if app:
            environ['nova.best_content_type'] = mime_type
            return app(environ, start_response)

        LOG.debug('Could not find application for %s', environ['PATH_INFO'])
        environ['paste.urlmap_object'] = self
        return self.not_found_application(environ, start_response)