# autocards.py (forked from paulbricman/autocards)
from pipelines import qg_pipeline
from tqdm import tqdm
from pathlib import Path
import pandas as pd
import time
import re
import os
from contextlib import suppress
import json
import urllib.request
import requests
from tika import parser
from bs4 import BeautifulSoup
from pprint import pprint
from epub_conversion.utils import open_book, convert_epub_to_lines
os.environ["TOKENIZERS_PARALLELISM"] = "true"


class Autocards:
"""
Main class used to create flashcards from text. The variable
'store_content' defines whether the original paragraph is stored in the
output. This allows to store context alongside the question and answer pair
but dramatically increase size. The variable notetype refers to the type
of flashcard that must be created: either cloze, basic or both. The
variable wtm allow to specify wether you want to remove the mention of
Autocards in your cards.
"""
def __init__(self,
store_content=True,
in_lang="any",
out_lang="en",
cloze_type="anki",
model = "valhalla/distilt5-qa-qg-hl-12-6",
ans_model = "valhalla/distilt5-qa-qg-hl-12-6"):
print("Loading backend, this can take some time...")
self.store_content = store_content
self.model = model
self.ans_model = ans_model
        if len(out_lang) != 2 or len(in_lang) not in [2, 3]:
            print("Input and output languages have to be two letter codes \
like 'en' or 'fr' (or 'any' for the input language)")
            raise SystemExit()
        if in_lang == "any":  # otherwise the user might think that the
            in_lang = "en"    # input has to be in English
if in_lang != "en":
print("The document will automatically be translated before \
creating flashcards. Expect lower quality cards than usual.")
try:
print("Loading input translation model...")
from transformers import pipeline
self.in_trans = pipeline(f"translation_{in_lang}_to_en",
model = f"Helsinki-NLP/opus-mt-{in_lang}-en")
            except Exception as e:
                print(f"Was not able to load the translation pipeline: {e}")
                print("Resetting the input language to English.")
                in_lang = "en"
if out_lang != "en":
print("The flashcards will be automatically translated after being \
created. This can result in lower quality cards. Expect lowest quality cards \
than usual.")
try:
print("Loading output translation model...")
from transformers import pipeline
self.out_trans = pipeline(f"translation_en_to_{out_lang}",
model = f"Helsinki-NLP/opus-mt-en-{out_lang}")
            except Exception as e:
                print(f"Was not able to load the translation pipeline: {e}")
                print("Resetting the output language to English.")
                out_lang = "en"
self.in_lang = in_lang
self.out_lang = out_lang
self.cloze_type = cloze_type
self.qg = qg_pipeline('question-generation',
model=model,
ans_model=ans_model)
self.qa_dic_list = []
if self.cloze_type not in ["anki", "SM"]:
print("Invalid cloze type, must be either 'anki' or \
'SM'")
raise SystemExit()
def _call_qg(self, text, title):
"""
Call question generation module, then turn the answer into a
dictionnary containing metadata (clozed formating, creation time,
title, source text)
"""
to_add = []
to_add_cloze = []
to_add_basic = []
if self.in_lang != "en":
text_orig = str(text)
text = self.in_trans(text)[0]["translation_text"]
else:
text_orig = ""
try:
to_add = self.qg(text)
to_add_cloze = [qa for qa in to_add if qa["note_type"] == "cloze"]
to_add_basic = [qa for qa in to_add if qa["note_type"] == "basic"]
except IndexError:
tqdm.write(f"\nSkipping section because no cards \
could be made from that text: '{text}'")
to_add_basic.append({"question": "skipped",
"answer": "skipped",
"cloze": "",
"note_type": "basic"})
cur_time = time.asctime()
if self.store_content is False:
# don't store content, to minimize the size of the output file
stored_text = ""
stored_text_orig = ""
else:
stored_text = text
stored_text_orig = text_orig
# loop over all newly added qa to format the text:
if to_add_basic != []:
for i in range(0, len(to_add_basic)):
if to_add_basic[i]["note_type"] == "basic":
if self.out_lang != "en":
to_add_basic[i]["question_orig"] = to_add_basic[i]["question"]
to_add_basic[i]["answer_orig"] = to_add_basic[i]["answer"]
to_add_basic[i]["question"] = self.out_trans(to_add_basic[i]["question"])[0]["translation_text"]
to_add_basic[i]["answer"] = self.out_trans(to_add_basic[i]["answer"])[0]["translation_text"]
else:
to_add_basic[i]["answer_orig"] = ""
to_add_basic[i]["question_orig"] = ""
clozed_fmt = to_add_basic[i]['question'] + "<br>{{c1::"\
+ to_add_basic[i]['answer'] + "}}"
to_add_basic[i]["basic_in_clozed_format"] = clozed_fmt
if to_add_cloze != []:
for i in range(0, len(to_add_cloze)):
                if to_add_cloze[i]["note_type"] == "cloze":  # cloze formatting
if self.out_lang != "en":
to_add_cloze[i]["cloze_orig"] = to_add_cloze[i]["cloze"]
cl_str_ut = to_add_cloze[i]["cloze_orig"]
cl_str_ut = cl_str_ut.replace("generate question: ", "")
cl_str_ut = cl_str_ut.replace("<hl> ", "{{c1::", 1)
cl_str_ut = cl_str_ut.replace(" <hl>", "}}", 1)
cl_str_ut = cl_str_ut.replace(" </s>", "")
cl_str_ut = cl_str_ut.strip()
to_add_cloze[i]["cloze_orig"] = cl_str_ut
cl_str = to_add_cloze[i]["cloze"]
cl_str = cl_str.replace("generate question: ", "")
cl_str = cl_str.replace("\"", "'")
cl_str = cl_str.replace("<hl> ", "\"").replace(" <hl>", "\"")
cl_str = cl_str.replace(" </s>", "")
cl_str = cl_str.strip()
cl_str = self.out_trans(cl_str)[0]["translation_text"]
cl_str = cl_str.replace("\"", "{{c1::", 1)
cl_str = cl_str.replace("\"", "}}", 1)
to_add_cloze[i]["cloze"] = cl_str
else:
to_add_cloze[i]["cloze_orig"] = ""
cl_str = to_add_cloze[i]["cloze"]
cl_str = cl_str.replace("generate question: ", "")
cl_str = cl_str.replace("<hl> ", "{{c1::", 1)
cl_str = cl_str.replace(" <hl>", "}}", 1)
cl_str = cl_str.replace(" </s>", "")
cl_str = cl_str.strip()
to_add_cloze[i]["cloze"] = cl_str
to_add_cloze[i]["basic_in_clozed_format"] = ""
        # merge clozes generated from the same text into a single card with several deletions:
if to_add_cloze != []:
for i in range(0, len(to_add_cloze)-1):
if self.cloze_type == "SM":
tqdm.write("SM cloze not yet implemented, luckily \
SuperMemo supports importing from anki format. Hence the anki format will \
be used for your input.")
self.cloze_type = "anki"
if self.cloze_type == "anki" and len(self.qa_dic_list) != i:
cl1 = re.sub(r"{{c\d+::|}}|\s", "",
to_add_cloze[i]["cloze"])
cl2 = re.sub(r"{{c\d+::|}}|\s", "",
to_add_cloze[i+1]["cloze"])
if cl1 == cl2:
match = re.findall(r"{{c\d+::(.*?)}}",
to_add_cloze[i]["cloze"])
match.extend(re.findall(r"{{c\d+::(.*?)}}",
to_add_cloze[i+1]["cloze"]))
clean_cloze = re.sub(r"{{c\d+::|}}", "",
to_add_cloze[i]["cloze"])
if "" in match:
match.remove("")
match = list(set(match))
for cloze_number, q in enumerate(match):
q = q.strip()
new_q = "{{c" + str(cloze_number+1) + "::" +\
q + "}}"
clean_cloze = clean_cloze.replace(q, new_q)
clean_cloze = clean_cloze.strip()
to_add_cloze[i]['cloze'] = clean_cloze + "___TO_REMOVE___"
to_add_cloze[i+1]['cloze'] = clean_cloze
to_add_full = to_add_cloze + to_add_basic
for qa in to_add_full:
qa["date"] = cur_time
qa["source_title"] = title
qa["source_text"] = stored_text
qa["source_text_orig"] = stored_text_orig
if qa["note_type"] == "basic":
self.qa_dic_list.append(qa)
elif not qa["cloze"].endswith("___TO_REMOVE___"):
self.qa_dic_list.append(qa)
tqdm.write(f"Number of question generated so far: {len(self.qa_dic_list)}")
def _sanitize_text(self, text):
"correct common errors in text"
text = text.strip()
# occurs sometimes in epubs apparently:
text = text.replace("\xa0", " ")
# wikipedia style citation:
text = re.sub(r"\[\d*\]", "", text)
return text
def consume_var(self, text, title="untitled variable",
per_paragraph=False):
"Take text as input and create qa pairs"
text = text.replace('\xad ', '')
text = text.strip()
self.title = title
if per_paragraph:
print("Consuming text by paragraph:")
for paragraph in tqdm(text.split('\n\n'),
desc="Processing by paragraph",
unit="paragraph"):
paragraph = paragraph.replace("\n", " ")
self._call_qg(paragraph, title)
else:
print("Consuming text:")
text = re.sub(r"\n\n*", ". ", text)
text = re.sub(r"\.\.*", ".", text)
text = self._sanitize_text(text)
self._call_qg(text, title)
def consume_user_input(self, title="untitled user input"):
"Take user input and create qa pairs"
user_input = input("Enter your text below then press Enter (press\
enter twice to validate input):\n>")
user_input = user_input.strip()
print("\nFeeding your text to Autocards...")
user_input = self._sanitize_text(user_input)
self.consume_var(user_input, title, per_paragraph=False)
print("Done feeding text.")
def consume_pdf(self, pdf_path, per_paragraph=True):
"Take pdf file as input and create qa pairs"
if not Path(pdf_path).exists():
print(f"PDF file not found at {pdf_path}!")
return None
print("Warning: pdf parsing is usually of poor quality because \
there are no good cross platform libraries. Consider using consume_textfile() \
after preprocessing the text yourself.")
title = pdf_path.replace("\\", "").split("/")[-1]
raw = str(parser.from_file(pdf_path))
safe_text = raw.encode('utf-8', errors='ignore')
safe_text = str(safe_text).replace("\\n", "\n").replace("\\t", " ").replace("\\", "")
text = self._sanitize_text(safe_text)
self.consume_var(text, title, per_paragraph)
def consume_textfile(self, filepath, per_paragraph=False):
"Take text file as input and create qa pairs"
        if not Path(filepath).exists():
            print(f"File not found at {filepath}")
            return None
        text = open(filepath).read()
text = self._sanitize_text(text)
filename = str(filepath).split("/")[-1]
if per_paragraph is False and len(text) > 300:
ans = input("The text is more than 300 characters long, \
are you sure you don't want to try to split the text by paragraph?\n(y/n)>")
if ans != "n":
per_paragraph = True
self.consume_var(text,
filename,
per_paragraph=per_paragraph)
def consume_epub(self, filepath, title="untitled epub file"):
"Take an epub file as input and create qa pairs"
book = open_book(filepath)
text = " ".join(convert_epub_to_lines(book))
text = re.sub("<.*?>", "", text)
text = text.replace(" ", " ")
text = text.replace("‐", "-")
text = re.sub("&.*?;", " ", text)
# make paragraph limitation as expected in self.consume_var:
text = text.replace("\r", "\n\n")
text = re.sub("\n\n\n*", "\n\n", text)
text = self._sanitize_text(text)
self.consume_var(text, title, per_paragraph=True)
def consume_web(self, source, mode="url", element="p"):
"Take html file (local or via url) and create qa pairs"
        if mode not in ["local", "url"]:
            print("Invalid 'mode' argument, must be 'local' or 'url'")
            return None
if mode == "local":
soup = BeautifulSoup(open(source), 'xml')
elif mode == "url":
res = requests.get(source, timeout=15)
html = res.content
soup = BeautifulSoup(html, 'xml')
try:
el = soup.article.body.find_all(element)
except AttributeError:
print("Using fallback method to extract page content")
el = soup.find_all(element)
title = ""
with suppress(Exception):
title = soup.find_all('h1')[0].text
if title == "":
with suppress(Exception):
title = soup.find_all('h1').text
if title == "":
with suppress(Exception):
title = soup.find_all('title').text
if title == "":
print("Couldn't find title of the page")
title = source
title = title.strip()
self.title = title
        valid_sections = []  # remove text sections that are too short:
        for section in el:
            section = ' '.join(section.get_text().split())
            if len(section) > 40:
                valid_sections += [section]
            else:
                print(f"Ignored section because it is too short: {section}")
        if not valid_sections:
            print("No valid sections found. Change the 'element' argument \
to look for html sections other than 'p'; find the relevant element using \
the 'inspect' functionality of your favorite browser.")
return None
for section in tqdm(valid_sections,
desc="Processing by section",
unit="section"):
section = self._sanitize_text(section)
self._call_qg(section, title)
def clear_qa(self):
"Delete currently stored qa pairs"
self.qa_dic_list = []
def string_output(self, prefix='', jeopardy=False):
"Return qa pairs to the user"
if prefix != "" and prefix[-1] != ' ':
prefix += ' '
if len(self.qa_dic_list) == 0:
print("No qa generated yet!")
return None
res = []
for qa_pair in self.qa_dic_list:
if qa_pair['note_type'] == "basic":
if jeopardy:
string = f"\"{prefix}{qa_pair['answer']}\",\" {qa_pair['question']}\""
else:
string = f"\"{prefix}{qa_pair['question']}\",\" {qa_pair['answer']}\""
elif qa_pair['note_type'] == "cloze":
string = f"\"{prefix}{qa_pair['cloze']}\""
res.append(string)
return res
def print(self, *args, **kwargs):
"Print qa pairs to the user"
print(self.string_output(*args, **kwargs))
def pprint(self, *args, **kwargs):
"Prettyprint qa pairs to the user"
pprint(self.string_output(*args, **kwargs))
def _combine_df_columns(self, row, col_names):
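        # build a single field concatenating every column of a row as
        # "COLUMN: value<br>\n" lines, framed by '#' rulers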
combined = ""
for col in col_names:
combined += f"{col.upper()}: {dict(row)[col]}<br>\n"
return "#"*15 + "Combined columns:<br>\n" + combined + "#"*15

    def pandas_df(self, prefix=''):
        "Output a Pandas DataFrame containing qa pairs and metadata"
        if len(self.qa_dic_list) == 0:
            print("No qa generated yet!")
            return None
        # DataFrame.append was deprecated and removed in pandas 2.0, so
        # the frame is built in a single call instead:
        df = pd.DataFrame(self.qa_dic_list)
for i in df.index:
for c in df.columns:
if pd.isna(df.loc[i, c]):
# otherwise export functions break:
df.loc[i, c] = ""
if self.in_lang == "en":
df = df.drop(columns=["source_text_orig"], axis=1)
if self.out_lang == "en":
df = df.drop(columns=["cloze_orig", "question_orig", "answer_orig"],
axis=1)
df["combined_columns"] = [self._combine_df_columns(df.loc[x, :], df.columns)
for x in df.index ]
return df
def to_csv(self, filename="Autocards_export.csv", prefix=''):
"Export qa pairs as csv file"
if len(self.qa_dic_list) == 0:
print("No qa generated yet!")
return None
if prefix != "" and prefix[-1] != ' ':
prefix += ' '
df = self.pandas_df(prefix)
for i in df.index:
for c in df.columns:
df.loc[i, c] = str(df.loc[i, c]).replace(",", r"\,")
if ".csv" in filename:
filename = filename.replace(".csv", "")
df[df["note_type"] == "cloze"].to_csv(f"{filename}_cloze.csv")
df[df["note_type"] != "cloze"].to_csv(f"{filename}_basic.csv")
print(f"Done writing qa pairs to {filename}_cloze.csv and {filename}_basic.csv")
def to_json(self, filename="Autocards_export.json", prefix=''):
"Export qa pairs as json file"
if len(self.qa_dic_list) == 0:
print("No qa generated yet!")
return None
if prefix != "" and prefix[-1] != ' ':
prefix += ' '
df = self.pandas_df(prefix)
if ".json" in filename:
filename = filename.replace(".json", "")
df[df["note_type"] == "cloze"].to_json(f"{filename}_cloze.json")
df[df["note_type"] != "cloze"].to_json(f"{filename}_basic.json")
print(f"Done writing qa pairs to {filename}_cloze.json and \
{filename}_basic.json")
def _ankiconnect_invoke(self, action, **params):
"send requests to ankiconnect addon"
def request_wrapper(action, **params):
return {'action': action, 'params': params, 'version': 6}
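        # e.g. request_wrapper("createDeck", deck="Autocards_export")
        # builds {'action': 'createDeck', 'version': 6,
        #         'params': {'deck': 'Autocards_export'}}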
requestJson = json.dumps(request_wrapper(action, **params)
).encode('utf-8')
try:
response = json.load(urllib.request.urlopen(
urllib.request.Request(
'http://localhost:8765',
requestJson)))
except (ConnectionRefusedError, urllib.error.URLError) as e:
print(f"{e}: is Anki open? Is the addon 'anki-connect' enabled?")
raise SystemExit()
if len(response) != 2:
raise Exception('response has an unexpected number of fields')
if 'error' not in response:
raise Exception('response is missing required error field')
if 'result' not in response:
raise Exception('response is missing required result field')
if response['error'] == "Model name already exists":
print("Note type model already existing.")
if response['error'] is not None:
raise Exception(response['error'])
return response['result']

    def to_anki(self, deckname="Autocards_export", tags=None):
        "Export cards to anki using anki-connect addon"
        # a mutable default argument would accumulate tags across calls,
        # so the tag list is created inside the function instead
        tags = [] if tags is None else list(tags)
df = self.pandas_df()
df["generation_order"] = [str(int(x)+1) for x in list(df.index)]
columns = df.columns.tolist()
columns.remove("combined_columns")
tags.append(f"Autocards::{self.title.replace(' ', '_')}")
with suppress(ValueError):
tags.remove("")
# model formatting
note_list = []
for entry in df.index:
note_list.append({"deckName": deckname,
"modelName": "Autocards",
"tags": tags,
"fields": df.loc[entry, :].to_dict()
})
template_content = [{"Front": "",
"Back": ""}]
# send new card type to anki
try:
self._ankiconnect_invoke(action="createModel",
modelName="Autocards",
inOrderFields=[
"combined_columns"] + columns,
cardTemplates=template_content)
except Exception as e:
print(f"{e}")
# create new deck
self._ankiconnect_invoke(action="createDeck", deck=deckname)
# send notes to anki
out = self._ankiconnect_invoke(action="addNotes", notes=note_list)
        if None in out:
            # anki-connect returns None for each note it failed to add
            print(f"{out.count(None)} cards were not sent correctly.")
        if list(set(out)) != [None]:
            print("Cards sent to anki collection.\nYou can now open anki \
and use 'change note type' to export the fields you need to your preferred \
notetype.")
            return out
        else:
            print("An error happened: no cards were successfully sent \
to anki.")
return out
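

# A minimal command-line sketch (an assumption, not part of the original
# module): build cards from a text file passed as the first argument,
# print them, then export them as csv.
if __name__ == "__main__":
    import sys
    autocards = Autocards()
    autocards.consume_textfile(sys.argv[1], per_paragraph=True)
    autocards.pprint()
    autocards.to_csv("Autocards_export.csv")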