Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitlab/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ EXTRA_BUILD_ARGS=${3:-""}
# UCX_VERSION is the version of UCX to build override default with env variable.
UCX_VERSION=${UCX_VERSION:-v1.19.0}
# LIBFABRIC_VERSION is the version of libfabric to build override default with env variable.
LIBFABRIC_VERSION=${LIBFABRIC_VERSION:-v2.3.0}
LIBFABRIC_VERSION=${LIBFABRIC_VERSION:-v1.21.0}
# LIBFABRIC_INSTALL_DIR can be set via environment variable, defaults to INSTALL_DIR
LIBFABRIC_INSTALL_DIR=${LIBFABRIC_INSTALL_DIR:-$INSTALL_DIR}

Expand Down
4 changes: 2 additions & 2 deletions benchmark/nixlbench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ sudo ldconfig

**LibFabric:**
```bash
wget https://github.com/ofiwg/libfabric/releases/download/v2.3.0/libfabric-2.3.0.tar.bz2
tar xjf libfabric-2.3.0.tar.bz2 && cd libfabric-2.3.0
wget https://github.com/ofiwg/libfabric/releases/download/v1.21.0/libfabric-1.21.0.tar.bz2
tar xjf libfabric-1.21.0.tar.bz2 && cd libfabric-1.21.0
./configure --prefix=/usr/local --with-cuda=/usr/local/cuda --enable-cuda-dlopen --enable-efa
make -j$(nproc) && sudo make install
```
Expand Down
2 changes: 1 addition & 1 deletion benchmark/nixlbench/contrib/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ ARG DEFAULT_PYTHON_VERSION
ARG WHL_PYTHON_VERSIONS="3.12"
ARG WHL_PLATFORM="manylinux_2_39_$ARCH"
ARG BUILD_TYPE="release"
ARG LIBFABRIC_VERSION="v2.3.0"
ARG LIBFABRIC_VERSION="v1.21.0"
ARG NPROC

WORKDIR /workspace
Expand Down
2 changes: 1 addition & 1 deletion contrib/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ ARG NIXL_PREFIX="/usr/local/nixl"
ARG NIXL_PLUGIN_DIR="$NIXL_PREFIX/lib/$ARCH-linux-gnu/plugins"
ARG NPROC
ARG WHL_DEFAULT_PYTHON_VERSIONS="3.12"
ARG LIBFABRIC_VERSION="v2.3.0"
ARG LIBFABRIC_VERSION="v1.21.0"
ARG LIBFABRIC_INSTALL_PATH="/usr/local"

# Install build dependencies from Ubuntu repository
Expand Down
2 changes: 1 addition & 1 deletion contrib/Dockerfile.manylinux
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ FROM ${BASE_IMAGE}:${BASE_IMAGE_TAG}
ARG DEFAULT_PYTHON_VERSION="3.12"
ARG ARCH="x86_64"
ARG UCX_REF="v1.19.0"
ARG LIBFABRIC_VERSION="v2.3.0"
ARG LIBFABRIC_VERSION="v1.21.0"

RUN yum groupinstall -y 'Development Tools' && \
dnf install -y almalinux-release-synergy && \
Expand Down
3 changes: 1 addition & 2 deletions contrib/aws-efa/aws_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,14 @@ setup_cmd="set -x && \
git clone ${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY} && \
cd nixl && \
${GIT_CHECKOUT_CMD}"
efa_validation_cmd="fi_info -p efa"
build_cmd=".gitlab/build.sh \${NIXL_INSTALL_DIR} \${UCX_INSTALL_DIR}"

# Add timeout only if TEST_TIMEOUT is set (expects minutes)
if [ -n "$TEST_TIMEOUT" ]; then
test_cmd="timeout ${TEST_TIMEOUT}m ${test_cmd}"
fi

export AWS_CMD="${setup_cmd} && ${build_cmd} && ${efa_validation_cmd} && ${test_cmd}"
export AWS_CMD="${setup_cmd} && ${build_cmd} && ${test_cmd}"

# Generate AWS job properties json from template
envsubst < aws_vars.template > aws_vars.json
Expand Down
6 changes: 3 additions & 3 deletions src/plugins/libfabric/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ EFA Specific **Topology-Aware Optimization**: Hardware-aware GPU-to-EFA and NUMA
### Required Dependencies

- **Libfabric**
- Many system will have installed libfabric already. If not, custom libfabric installation is available via https://ofiwg.github.io/libfabric/ - Minimum required version: v2.3.0rc2
- For EFA enabled AWS instances, it is recommanded to install through AWS EFA installer: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/efa-start.html - Minimum required version: 1.43.2
- Many system will have installed libfabric already. If not, custom libfabric installation is available via https://ofiwg.github.io/libfabric/ - Minimum required version: v1.21.0
- For EFA enabled AWS instances, it is recommanded to install through AWS EFA installer: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/efa-start.html - Recommend to use the latest version

- **hwloc**
- hwloc is used to understand the underlying architecture to optimize application performance. Suggested version: 2.10.0 or newer
Expand All @@ -30,7 +30,7 @@ EFA Specific **Topology-Aware Optimization**: Hardware-aware GPU-to-EFA and NUMA
Validated compatiblity with:
- **AWS EFA** (Elastic Fabric Adapter)

Any other Libfabric providers that support heterogeneous memory (FI_HMEM) should also work but have not been validated in production environments. Community validation and feedback are highly appreciated!
Any other Libfabric providers should also work but have not been validated in production environments. Community validation and feedback are highly appreciated!

## Build Instructions

Expand Down
49 changes: 24 additions & 25 deletions src/plugins/libfabric/libfabric_backend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,13 @@ nixlLibfabricBackendH::nixlLibfabricBackendH(nixl_xfer_op_t op, const std::strin
// Initialize BinaryNotification
binary_notif.clear();

NIXL_DEBUG << "constructor called, this: " << this
NIXL_DEBUG << " handle constructor called, address: " << this
<< " total_requests_used=" << submitted_requests_.load()
<< " BinaryNotification initialized";
}

nixlLibfabricBackendH::~nixlLibfabricBackendH() {
NIXL_DEBUG << "destructor called, this: " << this;
NIXL_DEBUG << "handle destructor called, address: " << this;
}

// Multi-request completion tracking methods
Expand Down Expand Up @@ -425,7 +425,7 @@ nixlLibfabricEngine::getConnInfo(std::string &str) const {

NIXL_DEBUG << "Rail Manager serialized connection info for " << rail_manager.getNumDataRails()
<< " rails, " << rail_manager.getNumControlRails() << " control rails, "
<< "total size: " << str.length();
<< "total size=" << str.length();

return NIXL_SUCCESS;
}
Expand All @@ -436,7 +436,7 @@ nixlLibfabricEngine::loadRemoteConnInfo(const std::string &remote_agent,
std::lock_guard<std::mutex> lock(connection_state_mutex_);

NIXL_DEBUG << "Loading remote info for agent: " << remote_agent
<< ", info length: " << remote_conn_info.length()
<< ", info length=" << remote_conn_info.length()
<< ", info (hex): " << LibfabricUtils::hexdump(remote_conn_info.data());

if (remote_conn_info.empty()) {
Expand Down Expand Up @@ -475,13 +475,13 @@ nixlLibfabricEngine::connect(const std::string &remote_agent) {
std::lock_guard<std::mutex> lock(connection_state_mutex_);

NIXL_DEBUG << "Connecting to agent: " << remote_agent
<< ", connections_ size: " << connections_.size();
<< ", connections_ size=" << connections_.size();

// Check if connection is already established
auto it = connections_.find(remote_agent);
if (it != connections_.end() && it->second->overall_state_ == ConnectionState::CONNECTED) {
NIXL_DEBUG << "Connection already established for " << remote_agent
<< ", fi_addr: " << it->second->rail_remote_addr_list_[0][0];
<< ", fi_addr=" << it->second->rail_remote_addr_list_[0][0];
return NIXL_SUCCESS;
}

Expand Down Expand Up @@ -519,7 +519,7 @@ nixlLibfabricEngine::disconnect(const std::string &remote_agent) {
// Connection exists - check if already disconnected
if (it->second->overall_state_ == ConnectionState::DISCONNECTED) {
NIXL_DEBUG << "Connection already established for " << remote_agent
<< ", fi_addr: " << it->second->rail_remote_addr_list_[0][0];
<< ", fi_addr=" << it->second->rail_remote_addr_list_[0][0];
return NIXL_SUCCESS;
}
// TODO: Implement disconnect logic to cleanup the AV Address Entries from both local and remote
Expand Down Expand Up @@ -997,7 +997,7 @@ nixlLibfabricEngine::postXfer(const nixl_xfer_op_t &operation,
backend_handle->binary_notif.expected_completions =
0; // Will be incremented during transfer submission

NIXL_DEBUG << "Using pre-allocated BinaryNotification with XFER_ID: "
NIXL_DEBUG << "Using pre-allocated BinaryNotification with XFER_ID="
<< backend_handle->binary_notif.xfer_id;

nixlLibfabricReq::OpType op_type;
Expand Down Expand Up @@ -1032,8 +1032,8 @@ nixlLibfabricEngine::postXfer(const nixl_xfer_op_t &operation,
int gpu_id = local[desc_idx].devId;

NIXL_DEBUG << "Processing descriptor " << desc_idx << " GPU " << gpu_id
<< " local_addr: " << transfer_addr << " size: " << transfer_size
<< " remote_addr: " << (void *)remote[desc_idx].addr;
<< " local_addr: " << transfer_addr << " size=" << transfer_size
<< " remote_addr=" << (void *)remote[desc_idx].addr;

NIXL_DEBUG << "DEBUG: remote_agent='" << remote_agent << "' localAgent='" << localAgent
<< "'";
Expand Down Expand Up @@ -1091,7 +1091,7 @@ nixlLibfabricEngine::postXfer(const nixl_xfer_op_t &operation,
NIXL_ERROR << "Failed to send notification";
return notif_status;
}
NIXL_DEBUG << "Notification sent immediately with xfer_id: "
NIXL_DEBUG << "Notification sent immediately with XFER_ID="
<< backend_handle->binary_notif.xfer_id << ", expected_completions: "
<< backend_handle->binary_notif.expected_completions;
}
Expand Down Expand Up @@ -1259,25 +1259,24 @@ nixlLibfabricEngine::getNotifs(notif_list_t &notif_list) {
// Background progress function that continuously processes completions on all rails
nixl_status_t
nixlLibfabricEngine::cmThread() {
NIXL_DEBUG << "ConnectionManagement thread started successfully";
NIXL_DEBUG << "Initial receives already posted in main thread, entering progress loop";
NIXL_DEBUG << "CM: Thread started successfully";

// Main progress loop - continuously process completions on all rails
while (!cm_thread_stop_.load()) {

nixl_status_t status = rail_manager.progressAllControlRails();
if (status == NIXL_SUCCESS) {
NIXL_DEBUG << "Processed completions on control rails";
NIXL_DEBUG << "CM: Processed completions on control rails";
} else if (status != NIXL_IN_PROG && status != NIXL_SUCCESS) {
NIXL_ERROR << "Failed to process completions on control rails";
NIXL_ERROR << "CM: Failed to process completions on control rails";
return NIXL_ERR_BACKEND;
}
// Sleep briefly to avoid spinning too aggressively when blocking cq read is not used
if (!rail_manager.getControlRail(0).blocking_cq_sread_supported) {
std::this_thread::sleep_for(std::chrono::nanoseconds(10));
}
}
NIXL_DEBUG << "ConnectionManagement thread exiting cleanly";
NIXL_DEBUG << "CM: Thread exiting cleanly";
return NIXL_SUCCESS;
}

Expand All @@ -1288,24 +1287,24 @@ nixlLibfabricEngine::cmThread() {
// Progress thread that continuously processes completions only on data rails
nixl_status_t
nixlLibfabricEngine::progressThread() {
NIXL_DEBUG << "Progress thread started successfully for data rails only";
NIXL_DEBUG << "PT: Thread started successfully for data rails only";
// Main progress loop - continuously process completions only on data rails
while (!progress_thread_stop_.load()) {
// Process completions only on data rails (non-blocking)
bool any_completions = false;
nixl_status_t status = rail_manager.progressActiveDataRails();
if (status == NIXL_SUCCESS) {
any_completions = true;
NIXL_DEBUG << "Processed completions on data rails";
NIXL_DEBUG << "PT: Processed completions on data rails";
} else if (status != NIXL_IN_PROG && status != NIXL_SUCCESS) {
NIXL_ERROR << "Failed to process completions on data rails";
NIXL_ERROR << "PT: Failed to process completions on data rails";
// Don't return error, continue for robustness
}
if (!any_completions) {
std::this_thread::sleep_for(progress_thread_delay_);
}
}
NIXL_DEBUG << "Progress thread exiting cleanly";
NIXL_DEBUG << "PT: Thread exiting cleanly";
return NIXL_SUCCESS;
}

Expand Down Expand Up @@ -1356,11 +1355,11 @@ void
nixlLibfabricEngine::processNotification(const std::string &serialized_notif) {
// Only handle binary notification format
// Check if this is a binary notification (fixed size)
NIXL_DEBUG << "Received notification size: " << serialized_notif.size()
NIXL_DEBUG << "Received notification size=" << serialized_notif.size()
<< ", sizeof(Notification): " << sizeof(BinaryNotification);

if (serialized_notif.size() != sizeof(BinaryNotification)) {
NIXL_ERROR << "Invalid notification size: " << serialized_notif.size()
NIXL_ERROR << "Invalid notification size=" << serialized_notif.size()
<< ", expected: " << sizeof(BinaryNotification);
return;
}
Expand All @@ -1375,7 +1374,7 @@ nixlLibfabricEngine::processNotification(const std::string &serialized_notif) {
uint32_t expected_completions = binary_notif->expected_completions;

NIXL_TRACE << "Received notification from " << remote_name << " msg: " << msg
<< " xfer_id: " << xfer_id << " expected_completions: " << expected_completions;
<< " XFER_ID=" << xfer_id << " expected_completions: " << expected_completions;

// Check if this is a transfer notification that needs completions matching
if (expected_completions > 0) {
Expand Down Expand Up @@ -1426,7 +1425,7 @@ nixlLibfabricEngine::processConnectionAck(uint16_t agent_idx,
ConnectionState state) {
std::string remote_agent_name = agent_names_[agent_idx];
NIXL_DEBUG << "Connection state callback for agent " << remote_agent_name
<< " agent_idx: " << agent_idx;
<< " agent_idx=" << agent_idx;
std::lock_guard<std::mutex> lock(connections_[remote_agent_name]->conn_state_mutex_);
connections_[remote_agent_name]->overall_state_ = ConnectionState::CONNECTED;
connections_[remote_agent_name]->cv_.notify_all();
Expand Down Expand Up @@ -1478,7 +1477,7 @@ nixlLibfabricEngine::processConnectionRequest(uint16_t agent_idx,

NIXL_DEBUG << "Successfully inserted addresses for " << data_fi_addrs.size()
<< " data rails and " << control_fi_addrs.size() << " control rails"
<< ", initiator_control_fi_addr: " << initiator_control_fi_addr;
<< ", initiator_control_fi_addr=" << initiator_control_fi_addr;

// Send acknowledgement back to the initiator using the rail manager
size_t ep_name_len = sizeof(rail->ep_name);
Expand Down
4 changes: 2 additions & 2 deletions src/plugins/libfabric/libfabric_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ class nixlLibfabricConnection : public nixlBackendConnMD {
size_t agent_index_; // Unique agent identifier in agent_names vector
std::string remoteAgent_; // Remote agent name
std::unordered_map<size_t, std::vector<fi_addr_t>>
rail_remote_addr_list_; // Data rail libfabric addresses. Key: data rail id.
rail_remote_addr_list_; // Data rail libfabric addresses. key=data rail id.
std::unordered_map<size_t, std::vector<fi_addr_t>>
control_rail_remote_addr_list_; // Control rail libfabric addresses. Key: control rail id.
control_rail_remote_addr_list_; // Control rail libfabric addresses. key=control rail id.
std::vector<char *> src_ep_names_; // Data rail endpoint names
std::vector<char *> control_ep_names_; // Control rail endpoint names
ConnectionState overall_state_; // Current connection state
Expand Down
8 changes: 4 additions & 4 deletions src/utils/libfabric/libfabric_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ getAvailableNetworkDevices() {
std::string device_name = cur->domain_attr->name;
std::string provider_name = cur->fabric_attr->prov_name;

NIXL_TRACE << "Found device - domain: " << device_name
<< ", provider: " << provider_name << ", ep_type: " << cur->ep_attr->type
<< ", caps: 0x" << std::hex << cur->caps << std::dec;
NIXL_TRACE << "Found device - domain: " << device_name << ", provider=" << provider_name
<< ", ep_type=" << cur->ep_attr->type << ", caps=" << std::hex << cur->caps
<< std::dec;

if (provider_device_map.find(provider_name) == provider_device_map.end()) {
provider_device_map[provider_name] = {};
Expand All @@ -81,7 +81,7 @@ getAvailableNetworkDevices() {

for (auto device_list : provider_device_map) {
for (auto device : device_list.second) {
NIXL_TRACE << "Provider: " << device_list.first << ", Device: " << device;
NIXL_TRACE << "provider=" << device_list.first << ", device=" << device;
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/utils/libfabric/libfabric_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

// Libfabric configuration constants
#define NIXL_LIBFABRIC_DEFAULT_CONTROL_RAILS 1
#define NIXL_LIBFABRIC_CQ_SREAD_TIMEOUT_SEC 1
#define NIXL_LIBFABRIC_CQ_SREAD_TIMEOUT_MS 1000
#define NIXL_LIBFABRIC_DEFAULT_STRIPING_THRESHOLD (128 * 1024) // 128KB
#define LF_EP_NAME_MAX_LEN 56

Expand Down
Loading