Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions pkg/activator/net/lb_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package net
import (
"context"
"math/rand"
"sync"
)

// lbPolicy is a functor that selects a target pod from the list, or (noop, nil) if
Expand Down Expand Up @@ -84,3 +85,32 @@ func firstAvailableLBPolicy(ctx context.Context, targets []*podTracker) (func(),
}
return noop, nil
}

// newRoundRobinPolicy returns an lbPolicy that hands out target pods in
// rotating order, remembering its position between invocations. The
// returned closure is safe for concurrent use: a mutex protects the
// rotating cursor it captures.
func newRoundRobinPolicy() lbPolicy {
	var (
		mux    sync.Mutex
		cursor int
	)
	return func(ctx context.Context, targets []*podTracker) (func(), *podTracker) {
		mux.Lock()
		defer mux.Unlock()
		n := len(targets)
		// The target list may have shrunk since the previous call, so
		// wrap the cursor back to the start when it falls off the end.
		if cursor >= n {
			cursor = 0
		}
		// Probe each of the n targets exactly once, beginning at the
		// cursor and wrapping around, until one can be reserved.
		for off := 0; off < n; off++ {
			candidate := (cursor + off) % n
			if release, ok := targets[candidate].Reserve(ctx); ok {
				// Resume from the element after the one we just picked.
				cursor = candidate + 1
				return release, targets[candidate]
			}
		}
		// Every target is at capacity.
		return noop, nil
	}
}
79 changes: 79 additions & 0 deletions pkg/activator/net/lb_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,82 @@ func TestFirstAvailable(t *testing.T) {
})
}

// TestRoundRobin verifies cursor advancement, wrap-around on a shrunken
// target list, exhaustion, and slot release for the round-robin policy.
func TestRoundRobin(t *testing.T) {
	t.Run("with cc=1", func(t *testing.T) {
		policy := newRoundRobinPolicy()
		trackers := makeTrackers(3, 1)
		release, picked := policy(context.Background(), trackers)
		t.Cleanup(release)
		if got, want := picked, trackers[0]; got != want {
			t.Fatalf("Tracker = %v, want: %v", got, want)
		}
		release, picked = policy(context.Background(), trackers)
		t.Cleanup(release)
		if got, want := picked, trackers[1]; got != want {
			t.Fatalf("Tracker = %v, want: %v", got, want)
		}
		// Handing the policy a shorter slice forces the cursor to wrap to
		// index 0 — but that pod is already reserved, so nothing is acquired.
		_, picked = policy(context.Background(), trackers[:1])
		if picked != nil {
			t.Fatal("Wanted nil, got: ", picked)
		}

		release, picked = policy(context.Background(), trackers)
		if got, want := picked, trackers[2]; got != want {
			t.Fatalf("Tracker = %v, want: %v", got, want)
		}
		_, picked = policy(context.Background(), trackers)
		if picked != nil {
			t.Fatal("Wanted nil, got: ", picked)
		}

		// Release the last reservation; that pod should be pickable again.
		release()
		release, picked = policy(context.Background(), trackers)
		if got, want := picked, trackers[2]; got != want {
			t.Fatalf("Tracker = %v, want: %v", got, want)
		}
		t.Cleanup(release)
	})
	t.Run("with cc=2", func(t *testing.T) {
		policy := newRoundRobinPolicy()
		trackers := makeTrackers(3, 2)
		release, picked := policy(context.Background(), trackers)
		t.Cleanup(release)
		if got, want := picked, trackers[0]; got != want {
			t.Fatalf("Tracker = %v, want: %v", got, want)
		}
		release, picked = policy(context.Background(), trackers)
		t.Cleanup(release)
		if got, want := picked, trackers[1]; got != want {
			t.Fatalf("Tracker = %v, want: %v", got, want)
		}
		// A shorter slice wraps the cursor; with cc=2 pod 0 still has a
		// free slot, so the reservation succeeds this time.
		release, picked = policy(context.Background(), trackers[:1])
		t.Cleanup(release)
		if got, want := picked, trackers[0]; got != want {
			t.Fatalf("Tracker = %v, want: %v", got, want)
		}
		release, picked = policy(context.Background(), trackers)
		t.Cleanup(release)
		if got, want := picked, trackers[1]; got != want {
			t.Fatalf("Tracker = %v, want: %v", got, want)
		}
		release, picked = policy(context.Background(), trackers)
		t.Cleanup(release)
		if got, want := picked, trackers[2]; got != want {
			t.Fatalf("Tracker = %v, want: %v", got, want)
		}
		// Pods 0 and 1 each have both slots taken now, so 2 wins again.
		release, picked = policy(context.Background(), trackers)
		t.Cleanup(release)
		if got, want := picked, trackers[2]; got != want {
			t.Fatalf("Tracker = %v, want: %v", got, want)
		}
	})
}

func BenchmarkPolicy(b *testing.B) {
for _, test := range []struct {
name string
Expand All @@ -171,6 +247,9 @@ func BenchmarkPolicy(b *testing.B) {
}, {
name: "first-available",
policy: firstAvailableLBPolicy,
}, {
name: "round-robin",
policy: newRoundRobinPolicy(),
}} {
for _, n := range []int{1, 2, 3, 10, 100} {
b.Run(fmt.Sprintf("%s-%d-trackers-sequential", test.name, n), func(b *testing.B) {
Expand Down
10 changes: 8 additions & 2 deletions pkg/activator/net/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,18 @@ func newRevisionThrottler(revID types.NamespacedName,
revBreaker breaker
lbp lbPolicy
)
if containerConcurrency == 0 {
switch {
case containerConcurrency == 0:
revBreaker = newInfiniteBreaker(logger)
lbp = randomChoice2Policy
} else {
case containerConcurrency <= 3:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have benchmarks that warrant this switch? I.e. have we tried round robin for the lower CC values?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, the benchmark above shows that first-available is better even at the code level. Given that pods might be shared (if we don't divide evenly), for lower CC values we prefer to use the pods at the tail less, since they might be shared, which would cause queueing.

// For very low CC values use first available pod.
revBreaker = queue.NewBreaker(breakerParams)
lbp = firstAvailableLBPolicy
default:
// Otherwise RR.
revBreaker = queue.NewBreaker(breakerParams)
lbp = newRoundRobinPolicy()
}
return &revisionThrottler{
revID: revID,
Expand Down