Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

异步合batch对lod的支持。 #1366

Merged
merged 17 commits into from
Sep 2, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 30 additions & 16 deletions core/configure/proto/server_configure.proto
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,8 @@ message EngineDesc {
required string reloadable_type = 4;
required string model_dir = 5;
repeated int32 gpu_ids = 6;
required int32 runtime_thread_num = 7;
required int32 batch_infer_size = 8;
required int32 enable_batch_align = 9;
optional string version_file = 10;
optional string version_type = 11;
optional string version_file = 7;
optional string version_type = 8;

/*
* Sparse Parameter Service type. Valid types are:
Expand All @@ -39,17 +36,34 @@ message EngineDesc {
LOCAL = 1;
REMOTE = 2;
}
optional SparseParamServiceType sparse_param_service_type = 12;
optional string sparse_param_service_table_name = 13;
optional bool enable_memory_optimization = 14;
optional bool enable_ir_optimization = 15;
optional bool use_trt = 16;
optional bool use_lite = 17;
optional bool use_xpu = 18;
optional bool use_gpu = 19;
optional bool combined_model = 20;
optional bool encrypted_model = 21;
optional bool gpu_multi_stream = 22;
optional SparseParamServiceType sparse_param_service_type = 10;
optional string sparse_param_service_table_name = 11;
optional bool enable_memory_optimization = 12;
optional bool enable_ir_optimization = 13;
optional bool use_trt = 14;
optional bool use_lite = 15;
optional bool use_xpu = 16;
optional bool use_gpu = 17;
optional bool combined_model = 18;
optional bool encrypted_model = 19;
optional bool gpu_multi_stream = 20;

/*
* "runtime_thread_num": n == 0 means don`t use Asynchronous task scheduling
* mode.
* n > 0 means how many Predictor for this engine in Asynchronous task
* scheduling mode.
* "batch_infer_size": the max batch for this engine in Asynchronous task
* scheduling mode.
* "enable_overrun": always put a whole task into the TaskQueue even if the
* total batch is bigger than "batch_infer_size".
* "allow_split_request": allow to split task(which is corresponding to
* request).
*/
optional int32 runtime_thread_num = 30 [ default = 0 ];
optional int32 batch_infer_size = 31 [ default = 32 ];
optional bool enable_overrun = 32 [ default = false ];
optional bool allow_split_request = 33 [ default = true ];
};

// model_toolkit conf
Expand Down
177 changes: 162 additions & 15 deletions core/predictor/framework/bsf-inl.h
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,90 @@
#include "core/predictor/common/inner_common.h"
#include "core/predictor/framework/memory.h"

// this file is included by bsf.h
namespace im {
namespace bsf {

template <typename InItemT, typename OutItemT>
bool Task<InItemT, OutItemT>::task_fetch_init(BatchTasks<TaskT>& batchTask) {
// 双检锁,减少加锁的粒度
if (!fetch_init) {
if (taskmeta_num > 1) {
// 对于task被拆分为多个taskmeta,需要加锁。
AutoMutex lock(task_mut);
task_fetch_create(batchTask);
} else {
// 对于task只有1个taskmeta,不需要加锁。
task_fetch_create(batchTask);
}
}
return true;
}

template <typename InItemT, typename OutItemT>
bool Task<InItemT, OutItemT>::task_fetch_create(BatchTasks<TaskT>& batchTask) {
if (!fetch_init) {
vector_fetch_lod_index = batchTask.vector_fetch_lod_index;
set_fetch_nobatch_index = batchTask.set_fetch_nobatch_index;
OutVectorT taskMetaOutLodTensor;
size_t fetchvar_num = batchTask._batch_out.size();
for (size_t fetchvar_index = 0; fetchvar_index < fetchvar_num;
++fetchvar_index) {
size_t fetchvar_bytesize_index =
batchTask.fetchvar_bytesize(fetchvar_index);
size_t fetchvar_batch = 0;
// 1. nobatch fetchvar情况
if (set_fetch_nobatch_index.size() > 0 &&
set_fetch_nobatch_index.find(fetchvar_index) !=
set_fetch_nobatch_index.end()) {
fetchvar_batch = 1;
} else if (vector_fetch_lod_index.size() > 0 &&
std::find(vector_fetch_lod_index.begin(),
vector_fetch_lod_index.end(),
fetchvar_index) != vector_fetch_lod_index.end()) {
// lod fetchvar情况,此时无法确定总的shape[0]
// 根据task中的task_num总数开辟task_num个临时空间
// 每个lod型的fetchvar拷贝到对应的临时空间中
// 最后再计算临时空间的总量,合并fetchvar和lod
fetchvar_batch = 0;

} else {
// 普通fetchvar情况,此时该Task总的fetchvar_batch =
// 输入的总的batch_size()
fetchvar_batch = batch_size();
}
paddle::PaddleTensor tensor_out;
tensor_out.name = batchTask._batch_out[fetchvar_index].name;
tensor_out.dtype =
paddle::PaddleDType(batchTask._batch_out[fetchvar_index].dtype);
tensor_out.shape = batchTask._batch_out[fetchvar_index].shape;
tensor_out.shape[0] = fetchvar_batch;
if (fetchvar_batch != 0) {
// 此时 lod 为空。
tensor_out.lod = batchTask._batch_out[fetchvar_index].lod;
// resize all batch memory at one time
size_t databuf_size = fetchvar_batch * fetchvar_bytesize_index;
tensor_out.data.Resize(databuf_size);
} else {
// 当taskmeta_num = 1时,由于同时只有一个taskMeta操作task
// 不涉及线程安全问题,所以此时可以直接由taskMeta->task->resize->copy

// 当task被分为多个taskMeta时,需要临时对象记录
// 收齐后再一起合并
if (taskmeta_num > 1) {
taskMetaOutLodTensor.push_back(tensor_out);
}
}
outVectorT_ptr->push_back(tensor_out);
}
// outLodTensorVector实际是一个双层vector
// shape为taskmeta_num * vector_fetch_lod_index.size();
outLodTensorVector.resize(taskmeta_num, taskMetaOutLodTensor);
fetch_init = true;
}
return true;
}

template <typename TaskT>
void* TaskExecutor<TaskT>::thread_entry(void* args) {
ThreadContext<TaskT>* context = static_cast<ThreadContext<TaskT>*>(args);
Expand Down Expand Up @@ -136,7 +217,7 @@ TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(
}

/*
if (!BatchTasks<TaskT>::check_valid(in, out, _batch_align)) {
if (!BatchTasks<TaskT>::check_valid(in, out, _overrun)) {
LOG(ERROR) << "Invalid input & output";
return TaskHandler<TaskT>::valid_handle();
}
Expand All @@ -156,9 +237,11 @@ TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(

task->inVectorT_ptr = (const InVectorT*)inVectorT_ptr;
task->outVectorT_ptr = (OutVectorT*)outVectorT_ptr;
if (!task->task_init()) {
LOG(ERROR) << "task->init() failed";
}
task->rem = task->batch_size();
task->index.store(0, butil::memory_order_relaxed);

AutoMutex lock(_mut);
_task_queue.push_back(task);
THREAD_COND_SIGNAL(&_cond);
Expand All @@ -168,11 +251,12 @@ TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(

// this function is accessed by multi thread.
// so AutoMutex at first.
// so batch.append_task is thread safe.
// so batchTask.append_task is thread safe.
// you dont need to add extra lock in append_task()
// task is already init.
template <typename TaskT>
bool TaskExecutor<TaskT>::move_task_to_batch(
BatchTasks<TaskT>& batch) { // NOLINT
BatchTasks<TaskT>& batchTask) { // NOLINT
AutoMutex lock(_mut);
while (_task_queue.empty()) {
THREAD_COND_WAIT(&_cond, &_mut);
Expand All @@ -183,15 +267,65 @@ bool TaskExecutor<TaskT>::move_task_to_batch(
return false;
}

TaskT* previous_task = nullptr;
while (!_task_queue.empty()) {
TaskT* task = _task_queue.front();
size_t rem = batch.append_task(task);

// 由于无法确定fetchVar是否为lod(即使输入是非lod,输出也可能是lod)
// 简单的处理方法是:task不能被拆分,即用户的请求可以合并一起预测,但不能拆分两个小部分去预测。
// 只需要设置engine的属性allow_split_request = false即可。

// 复杂的处理方法是允许拆分Task,无论是否包含lod.
// 难点:预测前,能够知道被拆成了几个taskmeta,但只有预测后,才知道有多少个fetchvar,多少个lod的fetchvar
// 所以,task中先要创建taskmeta_num* fetchvar
// num(lod类型的)个临时PaddleTensor(存储data及Lod)
// 由于多线程调度的单位是taskmeta,故只能在notify_task中,用taskmeta->task去创建
// 此时由于多个taskmeta对应一个task,存在多线程竞争,所以需要在task中加锁。
// 原子操作不可行,因为多个线程必须等待创建好上述的PaddleTensor后才能继续。
// 对于普通的fetch,也需要加锁去创建PaddleTensor,后续才能往里拷贝。

// _overrun表示,异步BatchTasks是否允许单次临时超过限制。
// _overrun为true时,即使BatchTasks剩下1-batch,也会全放入一个完整的Task,允许临时超限。
// _overrun为false时,不允许。
// 对于模型本身有最大Batch限制的情况,应将该值设为false,默认为false。
// 对于模型本身无最大Batch限制,但自己设置了BatchTasks的最大Batch,可以考虑设置为True。

// _allow_split_request ==
// true,则允许拆分task.BatchTasks剩下1-batch,则会从下一个Task中拆出1-Batch
// _allow_split_request ==
// false,则每个task不会被拆分。BatchTasks剩下1-batch会被浪费
// 默认为true,允许拆分task从而使得空间利用率最大。
if (!batchTask.get_allow_split_request()) {
if (task->batch_size() > batchTask.get_rem_size() &&
!batchTask.get_overrun()) {
break;
}
}

// combine_task_valid负责判断是否能够合并
// 除最外层的shape外,内层shape应一致才能合并。
// 否则跳出循环,放入下一个batchTask中。
// 以此保证batch.append_task(task)中的task的内层shape相同。

// 对于Shape[0] = 1 而!=batch的情况,因为合并时,取其中一个的值
// 所以要求该feedvar必须相等,才能合并。
// 否则跳出循环,放入下一个batchTask中。
// 目前没有PaddleTensor和PaddleBuff没有重载==,所以只能比较内存.
// TODO(HexToString): 可以考虑后期支持AutoPadding.
if (previous_task != nullptr) {
if (!task->combine_task_valid(previous_task)) {
break;
}
}
size_t rem = batchTask.append_task(task);
previous_task = task;
if (task->rem <= 0) {
_task_queue.pop_front();
}
if (rem <= 0) break;
}

LOG(INFO) << "Number of tasks remaining in _task_queue is"
<< _task_queue.size();
return true;
}

Expand All @@ -201,11 +335,12 @@ bool TaskExecutor<TaskT>::move_task_to_batch(
// TaskT is from the SingleTon TaskExecutor`s _task_queue
// although TaskMeta is a local variable, but several TaskMeta may points to
// the same TaskT which is get from the SingleTon TaskExecutor`s _task_queue.
// put TaskMeta to the local variable BatchTasks<TaskT> batch.
// put TaskMeta to the local variable BatchTasks<TaskT> batchTask.

// batch.merge_tasks() and batch.notify_tasks() has no lock.
// BatchTasks<TaskT> batch itself is a local variable, it`s thread safe.
// If batch.merge_tasks() and batch.notify_tasks() do something to TaskMeta
// batchTask.merge_tasks() and batchTask.notify_tasks() has no lock.
// BatchTasks<TaskT> batchTask itself is a local variable, it`s thread safe.
// If batchTask.merge_tasks() and batchTask.notify_tasks() do something to
// TaskMeta
// you need to pay attention to that.
// Multi-Thread deal with different TaskMeta(cause it`s created as local
// variable)
Expand Down Expand Up @@ -242,11 +377,23 @@ int TaskExecutor<TaskT>::work(ThreadContext<TaskT>* context) {
return -1;
}

BatchTasks<TaskT> batch(_batch_size, _batch_align);
if (move_task_to_batch(batch)) {
batch.merge_tasks();
_fn(&batch.in(), &batch.out());
batch.notify_tasks();
// move_task_to_batch() take the original task from the `_task_queue`
// put the original task into its own Vector<taskmeta>
// the capacity of its own Vector<taskmeta> is decided by `_batch_size` or
// `_overrun`

// merge_tasks() move the imput-data into `_batch_in` from its own
// Vector<taskmeta>.
// because the predictor`s input is the `_batch_in`

// notify_tasks() move the output-data into every single taskmeta from
// `_batch_out`.
// because the predictor`s output is the `_batch_out`
BatchTasks<TaskT> batchTask(_batch_size, _overrun, _allow_split_request);
if (move_task_to_batch(batchTask)) {
batchTask.merge_tasks();
_fn(&batchTask.in(), &batchTask.out());
batchTask.notify_tasks();
}
}

Expand Down
Loading