Skip to content

Commit 87f821a

Browse files
huajsjpfk-beta
authored andcommitted
[Runtime][Pipeline Executor] multiple threads management and the data forwarding notification mechanism. (apache#10234)
* [Runtime][Pipeline Executor] multiple threads management and the data forwarding notification mechanism. In this patch we create working threads for each runtime of pipeline. the threads would be terminated once the runtime class gets destroyed. We also add a notification mechanism derived from the 'binding configuration' of the runtime to forward the data notification. * address review comments. * address review comments. * fix typo. * fix typo. * trigger build. * address review comments. * address review comments. * address review comments. * address review comments.
1 parent 67806db commit 87f821a

File tree

7 files changed

+339
-37
lines changed

7 files changed

+339
-37
lines changed

python/tvm/contrib/pipeline_executor.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -117,22 +117,18 @@ def __init__(self, module):
117117
# Get the packed functions from the pipeline executor.
118118
self._get_params_group_pipeline_map = self.module["get_params_group_pipeline_map"]
119119
self._run = self.module["run"]
120-
self._stop = self.module["stop"]
121120
self._set_param = self.module["set_param"]
122121
self._set_input = self.module["set_input"]
123122
self._get_input = self.module["get_input"]
124123
self._get_output = self.module["get_output"]
125124
self._get_num_outputs = self.module["get_num_outputs"]
126125
self._get_input_pipeline_map = self.module["get_input_pipeline_map"]
126+
self._get_pipe_execute_count = self.module["get_execute_count"]
127127

128128
def run(self, sync=False):
129129
"""Run the pipeline executor."""
130130
self._run(sync)
131131

132-
def stop(self):
133-
"""Stop the pipeline executor."""
134-
self._stop()
135-
136132
def get_input_pipeline_map(self, name):
137133
"""Using the "name" to get the corresponding subgraph index and also get the "input name"
138134
of the corresponding subgraph interface.
@@ -213,6 +209,16 @@ def get_output(self):
213209
"""
214210
return self._get_output()
215211

212+
@property
213+
def num_executing_pipeline(self):
214+
"""Getting the count of running pipeline.
215+
Returns
216+
-------
217+
count : int
218+
The count of running pipeline.
219+
"""
220+
return self._get_pipe_execute_count()
221+
216222
@property
217223
def num_outputs(self):
218224
"""Get the number of outputs.

src/runtime/pipeline/pipeline_executor.cc

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,9 @@ PackedFunc PipelineExecutor::GetFunction(const std::string& name,
7979
[sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { *rv = this->GetOutput(); });
8080
} else if (name == "run") {
8181
return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Run(args[0]); });
82-
} else if (name == "stop") {
83-
return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Stop(); });
82+
} else if (name == "get_execute_count") {
83+
return PackedFunc(
84+
[sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { *rv = this->GetExecutionCount(); });
8485
} else {
8586
LOG(FATAL) << "Unknown packed function: " << name;
8687
return PackedFunc();
@@ -96,7 +97,6 @@ PackedFunc PipelineExecutor::GetFunction(const std::string& name,
9697
void PipelineExecutor::SetInput(std::string input_name, DLTensor* data_in) {
9798
std::pair<int, int> indexs = this->GetInputIndex(input_name);
9899
if (indexs.first < 0 || indexs.first >= static_cast<int>(runtimes_.size())) {
99-
this->Stop();
100100
LOG(FATAL) << "input name " << input_name << " not found.";
101101
}
102102
runtimes_[indexs.first]->SetInput(indexs.second, data_in);
@@ -109,7 +109,6 @@ void PipelineExecutor::SetInput(std::string input_name, DLTensor* data_in) {
109109
NDArray PipelineExecutor::GetInput(std::string input_name) {
110110
std::pair<int, int> indexs = this->GetInputIndex(input_name);
111111
if (indexs.first < 0 || indexs.first >= static_cast<int>(runtimes_.size())) {
112-
this->Stop();
113112
LOG(FATAL) << "input name " << input_name << " not found.";
114113
}
115114
return runtimes_[indexs.first]->GetInput(indexs.second);
@@ -153,11 +152,6 @@ void PipelineExecutor::Run(bool serialized_mode) {
153152
* \brief return A list of global output data.
154153
*/
155154
Array<NDArray> PipelineExecutor::GetOutput(void) { return pipeline_scheduler_.PipelineGetOutput(); }
156-
/*!
157-
* \brief Stop the pipeline executor.
158-
*/
159-
void PipelineExecutor::Stop() { pipeline_scheduler_.PipelineStop(); }
160-
161155
/*!
162156
* \brief Use the mod_config information to create a graph runtime list.
163157
* \param mod_config The config information that generates by the export library function call.
@@ -242,6 +236,10 @@ std::pair<int, int> PipelineExecutor::GetInputIndex(const std::string& name) {
242236
auto gruntime = runtimes_[index.first];
243237
return std::make_pair(index.first, gruntime->GetInputIndex(index.second));
244238
}
239+
/*!
240+
* \brief Getting the count of running pipeline.
241+
*/
242+
int PipelineExecutor::GetExecutionCount() { return runtimes_.back()->GetExecutionCount(); }
245243
/*!
246244
* \brief Initialize the pipeline executor with a list of modules to be pipelined
247245
* and config in JSON format.

src/runtime/pipeline/pipeline_executor.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ class TVM_DLL PipelineExecutor : public ModuleNode {
9595
* \return Return input data.
9696
*/
9797
NDArray GetInput(std::string input_name);
98+
/*!
99+
* \brief Getting the count of running pipeline.
100+
*/
101+
int GetExecutionCount();
98102
/*!
99103
* \brief Use the parameters group name to get the specific backend runtime then use
100104
* the param_key_name to set param data for the said backend runtime.
@@ -114,10 +118,6 @@ class TVM_DLL PipelineExecutor : public ModuleNode {
114118
* \param serialized_mode Whether run the pipeline executor in serialized mode.
115119
*/
116120
void Run(bool serialized_mode);
117-
/*!
118-
* \brief Stop the pipeline executor.
119-
*/
120-
void Stop();
121121
/*!
122122
* \brief Get a list output data.
123123
* \return A list of output data.

src/runtime/pipeline/pipeline_scheduler.cc

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,10 @@ std::vector<std::shared_ptr<BackendRuntime>> PipelineScheduler::PipelineInit(
3232
const std::vector<Module>& modules, const ConfigPipelineExecution& pipeline_config) {
3333
std::vector<std::shared_ptr<BackendRuntime>> runtimes;
3434
graph_modules_ = modules;
35+
// Creating a list of runtimes.
3536
for (size_t i = 0; i < graph_modules_.size(); i++) {
36-
auto runItem = std::make_shared<BackendRuntime>(graph_modules_[i], i);
37-
runtimes.push_back(runItem);
37+
auto run_item = std::make_shared<BackendRuntime>(graph_modules_[i], i);
38+
runtimes.push_back(run_item);
3839
}
3940
// Creating a list of NDArray in order to storage the outputs data.
4041
auto global_output_map = pipeline_config.GetGlobalConfigOutputBindings();
@@ -46,6 +47,10 @@ std::vector<std::shared_ptr<BackendRuntime>> PipelineScheduler::PipelineInit(
4647
NDArray output = runtimes[output_pair.first]->CreateFromOutput(output_pair.second);
4748
output_arrays_.push_back(output);
4849
}
50+
// Initializing and then running the worker thread.
51+
for (auto runtime : runtimes) {
52+
runtime->InitializePipeline(pipeline_config, &runtimes);
53+
}
4954
return runtimes;
5055
}
5156
/*!
@@ -101,18 +106,11 @@ void PipelineScheduler::PipelineRunSequential(
101106
void PipelineScheduler::PipelineRun(const std::vector<std::shared_ptr<BackendRuntime>>& runtimes,
102107
ConfigPipelineExecution pipeline_config, bool sequential_mode) {
103108
if (!sequential_mode) {
104-
// TODO(huajsj) remove this check after all of pipeline features in.
105-
LOG(FATAL) << "Currently only supports sequential mode.";
109+
runtimes.front()->RunPipeline();
106110
} else {
107111
PipelineRunSequential(runtimes, pipeline_config);
108112
}
109113
}
110-
/*!
111-
* \brief Stop the pipeline exection.
112-
*/
113-
void PipelineScheduler::PipelineStop() {
114-
// TODO(huajsj) Add stop logic.
115-
}
116114
/*!
117115
* \brief Get a list of output.
118116
*/

src/runtime/pipeline/pipeline_scheduler.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,6 @@ class PipelineScheduler {
5858
*/
5959
void PipelineRunSequential(const std::vector<std::shared_ptr<BackendRuntime>>& runtimes,
6060
ConfigPipelineExecution pipeline_config);
61-
/*!
62-
* \brief Stop the pipeline exection.
63-
*/
64-
void PipelineStop();
6561
/*!
6662
* \brief Get a list of outputs.
6763
*/

0 commit comments

Comments
 (0)