Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,7 @@ generated-values.yaml
TensorRT-LLM

# Local build artifacts for devcontainer
.build/
.build/

# Pytest
.coverage
195 changes: 74 additions & 121 deletions components/backends/vllm/src/dynamo/vllm/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,30 @@
# SPDX-License-Identifier: Apache-2.0


import asyncio
import json
import logging
import os
import socket
import sys
import time
from typing import Optional

from vllm.config import KVTransferConfig
from vllm.distributed.kv_events import KVEventsConfig
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.utils import FlexibleArgumentParser

from .ports import (
DEFAULT_DYNAMO_PORT_MAX,
DEFAULT_DYNAMO_PORT_MIN,
DynamoPortRange,
EtcdContext,
PortAllocationRequest,
PortMetadata,
allocate_and_reserve_port,
allocate_and_reserve_port_block,
get_host_ip,
)

logger = logging.getLogger(__name__)

# Only used if you run it manually from the command line
DEFAULT_ENDPOINT = "dyn://dynamo.backend.generate"
DEFAULT_MODEL = "Qwen/Qwen3-0.6B"

Expand All @@ -34,6 +41,7 @@ class Config:
migration_limit: int = 0
kv_port: Optional[int] = None
side_channel_port: Optional[int] = None
port_range: DynamoPortRange

# mirror vLLM
model: str
Expand Down Expand Up @@ -64,6 +72,18 @@ def parse_args() -> Config:
default=0,
help="Maximum number of times a request may be migrated to a different engine worker. The number may be overridden by the engine.",
)
parser.add_argument(
"--dynamo-port-min",
type=int,
default=DEFAULT_DYNAMO_PORT_MIN,
help=f"Minimum port number for Dynamo services (default: {DEFAULT_DYNAMO_PORT_MIN}). Must be in registered ports range (1024-49151).",
)
parser.add_argument(
"--dynamo-port-max",
type=int,
default=DEFAULT_DYNAMO_PORT_MAX,
help=f"Maximum port number for Dynamo services (default: {DEFAULT_DYNAMO_PORT_MAX}). Must be in registered ports range (1024-49151).",
)

parser = AsyncEngineArgs.add_cli_args(parser)
args = parser.parse_args()
Expand Down Expand Up @@ -110,6 +130,9 @@ def parse_args() -> Config:
config.engine_args = engine_args
config.is_prefill_worker = args.is_prefill_worker
config.migration_limit = args.migration_limit
config.port_range = DynamoPortRange(
min=args.dynamo_port_min, max=args.dynamo_port_max
)

if config.engine_args.block_size is None:
config.engine_args.block_size = 16
Expand All @@ -120,106 +143,66 @@ def parse_args() -> Config:
return config


async def allocate_and_reserve_port(
namespace,
etcd_client,
worker_id: str,
reason: str,
max_attempts: int = 100,
) -> int:
"""
Get an OS-assigned port and atomically reserve it in ETCD.
Retries until successful or max_attempts reached.

Args:
max_attempts: Maximum number of ports to try (default: 100)

Raises:
RuntimeError: If unable to reserve a port within max_attempts
OSError: If unable to create sockets (system resource issues)
"""

node_name = socket.gethostname()
try:
node_ip = socket.gethostbyname(node_name)
except socket.gaierror:
# If hostname cannot be resolved, fall back to localhost
logger.warning(
f"Hostname '{node_name}' cannot be resolved, falling back to '127.0.0.1'"
)
node_ip = "127.0.0.1"

for attempt in range(1, max_attempts + 1):
# Hold socket open just long enough to reserve in ETCD
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("", 0))
port = sock.getsockname()[1]

# Reserve in ETCD while holding the socket
key = f"dyn://{namespace}/ports/{node_ip}/{port}"
value = {
"worker_id": worker_id,
"reason": reason,
"reserved_at": time.time(),
"pid": os.getpid(),
}

try:
await etcd_client.kv_create(
key=key,
value=json.dumps(value).encode(),
lease_id=etcd_client.primary_lease_id(),
)
logger.debug(f"Reserved OS-assigned port {port} for {worker_id}")
return port

except Exception as e:
logger.debug(
f"Port {port} on {node_name} was already reserved (attempt {attempt}): {e}"
)

if attempt < max_attempts:
await asyncio.sleep(0.01)

raise RuntimeError(
f"Failed to allocate and reserve a port after {max_attempts} attempts"
)


async def configure_ports_with_etcd(config: Config, etcd_client):
"""Configure all settings that require ETCD, including port allocation and vLLM overrides."""

# First, allocate ports
etcd_context = EtcdContext(client=etcd_client, namespace=config.namespace)

dp_rank = config.engine_args.data_parallel_rank or 0
worker_id = f"vllm-{config.component}-dp{dp_rank}"

# Allocate KV events port
kv_port = await allocate_and_reserve_port(
namespace=config.namespace,
etcd_client=etcd_client,
worker_id=f"{worker_id}",
reason="zmq_kv_event_port",
if config.engine_args.enable_prefix_caching:
kv_metadata = PortMetadata(worker_id=worker_id, reason="zmq_kv_event_port")
kv_port = await allocate_and_reserve_port(
etcd_context=etcd_context,
metadata=kv_metadata,
port_range=config.port_range,
)
config.kv_port = kv_port
logger.info(f"Allocated ZMQ KV events port: {kv_port} (worker_id={worker_id})")

# Allocate side channel ports
# https://github.com/vllm-project/vllm/blob/releases/v0.10.0/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py#L372
# NIXL calculates ports as: base_port + (dp_rank * tp_size) + tp_rank
# For dp_rank, we need to reserve tp_size consecutive ports
tp_size = config.engine_args.tensor_parallel_size or 1

# The first port for this dp_rank will be at: base_port + (dp_rank * tp_size)
# We need to allocate tp_size consecutive ports starting from there
nixl_metadata = PortMetadata(worker_id=worker_id, reason="nixl_side_channel_port")
nixl_request = PortAllocationRequest(
etcd_context=etcd_context,
metadata=nixl_metadata,
port_range=config.port_range,
block_size=tp_size,
)
allocated_ports = await allocate_and_reserve_port_block(nixl_request)
first_port_for_dp_rank = allocated_ports[0]

# Calculate the base port that NIXL expects
# base_port = first_port_for_dp_rank - (dp_rank * tp_size)
nixl_offset = dp_rank * tp_size
base_side_channel_port = first_port_for_dp_rank - nixl_offset

if base_side_channel_port < 0:
raise ValueError(
f"NIXL base port calculation resulted in negative port: "
f"first_allocated_port={first_port_for_dp_rank}, offset={nixl_offset}, "
f"base_port={base_side_channel_port}. Current range: {config.port_range.min}-{config.port_range.max}. "
f"Consider using a higher port range."
)

# Allocate side channel port
side_channel_port = await allocate_and_reserve_port(
namespace=config.namespace,
etcd_client=etcd_client,
worker_id=f"{worker_id}",
reason="nixl_side_channel_port",
)
config.side_channel_port = base_side_channel_port

# Update config with allocated ports
config.kv_port = kv_port
config.side_channel_port = side_channel_port
logger.info(
f"Allocated NIXL side channel ports: base={base_side_channel_port}, "
f"allocated_ports={allocated_ports} (worker_id={worker_id}, dp_rank={dp_rank}, tp_size={tp_size})"
)


def overwrite_args(config):
"""Set vLLM defaults for Dynamo."""
assert (
config.kv_port is not None
), "Must set the kv_port, use configure_ports_with_etcd"
assert (
config.side_channel_port is not None
), "Must set the kv_port, use configure_ports_with_etcd"
Comment on lines 206 to 208
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix outdated comment referencing kv_port.

The assertion only checks side_channel_port, but the comment still mentions kv_port.

-    ), "Must set the kv_port, use configure_ports_with_etcd"
+    ), "Must set the side_channel_port, use configure_ports_with_etcd"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
assert (
config.side_channel_port is not None
), "Must set the kv_port, use configure_ports_with_etcd"
assert (
config.side_channel_port is not None
), "Must set the side_channel_port, use configure_ports_with_etcd"
🤖 Prompt for AI Agents
In components/backends/vllm/src/dynamo/vllm/args.py around lines 206 to 208,
update the assertion error message to correctly reference side_channel_port
instead of kv_port. Change the message to clearly indicate that
side_channel_port must be set and suggest using configure_ports_with_etcd if
needed, ensuring the comment matches the actual assertion condition.

Expand Down Expand Up @@ -263,36 +246,6 @@ def overwrite_args(config):
raise ValueError(f"{key} not found in AsyncEngineArgs from vLLM.")


def get_host_ip() -> str:
"""Get the IP address of the host.
This is needed for the side channel to work in multi-node deployments.
"""
try:
host_name = socket.gethostname()
except socket.error as e:
logger.warning(f"Failed to get hostname: {e}, falling back to '127.0.0.1'")
return "127.0.0.1"
else:
try:
# Get the IP address of the hostname - this is needed for the side channel to work in multi-node deployments
host_ip = socket.gethostbyname(host_name)
# Test if the IP is actually usable by binding to it
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as test_socket:
test_socket.bind((host_ip, 0))
return host_ip
except socket.gaierror as e:
logger.warning(
f"Hostname '{host_name}' cannot be resolved: {e}, falling back to '127.0.0.1'"
)
return "127.0.0.1"
except socket.error as e:
# If hostname is not usable for binding, fall back to localhost
logger.warning(
f"Hostname '{host_name}' is not usable for binding: {e}, falling back to '127.0.0.1'"
)
return "127.0.0.1"


def set_side_channel_host_and_port(config: Config):
"""vLLM V1 NixlConnector creates a side channel to exchange metadata with other NIXL connectors.
This sets the port number for the side channel.
Expand Down
Loading
Loading