Skip to content

Commit

Permalink
memory sparse table & brpc communication upgrade dependency (#36734)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaocaibei123 authored Nov 1, 2021
1 parent 249081b commit 29c6bcb
Show file tree
Hide file tree
Showing 11 changed files with 640 additions and 16 deletions.
1 change: 1 addition & 0 deletions paddle/fluid/distributed/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0)
"${DISTRIBUTE_COMPILE_FLAGS} -faligned-new")
endif()

add_subdirectory(common)
add_subdirectory(service)
add_subdirectory(table)
add_subdirectory(test)
Expand Down
4 changes: 4 additions & 0 deletions paddle/fluid/distributed/common/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

cc_library(afs_wrapper SRCS afs_warpper.cc DEPS fs ps_framework_proto)

#set_property(GLOBAL PROPERTY COMMON_DEPS afs_warpper)
89 changes: 89 additions & 0 deletions paddle/fluid/distributed/common/afs_warpper.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/distributed/common/afs_warpper.h"
#include "paddle/fluid/framework/io/fs.h"

namespace paddle {
namespace distributed {
// AfsClient impl
int AfsClient::initialize(const FsClientParameter& fs_client_param) {
// temporarily implemented with hdfs-client
return initialize(fs_client_param.hadoop_bin(), fs_client_param.uri(),
fs_client_param.user(), fs_client_param.passwd(),
fs_client_param.buffer_size());
}
int AfsClient::initialize(const std::string& hadoop_bin, const std::string& uri,
const std::string& user, const std::string& passwd,
int buffer_size_param) {
return initialize(hadoop_bin, uri, paddle::string::format_string(
"%s,%s", user.c_str(), passwd.c_str()),
buffer_size_param);
}
int AfsClient::initialize(const std::string& hadoop_bin, const std::string& uri,
const std::string& ugi, int buffer_size_param) {
// temporarily implemented with hdfs-client
size_t buffer_size = 1L << 25; // 32MB
if (buffer_size_param > static_cast<int>(buffer_size)) {
buffer_size = buffer_size_param;
}
paddle::framework::hdfs_set_buffer_size(buffer_size);
paddle::framework::hdfs_set_command(paddle::string::format_string(
"2>>./hdfs_err.log %s fs -Dfs.default.name=%s -Dhadoop.job.ugi=%s "
"-Ddfs.client.block.write.retries=15 -Ddfs.rpc.timeout=300000",
hadoop_bin.c_str(), uri.c_str(), ugi.c_str()));
return 0;
}

// open file in 'w' or 'r'
std::shared_ptr<FsReadChannel> AfsClient::open_r(const FsChannelConfig& config,
uint32_t buffer_size,
int* err_no) {
std::shared_ptr<FsReadChannel> channel =
std::make_shared<FsReadChannel>(buffer_size);
std::shared_ptr<FILE> fp =
paddle::framework::fs_open_read(config.path, err_no, config.deconverter);
channel->open(fp, config);
return channel;
}
std::shared_ptr<FsWriteChannel> AfsClient::open_w(const FsChannelConfig& config,
uint32_t buffer_size,
int* err_no) {
std::shared_ptr<FsWriteChannel> channel =
std::make_shared<FsWriteChannel>(buffer_size);
std::shared_ptr<FILE> fp =
paddle::framework::fs_open_write(config.path, err_no, config.converter);
channel->open(fp, config);
return channel;
}

// remove file in path, path maybe a reg, such as 'part-000-*'
void AfsClient::remove(const std::string& path) {
return paddle::framework::fs_remove(path);
}
void AfsClient::remove_dir(const std::string& dir) {
return paddle::framework::fs_remove(dir);
}

// list files in path, path maybe a dir with reg
std::vector<std::string> AfsClient::list(const std::string& path) {
return paddle::framework::fs_list(path);
}

// exist or not
bool AfsClient::exist(const std::string& dir) {
return paddle::framework::fs_exists(dir);
}
} // namespace distributed
} // namespace paddle
156 changes: 156 additions & 0 deletions paddle/fluid/distributed/common/afs_warpper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <vector>
#include "paddle/fluid/distributed/ps.pb.h"
#include "paddle/fluid/string/string_helper.h"

namespace paddle {
namespace distributed {
struct FsDataConverter {
std::string converter;
std::string deconverter;
};

struct FsChannelConfig {
std::string path; // path of file
std::string converter; // data converter
std::string deconverter;
};

class FsReadChannel {
public:
FsReadChannel() : _buffer_size(0) {}
explicit FsReadChannel(uint32_t buffer_size) : _buffer_size(buffer_size) {}
virtual ~FsReadChannel() {}
FsReadChannel(FsReadChannel&&) = delete;
FsReadChannel(const FsReadChannel&) = delete;
int open(std::shared_ptr<FILE> fp, const FsChannelConfig& config) {
_file = fp;
return 0;
}
inline int close() {
_file.reset();
return 0;
}

inline uint32_t read_line(std::string& line_data) { // NOLINT
line_data.clear();
char buffer = '\0';
size_t read_count = 0;
while (1 == fread(&buffer, 1, 1, _file.get()) && buffer != '\n') {
++read_count;
line_data.append(&buffer, 1);
}
if (read_count == 0 && buffer != '\n') {
return -1;
}
return 0;
}

private:
uint32_t _buffer_size;
FsChannelConfig _config;
std::shared_ptr<FILE> _file;
};
class FsWriteChannel {
public:
FsWriteChannel() : _buffer_size(0) {}
explicit FsWriteChannel(uint32_t buffer_size) : _buffer_size(buffer_size) {}
virtual ~FsWriteChannel() {}
FsWriteChannel(FsWriteChannel&&) = delete;
FsWriteChannel(const FsWriteChannel&) = delete;

int open(std::shared_ptr<FILE> fp, const FsChannelConfig& config) {
_file = fp;

// the buffer has set in fs.cc
// if (_buffer_size != 0) {
// _buffer = std::shared_ptr<char>(new char[_buffer_size]);

// CHECK(0 == setvbuf(&*_file, _buffer.get(), _IOFBF, _buffer_size));
//}
return 0;
}

inline void flush() { return; }

inline int close() {
flush();
_file.reset();
return 0;
}

inline uint32_t write_line(const char* data, uint32_t size) {
size_t write_count = fwrite_unlocked(data, 1, size, _file.get());
if (write_count != size) {
return -1;
}
write_count = fwrite_unlocked("\n", 1, 1, _file.get());
if (write_count != 1) {
return -1;
}
return 0;
}
inline uint32_t write_line(const std::string& data) {
return write_line(data.c_str(), data.size());
}

private:
uint32_t _buffer_size;
FsChannelConfig _config;
std::shared_ptr<FILE> _file;
std::shared_ptr<char> _buffer;
};

class AfsClient {
public:
AfsClient() {}
virtual ~AfsClient() {}
AfsClient(AfsClient&&) = delete;
AfsClient(const AfsClient&) = delete;

int initialize(const FsClientParameter& fs_client_param);
int initialize(const std::string& hadoop_bin, const std::string& uri,
const std::string& user, const std::string& passwd,
int buffer_size_param = (1L << 25));
int initialize(const std::string& hadoop_bin, const std::string& uri,
const std::string& ugi, int buffer_size_param = (1L << 25));

// open file in 'w' or 'r'
std::shared_ptr<FsReadChannel> open_r(const FsChannelConfig& config,
uint32_t buffer_size = 0,
int* err_no = nullptr);
std::shared_ptr<FsWriteChannel> open_w(const FsChannelConfig& config,
uint32_t buffer_size = 0,
int* err_no = nullptr);

// remove file in path, path maybe a reg, such as 'part-000-*'
void remove(const std::string& path);
void remove_dir(const std::string& dir);

// list files in path, path maybe a dir with reg
std::vector<std::string> list(const std::string& path);

// exist or not
bool exist(const std::string& dir);
};
} // namespace distributed
} // namespace paddle
93 changes: 93 additions & 0 deletions paddle/fluid/distributed/common/cost_timer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#include <memory>
#include <unordered_map>
#include "butil/time.h"
#include "bvar/latency_recorder.h"
#include "glog/logging.h"

namespace paddle {
namespace distributed {

struct CostProfilerNode {
std::shared_ptr<bvar::LatencyRecorder> recorder;
};

class CostProfiler {
public:
~CostProfiler() {}
static CostProfiler& instance() {
static CostProfiler profiler;
return profiler;
}

void register_profiler(const std::string& label) {
if (_cost_profiler_map.find(label) != _cost_profiler_map.end()) {
return;
}
auto profiler_node = std::make_shared<CostProfilerNode>();
profiler_node->recorder.reset(
new bvar::LatencyRecorder("cost_profiler", label));
_cost_profiler_map[label] = profiler_node;
}

CostProfilerNode* profiler(const std::string& label) {
auto itr = _cost_profiler_map.find(label);
if (itr != _cost_profiler_map.end()) {
return itr->second.get();
}
return NULL;
}

private:
CostProfiler() {}
std::unordered_map<std::string, std::shared_ptr<CostProfilerNode>>
_cost_profiler_map;
};

class CostTimer {
public:
explicit CostTimer(const std::string& label) {
_label = label;
auto& profiler = CostProfiler::instance();
_profiler_node = profiler.profiler(label);
// 如果不在profiler中,则使用log输出耗时信息
_is_print_cost = _profiler_node == NULL;
_start_time_ms = butil::gettimeofday_ms();
}
explicit CostTimer(CostProfilerNode& profiler_node) { // NOLINT
_is_print_cost = false;
_profiler_node = &profiler_node;
_start_time_ms = butil::gettimeofday_ms();
}
~CostTimer() {
if (_is_print_cost) {
LOG(INFO) << "CostTimer label:" << _label
<< ", cost:" << butil::gettimeofday_ms() - _start_time_ms
<< "ms";
} else {
*(_profiler_node->recorder) << butil::gettimeofday_ms() - _start_time_ms;
}
}

private:
std::string _label;
bool _is_print_cost;
uint64_t _start_time_ms;
CostProfilerNode* _profiler_node;
};
} // namespace distributed
} // namespace paddle
15 changes: 15 additions & 0 deletions paddle/fluid/distributed/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,20 @@ inline void ADD(int n, const T* x, const T y, T* z) {
}
}

template <typename T>
inline void DIV(int n, const T x, const T* y, T* z) {
for (int i = 0; i < n; ++i) {
z[i] = x / y[i];
}
}

template <typename T>
inline void ELE_MUL(int n, const T* x, const T* y, T* z) {
for (int i = 0; i < n; ++i) {
z[i] = x[i] * y[i];
}
}

static bool StartWith(const std::string& str, const std::string& substr) {
return str.find(substr) == 0;
}
Expand Down Expand Up @@ -91,5 +105,6 @@ inline double GetCurrentUS() {
gettimeofday(&time, NULL);
return 1e+6 * time.tv_sec + time.tv_usec;
}

} // namespace distributed
} // namespace paddle
Loading

0 comments on commit 29c6bcb

Please sign in to comment.