[heterps] edit shrink and unseen-day logic for pslib #36194

Merged
merged 6 commits on Oct 19, 2021 (diff shown from 5 commits)
23 changes: 23 additions & 0 deletions paddle/fluid/framework/fleet/fleet_wrapper.cc
@@ -1334,6 +1334,29 @@ void FleetWrapper::SaveModelOneTablePrefix(const uint64_t table_id,
#endif
}

void FleetWrapper::SetDate(const uint64_t table_id, const std::string& date) {
#ifdef PADDLE_WITH_PSLIB
assert(date.size() == 8);
int year = std::stoi(date.substr(0, 4));
int month = std::stoi(date.substr(4, 2));
int day = std::stoi(date.substr(6, 2));
struct std::tm b = {};  // zero-initialize so tm_isdst is not indeterminate for mktime
b.tm_year = year - 1900;
b.tm_mon = month - 1;
b.tm_mday = day;
b.tm_hour = b.tm_min = b.tm_sec = 0;
std::time_t seconds_from_1970 = std::mktime(&b);
int day_id = seconds_from_1970 / 86400;
auto ret = pslib_ptr_->_worker_ptr->set_day_id(table_id, day_id);
ret.wait();
if (ret.get() != 0) {
LOG(ERROR) << "set date: " << date << " failed";
}
#else
VLOG(0) << "FleetWrapper::SetDate does nothing when no pslib";
#endif
}

void FleetWrapper::PrintTableStat(const uint64_t table_id) {
#ifdef PADDLE_WITH_PSLIB
auto ret = pslib_ptr_->_worker_ptr->print_table_stat(table_id);
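The day_id passed to set_day_id is just the number of calendar days elapsed since 1970-01-01. A minimal Python sketch of the same conversion (an illustrative helper, not part of this PR; note the C++ code goes through mktime, which works in local time, so the two can disagree by a day depending on timezone):

    from datetime import date

    def day_id_from_str(date_str):
        # "YYYYMMDD" -> days since 1970-01-01, mirroring
        # mktime(&b) / 86400 in FleetWrapper::SetDate above
        assert len(date_str) == 8
        d = date(int(date_str[:4]), int(date_str[4:6]), int(date_str[6:]))
        return (d - date(1970, 1, 1)).days

    print(day_id_from_str("20211111"))  # 18942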
2 changes: 2 additions & 0 deletions paddle/fluid/framework/fleet/fleet_wrapper.h
@@ -336,6 +336,8 @@ class FleetWrapper {
// this performs better than rand_r, especially large data
std::default_random_engine& LocalRandomEngine();

void SetDate(const uint64_t table_id, const std::string& date);

#ifdef PADDLE_WITH_PSLIB
static std::shared_ptr<paddle::distributed::PSlib> pslib_ptr_;
#endif
2 changes: 1 addition & 1 deletion paddle/fluid/framework/fleet/heter_ps/hashtable_inl.h
@@ -128,7 +128,7 @@ void HashTable<KeyType, ValType>::dump_to_cpu(int devid, cudaStream_t stream) {
downpour_value->resize(gpu_val.mf_size + downpour_value_size);
}
float* cpu_val = downpour_value->data();
cpu_val[0] = 0;
// cpu_val[0] = 0;  // intentionally no longer reset: slot 0 holds the unseen-day stat
cpu_val[1] = gpu_val.delta_score;
cpu_val[2] = gpu_val.show;
cpu_val[3] = gpu_val.clk;
13 changes: 13 additions & 0 deletions paddle/fluid/framework/fleet/ps_gpu_wrapper.cc
@@ -181,6 +181,19 @@ void PSGPUWrapper::BuildTask(std::shared_ptr<HeterContext> gpu_task) {
VLOG(3) << "GpuPs shard: " << i << " key len: " << local_keys[i].size();
local_ptr[i].resize(local_keys[i].size());
}

#ifdef PADDLE_WITH_PSLIB
// compute day_id: number of days since 1970-01-01
struct std::tm b = {};  // zero-initialize so tm_isdst is not indeterminate
b.tm_year = year_ - 1900;
b.tm_mon = month_ - 1;
b.tm_mday = day_;
b.tm_min = b.tm_hour = b.tm_sec = 0;
std::time_t seconds_from_1970 = std::mktime(&b);
int day_id = seconds_from_1970 / 86400;
fleet_ptr->pslib_ptr_->_worker_ptr->set_day_id(table_id_, day_id);
#endif

timeline.Start();
auto ptl_func = [this, &local_keys, &local_ptr, &fleet_ptr](int i) {
size_t key_size = local_keys[i].size();
9 changes: 9 additions & 0 deletions paddle/fluid/framework/fleet/ps_gpu_wrapper.h
@@ -240,6 +240,12 @@ class PSGPUWrapper {
mf_max_bound);
}
}
void SetDate(int year, int month, int day) {
year_ = year;
month_ = month;
day_ = day;
}

void SetDataset(Dataset* dataset) { dataset_ = dataset; }

// PSGPUWrapper singleton
@@ -283,6 +289,9 @@
int thread_keys_thread_num_ = 37;
int thread_keys_shard_num_ = 37;
uint64_t max_fea_num_per_pass_ = 5000000000;
int year_;
int month_;
int day_;

std::shared_ptr<
paddle::framework::ChannelObject<std::shared_ptr<HeterContext>>>
1 change: 1 addition & 0 deletions paddle/fluid/pybind/fleet_wrapper_py.cc
@@ -91,6 +91,7 @@ void BindFleetWrapper(py::module* m) {
.def("save_model_one_table", &framework::FleetWrapper::SaveModelOneTable)
.def("save_model_one_table_with_prefix",
&framework::FleetWrapper::SaveModelOneTablePrefix)
.def("set_date", &framework::FleetWrapper::SetDate)
.def("copy_table", &framework::FleetWrapper::CopyTable)
.def("copy_table_by_feasign",
&framework::FleetWrapper::CopyTableByFeasign);
2 changes: 2 additions & 0 deletions paddle/fluid/pybind/ps_gpu_wrapper_py.cc
@@ -41,6 +41,8 @@ void BindPSGPUWrapper(py::module* m) {
py::call_guard<py::gil_scoped_release>())
.def("init_GPU_server", &framework::PSGPUWrapper::InitializeGPUServer,
py::call_guard<py::gil_scoped_release>())
.def("set_date", &framework::PSGPUWrapper::SetDate,
py::call_guard<py::gil_scoped_release>())
.def("set_dataset", &framework::PSGPUWrapper::SetDataset,
py::call_guard<py::gil_scoped_release>())
.def("init_gpu_ps", &framework::PSGPUWrapper::InitializeGPU,
36 changes: 36 additions & 0 deletions python/paddle/distributed/fleet/dataset/dataset.py
@@ -748,6 +748,42 @@ def _generate_local_tables_unlock(self, table_id, fea_dim, read_thread_num,
self.dataset.generate_local_tables_unlock(
table_id, fea_dim, read_thread_num, consume_thread_num, shard_num)

def set_date(self, date):
"""
:api_attr: Static Graph

Set the training date used for pulling sparse parameters and for saving and loading the model. Only used in PSGPU mode.

Args:
date(str): training date (format: YYYYMMDD), e.g. 20211111.

Examples:
.. code-block:: python

import paddle
paddle.enable_static()

dataset = paddle.distributed.InMemoryDataset()
slots = ["slot1", "slot2", "slot3", "slot4"]
slots_vars = []
for slot in slots:
var = paddle.static.data(
name=slot, shape=[None, 1], dtype="int64", lod_level=1)
slots_vars.append(var)
dataset.init(
batch_size=1,
thread_num=2,
input_type=1,
pipe_command="cat",
use_var=slots_vars)
dataset.set_date("20211111")
"""
year = int(date[:4])
month = int(date[4:6])
day = int(date[6:])
if self.use_ps_gpu and core._is_compiled_with_heterps():
self.psgpu.set_date(year, month, day)

def load_into_memory(self, is_shuffle=False):
"""
:api_attr: Static Graph
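As the updated test at the bottom of this diff shows, set_date is intended to be called after _set_use_ps_gpu and before load_into_memory; the guard above makes it a no-op unless the PSGPU path is enabled and Paddle was compiled with heterps. A condensed sketch of that ordering (filelist and slot names are placeholders):

    import paddle
    paddle.enable_static()

    slots_vars = [
        paddle.static.data(name=s, shape=[None, 1], dtype="int64", lod_level=1)
        for s in ["slot1", "slot2"]
    ]
    dataset = paddle.distributed.InMemoryDataset()
    dataset.init(batch_size=32, thread_num=1, pipe_command="cat",
                 use_var=slots_vars)
    dataset.set_filelist(["train_data.txt"])  # placeholder file
    dataset._set_use_ps_gpu(1)    # enable the PSGPU path first ...
    dataset.set_date("20211111")  # ... so set_date reaches the PSGPU wrapper
    dataset.load_into_memory(is_shuffle=True)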
26 changes: 26 additions & 0 deletions python/paddle/fluid/dataset.py
@@ -716,6 +716,32 @@ def generate_local_tables_unlock(self, table_id, fea_dim, read_thread_num,
self.dataset.generate_local_tables_unlock(
table_id, fea_dim, read_thread_num, consume_thread_num, shard_num)

def set_date(self, date):
"""
:api_attr: Static Graph

Set the training date used for pulling sparse parameters and for saving and loading the model. Only used in PSGPU mode.

Args:
date(str): training date (format: YYYYMMDD), e.g. 20211111.

Examples:
.. code-block:: python

import paddle
import paddle.fluid as fluid
paddle.enable_static()

dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist)
dataset.set_date("20211111")
"""
year = int(date[:4])
month = int(date[4:6])
day = int(date[6:])
if self.use_ps_gpu and core._is_compiled_with_heterps():
self.psgpu.set_date(year, month, day)

@deprecated(
since="2.0.0",
update_to="paddle.distributed.InMemoryDataset.load_into_memory")
@@ -799,6 +799,15 @@ def save_one_table(self, table_id, model_dir, **kwargs):
self._fleet_ptr.save_model_one_table(table_id, model_dir, mode)
self._role_maker._barrier_worker()

def set_date(self, table_id, date):
"""
Set the training date on the given table, e.g. set_date(table_id, "20210918").
"""
self._role_maker._barrier_worker()
if self._role_maker.is_first_worker():
self._fleet_ptr.set_date(table_id, str(date))
self._role_maker._barrier_worker()

def _set_opt_info(self, opt_info):
"""
this function saves the result from DistributedOptimizer.minimize()
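A sketch of how the new set_date helper above might pair with save_one_table from the same class in an incremental-training flow (table id, date, and path are illustrative):

    # Set the day id on table 0 before dumping it, so the server-side
    # shrink/unseen-day logic sees the intended date.
    fleet.set_date(0, "20210918")
    fleet.save_one_table(0, "./model_20210918", mode=0)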
@@ -74,6 +74,7 @@ def test_communicator_ps_gpu(self):
batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars)
dataset.set_filelist(["test_communicator_ps_gpu.txt"])
dataset._set_use_ps_gpu(1)
dataset.set_date("20211111")
dataset.load_into_memory(is_shuffle=True)

os.environ["TEST_MODE"] = "1"
@@ -88,7 +89,6 @@ def test_communicator_ps_gpu(self):
pass
except Exception as e:
self.assertTrue(False)

time.sleep(10)
fleet.stop_worker()
os.remove("./test_communicator_ps_gpu.txt")