Coverage for nova/api/openstack/urlmap.py: 88%
167 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-17 15:08 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-17 15:08 +0000
1# Copyright 2011 OpenStack Foundation
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16import re
17from urllib import request as urllib2
19from oslo_log import log as logging
20import paste.urlmap
22from nova.api.openstack import wsgi
# Module-level logger, named after this module (``logging`` here is the
# ``oslo_log.log`` module imported above).
LOG = logging.getLogger(__name__)
# Pattern source for a double-quoted string: an opening quote, any run of
# characters that are neither a quote nor a backslash, optionally interleaved
# with backslash-escaped characters, and a closing quote.
_quoted_string_re = r'"[^"\\]*(?:\\.[^"\\]*)*"'
# Matches one ``;key`` or ``;key=value`` piece of an options header.
# Group 1 is the key, group 2 the (optional) value.
#
# The quoted-string alternative must be tried *first* in both groups.  The
# bare-token character classes also accept a leading '"', so when they come
# first the quoted-string alternative can never match: a value such as
# ``"a;b"`` is cut at the semicolon and ``"1.1" `` (with trailing whitespace)
# keeps its quotes because the closing quote is no longer the last character.
_option_header_piece_re = re.compile(r';\s*(%s|[^\s;=]+)\s*'
                                     r'(?:=\s*(%s|[^;]+))?\s*' %
                                     (_quoted_string_re, _quoted_string_re))
def unquote_header_value(value):
    """Unquotes a header value.

    This does not use the real unquoting but what browsers are actually
    using for quoting: only the enclosing pair of double quotes is removed,
    the content between them is returned untouched (no backslash
    unescaping).

    :param value: the header value to unquote; may be empty or ``None``.
    :return: the value without its enclosing double quotes, or the value
             unchanged when it is not wrapped in a pair of double quotes.
    """
    # A lone '"' is both the first and the last character of the string but
    # is not a quoted string, so at least two characters are required before
    # anything is stripped.
    if value and len(value) >= 2 and value[0] == value[-1] == '"':
        # this is not the real unquoting, but fixing this so that the
        # RFC is met will result in bugs with internet explorer and
        # probably some other browsers as well.  IE for example is
        # uploading files with "C:\foo\bar.txt" as filename
        value = value[1:-1]
    return value
def parse_list_header(value):
    """Split a comma-separated list header into its elements.

    The splitting is delegated to ``parse_http_list``, so an element may be
    a quoted-string that itself contains a comma, and a non-quoted element
    may carry quotes in the middle.  Elements that are wrapped in double
    quotes have those quotes removed before being returned.

    The return value is a standard :class:`list`:

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    :param value: a string with a list header.
    :return: :class:`list`
    """
    def _strip_quotes(entry):
        # Slices (rather than indexes) keep this safe for empty entries.
        if entry[:1] == entry[-1:] == '"':
            return unquote_header_value(entry[1:-1])
        return entry

    return [_strip_quotes(entry) for entry in urllib2.parse_http_list(value)]
def parse_options_header(value):
    """Parse a ``Content-Type`` like header into a tuple with the content
    type and the options:

    >>> parse_options_header('Content-Type: text/html; mimetype=text/html')
    ('Content-Type:', {'mimetype': 'text/html'})

    An option given without a value (``;key``) is stored with ``None`` as
    its value.  A header that is empty, ``None``, or that contains no
    parsable piece at all (for example ``';'`` or ``'='``) yields
    ``('', {})``.

    :param value: the header to parse.
    :return: (str, options)
    """
    def _tokenize(string):
        # Yield (key, value) for every ``;key[=value]`` piece; quotes around
        # either part are removed, a missing value stays ``None``.
        for match in _option_header_piece_re.finditer(string):
            key, val = match.groups()
            key = unquote_header_value(key)
            if val is not None:
                val = unquote_header_value(val)
            yield key, val

    if not value:
        return '', {}

    # Prefix a ';' so the leading content type is tokenized like an option;
    # its key is the name and the remaining pieces are the options.
    parts = _tokenize(';' + value)
    try:
        name = next(parts)[0]
    except StopIteration:
        # Nothing in the header matched the piece pattern.  Headers come
        # from the client, so treat this like an empty header instead of
        # letting StopIteration escape to the caller.
        return '', {}
    return name, dict(parts)
class Accept(object):
    """Parsed form of an ``Accept`` header value.

    The header is split into its list elements and each element is parsed
    into a ``(content_mask, params)`` pair, kept in header order in
    ``self._content_types``.
    """

    def __init__(self, value):
        """Parse ``value`` (the raw header string) into mask/params pairs."""
        self._content_types = [parse_options_header(v) for v in
                               parse_list_header(value)]

    def best_match(self, supported_content_types):
        """Pick the supported content type the header prefers most.

        Entries are ranked by their ``q`` parameter (default 1); on equal
        quality an entry only replaces the current best if its mask contains
        fewer ``*`` wildcards.  Entries whose ``q`` cannot be converted to a
        float are ignored.

        :param supported_content_types: iterable of content type strings.
        :return: ``(content_type, params)`` of the best match, or
                 ``(None, {})`` when nothing matches.
        """
        # FIXME: Should we have a more sophisticated matching algorithm that
        #        takes into account the version as well?
        best_quality = -1
        best_content_type = None
        best_params = {}
        best_match = '*/*'

        for content_type in supported_content_types:
            for content_mask, params in self._content_types:
                try:
                    quality = float(params.get('q', 1))
                except (TypeError, ValueError):
                    # ValueError: 'q' is not a number.
                    # TypeError: 'q' was sent without a value, so the parsed
                    # parameter is None.
                    continue

                if quality < best_quality:
                    continue
                elif best_quality == quality:
                    # Same quality: keep the current best unless this mask
                    # is more specific (has fewer wildcards).
                    if best_match.count('*') <= content_mask.count('*'):
                        continue

                if self._match_mask(content_mask, content_type):
                    best_quality = quality
                    best_content_type = content_type
                    best_params = params
                    best_match = content_mask

        return best_content_type, best_params

    def _match_mask(self, mask, content_type):
        """Return True if ``content_type`` is covered by ``mask``.

        A mask without ``*`` must equal the content type, ``*/*`` matches
        everything, and any other wildcard mask is compared on the major
        type only.
        """
        if '*' not in mask:
            return content_type == mask
        if mask == '*/*':
            return True
        # Drops the last two characters, i.e. the '/*' of a 'major/*' mask.
        mask_major = mask[:-2]
        content_type_major = content_type.split('/', 1)[0]
        return content_type_major == mask_major
def urlmap_factory(loader, global_conf, **local_conf):
    """Build a :class:`URLMap` from configuration.

    ``not_found_app`` is taken from ``local_conf`` when present (and removed
    from it), otherwise from ``global_conf``; if set, it is resolved to an
    application through ``loader``.  Every remaining ``local_conf`` entry
    maps a path expression to an application name, which is loaded and
    mounted on the returned map.

    :param loader: object providing ``get_app(name, global_conf=...)``.
    :param global_conf: global configuration mapping.
    :param local_conf: path expression -> application name entries.
    :return: the populated :class:`URLMap`.
    """
    try:
        fallback = local_conf.pop('not_found_app')
    except KeyError:
        fallback = global_conf.get('not_found_app')
    if fallback:
        fallback = loader.get_app(fallback, global_conf=global_conf)

    mapper = URLMap(not_found_app=fallback)
    for expression, app_name in local_conf.items():
        mount_point = paste.urlmap.parse_path_expression(expression)
        mapper[mount_point] = loader.get_app(app_name,
                                             global_conf=global_conf)
    return mapper
class URLMap(paste.urlmap.URLMap):
    """URL map that selects the target application and the response MIME
    type from the request path, the ``Content-Type`` header and the
    ``Accept`` header.
    """

    def _match(self, host, port, path_info):
        """Find longest match for a given URL path.

        Returns the first entry of ``self.applications`` that matches, so
        "longest" relies on the order of that list.
        NOTE(review): presumably the parent class keeps it sorted
        longest-first - confirm.

        :return: ``(app, app_url)``, or ``(None, None)`` if nothing matches.
        """
        for (domain, app_url), app in self.applications:
            if domain and domain != host and domain != host + ':' + port:
                continue
            # Rudimentary "wildcard" support:
            # By declaring a urlmap path ending in '/+', you're saying the
            # incoming path must start with everything up to and including the
            # '/' *and* have something after that as well.  For example, path
            # /foo/bar/+ will match /foo/bar/baz, but not /foo/bar/ or /foo/bar
            # NOTE(efried): This assumes we'll never need a path URI component
            #               that legitimately starts with '+'.  (We could use a
            #               more obscure character/sequence here in that case.)
            if app_url.endswith('/+'):
                # Must be requesting at least the path element (including /)
                if not path_info.startswith(app_url[:-1]):
                    continue
                # ...but also must be requesting something after that /
                if len(path_info) < len(app_url):
                    continue
                # Trim the /+ off the app_url to make it look "normal" for e.g.
                # proper splitting of SCRIPT_NAME and PATH_INFO.
                return app, app_url[:-2]
            # Normal (non-wildcarded) prefix match
            if (path_info == app_url or
                    path_info.startswith(app_url + '/')):
                return app, app_url

        return None, None

    def _set_script_name(self, app, app_url):
        """Wrap ``app`` so that ``app_url`` is appended to SCRIPT_NAME."""
        def wrap(environ, start_response):
            environ['SCRIPT_NAME'] += app_url
            return app(environ, start_response)

        return wrap

    def _munge_path(self, app, path_info, app_url):
        """Wrap ``app`` so that ``app_url`` moves from PATH_INFO to
        SCRIPT_NAME before the application is called.
        """
        def wrap(environ, start_response):
            environ['SCRIPT_NAME'] += app_url
            environ['PATH_INFO'] = path_info[len(app_url):]
            return app(environ, start_response)

        return wrap

    def _path_strategy(self, host, port, path_info):
        """Check path suffix for MIME type and path prefix for API version.

        :return: ``(mime_type, app, app_url)``; each element is ``None``
                 when the path does not determine it.
        """
        mime_type = app = app_url = None

        # Text after the last '.' is a candidate 'application/<suffix>' type.
        parts = path_info.rsplit('.', 1)
        if len(parts) > 1:
            possible_type = 'application/' + parts[1]
            if possible_type in wsgi.get_supported_content_types():
                mime_type = possible_type

        parts = path_info.split('/')
        if len(parts) > 1:
            possible_app, possible_app_url = self._match(host, port, path_info)
            # Don't use prefix if it ends up matching default
            if possible_app and possible_app_url:
                app_url = possible_app_url
                app = self._munge_path(possible_app, path_info, app_url)

        return mime_type, app, app_url

    def _content_type_strategy(self, host, port, environ):
        """Check Content-Type header for API version.

        :return: the wrapped application for ``/v<version>``, or ``None``.
        """
        app = None
        params = parse_options_header(environ.get('CONTENT_TYPE', ''))[1]
        # A 'version' option sent without a value is parsed as None; there is
        # no version to look up then, and '/v' + None would raise TypeError.
        version = params.get('version')
        if version is not None:
            app, app_url = self._match(host, port, '/v' + version)
            if app:
                app = self._set_script_name(app, app_url)

        return app

    def _accept_strategy(self, host, port, environ, supported_content_types):
        """Check Accept header for best matching MIME type and API version.

        :return: ``(mime_type, app)``; either may be ``None``.
        """
        accept = Accept(environ.get('HTTP_ACCEPT', ''))

        app = None

        # Find the best match in the Accept header
        mime_type, params = accept.best_match(supported_content_types)
        # As for Content-Type: a valueless 'version' option is None and must
        # not be concatenated into a path.
        version = params.get('version')
        if version is not None:
            app, app_url = self._match(host, port, '/v' + version)
            if app:
                app = self._set_script_name(app, app_url)

        return mime_type, app

    def __call__(self, environ, start_response):
        """WSGI entry point: choose the application and best content type,
        store the latter in ``environ['nova.best_content_type']`` and
        delegate to the application (or to the not-found application).
        """
        host = environ.get('HTTP_HOST', environ.get('SERVER_NAME')).lower()
        if ':' in host:
            host, port = host.split(':', 1)
        else:
            # No explicit port: fall back to the default for the scheme.
            if environ['wsgi.url_scheme'] == 'http':
                port = '80'
            else:
                port = '443'

        path_info = environ['PATH_INFO']
        path_info = self.normalize_url(path_info, False)[1]

        # The MIME type for the response is determined in one of two ways:
        # 1) URL path suffix (eg /servers/detail.json)
        # 2) Accept header (eg application/json;q=0.8, application/xml;q=0.2)

        # The API version is determined in one of three ways:
        # 1) URL path prefix (eg /v1.1/tenant/servers/detail)
        # 2) Content-Type header (eg application/json;version=1.1)
        # 3) Accept header (eg application/json;q=0.8;version=1.1)

        supported_content_types = list(wsgi.get_supported_content_types())

        mime_type, app, app_url = self._path_strategy(host, port, path_info)

        # Accept application/atom+xml for the index query of each API
        # version mount point as well as the root index
        if (app_url and app_url + '/' == path_info) or path_info == '/':
            supported_content_types.append('application/atom+xml')

        if not app:
            app = self._content_type_strategy(host, port, environ)

        if not mime_type or not app:
            possible_mime_type, possible_app = self._accept_strategy(
                host, port, environ, supported_content_types)
            if possible_mime_type and not mime_type:
                mime_type = possible_mime_type
            if possible_app and not app:
                app = possible_app

        if not mime_type:
            mime_type = 'application/json'

        if not app:
            # Didn't match a particular version, probably matches default
            app, app_url = self._match(host, port, path_info)
            if app:
                app = self._munge_path(app, path_info, app_url)

        if app:
            environ['nova.best_content_type'] = mime_type
            return app(environ, start_response)

        LOG.debug('Could not find application for %s', environ['PATH_INFO'])
        environ['paste.urlmap_object'] = self
        return self.not_found_application(environ, start_response)