diff --git a/pkg/activator/net/lb_policy.go b/pkg/activator/net/lb_policy.go index c1d75cd0409f..d5c810ce5ad8 100644 --- a/pkg/activator/net/lb_policy.go +++ b/pkg/activator/net/lb_policy.go @@ -21,6 +21,7 @@ package net import ( "context" "math/rand" + "sync" ) // lbPolicy is a functor that selects a target pod from the list, or (noop, nil) if @@ -84,3 +85,32 @@ func firstAvailableLBPolicy(ctx context.Context, targets []*podTracker) (func(), } return noop, nil } + +func newRoundRobinPolicy() lbPolicy { + var ( + mu sync.Mutex + idx int + ) + return func(ctx context.Context, targets []*podTracker) (func(), *podTracker) { + mu.Lock() + defer mu.Unlock() + // The number of trackers might have shrunk, so reset to 0. + l := len(targets) + if idx >= l { + idx = 0 + } + + // Now for |targets| elements and check every next one in + // round robin fashion. + for i := 0; i < l; i++ { + p := (idx + i) % l + if cb, ok := targets[p].Reserve(ctx); ok { + // We want to start with the next index. + idx = p + 1 + return cb, targets[p] + } + } + // We exhausted all the options... + return noop, nil + } +} diff --git a/pkg/activator/net/lb_policy_test.go b/pkg/activator/net/lb_policy_test.go index 9fd467e55f29..95e53a24bade 100644 --- a/pkg/activator/net/lb_policy_test.go +++ b/pkg/activator/net/lb_policy_test.go @@ -158,6 +158,82 @@ func TestFirstAvailable(t *testing.T) { }) } +func TestRoundRobin(t *testing.T) { + t.Run("with cc=1", func(t *testing.T) { + rrp := newRoundRobinPolicy() + podTrackers := makeTrackers(3, 1) + cb, pt := rrp(context.Background(), podTrackers) + t.Cleanup(cb) + if got, want := pt, podTrackers[0]; got != want { + t.Fatalf("Tracker = %v, want: %v", got, want) + } + cb, pt = rrp(context.Background(), podTrackers) + t.Cleanup(cb) + if got, want := pt, podTrackers[1]; got != want { + t.Fatalf("Tracker = %v, want: %v", got, want) + } + // This will make it use shorter array, jump over and start from 0. + // But it's occupied already, so will fail to acquire. + _, pt = rrp(context.Background(), podTrackers[:1]) + if pt != nil { + t.Fatal("Wanted nil, got: ", pt) + } + + cb, pt = rrp(context.Background(), podTrackers) + if got, want := pt, podTrackers[2]; got != want { + t.Fatalf("Tracker = %v, want: %v", got, want) + } + _, pt = rrp(context.Background(), podTrackers) + if pt != nil { + t.Fatal("Wanted nil, got: ", pt) + } + + // Reset last one. + cb() + cb, pt = rrp(context.Background(), podTrackers) + if got, want := pt, podTrackers[2]; got != want { + t.Fatalf("Tracker = %v, want: %v", got, want) + } + t.Cleanup(cb) + }) + t.Run("with cc=2", func(t *testing.T) { + rrp := newRoundRobinPolicy() + podTrackers := makeTrackers(3, 2) + cb, pt := rrp(context.Background(), podTrackers) + t.Cleanup(cb) + if got, want := pt, podTrackers[0]; got != want { + t.Fatalf("Tracker = %v, want: %v", got, want) + } + cb, pt = rrp(context.Background(), podTrackers) + t.Cleanup(cb) + if got, want := pt, podTrackers[1]; got != want { + t.Fatalf("Tracker = %v, want: %v", got, want) + } + // This will make it use shorter array, jump over and start from 0. + cb, pt = rrp(context.Background(), podTrackers[:1]) + t.Cleanup(cb) + if got, want := pt, podTrackers[0]; got != want { + t.Fatalf("Tracker = %v, want: %v", got, want) + } + cb, pt = rrp(context.Background(), podTrackers) + t.Cleanup(cb) + if got, want := pt, podTrackers[1]; got != want { + t.Fatalf("Tracker = %v, want: %v", got, want) + } + cb, pt = rrp(context.Background(), podTrackers) + t.Cleanup(cb) + if got, want := pt, podTrackers[2]; got != want { + t.Fatalf("Tracker = %v, want: %v", got, want) + } + // Now index 0 has already 2 slots occupied, so is 1, so we should get 2 again. + cb, pt = rrp(context.Background(), podTrackers) + t.Cleanup(cb) + if got, want := pt, podTrackers[2]; got != want { + t.Fatalf("Tracker = %v, want: %v", got, want) + } + }) +} + func BenchmarkPolicy(b *testing.B) { for _, test := range []struct { name string @@ -171,6 +247,9 @@ func BenchmarkPolicy(b *testing.B) { }, { name: "first-available", policy: firstAvailableLBPolicy, + }, { + name: "round-robin", + policy: newRoundRobinPolicy(), }} { for _, n := range []int{1, 2, 3, 10, 100} { b.Run(fmt.Sprintf("%s-%d-trackers-sequential", test.name, n), func(b *testing.B) { diff --git a/pkg/activator/net/throttler.go b/pkg/activator/net/throttler.go index d086d05c1d91..669daddf21a3 100644 --- a/pkg/activator/net/throttler.go +++ b/pkg/activator/net/throttler.go @@ -163,12 +163,18 @@ func newRevisionThrottler(revID types.NamespacedName, revBreaker breaker lbp lbPolicy ) - if containerConcurrency == 0 { + switch { + case containerConcurrency == 0: revBreaker = newInfiniteBreaker(logger) lbp = randomChoice2Policy - } else { + case containerConcurrency <= 3: + // For very low CC values use first available pod. revBreaker = queue.NewBreaker(breakerParams) lbp = firstAvailableLBPolicy + default: + // Otherwise RR. + revBreaker = queue.NewBreaker(breakerParams) + lbp = newRoundRobinPolicy() } return &revisionThrottler{ revID: revID,