This repository has been archived by the owner on Nov 10, 2022. It is now read-only.
forked from CityofSantaMonica/mds-provider
-
Notifications
You must be signed in to change notification settings - Fork 0
/
auth.py
162 lines (133 loc) · 4.55 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
"""
Authentication module for MDS API calls.
"""
import requests
class AuthorizationToken:
    """
    An authenticated HTTP session that carries an Authorization token header.

    New token-based auth types subclass AuthorizationToken and provide:

        __init__(self, provider)
            Set self.session to a ready-to-use session.

        @classmethod
        can_auth(cls, provider): bool
            Report whether the auth type applies to the given provider.

    See OAuthClientCredentials for an example implementation.
    """

    def __init__(self, provider):
        """
        Builds a requests session whose Authorization header is "<auth_type> <token>".

        Any mapping found in provider.headers is merged into the session headers afterwards,
        so provider-supplied headers take precedence over the generated one.
        """
        http = requests.Session()
        http.headers["Authorization"] = f"{provider.auth_type} {provider.token}"

        extra_headers = getattr(provider, "headers", None)
        if extra_headers:
            http.headers.update(extra_headers)

        # Assigned last so a failure above never leaves a half-configured session behind.
        self.session = http

    @classmethod
    def can_auth(cls, provider):
        """
        Returns True when the provider has auth_type and token attributes and no token_url.
        """
        # A token_url means the token has to be fetched first; that is a different auth type.
        if hasattr(provider, "token_url"):
            return False
        return hasattr(provider, "auth_type") and hasattr(provider, "token")
class OAuthClientCredentials(AuthorizationToken):
    """
    Represents an authenticated session via OAuth 2.0 client_credentials grant flow.

    The provider needs these attributes:

    * client_id
    * client_secret
    * scope (comma-separated string)
    * token_url
    """

    def __init__(self, provider):
        """
        Acquires a Bearer token before establishing a session with the provider.

        Posts the client credentials to provider.token_url as form data, stores the
        "access_token" value of the JSON response on provider.token, then builds the
        session through the base class.

        Raises requests.HTTPError if the token endpoint answers with an error status.
        """
        payload = {
            "client_id": provider.client_id,
            "client_secret": provider.client_secret,
            "grant_type": "client_credentials",
            # A list value is form-encoded by requests as one "scope" field per item.
            "scope": provider.scope.split(",")
        }
        r = requests.post(provider.token_url, data=payload)
        # Fail on 4xx/5xx here instead of with a confusing KeyError/JSON error below.
        r.raise_for_status()
        provider.token = r.json()["access_token"]

        super().__init__(provider)

    @classmethod
    def can_auth(cls, provider):
        """
        Returns True if this auth type can be used for the provider.

        Requires client_id, client_secret, scope and token_url; token_url is needed
        because __init__ posts the token request to it.
        """
        required = ("client_id", "client_secret", "scope", "token_url")
        return all(hasattr(provider, attr) for attr in required)
class BoltClientCredentials(AuthorizationToken):
    """
    Represents an authenticated session via the Bolt authentication scheme.

    Currently, your config needs:

    * email
    * password
    * token_url
    """

    def __init__(self, provider):
        """
        Acquires the provider token for Bolt before establishing a session.

        Posts the email and password to provider.token_url, stores the "token" value of
        the JSON response on provider.token, then builds the session through the base class.

        Raises requests.HTTPError if the token endpoint answers with an error status.
        """
        payload = {
            "email": provider.email,
            "password": provider.password
        }
        # NOTE(review): params= sends the credentials in the URL query string rather than
        # the request body; confirm the Bolt endpoint requires this.
        r = requests.post(provider.token_url, params=payload)
        # Fail on 4xx/5xx here instead of with a confusing KeyError/JSON error below.
        r.raise_for_status()
        provider.token = r.json()["token"]

        super().__init__(provider)

    @classmethod
    def can_auth(cls, provider):
        """
        Returns True if this auth type can be used for the provider.

        Requires provider_name equal to "bolt" (case-insensitive) plus email, password
        and token_url. A provider without a string provider_name yields False.
        """
        # getattr keeps this probe from raising when provider_name is absent.
        name = getattr(provider, "provider_name", None)
        return (
            isinstance(name, str)
            and name.lower() == "bolt"
            and hasattr(provider, "email")
            and hasattr(provider, "password")
            and hasattr(provider, "token_url")
        )
class SpinClientCredentials(AuthorizationToken):
    """
    Represents an authenticated session via the Spin authentication scheme, documented at:

    https://web.spin.pm/datafeeds

    Currently, your config needs:

    * email
    * password
    * token_url (try https://web.spin.pm/api/v1/auth_tokens)
    """

    def __init__(self, provider):
        """
        Acquires the bearer token for Spin before establishing a session.

        Posts the email, password and grant_type "api" to provider.token_url, stores the
        "jwt" value of the JSON response on provider.token, then builds the session
        through the base class.

        Raises requests.HTTPError if the token endpoint answers with an error status.
        """
        payload = {
            "email": provider.email,
            "password": provider.password,
            "grant_type": "api"
        }
        # NOTE(review): params= sends the credentials in the URL query string rather than
        # the request body; confirm the Spin endpoint requires this.
        r = requests.post(provider.token_url, params=payload)
        # Fail on 4xx/5xx here instead of with a confusing KeyError/JSON error below.
        r.raise_for_status()
        provider.token = r.json()["jwt"]

        super().__init__(provider)

    @classmethod
    def can_auth(cls, provider):
        """
        Returns True if this auth type can be used for the provider.

        Requires provider_name equal to "spin" (case-insensitive) plus email, password
        and token_url. A provider without a string provider_name yields False.
        """
        # getattr keeps this probe from raising when provider_name is absent.
        name = getattr(provider, "provider_name", None)
        return (
            isinstance(name, str)
            and name.lower() == "spin"
            and hasattr(provider, "email")
            and hasattr(provider, "password")
            and hasattr(provider, "token_url")
        )
def auth_types():
    """
    Return the set of all supported authentication types.

    The set holds AuthorizationToken itself plus every direct and indirect subclass
    that has been defined at the time of the call.
    """
    discovered = set()
    pending = [AuthorizationToken]

    # Walk the subclass tree iteratively; the set filters out repeat visits.
    while pending:
        current = pending.pop()
        if current in discovered:
            continue
        discovered.add(current)
        pending.extend(current.__subclasses__())

    return discovered