Commit b86eebc

PYTHON: new remote storage example
Signed-off-by: Timothy Stamler <[email protected]>
1 parent 223d1ea commit b86eebc

6 files changed

+512
-0
lines changed

Lines changed: 110 additions & 0 deletions
@@ -0,0 +1,110 @@
# NIXL Storage Transfer Sample

A high-performance storage transfer system built on NIXL (NVIDIA Inference Xfer Library) that demonstrates local and remote storage operations using POSIX and GDS (GPU Direct Storage) backends.

## Features

- **Flexible Storage Backends**
  - GDS (GPU Direct Storage) support for high-performance transfers
  - POSIX fallback for standard storage operations
  - Automatic backend selection based on availability

- **Transfer Modes**
  - Local memory-to-storage transfers
  - Remote memory-to-storage transfers
  - Bidirectional operations (READ/WRITE)
  - Batch processing support

- **Network Communication**
  - UCX-based data transfer
  - Metadata exchange between nodes
  - Asynchronous notification system

## Project Structure

- `nixl_storage_utils/` - Core module providing storage utilities
  - `__init__.py` - Module initialization and exports
  - `common.py` - Common utilities and shared functionality
- `nixl_p2p_storage_example.py` - Main application that can run as either initiator or target

## Requirements

- Python 3.6+
- NIXL library with the following plugins:
  - GDS (optional)
  - POSIX
  - UCX

## Usage

The system operates in two modes: client and server.

Each server (the target) waits for requests from clients and then reads from or writes to its own storage on behalf of the remote node.

The client (the initiator) starts transfers and can perform both local operations and remote operations against storage servers.

### Running as Client

```bash
python nixl_p2p_storage_example.py --role client \
    --agents_file <file path> \
    --fileprefix <path_prefix> \
    --name <name> \
    [--buf_size <size>] \
    [--batch_size <count>]
```

`--role` selects client or server. `--agents_file` points to a file listing the storage servers the client should connect to.

The agents file lists one agent per line, in the form "<agent name> <ip address> <port>".

`--fileprefix` specifies the path prefix used for local storage transfers.

`--name` is the name given to the NIXL agent on this client.

You can optionally specify `--buf_size` and `--batch_size` to change how much data is transferred.

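The agents file format described above can be parsed with a few lines of Python. The sketch below is only an illustration: `AgentEntry` and `parse_agents_file` are hypothetical names, and unlike the sample's `connect_to_agents` it skips blank lines instead of rejecting them.

```python
from typing import List, NamedTuple


class AgentEntry(NamedTuple):
    """One line of the agents file (hypothetical helper type)."""
    name: str
    ip: str
    port: int


def parse_agents_file(text: str) -> List[AgentEntry]:
    """Parse '<agent name> <ip address> <port>' lines into entries."""
    entries = []
    for lineno, line in enumerate(text.splitlines(), start=1):
        parts = line.split()
        if not parts:
            continue  # assumption: tolerate blank lines (the sample rejects them)
        if len(parts) != 3:
            raise ValueError(f"line {lineno}: expected 3 fields, got {len(parts)}")
        name, ip, port = parts
        entries.append(AgentEntry(name, ip, int(port)))
    return entries


entries = parse_agents_file("target1 127.0.0.1 8888\n")
print(entries)
```

Each resulting entry carries the agent name, IP address, and port that the client needs to exchange metadata with that server.
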
### Running as Server

```bash
python nixl_p2p_storage_example.py --role server \
    --fileprefix <path_prefix> \
    --name <name> \
    [--port <port>] \
    [--buf_size <size>] \
    [--batch_size <count>]
```

The parameters are the same as for the client, but the agent name must match the name listed in the client's agents file. `--port` sets the port the server listens on (default 5555) and must match the port listed there as well.

Additionally, `buf_size` and `batch_size` must match the values the client specifies.

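To make the matching requirement concrete, here is a minimal sketch of the command-line surface and a check that two invocations agree on transfer sizes. `build_parser` and `sizes_match` are hypothetical helpers. The `--role`, `--port`, `--name`, and `--agents_file` arguments follow the example script in this commit, while the `--fileprefix`, `--buf_size`, and `--batch_size` definitions and their defaults are assumptions, since `common.py` is not shown here.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical stand-in for the example's parser; defaults are placeholders."""
    parser = argparse.ArgumentParser(description="storage example (sketch)")
    # Assumed shape of the shared arguments (common.py is not shown in this diff).
    parser.add_argument("--fileprefix", type=str, required=True)
    parser.add_argument("--buf_size", type=int, default=4096)
    parser.add_argument("--batch_size", type=int, default=1)
    # Arguments defined by nixl_p2p_storage_example.py in this commit.
    parser.add_argument("--role", choices=["server", "client"], required=True)
    parser.add_argument("--port", type=int, default=5555)
    parser.add_argument("--name", type=str)
    parser.add_argument("--agents_file", type=str)
    return parser


def sizes_match(client: argparse.Namespace, server: argparse.Namespace) -> bool:
    """True when both sides agree on buffer size and batch size."""
    return (client.buf_size, client.batch_size) == (server.buf_size, server.batch_size)


client_args = build_parser().parse_args(
    ["--role", "client", "--fileprefix", "/tmp/client", "--name", "client1",
     "--agents_file", "agents.txt"]
)
server_args = build_parser().parse_args(
    ["--role", "server", "--fileprefix", "/tmp/server", "--name", "target1",
     "--port", "8888"]
)
print(sizes_match(client_args, server_args))
```
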
## Architecture

![Client/Server Interaction](client_server_diagram.png)

### Storage Module

The `nixl_storage_utils` module provides core functionality:
- Agent creation and plugin management
- Memory and file resource handling
- Transfer state monitoring
- Common utilities and configuration

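As an illustration of transfer state monitoring, the sketch below polls a transfer until it finishes. It is not the module's `wait_for_transfer` implementation, which is not shown in this commit. `FakeAgent` is a stand-in so the example runs without NIXL, and the state strings `"PROC"`, `"DONE"`, and `"ERR"` as well as the `check_xfer_state` method name are assumptions about the agent interface.

```python
import time


class FakeAgent:
    """Stand-in agent: reports "PROC" a fixed number of times, then "DONE"."""

    def __init__(self, polls_until_done: int):
        self._remaining = polls_until_done

    def check_xfer_state(self, handle) -> str:
        if self._remaining > 0:
            self._remaining -= 1
            return "PROC"
        return "DONE"


def wait_for_transfer(agent, handle, timeout_s: float = 5.0, poll_s: float = 0.001) -> int:
    """Poll until the transfer completes; return the number of polls made."""
    deadline = time.monotonic() + timeout_s
    polls = 0
    while True:
        state = agent.check_xfer_state(handle)
        polls += 1
        if state == "DONE":
            return polls
        if state == "ERR":
            raise RuntimeError("transfer failed")
        if time.monotonic() >= deadline:
            raise TimeoutError("transfer did not complete in time")
        time.sleep(poll_s)  # back off briefly instead of spinning


polls = wait_for_transfer(FakeAgent(polls_until_done=3), handle=None)
print(polls)
```

The timeout and the short sleep are design choices of this sketch; they keep a stalled transfer from hanging the caller or pinning a CPU core.
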
### Storage Backends

The system automatically selects the best available storage backend:
1. Prefers GDS when available for high-performance GPU-direct storage operations
2. Falls back to POSIX when GDS is unavailable
3. Requires at least one storage backend to operate

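The selection rule above amounts to a preference-ordered lookup. The following sketch assumes the caller already has a list of available plugin names; `select_storage_backend` and `STORAGE_PREFERENCE` are hypothetical and are not part of `nixl_storage_utils`.

```python
from typing import Iterable

# Storage backends in order of preference, per the rules above.
STORAGE_PREFERENCE = ("GDS", "POSIX")


def select_storage_backend(available: Iterable[str]) -> str:
    """Return the preferred storage backend among the available plugin names."""
    available_set = set(available)
    for name in STORAGE_PREFERENCE:
        if name in available_set:
            return name
    raise RuntimeError("no storage backend available (need GDS or POSIX)")


print(select_storage_backend(["UCX", "POSIX", "GDS"]))
print(select_storage_backend(["UCX", "POSIX"]))
```
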
### Transfer Process

#### Local Transfers
1. Register memory and file descriptors
2. Perform direct memory-to-storage transfers
3. Support both read and write operations

#### Remote Transfers
1. Initiator sends memory descriptors to target
2. Target performs storage-to-memory or memory-to-storage operations
3. Data is transferred between initiator and target memory
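
In the example script, step 1 of a remote transfer is a notification whose first four bytes name the operation (`b"READ"` or `b"WRTE"`), followed by the serialized memory descriptors. The sketch below shows only that framing. `encode_request` and `decode_request` are hypothetical helpers, and the descriptor payload is treated as opaque placeholder bytes rather than real NIXL output.

```python
from typing import Tuple

# 4-byte opcodes used on the wire; "WRITE" is shortened to fit four bytes.
OPCODES = {"READ": b"READ", "WRITE": b"WRTE"}
OPERATIONS = {code: op for op, code in OPCODES.items()}


def encode_request(operation: str, serialized_descs: bytes) -> bytes:
    """Prefix the serialized descriptors with the 4-byte opcode."""
    if operation not in OPCODES:
        raise ValueError(f"invalid operation: {operation!r}")
    return OPCODES[operation] + serialized_descs


def decode_request(message: bytes) -> Tuple[str, bytes]:
    """Split a request into (operation, serialized descriptors)."""
    opcode, payload = message[:4], message[4:]
    if opcode not in OPERATIONS:
        raise ValueError(f"invalid opcode: {opcode!r}")
    return OPERATIONS[opcode], payload


message = encode_request("WRITE", b"<descs>")  # placeholder payload
print(message)
print(decode_request(message))
```

After the target finishes its storage and memory transfers, it replies with a `b"COMPLETE"` notification that the initiator waits for.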
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
target1 127.0.0.1 8888
Lines changed: 244 additions & 0 deletions
@@ -0,0 +1,244 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
NIXL Peer-to-Peer Storage Example
Demonstrates peer-to-peer storage transfers using NIXL with initiator and target modes.
"""

import time

import nixl_storage_utils as nsu

from nixl.logging import get_logger

logger = get_logger(__name__)


def execute_transfer(my_agent, local_descs, remote_descs, remote_name, operation):
    """Run one transfer to completion: create the handle, post it, wait, then release it."""
    handle = my_agent.initialize_xfer(operation, local_descs, remote_descs, remote_name)
    my_agent.transfer(handle)
    nsu.wait_for_transfer(my_agent, handle)
    my_agent.release_xfer_handle(handle)


def remote_storage_transfer(my_agent, my_mem_descs, operation, remote_agent_name):
    """Initiate remote memory transfer."""
    if operation not in ("READ", "WRITE"):
        logger.error("Invalid operation, exiting")
        exit(-1)

    # The request is prefixed with a 4-byte opcode, so WRITE is shortened to WRTE
    if operation == "WRITE":
        operation = b"WRTE"
    else:
        operation = b"READ"

    # Send the descriptors that you want to read into or write from
    logger.info(f"Sending {operation} request to {remote_agent_name}")
    test_descs_str = my_agent.get_serialized_descs(my_mem_descs)
    my_agent.send_notif(remote_agent_name, operation + test_descs_str)

    # Busy-wait until the target sends its COMPLETE notification
    while not my_agent.check_remote_xfer_done(remote_agent_name, b"COMPLETE"):
        continue


def connect_to_agents(my_agent, agents_file):
    """Exchange metadata with every agent listed in agents_file and return their names."""
    target_agents = []
    with open(agents_file, "r") as f:
        for line in f:
            # Each line in file should be: "<agent_name> <ip> <port>"
            parts = line.strip().split()
            if len(parts) == 3:
                target_agents.append(parts[0])
                my_agent.send_local_metadata(parts[1], int(parts[2]))
                my_agent.fetch_remote_metadata(parts[0], parts[1], int(parts[2]))

                while my_agent.check_remote_metadata(parts[0]) is False:
                    logger.info(f"Waiting for remote metadata for {parts[0]}...")
                    time.sleep(0.2)

                logger.info(f"Remote metadata for {parts[0]} fetched")
            else:
                logger.error(f"Invalid line in {agents_file}: {line}")
                exit(-1)

    logger.info("All remote metadata fetched")

    return target_agents


def handle_remote_transfer_request(my_agent, my_mem_descs, my_file_descs):
    """Handle remote memory and storage transfers as target."""
    # Wait for initiator to send list of memory descriptors
    notifs = my_agent.get_new_notifs()

    logger.info("Waiting for a remote transfer request...")

    while len(notifs) == 0:
        notifs = my_agent.get_new_notifs()

    for req_agent in notifs:
        recv_msg = notifs[req_agent][0]

        operation = None
        if recv_msg[:4] == b"READ":
            operation = "READ"
        elif recv_msg[:4] == b"WRTE":
            operation = "WRITE"
        else:
            logger.error("Invalid operation, exiting")
            exit(-1)

        sent_descs = my_agent.deserialize_descs(recv_msg[4:])

        logger.info("Checking to ensure metadata is loaded...")
        while my_agent.check_remote_metadata(req_agent, sent_descs) is False:
            continue

        if operation == "READ":
            logger.info("Starting READ operation")

            # Read from file first
            execute_transfer(
                my_agent, my_mem_descs, my_file_descs, my_agent.name, "READ"
            )
            # Send to client
            execute_transfer(my_agent, my_mem_descs, sent_descs, req_agent, "WRITE")

        elif operation == "WRITE":
            logger.info("Starting WRITE operation")

            # Read from client first
            execute_transfer(my_agent, my_mem_descs, sent_descs, req_agent, "READ")
            # Write to storage
            execute_transfer(
                my_agent, my_mem_descs, my_file_descs, my_agent.name, "WRITE"
            )

        # Send completion notification to initiator
        my_agent.send_notif(req_agent, b"COMPLETE")

    logger.info("One transfer test complete.")


def run_client(my_agent, nixl_mem_reg_descs, nixl_file_reg_descs, agents_file):
    logger.info("Client initialized, ready for local transfer test...")

    # For sample purposes, write to and then read from local storage
    logger.info("Starting local transfer test...")
    execute_transfer(
        my_agent,
        nixl_mem_reg_descs.trim(),
        nixl_file_reg_descs.trim(),
        my_agent.name,
        "WRITE",
    )
    execute_transfer(
        my_agent,
        nixl_mem_reg_descs.trim(),
        nixl_file_reg_descs.trim(),
        my_agent.name,
        "READ",
    )
    logger.info("Local transfer test complete")

    logger.info("Starting remote transfer test...")

    target_agents = connect_to_agents(my_agent, agents_file)

    # For sample purposes, write to and then read from each target agent
    for target_agent in target_agents:
        remote_storage_transfer(
            my_agent, nixl_mem_reg_descs.trim(), "WRITE", target_agent
        )
        remote_storage_transfer(
            my_agent, nixl_mem_reg_descs.trim(), "READ", target_agent
        )

    logger.info("Remote transfer test complete")


def run_storage_server(my_agent, nixl_mem_reg_descs, nixl_file_reg_descs):
    logger.info("Server initialized, ready for remote transfer test...")
    while True:
        handle_remote_transfer_request(
            my_agent, nixl_mem_reg_descs.trim(), nixl_file_reg_descs.trim()
        )


if __name__ == "__main__":
    parser = nsu.get_base_parser()
    parser.add_argument(
        "--role",
        type=str,
        choices=["server", "client"],
        required=True,
        help="Role of this node (server or client)",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=5555,
        help="Port to listen on for remote transfers (only needed for server)",
    )
    parser.add_argument("--name", type=str, help="NIXL agent name")
    parser.add_argument(
        "--agents_file",
        type=str,
        help="File containing list of target agents (only needed for client)",
    )
    args = parser.parse_args()

    my_agent = nsu.create_agent_with_plugins(args.name, args.port)

    (
        my_mem_list,
        my_file_list,
        nixl_mem_reg_descs,
        nixl_file_reg_descs,
    ) = nsu.setup_memory_and_files(
        my_agent, args.batch_size, args.buf_size, args.fileprefix
    )

    if args.role == "client":
        if not args.agents_file:
            parser.error("--agents_file is required when role is client")
        try:
            run_client(
                my_agent, nixl_mem_reg_descs, nixl_file_reg_descs, args.agents_file
            )
        finally:
            nsu.cleanup_resources(
                my_agent,
                nixl_mem_reg_descs,
                nixl_file_reg_descs,
                my_mem_list,
                my_file_list,
            )
    else:
        if args.agents_file:
            logger.warning("Warning: --agents_file is ignored when role is server")
        try:
            run_storage_server(my_agent, nixl_mem_reg_descs, nixl_file_reg_descs)
        finally:
            nsu.cleanup_resources(
                my_agent,
                nixl_mem_reg_descs,
                nixl_file_reg_descs,
                my_mem_list,
                my_file_list,
            )

    logger.info("Test Complete.")
Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
NIXL Storage Utilities Module
Provides utilities for high-performance storage transfers using NIXL.
"""

from .common import (
    cleanup_resources,
    create_agent_with_plugins,
    get_base_parser,
    setup_memory_and_files,
    wait_for_transfer,
)

__all__ = [
    "create_agent_with_plugins",
    "setup_memory_and_files",
    "cleanup_resources",
    "get_base_parser",
    "wait_for_transfer",
]
