Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add slot attr for push sparse op #44422

Merged
merged 6 commits into from
Jul 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions paddle/fluid/distributed/ps/wrapper/fleet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -529,10 +529,12 @@ void FleetWrapper::PushSparseFromTensorAsync(
uint64_t padding_id,
platform::Place place,
std::vector<const LoDTensor*>* inputs,
std::vector<int>& slots,
const LoDTensor* shows,
const LoDTensor* clks,
std::vector<LoDTensor*>* outputs,
bool use_cvm_op) {
CHECK(slots.size() == inputs->size());
int batch_size = -1;
bool batch_size_consist = true;
for (auto* input : *inputs) {
Expand Down Expand Up @@ -568,8 +570,8 @@ void FleetWrapper::PushSparseFromTensorAsync(
// TODO(zhaocaibei123): check type of show/clk is int? float? uint64?
// const long int* show_tensor = shows->data<int64_t>();
// const long int* clk_tensor = clks->data<int64_t>();
const int64_t* show_tensor = shows->data<int64_t>();
const int64_t* clk_tensor = clks->data<int64_t>();
const float* show_tensor = shows->data<float>();
const float* clk_tensor = clks->data<float>();

for (size_t index = 0; index < inputs->size(); ++index) {
framework::LoDTensor* g_tensor = outputs->at(index);
Expand Down Expand Up @@ -603,15 +605,14 @@ void FleetWrapper::PushSparseFromTensorAsync(
push_keys.emplace_back(real_id);
if (use_cvm_op) {
push_values.emplace_back(fea_dim + 1);
push_values.back()[0] = 2; // TODO(zhaocaibei123): slot
push_values.back()[0] = static_cast<float>(slots[index]);
float* data = push_values.back().data() + 1;
memcpy(data, g + output_len, sizeof(float) * fea_dim);
} else {
push_values.emplace_back(fea_dim + 3);
// slot show clk grad... consistent with CtrCommonPushValue defined
// in
// ctr_accessor.h
push_values.back()[0] = 2; // TODO(zhaocaibei123): slot
// in ctr_accessor.h
push_values.back()[0] = static_cast<float>(slots[index]);
push_values.back()[1] =
(i >= show_size ? 1 : static_cast<float>(show_tensor[i]));
push_values.back()[2] =
Expand All @@ -631,18 +632,16 @@ void FleetWrapper::PushSparseFromTensorAsync(
push_keys.emplace_back(real_id);
if (use_cvm_op) {
push_values.emplace_back(fea_dim + 1);
push_values.back()[0] = 2; // TODO(zhaocaibei123): slot
push_values.back()[0] = static_cast<float>(slots[index]);
float* data = push_values.back().data() + 1;
memcpy(data, g + output_len, sizeof(float) * fea_dim);
} else {
push_values.emplace_back(fea_dim + 3);
// slot show clk grad... consistent with CtrCommonPushValue defined in
// ctr_accessor.h
push_values.back()[0] = 2; // TODO(zhaocaibei123): slot
push_values.back()[1] =
(i >= show_size ? 1 : static_cast<float>(show_tensor[i]));
push_values.back()[2] =
(i >= clk_size ? 0 : static_cast<float>(clk_tensor[i]));
push_values.back()[0] = static_cast<float>(slots[index]);
push_values.back()[1] = (i >= show_size ? 1 : show_tensor[i]);
push_values.back()[2] = (i >= clk_size ? 0 : clk_tensor[i]);
float* data = push_values.back().data() + 3;
memcpy(data, g + output_len, sizeof(float) * fea_dim);
}
Expand Down
2 changes: 2 additions & 0 deletions paddle/fluid/distributed/ps/wrapper/fleet.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,13 @@ class FleetWrapper {
const std::vector<std::string>& input_names,
std::vector<const LoDTensor*>* inputs, // NOLINT
std::vector<const LoDTensor*>* outputs); // NOLINT

void PushSparseFromTensorAsync(const uint64_t table_id,
int fea_dim,
uint64_t padding_id,
platform::Place place,
std::vector<const LoDTensor*>* inputs,
std::vector<int>& slots, // NOLINT
const LoDTensor* shows,
const LoDTensor* clicks,
std::vector<LoDTensor*>* outputs,
Expand Down
14 changes: 7 additions & 7 deletions paddle/fluid/framework/data_feed.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ void PrivateQueueDataFeed<T>::ReadThread() {
std::string filename;
while (PickOneFile(&filename)) {
int err_no = 0;
fp_ = fs_open_read(filename, &err_no, pipe_command_);
fp_ = fs_open_read(filename, &err_no, pipe_command_, true);
__fsetlocking(&*fp_, FSETLOCKING_BYCALLER);
T instance;
while (ParseOneInstanceFromPipe(&instance)) {
Expand Down Expand Up @@ -538,7 +538,7 @@ void InMemoryDataFeed<T>::LoadIntoMemory() {
} else {
#endif
int err_no = 0;
this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_, true);
#ifdef PADDLE_WITH_BOX_PS
}
#endif
Expand Down Expand Up @@ -574,7 +574,7 @@ void InMemoryDataFeed<T>::LoadIntoMemoryFromSo() {
(defined PADDLE_WITH_PSLIB)
VLOG(3) << "LoadIntoMemoryFromSo() begin, thread_id=" << thread_id_;
int buf_len = 1024 * 1024 * 10;
char* buf = (char*)malloc(buf_len + 10);
char* buf = reinterpret_cast<char*>(malloc(buf_len + 10));
auto ps_gpu_ptr = PSGPUWrapper::GetInstance();

paddle::framework::CustomParser* parser =
Expand Down Expand Up @@ -681,7 +681,7 @@ void MultiSlotDataFeed::ReadThread() {
std::string filename;
while (PickOneFile(&filename)) {
int err_no = 0;
fp_ = fs_open_read(filename, &err_no, pipe_command_);
fp_ = fs_open_read(filename, &err_no, pipe_command_, true);
CHECK(fp_ != nullptr);
__fsetlocking(&*fp_, FSETLOCKING_BYCALLER);
std::vector<MultiSlotType> instance;
Expand Down Expand Up @@ -2175,7 +2175,7 @@ void SlotRecordInMemoryDataFeed::LoadIntoMemoryByFile(void) {
lines);
} else {
int err_no = 0;
this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_, true);

CHECK(this->fp_ != nullptr);
__fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER);
Expand Down Expand Up @@ -2265,7 +2265,7 @@ void SlotRecordInMemoryDataFeed::LoadIntoMemoryByLine(void) {

do {
int err_no = 0;
this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_, true);
CHECK(this->fp_ != nullptr);
__fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER);
lines = line_reader.read_file(this->fp_.get(), line_func, lines);
Expand Down Expand Up @@ -2314,7 +2314,7 @@ void SlotRecordInMemoryDataFeed::LoadIntoMemoryByCommand(void) {

do {
int err_no = 0;
this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_, true);
CHECK(this->fp_ != nullptr);
__fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER);

Expand Down
2 changes: 1 addition & 1 deletion paddle/fluid/framework/data_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void DatasetImpl<T>::SetHdfsConfig(const std::string& fs_name,
cmd += " -D fs.default.name=" + fs_name;
cmd += " -D hadoop.job.ugi=" + fs_ugi;
cmd += " -Ddfs.client.block.write.retries=15 -Ddfs.rpc.timeout=500000";
paddle::framework::hdfs_set_command(cmd);
paddle::framework::dataset_hdfs_set_command(cmd);
}

template <typename T>
Expand Down
40 changes: 33 additions & 7 deletions paddle/fluid/framework/io/fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,20 @@ const std::string& hdfs_command() { return hdfs_command_internal(); }

void hdfs_set_command(const std::string& x) { hdfs_command_internal() = x; }

// dataset and model may be on different afs cluster
// Returns the mutable singleton holding the hadoop command used for
// dataset reads (distinct from the model-side hdfs_command()).
static std::string& dataset_hdfs_command_internal() {
  static std::string dataset_cmd{"hadoop fs"};  // default command
  return dataset_cmd;
}

// Read-only accessor for the dataset-side hadoop command string.
const std::string& dataset_hdfs_command() {
  const std::string& cmd = dataset_hdfs_command_internal();
  return cmd;
}

// Overwrites the dataset-side hadoop command (e.g. with per-cluster
// ugi/fs.default.name options) used by fs_open_read(..., read_data=true).
void dataset_hdfs_set_command(const std::string& x) {
  dataset_hdfs_command_internal().assign(x);
}

static std::string& customized_download_cmd_internal() {
static std::string x = "";
return x;
Expand All @@ -243,17 +257,28 @@ void set_download_command(const std::string& x) {

std::shared_ptr<FILE> hdfs_open_read(std::string path,
int* err_no,
const std::string& converter) {
const std::string& converter,
bool read_data) {
if (download_cmd() != "") { // use customized download command
path = string::format_string(
"%s \"%s\"", download_cmd().c_str(), path.c_str());
} else {
if (fs_end_with_internal(path, ".gz")) {
path = string::format_string(
"%s -text \"%s\"", hdfs_command().c_str(), path.c_str());
if (read_data) {
path = string::format_string(
"%s -text \"%s\"", dataset_hdfs_command().c_str(), path.c_str());
} else {
path = string::format_string(
"%s -text \"%s\"", hdfs_command().c_str(), path.c_str());
}
} else {
path = string::format_string(
"%s -cat \"%s\"", hdfs_command().c_str(), path.c_str());
if (read_data) {
path = string::format_string(
"%s -cat \"%s\"", dataset_hdfs_command().c_str(), path.c_str());
} else {
path = string::format_string(
"%s -cat \"%s\"", hdfs_command().c_str(), path.c_str());
}
}
}

Expand Down Expand Up @@ -370,13 +395,14 @@ int fs_select_internal(const std::string& path) {

std::shared_ptr<FILE> fs_open_read(const std::string& path,
int* err_no,
const std::string& converter) {
const std::string& converter,
bool read_data) {
switch (fs_select_internal(path)) {
case 0:
return localfs_open_read(path, converter);

case 1:
return hdfs_open_read(path, err_no, converter);
return hdfs_open_read(path, err_no, converter, read_data);

default:
PADDLE_THROW(platform::errors::Unimplemented(
Expand Down
10 changes: 8 additions & 2 deletions paddle/fluid/framework/io/fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,18 @@ extern const std::string& hdfs_command();

extern void hdfs_set_command(const std::string& x);

extern const std::string& dataset_hdfs_command();

extern void dataset_hdfs_set_command(const std::string& x);

extern const std::string& download_cmd();

extern void set_download_command(const std::string& x);

extern std::shared_ptr<FILE> hdfs_open_read(std::string path,
int* err_no,
const std::string& converter);
const std::string& converter,
bool read_data);

extern std::shared_ptr<FILE> hdfs_open_write(std::string path,
int* err_no,
Expand All @@ -91,7 +96,8 @@ extern void hdfs_mv(const std::string& src, const std::string& dest);
// auto-detect fs
extern std::shared_ptr<FILE> fs_open_read(const std::string& path,
int* err_no,
const std::string& converter);
const std::string& converter,
bool read_data = false);

extern std::shared_ptr<FILE> fs_open_write(const std::string& path,
int* err_no,
Expand Down
13 changes: 13 additions & 0 deletions paddle/fluid/framework/io/test_fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,18 @@ TEST(FS, mv) {
} catch (...) {
VLOG(3) << "test hdfs_mv, catch expected errors of unknown prefix";
}

try {
paddle::framework::dataset_hdfs_set_command(
"hadoop -D hadoop.job.ugi=anotherxxx fs -text");
int err_no = 0;
paddle::framework::hdfs_open_read("afs:/none.gz", &err_no, "", true);
paddle::framework::hdfs_open_read("afs:/none.gz", &err_no, "", false);
paddle::framework::hdfs_open_read("afs:/none", &err_no, "", true);
paddle::framework::hdfs_open_read("afs:/none", &err_no, "", false);
} catch (...) {
VLOG(3) << "test hdfs_open_read, catch expected errors of unknown path";
}

#endif
}
1 change: 1 addition & 0 deletions paddle/fluid/operators/lookup_table_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class LookupTableOpMaker : public framework::OpProtoAndCheckerMaker {
"in the order of input variables for mapping")
.SetDefault({});
AddAttr<int>("trainer_id", "trainer id from 0 ~ worker_num.").SetDefault(0);
AddAttr<int>("slot", "slot of id").SetDefault(0).AsExtra();
AddAttr<bool>("grad_inplace",
"(boolean, default false) "
"If the grad op reuse the input's variable.")
Expand Down
1 change: 1 addition & 0 deletions paddle/fluid/operators/lookup_table_v2_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class LookupTableV2OpMaker : public framework::OpProtoAndCheckerMaker {
AddAttr<int>("trainer_id", "trainer id from 0 ~ worker_num.")
.SetDefault(0)
.AsExtra();
AddAttr<int>("slot", "slot of id").SetDefault(0).AsExtra();
AddAttr<std::vector<int64_t>>("height_sections",
"Height for each output SelectedRows.")
.SetDefault(std::vector<int64_t>({}))
Expand Down
5 changes: 5 additions & 0 deletions paddle/fluid/operators/pscore/distributed_push_sparse_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ class DistributedPushSparseOpMaker : public framework::OpProtoAndCheckerMaker {
AddAttr<bool>("use_cvm_op", "(boolean, default false) Use cvm op or not.")
.SetDefault(false);

AddAttr<std::vector<int>>("slots",
"[slot_id1, slot_id2] Slots array of Ids.")
.SetDefault({})
.AsExtra();

AddComment(R"DOC(
Lookup Tablel Prefetch Operator.
This operator is used to perform lookup on parameter W,
Expand Down
3 changes: 3 additions & 0 deletions paddle/fluid/operators/pscore/distributed_push_sparse_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class DistributedPushSparseKernel : public framework::OpKernel<T> {
auto table_id = context.Attr<int>("table_id");
auto emb_dim = context.Attr<int>("size");
auto use_cvm_op = context.Attr<bool>("use_cvm_op");
auto slots = context.Attr<std::vector<int>>("slots");

auto inputs = context.MultiInput<framework::LoDTensor>("Ids");
auto shows = context.Input<framework::LoDTensor>("Shows");
Expand All @@ -47,6 +48,7 @@ class DistributedPushSparseKernel : public framework::OpKernel<T> {
static_cast<uint64_t>(padding_idx),
context.GetPlace(),
&inputs,
slots,
shows,
clks,
&outputs,
Expand Down Expand Up @@ -103,6 +105,7 @@ class DistributedPushSparseKernel : public framework::OpKernel<T> {
static_cast<uint64_t>(padding_idx),
context.GetPlace(),
&tmp_input_vec,
slots,
tmp_shows_tensor,
tmp_clicks_tensor,
&tmp_output_vec);
Expand Down
12 changes: 8 additions & 4 deletions python/paddle/distributed/passes/ps_trainer_pass.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def _push_sparse_fuse(self, _program, push_sparse_ops, attrs, use_cvm_op):
print('ShowClickEntry not configured, will not use')
show = _program.global_block().create_var(
name="show",
dtype=core.VarDesc.VarType.INT64,
dtype=core.VarDesc.VarType.FP32,
persistable=False,
stop_gradient=True)
_program.global_block()._insert_op(index=0,
Expand All @@ -165,7 +165,7 @@ def _push_sparse_fuse(self, _program, push_sparse_ops, attrs, use_cvm_op):

clk = _program.global_block().create_var(
name="clk",
dtype=core.VarDesc.VarType.INT64,
dtype=core.VarDesc.VarType.FP32,
persistable=False,
stop_gradient=True)
_program.global_block()._insert_op(index=0,
Expand All @@ -190,6 +190,9 @@ def _push_sparse_fuse(self, _program, push_sparse_ops, attrs, use_cvm_op):
padding_idx = ops[0].attr("padding_idx")
is_distributed = ops[0].attr("is_distributed")
op_type = ops[0].type

slots = [op.attr("slot") for op in ops]
print('debug zcb slots: ', slots)
outputs = [
_program.global_block().vars[op.input("Out@GRAD")[0]]
for op in ops
Expand All @@ -204,7 +207,7 @@ def _push_sparse_fuse(self, _program, push_sparse_ops, attrs, use_cvm_op):
'W': w,
"Outputs": outputs,
"Shows": show,
"Clicks": clk
"Clicks": clk,
},
outputs={"Outputs": outputs},
attrs={
Expand All @@ -213,7 +216,8 @@ def _push_sparse_fuse(self, _program, push_sparse_ops, attrs, use_cvm_op):
"padding_idx": padding_idx,
"table_id": table_id,
"size": self.emb_size[param],
"use_cvm_op": use_cvm_op
"use_cvm_op": use_cvm_op,
"slots": slots
})

def _pull_sparse_fuse(self, _program, pull_sparse_ops, attrs, send_ctx):
Expand Down
Loading