Skip to content

Commit

Permalink
rdma: only reg_mr for vals (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
ymjiang authored Jan 15, 2020
1 parent b368e38 commit 7a708d6
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 15 deletions.
8 changes: 4 additions & 4 deletions src/rdma_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,17 +305,17 @@ class RDMATransport : public Transport {
}

void SendPushRequest(Message &msg, MessageBuffer *msg_buf, RemoteTuple remote_tuple) {
CHECK_EQ(msg_buf->mrs.size(), 3);
CHECK_EQ(msg_buf->mrs.size(), 1);
auto raddr = std::get<0>(remote_tuple);
auto rkey = std::get<1>(remote_tuple);
auto idx = std::get<2>(remote_tuple);

// push request, split the meta and data into two writes
// further, it does not send keys and lens since these meta already carries these info
struct ibv_sge my_sge;
my_sge.addr = reinterpret_cast<uint64_t>(msg_buf->mrs[1].first->addr);
my_sge.length = msg_buf->mrs[1].second;
my_sge.lkey = msg_buf->mrs[1].first->lkey;
my_sge.addr = reinterpret_cast<uint64_t>(msg_buf->mrs[0].first->addr);
my_sge.length = msg_buf->mrs[0].second;
my_sge.lkey = msg_buf->mrs[0].first->lkey;

// this rdma-write will not trigger any signal both remotely and locally
struct ibv_send_wr wr, *bad_wr = nullptr;
Expand Down
24 changes: 13 additions & 11 deletions src/rdma_van.h
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ class RDMAVan : public Van {
LOG(INFO) << "Send Push Request: key=" << msg.meta.key
<< "\t timestamp=" << msg.meta.timestamp
<< "\t recver=" << msg.meta.recver
<< "\t tensor_len=" << msg_buf->mrs[1].second
<< "\t tensor_len=" << msg_buf->mrs[0].second
<< "\t remote_idx=" << std::get<2>(remote_tuple)
<< "\t remote_addr=" << std::get<0>(remote_tuple)
<< std::flush;
Expand Down Expand Up @@ -532,31 +532,33 @@ class RDMAVan : public Van {
return msg_buf;
}

void RegisterMemory(Message &msg) {
void RegisterMemory(Message &msg) {
size_t sa_cnt = 0;
for (auto& sa : msg.data) {
if (sa.size() == 0) continue;
std::lock_guard<std::mutex> lock(map_mu_);
if (mem_mr_.find(sa.data()) == mem_mr_.end()) {
if ((mem_mr_.find(sa.data()) == mem_mr_.end()) && (sa_cnt==1)) { // only vals register memory
struct ibv_mr *temp_mr;
CHECK (temp_mr = ibv_reg_mr(mem_allocator_->GetPD(), sa.data(), sa.size(),
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE))
<< "Failed to register the memory region: " << strerror(errno)
<< ", sa.size()=" << sa.size();
mem_mr_[sa.data()] = temp_mr;
}
++sa_cnt;
}
}

void PrepareData(Message &msg, MessageBuffer *msg_buf) {
if (!(msg.meta.push && msg.meta.request)) return; // only push request
for (auto &sa : msg_buf->data) {
if (sa.size() == 0) continue;
std::lock_guard<std::mutex> lock(map_mu_);
auto it = mem_mr_.find(sa.data());
MRPtr ptr(it->second, [](struct ibv_mr *mr) {});
CHECK(ptr.get()) << strerror(errno);
msg_buf->mrs.push_back(std::make_pair(std::move(ptr), sa.size()));
}
auto &sa = msg_buf->data[1];
if (sa.size() == 0) return;
std::lock_guard<std::mutex> lock(map_mu_);
auto it = mem_mr_.find(sa.data());
CHECK_NE(it, mem_mr_.end());
MRPtr ptr(it->second, [](struct ibv_mr *mr) {});
CHECK(ptr.get()) << strerror(errno);
msg_buf->mrs.push_back(std::make_pair(std::move(ptr), sa.size()));
}

void AddMeta(Message &msg) {
Expand Down

0 comments on commit 7a708d6

Please sign in to comment.