Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/aigw/docker-compose-otel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ services:
environment:
- OPENAI_BASE_URL=http://host.docker.internal:11434/v1
- OPENAI_API_KEY=unused
# Map HTTP request headers to otel span and metric attributes for session tracking
# Format: header:attribute,header:attribute
- OTEL_AIGW_SPAN_REQUEST_HEADER_ATTRIBUTES=x-session-id:session.id,x-user-id:user.id
- OTEL_AIGW_METRICS_REQUEST_HEADER_ATTRIBUTES=x-user-id:user.id
ports:
- "1975:1975" # OpenAI compatible endpoint at /v1
- "1064:1064" # Admin server: /metrics (Prometheus) and /health endpoints
Expand Down
7 changes: 7 additions & 0 deletions cmd/aigw/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,13 @@ func (runCtx *runCmdContext) mustStartExtProc(
args = append(args, "--logLevel", "warn")
}

if metricsAttrs := os.Getenv("OTEL_AIGW_METRICS_REQUEST_HEADER_ATTRIBUTES"); metricsAttrs != "" {
args = append(args, "-metricsRequestHeaderAttributes", metricsAttrs)
}
if spanAttrs := os.Getenv("OTEL_AIGW_SPAN_REQUEST_HEADER_ATTRIBUTES"); spanAttrs != "" {
args = append(args, "-spanRequestHeaderAttributes", spanAttrs)
}

done := make(chan error)
go func() {
if err := runCtx.extProcLauncher(ctx, args, os.Stderr); err != nil {
Expand Down
35 changes: 35 additions & 0 deletions cmd/aigw/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,41 @@ func Test_mustStartExtProc(t *testing.T) {
require.ErrorIs(t, <-done, mockErr)
}

func Test_mustStartExtProc_withHeaderAttributes(t *testing.T) {
t.Setenv("OTEL_AIGW_METRICS_REQUEST_HEADER_ATTRIBUTES", "x-team-id:team.id,x-user-id:user.id")
t.Setenv("OTEL_AIGW_SPAN_REQUEST_HEADER_ATTRIBUTES", "x-session-id:session.id,x-user-id:user.id")

var capturedArgs []string
runCtx := &runCmdContext{
tmpdir: t.TempDir(),
adminPort: 1064,
extProcLauncher: func(_ context.Context, args []string, _ io.Writer) error {
capturedArgs = args
return errors.New("mock error") // Return error to stop execution
},
stderrLogger: slog.New(slog.DiscardHandler),
}

done := runCtx.mustStartExtProc(t.Context(), filterapi.MustLoadDefaultConfig())
<-done // Wait for completion

// Verify both metrics and tracing flags are set
require.Contains(t, capturedArgs, "-metricsRequestHeaderAttributes")
require.Contains(t, capturedArgs, "-spanRequestHeaderAttributes")

// Find the index and verify the values
for i, arg := range capturedArgs {
if arg == "-metricsRequestHeaderAttributes" {
require.Less(t, i+1, len(capturedArgs), "metricsRequestHeaderAttributes should have a value")
require.Equal(t, "x-team-id:team.id,x-user-id:user.id", capturedArgs[i+1])
}
if arg == "-spanRequestHeaderAttributes" {
require.Less(t, i+1, len(capturedArgs), "spanRequestHeaderAttributes should have a value")
require.Equal(t, "x-session-id:session.id,x-user-id:user.id", capturedArgs[i+1])
}
}
}

func TestTryFindEnvoyAdminAddress(t *testing.T) {
gwWithProxy := func(name string) *gwapiv1.Gateway {
return &gwapiv1.Gateway{
Expand Down
115 changes: 74 additions & 41 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,21 @@ import (
)

type flags struct {
extProcLogLevel string
extProcImage string
extProcImagePullPolicy corev1.PullPolicy
enableLeaderElection bool
logLevel zapcore.Level
extensionServerPort string
tlsCertDir string
tlsCertName string
tlsKeyName string
caBundleName string
metricsRequestHeaderLabels string
rootPrefix string
extProcExtraEnvVars string
extProcLogLevel string
extProcImage string
extProcImagePullPolicy corev1.PullPolicy
enableLeaderElection bool
logLevel zapcore.Level
extensionServerPort string
tlsCertDir string
tlsCertName string
tlsKeyName string
caBundleName string
metricsRequestHeaderAttributes string
metricsRequestHeaderLabels string // DEPRECATED: use metricsRequestHeaderAttributes instead.
spanRequestHeaderAttributes string
rootPrefix string
extProcExtraEnvVars string
// extProcMaxRecvMsgSize is the maximum message size in bytes that the gRPC server can receive.
extProcMaxRecvMsgSize int
}
Expand Down Expand Up @@ -114,10 +116,20 @@ func parseAndValidateFlags(args []string) (flags, error) {
"tls.key",
"The name of the TLS key file.",
)
metricsRequestHeaderAttributes := fs.String(
"metricsRequestHeaderAttributes",
"",
"Comma-separated key-value pairs for mapping HTTP request headers to Otel metric attributes. Format: x-team-id:team.id,x-user-id:user.id.",
)
metricsRequestHeaderLabels := fs.String(
"metricsRequestHeaderLabels",
"",
"Comma-separated key-value pairs for mapping HTTP request headers to Prometheus metric labels. Format: x-team-id:team_id,x-user-id:user_id.",
"DEPRECATED: Use --metricsRequestHeaderAttributes instead. This flag will be removed in a future release.",
)
spanRequestHeaderAttributes := fs.String(
"spanRequestHeaderAttributes",
"",
"Comma-separated key-value pairs for mapping HTTP request headers to otel span attributes. Format: x-session-id:session.id,x-user-id:user.id.",
)
rootPrefix := fs.String(
"rootPrefix",
Expand All @@ -140,6 +152,11 @@ func parseAndValidateFlags(args []string) (flags, error) {
return flags{}, err
}

// Handle deprecated flag: fall back to metricsRequestHeaderLabels if metricsRequestHeaderAttributes is not set.
if *metricsRequestHeaderAttributes == "" && *metricsRequestHeaderLabels != "" {
*metricsRequestHeaderAttributes = *metricsRequestHeaderLabels
}

var slogLevel slog.Level
if err := slogLevel.UnmarshalText([]byte(*extProcLogLevelPtr)); err != nil {
err = fmt.Errorf("invalid external processor log level: %q", *extProcLogLevelPtr)
Expand All @@ -157,11 +174,19 @@ func parseAndValidateFlags(args []string) (flags, error) {
return flags{}, err
}

// Validate metrics header labels if provided.
if *metricsRequestHeaderLabels != "" {
_, err := internalapi.ParseRequestHeaderLabelMapping(*metricsRequestHeaderLabels)
// Validate metrics header attributes if provided.
if *metricsRequestHeaderAttributes != "" {
_, err := internalapi.ParseRequestHeaderAttributeMapping(*metricsRequestHeaderAttributes)
if err != nil {
return flags{}, fmt.Errorf("invalid metrics header labels: %w", err)
return flags{}, fmt.Errorf("invalid metrics header attributes: %w", err)
}
}

// Validate tracing header attributes if provided.
if *spanRequestHeaderAttributes != "" {
_, err := internalapi.ParseRequestHeaderAttributeMapping(*spanRequestHeaderAttributes)
if err != nil {
return flags{}, fmt.Errorf("invalid tracing header attributes: %w", err)
}
}

Expand All @@ -174,20 +199,22 @@ func parseAndValidateFlags(args []string) (flags, error) {
}

return flags{
extProcLogLevel: *extProcLogLevelPtr,
extProcImage: *extProcImagePtr,
extProcImagePullPolicy: extProcPullPolicy,
enableLeaderElection: *enableLeaderElectionPtr,
logLevel: zapLogLevel,
extensionServerPort: *extensionServerPortPtr,
tlsCertDir: *tlsCertDir,
tlsCertName: *tlsCertName,
tlsKeyName: *tlsKeyName,
caBundleName: *caBundleName,
metricsRequestHeaderLabels: *metricsRequestHeaderLabels,
rootPrefix: *rootPrefix,
extProcExtraEnvVars: *extProcExtraEnvVars,
extProcMaxRecvMsgSize: *extProcMaxRecvMsgSize,
extProcLogLevel: *extProcLogLevelPtr,
extProcImage: *extProcImagePtr,
extProcImagePullPolicy: extProcPullPolicy,
enableLeaderElection: *enableLeaderElectionPtr,
logLevel: zapLogLevel,
extensionServerPort: *extensionServerPortPtr,
tlsCertDir: *tlsCertDir,
tlsCertName: *tlsCertName,
tlsKeyName: *tlsKeyName,
caBundleName: *caBundleName,
metricsRequestHeaderAttributes: *metricsRequestHeaderAttributes,
metricsRequestHeaderLabels: *metricsRequestHeaderLabels,
spanRequestHeaderAttributes: *spanRequestHeaderAttributes,
rootPrefix: *rootPrefix,
extProcExtraEnvVars: *extProcExtraEnvVars,
extProcMaxRecvMsgSize: *extProcMaxRecvMsgSize,
}, nil
}

Expand All @@ -200,6 +227,11 @@ func main() {
os.Exit(1)
}

// Warn if deprecated flag is being used.
if flags.metricsRequestHeaderLabels != "" {
setupLog.Info("The --metricsRequestHeaderLabels flag is deprecated and will be removed in a future release. Please use --metricsRequestHeaderAttributes instead.")
}

ctrl.SetLogger(zap.New(zap.UseFlagOptions(&zap.Options{Development: true, Level: flags.logLevel})))
k8sConfig := ctrl.GetConfigOrDie()

Expand Down Expand Up @@ -255,15 +287,16 @@ func main() {

// Start the controller.
if err := controller.StartControllers(ctx, mgr, k8sConfig, ctrl.Log.WithName("controller"), controller.Options{
ExtProcImage: flags.extProcImage,
ExtProcImagePullPolicy: flags.extProcImagePullPolicy,
ExtProcLogLevel: flags.extProcLogLevel,
EnableLeaderElection: flags.enableLeaderElection,
UDSPath: extProcUDSPath,
MetricsRequestHeaderLabels: flags.metricsRequestHeaderLabels,
RootPrefix: flags.rootPrefix,
ExtProcExtraEnvVars: flags.extProcExtraEnvVars,
ExtProcMaxRecvMsgSize: flags.extProcMaxRecvMsgSize,
ExtProcImage: flags.extProcImage,
ExtProcImagePullPolicy: flags.extProcImagePullPolicy,
ExtProcLogLevel: flags.extProcLogLevel,
EnableLeaderElection: flags.enableLeaderElection,
UDSPath: extProcUDSPath,
MetricsRequestHeaderAttributes: flags.metricsRequestHeaderAttributes,
TracingRequestHeaderAttributes: flags.spanRequestHeaderAttributes,
RootPrefix: flags.rootPrefix,
ExtProcExtraEnvVars: flags.extProcExtraEnvVars,
ExtProcMaxRecvMsgSize: flags.extProcMaxRecvMsgSize,
}); err != nil {
setupLog.Error(err, "failed to start controller")
}
Expand Down
34 changes: 34 additions & 0 deletions cmd/controller/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func Test_parseAndValidateFlags(t *testing.T) {
tc.dash + "logLevel=debug",
tc.dash + "port=:8080",
tc.dash + "extProcExtraEnvVars=OTEL_SERVICE_NAME=test;OTEL_TRACES_EXPORTER=console",
tc.dash + "spanRequestHeaderAttributes=x-session-id:session.id",
}
f, err := parseAndValidateFlags(args)
require.Equal(t, "debug", f.extProcLogLevel)
Expand All @@ -57,11 +58,34 @@ func Test_parseAndValidateFlags(t *testing.T) {
require.Equal(t, "debug", f.logLevel.String())
require.Equal(t, ":8080", f.extensionServerPort)
require.Equal(t, "OTEL_SERVICE_NAME=test;OTEL_TRACES_EXPORTER=console", f.extProcExtraEnvVars)
require.Equal(t, "x-session-id:session.id", f.spanRequestHeaderAttributes)
require.NoError(t, err)
})
}
})

t.Run("deprecated metricsRequestHeaderLabels flag", func(t *testing.T) {
args := []string{
"--metricsRequestHeaderLabels=x-team-id:team.id",
}
f, err := parseAndValidateFlags(args)
require.NoError(t, err)
// Verify the deprecated flag value is used for metricsRequestHeaderAttributes
require.Equal(t, "x-team-id:team.id", f.metricsRequestHeaderAttributes)
require.Equal(t, "x-team-id:team.id", f.metricsRequestHeaderLabels)
})

t.Run("new flag takes precedence over deprecated flag", func(t *testing.T) {
args := []string{
"--metricsRequestHeaderLabels=x-old:old.value",
"--metricsRequestHeaderAttributes=x-new:new.value",
}
f, err := parseAndValidateFlags(args)
require.NoError(t, err)
// Verify the new flag takes precedence
require.Equal(t, "x-new:new.value", f.metricsRequestHeaderAttributes)
})

t.Run("invalid flags", func(t *testing.T) {
for _, tc := range []struct {
name string
Expand Down Expand Up @@ -93,6 +117,16 @@ func Test_parseAndValidateFlags(t *testing.T) {
flags: []string{"--extProcExtraEnvVars==value"},
expErr: "invalid extProc extra env vars",
},
{
name: "invalid spanRequestHeaderAttributes - missing colon",
flags: []string{"--spanRequestHeaderAttributes=x-session-id"},
expErr: "invalid tracing header attributes",
},
{
name: "invalid spanRequestHeaderAttributes - empty header",
flags: []string{"--spanRequestHeaderAttributes=:session.id"},
expErr: "invalid tracing header attributes",
},
} {
t.Run(tc.name, func(t *testing.T) {
_, err := parseAndValidateFlags(tc.flags)
Expand Down
Loading
Loading