Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions python/tvm/contrib/pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,22 +117,18 @@ def __init__(self, module):
# Get the packed functions from the pipeline executor.
self._get_params_group_pipeline_map = self.module["get_params_group_pipeline_map"]
self._run = self.module["run"]
self._stop = self.module["stop"]
self._set_param = self.module["set_param"]
self._set_input = self.module["set_input"]
self._get_input = self.module["get_input"]
self._get_output = self.module["get_output"]
self._get_num_outputs = self.module["get_num_outputs"]
self._get_input_pipeline_map = self.module["get_input_pipeline_map"]
self._get_pipe_execute_number = self.module["get_statistic_pipe_execute_number"]

def run(self, sync=False):
"""Run the pipeline executor."""
self._run(sync)

def stop(self):
"""Stop the pipeline executor."""
self._stop()

def get_input_pipeline_map(self, name):
"""Using the "name" to get the corresponding subgraph index and also get the "input name"
of the corresponding subgraph interface.
Expand Down Expand Up @@ -213,6 +209,16 @@ def get_output(self):
"""
return self._get_output()

@property
def number_pipe_execute(self):
    """Get the number of pipeline executions.

    Backed by the packed function ``get_statistic_pipe_execute_number``.
    NOTE(review): the name suggests a statistic counter — presumably the
    count of pipeline runs so far rather than "currently running";
    confirm against the runtime implementation.

    Returns
    -------
    count : int
        The pipeline execution count reported by the executor.
    """
    return self._get_pipe_execute_number()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are using the term time or number throughout the code, but please use more specific terms. I think time is more like duration and number is more like elapsed_time?

Please update your naming convention in all files. I won't point out each of them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think num_executing_pipeline is better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.


@property
def num_outputs(self):
"""Get the number of outputs.
Expand Down
19 changes: 10 additions & 9 deletions src/runtime/pipeline/pipeline_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,10 @@ PackedFunc PipelineExecutor::GetFunction(const std::string& name,
[sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { *rv = this->GetOutput(); });
} else if (name == "run") {
return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Run(args[0]); });
} else if (name == "stop") {
return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Stop(); });
} else if (name == "get_statistic_pipe_execute_number") {
return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) {
*rv = this->PipelineStatisticPipeExecuteNumber();
});
} else {
LOG(FATAL) << "Unknown packed function: " << name;
return PackedFunc();
Expand All @@ -96,7 +98,6 @@ PackedFunc PipelineExecutor::GetFunction(const std::string& name,
void PipelineExecutor::SetInput(std::string input_name, DLTensor* data_in) {
std::pair<int, int> indexs = this->GetInputIndex(input_name);
if (indexs.first < 0 || indexs.first >= static_cast<int>(runtimes_.size())) {
this->Stop();
LOG(FATAL) << "input name " << input_name << " not found.";
}
runtimes_[indexs.first]->SetInput(indexs.second, data_in);
Expand All @@ -109,7 +110,6 @@ void PipelineExecutor::SetInput(std::string input_name, DLTensor* data_in) {
NDArray PipelineExecutor::GetInput(std::string input_name) {
std::pair<int, int> indexs = this->GetInputIndex(input_name);
if (indexs.first < 0 || indexs.first >= static_cast<int>(runtimes_.size())) {
this->Stop();
LOG(FATAL) << "input name " << input_name << " not found.";
}
return runtimes_[indexs.first]->GetInput(indexs.second);
Expand Down Expand Up @@ -153,11 +153,6 @@ void PipelineExecutor::Run(bool serialized_mode) {
* \brief return A list of global output data.
*/
Array<NDArray> PipelineExecutor::GetOutput(void) { return pipeline_scheduler_.PipelineGetOutput(); }
/*!
* \brief Stop the pipeline executor.
*/
void PipelineExecutor::Stop() { pipeline_scheduler_.PipelineStop(); }

/*!
* \brief Use the mod_config information to create a graph runtime list.
* \param mod_config The config information that generates by the export library function call.
Expand Down Expand Up @@ -242,6 +237,12 @@ std::pair<int, int> PipelineExecutor::GetInputIndex(const std::string& name) {
auto gruntime = runtimes_[index.first];
return std::make_pair(index.first, gruntime->GetInputIndex(index.second));
}
/*!
 * \brief Get the number of pipeline executions.
 * \return The execution count reported by the last backend runtime in the
 *  pipeline (the final stage's counter stands in for the whole pipeline).
 * \note Assumes the executor has been initialized so that 'runtimes_' is
 *  non-empty — calling back() on an empty vector is undefined behavior.
 */
int PipelineExecutor::PipelineStatisticPipeExecuteNumber() {
  return runtimes_.back()->GetStatisticPipelineExecuteNumber();
}
/*!
* \brief Initialize the pipeline executor with a list of modules to be pipelined
* and config in JSON format.
Expand Down
8 changes: 4 additions & 4 deletions src/runtime/pipeline/pipeline_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ class TVM_DLL PipelineExecutor : public ModuleNode {
* \return Return input data.
*/
NDArray GetInput(std::string input_name);
/*!
 * \brief Get the number of pipeline executions.
*/
int PipelineStatisticPipeExecuteNumber();
/*!
* \brief Use the parameters group name to get the specific backend runtime then use
* the param_key_name to set param data for the said backend runtime.
Expand All @@ -114,10 +118,6 @@ class TVM_DLL PipelineExecutor : public ModuleNode {
* \param serialized_mode Whether run the pipeline executor in serialized mode.
*/
void Run(bool serialized_mode);
/*!
* \brief Stop the pipeline executor.
*/
void Stop();
/*!
* \brief Get a list output data.
* \return A list of output data.
Expand Down
14 changes: 6 additions & 8 deletions src/runtime/pipeline/pipeline_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ std::vector<std::shared_ptr<BackendRuntime>> PipelineScheduler::PipelineInit(
const std::vector<Module>& modules, const ConfigPipelineExecution& pipeline_config) {
std::vector<std::shared_ptr<BackendRuntime>> runtimes;
graph_modules_ = modules;
// Creating a list of runtimes.
for (size_t i = 0; i < graph_modules_.size(); i++) {
auto runItem = std::make_shared<BackendRuntime>(graph_modules_[i], i);
runtimes.push_back(runItem);
Expand All @@ -46,6 +47,10 @@ std::vector<std::shared_ptr<BackendRuntime>> PipelineScheduler::PipelineInit(
NDArray output = runtimes[output_pair.first]->CreateFromOutput(output_pair.second);
output_arrays_.push_back(output);
}
// Initializing and then running the worker thread.
for (auto runtime : runtimes) {
runtime->InitializePipeline(pipeline_config, &runtimes);
}
return runtimes;
}
/*!
Expand Down Expand Up @@ -101,18 +106,11 @@ void PipelineScheduler::PipelineRunSequential(
void PipelineScheduler::PipelineRun(const std::vector<std::shared_ptr<BackendRuntime>>& runtimes,
ConfigPipelineExecution pipeline_config, bool sequential_mode) {
if (!sequential_mode) {
// TODO(huajsj) remove this check after all of pipeline features in.
LOG(FATAL) << "Currently only supports sequential mode.";
runtimes.front()->RunPipeLine();
} else {
PipelineRunSequential(runtimes, pipeline_config);
}
}
/*!
 * \brief Stop the pipeline execution.
*/
void PipelineScheduler::PipelineStop() {
// TODO(huajsj) Add stop logic.
}
/*!
* \brief Get a list of output.
*/
Expand Down
4 changes: 0 additions & 4 deletions src/runtime/pipeline/pipeline_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ class PipelineScheduler {
*/
void PipelineRunSequential(const std::vector<std::shared_ptr<BackendRuntime>>& runtimes,
ConfigPipelineExecution pipeline_config);
/*!
 * \brief Stop the pipeline execution.
*/
void PipelineStop();
/*!
* \brief Get a list of outputs.
*/
Expand Down
Loading