-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
一个服务化的可多GPU并行处理的方案(基于LitServe) #667
Comments
你好,我在运行代码时,服务器端一直报Exception: Parsing error: 'Layoutlmv3_Predictor' object has no attribute 'parameters',客户端一直报requests.exceptions.HTTPError: 500 Server Error: Internal Server Error for url: http://127.0.0.1:8000/predict |
看样子是你的处理代码有问题,不是服务的问题 |
使用这个代码后,表格识别变得巨慢,是什么原因呢? |
你不使用服务化的方式,用magic-pdf cli的方式慢吗? |
这样的话速度是正常的,表格识别用的TableMaster |
代码实际上没看懂咋用,就习惯性地先开server.py,把client.py里面的文件路径改成自己的再启动。结果发现报错和small_ocr.pdf有关,明明我要处理的文件都没有small_ocr.pdf了,不知道如何解决。 |
应该是你的代码改错了吧,我这边正常运行,改了文件路径怎么可能还有small_ocr.pdf,这只是个example file @PoisonousBromineChan |
请问您这边跑的时候表格识别速度正常吗? |
表格我还没验证过,有时间我试试看 |
问题描述:
系统&环境:
magic-pdf.json配置
实验pdf链接: https://github.com/opendatalab/MinerU/blob/master/demo/demo1.pdf 使用litserve 输出日志为:
使用命令行
输出日志为:
|
确实是这个原因,里面写死了匹配的规则,我们修一下这里 use_gpu = True if device.startswith("cuda") else False |
问题描述: 参考server.py提供接口,15并发4gpu压测,发现gpu[0]总是爆满,其他gpu都是相对空闲。 期望结果: gpu的压力均分 实验过程执行:
输出日志:
|
@234687552 你这边是打开了表格识别了吗,如果打开了可以试试关闭表格识别,再测一下负载均衡,这样可以定位是不是表格识别的问题。 |
简单的方法是在调用client里面的do_parse函数时传入这些参数就可以了,不需要修改server的代码 |
情况描述: 之前是开启了表格识别:"is_table_recog_enable": true, 关闭后测试:gpu[0] 不会一直持续爆满,其他gpu相对均衡运转 关闭表格识别 cat ~/magic-pdf.json
gpu使用情况 nvidia-smi --loop=1
|
这边实际情况是必须开启表格识别的,现在不知道如何处理让表格识别也均衡单机使用多gpu |
看来我的猜测是对的,还是因为表格识别的bug引起的,可能还是在代码的某个地方,表格模型还是以.cuda的方式load的,还是没有正确识别到cuda:1这种。导致所有的表格模型都load到了gpu 0上,因而gpu 0爆满。 |
对于TableMaster表格识别模型,以下是存在bug的地方: 对于struct_eqtable表格模型,以下是存在bug的地方: |
paddle框架指定gpu的方式和torch框架不一致,目前paddle都是使用第一张卡去加速的,目前我们的开发重心还在提高解析质量上,暂时分不出人力优化多卡分配的逻辑,欢迎有能力解决多卡分配问题的开发者提交pr |
server.py import os
import torch
import filetype
import json, uuid
import litserve as ls
from fastapi import HTTPException
from magic_pdf.tools.common import do_parse
from magic_pdf.model.doc_analyze_by_custom_model import ModelSingleton
class MinerUAPI(ls.LitAPI):
def __init__(self, output_dir='/tmp'):
self.output_dir = output_dir
@staticmethod
def clean_memory(device):
import gc
if torch.cuda.is_available():
with torch.cuda.device(device):
torch.cuda.empty_cache()
torch.cuda.ipc_collect()
gc.collect()
def setup(self, device):
device = torch.device(device)
os.environ['CUDA_VISIBLE_DEVICES'] = str(device.index)
model_manager = ModelSingleton()
model_manager.get_model(True, False)
model_manager.get_model(False, False)
print(f'Model initialization complete!')
def decode_request(self, request):
file = request['file'].file.read()
kwargs = json.loads(request['kwargs'])
assert filetype.guess_mime(file) == 'application/pdf'
return file, kwargs
def predict(self, inputs):
try:
pdf_name = str(uuid.uuid4())
do_parse(self.output_dir, pdf_name, inputs[0], [], **inputs[1])
return pdf_name
except Exception as e:
raise HTTPException(status_code=500, detail=f'{e}')
finally:
self.clean_memory(self.device)
def encode_response(self, response):
return {'output_dir': response}
if __name__ == '__main__':
server = ls.LitServer(MinerUAPI(), accelerator='gpu', devices=[0, 1], timeout=False)
server.run(port=8000) magic-pdf.json {
"bucket_info":{
"bucket-name-1":["ak", "sk", "endpoint"],
"bucket-name-2":["ak", "sk", "endpoint"]
},
"models-dir":"/opt/models",
"device-mode":"cuda",
"table-config": {
"model": "TableMaster",
"is_table_recog_enable": true,
"max_time": 400
}
} 试试把server.py改成我提供的新的代码,打开表格识别,再跑一次压测看看,应该是可以了 @234687552 |
情况描述 gpu是均衡分配占用【详看后面的日志和截图】,但是clean_memory有异常堆栈 参考改动如下:
异常堆栈:
gpu使用情况 nvidia-smi --loop=1
|
感谢,看来有进展!试试把with torch.cuda.device(device):这句话删掉@234687552 |
感谢支持,现在是可以多gpu正常运作了。 |
我也有类似的问题,当我提交了多卡的工作之后,server只在第一个卡上跑,所有的multiprocessor都在这张卡上跑 |
The check |
@randydl 请问我修改了一下这个代码,这样可以支持多张gpu一起运行吗 class MinerUAPI(ls.LitAPI):
if name == 'main': ` |
你可以试试这个 |
@xcvil 我尝试了一下还是只能一个gpu上运行,不管指定哪几个,最后只在指定的第一个gpu上跑,指定1,3,只跑1,指定0,1,只跑0 |
@234687552 你好,请问我尝试了你发的代码,为什么我跑的时候还是只在一个gpu上运行的 |
看看你代码呗 如果slurm/lsf, shell也发一下 |
@xcvil class MinerUAPI(ls.LitAPI):
if name == 'main': def to_b64(file_path): def do_parse(file_path, url='http://127.0.0.1:8000/predict', **kwargs):
def process_pdf_files_concurrently(pdf_files): def process_files_in_batches(directory, batch_size=20): if name == 'main': |
@zxwsd 你试了这个嘛 |
@xcvil 我更换了一个版本,把MinerU从10.5降到0.9.0,然后就可以正常多卡并行了 |
@randydl 大佬能帮忙看下吗,怎么才能支持multipart/form-data 这种类型的数据呢,想部署一个api服务,直接用的serve.py,但是发现怎么也接收不到数据, 这是请求的格式,不知道decode_request 这个方法怎么才能接收到这两个参数 |
请参考https://github.com/opendatalab/MinerU/blob/master/projects/multi_gpu/client.py |
请问,当pdf文件过大时,这个有防止显存爆掉的措施吗 |
按示例代码运行在windows双卡3090,两张卡都产生了显存占用。 magic_pdf==1.1.0 |
直接用遇到了这个问题: TypeError: cannot pickle '_io.BufferedRandom' object |
我和litserve的开发者聊了一下。 他说 使用 starlette == 0.45.3 才能避免这个问题。 参考 :Lightning-AI/LitServe#443 |
请问这个问题最后有定位得到解决吗?? |
解决了,用这里的代码:https://github.com/opendatalab/MinerU/tree/master/projects/multi_gpu |
可以问下 主要是那部分代码解决这个问题吗? |
"No module named 'magic_pdf.libs.MakeContentConfig'" 为什么我会找不到这个包 |
这是旧版本了。新版本你自己查这个变量在哪里。
…---- 回复的原邮件 ----
| 发件人 | ***@***.***> |
| 日期 | 2025年03月10日 11:44 |
| 收件人 | ***@***.***> |
| 抄送至 | ***@***.***>***@***.***> |
| 主题 | Re: [opendatalab/MinerU] 一个服务化的可多GPU并行处理的方案(基于LitServe) (Issue #667) |
感谢,跑通了。额外安装库 pip install python-multipart,然后启动服务器程序就请求成功了。 另外如果希望仅仅输出.md文件来节省存储空间和速度的话可以: from magic_pdf.libs.MakeContentConfig import MakeMode # 添加这行
修改do parse 函数:
do_parse(self.output_dir,
pdf_name, inputs[0],
[],
**inputs[1],
f_draw_span_bbox=False,
f_draw_layout_bbox=False,
f_dump_md=True,
f_dump_middle_json=False,
f_dump_model_json=False,
f_dump_orig_pdf=False,
f_dump_content_list=False,
f_make_md_mode=MakeMode.MM_MD,
f_draw_model_bbox=False)
"No module named 'magic_pdf.libs.MakeContentConfig'" 为什么我会找不到这个包
—
Reply to this email directly, view it on GitHub, or unsubscribe.
You are receiving this because you commented.Message ID: ***@***.***>
a694724555 left a comment (opendatalab/MinerU#667)
感谢,跑通了。额外安装库 pip install python-multipart,然后启动服务器程序就请求成功了。 另外如果希望仅仅输出.md文件来节省存储空间和速度的话可以: from magic_pdf.libs.MakeContentConfig import MakeMode # 添加这行
修改do parse 函数:
do_parse(self.output_dir,
pdf_name, inputs[0],
[],
**inputs[1],
f_draw_span_bbox=False,
f_draw_layout_bbox=False,
f_dump_md=True,
f_dump_middle_json=False,
f_dump_model_json=False,
f_dump_orig_pdf=False,
f_dump_content_list=False,
f_make_md_mode=MakeMode.MM_MD,
f_draw_model_bbox=False)
"No module named 'magic_pdf.libs.MakeContentConfig'" 为什么我会找不到这个包
—
Reply to this email directly, view it on GitHub, or unsubscribe.
You are receiving this because you commented.Message ID: ***@***.***>
|
同样的问题 如何解决呢 |
/谢谢 找到了 |
是用的projects里面的代码么 |
#1895 (comment) |
看了你写的代码,感觉不太对。我给你讲一讲实现多gpu并行处理的核心思路,就是启动多个进程,每个进程分配不同的gpu(用set cuda_visiable_devices来设定,且只能分配一个gpu),每个进程都会执行mineru的处理,进程之间是相互隔离的。 |
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""=================================================
@PROJECT_NAME: database-services
@File : multi_gpu_app.py.py
@Author : JarsonCai
@Date : 2025/3/29 23:10
@Function:
MinerU 多GPU并行推理服务
@Modify History:
@Copyright:Copyright(c) 2025-2027. All Rights Reserved
=================================================="""
import os
import json
import gc
import tempfile
import base64
import shutil
import fitz
import filetype
from typing import Tuple
import torch
import litserve as ls
from pathlib import Path
from fastapi import HTTPException
from glob import glob
from base64 import b64encode
from io import StringIO
from loguru import logger
import magic_pdf.model as model_config
from magic_pdf.config.enums import SupportedPdfParseMethod
from magic_pdf.data.data_reader_writer import DataWriter, FileBasedDataWriter
from magic_pdf.data.dataset import PymuDocDataset
from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
from magic_pdf.tools.cli import convert_file_to_pdf
model_config.__use_inside_model__ = True
class MemoryDataWriter(DataWriter):
def __init__(self):
self.buffer = StringIO()
def write(self, path: str, data: bytes) -> None:
if isinstance(data, str):
self.buffer.write(data)
else:
self.buffer.write(data.decode("utf-8"))
def write_string(self, path: str, data: str) -> None:
self.buffer.write(data)
def get_value(self) -> str:
return self.buffer.getvalue()
def close(self):
self.buffer.close()
class MinerUAPI(ls.LitAPI):
def __init__(self, output_dir='/tmp'):
self.output_dir = Path(output_dir)
def setup(self, device):
"""初始化模型,支持多GPU"""
if device.startswith('cuda'):
gpu_id = device.split(':')[-1]
os.environ['CUDA_VISIBLE_DEVICES'] = gpu_id
# 初始化模型相关组件
from magic_pdf.model.doc_analyze_by_custom_model import ModelSingleton
model_manager = ModelSingleton()
model_manager.get_model(True, False)
model_manager.get_model(False, False)
logger.info(f'Model initialization complete on {device}!')
def decode_request(self, request):
"""解析请求参数,保持与原接口一致"""
# 正确解析FormData对象
file = request.get("file", "")
params = request.get('kwargs', {})
params_str = json.dumps(params, ensure_ascii=False, indent=4)
logger.info(f"params:\n {params_str}")
return file, {
"file_name": params.get("file_name", ""),
"parse_method": params.get("parse_method", "ocr"),
"start_page_id": int(params.get("start_page_id", 0)),
"end_page_id": int(params.get("end_page_id")) if params.get("end_page_id") else None,
"is_json_md_dump": params["is_json_md_dump"].lower() == "false",
"output_dir": params.get("output_dir", "output"),
"return_layout": params["return_layout"].lower() == "true",
"return_info": params["return_info"].lower() == "true",
"return_content_list": params["return_content_list"].lower() == "true",
"return_images": params["return_images"].lower() == "true"
}
def predict(self, inputs):
"""核心处理逻辑,保持与原逻辑一致"""
try:
file, params = inputs[0], inputs[1]
# 处理文件
file_name = params['file_name']
pdf_bytes, pdf_name = self.cvt2pdf(file, file_name)
is_json_md_dump = params["is_json_md_dump"]
return_layout = params["return_layout"]
return_info = params["return_info"]
return_content_list = params["return_content_list"]
return_images = params["return_images"]
# 处理PDF
output_path = f"{params['output_dir']}/{pdf_name}"
output_image_path = f"{output_path}/images"
image_writer = FileBasedDataWriter(output_image_path)
# 同步处理
infer_result, pipe_result = self.process_pdf(
pdf_bytes,
params["parse_method"],
image_writer,
params["start_page_id"],
params["end_page_id"]
)
# 生成响应数据
return self._build_response(
infer_result, pipe_result, output_path, output_image_path,
pdf_name, is_json_md_dump, return_layout, return_info,
return_content_list, return_images
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
finally:
self.clean_memory()
def encode_response(self, output):
"""保持与原接口一致的响应格式"""
return output
def process_pdf(self, pdf_bytes, parse_method, image_writer, start_page_id, end_page_id):
"""保持与原函数一致的处理逻辑"""
ds = PymuDocDataset(pdf_bytes)
if parse_method == "ocr":
infer_result = ds.apply(doc_analyze, ocr=True, start_page_id=start_page_id, end_page_id=end_page_id)
pipe_result = infer_result.pipe_ocr_mode(image_writer)
elif parse_method == "txt":
infer_result = ds.apply(doc_analyze, ocr=False, start_page_id=start_page_id, end_page_id=end_page_id)
pipe_result = infer_result.pipe_txt_mode(image_writer)
else:
if ds.classify() == SupportedPdfParseMethod.OCR:
infer_result = ds.apply(doc_analyze, ocr=True, start_page_id=start_page_id, end_page_id=end_page_id)
pipe_result = infer_result.pipe_ocr_mode(image_writer)
else:
infer_result = ds.apply(doc_analyze, ocr=False, start_page_id=start_page_id, end_page_id=end_page_id)
pipe_result = infer_result.pipe_txt_mode(image_writer)
return infer_result, pipe_result
def cvt2pdf(self, file_base64, filename: str) -> Tuple[bytes, str]:
try:
if not filename:
raise Exception('No file name provided')
pdf_name = filename.split('.')[0] + '.pdf'
temp_dir = Path(tempfile.mkdtemp())
temp_file = temp_dir.joinpath('tmpfile')
file_bytes = base64.b64decode(file_base64)
file_ext = filetype.guess_extension(file_bytes)
if file_ext in ['pdf', 'jpg', 'png', 'doc', 'docx', 'ppt', 'pptx']:
if file_ext == 'pdf':
return file_bytes, pdf_name
elif file_ext in ['jpg', 'png']:
with fitz.open(stream=file_bytes, filetype=file_ext) as f:
return f.convert_to_pdf(), pdf_name
else:
temp_file.write_bytes(file_bytes)
convert_file_to_pdf(temp_file, temp_dir)
return temp_file.with_suffix('.pdf').read_bytes(), pdf_name
else:
raise Exception('Unsupported file format')
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
def _build_response(self, infer_result, pipe_result, output_path, output_image_path,
pdf_name, is_json_md_dump, return_layout, return_info,
return_content_list, return_images):
"""构建响应数据结构"""
# 内存写入器初始化
content_list_writer = MemoryDataWriter()
md_content_writer = MemoryDataWriter()
middle_json_writer = MemoryDataWriter()
# 数据写入
pipe_result.dump_content_list(content_list_writer, "", "images")
pipe_result.dump_md(md_content_writer, "", "images")
pipe_result.dump_middle_json(middle_json_writer, "")
# 构建响应
data = {
"md_content": md_content_writer.get_value(),
"layout": json.loads(content_list_writer.get_value()) if return_layout else None,
"info": json.loads(middle_json_writer.get_value()) if return_info else None,
"content_list": json.loads(content_list_writer.get_value()) if return_content_list else None,
"images": self._get_images(output_image_path) if return_images else None
}
# 持久化存储
if is_json_md_dump:
self._save_results(output_path, pdf_name, content_list_writer,
md_content_writer, middle_json_writer, infer_result)
# 清理资源
content_list_writer.close()
md_content_writer.close()
middle_json_writer.close()
return data
def _get_images(self, image_path):
"""编码图片数据"""
return {
os.path.basename(p): f"data:image/jpeg;base64,{b64encode(open(p, 'rb').read()).decode()}"
for p in glob(f"{image_path}/*.jpg")
}
def _save_results(self, output_path, pdf_name, *writers):
"""保存结果文件"""
writer = FileBasedDataWriter(output_path)
writer.write_string(f"{pdf_name}_content_list.json", writers[0].get_value())
writer.write_string(f"{pdf_name}.md", writers[1].get_value())
writer.write_string(f"{pdf_name}_middle.json", writers[2].get_value())
writer.write_string(f"{pdf_name}_model.json", json.dumps(writers[3].get_infer_res(), indent=4))
def clean_memory(self):
"""内存清理"""
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.ipc_collect()
gc.collect()
if __name__ == '__main__':
# 根据容器的环境参数来设置每个GPU的worker数量
# 获取环境变量
workers_per_device = int(os.environ.get('WORKERS_PER_DEVICE', '1'))
server_timeout = int(os.environ.get('SERVER_TIMEOUT', False)) # 从请求进入开始计时,超时则返回504
server = ls.LitServer(
MinerUAPI(),
accelerator='cuda',
devices='auto',
workers_per_device=workers_per_device, # 每个GPU一个worker
timeout=server_timeout
)
server.run(port=8000) 大佬你好,这是我按照你的思路封装的API,为什么和他遇到的问题是一样的,我设置了10个worker,全部都启动到了cuda:0上 |
这个代码的 这一部分:with patch('magic_pdf.model.doc_analyze_by_custom_model.get_device') as mock_obj: |
支持传入jpg、png、pdf路径。批量处理的话大家只需要简单的多线程调用客户端的do_parse函数就可以了,服务端会自动在多个GPU上并行处理。
server.py
client.py
The text was updated successfully, but these errors were encountered: