Skip to content

Commit 3865a94

Browse files
authored
feat: Port vllm port allocator to Rust in bindings (#3125)
Signed-off-by: Graham King <[email protected]>
1 parent 19948b7 commit 3865a94

File tree

11 files changed

+263
-330
lines changed

11 files changed

+263
-330
lines changed

components/backends/vllm/src/dynamo/vllm/args.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@
1212
from vllm.utils import FlexibleArgumentParser
1313

1414
from dynamo._core import get_reasoning_parser_names, get_tool_parser_names
15+
from dynamo.runtime import DistributedRuntime
1516

1617
from . import __version__
1718
from .ports import (
1819
DEFAULT_DYNAMO_PORT_MAX,
1920
DEFAULT_DYNAMO_PORT_MIN,
2021
DynamoPortRange,
21-
EtcdContext,
2222
PortAllocationRequest,
2323
PortMetadata,
2424
allocate_and_reserve_port,
@@ -195,10 +195,8 @@ def parse_args() -> Config:
195195
return config
196196

197197

198-
async def configure_ports_with_etcd(config: Config, etcd_client):
199-
"""Configure all settings that require ETCD, including port allocation and vLLM overrides."""
200-
201-
etcd_context = EtcdContext(client=etcd_client, namespace=config.namespace)
198+
async def configure_ports(runtime: DistributedRuntime, config: Config):
199+
"""Configure settings that require the distributed runtime, including port allocation and vLLM overrides."""
202200

203201
dp_rank = config.engine_args.data_parallel_rank or 0
204202
worker_id = f"vllm-{config.component}-dp{dp_rank}"
@@ -207,7 +205,8 @@ async def configure_ports_with_etcd(config: Config, etcd_client):
207205
if config.engine_args.enable_prefix_caching:
208206
kv_metadata = PortMetadata(worker_id=worker_id, reason="zmq_kv_event_port")
209207
kv_port = await allocate_and_reserve_port(
210-
etcd_context=etcd_context,
208+
runtime=runtime,
209+
namespace=config.namespace,
211210
metadata=kv_metadata,
212211
port_range=config.port_range,
213212
)
@@ -230,12 +229,13 @@ async def configure_ports_with_etcd(config: Config, etcd_client):
230229
worker_id=worker_id, reason="nixl_side_channel_port"
231230
)
232231
nixl_request = PortAllocationRequest(
233-
etcd_context=etcd_context,
234232
metadata=nixl_metadata,
235233
port_range=config.port_range,
236234
block_size=tp_size,
237235
)
238-
allocated_ports = await allocate_and_reserve_port_block(nixl_request)
236+
allocated_ports = await allocate_and_reserve_port_block(
237+
runtime, config.namespace, nixl_request
238+
)
239239
first_port_for_dp_rank = allocated_ports[0]
240240

241241
# Calculate the base port that NIXL expects
@@ -273,7 +273,7 @@ def create_kv_events_config(config: Config) -> Optional[KVEventsConfig]:
273273
logger.info("Creating Dynamo default kv_events_config for prefix caching")
274274
if config.kv_port is None:
275275
raise ValueError(
276-
"config.kv_port is not set; call configure_ports_with_etcd(...) before overwrite_args "
276+
"config.kv_port is not set; call configure_ports(...) before overwrite_args "
277277
"or provide --kv-event-config to supply an explicit endpoint."
278278
)
279279
dp_rank = config.engine_args.data_parallel_rank or 0

components/backends/vllm/src/dynamo/vllm/main.py

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,7 @@
2222
from dynamo.runtime import DistributedRuntime, dynamo_worker
2323
from dynamo.runtime.logging import configure_dynamo_logging
2424

25-
from .args import (
26-
ENABLE_LMCACHE,
27-
Config,
28-
configure_ports_with_etcd,
29-
overwrite_args,
30-
parse_args,
31-
)
25+
from .args import ENABLE_LMCACHE, Config, configure_ports, overwrite_args, parse_args
3226
from .handlers import DecodeWorkerHandler, PrefillWorkerHandler
3327
from .health_check import VllmHealthCheckPayload
3428
from .publisher import StatLoggerFactory
@@ -69,8 +63,7 @@ async def graceful_shutdown(runtime):
6963
async def worker(runtime: DistributedRuntime):
7064
config = parse_args()
7165

72-
etcd_client = runtime.do_not_use_etcd_client()
73-
await configure_ports_with_etcd(config, etcd_client)
66+
await configure_ports(runtime, config)
7467
overwrite_args(config)
7568

7669
# Set up signal handler for graceful shutdown
@@ -208,7 +201,7 @@ async def init(runtime: DistributedRuntime, config: Config):
208201
config, factory
209202
)
210203

211-
# TODO Hack to get data, move this to registering in ETCD
204+
# TODO: Hack to get data; move this to a registration mechanism that is still to be determined (TBD)
212205
factory.set_num_gpu_blocks_all(vllm_config.cache_config.num_gpu_blocks)
213206
factory.set_request_total_slots_all(vllm_config.scheduler_config.max_num_seqs)
214207
factory.init_publish()

components/backends/vllm/src/dynamo/vllm/ports.py

Lines changed: 37 additions & 163 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,14 @@
33

44
"""Port allocation and management utilities for Dynamo services."""
55

6-
import asyncio
76
import json
87
import logging
98
import os
10-
import random
119
import socket
1210
import time
13-
from contextlib import contextmanager
14-
from dataclasses import dataclass, field
11+
from dataclasses import dataclass
1512

16-
from dynamo.runtime import EtcdKvCache
13+
from dynamo.runtime import DistributedRuntime
1714

1815
logger = logging.getLogger(__name__)
1916

@@ -40,77 +37,31 @@ def __post_init__(self):
4037
)
4138

4239

43-
@dataclass
44-
class EtcdContext:
45-
"""Context for ETCD operations"""
46-
47-
client: EtcdKvCache # etcd client instance
48-
namespace: str # Namespace for keys (used in key prefix)
49-
50-
def make_port_key(self, port: int) -> str:
51-
"""Generate ETCD key for a port reservation"""
52-
node_ip = get_host_ip()
53-
return f"dyn://{self.namespace}/ports/{node_ip}/{port}"
54-
55-
5640
@dataclass
5741
class PortMetadata:
58-
"""Metadata to store with port reservations in ETCD"""
42+
"""Metadata to store with port reservations"""
5943

6044
worker_id: str # Worker identifier (e.g., "vllm-backend-dp0")
6145
reason: str # Purpose of the port (e.g., "nixl_side_channel_port")
62-
block_info: dict = field(default_factory=dict) # Optional block allocation info
63-
64-
def to_etcd_value(self) -> dict:
65-
"""Convert to dictionary for ETCD storage"""
66-
value = {
67-
"worker_id": self.worker_id,
68-
"reason": self.reason,
69-
"reserved_at": time.time(),
70-
"pid": os.getpid(),
71-
}
72-
if self.block_info:
73-
value.update(self.block_info)
74-
return value
7546

7647

7748
@dataclass
7849
class PortAllocationRequest:
7950
"""Parameters for port allocation"""
8051

81-
etcd_context: EtcdContext
8252
metadata: PortMetadata
8353
port_range: DynamoPortRange
8454
block_size: int = 1
85-
max_attempts: int = 100
86-
87-
88-
@contextmanager
89-
def hold_ports(ports: int | list[int]):
90-
"""Context manager to hold port binding(s).
91-
92-
Holds socket bindings to ensure exclusive access to ports during reservation.
93-
Can handle a single port or multiple ports.
94-
95-
Args:
96-
ports: Single port number or list of port numbers to hold
97-
"""
98-
if isinstance(ports, int):
99-
ports = [ports]
100-
101-
sockets = []
102-
try:
103-
for port in ports:
104-
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
105-
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
106-
sock.bind(("", port))
107-
sockets.append(sock)
10855

109-
yield
110-
111-
finally:
112-
for sock in sockets:
113-
sock.close()
56+
def __post_init__(self):
57+
if self.block_size < 1:
58+
raise ValueError("block_size must be >= 1")
59+
range_len = self.port_range.max - self.port_range.min + 1
60+
if self.block_size > range_len:
61+
raise ValueError(
62+
f"block_size {self.block_size} exceeds range length {range_len} "
63+
f"({self.port_range.min}-{self.port_range.max})"
64+
)
11465

11566

11667
def check_port_available(port: int) -> bool:
@@ -123,140 +74,63 @@ def check_port_available(port: int) -> bool:
12374
return False
12475

12576

126-
async def reserve_port_in_etcd(
127-
etcd_context: EtcdContext,
128-
port: int,
129-
metadata: PortMetadata,
130-
) -> None:
131-
"""Reserve a single port in ETCD."""
132-
key = etcd_context.make_port_key(port)
133-
value = metadata.to_etcd_value()
134-
135-
await etcd_context.client.kv_create(
136-
key=key,
137-
value=json.dumps(value).encode(),
138-
lease_id=etcd_context.client.primary_lease_id(),
139-
)
140-
141-
142-
async def allocate_and_reserve_port_block(request: PortAllocationRequest) -> list[int]:
77+
async def allocate_and_reserve_port_block(
78+
runtime: DistributedRuntime, namespace: str, request: PortAllocationRequest
79+
) -> list[int]:
14380
"""
144-
Allocate a contiguous block of ports from the specified range and atomically reserve them in ETCD.
81+
Allocate a contiguous block of ports from the specified range and atomically reserve them.
14582
Returns a list of all allocated ports in order.
14683
147-
This function uses a context manager to hold port bindings while reserving in ETCD,
148-
preventing race conditions between multiple processes.
149-
15084
Args:
15185
request: PortAllocationRequest containing all allocation parameters
15286
15387
Returns:
15488
list[int]: List of all allocated ports in ascending order
155-
156-
Raises:
157-
RuntimeError: If unable to reserve a port block within max_attempts
158-
OSError: If unable to create sockets (system resource issues)
15989
"""
16090
# Create a list of valid starting ports (must have room for the entire block)
161-
max_start_port = request.port_range.max - request.block_size + 1
162-
if max_start_port < request.port_range.min:
163-
raise ValueError(
164-
f"Port range {request.port_range.min}-{request.port_range.max} is too small for block size {request.block_size}"
165-
)
166-
167-
available_start_ports = list(range(request.port_range.min, max_start_port + 1))
168-
random.shuffle(available_start_ports)
169-
170-
actual_max_attempts = min(len(available_start_ports), request.max_attempts)
17191

172-
for attempt in range(1, actual_max_attempts + 1):
173-
start_port = available_start_ports[attempt - 1]
174-
ports_to_reserve = list(range(start_port, start_port + request.block_size))
175-
176-
try:
177-
# Try to bind to all ports in the block atomically
178-
with hold_ports(ports_to_reserve):
179-
logger.debug(
180-
f"Successfully bound to ports {ports_to_reserve}, now reserving in ETCD"
181-
)
182-
183-
# We have exclusive access to these ports, now reserve them in ETCD
184-
for i, port in enumerate(ports_to_reserve):
185-
port_metadata = PortMetadata(
186-
worker_id=f"{request.metadata.worker_id}-{i}"
187-
if request.block_size > 1
188-
else request.metadata.worker_id,
189-
reason=request.metadata.reason,
190-
block_info={
191-
"block_index": i,
192-
"block_size": request.block_size,
193-
"block_start": start_port,
194-
}
195-
if request.block_size > 1
196-
else {},
197-
)
198-
199-
await reserve_port_in_etcd(
200-
etcd_context=request.etcd_context,
201-
port=port,
202-
metadata=port_metadata,
203-
)
204-
205-
logger.debug(
206-
f"Reserved port block {ports_to_reserve} from range {request.port_range.min}-{request.port_range.max} "
207-
f"for {request.metadata.worker_id} (block_size={request.block_size})"
208-
)
209-
return ports_to_reserve
210-
211-
except OSError as e:
212-
logger.debug(
213-
f"Failed to bind to port block starting at {start_port} (attempt {attempt}): {e}"
214-
)
215-
except Exception as e:
216-
logger.debug(
217-
f"Failed to reserve port block starting at {start_port} in ETCD (attempt {attempt}): {e}"
218-
)
219-
220-
if attempt < actual_max_attempts:
221-
await asyncio.sleep(0.01)
222-
223-
raise RuntimeError(
224-
f"Failed to allocate and reserve a port block of size {request.block_size} from range "
225-
f"{request.port_range.min}-{request.port_range.max} after {actual_max_attempts} attempts"
92+
context_json = {
93+
"worker_id": str(request.metadata.worker_id),
94+
"reason": request.metadata.reason,
95+
"reserved_at": time.time(),
96+
"pid": os.getpid(),
97+
"block_size": request.block_size,
98+
}
99+
100+
return await runtime.allocate_port_block(
101+
namespace,
102+
request.port_range.min,
103+
request.port_range.max,
104+
request.block_size,
105+
json.dumps(context_json),
226106
)
227107

228108

229109
async def allocate_and_reserve_port(
230-
etcd_context: EtcdContext,
110+
runtime: DistributedRuntime,
111+
namespace: str,
231112
metadata: PortMetadata,
232113
port_range: DynamoPortRange,
233-
max_attempts: int = 100,
234114
) -> int:
235115
"""
236-
Allocate a port from the specified range and atomically reserve it in ETCD.
116+
Allocate a port from the specified range and atomically reserve it.
237117
This is a convenience wrapper around allocate_and_reserve_port_block with block_size=1.
238118
239119
Args:
240-
etcd_context: ETCD context for operations
241-
metadata: Port metadata for ETCD storage
120+
metadata: Port metadata / context
242121
port_range: DynamoPortRange object specifying min and max ports to try
243-
max_attempts: Maximum number of ports to try (default: 100)
244122
245123
Returns:
246124
int: The allocated port number
247-
248-
Raises:
249-
RuntimeError: If unable to reserve a port within max_attempts
250-
OSError: If unable to create sockets (system resource issues)
251125
"""
252126
request = PortAllocationRequest(
253-
etcd_context=etcd_context,
254127
metadata=metadata,
255128
port_range=port_range,
256129
block_size=1,
257-
max_attempts=max_attempts,
258130
)
259-
allocated_ports = await allocate_and_reserve_port_block(request)
131+
allocated_ports = await allocate_and_reserve_port_block(runtime, namespace, request)
132+
if not allocated_ports:
133+
raise RuntimeError("Failed to allocate required ports")
260134
return allocated_ports[0] # Return the single allocated port
261135

262136

0 commit comments

Comments
 (0)