diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index 8d6c710c45..15cdb8cf98 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -308,7 +308,7 @@ func (r *Runner) Run(ctx context.Context) error { // --- Admission Control Initialization --- var admissionController requestcontrol.AdmissionController var locator contracts.PodLocator - locator = requestcontrol.NewDatastorePodLocator(ds) + locator = requestcontrol.NewDatastorePodLocator(ds, requestcontrol.WithDisableEndpointSubsetFilter(opts.DisableEndpointSubsetFilter)) if r.featureGates[flowcontrol.FeatureGate] { locator = requestcontrol.NewCachedPodLocator(ctx, locator, time.Millisecond*50) setupLog.Info("Initializing experimental Flow Control layer") diff --git a/pkg/epp/requestcontrol/locator.go b/pkg/epp/requestcontrol/locator.go index 8b732f617b..7dcbc16390 100644 --- a/pkg/epp/requestcontrol/locator.go +++ b/pkg/epp/requestcontrol/locator.go @@ -51,18 +51,41 @@ const ( // --- DatastorePodLocator (The Delegate) --- +// PodLocatorConfig holds configuration for the DatastorePodLocator. +type PodLocatorConfig struct { + DisableEndpointSubsetFilter bool +} + +// LocatorOption is a function that configures the PodLocatorConfig. +type LocatorOption func(*PodLocatorConfig) + +// WithDisableEndpointSubsetFilter sets the DisableEndpointSubsetFilter flag. +func WithDisableEndpointSubsetFilter(disable bool) LocatorOption { + return func(c *PodLocatorConfig) { + c.DisableEndpointSubsetFilter = disable + } +} + // DatastorePodLocator implements contracts.PodLocator by querying the EPP Datastore. // It centralizes the logic for resolving candidate pods based on request metadata (specifically Envoy subset filters). type DatastorePodLocator struct { datastore Datastore + config PodLocatorConfig } var _ contracts.PodLocator = &DatastorePodLocator{} // NewDatastorePodLocator creates a new DatastorePodLocator. -func NewDatastorePodLocator(ds Datastore) *DatastorePodLocator { +func NewDatastorePodLocator(ds Datastore, opts ...LocatorOption) *DatastorePodLocator { + cfg := PodLocatorConfig{ + DisableEndpointSubsetFilter: false, + } + for _, opt := range opts { + opt(&cfg) + } return &DatastorePodLocator{ datastore: ds, + config: cfg, } } @@ -75,6 +98,12 @@ func NewDatastorePodLocator(ds Datastore) *DatastorePodLocator { func (d *DatastorePodLocator) Locate(ctx context.Context, requestMetadata map[string]any) []backendmetrics.PodMetrics { loggerTrace := log.FromContext(ctx).V(logutil.TRACE) + // If the user explicitly disabled subset filtering, return the default pool (all pods). + if d.config.DisableEndpointSubsetFilter { + loggerTrace.Info("endpoint subset filtering is explicitly disabled, returning all pods") + return d.datastore.PodList(datastore.AllPodsPredicate) + } + // Check if the subset filter namespace exists in metadata. // If not, we assume the request targets the default pool (all pods). if requestMetadata == nil { diff --git a/pkg/epp/requestcontrol/locator_test.go b/pkg/epp/requestcontrol/locator_test.go index 0f5a5df1cc..4235497979 100644 --- a/pkg/epp/requestcontrol/locator_test.go +++ b/pkg/epp/requestcontrol/locator_test.go @@ -42,10 +42,10 @@ func TestDatastorePodLocator_Locate(t *testing.T) { allPods := []backendmetrics.PodMetrics{podA, podB, podC} mockDS := &mockDatastore{pods: allPods} - locator := NewDatastorePodLocator(mockDS) tests := []struct { name string + opts []LocatorOption metadata map[string]any expectedPodIPs []string }{ @@ -105,11 +105,40 @@ func TestDatastorePodLocator_Locate(t *testing.T) { }), expectedPodIPs: []string{"10.0.0.1"}, }, + { + name: "Subset filter with match (filter disabled)", + opts: []LocatorOption{ + WithDisableEndpointSubsetFilter(true), + }, + metadata: makeMetadataWithSubset([]any{ + "10.0.0.1:8080", + }), + expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, + }, + { + name: "Subset filter is present but list is empty (filter disabled)", + opts: []LocatorOption{ + WithDisableEndpointSubsetFilter(true), + }, + metadata: makeMetadataWithSubset([]any{}), + expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, + }, + { + name: "Subset filter with no matches (filter disabled)", + opts: []LocatorOption{ + WithDisableEndpointSubsetFilter(true), + }, + metadata: makeMetadataWithSubset([]any{ + "192.168.1.1:8080", + }), + expectedPodIPs: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { t.Parallel() + locator := NewDatastorePodLocator(mockDS, tc.opts...) result := locator.Locate(context.Background(), tc.metadata) var gotIPs []string diff --git a/pkg/epp/server/options.go b/pkg/epp/server/options.go index d4d17c291e..61c4f61d13 100644 --- a/pkg/epp/server/options.go +++ b/pkg/epp/server/options.go @@ -53,8 +53,9 @@ type Options struct { // // Endpoints (in lieu of using an InferencePool for service discovery). // - EndpointSelector string // Selector to filter model server pods on, only 'key=value' pairs are supported. (TODO: k8s.Selector, pflag.StringSlice?) - EndpointTargetPorts []int // Target ports of model server pods. + EndpointSelector string // Selector to filter model server pods on, only 'key=value' pairs are supported. (TODO: k8s.Selector, pflag.StringSlice?) + EndpointTargetPorts []int // Target ports of model server pods. + DisableEndpointSubsetFilter bool // Disables respecting x-gateway-destination-endpoint-subset in EPP. // // MSP metrics scraping. // @@ -100,6 +101,7 @@ func NewOptions() *Options { GRPCPort: DefaultGrpcPort, PoolGroup: "inference.networking.k8s.io", EndpointTargetPorts: []int{}, + DisableEndpointSubsetFilter: false, ModelServerMetricsScheme: "http", ModelServerMetricsPath: "/metrics", ModelServerMetricsHTTPSInsecure: true, @@ -141,6 +143,8 @@ func (opts *Options) AddFlags(fs *pflag.FlagSet) { "Format: a comma-separated list of key=value pairs without whitespace (e.g., 'app=vllm-llama3-8b-instruct,env=prod').") fs.IntSliceVar(&opts.EndpointTargetPorts, "endpoint-target-ports", opts.EndpointTargetPorts, "Target ports of model server pods. "+ "Format: a comma-separated list of numbers without whitespace (e.g., '3000,3001,3002').") + fs.BoolVar(&opts.DisableEndpointSubsetFilter, "disable-endpoint-subset-filter", opts.DisableEndpointSubsetFilter, + "Disables respecting the x-gateway-destination-endpoint-subset metadata for dispatching requests in EPP.") fs.StringVar(&opts.ModelServerMetricsScheme, "model-server-metrics-scheme", opts.ModelServerMetricsScheme, "Protocol scheme used in scraping metrics from endpoints.") fs.StringVar(&opts.ModelServerMetricsPath, "model-server-metrics-path", opts.ModelServerMetricsPath,