From 3b904c6ad46e2a223aa27292a21c375fa489a02d Mon Sep 17 00:00:00 2001 From: Zhang Haotong Date: Mon, 11 Aug 2025 11:59:55 +0800 Subject: [PATCH 01/15] [feat] Add OpenTelemetry tracing Signed-off-by: Zhang Haotong --- requirements.txt | 5 + tensorrt_llm/_utils.py | 18 +++- tensorrt_llm/commands/serve.py | 14 ++- tensorrt_llm/executor/executor.py | 3 + tensorrt_llm/executor/request.py | 3 + tensorrt_llm/executor/result.py | 76 +++++++++++++++ tensorrt_llm/llmapi/llm.py | 13 +++ tensorrt_llm/llmapi/llm_args.py | 5 + tensorrt_llm/llmapi/otel_tracing.py | 137 ++++++++++++++++++++++++++++ tensorrt_llm/serve/openai_server.py | 23 ++++- 10 files changed, 293 insertions(+), 4 deletions(-) create mode 100644 tensorrt_llm/llmapi/otel_tracing.py diff --git a/requirements.txt b/requirements.txt index 44954f1f71e..400df1448ed 100644 --- a/requirements.txt +++ b/requirements.txt @@ -63,6 +63,11 @@ meson ninja etcd3 blake3 +# observation.tracing +opentelemetry-sdk +opentelemetry-api +opentelemetry-exporter-otlp +opentelemetry-semantic-conventions-ai soundfile triton==3.3.1; platform_machine == "x86_64" tiktoken diff --git a/tensorrt_llm/_utils.py b/tensorrt_llm/_utils.py index 70bd33b0bea..f2727351115 100644 --- a/tensorrt_llm/_utils.py +++ b/tensorrt_llm/_utils.py @@ -28,13 +28,14 @@ from enum import EnumMeta from functools import lru_cache, partial, wraps from pathlib import Path -from typing import Any, Dict, List, Optional, Sequence, Union +from typing import Any, Callable, Dict, List, Optional, Sequence, Union import numpy as np import nvtx from mpi4py import MPI from mpi4py.util import pkl5 from packaging import version +from typing_extensions import ParamSpec # isort: off import torch @@ -1143,3 +1144,18 @@ def set_prometheus_multiproc_dir() -> object: os.environ["PROMETHEUS_MULTIPROC_DIR"] = prometheus_multiproc_dir.name logger.info( f"PROMETHEUS_MULTIPROC_DIR: {os.environ['PROMETHEUS_MULTIPROC_DIR']}") + + +P = ParamSpec("P") + + +# From: https://stackoverflow.com/a/4104188/2749989 +def run_once(f: Callable[P, None]) -> Callable[P, None]: + + def wrapper(*args: P.args, **kwargs: P.kwargs) -> None: + if not wrapper.has_run: # type: ignore[attr-defined] + wrapper.has_run = True # type: ignore[attr-defined] + return f(*args, **kwargs) + + wrapper.has_run = False # type: ignore[attr-defined] + return wrapper diff --git a/tensorrt_llm/commands/serve.py b/tensorrt_llm/commands/serve.py index 3d8bfd27418..306c06bc6eb 100644 --- a/tensorrt_llm/commands/serve.py +++ b/tensorrt_llm/commands/serve.py @@ -86,6 +86,7 @@ def get_llm_args(model: str, trust_remote_code: bool = False, reasoning_parser: Optional[str] = None, fail_fast_on_attention_window_too_large: bool = False, + otlp_traces_endpoint: Optional[str] = None, **llm_args_extra_dict: Any): if gpus_per_node is None: @@ -147,6 +148,7 @@ def get_llm_args(model: str, reasoning_parser, "fail_fast_on_attention_window_too_large": fail_fast_on_attention_window_too_large, + "otlp_traces_endpoint": otlp_traces_endpoint, } return llm_args, llm_args_extra_dict @@ -303,6 +305,12 @@ def launch_mm_encoder_server( help= "Exit with runtime error when attention window is too large to fit even a single sequence in the KV cache." ) +@click.option( + "--otlp_traces_endpoint", + type=str, + default=None, + help="Target URL to which OpenTelemetry traces will be sent." 
+) def serve( model: str, tokenizer: Optional[str], host: str, port: int, log_level: str, backend: str, max_beam_width: int, max_batch_size: int, @@ -312,7 +320,8 @@ def serve( num_postprocess_workers: int, trust_remote_code: bool, extra_llm_api_options: Optional[str], reasoning_parser: Optional[str], metadata_server_config_file: Optional[str], server_role: Optional[str], - fail_fast_on_attention_window_too_large: bool): + fail_fast_on_attention_window_too_large: bool, + otlp_traces_endpoint: Optional[str]): """Running an OpenAI API compatible server MODEL: model name | HF checkpoint path | TensorRT engine path @@ -337,7 +346,8 @@ def serve( trust_remote_code=trust_remote_code, reasoning_parser=reasoning_parser, fail_fast_on_attention_window_too_large= - fail_fast_on_attention_window_too_large) + fail_fast_on_attention_window_too_large, + otlp_traces_endpoint=otlp_traces_endpoint) llm_args_extra_dict = {} if extra_llm_api_options is not None: diff --git a/tensorrt_llm/executor/executor.py b/tensorrt_llm/executor/executor.py index c9d55a7cfc1..5e6840591d7 100644 --- a/tensorrt_llm/executor/executor.py +++ b/tensorrt_llm/executor/executor.py @@ -5,6 +5,7 @@ import signal import traceback from abc import ABC, abstractmethod +from collections.abc import Mapping from pathlib import Path from queue import Queue from typing import (TYPE_CHECKING, AsyncIterable, Generator, List, Optional, @@ -121,6 +122,7 @@ def generate_async( streaming: bool = False, kv_cache_retention_config: Optional[KvCacheRetentionConfig] = None, disaggregated_params: Optional[DisaggregatedParams] = None, + trace_headers: Optional[Mapping[str, str]] = None, postproc_params: Optional[PostprocParams] = None, multimodal_params: Optional[MultimodalParams] = None, scheduling_params: Optional[SchedulingParams] = None, @@ -147,6 +149,7 @@ def generate_async( streaming=streaming, kv_cache_retention_config=kv_cache_retention_config, disaggregated_params=disaggregated_params, + trace_headers=trace_headers, multimodal_params=multimodal_params, scheduling_params=scheduling_params, cache_salt_id=cache_salt_id) diff --git a/tensorrt_llm/executor/request.py b/tensorrt_llm/executor/request.py index 1030e57f091..853ee006adc 100644 --- a/tensorrt_llm/executor/request.py +++ b/tensorrt_llm/executor/request.py @@ -1,4 +1,5 @@ import os +from collections.abc import Mapping from dataclasses import dataclass from typing import List, Optional, Union @@ -94,6 +95,7 @@ def __init__( streaming: bool = False, kv_cache_retention_config: Optional[KvCacheRetentionConfig] = None, disaggregated_params: Optional[DisaggregatedParams] = None, + trace_headers: Optional[Mapping[str, str]] = None, postproc_params: Optional[PostprocParams] = None, multimodal_params: Optional[MultimodalParams] = None, scheduling_params: Optional[SchedulingParams] = None, @@ -122,6 +124,7 @@ def __init__( self.kv_cache_retention_config = kv_cache_retention_config self.id: Optional[int] = None self.disaggregated_params = disaggregated_params + self.trace_headers = trace_headers self.scheduling_params = scheduling_params self.cache_salt_id = cache_salt_id diff --git a/tensorrt_llm/executor/result.py b/tensorrt_llm/executor/result.py index f3ba0bced03..b6437aa8425 100644 --- a/tensorrt_llm/executor/result.py +++ b/tensorrt_llm/executor/result.py @@ -1,5 +1,6 @@ import asyncio import json +import time import weakref from dataclasses import dataclass, field from queue import Empty, Queue @@ -10,6 +11,10 @@ import torch import torch.nn.functional as F +from 
tensorrt_llm.llmapi.otel_tracing import ( + SpanAttributes, SpanKind, extract_trace_context, global_otlp_tracer, + insufficient_request_metrics_warning) + from .._utils import nvtx_range_debug from ..bindings import executor as tllm from ..disaggregated_params import DisaggregatedParams @@ -163,6 +168,7 @@ def __init__(self, self.avg_decoded_tokens_per_iter: Optional[float] = None self._done = False self.metrics_dict = {} + self.trace_headers = None if has_event_loop(): self.aqueue = AsyncQueue() @@ -296,6 +302,7 @@ def _handle_sequence(self, raise ValueError( f"Unknown finish reason: {finish_reasons[src_idx]}") self.record_stats(output, req_perf_metrics_dict) + self.do_tracing(output, req_perf_metrics_dict) @nvtx_range_debug("handle_response", color="red", @@ -401,6 +408,74 @@ def record_stats(self, metrics_stats.update(processed_metrics_stat) self.metrics_dict = metrics_stats + def do_tracing( + self, + output: CompletionOutput, + req_perf_metrics_dict: Optional[dict[str, float]] = None, + ): + if not global_otlp_tracer(): + return + + metrics_dict = self.metrics_dict + if not metrics_dict or not req_perf_metrics_dict: + # Insufficient request metrics available; trace generation aborted. + insufficient_request_metrics_warning() + return + + trace_context = extract_trace_context(self.trace_headers) + sampling_params = self.sampling_params + + # TODO: Add request arrival time + arrival_time = time.time() - metrics_dict.get(MetricNames.E2E, -1) + with global_otlp_tracer().start_as_current_span( + "llm_request", + kind=SpanKind.SERVER, + context=trace_context, + start_time=int(arrival_time * 1e9), + ) as span: + + def safe_set_attr(span, attr, value): + if value is not None: + span.set_attribute(attr, value) + + e2e_time = metrics_dict.get(MetricNames.E2E, -1) + safe_set_attr( + span, + SpanAttributes.GEN_AI_REQUEST_TEMPERATURE, + sampling_params.temperature, + ) + safe_set_attr(span, SpanAttributes.GEN_AI_REQUEST_TOP_P, + sampling_params.top_p) + safe_set_attr( + span, + SpanAttributes.GEN_AI_REQUEST_MAX_TOKENS, + sampling_params.max_tokens, + ) + safe_set_attr(span, SpanAttributes.GEN_AI_REQUEST_N, + sampling_params.n) + # TODO: Add prompt info in result base + safe_set_attr( + span, + SpanAttributes.GEN_AI_USAGE_PROMPT_TOKENS, + getattr(self.postproc_params.postproc_args, "num_prompt_tokens", + None) if self.postproc_params + and self.postproc_params.postproc_args else None, + ) + safe_set_attr(span, SpanAttributes.GEN_AI_USAGE_COMPLETION_TOKENS, + output.length) + safe_set_attr( + span, + SpanAttributes.GEN_AI_LATENCY_TIME_TO_FIRST_TOKEN, + metrics_dict.get(MetricNames.TTFT, -1), + ) + safe_set_attr(span, SpanAttributes.GEN_AI_LATENCY_E2E, e2e_time) + safe_set_attr(span, SpanAttributes.GEN_AI_REQUEST_ID, self.id) + safe_set_attr( + span, + SpanAttributes.GEN_AI_LATENCY_TIME_IN_QUEUE, + metrics_dict.get(MetricNames.REQUEST_QUEUE_TIME, -1), + ) + class DetokenizedGenerationResultBase(GenerationResultBase): ''' The base class for the generation result with detokenization support. 
''' @@ -511,6 +586,7 @@ def __init__( self.disaggregated_params = disaggregated_params # minimal sampling params needed for logprob calculation self._logprob_params = logprob_params + self.trace_headers = generation_request.trace_headers # for aborting the request self._executor: Optional[weakref.ReferenceType[ diff --git a/tensorrt_llm/llmapi/llm.py b/tensorrt_llm/llmapi/llm.py index b4c7b4cfbe0..57fbac64fb8 100644 --- a/tensorrt_llm/llmapi/llm.py +++ b/tensorrt_llm/llmapi/llm.py @@ -6,6 +6,7 @@ import tempfile import time import weakref +from collections.abc import Mapping from pathlib import Path from typing import Any, List, Literal, Optional, Sequence, Union @@ -15,6 +16,7 @@ from tensorrt_llm.inputs.data import TextPrompt from tensorrt_llm.inputs.multimodal import MultimodalParams from tensorrt_llm.inputs.registry import DefaultInputProcessor +from tensorrt_llm.llmapi.otel_tracing import init_tracer from .._utils import nvtx_range_debug from ..bindings import executor as tllm @@ -220,6 +222,15 @@ def __init__(self, self.mpi_session.shutdown() raise + try: + if self.args.otlp_traces_endpoint: + init_tracer("trt.llm", self.args.otlp_traces_endpoint) + logger.info( + f"Initialized OTLP tracer successfully, endpoint: {self.args.otlp_traces_endpoint}" + ) + except Exception as e: + logger.error(f"Failed to initialize OTLP tracer: {e}") + exception_handler.register(self, 'shutdown') atexit.register(LLM._shutdown_wrapper, weakref.ref(self)) @@ -324,6 +335,7 @@ def generate_async( streaming: bool = False, kv_cache_retention_config: Optional[KvCacheRetentionConfig] = None, disaggregated_params: Optional[DisaggregatedParams] = None, + trace_headers: Optional[Mapping[str, str]] = None, _postproc_params: Optional[PostprocParams] = None, scheduling_params: Optional[SchedulingParams] = None, cache_salt: Optional[str] = None, @@ -444,6 +456,7 @@ def generate_async( streaming=streaming, kv_cache_retention_config=kv_cache_retention_config, disaggregated_params=disaggregated_params, + trace_headers=trace_headers, postproc_params=_postproc_params, multimodal_params=multimodal_params, scheduling_params=scheduling_params, diff --git a/tensorrt_llm/llmapi/llm_args.py b/tensorrt_llm/llmapi/llm_args.py index 58e18ccc944..c8133908382 100644 --- a/tensorrt_llm/llmapi/llm_args.py +++ b/tensorrt_llm/llmapi/llm_args.py @@ -1388,6 +1388,11 @@ class BaseLlmArgs(StrictBaseModel): exclude=True, alias="_mpi_session") + otlp_traces_endpoint: Optional[str] = Field( + default=None, + description="Target URL to which OpenTelemetry traces will be sent.", + alias="otlp_traces_endpoint") + backend: Optional[str] = Field( default=None, description="The backend to use for this LLM instance.", diff --git a/tensorrt_llm/llmapi/otel_tracing.py b/tensorrt_llm/llmapi/otel_tracing.py new file mode 100644 index 00000000000..86ee2b43af2 --- /dev/null +++ b/tensorrt_llm/llmapi/otel_tracing.py @@ -0,0 +1,137 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. 
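+"""Optional OpenTelemetry (OTLP) tracing helpers for the LLM API.
+
+The OpenTelemetry imports below are optional: if the packages are not
+installed, lightweight stub types are defined instead, ``is_otel_available()``
+returns ``False``, and the original import error is kept so ``init_tracer()``
+can report it.
+"""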
+ +import os +from collections.abc import Mapping +from typing import Optional + +from tensorrt_llm._utils import run_once +from tensorrt_llm.logger import logger + +TRACE_HEADERS = ["traceparent", "tracestate"] + +_global_tracer_ = None +_is_otel_imported = False +otel_import_error_traceback: Optional[str] = None + +try: + from opentelemetry.context.context import Context + from opentelemetry.sdk.environment_variables import \ + OTEL_EXPORTER_OTLP_TRACES_PROTOCOL + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import BatchSpanProcessor + from opentelemetry.trace import SpanKind, Tracer, set_tracer_provider + from opentelemetry.trace.propagation.tracecontext import \ + TraceContextTextMapPropagator + + _is_otel_imported = True +except ImportError: + import traceback + + otel_import_error_traceback = traceback.format_exc() + + class Context: # type: ignore + pass + + class BaseSpanAttributes: # type: ignore + pass + + class SpanKind: # type: ignore + pass + + class Tracer: # type: ignore + pass + + +def is_otel_available() -> bool: + return _is_otel_imported + + +def init_tracer(instrumenting_module_name: str, + otlp_traces_endpoint: str) -> Optional[Tracer]: + if not is_otel_available(): + raise ValueError( + "OpenTelemetry is not available. Unable to initialize " + "a tracer. Ensure OpenTelemetry packages are installed. " + f"Original error:\n{otel_import_error_traceback}") + trace_provider = TracerProvider() + span_exporter = get_span_exporter(otlp_traces_endpoint) + trace_provider.add_span_processor(BatchSpanProcessor(span_exporter)) + set_tracer_provider(trace_provider) + tracer = trace_provider.get_tracer(instrumenting_module_name) + set_global_otlp_tracer(tracer) + return tracer + + +def get_span_exporter(endpoint): + protocol = os.environ.get(OTEL_EXPORTER_OTLP_TRACES_PROTOCOL, "grpc") + if protocol == "grpc": + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import \ + OTLPSpanExporter + elif protocol == "http/protobuf": + from opentelemetry.exporter.otlp.proto.http.trace_exporter import \ + OTLPSpanExporter # type: ignore + else: + raise ValueError( + f"Unsupported OTLP protocol '{protocol}' is configured") + return OTLPSpanExporter(endpoint=endpoint) + + +def extract_trace_context( + headers: Optional[Mapping[str, str]]) -> Optional[Context]: + if is_otel_available(): + headers = headers or {} + return TraceContextTextMapPropagator().extract(headers) + else: + return None + + +def extract_trace_headers(headers: Mapping[str, str]) -> Mapping[str, str]: + # Return only recognized trace headers with normalized lowercase keys + lower_map = {k.lower(): v for k, v in headers.items()} + return {h: lower_map[h] for h in TRACE_HEADERS if h in lower_map} + + +def global_otlp_tracer() -> Tracer: + """Get the global OTLP instance in the current process.""" + return _global_tracer_ + + +def set_global_otlp_tracer(tracer: Tracer): + """Set the global OTLP Tracer instance in the current process.""" + global _global_tracer_ + assert _global_tracer_ is None + _global_tracer_ = tracer + + +def is_tracing_enabled() -> bool: + return _global_tracer_ is not None + + +class SpanAttributes: + GEN_AI_USAGE_COMPLETION_TOKENS = "gen_ai.usage.completion_tokens" + GEN_AI_USAGE_PROMPT_TOKENS = "gen_ai.usage.prompt_tokens" + GEN_AI_REQUEST_MAX_TOKENS = "gen_ai.request.max_tokens" + GEN_AI_REQUEST_TOP_P = "gen_ai.request.top_p" + GEN_AI_REQUEST_TEMPERATURE = "gen_ai.request.temperature" + GEN_AI_REQUEST_ID = "gen_ai.request.id" + GEN_AI_REQUEST_N = 
"gen_ai.request.n" + GEN_AI_LATENCY_TIME_TO_FIRST_TOKEN = "gen_ai.latency.time_to_first_token" + GEN_AI_LATENCY_E2E = "gen_ai.latency.e2e" + GEN_AI_LATENCY_TIME_IN_QUEUE = "gen_ai.latency.time_in_queue" + + +def contains_trace_headers(headers: Mapping[str, str]) -> bool: + lower_keys = {k.lower() for k in headers.keys()} + return any(h in lower_keys for h in TRACE_HEADERS) + + +@run_once +def log_tracing_disabled_warning() -> None: + logger.warning( + "Received a request with trace context but tracing is disabled") + + +@run_once +def insufficient_request_metrics_warning() -> None: + logger.warning( + "Insufficient request metrics available; trace generation aborted.") diff --git a/tensorrt_llm/serve/openai_server.py b/tensorrt_llm/serve/openai_server.py index aaac4ba8cf3..c272d5c7332 100644 --- a/tensorrt_llm/serve/openai_server.py +++ b/tensorrt_llm/serve/openai_server.py @@ -5,6 +5,7 @@ import signal import traceback from collections import deque +from collections.abc import Mapping from contextlib import asynccontextmanager from datetime import datetime from http import HTTPStatus @@ -15,6 +16,7 @@ from fastapi import FastAPI, Request from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse, Response, StreamingResponse +from starlette.datastructures import Headers from starlette.routing import Mount from transformers import AutoConfig, AutoProcessor @@ -28,6 +30,10 @@ from tensorrt_llm.llmapi import MultimodalEncoder from tensorrt_llm.llmapi.disagg_utils import MetadataServerConfig, ServerRole from tensorrt_llm.llmapi.llm import RequestOutput +from tensorrt_llm.llmapi.otel_tracing import (contains_trace_headers, + extract_trace_headers, + is_tracing_enabled, + log_tracing_disabled_warning) from tensorrt_llm.logger import logger from tensorrt_llm.metrics.collector import MetricsCollector from tensorrt_llm.serve.chat_utils import (check_multiple_response, @@ -460,6 +466,8 @@ async def create_chat_response( postproc_args=postproc_args, ) + trace_headers = (None if raw_request is None else await self._get_trace_headers(raw_request.headers)) + promise = self.llm.generate_async( inputs=prompt, sampling_params=sampling_params, @@ -468,6 +476,7 @@ async def create_chat_response( lora_request=request.lora_request, disaggregated_params=disaggregated_params, cache_salt=request.cache_salt, + trace_headers=trace_headers, ) asyncio.create_task(self.await_disconnected(raw_request, promise)) if not self.postproc_worker_enabled: @@ -669,13 +678,15 @@ async def generator_wrapper(generator: AsyncIterator[Any]): if request.stream else completion_response_post_processor, postproc_args=postproc_args, ) + trace_headers = (None if raw_request is None else await self._get_trace_headers(raw_request.headers)) promise = self.llm.generate_async( inputs=prompt, sampling_params=sampling_params, _postproc_params=postproc_params, streaming=request.stream, lora_request=request.lora_request, - disaggregated_params=disaggregated_params + disaggregated_params=disaggregated_params, + trace_headers=trace_headers ) asyncio.create_task(self.await_disconnected(raw_request, promise)) if not self.postproc_worker_enabled: @@ -892,3 +903,13 @@ async def __call__(self, host, port): log_level="info", timeout_keep_alive=TIMEOUT_KEEP_ALIVE) await uvicorn.Server(config).serve() + + async def _get_trace_headers( + self, + headers: Headers, + ) -> Optional[Mapping[str, str]]: + if is_tracing_enabled(): + return extract_trace_headers(headers) + if contains_trace_headers(headers): + 
log_tracing_disabled_warning() + return None From 5f15841037de10a50012016c54671f84f39c350e Mon Sep 17 00:00:00 2001 From: Zhang Haotong Date: Mon, 18 Aug 2025 14:44:15 +0800 Subject: [PATCH 02/15] [feat] Modular otel_trace Signed-off-by: Zhang Haotong --- tensorrt_llm/executor/result.py | 67 +++++++++---------- tensorrt_llm/llmapi/llm.py | 4 +- .../llmapi/{otel_tracing.py => tracing.py} | 11 +++ tensorrt_llm/serve/openai_server.py | 14 ++-- 4 files changed, 50 insertions(+), 46 deletions(-) rename tensorrt_llm/llmapi/{otel_tracing.py => tracing.py} (89%) diff --git a/tensorrt_llm/executor/result.py b/tensorrt_llm/executor/result.py index b6437aa8425..8d6e81cad50 100644 --- a/tensorrt_llm/executor/result.py +++ b/tensorrt_llm/executor/result.py @@ -11,9 +11,7 @@ import torch import torch.nn.functional as F -from tensorrt_llm.llmapi.otel_tracing import ( - SpanAttributes, SpanKind, extract_trace_context, global_otlp_tracer, - insufficient_request_metrics_warning) +from tensorrt_llm.llmapi import tracing from .._utils import nvtx_range_debug from ..bindings import executor as tllm @@ -413,23 +411,23 @@ def do_tracing( output: CompletionOutput, req_perf_metrics_dict: Optional[dict[str, float]] = None, ): - if not global_otlp_tracer(): + if not tracing.global_otlp_tracer(): return metrics_dict = self.metrics_dict if not metrics_dict or not req_perf_metrics_dict: # Insufficient request metrics available; trace generation aborted. - insufficient_request_metrics_warning() + tracing.insufficient_request_metrics_warning() return - trace_context = extract_trace_context(self.trace_headers) + trace_context = tracing.extract_trace_context(self.trace_headers) sampling_params = self.sampling_params # TODO: Add request arrival time arrival_time = time.time() - metrics_dict.get(MetricNames.E2E, -1) - with global_otlp_tracer().start_as_current_span( + with tracing.global_otlp_tracer().start_as_current_span( "llm_request", - kind=SpanKind.SERVER, + kind=tracing.SpanKind.SERVER, context=trace_context, start_time=int(arrival_time * 1e9), ) as span: @@ -439,42 +437,41 @@ def safe_set_attr(span, attr, value): span.set_attribute(attr, value) e2e_time = metrics_dict.get(MetricNames.E2E, -1) - safe_set_attr( - span, - SpanAttributes.GEN_AI_REQUEST_TEMPERATURE, - sampling_params.temperature, - ) - safe_set_attr(span, SpanAttributes.GEN_AI_REQUEST_TOP_P, + safe_set_attr(span, + tracing.SpanAttributes.GEN_AI_REQUEST_TEMPERATURE, + sampling_params.temperature) + safe_set_attr(span, tracing.SpanAttributes.GEN_AI_REQUEST_TOP_P, sampling_params.top_p) + safe_set_attr(span, tracing.SpanAttributes.GEN_AI_REQUEST_TOP_K, + sampling_params.top_k) safe_set_attr( span, - SpanAttributes.GEN_AI_REQUEST_MAX_TOKENS, + tracing.SpanAttributes.GEN_AI_REQUEST_MAX_TOKENS, sampling_params.max_tokens, ) - safe_set_attr(span, SpanAttributes.GEN_AI_REQUEST_N, + safe_set_attr(span, tracing.SpanAttributes.GEN_AI_REQUEST_N, sampling_params.n) - # TODO: Add prompt info in result base - safe_set_attr( - span, - SpanAttributes.GEN_AI_USAGE_PROMPT_TOKENS, - getattr(self.postproc_params.postproc_args, "num_prompt_tokens", - None) if self.postproc_params - and self.postproc_params.postproc_args else None, - ) - safe_set_attr(span, SpanAttributes.GEN_AI_USAGE_COMPLETION_TOKENS, + safe_set_attr(span, tracing.SpanAttributes.GEN_AI_REQUEST_ID, + self.id) + if prompt_token_ids := getattr(self, "prompt_token_ids", None): + safe_set_attr(span, + tracing.SpanAttributes.GEN_AI_USAGE_PROMPT_TOKENS, + len(prompt_token_ids)) + safe_set_attr(span, + 
tracing.SpanAttributes.GEN_AI_USAGE_COMPLETION_TOKENS, output.length) safe_set_attr( - span, - SpanAttributes.GEN_AI_LATENCY_TIME_TO_FIRST_TOKEN, - metrics_dict.get(MetricNames.TTFT, -1), - ) - safe_set_attr(span, SpanAttributes.GEN_AI_LATENCY_E2E, e2e_time) - safe_set_attr(span, SpanAttributes.GEN_AI_REQUEST_ID, self.id) + span, tracing.SpanAttributes.GEN_AI_LATENCY_TIME_TO_FIRST_TOKEN, + metrics_dict.get(MetricNames.TTFT, -1)) + safe_set_attr(span, tracing.SpanAttributes.GEN_AI_LATENCY_E2E, + e2e_time) + safe_set_attr(span, + tracing.SpanAttributes.GEN_AI_LATENCY_TIME_IN_QUEUE, + metrics_dict.get(MetricNames.REQUEST_QUEUE_TIME, -1)) safe_set_attr( - span, - SpanAttributes.GEN_AI_LATENCY_TIME_IN_QUEUE, - metrics_dict.get(MetricNames.REQUEST_QUEUE_TIME, -1), - ) + span, tracing.SpanAttributes.GEN_AI_RESPONSE_FINISH_REASONS, + json.dumps([output.finish_reason]) + if output.finish_reason else None) class DetokenizedGenerationResultBase(GenerationResultBase): diff --git a/tensorrt_llm/llmapi/llm.py b/tensorrt_llm/llmapi/llm.py index 57fbac64fb8..3f5a4cc2541 100644 --- a/tensorrt_llm/llmapi/llm.py +++ b/tensorrt_llm/llmapi/llm.py @@ -16,7 +16,7 @@ from tensorrt_llm.inputs.data import TextPrompt from tensorrt_llm.inputs.multimodal import MultimodalParams from tensorrt_llm.inputs.registry import DefaultInputProcessor -from tensorrt_llm.llmapi.otel_tracing import init_tracer +from tensorrt_llm.llmapi import tracing from .._utils import nvtx_range_debug from ..bindings import executor as tllm @@ -224,7 +224,7 @@ def __init__(self, try: if self.args.otlp_traces_endpoint: - init_tracer("trt.llm", self.args.otlp_traces_endpoint) + tracing.init_tracer("trt.llm", self.args.otlp_traces_endpoint) logger.info( f"Initialized OTLP tracer successfully, endpoint: {self.args.otlp_traces_endpoint}" ) diff --git a/tensorrt_llm/llmapi/otel_tracing.py b/tensorrt_llm/llmapi/tracing.py similarity index 89% rename from tensorrt_llm/llmapi/otel_tracing.py rename to tensorrt_llm/llmapi/tracing.py index 86ee2b43af2..237fee4b311 100644 --- a/tensorrt_llm/llmapi/otel_tracing.py +++ b/tensorrt_llm/llmapi/tracing.py @@ -1,5 +1,13 @@ # Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. 
+__all__ = [ + 'SpanAttributes', 'SpanKind', 'contains_trace_headers', + 'extract_trace_context', 'extract_trace_headers', 'get_span_exporter', + 'global_otlp_tracer', 'init_tracer', 'insufficient_request_metrics_warning', + 'is_otel_available', 'is_tracing_enabled', 'log_tracing_disabled_warning', + 'set_global_otlp_tracer' +] + import os from collections.abc import Mapping from typing import Optional @@ -7,6 +15,7 @@ from tensorrt_llm._utils import run_once from tensorrt_llm.logger import logger +# Adapted from https://github.com/vllm-project/vllm/blob/v0.10.0rc1/vllm/tracing.py#L11 TRACE_HEADERS = ["traceparent", "tracestate"] _global_tracer_ = None @@ -112,12 +121,14 @@ class SpanAttributes: GEN_AI_USAGE_PROMPT_TOKENS = "gen_ai.usage.prompt_tokens" GEN_AI_REQUEST_MAX_TOKENS = "gen_ai.request.max_tokens" GEN_AI_REQUEST_TOP_P = "gen_ai.request.top_p" + GEN_AI_REQUEST_TOP_K = "gen_ai.request.top_k" GEN_AI_REQUEST_TEMPERATURE = "gen_ai.request.temperature" GEN_AI_REQUEST_ID = "gen_ai.request.id" GEN_AI_REQUEST_N = "gen_ai.request.n" GEN_AI_LATENCY_TIME_TO_FIRST_TOKEN = "gen_ai.latency.time_to_first_token" GEN_AI_LATENCY_E2E = "gen_ai.latency.e2e" GEN_AI_LATENCY_TIME_IN_QUEUE = "gen_ai.latency.time_in_queue" + GEN_AI_RESPONSE_FINISH_REASONS = "gen_ai.response.finish_reasons" def contains_trace_headers(headers: Mapping[str, str]) -> bool: diff --git a/tensorrt_llm/serve/openai_server.py b/tensorrt_llm/serve/openai_server.py index c272d5c7332..9a455c6037a 100644 --- a/tensorrt_llm/serve/openai_server.py +++ b/tensorrt_llm/serve/openai_server.py @@ -27,13 +27,9 @@ from tensorrt_llm.inputs import prompt_inputs from tensorrt_llm.inputs.utils import ConversationMessage, apply_chat_template from tensorrt_llm.llmapi import DisaggregatedParams as LlmDisaggregatedParams -from tensorrt_llm.llmapi import MultimodalEncoder +from tensorrt_llm.llmapi import MultimodalEncoder, tracing from tensorrt_llm.llmapi.disagg_utils import MetadataServerConfig, ServerRole from tensorrt_llm.llmapi.llm import RequestOutput -from tensorrt_llm.llmapi.otel_tracing import (contains_trace_headers, - extract_trace_headers, - is_tracing_enabled, - log_tracing_disabled_warning) from tensorrt_llm.logger import logger from tensorrt_llm.metrics.collector import MetricsCollector from tensorrt_llm.serve.chat_utils import (check_multiple_response, @@ -908,8 +904,8 @@ async def _get_trace_headers( self, headers: Headers, ) -> Optional[Mapping[str, str]]: - if is_tracing_enabled(): - return extract_trace_headers(headers) - if contains_trace_headers(headers): - log_tracing_disabled_warning() + if tracing.is_tracing_enabled(): + return tracing.extract_trace_headers(headers) + if tracing.contains_trace_headers(headers): + tracing.log_tracing_disabled_warning() return None From ee83e801f419e821e80aa093cdcc4c044e6f0a37 Mon Sep 17 00:00:00 2001 From: Zhang Haotong Date: Thu, 21 Aug 2025 21:07:34 +0800 Subject: [PATCH 03/15] [feat] Add trace to disagg server and add kv_cache info Signed-off-by: Zhang Haotong --- tensorrt_llm/executor/result.py | 52 ++++++++++-- tensorrt_llm/executor/worker.py | 10 ++- tensorrt_llm/llmapi/disagg_utils.py | 15 +++- tensorrt_llm/llmapi/llm.py | 5 ++ tensorrt_llm/llmapi/tracing.py | 86 +++++++++++++++++-- tensorrt_llm/metrics/enums.py | 4 + tensorrt_llm/serve/openai_disagg_server.py | 98 +++++++++++++++------- tensorrt_llm/serve/openai_server.py | 16 +--- 8 files changed, 222 insertions(+), 64 deletions(-) diff --git a/tensorrt_llm/executor/result.py b/tensorrt_llm/executor/result.py index 
8d6e81cad50..1a1191d9a5b 100644 --- a/tensorrt_llm/executor/result.py +++ b/tensorrt_llm/executor/result.py @@ -166,7 +166,7 @@ def __init__(self, self.avg_decoded_tokens_per_iter: Optional[float] = None self._done = False self.metrics_dict = {} - self.trace_headers = None + self.trace_headers: Optional[dict[str, str]] = None if has_event_loop(): self.aqueue = AsyncQueue() @@ -324,7 +324,7 @@ def _handle_response(self, else: self._outputs[0]._postprocess_result = response.res if response.metrics: - self.metrics_dict = response.metrics + self.metrics_dict.update(response.metrics) if response.error: if self._background_error_handler is not None and ( @@ -404,7 +404,7 @@ def record_stats(self, stats, len(output.token_ids), self.sampling_params.n > 1) if processed_metrics_stat: metrics_stats.update(processed_metrics_stat) - self.metrics_dict = metrics_stats + self.metrics_dict.update(metrics_stats) def do_tracing( self, @@ -423,20 +423,29 @@ def do_tracing( trace_context = tracing.extract_trace_context(self.trace_headers) sampling_params = self.sampling_params - # TODO: Add request arrival time - arrival_time = time.time() - metrics_dict.get(MetricNames.E2E, -1) + # Since arrival_time and other timing metrics are based on different time origins, + # we need to apply corrections to align them with absolute timestamps + time_correction = 0 + arrival_timestamp = metrics_dict.get(MetricNames.ARRIVAL_TIMESTAMP, 0) + arrival_time = req_perf_metrics_dict.get( + RequestEventTiming.ARRIVAL_TIME, 0) + if arrival_timestamp > 0: + time_correction = arrival_timestamp - arrival_time + else: + time_correction = time.time() - metrics_dict.get( + MetricNames.E2E, -1) - arrival_time + with tracing.global_otlp_tracer().start_as_current_span( "llm_request", kind=tracing.SpanKind.SERVER, context=trace_context, - start_time=int(arrival_time * 1e9), + start_time=int((arrival_time + time_correction) * 1e9), ) as span: def safe_set_attr(span, attr, value): if value is not None: span.set_attribute(attr, value) - e2e_time = metrics_dict.get(MetricNames.E2E, -1) safe_set_attr(span, tracing.SpanAttributes.GEN_AI_REQUEST_TEMPERATURE, sampling_params.temperature) @@ -464,7 +473,7 @@ def safe_set_attr(span, attr, value): span, tracing.SpanAttributes.GEN_AI_LATENCY_TIME_TO_FIRST_TOKEN, metrics_dict.get(MetricNames.TTFT, -1)) safe_set_attr(span, tracing.SpanAttributes.GEN_AI_LATENCY_E2E, - e2e_time) + metrics_dict.get(MetricNames.E2E, -1)) safe_set_attr(span, tracing.SpanAttributes.GEN_AI_LATENCY_TIME_IN_QUEUE, metrics_dict.get(MetricNames.REQUEST_QUEUE_TIME, -1)) @@ -472,6 +481,33 @@ def safe_set_attr(span, attr, value): span, tracing.SpanAttributes.GEN_AI_RESPONSE_FINISH_REASONS, json.dumps([output.finish_reason]) if output.finish_reason else None) + safe_set_attr( + span, + tracing.SpanAttributes.GEN_AI_LATENCY_KV_CACHE_TRANSFER_TIME, + req_perf_metrics_dict.get( + RequestEventTiming.KV_CACHE_TRANSFER_END, 0.0) - + req_perf_metrics_dict.get( + RequestEventTiming.KV_CACHE_TRANSFER_START, 0.0)) + + if req_perf_metrics_dict.get( + RequestEventTiming.KV_CACHE_TRANSFER_START, + 0) and req_perf_metrics_dict.get( + RequestEventTiming.KV_CACHE_TRANSFER_END, 0): + tracing.add_event( + tracing.SpanEvents.KV_CACHE_TRANSFER_START, + timestamp=int((req_perf_metrics_dict.get( + RequestEventTiming.KV_CACHE_TRANSFER_START, 0.0) + + time_correction) * 1e9)) + tracing.add_event( + tracing.SpanEvents.KV_CACHE_TRANSFER_END, + attributes={ + "kv_cache_size": + req_perf_metrics_dict.get( + RequestEventTiming.KV_CACHE_SIZE, 0) + }, + 
timestamp=int((req_perf_metrics_dict.get( + RequestEventTiming.KV_CACHE_TRANSFER_END, 0.0) + + time_correction) * 1e9)) class DetokenizedGenerationResultBase(GenerationResultBase): diff --git a/tensorrt_llm/executor/worker.py b/tensorrt_llm/executor/worker.py index b636985c137..adde856f5a1 100644 --- a/tensorrt_llm/executor/worker.py +++ b/tensorrt_llm/executor/worker.py @@ -1153,7 +1153,15 @@ def _get_metrics_dict( req_perf_metrics.timing_metrics.first_scheduled_time. total_seconds(), RequestEventTiming.LAST_TOKEN_TIME: - req_perf_metrics.timing_metrics.last_token_time.total_seconds() + req_perf_metrics.timing_metrics.last_token_time.total_seconds(), + RequestEventTiming.KV_CACHE_TRANSFER_START: + req_perf_metrics.timing_metrics.kv_cache_transfer_start. + total_seconds(), + RequestEventTiming.KV_CACHE_TRANSFER_END: + req_perf_metrics.timing_metrics.kv_cache_transfer_end. + total_seconds(), + RequestEventTiming.KV_CACHE_SIZE: + req_perf_metrics.timing_metrics.kv_cache_size, } return metrics_dict diff --git a/tensorrt_llm/llmapi/disagg_utils.py b/tensorrt_llm/llmapi/disagg_utils.py index 8404cbaf7ad..7fb1c8b7dae 100644 --- a/tensorrt_llm/llmapi/disagg_utils.py +++ b/tensorrt_llm/llmapi/disagg_utils.py @@ -43,6 +43,12 @@ class ConditionalDisaggConfig(): max_local_prefill_length: int = 0 +@dataclass +class ObservabilityConfig(): + otlp_traces_endpoint: Optional[str] = None + """Target URL to which OpenTelemetry traces will be sent.""" + + @dataclass class DisaggServerConfig(): server_configs: List[CtxGenServerConfig] @@ -52,6 +58,7 @@ class DisaggServerConfig(): gen_router_config: Optional[RouterConfig] = None conditional_disagg_config: Optional[ConditionalDisaggConfig] = None max_retries: int = 1 + observability_config: Optional[ObservabilityConfig] = None perf_metrics_max_requests: int = 0 @@ -96,6 +103,7 @@ def extract_disagg_cfg(hostname: str = 'localhost', context_servers: Optional[dict] = None, generation_servers: Optional[dict] = None, conditional_disagg_config: Optional[dict] = None, + observability_config: Optional[dict] = None, **kwargs: Any) -> DisaggServerConfig: context_servers = context_servers or {} generation_servers = generation_servers or {} @@ -129,10 +137,13 @@ def extract_disagg_cfg(hostname: str = 'localhost', conditional_disagg_config = ConditionalDisaggConfig( **conditional_disagg_config) if conditional_disagg_config else None + observability_config = ObservabilityConfig( + **observability_config) if observability_config else None + config = DisaggServerConfig(server_configs, hostname, port, ctx_router_config, gen_router_config, - conditional_disagg_config, max_retries, - perf_metrics_max_requests) + conditional_disagg_config, observability_config, + max_retries, perf_metrics_max_requests) return config diff --git a/tensorrt_llm/llmapi/llm.py b/tensorrt_llm/llmapi/llm.py index 3f5a4cc2541..c59936ac2aa 100644 --- a/tensorrt_llm/llmapi/llm.py +++ b/tensorrt_llm/llmapi/llm.py @@ -17,6 +17,7 @@ from tensorrt_llm.inputs.multimodal import MultimodalParams from tensorrt_llm.inputs.registry import DefaultInputProcessor from tensorrt_llm.llmapi import tracing +from tensorrt_llm.metrics.enums import MetricNames from .._utils import nvtx_range_debug from ..bindings import executor as tllm @@ -463,6 +464,10 @@ def generate_async( cache_salt_id=cache_salt_id, ) + if sampling_params.return_perf_metrics: + result.metrics_dict.update( + {MetricNames.ARRIVAL_TIMESTAMP: time.time()}) + return RequestOutput._from_generation_result(result, prompt, self.tokenizer) diff --git 
a/tensorrt_llm/llmapi/tracing.py b/tensorrt_llm/llmapi/tracing.py index 237fee4b311..740efde4ef2 100644 --- a/tensorrt_llm/llmapi/tracing.py +++ b/tensorrt_llm/llmapi/tracing.py @@ -2,13 +2,15 @@ __all__ = [ 'SpanAttributes', 'SpanKind', 'contains_trace_headers', - 'extract_trace_context', 'extract_trace_headers', 'get_span_exporter', - 'global_otlp_tracer', 'init_tracer', 'insufficient_request_metrics_warning', - 'is_otel_available', 'is_tracing_enabled', 'log_tracing_disabled_warning', - 'set_global_otlp_tracer' + 'extract_trace_context', 'get_span_exporter', 'global_otlp_tracer', + 'init_tracer', 'insufficient_request_metrics_warning', 'is_otel_available', + 'is_tracing_enabled', 'log_tracing_disabled_warning', + 'set_global_otlp_tracer', 'extract_trace_headers' ] +import functools import os +import typing from collections.abc import Mapping from typing import Optional @@ -28,9 +30,11 @@ OTEL_EXPORTER_OTLP_TRACES_PROTOCOL from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor - from opentelemetry.trace import SpanKind, Tracer, set_tracer_provider + from opentelemetry.trace import (SpanKind, Status, StatusCode, Tracer, + get_current_span, set_tracer_provider) from opentelemetry.trace.propagation.tracecontext import \ TraceContextTextMapPropagator + from opentelemetry.util import types _is_otel_imported = True except ImportError: @@ -94,10 +98,23 @@ def extract_trace_context( return None -def extract_trace_headers(headers: Mapping[str, str]) -> Mapping[str, str]: - # Return only recognized trace headers with normalized lowercase keys - lower_map = {k.lower(): v for k, v in headers.items()} - return {h: lower_map[h] for h in TRACE_HEADERS if h in lower_map} +def extract_trace_headers( + headers: Mapping[str, str]) -> Optional[Mapping[str, str]]: + if is_tracing_enabled(): + # Return only recognized trace headers with normalized lowercase keys + lower_map = {k.lower(): v for k, v in headers.items()} + return {h: lower_map[h] for h in TRACE_HEADERS if h in lower_map} + if contains_trace_headers(headers): + log_tracing_disabled_warning() + return None + + +def inject_trace_headers(headers: Mapping[str, str]) -> Mapping[str, str]: + if is_tracing_enabled(): + trace_headers = extract_trace_headers(headers) if not headers else {} + TraceContextTextMapPropagator().inject(trace_headers) + return trace_headers + return None def global_otlp_tracer() -> Tracer: @@ -128,14 +145,31 @@ class SpanAttributes: GEN_AI_LATENCY_TIME_TO_FIRST_TOKEN = "gen_ai.latency.time_to_first_token" GEN_AI_LATENCY_E2E = "gen_ai.latency.e2e" GEN_AI_LATENCY_TIME_IN_QUEUE = "gen_ai.latency.time_in_queue" + GEN_AI_LATENCY_KV_CACHE_TRANSFER_TIME = "gen_ai.latency.kv_cache_transfer_time" GEN_AI_RESPONSE_FINISH_REASONS = "gen_ai.response.finish_reasons" +class SpanEvents: + KV_CACHE_TRANSFER_START = "kv_cache_transfer_start" + KV_CACHE_TRANSFER_END = "kv_cache_transfer_end" + CTX_SERVER_SELECTED = "ctx_server.selected" + GEN_SERVER_SELECTED = "gen_server.selected" + + def contains_trace_headers(headers: Mapping[str, str]) -> bool: lower_keys = {k.lower() for k in headers.keys()} return any(h in lower_keys for h in TRACE_HEADERS) +def add_event(name: str, + attributes: types.Attributes = None, + timestamp: typing.Optional[int] = None) -> None: + """Add an event to the current span if tracing is available.""" + if not is_tracing_enabled(): + return + get_current_span().add_event(name, attributes, timestamp) + + @run_once def log_tracing_disabled_warning() -> None: 
logger.warning( @@ -146,3 +180,37 @@ def log_tracing_disabled_warning() -> None: def insufficient_request_metrics_warning() -> None: logger.warning( "Insufficient request metrics available; trace generation aborted.") + + +def trace_span(name: str = None): + + def decorator(func): + + @functools.wraps(func) + async def async_wrapper(*args, **kwargs): + span_name = name if name is not None else func.__name__ + if global_otlp_tracer() is None: + return await func(*args, **kwargs) + + trace_headers = None + for arg in list(args) + list(kwargs.values()): + if hasattr(arg, 'headers'): + trace_headers = extract_trace_context(arg.headers) + break + + with global_otlp_tracer().start_as_current_span( + span_name, kind=SpanKind.SERVER, + context=trace_headers) as span: + try: + result = await func(*args, **kwargs) + span.set_status(Status(StatusCode.OK)) + return result + except Exception as e: + span.record_exception(e) + span.set_status( + Status(StatusCode.ERROR, f"An error occurred: {e}")) + raise e + + return async_wrapper + + return decorator diff --git a/tensorrt_llm/metrics/enums.py b/tensorrt_llm/metrics/enums.py index 5ce982281bc..e5beaeea7f9 100644 --- a/tensorrt_llm/metrics/enums.py +++ b/tensorrt_llm/metrics/enums.py @@ -6,6 +6,7 @@ class MetricNames(Enum): TPOT = "tpot" E2E = "e2e" REQUEST_QUEUE_TIME = "request_queue_time" + ARRIVAL_TIMESTAMP = 'arrival_timestamp' class RequestEventTiming(Enum): @@ -13,3 +14,6 @@ class RequestEventTiming(Enum): FIRST_TOKEN_TIME = "first_token_time" # nosec: B105 FIRST_SCHEDULED_TIME = "first_scheduled_time" LAST_TOKEN_TIME = "last_token_time" # nosec: B105 + KV_CACHE_TRANSFER_START = "kv_cache_transfer_start" + KV_CACHE_TRANSFER_END = "kv_cache_transfer_end" + KV_CACHE_SIZE = "kv_cache_size" diff --git a/tensorrt_llm/serve/openai_disagg_server.py b/tensorrt_llm/serve/openai_disagg_server.py index 495724b2928..d741104f87a 100644 --- a/tensorrt_llm/serve/openai_disagg_server.py +++ b/tensorrt_llm/serve/openai_disagg_server.py @@ -6,13 +6,14 @@ import signal import traceback from collections import deque +from collections.abc import Mapping from contextlib import asynccontextmanager from http import HTTPStatus from typing import Callable, Optional, Type, Union import aiohttp import uvicorn -from fastapi import FastAPI, HTTPException +from fastapi import FastAPI, HTTPException, Request from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse, Response, StreamingResponse from starlette.status import HTTP_500_INTERNAL_SERVER_ERROR @@ -21,7 +22,9 @@ from tensorrt_llm.executor import CppExecutorError from tensorrt_llm.llmapi.disagg_utils import (DisaggServerConfig, MetadataServerConfig, + ObservabilityConfig, get_ctx_gen_server_urls) +from tensorrt_llm.llmapi import tracing from tensorrt_llm.logger import logger from tensorrt_llm.serve.metadata_server import create_metadata_server from tensorrt_llm.serve.openai_protocol import (ChatCompletionRequest, @@ -52,6 +55,16 @@ def __init__(self, self.gen_router = create_router( config.gen_router_config, self.gen_servers, metadata_server_cfg, self.metadata_server) self.conditional_disagg_config = config.conditional_disagg_config + self.observability_cfg = config.observability_config + + try: + if self.observability_cfg and self.observability_cfg.otlp_traces_endpoint: + tracing.init_tracer("trt.llm", self.observability_cfg.otlp_traces_endpoint) + logger.info( + f"Initialized OTLP tracer successfully, endpoint: {self.observability_cfg.otlp_traces_endpoint}" + ) + except Exception 
as e: + logger.error(f"Failed to initialize OTLP tracer: {e}") self.perf_metrics_max_requests = config.perf_metrics_max_requests if self.perf_metrics_max_requests > 0: @@ -243,7 +256,8 @@ async def perf_metrics(self) -> JSONResponse: async def merge_streaming_responses(self, ctx_response, gen_server: str, - gen_req: Union[CompletionRequest, ChatCompletionRequest]): + gen_req: Union[CompletionRequest, ChatCompletionRequest], + trace_headers: Optional[Mapping[str, str]] = None): try: if ctx_response is not None and len(ctx_response.choices) != 1: raise ValueError("Context server did not return a single choice. This is not expected") @@ -255,9 +269,9 @@ async def merge_streaming_responses(self, ctx_response, # Then yield the generation responses await self._increment_metric("gen_total_requests") if isinstance(gen_req, CompletionRequest): - gen_response = await self.send_completion_request(gen_server, gen_req) + gen_response = await self.send_completion_request(gen_server, gen_req, trace_headers) elif isinstance(gen_req, ChatCompletionRequest): - gen_response = await self.send_chat_request(gen_server, gen_req) + gen_response = await self.send_chat_request(gen_server, gen_req, trace_headers) else: raise TypeError("Invalid request type: {type(gen_req).__name__}") @@ -268,7 +282,8 @@ async def merge_streaming_responses(self, ctx_response, finally: await self.gen_router.finish_request(gen_req) - async def openai_completion(self, req: CompletionRequest) -> Response: + @tracing.trace_span("llm_request") + async def openai_completion(self, req: CompletionRequest, raw_request: Request) -> Response: try: if not isinstance(req.prompt, str): # Check if it's a list and contains integers @@ -277,15 +292,16 @@ async def openai_completion(self, req: CompletionRequest) -> Response: elif not isinstance(req.prompt, list) or not all(isinstance(x, int) for x in req.prompt): raise ValueError("Disaggregated server currently only supports single string prompt or list of integers in request") - return await self._send_disagg_request(req) + return await self._send_disagg_request(req, raw_request) except Exception as e: await self._handle_exception(e) - async def openai_chat_completion(self, req: ChatCompletionRequest) -> Response: + @tracing.trace_span("llm_request") + async def openai_chat_completion(self, req: ChatCompletionRequest, raw_request: Request) -> Response: try: - return await self._send_disagg_request(req) + return await self._send_disagg_request(req, raw_request) except Exception as e: await self._handle_exception(e) @@ -299,7 +315,8 @@ async def _handle_exception(self, exception): logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=f"Internal server error {str(exception)}") - async def _send_context_request(self, ctx_server: str, ctx_req: Union[CompletionRequest, ChatCompletionRequest]): + async def _send_context_request(self, ctx_server: str, ctx_req: Union[CompletionRequest, ChatCompletionRequest], + trace_headers: Optional[Mapping[str, str]] = None): ctx_req.disaggregated_params = DisaggregatedParams(request_type="context_only") ctx_req.stream = False @@ -309,10 +326,10 @@ async def _send_context_request(self, ctx_server: str, ctx_req: Union[Completion await self._increment_metric("ctx_total_requests") try: if isinstance(ctx_req, ChatCompletionRequest): - ctx_response = await self.send_chat_request(ctx_server, ctx_req) + ctx_response = await self.send_chat_request(ctx_server, ctx_req, trace_headers) else: assert isinstance(ctx_req, CompletionRequest) - ctx_response = 
await self.send_completion_request(ctx_server, ctx_req) + ctx_response = await self.send_completion_request(ctx_server, ctx_req, trace_headers) finally: await self.ctx_router.finish_request(ctx_req) await self._increment_metric("ctx_completed_requests") @@ -327,9 +344,11 @@ async def _send_context_request(self, ctx_server: str, ctx_req: Union[Completion return ctx_response - async def _send_disagg_request(self, req: Union[CompletionRequest, ChatCompletionRequest]): + + async def _send_disagg_request(self, req: Union[CompletionRequest, ChatCompletionRequest], raw_request: Request): gen_server = None need_ctx = False + trace_headers = tracing.inject_trace_headers(raw_request.headers) try: # Determine if need context server condition = self.conditional_disagg_config @@ -358,8 +377,11 @@ async def _send_disagg_request(self, req: Union[CompletionRequest, ChatCompletio if need_ctx: ctx_req = copy.deepcopy(req) ctx_server, _ = await self.ctx_router.get_next_server(ctx_req) + #todo: rename event to something more descriptive + tracing.add_event(tracing.SpanEvents.CTX_SERVER_SELECTED, attributes={"server": str(ctx_server),}) + # TODO: add ctx_server info into generation request for pre-registration - ctx_response = await self._send_context_request(ctx_server, ctx_req) + ctx_response = await self._send_context_request(ctx_server, ctx_req, trace_headers) if ctx_response is not None and len(ctx_response.choices) != 1: raise ValueError("Context server did not return a single choice. This is not expected") @@ -382,6 +404,7 @@ async def _send_disagg_request(self, req: Union[CompletionRequest, ChatCompletio if gen_server is None: gen_server, _ = await self.gen_router.get_next_server(req) logger.debug("Sending request to gen server: %s", gen_server) + tracing.add_event(tracing.SpanEvents.GEN_SERVER_SELECTED,attributes={"server": str(gen_server),}) if need_ctx and self.perf_metrics_keys is not None: asyncio.create_task(self._add_perf_metrics_keys( @@ -396,10 +419,10 @@ async def _send_disagg_request(self, req: Union[CompletionRequest, ChatCompletio else: await self._increment_metric("gen_total_requests") if isinstance(req, CompletionRequest): - gen_response = await self.send_completion_request(gen_server, req) + gen_response = await self.send_completion_request(gen_server, req, trace_headers) else: assert isinstance(req, ChatCompletionRequest) - gen_response = await self.send_chat_request(gen_server, req) + gen_response = await self.send_chat_request(gen_server, req, trace_headers) await self._increment_metric("gen_completed_requests") return gen_response finally: @@ -409,7 +432,7 @@ async def _send_disagg_request(self, req: Union[CompletionRequest, ChatCompletio else: # Return a streaming response that combines both context and generation responses return StreamingResponse( - self.merge_streaming_responses(ctx_response, gen_server, req), + self.merge_streaming_responses(ctx_response, gen_server, req, trace_headers), media_type="text/event-stream" ) except: @@ -426,8 +449,15 @@ async def __call__(self, host, port): timeout_keep_alive=TIMEOUT_KEEP_ALIVE) await uvicorn.Server(config).serve() - async def create_generator(self, url: str, request: Union[CompletionRequest, ChatCompletionRequest], end_point: str): - async with self.session.post(url + end_point, json=request.model_dump(exclude_unset=True)) as response: + async def create_generator(self, url: str, request: Union[CompletionRequest, ChatCompletionRequest], + end_point: str, trace_headers: Optional[Mapping[str, str]] = None): + # Prepare headers + 
headers = {"Content-Type": "application/json"} + if trace_headers: + headers.update(trace_headers) + + async with self.session.post(url + end_point, json=request.model_dump(exclude_unset=True), + headers=headers) as response: content_type = response.headers.get("Content-Type", "") if "text/event-stream" in content_type: if not request.stream: @@ -442,26 +472,33 @@ async def create_generator(self, url: str, request: Union[CompletionRequest, Cha logger.error(f"Unexpected error in stream: {e}") raise - async def create_completion_generator(self, url: str, request: CompletionRequest): - async for chunk in self.create_generator(url, request, "/v1/completions"): + async def create_completion_generator(self, url: str, request: CompletionRequest, + trace_headers: Optional[Mapping[str, str]] = None): + async for chunk in self.create_generator(url, request, "/v1/completions", trace_headers): yield chunk - async def create_chat_generator(self, url: str, request: ChatCompletionRequest): - async for chunk in self.create_generator(url, request, "/v1/chat/completions"): + async def create_chat_generator(self, url: str, request: ChatCompletionRequest, + trace_headers: Optional[Mapping[str, str]] = None): + async for chunk in self.create_generator(url, request, "/v1/chat/completions", trace_headers): yield chunk async def send_request(self, url: str, request: Union[CompletionRequest, ChatCompletionRequest], endpoint: str, response_type: Type[Union[CompletionResponse, ChatCompletionResponse]], - create_generator: Callable) -> Union[CompletionResponse, ChatCompletionResponse, StreamingResponse]: + create_generator: Callable, + trace_headers: Optional[Mapping[str, str]] = None) -> Union[CompletionResponse, ChatCompletionResponse, StreamingResponse]: for attempt in range(self.max_retries + 1): try: + headers = {"Content-Type": "application/json"} + if trace_headers: + headers.update(trace_headers) if request.stream: - response_generator = create_generator(url, request) + response_generator = create_generator(url, request, headers) return StreamingResponse(content=response_generator, media_type="text/event-stream") else: - async with self.session.post(url + endpoint, json=request.model_dump(exclude_unset=True)) as response: + async with self.session.post(url + endpoint, json=request.model_dump(exclude_unset=True), + headers=headers) as response: content_type = response.headers.get("Content-Type", "") if "text/event-stream" in content_type: raise ValueError("Received an event-stream although request stream was False") @@ -481,12 +518,13 @@ async def send_request(self, url: str, logger.error(f"Error encountered while processing request to {url+endpoint}: {e}") raise + async def send_completion_request(self, url: str, request: CompletionRequest, + trace_headers: Optional[Mapping[str, str]] = None) -> Union[CompletionResponse, StreamingResponse]: + return await self.send_request(url, request, "/v1/completions", CompletionResponse, self.create_completion_generator, trace_headers) - async def send_completion_request(self, url: str, request: CompletionRequest) -> Union[CompletionResponse, StreamingResponse]: - return await self.send_request(url, request, "/v1/completions", CompletionResponse, self.create_completion_generator) - - async def send_chat_request(self, url: str, request: ChatCompletionRequest) -> ChatCompletionResponse: - return await self.send_request(url, request, "/v1/chat/completions", ChatCompletionResponse, self.create_chat_generator) + async def send_chat_request(self, url: str, request: 
ChatCompletionRequest, + trace_headers: Optional[Mapping[str, str]] = None) -> ChatCompletionResponse: + return await self.send_request(url, request, "/v1/chat/completions", ChatCompletionResponse, self.create_chat_generator, trace_headers) @classmethod async def check_server_ready(cls, session: aiohttp.ClientSession, server_url: str) -> bool: diff --git a/tensorrt_llm/serve/openai_server.py b/tensorrt_llm/serve/openai_server.py index 9a455c6037a..635b056086f 100644 --- a/tensorrt_llm/serve/openai_server.py +++ b/tensorrt_llm/serve/openai_server.py @@ -5,7 +5,6 @@ import signal import traceback from collections import deque -from collections.abc import Mapping from contextlib import asynccontextmanager from datetime import datetime from http import HTTPStatus @@ -16,7 +15,6 @@ from fastapi import FastAPI, Request from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse, Response, StreamingResponse -from starlette.datastructures import Headers from starlette.routing import Mount from transformers import AutoConfig, AutoProcessor @@ -462,7 +460,7 @@ async def create_chat_response( postproc_args=postproc_args, ) - trace_headers = (None if raw_request is None else await self._get_trace_headers(raw_request.headers)) + trace_headers = (None if raw_request is None else tracing.extract_trace_headers(raw_request.headers)) promise = self.llm.generate_async( inputs=prompt, @@ -674,7 +672,7 @@ async def generator_wrapper(generator: AsyncIterator[Any]): if request.stream else completion_response_post_processor, postproc_args=postproc_args, ) - trace_headers = (None if raw_request is None else await self._get_trace_headers(raw_request.headers)) + trace_headers = (None if raw_request is None else tracing.extract_trace_headers(raw_request.headers)) promise = self.llm.generate_async( inputs=prompt, sampling_params=sampling_params, @@ -899,13 +897,3 @@ async def __call__(self, host, port): log_level="info", timeout_keep_alive=TIMEOUT_KEEP_ALIVE) await uvicorn.Server(config).serve() - - async def _get_trace_headers( - self, - headers: Headers, - ) -> Optional[Mapping[str, str]]: - if tracing.is_tracing_enabled(): - return tracing.extract_trace_headers(headers) - if tracing.contains_trace_headers(headers): - tracing.log_tracing_disabled_warning() - return None From 4d3a59aab29e1a9f432bc8c565bee3e888a36608 Mon Sep 17 00:00:00 2001 From: Zhang Haotong Date: Mon, 25 Aug 2025 21:35:03 +0800 Subject: [PATCH 04/15] [docs] Add openTelemetry integration guide Signed-off-by: Zhang Haotong --- examples/opentelemetry/README.md | 85 ++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 examples/opentelemetry/README.md diff --git a/examples/opentelemetry/README.md b/examples/opentelemetry/README.md new file mode 100644 index 00000000000..18d264dbfb2 --- /dev/null +++ b/examples/opentelemetry/README.md @@ -0,0 +1,85 @@ +# OpenTelemetry Integration Guide + +This guide explains how to setup OpenTelemetry tracing in TensorRT-LLM to monitor and debug your LLM inference services. 
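+
+Once tracing is enabled via the steps below, each request handled by the server is recorded as an `llm_request` span, and any W3C trace context (`traceparent`/`tracestate` headers) sent by the client is joined to it. The client-side sketch that follows is illustrative only; it assumes a server already running at `http://localhost:8000`, an OTLP gRPC collector at `localhost:4317`, the `requests` package, and a placeholder model name.
+
+```python
+import requests
+from opentelemetry import trace
+from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor
+from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
+
+# Export client-side spans to the same collector the server uses
+# (assumed OTLP gRPC endpoint from the Jaeger setup below).
+provider = TracerProvider()
+provider.add_span_processor(
+    BatchSpanProcessor(OTLPSpanExporter(endpoint="localhost:4317", insecure=True)))
+trace.set_tracer_provider(provider)
+tracer = trace.get_tracer("client")
+
+with tracer.start_as_current_span("client_request"):
+    headers = {}
+    # Inject the current span context as `traceparent`/`tracestate` headers
+    # so the server-side `llm_request` span becomes a child of this span.
+    TraceContextTextMapPropagator().inject(headers)
+    response = requests.post(
+        "http://localhost:8000/v1/chat/completions",
+        headers=headers,
+        json={
+            "model": "Qwen3-8B",  # placeholder: use the model name you serve
+            "messages": [{"role": "user", "content": "Hello"}],
+        },
+    )
+    print(response.json())
+```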
+
+## Install OpenTelemetry
+
+Install the required OpenTelemetry packages:
+
+```bash
+pip install \
+  'opentelemetry-sdk' \
+  'opentelemetry-api' \
+  'opentelemetry-exporter-otlp' \
+  'opentelemetry-semantic-conventions-ai'
+```
+
+## Start Jaeger
+
+You can start Jaeger with Docker:
+
+```bash
+docker run --rm --name jaeger \
+  -e COLLECTOR_ZIPKIN_HOST_PORT=:9411 \
+  -p 6831:6831/udp \
+  -p 6832:6832/udp \
+  -p 5778:5778 \
+  -p 16686:16686 \
+  -p 4317:4317 \
+  -p 4318:4318 \
+  -p 14250:14250 \
+  -p 14268:14268 \
+  -p 14269:14269 \
+  -p 9411:9411 \
+  jaegertracing/all-in-one:1.57.0
+```
+
+Or run the jaeger-all-in-one(.exe) executable from [the binary distribution archives](https://www.jaegertracing.io/download/):
+
+```bash
+jaeger-all-in-one --collector.zipkin.host-port=:9411
+```
+
+## Set up environment variables and run TensorRT-LLM
+
+Set up the environment variables:
+
+```bash
+export JAEGER_IP=$(docker inspect --format '{{ .NetworkSettings.IPAddress }}' jaeger)
+export OTEL_EXPORTER_OTLP_TRACES_PROTOCOL=grpc
+export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=grpc://$JAEGER_IP:4317
+export OTEL_EXPORTER_OTLP_TRACES_INSECURE=true
+export OTEL_SERVICE_NAME="trt-server"
+```
+
+Then run TensorRT-LLM with OpenTelemetry:
+
+```bash
+trtllm-serve models/Qwen3-8B/ --otlp_traces_endpoint="$OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"
+```
+
+## Send requests and find traces in Jaeger
+
+You can send a request to the server and view the traces in [Jaeger UI](http://localhost:16686/).
+The traces should be visible under the service name "trt-server".
+
+## Configuration for Disaggregated Serving
+
+For disaggregated serving scenarios, the configuration for the context and generation servers remains the same as in the standalone mode. For the proxy, you can configure tracing as follows:
+
+```yaml
+# disagg_config.yaml
+hostname: 127.0.0.1
+port: 8000
+backend: pytorch
+context_servers:
+  num_instances: 1
+  urls:
+      - "127.0.0.1:8001"
+generation_servers:
+  num_instances: 1
+  urls:
+      - "127.0.0.1:8002"
+observability_config:
+  otlp_traces_endpoint: "grpc://0.0.0.0:4317"
+```
From 89c27733fc578fdb8034869085a9668bd7eaa524 Mon Sep 17 00:00:00 2001 From: Zhang Haotong Date: Tue, 26 Aug 2025 10:43:56 +0800 Subject: [PATCH 05/15] [chores] fix todo Signed-off-by: Zhang Haotong --- tensorrt_llm/serve/openai_disagg_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tensorrt_llm/serve/openai_disagg_server.py b/tensorrt_llm/serve/openai_disagg_server.py index d741104f87a..fde345e8df1 100644 --- a/tensorrt_llm/serve/openai_disagg_server.py +++ b/tensorrt_llm/serve/openai_disagg_server.py @@ -377,7 +377,7 @@ async def _send_disagg_request(self, req: Union[CompletionRequest, ChatCompletio if need_ctx: ctx_req = copy.deepcopy(req) ctx_server, _ = await self.ctx_router.get_next_server(ctx_req)
-            #todo: rename event to something more descriptive
+
 tracing.add_event(tracing.SpanEvents.CTX_SERVER_SELECTED, attributes={"server": str(ctx_server),}) # TODO: add ctx_server info into generation request for pre-registration
From c987e0628b0dd9dc6482457a5e86c0ba49379e39 Mon Sep 17 00:00:00 2001 From: Zhang Haotong Date: Tue, 26 Aug 2025 14:40:49 +0800 Subject: [PATCH 06/15] fix Signed-off-by: Zhang Haotong --- examples/opentelemetry/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/opentelemetry/README.md b/examples/opentelemetry/README.md index 18d264dbfb2..2e6557c43fd 100644 --- a/examples/opentelemetry/README.md +++ b/examples/opentelemetry/README.md @@ -52,7 +52,7 @@ export
OTEL_EXPORTER_OTLP_TRACES_INSECURE=true export OTEL_SERVICE_NAME="trt-server" ``` -Then run TensorRT-LLM with OpenTelemetry: +Then run TensorRT-LLM with OpenTelemetry, and make sure to set `return_perf_metrics` to true in the model configuration: ```bash trtllm-serve models/Qwen3-8B/ --otlp_traces_endpoint="$OTEL_EXPORTER_OTLP_TRACES_ENDPOINT" From 94126ca8b25fdc3f4a2b048916164b2ef538fad2 Mon Sep 17 00:00:00 2001 From: Zhang Haotong Date: Tue, 26 Aug 2025 15:18:57 +0800 Subject: [PATCH 07/15] [fix] remove opentelemetry package from requirements.txt Signed-off-by: Zhang Haotong --- requirements.txt | 5 ----- tensorrt_llm/llmapi/tracing.py | 3 +-- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/requirements.txt b/requirements.txt index 400df1448ed..44954f1f71e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -63,11 +63,6 @@ meson ninja etcd3 blake3 -# observation.tracing -opentelemetry-sdk -opentelemetry-api -opentelemetry-exporter-otlp -opentelemetry-semantic-conventions-ai soundfile triton==3.3.1; platform_machine == "x86_64" tiktoken diff --git a/tensorrt_llm/llmapi/tracing.py b/tensorrt_llm/llmapi/tracing.py index 740efde4ef2..e8722f661cc 100644 --- a/tensorrt_llm/llmapi/tracing.py +++ b/tensorrt_llm/llmapi/tracing.py @@ -34,7 +34,6 @@ get_current_span, set_tracer_provider) from opentelemetry.trace.propagation.tracecontext import \ TraceContextTextMapPropagator - from opentelemetry.util import types _is_otel_imported = True except ImportError: @@ -162,7 +161,7 @@ def contains_trace_headers(headers: Mapping[str, str]) -> bool: def add_event(name: str, - attributes: types.Attributes = None, + attributes: Optional[Mapping[str, object]] = None, timestamp: typing.Optional[int] = None) -> None: """Add an event to the current span if tracing is available.""" if not is_tracing_enabled(): From 59384456b56d47d3e5eb1b10a5bf8dc9b350eedb Mon Sep 17 00:00:00 2001 From: Zhang Haotong Date: Tue, 26 Aug 2025 15:33:40 +0800 Subject: [PATCH 08/15] [chores] pre commit Signed-off-by: Zhang Haotong --- tensorrt_llm/commands/serve.py | 64 ++++++++-------------- tensorrt_llm/llmapi/tracing.py | 2 +- tensorrt_llm/serve/openai_disagg_server.py | 3 +- 3 files changed, 24 insertions(+), 45 deletions(-) diff --git a/tensorrt_llm/commands/serve.py b/tensorrt_llm/commands/serve.py index 306c06bc6eb..7c56c816d48 100644 --- a/tensorrt_llm/commands/serve.py +++ b/tensorrt_llm/commands/serve.py @@ -110,42 +110,24 @@ def get_llm_args(model: str, ) backend = backend if backend in ["pytorch", "_autodeploy"] else None llm_args = { - "model": - model, - "scheduler_config": - scheduler_config, - "tokenizer": - tokenizer, - "tensor_parallel_size": - tensor_parallel_size, - "pipeline_parallel_size": - pipeline_parallel_size, - "moe_expert_parallel_size": - moe_expert_parallel_size, - "gpus_per_node": - gpus_per_node, - "trust_remote_code": - trust_remote_code, - "build_config": - build_config, - "max_batch_size": - max_batch_size, - "max_num_tokens": - max_num_tokens, - "max_beam_width": - max_beam_width, - "max_seq_len": - max_seq_len, - "kv_cache_config": - kv_cache_config, - "backend": - backend, - "num_postprocess_workers": - num_postprocess_workers, - "postprocess_tokenizer_dir": - tokenizer or model, - "reasoning_parser": - reasoning_parser, + "model": model, + "scheduler_config": scheduler_config, + "tokenizer": tokenizer, + "tensor_parallel_size": tensor_parallel_size, + "pipeline_parallel_size": pipeline_parallel_size, + "moe_expert_parallel_size": moe_expert_parallel_size, + "gpus_per_node": 
gpus_per_node, + "trust_remote_code": trust_remote_code, + "build_config": build_config, + "max_batch_size": max_batch_size, + "max_num_tokens": max_num_tokens, + "max_beam_width": max_beam_width, + "max_seq_len": max_seq_len, + "kv_cache_config": kv_cache_config, + "backend": backend, + "num_postprocess_workers": num_postprocess_workers, + "postprocess_tokenizer_dir": tokenizer or model, + "reasoning_parser": reasoning_parser, "fail_fast_on_attention_window_too_large": fail_fast_on_attention_window_too_large, "otlp_traces_endpoint": otlp_traces_endpoint, @@ -305,12 +287,10 @@ def launch_mm_encoder_server( help= "Exit with runtime error when attention window is too large to fit even a single sequence in the KV cache." ) -@click.option( - "--otlp_traces_endpoint", - type=str, - default=None, - help="Target URL to which OpenTelemetry traces will be sent." -) +@click.option("--otlp_traces_endpoint", + type=str, + default=None, + help="Target URL to which OpenTelemetry traces will be sent.") def serve( model: str, tokenizer: Optional[str], host: str, port: int, log_level: str, backend: str, max_beam_width: int, max_batch_size: int, diff --git a/tensorrt_llm/llmapi/tracing.py b/tensorrt_llm/llmapi/tracing.py index e8722f661cc..8a9da6546ff 100644 --- a/tensorrt_llm/llmapi/tracing.py +++ b/tensorrt_llm/llmapi/tracing.py @@ -141,7 +141,7 @@ class SpanAttributes: GEN_AI_REQUEST_TEMPERATURE = "gen_ai.request.temperature" GEN_AI_REQUEST_ID = "gen_ai.request.id" GEN_AI_REQUEST_N = "gen_ai.request.n" - GEN_AI_LATENCY_TIME_TO_FIRST_TOKEN = "gen_ai.latency.time_to_first_token" + GEN_AI_LATENCY_TIME_TO_FIRST_TOKEN = "gen_ai.latency.time_to_first_token" # nosec B105 GEN_AI_LATENCY_E2E = "gen_ai.latency.e2e" GEN_AI_LATENCY_TIME_IN_QUEUE = "gen_ai.latency.time_in_queue" GEN_AI_LATENCY_KV_CACHE_TRANSFER_TIME = "gen_ai.latency.kv_cache_transfer_time" diff --git a/tensorrt_llm/serve/openai_disagg_server.py b/tensorrt_llm/serve/openai_disagg_server.py index fde345e8df1..66f48ac59f1 100644 --- a/tensorrt_llm/serve/openai_disagg_server.py +++ b/tensorrt_llm/serve/openai_disagg_server.py @@ -20,11 +20,10 @@ # yapf: disable from tensorrt_llm.executor import CppExecutorError +from tensorrt_llm.llmapi import tracing from tensorrt_llm.llmapi.disagg_utils import (DisaggServerConfig, MetadataServerConfig, - ObservabilityConfig, get_ctx_gen_server_urls) -from tensorrt_llm.llmapi import tracing from tensorrt_llm.logger import logger from tensorrt_llm.serve.metadata_server import create_metadata_server from tensorrt_llm.serve.openai_protocol import (ChatCompletionRequest, From c54f28175a02d4bf18f0336b5ad1a0fd7e7147ab Mon Sep 17 00:00:00 2001 From: Zhang Haotong Date: Tue, 16 Sep 2025 17:34:57 +0800 Subject: [PATCH 09/15] [feat] use more accurate time correction Signed-off-by: Zhang Haotong --- tensorrt_llm/executor/result.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/tensorrt_llm/executor/result.py b/tensorrt_llm/executor/result.py index 1a1191d9a5b..32db52c714f 100644 --- a/tensorrt_llm/executor/result.py +++ b/tensorrt_llm/executor/result.py @@ -425,15 +425,9 @@ def do_tracing( # Since arrival_time and other timing metrics are based on different time origins, # we need to apply corrections to align them with absolute timestamps - time_correction = 0 - arrival_timestamp = metrics_dict.get(MetricNames.ARRIVAL_TIMESTAMP, 0) + time_correction = time.time() - time.monotonic() arrival_time = req_perf_metrics_dict.get( RequestEventTiming.ARRIVAL_TIME, 0) - if arrival_timestamp > 0: - time_correction 
= arrival_timestamp - arrival_time - else: - time_correction = time.time() - metrics_dict.get( - MetricNames.E2E, -1) - arrival_time with tracing.global_otlp_tracer().start_as_current_span( "llm_request", From fec65300ab55e091b7acd23d11523d8208b77397 Mon Sep 17 00:00:00 2001 From: zhanghaotong Date: Fri, 10 Oct 2025 14:47:42 +0800 Subject: [PATCH 10/15] pre-commit Signed-off-by: zhanghaotong --- tensorrt_llm/executor/result.py | 2 +- tensorrt_llm/llmapi/disagg_utils.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/tensorrt_llm/executor/result.py b/tensorrt_llm/executor/result.py index def8e8408a7..23ce8646b91 100644 --- a/tensorrt_llm/executor/result.py +++ b/tensorrt_llm/executor/result.py @@ -1,7 +1,7 @@ import asyncio import json -import time import threading +import time import weakref from dataclasses import dataclass, field from queue import Empty, Queue diff --git a/tensorrt_llm/llmapi/disagg_utils.py b/tensorrt_llm/llmapi/disagg_utils.py index 6319f1ad19a..64057b339c1 100644 --- a/tensorrt_llm/llmapi/disagg_utils.py +++ b/tensorrt_llm/llmapi/disagg_utils.py @@ -45,7 +45,8 @@ class ConditionalDisaggConfig(): @dataclass class ObservabilityConfig(): - otlp_traces_endpoint: Optional[str] = None # Target URL to which OpenTelemetry traces will be sent + otlp_traces_endpoint: Optional[ + str] = None # Target URL to which OpenTelemetry traces will be sent class MinimalInstances: From db621c7cf6eb6f1337606242f012c8040402f181 Mon Sep 17 00:00:00 2001 From: zhanghaotong Date: Fri, 10 Oct 2025 16:09:36 +0800 Subject: [PATCH 11/15] use strEnum and rename ObservabilityConfig to OtlpConfig Signed-off-by: zhanghaotong --- examples/opentelemetry/README.md | 2 +- tensorrt_llm/llmapi/disagg_utils.py | 11 +++++------ tensorrt_llm/llmapi/tracing.py | 15 +++++++++++++-- tensorrt_llm/serve/openai_disagg_server.py | 8 ++++---- 4 files changed, 23 insertions(+), 13 deletions(-) diff --git a/examples/opentelemetry/README.md b/examples/opentelemetry/README.md index 2e6557c43fd..9b6efb507a1 100644 --- a/examples/opentelemetry/README.md +++ b/examples/opentelemetry/README.md @@ -80,6 +80,6 @@ generation_servers: num_instances: 1 urls: - "127.0.0.1:8002" -observability_config: +otlp_config: otlp_traces_endpoint: "grpc://0.0.0.0:4317" ``` diff --git a/tensorrt_llm/llmapi/disagg_utils.py b/tensorrt_llm/llmapi/disagg_utils.py index 64057b339c1..58127b05605 100644 --- a/tensorrt_llm/llmapi/disagg_utils.py +++ b/tensorrt_llm/llmapi/disagg_utils.py @@ -44,7 +44,7 @@ class ConditionalDisaggConfig(): @dataclass -class ObservabilityConfig(): +class OtlpConfig(): otlp_traces_endpoint: Optional[ str] = None # Target URL to which OpenTelemetry traces will be sent @@ -72,7 +72,7 @@ class DisaggServerConfig(): gen_router_config: Optional[RouterConfig] = None conditional_disagg_config: Optional[ConditionalDisaggConfig] = None max_retries: int = 1 - observability_config: Optional[ObservabilityConfig] = None + otlp_config: Optional[OtlpConfig] = None perf_metrics_max_requests: int = 0 @@ -117,7 +117,7 @@ def extract_disagg_cfg(hostname: str = 'localhost', context_servers: Optional[dict] = None, generation_servers: Optional[dict] = None, conditional_disagg_config: Optional[dict] = None, - observability_config: Optional[dict] = None, + otlp_config: Optional[dict] = None, **kwargs: Any) -> DisaggServerConfig: context_servers = context_servers or {} generation_servers = generation_servers or {} @@ -151,12 +151,11 @@ def extract_disagg_cfg(hostname: str = 'localhost', conditional_disagg_config = 
ConditionalDisaggConfig( **conditional_disagg_config) if conditional_disagg_config else None - observability_config = ObservabilityConfig( - **observability_config) if observability_config else None + otlp_config = OtlpConfig(**otlp_config) if otlp_config else None config = DisaggServerConfig(server_configs, hostname, port, ctx_router_config, gen_router_config, - conditional_disagg_config, observability_config, + conditional_disagg_config, otlp_config, max_retries, perf_metrics_max_requests) return config diff --git a/tensorrt_llm/llmapi/tracing.py b/tensorrt_llm/llmapi/tracing.py index 8a9da6546ff..031e678d202 100644 --- a/tensorrt_llm/llmapi/tracing.py +++ b/tensorrt_llm/llmapi/tracing.py @@ -12,6 +12,7 @@ import os import typing from collections.abc import Mapping +from enum import StrEnum from typing import Optional from tensorrt_llm._utils import run_once @@ -132,23 +133,33 @@ def is_tracing_enabled() -> bool: return _global_tracer_ is not None -class SpanAttributes: +class SpanAttributes(StrEnum): + """Span attributes for LLM tracing following GenAI semantic conventions.""" + + # Token usage attributes GEN_AI_USAGE_COMPLETION_TOKENS = "gen_ai.usage.completion_tokens" GEN_AI_USAGE_PROMPT_TOKENS = "gen_ai.usage.prompt_tokens" + + # Request attributes GEN_AI_REQUEST_MAX_TOKENS = "gen_ai.request.max_tokens" GEN_AI_REQUEST_TOP_P = "gen_ai.request.top_p" GEN_AI_REQUEST_TOP_K = "gen_ai.request.top_k" GEN_AI_REQUEST_TEMPERATURE = "gen_ai.request.temperature" GEN_AI_REQUEST_ID = "gen_ai.request.id" GEN_AI_REQUEST_N = "gen_ai.request.n" + + # Latency attributes GEN_AI_LATENCY_TIME_TO_FIRST_TOKEN = "gen_ai.latency.time_to_first_token" # nosec B105 GEN_AI_LATENCY_E2E = "gen_ai.latency.e2e" GEN_AI_LATENCY_TIME_IN_QUEUE = "gen_ai.latency.time_in_queue" GEN_AI_LATENCY_KV_CACHE_TRANSFER_TIME = "gen_ai.latency.kv_cache_transfer_time" + + # Response attributes GEN_AI_RESPONSE_FINISH_REASONS = "gen_ai.response.finish_reasons" -class SpanEvents: +class SpanEvents(StrEnum): + """Span events for LLM tracing.""" KV_CACHE_TRANSFER_START = "kv_cache_transfer_start" KV_CACHE_TRANSFER_END = "kv_cache_transfer_end" CTX_SERVER_SELECTED = "ctx_server.selected" diff --git a/tensorrt_llm/serve/openai_disagg_server.py b/tensorrt_llm/serve/openai_disagg_server.py index 62f51e51768..8f7f8f1e847 100644 --- a/tensorrt_llm/serve/openai_disagg_server.py +++ b/tensorrt_llm/serve/openai_disagg_server.py @@ -55,13 +55,13 @@ def __init__(self, self.gen_router = create_router( config.gen_router_config, self.gen_servers, metadata_server_cfg, self.metadata_server) self.conditional_disagg_config = config.conditional_disagg_config - self.observability_cfg = config.observability_config + self.otlp_cfg = config.otlp_config try: - if self.observability_cfg and self.observability_cfg.otlp_traces_endpoint: - tracing.init_tracer("trt.llm", self.observability_cfg.otlp_traces_endpoint) + if self.otlp_cfg and self.otlp_cfg.otlp_traces_endpoint: + tracing.init_tracer("trt.llm", self.otlp_cfg.otlp_traces_endpoint) logger.info( - f"Initialized OTLP tracer successfully, endpoint: {self.observability_cfg.otlp_traces_endpoint}" + f"Initialized OTLP tracer successfully, endpoint: {self.otlp_cfg.otlp_traces_endpoint}" ) except Exception as e: logger.error(f"Failed to initialize OTLP tracer: {e}") From ab0fbedf8a11cb18d1ee7ae0d3e56c8bbda4d2ae Mon Sep 17 00:00:00 2001 From: zhanghaotong Date: Mon, 13 Oct 2025 10:38:57 +0800 Subject: [PATCH 12/15] use strenum Signed-off-by: zhanghaotong --- tensorrt_llm/llmapi/tracing.py | 3 ++- 1 file changed, 2 
insertions(+), 1 deletion(-) diff --git a/tensorrt_llm/llmapi/tracing.py b/tensorrt_llm/llmapi/tracing.py index 031e678d202..fadf0bfdd91 100644 --- a/tensorrt_llm/llmapi/tracing.py +++ b/tensorrt_llm/llmapi/tracing.py @@ -12,9 +12,10 @@ import os import typing from collections.abc import Mapping -from enum import StrEnum from typing import Optional +from strenum import StrEnum + from tensorrt_llm._utils import run_once from tensorrt_llm.logger import logger From 66ca6e5de796c35bbf94cc61ccfb03a314fa5f80 Mon Sep 17 00:00:00 2001 From: zhanghaotong Date: Tue, 14 Oct 2025 16:41:25 +0800 Subject: [PATCH 13/15] fix Signed-off-by: zhanghaotong --- tensorrt_llm/llmapi/disagg_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tensorrt_llm/llmapi/disagg_utils.py b/tensorrt_llm/llmapi/disagg_utils.py index 58127b05605..1ef5adf4af3 100644 --- a/tensorrt_llm/llmapi/disagg_utils.py +++ b/tensorrt_llm/llmapi/disagg_utils.py @@ -71,8 +71,8 @@ class DisaggServerConfig(): ctx_router_config: Optional[RouterConfig] = None gen_router_config: Optional[RouterConfig] = None conditional_disagg_config: Optional[ConditionalDisaggConfig] = None - max_retries: int = 1 otlp_config: Optional[OtlpConfig] = None + max_retries: int = 1 perf_metrics_max_requests: int = 0 From 4162058f64ac894cf4a0efaeaaf37a5cd63e9d2f Mon Sep 17 00:00:00 2001 From: Shunkang <182541032+Shunkangz@users.noreply.github.co> Date: Wed, 15 Oct 2025 04:13:58 +0000 Subject: [PATCH 14/15] Fix llmapi test Signed-off-by: Shunkang <182541032+Shunkangz@users.noreply.github.co> --- tensorrt_llm/executor/result.py | 8 +++++++- tensorrt_llm/llmapi/llm.py | 1 + tensorrt_llm/llmapi/llm_args.py | 3 ++- tests/unittest/api_stability/api_stability_core.py | 1 + tests/unittest/api_stability/references/llm.yaml | 8 ++++++++ .../api_stability/references/request_output.yaml | 9 +++++++++ 6 files changed, 28 insertions(+), 2 deletions(-) diff --git a/tensorrt_llm/executor/result.py b/tensorrt_llm/executor/result.py index 45f57edc70a..9c730b4d816 100644 --- a/tensorrt_llm/executor/result.py +++ b/tensorrt_llm/executor/result.py @@ -579,7 +579,13 @@ def do_tracing( self, output: CompletionOutput, req_perf_metrics_dict: Optional[dict[str, float]] = None, - ): + ) -> None: + """Perform distributed tracing for the generation request. + + Args: + output (CompletionOutput): The output of the generation result. + req_perf_metrics_dict (Optional[dict[str, float]]): Request performance metrics. Defaults to None. + """ if not tracing.global_otlp_tracer(): return diff --git a/tensorrt_llm/llmapi/llm.py b/tensorrt_llm/llmapi/llm.py index 85a1be0750e..169363fa6ee 100644 --- a/tensorrt_llm/llmapi/llm.py +++ b/tensorrt_llm/llmapi/llm.py @@ -365,6 +365,7 @@ def generate_async( streaming (bool): Whether to use the streaming mode for the generation. Defaults to False. kv_cache_retention_config (tensorrt_llm.bindings.executor.KvCacheRetentionConfig, optional): Configuration for the request's retention in the KV Cache. Defaults to None. disaggregated_params (tensorrt_llm.disaggregated_params.DisaggregatedParams, optional): Disaggregated parameters. Defaults to None. + trace_headers (Mapping[str, str], optional): Trace headers. Defaults to None. scheduling_params (tensorrt_llm.scheduling_params.SchedulingParams, optional): Scheduling parameters. Defaults to None. cache_salt (str, optional): If specified, KV cache will be salted with the provided string to limit the kv cache reuse to the requests with the same string. Defaults to None. 
Returns: diff --git a/tensorrt_llm/llmapi/llm_args.py b/tensorrt_llm/llmapi/llm_args.py index 5e02fcf2229..2c6eaaba3eb 100644 --- a/tensorrt_llm/llmapi/llm_args.py +++ b/tensorrt_llm/llmapi/llm_args.py @@ -1640,7 +1640,8 @@ class BaseLlmArgs(StrictBaseModel): otlp_traces_endpoint: Optional[str] = Field( default=None, description="Target URL to which OpenTelemetry traces will be sent.", - alias="otlp_traces_endpoint") + alias="otlp_traces_endpoint", + status="prototype") backend: Optional[str] = Field( default=None, diff --git a/tests/unittest/api_stability/api_stability_core.py b/tests/unittest/api_stability/api_stability_core.py index 19e9a47d30b..36956f4eef1 100644 --- a/tests/unittest/api_stability/api_stability_core.py +++ b/tests/unittest/api_stability/api_stability_core.py @@ -3,6 +3,7 @@ import inspect import os import pathlib +from collections.abc import Mapping from dataclasses import _HAS_DEFAULT_FACTORY_CLASS, dataclass, fields from pprint import pprint from types import MethodType, NoneType diff --git a/tests/unittest/api_stability/references/llm.yaml b/tests/unittest/api_stability/references/llm.yaml index 1e535c99e9d..3ed56a5794e 100644 --- a/tests/unittest/api_stability/references/llm.yaml +++ b/tests/unittest/api_stability/references/llm.yaml @@ -187,6 +187,10 @@ methods: annotation: Optional[tensorrt_llm.llmapi.llm_args.SparseAttentionConfig] default: null status: prototype + otlp_traces_endpoint: + annotation: Optional[str] + default: null + status: prototype return_annotation: None generate: parameters: @@ -215,6 +219,10 @@ methods: cache_salt: annotation: Optional[str] default: null + trace_headers: + annotation: Optional[Mapping[str, str]] + default: null + status: prototype return_annotation: tensorrt_llm.llmapi.llm.RequestOutput get_kv_cache_events: parameters: diff --git a/tests/unittest/api_stability/references/request_output.yaml b/tests/unittest/api_stability/references/request_output.yaml index 7e3054cd5ef..684a178afb1 100644 --- a/tests/unittest/api_stability/references/request_output.yaml +++ b/tests/unittest/api_stability/references/request_output.yaml @@ -20,4 +20,13 @@ methods: annotation: Optional[dict[str, float]] default: None return_annotation: None + do_tracing: + parameters: + output: + annotation: tensorrt_llm.executor.result.CompletionOutput + default: inspect._empty + req_perf_metrics_dict: + annotation: Optional[dict[str, float]] + default: None + return_annotation: None properties: {} From 5c9bb15d8a9575d5ed40d472859fcc9682dd8d27 Mon Sep 17 00:00:00 2001 From: zhanghaotong Date: Mon, 20 Oct 2025 11:16:36 +0800 Subject: [PATCH 15/15] add dataclass to MinimalInstances Signed-off-by: zhanghaotong --- tensorrt_llm/llmapi/disagg_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tensorrt_llm/llmapi/disagg_utils.py b/tensorrt_llm/llmapi/disagg_utils.py index 1ef5adf4af3..493628c572c 100644 --- a/tensorrt_llm/llmapi/disagg_utils.py +++ b/tensorrt_llm/llmapi/disagg_utils.py @@ -49,6 +49,7 @@ class OtlpConfig(): str] = None # Target URL to which OpenTelemetry traces will be sent +@dataclass class MinimalInstances: context_servers: int = 1 # the minimal number of context servers generation_servers: int = 1 # the minimal number of generation servers