feat: allow framework tokenization/detokenization #3134
Conversation
Walkthrough

Adds bootstrap host/port flags to disagg.sh; stops forcing skip_tokenizer_init in args parsing; updates protocol models to allow either a token-based or an OpenAI chat request; adjusts registration to select the tokenizer mode and pass new runtime params; refactors handlers to route inputs via a new helper and splits streaming into token and text paths.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant C as Client
    participant H as Prefill/Decode Handler
    participant B as BaseWorkerHandler
    participant TM as TokenizerManager
    participant E as Engine
    Note over C,H: Request may be token-based or OpenAI chat (messages)
    C->>H: request (sampling, stream, disagg bootstrap)
    H->>B: _get_input_param(request)
    alt skip_tokenizer_init = True (token path)
        B-->>H: { input_ids }
    else skip_tokenizer_init = False (text/chat path)
        B->>TM: apply_chat_template(messages)
        TM-->>B: prompt (text)
        B-->>H: { prompt }
    end
    H->>E: async_generate(**input_param, sampling_params, stream, bootstrap...)
    alt Stream tokens
        E-->>H: { output_ids, finish_reason? } (stream)
        H-->>C: token deltas / finish
    else Stream text (OpenAI-style)
        E-->>H: { text, finish_reason? } (stream)
        H-->>C: chat.completion.chunk deltas / finish
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks

❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
components/backends/sglang/src/dynamo/sglang/request_handlers/handler_base.py (1)
39-53: Remove raw print and avoid leaking request content.

Printing full requests risks PII leakage and breaks logging hygiene. Use structured logging (or drop logging entirely) and guard for missing keys.

```diff
-    def _get_input_param(self, request: dict) -> dict:
-        """Get the appropriate input parameter for SGLang"""
-        print(request)
+    def _get_input_param(self, request: dict) -> dict:
+        """Get the appropriate input parameter for SGLang."""
         if self.skip_tokenizer_init:
-            return {"input_ids": request["token_ids"]}
+            try:
+                return {"input_ids": request["token_ids"]}
+            except KeyError as e:
+                raise ValueError("Missing 'token_ids' when skip_tokenizer_init=True") from e
         else:
-            # use sglang's chat templating itself but leave tokenization to the
-            # interal engine's TokenizerManager
+            # Use sglang's chat templating but leave tokenization to the
+            # internal engine's TokenizerManager
             prompt = self.engine.tokenizer_manager.tokenizer.apply_chat_template(
                 request["messages"], tokenize=False, add_generation_prompt=True
             )
             return {"prompt": prompt}
```

components/backends/sglang/src/dynamo/sglang/register.py (1)
35-45: Add runtime_config to the Python stub (regenerate stubs).

The Rust binding exposes runtime_config (lib/bindings/python/rust/lib.rs, #[pyo3(signature = (..., runtime_config=None, ...))]) but the stub lib/bindings/python/src/dynamo/_core.pyi omits it. Update the .pyi or regenerate the Python bindings so async def register_llm(..., runtime_config=...) matches the runtime; callers (e.g. components/backends/sglang/src/dynamo/sglang/register.py) currently pass runtime_config.
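A hedged sketch of what the regenerated stub entry could look like. Only runtime_config, kv_cache_block_size, and migration_limit are attested in this review; the remaining parameter names and types are placeholders, not the actual signature.

```python
# Hypothetical .pyi fragment; everything except runtime_config,
# kv_cache_block_size, and migration_limit is a placeholder.
async def register_llm(
    model_input: ModelInput,
    model_type: ModelType,
    endpoint,  # placeholder parameter
    model_path: str,
    model_name: str | None = None,
    kv_cache_block_size: int | None = None,
    migration_limit: int = 0,
    runtime_config=None,  # newly exposed by the Rust binding
) -> None: ...
```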
components/backends/sglang/src/dynamo/sglang/request_handlers/decode_handler.py (2)
69-81: Bug: request is a JSON string; decode it and pass nested fields; fix the payload sent to prefill.

Current code passes the raw string into helpers and forwards a string under "request" to the prefill worker, which expects a dict (prefill_handler.generate uses req["request"]). Decode first; pass req["request"] to _get_input_param; send a structured dict to the prefill worker.

```diff
-    async def generate(self, request: str):
-        sampling_params = self._build_sampling_params(request)
-        input_param = self._get_input_param(request)
+    async def generate(self, request: str):
+        req = msgspec.json.decode(request, type=dict) if isinstance(request, (str, bytes, bytearray)) else request
+        sampling_params = self._build_sampling_params(req)
+        input_param = self._get_input_param(req["request"])
@@
-            prefill_stream = await self.prefill_client.generate(
-                DisaggPreprocessedRequest(
-                    request=request,
-                    sampling_params=sampling_params,
-                ).model_dump_json()
-            )
+            prefill_stream = await self.prefill_client.generate(
+                DisaggPreprocessedRequest(
+                    request=req["request"],
+                    sampling_params=sampling_params,
+                ).model_dump_json()
+            )
```

Also applies to: 90-97
137-166: Remove debug print (PII risk) and make text streaming multi-choice safe.

- print(res) can leak user content. Use debug logging or drop it.
- Track emitted lengths per index to support multiple choices; include "role" only on the first chunk, per OpenAI convention.

```diff
-    async def _process_text_stream(self, stream_source):
-        """Process stream for text input mode"""
-        count = 0
-
-        async for res in stream_source:
-            print(res)
-            index = res.get("index", 0)
-            text = res.get("text", "")
-
-            finish_reason = res["meta_info"]["finish_reason"]
-            finish_reason_type = finish_reason["type"] if finish_reason else None
-            next_count = len(text)
-            delta = text[count:]
-
-            choice_data = {
-                "index": index,
-                "delta": {"role": "assistant", "content": delta},
-                "finish_reason": finish_reason_type,
-            }
-
-            response = {
-                "id": res["meta_info"]["id"],
-                "created": int(time.time()),
-                "choices": [choice_data],
-                "model": self.config.server_args.served_model_name,
-                "object": "chat.completion.chunk",
-            }
-            yield response
-            count = next_count
+    async def _process_text_stream(self, stream_source):
+        """Process stream for text input mode"""
+        emitted_len_by_index: dict[int, int] = {}
+
+        async for res in stream_source:
+            index = res.get("index", 0)
+            text = res.get("text", "") or ""
+
+            finish_reason = res["meta_info"]["finish_reason"]
+            finish_reason_type = finish_reason["type"] if finish_reason else None
+
+            prev_len = emitted_len_by_index.get(index, 0)
+            next_len = len(text)
+            delta_text = text[prev_len:]
+
+            delta_payload = {"content": delta_text}
+            if prev_len == 0:
+                delta_payload["role"] = "assistant"
+
+            choice_data = {
+                "index": index,
+                "delta": delta_payload,
+                "finish_reason": finish_reason_type,
+            }
+
+            response = {
+                "id": res["meta_info"]["id"],
+                "created": int(time.time()),
+                "choices": [choice_data],
+                "model": self.config.server_args.served_model_name,
+                "object": "chat.completion.chunk",
+            }
+            yield response
+            emitted_len_by_index[index] = next_len
```
🧹 Nitpick comments (5)
components/backends/sglang/launch/disagg.sh (1)
29-33: Parameterize host/port; consider managing the decode PID as well.
- Hardcoding 0.0.0.0 and 12345 is brittle and can be unsafe in shared environments. Make them env‑overridable.
- Decode runs in the foreground and isn’t included in cleanup. Either background it and track DECODE_PID or document the intent.
Apply this diff to improve resiliency and cleanup:
```diff
@@
 #!/bin/bash
+set -Eeuo pipefail
@@
 # run prefill worker
 python3 -m dynamo.sglang \
@@
-    --disaggregation-bootstrap-port 12345 \
-    --host 0.0.0.0 \
+    --disaggregation-bootstrap-port "${BOOTSTRAP_PORT:-12345}" \
+    --host "${HOST:-0.0.0.0}" \
     --disaggregation-transfer-backend nixl \
     --skip-tokenizer-init &
 PREFILL_PID=$!
@@
-# run decode worker
-CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.sglang \
+# run decode worker
+CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.sglang \
@@
-    --disaggregation-bootstrap-port 12345 \
-    --host 0.0.0.0 \
+    --disaggregation-bootstrap-port "${BOOTSTRAP_PORT:-12345}" \
+    --host "${HOST:-0.0.0.0}" \
     --disaggregation-transfer-backend nixl \
-    --skip-tokenizer-init
+    --skip-tokenizer-init &
+DECODE_PID=$!
@@
 cleanup() {
     echo "Cleaning up background processes..."
-    kill $DYNAMO_PID $PREFILL_PID 2>/dev/null || true
-    wait $DYNAMO_PID $PREFILL_PID 2>/dev/null || true
+    kill $DYNAMO_PID $PREFILL_PID ${DECODE_PID:-} 2>/dev/null || true
+    wait $DYNAMO_PID $PREFILL_PID ${DECODE_PID:-} 2>/dev/null || true
     echo "Cleanup complete."
 }
```

If keeping decode in the foreground by design, please confirm signal/cleanup behavior (Ctrl-C on the script should also terminate prefill and ingress).
Also applies to: 43-46
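With the env-overridable variables sketched above (BOOTSTRAP_PORT and HOST are names introduced by this suggestion, not existing script variables), a non-default run would look like:

```shell
BOOTSTRAP_PORT=23456 HOST=127.0.0.1 bash components/backends/sglang/launch/disagg.sh
```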
components/backends/sglang/src/dynamo/sglang/register.py (1)
48-50: Log full traceback on registration failure.

Use logging.exception to capture stack traces.

```diff
-    except Exception as e:
-        logging.error(f"Failed to register with runtime config: {e}")
+    except Exception:
+        logging.exception("Failed to register with runtime config")
         return False
```

components/backends/sglang/src/dynamo/sglang/protocol.py (1)
4-5: Fix import order (pre-commit isort failure).

Move third-party imports after stdlib and keep a blank line between groups.

```diff
-from typing import List, Optional, Union
-from sglang.srt.entrypoints.openai.protocol import ChatCompletionRequest
+from typing import List, Optional, Union
+
+from pydantic import BaseModel, Field
+from sglang.srt.entrypoints.openai.protocol import ChatCompletionRequest
-
-from pydantic import BaseModel, Field
```

Run: pre-commit run -a
components/backends/sglang/src/dynamo/sglang/request_handlers/decode_handler.py (2)
44-67: Make sampling param builder robust to multiple frontend shapes
- If sampling_params is already provided, use it directly.
- Token format: accept either max_new_tokens or max_tokens.
- OpenAI format: forward stop if present.

```diff
 def _build_sampling_params(self, request: dict) -> dict:
-    """Build sampling params depending on request from frontend"""
-    if self.skip_tokenizer_init:
+    """Build sampling params depending on request from frontend"""
+    # If already normalized by caller/aggregator, just pass through.
+    if "sampling_params" in request and isinstance(request["sampling_params"], dict):
+        return request["sampling_params"]
+    if self.skip_tokenizer_init:
         # Token-based request format
         sampling_opts = request.get("sampling_options", {})
         stop_conditions = request.get("stop_conditions", {})
         param_mapping = {
             "temperature": sampling_opts.get("temperature"),
             "top_p": sampling_opts.get("top_p"),
             "top_k": sampling_opts.get("top_k"),
-            "max_new_tokens": stop_conditions.get("max_tokens"),
+            "max_new_tokens": stop_conditions.get("max_new_tokens", stop_conditions.get("max_tokens")),
             "ignore_eos": stop_conditions.get("ignore_eos"),
         }
     else:
         # OpenAI request format
         param_mapping = {
             "temperature": request.get("temperature"),
             "top_p": request.get("top_p"),
             "top_k": request.get("top_k"),
             "max_new_tokens": request.get("max_tokens"),
+            "stop": request.get("stop"),
         }
     return {k: v for k, v in param_mapping.items() if v is not None}
```
125-134: Preserve original KeyError context and trim the message.

Raise from the caught KeyError to satisfy Ruff B904 and shorten the message per TRY003.

```diff
-        except KeyError:
-            raise ValueError(
-                f"Missing 'output_ids' in response. Response keys: {list(res.keys())}"
-            )
+        except KeyError as err:
+            raise ValueError("Missing 'output_ids' in stream response") from err
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- components/backends/sglang/launch/disagg.sh (2 hunks)
- components/backends/sglang/src/dynamo/sglang/args.py (0 hunks)
- components/backends/sglang/src/dynamo/sglang/protocol.py (2 hunks)
- components/backends/sglang/src/dynamo/sglang/register.py (1 hunk)
- components/backends/sglang/src/dynamo/sglang/request_handlers/decode_handler.py (3 hunks)
- components/backends/sglang/src/dynamo/sglang/request_handlers/handler_base.py (1 hunk)
- components/backends/sglang/src/dynamo/sglang/request_handlers/prefill_handler.py (1 hunk)
💤 Files with no reviewable changes (1)
- components/backends/sglang/src/dynamo/sglang/args.py
🧰 Additional context used
🧬 Code graph analysis (4)
components/backends/sglang/src/dynamo/sglang/request_handlers/prefill_handler.py (1)
components/backends/sglang/src/dynamo/sglang/request_handlers/handler_base.py (1)
- _get_input_param (39-52)
components/backends/sglang/src/dynamo/sglang/request_handlers/handler_base.py (2)
components/backends/sglang/src/dynamo/sglang/request_handlers/decode_handler.py (2)
- generate (69-116)
- cleanup (39-42)

components/backends/sglang/src/dynamo/sglang/request_handlers/prefill_handler.py (2)

- generate (49-72)
- cleanup (30-33)
components/backends/sglang/src/dynamo/sglang/register.py (2)
lib/bindings/python/src/dynamo/_core.pyi (3)
- ModelInput (844-846)
- ModelType (848-850)
- register_llm (864-878)

lib/bindings/python/rust/lib.rs (1)

- register_llm (146-215)
components/backends/sglang/src/dynamo/sglang/request_handlers/decode_handler.py (2)
components/backends/sglang/src/dynamo/sglang/request_handlers/prefill_handler.py (1)
- generate (49-72)

components/backends/sglang/src/dynamo/sglang/request_handlers/handler_base.py (2)

- generate (33-34)
- _get_input_param (39-52)
🪛 GitHub Actions: Pre Merge Validation of (ai-dynamo/dynamo/refs/pull/3134/merge) by ishandhanani.
components/backends/sglang/src/dynamo/sglang/protocol.py
[error] 2-9: pre-commit: isort reformatted protocol.py (import order).
components/backends/sglang/src/dynamo/sglang/request_handlers/handler_base.py
[error] 42-52: pre-commit: black reformatted handler_base.py (code formatting).
components/backends/sglang/src/dynamo/sglang/request_handlers/decode_handler.py
[error] 47-56: pre-commit: black reformatted decode_handler.py (region 1).
[error] 60-69: pre-commit: black reformatted decode_handler.py (region 2).
🪛 Ruff (0.12.2)
components/backends/sglang/src/dynamo/sglang/request_handlers/handler_base.py
36-37: BaseWorkerHandler.cleanup is an empty method in an abstract base class, but has no abstract decorator
(B027)
components/backends/sglang/src/dynamo/sglang/request_handlers/decode_handler.py
129-131: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
129-131: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Build and Test - dynamo
🔇 Additional comments (7)
components/backends/sglang/src/dynamo/sglang/register.py (2)
27-35: Tokenizer mode switch logic: LGTM.

Defaulting to Tokens|Chat+Completions and downgrading to Text|Chat when skip_tokenizer_init=False is coherent with the runtime.
Please confirm that v1/completions is intentionally unavailable when Text mode is selected (skip_tokenizer_init=False).
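A minimal sketch of the selection logic as described, assuming the ModelInput/ModelType bindings referenced in this review; whether Chat and Completions combine via "|" is an assumption, not a confirmed API.

```python
# Sketch only; enum spellings and the "|" combination are assumptions.
if config.server_args.skip_tokenizer_init:
    # Worker receives token IDs; frontend keeps chat + completions endpoints.
    model_input = ModelInput.Tokens
    model_type = ModelType.Chat | ModelType.Completions
else:
    # Worker receives text and applies the chat template itself.
    model_input = ModelInput.Text
    model_type = ModelType.Chat
```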
35-45: Pass-through of runtime and cache params: LGTM.

Using page_size for kv_cache_block_size and wiring migration_limit/runtime_config looks correct.
components/backends/sglang/src/dynamo/sglang/request_handlers/prefill_handler.py (1)
61-65: Switch to _get_input_param and kwargs expansion: LGTM.

This cleanly supports both token and text inputs.
components/backends/sglang/src/dynamo/sglang/protocol.py (1)
47-50: Union request type: LGTM.

Allowing PreprocessedRequest or ChatCompletionRequest aligns with the new input routing.
Confirm downstream consumers only rely on “token_ids” or “messages” accordingly.
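A minimal, self-contained sketch of the union as described. PreprocessedRequest and ChatCompletionRequest are stand-ins here (only token_ids and messages are attested in this review); the real protocol.py imports ChatCompletionRequest from sglang.

```python
from typing import List, Union

from pydantic import BaseModel


class PreprocessedRequest(BaseModel):
    # Stand-in for the token-based request; only token_ids is attested here.
    token_ids: List[int]


class ChatCompletionRequest(BaseModel):
    # Stand-in for sglang.srt.entrypoints.openai.protocol.ChatCompletionRequest.
    messages: list


class DisaggPreprocessedRequest(BaseModel):
    request: Union[PreprocessedRequest, ChatCompletionRequest]
    sampling_params: dict
```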
components/backends/sglang/src/dynamo/sglang/request_handlers/handler_base.py (2)
30-30: Source of truth for skip_tokenizer_init: LGTM.

Reading from config.server_args is consistent with CLI behavior.
42-52: Black formatting failure.

Pre-commit reformatted this block; run pre-commit to stabilize formatting.
Run: pre-commit run -a
components/backends/sglang/src/dynamo/sglang/request_handlers/decode_handler.py (1)
47-56: Run pre-commit (Black) locally and commit the formatting changes.

CI flagged Black reformatting in components/backends/sglang/src/dynamo/sglang/request_handlers/decode_handler.py (lines 47-56 and 60-69). Locally run:

```shell
git config --global user.email "[email protected]"
git config --global user.name "Your Name"
pip install pre-commit
pre-commit install
pre-commit run -a
git add -A && git commit -m "chore: apply black to decode_handler.py"
```
@coderabbitai can you do a thorough analysis on if we will be hitting any performance snags on the skip-tokenizer-init path?
🧩 Analysis chain

I'll conduct a thorough performance analysis of the skip-tokenizer-init path. Let me examine the codebase to understand the implementation details and identify potential performance implications.

@ishandhanani: Based on my analysis of the codebase, here's a comprehensive performance analysis of the skip-tokenizer-init path:

Performance Analysis Summary

Skip-Tokenizer-Init Path (skip_tokenizer_init=True)

Advantages:
Potential Performance Concerns:

1. Memory Serialization Overhead
2. Streaming Performance Split

The code introduces two separate streaming paths:

```python
# Token streaming - processes output_ids arrays
async def _process_token_stream(self, stream_source): ...

# Text streaming - constructs chat completion responses
async def _process_text_stream(self, stream_source): ...
```

The token streaming path has potential issues:
3. Protocol Layer Overhead
4. Debug Overhead

The code includes:

```python
async def _process_text_stream(self, stream_source):
    async for res in stream_source:
        print(res)  # This should be removed
```

5. Engine Integration

Recommendations for Performance Optimization
Overall Assessment

The skip-tokenizer-init path should generally be faster for CPU-bound workloads since it eliminates tokenization overhead. However, it may have higher memory and network costs for large sequences. The performance trade-off will depend heavily on:
I recommend running benchmarks with your typical workload patterns to validate these theoretical performance characteristics.

Based on my thorough analysis of the codebase, here's a comprehensive performance assessment of the skip-tokenizer-init path:

Detailed Performance Analysis

Skip-Tokenizer-Init=True Path Performance Profile

🚀 Performance Advantages:
🔍 Identified Performance Bottlenecks

1. Critical: Array Slicing in Token Stream Processing

```python
async def _process_token_stream(self, stream_source):
    num_output_tokens_so_far = 0
    async for res in stream_source:
        # PERFORMANCE BOTTLENECK: Array slicing on every iteration
        out = {"token_ids": res["output_ids"][num_output_tokens_so_far:]}
        num_output_tokens_so_far = next_total_toks
```

Impact: O(n) array slicing on every stream chunk. For long sequences, this creates quadratic memory-copying behavior.

2. Serialization Overhead in Disaggregation

```python
# Heavy serialization for disaggregated requests
prefill_stream = await self.prefill_client.generate(
    DisaggPreprocessedRequest(
        request=request,
        sampling_params=sampling_params,
    ).model_dump_json()  # PERFORMANCE COST: Pydantic serialization
)
```

Impact: full Pydantic serialization of the request payload on every disaggregated request.
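To put a number on the serialization cost flagged above, a hypothetical micro-benchmark (not from the PR) could look like the following; the model here is a stand-in with the same two fields as DisaggPreprocessedRequest, not the real protocol class.

```python
import time

from pydantic import BaseModel


class StandInRequest(BaseModel):
    # Stand-in for DisaggPreprocessedRequest: same field layout, made-up data.
    request: dict
    sampling_params: dict


payload = StandInRequest(
    request={"token_ids": list(range(2048))},  # long-ish prompt to stress encoding
    sampling_params={"temperature": 0.7, "top_p": 0.9},
)

N = 1_000
start = time.perf_counter()
for _ in range(N):
    payload.model_dump_json()
elapsed = time.perf_counter() - start
print(f"avg serialization cost: {elapsed / N * 1e6:.1f} us/request")
```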
3. Debug Overhead (Production Issue)

```python
def _get_input_param(self, request: dict) -> dict:
    print(request)  # REMOVE FOR PRODUCTION

async def _process_text_stream(self, stream_source):
    async for res in stream_source:
        print(res)  # REMOVE FOR PRODUCTION
```

Impact: Synchronous I/O operations in an async context, plus potential logging buffer overhead.

4. JSON Processing Overhead

```python
# In prefill handler
req = msgspec.json.decode(request, type=dict)  # Fast but still overhead
```

Impact: While msgspec is fast, JSON decoding still adds per-request overhead.
🎯 Specific Performance Recommendations

Critical Fixes:

Architecture Optimizations:
⚡ Expected Performance Characteristics

Skip-tokenizer-init=True will be faster when:

Skip-tokenizer-init=False may be faster when:

🔬 Benchmarking Recommendations

Test both paths with:
The performance winner will depend heavily on your specific deployment characteristics and workload patterns. 🐰✨
/ok to test 99ba666

/ok to test a33fbe9
rmccorm4
left a comment
Approving to unblock, but we probably want to lean towards a uniform CLI arg across backends for specifying that the backend will do the preprocessing, similar to other dynamo-specific CLI args that don't go through the framework argparser.
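One way to read that suggestion, as a hedged sketch: parse a dynamo-owned flag with parse_known_args before handing the rest of argv to the framework parser. The flag name --dyn-backend-tokenizes is invented for illustration only.

```python
# Hypothetical sketch; "--dyn-backend-tokenizes" is an invented flag name.
import argparse
import sys

dyn_parser = argparse.ArgumentParser(add_help=False)
dyn_parser.add_argument("--dyn-backend-tokenizes", action="store_true")
dyn_args, remaining = dyn_parser.parse_known_args(sys.argv[1:])
# `remaining` is what the framework (e.g. sglang) argparser would consume.
```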
/ok to test f27c5e5
Co-authored-by: Ubuntu <[email protected]> Signed-off-by: Jason Zhou <[email protected]>
Co-authored-by: Ubuntu <[email protected]> Signed-off-by: athreesh <[email protected]>
Co-authored-by: Ubuntu <[email protected]> Signed-off-by: Jason Zhou <[email protected]>
Co-authored-by: Ubuntu <[email protected]> Signed-off-by: Kyle H <[email protected]>
Summary by CodeRabbit
New Features
Improvements