Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions python/sglang/srt/managers/io_struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,6 @@ class GenerateReqInput(BaseReq):
input_ids: Optional[Union[List[List[int]], List[int]]] = None
# The embeddings for input_ids; one can specify either text or input_ids or input_embeds.
input_embeds: Optional[Union[List[List[List[float]]], List[List[float]]]] = None
# Embedding overrides to place at specific token positions.
# Runtime type: Optional[Union[PositionalEmbeds, List[Optional[PositionalEmbeds]]]]
# Typed as Any to avoid Pydantic/FastAPI schema errors (PositionalEmbeds contains torch.Tensor).
positional_embed_overrides: Any = None
# The image input. It can be an image instance, file name, URL, or base64 encoded string.
# Can be formatted as:
# - Single image for a single request
Expand Down Expand Up @@ -195,6 +191,10 @@ class GenerateReqInput(BaseReq):
# of `CustomLogitProcessor` in python/sglang/srt/sampling/custom_logit_processor.py
# Use the processor's `to_str()` method to generate the serialized string.
custom_logit_processor: Optional[Union[List[Optional[str]], str]] = None
# Embedding overrides to place at specific token positions.
# Runtime type: Optional[Union[PositionalEmbeds, List[Optional[PositionalEmbeds]]]]
# Typed as Any to avoid Pydantic/FastAPI schema errors (PositionalEmbeds contains torch.Tensor).
positional_embed_overrides: Any = None

# For disaggregated inference
bootstrap_host: Optional[Union[List[str], str]] = None
Expand Down
9 changes: 8 additions & 1 deletion python/sglang/srt/managers/schedule_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -1298,7 +1298,14 @@ def log_time_stats(self):
if self.bootstrap_room is not None
else ""
)
prefix = f"Req Time Stats(rid={self.rid}{bootstrap_info}, input len={len(self.origin_input_ids)}, output len={len(self.output_ids)}, type={self.time_stats.disagg_mode_str()})"
prefix = (
f"ReqTimeStats("
f"rid={self.rid}{bootstrap_info}, "
f"input_len={len(self.origin_input_ids)}, "
f"cached_input_len={self.cached_tokens}, "
f"output_len={len(self.output_ids)}, "
f"type={self.time_stats.disagg_mode_str()})"
)
logger.info(f"{prefix}: {self.time_stats.convert_to_duration()}")
self.has_log_time_stats = True

Expand Down
8 changes: 0 additions & 8 deletions python/sglang/srt/managers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2956,10 +2956,6 @@ def run_batch(
logger.info(f"Scheduler.run_batch sleep {self.forward_sleep_time}s")
time.sleep(self.forward_sleep_time)

# Capture prefill start time for EXTEND mode
if batch.forward_mode == ForwardMode.EXTEND:
set_time_batch(batch.reqs, "set_prefill_run_batch_start_time")

# Placeholder handling for the pd-disagg decode event loop
if batch.forward_mode.is_prebuilt():
return self._run_batch_prebuilt(batch)
Expand Down Expand Up @@ -3073,10 +3069,6 @@ def run_batch(
pooled_hidden_states=pooler_output.pooled_hidden_states,
)

# Capture prefill end time for EXTEND mode
if batch.forward_mode == ForwardMode.EXTEND:
set_time_batch(batch.reqs, "set_prefill_run_batch_end_time")

if (
self.server_args.enable_dp_attention
and self.server_args.elastic_ep_backend is not None
Expand Down
2 changes: 1 addition & 1 deletion python/sglang/srt/managers/tokenizer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1668,7 +1668,7 @@ async def _handle_batch_output(
"finish_reason": recv_obj.finished_reasons[i],
"prompt_tokens": recv_obj.prompt_tokens[i],
"weight_version": self.server_args.weight_version,
"total_retractions": recv_obj.retraction_counts[i],
"num_retractions": recv_obj.retraction_counts[i],
}

if self.enable_metrics:
Expand Down
23 changes: 19 additions & 4 deletions python/sglang/srt/observability/metrics_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,10 +589,14 @@ def __init__(
documentation="Histogram of queueing time in seconds.",
labelnames=labels.keys(),
buckets=[
0.0,
0.1,
0.2,
0.5,
0.000,
0.001,
0.005,
0.010,
0.050,
0.100,
0.200,
0.500,
1,
2,
3,
Expand Down Expand Up @@ -1318,6 +1322,14 @@ def __init__(
server_args.prompt_tokens_buckets, default_bucket_prompt_tokens
),
)
self.uncached_prompt_tokens_histogram = Histogram(
name="sglang:uncached_prompt_tokens_histogram",
documentation="Histogram of uncached (compute) prompt token length.",
labelnames=labels.keys(),
buckets=generate_buckets(
server_args.prompt_tokens_buckets, default_bucket_prompt_tokens
),
)
self.generation_tokens_histogram = Histogram(
name="sglang:generation_tokens_histogram",
documentation="Histogram of generation token length.",
Expand Down Expand Up @@ -1491,6 +1503,9 @@ def report_cache_source(source: str, value: int):
self.num_so_requests_total.labels(**labels).inc(1)
self.histogram_e2e_request_latency.labels(**labels).observe(float(e2e_latency))
self.prompt_tokens_histogram.labels(**labels).observe(float(prompt_tokens))
self.uncached_prompt_tokens_histogram.labels(**labels).observe(
float(prompt_tokens - cached_tokens)
)
Comment thread
merrymercy marked this conversation as resolved.
self.generation_tokens_histogram.labels(**labels).observe(
float(generation_tokens)
)
Expand Down
121 changes: 45 additions & 76 deletions python/sglang/srt/observability/req_time_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,18 +438,9 @@ def convert_to_output_meta_info(
self.finished_time
)

if (
scheduler_time_stats
and hasattr(scheduler_time_stats, "forward_entry_time")
and self.finished_time > 0.0
):
meta_info["inference_time"] = (
self.finished_time - scheduler_time_stats.forward_entry_time
)

decode_latency = self.get_decode_latency()
if decode_latency > 0.0 and completion_tokens > 0:
meta_info["decode_throughput"] = completion_tokens / decode_latency
if decode_latency > 0.0 and completion_tokens > 1:
meta_info["decode_throughput"] = (completion_tokens - 1) / decode_latency
return meta_info

def convert_to_gen_ai_span_attrs(self):
Expand Down Expand Up @@ -547,8 +538,6 @@ class SchedulerReqTimeStats(ReqTimeStatsBase):
# common, get by time.perf_counter()
wait_queue_entry_time: float = 0.0
forward_entry_time: float = 0.0
prefill_run_batch_start_time: float = 0.0
prefill_run_batch_end_time: float = 0.0
prefill_finished_time: float = 0.0
completion_time: float = 0.0

Expand Down Expand Up @@ -583,6 +572,7 @@ class SchedulerReqTimeStats(ReqTimeStatsBase):
# other
transfer_speed_gb_s: float = 0.0
transfer_total_mb: float = 0.0

# Number of prefill retries for this request
prefill_retry_count: int = 0

Expand All @@ -594,8 +584,6 @@ def __getstate__(self) -> object:
state = {
"wait_queue_entry_time": self.wait_queue_entry_time,
"forward_entry_time": self.forward_entry_time,
"prefill_run_batch_start_time": self.prefill_run_batch_start_time,
"prefill_run_batch_end_time": self.prefill_run_batch_end_time,
"prefill_finished_time": self.prefill_finished_time,
"diff_realtime_monotonic": global_diff_realtime_monotonic,
}
Expand All @@ -607,40 +595,42 @@ def set_scheduler_recv_time(self, ts=None):
self.scheduler_recv_time = ts

def set_spec_draft_start_time(self, ts=None):
if ts is None:
ts = time.perf_counter()
ts = ts or time.perf_counter()
self.spec_draft_start_time = ts

def set_spec_draft_end_time(self, ts=None):
if ts is None:
ts = time.perf_counter()
ts = ts or time.perf_counter()

stage = RequestStage.SPEC_DRAFT
self.trace_slice(stage, self.spec_draft_start_time, ts)
if self.trace_ctx.tracing_enable:
stage = RequestStage.SPEC_DRAFT
self.trace_slice(stage, self.spec_draft_start_time, ts)

def set_spec_verify_start_time(self, ts=None):
if ts is None:
ts = time.perf_counter()
ts = ts or time.perf_counter()
self.spec_verify_start_time = ts

def set_spec_verify_end_time(self, ts=None, accepted_tokens: int = 0):
if ts is None:
ts = time.perf_counter()
stage = RequestStage.SPEC_VERIFY
self.trace_slice(
stage, self.spec_verify_start_time, ts, {"accepted_tokens": accepted_tokens}
)
ts = ts or time.perf_counter()

if self.trace_ctx.tracing_enable:
stage = RequestStage.SPEC_VERIFY
self.trace_slice(
stage,
self.spec_verify_start_time,
ts,
{"accepted_tokens": accepted_tokens},
)

def set_spec_draft_extend_start_time(self, ts=None):
if ts is None:
ts = time.perf_counter()
ts = ts or time.perf_counter()
self.spec_draft_extend_start_time = ts

def set_spec_draft_extend_end_time(self, ts=None):
if ts is None:
ts = time.perf_counter()
stage = RequestStage.SPEC_DRAFT_EXTEND
self.trace_slice(stage, self.spec_draft_extend_start_time, ts)
ts = ts or time.perf_counter()

if self.trace_ctx.tracing_enable:
stage = RequestStage.SPEC_DRAFT_EXTEND
self.trace_slice(stage, self.spec_draft_extend_start_time, ts)

def set_run_batch_cpu_start_time(self, ts=None, attrs=None):
ts = ts or time.perf_counter()
Expand Down Expand Up @@ -721,14 +711,6 @@ def set_forward_entry_time(self, ts=None):
elif self.last_forward_entry_time == 0.0:
self.last_forward_entry_time = ts

def set_prefill_run_batch_start_time(self, ts=None):
    """Store ``ts`` in ``prefill_run_batch_start_time``.

    Args:
        ts: Timestamp to record. A falsy value (``None`` or ``0``) is
            replaced by the current ``time.perf_counter()`` reading.
    """
    self.prefill_run_batch_start_time = ts if ts else time.perf_counter()

def set_prefill_run_batch_end_time(self, ts=None):
    """Store ``ts`` in ``prefill_run_batch_end_time``.

    Args:
        ts: Timestamp to record. A falsy value (``None`` or ``0``) is
            replaced by the current ``time.perf_counter()`` reading.
    """
    self.prefill_run_batch_end_time = ts if ts else time.perf_counter()

def set_last_chunked_prefill_finish_time(self, ts=None):
ts = ts or time.perf_counter()
last_time = self.last_chunked_prefill_finish_time
Expand Down Expand Up @@ -969,19 +951,6 @@ def set_decode_prebuilt_finish_time(self, ts=None):
def get_queueing_time(self) -> float:
    """Return ``forward_entry_time`` minus ``wait_queue_entry_time``.

    No validation is performed: if either timestamp is still at its
    default, the raw difference is returned as-is.
    """
    entered_queue_at = self.wait_queue_entry_time
    entered_forward_at = self.forward_entry_time
    return entered_forward_at - entered_queue_at

def get_prefill_waiting_latency(self) -> Optional[float]:
    """Return ``prefill_run_batch_start_time - forward_entry_time``.

    Returns:
        The difference in seconds, or ``None`` when
        ``prefill_run_batch_start_time`` is not greater than zero
        (i.e. it was never recorded).
    """
    started_at = self.prefill_run_batch_start_time
    return started_at - self.forward_entry_time if started_at > 0.0 else None

def get_prefill_launch_latency(self) -> Optional[float]:
    """Return ``prefill_run_batch_end_time - prefill_run_batch_start_time``.

    Returns:
        The difference in seconds, or ``None`` unless both timestamps
        are greater than zero (i.e. both were recorded).
    """
    begin = self.prefill_run_batch_start_time
    finish = self.prefill_run_batch_end_time
    # Keep the "> 0.0" form so NaN timestamps still yield None.
    if not (begin > 0.0 and finish > 0.0):
        return None
    return finish - begin

def convert_to_duration(self) -> str:
if self.disagg_mode == DisaggregationMode.NULL:
queue_duration = self.duration_between(
Expand All @@ -996,7 +965,7 @@ def convert_to_duration(self) -> str:
queue_duration >= 0 and forward_duration >= 0
), f"queue_duration={queue_duration} < 0 or forward_duration={forward_duration} < 0"

return f"queue_duration={self.format_duration(queue_duration)}, forward_duration={self.format_duration(forward_duration)}, start_time={self.wait_queue_entry_time:.3f}"
return f"queue_duration={self.format_duration(queue_duration)}, forward_duration={self.format_duration(forward_duration)}, entry_time={self.format_wallclock(self.wait_queue_entry_time)}"
elif self.disagg_mode == DisaggregationMode.PREFILL:
bootstrap_queue_duration = self.duration_between(
self.prefill_bootstrap_queue_entry_time, self.wait_queue_entry_time
Expand Down Expand Up @@ -1028,21 +997,20 @@ def convert_to_duration(self) -> str:
assert (
bootstrap_duration >= 0 and alloc_wait_duration >= 0
), f"bootstrap_duration={bootstrap_duration} < 0 or alloc_wait_duration={alloc_wait_duration} < 0"
bootstrap_breakdown = (
f"= bootstrap({self.format_duration(bootstrap_duration)}) "
f"+ alloc_wait({self.format_duration(alloc_wait_duration)}); "
bootstrap_fields = (
f"bootstrap_duration={self.format_duration(bootstrap_duration)}, "
f"alloc_wait_duration={self.format_duration(alloc_wait_duration)}, "
)
else:
bootstrap_breakdown = ""
bootstrap_fields = f"bootstrap_queue_duration={self.format_duration(bootstrap_queue_duration)}, "

return (
f"bootstrap_queue_duration({self.format_duration(bootstrap_queue_duration)}) "
f"{bootstrap_breakdown}"
f"{bootstrap_fields}"
f"queue_duration={self.format_duration(queue_duration)}, "
f"forward_duration={self.format_duration(forward_duration)}, "
f"start={self.prefill_bootstrap_queue_entry_time:.3f}, "
f"transfer_speed={self.transfer_speed_gb_s:.2f}GB/s, "
f"transfer_total={self.transfer_total_mb:.2f}MB, "
f"entry_time={self.format_wallclock(self.prefill_bootstrap_queue_entry_time)}, "
f"transfer_speed={self.transfer_speed_gb_s:.2f} GB/s, "
f"transfer_total={self.transfer_total_mb:.2f} MB, "
f"#retries={self.prefill_retry_count}"
)
elif self.disagg_mode == DisaggregationMode.DECODE:
Expand Down Expand Up @@ -1084,20 +1052,19 @@ def convert_to_duration(self) -> str:
assert (
bootstrap_duration >= 0 and alloc_wait_duration >= 0
), f"bootstrap_duration={bootstrap_duration} < 0 or alloc_wait_duration={alloc_wait_duration} < 0"
prealloc_breakdown = (
f"= bootstrap({self.format_duration(bootstrap_duration)}) "
f"+ alloc_wait({self.format_duration(alloc_wait_duration)}); "
prealloc_fields = (
f"bootstrap_duration={self.format_duration(bootstrap_duration)}, "
f"alloc_wait_duration={self.format_duration(alloc_wait_duration)}, "
)
else:
prealloc_breakdown = ""
prealloc_fields = f"prealloc_queue_duration={self.format_duration(prealloc_duration)}, "

return (
f"prealloc_queue_duration({self.format_duration(prealloc_duration)}) "
f"{prealloc_breakdown}"
f"transfer_duration={self.format_duration(transfer_duration)}; "
f"{prealloc_fields}"
f"transfer_duration={self.format_duration(transfer_duration)}, "
f"queue_duration={self.format_duration(queue_duration)}, "
f"forward_duration={self.format_duration(forward_duration)}, "
f"start={self.decode_prealloc_queue_entry_time:.3f}"
f"entry_time={self.format_wallclock(self.decode_prealloc_queue_entry_time)}"
)
else:
return "Unknown Time Stats"
Expand All @@ -1115,8 +1082,6 @@ def convert_to_output_meta_info(self):
meta_data.update(
{
"queue_time": self.get_queueing_time(),
"prefill_waiting_latency": self.get_prefill_waiting_latency(),
"prefill_launch_latency": self.get_prefill_launch_latency(),
}
)
return meta_data
Expand All @@ -1129,6 +1094,10 @@ def duration_between(self, start: float, end: float) -> float:
return 0.0
return end - start

@staticmethod
def format_wallclock(perf_counter_time: float) -> str:
    """Format a timestamp for log output with three decimal places.

    The value is first passed through ``convert_time_to_realtime`` (defined
    elsewhere; presumably it maps a ``time.perf_counter()`` reading onto
    wall-clock time -- verify against that helper).
    """
    wallclock = convert_time_to_realtime(perf_counter_time)
    return format(wallclock, ".3f")


def set_schedule_time_batch(batch: ScheduleBatch):
# only for tracing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -962,3 +962,4 @@ def update_device_timer(self: Scheduler):
# Reset the scheduler's device-timer window state: the window batch counter
# is zeroed under ENABLE_METRICS_DEVICE_TIMER and fwd_occupancy is set to NaN.
# NOTE(review): confirm whether the fwd_occupancy reset is meant to sit
# inside the ENABLE_METRICS_DEVICE_TIMER guard or to run unconditionally.
def reset_device_timer_window(self: Scheduler):
if ENABLE_METRICS_DEVICE_TIMER:
self._device_timer_window_batch_count = 0
self.fwd_occupancy = float("nan")
4 changes: 3 additions & 1 deletion python/sglang/srt/utils/log_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ def _create_logger_with_handler(name: str, handler: logging.Handler) -> logging.
logger.setLevel(logging.INFO)
logger.propagate = False
if not logger.handlers:
handler.setFormatter(logging.Formatter("%(message)s"))
handler.setFormatter(
logging.Formatter("[%(asctime)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
)
logger.addHandler(handler)
return logger

Expand Down
2 changes: 2 additions & 0 deletions python/sglang/srt/utils/request_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ def _compute_metadata(
"input_embeds",
"image_data",
"audio_data",
"video_data",
"lora_path",
"sampling_params",
}
Expand All @@ -217,6 +218,7 @@ def _compute_metadata(
"input_embeds",
"image_data",
"audio_data",
"video_data",
"lora_path",
}
out_skip_names = {"text", "output_ids", "embedding"}
Expand Down
Loading
Loading