Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
c461757
remote instance gloo comm
JD-ETH Jan 20, 2026
da95870
refactor design
JD-ETH Jan 23, 2026
8f54bc4
no longer global state
JD-ETH Feb 24, 2026
f5b3911
Merge upstream/main and resolve conflicts
JD-ETH Mar 7, 2026
99dbe44
Improve _sync_scheduler_infos_across_nodes: add retry logic, clean up
JD-ETH Mar 7, 2026
a67fa58
Reduce retry defaults and remove per-attempt warning
JD-ETH Mar 7, 2026
1107be1
Use 15-min TCPStore timeout instead of retry loop
JD-ETH Mar 7, 2026
99fcb85
test case pass
JD-ETH Mar 7, 2026
0a32f1b
format
JD-ETH Mar 7, 2026
d7e1093
update
JD-ETH Mar 14, 2026
fd4d45d
update
JD-ETH Mar 17, 2026
0ab2dcc
Merge branch 'feat/gloo-info-on-server-rank' of https://github.com/JD…
JD-ETH Mar 17, 2026
1af71d9
Merge branch 'main' of https://github.com/JD-ETH/sglang into feat/glo…
JD-ETH Mar 18, 2026
e134c30
Merge upstream/main and resolve http_server.py conflict
JD-ETH Mar 18, 2026
6d96652
Refactor _register_to_engine_info_bootstrap to reuse NetworkAddress
JD-ETH Mar 18, 2026
4b6bcd1
reorder server starts
JD-ETH Mar 18, 2026
eb8c886
update
JD-ETH Mar 18, 2026
f3e2ffd
bug fix
JD-ETH Mar 18, 2026
1d41b0f
update bootstrap
JD-ETH Mar 18, 2026
b5256f5
deterministic port and fastapi
JD-ETH Mar 19, 2026
5ccf4e7
refactor piping away
JD-ETH Mar 19, 2026
7600596
Merge remote-tracking branch 'upstream/main' into feat/gloo-info-on-s…
JD-ETH Mar 19, 2026
263e724
address comments
JD-ETH Mar 20, 2026
8ed1559
format
JD-ETH Mar 20, 2026
490da34
Merge branch 'feat/gloo-info-on-server-rank' of https://github.com/JD…
JD-ETH Mar 20, 2026
9797567
Merge remote-tracking branch 'upstream/main' into feat/gloo-info-on-s…
JD-ETH Mar 20, 2026
741496c
no get
JD-ETH Mar 20, 2026
824ea3b
rename endpoint
JD-ETH Mar 20, 2026
2aad7d6
optional admin
JD-ETH Mar 20, 2026
1035d69
address comments
JD-ETH Mar 20, 2026
c3dd88a
add file
JD-ETH Mar 20, 2026
9a9cedc
Merge branch 'feat/gloo-info-on-server-rank' of https://github.com/JD…
JD-ETH Mar 20, 2026
366caf9
Merge branch 'main' into feat/gloo-info-on-server-rank
ShangmingCai Mar 23, 2026
dffea01
Auto-derive engine_info_bootstrap_port from --port to avoid conflicts
JD-ETH Mar 26, 2026
6a2d8cc
Fix multi-node bootstrap port: derive from dist_init_addr not --port
JD-ETH Mar 26, 2026
ad00d26
Use NetworkAddress.parse instead of fragile string split for dist_ini…
JD-ETH Mar 26, 2026
85b03b1
Replace auto-derive engine_info_bootstrap_port with fixed default 6789
JD-ETH Mar 26, 2026
3982849
fix test
JD-ETH Mar 28, 2026
1f1b1ab
Merge branch 'main' into feat/gloo-info-on-server-rank
JD-ETH Mar 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions python/sglang/srt/entrypoints/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
import zmq

from sglang.srt.elastic_ep.expert_backup_manager import run_expert_backup_manager
from sglang.srt.entrypoints.engine_info_bootstrap_server import (
EngineInfoBootstrapServer,
)
from sglang.srt.entrypoints.EngineBase import EngineBase
from sglang.srt.managers.data_parallel_controller import (
run_data_parallel_controller_process,
Expand Down Expand Up @@ -80,9 +83,6 @@
from sglang.srt.managers.template_manager import TemplateManager
from sglang.srt.managers.tokenizer_manager import TokenizerManager
from sglang.srt.managers.tokenizer_manager_multiitem_mixin import ScoreResult
from sglang.srt.model_loader.remote_instance_weight_loader_utils import (
parse_remote_instance_transfer_engine_info_from_scheduler_infos,
)
from sglang.srt.observability.trace import process_tracing_init, trace_set_thread_info
from sglang.srt.server_args import PortArgs, ServerArgs
from sglang.srt.utils import (
Expand All @@ -98,7 +98,7 @@
set_prometheus_multiproc_dir,
set_ulimit,
)
from sglang.srt.utils.network import get_zmq_socket
from sglang.srt.utils.network import get_zmq_socket, is_port_available
from sglang.srt.utils.torch_memory_saver_adapter import TorchMemorySaverAdapter
from sglang.srt.utils.watchdog import SubprocessWatchdog
from sglang.version import __version__
Expand All @@ -116,6 +116,7 @@ class SchedulerInitResult:
scheduler_infos: List[Dict[str, Any]]
wait_for_ready: Callable[[], None] = lambda: None
wait_for_completion: Callable[[], None] = lambda: None
engine_info_bootstrap_server: Optional[Any] = None


def init_tokenizer_manager(
Expand Down Expand Up @@ -201,11 +202,11 @@ def __init__(self, **kwargs):
if tokenizer_manager is not None:
tokenizer_manager._subprocess_watchdog = subprocess_watchdog
self.port_args = port_args
self.remote_instance_transfer_engine_info = (
parse_remote_instance_transfer_engine_info_from_scheduler_infos(
scheduler_init_result.scheduler_infos
# Access transfer engine info if bootstrap server is started.
if scheduler_init_result.engine_info_bootstrap_server is not None:
self.remote_instance_transfer_engine_info = (
scheduler_init_result.engine_info_bootstrap_server.transfer_engine_info
)
)

# Initialize ZMQ sockets
context = zmq.Context(2)
Expand Down Expand Up @@ -642,10 +643,30 @@ def _launch_subprocesses(
port_args = PortArgs.init_new(server_args)
logger.info(f"{server_args=}")

# Start the engine info bootstrap server if per-rank info is needed.
engine_info_bootstrap_server = None
if (
server_args.remote_instance_weight_loader_start_seed_via_transfer_engine
and server_args.node_rank == 0
):
bootstrap_port = server_args.engine_info_bootstrap_port
if not is_port_available(bootstrap_port):
raise RuntimeError(
f"engine_info_bootstrap_port {bootstrap_port} is already in use. "
f"When running multiple instances on the same node, each instance must use a "
f"different --engine-info-bootstrap-port."
)
engine_info_bootstrap_server = EngineInfoBootstrapServer(
host=server_args.host, port=bootstrap_port
)

# Launch scheduler processes
scheduler_init_result, scheduler_procs = cls._launch_scheduler_processes(
server_args, port_args, run_scheduler_process_func
)
scheduler_init_result.engine_info_bootstrap_server = (
engine_info_bootstrap_server
)

if (
server_args.enable_elastic_expert_backup
Expand Down
105 changes: 105 additions & 0 deletions python/sglang/srt/entrypoints/engine_info_bootstrap_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Copyright 2023-2024 SGLang Team
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

import logging
import threading
from typing import Dict, Optional, Tuple

import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.responses import PlainTextResponse

logger = logging.getLogger(__name__)


class EngineInfoBootstrapServer:
    """Lightweight HTTP server for per-rank model info registration.

    Runs in a daemon thread on node_rank==0. Each ModelRunner registers its
    info via HTTP PUT after model initialization. The Engine
    accesses the collected info directly in-process; external consumers can
    query via HTTP GET.

    Currently supports transfer engine memory registration info.
    """

    def __init__(self, host: str, port: int):
        """Build the FastAPI app and start serving it in a daemon thread.

        Args:
            host: Interface to bind the uvicorn server to.
            port: TCP port to listen on (caller is expected to have checked
                availability beforehand).
        """
        self.host = host
        self.port = port

        # Storage: {tp_rank: (session_id, weights_info_dict)}
        self.transfer_engine_info: Dict[int, Tuple] = {}
        # Guards transfer_engine_info: the uvicorn worker thread writes via
        # the PUT endpoint while the engine thread reads in-process.
        self.lock = threading.Lock()

        app = FastAPI()

        @app.get("/health")
        def health():
            return PlainTextResponse("OK")

        @app.put("/register_transfer_engine_info")
        def register_transfer_engine_info(data: dict):
            # Expected payload:
            # {"tp_rank": int,
            #  "transfer_engine_info": {"session_id": ..., "weights_info_dict": ...}}
            try:
                tp_rank = data["tp_rank"]
                info = data["transfer_engine_info"]
                session_id = info["session_id"]
                weights_info_dict = info["weights_info_dict"]

                with self.lock:
                    self.transfer_engine_info[tp_rank] = (
                        session_id,
                        weights_info_dict,
                    )

                logger.info(
                    f"Registered transfer engine info for tp_rank={tp_rank}, "
                    f"session_id={session_id}"
                )
                return PlainTextResponse("OK")
            except Exception as e:
                # Malformed payload (missing keys) or any other failure is
                # surfaced to the client as a 400 with the error detail.
                logger.error(f"Failed to register engine info: {e}")
                raise HTTPException(status_code=400, detail=str(e))

        @app.get("/get_transfer_engine_info")
        def get_transfer_engine_info(rank: int):
            if rank < 0:
                raise HTTPException(status_code=400, detail="Invalid rank parameter")

            with self.lock:
                info = self.transfer_engine_info.get(rank)

            if info is None:
                # Rank may simply not have registered yet; 404 lets callers poll.
                raise HTTPException(
                    status_code=404,
                    detail=f"No transfer engine info for rank {rank}",
                )

            return {"rank": rank, "remote_instance_transfer_engine_info": list(info)}

        config = uvicorn.Config(app, host=host, port=port, log_level="warning")
        self._server = uvicorn.Server(config)
        # Daemon thread so the bootstrap server never blocks process exit.
        self._thread = threading.Thread(
            target=self._server.run,
            daemon=True,
        )
        self._thread.start()
        logger.info(f"EngineInfoBootstrapServer started on {host}:{port}")

    def close(self):
        """Ask uvicorn to exit and wait briefly for the serving thread."""
        self._server.should_exit = True
        self._thread.join(timeout=5)

    def get_transfer_engine_info(self, rank: int) -> Optional[Tuple]:
        """Direct in-process access for co-located HTTP server (no HTTP round-trip)."""
        # Take the lock for a consistent read: the uvicorn thread mutates
        # transfer_engine_info concurrently via the PUT endpoint.
        with self.lock:
            return self.transfer_engine_info.get(rank)
65 changes: 30 additions & 35 deletions python/sglang/srt/entrypoints/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,6 @@
)
from sglang.srt.managers.template_manager import TemplateManager
from sglang.srt.managers.tokenizer_manager import ServerStatus, TokenizerManager
from sglang.srt.model_loader.remote_instance_weight_loader_utils import (
parse_remote_instance_transfer_engine_info_from_scheduler_infos,
)
from sglang.srt.observability.func_timer import enable_func_timer
from sglang.srt.observability.trace import (
process_tracing_init,
Expand Down Expand Up @@ -196,15 +193,6 @@ class _GlobalState:
tokenizer_manager: Union[TokenizerManager, MultiTokenizerRouter, TokenizerWorker]
template_manager: TemplateManager
scheduler_info: Dict
# Dict{
# rank: Tuple(
# session_id,
# Dict{
# name: Tuple (d_ptr, numel, element_size)
# }
# )
# }
remote_instance_transfer_engine_info: Optional[Dict] = None


_global_state: Optional[_GlobalState] = None
Expand Down Expand Up @@ -1030,26 +1018,39 @@ async def send_weights_to_remote_instance(
@app.get("/get_remote_instance_transfer_engine_info")
@auth_level(AuthLevel.ADMIN_OPTIONAL)
async def get_remote_instance_transfer_engine_info(rank: int = None):
    """Get the server information (deprecated - use /remote_instance_transfer_engine_info instead)."""
    # Thin deprecation shim: warn once per call, then delegate all argument
    # validation and lookup to the replacement endpoint.
    logger.warning(
        "Endpoint '/get_remote_instance_transfer_engine_info' is deprecated and will be removed in a future version. "
        "Please use '/remote_instance_transfer_engine_info' instead."
    )
    return await remote_instance_transfer_engine_info(rank=rank)

if (
_global_state.remote_instance_transfer_engine_info is None
or len(_global_state.remote_instance_transfer_engine_info) == 0
):
return Response(status_code=HTTPStatus.BAD_REQUEST)

@app.get("/remote_instance_transfer_engine_info")
@auth_level(AuthLevel.ADMIN_OPTIONAL)
async def remote_instance_transfer_engine_info(rank: int = None):
    """Fetch per-rank transfer engine info by proxying the bootstrap server.

    Returns the bootstrap server's JSON payload on success, or a 400 JSON
    error when the rank is missing/invalid or the lookup fails.
    """
    import asyncio  # local import: only needed to off-load the blocking call

    if rank is None or rank < 0:
        return ORJSONResponse(
            {"error": {"message": "Missing or invalid rank parameter"}},
            status_code=HTTPStatus.BAD_REQUEST,
        )

    server_args = _global_state.tokenizer_manager.server_args
    try:
        # requests.get is blocking; run it in a worker thread so the event
        # loop is not stalled for up to the 5 s timeout.
        resp = await asyncio.to_thread(
            requests.get,
            f"{server_args.engine_info_bootstrap_url}/get_transfer_engine_info",
            params={"rank": rank},
            timeout=5,
        )
        if resp.status_code == 200:
            return resp.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # Best-effort lookup: network errors and bad JSON both fall through
        # to the generic 400 response below.
        logger.warning(f"Failed to get transfer engine info for rank {rank}: {e}")

    return ORJSONResponse(
        {"error": {"message": f"Failed to get transfer engine info for rank {rank}"}},
        status_code=HTTPStatus.BAD_REQUEST,
    )


@app.post("/init_weights_update_group")
Expand Down Expand Up @@ -1993,18 +1994,12 @@ def _setup_and_run_http_server(

Called by launch_server after subprocesses have been launched.
"""
# Parse info got from the schedulers
remote_instance_transfer_engine_info = (
parse_remote_instance_transfer_engine_info_from_scheduler_infos(scheduler_infos)
)

# Set global states
set_global_state(
_GlobalState(
tokenizer_manager=tokenizer_manager,
template_manager=template_manager,
scheduler_info=scheduler_infos[0],
remote_instance_transfer_engine_info=remote_instance_transfer_engine_info,
)
)

Expand Down
16 changes: 0 additions & 16 deletions python/sglang/srt/managers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1255,19 +1255,6 @@ def get_init_info(self) -> Dict[str, Any]:
"max_req_input_len": self.max_req_input_len,
}

if self.server_args.remote_instance_weight_loader_use_transfer_engine():
(
remote_instance_transfer_engine_session_id,
remote_instance_transfer_engine_weights_info_dict,
) = self.get_remote_instance_transfer_engine_info()
result_dict.update(
{
"tp_rank": self.tp_rank,
"remote_instance_transfer_engine_session_id": remote_instance_transfer_engine_session_id,
"remote_instance_transfer_engine_weights_info_dict": remote_instance_transfer_engine_weights_info_dict,
}
)

return result_dict

def run_event_loop(self) -> None:
Expand Down Expand Up @@ -3377,9 +3364,6 @@ def update_cache_from_scheduler(
):
pass

def get_remote_instance_transfer_engine_info(self):
return self.tp_worker.get_remote_instance_transfer_engine_info()


class IdleSleeper:
"""
Expand Down
6 changes: 0 additions & 6 deletions python/sglang/srt/managers/tp_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,12 +441,6 @@ def _forward_batch_generation_dllm(
can_run_cuda_graph=can_run_cuda_graph,
)

def get_remote_instance_transfer_engine_info(self):
return (
self.model_runner.remote_instance_transfer_engine_session_id,
self.model_runner.remote_instance_transfer_engine_weight_info,
)

def forward_batch_generation(
self,
model_worker_batch: ModelWorkerBatch,
Expand Down
48 changes: 48 additions & 0 deletions python/sglang/srt/model_executor/model_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,9 +520,11 @@ def initialize(self, pre_model_load_memory: float):
and self.remote_instance_transfer_engine is not None
and self.remote_instance_transfer_engine_weight_info is None
):
# Register memory and upstream the transfer engine info to the bootstrap server
self.remote_instance_transfer_engine_weight_info = register_memory_region(
self.model, self.remote_instance_transfer_engine
)
self._register_to_engine_info_bootstrap()

# For MTP models like DeepSeek-V3 or GLM-4.5, the MTP layer(s) are used separately as draft
# models for speculative decoding. In those cases, `num_nextn_predict_layers` is used to
Expand Down Expand Up @@ -700,6 +702,52 @@ def remote_instance_init_transfer_engine(self):
local_ip, self.remote_instance_transfer_engine.get_rpc_port()
).to_host_port_str()

def _register_to_engine_info_bootstrap(self):
    """Register transfer engine info with the EngineInfoBootstrapServer via HTTP PUT.

    The bootstrap server runs on node_rank==0. For multi-node setups, the
    host is derived from dist_init_addr. For single-node, use 127.0.0.1.
    """
    import requests as http_requests

    # Multi-node: the head node (node_rank==0) hosts the bootstrap server,
    # so its host comes from the shared dist_init_addr; otherwise loopback.
    dist_addr = self.server_args.dist_init_addr
    target_host = (
        NetworkAddress.parse(dist_addr).resolved().host
        if dist_addr
        else "127.0.0.1"
    )

    bootstrap_na = NetworkAddress(
        target_host, self.server_args.engine_info_bootstrap_port
    )
    url = f"{bootstrap_na.to_url()}/register_transfer_engine_info"

    body = {
        "tp_rank": self.tp_rank,
        "transfer_engine_info": {
            "session_id": self.remote_instance_transfer_engine_session_id,
            "weights_info_dict": self.remote_instance_transfer_engine_weight_info,
        },
    }

    try:
        response = http_requests.put(url, json=body, timeout=5)
    except Exception as e:
        # Registration is best-effort: log and continue rather than abort init.
        logger.error(
            f"Failed to register transfer engine info for tp_rank={self.tp_rank}: {e}"
        )
        return

    if response.status_code != 200:
        logger.error(
            f"Failed to register transfer engine info for tp_rank={self.tp_rank}: "
            f"{response.status_code}, {response.text}"
        )
        return

    logger.info(
        f"Registered transfer engine info for tp_rank={self.tp_rank} "
        f"with bootstrap server at {bootstrap_na}"
    )

def _publish_modelexpress_metadata(self):
"""Publish TransferEngine metadata to ModelExpress server (seed mode)."""
try:
Expand Down
Loading
Loading