From 91594c20c7e7e8b2509d7626809c66443178f0a4 Mon Sep 17 00:00:00 2001 From: Victor Agababov Date: Fri, 5 Jun 2020 10:47:33 -0700 Subject: [PATCH 1/2] New lb policies --- pkg/activator/net/lb_policy.go | 33 ++++++++++++++ pkg/activator/net/lb_policy_test.go | 70 +++++++++++++++++++++++++++++ pkg/activator/net/throttler.go | 12 ++++- 3 files changed, 114 insertions(+), 1 deletion(-) diff --git a/pkg/activator/net/lb_policy.go b/pkg/activator/net/lb_policy.go index 3c6025b746d9..e0ce9ee93dd2 100644 --- a/pkg/activator/net/lb_policy.go +++ b/pkg/activator/net/lb_policy.go @@ -36,6 +36,39 @@ func randomLBPolicy(_ context.Context, targets []*podTracker) (func(), *podTrack return noop, targets[rand.Intn(len(targets))] } +// randomChoice2 implementes the Power of 2 choices LB algorithm +func randomChoice2(_ context.Context, targets []*podTracker) (func(), *podTracker) { + // Avoid random if possible. + l := len(targets) + // One tracker = no choice. + if l == 1 { + targets[0].addWeight(1) + return func() { + targets[0].addWeight(-1) + }, targets[0] + } + r1, r2 := 0, 1 + // Two trackers - we know the both contestants, + // otherwise pick 2 random unequal integers. + if l > 2 { + r1, r2 = rand.Intn(l), rand.Intn(l) + for r1 == r2 { + r2 = rand.Intn(l) + } + } + + t1, t2 := targets[r1], targets[r2] + // Possible race here, but this policy is for CC=0, + // so fine. + if t1.getWeight() > t2.getWeight() { + t1, t2 = t2, t1 + } + t1.addWeight(1) + return func() { + t1.addWeight(-1) + }, t1 +} + // firstAvailableLBPolicy is a load balancer policy, that picks the first target // that has capacity to serve the request right now. func firstAvailableLBPolicy(ctx context.Context, targets []*podTracker) (func(), *podTracker) { diff --git a/pkg/activator/net/lb_policy_test.go b/pkg/activator/net/lb_policy_test.go index 638dd3bd4724..45154769eb79 100644 --- a/pkg/activator/net/lb_policy_test.go +++ b/pkg/activator/net/lb_policy_test.go @@ -24,6 +24,76 @@ import ( "knative.dev/serving/pkg/queue" ) +func TestRandomChoice2(t *testing.T) { + t.Run("1 tracker", func(t *testing.T) { + podTrackers := makeTrackers(1, 0) + cb, pt := randomChoice2(nil, podTrackers) + t.Cleanup(cb) + if got, want := pt.dest, podTrackers[0].dest; got != want { + t.Errorf("pt.dest = %s, want: %s", got, want) + } + wantW := int32(1) // to avoid casting on every check. + if got, want := pt.getWeight(), wantW; got != want { + t.Errorf("pt.weight = %d, want: %d", got, want) + } + cb, pt = randomChoice2(nil, podTrackers) + if got, want := pt.dest, podTrackers[0].dest; got != want { + t.Errorf("pt.dest = %s, want: %s", got, want) + } + if got, want := pt.getWeight(), wantW+1; got != want { + t.Errorf("pt.weight = %d, want: %d", got, want) + } + cb() + if got, want := pt.getWeight(), wantW; got != want { + t.Errorf("pt.weight = %d, want: %d", got, want) + } + }) + t.Run("2 trackers", func(t *testing.T) { + podTrackers := makeTrackers(2, 0) + cb, pt := randomChoice2(nil, podTrackers) + t.Cleanup(cb) + wantW := int32(1) // to avoid casting on every check. + if got, want := pt.getWeight(), wantW; got != want { + t.Errorf("pt.weight = %d, want: %d", got, want) + } + // Must return a different one. + cb, pt = randomChoice2(nil, podTrackers) + dest := pt.dest + if got, want := pt.getWeight(), wantW; got != want { + t.Errorf("pt.weight = %d, want: %d", got, want) + } + cb() + // Should return the same one. + cb, pt = randomChoice2(nil, podTrackers) + if got, want := pt.getWeight(), wantW; got != want { + t.Errorf("pt.weight = %d, want: %d", got, want) + } + if got, want := pt.dest, dest; got != want { + t.Errorf("pt.dest = %s, want: %s", got, want) + } + }) + t.Run("3 trackers", func(t *testing.T) { + podTrackers := makeTrackers(3, 0) + cb, pt := randomChoice2(nil, podTrackers) + t.Cleanup(cb) + wantW := int32(1) // to avoid casting on every check. + if got, want := pt.getWeight(), wantW; got != want { + t.Errorf("pt.weight = %d, want: %d", got, want) + } + // Must return a different one. + cb, pt = randomChoice2(nil, podTrackers) + if got, want := pt.getWeight(), wantW; got != want { + t.Errorf("pt.weight = %d, want: %d", got, want) + } + cb() + // Should return same or the other unsued one. + cb, pt = randomChoice2(nil, podTrackers) + if got, want := pt.getWeight(), wantW; got != want { + t.Errorf("pt.weight = %d, want: %d", got, want) + } + }) +} + func TestFirstAvailable(t *testing.T) { t.Run("1 tracker, 1 slot", func(t *testing.T) { podTrackers := []*podTracker{{ diff --git a/pkg/activator/net/throttler.go b/pkg/activator/net/throttler.go index f3dd23fc9dd4..1728c4314a26 100644 --- a/pkg/activator/net/throttler.go +++ b/pkg/activator/net/throttler.go @@ -70,6 +70,16 @@ var ( type podTracker struct { dest string b breaker + // weight is used for LB policy implementations. + weight int32 +} + +func (p *podTracker) addWeight(w int32) { + atomic.AddInt32(&p.weight, w) +} + +func (p *podTracker) getWeight() int32 { + return atomic.LoadInt32(&p.weight) } func (p *podTracker) String() string { @@ -155,7 +165,7 @@ func newRevisionThrottler(revID types.NamespacedName, ) if containerConcurrency == 0 { revBreaker = newInfiniteBreaker(logger) - lbp = randomLBPolicy + lbp = randomChoice2 } else { revBreaker = queue.NewBreaker(breakerParams) lbp = firstAvailableLBPolicy From 3d2acd4b9bcd9816b7d1f5e652a6a47410c21c56 Mon Sep 17 00:00:00 2001 From: Victor Agababov Date: Fri, 5 Jun 2020 17:08:57 -0700 Subject: [PATCH 2/2] fix the nits --- pkg/activator/net/lb_policy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/activator/net/lb_policy.go b/pkg/activator/net/lb_policy.go index e0ce9ee93dd2..c8bd7e1c25c3 100644 --- a/pkg/activator/net/lb_policy.go +++ b/pkg/activator/net/lb_policy.go @@ -36,7 +36,7 @@ func randomLBPolicy(_ context.Context, targets []*podTracker) (func(), *podTrack return noop, targets[rand.Intn(len(targets))] } -// randomChoice2 implementes the Power of 2 choices LB algorithm +// randomChoice2 implements the Power of 2 choices LB algorithm func randomChoice2(_ context.Context, targets []*podTracker) (func(), *podTracker) { // Avoid random if possible. l := len(targets)