-
Notifications
You must be signed in to change notification settings - Fork 88
/
oauth.py
265 lines (226 loc) · 8.88 KB
/
oauth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
import base64
import json
import secrets
import time
from urllib.parse import urlparse, urlunparse
from django.contrib.auth.mixins import LoginRequiredMixin
from django.http import (
HttpResponse,
HttpResponseForbidden,
HttpResponseRedirect,
JsonResponse,
)
from django.shortcuts import render
from django.utils import timezone
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.views.generic import View
from api.models import Application, Authorization, Token
class OauthRedirect(HttpResponseRedirect):
def __init__(self, redirect_uri, **kwargs):
url_parts = urlparse(redirect_uri)
self.allowed_schemes = [url_parts.scheme]
# Either add or join the query section
url_parts = list(url_parts)
query_string = url_parts[4]
for key, value in kwargs.items():
if value is None:
continue
if not query_string:
query_string = f"{key}={value}"
else:
query_string += f"&{key}={value}"
url_parts[4] = query_string
super().__init__(urlunparse(url_parts))
def get_json_and_formdata(request):
# Did they submit JSON?
if request.content_type == "application/json" and request.body.strip():
return json.loads(request.body)
# Fall back to form data
value = {}
for key, item in request.POST.items():
value[key] = item
for key, item in request.GET.items():
value[key] = item
return value
class AuthorizationView(LoginRequiredMixin, View):
"""
Asks the user to authorize access.
Could maybe be a FormView, but things are weird enough we just handle the
POST manually.
"""
def get(self, request):
redirect_uri = self.request.GET["redirect_uri"]
scope = self.request.GET.get("scope", "read")
state = self.request.GET.get("state")
response_type = self.request.GET.get("response_type")
if response_type != "code":
return render(
request,
"api/oauth_error.html",
{"error": f"Invalid response type '{response_type}'"},
status=400,
)
application = Application.objects.filter(
client_id=self.request.GET.get("client_id"),
).first()
if application is None:
return render(
request,
"api/oauth_error.html",
{"error": "Invalid client_id"},
status=400,
)
if application.redirect_uris and redirect_uri not in application.redirect_uris:
return render(
request,
"api/oauth_error.html",
{"error": "Invalid application redirect URI"},
status=401,
)
context = {
"application": application,
"state": state,
"redirect_uri": redirect_uri,
"scope": scope,
"identities": self.request.user.identities.all(),
}
return render(request, "api/oauth_authorize.html", context)
def post(self, request):
post_data = get_json_and_formdata(request)
# Grab the application and other details again
redirect_uri = post_data["redirect_uri"]
scope = post_data["scope"]
application = Application.objects.get(client_id=post_data["client_id"])
# Get the identity
identity = self.request.user.identities.get(pk=post_data["identity"])
extra_args = {}
if post_data.get("state"):
extra_args["state"] = post_data["state"]
# Make a token
token = Authorization.objects.create(
application=application,
user=self.request.user,
identity=identity,
code=secrets.token_urlsafe(43),
redirect_uri=redirect_uri,
scopes=scope.split(),
)
# If it's an out of band request, show the code
if redirect_uri == "urn:ietf:wg:oauth:2.0:oob":
return render(request, "api/oauth_code.html", {"code": token.code})
# Redirect with the token's code
return OauthRedirect(redirect_uri, code=token.code, **extra_args)
def extract_client_info_from_basic_auth(request):
if "authorization" in request.headers:
auth = request.headers["authorization"].split()
if len(auth) == 2:
if auth[0].lower() == "basic":
client_id, client_secret = (
base64.b64decode(auth[1]).decode("utf8").split(":", 1)
)
return client_id, client_secret
return None, None
@method_decorator(csrf_exempt, name="dispatch")
class TokenView(View):
def verify_code(
self, authorization: Authorization, client_id, client_secret, redirect_uri
):
application = authorization.application
return (
application.client_id == client_id
and application.client_secret == client_secret
and authorization.redirect_uri == redirect_uri
)
def post(self, request):
post_data = get_json_and_formdata(request)
auth_client_id, auth_client_secret = extract_client_info_from_basic_auth(
request
)
post_data.setdefault("client_id", auth_client_id)
post_data.setdefault("client_secret", auth_client_secret)
grant_type = post_data.get("grant_type")
if grant_type not in (
"authorization_code",
"client_credentials",
):
return JsonResponse({"error": "invalid_grant_type"}, status=400)
if grant_type == "client_credentials":
# We don't support individual client credential tokens, but instead
# just have a fixed one (since anyone can register an app at any
# time anyway)
return JsonResponse(
{
"access_token": "__app__",
"token_type": "Bearer",
"scope": "read",
"created_at": int(time.time()),
}
)
elif grant_type == "authorization_code":
code = post_data.get("code")
if not code:
return JsonResponse(
{
"error": "invalid_request",
"error_description": "Required param : code",
},
status=400,
)
authorization = Authorization.objects.get(code=code)
if (
not authorization
or timezone.now() - authorization.created
> timezone.timedelta(seconds=authorization.valid_for_seconds)
):
return JsonResponse({"error": "access_denied"}, status=401)
application = Application.objects.filter(
client_id=post_data["client_id"],
client_secret=post_data["client_secret"],
).first()
code_verified = self.verify_code(
authorization,
client_id=post_data.get("client_id"),
client_secret=post_data.get("client_secret"),
redirect_uri=post_data.get("redirect_uri"),
)
if not application or authorization.token or not code_verified:
# this authorization code has already been used
return JsonResponse({"error": "access_denied"}, status=401)
token = Token.objects.create(
application=application,
user=authorization.user,
identity=authorization.identity,
token=secrets.token_urlsafe(43),
scopes=authorization.scopes,
)
token.save()
# Return them the token
return JsonResponse(
{
"access_token": token.token,
"token_type": "Bearer",
"scope": " ".join(token.scopes),
"created_at": int(token.created.timestamp()),
}
)
@method_decorator(csrf_exempt, name="dispatch")
class RevokeTokenView(View):
def post(self, request):
post_data = get_json_and_formdata(request)
auth_client_id, auth_client_secret = extract_client_info_from_basic_auth(
request
)
post_data.setdefault("client_id", auth_client_id)
post_data.setdefault("client_secret", auth_client_secret)
token_str = post_data["token"]
application = Application.objects.filter(
client_id=post_data["client_id"],
client_secret=post_data["client_secret"],
).first()
token = Token.objects.filter(application=application, token=token_str).first()
if token is None:
return HttpResponseForbidden()
token.revoked = timezone.now()
token.save()
return HttpResponse("")