Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions pkg/activator/net/lb_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package net
import (
"context"
"math/rand"
"sync"
)

// lbPolicy is a functor that selects a target pod from the list, or (noop, nil) if
Expand Down Expand Up @@ -84,3 +85,34 @@ func firstAvailableLBPolicy(ctx context.Context, targets []*podTracker) (func(),
}
return noop, nil
}

// roundRobinPolicyT holds the state that the round-robin load balancing
// policy keeps between invocations.
type roundRobinPolicyT struct {
// mu guards idx; the policy closure holds it for the whole selection.
mu sync.Mutex
// idx is the position in the targets slice at which the next selection
// starts its scan. It is set to one past the most recently reserved tracker.
idx int
}
Comment thread
vagababov marked this conversation as resolved.
Outdated

func newRoundRobinPolicy() lbPolicy {
rrp := roundRobinPolicyT{}
return func(ctx context.Context, targets []*podTracker) (func(), *podTracker) {
rrp.mu.Lock()
defer rrp.mu.Unlock()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get away without locking? We could try to get away using atomics potentially, though the benchmarks don't really warrant that I suppose.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried, but I was not happy with the provable semantics for parallel requests.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough, we can iterate if necessary 🤷

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that we start moving indices either independently or in an interleaving fashion. In theory, with enough requests it will still average out, but it's much harder to reason about.

// The number of trackers might have shrunk, so reset to 0.
Comment thread
vagababov marked this conversation as resolved.
l := len(targets)
if rrp.idx >= l {
rrp.idx = 0
}

// Now iterate over |targets| elements and check each next one in
// round-robin fashion.
for i := 0; i < l; i++ {
p := (rrp.idx + i) % l
if cb, ok := targets[p].Reserve(ctx); ok {
// We want to start with the next index.
rrp.idx = p + 1
return cb, targets[p]
}
}
// We exhausted all the options...
return noop, nil
}
}
35 changes: 35 additions & 0 deletions pkg/activator/net/lb_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,38 @@ func TestFirstAvailable(t *testing.T) {
})
}

func TestRoundRobin(t *testing.T) {
rrp := newRoundRobinPolicy()
podTrackers := makeTrackers(3, 1)
cb, pt := rrp(context.Background(), podTrackers)
t.Cleanup(cb)
if got, want := pt, podTrackers[0]; got != want {
t.Fatalf("Tracker = %v, want: %v", got, want)
}
cb, pt = rrp(context.Background(), podTrackers)
t.Cleanup(cb)
if got, want := pt, podTrackers[1]; got != want {
t.Fatalf("Tracker = %v, want: %v", got, want)
}
cb, pt = rrp(context.Background(), podTrackers)
if got, want := pt, podTrackers[2]; got != want {
t.Fatalf("Tracker = %v, want: %v", got, want)
}
_, pt = rrp(context.Background(), podTrackers)
t.Cleanup(cb)
if pt != nil {
t.Fatal("Wanted nil, got: ", pt)
}
Comment thread
vagababov marked this conversation as resolved.
Outdated

// Reset last one.
cb()
cb, pt = rrp(context.Background(), podTrackers)
if got, want := pt, podTrackers[2]; got != want {
t.Fatalf("Tracker = %v, want: %v", got, want)
}
t.Cleanup(cb)
}

func BenchmarkPolicy(b *testing.B) {
for _, test := range []struct {
name string
Expand All @@ -171,6 +203,9 @@ func BenchmarkPolicy(b *testing.B) {
}, {
name: "first-available",
policy: firstAvailableLBPolicy,
}, {
name: "round-robin",
policy: newRoundRobinPolicy(),
}} {
for _, n := range []int{1, 2, 3, 10, 100} {
b.Run(fmt.Sprintf("%s-%d-trackers-sequential", test.name, n), func(b *testing.B) {
Expand Down
10 changes: 8 additions & 2 deletions pkg/activator/net/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,18 @@ func newRevisionThrottler(revID types.NamespacedName,
revBreaker breaker
lbp lbPolicy
)
if containerConcurrency == 0 {
switch {
case containerConcurrency == 0:
revBreaker = newInfiniteBreaker(logger)
lbp = randomChoice2Policy
} else {
case containerConcurrency <= 3:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have benchmarks that warrant this switch? I.e. have we tried round robin for the lower CC values?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, the benchmark above shows that full first is better even at the code level. Given that pods might be shared (if we don't divide evenly), in the case of lower CC we prefer to use the pods at the tail less, since they might be shared, causing queueing.

// For very low CC values use first available pod.
revBreaker = queue.NewBreaker(breakerParams)
lbp = firstAvailableLBPolicy
default:
// Otherwise RR.
revBreaker = queue.NewBreaker(breakerParams)
lbp = newRoundRobinPolicy()
}
return &revisionThrottler{
revID: revID,
Expand Down