diff --git a/paddle/fluid/framework/new_executor/interpretercore.cc b/paddle/fluid/framework/new_executor/interpretercore.cc index 91175f463ce90..883fc5dcb0ca7 100644 --- a/paddle/fluid/framework/new_executor/interpretercore.cc +++ b/paddle/fluid/framework/new_executor/interpretercore.cc @@ -77,108 +77,18 @@ paddle::framework::FetchList InterpreterCore::Run( return *(fetch_var->GetMutable()); } -void update_var_min_rw_op(const std::map>& op2dependences, - std::map>& var2min_rw_op, - int cur_op, int rw_var) { - // rw_var is inputs or outputs of cur_op - // this function update the var2min_rw_op set . - if (var2min_rw_op.find(rw_var) == var2min_rw_op.end()) - var2min_rw_op[rw_var] = std::list(); - for (auto dep_op : op2dependences.at(cur_op)) { - var2min_rw_op[rw_var].remove(dep_op); - } - var2min_rw_op[rw_var].push_back(cur_op); -} - -std::map> get_downstream_map( - const std::map>& op2dependences) { - // op2dependences is op -> it's dependences. we want to get op -> [ops] map, - // where ops is the next instruction of op. - std::map> result; - for (auto& item : op2dependences) { - int op = item.first; - for (auto dep_op : item.second) { - if (result.find(dep_op) == result.end()) - result[dep_op] = std::list(); - result[dep_op].push_back(op); - } - } - return std::move(result); -} - void InterpreterCore::BuildOperatorDependences() { - // set the dependecy_count_ and Call Schedule - // refer to http://agroup.baidu.com/share/md/92946214aa4c4785a2cc4c1f361a023c - // for pesudo code + // analysis the dependences between ops, set the dependecy_count_ and Call + // Schedule auto op_nums = vec_instruction_.size(); - auto var2min_rw_op = std::map< - int, std::list>(); // # map from variable id to read / write op id. - auto var2recent_write_op = - std::map(); // # map from variable to recent write op. - auto op2dependences = - std::map>(); //# map from op to the dependence list, - // op must run after the dependence. - std::set remove_duplicate; - - // reserve - for (size_t op = 0; op < vec_instruction_.size(); ++op) { - op2dependences[op] = std::set(); - } dependecy_count_.resize(op_nums); - - for (size_t op = 0; op < vec_instruction_.size(); ++op) { - remove_duplicate.clear(); - // step1: update the op2dependences structure - for (auto& item : - vec_instruction_[op].Inputs()) { // for all inputs(read only) - for (auto var : item.second) { - if (var2recent_write_op.count(var)) - op2dependences[op].insert(var2recent_write_op[var]); - } - } - - for (auto& item : vec_instruction_[op].Outputs()) { // for all write vars - for (auto var : item.second) { - if (var2min_rw_op.count(var)) { - for (auto dep_op : var2min_rw_op[var]) { - op2dependences[op].insert(dep_op); - } - } - } - } - - // step2: update 2 var2xxxx data structure - for (auto& item : - vec_instruction_[op].Inputs()) { // for all inputs(read only) - for (auto var : item.second) { - update_var_min_rw_op(op2dependences, var2min_rw_op, op, var); - remove_duplicate.insert(var); - } - } - - for (auto& item : vec_instruction_[op].Outputs()) { // for all write vars - for (auto var : item.second) { - var2recent_write_op[var] = op; - if (remove_duplicate.count(var) == - 0) { // var in input list and in output list, so remove it. - update_var_min_rw_op(op2dependences, var2min_rw_op, op, var); - } - } - } - } - - auto op2downstream = get_downstream_map(op2dependences); - - VLOG(5) << "the size of vec_instruction_ : " << vec_instruction_.size(); - + auto op2downstream = interpreter::build_op_downstream_map(vec_instruction_); for (size_t op = 0; op < vec_instruction_.size(); ++op) { - VLOG(5) << "the op2downstream : " << op; auto op_list = op2downstream[op]; std::vector downsteam_vector(op_list.begin(), op_list.end()); stream_analyzer_.Schedule(downsteam_vector, &vec_instruction_, op); for (auto inst_id : op_list) { - VLOG(5) << "\t " << inst_id; dependecy_count_[inst_id]++; } } diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.cc b/paddle/fluid/framework/new_executor/interpretercore_util.cc index 011b1b6dece8e..b135c214bd33c 100644 --- a/paddle/fluid/framework/new_executor/interpretercore_util.cc +++ b/paddle/fluid/framework/new_executor/interpretercore_util.cc @@ -631,6 +631,97 @@ std::vector merge_vector(const std::vector& first, return out; } +void update_var_min_rw_op(const std::map>& op2dependences, + std::map>& var2min_rw_op, + int cur_op, int rw_var) { + // rw_var is inputs or outputs of cur_op + // this function update the var2min_rw_op set . + if (var2min_rw_op.find(rw_var) == var2min_rw_op.end()) + var2min_rw_op[rw_var] = std::list(); + for (auto dep_op : op2dependences.at(cur_op)) { + var2min_rw_op[rw_var].remove(dep_op); + } + var2min_rw_op[rw_var].push_back(cur_op); +} + +std::map> get_downstream_map( + const std::map>& op2dependences) { + // op2dependences is op -> it's dependences. we want to get op -> [ops] map, + // where ops is the next instruction of op. + std::map> result; + for (auto& item : op2dependences) { + int op = item.first; + for (auto dep_op : item.second) { + if (result.find(dep_op) == result.end()) + result[dep_op] = std::list(); + result[dep_op].push_back(op); + } + } + return std::move(result); +} + +std::map> build_op_downstream_map( + const std::vector& vec_instruction) { + auto var2min_rw_op = std::map< + int, std::list>(); // # map from variable id to read / write op id. + auto var2recent_write_op = + std::map(); // # map from variable to recent write op. + auto op2dependences = + std::map>(); //# map from op to the dependence list, + // op must run after the dependence. + std::set + remove_duplicate; // remove the duplicate between inputs and outputs + + // reserve + for (size_t op_idx = 0; op_idx < vec_instruction.size(); ++op_idx) { + op2dependences[op_idx] = std::set(); + } + + for (size_t op_idx = 0; op_idx < vec_instruction.size(); ++op_idx) { + remove_duplicate.clear(); + // step1: update the op2dependences structure + for (auto& item : + vec_instruction[op_idx].Inputs()) { // for all inputs(read only) + for (auto var : item.second) { + if (var2recent_write_op.count(var)) + op2dependences[op_idx].insert(var2recent_write_op[var]); + } + } + + for (auto& item : + vec_instruction[op_idx].Outputs()) { // for all write vars + for (auto var : item.second) { + if (var2min_rw_op.count(var)) { + for (auto dep_op : var2min_rw_op[var]) { + op2dependences[op_idx].insert(dep_op); + } + } + } + } + + // step2: update 2 var2xxxx data structure + for (auto& item : + vec_instruction[op_idx].Inputs()) { // for all inputs(read only) + for (auto var : item.second) { + update_var_min_rw_op(op2dependences, var2min_rw_op, op_idx, var); + remove_duplicate.insert(var); + } + } + + for (auto& item : + vec_instruction[op_idx].Outputs()) { // for all write vars + for (auto var : item.second) { + var2recent_write_op[var] = op_idx; + if (remove_duplicate.count(var) == + 0) { // var in input list and in output list, so remove it. + update_var_min_rw_op(op2dependences, var2min_rw_op, op_idx, var); + } + } + } + } + return std::move(get_downstream_map(op2dependences)); +} + } // namespace interpreter } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.h b/paddle/fluid/framework/new_executor/interpretercore_util.h index 375fed2356a01..f3b1a8a6b4a53 100644 --- a/paddle/fluid/framework/new_executor/interpretercore_util.h +++ b/paddle/fluid/framework/new_executor/interpretercore_util.h @@ -105,6 +105,9 @@ void build_op_func_list(const platform::Place& place, std::vector* vec_func_list, VariableScope* var_scope); +std::map> build_op_downstream_map( + const std::vector& vec_instruction); + void add_fetch(const std::vector& fetch_names, framework::BlockDesc* block);