diff --git a/src/semantic-router/pkg/extproc/processor_req_body.go b/src/semantic-router/pkg/extproc/processor_req_body.go
index fa9747d6f8..f2b546f3b1 100644
--- a/src/semantic-router/pkg/extproc/processor_req_body.go
+++ b/src/semantic-router/pkg/extproc/processor_req_body.go
@@ -8,6 +8,7 @@ import (
 	ext_proc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
 	"github.com/openai/openai-go"
 	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 
@@ -299,6 +300,44 @@ func (r *OpenAIRouter) modifyRequestBodyForAutoRouting(openAIRequest *openai.Cha
 	return modifiedBody, nil
 }
 
+// startUpstreamSpanAndInjectHeaders starts a client-kind span for the upstream request,
+// stores the span and its context on ctx, tags it with the target model and endpoint, and
+// returns the propagation headers produced by tracing.InjectTraceContextToSlice.
+// The span will be ended when response headers arrive in handleResponseHeaders.
+func (r *OpenAIRouter) startUpstreamSpanAndInjectHeaders(model string, endpoint string, ctx *RequestContext) []*core.HeaderValueOption {
+	// Overwriting ctx.UpstreamSpan below would drop the only reference to an earlier,
+	// still-open span, so end it first.
+	if ctx.UpstreamSpan != nil {
+		ctx.UpstreamSpan.End()
+	}
+
+	// Start upstream request span (will be ended when response headers arrive)
+	spanCtx, upstreamSpan := tracing.StartSpan(ctx.TraceContext, tracing.SpanUpstreamRequest,
+		trace.WithSpanKind(trace.SpanKindClient))
+	ctx.TraceContext = spanCtx
+	ctx.UpstreamSpan = upstreamSpan
+
+	// Set span attributes for upstream request
+	tracing.SetSpanAttributes(upstreamSpan,
+		attribute.String(tracing.AttrModelName, model),
+		attribute.String(tracing.AttrEndpointAddress, endpoint))
+
+	// Inject W3C trace context headers for distributed tracing to vLLM
+	traceHeaders := tracing.InjectTraceContextToSlice(spanCtx)
+	// One header option per injected pair, so the final length is known up front.
+	traceContextHeaders := make([]*core.HeaderValueOption, 0, len(traceHeaders))
+	for _, th := range traceHeaders {
+		traceContextHeaders = append(traceContextHeaders, &core.HeaderValueOption{
+			Header: &core.HeaderValue{
+				Key:      th[0],
+				RawValue: []byte(th[1]),
+			},
+		})
+	}
+
+	return traceContextHeaders
+}
+
 // createRoutingResponse creates a routing response with mutations
 func (r *OpenAIRouter) 
createRoutingResponse(model string, endpoint string, modifiedBody []byte, ctx *RequestContext) *ext_proc.ProcessingResponse { bodyMutation := &ext_proc.BodyMutation{ @@ -322,6 +353,10 @@ func (r *OpenAIRouter) createRoutingResponse(model string, endpoint string, modi logging.Infof("createRoutingResponse: modifiedBody length=%d, model=%s, endpoint=%s", len(modifiedBody), model, endpoint) + // Start upstream span and inject trace context headers + traceContextHeaders := r.startUpstreamSpanAndInjectHeaders(model, endpoint, ctx) + setHeaders = append(setHeaders, traceContextHeaders...) + // Add standard routing headers if endpoint != "" { setHeaders = append(setHeaders, &core.HeaderValueOption{ @@ -387,6 +422,10 @@ func (r *OpenAIRouter) createSpecifiedModelResponse(model string, endpoint strin setHeaders := []*core.HeaderValueOption{} removeHeaders := []string{} + // Start upstream span and inject trace context headers + traceContextHeaders := r.startUpstreamSpanAndInjectHeaders(model, endpoint, ctx) + setHeaders = append(setHeaders, traceContextHeaders...) 
+ if endpoint != "" { setHeaders = append(setHeaders, &core.HeaderValueOption{ Header: &core.HeaderValue{ diff --git a/src/semantic-router/pkg/extproc/processor_req_header.go b/src/semantic-router/pkg/extproc/processor_req_header.go index 3959014ea6..4a14f717ff 100644 --- a/src/semantic-router/pkg/extproc/processor_req_header.go +++ b/src/semantic-router/pkg/extproc/processor_req_header.go @@ -76,6 +76,7 @@ type RequestContext struct { // Tracing context TraceContext context.Context // OpenTelemetry trace context for span propagation + UpstreamSpan trace.Span // Span for tracking upstream vLLM request duration // Response API context ResponseAPICtx *ResponseAPIContext // Non-nil if this is a Response API request diff --git a/src/semantic-router/pkg/extproc/processor_res_header.go b/src/semantic-router/pkg/extproc/processor_res_header.go index 068a89a483..6670459253 100644 --- a/src/semantic-router/pkg/extproc/processor_res_header.go +++ b/src/semantic-router/pkg/extproc/processor_res_header.go @@ -8,9 +8,12 @@ import ( core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" http_ext "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3" ext_proc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "github.com/vllm-project/semantic-router/src/semantic-router/pkg/headers" "github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/metrics" + "github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/tracing" ) // handleResponseHeaders processes the response headers @@ -35,6 +38,21 @@ func (r *OpenAIRouter) handleResponseHeaders(v *ext_proc.ProcessingRequest_Respo } } + // End upstream request span (started in createRoutingResponse/createSpecifiedModelResponse) + if ctx != nil && ctx.UpstreamSpan != nil { + // Add response status to span + tracing.SetSpanAttributes(ctx.UpstreamSpan, + 
attribute.Int("http.status_code", statusCode))
+
+		// Mark span as error if response was not successful
+		if !isSuccessful && statusCode != 0 {
+			ctx.UpstreamSpan.SetStatus(codes.Error, "upstream request failed")
+		}
+
+		ctx.UpstreamSpan.End()
+		ctx.UpstreamSpan = nil
+	}
+
 	// Best-effort TTFT measurement:
 	// - For non-streaming responses, record on first response headers (approx TTFB ~= TTFT)
 	// - For streaming responses (SSE), defer TTFT until the first response body chunk arrives
diff --git a/src/semantic-router/pkg/observability/tracing/tracing_test.go b/src/semantic-router/pkg/observability/tracing/tracing_test.go
index 10173a0756..7dbe67c257 100644
--- a/src/semantic-router/pkg/observability/tracing/tracing_test.go
+++ b/src/semantic-router/pkg/observability/tracing/tracing_test.go
@@ -185,6 +185,60 @@ func TestStartSpanWithNilContext(t *testing.T) {
 	span.End()
 }
 
+// TestInjectTraceContextToSlice checks that InjectTraceContextToSlice emits a traceparent
+// header for a context carrying an active span, and that its value looks like version 00.
+func TestInjectTraceContextToSlice(t *testing.T) {
+	// Initialize tracing
+	ctx := context.Background()
+	cfg := TracingConfig{
+		Enabled:               true,
+		Provider:              "opentelemetry",
+		ExporterType:          "stdout",
+		SamplingType:          "always_on",
+		ServiceName:           "test-service",
+		ServiceVersion:        "v1.0.0",
+		DeploymentEnvironment: "test",
+	}
+
+	err := InitTracing(ctx, cfg)
+	if err != nil {
+		t.Fatalf("Failed to initialize tracing: %v", err)
+	}
+	defer func() {
+		shutdownCtx := context.Background()
+		_ = ShutdownTracing(shutdownCtx)
+	}()
+
+	// Create a span to establish trace context
+	spanCtx, span := StartSpan(ctx, SpanUpstreamRequest)
+	defer span.End()
+
+	// Test InjectTraceContextToSlice
+	headers := InjectTraceContextToSlice(spanCtx)
+
+	// Should have traceparent header
+	hasTraceparent := false
+	for _, h := range headers {
+		if h[0] == "traceparent" {
+			hasTraceparent = true
+			// Validate format: 00-<trace-id>-<parent-id>-<trace-flags>
+			// Example: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01
+			// Stop here on a short value: the prefix check below slices h[1][:3],
+			// which would panic on fewer than three bytes.
+			if len(h[1]) < 55 {
+				t.Fatalf("traceparent header too short: %s", h[1])
+			}
+			// Should start with version "00-"
+			if h[1][:3] != "00-" {
+				t.Errorf("traceparent header should start with '00-': %s", h[1])
+			}
+		}
+	}
+	if !hasTraceparent {
+		t.Error("InjectTraceContextToSlice did not produce traceparent header")
+	}
+}
+
 func TestSpanAttributeConstants(t *testing.T) {
 	// Verify span name constants are defined
 	spanNames := []string{