Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,38 @@ def _get_unique_suffix(self, len: int = 6) -> str:
"""
return "".join(random.choices(string.ascii_letters + string.digits, k=len))

def _compute_port_offset(self) -> int:
    """Compute a deterministic port offset for this replica/process.

    Priority:
        1) ``data_parallel_rank`` from ``llm_config.engine_kwargs`` when it
           is a non-negative integer (set by DPServer).
        2) Stable hash of the Serve replica tag, reduced modulo 1024
           (spreads replicas apart when only TP/PP is used).
        3) ``0`` when neither source is available.

    Note that the hash fallback is bounded to ``[0, 1024)``, so two distinct
    replica tags can still map to the same offset.

    Returns:
        A small non-negative integer to add to a base port.
    """
    # Prefer the explicit DP rank when available.
    dp_rank = self.llm_config.engine_kwargs.get("data_parallel_rank")
    if isinstance(dp_rank, int) and dp_rank >= 0:
        # int() normalizes int subclasses (e.g. bool) so the annotated
        # return type holds; the numeric value is unchanged.
        return int(dp_rank)

    # Imported locally, like the original, to keep module import cheap.
    import logging
    import zlib

    # Fall back to a stable hash of the Serve replica tag if available.
    replica_tag = None
    try:
        # Import locally to avoid import-time side effects.
        from ray import serve  # type: ignore

        replica_context = serve.get_replica_context()
        if replica_context:
            replica_tag = getattr(replica_context, "replica_tag", None)
    except Exception:
        # Best-effort fallback: never fail a setup path because the replica
        # context is unavailable, but leave a trace for debugging instead of
        # swallowing the error silently.
        logging.getLogger(__name__).debug(
            "Could not read the Serve replica tag; using port offset 0.",
            exc_info=True,
        )

    if replica_tag:
        # Keep the offset bounded to avoid large jumps from the base port.
        return zlib.adler32(replica_tag.encode("utf-8")) % 1024

    return 0

@abc.abstractmethod
def setup(self) -> None:
"""Setup the connector backend.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ def setup(self) -> None:
"""Initialize the LMCache connector backend.
This method sets up the LMCache connector by:
1. Checking if LMCache is installed.
2. Configuring the LMCache RPC port if not already set.
3. Creating a unique LMCache RPC port across replicas.
2. Configuring the LMCache RPC port name/value if not already set.
3. Creating a unique LMCache RPC port across replicas either by
appending a random suffix (default behavior for string port names),
or by adding a rank-based integer offset when a numeric base is provided.
Raises:
ImportError: If LMCache is not installed.
"""
Expand All @@ -41,21 +43,34 @@ def setup(self) -> None:
kv_connector_extra_config = self.kv_transfer_config[
LMCacheConnectorV1Backend.KV_CONNECTOR_EXTRA_CONFIG_FIELD_NAME
]
lmcache_rpc_port = (
kv_connector_extra_config.get(
LMCacheConnectorV1Backend.LMCACHE_RPC_PORT_FIELD_NAME,
LMCacheConnectorV1Backend.DEFAULT_LMCACHE_RPC_PORT_NAME,
)
+ self._get_unique_suffix()
# Determine the desired style of RPC port configuration.
# If user passes a numeric base (e.g., 50000), add a deterministic
# rank-based offset to avoid collisions across DP/TP/PP.
# Otherwise, default to string-based name + random suffix.
base_value = kv_connector_extra_config.get(
LMCacheConnectorV1Backend.LMCACHE_RPC_PORT_FIELD_NAME,
LMCacheConnectorV1Backend.DEFAULT_LMCACHE_RPC_PORT_NAME,
)
if (
LMCacheConnectorV1Backend.LMCACHE_RPC_PORT_FIELD_NAME
in kv_connector_extra_config
):

if isinstance(base_value, int):
# Numeric base; add rank-based offset and set as int
offset = self._compute_port_offset()
lmcache_rpc_port_value = int(base_value) + int(offset)
logger.info(
f"Setting unique {lmcache_rpc_port=} for current replica LMCacheConnectorV1."
f"Setting LMCache numeric rpc port base={base_value} offset={offset} value={lmcache_rpc_port_value}."
)
else:
# String name; append random suffix for uniqueness
base_str = str(base_value)
lmcache_rpc_port_value = base_str + self._get_unique_suffix()
if (
LMCacheConnectorV1Backend.LMCACHE_RPC_PORT_FIELD_NAME
in kv_connector_extra_config
):
logger.info(
f"Setting unique lmcache_rpc_port={lmcache_rpc_port_value} for current replica LMCacheConnectorV1."
)

kv_connector_extra_config[
LMCacheConnectorV1Backend.LMCACHE_RPC_PORT_FIELD_NAME
] = lmcache_rpc_port
] = lmcache_rpc_port_value
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@ def _set_side_channel_port(self):
"NIXL_SIDE_CHANNEL_PORT_BASE", vllm_utils.get_open_port()
)
)
# If dp_rank is set, we should use the
# base port + dp_rank as the side channel port
# due to a potential ray condition for getting the free ports.
dp_rank = self.llm_config.engine_kwargs.get("data_parallel_rank", 0)
port = base_port + dp_rank
# Use a deterministic rank-based offset (DP rank if set; else replica hash)
port = base_port + self._compute_port_offset()
os.environ["VLLM_NIXL_SIDE_CHANNEL_PORT"] = str(port)

def _set_side_channel_host(self):
Expand Down