From ea4cd87e22dbc858b1a623a3dcbeee8726f7532f Mon Sep 17 00:00:00 2001 From: staryxchen Date: Wed, 17 Sep 2025 20:58:42 +0800 Subject: [PATCH 1/3] feat(rdma): add parallel memory region registration support - Add ``parallel_reg_mr`` config option with environment variable control - Implement parallel registration/unregistration using std::async - Maintain backward compatibility with sequential mode Signed-off-by: staryxchen --- mooncake-transfer-engine/include/config.h | 1 + mooncake-transfer-engine/src/config.cpp | 7 ++ .../rdma_transport/rdma_transport.cpp | 86 ++++++++++++++++--- 3 files changed, 82 insertions(+), 12 deletions(-) diff --git a/mooncake-transfer-engine/include/config.h b/mooncake-transfer-engine/include/config.h index e4d849c25..f700f9470 100644 --- a/mooncake-transfer-engine/include/config.h +++ b/mooncake-transfer-engine/include/config.h @@ -48,6 +48,7 @@ struct GlobalConfig { bool use_ipv6 = false; size_t fragment_limit = 16384; bool enable_dest_device_affinity = false; + bool parallel_reg_mr = true; }; void loadGlobalConfig(GlobalConfig &config); diff --git a/mooncake-transfer-engine/src/config.cpp b/mooncake-transfer-engine/src/config.cpp index 61fcc835e..3651da5fa 100644 --- a/mooncake-transfer-engine/src/config.cpp +++ b/mooncake-transfer-engine/src/config.cpp @@ -258,6 +258,12 @@ void loadGlobalConfig(GlobalConfig &config) { if (std::getenv("MC_ENABLE_DEST_DEVICE_AFFINITY")) { config.enable_dest_device_affinity = true; } + + const char *disable_parallel_reg_mr = std::getenv("MC_DISABLE_PARALLEL_REG_MR"); + if (disable_parallel_reg_mr) { + LOG(INFO) << "Disable parallel register memory region"; + config.parallel_reg_mr = false; + } } std::string mtuLengthToString(ibv_mtu mtu) { @@ -306,6 +312,7 @@ void dumpGlobalConfig() { LOG(INFO) << "max_wr = " << config.max_wr; LOG(INFO) << "max_inline = " << config.max_inline; LOG(INFO) << "mtu_length = " << mtuLengthToString(config.mtu_length); + LOG(INFO) << "parallel_reg_mr = " << (config.parallel_reg_mr ? "true" : "false"); } GlobalConfig &globalConfig() { diff --git a/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp b/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp index e87d14db5..dad6846ee 100644 --- a/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp +++ b/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp @@ -92,9 +92,43 @@ int RdmaTransport::registerLocalMemory(void *addr, size_t length, const static int access_rights = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ; + // Memory region registration - parallel or sequential based on configuration + if (globalConfig().parallel_reg_mr) { + // Parallel memory region registration using std::async + std::vector> registration_futures; + registration_futures.reserve(context_list_.size()); + + // Launch parallel registration tasks + for (auto &context : context_list_) { + registration_futures.emplace_back( + std::async(std::launch::async, [&context, addr, length]() -> int { + return context->registerMemoryRegion(addr, length, + access_rights); + })); + } + + // Collect results and check for errors + for (size_t i = 0; i < registration_futures.size(); ++i) { + int ret = registration_futures[i].get(); + if (ret) { + LOG(ERROR) << "Failed to register memory region with context " << i; + return ret; + } + } + } else { + // Sequential memory region registration + for (size_t i = 0; i < context_list_.size(); ++i) { + int ret = context_list_[i]->registerMemoryRegion(addr, length, + access_rights); + if (ret) { + LOG(ERROR) << "Failed to register memory region with context " << i; + return ret; + } + } + } + + // Collect lkey and rkey after all registrations are complete for (auto &context : context_list_) { - int ret = context->registerMemoryRegion(addr, length, access_rights); - if (ret) return ret; buffer_desc.lkey.push_back(context->lkey(addr)); buffer_desc.rkey.push_back(context->rkey(addr)); } @@ -106,19 +140,16 @@ int RdmaTransport::registerLocalMemory(void *addr, size_t length, getMemoryLocation(addr, length); if (entries.empty()) return -1; buffer_desc.name = entries[0].location; - buffer_desc.addr = (uint64_t)addr; - buffer_desc.length = length; - int rc = metadata_->addLocalMemoryBuffer(buffer_desc, update_metadata); - if (rc) return rc; } else { buffer_desc.name = name; - buffer_desc.addr = (uint64_t)addr; - buffer_desc.length = length; - int rc = metadata_->addLocalMemoryBuffer(buffer_desc, update_metadata); - - if (rc) return rc; } + // Set common buffer descriptor fields and add to metadata + buffer_desc.addr = (uint64_t)addr; + buffer_desc.length = length; + int rc = metadata_->addLocalMemoryBuffer(buffer_desc, update_metadata); + if (rc) return rc; + return 0; } @@ -126,7 +157,38 @@ int RdmaTransport::unregisterLocalMemory(void *addr, bool update_metadata) { int rc = metadata_->removeLocalMemoryBuffer(addr, update_metadata); if (rc) return rc; - for (auto &context : context_list_) context->unregisterMemoryRegion(addr); + // Memory region unregistration - parallel or sequential based on configuration + if (globalConfig().parallel_reg_mr) { + // Parallel memory region unregistration using std::async + std::vector> unregistration_futures; + unregistration_futures.reserve(context_list_.size()); + + // Launch parallel unregistration tasks + for (auto &context : context_list_) { + unregistration_futures.emplace_back( + std::async(std::launch::async, [&context, addr]() -> int { + return context->unregisterMemoryRegion(addr); + })); + } + + // Collect results and check for errors + for (size_t i = 0; i < unregistration_futures.size(); ++i) { + int ret = unregistration_futures[i].get(); + if (ret) { + LOG(ERROR) << "Failed to unregister memory region with context " << i; + return ret; + } + } + } else { + // Sequential memory region unregistration + for (size_t i = 0; i < context_list_.size(); ++i) { + int ret = context_list_[i]->unregisterMemoryRegion(addr); + if (ret) { + LOG(ERROR) << "Failed to unregister memory region with context " << i; + return ret; + } + } + } return 0; } From 0aa5f8942baceabeee675da225d00f157c0b5b0e Mon Sep 17 00:00:00 2001 From: staryxchen Date: Wed, 17 Sep 2025 21:24:46 +0800 Subject: [PATCH 2/3] format and remove some comment Signed-off-by: staryxchen --- mooncake-transfer-engine/src/config.cpp | 6 ++-- .../rdma_transport/rdma_transport.cpp | 28 +++++++------------ 2 files changed, 14 insertions(+), 20 deletions(-) diff --git a/mooncake-transfer-engine/src/config.cpp b/mooncake-transfer-engine/src/config.cpp index 3651da5fa..c0dd5a84e 100644 --- a/mooncake-transfer-engine/src/config.cpp +++ b/mooncake-transfer-engine/src/config.cpp @@ -259,7 +259,8 @@ void loadGlobalConfig(GlobalConfig &config) { config.enable_dest_device_affinity = true; } - const char *disable_parallel_reg_mr = std::getenv("MC_DISABLE_PARALLEL_REG_MR"); + const char *disable_parallel_reg_mr = + std::getenv("MC_DISABLE_PARALLEL_REG_MR"); if (disable_parallel_reg_mr) { LOG(INFO) << "Disable parallel register memory region"; config.parallel_reg_mr = false; @@ -312,7 +313,8 @@ void dumpGlobalConfig() { LOG(INFO) << "max_wr = " << config.max_wr; LOG(INFO) << "max_inline = " << config.max_inline; LOG(INFO) << "mtu_length = " << mtuLengthToString(config.mtu_length); - LOG(INFO) << "parallel_reg_mr = " << (config.parallel_reg_mr ? "true" : "false"); + LOG(INFO) << "parallel_reg_mr = " + << (config.parallel_reg_mr ? "true" : "false"); } GlobalConfig &globalConfig() { diff --git a/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp b/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp index dad6846ee..9aeff0b82 100644 --- a/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp +++ b/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp @@ -92,42 +92,38 @@ int RdmaTransport::registerLocalMemory(void *addr, size_t length, const static int access_rights = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ; - // Memory region registration - parallel or sequential based on configuration if (globalConfig().parallel_reg_mr) { - // Parallel memory region registration using std::async std::vector> registration_futures; registration_futures.reserve(context_list_.size()); - // Launch parallel registration tasks for (auto &context : context_list_) { - registration_futures.emplace_back( - std::async(std::launch::async, [&context, addr, length]() -> int { + registration_futures.emplace_back(std::async( + std::launch::async, [&context, addr, length]() -> int { return context->registerMemoryRegion(addr, length, access_rights); })); } - // Collect results and check for errors for (size_t i = 0; i < registration_futures.size(); ++i) { int ret = registration_futures[i].get(); if (ret) { - LOG(ERROR) << "Failed to register memory region with context " << i; + LOG(ERROR) << "Failed to register memory region with context " + << i; return ret; } } } else { - // Sequential memory region registration for (size_t i = 0; i < context_list_.size(); ++i) { int ret = context_list_[i]->registerMemoryRegion(addr, length, access_rights); if (ret) { - LOG(ERROR) << "Failed to register memory region with context " << i; + LOG(ERROR) << "Failed to register memory region with context " + << i; return ret; } } } - // Collect lkey and rkey after all registrations are complete for (auto &context : context_list_) { buffer_desc.lkey.push_back(context->lkey(addr)); buffer_desc.rkey.push_back(context->rkey(addr)); @@ -144,7 +140,6 @@ int RdmaTransport::registerLocalMemory(void *addr, size_t length, buffer_desc.name = name; } - // Set common buffer descriptor fields and add to metadata buffer_desc.addr = (uint64_t)addr; buffer_desc.length = length; int rc = metadata_->addLocalMemoryBuffer(buffer_desc, update_metadata); @@ -157,13 +152,10 @@ int RdmaTransport::unregisterLocalMemory(void *addr, bool update_metadata) { int rc = metadata_->removeLocalMemoryBuffer(addr, update_metadata); if (rc) return rc; - // Memory region unregistration - parallel or sequential based on configuration if (globalConfig().parallel_reg_mr) { - // Parallel memory region unregistration using std::async std::vector> unregistration_futures; unregistration_futures.reserve(context_list_.size()); - // Launch parallel unregistration tasks for (auto &context : context_list_) { unregistration_futures.emplace_back( std::async(std::launch::async, [&context, addr]() -> int { @@ -171,20 +163,20 @@ int RdmaTransport::unregisterLocalMemory(void *addr, bool update_metadata) { })); } - // Collect results and check for errors for (size_t i = 0; i < unregistration_futures.size(); ++i) { int ret = unregistration_futures[i].get(); if (ret) { - LOG(ERROR) << "Failed to unregister memory region with context " << i; + LOG(ERROR) << "Failed to unregister memory region with context " + << i; return ret; } } } else { - // Sequential memory region unregistration for (size_t i = 0; i < context_list_.size(); ++i) { int ret = context_list_[i]->unregisterMemoryRegion(addr); if (ret) { - LOG(ERROR) << "Failed to unregister memory region with context " << i; + LOG(ERROR) << "Failed to unregister memory region with context " + << i; return ret; } } From 3015f109550d5386013a692b5778694051151262 Mon Sep 17 00:00:00 2001 From: staryxchen Date: Fri, 19 Sep 2025 15:24:01 +0800 Subject: [PATCH 3/3] fix(config): change parallel memory region registration to default disabled - Default value of parallel_reg_mr changed from true to false - Environment variable switched from MC_DISABLE_PARALLEL_REG_MR to MC_ENABLE_PARALLEL_REG_MR Signed-off-by: staryxchen --- mooncake-transfer-engine/include/config.h | 2 +- mooncake-transfer-engine/src/config.cpp | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/mooncake-transfer-engine/include/config.h b/mooncake-transfer-engine/include/config.h index f700f9470..260d85bd1 100644 --- a/mooncake-transfer-engine/include/config.h +++ b/mooncake-transfer-engine/include/config.h @@ -48,7 +48,7 @@ struct GlobalConfig { bool use_ipv6 = false; size_t fragment_limit = 16384; bool enable_dest_device_affinity = false; - bool parallel_reg_mr = true; + bool parallel_reg_mr = false; }; void loadGlobalConfig(GlobalConfig &config); diff --git a/mooncake-transfer-engine/src/config.cpp b/mooncake-transfer-engine/src/config.cpp index c0dd5a84e..339951b71 100644 --- a/mooncake-transfer-engine/src/config.cpp +++ b/mooncake-transfer-engine/src/config.cpp @@ -259,11 +259,11 @@ void loadGlobalConfig(GlobalConfig &config) { config.enable_dest_device_affinity = true; } - const char *disable_parallel_reg_mr = - std::getenv("MC_DISABLE_PARALLEL_REG_MR"); - if (disable_parallel_reg_mr) { - LOG(INFO) << "Disable parallel register memory region"; - config.parallel_reg_mr = false; + const char *enable_parallel_reg_mr = + std::getenv("MC_ENABLE_PARALLEL_REG_MR"); + if (enable_parallel_reg_mr) { + LOG(INFO) << "Enable parallel register memory region"; + config.parallel_reg_mr = true; } }