Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions pkg/activator/net/lb_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,39 @@ func randomLBPolicy(_ context.Context, targets []*podTracker) (func(), *podTrack
return noop, targets[rand.Intn(len(targets))]
}

// randomChoice2 implements the Power of 2 choices LB algorithm
func randomChoice2(_ context.Context, targets []*podTracker) (func(), *podTracker) {
// Avoid random if possible.
l := len(targets)
// One tracker = no choice.
if l == 1 {
targets[0].addWeight(1)
return func() {
targets[0].addWeight(-1)
}, targets[0]
}
r1, r2 := 0, 1
// Two trackers - we know the both contestants,
// otherwise pick 2 random unequal integers.
if l > 2 {
r1, r2 = rand.Intn(l), rand.Intn(l)
for r1 == r2 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could probably skip the loop here by not including r1 in the choices for the second pick (ie rand(l), rand(l-1)). Seems fine to do this as a follow-on optimisation tho.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're up late :-)
I want to run this on the official benchmarks to see how it goes. We can try RR next (some variation thereof — least loaded RR).
But yeah we can probably squeeze a bit more from this, but it's gonna be nanoseconds.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeps, sgtm

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(early not late! :D)

r2 = rand.Intn(l)
}
}

t1, t2 := targets[r1], targets[r2]
// Possible race here, but this policy is for CC=0,
// so fine.
if t1.getWeight() > t2.getWeight() {
t1, t2 = t2, t1
}
t1.addWeight(1)
return func() {
t1.addWeight(-1)
}, t1
}

// firstAvailableLBPolicy is a load balancer policy, that picks the first target
// that has capacity to serve the request right now.
func firstAvailableLBPolicy(ctx context.Context, targets []*podTracker) (func(), *podTracker) {
Expand Down
70 changes: 70 additions & 0 deletions pkg/activator/net/lb_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,76 @@ import (
"knative.dev/serving/pkg/queue"
)

func TestRandomChoice2(t *testing.T) {
t.Run("1 tracker", func(t *testing.T) {
podTrackers := makeTrackers(1, 0)
cb, pt := randomChoice2(nil, podTrackers)
t.Cleanup(cb)
if got, want := pt.dest, podTrackers[0].dest; got != want {
t.Errorf("pt.dest = %s, want: %s", got, want)
}
wantW := int32(1) // to avoid casting on every check.
if got, want := pt.getWeight(), wantW; got != want {
t.Errorf("pt.weight = %d, want: %d", got, want)
}
cb, pt = randomChoice2(nil, podTrackers)
if got, want := pt.dest, podTrackers[0].dest; got != want {
t.Errorf("pt.dest = %s, want: %s", got, want)
}
if got, want := pt.getWeight(), wantW+1; got != want {
t.Errorf("pt.weight = %d, want: %d", got, want)
}
cb()
if got, want := pt.getWeight(), wantW; got != want {
t.Errorf("pt.weight = %d, want: %d", got, want)
}
})
t.Run("2 trackers", func(t *testing.T) {
podTrackers := makeTrackers(2, 0)
cb, pt := randomChoice2(nil, podTrackers)
t.Cleanup(cb)
wantW := int32(1) // to avoid casting on every check.
if got, want := pt.getWeight(), wantW; got != want {
t.Errorf("pt.weight = %d, want: %d", got, want)
}
// Must return a different one.
cb, pt = randomChoice2(nil, podTrackers)
dest := pt.dest
if got, want := pt.getWeight(), wantW; got != want {
t.Errorf("pt.weight = %d, want: %d", got, want)
}
cb()
// Should return the same one.
cb, pt = randomChoice2(nil, podTrackers)
if got, want := pt.getWeight(), wantW; got != want {
t.Errorf("pt.weight = %d, want: %d", got, want)
}
if got, want := pt.dest, dest; got != want {
t.Errorf("pt.dest = %s, want: %s", got, want)
}
})
t.Run("3 trackers", func(t *testing.T) {
podTrackers := makeTrackers(3, 0)
cb, pt := randomChoice2(nil, podTrackers)
t.Cleanup(cb)
wantW := int32(1) // to avoid casting on every check.
if got, want := pt.getWeight(), wantW; got != want {
t.Errorf("pt.weight = %d, want: %d", got, want)
}
// Must return a different one.
cb, pt = randomChoice2(nil, podTrackers)
if got, want := pt.getWeight(), wantW; got != want {
t.Errorf("pt.weight = %d, want: %d", got, want)
}
cb()
// Should return same or the other unsued one.
cb, pt = randomChoice2(nil, podTrackers)
if got, want := pt.getWeight(), wantW; got != want {
t.Errorf("pt.weight = %d, want: %d", got, want)
}
})
}

func TestFirstAvailable(t *testing.T) {
t.Run("1 tracker, 1 slot", func(t *testing.T) {
podTrackers := []*podTracker{{
Expand Down
12 changes: 11 additions & 1 deletion pkg/activator/net/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ var (
type podTracker struct {
dest string
b breaker
// weight is used for LB policy implementations.
weight int32
}

func (p *podTracker) addWeight(w int32) {
atomic.AddInt32(&p.weight, w)
}

func (p *podTracker) getWeight() int32 {
return atomic.LoadInt32(&p.weight)
}

func (p *podTracker) String() string {
Expand Down Expand Up @@ -155,7 +165,7 @@ func newRevisionThrottler(revID types.NamespacedName,
)
if containerConcurrency == 0 {
revBreaker = newInfiniteBreaker(logger)
lbp = randomLBPolicy
lbp = randomChoice2
} else {
revBreaker = queue.NewBreaker(breakerParams)
lbp = firstAvailableLBPolicy
Expand Down