Skip to content

Commit

Permalink
Allow optional RDMA queue depth (#30)
Browse files Browse the repository at this point in the history
* make rdma queue depth optional

* fix bug

* improve logging
  • Loading branch information
ymjiang authored Mar 28, 2020
1 parent 2f40b17 commit 7e4800f
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 14 deletions.
31 changes: 23 additions & 8 deletions src/rdma_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,27 @@ struct Endpoint {
struct rdma_cm_id *cm_id;
std::shared_ptr<Transport> trans;

WRContext rx_ctx[kRxDepth];

WRContext start_ctx[kStartDepth];
WRContext reply_ctx[kReplyDepth];
int kStartDepth = 128;
int kRxDepth = 2048;
int kReplyDepth = kRxDepth;
WRContext *rx_ctx;
WRContext *start_ctx;
WRContext *reply_ctx;

ThreadsafeQueue<WRContext *> free_start_ctx;
ThreadsafeQueue<WRContext *> free_reply_ctx;

Endpoint() : status(IDLE), node_id(Node::kEmpty), cm_id(nullptr), rx_ctx() {}
Endpoint() : status(IDLE), node_id(Node::kEmpty), cm_id(nullptr), rx_ctx() {
auto byteps_rx_depth = Environment::Get()->find("BYTEPS_RDMA_RX_DEPTH");
auto byteps_start_depth = Environment::Get()->find("BYTEPS_RDMA_START_DEPTH");
kStartDepth = byteps_start_depth ? atoi(byteps_start_depth) : kStartDepth;
kRxDepth = byteps_rx_depth ? atoi(byteps_rx_depth) : kRxDepth;
kReplyDepth = kRxDepth;

start_ctx = new WRContext[kStartDepth];
reply_ctx = new WRContext[kReplyDepth];
rx_ctx = new WRContext[kRxDepth];
}

~Endpoint() {
for (int i = 0; i < kRxDepth; ++i) {
Expand Down Expand Up @@ -92,7 +104,9 @@ struct Endpoint {
ib_malloc((void**) &buf, kMempoolChunkSize);
CHECK(buf);
struct ibv_mr *mr = ibv_reg_mr(pd, buf, kMempoolChunkSize, 0);
CHECK(mr);
CHECK(mr) << "ibv_reg_mr failed: " << strerror(errno)
<< "\nYou can try to reduce BYTEPS_RDMA_START_DEPTH (default 128)"
<< " or BYTEPS_RDMA_RX_DEPTH (default 2048)";

ctx[i].type = type;
ctx[i].buffer = mr;
Expand Down Expand Up @@ -127,8 +141,9 @@ struct Endpoint {
CHECK(buf);
struct ibv_mr *mr =
ibv_reg_mr(pd, buf, kMempoolChunkSize, IBV_ACCESS_LOCAL_WRITE);
CHECK(mr)<< "ibv_reg_mr failed: " << strerror(errno)
<< ", i=" << i <<", kMempoolChunkSize="<< kMempoolChunkSize;
CHECK(mr) << "ibv_reg_mr failed: " << strerror(errno)
<< "\nYou can try to reduce BYTEPS_RDMA_START_DEPTH (default 128)"
<< " or BYTEPS_RDMA_RX_DEPTH (default 2048)";

rx_ctx[i].type = kReceiveContext;
rx_ctx[i].buffer = mr;
Expand Down
6 changes: 0 additions & 6 deletions src/rdma_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,9 @@ namespace ps {
#define DIVUP(x, y) (((x)+(y)-1)/(y))
#define ROUNDUP(x, y) (DIVUP((x), (y))*(y))

static const int kStartDepth = 128;
static const int kRxDepth = 2048; // should be larger than kStartDepth
static const int kReplyDepth = kRxDepth;

static const int kSGEntry = 1;
static const int kTimeoutms = 1000;
static const int kRdmaListenBacklog = 128;
static const int kMaxConcurrentWorkRequest =
kRxDepth + kStartDepth + kReplyDepth;
static const int kMaxHostnameLength = 16;
static const int kMaxDataFields = 4;

Expand Down
18 changes: 18 additions & 0 deletions src/rdma_van.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,22 @@ class RDMAVan : public Van {
enable_log_ = val ? atoi(val) : false;
if (enable_log_) LOG(INFO) << "Enable RDMA logging.";

val = Environment::Get()->find("BYTEPS_RDMA_MAX_CONCURR_WR");
if (val) {
// should make sure: kMaxConcurrentWorkRequest >= kStartDepth + kReplyDepth + kRxDepth
kMaxConcurrentWorkRequest = atoi(val);

auto start_depth_env = Environment::Get()->find("BYTEPS_RDMA_START_DEPTH");
auto rx_depth_env = Environment::Get()->find("BYTEPS_RDMA_RX_DEPTH");

auto start_depth = start_depth_env ? atoi(start_depth_env) : 128;
auto rx_depth = rx_depth_env ? atoi(rx_depth_env) : 2048;
auto reply_depth = rx_depth;

CHECK_GE(kMaxConcurrentWorkRequest, start_depth + reply_depth + rx_depth)
<< "Should make sure: kMaxConcurrentWorkRequest >= kStartDepth + kReplyDepth + kRxDepth";
}

start_mu_.unlock();
Van::Start(customer_id);
}
Expand Down Expand Up @@ -955,6 +971,8 @@ class RDMAVan : public Van {
bool enable_log_;
std::mutex log_mu_;

int kMaxConcurrentWorkRequest = 4224; // 128 + 2048 * 2

}; // class RDMAVan

}; // namespace ps
Expand Down

0 comments on commit 7e4800f

Please sign in to comment.