diff --git a/cmd/aigw/docker-compose-otel.yaml b/cmd/aigw/docker-compose-otel.yaml index 4582f1bb1f..da099bec51 100644 --- a/cmd/aigw/docker-compose-otel.yaml +++ b/cmd/aigw/docker-compose-otel.yaml @@ -60,6 +60,10 @@ services: environment: - OPENAI_BASE_URL=http://host.docker.internal:11434/v1 - OPENAI_API_KEY=unused + # Map HTTP request headers to otel span and metric attributes for session tracking + # Format: header:attribute,header:attribute + - OTEL_AIGW_SPAN_REQUEST_HEADER_ATTRIBUTES=x-session-id:session.id,x-user-id:user.id + - OTEL_AIGW_METRICS_REQUEST_HEADER_ATTRIBUTES=x-user-id:user.id ports: - "1975:1975" # OpenAI compatible endpoint at /v1 - "1064:1064" # Admin server: /metrics (Prometheus) and /health endpoints diff --git a/cmd/aigw/run.go b/cmd/aigw/run.go index 415d088503..2dd852f1eb 100644 --- a/cmd/aigw/run.go +++ b/cmd/aigw/run.go @@ -419,6 +419,13 @@ func (runCtx *runCmdContext) mustStartExtProc( args = append(args, "--logLevel", "warn") } + if metricsAttrs := os.Getenv("OTEL_AIGW_METRICS_REQUEST_HEADER_ATTRIBUTES"); metricsAttrs != "" { + args = append(args, "-metricsRequestHeaderAttributes", metricsAttrs) + } + if spanAttrs := os.Getenv("OTEL_AIGW_SPAN_REQUEST_HEADER_ATTRIBUTES"); spanAttrs != "" { + args = append(args, "-spanRequestHeaderAttributes", spanAttrs) + } + done := make(chan error) go func() { if err := runCtx.extProcLauncher(ctx, args, os.Stderr); err != nil { diff --git a/cmd/aigw/run_test.go b/cmd/aigw/run_test.go index 3025ca72c7..ed02651375 100644 --- a/cmd/aigw/run_test.go +++ b/cmd/aigw/run_test.go @@ -176,6 +176,41 @@ func Test_mustStartExtProc(t *testing.T) { require.ErrorIs(t, <-done, mockErr) } +func Test_mustStartExtProc_withHeaderAttributes(t *testing.T) { + t.Setenv("OTEL_AIGW_METRICS_REQUEST_HEADER_ATTRIBUTES", "x-team-id:team.id,x-user-id:user.id") + t.Setenv("OTEL_AIGW_SPAN_REQUEST_HEADER_ATTRIBUTES", "x-session-id:session.id,x-user-id:user.id") + + var capturedArgs []string + runCtx := &runCmdContext{ + tmpdir: t.TempDir(), + adminPort: 1064, + extProcLauncher: func(_ context.Context, args []string, _ io.Writer) error { + capturedArgs = args + return errors.New("mock error") // Return error to stop execution + }, + stderrLogger: slog.New(slog.DiscardHandler), + } + + done := runCtx.mustStartExtProc(t.Context(), filterapi.MustLoadDefaultConfig()) + <-done // Wait for completion + + // Verify both metrics and tracing flags are set + require.Contains(t, capturedArgs, "-metricsRequestHeaderAttributes") + require.Contains(t, capturedArgs, "-spanRequestHeaderAttributes") + + // Find the index and verify the values + for i, arg := range capturedArgs { + if arg == "-metricsRequestHeaderAttributes" { + require.Less(t, i+1, len(capturedArgs), "metricsRequestHeaderAttributes should have a value") + require.Equal(t, "x-team-id:team.id,x-user-id:user.id", capturedArgs[i+1]) + } + if arg == "-spanRequestHeaderAttributes" { + require.Less(t, i+1, len(capturedArgs), "spanRequestHeaderAttributes should have a value") + require.Equal(t, "x-session-id:session.id,x-user-id:user.id", capturedArgs[i+1]) + } + } +} + func TestTryFindEnvoyAdminAddress(t *testing.T) { gwWithProxy := func(name string) *gwapiv1.Gateway { return &gwapiv1.Gateway{ diff --git a/cmd/controller/main.go b/cmd/controller/main.go index b3577e4002..d74a6f221e 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -32,19 +32,21 @@ import ( ) type flags struct { - extProcLogLevel string - extProcImage string - extProcImagePullPolicy corev1.PullPolicy - enableLeaderElection bool - logLevel zapcore.Level - extensionServerPort string - tlsCertDir string - tlsCertName string - tlsKeyName string - caBundleName string - metricsRequestHeaderLabels string - rootPrefix string - extProcExtraEnvVars string + extProcLogLevel string + extProcImage string + extProcImagePullPolicy corev1.PullPolicy + enableLeaderElection bool + logLevel zapcore.Level + extensionServerPort string + tlsCertDir string + tlsCertName string + tlsKeyName string + caBundleName string + metricsRequestHeaderAttributes string + metricsRequestHeaderLabels string // DEPRECATED: use metricsRequestHeaderAttributes instead. + spanRequestHeaderAttributes string + rootPrefix string + extProcExtraEnvVars string // extProcMaxRecvMsgSize is the maximum message size in bytes that the gRPC server can receive. extProcMaxRecvMsgSize int } @@ -114,10 +116,20 @@ func parseAndValidateFlags(args []string) (flags, error) { "tls.key", "The name of the TLS key file.", ) + metricsRequestHeaderAttributes := fs.String( + "metricsRequestHeaderAttributes", + "", + "Comma-separated key-value pairs for mapping HTTP request headers to Otel metric attributes. Format: x-team-id:team.id,x-user-id:user.id.", + ) metricsRequestHeaderLabels := fs.String( "metricsRequestHeaderLabels", "", - "Comma-separated key-value pairs for mapping HTTP request headers to Prometheus metric labels. Format: x-team-id:team_id,x-user-id:user_id.", + "DEPRECATED: Use --metricsRequestHeaderAttributes instead. This flag will be removed in a future release.", + ) + spanRequestHeaderAttributes := fs.String( + "spanRequestHeaderAttributes", + "", + "Comma-separated key-value pairs for mapping HTTP request headers to otel span attributes. Format: x-session-id:session.id,x-user-id:user.id.", ) rootPrefix := fs.String( "rootPrefix", @@ -140,6 +152,11 @@ func parseAndValidateFlags(args []string) (flags, error) { return flags{}, err } + // Handle deprecated flag: fall back to metricsRequestHeaderLabels if metricsRequestHeaderAttributes is not set. + if *metricsRequestHeaderAttributes == "" && *metricsRequestHeaderLabels != "" { + *metricsRequestHeaderAttributes = *metricsRequestHeaderLabels + } + var slogLevel slog.Level if err := slogLevel.UnmarshalText([]byte(*extProcLogLevelPtr)); err != nil { err = fmt.Errorf("invalid external processor log level: %q", *extProcLogLevelPtr) @@ -157,11 +174,19 @@ func parseAndValidateFlags(args []string) (flags, error) { return flags{}, err } - // Validate metrics header labels if provided. - if *metricsRequestHeaderLabels != "" { - _, err := internalapi.ParseRequestHeaderLabelMapping(*metricsRequestHeaderLabels) + // Validate metrics header attributes if provided. + if *metricsRequestHeaderAttributes != "" { + _, err := internalapi.ParseRequestHeaderAttributeMapping(*metricsRequestHeaderAttributes) if err != nil { - return flags{}, fmt.Errorf("invalid metrics header labels: %w", err) + return flags{}, fmt.Errorf("invalid metrics header attributes: %w", err) + } + } + + // Validate tracing header attributes if provided. + if *spanRequestHeaderAttributes != "" { + _, err := internalapi.ParseRequestHeaderAttributeMapping(*spanRequestHeaderAttributes) + if err != nil { + return flags{}, fmt.Errorf("invalid tracing header attributes: %w", err) } } @@ -174,20 +199,22 @@ func parseAndValidateFlags(args []string) (flags, error) { } return flags{ - extProcLogLevel: *extProcLogLevelPtr, - extProcImage: *extProcImagePtr, - extProcImagePullPolicy: extProcPullPolicy, - enableLeaderElection: *enableLeaderElectionPtr, - logLevel: zapLogLevel, - extensionServerPort: *extensionServerPortPtr, - tlsCertDir: *tlsCertDir, - tlsCertName: *tlsCertName, - tlsKeyName: *tlsKeyName, - caBundleName: *caBundleName, - metricsRequestHeaderLabels: *metricsRequestHeaderLabels, - rootPrefix: *rootPrefix, - extProcExtraEnvVars: *extProcExtraEnvVars, - extProcMaxRecvMsgSize: *extProcMaxRecvMsgSize, + extProcLogLevel: *extProcLogLevelPtr, + extProcImage: *extProcImagePtr, + extProcImagePullPolicy: extProcPullPolicy, + enableLeaderElection: *enableLeaderElectionPtr, + logLevel: zapLogLevel, + extensionServerPort: *extensionServerPortPtr, + tlsCertDir: *tlsCertDir, + tlsCertName: *tlsCertName, + tlsKeyName: *tlsKeyName, + caBundleName: *caBundleName, + metricsRequestHeaderAttributes: *metricsRequestHeaderAttributes, + metricsRequestHeaderLabels: *metricsRequestHeaderLabels, + spanRequestHeaderAttributes: *spanRequestHeaderAttributes, + rootPrefix: *rootPrefix, + extProcExtraEnvVars: *extProcExtraEnvVars, + extProcMaxRecvMsgSize: *extProcMaxRecvMsgSize, }, nil } @@ -200,6 +227,11 @@ func main() { os.Exit(1) } + // Warn if deprecated flag is being used. + if flags.metricsRequestHeaderLabels != "" { + setupLog.Info("The --metricsRequestHeaderLabels flag is deprecated and will be removed in a future release. Please use --metricsRequestHeaderAttributes instead.") + } + ctrl.SetLogger(zap.New(zap.UseFlagOptions(&zap.Options{Development: true, Level: flags.logLevel}))) k8sConfig := ctrl.GetConfigOrDie() @@ -255,15 +287,16 @@ func main() { // Start the controller. if err := controller.StartControllers(ctx, mgr, k8sConfig, ctrl.Log.WithName("controller"), controller.Options{ - ExtProcImage: flags.extProcImage, - ExtProcImagePullPolicy: flags.extProcImagePullPolicy, - ExtProcLogLevel: flags.extProcLogLevel, - EnableLeaderElection: flags.enableLeaderElection, - UDSPath: extProcUDSPath, - MetricsRequestHeaderLabels: flags.metricsRequestHeaderLabels, - RootPrefix: flags.rootPrefix, - ExtProcExtraEnvVars: flags.extProcExtraEnvVars, - ExtProcMaxRecvMsgSize: flags.extProcMaxRecvMsgSize, + ExtProcImage: flags.extProcImage, + ExtProcImagePullPolicy: flags.extProcImagePullPolicy, + ExtProcLogLevel: flags.extProcLogLevel, + EnableLeaderElection: flags.enableLeaderElection, + UDSPath: extProcUDSPath, + MetricsRequestHeaderAttributes: flags.metricsRequestHeaderAttributes, + TracingRequestHeaderAttributes: flags.spanRequestHeaderAttributes, + RootPrefix: flags.rootPrefix, + ExtProcExtraEnvVars: flags.extProcExtraEnvVars, + ExtProcMaxRecvMsgSize: flags.extProcMaxRecvMsgSize, }); err != nil { setupLog.Error(err, "failed to start controller") } diff --git a/cmd/controller/main_test.go b/cmd/controller/main_test.go index cb5ef6804c..eddb63fbe0 100644 --- a/cmd/controller/main_test.go +++ b/cmd/controller/main_test.go @@ -48,6 +48,7 @@ func Test_parseAndValidateFlags(t *testing.T) { tc.dash + "logLevel=debug", tc.dash + "port=:8080", tc.dash + "extProcExtraEnvVars=OTEL_SERVICE_NAME=test;OTEL_TRACES_EXPORTER=console", + tc.dash + "spanRequestHeaderAttributes=x-session-id:session.id", } f, err := parseAndValidateFlags(args) require.Equal(t, "debug", f.extProcLogLevel) @@ -57,11 +58,34 @@ func Test_parseAndValidateFlags(t *testing.T) { require.Equal(t, "debug", f.logLevel.String()) require.Equal(t, ":8080", f.extensionServerPort) require.Equal(t, "OTEL_SERVICE_NAME=test;OTEL_TRACES_EXPORTER=console", f.extProcExtraEnvVars) + require.Equal(t, "x-session-id:session.id", f.spanRequestHeaderAttributes) require.NoError(t, err) }) } }) + t.Run("deprecated metricsRequestHeaderLabels flag", func(t *testing.T) { + args := []string{ + "--metricsRequestHeaderLabels=x-team-id:team.id", + } + f, err := parseAndValidateFlags(args) + require.NoError(t, err) + // Verify the deprecated flag value is used for metricsRequestHeaderAttributes + require.Equal(t, "x-team-id:team.id", f.metricsRequestHeaderAttributes) + require.Equal(t, "x-team-id:team.id", f.metricsRequestHeaderLabels) + }) + + t.Run("new flag takes precedence over deprecated flag", func(t *testing.T) { + args := []string{ + "--metricsRequestHeaderLabels=x-old:old.value", + "--metricsRequestHeaderAttributes=x-new:new.value", + } + f, err := parseAndValidateFlags(args) + require.NoError(t, err) + // Verify the new flag takes precedence + require.Equal(t, "x-new:new.value", f.metricsRequestHeaderAttributes) + }) + t.Run("invalid flags", func(t *testing.T) { for _, tc := range []struct { name string @@ -93,6 +117,16 @@ func Test_parseAndValidateFlags(t *testing.T) { flags: []string{"--extProcExtraEnvVars==value"}, expErr: "invalid extProc extra env vars", }, + { + name: "invalid spanRequestHeaderAttributes - missing colon", + flags: []string{"--spanRequestHeaderAttributes=x-session-id"}, + expErr: "invalid tracing header attributes", + }, + { + name: "invalid spanRequestHeaderAttributes - empty header", + flags: []string{"--spanRequestHeaderAttributes=:session.id"}, + expErr: "invalid tracing header attributes", + }, } { t.Run(tc.name, func(t *testing.T) { _, err := parseAndValidateFlags(tc.flags) diff --git a/cmd/extproc/mainlib/main.go b/cmd/extproc/mainlib/main.go index cb1c71e250..6b8da4fb72 100644 --- a/cmd/extproc/mainlib/main.go +++ b/cmd/extproc/mainlib/main.go @@ -35,14 +35,16 @@ import ( // extProcFlags is the struct that holds the flags passed to the external processor. type extProcFlags struct { - configPath string // path to the configuration file. - extProcAddr string // gRPC address for the external processor. - logLevel slog.Level // log level for the external processor. - adminPort int // HTTP port for the admin server (metrics and health). - metricsRequestHeaderLabels string // comma-separated key-value pairs for mapping HTTP request headers to Prometheus metric labels. - mcpAddr string // address for the MCP proxy server which can be either tcp or unix domain socket. - mcpSessionEncryptionSeed string // Seed for deriving the key for encrypting MCP sessions. - mcpWriteTimeout time.Duration // the maximum duration before timing out writes of the MCP response. + configPath string // path to the configuration file. + extProcAddr string // gRPC address for the external processor. + logLevel slog.Level // log level for the external processor. + adminPort int // HTTP port for the admin server (metrics and health). + metricsRequestHeaderAttributes string // comma-separated key-value pairs for mapping HTTP request headers to otel metric attributes. + metricsRequestHeaderLabels string // DEPRECATED: use metricsRequestHeaderAttributes instead. + spanRequestHeaderAttributes string // comma-separated key-value pairs for mapping HTTP request headers to otel span attributes. + mcpAddr string // address for the MCP proxy server which can be either tcp or unix domain socket. + mcpSessionEncryptionSeed string // Seed for deriving the key for encrypting MCP sessions. + mcpWriteTimeout time.Duration // the maximum duration before timing out writes of the MCP response. // rootPrefix is the root prefix for all the processors. rootPrefix string // maxRecvMsgSize is the maximum message size in bytes that the gRPC server can receive. @@ -74,10 +76,20 @@ func parseAndValidateFlags(args []string) (extProcFlags, error) { "log level for the external processor. One of 'debug', 'info', 'warn', or 'error'.", ) fs.IntVar(&flags.adminPort, "adminPort", 1064, "HTTP port for the admin server (serves /metrics and /health endpoints).") + fs.StringVar(&flags.metricsRequestHeaderAttributes, + "metricsRequestHeaderAttributes", + "", + "Comma-separated key-value pairs for mapping HTTP request headers to otel metric attributes. Format: x-team-id:team.id,x-user-id:user.id.", + ) fs.StringVar(&flags.metricsRequestHeaderLabels, "metricsRequestHeaderLabels", "", - "Comma-separated key-value pairs for mapping HTTP request headers to Prometheus metric labels. Format: x-team-id:team_id,x-user-id:user_id.", + "DEPRECATED: Use -metricsRequestHeaderAttributes instead. This flag will be removed in a future release.", + ) + fs.StringVar(&flags.spanRequestHeaderAttributes, + "spanRequestHeaderAttributes", + "", + "Comma-separated key-value pairs for mapping HTTP request headers to otel span attributes. Format: x-session-id:session.id,x-user-id:user.id.", ) fs.StringVar(&flags.rootPrefix, "rootPrefix", @@ -102,12 +114,22 @@ func parseAndValidateFlags(args []string) (extProcFlags, error) { return extProcFlags{}, fmt.Errorf("failed to parse extProcFlags: %w", err) } + // Handle deprecated flag: fall back to metricsRequestHeaderLabels if metricsRequestHeaderAttributes is not set. + if flags.metricsRequestHeaderAttributes == "" && flags.metricsRequestHeaderLabels != "" { + flags.metricsRequestHeaderAttributes = flags.metricsRequestHeaderLabels + } + if flags.configPath == "" { errs = append(errs, fmt.Errorf("configPath must be provided")) } if err := flags.logLevel.UnmarshalText([]byte(*logLevelPtr)); err != nil { errs = append(errs, fmt.Errorf("failed to unmarshal log level: %w", err)) } + if flags.spanRequestHeaderAttributes != "" { + if _, err := internalapi.ParseRequestHeaderAttributeMapping(flags.spanRequestHeaderAttributes); err != nil { + errs = append(errs, fmt.Errorf("failed to parse tracing header mapping: %w", err)) + } + } return flags, errors.Join(errs...) } @@ -135,6 +157,11 @@ func Main(ctx context.Context, args []string, stderr io.Writer) (err error) { l := slog.New(slog.NewTextHandler(stderr, &slog.HandlerOptions{Level: flags.logLevel})) + // Warn if deprecated flag is being used. + if flags.metricsRequestHeaderLabels != "" { + l.Warn("The -metricsRequestHeaderLabels flag is deprecated and will be removed in a future release. Please use -metricsRequestHeaderAttributes instead.") + } + l.Info("starting external processor", slog.String("version", version.Version), slog.String("address", flags.extProcAddr), @@ -177,12 +204,19 @@ func Main(ctx context.Context, args []string, stderr io.Writer) (err error) { } // Parse header mapping for metrics. - metricsRequestHeaderLabels, err := internalapi.ParseRequestHeaderLabelMapping(flags.metricsRequestHeaderLabels) + metricsRequestHeaderAttributes, err := internalapi.ParseRequestHeaderAttributeMapping(flags.metricsRequestHeaderAttributes) if err != nil { return fmt.Errorf("failed to parse metrics header mapping: %w", err) } - // Create Prometheus registry and reader. + // Parse header mapping for tracing spans. + spanRequestHeaderAttributes, err := internalapi.ParseRequestHeaderAttributeMapping(flags.spanRequestHeaderAttributes) + if err != nil { + return fmt.Errorf("failed to parse tracing header mapping: %w", err) + } + + // Create Prometheus registry and reader which automatically converts + // attribute to Prometheus-compatible format (e.g. dots to underscores). promRegistry := prometheus.NewRegistry() promReader, err := otelprom.New(otelprom.WithRegisterer(promRegistry)) if err != nil { @@ -194,11 +228,11 @@ func Main(ctx context.Context, args []string, stderr io.Writer) (err error) { if err != nil { return fmt.Errorf("failed to create metrics: %w", err) } - chatCompletionMetrics := metrics.NewChatCompletion(meter, metricsRequestHeaderLabels) - embeddingsMetrics := metrics.NewEmbeddings(meter, metricsRequestHeaderLabels) + chatCompletionMetrics := metrics.NewChatCompletion(meter, metricsRequestHeaderAttributes) + embeddingsMetrics := metrics.NewEmbeddings(meter, metricsRequestHeaderAttributes) mcpMetrics := metrics.NewMCP(meter) - tracing, err := tracing.NewTracingFromEnv(ctx, os.Stdout) + tracing, err := tracing.NewTracingFromEnv(ctx, os.Stdout, spanRequestHeaderAttributes) if err != nil { return err } diff --git a/cmd/extproc/mainlib/main_test.go b/cmd/extproc/mainlib/main_test.go index 09996141f4..cb95f2c4b3 100644 --- a/cmd/extproc/mainlib/main_test.go +++ b/cmd/extproc/mainlib/main_test.go @@ -86,7 +86,41 @@ func Test_parseAndValidateFlags(t *testing.T) { name: "with header mapping", args: []string{ "-configPath", "/path/to/config.yaml", - "-metricsRequestHeaderLabels", "x-team-id:team_id,x-user-id:user_id", + "-metricsRequestHeaderAttributes", "x-team-id:team.id,x-user-id:user.id", + }, + configPath: "/path/to/config.yaml", + rootPrefix: "/", + addr: ":1063", + logLevel: slog.LevelInfo, + }, + { + name: "with tracing header attributes", + args: []string{ + "-configPath", "/path/to/config.yaml", + "-spanRequestHeaderAttributes", "x-session-id:session.id,x-user-id:user.id", + }, + configPath: "/path/to/config.yaml", + rootPrefix: "/", + addr: ":1063", + logLevel: slog.LevelInfo, + }, + { + name: "with both metrics and tracing headers", + args: []string{ + "-configPath", "/path/to/config.yaml", + "-metricsRequestHeaderAttributes", "x-user-id:user.id", + "-spanRequestHeaderAttributes", "x-session-id:session.id", + }, + configPath: "/path/to/config.yaml", + rootPrefix: "/", + addr: ":1063", + logLevel: slog.LevelInfo, + }, + { + name: "with deprecated metricsRequestHeaderLabels flag", + args: []string{ + "-configPath", "/path/to/config.yaml", + "-metricsRequestHeaderLabels", "x-team-id:team.id", }, configPath: "/path/to/config.yaml", rootPrefix: "/", @@ -106,9 +140,34 @@ func Test_parseAndValidateFlags(t *testing.T) { }) t.Run("invalid extProcFlags", func(t *testing.T) { - _, err := parseAndValidateFlags([]string{"-logLevel", "invalid"}) - require.EqualError(t, err, `configPath must be provided -failed to unmarshal log level: slog: level string "invalid": unknown name`) + tests := []struct { + name string + args []string + expectedError string + }{ + { + name: "invalid log level", + args: []string{"-logLevel", "invalid"}, + expectedError: "configPath must be provided\nfailed to unmarshal log level: slog: level string \"invalid\": unknown name", + }, + { + name: "invalid tracing header attributes - missing colon", + args: []string{"-configPath", "/path/to/config.yaml", "-spanRequestHeaderAttributes", "x-session-id"}, + expectedError: "failed to parse tracing header mapping: invalid header-attribute pair at position 1: \"x-session-id\" (expected format: header:attribute)", + }, + { + name: "invalid tracing header attributes - empty header", + args: []string{"-configPath", "/path/to/config.yaml", "-spanRequestHeaderAttributes", ":session.id"}, + expectedError: "failed to parse tracing header mapping: empty header or attribute at position 1: \":session.id\"", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := parseAndValidateFlags(tt.args) + require.EqualError(t, err, tt.expectedError) + }) + } }) } diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 9d186bdee3..9dfee24343 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -73,8 +73,10 @@ type Options struct { UDSPath string // DisableMutatingWebhook disables the mutating webhook for the Gateway for testing purposes. DisableMutatingWebhook bool - // MetricsRequestHeaderLabels is the comma-separated key-value pairs for mapping HTTP request headers to Prometheus metric labels. - MetricsRequestHeaderLabels string + // MetricsRequestHeaderAttributes is the comma-separated key-value pairs for mapping HTTP request headers to Otel metric attributes. + MetricsRequestHeaderAttributes string + // TracingRequestHeaderAttributes is the comma-separated key-value pairs for mapping HTTP request headers to otel span attributes. + TracingRequestHeaderAttributes string // RootPrefix is the root prefix for all the routes handled by the AI Gateway. RootPrefix string // ExtProcExtraEnvVars is the semicolon-separated key=value pairs for extra environment variables in extProc container. @@ -205,7 +207,8 @@ func StartControllers(ctx context.Context, mgr manager.Manager, config *rest.Con options.ExtProcImagePullPolicy, options.ExtProcLogLevel, options.UDSPath, - options.MetricsRequestHeaderLabels, + options.MetricsRequestHeaderAttributes, + options.TracingRequestHeaderAttributes, options.RootPrefix, options.ExtProcExtraEnvVars, options.ExtProcMaxRecvMsgSize, diff --git a/internal/controller/gateway_mutator.go b/internal/controller/gateway_mutator.go index 1171f4f419..ab90d3c935 100644 --- a/internal/controller/gateway_mutator.go +++ b/internal/controller/gateway_mutator.go @@ -34,14 +34,15 @@ type gatewayMutator struct { kube kubernetes.Interface logger logr.Logger - extProcImage string - extProcImagePullPolicy corev1.PullPolicy - extProcLogLevel string - udsPath string - metricsRequestHeaderLabels string - rootPrefix string - extProcExtraEnvVars []corev1.EnvVar - extProcMaxRecvMsgSize int + extProcImage string + extProcImagePullPolicy corev1.PullPolicy + extProcLogLevel string + udsPath string + metricsRequestHeaderAttributes string + spanRequestHeaderAttributes string + rootPrefix string + extProcExtraEnvVars []corev1.EnvVar + extProcMaxRecvMsgSize int // Whether to run the extProc container as a sidecar (true) as a normal container (false). // This is essentially a workaround for old k8s versions, and we can remove this in the future. @@ -50,7 +51,7 @@ type gatewayMutator struct { func newGatewayMutator(c client.Client, kube kubernetes.Interface, logger logr.Logger, extProcImage string, extProcImagePullPolicy corev1.PullPolicy, extProcLogLevel, - udsPath, metricsRequestHeaderLabels, rootPrefix, extProcExtraEnvVars string, extProcMaxRecvMsgSize int, + udsPath, metricsRequestHeaderAttributes, spanRequestHeaderAttributes, rootPrefix, extProcExtraEnvVars string, extProcMaxRecvMsgSize int, extProcAsSideCar bool, ) *gatewayMutator { var parsedEnvVars []corev1.EnvVar @@ -64,17 +65,18 @@ func newGatewayMutator(c client.Client, kube kubernetes.Interface, logger logr.L } return &gatewayMutator{ c: c, codec: serializer.NewCodecFactory(Scheme), - kube: kube, - extProcImage: extProcImage, - extProcImagePullPolicy: extProcImagePullPolicy, - extProcLogLevel: extProcLogLevel, - logger: logger, - udsPath: udsPath, - metricsRequestHeaderLabels: metricsRequestHeaderLabels, - rootPrefix: rootPrefix, - extProcExtraEnvVars: parsedEnvVars, - extProcMaxRecvMsgSize: extProcMaxRecvMsgSize, - extProcAsSideCar: extProcAsSideCar, + kube: kube, + extProcImage: extProcImage, + extProcImagePullPolicy: extProcImagePullPolicy, + extProcLogLevel: extProcLogLevel, + logger: logger, + udsPath: udsPath, + metricsRequestHeaderAttributes: metricsRequestHeaderAttributes, + spanRequestHeaderAttributes: spanRequestHeaderAttributes, + rootPrefix: rootPrefix, + extProcExtraEnvVars: parsedEnvVars, + extProcMaxRecvMsgSize: extProcMaxRecvMsgSize, + extProcAsSideCar: extProcAsSideCar, } } @@ -112,8 +114,13 @@ func (g *gatewayMutator) buildExtProcArgs(filterConfigFullPath string, extProcAd } // Add metrics header label mapping if configured. - if g.metricsRequestHeaderLabels != "" { - args = append(args, "-metricsRequestHeaderLabels", g.metricsRequestHeaderLabels) + if g.metricsRequestHeaderAttributes != "" { + args = append(args, "-metricsRequestHeaderAttributes", g.metricsRequestHeaderAttributes) + } + + // Add tracing header attribute mapping if configured. + if g.spanRequestHeaderAttributes != "" { + args = append(args, "-spanRequestHeaderAttributes", g.spanRequestHeaderAttributes) } return args diff --git a/internal/controller/gateway_mutator_test.go b/internal/controller/gateway_mutator_test.go index 375b3d255d..69d3556d8d 100644 --- a/internal/controller/gateway_mutator_test.go +++ b/internal/controller/gateway_mutator_test.go @@ -26,7 +26,7 @@ import ( func TestGatewayMutator_Default(t *testing.T) { fakeClient := requireNewFakeClientWithIndexes(t) fakeKube := fake2.NewClientset() - g := newTestGatewayMutator(fakeClient, fakeKube, "", "", false) + g := newTestGatewayMutator(fakeClient, fakeKube, "", "", "", false) pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "test-namespace"}, Spec: corev1.PodSpec{ @@ -44,10 +44,11 @@ func TestGatewayMutator_Default(t *testing.T) { func TestGatewayMutator_mutatePod(t *testing.T) { tests := []struct { - name string - metricsRequestHeaderLabels string - extProcExtraEnvVars string - extprocTest func(t *testing.T, container corev1.Container) + name string + metricsRequestHeaderAttributes string + spanRequestHeaderAttributes string + extProcExtraEnvVars string + extprocTest func(t *testing.T, container corev1.Container) }{ { name: "basic extproc container", @@ -66,24 +67,48 @@ func TestGatewayMutator_mutatePod(t *testing.T) { }, }, { - name: "with metrics request header labels", - metricsRequestHeaderLabels: "x-team-id:team_id,x-user-id:user_id", + name: "with metrics request header labels", + metricsRequestHeaderAttributes: "x-team-id:team.id,x-user-id:user.id", extprocTest: func(t *testing.T, container corev1.Container) { require.Empty(t, container.Env) - require.Contains(t, container.Args, "-metricsRequestHeaderLabels") - require.Contains(t, container.Args, "x-team-id:team_id,x-user-id:user_id") + require.Contains(t, container.Args, "-metricsRequestHeaderAttributes") + require.Contains(t, container.Args, "x-team-id:team.id,x-user-id:user.id") }, }, { - name: "with both metrics and env vars", - metricsRequestHeaderLabels: "x-request-id:request_id", - extProcExtraEnvVars: "OTEL_SERVICE_NAME=custom-service", + name: "with both metrics and env vars", + metricsRequestHeaderAttributes: "x-team-id:team.id", + extProcExtraEnvVars: "OTEL_SERVICE_NAME=custom-service", extprocTest: func(t *testing.T, container corev1.Container) { require.Equal(t, []corev1.EnvVar{ {Name: "OTEL_SERVICE_NAME", Value: "custom-service"}, }, container.Env) - require.Contains(t, container.Args, "-metricsRequestHeaderLabels") - require.Contains(t, container.Args, "x-request-id:request_id") + require.Contains(t, container.Args, "-metricsRequestHeaderAttributes") + require.Contains(t, container.Args, "x-team-id:team.id") + }, + }, + { + name: "with tracing request header attributes", + spanRequestHeaderAttributes: "x-session-id:session.id,x-user-id:user.id", + extprocTest: func(t *testing.T, container corev1.Container) { + require.Empty(t, container.Env) + require.Contains(t, container.Args, "-spanRequestHeaderAttributes") + require.Contains(t, container.Args, "x-session-id:session.id,x-user-id:user.id") + }, + }, + { + name: "with metrics, tracing, and env vars", + metricsRequestHeaderAttributes: "x-user-id:user.id", + spanRequestHeaderAttributes: "x-session-id:session.id", + extProcExtraEnvVars: "OTEL_SERVICE_NAME=test-service", + extprocTest: func(t *testing.T, container corev1.Container) { + require.Equal(t, []corev1.EnvVar{ + {Name: "OTEL_SERVICE_NAME", Value: "test-service"}, + }, container.Env) + require.Contains(t, container.Args, "-metricsRequestHeaderAttributes") + require.Contains(t, container.Args, "x-user-id:user.id") + require.Contains(t, container.Args, "-spanRequestHeaderAttributes") + require.Contains(t, container.Args, "x-session-id:session.id") }, }, } @@ -94,7 +119,7 @@ func TestGatewayMutator_mutatePod(t *testing.T) { t.Run(fmt.Sprintf("sidecar=%v", sidecar), func(t *testing.T) { fakeClient := requireNewFakeClientWithIndexes(t) fakeKube := fake2.NewClientset() - g := newTestGatewayMutator(fakeClient, fakeKube, tt.metricsRequestHeaderLabels, tt.extProcExtraEnvVars, sidecar) + g := newTestGatewayMutator(fakeClient, fakeKube, tt.metricsRequestHeaderAttributes, tt.spanRequestHeaderAttributes, tt.extProcExtraEnvVars, sidecar) const gwName, gwNamespace = "test-gateway", "test-namespace" err := fakeClient.Create(t.Context(), &aigv1a1.AIGatewayRoute{ @@ -158,11 +183,11 @@ func TestGatewayMutator_mutatePod(t *testing.T) { } } -func newTestGatewayMutator(fakeClient client.Client, fakeKube *fake2.Clientset, metricsRequestHeaderLabels, extProcExtraEnvVars string, sidecar bool) *gatewayMutator { +func newTestGatewayMutator(fakeClient client.Client, fakeKube *fake2.Clientset, metricsRequestHeaderAttributes, spanRequestHeaderAttributes, extProcExtraEnvVars string, sidecar bool) *gatewayMutator { ctrl.SetLogger(zap.New(zap.UseFlagOptions(&zap.Options{Development: true, Level: zapcore.DebugLevel}))) return newGatewayMutator( fakeClient, fakeKube, ctrl.Log, "docker.io/envoyproxy/ai-gateway-extproc:latest", corev1.PullIfNotPresent, - "info", "/tmp/extproc.sock", metricsRequestHeaderLabels, "/v1", extProcExtraEnvVars, 512*1024*1024, + "info", "/tmp/extproc.sock", metricsRequestHeaderAttributes, spanRequestHeaderAttributes, "/v1", extProcExtraEnvVars, 512*1024*1024, sidecar, ) } diff --git a/internal/extproc/translator/anthropic_gcpanthropic_test.go b/internal/extproc/translator/anthropic_gcpanthropic_test.go index 8b8281eef4..2c882399ba 100644 --- a/internal/extproc/translator/anthropic_gcpanthropic_test.go +++ b/internal/extproc/translator/anthropic_gcpanthropic_test.go @@ -369,7 +369,7 @@ func TestAnthropicToGCPAnthropicTranslator_RequestBody_FieldPassthrough(t *testi }, }, "tool_choice": map[string]any{"type": "auto"}, - "metadata": map[string]any{"user_id": "test123"}, + "metadata": map[string]any{"user.id": "test123"}, } _, bodyMutation, err := translator.RequestBody(nil, parsedReq, false) diff --git a/internal/internalapi/internalapi.go b/internal/internalapi/internalapi.go index a6001842c3..540cad3431 100644 --- a/internal/internalapi/internalapi.go +++ b/internal/internalapi/internalapi.go @@ -76,15 +76,17 @@ const ( AIGatewayGeneratedHTTPRouteAnnotation = "ai-gateway-generated" ) -// ParseRequestHeaderLabelMapping parses comma-separated key-value pairs for header-to-label mapping. -// The input format is "header1:label1,header2:label2" where header names are HTTP request -// headers and label names are Prometheus metric labels. -// Example: "x-team-id:team_id,x-user-id:user_id". +// ParseRequestHeaderAttributeMapping parses comma-separated key-value pairs for header-to-attribute mapping. +// The input format is "header1:attribute1,header2:attribute2" where header names are HTTP request +// headers and attribute names are Otel span or metric attributes. +// Example: "x-session-id:session.id,x-user-id:user.id". // // Note: This serves a different purpose than OTEL's OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST, -// which captures headers as span attributes for tracing. This function creates Prometheus metric labels -// from headers with custom naming (e.g., x-team-id → team_id) for proper Prometheus conventions. -func ParseRequestHeaderLabelMapping(s string) (map[string]string, error) { +// which captures headers as span attributes for tracing. +// +// Note: We do not need to convert to Prometheus format (e.g., x-session-id → session.id) here, +// as that's done implicitly in the Prometheus exporter. +func ParseRequestHeaderAttributeMapping(s string) (map[string]string, error) { if s == "" { return nil, nil } @@ -95,22 +97,22 @@ func ParseRequestHeaderLabelMapping(s string) (map[string]string, error) { for i, pair := range pairs { pair = strings.TrimSpace(pair) if pair == "" { - return nil, fmt.Errorf("empty header-label pair at position %d", i+1) + return nil, fmt.Errorf("empty header-attribute pair at position %d", i+1) } parts := strings.SplitN(pair, ":", 2) if len(parts) != 2 { - return nil, fmt.Errorf("invalid header-label pair at position %d: %q (expected format: header:label)", i+1, pair) + return nil, fmt.Errorf("invalid header-attribute pair at position %d: %q (expected format: header:attribute)", i+1, pair) } header := strings.TrimSpace(parts[0]) - label := strings.TrimSpace(parts[1]) + attribute := strings.TrimSpace(parts[1]) - if header == "" || label == "" { - return nil, fmt.Errorf("empty header or label at position %d: %q", i+1, pair) + if header == "" || attribute == "" { + return nil, fmt.Errorf("empty header or attribute at position %d: %q", i+1, pair) } - result[header] = label + result[header] = attribute } return result, nil diff --git a/internal/internalapi/internalapi_test.go b/internal/internalapi/internalapi_test.go index 2b042a5158..d814cdd173 100644 --- a/internal/internalapi/internalapi_test.go +++ b/internal/internalapi/internalapi_test.go @@ -66,7 +66,7 @@ func TestConstants(t *testing.T) { require.Equal(t, "x-gateway-destination-endpoint", EndpointPickerHeaderKey) } -func TestParseRequestHeaderLabelMapping(t *testing.T) { +func TestParseRequestHeaderAttributeMapping(t *testing.T) { tests := []struct { name string input string @@ -81,67 +81,67 @@ func TestParseRequestHeaderLabelMapping(t *testing.T) { }, { name: "single valid pair", - input: "x-team-id:team_id", - expected: map[string]string{"x-team-id": "team_id"}, + input: "x-session-id:session.id", + expected: map[string]string{"x-session-id": "session.id"}, wantErr: false, }, { name: "multiple valid pairs", - input: "x-team-id:team_id,x-user-id:user_id", - expected: map[string]string{"x-team-id": "team_id", "x-user-id": "user_id"}, + input: "x-session-id:session.id,x-user-id:user.id", + expected: map[string]string{"x-session-id": "session.id", "x-user-id": "user.id"}, wantErr: false, }, { name: "with whitespace", - input: " x-team-id : team_id , x-user-id : user_id ", - expected: map[string]string{"x-team-id": "team_id", "x-user-id": "user_id"}, + input: " x-session-id : session.id , x-user-id : user.id ", + expected: map[string]string{"x-session-id": "session.id", "x-user-id": "user.id"}, wantErr: false, }, { name: "invalid format - missing colon", - input: "x-team-id", + input: "x-session-id", expected: nil, wantErr: true, }, { name: "invalid format - empty header", - input: ":team_id", + input: ":session.id", expected: nil, wantErr: true, }, { - name: "invalid format - empty label", - input: "x-team-id:", + name: "invalid format - empty attribute", + input: "x-session-id:", expected: nil, wantErr: true, }, { name: "multiple colons - takes first colon", - input: "x-team-id:team_id:extra", - expected: map[string]string{"x-team-id": "team_id:extra"}, + input: "x-session-id:session.id:extra", + expected: map[string]string{"x-session-id": "session.id:extra"}, wantErr: false, }, { name: "trailing comma - should fail", - input: "x-team-id:team_id,", + input: "x-session-id:session.id,", expected: nil, wantErr: true, }, { name: "double comma - should fail", - input: "x-team-id:team_id,,x-user-id:user_id", + input: "x-session-id:session.id,,x-user-id:user.id", expected: nil, wantErr: true, }, { name: "comma with spaces - should fail", - input: "x-team-id : team_id , , x-user-id : user_id", + input: "x-session-id : session.id , , x-user-id : user.id", expected: nil, wantErr: true, }, { name: "leading comma - should fail", - input: ",x-team-id:team_id", + input: ",x-session-id:session.id", expected: nil, wantErr: true, }, @@ -149,7 +149,7 @@ func TestParseRequestHeaderLabelMapping(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - result, err := ParseRequestHeaderLabelMapping(tt.input) + result, err := ParseRequestHeaderAttributeMapping(tt.input) if tt.wantErr { assert.Error(t, err) } else { diff --git a/internal/mcpproxy/handlers_test.go b/internal/mcpproxy/handlers_test.go index 61c7c1ffa7..c479968883 100644 --- a/internal/mcpproxy/handlers_test.go +++ b/internal/mcpproxy/handlers_test.go @@ -1549,7 +1549,7 @@ func Test_parseParamsAndMaybeStartSpan(t *testing.T) { p := &mcp.GetPromptParams{} m := newTestMCPProxy() t.Setenv("OTEL_TRACES_EXPORTER", "console") - trace, err := tracing.NewTracingFromEnv(t.Context(), t.Output()) + trace, err := tracing.NewTracingFromEnv(t.Context(), t.Output(), nil) require.NoError(t, err) m.tracer = trace.MCPTracer() s, err := parseParamsAndMaybeStartSpan(t.Context(), m, req, p) diff --git a/internal/metrics/chat_completion_metrics_test.go b/internal/metrics/chat_completion_metrics_test.go index 18cf2cc25b..c5bd515d5f 100644 --- a/internal/metrics/chat_completion_metrics_test.go +++ b/internal/metrics/chat_completion_metrics_test.go @@ -192,7 +192,7 @@ func TestHeaderLabelMapping(t *testing.T) { // Test header label mapping. headerMapping = map[string]string{ - "x-user-id": "user_id", + "x-user-id": "user.id", "x-org-id": "org_id", } @@ -223,7 +223,7 @@ func TestHeaderLabelMapping(t *testing.T) { attribute.Key(genaiAttributeRequestModel).String("test-model"), attribute.Key(genaiAttributeResponseModel).String("test-model"), attribute.Key(genaiAttributeTokenType).String(genaiTokenTypeInput), - attribute.Key("user_id").String("user123"), + attribute.Key("user.id").String("user123"), attribute.Key("org_id").String("org456"), ) diff --git a/internal/tracing/tracer.go b/internal/tracing/tracer.go index 78abb9fde0..c9780ecb19 100644 --- a/internal/tracing/tracer.go +++ b/internal/tracing/tracer.go @@ -10,6 +10,7 @@ import ( corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace/noop" @@ -21,22 +22,24 @@ import ( // Ensure chatCompletionTracer implements ChatCompletionTracer. var _ tracing.ChatCompletionTracer = (*chatCompletionTracer)(nil) -func newChatCompletionTracer(tracer trace.Tracer, propagator propagation.TextMapPropagator, recorder tracing.ChatCompletionRecorder) tracing.ChatCompletionTracer { +func newChatCompletionTracer(tracer trace.Tracer, propagator propagation.TextMapPropagator, recorder tracing.ChatCompletionRecorder, headerAttributes map[string]string) tracing.ChatCompletionTracer { // Check if the tracer is a no-op by checking its type. if _, ok := tracer.(noop.Tracer); ok { return tracing.NoopChatCompletionTracer{} } return &chatCompletionTracer{ - tracer: tracer, - propagator: propagator, - recorder: recorder, + tracer: tracer, + propagator: propagator, + recorder: recorder, + headerAttributes: headerAttributes, } } type chatCompletionTracer struct { - tracer trace.Tracer - recorder tracing.ChatCompletionRecorder - propagator propagation.TextMapPropagator + tracer trace.Tracer + recorder tracing.ChatCompletionRecorder + propagator propagation.TextMapPropagator + headerAttributes map[string]string } // StartSpanAndInjectHeaders implements ChatCompletionTracer.StartSpanAndInjectHeaders. @@ -56,6 +59,20 @@ func (t *chatCompletionTracer) StartSpanAndInjectHeaders(ctx context.Context, he // This avoids expensive body processing for unsampled spans. if span.IsRecording() { t.recorder.RecordRequest(span, req, body) + + // Apply header-to-attribute mapping if configured. + if len(t.headerAttributes) > 0 { + attrs := make([]attribute.KeyValue, 0, len(t.headerAttributes)) + for headerName, attrName := range t.headerAttributes { + if headerValue, ok := headers[headerName]; ok { + attrs = append(attrs, attribute.String(attrName, headerValue)) + } + } + if len(attrs) > 0 { + span.SetAttributes(attrs...) + } + } + return &chatCompletionSpan{span: span, recorder: t.recorder} } @@ -89,22 +106,24 @@ func (c *headerMutationCarrier) Keys() []string { // Ensure embeddingsTracer implements [tracing.EmbeddingsTracer]. var _ tracing.EmbeddingsTracer = (*embeddingsTracer)(nil) -func newEmbeddingsTracer(tracer trace.Tracer, propagator propagation.TextMapPropagator, recorder tracing.EmbeddingsRecorder) tracing.EmbeddingsTracer { +func newEmbeddingsTracer(tracer trace.Tracer, propagator propagation.TextMapPropagator, recorder tracing.EmbeddingsRecorder, headerAttributes map[string]string) tracing.EmbeddingsTracer { // Check if the tracer is a no-op by checking its type. if _, ok := tracer.(noop.Tracer); ok { return tracing.NoopEmbeddingsTracer{} } return &embeddingsTracer{ - tracer: tracer, - propagator: propagator, - recorder: recorder, + tracer: tracer, + propagator: propagator, + recorder: recorder, + headerAttributes: headerAttributes, } } type embeddingsTracer struct { - tracer trace.Tracer - recorder tracing.EmbeddingsRecorder - propagator propagation.TextMapPropagator + tracer trace.Tracer + recorder tracing.EmbeddingsRecorder + propagator propagation.TextMapPropagator + headerAttributes map[string]string } // StartSpanAndInjectHeaders implements [tracing.EmbeddingsTracer.StartSpanAndInjectHeaders]. @@ -124,6 +143,20 @@ func (t *embeddingsTracer) StartSpanAndInjectHeaders(ctx context.Context, header // This avoids expensive body processing for unsampled spans. if span.IsRecording() { t.recorder.RecordRequest(span, req, body) + + // Apply header-to-attribute mapping if configured. + if len(t.headerAttributes) > 0 { + attrs := make([]attribute.KeyValue, 0, len(t.headerAttributes)) + for headerName, attrName := range t.headerAttributes { + if headerValue, ok := headers[headerName]; ok { + attrs = append(attrs, attribute.String(attrName, headerValue)) + } + } + if len(attrs) > 0 { + span.SetAttributes(attrs...) + } + } + return &embeddingsSpan{span: span, recorder: t.recorder} } diff --git a/internal/tracing/tracer_test.go b/internal/tracing/tracer_test.go index 75e8f45657..117738d44e 100644 --- a/internal/tracing/tracer_test.go +++ b/internal/tracing/tracer_test.go @@ -117,7 +117,7 @@ func TestTracer_StartSpanAndInjectHeaders(t *testing.T) { exporter := tracetest.NewInMemoryExporter() tp := trace.NewTracerProvider(trace.WithSyncer(exporter)) - tracer := newChatCompletionTracer(tp.Tracer("test"), autoprop.NewTextMapPropagator(), testChatCompletionRecorder{}) + tracer := newChatCompletionTracer(tp.Tracer("test"), autoprop.NewTextMapPropagator(), testChatCompletionRecorder{}, nil) headerMutation := &extprocv3.HeaderMutation{} reqBody, err := json.Marshal(tt.req) @@ -168,7 +168,7 @@ func TestNewChatCompletionTracer_Noop(t *testing.T) { // Use noop tracer. noopTracer := noop.Tracer{} - tracer := newChatCompletionTracer(noopTracer, autoprop.NewTextMapPropagator(), testChatCompletionRecorder{}) + tracer := newChatCompletionTracer(noopTracer, autoprop.NewTextMapPropagator(), testChatCompletionRecorder{}, nil) // Verify it returns NoopTracer. require.IsType(t, tracing.NoopChatCompletionTracer{}, tracer) @@ -176,12 +176,12 @@ func TestNewChatCompletionTracer_Noop(t *testing.T) { // Test that noop tracer doesn't create spans. headers := map[string]string{} headerMutation := &extprocv3.HeaderMutation{} - req := &openai.ChatCompletionRequest{Model: "test"} + testReq := &openai.ChatCompletionRequest{Model: "test"} span := tracer.StartSpanAndInjectHeaders(t.Context(), headers, headerMutation, - req, + testReq, []byte("{}"), ) @@ -198,17 +198,17 @@ func TestTracer_UnsampledSpan(t *testing.T) { ) t.Cleanup(func() { _ = tracerProvider.Shutdown(context.Background()) }) - tracer := newChatCompletionTracer(tracerProvider.Tracer("test"), autoprop.NewTextMapPropagator(), testChatCompletionRecorder{}) + tracer := newChatCompletionTracer(tracerProvider.Tracer("test"), autoprop.NewTextMapPropagator(), testChatCompletionRecorder{}, nil) // Start a span that won't be sampled. headers := map[string]string{} headerMutation := &extprocv3.HeaderMutation{} - req := &openai.ChatCompletionRequest{Model: "test"} + testReq := &openai.ChatCompletionRequest{Model: "test"} span := tracer.StartSpanAndInjectHeaders(t.Context(), headers, headerMutation, - req, + testReq, []byte("{}"), ) @@ -219,6 +219,108 @@ func TestTracer_UnsampledSpan(t *testing.T) { require.NotEmpty(t, headerMutation.SetHeaders) } +func TestTracer_HeaderAttributeMapping(t *testing.T) { + exporter := tracetest.NewInMemoryExporter() + tp := trace.NewTracerProvider(trace.WithSyncer(exporter)) + + // Configure header-to-attribute mapping + headerMapping := map[string]string{ + "x-session-id": "session.id", + "x-user-id": "user.id", + } + + tracer := newChatCompletionTracer(tp.Tracer("test"), autoprop.NewTextMapPropagator(), testChatCompletionRecorder{}, headerMapping) + + // Create request with headers + headers := map[string]string{ + "x-session-id": "abc123", + "x-user-id": "user456", + "x-other": "ignored", // Not in mapping + } + headerMutation := &extprocv3.HeaderMutation{} + reqBody, err := json.Marshal(req) + require.NoError(t, err) + + span := tracer.StartSpanAndInjectHeaders(t.Context(), + headers, + headerMutation, + req, + reqBody, + ) + require.IsType(t, &chatCompletionSpan{}, span) + + // End the span to export it + span.EndSpan() + + spans := exporter.GetSpans() + require.Len(t, spans, 1) + actualSpan := spans[0] + + // Verify header attributes were added + var foundSessionID, foundUserID bool + for _, attr := range actualSpan.Attributes { + switch attr.Key { + case "session.id": + require.Equal(t, "abc123", attr.Value.AsString()) + foundSessionID = true + case "user.id": + require.Equal(t, "user456", attr.Value.AsString()) + foundUserID = true + } + } + require.True(t, foundSessionID, "session.id attribute not found") + require.True(t, foundUserID, "user.id attribute not found") +} + +func TestEmbeddingsTracer_HeaderAttributeMapping(t *testing.T) { + exporter := tracetest.NewInMemoryExporter() + tp := trace.NewTracerProvider(trace.WithSyncer(exporter)) + + // Configure header-to-attribute mapping + headerMapping := map[string]string{ + "x-session-id": "session.id", + } + + tracer := newEmbeddingsTracer(tp.Tracer("test"), autoprop.NewTextMapPropagator(), testEmbeddingsRecorder{}, headerMapping) + + // Create request with headers + headers := map[string]string{ + "x-session-id": "test-session-123", + } + headerMutation := &extprocv3.HeaderMutation{} + embReq := &openai.EmbeddingRequest{ + Input: openai.EmbeddingRequestInput{Value: "test input"}, + Model: "text-embedding-ada-002", + } + reqBody, err := json.Marshal(embReq) + require.NoError(t, err) + + span := tracer.StartSpanAndInjectHeaders(t.Context(), + headers, + headerMutation, + embReq, + reqBody, + ) + require.IsType(t, &embeddingsSpan{}, span) + + // End the span to export it + span.EndSpan() + + spans := exporter.GetSpans() + require.Len(t, spans, 1) + actualSpan := spans[0] + + // Verify header attribute was added + var foundSessionID bool + for _, attr := range actualSpan.Attributes { + if attr.Key == "session.id" { + require.Equal(t, "test-session-123", attr.Value.AsString()) + foundSessionID = true + } + } + require.True(t, foundSessionID, "session.id attribute not found") +} + func TestHeaderMutationCarrier(t *testing.T) { t.Run("Get panics", func(t *testing.T) { carrier := &headerMutationCarrier{m: &extprocv3.HeaderMutation{}} @@ -289,3 +391,30 @@ func (testChatCompletionRecorder) RecordResponse(span oteltrace.Span, resp *open } span.SetAttributes(attribute.Int("respBodyLen", len(body))) } + +var _ tracing.EmbeddingsRecorder = testEmbeddingsRecorder{} + +type testEmbeddingsRecorder struct{} + +func (testEmbeddingsRecorder) RecordResponseOnError(span oteltrace.Span, statusCode int, body []byte) { + span.SetAttributes(attribute.Int("statusCode", statusCode)) + span.SetAttributes(attribute.String("errorBody", string(body))) +} + +func (testEmbeddingsRecorder) StartParams(_ *openai.EmbeddingRequest, _ []byte) (spanName string, opts []oteltrace.SpanStartOption) { + return "Embeddings", startOpts +} + +func (testEmbeddingsRecorder) RecordRequest(span oteltrace.Span, req *openai.EmbeddingRequest, body []byte) { + span.SetAttributes(attribute.String("model", req.Model)) + span.SetAttributes(attribute.Int("reqBodyLen", len(body))) +} + +func (testEmbeddingsRecorder) RecordResponse(span oteltrace.Span, resp *openai.EmbeddingResponse) { + span.SetAttributes(attribute.Int("statusCode", 200)) + body, err := json.Marshal(resp) + if err != nil { + panic(err) + } + span.SetAttributes(attribute.Int("respBodyLen", len(body))) +} diff --git a/internal/tracing/tracing.go b/internal/tracing/tracing.go index 280f7b3823..c4e7ab07cb 100644 --- a/internal/tracing/tracing.go +++ b/internal/tracing/tracing.go @@ -17,7 +17,6 @@ import ( "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" - "go.opentelemetry.io/otel/trace/noop" tracing "github.com/envoyproxy/ai-gateway/internal/tracing/api" "github.com/envoyproxy/ai-gateway/internal/tracing/openinference/openai" @@ -55,9 +54,15 @@ func (t *tracingImpl) Shutdown(ctx context.Context) error { return nil } -// NewTracingFromEnv configures OpenTelemetry tracing based on environment. -// variables. Returns a tracing graph that is noop when disabled. -func NewTracingFromEnv(ctx context.Context, stdout io.Writer) (tracing.Tracing, error) { +// NewTracingFromEnv configures OpenTelemetry tracing based on environment +// variables and optional header attribute mapping. +// +// Parameters: +// - headerAttributeMapping: maps HTTP headers to otel span attributes (e.g. map["x-session-id"]="session.id"). +// If nil, no header mapping is applied. +// +// Returns a tracing graph that is noop when disabled. +func NewTracingFromEnv(ctx context.Context, stdout io.Writer, headerAttributeMapping map[string]string) (tracing.Tracing, error) { // Return no-op tracing if disabled. if os.Getenv("OTEL_SDK_DISABLED") == "true" { return tracing.NoopTracing{}, nil @@ -140,6 +145,9 @@ func NewTracingFromEnv(ctx context.Context, stdout io.Writer) (tracing.Tracing, // Configure propagation via the OTEL_PROPAGATORS ENV variable. propagator := autoprop.NewTextMapPropagator() + // Use provided header attribute mapping. + headerAttrs := headerAttributeMapping + // Default to OpenInference trace span semantic conventions. chatRecorder := openai.NewChatCompletionRecorderFromEnv() embeddingsRecorder := openai.NewEmbeddingsRecorderFromEnv() @@ -150,11 +158,13 @@ func NewTracingFromEnv(ctx context.Context, stdout io.Writer) (tracing.Tracing, tracer, propagator, chatRecorder, + headerAttrs, ), embeddingsTracer: newEmbeddingsTracer( tracer, propagator, embeddingsRecorder, + headerAttrs, ), mcpTracer: newMCPTracer(tracer, propagator), shutdown: tp.Shutdown, // we have to shut down what we create. @@ -167,25 +177,3 @@ type Shutdown interface { type noopShutdown struct{} func (noopShutdown) Shutdown(context.Context) error { return nil } - -// NewTracing configures OpenTelemetry tracing based on the configuration. -// Returns a tracing graph that is noop when the tracer provider is no-op. -func NewTracing(config *tracing.TracingConfig) tracing.Tracing { - if _, ok := config.Tracer.(noop.Tracer); ok { - return tracing.NoopTracing{} - } - return &tracingImpl{ - chatCompletionTracer: newChatCompletionTracer( - config.Tracer, - config.Propagator, - config.ChatCompletionRecorder, - ), - embeddingsTracer: newEmbeddingsTracer( - config.Tracer, - config.Propagator, - config.EmbeddingsRecorder, - ), - mcpTracer: newMCPTracer(config.Tracer, config.Propagator), - shutdown: nil, // shutdown is nil when we didn't create tp. - } -} diff --git a/internal/tracing/tracing_test.go b/internal/tracing/tracing_test.go index 4b67cef42d..e90c813adf 100644 --- a/internal/tracing/tracing_test.go +++ b/internal/tracing/tracing_test.go @@ -17,16 +17,12 @@ import ( extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "github.com/stretchr/testify/require" - "go.opentelemetry.io/contrib/propagators/autoprop" - "go.opentelemetry.io/otel/sdk/trace" - "go.opentelemetry.io/otel/trace/noop" "k8s.io/utils/ptr" "github.com/envoyproxy/ai-gateway/internal/apischema/openai" "github.com/envoyproxy/ai-gateway/internal/testing/testotel" tracing "github.com/envoyproxy/ai-gateway/internal/tracing/api" "github.com/envoyproxy/ai-gateway/internal/tracing/openinference" - openaitracing "github.com/envoyproxy/ai-gateway/internal/tracing/openinference/openai" ) // TestNewTracingFromEnv_DefaultServiceName tests that the service name. @@ -61,7 +57,7 @@ func TestNewTracingFromEnv_DefaultServiceName(t *testing.T) { } var stdout bytes.Buffer - result, err := NewTracingFromEnv(t.Context(), &stdout) + result, err := NewTracingFromEnv(t.Context(), &stdout, nil) require.NoError(t, err) t.Cleanup(func() { _ = result.Shutdown(t.Context()) @@ -117,7 +113,7 @@ func TestNewTracingFromEnv_DisabledByEnv(t *testing.T) { t.Setenv(k, v) } - result, err := NewTracingFromEnv(t.Context(), io.Discard) + result, err := NewTracingFromEnv(t.Context(), io.Discard, nil) require.NoError(t, err) require.IsType(t, tracing.NoopTracing{}, result) }) @@ -170,7 +166,7 @@ func TestNewTracingFromEnv_EndpointHierarchy(t *testing.T) { t.Setenv(k, v) } - result, err := NewTracingFromEnv(t.Context(), io.Discard) + result, err := NewTracingFromEnv(t.Context(), io.Discard, nil) require.NoError(t, err) if tt.expectActive { @@ -235,7 +231,7 @@ func TestNewTracingFromEnv_ConsoleExporter(t *testing.T) { } var stdout bytes.Buffer - result, err := NewTracingFromEnv(t.Context(), &stdout) + result, err := NewTracingFromEnv(t.Context(), &stdout, nil) require.NoError(t, err) t.Cleanup(func() { _ = result.Shutdown(context.Background()) @@ -508,7 +504,7 @@ func newTracingFromEnvForTest(t *testing.T, stdout io.Writer) (*testotel.OTLPCol t.Cleanup(collector.Close) collector.SetEnv(t.Setenv) - result, err := NewTracingFromEnv(t.Context(), stdout) + result, err := NewTracingFromEnv(t.Context(), stdout, nil) require.NoError(t, err) t.Cleanup(func() { _ = result.Shutdown(context.Background()) @@ -517,41 +513,6 @@ func newTracingFromEnvForTest(t *testing.T, stdout io.Writer) (*testotel.OTLPCol return collector, result } -func TestNewTracing(t *testing.T) { - t.Run("with noop tracer", func(t *testing.T) { - config := &tracing.TracingConfig{ - Tracer: noop.Tracer{}, - Propagator: autoprop.NewTextMapPropagator(), - ChatCompletionRecorder: openaitracing.NewChatCompletionRecorderFromEnv(), - } - - result := NewTracing(config) - require.IsType(t, tracing.NoopTracing{}, result) - }) - - t.Run("with real tracer", func(t *testing.T) { - tp := trace.NewTracerProvider() - t.Cleanup(func() { _ = tp.Shutdown(context.Background()) }) - - config := &tracing.TracingConfig{ - Tracer: tp.Tracer("test"), - Propagator: autoprop.NewTextMapPropagator(), - ChatCompletionRecorder: openaitracing.NewChatCompletionRecorderFromEnv(), - } - - result := NewTracing(config) - require.IsType(t, &tracingImpl{}, result) - - // Test that ChatCompletionTracer returns the expected tracer. - tracer := result.ChatCompletionTracer() - require.NotNil(t, tracer) - - // Test that Shutdown returns nil when tp wasn't created internally. - err := result.Shutdown(t.Context()) - require.NoError(t, err) - }) -} - func TestNoopShutdown(t *testing.T) { ns := noopShutdown{} err := ns.Shutdown(t.Context()) @@ -573,7 +534,7 @@ func TestNewTracingFromEnv_OTLPHeaders(t *testing.T) { t.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", ts.URL) t.Setenv("OTEL_EXPORTER_OTLP_PROTOCOL", "http/protobuf") - result, err := NewTracingFromEnv(t.Context(), io.Discard) + result, err := NewTracingFromEnv(t.Context(), io.Discard, nil) require.NoError(t, err) t.Cleanup(func() { _ = result.Shutdown(context.Background()) diff --git a/manifests/charts/ai-gateway-helm/templates/deployment.yaml b/manifests/charts/ai-gateway-helm/templates/deployment.yaml index a4ea85644d..1ab43f8611 100644 --- a/manifests/charts/ai-gateway-helm/templates/deployment.yaml +++ b/manifests/charts/ai-gateway-helm/templates/deployment.yaml @@ -46,8 +46,11 @@ spec: - --extProcImage={{ .Values.extProc.image.repository }}:{{ .Values.extProc.image.tag | default .Chart.AppVersion }} - --extProcImagePullPolicy={{ .Values.extProc.imagePullPolicy }} - --extProcLogLevel={{ .Values.extProc.logLevel }} - {{- if .Values.controller.metricsRequestHeaderLabels }} - - --metricsRequestHeaderLabels={{ .Values.controller.metricsRequestHeaderLabels }} + {{- if or .Values.controller.metricsRequestHeaderAttributes .Values.controller.metricsRequestHeaderLabels }} + - --metricsRequestHeaderAttributes={{ .Values.controller.metricsRequestHeaderAttributes | default .Values.controller.metricsRequestHeaderLabels }} + {{- end }} + {{- if .Values.controller.spanRequestHeaderAttributes }} + - --spanRequestHeaderAttributes={{ .Values.controller.spanRequestHeaderAttributes }} {{- end }} {{- if .Values.extProc.extraEnvVars }} - --extProcExtraEnvVars={{ include "ai-gateway-helm.extProc.envVarsString" . }} diff --git a/manifests/charts/ai-gateway-helm/values.yaml b/manifests/charts/ai-gateway-helm/values.yaml index eff3c24f52..dfa17f7f7b 100644 --- a/manifests/charts/ai-gateway-helm/values.yaml +++ b/manifests/charts/ai-gateway-helm/values.yaml @@ -39,11 +39,19 @@ controller: nameOverride: "" fullnameOverride: "ai-gateway-controller" - # Comma-separated key-value pairs for mapping HTTP request headers to Prometheus metric labels. - # Format: "header1:label1,header2:label2" - # Example: "x-team-id:team_id,x-user-id:user_id" + # DEPRECATED: metricsRequestHeaderLabels will be removed after v0.4. Use metricsRequestHeaderAttributes instead. metricsRequestHeaderLabels: "" + # Comma-separated key-value pairs for mapping HTTP request headers to Otel metric attributes. + # Format: "header1:label1,header2:label2" + # Example: "x-team-id:team.id,x-user-id:user.id" + metricsRequestHeaderAttributes: "" + + # Comma-separated key-value pairs for mapping HTTP request headers to otel span attributes. + # Format: "header1:attribute1,header2:attribute2" + # Example: "x-session-id:session.id,x-user-id:user.id" + spanRequestHeaderAttributes: "" + # -- Service Account -- serviceAccount: # Specifies whether a service account should be created diff --git a/site/docs/capabilities/observability/tracing.md b/site/docs/capabilities/observability/tracing.md index 2f92da9151..165609c830 100644 --- a/site/docs/capabilities/observability/tracing.md +++ b/site/docs/capabilities/observability/tracing.md @@ -120,6 +120,50 @@ extProc: Note: Hiding inputs/outputs prevents human or LLM-as-a-Judge evaluation of your LLM requests, such as done with the [Phoenix Evals library][phoenix-evals]. +## Session Tracking + +Sessions help track and organize related traces across multi-turn conversations +with your AI app. Maintaining context between interactions is key for +observability. + +With sessions, you can: +- Track a conversation's full history in one thread. +- View inputs/outputs for a given agent. +- Monitor token usage and latency per conversation. + +By tagging spans with a consistent session ID, you get a connected view of +performance across a user's journey. + +The challenge is that requests to the gateway may not send traces, making +grouping difficult. Many GenAI frameworks allow you to set custom HTTP headers +when sending traffic to an LLM. Propagating sessions this way is simpler than +instrumenting applications with tracing code and can still achieve grouping. + +There's no standard name for session ID headers, but there is a common attribute +in OpenTelemetry, [session.id][otel-session], which has special handling in some +OpenTelemetry platforms such as [Phoenix][phoenix-session]. + +To bridge this gap, Envoy AI Gateway has two configurations to map HTTP request +headers to OpenTelemetry attributes, one for spans and one for metrics. +- `controller.spanRequestHeaderAttributes` +- `controller.metricsRequestHeaderAttributes` + +Both of these use the same value format: a comma-separated list of +`:` pairs. For example, if your session ID header +is `x-session-id`, you can map it to the standard OpenTelemetry attribute +`session.id` like this: `x-session-id:session.id`. + +Some metrics systems will be able to do fine-grained aggregation, but not all. +Here's an example of setting the session ID header for spans, but not metrics: +```shell +helm upgrade ai-eg oci://docker.io/envoyproxy/ai-gateway-helm \ + --version v0.0.0-latest \ + --namespace envoy-ai-gateway-system \ + --reuse-values \ + --set "controller.metricsRequestHeaderAttributes=x-user-id:user.id" \ + --set "controller.spanRequestHeaderAttributes=x-session-id:session.id,x-user-id:user.id" +``` + ## Cleanup To remove Phoenix and disable tracing: @@ -149,3 +193,5 @@ helm upgrade ai-eg oci://docker.io/envoyproxy/ai-gateway-helm \ [otel-config]: https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/ [phoenix]: https://docs.arize.com/phoenix [phoenix-evals]: https://arize.com/docs/phoenix/evaluation/llm-evals +[otel-session]: https://opentelemetry.io/docs/specs/semconv/registry/attributes/session/ +[phoenix-session]: https://arize.com/docs/phoenix/tracing/llm-traces/sessions diff --git a/site/docs/cli/run.md b/site/docs/cli/run.md index c668db21c7..e8e081186f 100644 --- a/site/docs/cli/run.md +++ b/site/docs/cli/run.md @@ -187,8 +187,11 @@ focused on retrieval and semantic analysis. - `OPENINFERENCE_HIDE_EMBEDDINGS_TEXT`: Hide embeddings input (default: `false`) - `OPENINFERENCE_HIDE_EMBEDDINGS_VECTORS`: Hide embeddings output (default: `false`) -See [docker-compose-otel.yaml][docker-compose-otel.yaml] for a complete example -configuration. +- **Header Mapping**: Map HTTP request headers to span attributes and metric labels. See [Session Tracking][session-tracking] for more details. + - `OTEL_AIGW_METRICS_REQUEST_HEADER_ATTRIBUTES`: Example: `x-team-id:team.id,x-user-id:user.id` + - `OTEL_AIGW_SPAN_REQUEST_HEADER_ATTRIBUTES`: Example: `x-session-id:session.id,x-user-id:user.id` + +See [docker-compose-otel.yaml][docker-compose-otel.yaml] for a complete example configuration. --- [openinference]: https://github.com/Arize-ai/openinference/tree/main/spec @@ -197,3 +200,4 @@ configuration. [openinference-config]: https://github.com/Arize-ai/openinference/blob/main/spec/configuration.md [openinference-embeddings]: https://github.com/Arize-ai/openinference/blob/main/spec/embedding_spans.md [docker-compose-otel.yaml]: https://github.com/envoyproxy/ai-gateway/blob/main/cmd/aigw/docker-compose-otel.yaml +[session-tracking]: ../capabilities/observability/tracing.md#session-tracking diff --git a/tests/e2e/e2e_test.go b/tests/e2e/e2e_test.go index 84037402cb..0d1623834c 100644 --- a/tests/e2e/e2e_test.go +++ b/tests/e2e/e2e_test.go @@ -14,8 +14,9 @@ import ( func TestMain(m *testing.M) { e2elib.TestMain(m, e2elib.AIGatewayHelmOption{ AdditionalArgs: []string{ - // Configure the additional prometheus metrics label for user ID. - "--set", "controller.metricsRequestHeaderLabels=x-user-id:" + userIDMetricsLabel, + // Configure the additional span and prometheus metrics label for user ID. + "--set", "controller.metricsRequestHeaderAttributes=x-user-id:" + userIDAttribute, + "--set", "controller.spanRequestHeaderAttributes=x-user-id:" + userIDAttribute, }, }, false, true, ) diff --git a/tests/e2e/otel_tracing_test.go b/tests/e2e/otel_tracing_test.go index ea334c847e..1c0829b69f 100644 --- a/tests/e2e/otel_tracing_test.go +++ b/tests/e2e/otel_tracing_test.go @@ -42,7 +42,8 @@ func TestOTELTracingWithConsoleExporter(t *testing.T) { // Upgrade the existing "ai-eg" release with new env vars. helm := testsinternal.GoToolCmdContext(ctx, "helm", "upgrade", "ai-eg", "--force", helmChartPath, - "--set", "controller.metricsRequestHeaderLabels=x-user-id:user_id", // Keep existing setting. + "--set", "controller.metricsRequestHeaderAttributes=x-user-id:"+userIDAttribute, // existing setting + "--set", "controller.spanRequestHeaderAttributes=x-user-id:"+userIDAttribute, // existing setting "--set", "extProc.extraEnvVars[0].name=OTEL_TRACES_EXPORTER", "--set", "extProc.extraEnvVars[0].value=console", "--set", "extProc.extraEnvVars[1].name=OTEL_SERVICE_NAME", @@ -128,6 +129,24 @@ func TestOTELTracingWithConsoleExporter(t *testing.T) { envVars := describeOutput.String() t.Logf("Environment variables in extProc container: %s", envVars) + // Get the container args to check header attributes configuration. + argsCmd := exec.CommandContext(ctx, "kubectl", "get", "pod", podName, + "-n", e2elib.EnvoyGatewayNamespace, + "-o", "jsonpath={.spec.initContainers[?(@.name=='ai-gateway-extproc')].args}") + + argsOutput := &bytes.Buffer{} + argsCmd.Stdout = argsOutput + argsCmd.Stderr = argsOutput + + err = argsCmd.Run() + if err != nil { + t.Logf("Failed to get container args for pod %s: %v", podName, err) + return false // Retry if command fails. + } + + containerArgs := argsOutput.String() + t.Logf("Container args in extProc container: %s", containerArgs) + defer func() { // Deletes the pods to ensure they are recreated with the new configuration for the next iteration. deletePodsCmd := e2elib.Kubectl(ctx, "delete", "pod", podName, @@ -148,8 +167,19 @@ func TestOTELTracingWithConsoleExporter(t *testing.T) { t.Log("Expected OTEL_SERVICE_NAME=ai-gateway-e2e-test in extProc container spec") return false } + + // Verify that pre-upgrade header attribute args are present in the container args. + if !strings.Contains(containerArgs, "-metricsRequestHeaderAttributes") || !strings.Contains(containerArgs, "x-user-id:"+userIDAttribute) { + t.Log("Expected -metricsRequestHeaderAttributes x-user-id:" + userIDAttribute + " in extProc container args") + return false + } + if !strings.Contains(containerArgs, "-spanRequestHeaderAttributes") || !strings.Contains(containerArgs, "x-user-id:"+userIDAttribute) { + t.Log("Expected -spanRequestHeaderAttributes x-user-id:" + userIDAttribute + " in extProc container args") + return false + } + return true }, 2*time.Minute, 5*time.Second) - t.Log("OTEL environment variables successfully verified in extProc container") + t.Log("OTEL environment variables and header attribute args successfully verified in extProc container") } diff --git a/tests/e2e/token_ratelimit_test.go b/tests/e2e/token_ratelimit_test.go index 646374c1bd..0bc3372243 100644 --- a/tests/e2e/token_ratelimit_test.go +++ b/tests/e2e/token_ratelimit_test.go @@ -25,8 +25,12 @@ import ( "github.com/envoyproxy/ai-gateway/tests/internal/testupstreamlib" ) -// userIDMetricsLabel is the label used for user ID in the Prometheus metrics. +// userIDAttribute is the attribute used for user ID in otel span and metrics. // This is passed via a helm value to the AI Gateway deployment. +const userIDAttribute = "user.id" + +// userIDMetricsLabel is the Prometheus label the userIDAttribute becomes when +// exported as a metric. const userIDMetricsLabel = "user_id" func Test_Examples_TokenRateLimit(t *testing.T) { @@ -111,7 +115,10 @@ func Test_Examples_TokenRateLimit(t *testing.T) { require.Eventually(t, func() bool { fwd := e2elib.RequireNewHTTPPortForwarder(t, "monitoring", "app=prometheus", 9090) defer fwd.Kill() - const query = `sum(gen_ai_client_token_usage_token_sum{gateway_envoyproxy_io_owning_gateway_name = "envoy-ai-gateway-token-ratelimit"}) by (gen_ai_request_model, gen_ai_token_type, user_id)` + // notice all labels are snake_case in Prometheus even though the otel inputs are dotted. + query := fmt.Sprintf( + `sum(gen_ai_client_token_usage_token_sum{gateway_envoyproxy_io_owning_gateway_name = "envoy-ai-gateway-token-ratelimit"}) by (gen_ai_request_model, gen_ai_token_type, %s)`, + userIDMetricsLabel) req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/api/v1/query?query=%s", fwd.Address(), url.QueryEscape(query)), nil) require.NoError(t, err) resp, err := http.DefaultClient.Do(req) @@ -146,6 +153,7 @@ func Test_Examples_TokenRateLimit(t *testing.T) { require.Equal(t, modelName, result.Metric["gen_ai_request_model"]) typ := result.Metric["gen_ai_token_type"] actualTypes = append(actualTypes, typ) + // Look up based on the label, not the attribute name it was derived from! uID, ok := result.Metric[userIDMetricsLabel] require.True(t, ok, userIDMetricsLabel+" should be present in the metric") t.Logf("Type: %s, Value: %v, User ID: %s", typ, result.Value, uID)