-
Notifications
You must be signed in to change notification settings - Fork 12
/
index.py
144 lines (128 loc) · 5.92 KB
/
index.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
from flask import Flask, request, jsonify
import requests
from requests_toolbelt.multipart.encoder import MultipartEncoder
import os
from dotenv import load_dotenv
load_dotenv() # Load environment variables from a .env file
app = Flask(__name__)
COMMANDS = {
'TEXT': 'Simple text message',
'IMAGE': 'Send image',
'DOCUMENT': 'Send document',
'VIDEO': 'Send video',
'CONTACT': 'Send contact',
'PRODUCT': 'Send product',
'GROUP_CREATE': 'Create group',
'GROUP_TEXT': 'Simple text message for the group',
'GROUPS_IDS': "Get the id's of your three groups"
}
FILES = {
'IMAGE': './files/file_example_JPG_100kB.jpg',
'DOCUMENT': './files/file-example_PDF_500_kB.pdf',
'VIDEO': './files/file_example_MP4_480_1_5MG.mp4',
'VCARD': './files/sample-vcard.txt'
}
def send_whapi_request(endpoint, params=None, method='POST'):
headers = {
'Authorization': f"Bearer {os.getenv('TOKEN')}"
}
url = f"{os.getenv('API_URL')}/{endpoint}"
if params:
if 'media' in params:
details = params.pop('media').split(';')
with open(details[0], 'rb') as file:
m = MultipartEncoder(fields={**params, 'media': (details[0], file, details[1])})
headers['Content-Type'] = m.content_type
response = requests.request(method, url, data=m, headers=headers)
elif method == 'GET':
response = requests.get(url, params=params, headers=headers)
else:
headers['Content-Type'] = 'application/json'
response = requests.request(method, url, json=params, headers=headers)
else:
response = requests.request(method, url, headers=headers)
print('Whapi response:', response.json())
return response.json()
def set_hook():
if os.getenv('BOT_URL'):
settings = {
'webhooks': [
{
'url': os.getenv('BOT_URL'),
'events': [
{'type': "messages", 'method': "post"}
],
'mode': "method"
}
]
}
send_whapi_request('settings', settings, 'PATCH')
@app.route('/hook/messages', methods=['POST'])
def handle_new_messages():
try:
messages = request.json.get('messages', [])
endpoint = None
for message in messages:
if message.get('from_me'):
continue
sender = {'to': message.get('chat_id')}
command_input = message.get('text', {}).get('body', '').strip()
command = list(COMMANDS.keys())[int(command_input) - 1] if command_input.isdigit() else None
if command == 'TEXT':
sender['body'] = 'Simple text message'
endpoint = 'messages/text'
elif command == 'IMAGE':
sender['caption'] = 'Text under the photo.'
sender['media'] = FILES['IMAGE'] + ';image/jpeg'
endpoint = 'messages/image'
elif command == 'DOCUMENT':
sender['caption'] = 'Text under the document.'
sender['media'] = FILES['DOCUMENT'] + ';application/pdf'
endpoint = 'messages/document'
elif command == 'VIDEO':
sender['caption'] = 'Text under the video.'
sender['media'] = FILES['VIDEO'] + ';video/mp4'
endpoint = 'messages/video'
elif command == 'CONTACT':
sender['name'] = 'Whapi Test'
with open(FILES['VCARD'], 'r') as vcard_file:
sender['vcard'] = vcard_file.read()
endpoint = 'messages/contact'
elif command == 'PRODUCT':
# Example: You need to replace config.product with an actual product ID
product_id = os.getenv('PRODUCT_ID') # Replace with your product ID
endpoint = f'business/products/{product_id}'
elif command == 'GROUP_CREATE':
# Example: You need to replace config.phone with an actual phone number
participants = [message.get('chat_id').split('@')[0]] # Replace with the phone number
response = send_whapi_request('groups', {'subject': 'Whapi.Cloud Test', 'participants': participants})
sender['body'] = f"Group created. Group id: {response.get('group_id')}" if response.get('group_id') else 'Error'
endpoint = 'messages/text'
elif command == 'GROUP_TEXT':
sender['to'] = os.getenv('GROUP_ID') # Replace with your group ID
sender['body'] = 'Simple text message for the group'
endpoint = 'messages/text'
elif command == 'GROUPS_IDS':
groups_response = send_whapi_request('groups', {'count': 3}, 'GET')
groups = groups_response.get('groups', [])
sender['body'] = ',\n '.join(f"{group['id']} - {group['name']}" for group in groups) if groups else 'No groups'
endpoint = 'messages/text'
else:
sender['body'] = "Hi. Send me a number from the list. Don't forget to change the actual data in the code!\n\n" + \
'\n'.join(f"{i + 1}. {text}" for i, text in enumerate(COMMANDS.values()))
endpoint = 'messages/text'
if endpoint is None:
return 'Ok', 200
response = send_whapi_request(endpoint, sender)
print(f"Response from Whapi: {response}")
return 'Ok', 200
except Exception as e:
print(e)
return str(e), 500
@app.route('/', methods=['GET'])
def index():
return 'Bot is running'
if __name__ == '__main__':
set_hook()
port = os.getenv('PORT') or (443 if os.getenv('BOT_URL', '').startswith('https:') else 80)
app.run(port=port, debug=True)