Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow optional RDMA queue depth #30

Merged
merged 3 commits into from
Mar 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions src/rdma_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,27 @@ struct Endpoint {
struct rdma_cm_id *cm_id;
std::shared_ptr<Transport> trans;

WRContext rx_ctx[kRxDepth];

WRContext start_ctx[kStartDepth];
WRContext reply_ctx[kReplyDepth];
// Queue depths. The initializers are defaults only: the constructor replaces
// kStartDepth / kRxDepth with the values of the BYTEPS_RDMA_START_DEPTH /
// BYTEPS_RDMA_RX_DEPTH environment variables when those are set, and then
// re-assigns kReplyDepth = kRxDepth.
int kStartDepth = 128;
int kRxDepth = 2048;
int kReplyDepth = kRxDepth;
// Work-request context arrays. They are allocated with new[] in the
// constructor, holding kRxDepth, kStartDepth and kReplyDepth elements
// respectively.
WRContext *rx_ctx;
WRContext *start_ctx;
WRContext *reply_ctx;

ThreadsafeQueue<WRContext *> free_start_ctx;
ThreadsafeQueue<WRContext *> free_reply_ctx;

Endpoint() : status(IDLE), node_id(Node::kEmpty), cm_id(nullptr), rx_ctx() {}
Endpoint() : status(IDLE), node_id(Node::kEmpty), cm_id(nullptr), rx_ctx() {
auto byteps_rx_depth = Environment::Get()->find("BYTEPS_RDMA_RX_DEPTH");
auto byteps_start_depth = Environment::Get()->find("BYTEPS_RDMA_START_DEPTH");
kStartDepth = byteps_start_depth ? atoi(byteps_start_depth) : kStartDepth;
kRxDepth = byteps_rx_depth ? atoi(byteps_rx_depth) : kRxDepth;
kReplyDepth = kRxDepth;

start_ctx = new WRContext[kStartDepth];
reply_ctx = new WRContext[kReplyDepth];
rx_ctx = new WRContext[kRxDepth];
}

~Endpoint() {
for (int i = 0; i < kRxDepth; ++i) {
Expand Down Expand Up @@ -92,7 +104,9 @@ struct Endpoint {
ib_malloc((void**) &buf, kMempoolChunkSize);
CHECK(buf);
struct ibv_mr *mr = ibv_reg_mr(pd, buf, kMempoolChunkSize, 0);
CHECK(mr);
CHECK(mr) << "ibv_reg_mr failed: " << strerror(errno)
<< "\nYou can try to reduce BYTEPS_RDMA_START_DEPTH (default 128)"
<< " or BYTEPS_RDMA_RX_DEPTH (default 2048)";

ctx[i].type = type;
ctx[i].buffer = mr;
Expand Down Expand Up @@ -127,8 +141,9 @@ struct Endpoint {
CHECK(buf);
struct ibv_mr *mr =
ibv_reg_mr(pd, buf, kMempoolChunkSize, IBV_ACCESS_LOCAL_WRITE);
CHECK(mr)<< "ibv_reg_mr failed: " << strerror(errno)
<< ", i=" << i <<", kMempoolChunkSize="<< kMempoolChunkSize;
CHECK(mr) << "ibv_reg_mr failed: " << strerror(errno)
<< "\nYou can try to reduce BYTEPS_RDMA_START_DEPTH (default 128)"
<< " or BYTEPS_RDMA_RX_DEPTH (default 2048)";

rx_ctx[i].type = kReceiveContext;
rx_ctx[i].buffer = mr;
Expand Down
6 changes: 0 additions & 6 deletions src/rdma_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,9 @@ namespace ps {
// DIVUP(x, y): computes ((x) + (y) - 1) / (y), i.e. x / y rounded up when
// used with integer operands (assumes x >= 0 and y > 0 -- TODO confirm with
// callers). Note that `y` is expanded twice, so avoid arguments with side
// effects.
#define DIVUP(x, y) (((x)+(y)-1)/(y))
// ROUNDUP(x, y): DIVUP(x, y) multiplied by y, i.e. x rounded up to a multiple
// of y under the same assumptions. `y` is expanded three times here.
#define ROUNDUP(x, y) (DIVUP((x), (y))*(y))

static const int kStartDepth = 128;
static const int kRxDepth = 2048; // should be larger than kStartDepth
static const int kReplyDepth = kRxDepth;

static const int kSGEntry = 1;
static const int kTimeoutms = 1000;
static const int kRdmaListenBacklog = 128;
static const int kMaxConcurrentWorkRequest =
kRxDepth + kStartDepth + kReplyDepth;
static const int kMaxHostnameLength = 16;
static const int kMaxDataFields = 4;

Expand Down
18 changes: 18 additions & 0 deletions src/rdma_van.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,22 @@ class RDMAVan : public Van {
enable_log_ = val ? atoi(val) : false;
if (enable_log_) LOG(INFO) << "Enable RDMA logging.";

val = Environment::Get()->find("BYTEPS_RDMA_MAX_CONCURR_WR");
if (val) {
// should make sure: kMaxConcurrentWorkRequest >= kStartDepth + kReplyDepth + kRxDepth
kMaxConcurrentWorkRequest = atoi(val);

auto start_depth_env = Environment::Get()->find("BYTEPS_RDMA_START_DEPTH");
auto rx_depth_env = Environment::Get()->find("BYTEPS_RDMA_RX_DEPTH");

auto start_depth = start_depth_env ? atoi(start_depth_env) : 128;
auto rx_depth = rx_depth_env ? atoi(rx_depth_env) : 2048;
auto reply_depth = rx_depth;

CHECK_GE(kMaxConcurrentWorkRequest, start_depth + reply_depth + rx_depth)
<< "Should make sure: kMaxConcurrentWorkRequest >= kStartDepth + kReplyDepth + kRxDepth";
}

start_mu_.unlock();
Van::Start(customer_id);
}
Expand Down Expand Up @@ -955,6 +971,8 @@ class RDMAVan : public Van {
bool enable_log_;
std::mutex log_mu_;

// Maximum number of concurrent work requests. The default equals the default
// start depth plus the default reply and receive depths. It is replaced by
// the BYTEPS_RDMA_MAX_CONCURR_WR environment variable when that is set, and
// in that case it is checked to be >= start depth + reply depth + rx depth.
int kMaxConcurrentWorkRequest = 4224; // 128 + 2048 * 2

}; // class RDMAVan

}; // namespace ps
Expand Down