
Commit 2c2b2e0

API: start parsing (#1377)
### What problem does this PR solve?

Make the document start parsing.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
1 parent 8d7fb12 commit 2c2b2e0

File tree

4 files changed: +438 −18 lines changed


api/apps/dataset_api.py

+169 −18
@@ -18,30 +18,35 @@
 import warnings
 from io import BytesIO
 
+from elasticsearch_dsl import Q
 from flask import request, send_file
 from flask_login import login_required, current_user
 from httpx import HTTPError
-from minio import S3Error
 
 from api.contants import NAME_LENGTH_LIMIT
-from api.db import FileType, ParserType, FileSource
+from api.db import FileType, ParserType, FileSource, TaskStatus
 from api.db import StatusEnum
-from api.db.db_models import File
+from api.db.db_models import File, Task
 from api.db.services import duplicate_name
 from api.db.services.document_service import DocumentService
 from api.db.services.file2document_service import File2DocumentService
 from api.db.services.file_service import FileService
 from api.db.services.knowledgebase_service import KnowledgebaseService
+from api.db.services.task_service import TaskService
 from api.db.services.user_service import TenantService
 from api.settings import RetCode
 from api.utils import get_uuid
 from api.utils.api_utils import construct_json_result, construct_error_response
 from api.utils.api_utils import construct_result, validate_request
 from api.utils.file_utils import filename_type, thumbnail
+from rag.app import book, laws, manual, naive, one, paper, presentation, qa, resume, table, picture
+from rag.nlp import search
+from rag.utils.es_conn import ELASTICSEARCH
 from rag.utils.minio_conn import MINIO
 
 MAXIMUM_OF_UPLOADING_FILES = 256
 
+
 # ------------------------------ create a dataset ---------------------------------------
 
 @manager.route("/", methods=["POST"])
@@ -116,6 +121,7 @@ def create_dataset():
     except Exception as e:
         return construct_error_response(e)
 
+
 # -----------------------------list datasets-------------------------------------------------------
 
 @manager.route("/", methods=["GET"])
@@ -135,6 +141,7 @@ def list_datasets():
     except HTTPError as http_err:
         return construct_json_result(http_err)
 
+
 # ---------------------------------delete a dataset ----------------------------
 
 @manager.route("/<dataset_id>", methods=["DELETE"])
@@ -162,13 +169,15 @@ def remove_dataset(dataset_id):
 
         # delete the dataset
        if not KnowledgebaseService.delete_by_id(dataset_id):
-            return construct_json_result(code=RetCode.DATA_ERROR, message="There was an error during the dataset removal process. "
-                                                                          "Please check the status of the RAGFlow server and try the removal again.")
+            return construct_json_result(code=RetCode.DATA_ERROR,
+                                         message="There was an error during the dataset removal process. "
+                                                 "Please check the status of the RAGFlow server and try the removal again.")
        # success
        return construct_json_result(code=RetCode.SUCCESS, message=f"Remove dataset: {dataset_id} successfully")
    except Exception as e:
        return construct_error_response(e)
 
+
 # ------------------------------ get details of a dataset ----------------------------------------
 
 @manager.route("/<dataset_id>", methods=["GET"])
@@ -182,6 +191,7 @@ def get_dataset(dataset_id):
     except Exception as e:
         return construct_json_result(e)
 
+
 # ------------------------------ update a dataset --------------------------------------------
 
 @manager.route("/<dataset_id>", methods=["PUT"])
@@ -209,8 +219,9 @@ def update_dataset(dataset_id):
         if name.lower() != dataset.name.lower() \
                 and len(KnowledgebaseService.query(name=name, tenant_id=current_user.id,
                                                    status=StatusEnum.VALID.value)) > 1:
-            return construct_json_result(code=RetCode.DATA_ERROR, message=f"The name: {name.lower()} is already used by other "
-                                                                          f"datasets. Please choose a different name.")
+            return construct_json_result(code=RetCode.DATA_ERROR,
+                                         message=f"The name: {name.lower()} is already used by other "
+                                                 f"datasets. Please choose a different name.")
 
         dataset_updating_data = {}
         chunk_num = req.get("chunk_num")
@@ -222,17 +233,21 @@ def update_dataset(dataset_id):
             if chunk_num == 0:
                 dataset_updating_data["embd_id"] = req["embedding_model_id"]
             else:
-                construct_json_result(code=RetCode.DATA_ERROR, message="You have already parsed the document in this "
+                return construct_json_result(code=RetCode.DATA_ERROR, message="You have already parsed the document in this "
                                                                                "dataset, so you cannot change the embedding "
                                                                                "model.")
         # only if chunk_num is 0, the user can update the chunk_method
-        if req.get("chunk_method"):
-            if chunk_num == 0:
-                dataset_updating_data['parser_id'] = req["chunk_method"]
-            else:
+        if "chunk_method" in req:
+            type_value = req["chunk_method"]
+            if is_illegal_value_for_enum(type_value, ParserType):
+                return construct_json_result(message=f"Illegal value {type_value} for 'chunk_method' field.",
+                                             code=RetCode.DATA_ERROR)
+            if chunk_num != 0:
                 construct_json_result(code=RetCode.DATA_ERROR, message="You have already parsed the document "
                                                                        "in this dataset, so you cannot "
                                                                        "change the chunk method.")
+            dataset_updating_data["parser_id"] = req["template_type"]
+
         # convert the photo parameter to avatar
         if req.get("photo"):
             dataset_updating_data["avatar"] = req["photo"]
@@ -265,6 +280,7 @@ def update_dataset(dataset_id):
     except Exception as e:
         return construct_error_response(e)
 
+
 # --------------------------------content management ----------------------------------------------
 
 # ----------------------------upload files-----------------------------------------------------
@@ -339,9 +355,10 @@ def upload_documents(dataset_id):
                 location += "_"
 
             blob = file.read()
+
             # the content is empty, raising a warning
             if blob == b'':
-                warnings.warn(f"[WARNING]: The file {filename} is empty.")
+                warnings.warn(f"[WARNING]: The content of the file {filename} is empty.")
 
             MINIO.put(dataset_id, location, blob)
 
@@ -453,6 +470,7 @@ def list_documents(dataset_id):
     except Exception as e:
         return construct_error_response(e)
 
+
 # ----------------------------update: enable rename-----------------------------------------------------
 @manager.route("/<dataset_id>/documents/<document_id>", methods=["PUT"])
 @login_required
@@ -555,6 +573,7 @@ def update_document(dataset_id, document_id):
 def is_illegal_value_for_enum(value, enum_class):
     return value not in enum_class.__members__.values()
 
+
 # ----------------------------download a file-----------------------------------------------------
 @manager.route("/<dataset_id>/documents/<document_id>", methods=["GET"])
 @login_required
@@ -563,7 +582,8 @@ def download_document(dataset_id, document_id):
         # Check whether there is this dataset
         exist, _ = KnowledgebaseService.get_by_id(dataset_id)
         if not exist:
-            return construct_json_result(code=RetCode.DATA_ERROR, message=f"This dataset '{dataset_id}' cannot be found!")
+            return construct_json_result(code=RetCode.DATA_ERROR,
+                                         message=f"This dataset '{dataset_id}' cannot be found!")
 
         # Check whether there is this document
         exist, document = DocumentService.get_by_id(document_id)
@@ -591,8 +611,142 @@
     except Exception as e:
         return construct_error_response(e)
 
-# ----------------------------start parsing-----------------------------------------------------
 
+# ----------------------------start parsing a document-----------------------------------------------------
+# helper method for parsing
+def dummy(prog=None, msg=""):
+    pass
+
+
+def doc_parse(binary, doc_name, parser_name, tenant_id):
+    match parser_name:
+        case "book":
+            book.chunk(doc_name, binary=binary, callback=dummy)
+        case "laws":
+            laws.chunk(doc_name, binary=binary, callback=dummy)
+        case "manual":
+            manual.chunk(doc_name, binary=binary, callback=dummy)
+        case "naive":
+            # It's the mode by default, which is general in the front-end
+            naive.chunk(doc_name, binary=binary, callback=dummy)
+        case "one":
+            one.chunk(doc_name, binary=binary, callback=dummy)
+        case "paper":
+            paper.chunk(doc_name, binary=binary, callback=dummy)
+        case "picture":
+            picture.chunk(doc_name, binary=binary, tenant_id=tenant_id, lang="Chinese", callback=dummy)
+        case "presentation":
+            presentation.chunk(doc_name, binary=binary, callback=dummy)
+        case "qa":
+            qa.chunk(doc_name, binary=binary, callback=dummy)
+        case "resume":
+            resume.chunk(doc_name, binary=binary, callback=dummy)
+        case "table":
+            table.chunk(doc_name, binary=binary, callback=dummy)
+        case _:
+            return False
+
+    return True
+
+
+@manager.route("/<dataset_id>/documents/<document_id>/status", methods=["POST"])
+@login_required
+def parse_document(dataset_id, document_id):
+    try:
+        # valid dataset
+        exist, _ = KnowledgebaseService.get_by_id(dataset_id)
+        if not exist:
+            return construct_json_result(code=RetCode.DATA_ERROR,
+                                         message=f"This dataset '{dataset_id}' cannot be found!")
+        message = ""
+        res = get_message_during_parsing_document(document_id, message)
+        if isinstance(res, str):
+            message += res
+            return construct_json_result(code=RetCode.SUCCESS, message=message)
+        else:
+            return res
+
+    except Exception as e:
+        return construct_error_response(e)
+
+
+# ----------------------------start parsing documents-----------------------------------------------------
+@manager.route("/<dataset_id>/documents/status", methods=["POST"])
+@login_required
+def parse_documents(dataset_id):
+    doc_ids = request.json["doc_ids"]
+    try:
+        exist, _ = KnowledgebaseService.get_by_id(dataset_id)
+        if not exist:
+            return construct_json_result(code=RetCode.DATA_ERROR,
+                                         message=f"This dataset '{dataset_id}' cannot be found!")
+
+        def process(doc_ids):
+            message = ""
+            # for loop
+            for id in doc_ids:
+                res = get_message_during_parsing_document(id, message)
+                if isinstance(res, str):
+                    message += res
+                else:
+                    return res
+            return construct_json_result(data=True, code=RetCode.SUCCESS, message=message)
+
+        # two conditions
+        if doc_ids:
+            return process(doc_ids)
+        else:
+            # documents inside the dataset
+            docs, total = DocumentService.list_documents_in_dataset(dataset_id, 0, -1, "create_time",
+                                                                    True, "")
+            doc_ids = [doc["id"] for doc in docs]
+            return process(doc_ids)
+
+    except Exception as e:
+        return construct_error_response(e)
+
+
+# helper method for getting message or response when parsing the document
+def get_message_during_parsing_document(id, message):
+    try:
+        # Check whether there is this document
+        exist, document = DocumentService.get_by_id(id)
+        if not exist:
+            return construct_json_result(message=f"This document '{id}' cannot be found!",
+                                         code=RetCode.ARGUMENT_ERROR)
+
+        tenant_id = DocumentService.get_tenant_id(id)
+        if not tenant_id:
+            return construct_json_result(message="Tenant not found!", code=RetCode.AUTHENTICATION_ERROR)
+
+        info = {"run": "1", "progress": 0}
+        info["progress_msg"] = ""
+        info["chunk_num"] = 0
+        info["token_num"] = 0
+
+        DocumentService.update_by_id(id, info)
+
+        ELASTICSEARCH.deleteByQuery(Q("match", doc_id=id), idxnm=search.index_name(tenant_id))
+
+        _, doc_attributes = DocumentService.get_by_id(id)
+        doc_attributes = doc_attributes.to_dict()
+        doc_id = doc_attributes["id"]
+
+        bucket, doc_name = File2DocumentService.get_minio_address(doc_id=doc_id)
+        binary = MINIO.get(bucket, doc_name)
+        parser_name = doc_attributes["parser_id"]
+        if binary:
+            res = doc_parse(binary, doc_name, parser_name, tenant_id)
+            if res is False:
+                message += f"The parser id: {parser_name} of the document {doc_id} is not supported; "
+        else:
+            message += f"Empty data in the document: {doc_name}; "
+        # failed in parsing
+        if doc_attributes["status"] == TaskStatus.FAIL.value:
+            message += f"Failed in parsing the document: {doc_id}; "
+        return message
+    except Exception as e:
+        return construct_error_response(e)
 # ----------------------------stop parsing-----------------------------------------------------
 
 # ----------------------------show the status of the file-----------------------------------------------------
@@ -610,6 +764,3 @@ def download_document(dataset_id, document_id):
 # ----------------------------get a specific chunk-----------------------------------------------------
 
 # ----------------------------retrieval test-----------------------------------------------------
-
-
-

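The two routes added above (`POST /<dataset_id>/documents/<document_id>/status` for one document, `POST /<dataset_id>/documents/status` for a batch) can be exercised directly over HTTP. Below is a minimal sketch, assuming a locally running server; the base URL, blueprint prefix, `Authorization` header value, and IDs are placeholders, not values taken from this commit.

```python
# Sketch of calling the new parsing endpoints. BASE_URL, the token, and the
# IDs below are hypothetical placeholders; adjust them to your deployment.
import requests

BASE_URL = "http://127.0.0.1:9380/api/v1/dataset"   # assumed mount point
HEADERS = {"Authorization": "YOUR_API_TOKEN"}       # assumed auth header

dataset_id = "your-dataset-id"
document_id = "your-document-id"

# Start parsing a single document.
single = requests.post(
    f"{BASE_URL}/{dataset_id}/documents/{document_id}/status",
    headers=HEADERS,
)
print(single.json())

# Start parsing a batch; per the route above, sending "doc_ids": None (or an
# empty list) makes the endpoint fall back to every document in the dataset.
batch = requests.post(
    f"{BASE_URL}/{dataset_id}/documents/status",
    headers=HEADERS,
    json={"doc_ids": [document_id]},
)
print(batch.json())
```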
sdk/python/ragflow/ragflow.py

+12
@@ -142,7 +142,19 @@ def download_file(self, dataset_id, document_id):
         with open(file_path, "wb") as file:
             file.write(content)
         return {"code": RetCode.SUCCESS, "data": content}
+
     # ----------------------------start parsing-----------------------------------------------------
+    def start_parsing_document(self, dataset_id, document_id):
+        endpoint = f"{self.dataset_url}/{dataset_id}/documents/{document_id}/status"
+        res = requests.post(endpoint, headers=self.authorization_header)
+
+        return res.json()
+
+    def start_parsing_documents(self, dataset_id, doc_ids=None):
+        endpoint = f"{self.dataset_url}/{dataset_id}/documents/status"
+        res = requests.post(endpoint, headers=self.authorization_header, json={"doc_ids": doc_ids})
+
+        return res.json()
 
     # ----------------------------stop parsing-----------------------------------------------------

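A possible way to drive the two new SDK methods is sketched below. The `RAGFlow` constructor arguments and the IDs are placeholders (the client is assumed to be built with an API key and a base URL, as elsewhere in this SDK), so treat this as illustrative rather than definitive.

```python
# Usage sketch for the new SDK methods; constructor arguments and IDs are
# placeholders, not values taken from this diff.
from ragflow import RAGFlow

client = RAGFlow("YOUR_API_KEY", "http://127.0.0.1:9380")  # assumed signature

# Kick off parsing for a single document.
print(client.start_parsing_document("dataset-id", "document-id"))

# Kick off parsing for selected documents; doc_ids=None asks the server to
# parse every document in the dataset.
print(client.start_parsing_documents("dataset-id", doc_ids=["doc-1", "doc-2"]))
```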
sdk/python/test/test_data/lol.txt

+3
@@ -0,0 +1,3 @@
+llll
+ooooo
+llll
