Skip to content

Commit

Permalink
Merge pull request #37 from qingshui/paddlebox
Browse files Browse the repository at this point in the history
add 768 embedx, parser file add path param,  pull_sparse support skip offset, fix topk bug
  • Loading branch information
qingshui authored May 26, 2022
2 parents 920c89f + 4f4d42d commit d735315
Show file tree
Hide file tree
Showing 19 changed files with 442 additions and 254 deletions.
34 changes: 22 additions & 12 deletions paddle/fluid/framework/data_feed.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ typedef void (*MyPadBoxFreeObject)(paddle::framework::ISlotParser*);
#endif

DECLARE_bool(enable_ins_parser_file);
DECLARE_bool(enable_ins_parser_add_file_path);

namespace paddle {
namespace framework {
Expand Down Expand Up @@ -2991,6 +2992,17 @@ void SlotPaddleBoxDataFeed::LoadIntoMemoryByFile(void) {
}
};

std::function<int(char* buf, int len)> read_func = nullptr;
if (reader != nullptr) {
read_func = [this, reader](char* buf, int len) {
return reader->read(buf, len);
};
} else {
read_func = [this](char* buf, int len) {
return fread(buf, sizeof(char), len, this->fp_.get());
};
}

std::string filename;
while (this->PickOneFile(&filename)) {
VLOG(3) << "PickOneFile, filename=" << filename
Expand All @@ -3001,16 +3013,10 @@ void SlotPaddleBoxDataFeed::LoadIntoMemoryByFile(void) {
int lines = 0;
bool is_ok = true;
do {
if (BoxWrapper::GetInstance()->UseAfsApi() && pipe_command_.empty()) {
if (reader != nullptr) {
while (reader->open(filename) < 0) {
sleep(1);
}
is_ok = parser->ParseFileInstance(
[this, reader](char* buf, int len) {
return reader->read(buf, len);
},
pull_record_func, lines);
reader->close();
} else {
if (BoxWrapper::GetInstance()->UseAfsApi()) {
this->fp_ = BoxWrapper::GetInstance()->OpenReadFile(
Expand All @@ -3021,11 +3027,15 @@ void SlotPaddleBoxDataFeed::LoadIntoMemoryByFile(void) {
}
CHECK(this->fp_ != nullptr);
__fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER);
is_ok = parser->ParseFileInstance(
[this](char* buf, int len) {
return fread(buf, sizeof(char), len, this->fp_.get());
},
pull_record_func, lines);
}
if (FLAGS_enable_ins_parser_add_file_path) {
is_ok = parser->ParseFileInstance(filename.c_str(), read_func,
pull_record_func, lines);
} else {
is_ok = parser->ParseFileInstance(read_func, pull_record_func, lines);
}
if (reader != nullptr) {
reader->close();
}
if (!is_ok) {
LOG(WARNING) << "parser error, filename=" << filename
Expand Down
8 changes: 8 additions & 0 deletions paddle/fluid/framework/data_feed.h
Original file line number Diff line number Diff line change
Expand Up @@ -1323,6 +1323,14 @@ class ISlotParser {
int& lines) { // NOLINT
return false;
}
// add parser file path
virtual bool ParseFileInstance(
const char* path, std::function<int(char* buf, int len)> ReadBuffFunc,
std::function<void(std::vector<SlotRecord>&, int, int)>
PullRecordsFunc, // NOLINT
int& lines) { // NOLINT
return false;
}
};
struct UsedSlotInfo {
int idx;
Expand Down
106 changes: 66 additions & 40 deletions paddle/fluid/framework/fleet/box_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -391,18 +391,22 @@ void BoxWrapper::CheckEmbedSizeIsValid(int embedx_dim, int expand_embed_dim) {
"expand_embed_dim = %d, but got %d.",
expand_embed_dim_, expand_embed_dim));
}
PADDLE_ENFORCE_EQ(
embedx_dim_, embedx_dim,
platform::errors::InvalidArgument("SetInstance(): invalid embedx_dim. "
"When embedx_dim = %d, but got %d.",
embedx_dim_, embedx_dim));
// skip embedx dim zero
if (embedx_dim_ > 0) {
PADDLE_ENFORCE_EQ(
embedx_dim_, embedx_dim,
platform::errors::InvalidArgument("SetInstance(): invalid embedx_dim. "
"When embedx_dim = %d, but got %d.",
embedx_dim_, embedx_dim));
}
}

void BoxWrapper::PullSparse(const paddle::platform::Place& place,
const std::vector<const uint64_t*>& keys,
const std::vector<float*>& values,
const std::vector<int64_t>& slot_lengths,
const int hidden_size, const int expand_embed_dim) {
const int hidden_size, const int expand_embed_dim,
const int skip_offset) {
#define EMBEDX_CASE(i, ...) \
case i: { \
constexpr size_t EmbedxDim = i; \
Expand All @@ -420,29 +424,42 @@ void BoxWrapper::PullSparse(const paddle::platform::Place& place,
if (feature_type_ == static_cast<int>(boxps::FEATURE_SHARE_EMBEDDING)) { \
PullSparseCase< \
boxps::FeaturePullValueGpuShareEmbedding<EmbedxDim, ExpandDim>>( \
place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \
place, keys, values, slot_lengths, hidden_size, expand_embed_dim, \
skip_offset); \
} else if (feature_type_ == static_cast<int>(boxps::FEATURE_PCOC)) { \
PullSparseCase<boxps::FeaturePullValueGpuPCOC<EmbedxDim, ExpandDim>>( \
place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \
place, keys, values, slot_lengths, hidden_size, expand_embed_dim, \
skip_offset); \
} else if (feature_type_ == static_cast<int>(boxps::FEATURE_QUANT) || \
feature_type_ == static_cast<int>(boxps::FEATURE_SHOWCLK)) { \
PullSparseCase<boxps::FeaturePullValueGpuQuant<EmbedxDim, ExpandDim>>( \
place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \
place, keys, values, slot_lengths, hidden_size, expand_embed_dim, \
skip_offset); \
} else if (feature_type_ == static_cast<int>(boxps::FEATURE_CONV)) { \
PullSparseCase<boxps::FeaturePullValueGpuConv<EmbedxDim, ExpandDim>>( \
place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \
place, keys, values, slot_lengths, hidden_size, expand_embed_dim, \
skip_offset); \
} else if (feature_type_ == static_cast<int>(boxps::FEATURE_VARIABLE)) { \
PullSparseCase<boxps::FeatureVarPullValueGpu<EmbedxDim, ExpandDim>>( \
place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \
place, keys, values, slot_lengths, hidden_size, expand_embed_dim, \
skip_offset); \
} else if (EmbedxDim == 0 && \
feature_type_ == static_cast<int>(boxps::FEATURE_ADAM)) { \
PullSparseCase<boxps::FeatureVarPullValueGpu<EmbedxDim, ExpandDim>>( \
place, keys, values, slot_lengths, hidden_size, expand_embed_dim, \
skip_offset); \
} else { \
PullSparseCase<boxps::FeaturePullValueGpu<EmbedxDim, ExpandDim>>( \
place, keys, values, slot_lengths, hidden_size, expand_embed_dim); \
place, keys, values, slot_lengths, hidden_size, expand_embed_dim, \
skip_offset); \
} \
} break

CheckEmbedSizeIsValid(hidden_size - cvm_offset_, expand_embed_dim);
CheckEmbedSizeIsValid(hidden_size + skip_offset - cvm_offset_,
expand_embed_dim);
switch (embedx_dim_) {
EMBEDX_CASE(0, PULLSPARSE_CASE(0););
EMBEDX_CASE(0, PULLSPARSE_CASE(0); PULLSPARSE_CASE(255);
PULLSPARSE_CASE(767););
EMBEDX_CASE(2, PULLSPARSE_CASE(0););
EMBEDX_CASE(4, PULLSPARSE_CASE(0););
EMBEDX_CASE(8, PULLSPARSE_CASE(0); PULLSPARSE_CASE(1); PULLSPARSE_CASE(2);
Expand Down Expand Up @@ -472,7 +489,7 @@ void BoxWrapper::PushSparseGrad(const paddle::platform::Place& place,
const std::vector<int64_t>& slot_lengths,
const int hidden_size,
const int expand_embed_dim,
const int batch_size) {
const int batch_size, const int skip_offset) {
#define EMBEDX_CASE(i, ...) \
case i: { \
constexpr size_t EmbedxDim = i; \
Expand All @@ -491,31 +508,38 @@ void BoxWrapper::PushSparseGrad(const paddle::platform::Place& place,
PushSparseGradCase< \
boxps::FeaturePushValueGpuShareEmbedding<EmbedxDim, ExpandDim>>( \
place, keys, grad_values, slot_lengths, hidden_size, \
expand_embed_dim, batch_size); \
expand_embed_dim, batch_size, skip_offset); \
} else if (feature_type_ == static_cast<int>(boxps::FEATURE_PCOC)) { \
PushSparseGradCase< \
boxps::FeaturePushValueGpuPCOC<EmbedxDim, ExpandDim>>( \
place, keys, grad_values, slot_lengths, hidden_size, \
expand_embed_dim, batch_size); \
expand_embed_dim, batch_size, skip_offset); \
} else if (feature_type_ == static_cast<int>(boxps::FEATURE_VARIABLE)) { \
PushSparseGradCase<boxps::FeatureVarPushValueGpu<EmbedxDim, ExpandDim>>( \
place, keys, grad_values, slot_lengths, hidden_size, \
expand_embed_dim, batch_size); \
expand_embed_dim, batch_size, skip_offset); \
} else if (feature_type_ == static_cast<int>(boxps::FEATURE_CONV)) { \
PushSparseGradCase< \
boxps::FeaturePushValueGpuConv<EmbedxDim, ExpandDim>>( \
place, keys, grad_values, slot_lengths, hidden_size, \
expand_embed_dim, batch_size); \
expand_embed_dim, batch_size, skip_offset); \
} else if (EmbedxDim == 0 && \
feature_type_ == static_cast<int>(boxps::FEATURE_ADAM)) { \
PushSparseGradCase<boxps::FeatureVarPushValueGpu<EmbedxDim, ExpandDim>>( \
place, keys, grad_values, slot_lengths, hidden_size, \
expand_embed_dim, batch_size, skip_offset); \
} else { \
PushSparseGradCase<boxps::FeaturePushValueGpu<EmbedxDim, ExpandDim>>( \
place, keys, grad_values, slot_lengths, hidden_size, \
expand_embed_dim, batch_size); \
expand_embed_dim, batch_size, skip_offset); \
} \
} break

CheckEmbedSizeIsValid(hidden_size - cvm_offset_, expand_embed_dim);
CheckEmbedSizeIsValid(hidden_size + skip_offset - cvm_offset_,
expand_embed_dim);
switch (embedx_dim_) {
EMBEDX_CASE(0, PUSHSPARSE_CASE(0););
EMBEDX_CASE(0, PUSHSPARSE_CASE(0); PUSHSPARSE_CASE(255);
PUSHSPARSE_CASE(767););
EMBEDX_CASE(2, PUSHSPARSE_CASE(0););
EMBEDX_CASE(4, PUSHSPARSE_CASE(0););
EMBEDX_CASE(8, PUSHSPARSE_CASE(0); PUSHSPARSE_CASE(1); PUSHSPARSE_CASE(2);
Expand Down Expand Up @@ -1388,25 +1412,27 @@ const std::string BoxWrapper::SaveBase(const char* batch_model_path,
const char* xbox_model_path,
const std::string& date) {
VLOG(3) << "Begin SaveBase";
PADDLE_ENFORCE_EQ(
date.length(), 8,
platform::errors::PreconditionNotMet(
"date[%s] is invalid, correct example is 20190817", date.c_str()));
int year = std::stoi(date.substr(0, 4));
int month = std::stoi(date.substr(4, 2));
int day = std::stoi(date.substr(6, 2));

struct std::tm b;
b.tm_year = year - 1900;
b.tm_mon = month - 1;
b.tm_mday = day;
b.tm_hour = FLAGS_fix_dayid ? 8 : 0;
b.tm_min = b.tm_sec = 0;
std::time_t seconds_from_1970 = std::mktime(&b);

int day_id = -1;
if (!date.empty()) {
PADDLE_ENFORCE_EQ(
date.length(), 8,
platform::errors::PreconditionNotMet(
"date[%s] is invalid, correct example is 20190817", date.c_str()));
int year = std::stoi(date.substr(0, 4));
int month = std::stoi(date.substr(4, 2));
int day = std::stoi(date.substr(6, 2));

struct std::tm b;
b.tm_year = year - 1900;
b.tm_mon = month - 1;
b.tm_mday = day;
b.tm_hour = FLAGS_fix_dayid ? 8 : 0;
b.tm_min = b.tm_sec = 0;
day_id = std::mktime(&b) / 86400;
}
std::string ret_str;
int ret = boxps_ptr_->SaveBase(batch_model_path, xbox_model_path, ret_str,
seconds_from_1970 / 86400);
int ret =
boxps_ptr_->SaveBase(batch_model_path, xbox_model_path, ret_str, day_id);
PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
"SaveBase failed in BoxPS."));
return ret_str;
Expand Down
Loading

0 comments on commit d735315

Please sign in to comment.