-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRipeRisStreamer.py
102 lines (76 loc) · 3.15 KB
/
RipeRisStreamer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#!/usr/bin/python3
import asyncio
import websockets
import json
import ssl
import logging
import os
from importlib import import_module
class RipeRisStreamer:
    """Stream live updates from the RIPE RIS Live websocket service.

    Every received message is passed through a formatter and then handed to
    a reporter. By default the message is left untouched and printed to
    stdout. When ``options.output_plugin`` is set, that module is imported
    and its ``Plugin(streamer, options)`` object supplies ``format`` and
    ``send_message`` instead.
    """

    # Websocket endpoint used by start_streaming().
    RIS_LIVE_URI = 'wss://ris-live.ripe.net/v1/ws/?client=RipeRisStreamer'

    # (attribute read from options, key written into the subscription data)
    _PARAM_KEYS = (
        ('filter_host', 'host'),
        ('filter_type', 'type'),
        ('filter_key', 'require'),
        ('filter_peer', 'peer'),
        ('filter_aspath', 'path'),
        ('filter_prefix', 'prefix'),
        ('match_more_specific', 'moreSpecific'),
        ('match_less_specific', 'lessSpecific'),
    )

    def __init__(self, options):
        """Store the options and wire up the formatter/reporter callbacks.

        Args:
            options: object exposing ``output_plugin``, ``include_raw``, the
                ``filter_*`` attributes and ``match_more_specific`` /
                ``match_less_specific`` (e.g. an ``argparse.Namespace``).
        """
        self._options = options
        self._ws = None
        # Event loop running the stream; lets disconnect() schedule the close.
        self._loop = None
        # NOTE(review): certificate and hostname verification are switched
        # off here, so the TLS peer is not authenticated. Kept as-is to avoid
        # breaking existing deployments -- confirm whether this is intended.
        self._sslcontext = ssl.create_default_context()
        self._sslcontext.check_hostname = False
        self._sslcontext.verify_mode = ssl.CERT_NONE
        self._report = self._send_message_screen
        self._format = self._format_none
        if self._options.output_plugin:
            plugin_module = import_module(self._options.output_plugin)
            plugin = plugin_module.Plugin(self, options)
            self._report = plugin.send_message
            self._format = plugin.format

    def _format_none(self, msg):
        """Default formatter: return the message unchanged."""
        return msg

    def _send_message_screen(self, msg):
        """Default reporter: print the message to stdout."""
        print(msg)

    def disconnect(self):
        """Request that the active websocket connection be closed.

        The websocket's ``close()`` is a coroutine, so it is scheduled on the
        loop that is running the stream rather than called directly (a bare
        call would only create a coroutine object that never runs).

        Returns:
            True if a close was scheduled, False if no stream is running.
        """
        websocket = self._ws
        loop = self._loop
        if websocket is None or loop is None or not loop.is_running():
            return False
        # Safe from the loop's own thread (e.g. a plugin callback) as well as
        # from other threads, because the returned future is not waited on.
        asyncio.run_coroutine_threadsafe(websocket.close(), loop)
        return True

    def start_streaming(self):
        """Connect to RIS Live and block until the connection ends."""
        loop = asyncio.get_event_loop()
        self._loop = loop
        try:
            loop.run_until_complete(self._stream(self.RIS_LIVE_URI))
        finally:
            # The connection is gone; make disconnect() a no-op from now on.
            self._ws = None
            self._loop = None

    async def _stream(self, uri):
        """Open the websocket, subscribe, and dispatch every message."""
        logging.debug("Going to create websocket connection...")
        async with websockets.connect(uri, ssl=self._sslcontext) as websocket:
            self._ws = websocket
            logging.debug("Sending RIS parameters...")
            await websocket.send(self._get_ris_params())
            logging.debug("Going to start the reception loop...")
            async for message in websocket:
                try:
                    self._report(self._format(message))
                except Exception:
                    # One bad message (or a failing plugin) must not stop the
                    # stream; log with traceback instead of printing into
                    # the stdout data stream.
                    logging.exception("Failed to process RIS message")

    def _get_ris_params(self):
        """Build the JSON ``ris_subscribe`` message from the options.

        Only options with a truthy value are included in the ``data``
        object; ``include_raw`` is sent as ``socketOptions.includeRaw``.

        Returns:
            The subscription message serialized as a JSON string.
        """
        params = {}
        if self._options.include_raw:
            params['socketOptions'] = {'includeRaw': True}
        for attribute, key in self._PARAM_KEYS:
            value = getattr(self._options, attribute)
            if value:
                params[key] = value
        return json.dumps({
            "type": "ris_subscribe",
            "data": params,
        })