Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions components/backends/vllm/src/dynamo/vllm/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
from vllm.utils import FlexibleArgumentParser

from dynamo._core import get_reasoning_parser_names, get_tool_parser_names
from dynamo.runtime import DistributedRuntime

from . import __version__
from .ports import (
DEFAULT_DYNAMO_PORT_MAX,
DEFAULT_DYNAMO_PORT_MIN,
DynamoPortRange,
EtcdContext,
PortAllocationRequest,
PortMetadata,
allocate_and_reserve_port,
Expand Down Expand Up @@ -195,10 +195,8 @@ def parse_args() -> Config:
return config


async def configure_ports_with_etcd(config: Config, etcd_client):
"""Configure all settings that require ETCD, including port allocation and vLLM overrides."""

etcd_context = EtcdContext(client=etcd_client, namespace=config.namespace)
async def configure_ports(runtime: DistributedRuntime, config: Config):
"""Configure including port allocation and vLLM overrides."""

dp_rank = config.engine_args.data_parallel_rank or 0
worker_id = f"vllm-{config.component}-dp{dp_rank}"
Expand All @@ -207,7 +205,8 @@ async def configure_ports_with_etcd(config: Config, etcd_client):
if config.engine_args.enable_prefix_caching:
kv_metadata = PortMetadata(worker_id=worker_id, reason="zmq_kv_event_port")
kv_port = await allocate_and_reserve_port(
etcd_context=etcd_context,
runtime=runtime,
namespace=config.namespace,
metadata=kv_metadata,
port_range=config.port_range,
)
Expand All @@ -230,12 +229,13 @@ async def configure_ports_with_etcd(config: Config, etcd_client):
worker_id=worker_id, reason="nixl_side_channel_port"
)
nixl_request = PortAllocationRequest(
etcd_context=etcd_context,
metadata=nixl_metadata,
port_range=config.port_range,
block_size=tp_size,
)
allocated_ports = await allocate_and_reserve_port_block(nixl_request)
allocated_ports = await allocate_and_reserve_port_block(
runtime, config.namespace, nixl_request
)
first_port_for_dp_rank = allocated_ports[0]

# Calculate the base port that NIXL expects
Expand Down Expand Up @@ -273,7 +273,7 @@ def create_kv_events_config(config: Config) -> Optional[KVEventsConfig]:
logger.info("Creating Dynamo default kv_events_config for prefix caching")
if config.kv_port is None:
raise ValueError(
"config.kv_port is not set; call configure_ports_with_etcd(...) before overwrite_args "
"config.kv_port is not set; call configure_ports(...) before overwrite_args "
"or provide --kv-event-config to supply an explicit endpoint."
)
dp_rank = config.engine_args.data_parallel_rank or 0
Expand Down
13 changes: 3 additions & 10 deletions components/backends/vllm/src/dynamo/vllm/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,7 @@
from dynamo.runtime import DistributedRuntime, dynamo_worker
from dynamo.runtime.logging import configure_dynamo_logging

from .args import (
ENABLE_LMCACHE,
Config,
configure_ports_with_etcd,
overwrite_args,
parse_args,
)
from .args import ENABLE_LMCACHE, Config, configure_ports, overwrite_args, parse_args
from .handlers import DecodeWorkerHandler, PrefillWorkerHandler
from .health_check import VllmHealthCheckPayload
from .publisher import StatLoggerFactory
Expand Down Expand Up @@ -69,8 +63,7 @@ async def graceful_shutdown(runtime):
async def worker(runtime: DistributedRuntime):
config = parse_args()

etcd_client = runtime.do_not_use_etcd_client()
await configure_ports_with_etcd(config, etcd_client)
await configure_ports(runtime, config)
overwrite_args(config)

# Set up signal handler for graceful shutdown
Expand Down Expand Up @@ -208,7 +201,7 @@ async def init(runtime: DistributedRuntime, config: Config):
config, factory
)

# TODO Hack to get data, move this to registering in ETCD
# TODO Hack to get data, move this to registering in TBD
factory.set_num_gpu_blocks_all(vllm_config.cache_config.num_gpu_blocks)
factory.set_request_total_slots_all(vllm_config.scheduler_config.max_num_seqs)
factory.init_publish()
Expand Down
200 changes: 37 additions & 163 deletions components/backends/vllm/src/dynamo/vllm/ports.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,14 @@

"""Port allocation and management utilities for Dynamo services."""

import asyncio
import json
import logging
import os
import random
import socket
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from dataclasses import dataclass

from dynamo.runtime import EtcdKvCache
from dynamo.runtime import DistributedRuntime

logger = logging.getLogger(__name__)

Expand All @@ -40,77 +37,31 @@ def __post_init__(self):
)


@dataclass
class EtcdContext:
    """Pairs an etcd client with the namespace used to build reservation keys."""

    client: EtcdKvCache  # etcd client instance
    namespace: str  # Namespace for keys (used in key prefix)

    def make_port_key(self, port: int) -> str:
        """Build the ETCD key for reserving ``port``.

        The key combines the namespace, the value returned by
        ``get_host_ip()``, and the port number.
        """
        return f"dyn://{self.namespace}/ports/{get_host_ip()}/{port}"


@dataclass
class PortMetadata:
"""Metadata to store with port reservations in ETCD"""
"""Metadata to store with port reservations"""

worker_id: str # Worker identifier (e.g., "vllm-backend-dp0")
reason: str # Purpose of the port (e.g., "nixl_side_channel_port")
block_info: dict = field(default_factory=dict) # Optional block allocation info

def to_etcd_value(self) -> dict:
"""Convert to dictionary for ETCD storage"""
value = {
"worker_id": self.worker_id,
"reason": self.reason,
"reserved_at": time.time(),
"pid": os.getpid(),
}
if self.block_info:
value.update(self.block_info)
return value


@dataclass
class PortAllocationRequest:
"""Parameters for port allocation"""

etcd_context: EtcdContext
metadata: PortMetadata
port_range: DynamoPortRange
block_size: int = 1
max_attempts: int = 100


@contextmanager
def hold_ports(ports: int | list[int]):
"""Context manager to hold port binding(s).

Holds socket bindings to ensure exclusive access to ports during reservation.
Can handle a single port or multiple ports.

Args:
ports: Single port number or list of port numbers to hold
"""
if isinstance(ports, int):
ports = [ports]

sockets = []
try:
for port in ports:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("", port))
sockets.append(sock)

yield

finally:
for sock in sockets:
sock.close()
def __post_init__(self):
if self.block_size < 1:
raise ValueError("block_size must be >= 1")
range_len = self.port_range.max - self.port_range.min + 1
if self.block_size > range_len:
raise ValueError(
f"block_size {self.block_size} exceeds range length {range_len} "
f"({self.port_range.min}-{self.port_range.max})"
)


def check_port_available(port: int) -> bool:
Expand All @@ -123,140 +74,63 @@ def check_port_available(port: int) -> bool:
return False


async def reserve_port_in_etcd(
    etcd_context: EtcdContext,
    port: int,
    metadata: PortMetadata,
) -> None:
    """Create the ETCD entry that reserves a single port.

    The key comes from ``etcd_context.make_port_key(port)`` and the value is
    the JSON-encoded result of ``metadata.to_etcd_value()``. The entry is
    created with the client's primary lease id.

    Args:
        etcd_context: ETCD context providing the client and key construction
        port: Port number to reserve
        metadata: Port metadata serialized as the stored value
    """
    client = etcd_context.client
    reservation_key = etcd_context.make_port_key(port)
    payload = json.dumps(metadata.to_etcd_value()).encode()

    await client.kv_create(
        key=reservation_key,
        value=payload,
        lease_id=client.primary_lease_id(),
    )


async def allocate_and_reserve_port_block(request: PortAllocationRequest) -> list[int]:
async def allocate_and_reserve_port_block(
runtime: DistributedRuntime, namespace: str, request: PortAllocationRequest
) -> list[int]:
"""
Allocate a contiguous block of ports from the specified range and atomically reserve them in ETCD.
Allocate a contiguous block of ports from the specified range and atomically reserve them.
Returns a list of all allocated ports in order.

This function uses a context manager to hold port bindings while reserving in ETCD,
preventing race conditions between multiple processes.

Args:
request: PortAllocationRequest containing all allocation parameters

Returns:
list[int]: List of all allocated ports in ascending order

Raises:
RuntimeError: If unable to reserve a port block within max_attempts
OSError: If unable to create sockets (system resource issues)
"""
# Create a list of valid starting ports (must have room for the entire block)
max_start_port = request.port_range.max - request.block_size + 1
if max_start_port < request.port_range.min:
raise ValueError(
f"Port range {request.port_range.min}-{request.port_range.max} is too small for block size {request.block_size}"
)

available_start_ports = list(range(request.port_range.min, max_start_port + 1))
random.shuffle(available_start_ports)

actual_max_attempts = min(len(available_start_ports), request.max_attempts)

for attempt in range(1, actual_max_attempts + 1):
start_port = available_start_ports[attempt - 1]
ports_to_reserve = list(range(start_port, start_port + request.block_size))

try:
# Try to bind to all ports in the block atomically
with hold_ports(ports_to_reserve):
logger.debug(
f"Successfully bound to ports {ports_to_reserve}, now reserving in ETCD"
)

# We have exclusive access to these ports, now reserve them in ETCD
for i, port in enumerate(ports_to_reserve):
port_metadata = PortMetadata(
worker_id=f"{request.metadata.worker_id}-{i}"
if request.block_size > 1
else request.metadata.worker_id,
reason=request.metadata.reason,
block_info={
"block_index": i,
"block_size": request.block_size,
"block_start": start_port,
}
if request.block_size > 1
else {},
)

await reserve_port_in_etcd(
etcd_context=request.etcd_context,
port=port,
metadata=port_metadata,
)

logger.debug(
f"Reserved port block {ports_to_reserve} from range {request.port_range.min}-{request.port_range.max} "
f"for {request.metadata.worker_id} (block_size={request.block_size})"
)
return ports_to_reserve

except OSError as e:
logger.debug(
f"Failed to bind to port block starting at {start_port} (attempt {attempt}): {e}"
)
except Exception as e:
logger.debug(
f"Failed to reserve port block starting at {start_port} in ETCD (attempt {attempt}): {e}"
)

if attempt < actual_max_attempts:
await asyncio.sleep(0.01)

raise RuntimeError(
f"Failed to allocate and reserve a port block of size {request.block_size} from range "
f"{request.port_range.min}-{request.port_range.max} after {actual_max_attempts} attempts"
context_json = {
"worker_id": str(request.metadata.worker_id),
"reason": request.metadata.reason,
"reserved_at": time.time(),
"pid": os.getpid(),
"block_size": request.block_size,
}

return await runtime.allocate_port_block(
namespace,
request.port_range.min,
request.port_range.max,
request.block_size,
json.dumps(context_json),
)


async def allocate_and_reserve_port(
etcd_context: EtcdContext,
runtime: DistributedRuntime,
namespace: str,
metadata: PortMetadata,
port_range: DynamoPortRange,
max_attempts: int = 100,
) -> int:
"""
Allocate a port from the specified range and atomically reserve it in ETCD.
Allocate a port from the specified range and atomically reserve it.
This is a convenience wrapper around allocate_and_reserve_port_block with block_size=1.

Args:
etcd_context: ETCD context for operations
metadata: Port metadata for ETCD storage
metadata: Port metadata / context
port_range: DynamoPortRange object specifying min and max ports to try
max_attempts: Maximum number of ports to try (default: 100)

Returns:
int: The allocated port number

Raises:
RuntimeError: If unable to reserve a port within max_attempts
OSError: If unable to create sockets (system resource issues)
"""
request = PortAllocationRequest(
etcd_context=etcd_context,
metadata=metadata,
port_range=port_range,
block_size=1,
max_attempts=max_attempts,
)
allocated_ports = await allocate_and_reserve_port_block(request)
allocated_ports = await allocate_and_reserve_port_block(runtime, namespace, request)
if not allocated_ports:
raise RuntimeError("Failed to allocate required ports")
return allocated_ports[0] # Return the single allocated port


Expand Down
Loading
Loading