Skip to content

Commit 57945b3

Browse files
committed
complete coding. should fix tests
Signed-off-by: Nir Rozenbaum <[email protected]>
1 parent b08d15d commit 57945b3

File tree

9 files changed

+42
-30
lines changed

9 files changed

+42
-30
lines changed

cmd/epp/runner/runner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ func (r *Runner) initializeScheduler() (*scheduling.Scheduler, error) {
306306
schedulerProfile := framework.NewSchedulerProfile().
307307
WithScorers(framework.NewWeightedScorer(scorer.NewQueueScorer(), queueScorerWeight),
308308
framework.NewWeightedScorer(scorer.NewKVCacheScorer(), kvCacheScorerWeight)).
309-
WithPicker(picker.NewMaxScorePicker())
309+
WithPicker(picker.NewMaxScorePicker(1))
310310

311311
if prefixCacheScheduling {
312312
prefixScorerWeight := envutil.GetEnvInt("PREFIX_CACHE_SCORE_WEIGHT", prefix.DefaultScorerWeight, setupLog)

conformance/testing-epp/scheduler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030
func NewReqHeaderBasedScheduler() *scheduling.Scheduler {
3131
predicatableSchedulerProfile := framework.NewSchedulerProfile().
3232
WithFilters(filter.NewHeaderBasedTestingFilter()).
33-
WithPicker(picker.NewMaxScorePicker())
33+
WithPicker(picker.NewMaxScorePicker(1))
3434

3535
return scheduling.NewSchedulerWithConfig(scheduling.NewSchedulerConfig(
3636
profile.NewSingleProfileHandler(), map[string]*framework.SchedulerProfile{"req-header-based-profile": predicatableSchedulerProfile}))

conformance/testing-epp/sheduler_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,13 @@ func TestSchedule(t *testing.T) {
8282
wantRes: &types.SchedulingResult{
8383
ProfileResults: map[string]*types.ProfileRunResult{
8484
"req-header-based-profile": {
85-
TargetPod: &types.ScoredPod{
86-
Pod: &types.PodMetrics{
87-
Pod: &backend.Pod{
88-
Address: "matched-endpoint",
89-
Labels: map[string]string{},
85+
TargetPods: []types.Pod{
86+
&types.ScoredPod{
87+
Pod: &types.PodMetrics{
88+
Pod: &backend.Pod{
89+
Address: "matched-endpoint",
90+
Labels: map[string]string{},
91+
},
9092
},
9193
},
9294
},

pkg/epp/handlers/request.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (s *StreamingServer) HandleRequestHeaders(ctx context.Context, reqCtx *Requ
4444
if err != nil {
4545
return err
4646
}
47-
reqCtx.TargetEndpoint = pod.Address + ":" + strconv.Itoa(int(pool.Spec.TargetPortNumber))
47+
reqCtx.TargetDestination = pod.Address + ":" + strconv.Itoa(int(pool.Spec.TargetPortNumber))
4848
reqCtx.RequestSize = 0
4949
reqCtx.reqHeaderResp = s.generateRequestHeaderResponse(reqCtx)
5050
return nil
@@ -91,7 +91,7 @@ func (s *StreamingServer) generateRequestHeaderResponse(reqCtx *RequestContext)
9191
},
9292
},
9393
},
94-
DynamicMetadata: s.generateMetadata(reqCtx.TargetEndpoint),
94+
DynamicMetadata: s.generateMetadata(reqCtx.TargetDestination),
9595
}
9696
}
9797

@@ -101,7 +101,7 @@ func (s *StreamingServer) generateHeaders(reqCtx *RequestContext) []*configPb.He
101101
{
102102
Header: &configPb.HeaderValue{
103103
Key: s.destinationEndpointHintKey,
104-
RawValue: []byte(reqCtx.TargetEndpoint),
104+
RawValue: []byte(reqCtx.TargetDestination),
105105
},
106106
},
107107
}

pkg/epp/handlers/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ type StreamingServer struct {
8181
// We should split these apart as this monolithic object exposes too much data to too many layers.
8282
type RequestContext struct {
8383
TargetPod *backend.Pod
84-
TargetEndpoint string
84+
TargetDestination string
8585
Model string
8686
ResolvedTargetModel string
8787
RequestReceivedTimestamp time.Time

pkg/epp/requestcontrol/director.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"math/rand"
2525
"net"
2626
"strconv"
27+
"strings"
2728
"time"
2829

2930
"github.com/go-logr/logr"
@@ -184,19 +185,26 @@ func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestC
184185
return reqCtx, errutil.Error{Code: errutil.Internal, Msg: "results must be greater than zero"}
185186
}
186187
// primary profile is used to set destination
187-
targetPod := result.ProfileResults[result.PrimaryProfileName].TargetPod.GetPod()
188+
targetPods := result.ProfileResults[result.PrimaryProfileName].TargetPods
188189

189190
pool, err := d.datastore.PoolGet()
190191
if err != nil {
191192
return reqCtx, err
192193
}
193194
targetPort := int(pool.Spec.TargetPortNumber)
194195

195-
endpoint := net.JoinHostPort(targetPod.Address, strconv.Itoa(targetPort))
196-
logger.V(logutil.DEFAULT).Info("Request handled", "model", reqCtx.Model, "targetModel", reqCtx.ResolvedTargetModel, "endpoint", targetPod)
196+
endpoints := make([]string, len(targetPods))
197+
for i, targetPod := range targetPods {
198+
endpoints[i] = net.JoinHostPort(targetPod.GetPod().Address, strconv.Itoa(targetPort))
199+
}
200+
201+
// Join with comma separator
202+
targetDestination := strings.Join(endpoints, ",")
203+
204+
logger.V(logutil.DEFAULT).Info("Request handled", "model", reqCtx.Model, "targetModel", reqCtx.ResolvedTargetModel, "destination", targetDestination)
197205

198-
reqCtx.TargetPod = targetPod
199-
reqCtx.TargetEndpoint = endpoint
206+
reqCtx.TargetPod = targetPods[0].GetPod() // TODO this is optimistic assumption. should update TargetPod only after we know which pod served the request (and before PostResponse)
207+
reqCtx.TargetDestination = targetDestination
200208

201209
d.runPreRequestPlugins(ctx, reqCtx.SchedulingRequest, result, targetPort)
202210

pkg/epp/requestcontrol/director_test.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -129,11 +129,13 @@ func TestDirector_HandleRequest(t *testing.T) {
129129
defaultSuccessfulScheduleResults := &schedulingtypes.SchedulingResult{
130130
ProfileResults: map[string]*schedulingtypes.ProfileRunResult{
131131
"testProfile": {
132-
TargetPod: &schedulingtypes.ScoredPod{
133-
Pod: &schedulingtypes.PodMetrics{
134-
Pod: &backend.Pod{
135-
Address: "192.168.1.100",
136-
NamespacedName: k8stypes.NamespacedName{Name: "pod1", Namespace: "default"},
132+
TargetPods: []schedulingtypes.Pod{
133+
&schedulingtypes.ScoredPod{
134+
Pod: &schedulingtypes.PodMetrics{
135+
Pod: &backend.Pod{
136+
Address: "192.168.1.100",
137+
NamespacedName: k8stypes.NamespacedName{Name: "pod1", Namespace: "default"},
138+
},
137139
},
138140
},
139141
},
@@ -168,7 +170,7 @@ func TestDirector_HandleRequest(t *testing.T) {
168170
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
169171
Address: "192.168.1.100",
170172
},
171-
TargetEndpoint: "192.168.1.100:8000",
173+
TargetDestination: "192.168.1.100:8000",
172174
},
173175
wantMutatedBodyModel: model,
174176
},
@@ -193,7 +195,7 @@ func TestDirector_HandleRequest(t *testing.T) {
193195
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
194196
Address: "192.168.1.100",
195197
},
196-
TargetEndpoint: "192.168.1.100:8000",
198+
TargetDestination: "192.168.1.100:8000",
197199
},
198200
wantMutatedBodyModel: model,
199201
},
@@ -222,7 +224,7 @@ func TestDirector_HandleRequest(t *testing.T) {
222224
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
223225
Address: "192.168.1.100",
224226
},
225-
TargetEndpoint: "192.168.1.100:8000",
227+
TargetDestination: "192.168.1.100:8000",
226228
},
227229
wantMutatedBodyModel: model,
228230
},
@@ -243,7 +245,7 @@ func TestDirector_HandleRequest(t *testing.T) {
243245
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
244246
Address: "192.168.1.100",
245247
},
246-
TargetEndpoint: "192.168.1.100:8000",
248+
TargetDestination: "192.168.1.100:8000",
247249
},
248250
wantMutatedBodyModel: modelSheddable,
249251
},
@@ -264,7 +266,7 @@ func TestDirector_HandleRequest(t *testing.T) {
264266
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
265267
Address: "192.168.1.100",
266268
},
267-
TargetEndpoint: "192.168.1.100:8000",
269+
TargetDestination: "192.168.1.100:8000",
268270
},
269271
wantMutatedBodyModel: "resolved-target-model-A",
270272
},
@@ -280,7 +282,7 @@ func TestDirector_HandleRequest(t *testing.T) {
280282
NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
281283
Address: "192.168.1.100",
282284
},
283-
TargetEndpoint: "192.168.1.100:8000",
285+
TargetDestination: "192.168.1.100:8000",
284286
},
285287
wantMutatedBodyModel: "food-review-1",
286288
reqBodyMap: map[string]interface{}{
@@ -384,7 +386,7 @@ func TestDirector_HandleRequest(t *testing.T) {
384386
assert.Equal(t, test.wantReqCtx.ResolvedTargetModel, returnedReqCtx.ResolvedTargetModel,
385387
"reqCtx.ResolvedTargetModel mismatch")
386388
assert.Equal(t, test.wantReqCtx.TargetPod, returnedReqCtx.TargetPod, "reqCtx.TargetPod mismatch")
387-
assert.Equal(t, test.wantReqCtx.TargetEndpoint, returnedReqCtx.TargetEndpoint, "reqCtx.TargetEndpoint mismatch")
389+
assert.Equal(t, test.wantReqCtx.TargetDestination, returnedReqCtx.TargetDestination, "reqCtx.TargetEndpoint mismatch")
388390
}
389391

390392
if test.wantMutatedBodyModel != "" {

pkg/epp/scheduling/scheduler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func NewScheduler() *Scheduler {
7171

7272
defaultProfile := framework.NewSchedulerProfile().
7373
WithFilters(lowLatencyFilter).
74-
WithPicker(&picker.RandomPicker{})
74+
WithPicker(picker.NewRandomPicker(1))
7575

7676
profileHandler := profile.NewSingleProfileHandler()
7777

pkg/epp/server/server_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ func (ts *testDirector) HandleRequest(ctx context.Context, reqCtx *handlers.Requ
179179
ts.requestHeaders = reqCtx.Request.Headers
180180

181181
reqCtx.Request.Body["model"] = "v1"
182-
reqCtx.TargetEndpoint = fmt.Sprintf("%s:%d", podAddress, poolPort)
182+
reqCtx.TargetDestination = fmt.Sprintf("%s:%d", podAddress, poolPort)
183183
return reqCtx, nil
184184
}
185185

0 commit comments

Comments
 (0)