diff --git a/paddle/fluid/framework/boxps_trainer.cc b/paddle/fluid/framework/boxps_trainer.cc
index fb79516f23cbb..c7fde620f785f 100644
--- a/paddle/fluid/framework/boxps_trainer.cc
+++ b/paddle/fluid/framework/boxps_trainer.cc
@@ -141,16 +141,16 @@ void BoxPSTrainer::InitTrainerEnv(const ProgramDesc& main_program,
   std::set<std::string> async_param_name;
   if (async_mode_) {
-    async_param_name = dense_table_->Init(*root_scope_, *param_need_sync_.get(),
-        persistable_vars_);
+    async_param_name = dense_table_->Init(*root_scope_, *param_need_sync_.get(),
+                                          persistable_vars_);
   }
   for (int i = 0; i < thread_num_; ++i) {
     auto this_worker = std::dynamic_pointer_cast<BoxPSWorker>(workers_[i]);
     this_worker->SetRootScope(root_scope_);
     if (async_mode_) {
-        this_worker->SetDenseTable(dense_table_.get());
-        this_worker->SetAsyncParamName(async_param_name);
+      this_worker->SetDenseTable(dense_table_.get());
+      this_worker->SetAsyncParamName(async_param_name);
     }
     this_worker->CreateDeviceResource(main_program);
     // CopyParameters(*root_scope_, i);
diff --git a/paddle/fluid/framework/boxps_worker.cc b/paddle/fluid/framework/boxps_worker.cc
index 3bdb430e7e6a6..e4f38c679020b 100644
--- a/paddle/fluid/framework/boxps_worker.cc
+++ b/paddle/fluid/framework/boxps_worker.cc
@@ -36,13 +36,13 @@ namespace framework {
 
 BoxPSAsynDenseTable::BoxPSAsynDenseTable(const int device_num)
     : device_num_(device_num) {
-    int buffer_size = device_num * 4; // magic number
-    device_grads_.resize(buffer_size);
-    buffer_poll_.reset(new PSBufferQueue(buffer_size));
-    for (int i= 0; i < buffer_size; i++) {
-      buffer_poll_->Send(&device_grads_[i]);
-    }
-    VLOG(0) << "BoxPSAsynDenseTable init finish ";
+  int buffer_size = device_num * 4;  // magic number
+  device_grads_.resize(buffer_size);
+  buffer_poll_.reset(new PSBufferQueue(buffer_size));
+  for (int i = 0; i < buffer_size; i++) {
+    buffer_poll_->Send(&device_grads_[i]);
+  }
+  VLOG(0) << "BoxPSAsynDenseTable init finish ";
 }
 
 BoxPSAsynDenseTable::~BoxPSAsynDenseTable() {}
 
@@ -69,10 +69,10 @@ std::set<std::string> BoxPSAsynDenseTable::Init(
       async_param_list_.begin(),
       async_param_list_
           .end());  // xx_param.b_0, xx_param_moment1_0, xx_param_moment2_0
-  for (size_t i = 0; i < async_param_list_.size(); i+=3) {
-      const LoDTensor& root_tensor =
-          root_scope.FindVar(async_param_list_[i])->Get<LoDTensor>();
-      total_param_len_ += root_tensor.numel();
+  for (size_t i = 0; i < async_param_list_.size(); i += 3) {
+    const LoDTensor& root_tensor =
+        root_scope.FindVar(async_param_list_[i])->Get<LoDTensor>();
+    total_param_len_ += root_tensor.numel();
   }
 
   VLOG(0) << "alloc param length dense table:" << total_param_len_;
@@ -80,8 +80,8 @@ std::set<std::string> BoxPSAsynDenseTable::Init(
   mom1_.mutable_data<float>({total_param_len_, 1}, platform::CPUPlace());
   mom2_.mutable_data<float>({total_param_len_, 1}, platform::CPUPlace());
   for (size_t i = 0; i < device_grads_.size(); ++i) {
-    device_grads_[i].mutable_data<float>({static_cast<int64_t>(total_param_len_), 1},
-        platform::CPUPlace());
+    device_grads_[i].mutable_data<float>(
+        {static_cast<int64_t>(total_param_len_), 1}, platform::CPUPlace());
   }
 
   int64_t offset = 0;
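Note on the Init() hunks above and below: the table sums numel() over every dense parameter, backs ps_/mom1_/mom2_ with one flat CPU buffer each, and then exposes each parameter as a sliced view into that buffer. The standalone C++ sketch below shows the same bookkeeping with plain pointers; it is illustrative only, uses no Paddle types, and param_lens, flat and views are made-up names.

#include <cstdint>
#include <iostream>
#include <vector>

int main() {
  // Element counts of three dense parameters (stand-ins for numel()).
  const std::vector<int64_t> param_lens = {4, 8, 2};

  // 1) Sum the lengths, as the loop over async_param_list_ does.
  int64_t total_len = 0;
  for (int64_t len : param_lens) total_len += len;

  // 2) One flat buffer holds every parameter back to back (like ps_).
  std::vector<float> flat(total_len, 0.0f);

  // 3) Each parameter becomes a view (pointer + length) into the flat
  //    buffer, analogous to Slice(offset, offset + len) + ShareDataWith.
  std::vector<float*> views;
  int64_t offset = 0;
  for (int64_t len : param_lens) {
    views.push_back(flat.data() + offset);
    offset += len;
  }

  // Writing through a view updates the shared flat storage.
  views[1][0] = 3.14f;
  std::cout << "flat[" << param_lens[0] << "] = " << flat[param_lens[0]]
            << std::endl;  // prints 3.14
  return 0;
}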
@@ -93,12 +93,18 @@ std::set<std::string> BoxPSAsynDenseTable::Init(
     auto dim = root_tensor.dims();
     size_t len = root_tensor.numel();
     if (i % 3 == 0) {
-      original_ps_[i].ShareDataWith(ps_.Slice(offset, offset + len)).Resize(dim);
+      original_ps_[i]
+          .ShareDataWith(ps_.Slice(offset, offset + len))
+          .Resize(dim);
     } else if (i % 3 == 1) {
-      original_ps_[i].ShareDataWith(mom1_.Slice(offset, offset + len)).Resize(dim);
+      original_ps_[i]
+          .ShareDataWith(mom1_.Slice(offset, offset + len))
+          .Resize(dim);
     } else {
-      original_ps_[i].ShareDataWith(mom2_.Slice(offset, offset + len)).Resize(dim);
-      offset += len;
+      original_ps_[i]
+          .ShareDataWith(mom2_.Slice(offset, offset + len))
+          .Resize(dim);
+      offset += len;
     }
     TensorCopy(*static_cast<const Tensor*>(&root_tensor), platform::CPUPlace(),
                static_cast<Tensor*>(&(original_ps_[i])));
@@ -129,13 +135,13 @@ std::set<std::string> BoxPSAsynDenseTable::Init(
   std::map<std::string, float> lr_map = box_ptr->GetLRMap();
   int lr_index = 0;
   for (size_t i = 0; i < async_param_list_.size() / 3; ++i) {
-      float learning_rate = base_lr_;
-      if (lr_map.find(async_param_list_[i * 3]) != lr_map.end()) {
-        learning_rate = lr_map[async_param_list_[i * 3]];
-      }
-      for (int j=0; j < original_ps_[i * 3].numel(); j++) {
-        all_lr_[lr_index++] = learning_rate;
-      }
+    float learning_rate = base_lr_;
+    if (lr_map.find(async_param_list_[i * 3]) != lr_map.end()) {
+      learning_rate = lr_map[async_param_list_[i * 3]];
+    }
+    for (int j = 0; j < original_ps_[i * 3].numel(); j++) {
+      all_lr_[lr_index++] = learning_rate;
+    }
   }
   InitThreadGroup();
   update_thread_ = new std::thread(&BoxPSAsynDenseTable::AsyncUpdate, this);
@@ -155,7 +161,7 @@ void BoxPSAsynDenseTable::Finalize(void) {
     auto* root_tensor =
         root_scope_->Var(async_param_list_[i])->GetMutable<LoDTensor>();
     TensorCopySync(*static_cast<const Tensor*>(&original_ps_[i]),
-        platform::CPUPlace(), root_tensor);
+                   platform::CPUPlace(), root_tensor);
   }
 
   ps_buffer_ = nullptr;
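The @@ -129 hunk above fills all_lr_ with one learning rate per element, so the update loop can index a rate directly instead of consulting lr_map for every parameter. A standalone sketch of that expansion; params, param_lens, lr_map and base_lr are illustrative stand-ins, not Paddle names.

#include <iostream>
#include <map>
#include <string>
#include <vector>

int main() {
  const float base_lr = 0.01f;
  // Per-parameter element counts and optional per-parameter learning rates.
  const std::vector<std::string> params = {"w0", "w1", "w2"};
  const std::vector<int> param_lens = {3, 2, 4};
  const std::map<std::string, float> lr_map = {{"w1", 0.001f}};

  // Expand to one learning rate per element, mirroring the all_lr_ fill.
  std::vector<float> all_lr;
  for (size_t i = 0; i < params.size(); ++i) {
    float lr = base_lr;
    auto it = lr_map.find(params[i]);
    if (it != lr_map.end()) lr = it->second;
    for (int j = 0; j < param_lens[i]; ++j) all_lr.push_back(lr);
  }

  for (float lr : all_lr) std::cout << lr << " ";
  std::cout << std::endl;  // 0.01 0.01 0.01 0.001 0.001 0.01 0.01 0.01 0.01
  return 0;
}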
@@ -165,62 +171,62 @@ }
 
 void BoxPSAsynDenseTable::ThreadUpdate(int thread_id,
-    std::vector<LoDTensor*> & grad, size_t merge_num) {
-    float* grad_data = grad[0]->mutable_data<float>(platform::CPUPlace());
-    float* param_data =
-        ps_.mutable_data<float>(platform::CPUPlace());
-    float* mom1_data = mom1_.mutable_data<float>(platform::CPUPlace());
-    float* mom2_data = mom2_.mutable_data<float>(platform::CPUPlace());
-    // merge grad
-    const size_t start = thread_start_index_[thread_id];
-    const size_t end = thread_end_index_[thread_id];
-    if (merge_num == 2) {
-      LoDTensor* grad_tensor_1 = grad[1];
-      float* grad_tensor_1_data =
-          grad_tensor_1->mutable_data<float>(platform::CPUPlace());
-      for (size_t j = start; j < end; ++j) {
-        grad_data[j] = (grad_data[j] + grad_tensor_1_data[j]) / 2;
-      }
-    } else if (merge_num == 3) {
-      LoDTensor* grad_tensor_1 = grad[1];
-      LoDTensor* grad_tensor_2 = grad[2];
-      float* grad_tensor_1_data =
-          grad_tensor_1->mutable_data<float>(platform::CPUPlace());
-      float* grad_tensor_2_data =
-          grad_tensor_2->mutable_data<float>(platform::CPUPlace());
-      for (size_t j = start; j < end; ++j) {
-        grad_data[j] = (grad_data[j] + grad_tensor_1_data[j] + grad_tensor_2_data[j]) / 3;
-      }
-    } else if (merge_num == 4) {
-      LoDTensor* grad_tensor_1 = grad[1];
-      LoDTensor* grad_tensor_2 = grad[2];
-      LoDTensor* grad_tensor_3 = grad[3];
-      float* grad_tensor_1_data =
-          grad_tensor_1->mutable_data<float>(platform::CPUPlace());
-      float* grad_tensor_2_data =
-          grad_tensor_2->mutable_data<float>(platform::CPUPlace());
-      float* grad_tensor_3_data =
-          grad_tensor_3->mutable_data<float>(platform::CPUPlace());
-      for (size_t j = start; j < end; ++j) {
-        grad_data[j] = (grad_data[j] + grad_tensor_1_data[j] +
-            grad_tensor_2_data[j]+ grad_tensor_3_data[j]) / 4;
-      }
-    }
-
+                                       const std::vector<LoDTensor*>& grad,
+                                       size_t merge_num) {
+  float* grad_data = grad[0]->mutable_data<float>(platform::CPUPlace());
+  float* param_data = ps_.mutable_data<float>(platform::CPUPlace());
+  float* mom1_data = mom1_.mutable_data<float>(platform::CPUPlace());
+  float* mom2_data = mom2_.mutable_data<float>(platform::CPUPlace());
+  // merge grad
+  const size_t start = thread_start_index_[thread_id];
+  const size_t end = thread_end_index_[thread_id];
+  if (merge_num == 2) {
+    LoDTensor* grad_tensor_1 = grad[1];
+    float* grad_tensor_1_data =
+        grad_tensor_1->mutable_data<float>(platform::CPUPlace());
+    for (size_t j = start; j < end; ++j) {
+      grad_data[j] = (grad_data[j] + grad_tensor_1_data[j]) / 2;
+    }
+  } else if (merge_num == 3) {
+    LoDTensor* grad_tensor_1 = grad[1];
+    LoDTensor* grad_tensor_2 = grad[2];
+    float* grad_tensor_1_data =
+        grad_tensor_1->mutable_data<float>(platform::CPUPlace());
+    float* grad_tensor_2_data =
+        grad_tensor_2->mutable_data<float>(platform::CPUPlace());
+    for (size_t j = start; j < end; ++j) {
+      grad_data[j] =
+          (grad_data[j] + grad_tensor_1_data[j] + grad_tensor_2_data[j]) / 3;
+    }
+  } else if (merge_num == 4) {
+    LoDTensor* grad_tensor_1 = grad[1];
+    LoDTensor* grad_tensor_2 = grad[2];
+    LoDTensor* grad_tensor_3 = grad[3];
+    float* grad_tensor_1_data =
+        grad_tensor_1->mutable_data<float>(platform::CPUPlace());
+    float* grad_tensor_2_data =
+        grad_tensor_2->mutable_data<float>(platform::CPUPlace());
+    float* grad_tensor_3_data =
+        grad_tensor_3->mutable_data<float>(platform::CPUPlace());
     for (size_t j = start; j < end; ++j) {
-      mom1_data[j] = 0.99 * mom1_data[j] +
-                     0.01 * grad_data[j];  // magic beta and episilon
-      mom2_data[j] =
-          0.9999 * mom2_data[j] + 0.0001 * grad_data[j] * grad_data[j];
-      param_data[j] -=
-          all_lr_[j] * (mom1_data[j] / (sqrt(mom2_data[j]) + 1e-8));
+      grad_data[j] = (grad_data[j] + grad_tensor_1_data[j] +
+                      grad_tensor_2_data[j] + grad_tensor_3_data[j]) /
+                     4;
     }
-    return;
+  }
+
+  for (size_t j = start; j < end; ++j) {
+    mom1_data[j] =
+        0.99 * mom1_data[j] + 0.01 * grad_data[j];  // magic beta and episilon
+    mom2_data[j] = 0.9999 * mom2_data[j] + 0.0001 * grad_data[j] * grad_data[j];
+    param_data[j] -= all_lr_[j] * (mom1_data[j] / (sqrt(mom2_data[j]) + 1e-8));
+  }
+  return;
 }
 
 void BoxPSAsynDenseTable::AsyncUpdate() {
   VLOG(0) << "Begin AsyncUpdate";
-  std::vector<LoDTensor*> grad(4, nullptr); // max package
+  std::vector<LoDTensor*> grad(4, nullptr);  // max package
 
   auto box_ptr = BoxWrapper::GetInstance();
 
@@ -235,16 +241,15 @@ void BoxPSAsynDenseTable::AsyncUpdate() {
     AutoWRLock ps_lock(&ps_lock_);
     std::vector<std::future<void>> wait_futures;
     for (int64_t i = 0; i < thread_num_; ++i) {
-      wait_futures.emplace_back(thread_pool->Run([this, i , &grad, merge_num]() {
-        ThreadUpdate(i, grad, merge_num);
-      }));
+      wait_futures.emplace_back(thread_pool->Run(
+          [this, i, &grad, merge_num]() { ThreadUpdate(i, grad, merge_num); }));
     }
     for (int64_t i = 0; i < thread_num_; ++i) {
-        wait_futures[i].get();
+      wait_futures[i].get();
     }
 
     for (size_t i = 0; i < merge_num; ++i) {
-        buffer_poll_->Send(grad[i]);
+      buffer_poll_->Send(grad[i]);
     }
   }
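ThreadUpdate, reformatted above, first averages up to four device gradient buffers element-wise and then applies an Adam-style update with the hard-coded decay rates 0.99/0.01 (first moment) and 0.9999/0.0001 (second moment) and epsilon 1e-8, using the per-element rates in all_lr_. The sketch below reproduces that arithmetic on plain arrays over one thread's [start, end) range; it folds the explicit merge_num == 2/3/4 branches into a single loop and is an illustration, not the worker's actual code.

#include <cmath>
#include <cstddef>
#include <iostream>
#include <vector>

// Average merge_num gradient buffers into grads[0] and apply an
// Adam-like update on elements [start, end), as ThreadUpdate does.
void UpdateRange(std::vector<std::vector<float>>& grads, size_t merge_num,
                 std::vector<float>& param, std::vector<float>& mom1,
                 std::vector<float>& mom2, const std::vector<float>& lr,
                 size_t start, size_t end) {
  for (size_t j = start; j < end; ++j) {
    float g = grads[0][j];
    for (size_t k = 1; k < merge_num; ++k) g += grads[k][j];
    g /= static_cast<float>(merge_num);
    mom1[j] = 0.99f * mom1[j] + 0.01f * g;          // "magic" first-moment decay
    mom2[j] = 0.9999f * mom2[j] + 0.0001f * g * g;  // "magic" second-moment decay
    param[j] -= lr[j] * (mom1[j] / (std::sqrt(mom2[j]) + 1e-8f));
  }
}

int main() {
  const size_t n = 8;
  std::vector<std::vector<float>> grads(2, std::vector<float>(n, 0.5f));
  std::vector<float> param(n, 1.0f), mom1(n, 0.0f), mom2(n, 0.0f);
  std::vector<float> lr(n, 0.01f);
  UpdateRange(grads, /*merge_num=*/2, param, mom1, mom2, lr, 0, n);
  std::cout << "param[0] after one step: " << param[0] << std::endl;
  return 0;
}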
@@ -253,41 +258,41 @@ void BoxPSAsynDenseTable::AsyncUpdate() {
 
 // async
 void BoxPSAsynDenseTable::PullDense(const platform::Place& place,
-    Tensor * tensor) {
+                                    Tensor* tensor) {
   // while(ps_buffer_->Size() != 0) {//Size have lock, may have perf problem.
   // And will hang when the lock was removed
   // ;
   // }
   AutoRDLock ps_lock(&ps_lock_);
   TensorCopy(*static_cast<const Tensor*>(&ps_), place,
-      static_cast<Tensor*>(tensor));
+             static_cast<Tensor*>(tensor));
 }
 
 void BoxPSAsynDenseTable::PushDense(const platform::Place& place,
-    Tensor * tensor) {
-  LoDTensor * grad = nullptr;
+                                    Tensor* tensor) {
+  LoDTensor* grad = nullptr;
   buffer_poll_->Receive(&grad);
   TensorCopy(*static_cast<const Tensor*>(tensor), platform::CPUPlace(),
-      static_cast<Tensor*>(grad));
+             static_cast<Tensor*>(grad));
   ps_buffer_->Send(grad);
 }
 
 void BoxPSAsynDenseTable::InitThreadGroup() {
-    thread_num_ = 32;
-    thread_start_index_.resize(thread_num_, 0);
-    thread_end_index_.resize(thread_num_, 0);
-    size_t prefix_sum = 0;
-    size_t thread_update_avg_len = total_param_len_ / thread_num_;
-    int unalloc_len = total_param_len_ % thread_num_;
-    for (int i = 0; i < thread_num_; i++) {
-      thread_start_index_[i] = prefix_sum;
-      if (i < unalloc_len) {
-        prefix_sum += thread_update_avg_len + 1;
-      } else {
-        prefix_sum += thread_update_avg_len;
-      }
-      thread_end_index_[i] = prefix_sum;
+  thread_num_ = 32;
+  thread_start_index_.resize(thread_num_, 0);
+  thread_end_index_.resize(thread_num_, 0);
+  size_t prefix_sum = 0;
+  size_t thread_update_avg_len = total_param_len_ / thread_num_;
+  int unalloc_len = total_param_len_ % thread_num_;
+  for (int i = 0; i < thread_num_; i++) {
+    thread_start_index_[i] = prefix_sum;
+    if (i < unalloc_len) {
+      prefix_sum += thread_update_avg_len + 1;
+    } else {
+      prefix_sum += thread_update_avg_len;
     }
-    thread_pool.reset(new paddle::framework::ThreadPool(thread_num_));
+    thread_end_index_[i] = prefix_sum;
+  }
+  thread_pool.reset(new paddle::framework::ThreadPool(thread_num_));
 }
 
 static const int DenseKStepNode = 1;
@@ -377,7 +382,8 @@ int64_t BoxPSWorker::AllocParamTensorAsync() {
   int64_t total_param_len = 0;
   for (auto& var : block.AllVars()) {
     std::string name = var->Name();
-    if (!var->Persistable() || async_param_name_.find(name) == async_param_name_.end()) {
+    if (!var->Persistable() ||
+        async_param_name_.find(name) == async_param_name_.end()) {
       continue;
     }
     const LoDTensor& root_tensor = root_scope_->FindVar(name)->Get<LoDTensor>();
@@ -402,9 +408,9 @@ void BoxPSWorker::CreateDeviceResource(const ProgramDesc& main_prog) {
 
   int64_t pad_len = 0;
   if (sync_mode_ > 0) {
-      AllocParamTensor(&pad_len);
+    AllocParamTensor(&pad_len);
   } else if (dense_table_) {
-      AllocParamTensorAsync();
+    AllocParamTensorAsync();
   }
 
   auto& block = program_->Block(0);
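For reference, the InitThreadGroup hunk earlier in this file splits total_param_len_ across 32 update threads: each thread gets total/32 elements and the first total%32 threads take one extra, so the [start, end) ranges cover every element exactly once. A standalone sketch of that partitioning follows (thread_num is reduced to 8 here only to keep the output short).

#include <cstddef>
#include <iostream>
#include <vector>

int main() {
  // Split total_len elements across thread_num contiguous [start, end)
  // ranges; the first (total_len % thread_num) threads get one extra
  // element, mirroring InitThreadGroup.
  const size_t total_len = 103;
  const int thread_num = 8;  // the worker hard-codes 32
  std::vector<size_t> start(thread_num), end(thread_num);

  size_t avg = total_len / thread_num;
  int extra = total_len % thread_num;
  size_t prefix = 0;
  for (int i = 0; i < thread_num; ++i) {
    start[i] = prefix;
    prefix += (i < extra) ? avg + 1 : avg;
    end[i] = prefix;
  }

  for (int i = 0; i < thread_num; ++i) {
    std::cout << "thread " << i << ": [" << start[i] << ", " << end[i] << ")\n";
  }
  // prefix == total_len: every element is covered exactly once.
  std::cout << "covered: " << prefix << " of " << total_len << std::endl;
  return 0;
}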
@@ -413,27 +419,32 @@ void BoxPSWorker::CreateDeviceResource(const ProgramDesc& main_prog) {
   int64_t offset = 0;
   int64_t grad_offset = 0;
   // make param and param@GRAD in same order
-  std::vector<VarDesc*> sorted_var = block.AllVars();
-  std::sort(sorted_var.begin(), sorted_var.end(), []
-      (const VarDesc * var1, const VarDesc * var2) {
-        return var1->Name() < var2->Name();
-      });
+  std::vector<VarDesc*> sorted_var = block.AllVars();
+  std::sort(sorted_var.begin(), sorted_var.end(),
+            [](const VarDesc* var1, const VarDesc* var2) {
+              return var1->Name() < var2->Name();
+            });
   // init var and copy persistable
   for (auto& var : sorted_var) {
     std::string name = var->Name();
     if (!var->Persistable()) {
-      if (dense_table_ && async_param_name_.find(name) != async_param_name_.end()) {
-        // parm@GRAD can not find in root_scope_ use parm length replace
-        const LoDTensor& root_tensor =
-            root_scope_->FindVar(name.substr(0, name.length() - 5))->Get<LoDTensor>();
-        LoDTensor* gpu_tensor = thread_scope_->Var(name)->GetMutable<LoDTensor>();
-        auto dim = root_tensor.dims();
-        size_t len = root_tensor.numel();
-        gpu_tensor->ShareDataWith(grad_async_.Slice(grad_offset, grad_offset + len)).Resize(dim);
-        grad_offset += len;
+      if (dense_table_ &&
+          async_param_name_.find(name) != async_param_name_.end()) {
+        // parm@GRAD can not find in root_scope_ use parm length replace
+        const LoDTensor& root_tensor =
+            root_scope_->FindVar(name.substr(0, name.length() - 5))
+                ->Get<LoDTensor>();
+        LoDTensor* gpu_tensor =
+            thread_scope_->Var(name)->GetMutable<LoDTensor>();
+        auto dim = root_tensor.dims();
+        size_t len = root_tensor.numel();
+        gpu_tensor
+            ->ShareDataWith(grad_async_.Slice(grad_offset, grad_offset + len))
+            .Resize(dim);
+        grad_offset += len;
       } else {
-          auto* ptr = thread_scope_->Var(name);
-          InitializeVariable(ptr, var->GetType());
+        auto* ptr = thread_scope_->Var(name);
+        InitializeVariable(ptr, var->GetType());
       }
     } else {
       const LoDTensor& root_tensor =
@@ -448,22 +459,23 @@ void BoxPSWorker::CreateDeviceResource(const ProgramDesc& main_prog) {
           offset += len;
         }
       } else if (dense_table_) {
-          if (async_param_name_.find(name) != async_param_name_.end()) {
-            auto dim = root_tensor.dims();
-            size_t len = root_tensor.numel();
-            gpu_tensor->ShareDataWith(param_async_.Slice(offset, offset + len)).Resize(dim);
-            offset += len;
-          }
+        if (async_param_name_.find(name) != async_param_name_.end()) {
+          auto dim = root_tensor.dims();
+          size_t len = root_tensor.numel();
+          gpu_tensor->ShareDataWith(param_async_.Slice(offset, offset + len))
+              .Resize(dim);
+          offset += len;
+        }
       }
       TensorCopy(*static_cast<const Tensor*>(&root_tensor), place_,
                  static_cast<Tensor*>(gpu_tensor));
     }
   }
   if (sync_mode_ > 0) {
-      CHECK(offset <= (param_sync_.numel() - pad_len));
+    CHECK(offset <= (param_sync_.numel() - pad_len));
   } else if (dense_table_) {
-      CHECK(offset <= param_async_.numel());
-      CHECK(grad_offset <= grad_async_.numel());
+    CHECK(offset <= param_async_.numel());
+    CHECK(grad_offset <= grad_async_.numel());
   }
 }
 void BoxPSWorker::SyncParam(void) {
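Before the header changes below: PushDense and AsyncUpdate in boxps_worker.cc recycle a fixed pool of gradient buffers (device_num * 4 of them) through two blocking queues. buffer_poll_ hands out free buffers, and ps_buffer_ carries filled buffers to the update thread, which returns them to the pool once applied. The simplified standalone analogue below uses a minimal mutex/condition-variable queue in place of operators::reader::BlockingQueue; all names are illustrative.

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Minimal blocking queue standing in for operators::reader::BlockingQueue.
template <typename T>
class SimpleBlockingQueue {
 public:
  void Send(T v) {
    {
      std::lock_guard<std::mutex> lk(mu_);
      q_.push(std::move(v));
    }
    cv_.notify_one();
  }
  void Receive(T* v) {
    std::unique_lock<std::mutex> lk(mu_);
    cv_.wait(lk, [this] { return !q_.empty(); });
    *v = std::move(q_.front());
    q_.pop();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<T> q_;
};

int main() {
  using Buffer = std::vector<float>;
  std::vector<Buffer> pool(4, Buffer(16, 0.0f));  // like device_grads_
  SimpleBlockingQueue<Buffer*> free_buffers;      // like buffer_poll_
  SimpleBlockingQueue<Buffer*> filled_buffers;    // like ps_buffer_
  for (auto& b : pool) free_buffers.Send(&b);

  // Producer: a device "pushes" gradients (PushDense-style).
  std::thread producer([&] {
    for (int step = 0; step < 8; ++step) {
      Buffer* buf = nullptr;
      free_buffers.Receive(&buf);             // take a free buffer
      (*buf)[0] = static_cast<float>(step);   // pretend to copy a gradient
      filled_buffers.Send(buf);               // hand it to the updater
    }
  });

  // Consumer: the update thread applies and recycles buffers.
  for (int step = 0; step < 8; ++step) {
    Buffer* buf = nullptr;
    filled_buffers.Receive(&buf);
    std::cout << "applied gradient from step " << (*buf)[0] << std::endl;
    free_buffers.Send(buf);  // return the buffer to the pool
  }
  producer.join();
  return 0;
}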
diff --git a/paddle/fluid/framework/device_worker.h b/paddle/fluid/framework/device_worker.h
index f6f00aeb599cc..6ace491ee7841 100644
--- a/paddle/fluid/framework/device_worker.h
+++ b/paddle/fluid/framework/device_worker.h
@@ -33,9 +33,9 @@ limitations under the License. */
 #include "paddle/fluid/framework/op_registry.h"
 #include "paddle/fluid/framework/program_desc.h"
 #include "paddle/fluid/framework/reader.h"
+#include "paddle/fluid/framework/threadpool.h"
 #include "paddle/fluid/framework/trainer_desc.pb.h"
 #include "paddle/fluid/framework/variable_helper.h"
-#include "paddle/fluid/framework/threadpool.h"
 #include "paddle/fluid/operators/reader/blocking_queue.h"
 #include "paddle/fluid/platform/place.h"
 #include "paddle/fluid/platform/port.h"
@@ -584,22 +584,21 @@ class SectionWorker : public DeviceWorker {
 
 #ifdef PADDLE_WITH_BOX_PS
 class BoxPSAsynDenseTable {
-  typedef operators::reader::BlockingQueue<LoDTensor*>
-      PSBufferQueue;
+  typedef operators::reader::BlockingQueue<LoDTensor*> PSBufferQueue;
 
  public:
   explicit BoxPSAsynDenseTable(const int device_num);
   ~BoxPSAsynDenseTable();
 
   std::set<std::string> Init(const Scope& root_scope,
-          const std::vector<std::string>& param_need_sync,
-          const std::vector<std::string>& persistable_vars);
+                             const std::vector<std::string>& param_need_sync,
+                             const std::vector<std::string>& persistable_vars);
   void Finalize(void);
-  void PullDense(const platform::Place& place, Tensor * tensor);
-  void PushDense(const platform::Place& place, Tensor * tensor);
+  void PullDense(const platform::Place& place, Tensor* tensor);
+  void PushDense(const platform::Place& place, Tensor* tensor);
   void InitThreadGroup();
-  void ThreadUpdate(int thread_id,
-      std::vector<LoDTensor*> & grad, size_t merge_num);
+  void ThreadUpdate(int thread_id, const std::vector<LoDTensor*>& grad,
+                    size_t merge_num);
   void AsyncUpdate();
 
  private:
@@ -616,8 +615,8 @@ class BoxPSAsynDenseTable {
   std::shared_ptr<PSBufferQueue> ps_buffer_ = nullptr;
   Scope* root_scope_ = nullptr;
   int64_t total_param_len_ = 0;
-    std::vector<size_t> thread_start_index_;
-    std::vector<size_t> thread_end_index_;
+  std::vector<size_t> thread_start_index_;
+  std::vector<size_t> thread_end_index_;
   std::shared_ptr<paddle::framework::ThreadPool> thread_pool = nullptr;
   int thread_num_ = 0;
   RWLock ps_lock_;
@@ -649,7 +648,9 @@ class BoxPSWorker : public DeviceWorker {
   void SetParamSyncStep(int step) { param_sync_step_ = step; }
   void SetDenseSyncMode(int mode) { sync_mode_ = mode; }
   void SetOneRing(bool one_ring) { one_ring_ = one_ring; }
-  void SetAsyncParamName(const std::set<std::string> & async_param_name) {async_param_name_ = async_param_name;}
+  void SetAsyncParamName(const std::set<std::string>& async_param_name) {
+    async_param_name_ = async_param_name;
+  }
 
  protected:
   int PackBatchTask(void);
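The ps_lock_ member declared above is the RWLock that serializes updates against pulls: PullDense takes a read lock while AsyncUpdate takes a write lock, so concurrent device pulls never observe a half-applied update. Below is a standalone analogue of that AutoRDLock/AutoWRLock usage built on std::shared_mutex (C++17); the Paddle lock types are not used here, and PullSum/ApplyUpdate are illustrative names.

#include <iostream>
#include <shared_mutex>
#include <thread>
#include <vector>

std::shared_mutex ps_lock;          // plays the role of ps_lock_
std::vector<float> ps(1024, 0.0f);  // plays the role of ps_

// PullDense analogue: many readers may copy the parameters concurrently.
float PullSum() {
  std::shared_lock<std::shared_mutex> rd(ps_lock);
  float sum = 0.0f;
  for (float v : ps) sum += v;
  return sum;
}

// AsyncUpdate analogue: a single writer mutates the parameters.
void ApplyUpdate(float delta) {
  std::unique_lock<std::shared_mutex> wr(ps_lock);
  for (float& v : ps) v += delta;
}

int main() {
  std::thread updater([] {
    for (int i = 0; i < 100; ++i) ApplyUpdate(0.01f);
  });
  std::vector<std::thread> pullers;
  for (int d = 0; d < 4; ++d) {
    pullers.emplace_back([] {
      for (int i = 0; i < 100; ++i) {
        // Each observed sum reflects a whole number of completed updates:
        // the read lock never sees a partially applied write.
        float s = PullSum();
        (void)s;
      }
    });
  }
  updater.join();
  for (auto& t : pullers) t.join();
  // Roughly 1024 (1024 elements, each incremented 100 times by 0.01).
  std::cout << "final sum: " << PullSum() << std::endl;
  return 0;
}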