-
Notifications
You must be signed in to change notification settings - Fork 9
/
sip2mqtt.py
216 lines (175 loc) · 9.5 KB
/
sip2mqtt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
import sys
import os as os
import signal
import logging
import threading
import time
import re
import argparse
import json
import pjsua as pj
import paho.mqtt.client as mqtt
global args
def extract_caller_id(url):
m = re.match(r"\"(.*)\".*:(.*)@", url)
return m.group(1) + " " + phone_format(m.group(2))
def phone_format(phone_number):
clean_phone_number = re.sub('[^0-9]+', '', phone_number)
formatted_phone_number = ''
try:
formatted_phone_number = re.sub("(\d)(?=(\d{3})+(?!\d))", r"\1-", "%d" % int(clean_phone_number[:-1])) + clean_phone_number[-1]
except:
logging.warn( "Warning: unable to format " + clean_phone_number )
formatted_phone_number = clean_phone_number
return formatted_phone_number
def signal_handler(signal, frame):
logging.info( 'Exiting...' )
logging.info( '-- Unregistering --' )
time.sleep(2)
logging.info( '-- Destroying Libraries --' )
time.sleep(2)
lib.destroy()
sys.exit(0)
# Method to print Log of callback class
def log_cb(level, str, len):
logging.debug("SIP debug: " + str),
# Callback for an established MQTT broker connection
def mqtt_connect(broker, userdata, flags, rc):
logging.info("MQTT: Connected with the broker...")
# Callback to receive events from account
class SMAccountCallback(pj.AccountCallback):
global args
def __init__(self, account=None):
pj.AccountCallback.__init__(self, account)
self.args = args
def on_reg_state(self):
logging.info( "SIP: Registration complete, status=" + str(self.account.info().reg_status) + " (" + str(self.account.info().reg_reason) + ")" )
def on_incoming_call(self, call):
# Unless this callback is implemented, the default behavior is to reject the call with default status code.
logging.info( "SIP: Incoming call from " + extract_caller_id( call.info().remote_uri ) )
broker.publish(args.mqtt_topic, payload="{\"verb\": \"incoming\", \"caller\":\"" + extract_caller_id( call.info().remote_uri ) + "\", \"uri\":" + json.dumps(call.info().remote_uri) + "}", qos=0, retain=True)
current_call = call
call_cb = SMCallCallback(current_call)
current_call.set_callback(call_cb)
def on_pager(self, from_uri, contact, mime_type, body):
logging.info( "SIP: Incoming SMS from " + from_uri )
broker.publish(args.mqtt_topic, payload="{\"verb\": \"sms\", \"caller\":\"" + from_uri + "\", \"body\":" + json.dumps(body) + "}", qos=0, retain=True)
class SMCallCallback(pj.CallCallback):
def __init__(self, call=None):
pj.CallCallback.__init__(self, call)
self.args = args
# Notification when call state has changed
def on_state(self):
logging.info( 'SIP: Call state is: ' + self.call.info().state_text )
if self.call.info().state == pj.CallState.CONFIRMED:
logging.info( 'SIP: Current call is answered' )
broker.publish(args.mqtt_topic, payload="{\"verb\": \"answered\", \"caller\":\"" + extract_caller_id( self.call.info().remote_uri ) + "\", \"uri\":" + json.dumps(self.call.info().remote_uri) + "}", qos=0, retain=True)
elif self.call.info().state == pj.CallState.DISCONNECTED:
logging.info( 'SIP: Current call has ended' )
broker.publish(args.mqtt_topic, payload="{\"verb\": \"disconnected\", \"caller\":\"\", \"uri\":\"\"}", qos=0, retain=True)
def main(argv):
global broker
global pj
global lib
global args
app_name="SIP2MQTT"
parser = argparse.ArgumentParser(description='A SIP monitoring tool that publishes incoming calls with CallerID to an MQTT channel')
requiredNamed = parser.add_argument_group('required named arguments')
requiredNamed.add_argument("-a", "--mqtt_domain", type=str, required=True, help="the MQTT broker domain string", default=os.environ.get('MQTT_DOMAIN', None))
requiredNamed.add_argument("-t", "--mqtt_port", type=int, required=True, help="the MQTT broker port number", default=os.environ.get('MQTT_PORT', None))
parser.add_argument( "--mqtt_keepalive", type=int, required=False, help="the MQTT broker keep alive in seconds", default=60)
parser.add_argument( "--mqtt_protocol", type=str, required=False, help="the MQTT broker protocol", default="MQTTv311", choices=['MQTTv31', 'MQTTv311'])
requiredNamed.add_argument("-u", "--mqtt_username", type=str, required=True, help="the MQTT broker username", default=os.environ.get('MQTT_USERNAME', None))
requiredNamed.add_argument("-p", "--mqtt_password", type=str, required=False, help="the MQTT broker password", default=os.environ.get('MQTT_PASSWORD', None))
parser.add_argument( "--mqtt_topic", type=str, required=False, help="the MQTT broker topic", default=os.environ.get('MQTT_TOPIC', "home/sip"))
requiredNamed.add_argument("-d", "--sip_domain", type=str, required=True, help="the SIP domain", default=os.environ.get('SIP_DOMAIN', None))
parser.add_argument( "--sip_port", type=int, required=False, help="the SIP transport port number", default=os.environ.get('SIP_PORT', 5060))
requiredNamed.add_argument("-n", "--sip_username", type=str, required=True, help="the SIP username", default=os.environ.get('SIP_USERNAME', None))
requiredNamed.add_argument("-s", "--sip_password", type=str, required=False, help="the SIP password", default=os.environ.get('SIP_PASSWORD', None))
parser.add_argument( "--sip_display", type=str, required=False, help="the SIP user display name", default=app_name)
parser.add_argument( "--log_level", type=int, required=False, help="the application log level", default=3, choices=[0, 1, 2, 3])
parser.add_argument("-v", "--verbosity", action="count", help="increase output verbosity", default=3)
args = parser.parse_args()
log_level = logging.INFO #Deault logging level
if args.verbosity == 1:
log_level = logging.ERROR
elif args.verbosity == 2:
log_level = logging.WARN
elif args.verbosity == 3:
log_level = logging.INFO
elif args.verbosity >= 4:
log_level = logging.DEBUG
# Configure logging
# logging.basicConfig(filename="sip2mqtt.log", format="%(asctime)s - %(levelname)s - %(message)s",
# datefmt="%m/%d/%Y %I:%M:%S %p", level=log_level)
root = logging.getLogger()
root.setLevel(log_level)
# ch = logging.StreamHandler(sys.stdout)
# ch.setLevel(log_level)
# formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# ch.setFormatter(formatter)
# root.addHandler(ch)
# A more docker-friendly approach is to output to stdout
logging.basicConfig(stream=sys.stdout, format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%m/%d/%Y %I:%M:%S %p", level=log_level)
# Log startup messages and our configuration parameters
logging.info("------------------------")
logging.info("Starting up...")
logging.info("--- MQTT Broker Configuration ---")
logging.info("Domain: " + args.mqtt_domain)
logging.info("Port: " + str(args.mqtt_port))
logging.info("Protocol: " + args.mqtt_protocol)
logging.info("Username: " + args.mqtt_username)
logging.info("Keepalive Interval: " + str(args.mqtt_keepalive))
logging.info("Status Topic: " + args.mqtt_topic)
logging.info("--- SIP Configuration ---")
logging.info("Domain: " + args.sip_domain)
logging.info("Username: " + args.sip_username)
logging.info("DisplayName: " + args.sip_display)
try:
# Handle mqtt connection and callbacks
broker = mqtt.Client(client_id="", clean_session=True, userdata=None, protocol=eval("mqtt." + args.mqtt_protocol))
broker.username_pw_set(args.mqtt_username, password=args.mqtt_password)
broker.on_connect = mqtt_connect
#broker.on_message = mqtt_message #don't need this callback for now
broker.connect(args.mqtt_domain, args.mqtt_port, args.mqtt_keepalive)
# Create library instance of Lib class
lib = pj.Lib()
ua = pj.UAConfig()
ua.user_agent = app_name
mc = pj.MediaConfig()
mc.clock_rate = 8000
lib.init(ua_cfg = ua, log_cfg = pj.LogConfig(level=args.verbosity, callback=None), media_cfg=mc)
lib.create_transport(pj.TransportType.UDP, pj.TransportConfig(args.sip_port))
lib.set_null_snd_dev()
lib.start()
acc_cfg = pj.AccountConfig()
acc_cfg.id = "sip:" + args.sip_username + "@" + args.sip_domain
acc_cfg.reg_uri = "sip:" + args.sip_domain
acc_cfg.auth_cred = [ pj.AuthCred("*", args.sip_username, args.sip_password) ]
acc_cfg.allow_contact_rewrite = False
acc = lib.create_account(acc_cfg)
acc_cb = SMAccountCallback(acc)
acc.set_callback(acc_cb)
logging.info( "-- Registration Complete --" )
logging.info( 'SIP: Status = ' + str(acc.info().reg_status) + ' (' + acc.info().reg_reason + ')' )
except pj.Error, e:
logging.critical( ("Exception: " + str(e)) )
lib.destroy()
sys.exit(1)
# Main work loop
try:
rc = broker.loop_start()
if rc:
logging.warn( "Warning: " + str(rc) )
signal.signal(signal.SIGINT, signal_handler)
while True:
time.sleep(1)
broker.loop_stop()
except Exception, ex:
logging.critical("Exception: " + str(ex))
lib.destroy()
sys.exit(1)
# Get things started
if __name__ == '__main__':
main(sys.argv[1:])