diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index 045d4f9d7..b94e5a5a8 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -242,7 +242,7 @@ func (r *Runner) Run(ctx context.Context) error { } // --- Initialize Core EPP Components --- - scheduler, err := r.initializeScheduler(datastore) + scheduler, err := r.initializeScheduler() if err != nil { setupLog.Error(err, "Failed to create scheduler") return err @@ -293,13 +293,13 @@ func (r *Runner) Run(ctx context.Context) error { return nil } -func (r *Runner) initializeScheduler(datastore datastore.Datastore) (*scheduling.Scheduler, error) { +func (r *Runner) initializeScheduler() (*scheduling.Scheduler, error) { if r.schedulerConfig != nil { - return scheduling.NewSchedulerWithConfig(datastore, r.schedulerConfig), nil + return scheduling.NewSchedulerWithConfig(r.schedulerConfig), nil } // otherwise, no one configured from outside scheduler config. use existing configuration - scheduler := scheduling.NewScheduler(datastore) + scheduler := scheduling.NewScheduler() if schedulerV2 { queueScorerWeight := envutil.GetEnvInt("QUEUE_SCORE_WEIGHT", scorer.DefaultQueueScorerWeight, setupLog) kvCacheScorerWeight := envutil.GetEnvInt("KV_CACHE_SCORE_WEIGHT", scorer.DefaultKVCacheScorerWeight, setupLog) @@ -317,11 +317,11 @@ func (r *Runner) initializeScheduler(datastore datastore.Datastore) (*scheduling } schedulerConfig := scheduling.NewSchedulerConfig(profile.NewSingleProfileHandler(), map[string]*framework.SchedulerProfile{"schedulerv2": schedulerProfile}) - scheduler = scheduling.NewSchedulerWithConfig(datastore, schedulerConfig) + scheduler = scheduling.NewSchedulerWithConfig(schedulerConfig) } if reqHeaderBasedSchedulerForTesting { - scheduler = conformance_epp.NewReqHeaderBasedScheduler(datastore) + scheduler = conformance_epp.NewReqHeaderBasedScheduler() } return scheduler, nil diff --git a/conformance/testing-epp/scheduler.go b/conformance/testing-epp/scheduler.go index 26028adc9..94f9ee5bb 100644 --- a/conformance/testing-epp/scheduler.go +++ b/conformance/testing-epp/scheduler.go @@ -27,11 +27,11 @@ import ( // NewReqHeaderBasedScheduler creates a scheduler for conformance tests that selects // an endpoint based on the "test-epp-endpoint-selection" request header. If the // header is missing or the specified endpoint doesn't exist, no endpoint is returned. -func NewReqHeaderBasedScheduler(datastore scheduling.Datastore) *scheduling.Scheduler { +func NewReqHeaderBasedScheduler() *scheduling.Scheduler { predicatableSchedulerProfile := framework.NewSchedulerProfile(). WithFilters(filter.NewHeaderBasedTestingFilter()). WithPicker(picker.NewMaxScorePicker()) - return scheduling.NewSchedulerWithConfig(datastore, scheduling.NewSchedulerConfig( + return scheduling.NewSchedulerWithConfig(scheduling.NewSchedulerConfig( profile.NewSingleProfileHandler(), map[string]*framework.SchedulerProfile{"req-header-based-profile": predicatableSchedulerProfile})) } diff --git a/conformance/testing-epp/sheduler_test.go b/conformance/testing-epp/sheduler_test.go index 0fb24cf8f..4901e0380 100644 --- a/conformance/testing-epp/sheduler_test.go +++ b/conformance/testing-epp/sheduler_test.go @@ -31,13 +31,13 @@ import ( func TestSchedule(t *testing.T) { tests := []struct { name string - input []*backendmetrics.FakePodMetrics + input []backendmetrics.PodMetrics req *types.LLMRequest wantRes *types.SchedulingResult err bool }{ { - name: "no pods in datastore and req header is set", + name: "no candidate pods and req header is set", req: &types.LLMRequest{ Headers: map[string]string{"test-epp-endpoint-selection": "random-endpoint"}, RequestId: uuid.NewString(), @@ -47,8 +47,8 @@ func TestSchedule(t *testing.T) { }, { name: "req header not set", - input: []*backendmetrics.FakePodMetrics{ - {Pod: &backend.Pod{Address: "random-endpoint"}}, + input: []backendmetrics.PodMetrics{ + &backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "random-endpoint"}}, }, req: &types.LLMRequest{ Headers: map[string]string{}, // Deliberately set an empty header. @@ -58,9 +58,9 @@ func TestSchedule(t *testing.T) { err: true, }, { - name: "no pods address in datastore matches req header address", - input: []*backendmetrics.FakePodMetrics{ - {Pod: &backend.Pod{Address: "nonmatched-endpoint"}}, + name: "no pods address from the candidate pods matches req header address", + input: []backendmetrics.PodMetrics{ + &backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "nonmatched-endpoint"}}, }, req: &types.LLMRequest{ Headers: map[string]string{"test-epp-endpoint-selection": "matched-endpoint"}, @@ -70,10 +70,10 @@ func TestSchedule(t *testing.T) { err: true, }, { - name: "one pod address in datastore matches req header address", - input: []*backendmetrics.FakePodMetrics{ - {Pod: &backend.Pod{Address: "nonmatched-endpoint"}}, - {Pod: &backend.Pod{Address: "matched-endpoint"}}, + name: "one pod address from the candidate pods matches req header address", + input: []backendmetrics.PodMetrics{ + &backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "nonmatched-endpoint"}}, + &backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "matched-endpoint"}}, }, req: &types.LLMRequest{ Headers: map[string]string{"test-epp-endpoint-selection": "matched-endpoint"}, @@ -99,8 +99,8 @@ func TestSchedule(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - scheduler := NewReqHeaderBasedScheduler(&fakeDataStore{pods: test.input}) - got, err := scheduler.Schedule(context.Background(), test.req) + scheduler := NewReqHeaderBasedScheduler() + got, err := scheduler.Schedule(context.Background(), test.req, types.ToSchedulerPodMetrics(test.input)) if test.err != (err != nil) { t.Errorf("Unexpected error, got %v, want %v", err, test.err) } @@ -111,15 +111,3 @@ func TestSchedule(t *testing.T) { }) } } - -type fakeDataStore struct { - pods []*backendmetrics.FakePodMetrics -} - -func (fds *fakeDataStore) PodGetAll() []backendmetrics.PodMetrics { - pm := make([]backendmetrics.PodMetrics, 0, len(fds.pods)) - for _, pod := range fds.pods { - pm = append(pm, pod) - } - return pm -} diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index 720fc22ae..5bc5ca4cd 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -41,7 +41,7 @@ import ( // Scheduler defines the interface required by the Director for scheduling. type Scheduler interface { - Schedule(ctx context.Context, b *schedulingtypes.LLMRequest) (result *schedulingtypes.SchedulingResult, err error) + Schedule(ctx context.Context, request *schedulingtypes.LLMRequest, candidatePods []schedulingtypes.Pod) (result *schedulingtypes.SchedulingResult, err error) } // SaturationDetector provides a signal indicating whether the backends are considered saturated. @@ -135,7 +135,11 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo } // --- 3. Call Scheduler --- - results, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest) + // Snapshot pod metrics from the datastore to: + // 1. Reduce concurrent access to the datastore. + // 2. Ensure consistent data during the scheduling operation of a request between all scheduling cycles. + candidatePods := schedulingtypes.ToSchedulerPodMetrics(d.datastore.PodGetAll()) + results, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest, candidatePods) if err != nil { return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()} } diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index 00fdc4508..9f16ed117 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -58,7 +58,7 @@ type mockScheduler struct { scheduleErr error } -func (m *mockScheduler) Schedule(ctx context.Context, req *schedulingtypes.LLMRequest) (*schedulingtypes.SchedulingResult, error) { +func (m *mockScheduler) Schedule(_ context.Context, _ *schedulingtypes.LLMRequest, _ []schedulingtypes.Pod) (*schedulingtypes.SchedulingResult, error) { return m.scheduleResults, m.scheduleErr } diff --git a/pkg/epp/scheduling/framework/scheduler_profile.go b/pkg/epp/scheduling/framework/scheduler_profile.go index 4aa1d0b7f..f41a915f0 100644 --- a/pkg/epp/scheduling/framework/scheduler_profile.go +++ b/pkg/epp/scheduling/framework/scheduler_profile.go @@ -106,8 +106,8 @@ func (p *SchedulerProfile) AddPlugins(pluginObjects ...plugins.Plugin) error { // RunCycle runs a SchedulerProfile cycle. In other words, it invokes all the SchedulerProfile plugins in this // order - Filters, Scorers, Picker, PostCyclePlugins. After completing all, it returns the result. -func (p *SchedulerProfile) Run(ctx context.Context, request *types.LLMRequest, cycleState *types.CycleState, podsSnapshot []types.Pod) (*types.ProfileRunResult, error) { - pods := p.runFilterPlugins(ctx, request, cycleState, podsSnapshot) +func (p *SchedulerProfile) Run(ctx context.Context, request *types.LLMRequest, cycleState *types.CycleState, candidatePods []types.Pod) (*types.ProfileRunResult, error) { + pods := p.runFilterPlugins(ctx, request, cycleState, candidatePods) if len(pods) == 0 { return nil, errutil.Error{Code: errutil.Internal, Msg: "no pods available for the given request"} } diff --git a/pkg/epp/scheduling/scheduler.go b/pkg/epp/scheduling/scheduler.go index 965652927..b848b26dc 100644 --- a/pkg/epp/scheduling/scheduler.go +++ b/pkg/epp/scheduling/scheduler.go @@ -39,7 +39,7 @@ type Datastore interface { } // NewScheduler returns a new scheduler with default scheduler plugins configuration. -func NewScheduler(datastore Datastore) *Scheduler { +func NewScheduler() *Scheduler { // When the scheduler is initialized with NewScheduler function, thw below config will be used as default. // it's possible to call NewSchedulerWithConfig to pass a different scheduler config. // For build time plugins changes, it's recommended to call in main.go to NewSchedulerWithConfig. @@ -75,26 +75,24 @@ func NewScheduler(datastore Datastore) *Scheduler { profileHandler := profile.NewSingleProfileHandler() - return NewSchedulerWithConfig(datastore, NewSchedulerConfig(profileHandler, map[string]*framework.SchedulerProfile{"default": defaultProfile})) + return NewSchedulerWithConfig(NewSchedulerConfig(profileHandler, map[string]*framework.SchedulerProfile{"default": defaultProfile})) } // NewSchedulerWithConfig returns a new scheduler with the given scheduler plugins configuration. -func NewSchedulerWithConfig(datastore Datastore, config *SchedulerConfig) *Scheduler { +func NewSchedulerWithConfig(config *SchedulerConfig) *Scheduler { return &Scheduler{ - datastore: datastore, profileHandler: config.profileHandler, profiles: config.profiles, } } type Scheduler struct { - datastore Datastore profileHandler framework.ProfileHandler profiles map[string]*framework.SchedulerProfile } // Schedule finds the target pod based on metrics and the requested lora adapter. -func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest) (*types.SchedulingResult, error) { +func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest, candidatePods []types.Pod) (*types.SchedulingResult, error) { logger := log.FromContext(ctx).WithValues("request", request) loggerDebug := logger.V(logutil.DEBUG) @@ -103,12 +101,6 @@ func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest) (*t metrics.RecordSchedulerE2ELatency(time.Since(scheduleStart)) }() - // Snapshot pod metrics from the datastore to: - // 1. Reduce concurrent access to the datastore. - // 2. Ensure consistent data during the scheduling operation of a request between all scheduling cycles. - podsSnapshot := types.ToSchedulerPodMetrics(s.datastore.PodGetAll()) - loggerDebug.Info(fmt.Sprintf("Scheduling a request, Metrics: %+v", podsSnapshot)) - profileRunResults := map[string]*types.ProfileRunResult{} cycleState := types.NewCycleState() @@ -122,7 +114,7 @@ func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest) (*t for name, profile := range profiles { // run the selected profiles and collect results (current code runs all profiles) - profileRunResult, err := profile.Run(ctx, request, cycleState, podsSnapshot) + profileRunResult, err := profile.Run(ctx, request, cycleState, candidatePods) if err != nil { loggerDebug.Info("failed to run scheduler profile", "profile", name, "error", err.Error()) } diff --git a/pkg/epp/scheduling/scheduler_test.go b/pkg/epp/scheduling/scheduler_test.go index 3fcbcc640..720a6b4f5 100644 --- a/pkg/epp/scheduling/scheduler_test.go +++ b/pkg/epp/scheduling/scheduler_test.go @@ -33,17 +33,17 @@ func TestSchedule(t *testing.T) { tests := []struct { name string req *types.LLMRequest - input []*backendmetrics.FakePodMetrics + input []backendmetrics.PodMetrics wantRes *types.SchedulingResult err bool }{ { - name: "no pods in datastore", + name: "no candidate pods", req: &types.LLMRequest{ TargetModel: "any-model", RequestId: uuid.NewString(), }, - input: []*backendmetrics.FakePodMetrics{}, + input: []backendmetrics.PodMetrics{}, wantRes: nil, err: true, }, @@ -55,8 +55,8 @@ func TestSchedule(t *testing.T) { }, // pod2 will be picked because it has relatively low queue size, with the requested // model being active, and has low KV cache. - input: []*backendmetrics.FakePodMetrics{ - { + input: []backendmetrics.PodMetrics{ + &backendmetrics.FakePodMetrics{ Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}, Metrics: &backendmetrics.MetricsState{ WaitingQueueSize: 0, @@ -68,7 +68,7 @@ func TestSchedule(t *testing.T) { }, }, }, - { + &backendmetrics.FakePodMetrics{ Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}, Metrics: &backendmetrics.MetricsState{ WaitingQueueSize: 3, @@ -80,7 +80,7 @@ func TestSchedule(t *testing.T) { }, }, }, - { + &backendmetrics.FakePodMetrics{ Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}, Metrics: &backendmetrics.MetricsState{ WaitingQueueSize: 10, @@ -119,8 +119,8 @@ func TestSchedule(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - scheduler := NewScheduler(&fakeDataStore{pods: test.input}) - got, err := scheduler.Schedule(context.Background(), test.req) + scheduler := NewScheduler() + got, err := scheduler.Schedule(context.Background(), test.req, types.ToSchedulerPodMetrics(test.input)) if test.err != (err != nil) { t.Errorf("Unexpected error, got %v, want %v", err, test.err) } @@ -131,15 +131,3 @@ func TestSchedule(t *testing.T) { }) } } - -type fakeDataStore struct { - pods []*backendmetrics.FakePodMetrics -} - -func (fds *fakeDataStore) PodGetAll() []backendmetrics.PodMetrics { - pm := make([]backendmetrics.PodMetrics, 0, len(fds.pods)) - for _, pod := range fds.pods { - pm = append(pm, pod) - } - return pm -} diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go index 5f473b4ba..db6252935 100644 --- a/test/integration/epp/hermetic_test.go +++ b/test/integration/epp/hermetic_test.go @@ -925,7 +925,7 @@ func BeforeSuite() func() { // Adjust from defaults serverRunner.PoolNamespacedName = types.NamespacedName{Name: testPoolName, Namespace: testNamespace} serverRunner.Datastore = datastore.NewDatastore(context.Background(), pmf) - scheduler := scheduling.NewScheduler(serverRunner.Datastore) + scheduler := scheduling.NewScheduler() sdConfig := &saturationdetector.Config{ QueueDepthThreshold: saturationdetector.DefaultQueueDepthThreshold,