Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 56 additions & 37 deletions mooncake-integration/transfer_engine/transfer_engine_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,24 +71,30 @@ std::pair<std::string, std::string> parseConnectionString(
}

int TransferEnginePy::initialize(const char *local_hostname,
const char *metadata_server, const char *protocol,
const char *device_name) {
const char *metadata_server,
const char *protocol,
const char *device_name) {
auto conn_string = parseConnectionString(metadata_server);
return initializeExt(local_hostname, conn_string.second.c_str(), protocol,
device_name, conn_string.first.c_str());
}

int TransferEnginePy::initializeExt(const char *local_hostname,
const char *metadata_server,
const char *protocol, const char *device_name,
const char *metadata_type) {
const char *metadata_server,
const char *protocol,
const char *device_name,
const char *metadata_type) {
std::string conn_string = metadata_server;
if (conn_string.find("://") == std::string::npos)
conn_string =
std::string(metadata_type) + "://" + std::string(metadata_server);

    // TODO: remove `false` in the future; it's to keep the same API as SGLang.
engine_ = std::make_unique<TransferEngine>(false);
auto_discovery_ = false;
if (device_name == nullptr || strlen(device_name) == 0) {
auto_discovery_ = true;
}

engine_ = std::make_unique<TransferEngine>(auto_discovery_);
if (getenv("MC_LEGACY_RPC_PORT_BINDING")) {
auto hostname_port = parseHostNameWithPort(local_hostname);
int ret =
Expand All @@ -101,24 +107,28 @@ int TransferEnginePy::initializeExt(const char *local_hostname,
if (ret) return -1;
}

xport_ = nullptr;
if (strcmp(protocol, "rdma") == 0) {
auto device_names = formatDeviceNames(device_name);
std::string nic_priority_matrix =
"{\"cpu:0\": [[" + device_names + "], []],"
"\"cuda:0\": [[" + device_names + "], []]}";
void **args = (void **)malloc(2 * sizeof(void *));
args[0] = (void *)nic_priority_matrix.c_str();
args[1] = nullptr;
xport_ = engine_->installTransport("rdma", args);
} else if (strcmp(protocol, "tcp") == 0) {
xport_ = engine_->installTransport("tcp", nullptr);
} else {
LOG(ERROR) << "Unsupported protocol";
return -1;
if (!auto_discovery_) {
xport_ = nullptr;
if (strcmp(protocol, "rdma") == 0) {
auto device_names = formatDeviceNames(device_name);
std::string nic_priority_matrix = "{\"cpu:0\": [[" + device_names +
"], []],"
"\"cuda:0\": [[" +
device_names + "], []]}";
void **args = (void **)malloc(2 * sizeof(void *));
args[0] = (void *)nic_priority_matrix.c_str();
args[1] = nullptr;
xport_ = engine_->installTransport("rdma", args);
} else if (strcmp(protocol, "tcp") == 0) {
xport_ = engine_->installTransport("tcp", nullptr);
} else {
LOG(ERROR) << "Unsupported protocol";
return -1;
}

if (!xport_) return -1;
}

if (!xport_) return -1;
free_list_.resize(kSlabSizeKBTabLen);
doBuddyAllocate(kMaxClassId);
return 0;
Expand Down Expand Up @@ -193,8 +203,10 @@ int TransferEnginePy::freeManagedBuffer(uintptr_t buffer_addr, size_t length) {
return 0;
}

int TransferEnginePy::transferSyncWrite(const char *target_hostname, uintptr_t buffer,
uintptr_t peer_buffer_address, size_t length) {
int TransferEnginePy::transferSyncWrite(const char *target_hostname,
uintptr_t buffer,
uintptr_t peer_buffer_address,
size_t length) {
Transport::SegmentHandle handle;
if (handle_map_.count(target_hostname)) {
handle = handle_map_[target_hostname];
Expand Down Expand Up @@ -229,8 +241,10 @@ int TransferEnginePy::transferSyncWrite(const char *target_hostname, uintptr_t b
}
}

int TransferEnginePy::transferSyncRead(const char *target_hostname, uintptr_t buffer,
uintptr_t peer_buffer_address, size_t length) {
int TransferEnginePy::transferSyncRead(const char *target_hostname,
uintptr_t buffer,
uintptr_t peer_buffer_address,
size_t length) {
Transport::SegmentHandle handle;
if (handle_map_.count(target_hostname)) {
handle = handle_map_[target_hostname];
Expand Down Expand Up @@ -309,16 +323,19 @@ int TransferEnginePy::transferSync(const char *target_hostname,

int TransferEnginePy::registerMemory(uintptr_t buffer_addr, size_t capacity) {
char *buffer = reinterpret_cast<char *>(buffer_addr);
std::string location = "cpu:0";
if (!auto_discovery_) {
std::string location = "cpu:0";
#ifdef USE_CUDA
// check pointer on GPU
cudaPointerAttributes attributes;
cudaPointerGetAttributes(&attributes, buffer);
if (attributes.type == cudaMemoryTypeDevice) {
location = "cuda:0";
}
// check pointer on GPU
cudaPointerAttributes attributes;
cudaPointerGetAttributes(&attributes, buffer);
if (attributes.type == cudaMemoryTypeDevice) {
location = "cuda:0";
}
#endif
return engine_->registerLocalMemory(buffer, capacity, location);
return engine_->registerLocalMemory(buffer, capacity, location);
}
return engine_->registerLocalMemory(buffer, capacity);
}

int TransferEnginePy::unregisterMemory(uintptr_t buffer_addr) {
Expand Down Expand Up @@ -348,13 +365,15 @@ PYBIND11_MODULE(engine, m) {
.def(py::init<>())
.def("initialize", &TransferEnginePy::initialize)
.def("initialize_ext", &TransferEnginePy::initializeExt)
.def("allocate_managed_buffer", &TransferEnginePy::allocateManagedBuffer)
.def("allocate_managed_buffer",
&TransferEnginePy::allocateManagedBuffer)
.def("free_managed_buffer", &TransferEnginePy::freeManagedBuffer)
.def("transfer_sync_write", &TransferEnginePy::transferSyncWrite)
.def("transfer_sync_read", &TransferEnginePy::transferSyncRead)
.def("transfer_sync", &TransferEnginePy::transferSync)
.def("write_bytes_to_buffer", &TransferEnginePy::writeBytesToBuffer)
.def("read_bytes_from_buffer", &TransferEnginePy::readBytesFromBuffer)
.def("read_bytes_from_buffer",
&TransferEnginePy::readBytesFromBuffer)
.def("register_memory", &TransferEnginePy::registerMemory)
.def("unregister_memory", &TransferEnginePy::unregisterMemory)
.def("get_first_buffer_address",
Expand Down
1 change: 1 addition & 0 deletions mooncake-integration/transfer_engine/transfer_engine_py.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,5 @@ class TransferEnginePy {
std::vector<char *> buffer_list_;
std::unordered_set<char *> large_buffer_list_;
std::unordered_map<std::string, Transport::SegmentHandle> handle_map_;
bool auto_discovery_;
};
5 changes: 3 additions & 2 deletions mooncake-transfer-engine/include/memory_location.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <glog/logging.h>

#include <memory>
#include <string>

#include "common.h"

Expand All @@ -30,11 +31,11 @@ struct MemoryLocationEntry {
std::string location;
};

// Get CPU numa node id
// TODO: support getting cuda device id from unified address.
const std::vector<MemoryLocationEntry> getMemoryLocation(void *start,
size_t len);

const static std::string kWildcardLocation = "*";

} // namespace mooncake

#endif // MEMORY_LOCATION_H
3 changes: 2 additions & 1 deletion mooncake-transfer-engine/include/transfer_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <vector>

#include "multi_transport.h"
#include "memory_location.h"
#include "transfer_metadata.h"
#include "transport/transport.h"

Expand Down Expand Up @@ -69,7 +70,7 @@ class TransferEngine {
int closeSegment(SegmentHandle handle);

int registerLocalMemory(void *addr, size_t length,
const std::string &location,
const std::string &location = kWildcardLocation,
bool remote_accessible = true,
bool update_metadata = true);

Expand Down
35 changes: 32 additions & 3 deletions mooncake-transfer-engine/src/memory_location.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,46 @@

#include "memory_location.h"

#ifdef USE_CUDA
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default, USE_CUDA is off. Do we need to set USE_CUDA to ON for this path to be compiled?

#include <cuda_runtime.h>
#endif

namespace mooncake {

// Round an address down to the start of its page.
// Assumes `pagesize` is a power of two, so the mask clears the low bits.
uintptr_t alignPage(uintptr_t address) {
    const uintptr_t offset_in_page = address & (pagesize - 1);
    return address - offset_in_page;
}

// Build the location tag for a CPU NUMA node ("cpu:<id>").
// A negative node id means the node could not be determined, so the
// wildcard location is returned instead.
std::string genCpuNodeName(int node) {
    return node < 0 ? kWildcardLocation : "cpu:" + std::to_string(node);
}

// use "*" when failed to get the numa node.
return "*";
// Build the location tag for a CUDA device ("cuda:<id>").
// A negative device id means the device could not be determined, so the
// wildcard location is returned instead.
std::string genGpuNodeName(int node) {
    return node < 0 ? kWildcardLocation : "cuda:" + std::to_string(node);
}

const std::vector<MemoryLocationEntry> getMemoryLocation(void *start,
size_t len) {
std::vector<MemoryLocationEntry> entries;

#ifdef USE_CUDA
cudaPointerAttributes attributes;
cudaError_t result;
result = cudaPointerGetAttributes(&attributes, start);
if (result != cudaSuccess) {
LOG(ERROR) << message << " (Error code: " << result << " - "
<< cudaGetErrorString(result) << ")" << std::endl;
entries.push_back({(uint64_t)start, len, kWildcardLocation});
return entries;
}

if (attributes.type == cudaMemoryTypeDevice) {
entries.push_back(
{(uint64_t)start, len, genGpuNodeName(attributes.device)});
return entries;
}
#endif

// start and end address may not be page aligned.
uintptr_t aligned_start = alignPage((uintptr_t)start);
int n = (uintptr_t(start) - aligned_start + len + pagesize - 1) / pagesize;
Expand All @@ -43,7 +68,9 @@ const std::vector<MemoryLocationEntry> getMemoryLocation(void *start,
if (rc != 0) {
PLOG(WARNING) << "Failed to get NUMA node, addr: " << start
<< ", len: " << len;
entries.push_back({(uint64_t)start, len, "*"});
entries.push_back({(uint64_t)start, len, kWildcardLocation});
free(pages);
free(status);
return entries;
}

Expand All @@ -61,6 +88,8 @@ const std::vector<MemoryLocationEntry> getMemoryLocation(void *start,
}
entries.push_back(
{start_addr, (uint64_t)start + len - start_addr, genCpuNodeName(node)});
free(pages);
free(status);
return entries;
}

Expand Down
7 changes: 3 additions & 4 deletions mooncake-transfer-engine/src/topology.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include <sys/types.h>

#include "topology.h"
#include "memory_location.h"

namespace mooncake {
struct InfinibandDevice {
Expand Down Expand Up @@ -293,8 +294,7 @@ int Topology::resolve() {
hca_id_map[hca] = next_hca_map_index;
next_hca_map_index++;

// "*" means any device
resolved_matrix_["*"].preferred_hca.push_back(hca_id_map[hca]);
resolved_matrix_[kWildcardLocation].preferred_hca.push_back(hca_id_map[hca]);
}
resolved_matrix_[entry.first].preferred_hca.push_back(
hca_id_map[hca]);
Expand All @@ -305,8 +305,7 @@ int Topology::resolve() {
hca_id_map[hca] = next_hca_map_index;
next_hca_map_index++;

// "*" means any device
resolved_matrix_["*"].preferred_hca.push_back(hca_id_map[hca]);
resolved_matrix_[kWildcardLocation].preferred_hca.push_back(hca_id_map[hca]);
}
resolved_matrix_[entry.first].avail_hca.push_back(hca_id_map[hca]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ int RdmaTransport::registerLocalMemory(void *addr, size_t length,
}

// Get the memory location automatically after registered MR(pinned),
// when the name is "*".
if (name == "*") {
// when the name is kWildcardLocation("*").
if (name == kWildcardLocation) {
const std::vector<MemoryLocationEntry> entries =
getMemoryLocation(addr, length);
for (auto &entry : entries) {
Expand Down Expand Up @@ -451,6 +451,8 @@ int RdmaTransport::selectDevice(SegmentDesc *desc, uint64_t offset,
continue;
device_id = desc->topology.selectDevice(buffer_desc.name, retry_count);
if (device_id >= 0) return 0;
device_id = desc->topology.selectDevice(kWildcardLocation, retry_count);
if (device_id >= 0) return 0;
}

return ERR_ADDRESS_NOT_REGISTERED;
Expand Down
2 changes: 1 addition & 1 deletion mooncake-transfer-engine/tests/memory_location_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ TEST(MemoryLocationTest, MallocSimpleNode0) {

// check the memory location, no node before page fault
EXPECT_EQ(entries[0].start, reinterpret_cast<uint64_t>(addr));
EXPECT_EQ(entries[0].location, "*");
EXPECT_EQ(entries[0].location, mooncake::kWildcardLocation);
EXPECT_EQ(entries[0].len, static_cast<size_t>(size));

// trigger page fault
Expand Down
2 changes: 1 addition & 1 deletion mooncake-transfer-engine/tests/rdma_transport_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ int initiator() {
LOG_ASSERT(!rc);
#else
addr = allocateMemoryPool(ram_buffer_size, 0, false);
int rc = engine->registerLocalMemory(addr, ram_buffer_size, "*");
int rc = engine->registerLocalMemory(addr, ram_buffer_size, kWildcardLocation);
LOG_ASSERT(!rc);
#endif

Expand Down
5 changes: 3 additions & 2 deletions mooncake-transfer-engine/tests/topology_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <gtest/gtest.h>

#include "transfer_metadata.h"
#include "memory_location.h"

TEST(ToplogyTest, GetTopologyMatrix) {
mooncake::Topology topology;
Expand Down Expand Up @@ -96,10 +97,10 @@ TEST(ToplogyTest, TestSelectDeviceAny) {
topology.parse(json_str);
std::set<int> items = {0, 1};
int device;
device = topology.selectDevice("*", 2);
device = topology.selectDevice(mooncake::kWildcardLocation, 2);
ASSERT_TRUE(items.count(device));
items.erase(device);
device = topology.selectDevice("*", 1);
device = topology.selectDevice(mooncake::kWildcardLocation, 1);
ASSERT_TRUE(items.count(device));
items.erase(device);
ASSERT_TRUE(items.empty());
Expand Down
Loading