-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathauthorization.py
109 lines (96 loc) · 4.33 KB
/
authorization.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# Copyright (c) Sebastian Scholz
# See LICENSE for details.
""" Provides methods to authorize requests. """
from functools import wraps
try:
from urlparse import urlparse, parse_qs
except ImportError:
# noinspection PyUnresolvedReferences
from urllib.parse import urlparse, parse_qs
from twisted.web.server import NOT_DONE_YET
from txoauth2.errors import MissingTokenError, InvalidTokenRequestError, InsecureConnectionError, \
InsufficientScopeRequestError, MultipleTokensError, OAuth2Error
from txoauth2.token import TokenResource
def _getToken(request):
"""
Helper method to get a token from a request, if it contains any.
:raises ValueError: If more than one token was found in the request.
:param request: The request.
:return: A token that was send with the request or None.
"""
token = None
authHeader = request.getHeader(b'Authorization')
if authHeader is not None and authHeader.startswith(b'Bearer '):
token = authHeader[7:]
if b'access_token' in request.args:
if request.method == b'POST' and \
request.getHeader(b'Content-Type') == b'application/x-www-form-urlencoded':
accessTokenArg = request.args[b'access_token']
else:
query = parse_qs(urlparse(request.uri).query)
accessTokenArg = query.get(b'access_token')
if accessTokenArg is not None:
request.setHeader(b'Cache-Control', b'private')
if accessTokenArg is not None:
if token is not None or len(accessTokenArg) != 1:
raise ValueError('Found multiple tokens in the request')
token = accessTokenArg[0]
return token
def isAuthorized(request, scope, allowInsecureRequestDebug=False):
"""
Returns True if the token in the request grants access to the given
scope. The token is validated via the authTokenStorage singleton
given to the TokenResource instance. If the token is invalid,
does not grant access to the scope or was not send via a secure
protocol, False is returned, an error is written to the request
and the request is closed.
You can not write to the request if this function returned False!
:param request: The request.
:param scope: The scope or list of scopes the token must grant access to.
:param allowInsecureRequestDebug: Allow requests to originate from
insecure connections. Only use for local testing!
:return: True, if the request is authorized, False otherwise.
"""
try:
scope = scope if isinstance(scope, list) else [scope]
if not (allowInsecureRequestDebug or request.isSecure()):
raise InsecureConnectionError()
try:
requestToken = _getToken(request)
except ValueError:
raise MultipleTokensError(scope)
if requestToken is None:
raise MissingTokenError(scope)
try:
requestToken = requestToken.decode('utf-8')
except UnicodeDecodeError:
raise InvalidTokenRequestError(scope)
tokenStorage = TokenResource.getTokenStorageSingleton()
if not tokenStorage.contains(requestToken):
raise InvalidTokenRequestError(scope)
if tokenStorage.hasAccess(requestToken, scope):
return True
raise InsufficientScopeRequestError(scope)
except OAuth2Error as error:
request.write(error.generate(request))
request.finish()
return False
def oauth2(scope, allowInsecureRequestDebug=False):
"""
Function decorator that checks the first argument, which must
be a request object, with isAuthorized.
If the request is authorized, the function is called,
otherwise the request is closed and NOT_DONE_YET is returned.
:param scope: The scope or list of scopes the token must grant access to.
:param allowInsecureRequestDebug: Allow requests to originate from
insecure connections. Only use for local testing!
:return: The wrapped function.
"""
def decorator(func): # pylint: disable=missing-docstring
@wraps(func)
def wrapper(self, request, *args, **kwargs): # pylint: disable=missing-docstring
if not isAuthorized(request, scope, allowInsecureRequestDebug):
return NOT_DONE_YET
return func(self, request, *args, **kwargs)
return wrapper
return decorator