Skip to content

Commit 58cd3c4

Browse files
committed
implement multiple destination as the output of the scheduler
Signed-off-by: Nir Rozenbaum <[email protected]>
1 parent 3322cf8 commit 58cd3c4

File tree

16 files changed

+161
-79
lines changed

16 files changed

+161
-79
lines changed

cmd/epp/runner/runner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ func (r *Runner) initializeScheduler() (*scheduling.Scheduler, error) {
293293
schedulerProfile := framework.NewSchedulerProfile().
294294
WithScorers(framework.NewWeightedScorer(scorer.NewQueueScorer(), queueScorerWeight),
295295
framework.NewWeightedScorer(scorer.NewKVCacheScorer(), kvCacheScorerWeight)).
296-
WithPicker(picker.NewMaxScorePicker())
296+
WithPicker(picker.NewMaxScorePicker(picker.DefaultMaxNumOfEndpoints))
297297

298298
if prefixCacheScheduling {
299299
prefixScorerWeight := envutil.GetEnvInt("PREFIX_CACHE_SCORE_WEIGHT", prefix.DefaultScorerWeight, setupLog)

conformance/testing-epp/scheduler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030
func NewReqHeaderBasedScheduler() *scheduling.Scheduler {
3131
predicatableSchedulerProfile := framework.NewSchedulerProfile().
3232
WithFilters(filter.NewHeaderBasedTestingFilter()).
33-
WithPicker(picker.NewMaxScorePicker())
33+
WithPicker(picker.NewMaxScorePicker(picker.DefaultMaxNumOfEndpoints))
3434

3535
return scheduling.NewSchedulerWithConfig(scheduling.NewSchedulerConfig(
3636
profile.NewSingleProfileHandler(), map[string]*framework.SchedulerProfile{"req-header-based-profile": predicatableSchedulerProfile}))

conformance/testing-epp/scheduler_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,13 @@ func TestSchedule(t *testing.T) {
8282
wantRes: &types.SchedulingResult{
8383
ProfileResults: map[string]*types.ProfileRunResult{
8484
"req-header-based-profile": {
85-
TargetPod: &types.ScoredPod{
86-
Pod: &types.PodMetrics{
87-
Pod: &backend.Pod{
88-
Address: "matched-endpoint",
89-
Labels: map[string]string{},
85+
TargetPods: []types.Pod{
86+
&types.ScoredPod{
87+
Pod: &types.PodMetrics{
88+
Pod: &backend.Pod{
89+
Address: "matched-endpoint",
90+
Labels: map[string]string{},
91+
},
9092
},
9193
},
9294
},

pkg/epp/requestcontrol/director.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,8 @@ func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestC
238238
return reqCtx, errutil.Error{Code: errutil.Internal, Msg: "results must be greater than zero"}
239239
}
240240
// primary profile is used to set destination
241-
targetPod := result.ProfileResults[result.PrimaryProfileName].TargetPod.GetPod()
241+
// TODO should use multiple destinations according to epp protocol. current code assumes a single target
242+
targetPod := result.ProfileResults[result.PrimaryProfileName].TargetPods[0].GetPod()
242243

243244
pool, err := d.datastore.PoolGet()
244245
if err != nil {

pkg/epp/requestcontrol/director_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,11 +130,13 @@ func TestDirector_HandleRequest(t *testing.T) {
130130
defaultSuccessfulScheduleResults := &schedulingtypes.SchedulingResult{
131131
ProfileResults: map[string]*schedulingtypes.ProfileRunResult{
132132
"testProfile": {
133-
TargetPod: &schedulingtypes.ScoredPod{
134-
Pod: &schedulingtypes.PodMetrics{
135-
Pod: &backend.Pod{
136-
Address: "192.168.1.100",
137-
NamespacedName: k8stypes.NamespacedName{Name: "pod1", Namespace: "default"},
133+
TargetPods: []schedulingtypes.Pod{
134+
&schedulingtypes.ScoredPod{
135+
Pod: &schedulingtypes.PodMetrics{
136+
Pod: &backend.Pod{
137+
Address: "192.168.1.100",
138+
NamespacedName: k8stypes.NamespacedName{Name: "pod1", Namespace: "default"},
139+
},
138140
},
139141
},
140142
},

pkg/epp/scheduling/framework/plugins/filter/lora_affinity_filter.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,12 @@ var _ framework.Filter = &LoraAffinityFilter{}
4343
// LoraAffinityFilterFactory defines the factory function for LoraAffinityFilter.
4444
func LoraAffinityFilterFactory(name string, rawParameters json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
4545
parameters := loraAffinityFilterParameters{Threshold: config.DefaultLoraAffinityThreshold}
46-
if err := json.Unmarshal(rawParameters, &parameters); err != nil {
47-
return nil, fmt.Errorf("failed to parse the parameters of the '%s' filter - %w", LoraAffinityFilterType, err)
46+
if rawParameters != nil {
47+
if err := json.Unmarshal(rawParameters, &parameters); err != nil {
48+
return nil, fmt.Errorf("failed to parse the parameters of the '%s' filter - %w", LoraAffinityFilterType, err)
49+
}
4850
}
51+
4952
return NewLoraAffinityFilter(parameters.Threshold).WithName(name), nil
5053
}
5154

pkg/epp/scheduling/framework/plugins/filter/low_queue_filter.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,10 @@ var _ framework.Filter = &LowQueueFilter{}
4242
// LowQueueFilterFactory defines the factory function for LowQueueFilter.
4343
func LowQueueFilterFactory(name string, rawParameters json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
4444
parameters := lowQueueFilterParameters{Threshold: config.DefaultQueueingThresholdLoRA}
45-
if err := json.Unmarshal(rawParameters, &parameters); err != nil {
46-
return nil, fmt.Errorf("failed to parse the parameters of the '%s' filter - %w", LowQueueFilterType, err)
45+
if rawParameters != nil {
46+
if err := json.Unmarshal(rawParameters, &parameters); err != nil {
47+
return nil, fmt.Errorf("failed to parse the parameters of the '%s' filter - %w", LowQueueFilterType, err)
48+
}
4749
}
4850

4951
return NewLowQueueFilter(parameters.Threshold).WithName(name), nil

pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,10 @@ func PrefixCachePluginFactory(name string, rawParameters json.RawMessage, _ plug
125125
MaxPrefixBlocksToMatch: DefaultMaxPrefixBlocks,
126126
LRUCapacityPerServer: DefaultLRUCapacityPerServer,
127127
}
128-
if err := json.Unmarshal(rawParameters, &parameters); err != nil {
129-
return nil, fmt.Errorf("failed to parse the parameters of the %s plugin. Error: %s", PrefixCachePluginType, err)
128+
if rawParameters != nil {
129+
if err := json.Unmarshal(rawParameters, &parameters); err != nil {
130+
return nil, fmt.Errorf("failed to parse the parameters of the %s plugin. Error: %s", PrefixCachePluginType, err)
131+
}
130132
}
131133

132134
return New(parameters).WithName(name), nil
@@ -198,7 +200,7 @@ func (m *Plugin) Score(ctx context.Context, cycleState *types.CycleState, reques
198200

199201
// PostCycle records in the plugin cache the result of the scheduling selection.
200202
func (m *Plugin) PostCycle(ctx context.Context, cycleState *types.CycleState, res *types.ProfileRunResult) {
201-
targetPod := res.TargetPod.GetPod()
203+
targetPod := res.TargetPods[0].GetPod()
202204
state, err := types.ReadCycleStateKey[*SchedulingContextState](cycleState, PrefixCachePluginType)
203205
if err != nil {
204206
log.FromContext(ctx).Error(err, "failed to read prefix plugin cycle state")

pkg/epp/scheduling/framework/plugins/multi/prefix/plugin_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func TestPrefixPlugin(t *testing.T) {
6161
assert.Equal(t, float64(0), scores[pod2], "score for pod2")
6262

6363
// Simulate pod1 was picked.
64-
plugin.PostCycle(context.Background(), cycleState1, &types.ProfileRunResult{TargetPod: pod1})
64+
plugin.PostCycle(context.Background(), cycleState1, &types.ProfileRunResult{TargetPods: []types.Pod{pod1}})
6565

6666
// Second request doesn't share any prefix with first one. It should be added to the cache but
6767
// the pod score should be 0.
@@ -82,7 +82,7 @@ func TestPrefixPlugin(t *testing.T) {
8282
assert.Equal(t, float64(0), scores[pod2], "score for pod2")
8383

8484
// Simulate pod2 was picked.
85-
plugin.PostCycle(context.Background(), cycleState2, &types.ProfileRunResult{TargetPod: pod2})
85+
plugin.PostCycle(context.Background(), cycleState2, &types.ProfileRunResult{TargetPods: []types.Pod{pod2}})
8686

8787
// Third request shares partial prefix with first one.
8888
req3 := &types.LLMRequest{
@@ -101,7 +101,7 @@ func TestPrefixPlugin(t *testing.T) {
101101
assert.Equal(t, float64(2)/float64(3), scores[pod1], "score should be 2/3 - the model and the first prefix block match")
102102
assert.Equal(t, float64(0), scores[pod2], "score for pod2")
103103

104-
plugin.PostCycle(context.Background(), cycleState3, &types.ProfileRunResult{TargetPod: pod1})
104+
plugin.PostCycle(context.Background(), cycleState3, &types.ProfileRunResult{TargetPods: []types.Pod{pod1}})
105105

106106
// 4th request is same as req3 except the model is different, still no match.
107107
req4 := &types.LLMRequest{
@@ -120,7 +120,7 @@ func TestPrefixPlugin(t *testing.T) {
120120
assert.Equal(t, float64(0), scores[pod1], "score for pod1")
121121
assert.Equal(t, float64(0), scores[pod2], "score for pod2")
122122

123-
plugin.PostCycle(context.Background(), cycleState4, &types.ProfileRunResult{TargetPod: pod1})
123+
plugin.PostCycle(context.Background(), cycleState4, &types.ProfileRunResult{TargetPods: []types.Pod{pod1}})
124124

125125
// 5th request shares partial prefix with 3rd one.
126126
req5 := &types.LLMRequest{
@@ -139,7 +139,7 @@ func TestPrefixPlugin(t *testing.T) {
139139
assert.Equal(t, 0.75, scores[pod1], "score should be 0.75 - the model and the first 2 prefix blocks match")
140140
assert.Equal(t, float64(0), scores[pod2], "score for pod2")
141141

142-
plugin.PostCycle(context.Background(), cycleState5, &types.ProfileRunResult{TargetPod: pod1})
142+
plugin.PostCycle(context.Background(), cycleState5, &types.ProfileRunResult{TargetPods: []types.Pod{pod1}})
143143
}
144144

145145
// TestPrefixPluginStress is a stress test for the prefix scoring plugin, using prompts of increasing length.
@@ -180,7 +180,7 @@ func BenchmarkPrefixPluginStress(b *testing.B) {
180180
// First cycle: simulate scheduling and insert prefix info into the cache
181181
cycleState := types.NewCycleState()
182182
plugin.Score(context.Background(), cycleState, req, pods)
183-
plugin.PostCycle(context.Background(), cycleState, &types.ProfileRunResult{TargetPod: pod})
183+
plugin.PostCycle(context.Background(), cycleState, &types.ProfileRunResult{TargetPods: []types.Pod{pod}})
184184

185185
// Second cycle: validate internal state
186186
state, err := types.ReadCycleStateKey[*SchedulingContextState](cycleState, PrefixCachePluginType)
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package picker
18+
19+
const (
20+
DefaultMaxNumOfEndpoints = 1 // common default to all pickers
21+
)
22+
23+
// pickerParameters defines the common parameters for all pickers
24+
type pickerParameters struct {
25+
MaxNumOfEndpoints int `json:"maxNumOfEndpoints"`
26+
}

0 commit comments

Comments
 (0)