-
Notifications
You must be signed in to change notification settings - Fork 3
/
handler.py
310 lines (235 loc) · 9.92 KB
/
handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
import logging
from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton, BotCommand
from telegram.ext import Updater, MessageHandler, CommandHandler, Filters, CallbackContext, CallbackQueryHandler
import json
import hashlib
import os
import sqlite3
import asyncio
import re
# Path of the SQLite database file opened by every storage helper below.
DB_NAME = './db/tobedo.sqlite3'
# Markers placed in front of a checklist button label to show whether the
# item is currently checked or unchecked.
CHECK_CHAR = '✅'
UNCHECK_CHAR = '⬜'
def gen_db():
    """Create the SQLite tables used by the bot if they do not exist yet.

    Tables:
        Replies   -- one row per checklist: ``message_and_chat_id`` (primary
                     key) identifies the user's message, ``reply_and_chat_id``
                     (unique) identifies the bot's reply, ``state`` holds the
                     JSON-encoded check state and ``created_at`` defaults to
                     the insertion time.
        ChatModes -- the reply mode chosen for each chat, keyed by ``chat_id``.
    """
    # ``with connection`` only commits or rolls back the transaction; it does
    # not close the connection, so it is closed explicitly here.
    db = sqlite3.connect(DB_NAME)
    try:
        with db:
            db.execute('''
                CREATE TABLE IF NOT EXISTS Replies (
                    message_and_chat_id VARCHAR(255) NOT NULL,
                    reply_and_chat_id VARCHAR(255) NOT NULL,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                    state TEXT,
                    PRIMARY KEY (message_and_chat_id),
                    UNIQUE (reply_and_chat_id)
                )
            ''')
            # store mode for each chat
            db.execute('''
                CREATE TABLE IF NOT EXISTS ChatModes (
                    chat_id VARCHAR(255) NOT NULL,
                    mode VARCHAR(255) NOT NULL,
                    PRIMARY KEY (chat_id)
                )
            ''')
    finally:
        db.close()
def set_chat_mode(update, context, mode):
    """Store the reply mode for the current chat and confirm it to the user.

    Args:
        update: Incoming Telegram update carrying the command message.
        context: Handler callback context (used for the admin check).
        mode: ``'todo'`` to reply only to prefixed messages; any other value
            is confirmed to the user as "reply to any message".

    Does nothing when ``check_admin`` rejects the chat.
    """
    if not check_admin(update, context):
        return

    # effective_message also covers edited commands, for which
    # update.message is None.
    message = update.effective_message
    chat_id = message.chat_id
    print('setting chat mode', chat_id, mode)

    # ``with connection`` commits but does not close, so close explicitly.
    db = sqlite3.connect(DB_NAME)
    try:
        with db:
            db.execute(
                'INSERT OR REPLACE INTO ChatModes (chat_id, mode) VALUES (?, ?)',
                (chat_id, mode),
            )
    finally:
        db.close()

    # reply to user
    if mode == 'todo':
        confirmation = 'Now checklist will appear in reply only to messages starting with "todo" or "td" or "c "'
    else:
        confirmation = 'Now checklist will appear in reply to any message (except starting with . or !)'
    message.reply_text(confirmation)
def get_chat_mode(chat_id):
    """Return the mode stored for ``chat_id``, or ``None`` if none was set."""
    # The connection context manager never closes the connection, so the
    # connection is closed explicitly once the row has been fetched.
    db = sqlite3.connect(DB_NAME)
    try:
        row = db.execute(
            'SELECT mode FROM ChatModes WHERE chat_id = ?',
            (chat_id,),
        ).fetchone()
    finally:
        db.close()
    return row[0] if row else None
def cleanup_old_replies():
    """Delete ``Replies`` rows whose ``created_at`` is more than one year old."""
    # ``with connection`` commits the DELETE but leaves the connection open,
    # so it is closed explicitly afterwards.
    db = sqlite3.connect(DB_NAME)
    try:
        with db:
            db.execute(
                "DELETE FROM Replies WHERE created_at < datetime('now', '-1 year')"
            )
    finally:
        db.close()
def get_reply_by_message_id(message_id, chat_id) -> tuple:
    """Look up the checklist reply that was stored for a user message.

    Args:
        message_id: Id of the user's original message.
        chat_id: Id of the chat the message belongs to.

    Returns:
        ``(reply_and_chat_id, state)`` where ``state`` is the decoded JSON
        check state (``{}`` when the stored state is empty), or
        ``(None, None)`` when no reply is stored for the message.
    """
    # The connection context manager never closes the connection, so the
    # connection is closed explicitly once the row has been fetched.
    db = sqlite3.connect(DB_NAME)
    try:
        row = db.execute(
            'SELECT reply_and_chat_id, state FROM Replies WHERE message_and_chat_id = ?',
            (f"{chat_id}_{message_id}",),
        ).fetchone()
    finally:
        db.close()

    if not row:
        return None, None
    reply_key, raw_state = row
    return reply_key, json.loads(raw_state) if raw_state else {}
def insert_reply(message_id, reply_id, chat_id):
    """Record that ``reply_id`` is the checklist reply to ``message_id``.

    Both ids are stored prefixed with ``chat_id`` (``"<chat_id>_<id>"``) and
    the check state starts out as an empty JSON object.
    """
    # ``with connection`` commits the INSERT but leaves the connection open,
    # so it is closed explicitly afterwards.
    db = sqlite3.connect(DB_NAME)
    try:
        with db:
            db.execute(
                'INSERT INTO Replies (message_and_chat_id, reply_and_chat_id, state) VALUES (?, ?, ?)',
                (f"{chat_id}_{message_id}", f"{chat_id}_{reply_id}", "{}"),
            )
    finally:
        db.close()
def update_reply(reply_id, chat_id, state: dict):
    """Overwrite the stored check state of a checklist reply.

    Args:
        reply_id: Message id of the bot's checklist reply.
        chat_id: Id of the chat the reply belongs to.
        state: Mapping that is stored JSON-encoded in the ``state`` column.
    """
    # ``with connection`` commits the UPDATE but leaves the connection open,
    # so it is closed explicitly afterwards.
    db = sqlite3.connect(DB_NAME)
    try:
        with db:
            db.execute(
                'UPDATE Replies SET state = ? WHERE reply_and_chat_id = ?',
                (json.dumps(state), f"{chat_id}_{reply_id}"),
            )
    finally:
        db.close()
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Bot token read from the TG_TOKEN environment variable; the module refuses to
# load without it.
TOKEN = os.environ.get('TG_TOKEN')
if not TOKEN:
    print('TG_TOKEN not specified in env, please set TG_TOKEN with your bot token generated by @BotFather')
    # The ``exit`` helper is injected by the ``site`` module for interactive
    # sessions and is not guaranteed to exist; raise SystemExit directly to
    # terminate with the same exit status.
    raise SystemExit(1)
def _split_label(label):
    """Split a button label into ``(checked, item_text)``.

    Only the leading marker is removed, so an item whose own text contains a
    marker character keeps it.
    """
    for marker, checked in ((CHECK_CHAR, True), (UNCHECK_CHAR, False)):
        prefix = f'{marker} '
        if label.startswith(prefix):
            return checked, label[len(prefix):]
    # Label without a recognised "<marker> " prefix: leave the text untouched.
    return label.startswith(CHECK_CHAR), label


def _apply_toggle(query, context):
    """Flip the button addressed by ``query.data`` and persist the new state."""
    if not query.data.startswith('toggle__'):
        return

    message = query.message
    if message is None:
        # Nothing to edit when the update carries no message.
        return
    markup = message.reply_markup
    if not markup:
        print(f'no reply_markup for message_id {message.message_id} and chat_id {message.chat_id}')
        return

    rows = markup.inline_keyboard
    try:
        target = int(query.data[len('toggle__'):])
    except ValueError:
        print('ignoring malformed callback data', query.data)
        return
    if not 0 <= target < len(rows):
        # No such row: nothing changes, so skip the pointless message edit.
        return

    button = rows[target][0]
    checked, item = _split_label(button.text)
    print('found a btn', item)
    button.text = f'{UNCHECK_CHAR if checked else CHECK_CHAR} {item}'

    # Rebuild the complete state from the keyboard so that the stored copy
    # always mirrors what the user sees.
    state = {}
    for row in rows:
        is_checked, item_text = _split_label(row[0].text)
        state[item_text] = is_checked
    print('setting state', state)

    update_reply(message.message_id, message.chat_id, state)
    context.bot.edit_message_text(
        chat_id=message.chat_id,
        message_id=message.message_id,
        text='Click to toggle',
        reply_markup=markup,
    )


def button_click(update, context):
    """Handle a click on a checklist button.

    Callback data of the form ``toggle__<index>`` flips the checked marker of
    the button in keyboard row ``<index>``, stores the resulting state of all
    items via ``update_reply`` and edits the reply to show the new keyboard.
    Other callback data is ignored.

    Args:
        update: Incoming update carrying the callback query.
        context: Handler callback context providing the bot instance.
    """
    query = update.callback_query
    try:
        _apply_toggle(query, context)
    finally:
        # Telegram clients keep showing a progress indicator on the button
        # until the callback query is answered, so always acknowledge it.
        query.answer()
def md5hash(text):
    """Return the hexadecimal MD5 digest of ``text`` encoded as UTF-8."""
    digest = hashlib.md5()
    digest.update(text.encode('utf-8'))
    return digest.hexdigest()
# Prefixes that trigger a checklist in "todo" mode (case-insensitive), followed
# by at most one whitespace character. Alternatives are tried left to right,
# so a longer keyword must come before any keyword that is its prefix:
# "tobedone" has to precede "tobedo", otherwise "tobedone" would only lose
# "tobedo" and leave a stray "ne" at the start of the checklist text.
TODO_MODE_REGEX = re.compile(r'^(todo|tobedone|tobedo|td|tbd|checklist|check |cl |c )\s?', re.IGNORECASE)
def echo(update: Update, context: CallbackContext) -> None:
    """Reply to a text message with a checklist, or refresh an existing one.

    Registered as the handler for non-command messages. Every non-empty line
    of the message becomes one inline button. For a new message or channel
    post a reply is sent and recorded with ``insert_reply``; for an edited
    message the keyboard of the previously recorded reply is replaced, keeping
    the checked state of items whose text is unchanged.

    Nothing is sent when the message has no text, starts with ``/``, is
    excluded by the chat mode (``!``/``.`` prefix in the default/"all" mode,
    missing todo prefix otherwise), is an edit without a recorded reply, or is
    a new message that yields no checklist items.
    """
    # First message-like payload present on the update, in the same priority
    # order as before: channel post, message, edited message, edited post.
    update_object = (
        update.channel_post
        or update.message
        or update.edited_message
        or update.edited_channel_post
    )
    msg = update_object.text if update_object else None
    if not msg:
        print('Unrecognized update')
        return

    is_update = bool(update.edited_message or update.edited_channel_post)
    message_id = update_object.message_id
    chat_id = update_object.chat_id

    previous_state = {}
    reply_id_with_message_id = None
    if is_update:
        reply_id_with_message_id, previous_state = get_reply_by_message_id(message_id, chat_id)
        if not reply_id_with_message_id:
            print(f'reply_id for message_id {message_id} and chat_id {chat_id} not found')
            return

    if msg.startswith('/'):
        return

    mode = get_chat_mode(chat_id)
    if mode is None or mode == 'all':
        if msg.startswith(('!', '.')):
            return
    else:
        prefix = TODO_MODE_REGEX.match(msg)
        if prefix is None:
            return
        # Drop the matched trigger prefix; the pattern is anchored at the
        # start, so slicing is equivalent to substituting it away.
        msg = msg[prefix.end():]

    print('previous_state', previous_state)
    items = [line.strip() for line in msg.split('\n')]
    keyboard = [
        [InlineKeyboardButton(
            f"{CHECK_CHAR if previous_state.get(item, False) else UNCHECK_CHAR} {item}",
            callback_data=f"toggle__{index}",
        )]
        for index, item in enumerate(item for item in items if item)
    ]
    reply_markup = InlineKeyboardMarkup(keyboard)

    if not is_update:
        if not keyboard:
            # e.g. a bare "todo": there is nothing to put on a checklist.
            return
        # add new reply
        reply = update_object.reply_text('Click to toggle', reply_markup=reply_markup)
        insert_reply(message_id, reply.message_id, chat_id)
    else:
        # find previous reply to same message and edit it
        # this is a workaround for editing messages in channels
        # The stored key is "<chat_id>_<reply_id>"; the reply id is the part
        # after the last underscore.
        reply_id = reply_id_with_message_id.rsplit('_', 1)[-1]
        context.bot.edit_message_reply_markup(
            chat_id=chat_id,
            message_id=reply_id,
            reply_markup=reply_markup,
        )
def check_admin(update, context):
    """Return True when the bot may operate in the chat of this update.

    In chats of type ``'channel'`` or ``'group'`` the bot must have the status
    ``'administrator'`` or ``'creator'``; otherwise the user is asked to
    promote the bot and ``False`` is returned. All other chat types are
    accepted without a membership lookup.

    Args:
        update: Incoming update carrying the command message.
        context: Handler callback context; ``context.bot.id`` identifies the bot.
    """
    # effective_message also covers edited commands, for which
    # update.message is None.
    message = update.effective_message
    chat = message.chat

    # NOTE(review): 'supergroup' chats are not checked, as before -- confirm
    # whether they should be treated like groups.
    if chat.type not in ('channel', 'group'):
        return True

    # The original condition `channel or group and not admin` parsed as
    # `channel or (group and not admin)` and therefore rejected every channel,
    # even with admin rights. The admin status now applies to both types.
    bot_status = chat.get_member(context.bot.id).status
    if bot_status in ('administrator', 'creator'):
        return True

    message.reply_text(f'Please promote me to admin in {chat.type} settings to enable me to generate checklists in replies. I need it because you added me to a {chat.type}, also you can use me in direct chat without promoting me')
    return False
def startBotPrompt(update, context):
    """Answer the /start command with a short usage message.

    The greeting is only sent when ``check_admin`` accepts the chat.
    """
    if not check_admin(update, context):
        return

    greeting = (
        'Hello! I am your checklist bot. I will reply to your messages with a checklist. You can click on each item to toggle it.\n'
        'You can set the mode to "todo" with /only_todo command to make me reply only to messages starting with "todo" or "td" or "c " '
    )
    update.message.reply_text(greeting)
def main() -> None:
    """Prepare the database, register all handlers and poll for updates."""
    gen_db()
    cleanup_old_replies()

    updater = Updater(TOKEN)
    dispatcher = updater.dispatcher

    # Handlers are registered in this order: button clicks, every non-command
    # message, then the three commands.
    handlers = (
        CallbackQueryHandler(button_click),
        MessageHandler(~Filters.command, echo),
        CommandHandler('only_todo', lambda upd, ctx: set_chat_mode(upd, ctx, 'todo')),
        CommandHandler('all', lambda upd, ctx: set_chat_mode(upd, ctx, 'all')),
        CommandHandler('start', startBotPrompt),
    )
    for handler in handlers:
        dispatcher.add_handler(handler)

    # Command list passed to the bot's set_my_commands call.
    command_menu = (
        ("only_todo", "Set mode to 'todo' messages"),
        ("all", "Set mode to any messages"),
        ("start", "Start the bot"),
    )
    dispatcher.bot.set_my_commands(
        [BotCommand(name, description) for name, description in command_menu]
    )

    # Start polling, then block in idle() until the process is told to stop.
    updater.start_polling()
    updater.idle()


if __name__ == '__main__':
    main()