From 4cf00716d55c55bc5286481a2ba2474bd66b28bc Mon Sep 17 00:00:00 2001 From: sallyom Date: Fri, 22 Aug 2025 22:25:42 -0400 Subject: [PATCH 1/9] Add opentelemetry tracing Add centralized telemetry package and custom spans following the llm-d distributed tracing proposal. Co-Authored-By: Claude Signed-off-by: sallyom --- cmd/epp/main.go | 37 +++++- cmd/pd-sidecar/main.go | 29 +++++ go.mod | 10 +- pkg/common/common.go | 10 ++ pkg/common/common_test.go | 87 ++++++++++++++ pkg/plugins/pre-request/pd_prerequest.go | 33 +++++- pkg/plugins/profile/pd_profile_handler.go | 44 +++++++ pkg/plugins/scorer/precise_prefix_cache.go | 59 +++++++++- pkg/sidecar/proxy/chat_completions.go | 41 +++++++ pkg/sidecar/proxy/connector_nixlv2.go | 99 +++++++++++++++- pkg/sidecar/proxy/connector_sglang.go | 82 +++++++++++++ pkg/sidecar/proxy/proxy_helpers.go | 11 +- pkg/telemetry/tracing.go | 131 +++++++++++++++++++++ 13 files changed, 662 insertions(+), 11 deletions(-) create mode 100644 pkg/common/common_test.go create mode 100644 pkg/telemetry/tracing.go diff --git a/cmd/epp/main.go b/cmd/epp/main.go index 1952fcf30..65eb8cdb5 100644 --- a/cmd/epp/main.go +++ b/cmd/epp/main.go @@ -27,20 +27,55 @@ package main import ( "os" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/gateway-api-inference-extension/cmd/epp/runner" "github.com/llm-d/llm-d-inference-scheduler/pkg/metrics" "github.com/llm-d/llm-d-inference-scheduler/pkg/plugins" + "github.com/llm-d/llm-d-inference-scheduler/pkg/telemetry" ) func main() { + ctx := ctrl.SetupSignalHandler() + + // Initialize tracing before creating any spans + shutdownTracing, err := telemetry.InitTracing(ctx) + if err != nil { + // Log error but don't fail - tracing is optional + ctrl.Log.Error(err, "Failed to initialize tracing") + } + + // Add startup span to verify tracing is working + tracer := telemetry.Tracer() + ctx, span := tracer.Start(ctx, "llm_d.epp.startup") + defer span.End() + span.SetAttributes( + attribute.String("component", "llm-d-inference-scheduler"), + attribute.String("operation", "startup"), + ) + // Register llm-d-inference-scheduler plugins plugins.RegisterAllPlugins() + // Note: GIE built-in plugins are automatically registered by the runner + // when it processes configuration in runner.parsePluginsConfiguration() + if err := runner.NewRunner(). WithCustomCollectors(metrics.GetCollectors()...). - Run(ctrl.SetupSignalHandler()); err != nil { + Run(ctx); err != nil { + span.SetStatus(codes.Error, "startup failed") + if shutdownTracing != nil { + if err := shutdownTracing(ctx); err != nil { + ctrl.Log.Error(err, "Failed to shutdown tracing") + } + } os.Exit(1) } + if shutdownTracing != nil { + if err := shutdownTracing(ctx); err != nil { + ctrl.Log.Error(err, "Failed to shutdown tracing") + } + } } diff --git a/cmd/pd-sidecar/main.go b/cmd/pd-sidecar/main.go index 8e9e6533c..ed04fdc46 100644 --- a/cmd/pd-sidecar/main.go +++ b/cmd/pd-sidecar/main.go @@ -29,6 +29,9 @@ import ( "github.com/llm-d/llm-d-inference-scheduler/pkg/sidecar/proxy" "github.com/llm-d/llm-d-inference-scheduler/pkg/sidecar/version" + "github.com/llm-d/llm-d-inference-scheduler/pkg/telemetry" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" ) var ( @@ -70,6 +73,29 @@ func main() { ctx := ctrl.SetupSignalHandler() log.IntoContext(ctx, logger) + // Initialize tracing before creating any spans + shutdownTracing, err := telemetry.InitTracing(ctx) + if err != nil { + // Log error but don't fail - tracing is optional + logger.Error(err, "Failed to initialize tracing") + } + if shutdownTracing != nil { + defer func() { + if err := shutdownTracing(ctx); err != nil { + logger.Error(err, "Failed to shutdown tracing") + } + }() + } + + // Add startup span to verify tracing is working + tracer := telemetry.Tracer() + ctx, span := tracer.Start(ctx, "llm_d.pd_proxy.startup") + span.SetAttributes( + attribute.String("component", "llm-d-pd-proxy"), + attribute.String("operation", "startup"), + ) + defer span.End() + logger.Info("Proxy starting", "Built on", version.BuildRef, "From Git SHA", version.CommitSHA) // Validate connector @@ -108,6 +134,7 @@ func main() { targetURL, err := url.Parse(scheme + "://localhost:" + *vLLMPort) if err != nil { logger.Error(err, "failed to create targetURL") + span.SetStatus(codes.Error, "failed to create targetURL") return } @@ -121,6 +148,7 @@ func main() { } if err != nil { logger.Error(err, "failed to create TLS certificate") + span.SetStatus(codes.Error, "failed to create TLS certificate") return } cert = &tempCert @@ -139,6 +167,7 @@ func main() { validator, err := proxy.NewAllowlistValidator(*enableSSRFProtection, *poolGroup, *inferencePoolNamespace, *inferencePoolName) if err != nil { logger.Error(err, "failed to create SSRF protection validator") + span.SetStatus(codes.Error, "failed to create SSRF protection validator") return } diff --git a/go.mod b/go.mod index e29565d84..0b6af436e 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,11 @@ require ( github.com/openai/openai-go v1.12.0 github.com/prometheus/client_golang v1.23.2 github.com/stretchr/testify v1.11.1 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 + go.opentelemetry.io/otel v1.39.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0 + go.opentelemetry.io/otel/sdk v1.39.0 + go.opentelemetry.io/otel/trace v1.39.0 golang.org/x/sync v0.19.0 google.golang.org/grpc v1.79.1 k8s.io/api v0.34.4 @@ -103,14 +108,9 @@ require ( github.com/x448/float16 v0.8.4 // indirect github.com/xlab/treeprint v1.2.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0 // indirect - go.opentelemetry.io/otel v1.39.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.39.0 // indirect go.opentelemetry.io/otel/metric v1.39.0 // indirect - go.opentelemetry.io/otel/sdk v1.39.0 // indirect - go.opentelemetry.io/otel/trace v1.39.0 // indirect go.opentelemetry.io/proto/otlp v1.9.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/pkg/common/common.go b/pkg/common/common.go index a6c50fe7c..a9cac8290 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -4,6 +4,8 @@ //revive:disable:var-naming package common +import "strings" + const ( // PrefillPodHeader is the header name used to indicate Prefill worker PrefillPodHeader = "x-prefiller-host-port" @@ -11,3 +13,11 @@ const ( // DataParallelPodHeader is the header name used to indicate the worker for Data Parallel DataParallelPodHeader = "x-data-parallel-host-port" ) + +// StripScheme removes http:// or https:// prefix from endpoint URL +// This is useful for gRPC clients that expect host:port format only +func StripScheme(endpoint string) string { + endpoint = strings.TrimPrefix(endpoint, "http://") + endpoint = strings.TrimPrefix(endpoint, "https://") + return endpoint +} diff --git a/pkg/common/common_test.go b/pkg/common/common_test.go new file mode 100644 index 000000000..fd60671e4 --- /dev/null +++ b/pkg/common/common_test.go @@ -0,0 +1,87 @@ +/* +Copyright 2025 The llm-d Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package common + +import "testing" + +func TestStripScheme(t *testing.T) { + tests := []struct { + name string + input string + expected string + }{ + { + name: "http scheme", + input: "http://localhost:4317", + expected: "localhost:4317", + }, + { + name: "https scheme", + input: "https://localhost:4317", + expected: "localhost:4317", + }, + { + name: "no scheme", + input: "localhost:4317", + expected: "localhost:4317", + }, + { + name: "host only", + input: "localhost", + expected: "localhost", + }, + { + name: "http with domain", + input: "http://otel-collector.monitoring.svc.cluster.local:4317", + expected: "otel-collector.monitoring.svc.cluster.local:4317", + }, + { + name: "https with domain", + input: "https://otel-collector.monitoring.svc.cluster.local:4317", + expected: "otel-collector.monitoring.svc.cluster.local:4317", + }, + { + name: "empty string", + input: "", + expected: "", + }, + { + name: "ip address with http", + input: "http://10.0.0.1:4317", + expected: "10.0.0.1:4317", + }, + { + name: "ip address with https", + input: "https://10.0.0.1:4317", + expected: "10.0.0.1:4317", + }, + { + name: "ip address without scheme", + input: "10.0.0.1:4317", + expected: "10.0.0.1:4317", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := StripScheme(tt.input) + if result != tt.expected { + t.Errorf("StripScheme(%q) = %q, want %q", tt.input, result, tt.expected) + } + }) + } +} diff --git a/pkg/plugins/pre-request/pd_prerequest.go b/pkg/plugins/pre-request/pd_prerequest.go index c77fc700f..7a9b619c5 100644 --- a/pkg/plugins/pre-request/pd_prerequest.go +++ b/pkg/plugins/pre-request/pd_prerequest.go @@ -7,11 +7,14 @@ import ( "fmt" "net" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/requestcontrol" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/scheduling" "github.com/llm-d/llm-d-inference-scheduler/pkg/common" + "github.com/llm-d/llm-d-inference-scheduler/pkg/telemetry" ) const ( @@ -67,17 +70,45 @@ func (p *PrefillHeaderHandler) WithName(name string) *PrefillHeaderHandler { } // PreRequest wires prefill SchedulerProfile result into a header to indicate prefill worker -func (p *PrefillHeaderHandler) PreRequest(_ context.Context, request *scheduling.LLMRequest, schedulingResult *scheduling.SchedulingResult) { +func (p *PrefillHeaderHandler) PreRequest(ctx context.Context, request *scheduling.LLMRequest, schedulingResult *scheduling.SchedulingResult) { + tracer := telemetry.Tracer() + _, span := tracer.Start(ctx, "llm_d.epp.prerequest.pd_disaggregation", + trace.WithSpanKind(trace.SpanKindInternal), + ) + defer span.End() + + // Add component and request attributes + span.SetAttributes( + attribute.String("component", "llm-d-inference-scheduler"), + attribute.String("operation", "prefill_disaggregation"), + ) + + if request != nil && request.TargetModel != "" { + span.SetAttributes(attribute.String("gen_ai.request.model", request.TargetModel)) + } + if request != nil && request.RequestId != "" { + span.SetAttributes(attribute.String("gen_ai.request.id", request.RequestId)) + } if _, found := request.Headers[common.PrefillPodHeader]; found { request.Headers[common.PrefillPodHeader] = "" // clear header, if already set } prefillProfileRunResult, exists := schedulingResult.ProfileResults[p.prefillProfile] if !exists { + span.SetAttributes( + attribute.Bool("llm_d.epp.pd.disaggregation_enabled", false), + attribute.String("llm_d.epp.pd.reason", "no_prefill_profile_result"), + ) return // prefill profile failed to run or we chose not to run it, no-op in this case } targetPod := prefillProfileRunResult.TargetEndpoints[0].GetMetadata() prefillHostPort := net.JoinHostPort(targetPod.Address, targetPod.Port) request.Headers[common.PrefillPodHeader] = prefillHostPort // in the form of + + span.SetAttributes( + attribute.Bool("llm_d.epp.pd.disaggregation_enabled", true), + attribute.String("llm_d.epp.pd.prefill_pod_address", targetPod.Address), + attribute.String("llm_d.epp.pd.prefill_pod_port", targetPod.Port), + ) } diff --git a/pkg/plugins/profile/pd_profile_handler.go b/pkg/plugins/profile/pd_profile_handler.go index 3f5a4b932..891f86570 100644 --- a/pkg/plugins/profile/pd_profile_handler.go +++ b/pkg/plugins/profile/pd_profile_handler.go @@ -9,6 +9,8 @@ import ( "net" "strconv" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "sigs.k8s.io/controller-runtime/pkg/log" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/common/util/logging" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin" @@ -17,6 +19,7 @@ import ( "github.com/llm-d/llm-d-inference-scheduler/pkg/common" "github.com/llm-d/llm-d-inference-scheduler/pkg/metrics" + "github.com/llm-d/llm-d-inference-scheduler/pkg/telemetry" ) const ( @@ -143,8 +146,35 @@ func (h *PdProfileHandler) WithName(name string) *PdProfileHandler { // previously executed cycles along with their results. func (h *PdProfileHandler) Pick(ctx context.Context, _ *scheduling.CycleState, request *scheduling.LLMRequest, profiles map[string]scheduling.SchedulerProfile, profileResults map[string]*scheduling.ProfileRunResult) map[string]scheduling.SchedulerProfile { + // Start tracing span for profile picking operation + tracer := telemetry.Tracer() + ctx, span := tracer.Start(ctx, "llm_d.epp.profile_handler.pick", + trace.WithSpanKind(trace.SpanKindInternal), + ) + defer span.End() + + // Set initial attributes + span.SetAttributes( + attribute.Int("llm_d.profile_handler.total_profiles", len(profiles)), + attribute.Int("llm_d.profile_handler.executed_profiles", len(profileResults)), + ) + + // Set optional request attributes if request is not nil + if request != nil { + if request.TargetModel != "" { + span.SetAttributes(attribute.String("gen_ai.request.model", request.TargetModel)) + } + if request.RequestId != "" { + span.SetAttributes(attribute.String("gen_ai.request.id", request.RequestId)) + } + } + if _, executed := profileResults[h.decodeProfile]; !executed { // if decode profile was not executed yet, first let the scheduler run the decode profile + span.SetAttributes( + attribute.String("llm_d.profile_handler.decision", "run_decode"), + attribute.String("llm_d.profile_handler.selected_profile", h.decodeProfile), + ) return map[string]scheduling.SchedulerProfile{ h.decodeProfile: profiles[h.decodeProfile], } @@ -154,24 +184,38 @@ func (h *PdProfileHandler) Pick(ctx context.Context, _ *scheduling.CycleState, r // when a profile run fails its result value is nil. we need to check decode result before continuing to prefill // check if all configured profiles have been executed, or if decode failed, no need to run more profiles. if len(profiles) == len(profileResults) || profileResults[h.decodeProfile] == nil { + span.SetAttributes( + attribute.String("llm_d.profile_handler.decision", "complete"), + attribute.Bool("llm_d.profile_handler.decode_failed", profileResults[h.decodeProfile] == nil), + ) return map[string]scheduling.SchedulerProfile{} } inputTokens, err := getUserInputLenInTokens(request) if err != nil { log.FromContext(ctx).V(logutil.DEBUG).Error(err, "Failed to get user input") + span.SetAttributes(attribute.String("error", err.Error())) return nil } + span.SetAttributes(attribute.Int("llm_d.profile_handler.input_tokens", inputTokens)) + if h.decider != nil && h.decider.disaggregate(ctx, inputTokens, profileResults[h.decodeProfile].TargetEndpoints[0]) { metrics.RecordPDDecision(request.TargetModel, metrics.DecisionTypePrefillDecode) // run the prefill profile + span.SetAttributes( + attribute.String("llm_d.profile_handler.decision", "prefill_decode"), + attribute.String("llm_d.profile_handler.selected_profile", h.prefillProfile), + ) return map[string]scheduling.SchedulerProfile{ h.prefillProfile: profiles[h.prefillProfile], } } metrics.RecordPDDecision(request.TargetModel, metrics.DecisionTypeDecodeOnly) + span.SetAttributes( + attribute.String("llm_d.profile_handler.decision", "decode_only"), + ) return map[string]scheduling.SchedulerProfile{} // do not run prefill } diff --git a/pkg/plugins/scorer/precise_prefix_cache.go b/pkg/plugins/scorer/precise_prefix_cache.go index f37f24f5e..e935041d9 100644 --- a/pkg/plugins/scorer/precise_prefix_cache.go +++ b/pkg/plugins/scorer/precise_prefix_cache.go @@ -12,11 +12,16 @@ import ( "github.com/llm-d/llm-d-kv-cache/pkg/kvcache/kvblock" "github.com/llm-d/llm-d-kv-cache/pkg/kvevents" "github.com/llm-d/llm-d-kv-cache/pkg/tokenization/types" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "sigs.k8s.io/controller-runtime/pkg/log" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/common/util/logging" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/scheduling" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/plugins/scheduling/scorer/prefix" + + "github.com/llm-d/llm-d-inference-scheduler/pkg/telemetry" ) const ( @@ -177,9 +182,22 @@ func (s *PrecisePrefixCacheScorer) Category() scheduling.ScorerCategory { // Score scores the provided endpoint based on the KVCache index state. // The returned scores are normalized to a range of 0-1. func (s *PrecisePrefixCacheScorer) Score(ctx context.Context, cycleState *scheduling.CycleState, request *scheduling.LLMRequest, endpoints []scheduling.Endpoint) map[scheduling.Endpoint]float64 { + // Start tracing span for scoring operation + tracer := telemetry.Tracer() + ctx, span := tracer.Start(ctx, "llm_d.epp.scorer.prefix_cache", + trace.WithSpanKind(trace.SpanKindInternal), + ) + defer span.End() + logger := log.FromContext(ctx).WithName(s.typedName.String()) debugLogger := logger.V(logutil.DEBUG) + // Set initial attributes + span.SetAttributes( + attribute.Int("llm_d.scorer.candidate_endpoints", len(endpoints)), + ) + + // Handle pod discovery and subscriber management if s.kvEventsConfig.DiscoverPods { // update subscribers here temporarily for _, endpoint := range endpoints { @@ -200,18 +218,34 @@ func (s *PrecisePrefixCacheScorer) Score(ctx context.Context, cycleState *schedu } } + // Early return if request is nil if request == nil { debugLogger.Info("Request is nil, skipping scoring") + span.SetAttributes(attribute.String("llm_d.scorer.result", "skipped_nil_request")) return nil } + // Set optional request attributes + if request.TargetModel != "" { + span.SetAttributes(attribute.String("gen_ai.request.model", request.TargetModel)) + } + if request.RequestId != "" { + span.SetAttributes(attribute.String("gen_ai.request.id", request.RequestId)) + } + scores, err := s.getScores(ctx, request) if err != nil { logger.Error(err, "Failed to get endpoint scores") + span.SetStatus(codes.Error, err.Error()) return nil } debugLogger.Info("Got endpoint scores", "scores", scores) + // Track scoring statistics + span.SetAttributes( + attribute.Int("llm_d.scorer.scores_computed", len(scores)), + ) + endpointToKey := func(endpoint scheduling.Endpoint) (string, bool) { metadata := endpoint.GetMetadata() if metadata == nil { @@ -221,6 +255,7 @@ func (s *PrecisePrefixCacheScorer) Score(ctx context.Context, cycleState *schedu return metadata.Address, true } + // Write prefix cache state to cycle state state := &prefix.SchedulingContextState{ PrefixHashes: []prefix.BlockHash{}, PrefixCacheServers: map[prefix.ServerID]int{}, @@ -234,7 +269,29 @@ func (s *PrecisePrefixCacheScorer) Score(ctx context.Context, cycleState *schedu } cycleState.Write(plugin.StateKey(s.typedName.String()), state) - return indexedScoresToNormalizedScoredPods(endpoints, endpointToKey, scores) + normalizedScores := indexedScoresToNormalizedScoredPods(endpoints, endpointToKey, scores) + + // Calculate score distribution for observability + if len(normalizedScores) > 0 { + maxScore := 0.0 + totalScore := 0.0 + for _, score := range normalizedScores { + if score > maxScore { + maxScore = score + } + totalScore += score + } + avgScore := totalScore / float64(len(normalizedScores)) + + span.SetAttributes( + attribute.Float64("llm_d.scorer.score.max", maxScore), + attribute.Float64("llm_d.scorer.score.avg", avgScore), + attribute.Int("llm_d.scorer.endpoints_scored", len(normalizedScores)), + ) + } + + span.SetAttributes(attribute.String("llm_d.scorer.result", "success")) + return normalizedScores } // getScores retrieves the endpoint scores from the KV-cache indexer diff --git a/pkg/sidecar/proxy/chat_completions.go b/pkg/sidecar/proxy/chat_completions.go index 5ab731a6e..8299c6b4a 100644 --- a/pkg/sidecar/proxy/chat_completions.go +++ b/pkg/sidecar/proxy/chat_completions.go @@ -17,12 +17,23 @@ limitations under the License. package proxy import ( + "context" "net/http" "strings" + "time" "github.com/llm-d/llm-d-inference-scheduler/pkg/common" + "github.com/llm-d/llm-d-inference-scheduler/pkg/telemetry" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) +// contextKey is a custom type for context keys to avoid collisions +type contextKey string + +const requestStartTimeKey contextKey = "request_start_time" + var ( // ChatCompletionsPath is the OpenAI chat completions path ChatCompletionsPath = "/v1/chat/completions" @@ -32,6 +43,21 @@ var ( ) func (s *Server) chatCompletionsHandler(w http.ResponseWriter, r *http.Request) { + requestStart := time.Now() + tracer := telemetry.Tracer() + ctx, span := tracer.Start(r.Context(), "llm_d.pd_proxy.request", + trace.WithSpanKind(trace.SpanKindServer), + ) + defer span.End() + + // Update request context with span and start time + ctx = context.WithValue(ctx, requestStartTimeKey, requestStart) + r = r.WithContext(ctx) + + span.SetAttributes( + attribute.String("llm_d.pd_proxy.connector", s.config.Connector), + ) + var prefillHostPorts []string prefillHostPorts = r.Header.Values(common.PrefillPodHeader) @@ -56,6 +82,10 @@ func (s *Server) chatCompletionsHandler(w http.ResponseWriter, r *http.Request) if len(prefillHostPort) == 0 { s.logger.V(4).Info("skip disaggregated prefill") + span.SetAttributes( + attribute.Bool("llm_d.pd_proxy.disaggregation_enabled", false), + attribute.String("llm_d.pd_proxy.reason", "no_prefill_header"), + ) if !s.forwardDataParallel || !s.dataParallelHandler(w, r) { s.decoderProxy.ServeHTTP(w, r) @@ -63,6 +93,12 @@ func (s *Server) chatCompletionsHandler(w http.ResponseWriter, r *http.Request) return } + span.SetAttributes( + attribute.Bool("llm_d.pd_proxy.disaggregation_enabled", true), + attribute.String("llm_d.pd_proxy.prefill_target", prefillHostPort), + attribute.Int("llm_d.pd_proxy.prefill_candidates", numHosts), + ) + // SSRF Protection: Check if the prefill target is allowed if !s.allowlistValidator.IsAllowed(prefillHostPort) { s.logger.Error(nil, "SSRF protection: prefill target not in allowlist", @@ -70,6 +106,11 @@ func (s *Server) chatCompletionsHandler(w http.ResponseWriter, r *http.Request) "clientIP", r.RemoteAddr, "userAgent", r.Header.Get("User-Agent"), "requestPath", r.URL.Path) + span.SetAttributes( + attribute.String("llm_d.pd_proxy.error", "ssrf_protection_denied"), + attribute.String("llm_d.pd_proxy.denied_target", prefillHostPort), + ) + span.SetStatus(codes.Error, "SSRF protection: prefill target not in allowlist") http.Error(w, "Forbidden: prefill target not allowed by SSRF protection", http.StatusForbidden) return } diff --git a/pkg/sidecar/proxy/connector_nixlv2.go b/pkg/sidecar/proxy/connector_nixlv2.go index f8543a711..45eb19392 100644 --- a/pkg/sidecar/proxy/connector_nixlv2.go +++ b/pkg/sidecar/proxy/connector_nixlv2.go @@ -21,8 +21,13 @@ import ( "io" "net/http" "strings" + "time" "github.com/google/uuid" + "github.com/llm-d/llm-d-inference-scheduler/pkg/telemetry" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) func (s *Server) runNIXLProtocolV2(w http.ResponseWriter, r *http.Request, prefillPodHostPort string) { @@ -57,9 +62,20 @@ func (s *Server) runNIXLProtocolV2(w http.ResponseWriter, r *http.Request, prefi uuidStr := uuid.String() // Prefill Stage + tracer := telemetry.Tracer() + ctx := r.Context() + + ctx, prefillSpan := tracer.Start(ctx, "llm_d.pd_proxy.prefill", + trace.WithSpanKind(trace.SpanKindInternal), + ) + prefillSpan.SetAttributes( + attribute.String("llm_d.pd_proxy.request_id", uuidStr), + attribute.String("llm_d.pd_proxy.prefill_target", prefillPodHostPort), + attribute.String("llm_d.pd_proxy.connector", "nixlv2"), + ) + prefillStart := time.Now() // 1. Prepare prefill request - ctx := r.Context() preq := r.Clone(ctx) preq.Header.Add(requestHeaderRequestID, uuidStr) @@ -107,11 +123,20 @@ func (s *Server) runNIXLProtocolV2(w http.ResponseWriter, r *http.Request, prefi pw := &bufferedResponseWriter{} prefillHandler.ServeHTTP(pw, preq) + prefillDuration := time.Since(prefillStart) + prefillSpan.SetAttributes( + attribute.Int("llm_d.pd_proxy.prefill.status_code", pw.statusCode), + attribute.Float64("llm_d.pd_proxy.prefill.duration_ms", float64(prefillDuration.Milliseconds())), + ) + if isHTTPError(pw.statusCode) { s.logger.Error(err, "request failed", "code", pw.statusCode) + prefillSpan.SetStatus(codes.Error, "prefill request failed") + prefillSpan.End() w.WriteHeader(pw.statusCode) return } + prefillSpan.End() // Process response - extract p/d fields var prefillerResponse map[string]any @@ -133,15 +158,31 @@ func (s *Server) runNIXLProtocolV2(w http.ResponseWriter, r *http.Request, prefi // Decode Stage + ctx, decodeSpan := tracer.Start(ctx, "llm_d.pd_proxy.decode", + trace.WithSpanKind(trace.SpanKindInternal), + ) + defer decodeSpan.End() + + decodeSpan.SetAttributes( + attribute.String("llm_d.pd_proxy.request_id", uuidStr), + attribute.String("llm_d.pd_proxy.connector", "nixlv2"), + ) + decodeStart := time.Now() + // 1. Prepare decode request dreq := r.Clone(ctx) dreq.Header.Add(requestHeaderRequestID, uuidStr) delete(completionRequest, requestFieldStream) + streamingEnabled := false if streamOk { completionRequest[requestFieldStream] = streamValue + if streamBool, ok := streamValue.(bool); ok { + streamingEnabled = streamBool + } } + decodeSpan.SetAttributes(attribute.Bool("llm_d.pd_proxy.decode.streaming", streamingEnabled)) if streamOptionsOk { completionRequest[requestFieldStreamOptions] = streamOptionsValue } @@ -168,8 +209,62 @@ func (s *Server) runNIXLProtocolV2(w http.ResponseWriter, r *http.Request, prefi // 2. Forward to local decoder. s.logger.V(5).Info("sending request to decoder", "body", string(dbody)) - if !s.forwardDataParallel || !s.dataParallelHandler(w, dreq) { + dataParallelUsed := s.forwardDataParallel && s.dataParallelHandler(w, dreq) + decodeSpan.SetAttributes(attribute.Bool("llm_d.pd_proxy.decode.data_parallel", dataParallelUsed)) + + if !dataParallelUsed { s.logger.V(4).Info("sending request to decoder", "to", s.decoderURL.Host) + decodeSpan.SetAttributes(attribute.String("llm_d.pd_proxy.decode.target", s.decoderURL.Host)) s.decoderProxy.ServeHTTP(w, dreq) } + + decodeDuration := time.Since(decodeStart) + decodeSpan.SetAttributes(attribute.Float64("llm_d.pd_proxy.decode.duration_ms", float64(decodeDuration.Milliseconds()))) + + // Calculate end-to-end P/D metrics and add to decode span + // These metrics represent the "true" TTFT and latency from the coordinator's perspective + // Note: After tracer.Start() above, ctx contains the decode span, so SpanFromContext returns it + if currentSpan := trace.SpanFromContext(ctx); currentSpan.SpanContext().IsValid() { + // Get request start time from context + var totalDuration time.Duration + var trueTTFT time.Duration + if requestStartValue := ctx.Value(requestStartTimeKey); requestStartValue != nil { + if requestStart, ok := requestStartValue.(time.Time); ok { + totalDuration = time.Since(requestStart) + + // The "true TTFT" in P/D mode is the time until the decoder can start generating + // This includes: gateway routing + scheduling + prefill time + KV transfer coordination overhead + // The decode vLLM will report a low TTFT (since KV is already transferred), + // but this captures the real end-to-end TTFT from the client's perspective + // + // True TTFT = time from gateway request start to decode start + // This includes all coordinator overhead that vLLM-level metrics miss + trueTTFT = decodeStart.Sub(requestStart) + } + } + + // Coordinator overhead: time between prefill HTTP completion and decode HTTP request start + // This captures the sidecar coordination overhead (JSON parsing, etc.) between prefill and decode stages + // Note: Actual KV cache transfer happens inside vLLM and is not measured here + coordinatorOverhead := decodeStart.Sub(prefillStart.Add(prefillDuration)) + + // For TPOT (Time Per Output Token), we would need to: + // 1. Parse streaming response to detect token boundaries + // 2. Calculate: (total_decode_time - decode_ttft) / (num_output_tokens - 1) + // This is complex and requires response intercepting, so we defer to trace analysis + + currentSpan.SetAttributes( + // End-to-end P/D timing metrics + // These are the metrics that should be used instead of per-instance vLLM metrics + attribute.Float64("llm_d.pd_proxy.total_duration_ms", float64(totalDuration.Milliseconds())), + attribute.Float64("llm_d.pd_proxy.true_ttft_ms", float64(trueTTFT.Milliseconds())), + + // Component breakdown for analysis + attribute.Float64("llm_d.pd_proxy.prefill_duration_ms", float64(prefillDuration.Milliseconds())), + attribute.Float64("llm_d.pd_proxy.decode_duration_ms", float64(decodeDuration.Milliseconds())), + + // Coordination overhead between prefill and decode (sidecar JSON processing) + attribute.Float64("llm_d.pd_proxy.coordinator_overhead_ms", float64(coordinatorOverhead.Milliseconds())), + ) + } } diff --git a/pkg/sidecar/proxy/connector_sglang.go b/pkg/sidecar/proxy/connector_sglang.go index 1e8251380..8b1c196b6 100644 --- a/pkg/sidecar/proxy/connector_sglang.go +++ b/pkg/sidecar/proxy/connector_sglang.go @@ -28,6 +28,11 @@ import ( "strconv" "strings" "time" + + "github.com/llm-d/llm-d-inference-scheduler/pkg/telemetry" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) var ( @@ -77,6 +82,20 @@ func (s *Server) runSGLangProtocol(w http.ResponseWriter, r *http.Request, prefi } func (s *Server) sendSGLangConcurrentRequests(w http.ResponseWriter, r *http.Request, body []byte, prefillHost string) { + tracer := telemetry.Tracer() + ctx := r.Context() + + // Prefill Stage - async + ctx, prefillSpan := tracer.Start(ctx, "llm_d.pd_proxy.prefill", + trace.WithSpanKind(trace.SpanKindInternal), + ) + prefillSpan.SetAttributes( + attribute.String("llm_d.pd_proxy.prefill_target", prefillHost), + attribute.String("llm_d.pd_proxy.connector", "sglang"), + attribute.Bool("llm_d.pd_proxy.prefill.async", true), + ) + prefillStart := time.Now() + // Create separate requests for prefill and decode // Use context.WithoutCancel for prefillReq to prevent it from being aborted // if the main HTTP handler (which serves decodeReq) finishes first. @@ -85,6 +104,8 @@ func (s *Server) sendSGLangConcurrentRequests(w http.ResponseWriter, r *http.Req prefillHandler, err := s.prefillerProxyHandler(prefillHost) if err != nil { + prefillSpan.SetStatus(codes.Error, "failed to create prefill handler") + prefillSpan.End() if err := errorBadGateway(err, w); err != nil { s.logger.Error(err, "failed to send error response to client") } @@ -93,6 +114,7 @@ func (s *Server) sendSGLangConcurrentRequests(w http.ResponseWriter, r *http.Req // Send prefill request asynchronously go func() { + defer prefillSpan.End() defer func() { if rec := recover(); rec != nil && rec != http.ErrAbortHandler { s.logger.Error(fmt.Errorf("panic: %v", rec), "panic in prefill request") @@ -100,11 +122,71 @@ func (s *Server) sendSGLangConcurrentRequests(w http.ResponseWriter, r *http.Req }() pw := &bufferedResponseWriter{} prefillHandler.ServeHTTP(pw, prefillReq) + prefillDuration := time.Since(prefillStart) + prefillSpan.SetAttributes( + attribute.Int("llm_d.pd_proxy.prefill.status_code", pw.statusCode), + attribute.Float64("llm_d.pd_proxy.prefill.duration_ms", float64(prefillDuration.Milliseconds())), + ) + if pw.statusCode < 200 || pw.statusCode >= 300 { + prefillSpan.SetStatus(codes.Error, "prefill request failed") + } s.logger.V(5).Info("prefill request completed", "status", pw.statusCode) }() + // Decode Stage - sync + ctx, decodeSpan := tracer.Start(ctx, "llm_d.pd_proxy.decode", + trace.WithSpanKind(trace.SpanKindInternal), + ) + defer decodeSpan.End() + + decodeSpan.SetAttributes( + attribute.String("llm_d.pd_proxy.connector", "sglang"), + attribute.Bool("llm_d.pd_proxy.decode.concurrent_with_prefill", true), + ) + decodeStart := time.Now() + // Send decode request synchronously + decodeReq = decodeReq.WithContext(ctx) s.decoderProxy.ServeHTTP(w, decodeReq) + + decodeDuration := time.Since(decodeStart) + decodeSpan.SetAttributes( + attribute.Float64("llm_d.pd_proxy.decode.duration_ms", float64(decodeDuration.Milliseconds())), + attribute.String("llm_d.pd_proxy.decode.target", s.decoderURL.Host), + ) + + // Calculate end-to-end P/D metrics and add to decode span + // Note: SGLang runs prefill and decode concurrently, so timing is different from sequential P/D + // Note: After tracer.Start() above, ctx contains the decode span, so SpanFromContext returns it + if currentSpan := trace.SpanFromContext(ctx); currentSpan.SpanContext().IsValid() { + // Get request start time from context + var totalDuration time.Duration + var trueTTFT time.Duration + if requestStartValue := ctx.Value(requestStartTimeKey); requestStartValue != nil { + if requestStart, ok := requestStartValue.(time.Time); ok { + totalDuration = time.Since(requestStart) + + // For SGLang, prefill and decode run concurrently, but True TTFT still needs to capture + // the full coordinator overhead from gateway start to when decode can begin generating. + // This includes: gateway routing + scheduling overhead + time to start decode request + // Note: In concurrent mode, this is different from sequential P/D where we wait for prefill + trueTTFT = decodeStart.Sub(requestStart) + } + } + + currentSpan.SetAttributes( + // End-to-end P/D timing metrics for concurrent P/D + attribute.Float64("llm_d.pd_proxy.total_duration_ms", float64(totalDuration.Milliseconds())), + attribute.Float64("llm_d.pd_proxy.true_ttft_ms", float64(trueTTFT.Milliseconds())), + + // Component breakdown (note: prefill runs concurrently) + attribute.Float64("llm_d.pd_proxy.decode_duration_ms", float64(decodeDuration.Milliseconds())), + + // Note: prefill_duration_ms is tracked in the async prefill span + // SGLang-specific: prefill and decode overlap in time + attribute.Bool("llm_d.pd_proxy.concurrent_pd", true), + ) + } } func cloneWithJSONBody(ctx context.Context, r *http.Request, body []byte) *http.Request { diff --git a/pkg/sidecar/proxy/proxy_helpers.go b/pkg/sidecar/proxy/proxy_helpers.go index 30ba76494..9a67df213 100644 --- a/pkg/sidecar/proxy/proxy_helpers.go +++ b/pkg/sidecar/proxy/proxy_helpers.go @@ -10,6 +10,8 @@ import ( "net/url" "syscall" "time" + + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) // startHTTP starts the HTTP reverse proxy. @@ -27,8 +29,15 @@ func (s *Server) startHTTP(ctx context.Context, cert *tls.Certificate) error { } s.addr = ln.Addr() + // Wrap handler with OpenTelemetry middleware to extract trace context from incoming requests + handler := otelhttp.NewHandler(s.handler, "llm-d-pd-proxy", + otelhttp.WithSpanNameFormatter(func(_ string, r *http.Request) string { + return "llm_d.pd_proxy." + r.Method + " " + r.URL.Path + }), + ) + server := &http.Server{ - Handler: s.handler, + Handler: handler, // No ReadTimeout/WriteTimeout for LLM inference - can take hours for large contexts IdleTimeout: 300 * time.Second, // 5 minutes for keep-alive connections ReadHeaderTimeout: 30 * time.Second, // Reasonable for headers only diff --git a/pkg/telemetry/tracing.go b/pkg/telemetry/tracing.go new file mode 100644 index 000000000..7474a286c --- /dev/null +++ b/pkg/telemetry/tracing.go @@ -0,0 +1,131 @@ +/* +Copyright 2025 The llm-d Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package telemetry provides OpenTelemetry tracing initialization and utilities +// for distributed tracing across llm-d components. +package telemetry + +import ( + "context" + "fmt" + "os" + "strconv" + + "github.com/llm-d/llm-d-inference-scheduler/pkg/common" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" + "go.opentelemetry.io/otel/trace" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +const ( + defaultServiceName = "llm-d-inference-scheduler" +) + +var ( + // serviceName holds the service name for the tracer + // Set during InitTracing() from OTEL_SERVICE_NAME env var + serviceName = defaultServiceName +) + +// InitTracing initializes OpenTelemetry tracing with OTLP exporter. +// Configuration is done via environment variables: +// - OTEL_SERVICE_NAME: Service name for tracing (default: llm-d-inference-scheduler) +// - OTEL_EXPORTER_OTLP_ENDPOINT: OTLP collector endpoint (default: http://localhost:4317) +// - OTEL_TRACES_SAMPLER: Sampling strategy (default: parentbased_traceidratio) +// - OTEL_TRACES_SAMPLER_ARG: Sampling ratio (default: 0.1 for 10%) +func InitTracing(ctx context.Context) (func(context.Context) error, error) { + logger := log.FromContext(ctx) + + // Get service name from environment, fallback to default + serviceName = os.Getenv("OTEL_SERVICE_NAME") + if serviceName == "" { + serviceName = defaultServiceName + } + + // Get OTLP endpoint from environment + endpoint := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") + if endpoint == "" { + endpoint = "localhost:4317" + } + + // Strip http:// or https:// prefix if present + // otlptracegrpc.WithEndpoint() expects host:port only + endpoint = common.StripScheme(endpoint) + + logger.Info("Initializing OpenTelemetry tracing", "endpoint", endpoint, "service", serviceName) + + // Create OTLP trace exporter + exporter, err := otlptracegrpc.New(ctx, + otlptracegrpc.WithEndpoint(endpoint), + otlptracegrpc.WithInsecure(), // Use WithTLSCredentials() in production + ) + if err != nil { + return nil, fmt.Errorf("failed to create OTLP trace exporter: %w", err) + } + + // Create resource with service name + res, err := resource.New(ctx, + resource.WithAttributes( + semconv.ServiceName(serviceName), + ), + ) + if err != nil { + return nil, fmt.Errorf("failed to create resource: %w", err) + } + + // Get sampling ratio from environment, fallback to default + samplingRatio := 0.1 // default 10% sampling + if arg := os.Getenv("OTEL_TRACES_SAMPLER_ARG"); arg != "" { + if ratio, err := strconv.ParseFloat(arg, 64); err == nil && ratio >= 0.0 && ratio <= 1.0 { + samplingRatio = ratio + } else { + logger.Info("Invalid OTEL_TRACES_SAMPLER_ARG, using default", "arg", arg, "default", samplingRatio) + } + } + + logger.Info("Configuring trace sampling", "ratio", samplingRatio) + + // Create trace provider with parent-based sampling + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exporter), + sdktrace.WithResource(res), + sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(samplingRatio))), + ) + + // Set global trace provider + otel.SetTracerProvider(tp) + + // Set W3C trace context propagator + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + )) + + logger.Info("OpenTelemetry tracing initialized successfully") + + // Return shutdown function + return tp.Shutdown, nil +} + +// Tracer returns a tracer for the inference scheduler +func Tracer() trace.Tracer { + return otel.Tracer(serviceName) +} From 3a31c9d5999e263f814a477a52548d182fadcdb0 Mon Sep 17 00:00:00 2001 From: sallyom Date: Tue, 2 Dec 2025 15:27:33 -0500 Subject: [PATCH 2/9] update Dockerfile.sidecar Signed-off-by: sallyom --- Dockerfile.sidecar | 1 + 1 file changed, 1 insertion(+) diff --git a/Dockerfile.sidecar b/Dockerfile.sidecar index 9c9e747e2..989fd13be 100644 --- a/Dockerfile.sidecar +++ b/Dockerfile.sidecar @@ -17,6 +17,7 @@ RUN go mod download COPY cmd/pd-sidecar/main.go cmd/cmd.go COPY pkg/sidecar pkg/sidecar COPY pkg/common pkg/common +COPY pkg/telemetry pkg/telemetry # Build # the GOARCH has not a default value to allow the binary be built according to the host where the command From da3c41640860e0d1b47f6947fd9bf3830bc87f14 Mon Sep 17 00:00:00 2001 From: sallyom Date: Tue, 20 Jan 2026 19:59:42 -0500 Subject: [PATCH 3/9] tracing: remove extra success results & startup spans and cleanup Signed-off-by: sallyom --- cmd/epp/main.go | 29 ++++++---------------- cmd/pd-sidecar/main.go | 14 ----------- go.mod | 2 +- pkg/plugins/pre-request/pd_prerequest.go | 6 ----- pkg/plugins/profile/pd_profile_handler.go | 3 ++- pkg/plugins/scorer/precise_prefix_cache.go | 1 - pkg/sidecar/proxy/chat_completions.go | 1 + pkg/telemetry/tracing.go | 15 ++++++----- 8 files changed, 18 insertions(+), 53 deletions(-) diff --git a/cmd/epp/main.go b/cmd/epp/main.go index 65eb8cdb5..4d2d01260 100644 --- a/cmd/epp/main.go +++ b/cmd/epp/main.go @@ -27,8 +27,6 @@ package main import ( "os" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/gateway-api-inference-extension/cmd/epp/runner" @@ -46,15 +44,13 @@ func main() { // Log error but don't fail - tracing is optional ctrl.Log.Error(err, "Failed to initialize tracing") } - - // Add startup span to verify tracing is working - tracer := telemetry.Tracer() - ctx, span := tracer.Start(ctx, "llm_d.epp.startup") - defer span.End() - span.SetAttributes( - attribute.String("component", "llm-d-inference-scheduler"), - attribute.String("operation", "startup"), - ) + if shutdownTracing != nil { + defer func() { + if err := shutdownTracing(ctx); err != nil { + ctrl.Log.Error(err, "Failed to shutdown tracing") + } + }() + } // Register llm-d-inference-scheduler plugins plugins.RegisterAllPlugins() @@ -65,17 +61,6 @@ func main() { if err := runner.NewRunner(). WithCustomCollectors(metrics.GetCollectors()...). Run(ctx); err != nil { - span.SetStatus(codes.Error, "startup failed") - if shutdownTracing != nil { - if err := shutdownTracing(ctx); err != nil { - ctrl.Log.Error(err, "Failed to shutdown tracing") - } - } os.Exit(1) } - if shutdownTracing != nil { - if err := shutdownTracing(ctx); err != nil { - ctrl.Log.Error(err, "Failed to shutdown tracing") - } - } } diff --git a/cmd/pd-sidecar/main.go b/cmd/pd-sidecar/main.go index ed04fdc46..45927102e 100644 --- a/cmd/pd-sidecar/main.go +++ b/cmd/pd-sidecar/main.go @@ -30,8 +30,6 @@ import ( "github.com/llm-d/llm-d-inference-scheduler/pkg/sidecar/proxy" "github.com/llm-d/llm-d-inference-scheduler/pkg/sidecar/version" "github.com/llm-d/llm-d-inference-scheduler/pkg/telemetry" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" ) var ( @@ -87,15 +85,6 @@ func main() { }() } - // Add startup span to verify tracing is working - tracer := telemetry.Tracer() - ctx, span := tracer.Start(ctx, "llm_d.pd_proxy.startup") - span.SetAttributes( - attribute.String("component", "llm-d-pd-proxy"), - attribute.String("operation", "startup"), - ) - defer span.End() - logger.Info("Proxy starting", "Built on", version.BuildRef, "From Git SHA", version.CommitSHA) // Validate connector @@ -134,7 +123,6 @@ func main() { targetURL, err := url.Parse(scheme + "://localhost:" + *vLLMPort) if err != nil { logger.Error(err, "failed to create targetURL") - span.SetStatus(codes.Error, "failed to create targetURL") return } @@ -148,7 +136,6 @@ func main() { } if err != nil { logger.Error(err, "failed to create TLS certificate") - span.SetStatus(codes.Error, "failed to create TLS certificate") return } cert = &tempCert @@ -167,7 +154,6 @@ func main() { validator, err := proxy.NewAllowlistValidator(*enableSSRFProtection, *poolGroup, *inferencePoolNamespace, *inferencePoolName) if err != nil { logger.Error(err, "failed to create SSRF protection validator") - span.SetStatus(codes.Error, "failed to create SSRF protection validator") return } diff --git a/go.mod b/go.mod index 0b6af436e..f0818e201 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/openai/openai-go v1.12.0 github.com/prometheus/client_golang v1.23.2 github.com/stretchr/testify v1.11.1 - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0 go.opentelemetry.io/otel v1.39.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0 go.opentelemetry.io/otel/sdk v1.39.0 diff --git a/pkg/plugins/pre-request/pd_prerequest.go b/pkg/plugins/pre-request/pd_prerequest.go index 7a9b619c5..77410a03a 100644 --- a/pkg/plugins/pre-request/pd_prerequest.go +++ b/pkg/plugins/pre-request/pd_prerequest.go @@ -77,12 +77,6 @@ func (p *PrefillHeaderHandler) PreRequest(ctx context.Context, request *scheduli ) defer span.End() - // Add component and request attributes - span.SetAttributes( - attribute.String("component", "llm-d-inference-scheduler"), - attribute.String("operation", "prefill_disaggregation"), - ) - if request != nil && request.TargetModel != "" { span.SetAttributes(attribute.String("gen_ai.request.model", request.TargetModel)) } diff --git a/pkg/plugins/profile/pd_profile_handler.go b/pkg/plugins/profile/pd_profile_handler.go index 891f86570..1913b1041 100644 --- a/pkg/plugins/profile/pd_profile_handler.go +++ b/pkg/plugins/profile/pd_profile_handler.go @@ -10,6 +10,7 @@ import ( "strconv" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" "sigs.k8s.io/controller-runtime/pkg/log" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/common/util/logging" @@ -194,7 +195,7 @@ func (h *PdProfileHandler) Pick(ctx context.Context, _ *scheduling.CycleState, r inputTokens, err := getUserInputLenInTokens(request) if err != nil { log.FromContext(ctx).V(logutil.DEBUG).Error(err, "Failed to get user input") - span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, err.Error()) return nil } diff --git a/pkg/plugins/scorer/precise_prefix_cache.go b/pkg/plugins/scorer/precise_prefix_cache.go index e935041d9..fb539429a 100644 --- a/pkg/plugins/scorer/precise_prefix_cache.go +++ b/pkg/plugins/scorer/precise_prefix_cache.go @@ -290,7 +290,6 @@ func (s *PrecisePrefixCacheScorer) Score(ctx context.Context, cycleState *schedu ) } - span.SetAttributes(attribute.String("llm_d.scorer.result", "success")) return normalizedScores } diff --git a/pkg/sidecar/proxy/chat_completions.go b/pkg/sidecar/proxy/chat_completions.go index 8299c6b4a..f8420e31d 100644 --- a/pkg/sidecar/proxy/chat_completions.go +++ b/pkg/sidecar/proxy/chat_completions.go @@ -56,6 +56,7 @@ func (s *Server) chatCompletionsHandler(w http.ResponseWriter, r *http.Request) span.SetAttributes( attribute.String("llm_d.pd_proxy.connector", s.config.Connector), + attribute.String("llm_d.pd_proxy.request_path", r.URL.Path), ) var prefillHostPorts []string diff --git a/pkg/telemetry/tracing.go b/pkg/telemetry/tracing.go index 7474a286c..3c12a4299 100644 --- a/pkg/telemetry/tracing.go +++ b/pkg/telemetry/tracing.go @@ -37,12 +37,9 @@ import ( const ( defaultServiceName = "llm-d-inference-scheduler" -) -var ( - // serviceName holds the service name for the tracer - // Set during InitTracing() from OTEL_SERVICE_NAME env var - serviceName = defaultServiceName + // instrumentationName identifies this instrumentation library in traces. + instrumentationName = "llm-d-inference-scheduler" ) // InitTracing initializes OpenTelemetry tracing with OTLP exporter. @@ -55,7 +52,7 @@ func InitTracing(ctx context.Context) (func(context.Context) error, error) { logger := log.FromContext(ctx) // Get service name from environment, fallback to default - serviceName = os.Getenv("OTEL_SERVICE_NAME") + serviceName := os.Getenv("OTEL_SERVICE_NAME") if serviceName == "" { serviceName = defaultServiceName } @@ -125,7 +122,9 @@ func InitTracing(ctx context.Context) (func(context.Context) error, error) { return tp.Shutdown, nil } -// Tracer returns a tracer for the inference scheduler +// Tracer returns a tracer for the inference scheduler. +// The tracer is identified by the instrumentation library name, which is +// distinct from the service name set during InitTracing(). func Tracer() trace.Tracer { - return otel.Tracer(serviceName) + return otel.Tracer(instrumentationName) } From 104f3c8cc7c689c750a165adabf2587d9c8753c2 Mon Sep 17 00:00:00 2001 From: sallyom Date: Thu, 12 Feb 2026 11:56:49 -0500 Subject: [PATCH 4/9] fix: avoid os.Exit bypassing defer in main Co-Authored-By: Claude Opus 4.6 Signed-off-by: sallyom --- cmd/epp/main.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cmd/epp/main.go b/cmd/epp/main.go index 4d2d01260..0638f6c4c 100644 --- a/cmd/epp/main.go +++ b/cmd/epp/main.go @@ -36,6 +36,10 @@ import ( ) func main() { + os.Exit(run()) +} + +func run() int { ctx := ctrl.SetupSignalHandler() // Initialize tracing before creating any spans @@ -61,6 +65,7 @@ func main() { if err := runner.NewRunner(). WithCustomCollectors(metrics.GetCollectors()...). Run(ctx); err != nil { - os.Exit(1) + return 1 } + return 0 } From 5ddef7a5831272d6c906bb92693f68d807676eff Mon Sep 17 00:00:00 2001 From: sallyom Date: Thu, 12 Feb 2026 15:26:30 -0500 Subject: [PATCH 5/9] fix: address review nits for tracing PR Co-Authored-By: Claude Opus 4.6 Signed-off-by: sallyom --- pkg/common/common.go | 14 ++++++++------ pkg/plugins/pre-request/pd_prerequest.go | 4 ++-- pkg/plugins/profile/pd_profile_handler.go | 2 +- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/pkg/common/common.go b/pkg/common/common.go index a9cac8290..2fb6f45b4 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -4,7 +4,7 @@ //revive:disable:var-naming package common -import "strings" +import "net/url" const ( // PrefillPodHeader is the header name used to indicate Prefill worker @@ -14,10 +14,12 @@ const ( DataParallelPodHeader = "x-data-parallel-host-port" ) -// StripScheme removes http:// or https:// prefix from endpoint URL -// This is useful for gRPC clients that expect host:port format only +// StripScheme removes the scheme from an endpoint URL, returning host:port. +// This is useful for gRPC clients that expect host:port format only. func StripScheme(endpoint string) string { - endpoint = strings.TrimPrefix(endpoint, "http://") - endpoint = strings.TrimPrefix(endpoint, "https://") - return endpoint + u, err := url.Parse(endpoint) + if err != nil || u.Host == "" { + return endpoint // not a valid URL, return as-is + } + return u.Host } diff --git a/pkg/plugins/pre-request/pd_prerequest.go b/pkg/plugins/pre-request/pd_prerequest.go index 77410a03a..9bfda5a4f 100644 --- a/pkg/plugins/pre-request/pd_prerequest.go +++ b/pkg/plugins/pre-request/pd_prerequest.go @@ -90,7 +90,7 @@ func (p *PrefillHeaderHandler) PreRequest(ctx context.Context, request *scheduli prefillProfileRunResult, exists := schedulingResult.ProfileResults[p.prefillProfile] if !exists { span.SetAttributes( - attribute.Bool("llm_d.epp.pd.disaggregation_enabled", false), + attribute.Bool("llm_d.epp.pd.disaggregation_used", false), attribute.String("llm_d.epp.pd.reason", "no_prefill_profile_result"), ) return // prefill profile failed to run or we chose not to run it, no-op in this case @@ -101,7 +101,7 @@ func (p *PrefillHeaderHandler) PreRequest(ctx context.Context, request *scheduli request.Headers[common.PrefillPodHeader] = prefillHostPort // in the form of span.SetAttributes( - attribute.Bool("llm_d.epp.pd.disaggregation_enabled", true), + attribute.Bool("llm_d.epp.pd.disaggregation_used", true), attribute.String("llm_d.epp.pd.prefill_pod_address", targetPod.Address), attribute.String("llm_d.epp.pd.prefill_pod_port", targetPod.Port), ) diff --git a/pkg/plugins/profile/pd_profile_handler.go b/pkg/plugins/profile/pd_profile_handler.go index 1913b1041..d0e1038f5 100644 --- a/pkg/plugins/profile/pd_profile_handler.go +++ b/pkg/plugins/profile/pd_profile_handler.go @@ -149,7 +149,7 @@ func (h *PdProfileHandler) Pick(ctx context.Context, _ *scheduling.CycleState, r profileResults map[string]*scheduling.ProfileRunResult) map[string]scheduling.SchedulerProfile { // Start tracing span for profile picking operation tracer := telemetry.Tracer() - ctx, span := tracer.Start(ctx, "llm_d.epp.profile_handler.pick", + ctx, span := tracer.Start(ctx, "llm_d.epp.pd.profile_handler.pick", trace.WithSpanKind(trace.SpanKindInternal), ) defer span.End() From 9aedcc978cb9c85200eb73217985c6746b56cd53 Mon Sep 17 00:00:00 2001 From: sallyom Date: Thu, 12 Feb 2026 15:35:05 -0500 Subject: [PATCH 6/9] test: add edge case tests for StripScheme Co-Authored-By: Claude Opus 4.6 Signed-off-by: sallyom --- pkg/common/common_test.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/pkg/common/common_test.go b/pkg/common/common_test.go index fd60671e4..7ee68607a 100644 --- a/pkg/common/common_test.go +++ b/pkg/common/common_test.go @@ -1,5 +1,5 @@ /* -Copyright 2025 The llm-d Authors. +Copyright 2026 The llm-d Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -74,6 +74,21 @@ func TestStripScheme(t *testing.T) { input: "10.0.0.1:4317", expected: "10.0.0.1:4317", }, + { + name: "schemeless with double slash", + input: "//192.168.1.1:80", + expected: "192.168.1.1:80", + }, + { + name: "uppercase scheme", + input: "HTTP://localhost:4317", + expected: "localhost:4317", + }, + { + name: "port only", + input: ":9090", + expected: ":9090", + }, } for _, tt := range tests { From 693a0294eb5801e2715e815f7b15483dd68c33d1 Mon Sep 17 00:00:00 2001 From: sallyom Date: Mon, 23 Feb 2026 13:34:26 -0500 Subject: [PATCH 7/9] remove extra comments from sidecar spans Signed-off-by: sallyom --- pkg/sidecar/proxy/chat_completions.go | 4 ++-- pkg/sidecar/proxy/connector_nixlv2.go | 30 ++++----------------------- pkg/sidecar/proxy/connector_sglang.go | 18 +++------------- 3 files changed, 9 insertions(+), 43 deletions(-) diff --git a/pkg/sidecar/proxy/chat_completions.go b/pkg/sidecar/proxy/chat_completions.go index f8420e31d..eb08bf123 100644 --- a/pkg/sidecar/proxy/chat_completions.go +++ b/pkg/sidecar/proxy/chat_completions.go @@ -84,7 +84,7 @@ func (s *Server) chatCompletionsHandler(w http.ResponseWriter, r *http.Request) if len(prefillHostPort) == 0 { s.logger.V(4).Info("skip disaggregated prefill") span.SetAttributes( - attribute.Bool("llm_d.pd_proxy.disaggregation_enabled", false), + attribute.Bool("llm_d.pd_proxy.disaggregation_used", false), attribute.String("llm_d.pd_proxy.reason", "no_prefill_header"), ) @@ -95,7 +95,7 @@ func (s *Server) chatCompletionsHandler(w http.ResponseWriter, r *http.Request) } span.SetAttributes( - attribute.Bool("llm_d.pd_proxy.disaggregation_enabled", true), + attribute.Bool("llm_d.pd_proxy.disaggregation_used", true), attribute.String("llm_d.pd_proxy.prefill_target", prefillHostPort), attribute.Int("llm_d.pd_proxy.prefill_candidates", numHosts), ) diff --git a/pkg/sidecar/proxy/connector_nixlv2.go b/pkg/sidecar/proxy/connector_nixlv2.go index 45eb19392..c2b3001c8 100644 --- a/pkg/sidecar/proxy/connector_nixlv2.go +++ b/pkg/sidecar/proxy/connector_nixlv2.go @@ -221,49 +221,27 @@ func (s *Server) runNIXLProtocolV2(w http.ResponseWriter, r *http.Request, prefi decodeDuration := time.Since(decodeStart) decodeSpan.SetAttributes(attribute.Float64("llm_d.pd_proxy.decode.duration_ms", float64(decodeDuration.Milliseconds()))) - // Calculate end-to-end P/D metrics and add to decode span - // These metrics represent the "true" TTFT and latency from the coordinator's perspective - // Note: After tracer.Start() above, ctx contains the decode span, so SpanFromContext returns it + // Calculate end-to-end P/D timing metrics. + // True TTFT captures time from gateway request start to decode start, including + // gateway routing, scheduling, prefill, and coordination overhead that + // per-instance vLLM metrics miss. if currentSpan := trace.SpanFromContext(ctx); currentSpan.SpanContext().IsValid() { - // Get request start time from context var totalDuration time.Duration var trueTTFT time.Duration if requestStartValue := ctx.Value(requestStartTimeKey); requestStartValue != nil { if requestStart, ok := requestStartValue.(time.Time); ok { totalDuration = time.Since(requestStart) - - // The "true TTFT" in P/D mode is the time until the decoder can start generating - // This includes: gateway routing + scheduling + prefill time + KV transfer coordination overhead - // The decode vLLM will report a low TTFT (since KV is already transferred), - // but this captures the real end-to-end TTFT from the client's perspective - // - // True TTFT = time from gateway request start to decode start - // This includes all coordinator overhead that vLLM-level metrics miss trueTTFT = decodeStart.Sub(requestStart) } } - // Coordinator overhead: time between prefill HTTP completion and decode HTTP request start - // This captures the sidecar coordination overhead (JSON parsing, etc.) between prefill and decode stages - // Note: Actual KV cache transfer happens inside vLLM and is not measured here coordinatorOverhead := decodeStart.Sub(prefillStart.Add(prefillDuration)) - // For TPOT (Time Per Output Token), we would need to: - // 1. Parse streaming response to detect token boundaries - // 2. Calculate: (total_decode_time - decode_ttft) / (num_output_tokens - 1) - // This is complex and requires response intercepting, so we defer to trace analysis - currentSpan.SetAttributes( - // End-to-end P/D timing metrics - // These are the metrics that should be used instead of per-instance vLLM metrics attribute.Float64("llm_d.pd_proxy.total_duration_ms", float64(totalDuration.Milliseconds())), attribute.Float64("llm_d.pd_proxy.true_ttft_ms", float64(trueTTFT.Milliseconds())), - - // Component breakdown for analysis attribute.Float64("llm_d.pd_proxy.prefill_duration_ms", float64(prefillDuration.Milliseconds())), attribute.Float64("llm_d.pd_proxy.decode_duration_ms", float64(decodeDuration.Milliseconds())), - - // Coordination overhead between prefill and decode (sidecar JSON processing) attribute.Float64("llm_d.pd_proxy.coordinator_overhead_ms", float64(coordinatorOverhead.Milliseconds())), ) } diff --git a/pkg/sidecar/proxy/connector_sglang.go b/pkg/sidecar/proxy/connector_sglang.go index 8b1c196b6..ee11a94ab 100644 --- a/pkg/sidecar/proxy/connector_sglang.go +++ b/pkg/sidecar/proxy/connector_sglang.go @@ -155,35 +155,23 @@ func (s *Server) sendSGLangConcurrentRequests(w http.ResponseWriter, r *http.Req attribute.String("llm_d.pd_proxy.decode.target", s.decoderURL.Host), ) - // Calculate end-to-end P/D metrics and add to decode span - // Note: SGLang runs prefill and decode concurrently, so timing is different from sequential P/D - // Note: After tracer.Start() above, ctx contains the decode span, so SpanFromContext returns it + // Calculate end-to-end P/D timing metrics for concurrent P/D. + // True TTFT captures time from gateway request start to decode start. + // In SGLang's concurrent mode, prefill duration is tracked in the async prefill span. if currentSpan := trace.SpanFromContext(ctx); currentSpan.SpanContext().IsValid() { - // Get request start time from context var totalDuration time.Duration var trueTTFT time.Duration if requestStartValue := ctx.Value(requestStartTimeKey); requestStartValue != nil { if requestStart, ok := requestStartValue.(time.Time); ok { totalDuration = time.Since(requestStart) - - // For SGLang, prefill and decode run concurrently, but True TTFT still needs to capture - // the full coordinator overhead from gateway start to when decode can begin generating. - // This includes: gateway routing + scheduling overhead + time to start decode request - // Note: In concurrent mode, this is different from sequential P/D where we wait for prefill trueTTFT = decodeStart.Sub(requestStart) } } currentSpan.SetAttributes( - // End-to-end P/D timing metrics for concurrent P/D attribute.Float64("llm_d.pd_proxy.total_duration_ms", float64(totalDuration.Milliseconds())), attribute.Float64("llm_d.pd_proxy.true_ttft_ms", float64(trueTTFT.Milliseconds())), - - // Component breakdown (note: prefill runs concurrently) attribute.Float64("llm_d.pd_proxy.decode_duration_ms", float64(decodeDuration.Milliseconds())), - - // Note: prefill_duration_ms is tracked in the async prefill span - // SGLang-specific: prefill and decode overlap in time attribute.Bool("llm_d.pd_proxy.concurrent_pd", true), ) } From e7b7d297d45f2a584e6891a01af764c801ad9c30 Mon Sep 17 00:00:00 2001 From: sallyom Date: Tue, 24 Feb 2026 15:17:08 -0500 Subject: [PATCH 8/9] fix lint error Signed-off-by: sallyom --- pkg/common/common_test.go | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/pkg/common/common_test.go b/pkg/common/common_test.go index 7ee68607a..dc20b3607 100644 --- a/pkg/common/common_test.go +++ b/pkg/common/common_test.go @@ -1,19 +1,7 @@ -/* -Copyright 2026 The llm-d Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - +// Package common contains items common to both the +// EPP/Inference-Scheduler and the Routing Sidecar +// +//revive:disable:var-naming package common import "testing" From d74715349cea8fc699b22935a6eb9e376ee36702 Mon Sep 17 00:00:00 2001 From: greg pereira Date: Tue, 24 Feb 2026 13:11:59 -0800 Subject: [PATCH 9/9] protect against segfault on tests Signed-off-by: greg pereira --- pkg/sidecar/proxy/chat_completions.go | 7 ++++++- pkg/sidecar/proxy/proxy_helpers.go | 6 +++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/pkg/sidecar/proxy/chat_completions.go b/pkg/sidecar/proxy/chat_completions.go index eb08bf123..1c367b9d2 100644 --- a/pkg/sidecar/proxy/chat_completions.go +++ b/pkg/sidecar/proxy/chat_completions.go @@ -54,9 +54,14 @@ func (s *Server) chatCompletionsHandler(w http.ResponseWriter, r *http.Request) ctx = context.WithValue(ctx, requestStartTimeKey, requestStart) r = r.WithContext(ctx) + // Set span attributes with safe defaults for nil values + requestPath := "" + if r.URL != nil { + requestPath = r.URL.Path + } span.SetAttributes( attribute.String("llm_d.pd_proxy.connector", s.config.Connector), - attribute.String("llm_d.pd_proxy.request_path", r.URL.Path), + attribute.String("llm_d.pd_proxy.request_path", requestPath), ) var prefillHostPorts []string diff --git a/pkg/sidecar/proxy/proxy_helpers.go b/pkg/sidecar/proxy/proxy_helpers.go index 9a67df213..8be062363 100644 --- a/pkg/sidecar/proxy/proxy_helpers.go +++ b/pkg/sidecar/proxy/proxy_helpers.go @@ -32,7 +32,11 @@ func (s *Server) startHTTP(ctx context.Context, cert *tls.Certificate) error { // Wrap handler with OpenTelemetry middleware to extract trace context from incoming requests handler := otelhttp.NewHandler(s.handler, "llm-d-pd-proxy", otelhttp.WithSpanNameFormatter(func(_ string, r *http.Request) string { - return "llm_d.pd_proxy." + r.Method + " " + r.URL.Path + path := "" + if r.URL != nil { + path = r.URL.Path + } + return "llm_d.pd_proxy." + r.Method + " " + path }), )