diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index 28120dde6..f4a2c9b07 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -272,16 +272,16 @@ func (r *Runner) Run(ctx context.Context) error { } } - err = r.parseConfiguration(ctx) + err = r.parsePluginsConfiguration(ctx) if err != nil { - setupLog.Error(err, "Failed to parse the configuration") + setupLog.Error(err, "Failed to parse plugins configuration") return err } // --- Initialize Core EPP Components --- - scheduler := r.initializeScheduler() + scheduler := scheduling.NewSchedulerWithConfig(r.schedulerConfig) - saturationDetector := saturationdetector.NewDetector(sdConfig, datastore, ctrl.Log) + saturationDetector := saturationdetector.NewDetector(sdConfig, datastore, setupLog) director := requestcontrol.NewDirectorWithConfig(datastore, scheduler, saturationDetector, r.requestControlConfig) @@ -326,16 +326,7 @@ func (r *Runner) Run(ctx context.Context) error { return nil } -func (r *Runner) initializeScheduler() *scheduling.Scheduler { - if r.schedulerConfig != nil { - return scheduling.NewSchedulerWithConfig(r.schedulerConfig) - } - - // otherwise, no one configured from outside scheduler config. use existing configuration - return scheduling.NewScheduler() -} - -func (r *Runner) parseConfiguration(ctx context.Context) error { +func (r *Runner) parsePluginsConfiguration(ctx context.Context) error { if *configText == "" && *configFile == "" { return nil // configuring through code, not through file } diff --git a/pkg/epp/scheduling/scheduler.go b/pkg/epp/scheduling/scheduler.go index bd1a29655..51e188d82 100644 --- a/pkg/epp/scheduling/scheduler.go +++ b/pkg/epp/scheduling/scheduler.go @@ -25,11 +25,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/config" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/filter" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) @@ -38,46 +34,6 @@ type Datastore interface { PodGetAll() []backendmetrics.PodMetrics } -// NewScheduler returns a new scheduler with default scheduler plugins configuration. -func NewScheduler() *Scheduler { - // When the scheduler is initialized with NewScheduler function, thw below config will be used as default. - // it's possible to call NewSchedulerWithConfig to pass a different scheduler config. - // For build time plugins changes, it's recommended to call in main.go to NewSchedulerWithConfig. - loraAffinityFilter := filter.NewLoraAffinityFilter(config.Conf.LoraAffinityThreshold) - leastQueueFilter := filter.NewLeastQueueFilter() - leastKvCacheFilter := filter.NewLeastKVCacheFilter() - - lowLatencyFilter := &filter.DecisionTreeFilter{ - Current: filter.NewLowQueueFilter(config.Conf.QueueingThresholdLoRA), - NextOnSuccess: &filter.DecisionTreeFilter{ - Current: loraAffinityFilter, - NextOnSuccessOrFailure: &filter.DecisionTreeFilter{ - Current: leastQueueFilter, - NextOnSuccessOrFailure: &filter.DecisionTreeFilter{ - Current: leastKvCacheFilter, - }, - }, - }, - NextOnFailure: &filter.DecisionTreeFilter{ - Current: leastQueueFilter, - NextOnSuccessOrFailure: &filter.DecisionTreeFilter{ - Current: loraAffinityFilter, - NextOnSuccessOrFailure: &filter.DecisionTreeFilter{ - Current: leastKvCacheFilter, - }, - }, - }, - } - - defaultProfile := framework.NewSchedulerProfile(). - WithFilters(lowLatencyFilter). - WithPicker(picker.NewRandomPicker(picker.DefaultMaxNumOfEndpoints)) - - profileHandler := profile.NewSingleProfileHandler() - - return NewSchedulerWithConfig(NewSchedulerConfig(profileHandler, map[string]*framework.SchedulerProfile{"default": defaultProfile})) -} - // NewSchedulerWithConfig returns a new scheduler with the given scheduler plugins configuration. func NewSchedulerWithConfig(config *SchedulerConfig) *Scheduler { return &Scheduler{ diff --git a/pkg/epp/scheduling/scheduler_test.go b/pkg/epp/scheduling/scheduler_test.go index 996d15210..8923c3d6a 100644 --- a/pkg/epp/scheduling/scheduler_test.go +++ b/pkg/epp/scheduling/scheduler_test.go @@ -25,11 +25,50 @@ import ( k8stypes "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" // Import config for thresholds + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/config" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/filter" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" ) // Tests the default scheduler configuration and expected behavior. func TestSchedule(t *testing.T) { + loraAffinityFilter := filter.NewLoraAffinityFilter(config.Conf.LoraAffinityThreshold) + leastQueueFilter := filter.NewLeastQueueFilter() + leastKvCacheFilter := filter.NewLeastKVCacheFilter() + + lowLatencyFilter := &filter.DecisionTreeFilter{ + Current: filter.NewLowQueueFilter(config.Conf.QueueingThresholdLoRA), + NextOnSuccess: &filter.DecisionTreeFilter{ + Current: loraAffinityFilter, + NextOnSuccessOrFailure: &filter.DecisionTreeFilter{ + Current: leastQueueFilter, + NextOnSuccessOrFailure: &filter.DecisionTreeFilter{ + Current: leastKvCacheFilter, + }, + }, + }, + NextOnFailure: &filter.DecisionTreeFilter{ + Current: leastQueueFilter, + NextOnSuccessOrFailure: &filter.DecisionTreeFilter{ + Current: loraAffinityFilter, + NextOnSuccessOrFailure: &filter.DecisionTreeFilter{ + Current: leastKvCacheFilter, + }, + }, + }, + } + + defaultProfile := framework.NewSchedulerProfile(). + WithFilters(lowLatencyFilter). + WithPicker(picker.NewRandomPicker(picker.DefaultMaxNumOfEndpoints)) + + profileHandler := profile.NewSingleProfileHandler() + + schedulerConfig := NewSchedulerConfig(profileHandler, map[string]*framework.SchedulerProfile{"default": defaultProfile}) + tests := []struct { name string req *types.LLMRequest @@ -120,7 +159,7 @@ func TestSchedule(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - scheduler := NewScheduler() + scheduler := NewSchedulerWithConfig(schedulerConfig) got, err := scheduler.Schedule(context.Background(), test.req, test.input) if test.err != (err != nil) { t.Errorf("Unexpected error, got %v, want %v", err, test.err) diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go index 6d439d17d..3a3bd9eff 100644 --- a/test/integration/epp/hermetic_test.go +++ b/test/integration/epp/hermetic_test.go @@ -50,11 +50,13 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" k8sclient "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/config" + crconfig "sigs.k8s.io/controller-runtime/pkg/config" "sigs.k8s.io/controller-runtime/pkg/envtest" crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" "sigs.k8s.io/controller-runtime/pkg/metrics/filters" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "sigs.k8s.io/yaml" + "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" @@ -63,11 +65,15 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/config" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/filter" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" epptestutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing" integrationutils "sigs.k8s.io/gateway-api-inference-extension/test/integration" - "sigs.k8s.io/yaml" ) const ( @@ -1018,7 +1024,41 @@ func BeforeSuite() func() { // Adjust from defaults serverRunner.PoolNamespacedName = types.NamespacedName{Name: testPoolName, Namespace: testNamespace} serverRunner.Datastore = datastore.NewDatastore(context.Background(), pmf) - scheduler := scheduling.NewScheduler() + + loraAffinityFilter := filter.NewLoraAffinityFilter(config.Conf.LoraAffinityThreshold) + leastQueueFilter := filter.NewLeastQueueFilter() + leastKvCacheFilter := filter.NewLeastKVCacheFilter() + + lowLatencyFilter := &filter.DecisionTreeFilter{ + Current: filter.NewLowQueueFilter(config.Conf.QueueingThresholdLoRA), + NextOnSuccess: &filter.DecisionTreeFilter{ + Current: loraAffinityFilter, + NextOnSuccessOrFailure: &filter.DecisionTreeFilter{ + Current: leastQueueFilter, + NextOnSuccessOrFailure: &filter.DecisionTreeFilter{ + Current: leastKvCacheFilter, + }, + }, + }, + NextOnFailure: &filter.DecisionTreeFilter{ + Current: leastQueueFilter, + NextOnSuccessOrFailure: &filter.DecisionTreeFilter{ + Current: loraAffinityFilter, + NextOnSuccessOrFailure: &filter.DecisionTreeFilter{ + Current: leastKvCacheFilter, + }, + }, + }, + } + + defaultProfile := framework.NewSchedulerProfile(). + WithFilters(lowLatencyFilter). + WithPicker(picker.NewRandomPicker(picker.DefaultMaxNumOfEndpoints)) + + profileHandler := profile.NewSingleProfileHandler() + + schedulerConfig := scheduling.NewSchedulerConfig(profileHandler, map[string]*framework.SchedulerProfile{"default": defaultProfile}) + scheduler := scheduling.NewSchedulerWithConfig(schedulerConfig) sdConfig := &saturationdetector.Config{ QueueDepthThreshold: saturationdetector.DefaultQueueDepthThreshold, @@ -1125,7 +1165,7 @@ func managerTestOptions(namespace, name string, metricsServerOptions metricsserv }, }, }, - Controller: config.Controller{ + Controller: crconfig.Controller{ SkipNameValidation: boolPointer(true), }, Metrics: metricsServerOptions,