[Fleet Executor] Fix runtime graph based gpt, add debug message #37361

Merged: 1 commit, Nov 19, 2021
1 change: 1 addition & 0 deletions paddle/fluid/distributed/fleet_executor/fleet_executor.cc
@@ -35,6 +35,7 @@ FleetExecutor::~FleetExecutor() {

 void FleetExecutor::Init(const paddle::framework::ProgramDesc& program_desc) {
   runtime_graph_ = std::make_unique<RuntimeGraph>(program_desc, exe_desc_);
+  VLOG(5) << runtime_graph_->DebugString();
   InitCarrier();
   InitMessageBus();
 }
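
For context, the new VLOG(5) line is cheap when verbose logging is off: with glog (which Paddle's VLOG macro comes from), the streamed expression is only evaluated when the requested verbosity level is enabled, typically via the GLOG_v environment variable. A minimal standalone sketch, with a hypothetical ExpensiveDump() standing in for runtime_graph_->DebugString():

    #include <string>
    #include <glog/logging.h>

    std::string ExpensiveDump() { return "large debug string"; }

    int main(int argc, char* argv[]) {
      google::InitGoogleLogging(argv[0]);
      // With GLOG_v unset (or below 5) this line is essentially free:
      // ExpensiveDump() is never called, because VLOG(5) expands to a
      // conditional that skips the stream expression when disabled.
      VLOG(5) << ExpensiveDump();
      return 0;
    }

Running the binary with GLOG_v=5 prints the message; without it, nothing is logged and the dump string is never built.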
1 change: 1 addition & 0 deletions paddle/fluid/distributed/fleet_executor/fleet_executor.h
@@ -14,6 +14,7 @@

 #pragma once
 #include <memory>
+#include <string>

 #include "paddle/fluid/distributed/fleet_executor/fleet_executor_desc.pb.h"
 #include "paddle/fluid/platform/macros.h"
44 changes: 27 additions & 17 deletions paddle/fluid/distributed/fleet_executor/runtime_graph.cc
@@ -27,24 +27,24 @@ using OpRole = paddle::framework::OpRole;
 using OpRegistry = paddle::framework::OpRegistry;
 using ProgramDesc = paddle::framework::ProgramDesc;

-bool IsForward(int64_t op_role) {
-  return (op_role == static_cast<int64_t>(OpRole::kForward)) ||
-         (op_role == (static_cast<int64_t>(OpRole::kForward) |
-                      static_cast<int64_t>(OpRole::kLoss)));
+bool IsForward(int32_t op_role) {
+  return (op_role == static_cast<int32_t>(OpRole::kForward)) ||
+         (op_role == (static_cast<int32_t>(OpRole::kForward) |
+                      static_cast<int32_t>(OpRole::kLoss)));
 }

-bool IsLRSched(int64_t op_role) {
-  return op_role == static_cast<int64_t>(OpRole::kLRSched);
+bool IsLRSched(int32_t op_role) {
+  return op_role == static_cast<int32_t>(OpRole::kLRSched);
 }

-bool IsBackward(int64_t op_role) {
-  return (op_role == static_cast<int64_t>(OpRole::kBackward)) ||
-         (op_role == (static_cast<int64_t>(OpRole::kBackward) |
-                      static_cast<int64_t>(OpRole::kLoss)));
+bool IsBackward(int32_t op_role) {
+  return (op_role == static_cast<int32_t>(OpRole::kBackward)) ||
+         (op_role == (static_cast<int32_t>(OpRole::kBackward) |
+                      static_cast<int32_t>(OpRole::kLoss)));
 }

-bool IsOptimize(int64_t op_role) {
-  return op_role == static_cast<int64_t>(OpRole::kOptimize);
+bool IsOptimize(int32_t op_role) {
+  return op_role == static_cast<int32_t>(OpRole::kOptimize);
 }

 struct DistCoord {
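
The pairs of values tested above exist because OpRole is a bitmask: the operator that produces the loss carries kLoss OR-ed onto its base role, so a loss-producing forward op is kForward | kLoss. A small self-contained sketch of the same check; the enum values are illustrative stand-ins for Paddle's op_proto_maker.h definitions:

    #include <cstdint>
    #include <iostream>

    // Illustrative flag values; see paddle/fluid/framework/op_proto_maker.h
    // for the authoritative encoding.
    enum class OpRole : int32_t {
      kForward = 0x0000,
      kBackward = 0x0001,
      kOptimize = 0x0002,
      kLRSched = 0x0010,
      kLoss = 0x0100,
    };

    bool IsForward(int32_t op_role) {
      return op_role == static_cast<int32_t>(OpRole::kForward) ||
             op_role == (static_cast<int32_t>(OpRole::kForward) |
                         static_cast<int32_t>(OpRole::kLoss));
    }

    int main() {
      int32_t loss_op = static_cast<int32_t>(OpRole::kForward) |
                        static_cast<int32_t>(OpRole::kLoss);
      std::cout << IsForward(loss_op) << "\n";  // prints 1: still a forward op
    }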
@@ -112,9 +112,9 @@ void RuntimeGraph::SplitProgramBasedFunctionality(const ProgramDesc& program) {
   for (const auto& op_desc : program.Block(0).AllOps()) {
     ops_.emplace_back(OpRegistry::CreateOp(*op_desc));
   }
-  std::unordered_map<int64_t, std::vector<OperatorBase*>> role_to_ops;
+  std::unordered_map<int32_t, std::vector<OperatorBase*>> role_to_ops;
   for (const auto& op : ops_) {
-    int64_t op_role = op->Attr<int64_t>("op_role");
+    int32_t op_role = op->Attr<int32_t>("op_role");
     OpRole new_op_role;
     if (IsLRSched(op_role)) {
       new_op_role = OpRole::kLRSched;
@@ -129,7 +129,7 @@ void RuntimeGraph::SplitProgramBasedFunctionality(const ProgramDesc& program) {
         "The op %s is None of LRSched, Forward, Backward or Optimize.",
         op->Type()));
   }
-  int64_t new_op_role_id = static_cast<int64_t>(new_op_role);
+  int32_t new_op_role_id = static_cast<int32_t>(new_op_role);
   if (role_to_ops.find(new_op_role_id) == role_to_ops.end()) {
     role_to_ops.insert({new_op_role_id, {}});
   }
@@ -147,7 +147,7 @@ void RuntimeGraph::SplitProgramBasedFunctionality(const ProgramDesc& program) {
   int64_t task_id = cur_rank * functionality_order.size();
   for (std::size_t i = 0; i < functionality_order.size(); ++i) {
     OpRole role = functionality_order[i];
-    int64_t role_id = static_cast<int64_t>(role);
+    int32_t role_id = static_cast<int32_t>(role);
     int64_t max_run_times = num_micro_batches;
     int64_t max_slot_nums = start_up_steps;
     if (IsLRSched(role_id) || IsOptimize(role_id)) {
@@ -225,12 +225,22 @@ void RuntimeGraph::FakeRuntimeInfo() {
   int64_t nrank = exe_desc_.cluster_info().size();
   int32_t num_of_functionality = functionality_order.size();
   for (int64_t i = 0; i < nrank; ++i) {
-    for (int64_t j = 0; j < num_of_functionality; ++j) {
+    for (int32_t j = 0; j < num_of_functionality; ++j) {
       int64_t intercepter_id = i * num_of_functionality + j;
       intercepter_id_to_rank_.insert({intercepter_id, i});
     }
   }
 }

+std::string RuntimeGraph::DebugString() const {
+  std::ostringstream os;
+  os << "\nRuntime Graph Debug: \n";
+  for (const auto& task : task_nodes_) {
+    os << task->DebugString();
+    os << "\n";
+  }
+  return os.str();
+}
+
 }  // namespace distributed
 }  // namespace paddle
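
Both SplitProgramBasedFunctionality and FakeRuntimeInfo rely on the same flat layout: every rank owns one task per functionality (LRSched, Forward, Backward, Optimize), and ids are assigned row-major as rank * num_of_functionality + functionality_index, so the owning rank is recoverable as id / num_of_functionality. A sketch with made-up cluster values:

    #include <cstdint>
    #include <iostream>
    #include <unordered_map>

    int main() {
      const int64_t nrank = 2;                 // hypothetical: 2 ranks
      const int32_t num_of_functionality = 4;  // LRSched/Forward/Backward/Optimize
      std::unordered_map<int64_t, int64_t> intercepter_id_to_rank;
      for (int64_t i = 0; i < nrank; ++i) {
        for (int32_t j = 0; j < num_of_functionality; ++j) {
          int64_t intercepter_id = i * num_of_functionality + j;
          intercepter_id_to_rank.insert({intercepter_id, i});
        }
      }
      // id 5 belongs to rank 1, functionality 1: 5 / 4 == 1.
      std::cout << intercepter_id_to_rank.at(5) << "\n";  // prints 1
    }

The new RuntimeGraph::DebugString() then simply concatenates TaskNode::DebugString() for each task node, which is what the VLOG(5) call in FleetExecutor::Init prints.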
2 changes: 2 additions & 0 deletions paddle/fluid/distributed/fleet_executor/runtime_graph.h
@@ -14,6 +14,7 @@

 #pragma once
 #include <memory>
+#include <string>
 #include <unordered_map>
 #include <vector>
 #include "paddle/fluid/distributed/fleet_executor/fleet_executor_desc.pb.h"
@@ -43,6 +44,7 @@ class RuntimeGraph final {
   const std::unordered_map<int64_t, int64_t>& intercepter_id_to_rank() const {
     return intercepter_id_to_rank_;
   }
+  std::string DebugString() const;

  private:
   DISABLE_COPY_AND_ASSIGN(RuntimeGraph);
18 changes: 14 additions & 4 deletions paddle/fluid/distributed/fleet_executor/task_node.cc
@@ -21,7 +21,7 @@ namespace {
 using OperatorBase = TaskNode::OperatorBase;
 }

-TaskNode::TaskNode(int64_t role, const std::vector<OperatorBase*>& ops,
+TaskNode::TaskNode(int32_t role, const std::vector<OperatorBase*>& ops,
                    int64_t rank, int64_t task_id, int64_t max_run_times,
                    int64_t max_slot_nums)
     : ops_(ops),
@@ -31,15 +31,15 @@ TaskNode::TaskNode(int64_t role, const std::vector<OperatorBase*>& ops,
       max_run_times_(max_run_times),
       max_slot_nums_(max_slot_nums) {}

-TaskNode::TaskNode(int64_t role, int64_t rank, int64_t task_id,
+TaskNode::TaskNode(int32_t role, int64_t rank, int64_t task_id,
                    int64_t max_run_times, int64_t max_slot_nums)
     : role_(role),
       rank_(rank),
       task_id_(task_id),
       max_run_times_(max_run_times),
       max_slot_nums_(max_slot_nums) {}

-std::unique_ptr<TaskNode> TaskNode::CreateEmptyTaskNode(int64_t role,
+std::unique_ptr<TaskNode> TaskNode::CreateEmptyTaskNode(int32_t role,
                                                         int64_t rank,
                                                         int64_t task_id,
                                                         int64_t max_run_times,
@@ -49,7 +49,7 @@ std::unique_ptr<TaskNode> TaskNode::CreateEmptyTaskNode(int64_t role,
 }

 std::unique_ptr<TaskNode> TaskNode::CreateTaskNode(
-    int64_t role, const std::vector<OperatorBase*>& ops, int64_t rank,
+    int32_t role, const std::vector<OperatorBase*>& ops, int64_t rank,
     int64_t task_id, int64_t max_run_times, int64_t max_slot_nums) {
   return std::make_unique<TaskNode>(role, ops, rank, task_id, max_run_times,
                                     max_slot_nums);
@@ -60,5 +60,15 @@ void TaskNode::AddUpstreamTask(int64_t task_id) { upstream_.insert(task_id); }
 void TaskNode::AddDownstreamTask(int64_t task_id) {
   downstream_.insert(task_id);
 }
+
+std::string TaskNode::DebugString() const {
+  std::ostringstream os;
+  os << "role: " << role_ << ", task_id: " << task_id_ << "\n";
+  for (std::size_t i = 0; i < ops_.size(); ++i) {
+    os << ops_[i]->Type() << " ";
+  }
+  os << "\n";
+  return os.str();
+}
 }  // namespace distributed
 }  // namespace paddle

Review comment (Contributor), on TaskNode::DebugString(): "Add some information about the upstream and downstream task id? In next PR?"
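
A hedged sketch of the extension the reviewer asks about, dumping the upstream_/downstream_ sets as well. This is not part of the merged diff, only one possible shape for the follow-up; it assumes the surrounding task_node.cc context (member fields, <sstream>):

    // Possible follow-up suggested in review; not in this PR.
    std::string TaskNode::DebugString() const {
      std::ostringstream os;
      os << "role: " << role_ << ", task_id: " << task_id_ << "\n";
      os << "upstream:";
      for (int64_t id : upstream_) os << " " << id;
      os << "\ndownstream:";
      for (int64_t id : downstream_) os << " " << id;
      os << "\nops:";
      for (const auto* op : ops_) os << " " << op->Type();
      os << "\n";
      return os.str();
    }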
13 changes: 7 additions & 6 deletions paddle/fluid/distributed/fleet_executor/task_node.h
@@ -28,27 +28,28 @@ namespace distributed {
 class TaskNode final {
  public:
   using OperatorBase = paddle::framework::OperatorBase;
-  TaskNode(int64_t role, int64_t rank, int64_t task_id, int64_t max_run_times,
+  TaskNode(int32_t role, int64_t rank, int64_t task_id, int64_t max_run_times,
            int64_t max_slot_nums);
-  TaskNode(int64_t role, const std::vector<OperatorBase*>& ops, int64_t rank,
+  TaskNode(int32_t role, const std::vector<OperatorBase*>& ops, int64_t rank,
            int64_t task_id, int64_t max_run_times, int64_t max_slot_nums);
   ~TaskNode() = default;
   int64_t rank() const { return rank_; }
   int64_t task_id() const { return task_id_; }
-  int64_t role() const { return role_; }
+  int32_t role() const { return role_; }
   int64_t max_run_times() const { return max_run_times_; }
   int64_t max_slot_nums() const { return max_slot_nums_; }
   const std::unordered_set<int64_t>& upstream() const { return upstream_; }
   const std::unordered_set<int64_t>& downstream() const { return downstream_; }
   void AddUpstreamTask(int64_t task_id);
   void AddDownstreamTask(int64_t task_id);
-  static std::unique_ptr<TaskNode> CreateEmptyTaskNode(int64_t role,
+  std::string DebugString() const;
+  static std::unique_ptr<TaskNode> CreateEmptyTaskNode(int32_t role,
                                                        int64_t rank,
                                                        int64_t task_id,
                                                        int64_t max_run_times,
                                                        int64_t max_slot_nums);
   static std::unique_ptr<TaskNode> CreateTaskNode(
-      int64_t role, const std::vector<OperatorBase*>& ops, int64_t rank,
+      int32_t role, const std::vector<OperatorBase*>& ops, int64_t rank,
       int64_t task_id, int64_t max_run_times, int64_t max_slot_nums);

  private:
@@ -57,7 +58,7 @@ class TaskNode final {
   std::vector<OperatorBase*> ops_;
   std::unordered_set<int64_t> upstream_;
   std::unordered_set<int64_t> downstream_;
-  int64_t role_;
+  int32_t role_;
   int64_t rank_;
   int64_t task_id_;
   int64_t max_run_times_;
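
A hypothetical usage sketch of the interface above, assuming a Paddle build that links the fleet_executor sources; all role/rank/id values are made up for illustration:

    #include <iostream>

    #include "paddle/fluid/distributed/fleet_executor/task_node.h"

    int main() {
      using paddle::distributed::TaskNode;
      auto node = TaskNode::CreateEmptyTaskNode(/*role=*/0, /*rank=*/0,
                                                /*task_id=*/0,
                                                /*max_run_times=*/1,
                                                /*max_slot_nums=*/1);
      node->AddUpstreamTask(42);  // hypothetical upstream task id
      // Prints "role: 0, task_id: 0" plus an (empty) op list; upstream ids
      // are tracked but not yet part of DebugString() in this PR.
      std::cout << node->DebugString();
      return 0;
    }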