diff --git a/mooncake-integration/transfer_engine/transfer_engine_py.cpp b/mooncake-integration/transfer_engine/transfer_engine_py.cpp index 503e09c54..003e5ac62 100644 --- a/mooncake-integration/transfer_engine/transfer_engine_py.cpp +++ b/mooncake-integration/transfer_engine/transfer_engine_py.cpp @@ -100,32 +100,50 @@ int TransferEnginePy::initializeExt(const char *local_hostname, const char *device_name, const char *metadata_type) { std::string conn_string = buildConnString(metadata_type, metadata_server); + if (conn_string.find("://") == std::string::npos) + conn_string = + std::string(metadata_type) + "://" + std::string(metadata_server); + + auto_discovery_ = false; + if (device_name == nullptr || strlen(device_name) == 0) { + auto_discovery_ = true; + } - // TODO: remove `false` in the feature, it's for keep same API in SGLang. - engine_ = std::make_unique(false); - // the last two params are unused - int ret = engine_->init(conn_string, local_hostname, "", 0); - if (ret) return -1; - - xport_ = nullptr; - if (strcmp(protocol, "rdma") == 0) { - auto device_names = formatDeviceNames(device_name); - std::string nic_priority_matrix = "{\"cpu:0\": [[" + device_names + - "], []]," - "\"cuda:0\": [[" + - device_names + "], []]}"; - void **args = (void **)malloc(2 * sizeof(void *)); - args[0] = (void *)nic_priority_matrix.c_str(); - args[1] = nullptr; - xport_ = engine_->installTransport("rdma", args); - } else if (strcmp(protocol, "tcp") == 0) { - xport_ = engine_->installTransport("tcp", nullptr); + engine_ = std::make_unique(auto_discovery_); + if (getenv("MC_LEGACY_RPC_PORT_BINDING")) { + auto hostname_port = parseHostNameWithPort(local_hostname); + int ret = + engine_->init(conn_string, local_hostname, + hostname_port.first.c_str(), hostname_port.second); + if (ret) return -1; } else { - LOG(ERROR) << "Unsupported protocol"; - return -1; + // the last two params are unused + int ret = engine_->init(conn_string, local_hostname, "", 0); + if (ret) return -1; + } + + if (!auto_discovery_) { + xport_ = nullptr; + if (strcmp(protocol, "rdma") == 0) { + auto device_names = formatDeviceNames(device_name); + std::string nic_priority_matrix = "{\"cpu:0\": [[" + device_names + + "], []]," + "\"cuda:0\": [[" + + device_names + "], []]}"; + void **args = (void **)malloc(2 * sizeof(void *)); + args[0] = (void *)nic_priority_matrix.c_str(); + args[1] = nullptr; + xport_ = engine_->installTransport("rdma", args); + } else if (strcmp(protocol, "tcp") == 0) { + xport_ = engine_->installTransport("tcp", nullptr); + } else { + LOG(ERROR) << "Unsupported protocol"; + return -1; + } + + if (!xport_) return -1; } - if (!xport_) return -1; free_list_.resize(kSlabSizeKBTabLen); doBuddyAllocate(kMaxClassId); return 0; @@ -322,16 +340,19 @@ int TransferEnginePy::transferSync(const char *target_hostname, int TransferEnginePy::registerMemory(uintptr_t buffer_addr, size_t capacity) { char *buffer = reinterpret_cast(buffer_addr); - std::string location = "cpu:0"; + if (!auto_discovery_) { + std::string location = "cpu:0"; #ifdef USE_CUDA - // check pointer on GPU - cudaPointerAttributes attributes; - cudaPointerGetAttributes(&attributes, buffer); - if (attributes.type == cudaMemoryTypeDevice) { - location = "cuda:0"; - } + // check pointer on GPU + cudaPointerAttributes attributes; + cudaPointerGetAttributes(&attributes, buffer); + if (attributes.type == cudaMemoryTypeDevice) { + location = "cuda:0"; + } #endif - return engine_->registerLocalMemory(buffer, capacity, location); + return engine_->registerLocalMemory(buffer, capacity, location); + } + return engine_->registerLocalMemory(buffer, capacity); } int TransferEnginePy::unregisterMemory(uintptr_t buffer_addr) { diff --git a/mooncake-integration/transfer_engine/transfer_engine_py.h b/mooncake-integration/transfer_engine/transfer_engine_py.h index 26003762d..addc960c2 100644 --- a/mooncake-integration/transfer_engine/transfer_engine_py.h +++ b/mooncake-integration/transfer_engine/transfer_engine_py.h @@ -108,4 +108,5 @@ class TransferEnginePy { std::vector buffer_list_; std::unordered_set large_buffer_list_; std::unordered_map handle_map_; + bool auto_discovery_; }; diff --git a/mooncake-transfer-engine/include/memory_location.h b/mooncake-transfer-engine/include/memory_location.h index 9b6aef51d..8116958a4 100644 --- a/mooncake-transfer-engine/include/memory_location.h +++ b/mooncake-transfer-engine/include/memory_location.h @@ -18,6 +18,7 @@ #include #include +#include #include "common.h" @@ -30,11 +31,11 @@ struct MemoryLocationEntry { std::string location; }; -// Get CPU numa node id -// TODO: support getting cuda device id from unified address. const std::vector getMemoryLocation(void *start, size_t len); +const static std::string kWildcardLocation = "*"; + } // namespace mooncake #endif // MEMORY_LOCATION_H diff --git a/mooncake-transfer-engine/include/transfer_engine.h b/mooncake-transfer-engine/include/transfer_engine.h index 5cebc3256..12ff7ba41 100644 --- a/mooncake-transfer-engine/include/transfer_engine.h +++ b/mooncake-transfer-engine/include/transfer_engine.h @@ -31,6 +31,7 @@ #include #include "multi_transport.h" +#include "memory_location.h" #include "transfer_metadata.h" #include "transport/transport.h" @@ -71,7 +72,7 @@ class TransferEngine { int closeSegment(SegmentHandle handle); int registerLocalMemory(void *addr, size_t length, - const std::string &location, + const std::string &location = kWildcardLocation, bool remote_accessible = true, bool update_metadata = true); diff --git a/mooncake-transfer-engine/src/memory_location.cpp b/mooncake-transfer-engine/src/memory_location.cpp index 1c66038f1..467a32488 100644 --- a/mooncake-transfer-engine/src/memory_location.cpp +++ b/mooncake-transfer-engine/src/memory_location.cpp @@ -14,21 +14,46 @@ #include "memory_location.h" +#ifdef USE_CUDA +#include +#endif + namespace mooncake { uintptr_t alignPage(uintptr_t address) { return address & ~(pagesize - 1); } std::string genCpuNodeName(int node) { if (node >= 0) return "cpu:" + std::to_string(node); + return kWildcardLocation; +} - // use "*" when failed to get the numa node. - return "*"; +std::string genGpuNodeName(int node) { + if (node >= 0) return "cuda:" + std::to_string(node); + return kWildcardLocation; } const std::vector getMemoryLocation(void *start, size_t len) { std::vector entries; +#ifdef USE_CUDA + cudaPointerAttributes attributes; + cudaError_t result; + result = cudaPointerGetAttributes(&attributes, start); + if (result != cudaSuccess) { + LOG(ERROR) << message << " (Error code: " << result << " - " + << cudaGetErrorString(result) << ")" << std::endl; + entries.push_back({(uint64_t)start, len, kWildcardLocation}); + return entries; + } + + if (attributes.type == cudaMemoryTypeDevice) { + entries.push_back( + {(uint64_t)start, len, genGpuNodeName(attributes.device)}); + return entries; + } +#endif + // start and end address may not be page aligned. uintptr_t aligned_start = alignPage((uintptr_t)start); int n = (uintptr_t(start) - aligned_start + len + pagesize - 1) / pagesize; @@ -43,7 +68,9 @@ const std::vector getMemoryLocation(void *start, if (rc != 0) { PLOG(WARNING) << "Failed to get NUMA node, addr: " << start << ", len: " << len; - entries.push_back({(uint64_t)start, len, "*"}); + entries.push_back({(uint64_t)start, len, kWildcardLocation}); + free(pages); + free(status); return entries; } @@ -61,6 +88,8 @@ const std::vector getMemoryLocation(void *start, } entries.push_back( {start_addr, (uint64_t)start + len - start_addr, genCpuNodeName(node)}); + free(pages); + free(status); return entries; } diff --git a/mooncake-transfer-engine/src/topology.cpp b/mooncake-transfer-engine/src/topology.cpp index 2baa8b431..94e6c310f 100644 --- a/mooncake-transfer-engine/src/topology.cpp +++ b/mooncake-transfer-engine/src/topology.cpp @@ -36,6 +36,7 @@ #include #include "topology.h" +#include "memory_location.h" namespace mooncake { struct InfinibandDevice { @@ -293,8 +294,7 @@ int Topology::resolve() { hca_id_map[hca] = next_hca_map_index; next_hca_map_index++; - // "*" means any device - resolved_matrix_["*"].preferred_hca.push_back(hca_id_map[hca]); + resolved_matrix_[kWildcardLocation].preferred_hca.push_back(hca_id_map[hca]); } resolved_matrix_[entry.first].preferred_hca.push_back( hca_id_map[hca]); @@ -305,8 +305,7 @@ int Topology::resolve() { hca_id_map[hca] = next_hca_map_index; next_hca_map_index++; - // "*" means any device - resolved_matrix_["*"].preferred_hca.push_back(hca_id_map[hca]); + resolved_matrix_[kWildcardLocation].preferred_hca.push_back(hca_id_map[hca]); } resolved_matrix_[entry.first].avail_hca.push_back(hca_id_map[hca]); } diff --git a/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp b/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp index a4da14f3e..678a4908e 100644 --- a/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp +++ b/mooncake-transfer-engine/src/transport/rdma_transport/rdma_transport.cpp @@ -100,8 +100,8 @@ int RdmaTransport::registerLocalMemory(void *addr, size_t length, } // Get the memory location automatically after registered MR(pinned), - // when the name is "*". - if (name == "*") { + // when the name is kWildcardLocation("*"). + if (name == kWildcardLocation) { const std::vector entries = getMemoryLocation(addr, length); for (auto &entry : entries) { @@ -448,6 +448,8 @@ int RdmaTransport::selectDevice(SegmentDesc *desc, uint64_t offset, continue; device_id = desc->topology.selectDevice(buffer_desc.name, retry_count); if (device_id >= 0) return 0; + device_id = desc->topology.selectDevice(kWildcardLocation, retry_count); + if (device_id >= 0) return 0; } return ERR_ADDRESS_NOT_REGISTERED; diff --git a/mooncake-transfer-engine/tests/memory_location_test.cpp b/mooncake-transfer-engine/tests/memory_location_test.cpp index 5f5448020..865344568 100644 --- a/mooncake-transfer-engine/tests/memory_location_test.cpp +++ b/mooncake-transfer-engine/tests/memory_location_test.cpp @@ -16,7 +16,7 @@ TEST(MemoryLocationTest, MallocSimpleNode0) { // check the memory location, no node before page fault EXPECT_EQ(entries[0].start, reinterpret_cast(addr)); - EXPECT_EQ(entries[0].location, "*"); + EXPECT_EQ(entries[0].location, mooncake::kWildcardLocation); EXPECT_EQ(entries[0].len, static_cast(size)); // trigger page fault diff --git a/mooncake-transfer-engine/tests/rdma_transport_test.cpp b/mooncake-transfer-engine/tests/rdma_transport_test.cpp index 92e7b0a83..5d040b614 100644 --- a/mooncake-transfer-engine/tests/rdma_transport_test.cpp +++ b/mooncake-transfer-engine/tests/rdma_transport_test.cpp @@ -274,7 +274,7 @@ int initiator() { LOG_ASSERT(!rc); #else addr = allocateMemoryPool(ram_buffer_size, 0, false); - int rc = engine->registerLocalMemory(addr, ram_buffer_size, "*"); + int rc = engine->registerLocalMemory(addr, ram_buffer_size, kWildcardLocation); LOG_ASSERT(!rc); #endif diff --git a/mooncake-transfer-engine/tests/topology_test.cpp b/mooncake-transfer-engine/tests/topology_test.cpp index 513de13cb..5b39d5d55 100644 --- a/mooncake-transfer-engine/tests/topology_test.cpp +++ b/mooncake-transfer-engine/tests/topology_test.cpp @@ -4,6 +4,7 @@ #include #include "transfer_metadata.h" +#include "memory_location.h" TEST(ToplogyTest, GetTopologyMatrix) { mooncake::Topology topology; @@ -96,10 +97,10 @@ TEST(ToplogyTest, TestSelectDeviceAny) { topology.parse(json_str); std::set items = {0, 1}; int device; - device = topology.selectDevice("*", 2); + device = topology.selectDevice(mooncake::kWildcardLocation, 2); ASSERT_TRUE(items.count(device)); items.erase(device); - device = topology.selectDevice("*", 1); + device = topology.selectDevice(mooncake::kWildcardLocation, 1); ASSERT_TRUE(items.count(device)); items.erase(device); ASSERT_TRUE(items.empty());