From 7e4800febf9f3203c41f7451f191361270a11b7d Mon Sep 17 00:00:00 2001 From: Yimin Jiang Date: Sat, 28 Mar 2020 11:27:41 +0800 Subject: [PATCH] Allow optional RDMA queue depth (#30) * make rdma queue depth optional * fix bug * improve logging --- src/rdma_transport.h | 31 +++++++++++++++++++++++-------- src/rdma_utils.h | 6 ------ src/rdma_van.h | 18 ++++++++++++++++++ 3 files changed, 41 insertions(+), 14 deletions(-) diff --git a/src/rdma_transport.h b/src/rdma_transport.h index 522b7a15..7f9b3f78 100644 --- a/src/rdma_transport.h +++ b/src/rdma_transport.h @@ -34,15 +34,27 @@ struct Endpoint { struct rdma_cm_id *cm_id; std::shared_ptr trans; - WRContext rx_ctx[kRxDepth]; - - WRContext start_ctx[kStartDepth]; - WRContext reply_ctx[kReplyDepth]; + int kStartDepth = 128; + int kRxDepth = 2048; + int kReplyDepth = kRxDepth; + WRContext *rx_ctx; + WRContext *start_ctx; + WRContext *reply_ctx; ThreadsafeQueue free_start_ctx; ThreadsafeQueue free_reply_ctx; - Endpoint() : status(IDLE), node_id(Node::kEmpty), cm_id(nullptr), rx_ctx() {} + Endpoint() : status(IDLE), node_id(Node::kEmpty), cm_id(nullptr), rx_ctx() { + auto byteps_rx_depth = Environment::Get()->find("BYTEPS_RDMA_RX_DEPTH"); + auto byteps_start_depth = Environment::Get()->find("BYTEPS_RDMA_START_DEPTH"); + kStartDepth = byteps_start_depth ? atoi(byteps_start_depth) : kStartDepth; + kRxDepth = byteps_rx_depth ? atoi(byteps_rx_depth) : kRxDepth; + kReplyDepth = kRxDepth; + + start_ctx = new WRContext[kStartDepth]; + reply_ctx = new WRContext[kReplyDepth]; + rx_ctx = new WRContext[kRxDepth]; + } ~Endpoint() { for (int i = 0; i < kRxDepth; ++i) { @@ -92,7 +104,9 @@ struct Endpoint { ib_malloc((void**) &buf, kMempoolChunkSize); CHECK(buf); struct ibv_mr *mr = ibv_reg_mr(pd, buf, kMempoolChunkSize, 0); - CHECK(mr); + CHECK(mr) << "ibv_reg_mr failed: " << strerror(errno) + << "\nYou can try to reduce BYTEPS_RDMA_START_DEPTH (default 128)" + << " or BYTEPS_RDMA_RX_DEPTH (default 2048)"; ctx[i].type = type; ctx[i].buffer = mr; @@ -127,8 +141,9 @@ struct Endpoint { CHECK(buf); struct ibv_mr *mr = ibv_reg_mr(pd, buf, kMempoolChunkSize, IBV_ACCESS_LOCAL_WRITE); - CHECK(mr)<< "ibv_reg_mr failed: " << strerror(errno) - << ", i=" << i <<", kMempoolChunkSize="<< kMempoolChunkSize; + CHECK(mr) << "ibv_reg_mr failed: " << strerror(errno) + << "\nYou can try to reduce BYTEPS_RDMA_START_DEPTH (default 128)" + << " or BYTEPS_RDMA_RX_DEPTH (default 2048)"; rx_ctx[i].type = kReceiveContext; rx_ctx[i].buffer = mr; diff --git a/src/rdma_utils.h b/src/rdma_utils.h index 848e1e34..1d82fbe7 100644 --- a/src/rdma_utils.h +++ b/src/rdma_utils.h @@ -54,15 +54,9 @@ namespace ps { #define DIVUP(x, y) (((x)+(y)-1)/(y)) #define ROUNDUP(x, y) (DIVUP((x), (y))*(y)) -static const int kStartDepth = 128; -static const int kRxDepth = 2048; // should be larger than kStartDepth -static const int kReplyDepth = kRxDepth; - static const int kSGEntry = 1; static const int kTimeoutms = 1000; static const int kRdmaListenBacklog = 128; -static const int kMaxConcurrentWorkRequest = - kRxDepth + kStartDepth + kReplyDepth; static const int kMaxHostnameLength = 16; static const int kMaxDataFields = 4; diff --git a/src/rdma_van.h b/src/rdma_van.h index d6952ec1..e76dceb8 100755 --- a/src/rdma_van.h +++ b/src/rdma_van.h @@ -52,6 +52,22 @@ class RDMAVan : public Van { enable_log_ = val ? atoi(val) : false; if (enable_log_) LOG(INFO) << "Enable RDMA logging."; + val = Environment::Get()->find("BYTEPS_RDMA_MAX_CONCURR_WR"); + if (val) { + // should make sure: kMaxConcurrentWorkRequest >= kStartDepth + kReplyDepth + kRxDepth + kMaxConcurrentWorkRequest = atoi(val); + + auto start_depth_env = Environment::Get()->find("BYTEPS_RDMA_START_DEPTH"); + auto rx_depth_env = Environment::Get()->find("BYTEPS_RDMA_RX_DEPTH"); + + auto start_depth = start_depth_env ? atoi(start_depth_env) : 128; + auto rx_depth = rx_depth_env ? atoi(rx_depth_env) : 2048; + auto reply_depth = rx_depth; + + CHECK_GE(kMaxConcurrentWorkRequest, start_depth + reply_depth + rx_depth) + << "Should make sure: kMaxConcurrentWorkRequest >= kStartDepth + kReplyDepth + kRxDepth"; + } + start_mu_.unlock(); Van::Start(customer_id); } @@ -955,6 +971,8 @@ class RDMAVan : public Van { bool enable_log_; std::mutex log_mu_; + int kMaxConcurrentWorkRequest = 4224; // 128 + 2048 * 2 + }; // class RDMAVan }; // namespace ps