From 284c08e0e9ca3c10516e07adc279b03657ad1c3a Mon Sep 17 00:00:00 2001 From: huajsj Date: Tue, 1 Feb 2022 17:07:33 -0800 Subject: [PATCH 01/10] [Runtime][Pipeline Executor] multiple threads management and the data forwarding notification mechanism. In this patch we create working threads for each runtime of pipeline. the threads would be terminated once the runtime class gets destroyed. We also add a notification mechanism derived from the 'binding configuration' of the runtime to forward the data notification. --- python/tvm/contrib/pipeline_executor.py | 16 +- src/runtime/pipeline/pipeline_executor.cc | 20 +- src/runtime/pipeline/pipeline_executor.h | 9 +- src/runtime/pipeline/pipeline_scheduler.cc | 14 +- src/runtime/pipeline/pipeline_scheduler.h | 4 - src/runtime/pipeline/pipeline_struct.h | 272 ++++++++++++++++++- tests/python/relay/test_pipeline_executor.py | 15 +- 7 files changed, 315 insertions(+), 35 deletions(-) diff --git a/python/tvm/contrib/pipeline_executor.py b/python/tvm/contrib/pipeline_executor.py index b858a209db83..6e26049e5b60 100644 --- a/python/tvm/contrib/pipeline_executor.py +++ b/python/tvm/contrib/pipeline_executor.py @@ -117,22 +117,18 @@ def __init__(self, module): # Get the packed functions from the pipeline executor. self._get_params_group_pipeline_map = self.module["get_params_group_pipeline_map"] self._run = self.module["run"] - self._stop = self.module["stop"] self._set_param = self.module["set_param"] self._set_input = self.module["set_input"] self._get_input = self.module["get_input"] self._get_output = self.module["get_output"] self._get_num_outputs = self.module["get_num_outputs"] self._get_input_pipeline_map = self.module["get_input_pipeline_map"] + self._get_pipe_execute_number = self.module["get_statistic_pipe_execute_number"] def run(self, sync=False): """Run the pipeline executor.""" self._run(sync) - def stop(self): - """Stop the pipeline executor.""" - self._stop() - def get_input_pipeline_map(self, name): """Using the "name" to get the corresponding subgraph index and also get the "input name" of the corresponding subgraph interface. @@ -213,6 +209,16 @@ def get_output(self): """ return self._get_output() + @property + def number_pipe_execute(self): + """Getting the times of pipeline running. + Returns + ------- + count : int + The times of pipeline running. + """ + return self._get_pipe_execute_number() + @property def num_outputs(self): """Get the number of outputs. diff --git a/src/runtime/pipeline/pipeline_executor.cc b/src/runtime/pipeline/pipeline_executor.cc index 2cad8cf3b060..d09490825055 100644 --- a/src/runtime/pipeline/pipeline_executor.cc +++ b/src/runtime/pipeline/pipeline_executor.cc @@ -79,8 +79,10 @@ PackedFunc PipelineExecutor::GetFunction(const std::string& name, [sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { *rv = this->GetOutput(); }); } else if (name == "run") { return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Run(args[0]); }); - } else if (name == "stop") { - return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Stop(); }); + } else if (name == "get_statistic_pipe_execute_number") { + return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { + *rv = this->PipelineStatisticPipeExecuteNumber(); + }); } else { LOG(FATAL) << "Unknown packed function: " << name; return PackedFunc(); @@ -96,7 +98,6 @@ PackedFunc PipelineExecutor::GetFunction(const std::string& name, void PipelineExecutor::SetInput(std::string input_name, DLTensor* data_in) { std::pair indexs = this->GetInputIndex(input_name); if (indexs.first < 0 || indexs.first >= static_cast(runtimes_.size())) { - this->Stop(); LOG(FATAL) << "input name " << input_name << " not found."; } runtimes_[indexs.first]->SetInput(indexs.second, data_in); @@ -109,7 +110,6 @@ void PipelineExecutor::SetInput(std::string input_name, DLTensor* data_in) { NDArray PipelineExecutor::GetInput(std::string input_name) { std::pair indexs = this->GetInputIndex(input_name); if (indexs.first < 0 || indexs.first >= static_cast(runtimes_.size())) { - this->Stop(); LOG(FATAL) << "input name " << input_name << " not found."; } return runtimes_[indexs.first]->GetInput(indexs.second); @@ -153,11 +153,6 @@ void PipelineExecutor::Run(bool serialized_mode) { * \brief return A list of global output data. */ Array PipelineExecutor::GetOutput(void) { return pipeline_scheduler_.PipelineGetOutput(); } -/*! - * \brief Stop the pipeline executor. - */ -void PipelineExecutor::Stop() { pipeline_scheduler_.PipelineStop(); } - /*! * \brief Use the mod_config information to create a graph runtime list. * \param mod_config The config information that generates by the export library function call. @@ -242,6 +237,13 @@ std::pair PipelineExecutor::GetInputIndex(const std::string& name) { auto gruntime = runtimes_[index.first]; return std::make_pair(index.first, gruntime->GetInputIndex(index.second)); } +/*! + * \brief Getting the times of pipeline running. + * \return Returning the times of pipeline running. + */ +int PipelineExecutor::PipelineStatisticPipeExecuteNumber() { + return runtimes_.back()->GetStatisticPipelineExecuteNumber(); +} /*! * \brief Initialize the pipeline executor with a list of modules to be pipelined * and config in JSON format. diff --git a/src/runtime/pipeline/pipeline_executor.h b/src/runtime/pipeline/pipeline_executor.h index 7b9f5eadf92b..0db2e2e5ca71 100644 --- a/src/runtime/pipeline/pipeline_executor.h +++ b/src/runtime/pipeline/pipeline_executor.h @@ -95,6 +95,11 @@ class TVM_DLL PipelineExecutor : public ModuleNode { * \return Return input data. */ NDArray GetInput(std::string input_name); + /*! + * \brief Getting the times of pipeline running. + * \return Returning the times of pipeline running. + */ + int PipelineStatisticPipeExecuteNumber(); /*! * \brief Use the parameters group name to get the specific backend runtime then use * the param_key_name to set param data for the said backend runtime. @@ -114,10 +119,6 @@ class TVM_DLL PipelineExecutor : public ModuleNode { * \param serialized_mode Whether run the pipeline executor in serialized mode. */ void Run(bool serialized_mode); - /*! - * \brief Stop the pipeline executor. - */ - void Stop(); /*! * \brief Get a list output data. * \return A list of output data. diff --git a/src/runtime/pipeline/pipeline_scheduler.cc b/src/runtime/pipeline/pipeline_scheduler.cc index 4a3368e32391..d11bc67cc4ff 100644 --- a/src/runtime/pipeline/pipeline_scheduler.cc +++ b/src/runtime/pipeline/pipeline_scheduler.cc @@ -32,6 +32,7 @@ std::vector> PipelineScheduler::PipelineInit( const std::vector& modules, const ConfigPipelineExecution& pipeline_config) { std::vector> runtimes; graph_modules_ = modules; + // Creating a list of runtimes. for (size_t i = 0; i < graph_modules_.size(); i++) { auto runItem = std::make_shared(graph_modules_[i], i); runtimes.push_back(runItem); @@ -46,6 +47,10 @@ std::vector> PipelineScheduler::PipelineInit( NDArray output = runtimes[output_pair.first]->CreateFromOutput(output_pair.second); output_arrays_.push_back(output); } + // Initializing and then running the work thread. + for (auto runtime : runtimes) { + runtime->Pipeline_Initialize(pipeline_config, &runtimes); + } return runtimes; } /*! @@ -101,18 +106,11 @@ void PipelineScheduler::PipelineRunSequential( void PipelineScheduler::PipelineRun(const std::vector>& runtimes, ConfigPipelineExecution pipeline_config, bool sequential_mode) { if (!sequential_mode) { - // TODO(huajsj) remove this check after all of pipeline features in. - LOG(FATAL) << "Currently only supports sequential mode."; + runtimes.front()->RunPipeLine(); } else { PipelineRunSequential(runtimes, pipeline_config); } } -/*! - * \brief Stop the pipeline exection. - */ -void PipelineScheduler::PipelineStop() { - // TODO(huajsj) Add stop logic. -} /*! * \brief Get a list of output. */ diff --git a/src/runtime/pipeline/pipeline_scheduler.h b/src/runtime/pipeline/pipeline_scheduler.h index 6075747a6c7f..3339d4376083 100644 --- a/src/runtime/pipeline/pipeline_scheduler.h +++ b/src/runtime/pipeline/pipeline_scheduler.h @@ -58,10 +58,6 @@ class PipelineScheduler { */ void PipelineRunSequential(const std::vector>& runtimes, ConfigPipelineExecution pipeline_config); - /*! - * \brief Stop the pipeline exection. - */ - void PipelineStop(); /*! * \brief Get a list of outputs. */ diff --git a/src/runtime/pipeline/pipeline_struct.h b/src/runtime/pipeline/pipeline_struct.h index 4002885f6702..e1fd2a5eaeac 100644 --- a/src/runtime/pipeline/pipeline_struct.h +++ b/src/runtime/pipeline/pipeline_struct.h @@ -24,8 +24,12 @@ #include #include +#include +#include #include +#include #include +#include #include #include #include @@ -33,7 +37,13 @@ namespace tvm { namespace runtime { #define GLOBAL_MODULE_INDEX -1 /*! - *\brief The pair includes the module output index and the global output index. + *\brief The function is used to build the binding configuration for a runtime. The first + * 'int' is the output index of the current runtime, the second 'int' is the index of child + * runtime, and the 'string' is the input name of child runtime. + */ +using BindingConfigParseFunc = std::function; +/*! + *\brief The 'pair' includes the module output index and the global output index. * The first 'int' is the module output index, and the second 'int' is the global output index. */ using GlobalOutputPair = std::pair; @@ -42,6 +52,75 @@ using GlobalOutputPair = std::pair; * The first 'int' is the module index, and the second 'int' is the module output index. */ using ModuleOutputPair = std::pair; +/*! + *\brief The pair includes the module index and the module input index. + * The first 'int' is the module index, and the second 'int' is the module input index. + */ +using ModuleInputPair = std::pair; +/*!\brief The data notification structure.*/ +class DataNotify { + private: + /*!\brief The 'contitional variable' is used to wait for notification.*/ + std::condition_variable condition_; + /*!\brief The mutex is used to protect the 'conditional variable'.*/ + std::mutex mutex_; + /*!\brief Whether a data is ready or not.*/ + volatile bool data_ready_ = false; + /*!\brief Whether the thread should exit or not.*/ + volatile bool exit_state_ = false; + /*!\brief The index of runtime which is sending out the data notification.*/ + int parent_idx_; + /*!\brief The index of runtime output interface which is sending out the data notification.*/ + int parent_output_idx_; + + public: + /*! + * \brief Constructing the DataNotify class. + * \param parent_idx The index of runtime which is sending out the data notification + * \param parent_output_idx The index of runtime output interface which is sending out + * the data notification. + */ + DataNotify(int parent_idx, int parent_output_idx) { + parent_idx_ = parent_idx; + parent_output_idx_ = parent_output_idx; + } + /*! + * \brief Getting the notification source. + * \return The first 'int' is the runtime index, and the second 'int' is the output index. + */ + std::pair GetNotifySource(void) { + return std::make_pair(parent_idx_, parent_output_idx_); + } + /*! + *\brief Waiting the notification. + *\return Returning the value 'false' when the notification is in a 'exit' state, else + * return true. + */ + bool Wait(void) { + std::unique_lock lock(mutex_); + condition_.wait(lock, [&] { return this->data_ready_; }); + data_ready_ = false; + return !exit_state_; + } + /*!brief Sending the notification in which the related data is ready.*/ + void Notify(void) { + { + std::lock_guard lock(mutex_); + data_ready_ = true; + } + condition_.notify_one(); + } + /*!brief Sending the notification when the notification state changes into 'exit'.*/ + void ExitNotify(void) { + exit_state_ = true; + Notify(); + } + /*! + *\brief Getting the 'exit state'. + *\return Returning the value of 'exit_state_' + */ + bool GetExitState(void) { return exit_state_; } +}; /*! * \brief All binding information of a output interface. */ @@ -53,6 +132,16 @@ class ConfigBindings { int GetGlobalOutputIndex() const { return global_output_index_; } /*!\brief Returning the binding configuration.*/ std::unordered_map& Get() { return bindings_; } + /*! + * \brief Enumrating the binding configuration. + * \param parse_function The function is used to parse the binding configuration. + * \param output_idx The index of output interface is used for parsing. + */ + void VisitOutput(BindingConfigParseFunc parse_function, int output_idx) { + for (auto output : bindings_) { + parse_function(output_idx, output.first, output.second); + } + } /*! * \brief Create a module interface map from JSONReader. * \param reader JSON reader. @@ -118,6 +207,15 @@ class ConfigOutputBindings { ICHECK(output_binding_map_.find(key) != output_binding_map_.end()); return output_binding_map_[key]; } + /*! + * \brief Enumerating the output configuration. + * \param parse_function The callback function is used to parse the binding configeration. + */ + void VisitOutputConfig(BindingConfigParseFunc parse_function) { + for (auto output : output_binding_map_) { + output.second.VisitOutput(parse_function, output.first); + } + } /*!brief Return the variable "output_binding_map_".*/ std::unordered_map GetOutBindings() const { return output_binding_map_; } /*! @@ -189,6 +287,18 @@ class ConfigPipelineExecution { ICHECK(config_.find(key) != config_.end()); return config_[key]; } + /*! + * \brief Enumurating the binding configuration for a specify runtime. + * \param parse_function The callback function is used to parse the binding configuration. + * \param runtime_index The index of a runtime is used to parse the binding configuration. + */ + void VisitRuntimeOutputConfig(BindingConfigParseFunc parse_function, int runtime_index) { + auto config = config_.find(runtime_index); + if (config == config_.end()) { + LOG(FATAL) << "Do not finding the runtime " << runtime_index; + } + config->second.VisitOutputConfig(parse_function); + } /* *!\brief This function is used to verify whether config is loaded successfully. * \return Return "true" to indicate that this class has not been successfully loaded. @@ -367,11 +477,21 @@ struct ParamConnectionConfig { *\brief Backend Runtime. */ class BackendRuntime { + using ModuleInputPairList = std::vector, int>>; + private: /*\brief The index of runtime indicates the runtime position in the pipeline.*/ int runtime_idx_; /*\brief The Runtime module of a backend graph executor.*/ Module module_; + /*\brief The thread is associated with the current runtime*/ + std::thread thread_; + /*\brief A list of runtime which depends on the current runtime.*/ + std::unordered_map childs_; + /*\brief A map including the runtime input index and the notification data struction.*/ + std::unordered_map> parents_notify_; + /*\brief The times of using pipeline function. */ + uint32_t statistic_pipeline_execute_times_ = 0; /*! *\brief In order to transfer data from one backend runtime to another, we need a local * tensor variable as a medium. "input_tensor_local_copy_" is a map including @@ -386,6 +506,99 @@ class BackendRuntime { tvm::runtime::PackedFunc get_num_inputs_; tvm::runtime::PackedFunc get_input_index_; tvm::runtime::PackedFunc run_; + /*!\brief The working thread is used to execute the runtimes in pipeline.*/ + void StartWorkThread() { + if (runtime_idx_ == 0) { + this->CreateParentsNotify(0, GLOBAL_MODULE_INDEX, 0); + } else { + // Only launching work thread for the runtimes after the first runtime. + thread_ = std::thread([&]() { + while (!this->WaitAndLoadPipeLineData()) { + this->RunPipeLine(); + } + VLOG(1) << "Runtime " << this->runtime_idx_ << " exit."; + }); + } + return; + } + /*!\brief Stopping the threads in pipeline.*/ + void StopPipeline() { + for (auto notify : parents_notify_) { + notify.second->ExitNotify(); + } + if (thread_.joinable()) { + thread_.join(); + } + } + /*! + * \brief Waiting for the internal forwarding data. + * \return Returning 'true' when getting a 'exit' notification otherwise returning 'false'. + */ + bool WaitAndLoadPipeLineData() { + std::unordered_map> notifys = parents_notify_; + bool exit_notify = false; + while (!notifys.empty() && !exit_notify) { + auto notify = notifys.begin(); + // Breaking the loop when the notification is in the exit state. + if ((exit_notify = notify->second->GetExitState())) break; + // Getting the source which sends this notification. + auto notify_source = notify->second->GetNotifySource(); + // Loading the binding data. + while (!this->LoadBindingData(notify->first, notify_source.first, notify_source.second)) { + // Waiting for the notification. + if (!notify->second->Wait()) { + VLOG(1) << "runtime index:" << runtime_idx_ << " receive exit notify."; + exit_notify = true; + break; + } + // TODO(huajsj): removing this 'break' after finishing the 'LoadBindingData'. + break; + } + VLOG(1) << "runtime_index.input_index:" << runtime_idx_ << "." << notify->first + << "from runtime_index.output_index:" << notify_source.first << "." + << notify_source.second; + notifys.erase(notify); + } + return exit_notify; + } + /*! + * \brief Loading the binding data. + * \param parent_idx The index of runtime which forwards data to current runtime. + * \param parent_output_idx The index of output where the forwarding data is coming from. + * \param input_idx The index of input where the data will be forwarding to. + * \return Returning 'true' when data is loaded successfully, otherwise returning 'false'. + */ + bool LoadBindingData(int parent_idx, int parent_output_idx, int input_idx) { + // TODO(huajsj): Loading data. + return false; + } + /*! + * \brief Forwarding the output data into the child runtimes. + */ + void ForwardingOutputDataToChilds(void) { + for (auto child : childs_) { + // TODO(huajsj): Getting the output data from the current runtime in order to forward + // data to the child. + + // Notifying the 'childs runtime' that the forwarding data are ready. + for (auto module_pair : child.second) { + module_pair.first->ParentNotify(module_pair.second); + } + } + } + /*! + *\brief Creating a parent notification. + *\param input_index The input index of the 'current runtime'. + *\param parent_idx The index of 'parent runtime' which will send the notification. + *\param parent_output_idx The output index of the 'parent runtime' which will send + * the nofication. + */ + void CreateParentsNotify(int input_index, int parent_idx, int parent_output_idx) { + if (parents_notify_.find(input_index) != parents_notify_.end()) { + LOG(FATAL) << "Not finding the input index " << input_index << " in runtime " << runtime_idx_; + } + parents_notify_[input_index] = std::make_shared(parent_idx, parent_output_idx); + } /*! * \brief Copying from a given tensor and using 'CPU' as the device. */ @@ -439,11 +652,60 @@ class BackendRuntime { get_output_ = module_.GetFunction("get_output"); run_ = module_.GetFunction("run"); } - BackendRuntime(void) {} ~BackendRuntime() { for (auto data : input_tensor_local_copy_) { TVMArrayFree(data.second); } + StopPipeline(); + } + /*!brief Getting the runtime index*/ + int GetIndex() const { return runtime_idx_; } + /*! + * \brief Getting the times of using pipeline function. + * \return The times of using pipeline function. + */ + int GetStatisticPipelineExecuteNumber() const { return statistic_pipeline_execute_times_; } + /*! + * \brief Initializing data structures for the pipeline execution. + * \param config The pipeline configueration. + * \param runtimes A list of BackendRuntime. + */ + void Pipeline_Initialize(ConfigPipelineExecution config, + std::vector>* runtimes) { + // Getting the 'binding configuration' for each runtime. + config.VisitRuntimeOutputConfig( + [&](int output_idx, int child_idx, std::string child_input_name) { + int runtime_idx_max = runtimes->size(); + if (child_idx < 0 || child_idx >= runtime_idx_max) { + LOG(FATAL) << "The runtime index " << child_idx << " is out of the range."; + } + auto child_runtime = runtimes->at(child_idx); + int input_index = child_runtime->GetInputIndex(child_input_name); + if (input_index < 0) { + LOG(FATAL) << "Can not find the input " << input_index << "in runtime " << child_idx; + } + childs_[output_idx].push_back(std::make_pair(child_runtime, input_index)); + child_runtime->CreateParentsNotify(input_index, runtime_idx_, output_idx); + VLOG(1) << " parent_idx.output:" << runtime_idx_ << "." << output_idx << " child.input" + << child_idx << "." << input_index; + }, + runtime_idx_); + + StartWorkThread(); + } + /*! + * \brief Notifying a input is ready. + * \param input_index The index of 'input interface' which is ready for data. + */ + void ParentNotify(int input_index) { + auto notify = parents_notify_.find(input_index); + if (notify == parents_notify_.end()) { + LOG(FATAL) << "Can not find the input for index " << input_index << " in runtime" + << runtime_idx_; + return; + } + notify->second->Notify(); + VLOG(1) << "Notification at runtime_index.input_index:" << runtime_idx_ << "." << input_index; } /*!\brief Creating a NDArray containing same shape and data type with a module output. */ NDArray CreateFromOutput(int idx) { @@ -475,6 +737,12 @@ class BackendRuntime { NDArray GetOutput(int index) { return get_output_(index); } /*!\brief Running the runtime.*/ void Run() { run_(); } + /*!\brief Running the runtime in the pipeline mode.*/ + void RunPipeLine() { + Run(); + ForwardingOutputDataToChilds(); + statistic_pipeline_execute_times_++; + } }; /*! * \brief The information used to initialize the graph executor module, the information diff --git a/tests/python/relay/test_pipeline_executor.py b/tests/python/relay/test_pipeline_executor.py index 0851d377fe6a..1fecf85d31d9 100644 --- a/tests/python/relay/test_pipeline_executor.py +++ b/tests/python/relay/test_pipeline_executor.py @@ -325,8 +325,7 @@ def test_pipeline(): pipe_config[mod3].target = "llvm" pipe_config[mod3].dev = tvm.cpu(0) - - # Here is to check the correctness of the configuration generated by API. + # Checking the configuration of modules dependency. mconfig = pipe_config.get_config() assert mconfig["module_connection"] == get_manual_conf([mod1, mod2, mod3], target) @@ -392,7 +391,17 @@ def test_pipeline(): for i in range(len(outputs)): tvm.testing.assert_allclose(normal_output[i], outputs[i].numpy()) assert not (normal_output[i] == wrong_output[i]).all() - pipeline_module_test.stop() + # Running the pipeline executor in the pipeline mode. + pipeline_module_test.run(False) + + # TODO(huajsj:) Replacing the checking logic with getting output logic. + # Checking the statistic value of pipeline. + statistic_time = 0 + while pipeline_module_test.number_pipe_execute < len(datas): + statistic_time = statistic_time + 1 + # Setting the timeout to 10 seconds. + assert statistic_time < 10 + time.sleep(1) if __name__ == "__main__": From 3545898cc530a7cb83712d13efae728461788ae4 Mon Sep 17 00:00:00 2001 From: huajsj Date: Tue, 15 Feb 2022 01:16:29 -0800 Subject: [PATCH 02/10] address review comments. --- python/tvm/contrib/pipeline_executor.py | 4 +- src/runtime/pipeline/pipeline_executor.cc | 3 +- src/runtime/pipeline/pipeline_executor.h | 3 +- src/runtime/pipeline/pipeline_scheduler.cc | 2 +- src/runtime/pipeline/pipeline_struct.h | 54 ++++++++++++---------- 5 files changed, 34 insertions(+), 32 deletions(-) diff --git a/python/tvm/contrib/pipeline_executor.py b/python/tvm/contrib/pipeline_executor.py index 6e26049e5b60..3b66b73a0489 100644 --- a/python/tvm/contrib/pipeline_executor.py +++ b/python/tvm/contrib/pipeline_executor.py @@ -211,11 +211,11 @@ def get_output(self): @property def number_pipe_execute(self): - """Getting the times of pipeline running. + """Getting the numbers of pipeline running. Returns ------- count : int - The times of pipeline running. + The numbers of pipeline running. """ return self._get_pipe_execute_number() diff --git a/src/runtime/pipeline/pipeline_executor.cc b/src/runtime/pipeline/pipeline_executor.cc index d09490825055..9e70101aaa63 100644 --- a/src/runtime/pipeline/pipeline_executor.cc +++ b/src/runtime/pipeline/pipeline_executor.cc @@ -238,8 +238,7 @@ std::pair PipelineExecutor::GetInputIndex(const std::string& name) { return std::make_pair(index.first, gruntime->GetInputIndex(index.second)); } /*! - * \brief Getting the times of pipeline running. - * \return Returning the times of pipeline running. + * \brief Getting the numbers of pipeline running. */ int PipelineExecutor::PipelineStatisticPipeExecuteNumber() { return runtimes_.back()->GetStatisticPipelineExecuteNumber(); diff --git a/src/runtime/pipeline/pipeline_executor.h b/src/runtime/pipeline/pipeline_executor.h index 0db2e2e5ca71..822fe4b6187d 100644 --- a/src/runtime/pipeline/pipeline_executor.h +++ b/src/runtime/pipeline/pipeline_executor.h @@ -96,8 +96,7 @@ class TVM_DLL PipelineExecutor : public ModuleNode { */ NDArray GetInput(std::string input_name); /*! - * \brief Getting the times of pipeline running. - * \return Returning the times of pipeline running. + * \brief Getting the numbers of pipeline running. */ int PipelineStatisticPipeExecuteNumber(); /*! diff --git a/src/runtime/pipeline/pipeline_scheduler.cc b/src/runtime/pipeline/pipeline_scheduler.cc index d11bc67cc4ff..9766510d8a7a 100644 --- a/src/runtime/pipeline/pipeline_scheduler.cc +++ b/src/runtime/pipeline/pipeline_scheduler.cc @@ -49,7 +49,7 @@ std::vector> PipelineScheduler::PipelineInit( } // Initializing and then running the work thread. for (auto runtime : runtimes) { - runtime->Pipeline_Initialize(pipeline_config, &runtimes); + runtime->InitializePipeline(pipeline_config, &runtimes); } return runtimes; } diff --git a/src/runtime/pipeline/pipeline_struct.h b/src/runtime/pipeline/pipeline_struct.h index e1fd2a5eaeac..e44176a47912 100644 --- a/src/runtime/pipeline/pipeline_struct.h +++ b/src/runtime/pipeline/pipeline_struct.h @@ -24,6 +24,7 @@ #include #include +#include #include #include #include @@ -57,48 +58,50 @@ using ModuleOutputPair = std::pair; * The first 'int' is the module index, and the second 'int' is the module input index. */ using ModuleInputPair = std::pair; +/*! + *\brief The pair includes the module index and the module output index. + * The first 'int' is the module index, and the second 'int' is the module output index. + */ +using ModuleOutputPair = std::pair; /*!\brief The data notification structure.*/ class DataNotify { private: /*!\brief The 'contitional variable' is used to wait for notification.*/ - std::condition_variable condition_; + std::condition_variable notify_cv; /*!\brief The mutex is used to protect the 'conditional variable'.*/ std::mutex mutex_; /*!\brief Whether a data is ready or not.*/ volatile bool data_ready_ = false; /*!\brief Whether the thread should exit or not.*/ volatile bool exit_state_ = false; - /*!\brief The index of runtime which is sending out the data notification.*/ - int parent_idx_; - /*!\brief The index of runtime output interface which is sending out the data notification.*/ - int parent_output_idx_; + /*! + * \brief The 'ModuleOutputPair' in which the data was ready and triggered this + * notification. + */ + ModuleOutputPair notification_source_; public: /*! * \brief Constructing the DataNotify class. - * \param parent_idx The index of runtime which is sending out the data notification - * \param parent_output_idx The index of runtime output interface which is sending out + * \param parent_output_pair The index of runtime which is sending out the data notification * the data notification. */ - DataNotify(int parent_idx, int parent_output_idx) { - parent_idx_ = parent_idx; - parent_output_idx_ = parent_output_idx; + explicit DataNotify(ModuleOutputPair parent_output_pair) { + notification_source_ = parent_output_pair; } /*! * \brief Getting the notification source. * \return The first 'int' is the runtime index, and the second 'int' is the output index. */ - std::pair GetNotifySource(void) { - return std::make_pair(parent_idx_, parent_output_idx_); - } + ModuleOutputPair GetNotifySource(void) { return notification_source_; } /*! - *\brief Waiting the notification. + *\brief Waiting for the notification. *\return Returning the value 'false' when the notification is in a 'exit' state, else * return true. */ bool Wait(void) { std::unique_lock lock(mutex_); - condition_.wait(lock, [&] { return this->data_ready_; }); + notify_cv.wait(lock, [&] { return this->data_ready_; }); data_ready_ = false; return !exit_state_; } @@ -108,7 +111,7 @@ class DataNotify { std::lock_guard lock(mutex_); data_ready_ = true; } - condition_.notify_one(); + notify_cv.notify_one(); } /*!brief Sending the notification when the notification state changes into 'exit'.*/ void ExitNotify(void) { @@ -133,7 +136,7 @@ class ConfigBindings { /*!\brief Returning the binding configuration.*/ std::unordered_map& Get() { return bindings_; } /*! - * \brief Enumrating the binding configuration. + * \brief Enumerating the binding configuration. * \param parse_function The function is used to parse the binding configuration. * \param output_idx The index of output interface is used for parsing. */ @@ -487,8 +490,8 @@ class BackendRuntime { /*\brief The thread is associated with the current runtime*/ std::thread thread_; /*\brief A list of runtime which depends on the current runtime.*/ - std::unordered_map childs_; - /*\brief A map including the runtime input index and the notification data struction.*/ + std::unordered_map children_; + /*\brief A map including the runtime input index and the notification data structure.*/ std::unordered_map> parents_notify_; /*\brief The times of using pipeline function. */ uint32_t statistic_pipeline_execute_times_ = 0; @@ -506,7 +509,7 @@ class BackendRuntime { tvm::runtime::PackedFunc get_num_inputs_; tvm::runtime::PackedFunc get_input_index_; tvm::runtime::PackedFunc run_; - /*!\brief The working thread is used to execute the runtimes in pipeline.*/ + /*!\brief The worker thread is used to execute the runtimes in pipeline.*/ void StartWorkThread() { if (runtime_idx_ == 0) { this->CreateParentsNotify(0, GLOBAL_MODULE_INDEX, 0); @@ -576,7 +579,7 @@ class BackendRuntime { * \brief Forwarding the output data into the child runtimes. */ void ForwardingOutputDataToChilds(void) { - for (auto child : childs_) { + for (auto child : children_) { // TODO(huajsj): Getting the output data from the current runtime in order to forward // data to the child. @@ -597,7 +600,8 @@ class BackendRuntime { if (parents_notify_.find(input_index) != parents_notify_.end()) { LOG(FATAL) << "Not finding the input index " << input_index << " in runtime " << runtime_idx_; } - parents_notify_[input_index] = std::make_shared(parent_idx, parent_output_idx); + parents_notify_[input_index] = + std::make_shared(std::make_pair(parent_idx, parent_output_idx)); } /*! * \brief Copying from a given tensor and using 'CPU' as the device. @@ -670,8 +674,8 @@ class BackendRuntime { * \param config The pipeline configueration. * \param runtimes A list of BackendRuntime. */ - void Pipeline_Initialize(ConfigPipelineExecution config, - std::vector>* runtimes) { + void InitializePipeline(ConfigPipelineExecution config, + std::vector>* runtimes) { // Getting the 'binding configuration' for each runtime. config.VisitRuntimeOutputConfig( [&](int output_idx, int child_idx, std::string child_input_name) { @@ -684,7 +688,7 @@ class BackendRuntime { if (input_index < 0) { LOG(FATAL) << "Can not find the input " << input_index << "in runtime " << child_idx; } - childs_[output_idx].push_back(std::make_pair(child_runtime, input_index)); + children_[output_idx].push_back(std::make_pair(child_runtime, input_index)); child_runtime->CreateParentsNotify(input_index, runtime_idx_, output_idx); VLOG(1) << " parent_idx.output:" << runtime_idx_ << "." << output_idx << " child.input" << child_idx << "." << input_index; From c9e353d75850840f2ec0b9975a87e8336fd50edf Mon Sep 17 00:00:00 2001 From: huajsj Date: Tue, 15 Feb 2022 01:22:03 -0800 Subject: [PATCH 03/10] address review comments. --- src/runtime/pipeline/pipeline_executor.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/runtime/pipeline/pipeline_executor.h b/src/runtime/pipeline/pipeline_executor.h index 822fe4b6187d..509191d3b39e 100644 --- a/src/runtime/pipeline/pipeline_executor.h +++ b/src/runtime/pipeline/pipeline_executor.h @@ -96,7 +96,7 @@ class TVM_DLL PipelineExecutor : public ModuleNode { */ NDArray GetInput(std::string input_name); /*! - * \brief Getting the numbers of pipeline running. + * \brief Getting the numbers of running pipeline. */ int PipelineStatisticPipeExecuteNumber(); /*! From 3f3b470920c77f454d3333a625b170082611ae0c Mon Sep 17 00:00:00 2001 From: huajsj Date: Tue, 15 Feb 2022 10:41:28 -0800 Subject: [PATCH 04/10] fix typo. --- python/tvm/contrib/pipeline_executor.py | 4 ++-- src/runtime/pipeline/pipeline_executor.cc | 2 +- src/runtime/pipeline/pipeline_scheduler.cc | 2 +- src/runtime/pipeline/pipeline_struct.h | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/python/tvm/contrib/pipeline_executor.py b/python/tvm/contrib/pipeline_executor.py index 3b66b73a0489..c7f4425299ed 100644 --- a/python/tvm/contrib/pipeline_executor.py +++ b/python/tvm/contrib/pipeline_executor.py @@ -211,11 +211,11 @@ def get_output(self): @property def number_pipe_execute(self): - """Getting the numbers of pipeline running. + """Getting the numbers of running pipeline. Returns ------- count : int - The numbers of pipeline running. + The numbers of running pipeline. """ return self._get_pipe_execute_number() diff --git a/src/runtime/pipeline/pipeline_executor.cc b/src/runtime/pipeline/pipeline_executor.cc index 9e70101aaa63..9efa2d6ec61f 100644 --- a/src/runtime/pipeline/pipeline_executor.cc +++ b/src/runtime/pipeline/pipeline_executor.cc @@ -238,7 +238,7 @@ std::pair PipelineExecutor::GetInputIndex(const std::string& name) { return std::make_pair(index.first, gruntime->GetInputIndex(index.second)); } /*! - * \brief Getting the numbers of pipeline running. + * \brief Getting the numbers of running pipeline. */ int PipelineExecutor::PipelineStatisticPipeExecuteNumber() { return runtimes_.back()->GetStatisticPipelineExecuteNumber(); diff --git a/src/runtime/pipeline/pipeline_scheduler.cc b/src/runtime/pipeline/pipeline_scheduler.cc index 9766510d8a7a..58e0a74236d7 100644 --- a/src/runtime/pipeline/pipeline_scheduler.cc +++ b/src/runtime/pipeline/pipeline_scheduler.cc @@ -47,7 +47,7 @@ std::vector> PipelineScheduler::PipelineInit( NDArray output = runtimes[output_pair.first]->CreateFromOutput(output_pair.second); output_arrays_.push_back(output); } - // Initializing and then running the work thread. + // Initializing and then running the worker thread. for (auto runtime : runtimes) { runtime->InitializePipeline(pipeline_config, &runtimes); } diff --git a/src/runtime/pipeline/pipeline_struct.h b/src/runtime/pipeline/pipeline_struct.h index e44176a47912..cbec868d0fd5 100644 --- a/src/runtime/pipeline/pipeline_struct.h +++ b/src/runtime/pipeline/pipeline_struct.h @@ -291,7 +291,7 @@ class ConfigPipelineExecution { return config_[key]; } /*! - * \brief Enumurating the binding configuration for a specify runtime. + * \brief Enumerating the binding configuration for a specified runtime. * \param parse_function The callback function is used to parse the binding configuration. * \param runtime_index The index of a runtime is used to parse the binding configuration. */ @@ -514,7 +514,7 @@ class BackendRuntime { if (runtime_idx_ == 0) { this->CreateParentsNotify(0, GLOBAL_MODULE_INDEX, 0); } else { - // Only launching work thread for the runtimes after the first runtime. + // Only launching the worker thread for the runtimes after the first runtime. thread_ = std::thread([&]() { while (!this->WaitAndLoadPipeLineData()) { this->RunPipeLine(); From 9392b2e610e64735486c2b22e799183ce1c50d9a Mon Sep 17 00:00:00 2001 From: huajsj Date: Tue, 15 Feb 2022 11:11:39 -0800 Subject: [PATCH 05/10] fix typo. --- src/runtime/pipeline/pipeline_struct.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/runtime/pipeline/pipeline_struct.h b/src/runtime/pipeline/pipeline_struct.h index cbec868d0fd5..9e6df0d43e31 100644 --- a/src/runtime/pipeline/pipeline_struct.h +++ b/src/runtime/pipeline/pipeline_struct.h @@ -578,12 +578,12 @@ class BackendRuntime { /*! * \brief Forwarding the output data into the child runtimes. */ - void ForwardingOutputDataToChilds(void) { + void ForwardingOutputDataToChildren(void) { for (auto child : children_) { // TODO(huajsj): Getting the output data from the current runtime in order to forward // data to the child. - // Notifying the 'childs runtime' that the forwarding data are ready. + // Notifying the 'children runtime' that the forwarding data are ready. for (auto module_pair : child.second) { module_pair.first->ParentNotify(module_pair.second); } @@ -744,7 +744,7 @@ class BackendRuntime { /*!\brief Running the runtime in the pipeline mode.*/ void RunPipeLine() { Run(); - ForwardingOutputDataToChilds(); + ForwardingOutputDataToChildren(); statistic_pipeline_execute_times_++; } }; From 70f1560fa99dba2b31bbedbe07c1d17b190f3d0b Mon Sep 17 00:00:00 2001 From: huajsj Date: Tue, 15 Feb 2022 22:10:08 -0800 Subject: [PATCH 06/10] trigger build. --- src/runtime/pipeline/pipeline_struct.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/runtime/pipeline/pipeline_struct.h b/src/runtime/pipeline/pipeline_struct.h index 9e6df0d43e31..a82f83b9e77d 100644 --- a/src/runtime/pipeline/pipeline_struct.h +++ b/src/runtime/pipeline/pipeline_struct.h @@ -54,7 +54,7 @@ using GlobalOutputPair = std::pair; */ using ModuleOutputPair = std::pair; /*! - *\brief The pair includes the module index and the module input index. + *\brief The pair includes the runtime module index and the module input index. * The first 'int' is the module index, and the second 'int' is the module input index. */ using ModuleInputPair = std::pair; From 2d97c17fa0d70e07f4a76746e0b51ab6a1c57fbb Mon Sep 17 00:00:00 2001 From: huajsj Date: Tue, 22 Feb 2022 21:19:50 -0800 Subject: [PATCH 07/10] address review comments. --- python/tvm/contrib/pipeline_executor.py | 10 ++-- src/runtime/pipeline/pipeline_executor.cc | 8 +-- src/runtime/pipeline/pipeline_executor.h | 4 +- src/runtime/pipeline/pipeline_struct.h | 53 ++++++++++++++------ tests/python/relay/test_pipeline_executor.py | 2 +- 5 files changed, 51 insertions(+), 26 deletions(-) diff --git a/python/tvm/contrib/pipeline_executor.py b/python/tvm/contrib/pipeline_executor.py index c7f4425299ed..dc59bd0d73b2 100644 --- a/python/tvm/contrib/pipeline_executor.py +++ b/python/tvm/contrib/pipeline_executor.py @@ -123,7 +123,7 @@ def __init__(self, module): self._get_output = self.module["get_output"] self._get_num_outputs = self.module["get_num_outputs"] self._get_input_pipeline_map = self.module["get_input_pipeline_map"] - self._get_pipe_execute_number = self.module["get_statistic_pipe_execute_number"] + self._get_pipe_execute_count = self.module["get_statistic_pipe_execute_count"] def run(self, sync=False): """Run the pipeline executor.""" @@ -210,14 +210,14 @@ def get_output(self): return self._get_output() @property - def number_pipe_execute(self): - """Getting the numbers of running pipeline. + def num_executing_pipeline(self): + """Getting the count of running pipeline. Returns ------- count : int - The numbers of running pipeline. + The count of running pipeline. """ - return self._get_pipe_execute_number() + return self._get_pipe_execute_count() @property def num_outputs(self): diff --git a/src/runtime/pipeline/pipeline_executor.cc b/src/runtime/pipeline/pipeline_executor.cc index 9efa2d6ec61f..9c88646f06f3 100644 --- a/src/runtime/pipeline/pipeline_executor.cc +++ b/src/runtime/pipeline/pipeline_executor.cc @@ -79,9 +79,9 @@ PackedFunc PipelineExecutor::GetFunction(const std::string& name, [sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { *rv = this->GetOutput(); }); } else if (name == "run") { return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Run(args[0]); }); - } else if (name == "get_statistic_pipe_execute_number") { + } else if (name == "get_statistic_pipe_execute_count") { return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { - *rv = this->PipelineStatisticPipeExecuteNumber(); + *rv = this->PipelineStatisticPipeExecuterCount(); }); } else { LOG(FATAL) << "Unknown packed function: " << name; @@ -238,9 +238,9 @@ std::pair PipelineExecutor::GetInputIndex(const std::string& name) { return std::make_pair(index.first, gruntime->GetInputIndex(index.second)); } /*! - * \brief Getting the numbers of running pipeline. + * \brief Getting the count of running pipeline. */ -int PipelineExecutor::PipelineStatisticPipeExecuteNumber() { +int PipelineExecutor::PipelineStatisticPipeExecuterCount() { return runtimes_.back()->GetStatisticPipelineExecuteNumber(); } /*! diff --git a/src/runtime/pipeline/pipeline_executor.h b/src/runtime/pipeline/pipeline_executor.h index 509191d3b39e..a887053a554b 100644 --- a/src/runtime/pipeline/pipeline_executor.h +++ b/src/runtime/pipeline/pipeline_executor.h @@ -96,9 +96,9 @@ class TVM_DLL PipelineExecutor : public ModuleNode { */ NDArray GetInput(std::string input_name); /*! - * \brief Getting the numbers of running pipeline. + * \brief Getting the count of running pipeline. */ - int PipelineStatisticPipeExecuteNumber(); + int PipelineStatisticPipeExecuterCount(); /*! * \brief Use the parameters group name to get the specific backend runtime then use * the param_key_name to set param data for the said backend runtime. diff --git a/src/runtime/pipeline/pipeline_struct.h b/src/runtime/pipeline/pipeline_struct.h index a82f83b9e77d..b7e6061429a0 100644 --- a/src/runtime/pipeline/pipeline_struct.h +++ b/src/runtime/pipeline/pipeline_struct.h @@ -37,6 +37,7 @@ namespace tvm { namespace runtime { #define GLOBAL_MODULE_INDEX -1 +#define memory_barrier() std::atomic_thread_fence(std::memory_order_acquire) /*! *\brief The function is used to build the binding configuration for a runtime. The first * 'int' is the output index of the current runtime, the second 'int' is the index of child @@ -58,11 +59,33 @@ using ModuleOutputPair = std::pair; * The first 'int' is the module index, and the second 'int' is the module input index. */ using ModuleInputPair = std::pair; +/*!\brief The runtime module interface type.*/ +enum InterfaceType { + INPUT = 0, + OUTPUT, +}; /*! - *\brief The pair includes the module index and the module output index. - * The first 'int' is the module index, and the second 'int' is the module output index. + *\brief The structure includes the module index and the module output index. */ -using ModuleOutputPair = std::pair; +struct ModuleInterfaceID { + ModuleInterfaceID() : runtime_idx(0), runtime_interface_idx(0), interface_type(OUTPUT) { ; } + ModuleInterfaceID(int runtime_index, int runtime_interface_index, InterfaceType type = OUTPUT) { + runtime_idx = runtime_index; + runtime_interface_idx = runtime_interface_index; + interface_type = type; + } + int runtime_idx; + union { + /*!\brief The output interface index.*/ + int runtime_output_idx; + /*!\brief The input interface index.*/ + int runtime_input_idx; + /*!\brief The interface index.*/ + int runtime_interface_idx; + }; + /*!\brief The interface type*/ + InterfaceType interface_type; +}; /*!\brief The data notification structure.*/ class DataNotify { private: @@ -75,25 +98,25 @@ class DataNotify { /*!\brief Whether the thread should exit or not.*/ volatile bool exit_state_ = false; /*! - * \brief The 'ModuleOutputPair' in which the data was ready and triggered this + * \brief The 'ModuleInterfaceID' in which the data was ready and triggered this * notification. */ - ModuleOutputPair notification_source_; + ModuleInterfaceID notification_source_; public: /*! * \brief Constructing the DataNotify class. - * \param parent_output_pair The index of runtime which is sending out the data notification - * the data notification. + * \param parent_output_id The id of a runtime interface which is sending out the data + * notification. */ - explicit DataNotify(ModuleOutputPair parent_output_pair) { - notification_source_ = parent_output_pair; + explicit DataNotify(ModuleInterfaceID parent_output_id) { + notification_source_ = parent_output_id; } /*! * \brief Getting the notification source. * \return The first 'int' is the runtime index, and the second 'int' is the output index. */ - ModuleOutputPair GetNotifySource(void) { return notification_source_; } + ModuleInterfaceID GetNotifySource(void) { return notification_source_; } /*! *\brief Waiting for the notification. *\return Returning the value 'false' when the notification is in a 'exit' state, else @@ -116,6 +139,7 @@ class DataNotify { /*!brief Sending the notification when the notification state changes into 'exit'.*/ void ExitNotify(void) { exit_state_ = true; + memory_barrier(); Notify(); } /*! @@ -547,7 +571,8 @@ class BackendRuntime { // Getting the source which sends this notification. auto notify_source = notify->second->GetNotifySource(); // Loading the binding data. - while (!this->LoadBindingData(notify->first, notify_source.first, notify_source.second)) { + while (!this->LoadBindingData(notify->first, notify_source.runtime_idx, + notify_source.runtime_output_idx)) { // Waiting for the notification. if (!notify->second->Wait()) { VLOG(1) << "runtime index:" << runtime_idx_ << " receive exit notify."; @@ -558,8 +583,8 @@ class BackendRuntime { break; } VLOG(1) << "runtime_index.input_index:" << runtime_idx_ << "." << notify->first - << "from runtime_index.output_index:" << notify_source.first << "." - << notify_source.second; + << "from runtime_index.output_index:" << notify_source.runtime_idx << "." + << notify_source.runtime_output_idx; notifys.erase(notify); } return exit_notify; @@ -601,7 +626,7 @@ class BackendRuntime { LOG(FATAL) << "Not finding the input index " << input_index << " in runtime " << runtime_idx_; } parents_notify_[input_index] = - std::make_shared(std::make_pair(parent_idx, parent_output_idx)); + std::make_shared(ModuleInterfaceID(parent_idx, parent_output_idx)); } /*! * \brief Copying from a given tensor and using 'CPU' as the device. diff --git a/tests/python/relay/test_pipeline_executor.py b/tests/python/relay/test_pipeline_executor.py index 1fecf85d31d9..8ab2265db3d6 100644 --- a/tests/python/relay/test_pipeline_executor.py +++ b/tests/python/relay/test_pipeline_executor.py @@ -397,7 +397,7 @@ def test_pipeline(): # TODO(huajsj:) Replacing the checking logic with getting output logic. # Checking the statistic value of pipeline. statistic_time = 0 - while pipeline_module_test.number_pipe_execute < len(datas): + while pipeline_module_test.num_executing_pipeline < len(datas): statistic_time = statistic_time + 1 # Setting the timeout to 10 seconds. assert statistic_time < 10 From f1f4ca5135fe3b5ce179b8b12f4260c31923745a Mon Sep 17 00:00:00 2001 From: huajsj Date: Tue, 22 Feb 2022 23:57:08 -0800 Subject: [PATCH 08/10] address review comments. --- src/runtime/pipeline/pipeline_struct.h | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/runtime/pipeline/pipeline_struct.h b/src/runtime/pipeline/pipeline_struct.h index b7e6061429a0..426368139819 100644 --- a/src/runtime/pipeline/pipeline_struct.h +++ b/src/runtime/pipeline/pipeline_struct.h @@ -37,7 +37,6 @@ namespace tvm { namespace runtime { #define GLOBAL_MODULE_INDEX -1 -#define memory_barrier() std::atomic_thread_fence(std::memory_order_acquire) /*! *\brief The function is used to build the binding configuration for a runtime. The first * 'int' is the output index of the current runtime, the second 'int' is the index of child @@ -94,9 +93,9 @@ class DataNotify { /*!\brief The mutex is used to protect the 'conditional variable'.*/ std::mutex mutex_; /*!\brief Whether a data is ready or not.*/ - volatile bool data_ready_ = false; + bool data_ready_ = false; /*!\brief Whether the thread should exit or not.*/ - volatile bool exit_state_ = false; + std::atomic exit_state_{false}; /*! * \brief The 'ModuleInterfaceID' in which the data was ready and triggered this * notification. @@ -138,15 +137,14 @@ class DataNotify { } /*!brief Sending the notification when the notification state changes into 'exit'.*/ void ExitNotify(void) { - exit_state_ = true; - memory_barrier(); + exit_state_.store(true, std::memory_order_release); Notify(); } /*! *\brief Getting the 'exit state'. *\return Returning the value of 'exit_state_' */ - bool GetExitState(void) { return exit_state_; } + bool GetExitState(void) { return exit_state_.load(std::memory_order_acquire); } }; /*! * \brief All binding information of a output interface. From 161631a018faf7d3866885a71f40e82a000f1a0a Mon Sep 17 00:00:00 2001 From: huajsj Date: Wed, 23 Feb 2022 13:21:32 -0800 Subject: [PATCH 09/10] address review comments. --- python/tvm/contrib/pipeline_executor.py | 2 +- src/runtime/pipeline/pipeline_executor.cc | 11 ++++------- src/runtime/pipeline/pipeline_executor.h | 2 +- src/runtime/pipeline/pipeline_scheduler.cc | 6 +++--- src/runtime/pipeline/pipeline_struct.h | 16 ++++++++-------- 5 files changed, 17 insertions(+), 20 deletions(-) diff --git a/python/tvm/contrib/pipeline_executor.py b/python/tvm/contrib/pipeline_executor.py index dc59bd0d73b2..bdf6019bc6c8 100644 --- a/python/tvm/contrib/pipeline_executor.py +++ b/python/tvm/contrib/pipeline_executor.py @@ -123,7 +123,7 @@ def __init__(self, module): self._get_output = self.module["get_output"] self._get_num_outputs = self.module["get_num_outputs"] self._get_input_pipeline_map = self.module["get_input_pipeline_map"] - self._get_pipe_execute_count = self.module["get_statistic_pipe_execute_count"] + self._get_pipe_execute_count = self.module["get_execute_count"] def run(self, sync=False): """Run the pipeline executor.""" diff --git a/src/runtime/pipeline/pipeline_executor.cc b/src/runtime/pipeline/pipeline_executor.cc index 9c88646f06f3..ccf5e09ebcf1 100644 --- a/src/runtime/pipeline/pipeline_executor.cc +++ b/src/runtime/pipeline/pipeline_executor.cc @@ -79,10 +79,9 @@ PackedFunc PipelineExecutor::GetFunction(const std::string& name, [sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { *rv = this->GetOutput(); }); } else if (name == "run") { return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Run(args[0]); }); - } else if (name == "get_statistic_pipe_execute_count") { - return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { - *rv = this->PipelineStatisticPipeExecuterCount(); - }); + } else if (name == "get_execute_count") { + return PackedFunc( + [sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { *rv = this->GetExecutionCount(); }); } else { LOG(FATAL) << "Unknown packed function: " << name; return PackedFunc(); @@ -240,9 +239,7 @@ std::pair PipelineExecutor::GetInputIndex(const std::string& name) { /*! * \brief Getting the count of running pipeline. */ -int PipelineExecutor::PipelineStatisticPipeExecuterCount() { - return runtimes_.back()->GetStatisticPipelineExecuteNumber(); -} +int PipelineExecutor::GetExecutionCount() { return runtimes_.back()->GetExecutionCount(); } /*! * \brief Initialize the pipeline executor with a list of modules to be pipelined * and config in JSON format. diff --git a/src/runtime/pipeline/pipeline_executor.h b/src/runtime/pipeline/pipeline_executor.h index a887053a554b..1d547f206a95 100644 --- a/src/runtime/pipeline/pipeline_executor.h +++ b/src/runtime/pipeline/pipeline_executor.h @@ -98,7 +98,7 @@ class TVM_DLL PipelineExecutor : public ModuleNode { /*! * \brief Getting the count of running pipeline. */ - int PipelineStatisticPipeExecuterCount(); + int GetExecutionCount(); /*! * \brief Use the parameters group name to get the specific backend runtime then use * the param_key_name to set param data for the said backend runtime. diff --git a/src/runtime/pipeline/pipeline_scheduler.cc b/src/runtime/pipeline/pipeline_scheduler.cc index 58e0a74236d7..760bcd2c07a8 100644 --- a/src/runtime/pipeline/pipeline_scheduler.cc +++ b/src/runtime/pipeline/pipeline_scheduler.cc @@ -34,8 +34,8 @@ std::vector> PipelineScheduler::PipelineInit( graph_modules_ = modules; // Creating a list of runtimes. for (size_t i = 0; i < graph_modules_.size(); i++) { - auto runItem = std::make_shared(graph_modules_[i], i); - runtimes.push_back(runItem); + auto run_item = std::make_shared(graph_modules_[i], i); + runtimes.push_back(run_item); } // Creating a list of NDArray in order to storage the outputs data. auto global_output_map = pipeline_config.GetGlobalConfigOutputBindings(); @@ -106,7 +106,7 @@ void PipelineScheduler::PipelineRunSequential( void PipelineScheduler::PipelineRun(const std::vector>& runtimes, ConfigPipelineExecution pipeline_config, bool sequential_mode) { if (!sequential_mode) { - runtimes.front()->RunPipeLine(); + runtimes.front()->RunPipeline(); } else { PipelineRunSequential(runtimes, pipeline_config); } diff --git a/src/runtime/pipeline/pipeline_struct.h b/src/runtime/pipeline/pipeline_struct.h index 426368139819..72a2bd7be859 100644 --- a/src/runtime/pipeline/pipeline_struct.h +++ b/src/runtime/pipeline/pipeline_struct.h @@ -89,7 +89,7 @@ struct ModuleInterfaceID { class DataNotify { private: /*!\brief The 'contitional variable' is used to wait for notification.*/ - std::condition_variable notify_cv; + std::condition_variable notify_cv_; /*!\brief The mutex is used to protect the 'conditional variable'.*/ std::mutex mutex_; /*!\brief Whether a data is ready or not.*/ @@ -123,7 +123,7 @@ class DataNotify { */ bool Wait(void) { std::unique_lock lock(mutex_); - notify_cv.wait(lock, [&] { return this->data_ready_; }); + notify_cv_.wait(lock, [&] { return this->data_ready_; }); data_ready_ = false; return !exit_state_; } @@ -133,7 +133,7 @@ class DataNotify { std::lock_guard lock(mutex_); data_ready_ = true; } - notify_cv.notify_one(); + notify_cv_.notify_one(); } /*!brief Sending the notification when the notification state changes into 'exit'.*/ void ExitNotify(void) { @@ -538,8 +538,8 @@ class BackendRuntime { } else { // Only launching the worker thread for the runtimes after the first runtime. thread_ = std::thread([&]() { - while (!this->WaitAndLoadPipeLineData()) { - this->RunPipeLine(); + while (!this->WaitAndLoadPipelineData()) { + this->RunPipeline(); } VLOG(1) << "Runtime " << this->runtime_idx_ << " exit."; }); @@ -559,7 +559,7 @@ class BackendRuntime { * \brief Waiting for the internal forwarding data. * \return Returning 'true' when getting a 'exit' notification otherwise returning 'false'. */ - bool WaitAndLoadPipeLineData() { + bool WaitAndLoadPipelineData() { std::unordered_map> notifys = parents_notify_; bool exit_notify = false; while (!notifys.empty() && !exit_notify) { @@ -691,7 +691,7 @@ class BackendRuntime { * \brief Getting the times of using pipeline function. * \return The times of using pipeline function. */ - int GetStatisticPipelineExecuteNumber() const { return statistic_pipeline_execute_times_; } + int GetExecutionCount() const { return statistic_pipeline_execute_times_; } /*! * \brief Initializing data structures for the pipeline execution. * \param config The pipeline configueration. @@ -765,7 +765,7 @@ class BackendRuntime { /*!\brief Running the runtime.*/ void Run() { run_(); } /*!\brief Running the runtime in the pipeline mode.*/ - void RunPipeLine() { + void RunPipeline() { Run(); ForwardingOutputDataToChildren(); statistic_pipeline_execute_times_++; From c9497b027d66feefc717f30a30bf22f11642b444 Mon Sep 17 00:00:00 2001 From: huajsj Date: Wed, 23 Feb 2022 13:40:19 -0800 Subject: [PATCH 10/10] address review comments. --- src/runtime/pipeline/pipeline_struct.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/runtime/pipeline/pipeline_struct.h b/src/runtime/pipeline/pipeline_struct.h index 72a2bd7be859..05eeb221aa37 100644 --- a/src/runtime/pipeline/pipeline_struct.h +++ b/src/runtime/pipeline/pipeline_struct.h @@ -125,7 +125,7 @@ class DataNotify { std::unique_lock lock(mutex_); notify_cv_.wait(lock, [&] { return this->data_ready_; }); data_ready_ = false; - return !exit_state_; + return !GetExitState(); } /*!brief Sending the notification in which the related data is ready.*/ void Notify(void) {