From 632ee90e5280670489b9908f994fb78a8b80cfc1 Mon Sep 17 00:00:00 2001 From: Liangsheng Yin Date: Tue, 24 Feb 2026 18:42:49 -0800 Subject: [PATCH 1/5] [PD-Disagg] Fix bootstrap server race condition when prefill workers not yet registered The bootstrap server starts accepting HTTP requests before any prefill worker has registered via PUT. If a decode worker queries server info during this window, PrefillServerInfo is constructed with None fields, causing TypeError crash. Fix: track registered worker count on bootstrap server, return 503 until all workers registered; add client-side retry in ensure_parallel_info. Co-authored-by: Cursor --- .../sglang/srt/disaggregation/common/conn.py | 32 +++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/python/sglang/srt/disaggregation/common/conn.py b/python/sglang/srt/disaggregation/common/conn.py index 844c847afe5f..d8b3581754af 100644 --- a/python/sglang/srt/disaggregation/common/conn.py +++ b/python/sglang/srt/disaggregation/common/conn.py @@ -135,13 +135,26 @@ def record_failure(self, bootstrap_room: int, failure_reason: str): with self.failure_lock: self.failure_records[bootstrap_room] = failure_reason - def ensure_parallel_info(self, bootstrap_addr: str) -> bool: + def ensure_parallel_info( + self, bootstrap_addr: str, max_retries: int = 20, retry_interval: float = 1.0 + ) -> bool: """Fetch and cache prefill parallel info if not yet available. Returns True if info is available (cached or freshly fetched). + Retries with backoff if the prefill server hasn't registered yet. """ if bootstrap_addr in self.prefill_info_table: return True - info = self._fetch_prefill_server_info(bootstrap_addr) + info = None + for attempt in range(max_retries): + info = self._fetch_prefill_server_info(bootstrap_addr) + if info is not None: + break + if attempt < max_retries - 1: + logger.info( + f"Prefill server info not available from {bootstrap_addr}, " + f"retrying ({attempt + 1}/{max_retries})..." + ) + time.sleep(retry_interval) if info is None: return False @@ -573,6 +586,7 @@ def __init__(self, host: str, port: int, dp_size: int = 1): int, Dict[int, Dict[int, Dict[str, Union[str, int]]]] ] = {} self.room_to_dp_rank: Dict[int, Dict[str, Union[int, float]]] = {} + self._registered_count = 0 self.entry_cleanup_interval = ( envs.SGLANG_DISAGGREGATION_BOOTSTRAP_ENTRY_CLEANUP_INTERVAL.get() ) @@ -584,6 +598,12 @@ def __init__(self, host: str, port: int, dp_size: int = 1): def run(self): self.thread.start() + def _is_ready(self) -> bool: + if self.attn_tp_size is None or self.pp_size is None: + return False + expected = self.dp_size * self.attn_tp_size * self.pp_size + return self._registered_count >= expected + def _setup_routes(self): self.app.router.add_route("*", "/route", self._handle_route) self.app.router.add_post("/register_dp_rank", self._handle_register_dp_rank) @@ -654,8 +674,10 @@ async def _handle_route_put(self, request: web.Request): "rank_ip": rank_ip, "rank_port": rank_port, } + self._registered_count += 1 logger.debug( f"Register prefill bootstrap: DP{dp_group} TP{attn_tp_rank} PP{pp_rank} with rank_ip: {rank_ip} and rank_port: {rank_port}" + f" ({self._registered_count}/{self.dp_size * self.attn_tp_size * self.pp_size} registered)" ) return web.Response(text="OK", status=200) @@ -672,6 +694,12 @@ async def _handle_route_get(self, request: web.Request): and int(prefill_dp_rank) == -1 and int(target_pp_rank) == -1 ): + if not self._is_ready(): + return web.Response( + text=f"Prefill server not fully registered yet" + f" ({self._registered_count} workers registered).", + status=503, + ) info = PrefillServerInfo( attn_tp_size=self.attn_tp_size, dp_size=self.dp_size, From b17e28cfe2d5decddd5723bffae6503bc88d03cf Mon Sep 17 00:00:00 2001 From: Liangsheng Yin Date: Tue, 24 Feb 2026 19:08:22 -0800 Subject: [PATCH 2/5] Simplify _is_ready() to just check _registered_count > 0 Only need at least one prefill worker registered for metadata to be available. Specific worker queries have their own error handling. Co-authored-by: Cursor --- python/sglang/srt/disaggregation/common/conn.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/python/sglang/srt/disaggregation/common/conn.py b/python/sglang/srt/disaggregation/common/conn.py index d8b3581754af..9314f91b0d19 100644 --- a/python/sglang/srt/disaggregation/common/conn.py +++ b/python/sglang/srt/disaggregation/common/conn.py @@ -599,10 +599,7 @@ def run(self): self.thread.start() def _is_ready(self) -> bool: - if self.attn_tp_size is None or self.pp_size is None: - return False - expected = self.dp_size * self.attn_tp_size * self.pp_size - return self._registered_count >= expected + return self._registered_count > 0 def _setup_routes(self): self.app.router.add_route("*", "/route", self._handle_route) @@ -677,7 +674,7 @@ async def _handle_route_put(self, request: web.Request): self._registered_count += 1 logger.debug( f"Register prefill bootstrap: DP{dp_group} TP{attn_tp_rank} PP{pp_rank} with rank_ip: {rank_ip} and rank_port: {rank_port}" - f" ({self._registered_count}/{self.dp_size * self.attn_tp_size * self.pp_size} registered)" + f" ({self._registered_count} registered)" ) return web.Response(text="OK", status=200) From 6c06c978c8bf3fce2cfb8aa1e5b74bc4cf8abe3c Mon Sep 17 00:00:00 2001 From: Liangsheng Yin Date: Tue, 24 Feb 2026 19:17:38 -0800 Subject: [PATCH 3/5] Require all workers registered before serving any query Revert _is_ready() to check full worker count (dp * tp * pp). Also guard specific worker queries with _is_ready() and handle KeyError to prevent server crash on missing entries. Co-authored-by: Cursor --- .../sglang/srt/disaggregation/common/conn.py | 30 +++++++++++++++---- 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/python/sglang/srt/disaggregation/common/conn.py b/python/sglang/srt/disaggregation/common/conn.py index 9314f91b0d19..007762510208 100644 --- a/python/sglang/srt/disaggregation/common/conn.py +++ b/python/sglang/srt/disaggregation/common/conn.py @@ -599,7 +599,10 @@ def run(self): self.thread.start() def _is_ready(self) -> bool: - return self._registered_count > 0 + if self.attn_tp_size is None or self.pp_size is None: + return False + expected = self.dp_size * self.attn_tp_size * self.pp_size + return self._registered_count >= expected def _setup_routes(self): self.app.router.add_route("*", "/route", self._handle_route) @@ -672,9 +675,10 @@ async def _handle_route_put(self, request: web.Request): "rank_port": rank_port, } self._registered_count += 1 + expected = self.dp_size * self.attn_tp_size * self.pp_size logger.debug( f"Register prefill bootstrap: DP{dp_group} TP{attn_tp_rank} PP{pp_rank} with rank_ip: {rank_ip} and rank_port: {rank_port}" - f" ({self._registered_count} registered)" + f" ({self._registered_count}/{expected} registered)" ) return web.Response(text="OK", status=200) @@ -710,11 +714,25 @@ async def _handle_route_get(self, request: web.Request): ) return web.json_response(dataclasses.asdict(info), status=200) + if not self._is_ready(): + return web.Response( + text=f"Prefill server not fully registered yet" + f" ({self._registered_count} workers registered).", + status=503, + ) + # Find corresponding prefill info - async with self.lock: - bootstrap_info = self.prefill_port_table[int(prefill_dp_rank)][ - int(engine_rank) - ][int(target_pp_rank)] + try: + async with self.lock: + bootstrap_info = self.prefill_port_table[int(prefill_dp_rank)][ + int(engine_rank) + ][int(target_pp_rank)] + except KeyError: + return web.Response( + text=f"Bootstrap info not found for dp_rank={prefill_dp_rank} " + f"engine_rank={engine_rank} pp_rank={target_pp_rank}", + status=404, + ) if bootstrap_info is not None: return web.json_response(bootstrap_info, status=200) From bfcbd2cfb488a84b30885222515a4102a9436118 Mon Sep 17 00:00:00 2001 From: Liangsheng Yin Date: Tue, 24 Feb 2026 19:19:25 -0800 Subject: [PATCH 4/5] Add TODO comment for expected worker count calculation Co-authored-by: Cursor --- python/sglang/srt/disaggregation/common/conn.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/sglang/srt/disaggregation/common/conn.py b/python/sglang/srt/disaggregation/common/conn.py index 007762510208..ee36b391240d 100644 --- a/python/sglang/srt/disaggregation/common/conn.py +++ b/python/sglang/srt/disaggregation/common/conn.py @@ -601,6 +601,8 @@ def run(self): def _is_ready(self) -> bool: if self.attn_tp_size is None or self.pp_size is None: return False + # TODO: verify this expected count is correct for all parallelism + # combinations (CP / DP attention / system DP / TP / PP). expected = self.dp_size * self.attn_tp_size * self.pp_size return self._registered_count >= expected From 570113da3742b8c9dad92f80601b5a906cb02ee1 Mon Sep 17 00:00:00 2001 From: Liangsheng Yin Date: Tue, 24 Feb 2026 19:30:13 -0800 Subject: [PATCH 5/5] remove deadcode and simplify --- python/sglang/srt/disaggregation/common/conn.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/python/sglang/srt/disaggregation/common/conn.py b/python/sglang/srt/disaggregation/common/conn.py index ee36b391240d..6030d76989c1 100644 --- a/python/sglang/srt/disaggregation/common/conn.py +++ b/python/sglang/srt/disaggregation/common/conn.py @@ -736,10 +736,7 @@ async def _handle_route_get(self, request: web.Request): status=404, ) - if bootstrap_info is not None: - return web.json_response(bootstrap_info, status=200) - else: - return web.Response(text="Bootstrap info not Found", status=404) + return web.json_response(bootstrap_info, status=200) async def _handle_register_dp_rank(self, request: web.Request): data = await request.json()