Skip to content

Commit 03646ac

Browse files
huajsjsunggg
authored and committed
[Runtime][PipelineExecutor] Add Pipeline Executor Interface (apache#10010)
Adding interfaces into Pipeline Executor to "run", "stop", "set input", and "get input" from the pipeline executor. In this patch, we also implemented the "BackendRuntime" structure to wrap the graph runtime interface in order to support the pipeline executor interface and to implement a data copy method. This method is used to transfer data between two backend runtimes.
1 parent eb344fd commit 03646ac

File tree

7 files changed

+271
-11
lines changed

7 files changed

+271
-11
lines changed

python/tvm/contrib/pipeline_executor.py

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,22 @@ def __init__(self, module):
115115
else:
116116
self.module = module
117117
# Get the packed functions from the pipeline executor.
118-
self._get_num_outputs = self.module["get_num_outputs"]
119-
self._get_input_pipeline_map = self.module["get_input_pipeline_map"]
120118
self._get_params_group_pipeline_map = self.module["get_params_group_pipeline_map"]
119+
self._run = self.module["run"]
120+
self._stop = self.module["stop"]
121121
self._set_param = self.module["set_param"]
122+
self._set_input = self.module["set_input"]
123+
self._get_input = self.module["get_input"]
124+
self._get_num_outputs = self.module["get_num_outputs"]
125+
self._get_input_pipeline_map = self.module["get_input_pipeline_map"]
126+
127+
def run(self, sync=False):
128+
"""Run the pipeline executor."""
129+
self._run(sync)
130+
131+
def stop(self):
132+
"""Stop the pipeline executor."""
133+
self._stop()
122134

123135
def get_input_pipeline_map(self, name):
124136
"""Using the "name" to get the corresponding subgraph index and also get the "input name"
@@ -145,6 +157,21 @@ def get_params_group_pipeline_map(self, name):
145157
"""
146158
return self._get_params_group_pipeline_map(name)
147159

160+
def set_input(self, key, value):
161+
"""Set the input via input name.
162+
163+
Parameters
164+
----------
165+
key : str
166+
The input name
167+
value : array_like.
168+
The input value
169+
"""
170+
v = self._get_input(key)
171+
if v is None:
172+
raise RuntimeError("Could not find '%s' in pipeline's inputs" % key)
173+
v.copyfrom(value)
174+
148175
def set_params(self, params_group_name, params_data):
149176
"""Set the parameter group value given the parameter group name. Note that the parameter
150177
group name is declared in the pipeline executor config.
@@ -163,6 +190,19 @@ def set_params(self, params_group_name, params_data):
163190
for key, val in params_data.items():
164191
self._set_param(params_group_name, key, val)
165192

193+
def get_input(self, key):
194+
"""Get the input via an input name.
195+
Parameters
196+
----------
197+
key : str
198+
The input key
199+
Returns
200+
-------
201+
data : NDArray
202+
The input data.
203+
"""
204+
return self._get_input(key)
205+
166206
@property
167207
def num_outputs(self):
168208
"""Get the number of outputs.

src/runtime/pipeline/pipeline_executor.cc

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,59 @@ PackedFunc PipelineExecutor::GetFunction(const std::string& name,
5858
LOG(FATAL) << "Function only support the parameter name and the key in the form of string";
5959
}
6060
});
61+
} else if (name == "set_input") {
62+
return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) {
63+
if (String::CanConvertFrom(args[0])) {
64+
this->SetInput(args[0].operator String(), args[1]);
65+
} else {
66+
LOG(FATAL) << "Function only support the input name value in the form of string";
67+
}
68+
});
69+
} else if (name == "get_input") {
70+
return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) {
71+
if (String::CanConvertFrom(args[0])) {
72+
*rv = this->GetInput(args[0].operator String());
73+
} else {
74+
LOG(FATAL) << "Function only support the input name value in the form of string";
75+
}
76+
});
77+
} else if (name == "run") {
78+
return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Run(args[0]); });
79+
} else if (name == "stop") {
80+
return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Stop(); });
6181
} else {
6282
LOG(FATAL) << "Unknown packed function: " << name;
6383
return PackedFunc();
6484
}
6585
return nullptr;
6686
}
6787

88+
/*!
89+
 * \brief Set input to the runtime module.
90+
* \param input_name The input name.
91+
* \param data_in The input data.
92+
*/
93+
void PipelineExecutor::SetInput(std::string input_name, DLTensor* data_in) {
94+
std::pair<int, int> indexs = this->GetInputIndex(input_name);
95+
if (indexs.first < 0 || indexs.first >= static_cast<int>(runtimes_.size())) {
96+
this->Stop();
97+
LOG(FATAL) << "input name " << input_name << " not found.";
98+
}
99+
runtimes_[indexs.first]->SetInput(indexs.second, data_in);
100+
}
101+
/*!
102+
 * \brief Get input from the runtime module.
103+
* \param input_name The input name.
104+
* \return Return the input data for a specific input name.
105+
*/
106+
NDArray PipelineExecutor::GetInput(std::string input_name) {
107+
std::pair<int, int> indexs = this->GetInputIndex(input_name);
108+
if (indexs.first < 0 || indexs.first >= static_cast<int>(runtimes_.size())) {
109+
this->Stop();
110+
LOG(FATAL) << "input name " << input_name << " not found.";
111+
}
112+
return runtimes_[indexs.first]->GetInput(indexs.second);
113+
}
68114
/*!
69115
* \brief Using the global input name to get the index, and also get the input interface name
70116
of corresponding subgraph from the input connection configuration.
@@ -85,6 +131,20 @@ int PipelineExecutor::GetParamsGroupPipelineMap(const std::string& name) {
85131
return param_connection_config[name];
86132
}
87133

134+
/*!
135+
* \brief Run the pipeline executor.
136+
 * \param serialized_mode Whether to run the pipeline executor in serialized mode.
137+
*/
138+
void PipelineExecutor::Run(bool serialized_mode) {
139+
// TODO(huajsj): Run the pipeline executor.
140+
}
141+
/*!
142+
* \brief Stop the pipeline executor.
143+
*/
144+
void PipelineExecutor::Stop() {
145+
// TODO(huajsj): Stop the pipeline executor.
146+
}
147+
88148
/*!
89149
* \brief Use the mod_config information to create a graph runtime list.
90150
* \param mod_config The config information that generates by the export library function call.
@@ -152,6 +212,16 @@ void PipelineExecutor::SetParam(std::string param_group_name, std::string param_
152212
int module_index = this->GetParamsGroupPipelineMap(param_group_name);
153213
// TODO(huajsj): set the parameters into runtime module.
154214
}
215+
/*!
216+
* \brief Return the input index and module index for a given input name.
217+
* \param name The input name.
218+
* \return std::pair<int, int> A pair of module index and the input index.
219+
*/
220+
std::pair<int, int> PipelineExecutor::GetInputIndex(const std::string& name) {
221+
std::pair<int, std::string> index = input_connection_config[name];
222+
auto gruntime = runtimes_[index.first];
223+
return std::make_pair(index.first, gruntime->GetInputIndex(index.second));
224+
}
155225
/*!
156226
* \brief Initialize the pipeline executor with a list of modules to be pipelined
157227
* and config in JSON format.
@@ -165,9 +235,10 @@ void PipelineExecutor::Init(const std::vector<Module>& modules, const std::strin
165235
dmlc::JSONReader reader(&is);
166236
this->LoadConfig(&reader);
167237
ICHECK(!pipeline_config_.Empty()) << "The pipeline config information is empty.";
238+
num_outputs_ = pipeline_config_.GetGlobalOutputNum();
168239
// Initialize the pipeline function class used for pipeline thread pool management
169-
// and schedule etc. This function returns the number of output.
170-
num_outputs_ = pipeline_scheduler_.PipelineInit(modules, pipeline_config_);
240+
// and schedule etc. This function returns a list of runtime.
241+
runtimes_ = pipeline_scheduler_.PipelineInit(modules, pipeline_config_);
171242
return;
172243
}
173244

src/runtime/pipeline/pipeline_executor.h

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
#include <array>
3131
#include <iostream>
32+
#include <memory>
3233
#include <sstream>
3334
#include <string>
3435
#include <utility>
@@ -82,6 +83,18 @@ class TVM_DLL PipelineExecutor : public ModuleNode {
8283
* \return Returning a runtime module index.
8384
*/
8485
int GetParamsGroupPipelineMap(const std::string& name);
86+
/*!
87+
* \brief Use the input name to set the input data of pipeline executor.
88+
* \param input_name The input name.
89+
* \param data_in The input data.
90+
*/
91+
void SetInput(std::string input_name, DLTensor* data_in);
92+
/*!
93+
* \brief Use the input name to get the input data.
94+
 * \param input_name The input name.
95+
* \return Return input data.
96+
*/
97+
NDArray GetInput(std::string input_name);
8598
/*!
8699
* \brief Use the parameters group name to get the specific backend runtime then use
87100
* the param_key_name to set param data for the said backend runtime.
@@ -96,6 +109,22 @@ class TVM_DLL PipelineExecutor : public ModuleNode {
96109
* \return The number of outputs.
97110
*/
98111
int NumOutputs() const { return num_outputs_; }
112+
/*!
113+
* \brief Run the pipeline executor.
114+
* \param serialized_mode Whether run the pipeline executor in serialized mode.
115+
*/
116+
void Run(bool serialized_mode);
117+
/*!
118+
* \brief Stop the pipeline executor.
119+
*/
120+
void Stop();
121+
/*!
122+
 * \brief A pipeline input with a specific name corresponds to an input of a specific
123+
 * backend module; this function returns a module index and an input index in "pair"
124+
 * form for an input name.
125+
 * \return Return a module index and an input index.
126+
*/
127+
std::pair<int, int> GetInputIndex(const std::string& name);
99128
/*!\brief Load the module files information.*/
100129
ModuleConfig& LoadModuleConfig(dmlc::JSONReader* reader) {
101130
reader->BeginArray();
@@ -145,6 +174,8 @@ class TVM_DLL PipelineExecutor : public ModuleNode {
145174
ModuleConfig mod_config_;
146175
/*!\brief How many outputs are in this pipeline executor.*/
147176
size_t num_outputs_ = 0;
177+
/*!\brief The list of backend runtime modules.*/
178+
std::vector<std::shared_ptr<BackendRuntime>> runtimes_;
148179
/*!\brief Json loader.*/
149180
void LoadConfig(dmlc::JSONReader* reader) {
150181
reader->BeginObject();

src/runtime/pipeline/pipeline_scheduler.cc

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,15 @@ namespace runtime {
2727
* \param modules The list of graph executor modules.
2828
* \param pipeline_conf The dependency information of each graph executor module.
2929
*/
30-
size_t PipelineScheduler::PipelineInit(const std::vector<Module>& modules,
31-
const ConfigPipelineExecution& pipeline_config) {
30+
std::vector<std::shared_ptr<BackendRuntime>> PipelineScheduler::PipelineInit(
31+
const std::vector<Module>& modules, const ConfigPipelineExecution& pipeline_config) {
32+
std::vector<std::shared_ptr<BackendRuntime>> runtimes;
3233
graph_modules_ = modules;
33-
int num_output = pipeline_config.GetGlobalOutputNum();
34-
return num_output;
34+
for (size_t i = 0; i < graph_modules_.size(); i++) {
35+
auto runItem = std::make_shared<BackendRuntime>(graph_modules_[i], i);
36+
runtimes.push_back(runItem);
37+
}
38+
return runtimes;
3539
}
3640
} // namespace runtime
3741
} // namespace tvm

src/runtime/pipeline/pipeline_scheduler.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ class PipelineScheduler {
4141
* \param modules The list of graph executor module.
4242
* \param pipeline_config The dependency information of each graph executor module.
4343
*/
44-
size_t PipelineInit(const std::vector<Module>& modules,
45-
const ConfigPipelineExecution& pipeline_config);
44+
std::vector<std::shared_ptr<BackendRuntime>> PipelineInit(
45+
const std::vector<Module>& modules, const ConfigPipelineExecution& pipeline_config);
4646

4747
private:
4848
/*!\brief The list of graph executors.*/

src/runtime/pipeline/pipeline_struct.h

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,16 @@
2121
#include <assert.h>
2222
#include <dlpack/dlpack.h>
2323
#include <dmlc/json.h>
24+
#include <tvm/runtime/ndarray.h>
25+
#include <tvm/runtime/packed_func.h>
2426

2527
#include <limits>
2628
#include <string>
2729
#include <unordered_map>
2830
#include <utility>
2931
#include <vector>
32+
namespace tvm {
33+
namespace runtime {
3034
/*!
3135
* \brief All binding information of a output interface.
3236
*/
@@ -292,7 +296,106 @@ struct ParamConnectionConfig {
292296
}
293297
}
294298
};
299+
/*
300+
*\brief Backend Runtime.
301+
*/
302+
class BackendRuntime {
303+
private:
304+
/*\brief The index of runtime indicates the runtime position in the pipeline.*/
305+
int runtime_idx_;
306+
/*\brief The Runtime module of a backend graph executor.*/
307+
Module module_;
308+
/*!
309+
*\brief In order to transfer data from one backend runtime to another, we need a local
310+
* tensor variable as a medium. "input_tensor_local_copy_" is a map including
311+
 * input data and local tensor variable.
312+
*/
313+
std::unordered_map<DLTensor*, DLTensor*> input_tensor_local_copy_;
314+
/*!\brief The packed functions.*/
315+
tvm::runtime::PackedFunc set_input_;
316+
tvm::runtime::PackedFunc get_input_;
317+
tvm::runtime::PackedFunc get_num_output_;
318+
tvm::runtime::PackedFunc get_num_inputs_;
319+
tvm::runtime::PackedFunc get_input_index_;
320+
/*!
321+
* \brief Copying from a given tensor and using 'CPU' as the device.
322+
*/
323+
inline DLTensor* CopyDLTensorToCPU(const DLTensor* from) {
324+
DLTensor* ret = NULL;
325+
TVMArrayAlloc(from->shape, from->ndim, from->dtype.code, from->dtype.bits, from->dtype.lanes,
326+
kDLCPU, 0, &ret);
327+
return ret;
328+
}
329+
/*!\brief Creating a new NDArray with same shape and data type as the given DLTensor.*/
330+
NDArray CreateNDArrayFromDLTensor(const DLTensor* from) {
331+
std::vector<int64_t> shape;
332+
for (int i = 0; i < from->ndim; i++) {
333+
shape.push_back(from->shape[i]);
334+
}
335+
auto ndarray = NDArray::Empty(shape, from->dtype, from->device);
336+
ndarray.CreateView(shape, from->dtype);
337+
return ndarray;
338+
}
339+
/*
340+
*\brief Copying data from one DLTensor to another.
341+
*/
342+
void CopyFromTo(DLTensor* from, DLTensor* to) {
343+
// When the 'from' device and the 'to' device are not the same, we use a temporary CPU
344+
// DLTensor as the bridge.
345+
if (from->device.device_type != to->device.device_type && from->device.device_type != kDLCPU &&
346+
to->device.device_type != kDLCPU) {
347+
DLTensor* dltensor_local = nullptr;
348+
if (input_tensor_local_copy_.find(to) == input_tensor_local_copy_.end()) {
349+
dltensor_local = CopyDLTensorToCPU(from);
350+
input_tensor_local_copy_[to] = dltensor_local;
351+
} else {
352+
dltensor_local = input_tensor_local_copy_[to];
353+
}
354+
TVMArrayCopyFromTo(from, dltensor_local, nullptr);
355+
from = dltensor_local;
356+
}
295357

358+
TVMArrayCopyFromTo(from, to, nullptr);
359+
}
360+
361+
public:
362+
BackendRuntime(Module mod, int mod_idx) {
363+
module_ = mod;
364+
runtime_idx_ = mod_idx;
365+
get_input_index_ = module_.GetFunction("get_input_index");
366+
get_num_output_ = module_.GetFunction("get_num_outputs");
367+
get_num_inputs_ = module_.GetFunction("get_num_inputs");
368+
set_input_ = module_.GetFunction("set_input");
369+
get_input_ = module_.GetFunction("get_input");
370+
}
371+
BackendRuntime(void) {}
372+
~BackendRuntime() {
373+
for (auto data : input_tensor_local_copy_) {
374+
TVMArrayFree(data.second);
375+
}
376+
}
377+
/*!\brief Return the index of the current module.*/
378+
int GetModuleIndex() { return runtime_idx_; }
379+
/*!\brief Return the number of output*/
380+
int NumOutputs() const { return get_num_output_(); }
381+
/*!\brief Return the number of input*/
382+
int NumInputs() const { return get_num_inputs_(); }
383+
/*!\brief Setting the data to this module via input index.*/
384+
void SetInput(const int index, DLTensor* data_in) {
385+
NDArray input = get_input_(index);
386+
DLTensor* dltensor_input = const_cast<DLTensor*>(input.operator->());
387+
CopyFromTo(data_in, dltensor_input);
388+
}
389+
/*!\brief Setting the data to the current runtime module via the input name. */
390+
void SetInput(const std::string name, DLTensor* data_in) {
391+
int index = this->GetInputIndex(name);
392+
SetInput(index, data_in);
393+
}
394+
/*!\brief Getting the input data via the input index.*/
395+
NDArray GetInput(int index) const { return get_input_(index); }
396+
/*!\brief Getting the input index via the input name.*/
397+
int GetInputIndex(const std::string& name) { return get_input_index_(name); }
398+
};
296399
/*!
297400
* \brief The information used to initialize the graph executor module, the information
298401
* come from the export library function call.
@@ -309,4 +412,6 @@ struct GraphModuleLoadInfo {
309412
};
310413
/*! The Module information of each module.The 'int' is module index. */
311414
using ModuleConfig = std::unordered_map<int, GraphModuleLoadInfo>;
415+
}; // namespace runtime
416+
}; // namespace tvm
312417
#endif // TVM_RUNTIME_PIPELINE_PIPELINE_STRUCT_H_

0 commit comments

Comments
 (0)