-
Notifications
You must be signed in to change notification settings - Fork 210
/
Copy pathbaseapi.py
294 lines (251 loc) · 9.91 KB
/
baseapi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
from __future__ import annotations
import json
import os
import time
import threading
import requests
from requests.adapters import HTTPAdapter
from urllib3 import Retry
from google.oauth2 import service_account
import google.auth.transport.requests
from pyfcm.errors import (
AuthenticationError,
InvalidDataError,
FCMError,
FCMServerError,
FCMNotRegisteredError,
)
# Migration to v1 - https://firebase.google.com/docs/cloud-messaging/migrate-v1
class BaseAPI(object):
FCM_END_POINT = "https://fcm.googleapis.com/v1/projects"
def __init__(
self,
service_account_file: str | None,
project_id: str,
credentials=None,
proxy_dict=None,
env=None,
json_encoder=None,
adapter=None,
):
"""
Override existing init function to give ability to use v1 endpoints of Firebase Cloud Messaging API
Attributes:
service_account_file (str): path to service account JSON file
project_id (str): project ID of Google account
credentials (Credentials): Google oauth2 credentials instance, such as ADC
proxy_dict (dict): proxy settings dictionary, use proxy (keys: `http`, `https`)
env (dict): environment settings dictionary, for example "app_engine"
json_encoder (BaseJSONEncoder): JSON encoder
adapter (BaseAdapter): adapter instance
"""
self.service_account_file = service_account_file
self.project_id = project_id
self.FCM_END_POINT = self.FCM_END_POINT + f"/{self.project_id}/messages:send"
self.FCM_REQ_PROXIES = None
self.custom_adapter = adapter
self.thread_local = threading.local()
self.credentials = credentials
if not service_account_file and not credentials:
raise AuthenticationError(
"Please provide a service account file path or credentials in the constructor"
)
if (
proxy_dict
and isinstance(proxy_dict, dict)
and (("http" in proxy_dict) or ("https" in proxy_dict))
):
self.FCM_REQ_PROXIES = proxy_dict
self.requests_session.proxies.update(proxy_dict)
if env == "app_engine":
try:
from requests_toolbelt.adapters import appengine
appengine.monkeypatch()
except ModuleNotFoundError:
pass
self.json_encoder = json_encoder
@property
def requests_session(self):
if getattr(self.thread_local, "requests_session", None) is None:
retries = Retry(
backoff_factor=1,
status_forcelist=[502, 503],
allowed_methods=(Retry.DEFAULT_ALLOWED_METHODS | frozenset(["POST"])),
)
adapter = self.custom_adapter or HTTPAdapter(max_retries=retries)
self.thread_local.requests_session = requests.Session()
self.thread_local.requests_session.mount("http://", adapter)
self.thread_local.requests_session.mount("https://", adapter)
self.thread_local.token_expiry = 0
current_timestamp = time.time()
if self.thread_local.token_expiry < current_timestamp:
self.thread_local.requests_session.headers.update(self.request_headers())
self.thread_local.token_expiry = current_timestamp + 1800
return self.thread_local.requests_session
def send_request(self, payload=None, timeout=None):
response = self.requests_session.post(
self.FCM_END_POINT, data=payload, timeout=timeout
)
if (
"Retry-After" in response.headers
and int(response.headers["Retry-After"]) > 0
):
sleep_time = int(response.headers["Retry-After"])
time.sleep(sleep_time)
return self.send_request(payload, timeout)
return response
def send_async_request(self, params_list, timeout):
import asyncio
from .async_fcm import fetch_tasks
payloads = [self.parse_payload(**params) for params in params_list]
responses = asyncio.new_event_loop().run_until_complete(
fetch_tasks(
end_point=self.FCM_END_POINT,
headers=self.request_headers(),
payloads=payloads,
timeout=timeout,
)
)
return responses
def _get_access_token(self):
"""
Generates access token from credentials.
If token expires then new access token is generated.
Returns:
str: Access token
"""
# get OAuth 2.0 access token
try:
if self.service_account_file:
credentials = service_account.Credentials.from_service_account_file(
self.service_account_file,
scopes=["https://www.googleapis.com/auth/firebase.messaging"],
)
else:
credentials = self.credentials
request = google.auth.transport.requests.Request()
credentials.refresh(request)
return credentials.token
except Exception as e:
raise InvalidDataError(e)
def request_headers(self):
"""
Generates request headers including Content-Type and Authorization of Bearer token
Returns:
dict: request headers
"""
return {
"Content-Type": "application/json",
"Authorization": "Bearer " + self._get_access_token(),
}
def json_dumps(self, data):
"""
Standardized json.dumps function with separators and sorted keys set
Args:
data (dict or list): data to be dumped
Returns:
string: json
"""
return json.dumps(
data,
separators=(",", ":"),
sort_keys=True,
cls=self.json_encoder,
ensure_ascii=False,
).encode("utf8")
def parse_response(self, response):
"""
Parses the json response sent back by the server and tries to get out the important return variables
Returns:
dict: name (str) - uThe identifier of the message sent, in the format of projects/*/messages/{message_id}
Raises:
FCMServerError: FCM is temporary not available
AuthenticationError: error authenticating the sender account
InvalidDataError: data passed to FCM was incorrecly structured
"""
if response.status_code == 200:
if (
"content-length" in response.headers
and int(response.headers["content-length"]) <= 0
):
raise FCMServerError(
"FCM server connection error, the response is empty"
)
else:
return response.json()
elif response.status_code == 401:
raise AuthenticationError(
"There was an error authenticating the sender account"
)
elif response.status_code == 400:
raise InvalidDataError(response.text)
elif response.status_code == 404:
raise FCMNotRegisteredError("Token not registered")
else:
raise FCMServerError(
f"FCM server error: Unexpected status code {response.status_code}. The server might be temporarily unavailable."
)
def parse_payload(
self,
fcm_token=None,
notification_title=None,
notification_body=None,
notification_image=None,
data_payload=None,
topic_name=None,
topic_condition=None,
android_config=None,
apns_config=None,
webpush_config=None,
fcm_options=None,
dry_run=False,
):
"""
:rtype: json
"""
fcm_payload = dict()
if fcm_token:
fcm_payload["token"] = fcm_token
if topic_name:
fcm_payload["topic"] = topic_name
if topic_condition:
fcm_payload["condition"] = topic_condition
if data_payload:
if isinstance(data_payload, dict):
fcm_payload["data"] = data_payload
else:
raise InvalidDataError("Provided data_payload is in the wrong format")
if android_config:
if isinstance(android_config, dict):
fcm_payload["android"] = android_config
else:
raise InvalidDataError("Provided android_config is in the wrong format")
if webpush_config:
if isinstance(webpush_config, dict):
fcm_payload["webpush"] = webpush_config
else:
raise InvalidDataError("Provided webpush_config is in the wrong format")
if apns_config:
if isinstance(apns_config, dict):
fcm_payload["apns"] = apns_config
else:
raise InvalidDataError("Provided apns_config is in the wrong format")
if fcm_options:
if isinstance(fcm_options, dict):
fcm_payload["fcm_options"] = fcm_options
else:
raise InvalidDataError("Provided fcm_options is in the wrong format")
fcm_payload["notification"] = (
{}
) # - https://firebase.google.com/docs/reference/fcm/rest/v1/projects.messages#notification
# If title is present, use it
if notification_title:
fcm_payload["notification"]["title"] = notification_title
if notification_body:
fcm_payload["notification"]["body"] = notification_body
if notification_image:
fcm_payload["notification"]["image"] = notification_image
# Do this if you only want to send a data message.
if data_payload and (not notification_title and not notification_body):
del fcm_payload["notification"]
return self.json_dumps({"message": fcm_payload, "validate_only": dry_run})