diff --git a/python/sglang/srt/managers/io_struct.py b/python/sglang/srt/managers/io_struct.py index a573e672a8ae..f3e04bf7425c 100644 --- a/python/sglang/srt/managers/io_struct.py +++ b/python/sglang/srt/managers/io_struct.py @@ -139,10 +139,6 @@ class GenerateReqInput(BaseReq): input_ids: Optional[Union[List[List[int]], List[int]]] = None # The embeddings for input_ids; one can specify either text or input_ids or input_embeds. input_embeds: Optional[Union[List[List[List[float]]], List[List[float]]]] = None - # Embedding overrides to place at specific token positions. - # Runtime type: Optional[Union[PositionalEmbeds, List[Optional[PositionalEmbeds]]]] - # Typed as Any to avoid Pydantic/FastAPI schema errors (PositionalEmbeds contains torch.Tensor). - positional_embed_overrides: Any = None # The image input. It can be an image instance, file name, URL, or base64 encoded string. # Can be formatted as: # - Single image for a single request @@ -195,6 +191,10 @@ class GenerateReqInput(BaseReq): # of `CustomLogitProcessor` in python/sglang/srt/sampling/custom_logit_processor.py # Use the processor's `to_str()` method to generate the serialized string. custom_logit_processor: Optional[Union[List[Optional[str]], str]] = None + # Embedding overrides to place at specific token positions. + # Runtime type: Optional[Union[PositionalEmbeds, List[Optional[PositionalEmbeds]]]] + # Typed as Any to avoid Pydantic/FastAPI schema errors (PositionalEmbeds contains torch.Tensor). + positional_embed_overrides: Any = None # For disaggregated inference bootstrap_host: Optional[Union[List[str], str]] = None diff --git a/python/sglang/srt/managers/schedule_batch.py b/python/sglang/srt/managers/schedule_batch.py index 6f6b404757ce..5aa8bd298fc8 100755 --- a/python/sglang/srt/managers/schedule_batch.py +++ b/python/sglang/srt/managers/schedule_batch.py @@ -1298,7 +1298,14 @@ def log_time_stats(self): if self.bootstrap_room is not None else "" ) - prefix = f"Req Time Stats(rid={self.rid}{bootstrap_info}, input len={len(self.origin_input_ids)}, output len={len(self.output_ids)}, type={self.time_stats.disagg_mode_str()})" + prefix = ( + f"ReqTimeStats(" + f"rid={self.rid}{bootstrap_info}, " + f"input_len={len(self.origin_input_ids)}, " + f"cached_input_len={self.cached_tokens}, " + f"output_len={len(self.output_ids)}, " + f"type={self.time_stats.disagg_mode_str()})" + ) logger.info(f"{prefix}: {self.time_stats.convert_to_duration()}") self.has_log_time_stats = True diff --git a/python/sglang/srt/managers/scheduler.py b/python/sglang/srt/managers/scheduler.py index 22710db85e21..39917179d608 100644 --- a/python/sglang/srt/managers/scheduler.py +++ b/python/sglang/srt/managers/scheduler.py @@ -2956,10 +2956,6 @@ def run_batch( logger.info(f"Scheduler.run_batch sleep {self.forward_sleep_time}s") time.sleep(self.forward_sleep_time) - # Capture prefill start time for EXTEND mode - if batch.forward_mode == ForwardMode.EXTEND: - set_time_batch(batch.reqs, "set_prefill_run_batch_start_time") - # Place holder handling for pd-disagg decode event loop if batch.forward_mode.is_prebuilt(): return self._run_batch_prebuilt(batch) @@ -3073,10 +3069,6 @@ def run_batch( pooled_hidden_states=pooler_output.pooled_hidden_states, ) - # Capture prefill end time for EXTEND mode - if batch.forward_mode == ForwardMode.EXTEND: - set_time_batch(batch.reqs, "set_prefill_run_batch_end_time") - if ( self.server_args.enable_dp_attention and self.server_args.elastic_ep_backend is not None diff --git a/python/sglang/srt/managers/tokenizer_manager.py b/python/sglang/srt/managers/tokenizer_manager.py index d3ab092c5b5e..c3b4005abe90 100644 --- a/python/sglang/srt/managers/tokenizer_manager.py +++ b/python/sglang/srt/managers/tokenizer_manager.py @@ -1668,7 +1668,7 @@ async def _handle_batch_output( "finish_reason": recv_obj.finished_reasons[i], "prompt_tokens": recv_obj.prompt_tokens[i], "weight_version": self.server_args.weight_version, - "total_retractions": recv_obj.retraction_counts[i], + "num_retractions": recv_obj.retraction_counts[i], } if self.enable_metrics: diff --git a/python/sglang/srt/observability/metrics_collector.py b/python/sglang/srt/observability/metrics_collector.py index e7196dcbc1c2..4b0edd32eceb 100644 --- a/python/sglang/srt/observability/metrics_collector.py +++ b/python/sglang/srt/observability/metrics_collector.py @@ -589,10 +589,14 @@ def __init__( documentation="Histogram of queueing time in seconds.", labelnames=labels.keys(), buckets=[ - 0.0, - 0.1, - 0.2, - 0.5, + 0.000, + 0.001, + 0.005, + 0.010, + 0.050, + 0.100, + 0.200, + 0.500, 1, 2, 3, @@ -1318,6 +1322,14 @@ def __init__( server_args.prompt_tokens_buckets, default_bucket_prompt_tokens ), ) + self.uncached_prompt_tokens_histogram = Histogram( + name="sglang:uncached_prompt_tokens_histogram", + documentation="Histogram of uncached (compute) prompt token length.", + labelnames=labels.keys(), + buckets=generate_buckets( + server_args.prompt_tokens_buckets, default_bucket_prompt_tokens + ), + ) self.generation_tokens_histogram = Histogram( name="sglang:generation_tokens_histogram", documentation="Histogram of generation token length.", @@ -1491,6 +1503,9 @@ def report_cache_source(source: str, value: int): self.num_so_requests_total.labels(**labels).inc(1) self.histogram_e2e_request_latency.labels(**labels).observe(float(e2e_latency)) self.prompt_tokens_histogram.labels(**labels).observe(float(prompt_tokens)) + self.uncached_prompt_tokens_histogram.labels(**labels).observe( + float(prompt_tokens - cached_tokens) + ) self.generation_tokens_histogram.labels(**labels).observe( float(generation_tokens) ) diff --git a/python/sglang/srt/observability/req_time_stats.py b/python/sglang/srt/observability/req_time_stats.py index 0ad222392bb1..bda40b85cedf 100644 --- a/python/sglang/srt/observability/req_time_stats.py +++ b/python/sglang/srt/observability/req_time_stats.py @@ -438,18 +438,9 @@ def convert_to_output_meta_info( self.finished_time ) - if ( - scheduler_time_stats - and hasattr(scheduler_time_stats, "forward_entry_time") - and self.finished_time > 0.0 - ): - meta_info["inference_time"] = ( - self.finished_time - scheduler_time_stats.forward_entry_time - ) - decode_latency = self.get_decode_latency() - if decode_latency > 0.0 and completion_tokens > 0: - meta_info["decode_throughput"] = completion_tokens / decode_latency + if decode_latency > 0.0 and completion_tokens > 1: + meta_info["decode_throughput"] = (completion_tokens - 1) / decode_latency return meta_info def convert_to_gen_ai_span_attrs(self): @@ -547,8 +538,6 @@ class SchedulerReqTimeStats(ReqTimeStatsBase): # common, get by time.perf_counter() wait_queue_entry_time: float = 0.0 forward_entry_time: float = 0.0 - prefill_run_batch_start_time: float = 0.0 - prefill_run_batch_end_time: float = 0.0 prefill_finished_time: float = 0.0 completion_time: float = 0.0 @@ -583,6 +572,7 @@ class SchedulerReqTimeStats(ReqTimeStatsBase): # other transfer_speed_gb_s: float = 0.0 transfer_total_mb: float = 0.0 + # Number of prefill retries for this request prefill_retry_count: int = 0 @@ -594,8 +584,6 @@ def __getstate__(self) -> object: state = { "wait_queue_entry_time": self.wait_queue_entry_time, "forward_entry_time": self.forward_entry_time, - "prefill_run_batch_start_time": self.prefill_run_batch_start_time, - "prefill_run_batch_end_time": self.prefill_run_batch_end_time, "prefill_finished_time": self.prefill_finished_time, "diff_realtime_monotonic": global_diff_realtime_monotonic, } @@ -607,40 +595,42 @@ def set_scheduler_recv_time(self, ts=None): self.scheduler_recv_time = ts def set_spec_draft_start_time(self, ts=None): - if ts is None: - ts = time.perf_counter() + ts = ts or time.perf_counter() self.spec_draft_start_time = ts def set_spec_draft_end_time(self, ts=None): - if ts is None: - ts = time.perf_counter() + ts = ts or time.perf_counter() - stage = RequestStage.SPEC_DRAFT - self.trace_slice(stage, self.spec_draft_start_time, ts) + if self.trace_ctx.tracing_enable: + stage = RequestStage.SPEC_DRAFT + self.trace_slice(stage, self.spec_draft_start_time, ts) def set_spec_verify_start_time(self, ts=None): - if ts is None: - ts = time.perf_counter() + ts = ts or time.perf_counter() self.spec_verify_start_time = ts def set_spec_verify_end_time(self, ts=None, accepted_tokens: int = 0): - if ts is None: - ts = time.perf_counter() - stage = RequestStage.SPEC_VERIFY - self.trace_slice( - stage, self.spec_verify_start_time, ts, {"accepted_tokens": accepted_tokens} - ) + ts = ts or time.perf_counter() + + if self.trace_ctx.tracing_enable: + stage = RequestStage.SPEC_VERIFY + self.trace_slice( + stage, + self.spec_verify_start_time, + ts, + {"accepted_tokens": accepted_tokens}, + ) def set_spec_draft_extend_start_time(self, ts=None): - if ts is None: - ts = time.perf_counter() + ts = ts or time.perf_counter() self.spec_draft_extend_start_time = ts def set_spec_draft_extend_end_time(self, ts=None): - if ts is None: - ts = time.perf_counter() - stage = RequestStage.SPEC_DRAFT_EXTEND - self.trace_slice(stage, self.spec_draft_extend_start_time, ts) + ts = ts or time.perf_counter() + + if self.trace_ctx.tracing_enable: + stage = RequestStage.SPEC_DRAFT_EXTEND + self.trace_slice(stage, self.spec_draft_extend_start_time, ts) def set_run_batch_cpu_start_time(self, ts=None, attrs=None): ts = ts or time.perf_counter() @@ -721,14 +711,6 @@ def set_forward_entry_time(self, ts=None): elif self.last_forward_entry_time == 0.0: self.last_forward_entry_time = ts - def set_prefill_run_batch_start_time(self, ts=None): - ts = ts or time.perf_counter() - self.prefill_run_batch_start_time = ts - - def set_prefill_run_batch_end_time(self, ts=None): - ts = ts or time.perf_counter() - self.prefill_run_batch_end_time = ts - def set_last_chunked_prefill_finish_time(self, ts=None): ts = ts or time.perf_counter() last_time = self.last_chunked_prefill_finish_time @@ -969,19 +951,6 @@ def set_decode_prebuilt_finish_time(self, ts=None): def get_queueing_time(self) -> float: return self.forward_entry_time - self.wait_queue_entry_time - def get_prefill_waiting_latency(self) -> Optional[float]: - if self.prefill_run_batch_start_time > 0.0: - return self.prefill_run_batch_start_time - self.forward_entry_time - return None - - def get_prefill_launch_latency(self) -> Optional[float]: - if ( - self.prefill_run_batch_start_time > 0.0 - and self.prefill_run_batch_end_time > 0.0 - ): - return self.prefill_run_batch_end_time - self.prefill_run_batch_start_time - return None - def convert_to_duration(self) -> str: if self.disagg_mode == DisaggregationMode.NULL: queue_duration = self.duration_between( @@ -996,7 +965,7 @@ def convert_to_duration(self) -> str: queue_duration >= 0 and forward_duration >= 0 ), f"queue_duration={queue_duration} < 0 or forward_duration={forward_duration} < 0" - return f"queue_duration={self.format_duration(queue_duration)}, forward_duration={self.format_duration(forward_duration)}, start_time={self.wait_queue_entry_time:.3f}" + return f"queue_duration={self.format_duration(queue_duration)}, forward_duration={self.format_duration(forward_duration)}, entry_time={self.format_wallclock(self.wait_queue_entry_time)}" elif self.disagg_mode == DisaggregationMode.PREFILL: bootstrap_queue_duration = self.duration_between( self.prefill_bootstrap_queue_entry_time, self.wait_queue_entry_time @@ -1028,21 +997,20 @@ def convert_to_duration(self) -> str: assert ( bootstrap_duration >= 0 and alloc_wait_duration >= 0 ), f"bootstrap_duration={bootstrap_duration} < 0 or alloc_wait_duration={alloc_wait_duration} < 0" - bootstrap_breakdown = ( - f"= bootstrap({self.format_duration(bootstrap_duration)}) " - f"+ alloc_wait({self.format_duration(alloc_wait_duration)}); " + bootstrap_fields = ( + f"bootstrap_duration={self.format_duration(bootstrap_duration)}, " + f"alloc_wait_duration={self.format_duration(alloc_wait_duration)}, " ) else: - bootstrap_breakdown = "" + bootstrap_fields = f"bootstrap_queue_duration={self.format_duration(bootstrap_queue_duration)}, " return ( - f"bootstrap_queue_duration({self.format_duration(bootstrap_queue_duration)}) " - f"{bootstrap_breakdown}" + f"{bootstrap_fields}" f"queue_duration={self.format_duration(queue_duration)}, " f"forward_duration={self.format_duration(forward_duration)}, " - f"start={self.prefill_bootstrap_queue_entry_time:.3f}, " - f"transfer_speed={self.transfer_speed_gb_s:.2f}GB/s, " - f"transfer_total={self.transfer_total_mb:.2f}MB, " + f"entry_time={self.format_wallclock(self.prefill_bootstrap_queue_entry_time)}, " + f"transfer_speed={self.transfer_speed_gb_s:.2f} GB/s, " + f"transfer_total={self.transfer_total_mb:.2f} MB, " f"#retries={self.prefill_retry_count}" ) elif self.disagg_mode == DisaggregationMode.DECODE: @@ -1084,20 +1052,19 @@ def convert_to_duration(self) -> str: assert ( bootstrap_duration >= 0 and alloc_wait_duration >= 0 ), f"bootstrap_duration={bootstrap_duration} < 0 or alloc_wait_duration={alloc_wait_duration} < 0" - prealloc_breakdown = ( - f"= bootstrap({self.format_duration(bootstrap_duration)}) " - f"+ alloc_wait({self.format_duration(alloc_wait_duration)}); " + prealloc_fields = ( + f"bootstrap_duration={self.format_duration(bootstrap_duration)}, " + f"alloc_wait_duration={self.format_duration(alloc_wait_duration)}, " ) else: - prealloc_breakdown = "" + prealloc_fields = f"prealloc_queue_duration={self.format_duration(prealloc_duration)}, " return ( - f"prealloc_queue_duration({self.format_duration(prealloc_duration)}) " - f"{prealloc_breakdown}" - f"transfer_duration={self.format_duration(transfer_duration)}; " + f"{prealloc_fields}" + f"transfer_duration={self.format_duration(transfer_duration)}, " f"queue_duration={self.format_duration(queue_duration)}, " f"forward_duration={self.format_duration(forward_duration)}, " - f"start={self.decode_prealloc_queue_entry_time:.3f}" + f"entry_time={self.format_wallclock(self.decode_prealloc_queue_entry_time)}" ) else: return "Unknown Time Stats" @@ -1115,8 +1082,6 @@ def convert_to_output_meta_info(self): meta_data.update( { "queue_time": self.get_queueing_time(), - "prefill_waiting_latency": self.get_prefill_waiting_latency(), - "prefill_launch_latency": self.get_prefill_launch_latency(), } ) return meta_data @@ -1129,6 +1094,10 @@ def duration_between(self, start: float, end: float) -> float: return 0.0 return end - start + @staticmethod + def format_wallclock(perf_counter_time: float) -> str: + return f"{convert_time_to_realtime(perf_counter_time):.3f}" + def set_schedule_time_batch(batch: ScheduleBatch): # only for tracing diff --git a/python/sglang/srt/observability/scheduler_metrics_mixin.py b/python/sglang/srt/observability/scheduler_metrics_mixin.py index ef9ad407620a..06b1596b8873 100644 --- a/python/sglang/srt/observability/scheduler_metrics_mixin.py +++ b/python/sglang/srt/observability/scheduler_metrics_mixin.py @@ -962,3 +962,4 @@ def update_device_timer(self: Scheduler): def reset_device_timer_window(self: Scheduler): if ENABLE_METRICS_DEVICE_TIMER: self._device_timer_window_batch_count = 0 + self.fwd_occupancy = float("nan") diff --git a/python/sglang/srt/utils/log_utils.py b/python/sglang/srt/utils/log_utils.py index 5279616f0773..2d64ca8b9f2f 100644 --- a/python/sglang/srt/utils/log_utils.py +++ b/python/sglang/srt/utils/log_utils.py @@ -50,7 +50,9 @@ def _create_logger_with_handler(name: str, handler: logging.Handler) -> logging. logger.setLevel(logging.INFO) logger.propagate = False if not logger.handlers: - handler.setFormatter(logging.Formatter("%(message)s")) + handler.setFormatter( + logging.Formatter("[%(asctime)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S") + ) logger.addHandler(handler) return logger diff --git a/python/sglang/srt/utils/request_logger.py b/python/sglang/srt/utils/request_logger.py index 75baba4f9450..45373103d890 100644 --- a/python/sglang/srt/utils/request_logger.py +++ b/python/sglang/srt/utils/request_logger.py @@ -205,6 +205,7 @@ def _compute_metadata( "input_embeds", "image_data", "audio_data", + "video_data", "lora_path", "sampling_params", } @@ -217,6 +218,7 @@ def _compute_metadata( "input_embeds", "image_data", "audio_data", + "video_data", "lora_path", } out_skip_names = {"text", "output_ids", "embedding"} diff --git a/test/registered/distributed/test_disaggregation_different_tp.py b/test/registered/distributed/test_disaggregation_different_tp.py index f3ed6ad35f26..fc6215685853 100644 --- a/test/registered/distributed/test_disaggregation_different_tp.py +++ b/test/registered/distributed/test_disaggregation_different_tp.py @@ -48,6 +48,8 @@ def start_prefill(cls): cls.bootstrap_port, "--tp", "4", + "--enable-metrics", + "--enable-request-time-stats-logging", ] prefill_args += cls.transfer_backend + cls.rdma_devices cls.process_prefill = popen_launch_pd_server( @@ -69,6 +71,8 @@ def start_decode(cls): "2", "--base-gpu-id", "4", + "--enable-metrics", + "--enable-request-time-stats-logging", ] decode_args += cls.transfer_backend + cls.rdma_devices cls.process_decode = popen_launch_pd_server( @@ -123,6 +127,8 @@ def start_prefill(cls): cls.bootstrap_port, "--tp", "2", + "--enable-metrics", + "--enable-request-time-stats-logging", ] prefill_args += cls.transfer_backend + cls.rdma_devices cls.process_prefill = popen_launch_pd_server( @@ -144,6 +150,8 @@ def start_decode(cls): "4", "--base-gpu-id", "4", + "--enable-metrics", + "--enable-request-time-stats-logging", ] decode_args += cls.transfer_backend + cls.rdma_devices cls.process_decode = popen_launch_pd_server( @@ -198,6 +206,8 @@ def start_prefill(cls): cls.bootstrap_port, "--tp", "4", + "--enable-metrics", + "--enable-request-time-stats-logging", ] prefill_args += cls.transfer_backend + cls.rdma_devices cls.process_prefill = popen_launch_pd_server( @@ -219,6 +229,8 @@ def start_decode(cls): "2", "--base-gpu-id", "4", + "--enable-metrics", + "--enable-request-time-stats-logging", ] decode_args += cls.transfer_backend + cls.rdma_devices cls.process_decode = popen_launch_pd_server( @@ -273,6 +285,8 @@ def start_prefill(cls): cls.bootstrap_port, "--tp", "2", + "--enable-metrics", + "--enable-request-time-stats-logging", ] prefill_args += cls.transfer_backend + cls.rdma_devices cls.process_prefill = popen_launch_pd_server( @@ -294,6 +308,8 @@ def start_decode(cls): "4", "--base-gpu-id", "4", + "--enable-metrics", + "--enable-request-time-stats-logging", ] decode_args += cls.transfer_backend + cls.rdma_devices cls.process_decode = popen_launch_pd_server( @@ -354,6 +370,8 @@ def start_prefill(cls): cls.bootstrap_port, "--tp", "4", + "--enable-metrics", + "--enable-request-time-stats-logging", ] prefill_args += cls.transfer_backend + cls.rdma_devices env = {**os.environ, **STAGING_ENV} @@ -377,6 +395,8 @@ def start_decode(cls): "2", "--base-gpu-id", "4", + "--enable-metrics", + "--enable-request-time-stats-logging", ] decode_args += cls.transfer_backend + cls.rdma_devices env = {**os.environ, **STAGING_ENV} @@ -431,6 +451,8 @@ def start_prefill(cls): cls.bootstrap_port, "--tp", "2", + "--enable-metrics", + "--enable-request-time-stats-logging", ] prefill_args += cls.transfer_backend + cls.rdma_devices env = {**os.environ, **STAGING_ENV} @@ -454,6 +476,8 @@ def start_decode(cls): "4", "--base-gpu-id", "4", + "--enable-metrics", + "--enable-request-time-stats-logging", ] decode_args += cls.transfer_backend + cls.rdma_devices env = {**os.environ, **STAGING_ENV} diff --git a/test/registered/utils/test_log_utils.py b/test/registered/utils/test_log_utils.py index b3007e2200ae..b65d89017cef 100644 --- a/test/registered/utils/test_log_utils.py +++ b/test/registered/utils/test_log_utils.py @@ -1,5 +1,6 @@ import io import json +import re import tempfile import unittest import uuid @@ -11,6 +12,8 @@ register_cpu_ci(est_time=6, suite="stage-a-test-cpu") +_LOG_PREFIX_RE = re.compile(r"^\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\] ") + class TestLogUtils(unittest.TestCase): def test_stdout(self): @@ -23,7 +26,7 @@ def test_stdout(self): ) self.assertEqual(len(loggers), 1) log_json(loggers[0], "test.event", {"key": "value"}) - data = json.loads(buf.getvalue().strip()) + data = _parse_log_json(buf.getvalue().strip()) self.assertIn("timestamp", data) self.assertEqual(data["event"], "test.event") self.assertEqual(data["key"], "value") @@ -52,13 +55,18 @@ def test_multiple_targets(self): self.assertEqual(len(loggers), 2) log_json(loggers, "multi.event", {"x": 1}) _flush_all(loggers) - stdout_data = json.loads(buf.getvalue().strip()) + stdout_data = _parse_log_json(buf.getvalue().strip()) file_data = _read_log_file(temp_dir) self.assertEqual(stdout_data["event"], "multi.event") self.assertEqual(file_data["event"], "multi.event") self.assertEqual(stdout_data["x"], file_data["x"]) +def _parse_log_json(line: str) -> dict: + """Strip the ``[YYYY-MM-DD HH:MM:SS] `` prefix added by the formatter.""" + return json.loads(_LOG_PREFIX_RE.sub("", line)) + + def _flush_all(loggers: list) -> None: for logger in loggers: for handler in logger.handlers: @@ -68,7 +76,7 @@ def _flush_all(loggers: list) -> None: def _read_log_file(temp_dir: str) -> dict: log_files = list(Path(temp_dir).glob("*.log")) assert len(log_files) == 1 - return json.loads(log_files[0].read_text().strip()) + return _parse_log_json(log_files[0].read_text().strip()) if __name__ == "__main__":