From ba1bb0b0cfc567448abf108ad16b443dfa880919 Mon Sep 17 00:00:00 2001 From: Aishu Kamal Date: Sat, 10 Jan 2026 02:04:34 +0000 Subject: [PATCH 1/3] Optionally disable endpoint subset filtering while dispatching requests from epp --- cmd/epp/runner/runner.go | 5 +++- pkg/epp/requestcontrol/director_test.go | 12 ++++---- pkg/epp/requestcontrol/locator.go | 15 +++++++++- pkg/epp/requestcontrol/locator_test.go | 39 ++++++++++++++++++++++++- pkg/epp/server/options.go | 4 +++ test/integration/epp/harness.go | 4 ++- 6 files changed, 69 insertions(+), 10 deletions(-) diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index 8d6c710c45..cd7ad8451e 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -308,7 +308,10 @@ func (r *Runner) Run(ctx context.Context) error { // --- Admission Control Initialization --- var admissionController requestcontrol.AdmissionController var locator contracts.PodLocator - locator = requestcontrol.NewDatastorePodLocator(ds) + locatorCfg := requestcontrol.PodLocatorConfig { + DisableEndpointSubsetFilter: opts.DisableEndpointSubsetFilter, + } + locator = requestcontrol.NewDatastorePodLocator(ds, locatorCfg) if r.featureGates[flowcontrol.FeatureGate] { locator = requestcontrol.NewCachedPodLocator(ctx, locator, time.Millisecond*50) setupLog.Info("Initializing experimental Flow Control layer") diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index c5ff58a364..dcd0055d7f 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -659,7 +659,7 @@ func TestDirector_HandleRequest(t *testing.T) { } config = config.WithAdmissionPlugins(newMockAdmissionPlugin("test-admit-plugin", test.admitRequestDenialError)) - locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds), time.Minute) + locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds, PodLocatorConfig{}), time.Minute) director := NewDirectorWithConfig(ds, mockSched, test.mockAdmissionController, locator, config) if test.name == "successful request with model rewrite" { mockDs := &mockDatastore{ @@ -667,7 +667,7 @@ func TestDirector_HandleRequest(t *testing.T) { rewrites: []*v1alpha2.InferenceModelRewrite{rewrite}, } director.datastore = mockDs - director.podLocator = NewCachedPodLocator(context.Background(), NewDatastorePodLocator(mockDs), time.Minute) + director.podLocator = NewCachedPodLocator(context.Background(), NewDatastorePodLocator(mockDs, PodLocatorConfig{}), time.Minute) } reqCtx := &handlers.RequestContext{ @@ -956,7 +956,7 @@ func TestDirector_ApplyWeightedModelRewrite(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { mockDs := &mockDatastore{rewrites: test.rewrites} - locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(mockDs), time.Minute) + locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(mockDs, PodLocatorConfig{}), time.Minute) director := NewDirectorWithConfig(mockDs, &mockScheduler{}, &mockAdmissionController{}, locator, NewConfig()) reqCtx := &handlers.RequestContext{ @@ -1057,7 +1057,7 @@ func TestDirector_HandleResponseReceived(t *testing.T) { ctx := logutil.NewTestLoggerIntoContext(context.Background()) ds := datastore.NewDatastore(t.Context(), nil, 0) mockSched := &mockScheduler{} - locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds), time.Minute) + locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds, PodLocatorConfig{}), time.Minute) director := NewDirectorWithConfig( ds, mockSched, @@ -1101,7 +1101,7 @@ func TestDirector_HandleResponseStreaming(t *testing.T) { ctx := logutil.NewTestLoggerIntoContext(context.Background()) ds := datastore.NewDatastore(t.Context(), nil, 0) mockSched := &mockScheduler{} - locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds), time.Minute) + locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds, PodLocatorConfig{}), time.Minute) director := NewDirectorWithConfig(ds, mockSched, nil, locator, NewConfig().WithResponseStreamingPlugins(ps1)) reqCtx := &handlers.RequestContext{ @@ -1138,7 +1138,7 @@ func TestDirector_HandleResponseComplete(t *testing.T) { ctx := logutil.NewTestLoggerIntoContext(context.Background()) ds := datastore.NewDatastore(t.Context(), nil, 0) mockSched := &mockScheduler{} - locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds), time.Minute) + locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds, PodLocatorConfig{}), time.Minute) director := NewDirectorWithConfig(ds, mockSched, nil, locator, NewConfig().WithResponseCompletePlugins(pc1)) reqCtx := &handlers.RequestContext{ diff --git a/pkg/epp/requestcontrol/locator.go b/pkg/epp/requestcontrol/locator.go index 8b732f617b..ca8da249b6 100644 --- a/pkg/epp/requestcontrol/locator.go +++ b/pkg/epp/requestcontrol/locator.go @@ -51,18 +51,25 @@ const ( // --- DatastorePodLocator (The Delegate) --- +// PodLocatorConfig holds configuration for the DatastorePodLocator. +type PodLocatorConfig struct { + DisableEndpointSubsetFilter bool +} + // DatastorePodLocator implements contracts.PodLocator by querying the EPP Datastore. // It centralizes the logic for resolving candidate pods based on request metadata (specifically Envoy subset filters). type DatastorePodLocator struct { datastore Datastore + config PodLocatorConfig } var _ contracts.PodLocator = &DatastorePodLocator{} // NewDatastorePodLocator creates a new DatastorePodLocator. -func NewDatastorePodLocator(ds Datastore) *DatastorePodLocator { +func NewDatastorePodLocator(ds Datastore, cfg PodLocatorConfig) *DatastorePodLocator { return &DatastorePodLocator{ datastore: ds, + config: cfg, } } @@ -75,6 +82,12 @@ func NewDatastorePodLocator(ds Datastore) *DatastorePodLocator { func (d *DatastorePodLocator) Locate(ctx context.Context, requestMetadata map[string]any) []backendmetrics.PodMetrics { loggerTrace := log.FromContext(ctx).V(logutil.TRACE) + // If the user explicitly disabled subset filtering, return the default pool (all pods). + if d.config.DisableEndpointSubsetFilter { + loggerTrace.Info("endpoint subset filtering is explicitly disabled, returning all pods") + return d.datastore.PodList(datastore.AllPodsPredicate) + } + // Check if the subset filter namespace exists in metadata. // If not, we assume the request targets the default pool (all pods). if requestMetadata == nil { diff --git a/pkg/epp/requestcontrol/locator_test.go b/pkg/epp/requestcontrol/locator_test.go index 0f5a5df1cc..8ab8dc92d6 100644 --- a/pkg/epp/requestcontrol/locator_test.go +++ b/pkg/epp/requestcontrol/locator_test.go @@ -42,25 +42,28 @@ func TestDatastorePodLocator_Locate(t *testing.T) { allPods := []backendmetrics.PodMetrics{podA, podB, podC} mockDS := &mockDatastore{pods: allPods} - locator := NewDatastorePodLocator(mockDS) tests := []struct { name string + config PodLocatorConfig metadata map[string]any expectedPodIPs []string }{ { name: "Nil metadata returns all pods", + config: PodLocatorConfig{}, metadata: nil, expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, }, { name: "Empty metadata returns all pods", + config: PodLocatorConfig{}, metadata: map[string]any{}, expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, }, { name: "Metadata without subset namespace returns all pods", + config: PodLocatorConfig{}, metadata: map[string]any{ "other-filter": "value", }, @@ -68,6 +71,7 @@ func TestDatastorePodLocator_Locate(t *testing.T) { }, { name: "Subset filter with single match", + config: PodLocatorConfig{}, metadata: makeMetadataWithSubset([]any{ "10.0.0.1:8080", }), @@ -75,6 +79,7 @@ func TestDatastorePodLocator_Locate(t *testing.T) { }, { name: "Subset filter with multiple matches", + config: PodLocatorConfig{}, metadata: makeMetadataWithSubset([]any{ "10.0.0.1:8080", "10.0.0.3:9090", @@ -83,6 +88,7 @@ func TestDatastorePodLocator_Locate(t *testing.T) { }, { name: "Subset filter with no matches (Scale-from-Zero scenario)", + config: PodLocatorConfig{}, metadata: makeMetadataWithSubset([]any{ "192.168.1.1:8080", // Does not exist in mockDS }), @@ -90,6 +96,7 @@ func TestDatastorePodLocator_Locate(t *testing.T) { }, { name: "Subset filter is present but list is empty", + config: PodLocatorConfig{}, metadata: map[string]any{ metadata.SubsetFilterNamespace: map[string]any{ metadata.SubsetFilterKey: []any{}, @@ -99,17 +106,47 @@ func TestDatastorePodLocator_Locate(t *testing.T) { }, { name: "Subset filter contains malformed data (non-string)", + config: PodLocatorConfig{}, metadata: makeMetadataWithSubset([]any{ "10.0.0.1:8080", 12345, // Should be ignored }), expectedPodIPs: []string{"10.0.0.1"}, }, + { + name: "Subset filter with match (filter disabled)", + config: PodLocatorConfig{ + DisableEndpointSubsetFilter: true, + }, + metadata: makeMetadataWithSubset([]any{ + "10.0.0.1:8080", + }), + expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, + }, + { + name: "Subset filter is present but list is empty (filter disabled)", + config: PodLocatorConfig{ + DisableEndpointSubsetFilter: true, + }, + metadata: makeMetadataWithSubset([]any{}), + expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, + }, + { + name: "Subset filter with no matches (filter disabled)", + config: PodLocatorConfig{ + DisableEndpointSubsetFilter: true, + }, + metadata: makeMetadataWithSubset([]any{ + "192.168.1.1:8080", + }), + expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { t.Parallel() + locator := NewDatastorePodLocator(mockDS, tc.config) result := locator.Locate(context.Background(), tc.metadata) var gotIPs []string diff --git a/pkg/epp/server/options.go b/pkg/epp/server/options.go index d4d17c291e..7a57cd1024 100644 --- a/pkg/epp/server/options.go +++ b/pkg/epp/server/options.go @@ -55,6 +55,7 @@ type Options struct { // EndpointSelector string // Selector to filter model server pods on, only 'key=value' pairs are supported. (TODO: k8s.Selector, pflag.StringSlice?) EndpointTargetPorts []int // Target ports of model server pods. + DisableEndpointSubsetFilter bool // Disables respecting x-gateway-destination-endpoint-subset in EPP. // // MSP metrics scraping. // @@ -100,6 +101,7 @@ func NewOptions() *Options { GRPCPort: DefaultGrpcPort, PoolGroup: "inference.networking.k8s.io", EndpointTargetPorts: []int{}, + DisableEndpointSubsetFilter: false, ModelServerMetricsScheme: "http", ModelServerMetricsPath: "/metrics", ModelServerMetricsHTTPSInsecure: true, @@ -141,6 +143,8 @@ func (opts *Options) AddFlags(fs *pflag.FlagSet) { "Format: a comma-separated list of key=value pairs without whitespace (e.g., 'app=vllm-llama3-8b-instruct,env=prod').") fs.IntSliceVar(&opts.EndpointTargetPorts, "endpoint-target-ports", opts.EndpointTargetPorts, "Target ports of model server pods. "+ "Format: a comma-separated list of numbers without whitespace (e.g., '3000,3001,3002').") + fs.BoolVar(&opts.DisableEndpointSubsetFilter, "disable-endpoint-subset-filter", opts.DisableEndpointSubsetFilter, + "Disables respecting the x-gateway-destination-endpoint-subset metadata for dispatching requests in EPP.") fs.StringVar(&opts.ModelServerMetricsScheme, "model-server-metrics-scheme", opts.ModelServerMetricsScheme, "Protocol scheme used in scraping metrics from endpoints.") fs.StringVar(&opts.ModelServerMetricsPath, "model-server-metrics-path", opts.ModelServerMetricsPath, diff --git a/test/integration/epp/harness.go b/test/integration/epp/harness.go index ca3f715b42..0ebbbc8ef9 100644 --- a/test/integration/epp/harness.go +++ b/test/integration/epp/harness.go @@ -168,7 +168,9 @@ func NewTestHarness(t *testing.T, ctx context.Context) *TestHarness { MetricsStalenessThreshold: utilizationdetector.DefaultMetricsStalenessThreshold, } runner.SaturationDetector = utilizationdetector.NewDetector(sdConfig, logger.WithName("sd")) - locator := requestcontrol.NewDatastorePodLocator(runner.Datastore) + locator := requestcontrol.NewDatastorePodLocator(runner.Datastore, requestcontrol.PodLocatorConfig { + DisableEndpointSubsetFilter: false, + }) runner.Director = requestcontrol.NewDirectorWithConfig( runner.Datastore, scheduling.NewSchedulerWithConfig(schedulerConfig), From f87d02b64a3463e524754e06f7a58ec3269badb2 Mon Sep 17 00:00:00 2001 From: Aishu Kamal Date: Mon, 12 Jan 2026 20:36:44 +0000 Subject: [PATCH 2/3] Fix formatting issues --- pkg/epp/requestcontrol/locator.go | 4 +- pkg/epp/requestcontrol/locator_test.go | 68 +++++++++++++------------- pkg/epp/server/options.go | 8 +-- 3 files changed, 40 insertions(+), 40 deletions(-) diff --git a/pkg/epp/requestcontrol/locator.go b/pkg/epp/requestcontrol/locator.go index ca8da249b6..6817abfa08 100644 --- a/pkg/epp/requestcontrol/locator.go +++ b/pkg/epp/requestcontrol/locator.go @@ -60,7 +60,7 @@ type PodLocatorConfig struct { // It centralizes the logic for resolving candidate pods based on request metadata (specifically Envoy subset filters). type DatastorePodLocator struct { datastore Datastore - config PodLocatorConfig + config PodLocatorConfig } var _ contracts.PodLocator = &DatastorePodLocator{} @@ -69,7 +69,7 @@ var _ contracts.PodLocator = &DatastorePodLocator{} func NewDatastorePodLocator(ds Datastore, cfg PodLocatorConfig) *DatastorePodLocator { return &DatastorePodLocator{ datastore: ds, - config: cfg, + config: cfg, } } diff --git a/pkg/epp/requestcontrol/locator_test.go b/pkg/epp/requestcontrol/locator_test.go index 8ab8dc92d6..8162d41856 100644 --- a/pkg/epp/requestcontrol/locator_test.go +++ b/pkg/epp/requestcontrol/locator_test.go @@ -45,7 +45,7 @@ func TestDatastorePodLocator_Locate(t *testing.T) { tests := []struct { name string - config PodLocatorConfig + config PodLocatorConfig metadata map[string]any expectedPodIPs []string }{ @@ -62,7 +62,7 @@ func TestDatastorePodLocator_Locate(t *testing.T) { expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, }, { - name: "Metadata without subset namespace returns all pods", + name: "Metadata without subset namespace returns all pods", config: PodLocatorConfig{}, metadata: map[string]any{ "other-filter": "value", @@ -70,7 +70,7 @@ func TestDatastorePodLocator_Locate(t *testing.T) { expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, }, { - name: "Subset filter with single match", + name: "Subset filter with single match", config: PodLocatorConfig{}, metadata: makeMetadataWithSubset([]any{ "10.0.0.1:8080", @@ -78,7 +78,7 @@ func TestDatastorePodLocator_Locate(t *testing.T) { expectedPodIPs: []string{"10.0.0.1"}, }, { - name: "Subset filter with multiple matches", + name: "Subset filter with multiple matches", config: PodLocatorConfig{}, metadata: makeMetadataWithSubset([]any{ "10.0.0.1:8080", @@ -87,7 +87,7 @@ func TestDatastorePodLocator_Locate(t *testing.T) { expectedPodIPs: []string{"10.0.0.1", "10.0.0.3"}, }, { - name: "Subset filter with no matches (Scale-from-Zero scenario)", + name: "Subset filter with no matches (Scale-from-Zero scenario)", config: PodLocatorConfig{}, metadata: makeMetadataWithSubset([]any{ "192.168.1.1:8080", // Does not exist in mockDS @@ -95,7 +95,7 @@ func TestDatastorePodLocator_Locate(t *testing.T) { expectedPodIPs: []string{}, }, { - name: "Subset filter is present but list is empty", + name: "Subset filter is present but list is empty", config: PodLocatorConfig{}, metadata: map[string]any{ metadata.SubsetFilterNamespace: map[string]any{ @@ -105,7 +105,7 @@ func TestDatastorePodLocator_Locate(t *testing.T) { expectedPodIPs: []string{}, }, { - name: "Subset filter contains malformed data (non-string)", + name: "Subset filter contains malformed data (non-string)", config: PodLocatorConfig{}, metadata: makeMetadataWithSubset([]any{ "10.0.0.1:8080", @@ -114,33 +114,33 @@ func TestDatastorePodLocator_Locate(t *testing.T) { expectedPodIPs: []string{"10.0.0.1"}, }, { - name: "Subset filter with match (filter disabled)", - config: PodLocatorConfig{ - DisableEndpointSubsetFilter: true, - }, - metadata: makeMetadataWithSubset([]any{ - "10.0.0.1:8080", - }), - expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, - }, - { - name: "Subset filter is present but list is empty (filter disabled)", - config: PodLocatorConfig{ - DisableEndpointSubsetFilter: true, - }, - metadata: makeMetadataWithSubset([]any{}), - expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, - }, - { - name: "Subset filter with no matches (filter disabled)", - config: PodLocatorConfig{ - DisableEndpointSubsetFilter: true, - }, - metadata: makeMetadataWithSubset([]any{ - "192.168.1.1:8080", - }), - expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, - }, + name: "Subset filter with match (filter disabled)", + config: PodLocatorConfig{ + DisableEndpointSubsetFilter: true, + }, + metadata: makeMetadataWithSubset([]any{ + "10.0.0.1:8080", + }), + expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, + }, + { + name: "Subset filter is present but list is empty (filter disabled)", + config: PodLocatorConfig{ + DisableEndpointSubsetFilter: true, + }, + metadata: makeMetadataWithSubset([]any{}), + expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, + }, + { + name: "Subset filter with no matches (filter disabled)", + config: PodLocatorConfig{ + DisableEndpointSubsetFilter: true, + }, + metadata: makeMetadataWithSubset([]any{ + "192.168.1.1:8080", + }), + expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, + }, } for _, tc := range tests { diff --git a/pkg/epp/server/options.go b/pkg/epp/server/options.go index 7a57cd1024..61c4f61d13 100644 --- a/pkg/epp/server/options.go +++ b/pkg/epp/server/options.go @@ -53,9 +53,9 @@ type Options struct { // // Endpoints (in lieu of using an InferencePool for service discovery). // - EndpointSelector string // Selector to filter model server pods on, only 'key=value' pairs are supported. (TODO: k8s.Selector, pflag.StringSlice?) - EndpointTargetPorts []int // Target ports of model server pods. - DisableEndpointSubsetFilter bool // Disables respecting x-gateway-destination-endpoint-subset in EPP. + EndpointSelector string // Selector to filter model server pods on, only 'key=value' pairs are supported. (TODO: k8s.Selector, pflag.StringSlice?) + EndpointTargetPorts []int // Target ports of model server pods. + DisableEndpointSubsetFilter bool // Disables respecting x-gateway-destination-endpoint-subset in EPP. // // MSP metrics scraping. // @@ -101,7 +101,7 @@ func NewOptions() *Options { GRPCPort: DefaultGrpcPort, PoolGroup: "inference.networking.k8s.io", EndpointTargetPorts: []int{}, - DisableEndpointSubsetFilter: false, + DisableEndpointSubsetFilter: false, ModelServerMetricsScheme: "http", ModelServerMetricsPath: "/metrics", ModelServerMetricsHTTPSInsecure: true, From cd7886581f12e2603f2424f804421c9934cf78a8 Mon Sep 17 00:00:00 2001 From: Aishu Kamal Date: Mon, 12 Jan 2026 21:08:46 +0000 Subject: [PATCH 3/3] Address comments, use variadic function args --- cmd/epp/runner/runner.go | 5 +--- pkg/epp/requestcontrol/director_test.go | 12 ++++----- pkg/epp/requestcontrol/locator.go | 18 ++++++++++++- pkg/epp/requestcontrol/locator_test.go | 36 ++++++++++--------------- test/integration/epp/harness.go | 4 +-- 5 files changed, 39 insertions(+), 36 deletions(-) diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index cd7ad8451e..15cdb8cf98 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -308,10 +308,7 @@ func (r *Runner) Run(ctx context.Context) error { // --- Admission Control Initialization --- var admissionController requestcontrol.AdmissionController var locator contracts.PodLocator - locatorCfg := requestcontrol.PodLocatorConfig { - DisableEndpointSubsetFilter: opts.DisableEndpointSubsetFilter, - } - locator = requestcontrol.NewDatastorePodLocator(ds, locatorCfg) + locator = requestcontrol.NewDatastorePodLocator(ds, requestcontrol.WithDisableEndpointSubsetFilter(opts.DisableEndpointSubsetFilter)) if r.featureGates[flowcontrol.FeatureGate] { locator = requestcontrol.NewCachedPodLocator(ctx, locator, time.Millisecond*50) setupLog.Info("Initializing experimental Flow Control layer") diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index dcd0055d7f..c5ff58a364 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -659,7 +659,7 @@ func TestDirector_HandleRequest(t *testing.T) { } config = config.WithAdmissionPlugins(newMockAdmissionPlugin("test-admit-plugin", test.admitRequestDenialError)) - locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds, PodLocatorConfig{}), time.Minute) + locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds), time.Minute) director := NewDirectorWithConfig(ds, mockSched, test.mockAdmissionController, locator, config) if test.name == "successful request with model rewrite" { mockDs := &mockDatastore{ @@ -667,7 +667,7 @@ func TestDirector_HandleRequest(t *testing.T) { rewrites: []*v1alpha2.InferenceModelRewrite{rewrite}, } director.datastore = mockDs - director.podLocator = NewCachedPodLocator(context.Background(), NewDatastorePodLocator(mockDs, PodLocatorConfig{}), time.Minute) + director.podLocator = NewCachedPodLocator(context.Background(), NewDatastorePodLocator(mockDs), time.Minute) } reqCtx := &handlers.RequestContext{ @@ -956,7 +956,7 @@ func TestDirector_ApplyWeightedModelRewrite(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { mockDs := &mockDatastore{rewrites: test.rewrites} - locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(mockDs, PodLocatorConfig{}), time.Minute) + locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(mockDs), time.Minute) director := NewDirectorWithConfig(mockDs, &mockScheduler{}, &mockAdmissionController{}, locator, NewConfig()) reqCtx := &handlers.RequestContext{ @@ -1057,7 +1057,7 @@ func TestDirector_HandleResponseReceived(t *testing.T) { ctx := logutil.NewTestLoggerIntoContext(context.Background()) ds := datastore.NewDatastore(t.Context(), nil, 0) mockSched := &mockScheduler{} - locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds, PodLocatorConfig{}), time.Minute) + locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds), time.Minute) director := NewDirectorWithConfig( ds, mockSched, @@ -1101,7 +1101,7 @@ func TestDirector_HandleResponseStreaming(t *testing.T) { ctx := logutil.NewTestLoggerIntoContext(context.Background()) ds := datastore.NewDatastore(t.Context(), nil, 0) mockSched := &mockScheduler{} - locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds, PodLocatorConfig{}), time.Minute) + locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds), time.Minute) director := NewDirectorWithConfig(ds, mockSched, nil, locator, NewConfig().WithResponseStreamingPlugins(ps1)) reqCtx := &handlers.RequestContext{ @@ -1138,7 +1138,7 @@ func TestDirector_HandleResponseComplete(t *testing.T) { ctx := logutil.NewTestLoggerIntoContext(context.Background()) ds := datastore.NewDatastore(t.Context(), nil, 0) mockSched := &mockScheduler{} - locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds, PodLocatorConfig{}), time.Minute) + locator := NewCachedPodLocator(context.Background(), NewDatastorePodLocator(ds), time.Minute) director := NewDirectorWithConfig(ds, mockSched, nil, locator, NewConfig().WithResponseCompletePlugins(pc1)) reqCtx := &handlers.RequestContext{ diff --git a/pkg/epp/requestcontrol/locator.go b/pkg/epp/requestcontrol/locator.go index 6817abfa08..7dcbc16390 100644 --- a/pkg/epp/requestcontrol/locator.go +++ b/pkg/epp/requestcontrol/locator.go @@ -56,6 +56,16 @@ type PodLocatorConfig struct { DisableEndpointSubsetFilter bool } +// LocatorOption is a function that configures the PodLocatorConfig. +type LocatorOption func(*PodLocatorConfig) + +// WithDisableEndpointSubsetFilter sets the DisableEndpointSubsetFilter flag. +func WithDisableEndpointSubsetFilter(disable bool) LocatorOption { + return func(c *PodLocatorConfig) { + c.DisableEndpointSubsetFilter = disable + } +} + // DatastorePodLocator implements contracts.PodLocator by querying the EPP Datastore. // It centralizes the logic for resolving candidate pods based on request metadata (specifically Envoy subset filters). type DatastorePodLocator struct { @@ -66,7 +76,13 @@ type DatastorePodLocator struct { var _ contracts.PodLocator = &DatastorePodLocator{} // NewDatastorePodLocator creates a new DatastorePodLocator. -func NewDatastorePodLocator(ds Datastore, cfg PodLocatorConfig) *DatastorePodLocator { +func NewDatastorePodLocator(ds Datastore, opts ...LocatorOption) *DatastorePodLocator { + cfg := PodLocatorConfig{ + DisableEndpointSubsetFilter: false, + } + for _, opt := range opts { + opt(&cfg) + } return &DatastorePodLocator{ datastore: ds, config: cfg, diff --git a/pkg/epp/requestcontrol/locator_test.go b/pkg/epp/requestcontrol/locator_test.go index 8162d41856..4235497979 100644 --- a/pkg/epp/requestcontrol/locator_test.go +++ b/pkg/epp/requestcontrol/locator_test.go @@ -45,41 +45,36 @@ func TestDatastorePodLocator_Locate(t *testing.T) { tests := []struct { name string - config PodLocatorConfig + opts []LocatorOption metadata map[string]any expectedPodIPs []string }{ { name: "Nil metadata returns all pods", - config: PodLocatorConfig{}, metadata: nil, expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, }, { name: "Empty metadata returns all pods", - config: PodLocatorConfig{}, metadata: map[string]any{}, expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, }, { - name: "Metadata without subset namespace returns all pods", - config: PodLocatorConfig{}, + name: "Metadata without subset namespace returns all pods", metadata: map[string]any{ "other-filter": "value", }, expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, }, { - name: "Subset filter with single match", - config: PodLocatorConfig{}, + name: "Subset filter with single match", metadata: makeMetadataWithSubset([]any{ "10.0.0.1:8080", }), expectedPodIPs: []string{"10.0.0.1"}, }, { - name: "Subset filter with multiple matches", - config: PodLocatorConfig{}, + name: "Subset filter with multiple matches", metadata: makeMetadataWithSubset([]any{ "10.0.0.1:8080", "10.0.0.3:9090", @@ -87,16 +82,14 @@ func TestDatastorePodLocator_Locate(t *testing.T) { expectedPodIPs: []string{"10.0.0.1", "10.0.0.3"}, }, { - name: "Subset filter with no matches (Scale-from-Zero scenario)", - config: PodLocatorConfig{}, + name: "Subset filter with no matches (Scale-from-Zero scenario)", metadata: makeMetadataWithSubset([]any{ "192.168.1.1:8080", // Does not exist in mockDS }), expectedPodIPs: []string{}, }, { - name: "Subset filter is present but list is empty", - config: PodLocatorConfig{}, + name: "Subset filter is present but list is empty", metadata: map[string]any{ metadata.SubsetFilterNamespace: map[string]any{ metadata.SubsetFilterKey: []any{}, @@ -105,8 +98,7 @@ func TestDatastorePodLocator_Locate(t *testing.T) { expectedPodIPs: []string{}, }, { - name: "Subset filter contains malformed data (non-string)", - config: PodLocatorConfig{}, + name: "Subset filter contains malformed data (non-string)", metadata: makeMetadataWithSubset([]any{ "10.0.0.1:8080", 12345, // Should be ignored @@ -115,8 +107,8 @@ func TestDatastorePodLocator_Locate(t *testing.T) { }, { name: "Subset filter with match (filter disabled)", - config: PodLocatorConfig{ - DisableEndpointSubsetFilter: true, + opts: []LocatorOption{ + WithDisableEndpointSubsetFilter(true), }, metadata: makeMetadataWithSubset([]any{ "10.0.0.1:8080", @@ -125,16 +117,16 @@ func TestDatastorePodLocator_Locate(t *testing.T) { }, { name: "Subset filter is present but list is empty (filter disabled)", - config: PodLocatorConfig{ - DisableEndpointSubsetFilter: true, + opts: []LocatorOption{ + WithDisableEndpointSubsetFilter(true), }, metadata: makeMetadataWithSubset([]any{}), expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, }, { name: "Subset filter with no matches (filter disabled)", - config: PodLocatorConfig{ - DisableEndpointSubsetFilter: true, + opts: []LocatorOption{ + WithDisableEndpointSubsetFilter(true), }, metadata: makeMetadataWithSubset([]any{ "192.168.1.1:8080", @@ -146,7 +138,7 @@ func TestDatastorePodLocator_Locate(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { t.Parallel() - locator := NewDatastorePodLocator(mockDS, tc.config) + locator := NewDatastorePodLocator(mockDS, tc.opts...) result := locator.Locate(context.Background(), tc.metadata) var gotIPs []string diff --git a/test/integration/epp/harness.go b/test/integration/epp/harness.go index 0ebbbc8ef9..ca3f715b42 100644 --- a/test/integration/epp/harness.go +++ b/test/integration/epp/harness.go @@ -168,9 +168,7 @@ func NewTestHarness(t *testing.T, ctx context.Context) *TestHarness { MetricsStalenessThreshold: utilizationdetector.DefaultMetricsStalenessThreshold, } runner.SaturationDetector = utilizationdetector.NewDetector(sdConfig, logger.WithName("sd")) - locator := requestcontrol.NewDatastorePodLocator(runner.Datastore, requestcontrol.PodLocatorConfig { - DisableEndpointSubsetFilter: false, - }) + locator := requestcontrol.NewDatastorePodLocator(runner.Datastore) runner.Director = requestcontrol.NewDirectorWithConfig( runner.Datastore, scheduling.NewSchedulerWithConfig(schedulerConfig),