implemented and tested static stride scheduler for weighted round robin load balancing policy #10272
go/static-stride-scheduler
xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
static final class StaticStrideScheduler {
  private Vector<Long> scaledWeights;
  private int sizeDivisor;
  private long sequence;
This should be an AtomicInteger. If you haven't seen https://github.com/grpc/proposal/blob/master/A58-client-side-weighted-round-robin-lb-policy.md#earliest-deadline-first-edf-scheduler , there are a few things there. We don't have to follow the identical approach, but we should avoid doing things differently when it is unnecessary.
lambda () -> uint32 next_seq_fn is "a function you call to get an int32." The equivalent of that in Java is AtomicInteger.getAndIncrement(). One important part of that function in the design is that it is shared across scheduler instances. If you re-create the scheduler for the same input data, you get the same results, so the code doesn't have to worry about poor weighting if it is re-created frequently. Since we didn't have that sort of state previously, the old scheduler had to randomize itself.
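To illustrate the point about sharing the sequence across scheduler instances, here is a minimal sketch. The class names and the plain round-robin pick are hypothetical and are not the code in this PR; the only part taken from the comment is that the AtomicInteger is owned outside the scheduler and passed in.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: the scheduler holds no mutable state of its own.
final class SharedSequenceDemo {
  static final class RoundRobinScheduler {
    private final int size;
    private final AtomicInteger sequence; // shared, owned by the caller

    RoundRobinScheduler(int size, AtomicInteger sequence) {
      this.size = size;
      this.sequence = sequence;
    }

    int pick() {
      // toUnsignedLong keeps the index non-negative after the int wraps around.
      return (int) (Integer.toUnsignedLong(sequence.getAndIncrement()) % size);
    }
  }

  public static void main(String[] args) {
    AtomicInteger sequence = new AtomicInteger();
    RoundRobinScheduler first = new RoundRobinScheduler(3, sequence);
    int a = first.pick();
    int b = first.pick();
    // Re-create the scheduler for the same input; the rotation simply continues.
    RoundRobinScheduler second = new RoundRobinScheduler(3, sequence);
    int c = second.pick();
    System.out.println(a + " " + b + " " + c); // prints "0 1 2"
  }
}
```

Because the counter outlives the scheduler, frequent re-creation does not restart the rotation at index 0, which is why no randomization is needed.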
xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
}

private int nextSequence() {
  return this.sequence.getAndUpdate(seq -> ((seq + 1) % UINT32_MAX));
After the seq reaches Integer.MAX_VALUE, it rolls over and this becomes -1. The seq should be non-negative.
The staticScheduler lives in the picker, and there might be multiple picker instances in the system, so ideally the seq should be global across all the pickers. Alternatively, we need to randomize the weight at static scheduler construction time.
Should be fixed.
Since you are storing 32 bits, instead of AtomicLong, use AtomicInteger. Then, you just need to cast up to long as you read it out. That can be done with Integer.toUnsignedLong() or seq & UINT32_MAX. Masking instead of modulus makes it easier to avoid negative results. Then, instead of getAndUpdate(), just use getAndIncrement(). So this becomes:
private final AtomicInteger sequence;
private long nextSequence() {
  return Integer.toUnsignedLong(sequence.getAndIncrement());
}
Note that I am paying attention to & UINT32_MAX being different from % UINT32_MAX. But we don't care about the difference here.
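For readers wondering how & UINT32_MAX and % UINT32_MAX differ, a small sketch follows. It assumes UINT32_MAX is the long constant 0xFFFFFFFF; the class and method names are made up for the illustration.

```java
// Hypothetical helper comparing the two ways of narrowing a long to 32 bits.
final class MaskVersusModulus {
  static final long UINT32_MAX = 0xFFFF_FFFFL; // assumed value, 2^32 - 1

  // Keeps the low 32 bits; the result is always in [0, 2^32 - 1].
  static long masked(long value) {
    return value & UINT32_MAX;
  }

  // Remainder keeps the sign of the dividend and maps UINT32_MAX itself to 0.
  static long modded(long value) {
    return value % UINT32_MAX;
  }

  public static void main(String[] args) {
    System.out.println(masked(-1L));        // 4294967295
    System.out.println(modded(-1L));        // -1
    System.out.println(masked(UINT32_MAX)); // 4294967295
    System.out.println(modded(UINT32_MAX)); // 0
  }
}
```

The mask never produces a negative result, whereas the modulus does for negative inputs and also has a range that is one value smaller.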
I was under the impression that AtomicInteger was signed, which would mean that it cannot support the max value for an unsigned 32 bit integer. Is this not the case?
It still has 32 bits and addition (the increment) is the same for signed and unsigned numbers. So the bits are what we'd hope they would be, but Java just won't treat them right in math. When casting to a larger integer size, the only difference between signed and unsigned is whether you sign extend: whether you fill the new bits on the left with 0s or 1s. For unsigned, you always use 0s. For signed, you copy the sign bit to the upper bits. So if you want unsigned conversion but only have signed, you do a signed conversion and then force the top bits to be 0s with the bitwise AND.
We could use AtomicLong and mask to keep only the lower 32 bits, but if we are masking the bits then AtomicInteger is just as good.
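The wraparound behaviour described above can be checked with a short sketch. The class name is hypothetical; AtomicInteger.getAndIncrement() and Integer.toUnsignedLong() are standard JDK methods.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo of reading a signed 32-bit counter as an unsigned value.
final class UnsignedSequenceDemo {
  static long nextSequence(AtomicInteger sequence) {
    // The increment wraps in 32 bits; toUnsignedLong zero-extends instead of sign-extending.
    return Integer.toUnsignedLong(sequence.getAndIncrement());
  }

  public static void main(String[] args) {
    AtomicInteger sequence = new AtomicInteger(Integer.MAX_VALUE);
    System.out.println(nextSequence(sequence)); // 2147483647
    System.out.println(nextSequence(sequence)); // 2147483648, raw int is Integer.MIN_VALUE

    int raw = -1;                            // all 32 bits set
    System.out.println((long) raw);          // -1, the sign bit was copied upward
    System.out.println(raw & 0xFFFF_FFFFL);  // 4294967295, upper bits forced to 0
  }
}
```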
a232101 to
347b46f
Compare
xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
// scales weights s.t. max(weights) == K_MAX_WEIGHT, meanWeight is scaled accordingly
int[] scaledWeights = new int[numChannels];
for (int i = 0; i < numChannels; i++) {
  if (weights[i] < 0.0001) {
Similarly, this should be compared to 0.
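A rough sketch of the scaling step with the comparison against 0 is shown below. The diff only shows that weights are scaled so that the maximum equals K_MAX_WEIGHT; replacing non-positive weights with the scaled mean, the rounding, and the floor of 1 are all assumptions made for this illustration, as are the class and method names.

```java
import java.util.Arrays;

// Hypothetical sketch; not the code under review.
final class WeightScalingSketch {
  static final int K_MAX_WEIGHT = 0xFFFF;

  static int[] scale(float[] weights) {
    double sum = 0;
    double max = 0;
    int positive = 0;
    for (float w : weights) {
      if (w > 0) { // compare to 0 rather than to a small epsilon
        sum += w;
        max = Math.max(max, w);
        positive++;
      }
    }
    int[] scaled = new int[weights.length];
    if (positive == 0) {
      // Assumption: with no usable weight, treat all backends equally.
      Arrays.fill(scaled, K_MAX_WEIGHT);
      return scaled;
    }
    double factor = K_MAX_WEIGHT / max; // maps the largest weight to K_MAX_WEIGHT
    int scaledMean = (int) Math.round(factor * sum / positive);
    for (int i = 0; i < weights.length; i++) {
      // Assumption: unusable weights fall back to the mean of the usable ones.
      scaled[i] = weights[i] <= 0
          ? scaledMean
          : Math.max(1, (int) Math.round(weights[i] * factor));
    }
    return scaled;
  }
}
```

For example, scale(new float[] {0.5f, 0.25f, 0f}) maps the largest weight to 0xFFFF and gives the zero-weight entry a value between the other two.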
xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
private final AtomicInteger sequence;
private static final int K_MAX_WEIGHT = 0xFFFF;

StaticStrideScheduler(float[] weights, Random random) {
We can do it in a later PR (by you or someone else), but we will want to replace this Random with AtomicInteger sequence. The only reason to have the random now is because we couldn't carry state between re-creations of the scheduler. But the only mutable state now is the integer, so we can follow the gRFC.
 * in which each object is chosen periodically with frequency proportional to its weight.
 * <p>
 * Specifically, each backend is given a deadline equal to the multiplicative inverse of
 * its weight. The place of each backend in its deadline is tracked, and each call to
This (L335-L337) describes the priority queue implementation of EDF, which is no longer the case here, but it reads as though it describes the current implementation.
YifeiZhuang
left a comment
 * For example, if items A and B are added with weights 0.5 and 0.2, successive chooses return:
/*
 * The Static Stride Scheduler is an implementation of an earliest deadline first (EDF) scheduler
 * in which each object is chosen periodically with frequency proportional to its weight.
I see why some of that stuff was removed, but the comment now has the same problem an earlier version of this code has: it name-drops EDF without actually saying how it is mapped to a scheduling problem. Let's just replace this line with "in which each object's deadline is the multiplicative inverse of the object's weight." I think that is the most important part of the mapping.
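To make the mapping concrete, here is a minimal sketch of a priority queue EDF scheduler in which each object's deadline advances by the multiplicative inverse of its weight. It is an illustration with hypothetical names, not the EdfScheduler from the code base, and it is deliberately not thread-safe.

```java
import java.util.PriorityQueue;

// Hypothetical EDF sketch: the entry with the earliest deadline is picked next.
final class EdfSketch {
  private static final class Entry implements Comparable<Entry> {
    final String name;
    final double period; // 1 / weight
    double deadline;

    Entry(String name, double weight) {
      this.name = name;
      this.period = 1.0 / weight;
      this.deadline = this.period;
    }

    @Override
    public int compareTo(Entry other) {
      return Double.compare(deadline, other.deadline);
    }
  }

  private final PriorityQueue<Entry> queue = new PriorityQueue<>();

  void add(String name, double weight) {
    queue.add(new Entry(name, weight));
  }

  String pick() {
    Entry next = queue.poll();    // earliest deadline first
    next.deadline += next.period; // schedule its next turn
    queue.add(next);
    return next.name;
  }

  // Counts how often the given name is returned over a number of picks.
  int countPicks(String name, int picks) {
    int count = 0;
    for (int i = 0; i < picks; i++) {
      if (pick().equals(name)) {
        count++;
      }
    }
    return count;
  }
}
```

With A at weight 0.5 and B at weight 0.2, the periods are 2 and 5, so A is picked 2.5 times as often as B. Every pick mutates the queue, which is why a concurrent version of this design needs a lock.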

/**
 * Picks the next WRR object.
long getSequence() {
Annotate with com.google.common.annotations.VisibleForTesting.
The core of the client-side Weighted Round Robin load balancer policy is a stride scheduler, originally implemented by an EDFScheduler. However, the mutex lock required by the EDFScheduler has been a frequent source of thread contention at high request rates and a blocker for other cost-saving efforts.
The Static Stride Scheduler generates a sequence of picks that is practically equivalent to that of the current EDFScheduler. It removes the need for a priority queue (and thus a lock) and improves latency at high request rates.
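As a rough illustration of how picks can be generated without a priority queue or lock, here is a sketch of a stride-style pick loop in the spirit of gRFC A58. It is not the code in this PR: the class name, the offset of K_MAX_WEIGHT / 2 per backend, and the precondition that weights are already scaled to [1, K_MAX_WEIGHT] with at least one equal to K_MAX_WEIGHT are assumptions for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a lock-free weighted pick; the only mutable state is the counter.
final class StaticStrideSketch {
  static final int K_MAX_WEIGHT = 0xFFFF;

  private final int[] scaledWeights; // assumed in [1, K_MAX_WEIGHT], max == K_MAX_WEIGHT
  private final AtomicInteger sequence;

  StaticStrideSketch(int[] scaledWeights, AtomicInteger sequence) {
    this.scaledWeights = scaledWeights.clone();
    this.sequence = sequence;
  }

  int pick() {
    while (true) {
      long seq = Integer.toUnsignedLong(sequence.getAndIncrement());
      int index = (int) (seq % scaledWeights.length);
      long generation = seq / scaledWeights.length;
      int weight = scaledWeights[index];
      // Assumed per-backend offset so that equal weights do not skip in lockstep.
      long offset = (long) (K_MAX_WEIGHT / 2) * index;
      // A backend is skipped in roughly (1 - weight / K_MAX_WEIGHT) of the generations.
      if ((weight * generation + offset) % K_MAX_WEIGHT < K_MAX_WEIGHT - weight) {
        continue;
      }
      return index;
    }
  }
}
```

The backend whose weight equals K_MAX_WEIGHT is never skipped, so the loop always terminates. With weights {0xFFFF, 0x8000}, the second backend is picked about half as often as the first.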
Weighted Round Robin LB Policy