diff --git a/api/apps/__init__.py b/api/apps/__init__.py
index 38f54069f7..d654f3a5f7 100644
--- a/api/apps/__init__.py
+++ b/api/apps/__init__.py
@@ -54,7 +54,7 @@
 #app.config["LOGIN_DISABLED"] = True
 app.config["SESSION_PERMANENT"] = False
 app.config["SESSION_TYPE"] = "filesystem"
-app.config['MAX_CONTENT_LENGTH'] = os.environ.get("MAX_CONTENT_LENGTH", 128 * 1024 * 1024)
+app.config['MAX_CONTENT_LENGTH'] = int(os.environ.get("MAX_CONTENT_LENGTH", 128 * 1024 * 1024))
 
 Session(app)
 login_manager = LoginManager()
diff --git a/api/utils/file_utils.py b/api/utils/file_utils.py
index 159090f500..e5979b6114 100644
--- a/api/utils/file_utils.py
+++ b/api/utils/file_utils.py
@@ -147,7 +147,7 @@ def filename_type(filename):
         return FileType.PDF.value
 
     if re.match(
-            r".*\.(docx|doc|ppt|pptx|yml|xml|htm|json|csv|txt|ini|xls|xlsx|wps|rtf|hlp|pages|numbers|key|md)$", filename):
+            r".*\.(docx|ppt|pptx|yml|xml|htm|json|csv|txt|ini|xls|xlsx|wps|rtf|hlp|pages|numbers|key|md)$", filename):
         return FileType.DOC.value
 
     if re.match(
diff --git a/conf/mapping.json b/conf/mapping.json
index 5cc11dde7a..c8831346aa 100644
--- a/conf/mapping.json
+++ b/conf/mapping.json
@@ -1,7 +1,7 @@
 {
   "settings": {
     "index": {
-      "number_of_shards": 4,
+      "number_of_shards": 2,
       "number_of_replicas": 0,
       "refresh_interval" : "1000ms"
     },
diff --git a/conf/service_conf.yaml b/conf/service_conf.yaml
index 661d181328..c19f8c7a83 100644
--- a/conf/service_conf.yaml
+++ b/conf/service_conf.yaml
@@ -13,11 +13,16 @@ minio:
   user: 'rag_flow'
   password: 'infini_rag_flow'
   host: 'minio:9000'
+redis:
+  db: 1
+  password: 'infini_rag_flow'
+  host: 'redis:6379'
 es:
   hosts: 'http://es01:9200'
 user_default_llm:
   factory: 'Tongyi-Qianwen'
   api_key: 'sk-xxxxxxxxxxxxx'
+  base_url: ''
 oauth:
   github:
     client_id: xxxxxxxxxxxxxxxxxxxxxxxxx
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 490a055f5f..e34749cc52 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -12,7 +12,6 @@ services:
     image: infiniflow/ragflow:${RAGFLOW_VERSION}
     container_name: ragflow-server
     ports:
-      - ${SVR_HTTP_PORT}:9380
       - ${SVR_HTTP_PORT}:9380
       - 80:80
       - 443:443
diff --git a/docker/service_conf.yaml b/docker/service_conf.yaml
index a277c7245e..c19f8c7a83 100644
--- a/docker/service_conf.yaml
+++ b/docker/service_conf.yaml
@@ -13,6 +13,10 @@ minio:
   user: 'rag_flow'
   password: 'infini_rag_flow'
   host: 'minio:9000'
+redis:
+  db: 1
+  password: 'infini_rag_flow'
+  host: 'redis:6379'
 es:
   hosts: 'http://es01:9200'
 user_default_llm:
diff --git a/rag/app/naive.py b/rag/app/naive.py
index fb321a7fce..eb87fca704 100644
--- a/rag/app/naive.py
+++ b/rag/app/naive.py
@@ -66,6 +66,8 @@ def __call__(self, filename, binary=None, from_page=0, to_page=100000):
 class Pdf(PdfParser):
     def __call__(self, filename, binary=None, from_page=0,
                  to_page=100000, zoomin=3, callback=None):
+        from timeit import default_timer as timer
+        start = timer()
         callback(msg="OCR is running...")
         self.__images__(
             filename if not binary else binary,
@@ -75,8 +77,8 @@ def __call__(self, filename, binary=None, from_page=0,
             callback
         )
         callback(msg="OCR finished")
+        cron_logger.info("OCR: {}".format(timer() - start))
 
-        from timeit import default_timer as timer
         start = timer()
         self._layouts_rec(zoomin)
         callback(0.63, "Layout analysis finished.")
@@ -90,7 +92,7 @@ def __call__(self, filename, binary=None, from_page=0,
 
         self._concat_downward()
         #self._filter_forpages()
-        cron_logger.info("paddle layouts:".format(
+        cron_logger.info("paddle layouts: {}".format(
             (timer() - start) / (self.total_page + 0.1)))
return [(b["text"], self._line_tag(b, zoomin)) for b in self.boxes], tbls diff --git a/rag/settings.py b/rag/settings.py index da022628f4..407a307469 100644 --- a/rag/settings.py +++ b/rag/settings.py @@ -25,6 +25,11 @@ ES = get_base_config("es", {}) MINIO = decrypt_database_config(name="minio") +try: + REDIS = decrypt_database_config(name="redis") +except Exception as e: + REDIS = {} + pass DOC_MAXIMUM_SIZE = 128 * 1024 * 1024 # Logger @@ -39,5 +44,6 @@ es_logger = getLogger("es") minio_logger = getLogger("minio") cron_logger = getLogger("cron_logger") +cron_logger.setLevel(20) chunk_logger = getLogger("chunk_logger") database_logger = getLogger("database") diff --git a/rag/svr/task_broker.py b/rag/svr/task_broker.py index b54792aa98..f44ba3f865 100644 --- a/rag/svr/task_broker.py +++ b/rag/svr/task_broker.py @@ -32,6 +32,7 @@ from api.settings import database_logger from api.utils import get_format_time, get_uuid from api.utils.file_utils import get_project_base_directory +from rag.utils.redis_conn import REDIS_CONN def collect(tm): @@ -84,10 +85,16 @@ def new_task(): tsks = [] try: + file_bin = MINIO.get(r["kb_id"], r["location"]) + if REDIS_CONN.is_alive(): + try: + REDIS_CONN.set("{}/{}".format(r["kb_id"], r["location"]), file_bin, 12*60) + except Exception as e: + cron_logger.warning("Put into redis[EXCEPTION]:" + str(e)) + if r["type"] == FileType.PDF.value: do_layout = r["parser_config"].get("layout_recognize", True) - pages = PdfParser.total_page_number( - r["name"], MINIO.get(r["kb_id"], r["location"])) + pages = PdfParser.total_page_number(r["name"], file_bin) page_size = r["parser_config"].get("task_page_size", 12) if r["parser_id"] == "paper": page_size = r["parser_config"].get("task_page_size", 22) @@ -110,8 +117,7 @@ def new_task(): elif r["parser_id"] == "table": rn = HuExcelParser.row_number( - r["name"], MINIO.get( - r["kb_id"], r["location"])) + r["name"], file_bin) for i in range(0, rn, 3000): task = new_task() task["from_page"] = i diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index 667df422b5..9ad044e87b 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -19,13 +19,12 @@ import os import hashlib import copy -import random import re import sys import time import traceback from functools import partial - +from rag.utils import MINIO from api.db.db_models import close_connection from rag.settings import database_logger from rag.settings import cron_logger, DOC_MAXIMUM_SIZE @@ -35,7 +34,7 @@ from multiprocessing.context import TimeoutError from api.db.services.task_service import TaskService from rag.utils import ELASTICSEARCH -from rag.utils import MINIO +from timeit import default_timer as timer from rag.utils import rmSpace, findMaxTm from rag.nlp import search @@ -48,6 +47,7 @@ from api.db.services.document_service import DocumentService from api.db.services.llm_service import LLMBundle from api.utils.file_utils import get_project_base_directory +from rag.utils.redis_conn import REDIS_CONN BATCH_SIZE = 64 @@ -105,11 +105,16 @@ def collect(comm, mod, tm): def get_minio_binary(bucket, name): global MINIO + if REDIS_CONN.is_alive(): + try: + r = REDIS_CONN.get("{}/{}".format(bucket, name)) + if r: return r + except Exception as e: + cron_logger.warning("Get redis[EXCEPTION]:" + str(e)) return MINIO.get(bucket, name) def build(row): - from timeit import default_timer as timer if row["size"] > DOC_MAXIMUM_SIZE: set_progress(row["id"], prog=-1, msg="File size exceeds( <= %dMb )" % (int(DOC_MAXIMUM_SIZE / 1024 / 1024))) @@ -265,6 
         callback(
             msg="Finished slicing files(%d). Start to embedding the content." %
             len(cks))
+        st = timer()
         try:
             tk_count = embedding(cks, embd_mdl, r["parser_config"], callback)
         except Exception as e:
@@ -272,9 +278,10 @@ def main(comm, mod):
             cron_logger.error(str(e))
             tk_count = 0
 
-        callback(msg="Finished embedding! Start to build index!")
+        callback(msg="Finished embedding({})! Start to build index!".format(timer()-st))
         init_kb(r)
         chunk_count = len(set([c["_id"] for c in cks]))
+        st = timer()
         es_r = ELASTICSEARCH.bulk(cks, search.index_name(r["tenant_id"]))
         if es_r:
             callback(-1, "Index failure!")
@@ -290,8 +297,8 @@ def main(comm, mod):
         DocumentService.increment_chunk_num(
             r["doc_id"], r["kb_id"], tk_count, chunk_count, 0)
         cron_logger.info(
-            "Chunk doc({}), token({}), chunks({})".format(
-                r["id"], tk_count, len(cks)))
+            "Chunk doc({}), token({}), chunks({}), elapsed:{}".format(
+                r["id"], tk_count, len(cks), timer()-st))
         tmf.write(str(r["update_time"]) + "\n")
     tmf.close()
diff --git a/rag/utils/redis_conn.py b/rag/utils/redis_conn.py
new file mode 100644
index 0000000000..0fe30202d8
--- /dev/null
+++ b/rag/utils/redis_conn.py
@@ -0,0 +1,55 @@
+import json
+
+import redis
+import logging
+from rag import settings
+from rag.utils import singleton
+
+@singleton
+class RedisDB:
+    def __init__(self):
+        self.REDIS = None
+        self.config = settings.REDIS
+        self.__open__()
+
+    def __open__(self):
+        try:
+            self.REDIS = redis.Redis(host=self.config.get("host", "redis").split(":")[0],
+                                     port=int(self.config.get("host", ":6379").split(":")[1]),
+                                     db=int(self.config.get("db", 1)),
+                                     password=self.config.get("password"))
+        except Exception as e:
+            logging.warning("Redis can't be connected.")
+        return self.REDIS
+
+    def is_alive(self):
+        return self.REDIS is not None
+
+    def get(self, k):
+        if not self.REDIS: return
+        try:
+            return self.REDIS.get(k)
+        except Exception as e:
+            logging.warning("[EXCEPTION]get" + str(k) + "||" + str(e))
+            self.__open__()
+
+    def set_obj(self, k, obj, exp=3600):
+        try:
+            self.REDIS.set(k, json.dumps(obj, ensure_ascii=False), exp)
+            return True
+        except Exception as e:
+            logging.warning("[EXCEPTION]set_obj" + str(k) + "||" + str(e))
+            self.__open__()
+        return False
+
+    def set(self, k, v, exp=3600):
+        try:
+            self.REDIS.set(k, v, exp)
+            return True
+        except Exception as e:
+            logging.warning("[EXCEPTION]set" + str(k) + "||" + str(e))
+            self.__open__()
+        return False
+
+
+REDIS_CONN = RedisDB()
\ No newline at end of file
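
Taken together, the broker and executor changes form a best-effort Redis cache in front of MinIO: task_broker.py warms the cache with each file's binary when it creates tasks (12*60-second expiry), and get_minio_binary() in task_executor.py reads through it, falling back to MinIO on a miss. A minimal sketch of that read-through pattern, assuming a reachable Redis and the RedisDB helper added above (fetch_binary and CACHE_EXP are illustrative names, not identifiers from this diff):

    # Sketch only: the caching pattern this diff wires into the broker/executor.
    from rag.utils import MINIO
    from rag.utils.redis_conn import REDIS_CONN

    CACHE_EXP = 12 * 60  # seconds, matching the broker's expiry

    def fetch_binary(bucket, name):
        key = "{}/{}".format(bucket, name)  # same key scheme as kb_id/location
        if REDIS_CONN.is_alive():
            cached = REDIS_CONN.get(key)  # RedisDB.get() returns None on miss or error
            if cached:
                return cached
        data = MINIO.get(bucket, name)  # authoritative copy
        if REDIS_CONN.is_alive():
            REDIS_CONN.set(key, data, CACHE_EXP)  # best-effort re-warm; errors are swallowed
        return data

Note that RedisDB.__open__() constructs the client without pinging the server, so is_alive() only confirms the client object exists; when Redis is down, the fallback to MinIO happens through the exception handlers in get()/set() rather than through is_alive().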