
Commit 4a85dcd

[Runtime][PipelineExecutor] Getting the asynchronous output (#10723)
This patch creates a new GlobalRuntime to check whether the output data is ready and to poll the global output of the pipeline. It also removes the sequential pipeline execution logic, since the asynchronous logic is already in place.
1 parent 375566b commit 4a85dcd
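
The usage pattern this change enables looks roughly like the sketch below: run() kicks off the pipeline worker threads and returns immediately, and the caller polls for the global outputs until they are ready. This is a minimal sketch, not code from the patch; it assumes a pipeline_executor.PipelineModule instance named pipe has already been built and fed its inputs, and that its get_output() wrapper returns an empty list while results are still pending (mirroring PipelineGetOutput() below, which returns an empty Array until the GlobalRuntime reports the data ready).

import time

# Hypothetical usage sketch: "pipe" is assumed to be a built
# tvm.contrib.pipeline_executor.PipelineModule with its inputs already set.
pipe.run()                    # asynchronous: returns as soon as the pipeline is kicked off

outputs = pipe.get_output()   # assumed empty until the global outputs are ready
while not outputs:
    time.sleep(0.001)         # brief back-off before polling again
    outputs = pipe.get_output()

print("pipeline produced", len(outputs), "global output(s)")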

File tree: 7 files changed, +222 -208 lines changed

python/tvm/contrib/pipeline_executor.py

Lines changed: 2 additions & 2 deletions
@@ -125,9 +125,9 @@ def __init__(self, module):
         self._get_input_pipeline_map = self.module["get_input_pipeline_map"]
         self._get_pipe_execute_count = self.module["get_execute_count"]
 
-    def run(self, sync=False):
+    def run(self):
         """Run the pipeline executor."""
-        self._run(sync)
+        self._run()
 
     def get_input_pipeline_map(self, name):
         """Using the "name" to get the corresponding subgraph index and also get the "input name"

src/runtime/pipeline/pipeline_executor.cc

Lines changed: 3 additions & 8 deletions
@@ -78,7 +78,7 @@ PackedFunc PipelineExecutor::GetFunction(const std::string& name,
     return PackedFunc(
         [sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { *rv = this->GetOutput(); });
   } else if (name == "run") {
-    return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Run(args[0]); });
+    return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Run(); });
   } else if (name == "get_execute_count") {
     return PackedFunc(
         [sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { *rv = this->GetExecutionCount(); });
@@ -140,13 +140,8 @@ int PipelineExecutor::GetParamsGroupPipelineMap(const std::string& name) {
   return param_connection_config[name];
 }
 
-/*!
- * \brief Run the pipeline executor.
- * \param serialized_mode Whether run the pipeline executor in serialized mode.
- */
-void PipelineExecutor::Run(bool serialized_mode) {
-  pipeline_scheduler_.PipelineRun(runtimes_, pipeline_config_, serialized_mode);
-}
+/*!\brief Run the pipeline executor.*/
+void PipelineExecutor::Run() { pipeline_scheduler_.PipelineRun(runtimes_, pipeline_config_); }
 /*!
  * \brief return A list of global output data.
  */

src/runtime/pipeline/pipeline_executor.h

Lines changed: 2 additions & 5 deletions
@@ -113,11 +113,8 @@ class TVM_DLL PipelineExecutor : public ModuleNode {
    * \return The number of outputs.
    */
   int NumOutputs() const { return num_outputs_; }
-  /*!
-   * \brief Run the pipeline executor.
-   * \param serialized_mode Whether run the pipeline executor in serialized mode.
-   */
-  void Run(bool serialized_mode);
+  /*!\brief Run the pipeline executor.*/
+  void Run();
   /*!
    * \brief Get a list output data.
    * \return A list of output data.

src/runtime/pipeline/pipeline_scheduler.cc

Lines changed: 8 additions & 53 deletions
@@ -32,6 +32,7 @@ std::vector<std::shared_ptr<BackendRuntime>> PipelineScheduler::PipelineInit(
     const std::vector<Module>& modules, const ConfigPipelineExecution& pipeline_config) {
   std::vector<std::shared_ptr<BackendRuntime>> runtimes;
   graph_modules_ = modules;
+  global_runtime_ = std::make_shared<GlobalRuntime>(GLOBAL_MODULE_INDEX);
   // Creating a list of runtimes.
   for (size_t i = 0; i < graph_modules_.size(); i++) {
     auto run_item = std::make_shared<BackendRuntime>(graph_modules_[i], i);
@@ -49,71 +50,25 @@ std::vector<std::shared_ptr<BackendRuntime>> PipelineScheduler::PipelineInit(
   }
   // Initializing and then running the worker thread.
   for (auto runtime : runtimes) {
-    runtime->InitializePipeline(pipeline_config, &runtimes);
+    runtime->InitializePipeline(pipeline_config, &runtimes, global_runtime_);
   }
   return runtimes;
 }
-/*!
- * \brief Running the pipeline logic in the sequential mode.
- * \param runtimes A list of backend runtime modules.
- * \param pipeline_config The dependent configuration of each runtime module.
- */
-void PipelineScheduler::PipelineRunSequential(
-    const std::vector<std::shared_ptr<BackendRuntime>>& runtimes,
-    ConfigPipelineExecution pipeline_config) {
-  for (size_t i = 0; i < runtimes.size(); i++) {
-    // The "runtimes" is a list of runtime sorted by the runtime index which should be
-    // contiguous ascend.
-    if (static_cast<int>(i) != runtimes[i]->GetModuleIndex()) {
-      LOG(FATAL) << "Runtime index " << runtimes[i]->GetModuleIndex()
-                 << " is not as same as vector offset value " << i;
-    }
-
-    if (!pipeline_config.FindModuleInConfig(i)) {
-      LOG(FATAL) << "Not find the configuration for the module " << i;
-    }
-
-    runtimes[i]->Run();
-    // Getting the output then forwarding into other module once it is configured as input of
-    // another module or storaging into the "output_array" when the output is a global one.
-    int outputs_num = runtimes[i]->NumOutputs();
-    for (int j = 0; j < outputs_num; j++) {
-      ConfigBindings& out_binding = pipeline_config[i][j];
-      std::unordered_map<int, std::string>& input_connections = out_binding.Get();
-      NDArray output = runtimes[i]->GetOutput(j);
-      for (auto bind : input_connections) {
-        // "bind.first < 0" means the bind is a global bind, by pass the forwarding for
-        // a global bind.
-        if (bind.first < 0) continue;
-        // Setting the output as an input data into the runtime module.
-        runtimes[bind.first]->SetInput(bind.second, const_cast<DLTensor*>(output.operator->()));
-      }
-      // Store the output.
-      if (out_binding.IsGlobalOutput()) {
-        int global_idx = out_binding.GetGlobalOutputIndex();
-        TVMArrayCopyFromTo(const_cast<DLTensor*>(output.operator->()),
-                           const_cast<DLTensor*>(output_arrays_[global_idx].operator->()), nullptr);
-      }
-    }
-  }
-}
 /*!
  * \brief Running pipeline logic.
  * \param runtimes A list of backend runtime modules.
  * \param pipeline_config The dependency configuration of each runtime module.
- * \param sequential_mode Whether the execution is in a sequential mode.
 */
 void PipelineScheduler::PipelineRun(const std::vector<std::shared_ptr<BackendRuntime>>& runtimes,
-                                    ConfigPipelineExecution pipeline_config, bool sequential_mode) {
-  if (!sequential_mode) {
-    runtimes.front()->RunPipeline();
-  } else {
-    PipelineRunSequential(runtimes, pipeline_config);
-  }
+                                    ConfigPipelineExecution pipeline_config) {
+  runtimes.front()->RunPipeline();
 }
 /*!
  * \brief Get a list of output.
 */
-Array<NDArray> PipelineScheduler::PipelineGetOutput() { return output_arrays_; }
+Array<NDArray> PipelineScheduler::PipelineGetOutput() {
+  bool ret = global_runtime_->GetOutput(&output_arrays_);
+  return ret ? output_arrays_ : Array<NDArray>{};
+}
 }  // namespace runtime
 }  // namespace tvm
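
Since PipelineGetOutput() now returns an empty Array<NDArray> whenever GlobalRuntime::GetOutput() reports the data is not ready, emptiness of the result is what a caller checks. The sketch below drives the packed functions registered in pipeline_executor.cc above ("run", "get_output", "get_execute_count") directly; it is an illustrative assumption that pipe.module is the underlying runtime Module of a built PipelineModule.

# Sketch against the raw packed functions; "pipe" is assumed to be a built
# tvm.contrib.pipeline_executor.PipelineModule.
run = pipe.module["run"]
get_output = pipe.module["get_output"]
get_execute_count = pipe.module["get_execute_count"]

run()                 # returns immediately; the worker threads do the computation
outs = get_output()   # empty while the global output is not ready yet
if len(outs) == 0:
    print("outputs not ready, pipeline executions so far:", get_execute_count())
else:
    print("got", len(outs), "global output(s)")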

src/runtime/pipeline/pipeline_scheduler.h

Lines changed: 3 additions & 9 deletions
@@ -47,17 +47,9 @@ class PipelineScheduler {
    * \brief Running the pipeline logic.
    * \param runtimes A list of backend runtime modules.
    * \param pipeline_config The dependency configuration of each runtime module.
-   * \param sequential_mode Whether the execution is in a sequential mode.
    */
   void PipelineRun(const std::vector<std::shared_ptr<BackendRuntime>>& runtimes,
-                   ConfigPipelineExecution pipeline_config, bool sequential_mode = false);
-  /*!
-   * \brief Running the pipeline logic in the sequential mode.
-   * \param runtimes A list of backend runtime modules.
-   * \param pipeline_config The dependent configuration of each runtime module.
-   */
-  void PipelineRunSequential(const std::vector<std::shared_ptr<BackendRuntime>>& runtimes,
-                             ConfigPipelineExecution pipeline_config);
+                   ConfigPipelineExecution pipeline_config);
   /*!
    * \brief Get a list of outputs.
    */
@@ -68,6 +60,8 @@ class PipelineScheduler {
   std::vector<Module> graph_modules_;
   /*!\brief A list of NDArray used to storage outputs.*/
   Array<NDArray> output_arrays_;
+  /*!\brief The global runtime to represent the pipeline executor.*/
+  std::shared_ptr<GlobalRuntime> global_runtime_;
 };
 }  // namespace runtime
 }  // namespace tvm
