Skip to content
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
b5cf7b0
implemented static stride scheduler class/algorithm (currently unused)
tonyjongyoonan Jun 9, 2023
44a5158
added imports, fixed small errors
tonyjongyoonan Jun 12, 2023
32973d4
changed to array, final class vars, atomic integer
tonyjongyoonan Jun 12, 2023
4bde79d
added static stride scheduler test cases
tonyjongyoonan Jun 12, 2023
d99d51a
fixing test case datatypes
tonyjongyoonan Jun 12, 2023
13a9fd8
added check argument for edge case: no weights inputted
tonyjongyoonan Jun 12, 2023
3d9e625
replaced edf scheduler with static stride scheduler
tonyjongyoonan Jun 12, 2023
2b054d3
added test case
tonyjongyoonan Jun 12, 2023
4addda3
fixed style errors, renamed schedulers
tonyjongyoonan Jun 12, 2023
4a820a7
quick fix
tonyjongyoonan Jun 12, 2023
dc7960a
bug fix attempt
tonyjongyoonan Jun 13, 2023
acd3425
fixed verbose work in updateWeightSS(), float equality
tonyjongyoonan Jun 13, 2023
ba1a0b7
fixed pickChannel() by removing kOffset, fixed atomic integer
tonyjongyoonan Jun 14, 2023
9f4a60d
added example test cases
tonyjongyoonan Jun 14, 2023
e11e542
types changed from long to int
tonyjongyoonan Jun 14, 2023
c76cbe5
added sss test cases
tonyjongyoonan Jun 14, 2023
903d2ac
added more edge cases (negative/zero weights)
tonyjongyoonan Jun 14, 2023
d5a0629
compile fix
tonyjongyoonan Jun 14, 2023
5982115
more than 3 channels test case
tonyjongyoonan Jun 14, 2023
88a8e48
fixed negative case
tonyjongyoonan Jun 15, 2023
f9cae20
end to end test cases
tonyjongyoonan Jun 15, 2023
dba0778
fixed sequence
tonyjongyoonan Jun 15, 2023
46a463e
ident fix
tonyjongyoonan Jun 15, 2023
142b499
replaced deterministic test cases with ones with many iterations, del…
tonyjongyoonan Jun 15, 2023
5523337
fixed sequence, added comments, deleted edf
tonyjongyoonan Jun 15, 2023
365ba8d
adjusted constructor in tests
tonyjongyoonan Jun 15, 2023
442bea8
fixed constructor to remove random
tonyjongyoonan Jun 15, 2023
2a0e489
optimized for scaling, fixed next sequence
tonyjongyoonan Jun 16, 2023
1731862
added random to resolve static class failure
tonyjongyoonan Jun 16, 2023
5e5127f
adjusted test cases to have random, deleted deterministic test cases …
tonyjongyoonan Jun 16, 2023
f0421d2
fixed test case and number to hex
tonyjongyoonan Jun 20, 2023
ec46fe3
fixed sequence, changed long --> int, added comments
tonyjongyoonan Jun 23, 2023
8ddf284
fixed sequence, changed types, added comments
tonyjongyoonan Jun 23, 2023
347b46f
fixing weird commit 1
tonyjongyoonan Jun 23, 2023
4a4762e
more syntax changes
tonyjongyoonan Jun 23, 2023
f526bf6
Delete settings.json
tonyjongyoonan Jun 23, 2023
21ceb85
modified test
tonyjongyoonan Jun 23, 2023
03de3f9
fixed feedback
tonyjongyoonan Jun 26, 2023
e2eb7f9
changed weights to short from int
tonyjongyoonan Jun 27, 2023
4072907
fixed message and short logic
tonyjongyoonan Jun 29, 2023
f749e52
removed obselete comment
tonyjongyoonan Jul 1, 2023
be21c23
changed annotations
tonyjongyoonan Jul 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 84 additions & 107 deletions xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -120,7 +120,7 @@ private final class UpdateWeightTask implements Runnable {
@Override
public void run() {
if (currentPicker != null && currentPicker instanceof WeightedRoundRobinPicker) {
((WeightedRoundRobinPicker)currentPicker).updateWeight();
((WeightedRoundRobinPicker) currentPicker).updateWeight();
}
weightUpdateTimer = syncContext.schedule(this, config.weightUpdatePeriodNanos,
TimeUnit.NANOSECONDS, timeService);
Expand Down Expand Up @@ -258,7 +258,7 @@ final class WeightedRoundRobinPicker extends RoundRobinPicker {
new HashMap<>();
private final boolean enableOobLoadReport;
private final float errorUtilizationPenalty;
private volatile EdfScheduler scheduler;
private volatile StaticStrideScheduler scheduler;

WeightedRoundRobinPicker(List<Subchannel> list, boolean enableOobLoadReport,
float errorUtilizationPenalty) {
Expand All @@ -279,7 +279,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
Subchannel subchannel = list.get(scheduler.pick());
if (!enableOobLoadReport) {
return PickResult.withSubchannel(subchannel,
OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(
OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(
subchannelToReportListenerMap.getOrDefault(subchannel,
((WrrSubchannel) subchannel).new OrcaReportListener(errorUtilizationPenalty))));
} else {
Expand All @@ -288,26 +288,14 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
}

private void updateWeight() {
int weightedChannelCount = 0;
double avgWeight = 0;
for (Subchannel value : list) {
double newWeight = ((WrrSubchannel) value).getWeight();
if (newWeight > 0) {
avgWeight += newWeight;
weightedChannelCount++;
}
}
EdfScheduler scheduler = new EdfScheduler(list.size(), random);
if (weightedChannelCount >= 1) {
avgWeight /= 1.0 * weightedChannelCount;
} else {
avgWeight = 1;
}
float[] newWeights = new float[list.size()];
for (int i = 0; i < list.size(); i++) {
WrrSubchannel subchannel = (WrrSubchannel) list.get(i);
double newWeight = subchannel.getWeight();
scheduler.add(i, newWeight > 0 ? newWeight : avgWeight);
newWeights[i] = newWeight > 0 ? (float) newWeight : 0.0f;
}

StaticStrideScheduler scheduler = new StaticStrideScheduler(newWeights, random);
this.scheduler = scheduler;
}

Expand Down Expand Up @@ -340,111 +328,100 @@ public boolean isEquivalentTo(RoundRobinPicker picker) {
}
}

/**
* The earliest deadline first implementation in which each object is
* chosen deterministically and periodically with frequency proportional to its weight.
*
* <p>Specifically, each object added to chooser is given a deadline equal to the multiplicative
* inverse of its weight. The place of each object in its deadline is tracked, and each call to
* choose returns the object with the least remaining time in its deadline.
* (Ties are broken by the order in which the children were added to the chooser.) The deadline
* advances by the multiplicative inverse of the object's weight.
* For example, if items A and B are added with weights 0.5 and 0.2, successive chooses return:
/*
* Implementation of Static Stride Scheduler, replaces EDFScheduler.
* <p>
* The Static Stride Scheduler works by iterating through the list of subchannel weights
* and using modular arithmetic to evenly distribute picks and skips, favoring entries with the
* highest weight. It generates a practically equivalent sequence of picks as the EDFScheduler.
* Albeit needing more bandwidth, the Static Stride Scheduler is more performant than the
* EDFScheduler, as it removes the need for a priority queue (and thus mutex locks).
* <p>
* go/static-stride-scheduler
* <p>
*
* <ul>
* <li>In the first call, the deadlines are A=2 (1/0.5) and B=5 (1/0.2), so A is returned.
* The deadline of A is updated to 4.
* <li>Next, the remaining deadlines are A=4 and B=5, so A is returned. The deadline of A (2) is
* updated to A=6.
* <li>Remaining deadlines are A=6 and B=5, so B is returned. The deadline of B is updated with
* with B=10.
* <li>Remaining deadlines are A=6 and B=10, so A is returned. The deadline of A is updated with
* A=8.
* <li>Remaining deadlines are A=8 and B=10, so A is returned. The deadline of A is updated with
* A=10.
* <li>Remaining deadlines are A=10 and B=10, so A is returned. The deadline of A is updated
* with A=12.
* <li>Remaining deadlines are A=12 and B=10, so B is returned. The deadline of B is updated
* with B=15.
* <li>etc.
* </ul>
*
* <p>In short: the entry with the highest weight is preferred.
*
* <ul>
* <li>add() - O(lg n)
* <li>pick() - O(lg n)
* </ul>
*
* <li>nextSequence() - O(1)
* <li>pick() - O(n)
*/
@VisibleForTesting
static final class EdfScheduler {
private final PriorityQueue<ObjectState> prioQueue;

/**
* Weights below this value will be upped to this minimum weight.
*/
private static final double MINIMUM_WEIGHT = 0.0001;

private final Object lock = new Object();
static final class StaticStrideScheduler {
private final int[] scaledWeights;
private final int sizeDivisor;
private final AtomicInteger sequence;
private static final int K_MAX_WEIGHT = 0xFFFF;

StaticStrideScheduler(float[] weights, Random random) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do it in a later PR (by you or someone else), but we will want to replace this Random with AtomicInteger sequence. The only reason to have the random now is because we couldn't carry state between re-creations of the scheduler. But the only mutable state now is the integer, so we can follow the gRFC.

checkArgument(weights.length >= 1, "Couldn't build scheduler: requires at least one weight");
int numChannels = weights.length;
int numWeightedChannels = 0;
double sumWeight = 0;
float maxWeight = 0;
int meanWeight = 0;
for (float weight : weights) {
if (weight > 0.0001) {
sumWeight += weight;
maxWeight = Math.max(weight, maxWeight);
numWeightedChannels++;
}
}

private final Random random;
double scalingFactor = K_MAX_WEIGHT / maxWeight;
if (numWeightedChannels > 0) {
meanWeight = (int) Math.round(scalingFactor * sumWeight / numWeightedChannels);
} else {
meanWeight = 1;
}

/**
* Use the item's deadline as the order in the priority queue. If the deadlines are the same,
* use the index. Index should be unique.
*/
EdfScheduler(int initialCapacity, Random random) {
this.prioQueue = new PriorityQueue<ObjectState>(initialCapacity, (o1, o2) -> {
if (o1.deadline == o2.deadline) {
return Integer.compare(o1.index, o2.index);
// scales weights s.t. max(weights) == K_MAX_WEIGHT, meanWeight is scaled accordingly
int[] scaledWeights = new int[numChannels];
for (int i = 0; i < numChannels; i++) {
if (weights[i] < 0.0001) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similarly this should be compared to 0

scaledWeights[i] = meanWeight;
} else {
return Double.compare(o1.deadline, o2.deadline);
scaledWeights[i] = (int) Math.round(weights[i] * scalingFactor);
}
});
this.random = random;
}

this.scaledWeights = scaledWeights;
this.sizeDivisor = numChannels;
this.sequence = new AtomicInteger(random.nextInt());

}

/**
* Adds the item in the scheduler. This is not thread safe.
*
* @param index The field {@link ObjectState#index} to be added
* @param weight positive weight for the added object
*/
void add(int index, double weight) {
checkArgument(weight > 0.0, "Weights need to be positive.");
ObjectState state = new ObjectState(Math.max(weight, MINIMUM_WEIGHT), index);
// Randomize the initial deadline.
state.deadline = random.nextDouble() * (1 / state.weight);
prioQueue.add(state);
/** Returns the next sequence number and atomically increases sequence with wraparound. */
private long nextSequence() {
return Integer.toUnsignedLong(sequence.getAndIncrement());
}

/**
* Picks the next WRR object.
public long getSequence() {
return Integer.toUnsignedLong(sequence.get());
}

/*
* Selects index of next backend server.
* <p>
* A 2D array is compactly represented where the row represents the generation and the column
* represents the backend index. The value of an element is a boolean value which indicates
* whether or not a backend should be picked now. An atomically incremented counter keeps track
* of our backend and generation through modular arithmetic within the pick() method.
* An offset is also included to minimize consecutive non-picks of a backend.
*/
int pick() {
synchronized (lock) {
ObjectState minObject = prioQueue.remove();
minObject.deadline += 1.0 / minObject.weight;
prioQueue.add(minObject);
return minObject.index;
while (true) {
long sequence = this.nextSequence();
int backendIndex = (int) (sequence % this.sizeDivisor);
long generation = sequence / this.sizeDivisor;
long weight = this.scaledWeights[backendIndex];
long offset = (long) K_MAX_WEIGHT / 2 * backendIndex;
if ((weight * generation + offset) % K_MAX_WEIGHT < K_MAX_WEIGHT - weight) {
continue;
}
return backendIndex;
}
}
}

/** Holds the state of the object. */
@VisibleForTesting
static class ObjectState {
private final double weight;
private final int index;
private volatile double deadline;

ObjectState(double weight, int index) {
this.weight = weight;
this.index = index;
}
}

static final class WeightedRoundRobinLoadBalancerConfig {
final long blackoutPeriodNanos;
final long weightExpirationPeriodNanos;
Expand Down
Loading