add redis to accelerate access of minio #482

Merged 1 commit on Apr 22, 2024
Changes from all commits
2 changes: 1 addition & 1 deletion api/apps/__init__.py
@@ -54,7 +54,7 @@
#app.config["LOGIN_DISABLED"] = True
app.config["SESSION_PERMANENT"] = False
app.config["SESSION_TYPE"] = "filesystem"
app.config['MAX_CONTENT_LENGTH'] = os.environ.get("MAX_CONTENT_LENGTH", 128 * 1024 * 1024)
app.config['MAX_CONTENT_LENGTH'] = int(os.environ.get("MAX_CONTENT_LENGTH", 128 * 1024 * 1024))

Session(app)
login_manager = LoginManager()
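Note on the __init__.py change: os.environ.get returns a string whenever the variable is actually set (only the fallback stays an int), and Werkzeug compares MAX_CONTENT_LENGTH against the request size numerically, so the int() cast is what keeps uploads working when the variable is configured. A minimal sketch of the pitfall:

    import os

    os.environ["MAX_CONTENT_LENGTH"] = "134217728"            # env values are always strings
    raw = os.environ.get("MAX_CONTENT_LENGTH", 128 * 1024 * 1024)
    print(type(raw))                                          # <class 'str'>
    print(int(raw) == 128 * 1024 * 1024)                      # True once cast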
2 changes: 1 addition & 1 deletion api/utils/file_utils.py
@@ -147,7 +147,7 @@ def filename_type(filename):
return FileType.PDF.value

if re.match(
r".*\.(docx|doc|ppt|pptx|yml|xml|htm|json|csv|txt|ini|xls|xlsx|wps|rtf|hlp|pages|numbers|key|md)$", filename):
r".*\.(docx|ppt|pptx|yml|xml|htm|json|csv|txt|ini|xls|xlsx|wps|rtf|hlp|pages|numbers|key|md)$", filename):
return FileType.DOC.value

if re.match(
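The file_utils.py hunk quietly drops doc from the extension alternation, so a plain .doc filename no longer short-circuits to FileType.DOC here and instead falls through to the later re.match checks. A quick sketch of the behavioral difference:

    import re

    DOC_PATTERN = r".*\.(docx|ppt|pptx|yml|xml|htm|json|csv|txt|ini|xls|xlsx|wps|rtf|hlp|pages|numbers|key|md)$"

    print(bool(re.match(DOC_PATTERN, "report.docx")))  # True
    print(bool(re.match(DOC_PATTERN, "report.doc")))   # False -- falls through to later checks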
2 changes: 1 addition & 1 deletion conf/mapping.json
@@ -1,7 +1,7 @@
{
"settings": {
"index": {
"number_of_shards": 4,
"number_of_shards": 2,
"number_of_replicas": 0,
"refresh_interval" : "1000ms"
},
5 changes: 5 additions & 0 deletions conf/service_conf.yaml
@@ -13,11 +13,16 @@ minio:
user: 'rag_flow'
password: 'infini_rag_flow'
host: 'minio:9000'
redis:
db: 1
password: 'infini_rag_flow'
host: 'redis:6379'
es:
hosts: 'http://es01:9200'
user_default_llm:
factory: 'Tongyi-Qianwen'
api_key: 'sk-xxxxxxxxxxxxx'
base_url: ''
oauth:
github:
client_id: xxxxxxxxxxxxxxxxxxxxxxxxx
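The new redis block keeps hostname and port in the single host field; rag/utils/redis_conn.py at the bottom of this diff splits them apart, roughly like this sketch (values copied from the block above):

    cfg = {"db": 1, "password": "infini_rag_flow", "host": "redis:6379"}
    host = cfg.get("host", "redis").split(":")[0]         # "redis"
    port = int(cfg.get("host", ":6379").split(":")[1])    # 6379

The same block is mirrored in docker/service_conf.yaml below.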
1 change: 0 additions & 1 deletion docker/docker-compose.yml
@@ -12,7 +12,6 @@ services:
image: infiniflow/ragflow:${RAGFLOW_VERSION}
container_name: ragflow-server
ports:
- ${SVR_HTTP_PORT}:9380
- ${SVR_HTTP_PORT}:9380
- 80:80
- 443:443
4 changes: 4 additions & 0 deletions docker/service_conf.yaml
@@ -13,6 +13,10 @@ minio:
user: 'rag_flow'
password: 'infini_rag_flow'
host: 'minio:9000'
redis:
db: 1
password: 'infini_rag_flow'
host: 'redis:6379'
es:
hosts: 'http://es01:9200'
user_default_llm:
6 changes: 4 additions & 2 deletions rag/app/naive.py
@@ -66,6 +66,8 @@ def __call__(self, filename, binary=None, from_page=0, to_page=100000):
class Pdf(PdfParser):
def __call__(self, filename, binary=None, from_page=0,
to_page=100000, zoomin=3, callback=None):
from timeit import default_timer as timer
start = timer()
callback(msg="OCR is running...")
self.__images__(
filename if not binary else binary,
@@ -75,8 +77,8 @@ def __call__(self, filename, binary=None, from_page=0,
callback
)
callback(msg="OCR finished")
cron_logger.info("OCR: {}".format(timer() - start))

from timeit import default_timer as timer
start = timer()
self._layouts_rec(zoomin)
callback(0.63, "Layout analysis finished.")
@@ -90,7 +92,7 @@ def __call__(self, filename, binary=None, from_page=0,
self._concat_downward()
#self._filter_forpages()

cron_logger.info("paddle layouts:".format(
cron_logger.info("paddle layouts: {}".format(
(timer() - start) / (self.total_page + 0.1)))
return [(b["text"], self._line_tag(b, zoomin))
for b in self.boxes], tbls
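Besides moving the timer start above the OCR call so the OCR phase is actually measured, this file fixes a silent logging bug: the old "paddle layouts:".format(...) string had no {} placeholder, and str.format simply discards surplus arguments, so the elapsed time was never printed:

    print("paddle layouts:".format(1.23))     # "paddle layouts:"  -- value swallowed
    print("paddle layouts: {}".format(1.23))  # "paddle layouts: 1.23"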
6 changes: 6 additions & 0 deletions rag/settings.py
@@ -25,6 +25,11 @@

ES = get_base_config("es", {})
MINIO = decrypt_database_config(name="minio")
try:
REDIS = decrypt_database_config(name="redis")
except Exception as e:
REDIS = {}
pass
DOC_MAXIMUM_SIZE = 128 * 1024 * 1024

# Logger
@@ -39,5 +44,6 @@
es_logger = getLogger("es")
minio_logger = getLogger("minio")
cron_logger = getLogger("cron_logger")
cron_logger.setLevel(20)
chunk_logger = getLogger("chunk_logger")
database_logger = getLogger("database")
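Wrapping decrypt_database_config in try/except makes Redis strictly optional: if the redis section is missing or fails to decrypt, REDIS falls back to an empty dict and the RedisDB wrapper (end of this diff) just logs warnings when used instead of crashing at import time. The setLevel(20) call is the numeric spelling of logging.INFO:

    import logging
    assert logging.INFO == 20   # cron_logger.setLevel(20) == cron_logger.setLevel(logging.INFO)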
14 changes: 10 additions & 4 deletions rag/svr/task_broker.py
@@ -32,6 +32,7 @@
from api.settings import database_logger
from api.utils import get_format_time, get_uuid
from api.utils.file_utils import get_project_base_directory
from rag.utils.redis_conn import REDIS_CONN


def collect(tm):
@@ -84,10 +85,16 @@ def new_task():

tsks = []
try:
file_bin = MINIO.get(r["kb_id"], r["location"])
if REDIS_CONN.is_alive():
try:
REDIS_CONN.set("{}/{}".format(r["kb_id"], r["location"]), file_bin, 12*60)
except Exception as e:
cron_logger.warning("Put into redis[EXCEPTION]:" + str(e))

if r["type"] == FileType.PDF.value:
do_layout = r["parser_config"].get("layout_recognize", True)
pages = PdfParser.total_page_number(
r["name"], MINIO.get(r["kb_id"], r["location"]))
pages = PdfParser.total_page_number(r["name"], file_bin)
page_size = r["parser_config"].get("task_page_size", 12)
if r["parser_id"] == "paper":
page_size = r["parser_config"].get("task_page_size", 22)
@@ -110,8 +117,7 @@

elif r["parser_id"] == "table":
rn = HuExcelParser.row_number(
r["name"], MINIO.get(
r["kb_id"], r["location"]))
r["name"], file_bin)
for i in range(0, rn, 3000):
task = new_task()
task["from_page"] = i
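This is the write side of the new cache: the broker fetches each document from MinIO once, binds it to file_bin, and, when Redis is reachable, stores it under a "kb_id/location" key so the executor can skip its own MinIO round trip. One caveat worth flagging: RedisDB.set (end of this diff) passes its third argument straight through as redis-py's ex parameter, which is measured in seconds, so 12*60 is a 12-minute TTL rather than 12 hours. A sketch of the pattern under those assumptions, with MINIO, REDIS_CONN, and cron_logger standing in for the project's singletons:

    def fetch_and_cache(kb_id: str, location: str) -> bytes:
        file_bin = MINIO.get(kb_id, location)             # single MinIO read
        if REDIS_CONN.is_alive():
            try:
                REDIS_CONN.set("{}/{}".format(kb_id, location), file_bin, 12 * 60)
            except Exception as e:
                cron_logger.warning("Put into redis[EXCEPTION]:" + str(e))
        return file_bin                                   # callers reuse the bytes below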
21 changes: 14 additions & 7 deletions rag/svr/task_executor.py
@@ -19,13 +19,12 @@
import os
import hashlib
import copy
import random
import re
import sys
import time
import traceback
from functools import partial

from rag.utils import MINIO
from api.db.db_models import close_connection
from rag.settings import database_logger
from rag.settings import cron_logger, DOC_MAXIMUM_SIZE
@@ -35,7 +34,7 @@
from multiprocessing.context import TimeoutError
from api.db.services.task_service import TaskService
from rag.utils import ELASTICSEARCH
from rag.utils import MINIO
from timeit import default_timer as timer
from rag.utils import rmSpace, findMaxTm

from rag.nlp import search
@@ -48,6 +47,7 @@
from api.db.services.document_service import DocumentService
from api.db.services.llm_service import LLMBundle
from api.utils.file_utils import get_project_base_directory
from rag.utils.redis_conn import REDIS_CONN

BATCH_SIZE = 64

@@ -105,11 +105,16 @@ def collect(comm, mod, tm):

def get_minio_binary(bucket, name):
global MINIO
if REDIS_CONN.is_alive():
try:
r = REDIS_CONN.get("{}/{}".format(bucket, name))
if r: return r
except Exception as e:
cron_logger.warning("Get redis[EXCEPTION]:" + str(e))
return MINIO.get(bucket, name)


def build(row):
from timeit import default_timer as timer
if row["size"] > DOC_MAXIMUM_SIZE:
set_progress(row["id"], prog=-1, msg="File size exceeds( <= %dMb )" %
(int(DOC_MAXIMUM_SIZE / 1024 / 1024)))
@@ -265,16 +270,18 @@ def main(comm, mod):
callback(
msg="Finished slicing files(%d). Start to embedding the content." %
len(cks))
st = timer()
try:
tk_count = embedding(cks, embd_mdl, r["parser_config"], callback)
except Exception as e:
callback(-1, "Embedding error:{}".format(str(e)))
cron_logger.error(str(e))
tk_count = 0

callback(msg="Finished embedding! Start to build index!")
callback(msg="Finished embedding({})! Start to build index!".format(timer()-st))
init_kb(r)
chunk_count = len(set([c["_id"] for c in cks]))
st = timer()
es_r = ELASTICSEARCH.bulk(cks, search.index_name(r["tenant_id"]))
if es_r:
callback(-1, "Index failure!")
@@ -290,8 +297,8 @@
DocumentService.increment_chunk_num(
r["doc_id"], r["kb_id"], tk_count, chunk_count, 0)
cron_logger.info(
"Chunk doc({}), token({}), chunks({})".format(
r["id"], tk_count, len(cks)))
"Chunk doc({}), token({}), chunks({}), elapsed:{}".format(
r["id"], tk_count, len(cks), timer()-st))

tmf.write(str(r["update_time"]) + "\n")
tmf.close()
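get_minio_binary is the matching read side: try the Redis key first, fall back to MinIO on a miss or a Redis failure (cache-aside). The remaining hunks thread timers through embedding and indexing so the cache's effect is visible in the logs. The end-to-end flow, as a comment sketch:

    # broker:   file_bin = MINIO.get(kb, loc); REDIS_CONN.set(f"{kb}/{loc}", file_bin, 12 * 60)
    # executor: get_minio_binary(kb, loc) -> Redis hit  -> bytes, no MinIO call
    #                                     -> miss/down  -> MINIO.get(kb, loc)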
55 changes: 55 additions & 0 deletions rag/utils/redis_conn.py
@@ -0,0 +1,55 @@
import json

import redis
import logging
from rag import settings
from rag.utils import singleton

@singleton
class RedisDB:
def __init__(self):
self.REDIS = None
self.config = settings.REDIS
self.__open__()

def __open__(self):
try:
self.REDIS = redis.Redis(host=self.config.get("host", "redis").split(":")[0],
port=int(self.config.get("host", ":6379").split(":")[1]),
db=int(self.config.get("db", 1)),
password=self.config.get("password"))
except Exception as e:
logging.warning("Redis can't be connected.")
return self.REDIS

def is_alive(self):
return self.REDIS is not None

def get(self, k):
if not self.REDIS: return
try:
return self.REDIS.get(k)
except Exception as e:
logging.warning("[EXCEPTION]get" + str(k) + "||" + str(e))
self.__open__()

def set_obj(self, k, obj, exp=3600):
try:
self.REDIS.set(k, json.dumps(obj, ensure_ascii=False), exp)
return True
except Exception as e:
logging.warning("[EXCEPTION]set_obj" + str(k) + "||" + str(e))
self.__open__()
return False

def set(self, k, v, exp=3600):
try:
self.REDIS.set(k, v, exp)
return True
except Exception as e:
logging.warning("[EXCEPTION]set" + str(k) + "||" + str(e))
self.__open__()
return False


REDIS_CONN = RedisDB()
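A few notes on RedisDB: the @singleton decorator means every importer shares one client; redis.Redis() connects lazily, so __open__ rarely raises and is_alive() only confirms a client object exists, not that the server is reachable; and get/set/set_obj each swallow their exception, log a warning, and call __open__() to rebuild the client for the next attempt. Hypothetical usage (key and value are illustrative):

    from rag.utils.redis_conn import REDIS_CONN

    if REDIS_CONN.is_alive():
        REDIS_CONN.set("kb123/doc.pdf", b"%PDF...", 12 * 60)   # third arg -> redis-py ex, in seconds
        blob = REDIS_CONN.get("kb123/doc.pdf")                 # None on miss or failure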