solve task execution issues #90

Merged: 1 commit, Mar 1, 2024

21 changes: 14 additions & 7 deletions api/apps/document_app.py
@@ -236,13 +236,16 @@ def run():
try:
for id in req["doc_ids"]:
info = {"run": str(req["run"]), "progress": 0}
if str(req["run"]) == TaskStatus.RUNNING.value:info["progress_msg"] = ""
if str(req["run"]) == TaskStatus.RUNNING.value:
info["progress_msg"] = ""
info["chunk_num"] = 0
info["token_num"] = 0
DocumentService.update_by_id(id, info)
if str(req["run"]) == TaskStatus.CANCEL.value:
tenant_id = DocumentService.get_tenant_id(id)
if not tenant_id:
return get_data_error_result(retmsg="Tenant not found!")
ELASTICSEARCH.deleteByQuery(Q("match", doc_id=id), idxnm=search.index_name(tenant_id))
#if str(req["run"]) == TaskStatus.CANCEL.value:
tenant_id = DocumentService.get_tenant_id(id)
if not tenant_id:
return get_data_error_result(retmsg="Tenant not found!")
ELASTICSEARCH.deleteByQuery(Q("match", doc_id=id), idxnm=search.index_name(tenant_id))

return get_json_result(data=True)
except Exception as e:
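A minimal, self-contained sketch of what the reworked branch above does when a document is switched back to RUNNING: the progress message and the chunk/token counters are cleared before the update, and the now-unconditional block afterwards drops the document's stale chunks from Elasticsearch. The TaskStatus values below are placeholders, not the project's actual enum:

```python
from enum import Enum

class TaskStatus(Enum):        # assumed values, for illustration only
    RUNNING = "1"
    CANCEL = "2"

def build_reset_info(run: str) -> dict:
    """Build the update dict the run endpoint now sends for each doc id."""
    info = {"run": run, "progress": 0}
    if run == TaskStatus.RUNNING.value:
        # Restarting parsing: wipe the old message and zero the counters.
        info.update({"progress_msg": "", "chunk_num": 0, "token_num": 0})
    return info

print(build_reset_info(TaskStatus.RUNNING.value))
# {'run': '1', 'progress': 0, 'progress_msg': '', 'chunk_num': 0, 'token_num': 0}
```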
@@ -311,13 +314,17 @@ def change_parser():
if doc.type == FileType.VISUAL or re.search(r"\.(ppt|pptx|pages)$", doc.name):
return get_data_error_result(retmsg="Not supported yet!")

e = DocumentService.update_by_id(doc.id, {"parser_id": req["parser_id"], "progress":0, "progress_msg": ""})
e = DocumentService.update_by_id(doc.id, {"parser_id": req["parser_id"], "progress":0, "progress_msg": "", "run": "0"})
if not e:
return get_data_error_result(retmsg="Document not found!")
if doc.token_num>0:
e = DocumentService.increment_chunk_num(doc.id, doc.kb_id, doc.token_num*-1, doc.chunk_num*-1, doc.process_duation*-1)
if not e:
return get_data_error_result(retmsg="Document not found!")
tenant_id = DocumentService.get_tenant_id(req["doc_id"])
if not tenant_id:
return get_data_error_result(retmsg="Tenant not found!")
ELASTICSEARCH.deleteByQuery(Q("match", doc_id=doc.id), idxnm=search.index_name(tenant_id))

return get_json_result(data=True)
except Exception as e:
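Sketch of the extra cleanup change_parser now performs when the document already has indexed content; the service and Elasticsearch calls are stand-ins for DocumentService / ELASTICSEARCH, wired as plain callables so the snippet runs on its own:

```python
def on_parser_changed(doc, update_doc, decrement_counts, delete_chunks):
    # Reset parsing state along with the new parser id ("run": "0" = not started).
    update_doc(doc["id"], {"parser_id": doc["new_parser_id"],
                           "progress": 0, "progress_msg": "", "run": "0"})
    if doc["token_num"] > 0:
        # Roll back the knowledge-base counters and drop the old chunks from the index.
        decrement_counts(doc["id"], doc["kb_id"], -doc["token_num"],
                         -doc["chunk_num"], -doc["process_duation"])
        delete_chunks(doc["id"])

doc = {"id": "d1", "kb_id": "kb1", "new_parser_id": "naive",
       "token_num": 120, "chunk_num": 3, "process_duation": 1.5}
on_parser_changed(doc,
                  lambda *a: print("update", a),
                  lambda *a: print("decrement", a),
                  lambda *a: print("delete chunks for", a))
```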
2 changes: 1 addition & 1 deletion api/db/services/task_service.py
@@ -65,7 +65,7 @@ def do_cancel(cls, id):
try:
task = cls.model.get_by_id(id)
_, doc = DocumentService.get_by_id(task.doc_id)
return doc.run == TaskStatus.CANCEL.value
return doc.run == TaskStatus.CANCEL.value or doc.progress < 0
except Exception as e:
pass
return True
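The one-line change above broadens what counts as a cancelled task: either the user explicitly cancelled the document, or its progress has already been set negative (the failure marker used elsewhere in the pipeline). A tiny sketch, with the CANCEL value assumed:

```python
CANCEL = "2"   # assumed TaskStatus.CANCEL.value

def should_cancel(doc_run: str, doc_progress: float) -> bool:
    return doc_run == CANCEL or doc_progress < 0

assert should_cancel("2", 0.5)      # explicit cancel
assert should_cancel("1", -1.0)     # document already failed
assert not should_cancel("1", 0.3)  # still running normally
print("do_cancel sketch OK")
```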
18 changes: 9 additions & 9 deletions api/settings.py
@@ -98,15 +98,6 @@

DATABASE = decrypt_database_config(name="mysql")

# Logger
LoggerFactory.set_directory(os.path.join(get_project_base_directory(), "logs", "api"))
# {CRITICAL: 50, FATAL:50, ERROR:40, WARNING:30, WARN:30, INFO:20, DEBUG:10, NOTSET:0}
LoggerFactory.LEVEL = 10

stat_logger = getLogger("stat")
access_logger = getLogger("access")
database_logger = getLogger("database")

# Switch
# upload
UPLOAD_DATA_FROM_CLIENT = True
@@ -144,6 +135,15 @@

retrievaler = search.Dealer(ELASTICSEARCH)

# Logger
LoggerFactory.set_directory(os.path.join(get_project_base_directory(), "logs", "api"))
# {CRITICAL: 50, FATAL:50, ERROR:40, WARNING:30, WARN:30, INFO:20, DEBUG:10, NOTSET:0}
LoggerFactory.LEVEL = 10

stat_logger = getLogger("stat")
access_logger = getLogger("access")
database_logger = getLogger("database")

class CustomEnum(Enum):
@classmethod
def valid(cls, value):
27 changes: 22 additions & 5 deletions deepdoc/parser/pdf_parser.py
@@ -8,7 +8,7 @@
import re
import pdfplumber
import logging
from PIL import Image
from PIL import Image, ImageDraw
import numpy as np

from api.db import ParserType
@@ -930,13 +930,25 @@ def remove_tag(self, txt):

def crop(self, text, ZM=3):
imgs = []
poss = []
for tag in re.findall(r"@@[0-9-]+\t[0-9.\t]+##", text):
pn, left, right, top, bottom = tag.strip(
"#").strip("@").split("\t")
left, right, top, bottom = float(left), float(
right), float(top), float(bottom)
poss.append(([int(p) - 1 for p in pn.split("-")], left, right, top, bottom))
if not poss: return

max_width = np.max([right-left for (_, left, right, _, _) in poss])
GAP = 6
pos = poss[0]
poss.insert(0, ([pos[0][0]], pos[1], pos[2], max(0, pos[3]-120), max(pos[3]-GAP, 0)))
pos = poss[-1]
poss.append(([pos[0][-1]], pos[1], pos[2], min(self.page_images[pos[0][-1]].size[1]/ZM, pos[4]+GAP), min(self.page_images[pos[0][-1]].size[1]/ZM, pos[4]+120)))

for ii, (pns, left, right, top, bottom) in enumerate(poss):
right = left + max_width
bottom *= ZM
pns = [int(p) - 1 for p in pn.split("-")]
for pn in pns[1:]:
bottom += self.page_images[pn - 1].size[1]
imgs.append(
@@ -959,16 +971,21 @@ def crop(self, text, ZM=3):

if not imgs:
return
GAP = 2
height = 0
for img in imgs:
height += img.size[1] + GAP
height = int(height)
width = int(np.max([i.size[0] for i in imgs]))
pic = Image.new("RGB",
(int(np.max([i.size[0] for i in imgs])), height),
(width, height),
(245, 245, 245))
height = 0
for img in imgs:
for ii, img in enumerate(imgs):
if ii == 0 or ii + 1 == len(imgs):
img = img.convert('RGBA')
overlay = Image.new('RGBA', img.size, (0, 0, 0, 0))
overlay.putalpha(128)
img = Image.alpha_composite(img, overlay).convert("RGB")
pic.paste(img, (0, int(height)))
height += img.size[1] + GAP
return pic
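Two things happen in these crop() hunks: extra context regions are prepended/appended around the matched positions, and the first and last crops (that added context) are dimmed before being pasted into the composite image. A minimal sketch of just the dimming step, using the same Pillow calls as the diff:

```python
from PIL import Image

def dim(img: Image.Image, alpha: int = 128) -> Image.Image:
    """Blend a half-transparent black overlay over the crop so it reads as context."""
    img = img.convert("RGBA")
    overlay = Image.new("RGBA", img.size, (0, 0, 0, 0))
    overlay.putalpha(alpha)
    return Image.alpha_composite(img, overlay).convert("RGB")

sample = Image.new("RGB", (80, 40), (200, 30, 30))
print(dim(sample).getpixel((0, 0)))   # noticeably darker than (200, 30, 30)
```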
2 changes: 1 addition & 1 deletion deepdoc/vision/layout_recognizer.py
@@ -34,7 +34,7 @@ class LayoutRecognizer(Recognizer):
"Equation",
]
def __init__(self, domain):
super().__init__(self.labels, domain) #, os.path.join(get_project_base_directory(), "rag/res/deepdoc/"))
super().__init__(self.labels, domain, os.path.join(get_project_base_directory(), "rag/res/deepdoc/"))

def __call__(self, image_list, ocr_res, scale_factor=3, thr=0.2, batch_size=16):
def __is_garbage(b):
2 changes: 1 addition & 1 deletion deepdoc/vision/table_structure_recognizer.py
@@ -33,7 +33,7 @@ class TableStructureRecognizer(Recognizer):
]

def __init__(self):
super().__init__(self.labels, "tsr")#,os.path.join(get_project_base_directory(), "rag/res/deepdoc/"))
super().__init__(self.labels, "tsr",os.path.join(get_project_base_directory(), "rag/res/deepdoc/"))

def __call__(self, images, thr=0.2):
tbls = super().__call__(images, thr)
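Both recognizer subclasses now pass an explicit local directory for their model weights instead of leaving the third argument commented out. Illustrative only; the base class here is a stand-in rather than the real deepdoc.vision.Recognizer, and the paths and label set are assumptions:

```python
import os

class Recognizer:                         # stand-in base class
    def __init__(self, labels, task_name, model_dir=None):
        self.labels = labels
        self.model_dir = model_dir or "<default lookup location>"

class TableStructureRecognizer(Recognizer):
    labels = ["table", "table column", "table row"]   # truncated label set
    def __init__(self, project_base="/opt/ragflow"):
        super().__init__(self.labels, "tsr",
                         os.path.join(project_base, "rag/res/deepdoc/"))

print(TableStructureRecognizer().model_dir)   # /opt/ragflow/rag/res/deepdoc/
```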
25 changes: 8 additions & 17 deletions rag/app/book.py
@@ -13,7 +13,7 @@
import copy
import re
from rag.nlp import bullets_category, is_english, tokenize, remove_contents_table, \
hierarchical_merge, make_colon_as_title, naive_merge, random_choices
hierarchical_merge, make_colon_as_title, naive_merge, random_choices, tokenize_table
from rag.nlp import huqie
from deepdoc.parser import PdfParser, DocxParser

@@ -90,25 +90,16 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, lang="Chinese", ca
make_colon_as_title(sections)
bull = bullets_category([t for t in random_choices([t for t,_ in sections], k=100)])
if bull >= 0: cks = hierarchical_merge(bull, sections, 3)
else: cks = naive_merge(sections, kwargs.get("chunk_token_num", 256), kwargs.get("delimer", "\n。;!?"))
else:
sections = [s.split("@") for s in sections]
sections = [(pr[0], "@"+pr[1]) for pr in sections if len(pr)==2]
cks = naive_merge(sections, kwargs.get("chunk_token_num", 256), kwargs.get("delimer", "\n。;!?"))

sections = [t for t, _ in sections]
# is it English
eng = lang.lower() == "english"#is_english(random_choices(sections, k=218))
eng = lang.lower() == "english"#is_english(random_choices([t for t, _ in sections], k=218))

res = tokenize_table(tbls, doc, eng)

res = []
# add tables
for img, rows in tbls:
bs = 10
de = ";" if eng else ";"
for i in range(0, len(rows), bs):
d = copy.deepcopy(doc)
r = de.join(rows[i:i + bs])
r = re.sub(r"\t——(来自| in ).*”%s" % de, "", r)
tokenize(d, r, eng)
d["image"] = img
res.append(d)
print("TABLE", d["content_with_weight"])
# wrap up to es documents
for ck in cks:
d = copy.deepcopy(doc)
15 changes: 2 additions & 13 deletions rag/app/manual.py
@@ -2,7 +2,7 @@
import re

from api.db import ParserType
from rag.nlp import huqie, tokenize
from rag.nlp import huqie, tokenize, tokenize_table
from deepdoc.parser import PdfParser
from rag.utils import num_tokens_from_string

@@ -81,18 +81,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, lang="Chinese", ca
# is it English
eng = lang.lower() == "english"#pdf_parser.is_english

res = []
# add tables
for img, rows in tbls:
bs = 10
de = ";" if eng else ";"
for i in range(0, len(rows), bs):
d = copy.deepcopy(doc)
r = de.join(rows[i:i + bs])
r = re.sub(r"\t——(来自| in ).*”%s" % de, "", r)
tokenize(d, r, eng)
d["image"] = img
res.append(d)
res = tokenize_table(tbls, doc, eng)

i = 0
chunk = []
15 changes: 3 additions & 12 deletions rag/app/naive.py
@@ -13,7 +13,7 @@
import copy
import re
from rag.app import laws
from rag.nlp import huqie, is_english, tokenize, naive_merge
from rag.nlp import huqie, is_english, tokenize, naive_merge, tokenize_table
from deepdoc.parser import PdfParser
from rag.settings import cron_logger

@@ -72,17 +72,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, lang="Chinese", ca
pdf_parser = Pdf()
sections, tbls = pdf_parser(filename if not binary else binary,
from_page=from_page, to_page=to_page, callback=callback)
# add tables
for img, rows in tbls:
bs = 10
de = ";" if eng else ";"
for i in range(0, len(rows), bs):
d = copy.deepcopy(doc)
r = de.join(rows[i:i + bs])
r = re.sub(r"\t——(来自| in ).*”%s" % de, "", r)
tokenize(d, r, eng)
d["image"] = img
res.append(d)
res = tokenize_table(tbls, doc, eng)
elif re.search(r"\.txt$", filename, re.IGNORECASE):
callback(0.1, "Start to parse.")
txt = ""
@@ -106,6 +96,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, lang="Chinese", ca
# wrap up to es documents
for ck in cks:
print("--", ck)
if not ck:continue
d = copy.deepcopy(doc)
if pdf_parser:
d["image"] = pdf_parser.crop(ck)
15 changes: 2 additions & 13 deletions rag/app/paper.py
@@ -15,7 +15,7 @@
from collections import Counter

from api.db import ParserType
from rag.nlp import huqie, tokenize
from rag.nlp import huqie, tokenize, tokenize_table
from deepdoc.parser import PdfParser
import numpy as np
from rag.utils import num_tokens_from_string
@@ -158,18 +158,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, lang="Chinese", ca
eng = lang.lower() == "english"#pdf_parser.is_english
print("It's English.....", eng)

res = []
# add tables
for img, rows in paper["tables"]:
bs = 10
de = ";" if eng else ";"
for i in range(0, len(rows), bs):
d = copy.deepcopy(doc)
r = de.join(rows[i:i + bs])
r = re.sub(r"\t——(来自| in ).*”%s" % de, "", r)
tokenize(d, r)
d["image"] = img
res.append(d)
res = tokenize_table(paper["tables"], doc, eng)

if paper["abstract"]:
d = copy.deepcopy(doc)
2 changes: 1 addition & 1 deletion rag/app/presentation.py
@@ -20,7 +20,7 @@

class Ppt(PptParser):
def __call__(self, fnm, from_page, to_page, callback=None):
txts = super.__call__(fnm, from_page, to_page)
txts = super().__call__(fnm, from_page, to_page)

callback(0.5, "Text extraction finished.")
import aspose.slides as slides
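For context on the one-character fix above: `super` without parentheses is the built-in type itself, so `super.__call__(fnm, ...)` never reaches PptParser; it tries to construct a super object and raises. A standalone reproduction:

```python
class Base:
    def __call__(self, x):
        return x * 2

class Broken(Base):
    def __call__(self, x):
        return super.__call__(x)     # TypeError at call time

class Fixed(Base):
    def __call__(self, x):
        return super().__call__(x)   # the form the diff switches to

try:
    Broken()(3)
except TypeError as e:
    print("broken:", e)
print("fixed:", Fixed()(3))          # fixed: 6
```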
2 changes: 1 addition & 1 deletion rag/app/resume.py
@@ -79,7 +79,7 @@ def chunk(filename, binary=None, callback=None, **kwargs):
resume = remote_call(filename, binary)
if len(resume.keys()) < 7:
callback(-1, "Resume is not successfully parsed.")
return []
raise Exception("Resume parser remote call fail!")
callback(0.6, "Done parsing. Chunking...")
print(json.dumps(resume, ensure_ascii=False, indent=2))

22 changes: 19 additions & 3 deletions rag/nlp/__init__.py
@@ -1,4 +1,4 @@

import copy

from nltk.stem import PorterStemmer
stemmer = PorterStemmer()
@@ -80,6 +80,20 @@ def tokenize(d, t, eng):
d["content_sm_ltks"] = huqie.qieqie(d["content_ltks"])


def tokenize_table(tbls, doc, eng, batch_size=10):
res = []
# add tables
for img, rows in tbls:
de = "; " if eng else "; "
for i in range(0, len(rows), batch_size):
d = copy.deepcopy(doc)
r = de.join(rows[i:i + batch_size])
tokenize(d, r, eng)
d["image"] = img
res.append(d)
return res


def remove_contents_table(sections, eng=False):
i = 0
while i < len(sections):
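The new tokenize_table helper centralises the table-to-chunk loop that book.py, manual.py, naive.py and paper.py previously duplicated inline. A rough usage sketch with a stubbed tokenize so it runs standalone (the real one fills content_ltks / content_sm_ltks via huqie):

```python
import copy

def tokenize(d, t, eng):                 # stub for rag.nlp.tokenize
    d["content_with_weight"] = t

def tokenize_table(tbls, doc, eng, batch_size=10):
    res = []
    for img, rows in tbls:
        de = "; "                        # row delimiter (the real helper is locale-aware)
        for i in range(0, len(rows), batch_size):
            d = copy.deepcopy(doc)       # each chunk inherits the document metadata
            tokenize(d, de.join(rows[i:i + batch_size]), eng)
            d["image"] = img             # keep the table image attached to its rows
            res.append(d)
    return res

doc = {"docnm_kwd": "demo.pdf"}
tbls = [(None, [f"row {i}" for i in range(23)])]   # one table, 23 rows
print(len(tokenize_table(tbls, doc, eng=True)))    # 3 -> batches of 10 / 10 / 3
```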
@@ -201,10 +215,12 @@ def add_chunk(t, pos):
tnum = num_tokens_from_string(t)
if tnum < 8: pos = ""
if tk_nums[-1] > chunk_token_num:
cks.append(t + pos)
if t.find(pos) < 0: t += pos
cks.append(t)
tk_nums.append(tnum)
else:
cks[-1] += t + pos
if cks[-1].find(pos) < 0: t += pos
cks[-1] += t
tk_nums[-1] += tnum

for sec, pos in sections:
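The add_chunk change in this hunk appends the position tag only when it is not already present in the chunk text, so re-merged sections stop accumulating duplicate "@@page ...##" markers. Schematically:

```python
def append_pos_once(chunk: str, pos: str) -> str:
    """Append a position tag at most once per chunk (illustrative)."""
    return chunk if pos and chunk.find(pos) >= 0 else chunk + pos

pos = "@@3\t10.0\t200.0\t30.0\t60.0##"
chunk = "Some merged section text." + pos
print(append_pos_once(chunk, pos) == chunk)                 # True: tag not duplicated
print(append_pos_once("fresh text", pos) != "fresh text")   # True: tag added once
```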
4 changes: 3 additions & 1 deletion rag/nlp/search.py
@@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
import json
import re
from copy import deepcopy

from elasticsearch_dsl import Q, Search
from typing import List, Optional, Dict, Union
from dataclasses import dataclass
@@ -98,7 +100,7 @@ def search(self, req, idxnm, emb_mdl=None):
del s["highlight"]
q_vec = s["knn"]["query_vector"]
es_logger.info("【Q】: {}".format(json.dumps(s)))
res = self.es.search(s, idxnm=idxnm, timeout="600s", src=src)
res = self.es.search(deepcopy(s), idxnm=idxnm, timeout="600s", src=src)
es_logger.info("TOTAL: {}".format(self.es.getTotal(res)))
if self.es.getTotal(res) == 0 and "knn" in s:
bqry, _ = self.qryr.question(qst, min_match="10%")
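The deepcopy above matters because the lines right after it re-read `s` (e.g. the "knn" key) to decide whether to retry with a relaxed keyword-only query; if the search client mutates the request body, that fallback would never fire. Illustrative stand-in for the mutating call:

```python
from copy import deepcopy

def search_mutating(q: dict) -> int:
    q.pop("knn", None)      # stand-in for a client that rewrites the body in place
    return 0                # pretend nothing matched

s = {"query": {"match_all": {}}, "knn": {"query_vector": [0.1, 0.2]}}
total = search_mutating(deepcopy(s))
if total == 0 and "knn" in s:       # still intact thanks to the copy
    print("keyword-only retry is possible")
```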
2 changes: 1 addition & 1 deletion rag/svr/task_broker.py
@@ -90,7 +90,7 @@ def new_task():
tsks.append(task)
else:
tsks.append(new_task())
print(tsks)

bulk_insert_into_db(Task, tsks, True)
set_dispatching(r["id"])
tmf.write(str(r["update_time"]) + "\n")
2 changes: 1 addition & 1 deletion rag/svr/task_executor.py
@@ -114,7 +114,7 @@ def build(row):
kb_id=row["kb_id"], parser_config=row["parser_config"], tenant_id=row["tenant_id"])
except Exception as e:
if re.search("(No such file|not found)", str(e)):
callback(-1, "Can not find file <%s>" % row["doc_name"])
callback(-1, "Can not find file <%s>" % row["name"])
else:
callback(-1, f"Internal server error: %s" %
str(e).replace("'", ""))