Skip to content

Commit

Permalink
[cherry-pick 2.2 heterps] bug fix for launch_utils.py (PaddlePaddle#37521)
Browse files Browse the repository at this point in the history

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* fix. test=develop

* [heterps]bug fix for _run_from_dataset

* fix heter_server.cc

* fix launch_utils.py

* fix heter_section_worker.cc

* fix. test=develop

* fix. test=develop
  • Loading branch information
zmxdream authored and Zjq9409 committed Dec 10, 2021
1 parent 0994bb6 commit 9286e66
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 17 deletions.
2 changes: 1 addition & 1 deletion paddle/fluid/distributed/service/heter_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ void HeterServer::StartHeterService() {

{
std::lock_guard<std::mutex> lock(this->mutex_ready_);
stoped_ = false;
ready_ = 1;
}
condition_ready_.notify_all();
std::unique_lock<std::mutex> running_lock(mutex_);
stoped_ = false;
cv_.wait(running_lock, [&] {
VLOG(1) << "Heter Server is Stop? " << stoped_;
return stoped_;
Expand Down
2 changes: 1 addition & 1 deletion paddle/fluid/framework/heter_section_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ void HeterSectionWorker::BatchPostProcess() {
DumpParam(*((*microbatch_scopes_)[0]), batch_num_);
}
// print each op time
if (thread_id_ == 0) {
if (debug_ && thread_id_ == 0) {
size_t total_ops_size = forward_ops_.size() + backward_ops_.size();
if (batch_num_ > 0 && batch_num_ % 100 == 0) {
for (size_t i = 0; i < total_ops_size; ++i) {
Expand Down
19 changes: 10 additions & 9 deletions python/paddle/distributed/fleet/launch_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1088,8 +1088,6 @@ def get_role_endpoints(self, args):
heter_worker_endpoints_list = args.heter_workers.split(";")
self.heter_worker_endpoints = ""
for i in range(len(heter_worker_endpoints_list)):
if self.heter_worker_endpoints != "":
self.heter_worker_endpoints += ","
heter_worker_endpoints = heter_worker_endpoints_list[
i].split(",")
self.stage_heter_trainer_num.append(
Expand Down Expand Up @@ -1182,15 +1180,18 @@ def get_role_endpoints(self, args):
_, self.current_node_ip = get_host_name_ip()
else:
self.current_node_ip = pod_ip
assert self.current_node_ip in self.node_ips, "Can't find your local ip {%s} in args.servers and args.workers ips: {%s}" \
% (self.current_node_ip, self.node_ips)
self.node_rank = self.node_ips.index(self.current_node_ip)

logger.debug(
"parsed from args: node_ips:{} current_node_ip:{} node_rank:{}".
format(self.node_ips, self.current_node_ip, self.node_rank))
if not self.distribute_mode == DistributeMode.PS_HETER:
assert self.current_node_ip in self.node_ips, "Can't find your local ip {%s} in args.servers and args.workers ips: {%s}" \
% (self.current_node_ip, self.node_ips)
if self.current_node_ip in self.node_ips:
self.node_rank = self.node_ips.index(self.current_node_ip)
logger.debug(
"parsed from args: node_ips:{} current_node_ip:{} node_rank:{}".
format(self.node_ips, self.current_node_ip, self.node_rank))

def start_ps(self):
if not self.current_node_ip in self.node_ips:
return
cluster = Cluster(hdfs=None)
server_rank = 0
worker_rank = 0
Expand Down
8 changes: 2 additions & 6 deletions python/paddle/fluid/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1737,12 +1737,8 @@ def _run_from_dataset(self,
for var in program.global_block().vars.values():
if var.is_data:
data_vars.append(var)
if core.is_compiled_with_npu():
dataset = paddle.fluid.DatasetFactory().create_dataset(
'InMemoryDataset')
else:
dataset = paddle.fluid.DatasetFactory().create_dataset(
'FileInstantDataset')
dataset = paddle.fluid.DatasetFactory().create_dataset(
'InMemoryDataset')
dataset.set_batch_size(1)
dataset.set_thread(1)
dataset.set_filelist(['None'])
Expand Down

0 comments on commit 9286e66

Please sign in to comment.