Skip to content

Commit 45fd751

Browse files
committed
address code review comments
Signed-off-by: Nir Rozenbaum <[email protected]>
1 parent 4475ac6 commit 45fd751

File tree

6 files changed

+50
-24
lines changed

6 files changed

+50
-24
lines changed

pkg/epp/handlers/server.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
308308
// Handle the err and fire an immediate response.
309309
if err != nil {
310310
logger.V(logutil.DEFAULT).Error(err, "Failed to process request", "request", req)
311-
resp, err := BuildErrResponse(err)
311+
resp, err := buildErrResponse(err)
312312
if err != nil {
313313
return err
314314
}
@@ -389,7 +389,7 @@ func (r *RequestContext) updateStateAndSendIfNeeded(srv extProcPb.ExternalProces
389389
return nil
390390
}
391391

392-
func BuildErrResponse(err error) (*extProcPb.ProcessingResponse, error) {
392+
func buildErrResponse(err error) (*extProcPb.ProcessingResponse, error) {
393393
var resp *extProcPb.ProcessingResponse
394394

395395
switch errutil.CanonicalCode(err) {
@@ -416,6 +416,17 @@ func BuildErrResponse(err error) (*extProcPb.ProcessingResponse, error) {
416416
},
417417
},
418418
}
419+
// This code can be returned by the director when there are no candidate pods available for scheduling the request.
420+
case errutil.ServiceUnavailable:
421+
resp = &extProcPb.ProcessingResponse{
422+
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
423+
ImmediateResponse: &extProcPb.ImmediateResponse{
424+
Status: &envoyTypePb.HttpStatus{
425+
Code: envoyTypePb.StatusCode_ServiceUnavailable,
426+
},
427+
},
428+
},
429+
}
419430
// This code can be returned when users provide an invalid JSON request.
420431
case errutil.BadRequest:
421432
resp = &extProcPb.ProcessingResponse{

pkg/epp/requestcontrol/director.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,9 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
143143
}
144144

145145
// --- 3. Call Scheduler (with the relevant candidate pods) ---
146-
candidatePods, err := d.getCandidatePodsForScheduling(reqCtx.Request.Metadata)
147-
if err != nil {
148-
return reqCtx, errutil.Error{Code: errutil.BadRequest, Msg: fmt.Errorf("failed to find candidate pods: %w", err).Error()}
146+
candidatePods := d.getCandidatePodsForScheduling(ctx, reqCtx.Request.Metadata)
147+
if len(candidatePods) == 0 {
148+
return reqCtx, errutil.Error{Code: errutil.ServiceUnavailable, Msg: "failed to find candidate pods for serving the request"}
149149
}
150150
results, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest, candidatePods)
151151
if err != nil {
@@ -190,18 +190,21 @@ func (d *Director) admitRequest(ctx context.Context, requestCriticality v1alpha2
190190
// Snapshot pod metrics from the datastore to:
191191
// 1. Reduce concurrent access to the datastore.
192192
// 2. Ensure consistent data during the scheduling operation of a request between all scheduling cycles.
193-
func (d *Director) getCandidatePodsForScheduling(requestMetadata map[string]any) ([]schedulingtypes.Pod, error) {
193+
func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMetadata map[string]any) []schedulingtypes.Pod {
194+
loggerTrace := log.FromContext(ctx).V(logutil.TRACE)
195+
194196
subsetMap, found := requestMetadata[subsetHintNamespace].(map[string]any)
195197
if !found {
196-
return schedulingtypes.ToSchedulerPodMetrics(d.datastore.PodGetAll()), nil
198+
return schedulingtypes.ToSchedulerPodMetrics(d.datastore.PodGetAll())
197199
}
198200

199201
// Check if endpoint key is present in the subset map and ensure there is at least one value
200202
endpointSubsetList, found := subsetMap[subsetHintKey].([]any)
201203
if !found {
202-
return schedulingtypes.ToSchedulerPodMetrics(d.datastore.PodGetAll()), nil
204+
return schedulingtypes.ToSchedulerPodMetrics(d.datastore.PodGetAll())
203205
} else if len(endpointSubsetList) == 0 {
204-
return nil, fmt.Errorf("'%s' metadata cannot be empty", subsetHintKey)
206+
loggerTrace.Info("found empty subset filter in request metadata, filtering all pods")
207+
return []schedulingtypes.Pod{}
205208
}
206209

207210
// Create a map of endpoint addresses for easy lookup
@@ -213,14 +216,18 @@ func (d *Director) getCandidatePodsForScheduling(requestMetadata map[string]any)
213216
endpoints[epStr] = true
214217
}
215218

219+
podTotalCount := 0
216220
podFitleredList := d.datastore.PodList(func(pm backendmetrics.PodMetrics) bool {
221+
podTotalCount++
217222
if _, found := endpoints[pm.GetPod().Address]; found {
218223
return true
219224
}
220225
return false
221226
})
222227

223-
return schedulingtypes.ToSchedulerPodMetrics(podFitleredList), nil
228+
loggerTrace.Info("filtered candidate pods by subset filtering", "podTotalCount", podTotalCount, "filteredCount", len(podFitleredList))
229+
230+
return schedulingtypes.ToSchedulerPodMetrics(podFitleredList)
224231
}
225232

226233
// prepareRequest populates the RequestContext and calls the registered PreRequest plugins

pkg/epp/requestcontrol/director_test.go

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,6 @@ func TestGetCandidatePodsForScheduling(t *testing.T) {
442442
name string
443443
metadata map[string]any
444444
output []schedulingtypes.Pod
445-
wantErr bool
446445
}{
447446
{
448447
name: "SubsetFilter, filter not present — return all pods",
@@ -475,8 +474,7 @@ func TestGetCandidatePodsForScheduling(t *testing.T) {
475474
{
476475
name: "SubsetFilter, filter present with empty list — return error",
477476
metadata: makeFilterMetadata([]any{}),
478-
output: nil,
479-
wantErr: true,
477+
output: []schedulingtypes.Pod{},
480478
},
481479
{
482480
name: "SubsetFilter, subset with one matching pod",
@@ -519,15 +517,7 @@ func TestGetCandidatePodsForScheduling(t *testing.T) {
519517
t.Run(test.name, func(t *testing.T) {
520518
director := NewDirectorWithConfig(ds, &mockScheduler{}, &mockSaturationDetector{}, NewConfig())
521519

522-
got, err := director.getCandidatePodsForScheduling(test.metadata)
523-
524-
if test.wantErr && err == nil {
525-
t.Fatalf("expected an error, but didn't receive")
526-
}
527-
528-
if err != nil && !test.wantErr {
529-
t.Fatalf("Unexpected error, got %v", err)
530-
}
520+
got := director.getCandidatePodsForScheduling(context.Background(), test.metadata)
531521

532522
diff := cmp.Diff(test.output, got, cmpopts.SortSlices(func(a, b schedulingtypes.Pod) bool {
533523
return a.GetPod().NamespacedName.String() < b.GetPod().NamespacedName.String()

pkg/epp/util/error/error.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const (
3030
Unknown = "Unknown"
3131
BadRequest = "BadRequest"
3232
Internal = "Internal"
33+
ServiceUnavailable = "ServiceUnavailable"
3334
ModelServerError = "ModelServerError"
3435
BadConfiguration = "BadConfiguration"
3536
InferencePoolResourceExhausted = "InferencePoolResourceExhausted"

pkg/epp/util/error/error_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,14 @@ func TestError_Error(t *testing.T) {
4343
},
4444
want: "inference gateway: Internal - unexpected condition",
4545
},
46+
{
47+
name: "ServiceUnavailable error",
48+
err: Error{
49+
Code: ServiceUnavailable,
50+
Msg: "service unavailable",
51+
},
52+
want: "inference gateway: ServiceUnavailable - service unavailable",
53+
},
4654
{
4755
name: "ModelServerError",
4856
err: Error{
@@ -124,6 +132,14 @@ func TestCanonicalCode(t *testing.T) {
124132
},
125133
want: Internal,
126134
},
135+
{
136+
name: "Error type with ServiceUnavailable code",
137+
err: Error{
138+
Code: ServiceUnavailable,
139+
Msg: "Service unavailable error",
140+
},
141+
want: ServiceUnavailable,
142+
},
127143
{
128144
name: "Error type with ModelServerError code",
129145
err: Error{
@@ -205,6 +221,7 @@ func TestErrorConstants(t *testing.T) {
205221
Unknown: "Unknown",
206222
BadRequest: "BadRequest",
207223
Internal: "Internal",
224+
ServiceUnavailable: "ServiceUnavailable",
208225
ModelServerError: "ModelServerError",
209226
BadConfiguration: "BadConfiguration",
210227
InferencePoolResourceExhausted: "InferencePoolResourceExhausted",

test/integration/epp/hermetic_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -821,9 +821,9 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
821821
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
822822
ImmediateResponse: &extProcPb.ImmediateResponse{
823823
Status: &envoyTypePb.HttpStatus{
824-
Code: envoyTypePb.StatusCode_TooManyRequests,
824+
Code: envoyTypePb.StatusCode_ServiceUnavailable,
825825
},
826-
Body: []byte("inference gateway: InferencePoolResourceExhausted - failed to find target pod: failed to run scheduler profile 'default'"),
826+
Body: []byte("inference gateway: ServiceUnavailable - failed to find candidate pods for serving the request"),
827827
},
828828
},
829829
},

0 commit comments

Comments
 (0)