-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstream.py
36 lines (25 loc) · 1.01 KB
/
stream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from celery import Celery
from pubnub.callbacks import SubscribeCallback
from pubnub.models.consumer.pubsub import PNMessageResult
from pubnub.pnconfiguration import PNConfiguration
from pubnub.pubnub import PubNub
import os
class MySubscribeCallback(SubscribeCallback):
    """PubNub listener that forwards every received message to a Celery task."""

    def status(self, pubnub, status):
        """Ignore status events; nothing is done with them."""

    def presence(self, pubnub, presence):
        """Ignore presence events; nothing is done with them."""

    def message(self, pubnub, pn_message_result: PNMessageResult):
        """Send the message payload to ``message_location.fetch_message_location``.

        The task is dispatched by name through the module-level ``app``,
        which is looked up when the callback runs (it is defined further
        down in this module).
        """
        payload = pn_message_result.message
        app.send_task(
            'message_location.fetch_message_location',
            kwargs={'message': payload},
        )
# Celery application used by this module to dispatch tasks by name.
app = Celery("stream")

# Map each task-name pattern to the queue its tasks are sent to.
app.conf.task_routes = {
    pattern: {'queue': queue_name}
    for pattern, queue_name in (
        ('http_interface.*', 'http_queue'),
        ('stream_input.*', 'stream_queue'),
        ('message_location.*', 'message_location_queue'),
    )
}
def _require_env(name):
    """Return environment variable *name*, raising if it is unset or empty.

    Raises:
        RuntimeError: if the variable is missing or an empty string, so a
            misconfigured deployment fails at startup with a clear message
            instead of passing ``None`` into the PubNub SDK.
    """
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(
            "Required environment variable {} is not set".format(name)
        )
    return value


# Build the PubNub client configuration from the environment.
pnconfig = PNConfiguration()
# NOTE(review): TLS is explicitly disabled here; confirm that plaintext
# transport is really intended before deploying.
pnconfig.ssl = False
pnconfig.subscribe_key = _require_env("PUBNUB_SUBSCRIBE_KEY")
# NOTE(review): newer PubNub SDK releases may also require a user_id/uuid on
# the configuration -- verify against the SDK version this project pins.

# Create the client, attach the listener that forwards messages to Celery,
# and start the subscription on the configured channel.
pubnub = PubNub(pnconfig)
pubnub.add_listener(MySubscribeCallback())
pubnub.subscribe().channels(_require_env("PUBNUB_CHANNEL")).execute()