diff --git a/examples/python/NIXL_API_2PROC.md b/examples/python/NIXL_API_2PROC.md new file mode 100644 index 000000000..7777d3571 --- /dev/null +++ b/examples/python/NIXL_API_2PROC.md @@ -0,0 +1,320 @@ +# NIXL Two-Process Example + +## Overview + +A **basic two-process communication pattern** using NIXL demonstrating fundamental data transfer operations. This example shows both `initialize_xfer` and `make_prepped_xfer` transfer modes in a simple target-initiator pattern. + +**Key Features:** +- Two transfer modes (initialize vs. prepared) +- Target-initiator pattern +- Synchronous transfer completion +- Reusable utility functions +- Simple TCP-based metadata exchange + +--- + +## Quick Start + +### Usage + +```bash +# Run the example (assumes NIXL is properly installed) +python3 nixl_api_2proc.py +``` + +**Expected Output:** +``` +[main] Starting TCP metadata server... +[main] Starting target and initiator processes... +[target] Starting target process +[initiator] Starting initiator process +[target] Memory registered +[target] Published metadata and xfer descriptors to TCP server +[target] Waiting for transfers... +[initiator] Memory registered +[initiator] Waiting for target metadata... +[initiator] Loaded remote agent: target +[initiator] Successfully retrieved target descriptors +[initiator] Starting transfer 1 (READ)... +[initiator] Initial transfer state: PROC +[initiator] Transfer 1 done +[target] Transfer 1 done +[initiator] Starting transfer 2 (WRITE)... +[initiator] Transfer 2 done +[target] Transfer 2 done +[target] Target process complete +[initiator] Initiator process complete +[main] ✓ Test Complete - Both processes succeeded! +``` + +--- + +## Architecture Summary + +### Processes + +**Target Process (lines 37-80):** +- Allocates and registers 2 buffers (256 bytes each) +- Publishes metadata and descriptors to TCP server +- Waits for transfers to complete (polling `check_remote_xfer_done`) +- Passive role - data is written to/read from its buffers + +**Initiator Process (lines 83-183):** +- Allocates and registers 2 buffers (256 bytes each) +- Retrieves target's metadata and descriptors +- Performs Transfer 1: READ (using `initialize_xfer`) +- Performs Transfer 2: WRITE (using `make_prepped_xfer`) +- Active role - initiates all transfers + +### Transfer Modes + +**Transfer 1 - Initialize Mode (lines 111-136):** +```python +xfer_handle_1 = nixl_agent2.initialize_xfer( + "READ", agent2_xfer_descs, agent1_xfer_descs, remote_name, b"UUID1" +) +state = nixl_agent2.transfer(xfer_handle_1) +# Poll for completion +while nixl_agent2.check_xfer_state(xfer_handle_1) != "DONE": + time.sleep(0.001) +``` + +**Transfer 2 - Prepared Mode (lines 138-172):** +```python +local_prep_handle = nixl_agent2.prep_xfer_dlist( + "NIXL_INIT_AGENT", [(addr3, buf_size, 0), (addr4, buf_size, 0)], "DRAM" +) +remote_prep_handle = nixl_agent2.prep_xfer_dlist( + remote_name, agent1_xfer_descs, "DRAM" +) +xfer_handle_2 = nixl_agent2.make_prepped_xfer( + "WRITE", local_prep_handle, [0, 1], remote_prep_handle, [1, 0], b"UUID2" +) +``` + +--- + +## Code Structure + +### Phase 1: Setup (lines 37-57, 83-101) + +**Target:** +```python +# Create agent with UCX backend +agent_config = nixl_agent_config(backends=["UCX"]) +nixl_agent1 = nixl_agent("target", agent_config) + +# Allocate memory (2 buffers, 256 bytes each) +addr1 = nixl_utils.malloc_passthru(buf_size * 2) +addr2 = addr1 + buf_size + +# Create descriptors (4-tuple for registration, 3-tuple for transfer) +agent1_reg_descs = 
nixl_agent1.get_reg_descs(agent1_strings, "DRAM") +agent1_xfer_descs = nixl_agent1.get_xfer_descs(agent1_addrs, "DRAM") + +# Register with NIXL +nixl_agent1.register_memory(agent1_reg_descs) +``` + +**Initiator:** +```python +# Create agent (uses default config) +nixl_agent2 = nixl_agent("initiator", None) +# Similar allocation and registration... +``` + +### Phase 2: Metadata Exchange (lines 59-62, 103-109) + +```python +# Target: Publish +publish_agent_metadata(nixl_agent1, "target_meta") +publish_descriptors(nixl_agent1, agent1_xfer_descs, "target_descs") + +# Initiator: Retrieve +remote_name = retrieve_agent_metadata(nixl_agent2, "target_meta", + timeout=10.0, role_name="initiator") +agent1_xfer_descs = retrieve_descriptors(nixl_agent2, "target_descs") +``` + +### Phase 3: Transfers (lines 111-172) + +**Transfer 1 - Simple approach:** +- Use `initialize_xfer()` for one-time transfers +- Simpler API, creates transfer on-the-fly +- Good for occasional transfers + +**Transfer 2 - Optimized approach:** +- Use `prep_xfer_dlist()` + `make_prepped_xfer()` +- Pre-creates reusable transfer handles +- Better for repeated transfers + +### Phase 4: Synchronization (lines 64-75) + +**Target waits for completion:** +```python +while not nixl_agent1.check_remote_xfer_done("initiator", b"UUID1"): + time.sleep(0.001) +``` + +**Initiator polls transfer state:** +```python +while nixl_agent2.check_xfer_state(xfer_handle_1) != "DONE": + time.sleep(0.001) +``` + +--- + +## Utility Functions + +### From `nixl_metadata_utils.py` + +- **`publish_agent_metadata(agent, key)`** - Publish agent metadata to TCP server +- **`retrieve_agent_metadata(agent, key, timeout=10.0, role_name)`** - Retrieve remote agent (customizable timeout) +- **`publish_descriptors(agent, xfer_descs, key)`** - Publish serialized descriptors +- **`retrieve_descriptors(agent, key)`** - Retrieve and deserialize descriptors + +### From `tcp_server.py` + +- Simple key-value store for metadata exchange +- Only used during setup phase +- Not involved in actual data transfers + +--- + +## Key NIXL Concepts + +1. **Memory Registration**: `agent.register_memory(reg_descs)` before transfers +2. **Agent Metadata**: Exchange via `get_agent_metadata()` and `add_remote_agent()` +3. **Descriptor Types**: + - **Registration descriptors**: 4-tuple `(addr, size, dev_id, name)` for memory registration + - **Transfer descriptors**: 3-tuple `(addr, size, dev_id)` for actual transfers +4. **Transfer Modes**: + - **Initialize mode**: `initialize_xfer()` - simple, one-shot + - **Prepared mode**: `prep_xfer_dlist()` + `make_prepped_xfer()` - optimized, reusable +5. **Transfer Operations**: + - **READ**: Initiator reads from target's memory + - **WRITE**: Initiator writes to target's memory +6. **Synchronization**: + - **Local**: `check_xfer_state()` - check local transfer status + - **Remote**: `check_remote_xfer_done()` - check if remote agent completed transfer + +--- + +## Comparison: Initialize vs. 
Prepared Transfer + +| Aspect | Initialize Mode | Prepared Mode | +|--------|----------------|---------------| +| **API** | `initialize_xfer()` | `prep_xfer_dlist()` + `make_prepped_xfer()` | +| **Setup** | Simple, one call | More complex, two-step | +| **Reusability** | One-time use | Reusable handles | +| **Performance** | Good for occasional | Better for repeated | +| **Use Case** | Simple transfers | High-frequency transfers | +| **Example** | Transfer 1 (line 113) | Transfer 2 (line 150) | + +--- + +## References + +- **Advanced Example**: `nixl_sender_receiver.py` - Queue-based flow control +- **Utility Functions**: `nixl_metadata_utils.py`, `nixl_memory_utils.py` +- **NIXL Examples**: `nixl_api_example.py` + +--- + +## Detailed Architecture Diagrams + +### Setup and Metadata Exchange + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ SETUP PHASE │ +└─────────────────────────────────────────────────────────────────────────────┘ + + TCP Metadata Server Target Process Initiator Process + (Port 9998) ┌──────────────┐ ┌──────────────┐ + │ │ │ │ │ + │ │ 1. Create │ │ 1. Create │ + │ │ Agent │ │ Agent │ + │ │ (UCX) │ │ (default) │ + │ │ │ │ │ + │ │ 2. Allocate │ │ 2. Allocate │ + │ │ 2 buffers │ │ 2 buffers │ + │ │ (256B ea) │ │ (256B ea) │ + │ │ │ │ │ + │ │ 3. Register │ │ 3. Register │ + │ │ Memory │ │ Memory │ + │ │ │ │ │ + │◄────(publish)───┤ │ │ │ + │ "target_meta" │ │ │ │ + │ + descriptors │ │ │ │ + │ │ │ │ │ + │ │ 4. Wait for │ │ │ + │ │ transfers │ │ │ + │ │ │ │ │ + │─(retrieve)──────────────────────────────────►│ 4. Retrieve │ + │ "target_meta" + descriptors │ metadata │ + │ │ │ │ │ + │ │ │ │ 5. Add │ + │ │ │ │ remote │ + │ │ │ │ agent │ + │ └──────────────┘ └──────┬───────┘ + │ │ + │ Connection Established │ + │ │ + (TCP server only used for metadata exchange, not for data transfers) +``` + +### Transfer Flow + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ TRANSFER PHASE │ +└─────────────────────────────────────────────────────────────────────────────┘ + + Target Initiator + (Passive) (Active) + │ │ + │ Waiting for transfers... │ + │ │ + │ │ + │ ┌───────────────┴──────────┐ + │ │ Transfer 1: READ │ + │ │ Mode: initialize_xfer │ + │ └───────────────┬──────────┘ + │ │ + │◄──────────────────────RDMA READ (256B)─────────────────────┤ + │ Initiator reads from target's buffer │ + │ │ + │ check_remote_xfer_done("initiator", "UUID1") │ + │ → TRUE │ + │ │ + │ Transfer 1 done ───────────────────────────────────────► check_xfer_state() + │ → DONE + │ │ + │ │ + │ ┌───────────────┴──────────┐ + │ │ Transfer 2: WRITE │ + │ │ Mode: make_prepped_xfer │ + │ └───────────────┬──────────┘ + │ │ + │◄──────────────────────RDMA WRITE (512B)────────────────────┤ + │ Initiator writes to target's buffer │ + │ (crossing: writes buf2→buf1, buf1→buf2) │ + │ │ + │ check_remote_xfer_done("initiator", "UUID2") │ + │ → TRUE │ + │ │ + │ Transfer 2 done ───────────────────────────────────────► check_xfer_state() + │ → DONE + │ │ + │ Cleanup & exit ◄───────────────────────────────────────► Cleanup & exit + │ │ +``` + +--- + +## License + +SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+SPDX-License-Identifier: Apache-2.0 + diff --git a/examples/python/NIXL_SENDER_RECEIVER.md b/examples/python/NIXL_SENDER_RECEIVER.md new file mode 100644 index 000000000..b94db861b --- /dev/null +++ b/examples/python/NIXL_SENDER_RECEIVER.md @@ -0,0 +1,322 @@ +# NIXL Sender-Receiver Example + +## Overview + +A **queue-based producer-consumer pattern** using NIXL with head/tail pointer flow control. Demonstrates circular buffer management with RDMA WRITE operations for high-throughput streaming. + +**Key Features:** +- Queue-based flow control (head/tail pointers) +- RDMA WRITE for data and control +- Circular buffer management +- Bandwidth measurement +- Reusable utility functions + +--- + +## Quick Start + +### Configuration + +Edit constants at the top of `nixl_sender_receiver.py`: + +```python +NUM_BUFFERS = 2 # Queue size (2 optimal for point-to-point) +BUFFER_SIZE = 16 * 1024 * 1024 # 16MB per buffer +NUM_TRANSFERS = 100 # Number of transfers to perform +``` + +### Usage + +```bash +# Run the example (assumes NIXL is properly installed) +python3 nixl_sender_receiver.py +``` + +**Expected Output:** +``` +[main] Starting sender-receiver: queue_size=2, num_transfers=100, buffer_size=16777216 +[receiver] Starting +[sender] Starting +... +[sender] Completed 100 transfers in 1.467s +[sender] Bandwidth: 1091.01 MB/s +[receiver] Completed 100 transfers in 1.447s +[receiver] Bandwidth: 1105.43 MB/s +[main] ✓ Success! +``` + +--- + +## Architecture Summary + +### Memory Layout + +**Receiver:** +``` +[Tail(8B)][Buffer0][Buffer1]... ← Sender WRITES data here +[Head(8B)] ← Receiver WRITES to sender +``` + +**Sender:** +``` +[Tail(8B)] ← Local update only +[Buffer0][Buffer1]... ← Local data preparation +[Head(8B)] ← Receiver WRITES here +``` + +### Flow Control + +**Queue States:** +- Empty: `Tail == Head` +- Full: `(Tail + 1) % NUM_BUFFERS == Head` +- Buffer index: `Tail % NUM_BUFFERS` (sender), `Head % NUM_BUFFERS` (receiver) + +**Sender:** Check queue not full → prepare data → RDMA WRITE data → update tail → RDMA WRITE tail + +**Receiver:** Check queue not empty → process data → update head → RDMA WRITE head + +--- + +## Code Structure + +### Phase 1: Setup (lines 50-88, 208-241) +```python +# Create NIXL agent +agent = nixl_agent("receiver", nixl_agent_config(backends=["UCX"])) + +# Allocate and register memory +tail_and_buffers_addr = nixl_utils.malloc_passthru(8 + NUM_BUFFERS * BUFFER_SIZE) +head_addr = nixl_utils.malloc_passthru(8) +agent.register_memory(reg_descs) +``` + +### Phase 2: Metadata Exchange (lines 90-102, 243-255) +```python +# Publish own metadata and descriptors +publish_agent_metadata(agent, "receiver_meta") +publish_descriptors(agent, tail_descs, "receiver_tail_desc") +publish_descriptors(agent, head_descs, "receiver_head_desc") + +# Retrieve remote agent +remote_name = retrieve_agent_metadata(agent, "sender_meta", + timeout=10.0, role_name="receiver") +sender_descs = retrieve_descriptors(agent, "sender_tail_desc") +``` + +### Phase 3: Transfer Preparation (lines 106-115, 259-290) +```python +# Prepare reusable transfer handles +local_prep = agent.prep_xfer_dlist("NIXL_INIT_AGENT", local_list, "DRAM") +remote_prep = agent.prep_xfer_dlist(remote_name, remote_descs, "DRAM") +xfer_handle = agent.make_prepped_xfer("WRITE", local_prep, [0], + remote_prep, [0], b"UUID") +``` + +### Phase 4: Main Loop (lines 134-177, 317-371) +```python +# Receiver: consume queue +while transfers_received < NUM_TRANSFERS: + remote_tail = read_uint64(local_tail_addr) + if remote_tail != local_head: 
# Not empty + process_buffer(local_head % NUM_BUFFERS) + local_head = (local_head + 1) % NUM_BUFFERS + write_uint64(head_addr, local_head) + agent.transfer(head_xfer_handle) # RDMA WRITE head + +# Sender: fill queue +while transfers_sent < NUM_TRANSFERS: + remote_head = read_uint64(head_addr) + if (local_tail + 1) % NUM_BUFFERS != remote_head: # Not full + prepare_buffer(local_tail % NUM_BUFFERS) + agent.transfer(buffer_xfer_handles[local_tail % NUM_BUFFERS]) + local_tail = (local_tail + 1) % NUM_BUFFERS + write_uint64(tail_addr, local_tail) + agent.transfer(tail_xfer_handle) # RDMA WRITE tail +``` + +--- + +## Utility Functions + +### `nixl_metadata_utils.py` + +- **`publish_agent_metadata(agent, key)`** - Publish agent metadata to TCP server +- **`retrieve_agent_metadata(agent, key, timeout=10.0, role_name)`** - Retrieve remote agent (customizable timeout) +- **`publish_descriptors(agent, xfer_descs, key)`** - Publish serialized descriptors +- **`retrieve_descriptors(agent, key)`** - Retrieve and deserialize descriptors + +### `nixl_memory_utils.py` + +- **`write_uint64(addr, value)`** - Write 64-bit integer +- **`read_uint64(addr)`** - Read 64-bit integer +- **`write_data(addr, data)`** - Write NumPy array/bytes +- **`read_data(addr, size)`** - Read data as NumPy array + +--- + + +## Key NIXL Concepts + +1. **Memory Registration**: `agent.register_memory(reg_descs)` before transfers +2. **Descriptor Serialization**: Share memory regions via `get_serialized_descs()`/`deserialize_descs()` +3. **Prepared Transfers**: Pre-create handles with `prep_xfer_dlist()` + `make_prepped_xfer()` for reuse +4. **RDMA WRITE**: One-sided operation, direct remote memory write +5. **Asynchronous Transfers**: `transfer()` is non-blocking, poll with `check_xfer_state()` + +--- + +--- + +## References + +- **Simple Example**: `nixl_api_2proc.py` - Basic two-process transfers +- **Utility Functions**: `nixl_metadata_utils.py`, `nixl_memory_utils.py` +- **NIXL Examples**: `nixl_api_example.py` + +--- + +## Detailed Architecture Diagrams + +### Setup Phase + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ SETUP PHASE │ +└─────────────────────────────────────────────────────────────────────────────┘ + + TCP Metadata Server Receiver Process Sender Process + (Port 9998) ┌──────────────┐ ┌──────────────┐ + │ │ │ │ │ + │ │ 1. Create │ │ 1. Create │ + │ │ NIXL │ │ NIXL │ + │ │ Agent │ │ Agent │ + │ │ │ │ │ + │ │ 2. Allocate: │ │ 2. Allocate: │ + │ │ • Tail + │ │ • Tail │ + │ │ Buffers │ │ • Head │ + │ │ • Head │ │ • Buffers │ + │ │ │ │ │ + │◄────(publish)───┤ │ │ │ + │ "receiver_ │ │ │ │ + │ metadata" │ │ │ │ + │ + descriptors │ │ │ │ + │ │ │ │ │ + │◄────────────────────────────(publish)────────┤ │ + │ "sender_metadata" │ │ + │ + descriptors │ │ + │ │ │ │ │ + │─(retrieve)──────► │ │ │ + │ "sender_meta" │ │ │ │ + │ + descriptors │ │ │ │ + │ │ │ │ │ + │─(retrieve)──────────────────────────────────►│ │ + │ "receiver_meta" + descriptors │ │ + │ │ │ │ │ + │ │ 3. Add │ │ 3. Add │ + │ │ Remote │ │ Remote │ + │ │ Agent │ │ Agent │ + │ └──────┬───────┘ └──────┬───────┘ + │ │ │ + │ └──────NIXL Connection───────┘ + │ Established! 
+ │ + (TCP server only used for initial metadata/descriptor exchange) +``` + +### Memory Layout Details + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ MEMORY LAYOUT │ +└─────────────────────────────────────────────────────────────────────────────┘ + +Receiver's Memory: +┌────────────┬──────────────┬──────────────┐ +│ Tail (8B) │ Buffer 0 │ Buffer 1 │ ← Sender WRITES here +└────────────┴──────────────┴──────────────┘ + ▲ + └─ Sender updates this via RDMA WRITE + +┌────────────┐ +│ Head (8B) │ ← Receiver updates locally, WRITES to sender +└────────────┘ + +Sender's Memory: +┌────────────┐ +│ Tail (8B) │ ← Sender updates locally +└────────────┘ + +┌────────────┬──────────────┬──────────────┐ +│ Buffers │ Buffer 0 │ Buffer 1 │ ← Sender fills locally +└────────────┴──────────────┴──────────────┘ + +┌────────────┐ +│ Head (8B) │ ← Receiver WRITES here +└────────────┘ +``` + +### Main Transfer Loop + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ MAIN TRANSFER LOOP (Queue-Based Flow Control) │ +└─────────────────────────────────────────────────────────────────────────────┘ + + Receiver Sender + (Consumer) (Producer) + │ │ + │ Initialize: Head = 0, Tail = 0 (queue empty) │ + │ │ + │◄───────────RDMA WRITE: Head = 0 (initial sync)────────────────┤ + │ │ + │ │ + │ Check: Tail+1 != Head? + │ (Queue not full) │ + │ │ + │ Prepare Buffer 0 │ + │ (Header: ID=0) │ + │ │ + │◄───────────RDMA WRITE: Data → receiver.buffer[0]──────────────┤ + │ │ + │◄───────────RDMA WRITE: Tail = 1 ──────────────────────────────┤ + │ │ + Read local Tail │ + (Tail=1 != Head=0) │ + Queue not empty! │ + │ │ + Process buffer 0 │ + Verify ID = 0 │ + │ │ + Update: Head = 1 │ + │ │ + │────────────RDMA WRITE: Head = 1 ─────────────────────────────►│ + │ │ + │ Read remote Head │ + │ (Head updated!) │ + │ │ + │ Check: Tail+1 != Head? + │ (Queue not full) │ + │ │ + │ Prepare Buffer 1 │ + │ │ + │◄──────────RDMA WRITE: Data → receiver.buffer[1]───────────────┤ + │◄──────────RDMA WRITE: Tail = 0 (wrapped)──────────────────────┤ + │ │ + Read local Tail │ + (Tail=0 != Head=1) │ + │ │ + Process buffer 1 │ + Update: Head = 0 │ + │ │ + │─────────RDMA WRITE: Head = 0 ────────────────────────────────►│ + │ │ + │ ... Continue circular queue operation ... │ + │ │ +``` + +--- + +## License + +SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +SPDX-License-Identifier: Apache-2.0 diff --git a/examples/python/nixl_api_2proc.py b/examples/python/nixl_api_2proc.py new file mode 100755 index 000000000..0826a2e83 --- /dev/null +++ b/examples/python/nixl_api_2proc.py @@ -0,0 +1,215 @@ +#!/usr/bin/env python3 + +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os +import time +from multiprocessing import Process + +import tcp_server +from nixl_metadata_utils import ( + publish_agent_metadata, + publish_descriptors, + retrieve_agent_metadata, + retrieve_descriptors, +) + +import nixl._utils as nixl_utils +from nixl._api import nixl_agent, nixl_agent_config +from nixl.logging import get_logger + +# Configure logging +logger = get_logger(__name__) + + +def target_process(): + """Target process - receives data""" + buf_size = 256 + logger.info("[target] Starting target process") + + # Create agent + agent_config = nixl_agent_config(backends=["UCX"]) + nixl_agent1 = nixl_agent("target", agent_config) + + # Allocate and register memory + addr1 = nixl_utils.malloc_passthru(buf_size * 2) + addr2 = addr1 + buf_size + + agent1_addrs = [(addr1, buf_size, 0), (addr2, buf_size, 0)] + agent1_strings = [(addr1, buf_size, 0, "a"), (addr2, buf_size, 0, "b")] + + agent1_reg_descs = nixl_agent1.get_reg_descs(agent1_strings, "DRAM") + agent1_xfer_descs = nixl_agent1.get_xfer_descs(agent1_addrs, "DRAM") + + assert nixl_agent1.register_memory(agent1_reg_descs) is not None + logger.info("[target] Memory registered") + + # Publish metadata and descriptors + publish_agent_metadata(nixl_agent1, "target_meta") + publish_descriptors(nixl_agent1, agent1_xfer_descs, "target_descs") + logger.info("[target] Published metadata and xfer descriptors to TCP server") + + # Wait for initiator to complete transfers + logger.info("[target] Waiting for transfers...") + + # Check for transfer 1 completion + while not nixl_agent1.check_remote_xfer_done("initiator", b"UUID1"): + time.sleep(0.001) + logger.info("[target] Transfer 1 done") + + # Check for transfer 2 completion + while not nixl_agent1.check_remote_xfer_done("initiator", b"UUID2"): + time.sleep(0.001) + logger.info("[target] Transfer 2 done") + + # Cleanup + nixl_agent1.deregister_memory(agent1_reg_descs) + nixl_utils.free_passthru(addr1) + logger.info("[target] Target process complete") + + +def initiator_process(): + """Initiator process - sends data""" + buf_size = 256 + logger.info("[initiator] Starting initiator process") + + # Create agent + nixl_agent2 = nixl_agent("initiator", None) + addr3 = nixl_utils.malloc_passthru(buf_size * 2) + addr4 = addr3 + buf_size + + agent2_addrs = [(addr3, buf_size, 0), (addr4, buf_size, 0)] + agent2_strings = [(addr3, buf_size, 0, "a"), (addr4, buf_size, 0, "b")] + + agent2_reg_descs = nixl_agent2.get_reg_descs(agent2_strings, "DRAM") + agent2_xfer_descs = nixl_agent2.get_xfer_descs(agent2_addrs, "DRAM") + + agent2_descs = nixl_agent2.register_memory(agent2_reg_descs) + assert agent2_descs is not None + logger.info("[initiator] Memory registered") + + # Retrieve target's metadata and descriptors + remote_name = retrieve_agent_metadata(nixl_agent2, "target_meta", role_name="initiator") + if not remote_name: + return + + agent1_xfer_descs = retrieve_descriptors(nixl_agent2, "target_descs") + logger.info("[initiator] Successfully retrieved target descriptors") + + # Transfer 1: initialize transfer mode + logger.info("[initiator] Starting transfer 1 (READ)...") + xfer_handle_1 = nixl_agent2.initialize_xfer( + "READ", agent2_xfer_descs, agent1_xfer_descs, remote_name, b"UUID1" + ) + if not xfer_handle_1: + logger.error("[initiator] Creating transfer failed") + return + + state = nixl_agent2.transfer(xfer_handle_1) + logger.info("[initiator] Initial transfer state: %s", state) + if state == "ERR": + logger.error("[initiator] Transfer failed immediately") + return + + # Wait for 
transfer 1 to complete + init_done = False + while not init_done: + state = nixl_agent2.check_xfer_state(xfer_handle_1) + if state == "ERR": + logger.error("[initiator] Transfer got to Error state") + return + elif state == "DONE": + init_done = True + logger.info("[initiator] Transfer 1 done") + time.sleep(0.001) + + # Transfer 2: prep transfer mode + logger.info("[initiator] Starting transfer 2 (WRITE)...") + local_prep_handle = nixl_agent2.prep_xfer_dlist( + "NIXL_INIT_AGENT", [(addr3, buf_size, 0), (addr4, buf_size, 0)], "DRAM" + ) + remote_prep_handle = nixl_agent2.prep_xfer_dlist( + remote_name, agent1_xfer_descs, "DRAM" + ) + + assert local_prep_handle != 0 + assert remote_prep_handle != 0 + + xfer_handle_2 = nixl_agent2.make_prepped_xfer( + "WRITE", local_prep_handle, [0, 1], remote_prep_handle, [1, 0], b"UUID2" + ) + if not xfer_handle_2: + logger.error("[initiator] Make prepped transfer failed") + return + + state = nixl_agent2.transfer(xfer_handle_2) + if state == "ERR": + logger.error("[initiator] Transfer 2 failed immediately") + return + + # Wait for transfer 2 to complete + init_done = False + while not init_done: + state = nixl_agent2.check_xfer_state(xfer_handle_2) + if state == "ERR": + logger.error("[initiator] Transfer 2 got to Error state") + return + elif state == "DONE": + init_done = True + logger.info("[initiator] Transfer 2 done") + time.sleep(0.001) + + # Cleanup + nixl_agent2.release_xfer_handle(xfer_handle_1) + nixl_agent2.release_xfer_handle(xfer_handle_2) + nixl_agent2.release_dlist_handle(local_prep_handle) + nixl_agent2.release_dlist_handle(remote_prep_handle) + nixl_agent2.remove_remote_agent("target") + nixl_agent2.deregister_memory(agent2_reg_descs) + nixl_utils.free_passthru(addr3) + + logger.info("[initiator] Initiator process complete") + + +if __name__ == "__main__": + logger.info("Using NIXL Plugins from:\n%s", os.environ["NIXL_PLUGIN_DIR"]) + + # Start TCP metadata server + logger.info("[main] Starting TCP metadata server...") + try: + tcp_server.start_server(9998) + time.sleep(0.2) + except OSError: + pass # Server may already be running + tcp_server.clear_metadata("127.0.0.1", 9998) + + logger.info("[main] Starting target and initiator processes...") + + # Start both processes + target_proc = Process(target=target_process) + initiator_proc = Process(target=initiator_process) + + target_proc.start() + initiator_proc.start() + + # Wait for both to complete + target_proc.join() + initiator_proc.join() + + if target_proc.exitcode == 0 and initiator_proc.exitcode == 0: + logger.info("[main] ✓ Test Complete - Both processes succeeded!") + else: + logger.error(f"[main] ✗ Process error - Target: {target_proc.exitcode}, Initiator: {initiator_proc.exitcode}") diff --git a/examples/python/nixl_memory_utils.py b/examples/python/nixl_memory_utils.py new file mode 100644 index 000000000..7a8d94eec --- /dev/null +++ b/examples/python/nixl_memory_utils.py @@ -0,0 +1,97 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +""" +NIXL Memory Utilities + +Helper functions for reading and writing data to memory addresses using NumPy. +These utilities provide a safe and efficient way to work with raw memory pointers. +""" + +import ctypes + +import numpy as np + + +def write_uint64(addr, value): + """ + Write uint64 to local memory using NumPy. + + Args: + addr: Memory address (integer) + value: 64-bit unsigned integer value to write + """ + # Create a NumPy view of the memory location (8 bytes for uint64) + char_buffer = (ctypes.c_char * 8).from_address(addr) + arr = np.ndarray((1,), dtype=np.uint64, buffer=char_buffer) + arr[0] = value + + +def read_uint64(addr): + """ + Read uint64 from local memory using NumPy. + + Args: + addr: Memory address (integer) + + Returns: + 64-bit unsigned integer value + """ + # Create a NumPy view of the memory location (8 bytes for uint64) + char_buffer = (ctypes.c_char * 8).from_address(addr) + arr = np.frombuffer(char_buffer, dtype=np.uint64, count=1) + return int(arr[0]) + + +def write_data(addr, data): + """ + Write data to local memory using NumPy. + + Args: + addr: Memory address (integer) + data: Data to write (NumPy array, bytes, or bytearray) + + Raises: + TypeError: If data type is not supported + """ + # Convert data to NumPy array if needed + if isinstance(data, np.ndarray): + src_arr = data + elif isinstance(data, (bytes, bytearray)): + src_arr = np.frombuffer(data, dtype=np.uint8) + else: + raise TypeError(f"Unsupported data type: {type(data)}") + + # Create a NumPy view of the destination and copy + char_buffer = (ctypes.c_char * len(src_arr)).from_address(addr) + dst_arr = np.ndarray(src_arr.shape, dtype=np.uint8, buffer=char_buffer) + np.copyto(dst_arr, src_arr) + + +def read_data(addr, size): + """ + Read data from local memory using NumPy. + + Args: + addr: Memory address (integer) + size: Number of bytes to read + + Returns: + NumPy array containing the read data + """ + # Create a NumPy view and return a copy + char_buffer = (ctypes.c_char * size).from_address(addr) + arr = np.ndarray((size,), dtype=np.uint8, buffer=char_buffer) + return arr.copy() diff --git a/examples/python/nixl_metadata_utils.py b/examples/python/nixl_metadata_utils.py new file mode 100644 index 000000000..0df26a09b --- /dev/null +++ b/examples/python/nixl_metadata_utils.py @@ -0,0 +1,126 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +NIXL Metadata Exchange Utilities + +Helper functions for publishing and retrieving NIXL agent metadata and descriptors +via TCP server for metadata exchange. 
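+
+Typical flow (a sketch mirroring nixl_api_2proc.py; assumes the tcp_server
+metadata server is already running on the default 127.0.0.1:9998):
+
+    # target side
+    publish_agent_metadata(nixl_agent1, "target_meta")
+    publish_descriptors(nixl_agent1, agent1_xfer_descs, "target_descs")
+
+    # initiator side
+    remote_name = retrieve_agent_metadata(nixl_agent2, "target_meta", role_name="initiator")
+    agent1_xfer_descs = retrieve_descriptors(nixl_agent2, "target_descs")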
+""" + +import base64 +import time + +import tcp_server + +from nixl.logging import get_logger + +logger = get_logger(__name__) + +DEFAULT_SERVER_HOST = "127.0.0.1" +DEFAULT_SERVER_PORT = 9998 +DEFAULT_TIMEOUT = 10.0 + + +def publish_agent_metadata(agent, key, host=DEFAULT_SERVER_HOST, port=DEFAULT_SERVER_PORT): + """ + Publish agent metadata to TCP server. + + Args: + agent: NIXL agent instance + key: Metadata key name + host: TCP server host + port: TCP server port + """ + metadata = agent.get_agent_metadata() + metadata_b64 = base64.b64encode(metadata).decode('utf-8') + tcp_server.set_metadata(key, metadata_b64, host, port) + + +def retrieve_agent_metadata(agent, key, host=DEFAULT_SERVER_HOST, port=DEFAULT_SERVER_PORT, + timeout=DEFAULT_TIMEOUT, role_name="process"): + """ + Retrieve remote agent metadata and add to local agent. + + Args: + agent: NIXL agent instance + key: Metadata key name + host: TCP server host + port: TCP server port + timeout: Timeout in seconds + role_name: Name for logging (e.g., "initiator", "sender") + + Returns: + Remote agent name (str) or None on failure + """ + logger.info(f"[{role_name}] Waiting for {key}...") + start_wait = time.time() + metadata_b64 = None + + while not metadata_b64 and (time.time() - start_wait) < timeout: + metadata_b64 = tcp_server.get_metadata(key, host, port) + if not metadata_b64: + time.sleep(0.1) + + if not metadata_b64: + logger.error(f"[{role_name}] Timeout waiting for {key}") + return None + + metadata = base64.b64decode(metadata_b64.encode('utf-8')) + remote_name = agent.add_remote_agent(metadata) + + # Convert bytes to string if needed + if isinstance(remote_name, bytes): + remote_name = remote_name.decode('utf-8') + + logger.info(f"[{role_name}] Loaded remote agent: {remote_name}") + return remote_name + + +def publish_descriptors(agent, xfer_descs, key, host=DEFAULT_SERVER_HOST, port=DEFAULT_SERVER_PORT): + """ + Serialize and publish descriptors to TCP server. + + Args: + agent: NIXL agent instance + xfer_descs: Transfer descriptors to publish + key: Metadata key name + host: TCP server host + port: TCP server port + """ + serialized = agent.get_serialized_descs(xfer_descs) + serialized_b64 = base64.b64encode(serialized).decode('utf-8') + tcp_server.set_metadata(key, serialized_b64, host, port) + + +def retrieve_descriptors(agent, key, host=DEFAULT_SERVER_HOST, port=DEFAULT_SERVER_PORT): + """ + Retrieve and deserialize descriptors from TCP server. + + Args: + agent: NIXL agent instance + key: Metadata key name + host: TCP server host + port: TCP server port + + Returns: + Deserialized descriptors or None on failure + """ + serialized_b64 = tcp_server.get_metadata(key, host, port) + if not serialized_b64: + return None + + serialized = base64.b64decode(serialized_b64.encode('utf-8')) + return agent.deserialize_descs(serialized) diff --git a/examples/python/nixl_sender_receiver.py b/examples/python/nixl_sender_receiver.py new file mode 100755 index 000000000..4a7011ded --- /dev/null +++ b/examples/python/nixl_sender_receiver.py @@ -0,0 +1,456 @@ +#!/usr/bin/env python3 + +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +NIXL Sender-Receiver Example: Queue-Based Flow Control + +Demonstrates a producer-consumer pattern using head/tail pointers with RDMA WRITE operations. +The sender fills a circular queue of buffers, and the receiver consumes them, with bandwidth reporting. +""" + +import os +import sys +import time +from multiprocessing import Process + +import numpy as np +import tcp_server +from nixl_memory_utils import read_data, read_uint64, write_data, write_uint64 +from nixl_metadata_utils import ( + publish_agent_metadata, + publish_descriptors, + retrieve_agent_metadata, + retrieve_descriptors, +) + +import nixl._utils as nixl_utils +from nixl._api import nixl_agent, nixl_agent_config +from nixl.logging import get_logger + +logger = get_logger(__name__) + +NUM_BUFFERS = 2 # Queue size (optimal) +BUFFER_SIZE = 16 * 1024 * 1024 # 16MB (optimal) +NUM_TRANSFERS = 100 + + +def receiver_process(): + """Receiver with queue-based flow control""" + logger.info("[receiver] Starting") + + # Create NIXL agent (single worker is optimal for point-to-point) + config = nixl_agent_config(backends=["UCX"]) + agent = nixl_agent("receiver", config) + + # Allocate Buffers + # Tail + data buffers (remote writes here) + tail_and_buffers_size = 8 + (NUM_BUFFERS * BUFFER_SIZE) + tail_and_buffers_addr = nixl_utils.malloc_passthru(tail_and_buffers_size) + + # Head (local, will write to remote) + head_addr = nixl_utils.malloc_passthru(8) + + # Register and create descriptors + tail_reg_desc = [(tail_and_buffers_addr, tail_and_buffers_size, 0, "tail_buffers")] + head_reg_desc = [(head_addr, 8, 0, "head")] + + tail_reg_descs = agent.get_reg_descs(tail_reg_desc, "DRAM") + head_reg_descs = agent.get_reg_descs(head_reg_desc, "DRAM") + + agent.register_memory(tail_reg_descs) + agent.register_memory(head_reg_descs) + + # Create xfer_descs for transfers - separate tail pointer from data buffers + tail_pointer_xfer_desc = [(tail_and_buffers_addr, 8, 0)] # Just the tail pointer + head_xfer_desc = [(head_addr, 8, 0)] + + # Create individual buffer descriptors (one for each buffer slot) + data_start_addr = tail_and_buffers_addr + 8 + buffers_xfer_desc = [(data_start_addr + i * BUFFER_SIZE, BUFFER_SIZE, 0) for i in range(NUM_BUFFERS)] + + tail_pointer_xfer_descs = agent.get_xfer_descs(tail_pointer_xfer_desc, "DRAM") + buffers_xfer_descs = agent.get_xfer_descs(buffers_xfer_desc, "DRAM") + head_xfer_descs = agent.get_xfer_descs(head_xfer_desc, "DRAM") + + logger.info(f"[receiver] Allocated: tail_buffers at 0x{tail_and_buffers_addr:x}, head at 0x{head_addr:x}") + + # Exchange metadata and descriptors + publish_agent_metadata(agent, "receiver_meta") + publish_descriptors(agent, tail_pointer_xfer_descs, "receiver_tail_desc") + publish_descriptors(agent, buffers_xfer_descs, "receiver_buffers_desc") + publish_descriptors(agent, head_xfer_descs, "receiver_head_desc") + + # Retrieve sender's metadata and descriptors + remote_name = retrieve_agent_metadata(agent, "sender_meta", role_name="receiver") + if not remote_name: + return + + # Note: sender_tail_desc not needed by receiver + sender_head_descs = retrieve_descriptors(agent, 
"sender_head_desc") + + logger.info(f"[receiver] Connected to {remote_name}") + + # Create xfer_handler for writing Head to sender + # Use deserialized reg_descs directly as the example does + local_prep = agent.prep_xfer_dlist("NIXL_INIT_AGENT", [(head_addr, 8, 0)], "DRAM") + remote_prep = agent.prep_xfer_dlist(remote_name, sender_head_descs, "DRAM") + head_xfer_handle = agent.make_prepped_xfer("WRITE", local_prep, [0], remote_prep, [0], b"HEAD_UPDATE") + + if not head_xfer_handle: + logger.error("[receiver] Failed to create head xfer handle") + return + + # Init local Head and Tail to 0 (empty) + local_head = 0 + local_tail_addr = tail_and_buffers_addr + write_uint64(head_addr, 0) + write_uint64(local_tail_addr, 0) + + # Transfer initial Head value + state = agent.transfer(head_xfer_handle) + if state == "ERR": + logger.error("[receiver] Failed to transfer initial head") + return + + # Wait for transfer to complete + while agent.check_xfer_state(head_xfer_handle) == "PENDING": + time.sleep(0.001) + + logger.info("[receiver] Initialized, starting main loop") + + # Main loop + transfers_received = 0 + data_start_addr = tail_and_buffers_addr + 8 + + # Performance tracking + start_time = time.time() + first_transfer_time = None + + while transfers_received < NUM_TRANSFERS: + # Read local tail (sender writes here) + remote_tail = read_uint64(local_tail_addr) + + # Check if not empty: tail != head + if remote_tail != local_head: + # Process buffer using NumPy + buffer_idx = local_head % NUM_BUFFERS + buffer_offset = data_start_addr + (buffer_idx * BUFFER_SIZE) + + # Read header (8 bytes) as NumPy array + header_data = read_data(buffer_offset, 8) + received_id = int(header_data.view(np.uint64)[0]) + + if received_id != transfers_received: + logger.error(f"[receiver] Mismatch! 
Expected {transfers_received}, got {received_id}") + + # Track first transfer time + if first_transfer_time is None: + first_transfer_time = time.time() + + # Update head + local_head = (local_head + 1) % NUM_BUFFERS + write_uint64(head_addr, local_head) + + # Transfer head + state = agent.transfer(head_xfer_handle) + if state == "ERR": + logger.error("[receiver] Transfer head failed") + break + + transfers_received += 1 + + if transfers_received % 10 == 0: + logger.info(f"[receiver] Processed {transfers_received}/{NUM_TRANSFERS}") + else: + # Queue is empty, wait + time.sleep(0.0001) + + end_time = time.time() + + # Calculate performance metrics + total_time = end_time - start_time + if first_transfer_time: + actual_transfer_time = end_time - first_transfer_time + else: + actual_transfer_time = total_time + + total_bytes = transfers_received * BUFFER_SIZE + bandwidth_mbps = (total_bytes / actual_transfer_time) / (1024 * 1024) if actual_transfer_time > 0 else 0 + + logger.info(f"[receiver] Completed {transfers_received} transfers in {actual_transfer_time:.3f}s") + logger.info(f"[receiver] Bandwidth: {bandwidth_mbps:.2f} MB/s") + + # Cleanup + agent.release_xfer_handle(head_xfer_handle) + agent.release_dlist_handle(local_prep) + agent.release_dlist_handle(remote_prep) + agent.deregister_memory(tail_reg_descs) + agent.deregister_memory(head_reg_descs) + nixl_utils.free_passthru(tail_and_buffers_addr) + nixl_utils.free_passthru(head_addr) + + +def sender_process(): + """Sender with queue-based flow control""" + logger.info("[sender] Starting") + + # Create NIXL agent (single worker is optimal for point-to-point) + config = nixl_agent_config(backends=["UCX"]) + agent = nixl_agent("sender", config) + + # Allocate Buffers + tail_addr = nixl_utils.malloc_passthru(8) + buffers_size = NUM_BUFFERS * BUFFER_SIZE + buffers_addr = nixl_utils.malloc_passthru(buffers_size) + + head_addr = nixl_utils.malloc_passthru(8) + + # Register and create descriptors + tail_reg_desc = [(tail_addr, 8, 0, "tail")] + buffers_reg_desc = [(buffers_addr, buffers_size, 0, "buffers")] + head_reg_desc = [(head_addr, 8, 0, "head")] + + tail_reg_descs = agent.get_reg_descs(tail_reg_desc, "DRAM") + buffers_reg_descs = agent.get_reg_descs(buffers_reg_desc, "DRAM") + head_reg_descs = agent.get_reg_descs(head_reg_desc, "DRAM") + + agent.register_memory(tail_reg_descs) + agent.register_memory(buffers_reg_descs) + agent.register_memory(head_reg_descs) + + # Create xfer_descs for transfers + tail_xfer_desc = [(tail_addr, 8, 0)] + head_xfer_desc = [(head_addr, 8, 0)] + + tail_xfer_descs = agent.get_xfer_descs(tail_xfer_desc, "DRAM") + head_xfer_descs = agent.get_xfer_descs(head_xfer_desc, "DRAM") + + logger.info(f"[sender] Allocated: tail at 0x{tail_addr:x}, buffers at 0x{buffers_addr:x}, head at 0x{head_addr:x}") + + # Exchange metadata and descriptors + publish_agent_metadata(agent, "sender_meta") + publish_descriptors(agent, tail_xfer_descs, "sender_tail_desc") + publish_descriptors(agent, head_xfer_descs, "sender_head_desc") + + # Retrieve receiver's metadata and descriptors + remote_name = retrieve_agent_metadata(agent, "receiver_meta", role_name="sender") + if not remote_name: + return + + receiver_tail_descs = retrieve_descriptors(agent, "receiver_tail_desc") + receiver_buffers_descs = retrieve_descriptors(agent, "receiver_buffers_desc") + # Note: receiver_head_desc not needed by sender + + logger.info(f"[sender] Connected to {remote_name}") + + # Create xfer_handlers using prep_xfer_dlist + local_buffer_list = [(buffers_addr + 
i * BUFFER_SIZE, BUFFER_SIZE, 0) for i in range(NUM_BUFFERS)] + local_buffers_prep = agent.prep_xfer_dlist("NIXL_INIT_AGENT", local_buffer_list, "DRAM") + + remote_buffers_prep = agent.prep_xfer_dlist(remote_name, receiver_buffers_descs, "DRAM") + + tail_local_prep = agent.prep_xfer_dlist("NIXL_INIT_AGENT", [(tail_addr, 8, 0)], "DRAM") + tail_remote_prep = agent.prep_xfer_dlist(remote_name, receiver_tail_descs, "DRAM") + tail_xfer_handle = agent.make_prepped_xfer("WRITE", tail_local_prep, [0], tail_remote_prep, [0], b"TAIL_UPDATE") + + if not tail_xfer_handle or not local_buffers_prep or not remote_buffers_prep: + logger.error("[sender] Failed to create transfer handles") + return + + # Pre-create all buffer transfer handles (reuse them!) + buffer_xfer_handles = [] + for i in range(NUM_BUFFERS): + handle = agent.make_prepped_xfer( + "WRITE", + local_buffers_prep, [i], + remote_buffers_prep, [i], + f"BUFFER_{i}".encode('utf-8') + ) + if not handle: + logger.error(f"[sender] Failed to create buffer handle {i}") + return + buffer_xfer_handles.append(handle) + + logger.info(f"[sender] Prepared {NUM_BUFFERS} buffer slots for transfers") + + np_buffer = np.zeros(BUFFER_SIZE, dtype=np.uint8) + np_header = np_buffer[:8].view(np.uint64) + np_payload = np_buffer[8:] + logger.info(f"[sender] Pre-allocated NumPy buffer ({BUFFER_SIZE / (1024 * 1024):.1f} MB)") + + # Init local Head and Tail to 0 (empty) + local_tail = 0 + write_uint64(tail_addr, 0) + write_uint64(head_addr, 0) + + # Transfer initial Tail value + state = agent.transfer(tail_xfer_handle) + if state == "ERR": + logger.error("[sender] Failed to transfer initial tail") + return + + # Wait for transfer to complete + while agent.check_xfer_state(tail_xfer_handle) != "DONE": + time.sleep(0.001) + + logger.info("[sender] Initialized, starting main loop") + + # Main loop + transfers_sent = 0 + + # Performance tracking + start_time = time.time() + first_transfer_time = None + + while transfers_sent < NUM_TRANSFERS: + # Read local head (receiver writes here) + remote_head = read_uint64(head_addr) + + # Check if not full: (tail + 1) % NUM_BUFFERS != head + next_tail = (local_tail + 1) % NUM_BUFFERS + if next_tail != remote_head: + # Prepare data in local buffer + buffer_idx = local_tail % NUM_BUFFERS + buffer_offset = buffers_addr + (buffer_idx * BUFFER_SIZE) + + # Prepare data using NumPy arrays + np_header[0] = transfers_sent # Write transfer ID directly + np_payload.fill(transfers_sent % 256) # Fill payload + write_data(buffer_offset, np_buffer) # Write entire NumPy buffer + + # Track first transfer time + if first_transfer_time is None: + first_transfer_time = time.time() + + # Transfer buffer using prepped transfer + buffer_xfer_handle = buffer_xfer_handles[buffer_idx] + state = agent.transfer(buffer_xfer_handle) + + if state == "ERR": + logger.error("[sender] Transfer buffer failed") + break + + # Wait for buffer transfer to complete + while agent.check_xfer_state(buffer_xfer_handle) != "DONE": + time.sleep(0.0001) + + # Update tail + local_tail = (local_tail + 1) % NUM_BUFFERS + write_uint64(tail_addr, local_tail) + + # Transfer tail + state = agent.transfer(tail_xfer_handle) + + if state == "ERR": + logger.error("[sender] Transfer tail failed") + break + + # Wait for tail transfer to complete before next iteration + while agent.check_xfer_state(tail_xfer_handle) != "DONE": + time.sleep(0.0001) + + transfers_sent += 1 + + if transfers_sent % 10 == 0: + logger.info(f"[sender] Sent {transfers_sent}/{NUM_TRANSFERS}") + else: + # Queue is full, 
wait + time.sleep(0.0001) + + end_time = time.time() + + # Calculate performance metrics + total_time = end_time - start_time + if first_transfer_time: + actual_transfer_time = end_time - first_transfer_time + else: + actual_transfer_time = total_time + + total_bytes = transfers_sent * BUFFER_SIZE + bandwidth_mbps = (total_bytes / actual_transfer_time) / (1024 * 1024) if actual_transfer_time > 0 else 0 + + logger.info(f"[sender] Completed {transfers_sent} transfers in {actual_transfer_time:.3f}s") + logger.info(f"[sender] Bandwidth: {bandwidth_mbps:.2f} MB/s") + + # Cleanup + agent.release_xfer_handle(tail_xfer_handle) + # Release pre-created buffer handles + for handle in buffer_xfer_handles: + agent.release_xfer_handle(handle) + agent.release_dlist_handle(local_buffers_prep) + agent.release_dlist_handle(remote_buffers_prep) + agent.release_dlist_handle(tail_local_prep) + agent.release_dlist_handle(tail_remote_prep) + agent.deregister_memory(tail_reg_descs) + agent.deregister_memory(buffers_reg_descs) + agent.deregister_memory(head_reg_descs) + nixl_utils.free_passthru(tail_addr) + nixl_utils.free_passthru(buffers_addr) + nixl_utils.free_passthru(head_addr) + + +def run_test(num_buffers, buffer_size, num_transfers): + """Run a single test with given parameters""" + global NUM_BUFFERS, BUFFER_SIZE, NUM_TRANSFERS + NUM_BUFFERS = num_buffers + BUFFER_SIZE = buffer_size + NUM_TRANSFERS = num_transfers + + tcp_server.clear_metadata("127.0.0.1", 9998) + + receiver_proc = Process(target=receiver_process) + sender_proc = Process(target=sender_process) + + receiver_proc.start() + sender_proc.start() + + receiver_proc.join(timeout=15) + sender_proc.join(timeout=15) + + success = receiver_proc.exitcode == 0 and sender_proc.exitcode == 0 + + # Terminate if hanging + if receiver_proc.is_alive(): + receiver_proc.terminate() + if sender_proc.is_alive(): + sender_proc.terminate() + + return success + + +def main(): + if "NIXL_PLUGIN_DIR" not in os.environ: + logger.error("[main] NIXL_PLUGIN_DIR not set") + sys.exit(1) + + # Start TCP server + try: + tcp_server.start_server(9998) + time.sleep(0.2) + except OSError: + pass # Server may already be running + + logger.info(f"[main] Starting sender-receiver: queue_size={NUM_BUFFERS}, num_transfers={NUM_TRANSFERS}, buffer_size={BUFFER_SIZE}") + success = run_test(NUM_BUFFERS, BUFFER_SIZE, NUM_TRANSFERS) + if success: + logger.info("[main] ✓ Success!") + else: + logger.error("[main] ✗ Error") + + +if __name__ == "__main__": + main() diff --git a/examples/python/tcp_server.py b/examples/python/tcp_server.py new file mode 100755 index 000000000..3cc3aebf1 --- /dev/null +++ b/examples/python/tcp_server.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python3 + +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Simple TCP server for NIXL metadata exchange. + +Provides a lightweight key-value store for exchanging metadata between processes. 
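+
+Wire protocol (as implemented by MetadataHandler below): each request is a
+single JSON line, one of
+
+    {"cmd": "SET", "key": "...", "value": ...}
+    {"cmd": "GET", "key": "..."}
+    {"cmd": "GET_ALL"}
+    {"cmd": "DELETE", "key": "..."}
+    {"cmd": "CLEAR"}
+
+and each response is a single JSON line with a "status" of "OK" or "ERROR"
+(plus "value" or "data" for a successful GET / GET_ALL).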
+""" + +import json +import socket +from socketserver import StreamRequestHandler, ThreadingTCPServer +from threading import Lock, Thread + +from nixl.logging import get_logger + +logger = get_logger(__name__) + +# --- Server state --- +_metadata = {} +_lock = Lock() + + +# --- Request handler --- +class MetadataHandler(StreamRequestHandler): + def handle(self): + try: + line = self.rfile.readline().strip().decode() + request = json.loads(line) + + cmd = request.get("cmd") + + if cmd == "SET": + # Store metadata: {"cmd": "SET", "key": "...", "value": ...} + key = request.get("key") + value = request.get("value") + with _lock: + _metadata[key] = value + response = {"status": "OK"} + + elif cmd == "GET": + # Retrieve metadata: {"cmd": "GET", "key": "..."} + key = request.get("key") + with _lock: + value = _metadata.get(key) + if value is not None: + response = {"status": "OK", "value": value} + else: + response = {"status": "ERROR", "message": f"Key '{key}' not found"} + + elif cmd == "GET_ALL": + # Get all metadata: {"cmd": "GET_ALL"} + with _lock: + response = {"status": "OK", "data": dict(_metadata)} + + elif cmd == "DELETE": + # Delete metadata: {"cmd": "DELETE", "key": "..."} + key = request.get("key") + with _lock: + if key in _metadata: + del _metadata[key] + response = {"status": "OK"} + else: + response = {"status": "ERROR", "message": f"Key '{key}' not found"} + + elif cmd == "CLEAR": + # Clear all metadata: {"cmd": "CLEAR"} + with _lock: + _metadata.clear() + response = {"status": "OK"} + + else: + response = {"status": "ERROR", "message": f"Unknown command: {cmd}"} + + self.wfile.write((json.dumps(response) + "\n").encode()) + + except json.JSONDecodeError: + error = {"status": "ERROR", "message": "Invalid JSON"} + self.wfile.write((json.dumps(error) + "\n").encode()) + except Exception as e: + error = {"status": "ERROR", "message": str(e)} + self.wfile.write((json.dumps(error) + "\n").encode()) + + +# --- TCPServer subclass to reuse port immediately --- +class ReusableTCPServer(ThreadingTCPServer): + allow_reuse_address = True + + +# --- Singleton server instance --- +_server_instance = None +_server_lock = Lock() + + +def start_server(port=9998): + """Start the metadata server (singleton pattern)""" + global _server_instance + with _server_lock: + if _server_instance is None: + try: + _server_instance = ReusableTCPServer(("0.0.0.0", port), MetadataHandler) + Thread(target=_server_instance.serve_forever, daemon=True).start() + return True + except OSError: + # Server already running (possibly from another process) + return False + return False + + +# --- Client API --- + + +def set_metadata(key, value, server="127.0.0.1", port=9998): + """Set a metadata key-value pair""" + s = socket.create_connection((server, port)) + request = {"cmd": "SET", "key": key, "value": value} + s.sendall((json.dumps(request) + "\n").encode()) + response = json.loads(s.recv(4096).decode().strip()) + s.close() + return response.get("status") == "OK" + + +def get_metadata(key, server="127.0.0.1", port=9998): + """Get a metadata value by key""" + s = socket.create_connection((server, port)) + request = {"cmd": "GET", "key": key} + s.sendall((json.dumps(request) + "\n").encode()) + response = json.loads(s.recv(4096).decode().strip()) + s.close() + if response.get("status") == "OK": + return response.get("value") + return None + + +def get_all_metadata(server="127.0.0.1", port=9998): + """Get all metadata""" + s = socket.create_connection((server, port)) + request = {"cmd": "GET_ALL"} + 
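+    # Note: the reply is read with a single recv(4096) below; a very large
+    # GET_ALL payload could exceed this and fail to parse.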
s.sendall((json.dumps(request) + "\n").encode())
+    response = json.loads(s.recv(4096).decode().strip())
+    s.close()
+    if response.get("status") == "OK":
+        return response.get("data", {})
+    return {}
+
+
+def delete_metadata(key, server="127.0.0.1", port=9998):
+    """Delete a metadata key"""
+    s = socket.create_connection((server, port))
+    request = {"cmd": "DELETE", "key": key}
+    s.sendall((json.dumps(request) + "\n").encode())
+    response = json.loads(s.recv(4096).decode().strip())
+    s.close()
+    return response.get("status") == "OK"
+
+
+def clear_metadata(server="127.0.0.1", port=9998):
+    """Clear all metadata"""
+    s = socket.create_connection((server, port))
+    request = {"cmd": "CLEAR"}
+    s.sendall((json.dumps(request) + "\n").encode())
+    response = json.loads(s.recv(4096).decode().strip())
+    s.close()
+    return response.get("status") == "OK"
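+
+
+if __name__ == "__main__":
+    # Optional standalone entry point (a sketch; the example scripts normally
+    # call start_server() from their own main blocks): run the metadata server
+    # in the foreground on the default port until interrupted.
+    import time
+
+    if start_server(9998):
+        logger.info("Metadata server listening on port 9998 (Ctrl+C to stop)")
+        try:
+            while True:
+                time.sleep(1)
+        except KeyboardInterrupt:
+            pass
+    else:
+        logger.error("Metadata server not started (already running or port 9998 in use)")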