-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathasterisk.py
53 lines (42 loc) · 1.43 KB
/
asterisk.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import asyncio
import os
import requests
from panoramisk import Manager
# Runtime configuration, read from the environment when the module is
# imported. Each lookup uses os.environ[...], so a missing variable raises
# KeyError immediately instead of falling back to a default.
# Values are kept as the raw strings the environment provides.
ASTERISK_IP = os.environ["ASTERISK_IP"]  # passed to Manager as `host`
ASTERISK_PORT = os.environ["ASTERISK_PORT"]  # passed to Manager as `port` (still a str here)
ASTERISK_USER = os.environ["ASTERISK_USER"]  # passed to Manager as `username`
ASTERISK_PASSWORD = os.environ["ASTERISK_PASSWORD"]  # passed to Manager as `secret`
ASTERISK_CONNECTOR_ENDPOINT = os.environ["ASTERISK_CONNECTOR_ENDPOINT"]  # URL that fire() POSTs to
def fire(message, *, timeout=10):
    """Forward one event to the connector endpoint as a JSON POST.

    The event's ``items()`` are copied into a plain ``dict`` and sent as the
    JSON body to ``ASTERISK_CONNECTOR_ENDPOINT``.

    Args:
        message: Mapping-like event object exposing ``items()``.
        timeout: Seconds ``requests`` waits on the endpoint before giving up.
            Without a timeout ``requests`` waits indefinitely, so one
            unresponsive endpoint would stall the caller forever.

    Raises:
        requests.exceptions.RequestException: On connection failure or when
            the timeout elapses. The HTTP status code is not checked.
    """
    requests.post(
        ASTERISK_CONNECTOR_ENDPOINT,
        json=dict(message.items()),
        timeout=timeout,
    )
def main():
    """Run the application: connect to Asterisk and forward selected events.

    Builds a panoramisk ``Manager`` from the module-level ``ASTERISK_*``
    settings, registers callbacks that hand matching events to ``fire``,
    then runs the event loop until it is stopped or interrupted with
    Ctrl-C. The manager connection, the delivery thread and the event loop
    are released on every exit path.
    """
    # Imported locally so this function brings its own dependency into scope.
    from concurrent.futures import ThreadPoolExecutor

    # Create the loop explicitly: asyncio.get_event_loop() is deprecated when
    # no loop is running and stops creating one implicitly on newer Pythons.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    manager = Manager(
        loop=loop,
        host=ASTERISK_IP,
        port=ASTERISK_PORT,
        username=ASTERISK_USER,
        secret=ASTERISK_PASSWORD,
    )

    # fire() performs a blocking HTTP request. Running it on the event loop
    # would freeze all manager traffic for the duration of each request, so it
    # is pushed to a worker thread. A single worker keeps deliveries
    # sequential, in the order the events were handled.
    sender = ThreadPoolExecutor(max_workers=1)

    async def forward(message):
        """Deliver *message* via fire() without blocking the event loop."""
        await loop.run_in_executor(sender, fire, message)

    @manager.register_event("QueueCallerLeave")
    @manager.register_event("AgentCalled")
    @manager.register_event("AgentConnect")
    @manager.register_event("AgentComplete")
    @manager.register_event("AgentDump")
    # @manager.register_event("AgentRingNoAnswer")
    @manager.register_event("MessageWaiting")
    # @manager.register_event('UserEvent')
    async def callback_agent(manager, message):
        """Forward every registered queue/agent event unconditionally."""
        await forward(message)

    @manager.register_event("Hangup")
    async def callback_hangup(manager, message):
        """Forward hangups only when they come from the queue context."""
        # hangup from queue, external only
        if message.context == "ext-queues":
            await forward(message)

    manager.connect()
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the service; exit quietly.
        pass
    finally:
        # Previously the loop was closed only on Ctrl-C and the connection
        # was never closed; clean up regardless of how the loop ended.
        manager.close()
        # Let an in-flight delivery finish before the loop goes away.
        sender.shutdown(wait=True)
        loop.close()
# Start the forwarder only when this file is executed as a script, so that
# importing the module does not connect to Asterisk.
if __name__ == "__main__":
    main()