From 2c8b95bdefd25b66a54856d9ff29e2cdf3000b56 Mon Sep 17 00:00:00 2001 From: Victor Agababov Date: Mon, 8 Jun 2020 23:08:21 -0700 Subject: [PATCH 1/2] Add round robin policy for the cc > 3 Add a round robin policy for the CC > 3. This is the simply one, i.e not the least loaded. I can iterate over that, but the preliminary results look quite good as is. --- pkg/activator/net/lb_policy.go | 32 ++++++++++++++++++++++++++ pkg/activator/net/lb_policy_test.go | 35 +++++++++++++++++++++++++++++ pkg/activator/net/throttler.go | 10 +++++++-- 3 files changed, 75 insertions(+), 2 deletions(-) diff --git a/pkg/activator/net/lb_policy.go b/pkg/activator/net/lb_policy.go index c1d75cd0409f..740f7f0f0f67 100644 --- a/pkg/activator/net/lb_policy.go +++ b/pkg/activator/net/lb_policy.go @@ -21,6 +21,7 @@ package net import ( "context" "math/rand" + "sync" ) // lbPolicy is a functor that selects a target pod from the list, or (noop, nil) if @@ -84,3 +85,34 @@ func firstAvailableLBPolicy(ctx context.Context, targets []*podTracker) (func(), } return noop, nil } + +type roundRobinPolicyT struct { + mu sync.Mutex + idx int +} + +func newRoundRobinPolicy() lbPolicy { + rrp := roundRobinPolicyT{} + return func(ctx context.Context, targets []*podTracker) (func(), *podTracker) { + rrp.mu.Lock() + defer rrp.mu.Unlock() + // The number of trackers might have shrunk, so reset to 0. + l := len(targets) + if rrp.idx >= l { + rrp.idx = 0 + } + + // Now for |targets| elements and check every next one in + // round robin fashion. + for i := 0; i < l; i++ { + p := (rrp.idx + i) % l + if cb, ok := targets[p].Reserve(ctx); ok { + // We want to start with the next index. + rrp.idx = p + 1 + return cb, targets[p] + } + } + // We exhausted all the options... + return noop, nil + } +} diff --git a/pkg/activator/net/lb_policy_test.go b/pkg/activator/net/lb_policy_test.go index 9fd467e55f29..c6a345619bc3 100644 --- a/pkg/activator/net/lb_policy_test.go +++ b/pkg/activator/net/lb_policy_test.go @@ -158,6 +158,38 @@ func TestFirstAvailable(t *testing.T) { }) } +func TestRoundRobin(t *testing.T) { + rrp := newRoundRobinPolicy() + podTrackers := makeTrackers(3, 1) + cb, pt := rrp(context.Background(), podTrackers) + t.Cleanup(cb) + if got, want := pt, podTrackers[0]; got != want { + t.Fatalf("Tracker = %v, want: %v", got, want) + } + cb, pt = rrp(context.Background(), podTrackers) + t.Cleanup(cb) + if got, want := pt, podTrackers[1]; got != want { + t.Fatalf("Tracker = %v, want: %v", got, want) + } + cb, pt = rrp(context.Background(), podTrackers) + if got, want := pt, podTrackers[2]; got != want { + t.Fatalf("Tracker = %v, want: %v", got, want) + } + _, pt = rrp(context.Background(), podTrackers) + t.Cleanup(cb) + if pt != nil { + t.Fatal("Wanted nil, got: ", pt) + } + + // Reset last one. + cb() + cb, pt = rrp(context.Background(), podTrackers) + if got, want := pt, podTrackers[2]; got != want { + t.Fatalf("Tracker = %v, want: %v", got, want) + } + t.Cleanup(cb) +} + func BenchmarkPolicy(b *testing.B) { for _, test := range []struct { name string @@ -171,6 +203,9 @@ func BenchmarkPolicy(b *testing.B) { }, { name: "first-available", policy: firstAvailableLBPolicy, + }, { + name: "round-robin", + policy: newRoundRobinPolicy(), }} { for _, n := range []int{1, 2, 3, 10, 100} { b.Run(fmt.Sprintf("%s-%d-trackers-sequential", test.name, n), func(b *testing.B) { diff --git a/pkg/activator/net/throttler.go b/pkg/activator/net/throttler.go index d086d05c1d91..669daddf21a3 100644 --- a/pkg/activator/net/throttler.go +++ b/pkg/activator/net/throttler.go @@ -163,12 +163,18 @@ func newRevisionThrottler(revID types.NamespacedName, revBreaker breaker lbp lbPolicy ) - if containerConcurrency == 0 { + switch { + case containerConcurrency == 0: revBreaker = newInfiniteBreaker(logger) lbp = randomChoice2Policy - } else { + case containerConcurrency <= 3: + // For very low CC values use first available pod. revBreaker = queue.NewBreaker(breakerParams) lbp = firstAvailableLBPolicy + default: + // Otherwise RR. + revBreaker = queue.NewBreaker(breakerParams) + lbp = newRoundRobinPolicy() } return &revisionThrottler{ revID: revID, From a47214919dadf4a55019e68b5816d38d2f83fb92 Mon Sep 17 00:00:00 2001 From: Victor Agababov Date: Tue, 9 Jun 2020 08:56:43 -0700 Subject: [PATCH 2/2] more tests --- pkg/activator/net/lb_policy.go | 22 +++--- pkg/activator/net/lb_policy_test.go | 100 ++++++++++++++++++++-------- 2 files changed, 82 insertions(+), 40 deletions(-) diff --git a/pkg/activator/net/lb_policy.go b/pkg/activator/net/lb_policy.go index 740f7f0f0f67..d5c810ce5ad8 100644 --- a/pkg/activator/net/lb_policy.go +++ b/pkg/activator/net/lb_policy.go @@ -86,29 +86,27 @@ func firstAvailableLBPolicy(ctx context.Context, targets []*podTracker) (func(), return noop, nil } -type roundRobinPolicyT struct { - mu sync.Mutex - idx int -} - func newRoundRobinPolicy() lbPolicy { - rrp := roundRobinPolicyT{} + var ( + mu sync.Mutex + idx int + ) return func(ctx context.Context, targets []*podTracker) (func(), *podTracker) { - rrp.mu.Lock() - defer rrp.mu.Unlock() + mu.Lock() + defer mu.Unlock() // The number of trackers might have shrunk, so reset to 0. l := len(targets) - if rrp.idx >= l { - rrp.idx = 0 + if idx >= l { + idx = 0 } // Now for |targets| elements and check every next one in // round robin fashion. for i := 0; i < l; i++ { - p := (rrp.idx + i) % l + p := (idx + i) % l if cb, ok := targets[p].Reserve(ctx); ok { // We want to start with the next index. - rrp.idx = p + 1 + idx = p + 1 return cb, targets[p] } } diff --git a/pkg/activator/net/lb_policy_test.go b/pkg/activator/net/lb_policy_test.go index c6a345619bc3..95e53a24bade 100644 --- a/pkg/activator/net/lb_policy_test.go +++ b/pkg/activator/net/lb_policy_test.go @@ -159,35 +159,79 @@ func TestFirstAvailable(t *testing.T) { } func TestRoundRobin(t *testing.T) { - rrp := newRoundRobinPolicy() - podTrackers := makeTrackers(3, 1) - cb, pt := rrp(context.Background(), podTrackers) - t.Cleanup(cb) - if got, want := pt, podTrackers[0]; got != want { - t.Fatalf("Tracker = %v, want: %v", got, want) - } - cb, pt = rrp(context.Background(), podTrackers) - t.Cleanup(cb) - if got, want := pt, podTrackers[1]; got != want { - t.Fatalf("Tracker = %v, want: %v", got, want) - } - cb, pt = rrp(context.Background(), podTrackers) - if got, want := pt, podTrackers[2]; got != want { - t.Fatalf("Tracker = %v, want: %v", got, want) - } - _, pt = rrp(context.Background(), podTrackers) - t.Cleanup(cb) - if pt != nil { - t.Fatal("Wanted nil, got: ", pt) - } + t.Run("with cc=1", func(t *testing.T) { + rrp := newRoundRobinPolicy() + podTrackers := makeTrackers(3, 1) + cb, pt := rrp(context.Background(), podTrackers) + t.Cleanup(cb) + if got, want := pt, podTrackers[0]; got != want { + t.Fatalf("Tracker = %v, want: %v", got, want) + } + cb, pt = rrp(context.Background(), podTrackers) + t.Cleanup(cb) + if got, want := pt, podTrackers[1]; got != want { + t.Fatalf("Tracker = %v, want: %v", got, want) + } + // This will make it use shorter array, jump over and start from 0. + // But it's occupied already, so will fail to acquire. + _, pt = rrp(context.Background(), podTrackers[:1]) + if pt != nil { + t.Fatal("Wanted nil, got: ", pt) + } - // Reset last one. - cb() - cb, pt = rrp(context.Background(), podTrackers) - if got, want := pt, podTrackers[2]; got != want { - t.Fatalf("Tracker = %v, want: %v", got, want) - } - t.Cleanup(cb) + cb, pt = rrp(context.Background(), podTrackers) + if got, want := pt, podTrackers[2]; got != want { + t.Fatalf("Tracker = %v, want: %v", got, want) + } + _, pt = rrp(context.Background(), podTrackers) + if pt != nil { + t.Fatal("Wanted nil, got: ", pt) + } + + // Reset last one. + cb() + cb, pt = rrp(context.Background(), podTrackers) + if got, want := pt, podTrackers[2]; got != want { + t.Fatalf("Tracker = %v, want: %v", got, want) + } + t.Cleanup(cb) + }) + t.Run("with cc=2", func(t *testing.T) { + rrp := newRoundRobinPolicy() + podTrackers := makeTrackers(3, 2) + cb, pt := rrp(context.Background(), podTrackers) + t.Cleanup(cb) + if got, want := pt, podTrackers[0]; got != want { + t.Fatalf("Tracker = %v, want: %v", got, want) + } + cb, pt = rrp(context.Background(), podTrackers) + t.Cleanup(cb) + if got, want := pt, podTrackers[1]; got != want { + t.Fatalf("Tracker = %v, want: %v", got, want) + } + // This will make it use shorter array, jump over and start from 0. + cb, pt = rrp(context.Background(), podTrackers[:1]) + t.Cleanup(cb) + if got, want := pt, podTrackers[0]; got != want { + t.Fatalf("Tracker = %v, want: %v", got, want) + } + cb, pt = rrp(context.Background(), podTrackers) + t.Cleanup(cb) + if got, want := pt, podTrackers[1]; got != want { + t.Fatalf("Tracker = %v, want: %v", got, want) + } + cb, pt = rrp(context.Background(), podTrackers) + t.Cleanup(cb) + if got, want := pt, podTrackers[2]; got != want { + t.Fatalf("Tracker = %v, want: %v", got, want) + } + // Now index 0 has already 2 slots occupied, so is 1, so we should get 2 again. + cb, pt = rrp(context.Background(), podTrackers) + t.Cleanup(cb) + if got, want := pt, podTrackers[2]; got != want { + t.Fatalf("Tracker = %v, want: %v", got, want) + } + }) } func BenchmarkPolicy(b *testing.B) {