Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions src/semantic-router/pkg/extproc/processor_req_body.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
ext_proc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"github.com/openai/openai-go"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand Down Expand Up @@ -299,6 +300,36 @@ func (r *OpenAIRouter) modifyRequestBodyForAutoRouting(openAIRequest *openai.Cha
return modifiedBody, nil
}

// startUpstreamSpanAndInjectHeaders starts a client-kind span for the upstream
// request and returns header mutations carrying that span's trace context.
//
// model and endpoint are recorded on the span as attributes. The new span is
// stored in ctx.UpstreamSpan and its context replaces ctx.TraceContext; the
// span is expected to be ended in handleResponseHeaders once response headers
// arrive.
//
// A nil ctx yields no span and no headers, because there would be nowhere to
// keep the span so that it can be ended later. If ctx still holds an upstream
// span from an earlier call, that span is ended before being replaced so it is
// not leaked. The result is nil when no trace headers were produced.
func (r *OpenAIRouter) startUpstreamSpanAndInjectHeaders(model string, endpoint string, ctx *RequestContext) []*core.HeaderValueOption {
	if ctx == nil {
		return nil
	}

	// Overwriting an unfinished span would lose the only reference to it, so
	// close it out first.
	if ctx.UpstreamSpan != nil {
		ctx.UpstreamSpan.End()
		ctx.UpstreamSpan = nil
	}

	// Start upstream request span (ended when response headers arrive).
	spanCtx, upstreamSpan := tracing.StartSpan(ctx.TraceContext, tracing.SpanUpstreamRequest,
		trace.WithSpanKind(trace.SpanKindClient))
	ctx.TraceContext = spanCtx
	ctx.UpstreamSpan = upstreamSpan

	// Record which model and endpoint this upstream request targets.
	tracing.SetSpanAttributes(upstreamSpan,
		attribute.String(tracing.AttrModelName, model),
		attribute.String(tracing.AttrEndpointAddress, endpoint))

	// Convert the injected trace context pairs (key at index 0, value at
	// index 1) into header mutations for the upstream request.
	traceHeaders := tracing.InjectTraceContextToSlice(spanCtx)
	if len(traceHeaders) == 0 {
		return nil
	}

	traceContextHeaders := make([]*core.HeaderValueOption, 0, len(traceHeaders))
	for _, th := range traceHeaders {
		traceContextHeaders = append(traceContextHeaders, &core.HeaderValueOption{
			Header: &core.HeaderValue{
				Key:      th[0],
				RawValue: []byte(th[1]),
			},
		})
	}

	return traceContextHeaders
}

// createRoutingResponse creates a routing response with mutations
func (r *OpenAIRouter) createRoutingResponse(model string, endpoint string, modifiedBody []byte, ctx *RequestContext) *ext_proc.ProcessingResponse {
bodyMutation := &ext_proc.BodyMutation{
Expand All @@ -322,6 +353,10 @@ func (r *OpenAIRouter) createRoutingResponse(model string, endpoint string, modi

logging.Infof("createRoutingResponse: modifiedBody length=%d, model=%s, endpoint=%s", len(modifiedBody), model, endpoint)

// Start upstream span and inject trace context headers
traceContextHeaders := r.startUpstreamSpanAndInjectHeaders(model, endpoint, ctx)
setHeaders = append(setHeaders, traceContextHeaders...)

// Add standard routing headers
if endpoint != "" {
setHeaders = append(setHeaders, &core.HeaderValueOption{
Expand Down Expand Up @@ -387,6 +422,10 @@ func (r *OpenAIRouter) createSpecifiedModelResponse(model string, endpoint strin
setHeaders := []*core.HeaderValueOption{}
removeHeaders := []string{}

// Start upstream span and inject trace context headers
traceContextHeaders := r.startUpstreamSpanAndInjectHeaders(model, endpoint, ctx)
setHeaders = append(setHeaders, traceContextHeaders...)

if endpoint != "" {
setHeaders = append(setHeaders, &core.HeaderValueOption{
Header: &core.HeaderValue{
Expand Down
1 change: 1 addition & 0 deletions src/semantic-router/pkg/extproc/processor_req_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type RequestContext struct {

// Tracing context
TraceContext context.Context // OpenTelemetry trace context for span propagation
UpstreamSpan trace.Span // Span for tracking upstream vLLM request duration

// Response API context
ResponseAPICtx *ResponseAPIContext // Non-nil if this is a Response API request
Expand Down
18 changes: 18 additions & 0 deletions src/semantic-router/pkg/extproc/processor_res_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ import (
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
http_ext "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3"
ext_proc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"

"github.com/vllm-project/semantic-router/src/semantic-router/pkg/headers"
"github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/metrics"
"github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/tracing"
)

// handleResponseHeaders processes the response headers
Expand All @@ -35,6 +38,21 @@ func (r *OpenAIRouter) handleResponseHeaders(v *ext_proc.ProcessingRequest_Respo
}
}

// End upstream request span (started in createRoutingResponse/createSpecifiedModelResponse)
if ctx != nil && ctx.UpstreamSpan != nil {
// Add response status to span
tracing.SetSpanAttributes(ctx.UpstreamSpan,
attribute.Int("http.status_code", statusCode))

// Mark span as error if response was not successful
if !isSuccessful && statusCode != 0 {
ctx.UpstreamSpan.SetStatus(codes.Error, "upstream request failed")
}

ctx.UpstreamSpan.End()
ctx.UpstreamSpan = nil
}

// Best-effort TTFT measurement:
// - For non-streaming responses, record on first response headers (approx TTFB ~= TTFT)
// - For streaming responses (SSE), defer TTFT until the first response body chunk arrives
Expand Down
50 changes: 50 additions & 0 deletions src/semantic-router/pkg/observability/tracing/tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,56 @@ func TestStartSpanWithNilContext(t *testing.T) {
span.End()
}

// TestInjectTraceContextToSlice verifies that, with tracing initialized and a
// span started, InjectTraceContextToSlice emits a "traceparent" header that is
// at least 55 characters long and begins with the version prefix "00-".
func TestInjectTraceContextToSlice(t *testing.T) {
	// Initialize tracing
	ctx := context.Background()
	cfg := TracingConfig{
		Enabled:               true,
		Provider:              "opentelemetry",
		ExporterType:          "stdout",
		SamplingType:          "always_on",
		ServiceName:           "test-service",
		ServiceVersion:        "v1.0.0",
		DeploymentEnvironment: "test",
	}

	if err := InitTracing(ctx, cfg); err != nil {
		t.Fatalf("Failed to initialize tracing: %v", err)
	}
	defer func() {
		// Best-effort cleanup: a shutdown failure does not affect what this
		// test asserts, so the result is deliberately discarded.
		_ = ShutdownTracing(context.Background())
	}()

	// Create a span to establish trace context
	spanCtx, span := StartSpan(ctx, SpanUpstreamRequest)
	defer span.End()

	headers := InjectTraceContextToSlice(spanCtx)

	// Should have traceparent header
	hasTraceparent := false
	for _, h := range headers {
		if h[0] != "traceparent" {
			continue
		}
		hasTraceparent = true

		// Validate format: 00-<trace-id>-<span-id>-<flags>
		// Example: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01
		if len(h[1]) < 55 {
			t.Errorf("traceparent header too short: %s", h[1])
		}
		// Should start with version "00-". The length guard keeps a very short
		// (or empty) value from panicking on the slice expression, since the
		// length failure above is non-fatal.
		if len(h[1]) < 3 || h[1][:3] != "00-" {
			t.Errorf("traceparent header should start with '00-': %s", h[1])
		}
	}
	if !hasTraceparent {
		t.Error("InjectTraceContextToSlice did not produce traceparent header")
	}
}

func TestSpanAttributeConstants(t *testing.T) {
// Verify span name constants are defined
spanNames := []string{
Expand Down
Loading