
Merge branch 'develop' into op/norm_conv
Xreki committed Sep 28, 2021
2 parents 65b8b6a + bc7e2b9 commit e94a526
Showing 67 changed files with 2,594 additions and 276 deletions.
2 changes: 1 addition & 1 deletion cmake/external/dlpack.cmake
@@ -18,7 +18,7 @@ set(DLPACK_PREFIX_DIR ${THIRD_PARTY_PATH}/dlpack)
set(DLPACK_SOURCE_DIR ${THIRD_PARTY_PATH}/dlpack/src/extern_dlpack)

set(DLPACK_REPOSITORY ${GIT_URL}/dmlc/dlpack.git)
set(DLPACK_TAG v0.2)
set(DLPACK_TAG v0.4)

cache_third_party(extern_dlpack
REPOSITORY ${DLPACK_REPOSITORY}
1 change: 1 addition & 0 deletions paddle/fluid/framework/CMakeLists.txt
@@ -26,6 +26,7 @@ add_subdirectory(details)
add_subdirectory(fleet)
add_subdirectory(io)
add_subdirectory(new_executor)
add_subdirectory(paddle2cinn)
#ddim lib
proto_library(framework_proto SRCS framework.proto)
proto_library(pass_desc_proto SRCS pass_desc.proto DEPS framework_proto)

@@ -105,7 +105,7 @@ void ScaleLossGradOpHandle::RunImpl() {
#endif
}

std::string ScaleLossGradOpHandle::Name() const { return "Scale LossGrad"; }
std::string ScaleLossGradOpHandle::Name() const { return "ScaleLossGrad"; }
} // namespace details
} // namespace framework
} // namespace paddle
8 changes: 0 additions & 8 deletions paddle/fluid/framework/device_worker.h
@@ -454,7 +454,6 @@ class PSGPUWorker : public HogwildWorker {
virtual void Initialize(const TrainerDesc& desc);
virtual void TrainFiles();
virtual void TrainFilesWithProfiler();
virtual void SetNeedDump(bool need_dump_field);
virtual void SetChannelWriter(ChannelObject<std::string>* queue);
virtual void SetWorkerNum(int num) { worker_num_ = num; }
virtual void CacheProgram(const ProgramDesc& main_program) {
@@ -467,26 +466,19 @@ class PSGPUWorker : public HogwildWorker {

protected:
void PushGradients();
void DumpParam();
void CopySparseTable();
void CopyDenseTable();
void CopyDenseVars();

private:
int mpi_rank_;
std::mutex mutex_;
std::vector<std::string> send_var_list_;
int worker_num_;
ProgramDesc program_;
HeterObjectPool<HeterTask> object_pool_;
bool need_dump_param_;
std::vector<std::string> dump_param_;
bool need_to_push_dense_;
bool need_dump_field_;
bool dump_slot_;
bool need_to_push_sparse_;
std::vector<std::string> dump_fields_;
ChannelWriter<std::string> writer_;
DownpourWorkerParameter param_;
float scale_datanorm_;
// just save the value in param_ for easy access
80 changes: 35 additions & 45 deletions paddle/fluid/framework/dlpack_tensor.cc
@@ -30,14 +30,10 @@ static ::DLDataType GetDLDataTypeCode() {
::DLDataType dtype;
if (std::is_same<T, platform::complex<float>>::value ||
std::is_same<T, platform::complex<double>>::value) {
// The current dlpack library version is v0.2, and does not define
// kDLComplex value. But kDLComplex is defined by 5U in v0.4, so we set
// dtype.code to 5U directly here. After the dlpack library version being
// upgraded to v0.4, it should be written as follow.
// dtype.code = kDLComplex;
dtype.code = 5U;
dtype.code = kDLComplex;
} else if (std::is_same<T, platform::bfloat16>::value) {
dtype.code = kDLBfloat;
} else if (std::is_same<T, platform::float16>::value ||
std::is_same<T, platform::bfloat16>::value ||
std::is_floating_point<T>::value) {
dtype.code = kDLFloat;
} else if (std::is_unsigned<T>::value) {
@@ -77,47 +73,47 @@ static DLDataType GetDLDataTypeFromTypeIndex(proto::VarType::Type type) {
#undef REG_DL_DATA_TYPE
}

struct DLContextVisitor : public boost::static_visitor<::DLContext> {
inline ::DLContext operator()(const platform::CPUPlace &place) const {
::DLContext ctx;
ctx.device_type = kDLCPU;
ctx.device_id = 0;
return ctx;
struct DLDeviceVisitor : public boost::static_visitor<::DLDevice> {
inline ::DLDevice operator()(const platform::CPUPlace &place) const {
::DLDevice device;
device.device_type = kDLCPU;
device.device_id = 0;
return device;
}

inline ::DLContext operator()(const platform::XPUPlace &place) const {
inline ::DLDevice operator()(const platform::XPUPlace &place) const {
PADDLE_THROW(
platform::errors::Unimplemented("platform::XPUPlace is not supported"));
}

inline ::DLContext operator()(const platform::NPUPlace &place) const {
inline ::DLDevice operator()(const platform::NPUPlace &place) const {
PADDLE_THROW(
platform::errors::Unimplemented("platform::NPUPlace is not supported"));
}

inline ::DLContext operator()(const platform::NPUPinnedPlace &place) const {
inline ::DLDevice operator()(const platform::NPUPinnedPlace &place) const {
PADDLE_THROW(platform::errors::Unimplemented(
"platform::NPUPinnedPlace is not supported"));
}

inline ::DLContext operator()(const platform::CUDAPlace &place) const {
inline ::DLDevice operator()(const platform::CUDAPlace &place) const {
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
::DLContext ctx;
ctx.device_type = kDLGPU;
ctx.device_id = place.device;
return ctx;
::DLDevice device;
device.device_type = kDLGPU;
device.device_id = place.device;
return device;
#else
PADDLE_THROW(platform::errors::Unavailable(
"platform::CUDAPlace is not supported in CPU only version."));
#endif
}

inline ::DLContext operator()(const platform::CUDAPinnedPlace &place) const {
inline ::DLDevice operator()(const platform::CUDAPinnedPlace &place) const {
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
::DLContext ctx;
ctx.device_type = kDLCPUPinned;
ctx.device_id = 0;
return ctx;
::DLDevice device;
device.device_type = kDLCPUPinned;
device.device_id = 0;
return device;
#else
PADDLE_THROW(platform::errors::Unavailable(
"platform::CUDAPinnedPlace is not supported in CPU only version."));
@@ -130,9 +126,9 @@ DLPackTensor::DLPackTensor(const Tensor &tensor, LaneType lanes) {
// init data, data buffer
t_.data = const_cast<void *>(tensor.data<void>());

// init ctx, DLContext type with device_type and device_id
// init device, DLDevice type with device_type and device_id
auto place = tensor.place();
t_.ctx = boost::apply_visitor(internal::DLContextVisitor(), place);
t_.device = boost::apply_visitor(internal::DLDeviceVisitor(), place);

// init dtype
t_.dtype = internal::GetDLDataTypeFromTypeIndex(tensor.type());
@@ -156,30 +152,24 @@ DLPackTensor::DLPackTensor(const Tensor &tensor, LaneType lanes) {
t_.byte_offset = 0;
}

::DLManagedTensor *DLPackTensor::ToCudfCompatibleDLManagedTensor() {
// init shape, tensor dims
// for DLManagedTensor shape need to be compatible with ndim
// refer to cupy and cudf, we new int64[ndim]
::DLManagedTensor *DLPackTensor::ToDLManagedTensor() {
// init shape
auto shape = new int64_t[t_.ndim];
using DimType = decltype(t_.ndim); // int
for (DimType i = 0; i < t_.ndim; ++i) {
shape[i] = t_.shape[i];
}
t_.shape = shape;

// init strides, nullptr means the tensor is compact
// refer to cupy and cudf, the compact tensor first dim's strides need to be 1
// and second dim's strides need to be length of rows of cudf
// cudf now only support dim=2
PADDLE_ENFORCE_LE(t_.ndim, 2, platform::errors::InvalidArgument(
"cudf now only supports dimension is 2, "
"but received dimension is %d.",
t_.ndim));

if (t_.ndim > 1)
t_.strides = new int64_t[2]{1, t_.shape[1]};
else
t_.strides = new int64_t[1]{1};
// init strides
auto strides = new int64_t[t_.ndim];
for (DimType i = 0; i < t_.ndim; ++i) {
strides[i] = 1;
}
for (DimType i = t_.ndim - 2; i >= 0; --i) {
strides[i] = t_.shape[i + 1] * strides[i + 1];
}
t_.strides = strides;

auto tensor = new DLManagedTensor;
tensor->dl_tensor = t_;
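The rewritten ToDLManagedTensor above drops the cudf-specific two-dimensional layout and fills in standard row-major (C-contiguous) strides for any rank: the last dimension gets stride 1, and each earlier dimension gets the product of the extents to its right. A minimal standalone sketch of the same rule; RowMajorStrides is an illustrative name, not a Paddle function:

#include <cstdint>
#include <cstdio>
#include <vector>

// Row-major (C-contiguous) strides, the same rule the new loop in
// ToDLManagedTensor applies: strides[ndim - 1] = 1 and
// strides[i] = shape[i + 1] * strides[i + 1].
std::vector<int64_t> RowMajorStrides(const std::vector<int64_t>& shape) {
  std::vector<int64_t> strides(shape.size(), 1);
  for (int i = static_cast<int>(shape.size()) - 2; i >= 0; --i) {
    strides[i] = shape[i + 1] * strides[i + 1];
  }
  return strides;
}

int main() {
  // The {6, 7} tensor used in dlpack_tensor_test.cc gets strides {7, 1},
  // which is exactly what the updated test asserts below.
  for (int64_t s : RowMajorStrides({6, 7})) {
    std::printf("%lld ", static_cast<long long>(s));
  }
  return 0;
}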
2 changes: 1 addition & 1 deletion paddle/fluid/framework/dlpack_tensor.h
@@ -36,7 +36,7 @@ class DLPackTensor {

inline operator ::DLTensor&() { return t_; }

::DLManagedTensor* ToCudfCompatibleDLManagedTensor();
::DLManagedTensor* ToDLManagedTensor();

private:
::DLTensor t_;
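For context, here is a hedged sketch of how a consumer of the renamed ToDLManagedTensor is expected to behave under the usual DLPack ownership convention: read the fields of dl_tensor, then call the deleter exactly once, as the updated dlpack_tensor_test.cc below does. ConsumeDLPack is an illustrative name; only DLManagedTensor and its members come from dlpack.h, assuming the header version this commit targets (where the device field replaces ctx).

#include <cstdio>

#include <dlpack/dlpack.h>

// Illustrative consumer of a DLManagedTensor produced by
// DLPackTensor::ToDLManagedTensor(): inspect dl_tensor, then release it
// through the producer-supplied deleter.
void ConsumeDLPack(::DLManagedTensor* managed) {
  const ::DLTensor& t = managed->dl_tensor;
  std::printf("ndim=%d device_type=%d dtype_code=%u\n", t.ndim,
              static_cast<int>(t.device.device_type),
              static_cast<unsigned>(t.dtype.code));
  for (int i = 0; i < t.ndim; ++i) {
    // ToDLManagedTensor fills strides explicitly, although DLPack in general
    // also allows a null strides pointer for compact tensors.
    std::printf("dim %d: shape=%lld stride=%lld\n", i,
                static_cast<long long>(t.shape[i]),
                static_cast<long long>(t.strides[i]));
  }
  // The producer-supplied deleter releases whatever ToDLManagedTensor
  // allocated, so the consumer calls it exactly once when it is done.
  if (managed->deleter != nullptr) {
    managed->deleter(managed);
  }
}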
29 changes: 16 additions & 13 deletions paddle/fluid/framework/dlpack_tensor_test.cc
@@ -30,7 +30,11 @@ template <typename T>
constexpr uint8_t GetDLDataTypeCode() {
if (std::is_same<T, platform::complex<float>>::value ||
std::is_same<T, platform::complex<double>>::value) {
return static_cast<uint8_t>(5);
return static_cast<uint8_t>(kDLComplex);
}

if (std::is_same<T, platform::bfloat16>::value) {
return static_cast<uint8_t>(kDLBfloat);
}

return std::is_same<platform::float16, T>::value ||
@@ -55,15 +59,15 @@ void TestMain(const platform::Place &place, uint16_t lanes) {

CHECK_EQ(p, dl_tensor.data);
if (platform::is_cpu_place(place)) {
CHECK_EQ(kDLCPU, dl_tensor.ctx.device_type);
CHECK_EQ(0, dl_tensor.ctx.device_id);
CHECK_EQ(kDLCPU, dl_tensor.device.device_type);
CHECK_EQ(0, dl_tensor.device.device_id);
} else if (platform::is_gpu_place(place)) {
CHECK_EQ(kDLGPU, dl_tensor.ctx.device_type);
CHECK_EQ(kDLGPU, dl_tensor.device.device_type);
CHECK_EQ(BOOST_GET_CONST(platform::CUDAPlace, place).device,
dl_tensor.ctx.device_id);
dl_tensor.device.device_id);
} else if (platform::is_cuda_pinned_place(place)) {
CHECK_EQ(kDLCPUPinned, dl_tensor.ctx.device_type);
CHECK_EQ(0, dl_tensor.ctx.device_id);
CHECK_EQ(kDLCPUPinned, dl_tensor.device.device_type);
CHECK_EQ(0, dl_tensor.device.device_id);
} else {
CHECK_EQ(false, true);
}
@@ -83,25 +87,24 @@ void TestMain(const platform::Place &place, uint16_t lanes) {
}

template <typename T>
void TestToCudfCompatibleDLManagedTensor(const platform::Place &place,
uint16_t lanes) {
void TestToDLManagedTensor(const platform::Place &place, uint16_t lanes) {
DDim dims{6, 7};
Tensor tensor;
tensor.Resize(dims);
tensor.mutable_data<T>(place);

DLPackTensor dlpack_tensor(tensor, lanes);

::DLManagedTensor *dl_managed_tensor =
dlpack_tensor.ToCudfCompatibleDLManagedTensor();
::DLManagedTensor *dl_managed_tensor = dlpack_tensor.ToDLManagedTensor();

CHECK_EQ(dl_managed_tensor->manager_ctx == nullptr, true);

for (auto i = 0; i < dims.size(); ++i) {
CHECK_EQ(dims[i], dl_managed_tensor->dl_tensor.shape[i]);
}

CHECK_EQ(dl_managed_tensor->dl_tensor.strides[0] == 1, true);
CHECK_EQ(dl_managed_tensor->dl_tensor.strides[0] == 7, true);
CHECK_EQ(dl_managed_tensor->dl_tensor.strides[1] == 1, true);

dl_managed_tensor->deleter(dl_managed_tensor);
}
@@ -122,7 +125,7 @@ void TestMainLoop() {
for (auto &p : places) {
for (auto &l : lanes) {
TestMain<T>(p, l);
TestToCudfCompatibleDLManagedTensor<T>(p, l);
TestToDLManagedTensor<T>(p, l);
}
}
}
45 changes: 44 additions & 1 deletion paddle/fluid/framework/fleet/gloo_wrapper.cc
@@ -71,6 +71,18 @@ void HdfsStore::set(const std::string& key, const std::vector<char>& data) {
}
}
paddle::framework::fs_mv(tmp, path);
auto start = std::chrono::steady_clock::now();
while (paddle::framework::fs_exists(path) == false) {
VLOG(0) << "HdfsStore::set fs_mv retrying...";
paddle::framework::fs_mv(tmp, path);
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - start);
if (wait_timeout_ != gloo::kNoTimeout && elapsed > wait_timeout_) {
PADDLE_THROW(paddle::platform::errors::ExecutionTimeout(
"fs_mv failed, tmp: %s, path: %s", tmp, path));
}
std::this_thread::sleep_for(std::chrono::milliseconds(wait_sleep_ms_));
}
#endif
}
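The block added to HdfsStore::set above follows a common retry-until-timeout shape: perform the action, re-check a condition, sleep between attempts, and give up once a deadline passes. Below is a generic, hedged sketch of that pattern; RetryUntil and its parameters are illustrative names, not Paddle or gloo APIs.

#include <chrono>
#include <functional>
#include <thread>

// Runs `action`, then keeps re-running it until `done()` returns true or
// `timeout` elapses, sleeping `sleep_ms` between attempts. Returns whether
// `done()` eventually held. This mirrors the fs_mv retry loop added above.
bool RetryUntil(const std::function<void()>& action,
                const std::function<bool()>& done,
                std::chrono::seconds timeout,
                std::chrono::milliseconds sleep_ms) {
  action();
  const auto start = std::chrono::steady_clock::now();
  while (!done()) {
    const auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
        std::chrono::steady_clock::now() - start);
    if (elapsed > timeout) {
      return false;
    }
    std::this_thread::sleep_for(sleep_ms);
    action();
  }
  return true;
}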

@@ -140,6 +152,7 @@ void HdfsStore::wait(const std::vector<std::string>& keys,
auto start = std::chrono::steady_clock::now();
std::vector<bool> check_key_status(keys.size(), false);
while (!Check(keys, &check_key_status)) {
VLOG(0) << "HdfsStore::wait checking repeatedly...";
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - start);
if (wait_timeout_ != gloo::kNoTimeout && elapsed > wait_timeout_) {
@@ -209,6 +222,8 @@ void ParallelConnectContext::connectFullMesh(
// Create pairs
auto transportContext = dev->createContext(rank, size);
transportContext->setTimeout(getTimeout());
VLOG(0) << "transportContext timeout: " << getTimeout().count()
<< ", curr rank: " << rank;
for (int i = 0; i < size; i++) {
if (i == rank) {
continue;
@@ -225,6 +240,7 @@

std::vector<std::shared_ptr<std::thread>> connect_threads(thread_num_);
// Connect every pair
VLOG(0) << "connect_thread_num: " << thread_num_ << ", size: " << size;
for (uint32_t i = 0; i < connect_threads.size(); ++i) {
connect_threads[i].reset(new std::thread(
[&store, &transportContext, total_add_size, this](
@@ -252,10 +268,36 @@
sleep(5);
--max_retry_times;
}

auto addr = extractAddress(allAddrs, i);
if (addr.empty()) {
VLOG(0) << "peer address is null";
}
Impl impl_;
memcpy(&impl_, addr.data(), sizeof(impl_));
struct sockaddr_in* sa = (struct sockaddr_in*)&(impl_.ss);
std::string ip = getCharIpAddr(sa->sin_addr.s_addr);
VLOG(0) << "peer " << i << " ip addr: " << ip
<< ", port: " << sa->sin_port;

auto start = std::chrono::steady_clock::now();
std::chrono::seconds connect_wait_timeout_ =
std::chrono::seconds(600);
while (true) {
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - start);
if (elapsed > connect_wait_timeout_) {
break;
}
try {
transportContext->getPair(i)->connect(addr);
break;
} catch (...) {
VLOG(0) << "gloo connect failed, retrying...";
}
}
transportContext->getPair(i)->connect(addr);
}
VLOG(0) << "peer connected success";
},
i, connect_threads.size()));
}
@@ -264,6 +306,7 @@
}
device_ = dev;
transportContext_ = std::move(transportContext);
VLOG(0) << "ParallelConnectContext::connectFullMesh() is over";
}
#endif
} // namespace rendezvous
20 changes: 20 additions & 0 deletions paddle/fluid/framework/fleet/gloo_wrapper.h
@@ -97,6 +97,26 @@ class ParallelConnectContext : public gloo::rendezvous::Context {
// slowly in case big size, especialy in HdfsStore
void connectFullMesh(Store& store, // NOLINT
std::shared_ptr<transport::Device>& dev); // NOLINT
struct Impl {
// IP address of the listening socket.
struct sockaddr_storage ss;
// Sequence number of this address.
// If this is equal to -1, the address is assumed to
// represent the listening socket of a device. The sequence number
// must be set before it can be used by a pair.
ssize_t seq{-1};
};
std::string getCharIpAddr(uint32_t ipAddress) {
const int NBYTES = 4;
uint8_t octet[NBYTES];
char ipAddressFinal[16];
for (int i = 0; i < NBYTES; i++) {
octet[i] = ipAddress >> (i * 8);
}
snprintf(ipAddressFinal, sizeof(ipAddressFinal), "%d.%d.%d.%d", octet[0],
octet[1], octet[2], octet[3]);
return std::string(ipAddressFinal);
}

protected:
int thread_num_ = 6;
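The getCharIpAddr helper added to gloo_wrapper.h formats a raw sin_addr.s_addr value (network byte order) as a dotted quad by emitting the lowest byte first, which prints the expected address on little-endian hosts. Below is a hedged standalone sketch of the same logic with a usage check; the free function CharIpAddr only mirrors the new member and is not part of Paddle.

#include <arpa/inet.h>  // inet_addr

#include <cstdint>
#include <cstdio>
#include <string>

// Mirror of ParallelConnectContext::getCharIpAddr: takes s_addr as read from
// a sockaddr_in (network byte order) and prints octets low byte first, which
// yields the usual dotted quad on a little-endian host.
std::string CharIpAddr(uint32_t ip) {
  uint8_t octet[4];
  char buf[16];
  for (int i = 0; i < 4; i++) {
    octet[i] = static_cast<uint8_t>(ip >> (i * 8));
  }
  std::snprintf(buf, sizeof(buf), "%d.%d.%d.%d", octet[0], octet[1], octet[2],
                octet[3]);
  return std::string(buf);
}

int main() {
  // inet_addr returns the address in network byte order, just like the
  // s_addr field the gloo code reads out of the peer's sockaddr_storage.
  std::printf("%s\n", CharIpAddr(inet_addr("10.1.2.3")).c_str());  // 10.1.2.3
  return 0;
}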