Skip to content

Commit a232101

Browse files
fixed sequence, changed dtypes, added comments
1 parent aebf247 commit a232101

File tree

1 file changed

+47
-37
lines changed

1 file changed

+47
-37
lines changed

xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java

Lines changed: 47 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/9885")
6060
final class WeightedRoundRobinLoadBalancer extends RoundRobinLoadBalancer {
6161
private static final Logger log = Logger.getLogger(
62-
WeightedRoundRobinLoadBalancer.class.getName());
62+
WeightedRoundRobinLoadBalancer.class.getName());
6363
private WeightedRoundRobinLoadBalancerConfig config;
6464
private final SynchronizationContext syncContext;
6565
private final ScheduledExecutorService timeService;
@@ -113,7 +113,7 @@ public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
113113
@Override
114114
public RoundRobinPicker createReadyPicker(List<Subchannel> activeList) {
115115
return new WeightedRoundRobinPicker(activeList, config.enableOobLoadReport,
116-
config.errorUtilizationPenalty);
116+
config.errorUtilizationPenalty);
117117
}
118118

119119
private final class UpdateWeightTask implements Runnable {
@@ -123,7 +123,7 @@ public void run() {
123123
((WeightedRoundRobinPicker) currentPicker).updateWeight();
124124
}
125125
weightUpdateTimer = syncContext.schedule(this, config.weightUpdatePeriodNanos,
126-
TimeUnit.NANOSECONDS, timeService);
126+
TimeUnit.NANOSECONDS, timeService);
127127
}
128128
}
129129

@@ -132,7 +132,7 @@ private void afterAcceptAddresses() {
132132
WrrSubchannel weightedSubchannel = (WrrSubchannel) subchannel;
133133
if (config.enableOobLoadReport) {
134134
OrcaOobUtil.setListener(weightedSubchannel,
135-
weightedSubchannel.new OrcaReportListener(config.errorUtilizationPenalty),
135+
weightedSubchannel.new OrcaReportListener(config.errorUtilizationPenalty),
136136
OrcaOobUtil.OrcaReportingConfig.newBuilder()
137137
.setReportInterval(config.oobReportingPeriodNanos, TimeUnit.NANOSECONDS)
138138
.build());
@@ -206,7 +206,7 @@ private double getWeight() {
206206
nonEmptySince = infTime;
207207
return 0;
208208
} else if (now - nonEmptySince < config.blackoutPeriodNanos
209-
&& config.blackoutPeriodNanos > 0) {
209+
&& config.blackoutPeriodNanos > 0) {
210210
return 0;
211211
} else {
212212
return weight;
@@ -230,8 +230,8 @@ public void onLoadReport(MetricReport report) {
230230
double newWeight = 0;
231231
// Prefer application utilization and fallback to CPU utilization if unset.
232232
double utilization =
233-
report.getApplicationUtilization() > 0 ? report.getApplicationUtilization()
234-
: report.getCpuUtilization();
233+
report.getApplicationUtilization() > 0 ? report.getApplicationUtilization()
234+
: report.getCpuUtilization();
235235
if (utilization > 0 && report.getQps() > 0) {
236236
double penalty = 0;
237237
if (report.getEps() > 0 && errorUtilizationPenalty > 0) {
@@ -255,19 +255,19 @@ public void onLoadReport(MetricReport report) {
255255
final class WeightedRoundRobinPicker extends RoundRobinPicker {
256256
private final List<Subchannel> list;
257257
private final Map<Subchannel, OrcaPerRequestReportListener> subchannelToReportListenerMap =
258-
new HashMap<>();
258+
new HashMap<>();
259259
private final boolean enableOobLoadReport;
260260
private final float errorUtilizationPenalty;
261261
private volatile StaticStrideScheduler ssScheduler;
262262

263263
WeightedRoundRobinPicker(List<Subchannel> list, boolean enableOobLoadReport,
264-
float errorUtilizationPenalty) {
264+
float errorUtilizationPenalty) {
265265
checkNotNull(list, "list");
266266
Preconditions.checkArgument(!list.isEmpty(), "empty list");
267267
this.list = list;
268268
for (Subchannel subchannel : list) {
269269
this.subchannelToReportListenerMap.put(subchannel,
270-
((WrrSubchannel) subchannel).new OrcaReportListener(errorUtilizationPenalty));
270+
((WrrSubchannel) subchannel).new OrcaReportListener(errorUtilizationPenalty));
271271
}
272272
this.enableOobLoadReport = enableOobLoadReport;
273273
this.errorUtilizationPenalty = errorUtilizationPenalty;
@@ -279,14 +279,14 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
279279
Subchannel subchannel = list.get(ssScheduler.pick());
280280
if (!enableOobLoadReport) {
281281
return PickResult.withSubchannel(subchannel,
282-
OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(
283-
subchannelToReportListenerMap.getOrDefault(subchannel,
284-
((WrrSubchannel) subchannel).new OrcaReportListener(errorUtilizationPenalty))));
282+
OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(
283+
subchannelToReportListenerMap.getOrDefault(subchannel,
284+
((WrrSubchannel) subchannel).new OrcaReportListener(errorUtilizationPenalty))));
285285
} else {
286286
return PickResult.withSubchannel(subchannel);
287287
}
288288
}
289-
289+
290290
private void updateWeight() {
291291
float[] newWeights = new float[list.size()];
292292
for (int i = 0; i < list.size(); i++) {
@@ -302,9 +302,9 @@ private void updateWeight() {
302302
@Override
303303
public String toString() {
304304
return MoreObjects.toStringHelper(WeightedRoundRobinPicker.class)
305-
.add("enableOobLoadReport", enableOobLoadReport)
306-
.add("errorUtilizationPenalty", errorUtilizationPenalty)
307-
.add("list", list).toString();
305+
.add("enableOobLoadReport", enableOobLoadReport)
306+
.add("errorUtilizationPenalty", errorUtilizationPenalty)
307+
.add("list", list).toString();
308308
}
309309

310310
@VisibleForTesting
@@ -323,21 +323,23 @@ public boolean isEquivalentTo(RoundRobinPicker picker) {
323323
}
324324
// the lists cannot contain duplicate subchannels
325325
return enableOobLoadReport == other.enableOobLoadReport
326-
&& Float.compare(errorUtilizationPenalty, other.errorUtilizationPenalty) == 0
327-
&& list.size() == other.list.size() && new HashSet<>(list).containsAll(other.list);
326+
&& Float.compare(errorUtilizationPenalty, other.errorUtilizationPenalty) == 0
327+
&& list.size() == other.list.size() && new HashSet<>(list).containsAll(other.list);
328328
}
329329
}
330330

331331
/*
332332
* Implementation of Static Stride Scheduler, replaces EDFScheduler.
333333
* <p>
334334
* The Static Stride Scheduler works by iterating through the list of subchannel weights
335-
* and using modular arithmetic to evenly distribute picks and skips, favoring
336-
* entries with the highest weight.
335+
* and using modular arithmetic to evenly distribute picks and skips, favoring entries with the
336+
* highest weight. It generates a practically equivalent sequence of picks as the EDFScheduler.
337+
* Albeit needing more bandwidth, the Static Stride Scheduler is more performant than the
338+
* EDFScheduler, as it removes the need for a priority queue (and thus mutex locks).
337339
* <p>
338340
* go/static-stride-scheduler
339341
* <p>
340-
*
342+
*
341343
* <ul>
342344
* <li>nextSequence() - O(1)
343345
* <li>pick() - O(n)
@@ -346,22 +348,20 @@ public boolean isEquivalentTo(RoundRobinPicker picker) {
346348
static final class StaticStrideScheduler {
347349
private final int[] scaledWeights;
348350
private final int sizeDivisor;
349-
private final Random random;
350351
private final AtomicInteger sequence;
351352
private static final int K_MAX_WEIGHT = 0xFFFF;
352-
private static final long UINT32_MAX = 0xFFFF_FFFFL;
353353

354354
StaticStrideScheduler(float[] weights, Random random) {
355+
checkArgument(weights.length >= 1, "Couldn't build scheduler: requires at least one weight");
355356
int numChannels = weights.length;
356-
checkArgument(numChannels >= 1, "Couldn't build scheduler: requires at least one weight");
357357
int numWeightedChannels = 0;
358358
double sumWeight = 0;
359359
float maxWeight = 0;
360360
int meanWeight = 0;
361361
for (float weight : weights) {
362362
if (weight > 0.0001) {
363363
sumWeight += weight;
364-
maxWeight = Math.max(weight, maxWeight);
364+
maxWeight = Math.max(weight, maxWeight);
365365
numWeightedChannels++;
366366
}
367367
}
@@ -385,29 +385,39 @@ static final class StaticStrideScheduler {
385385

386386
this.scaledWeights = scaledWeights;
387387
this.sizeDivisor = numChannels;
388-
this.random = random;
389-
this.sequence = new AtomicInteger((int) (this.random.nextDouble() * UINT32_MAX));
390-
388+
this.sequence = new AtomicInteger(random.nextInt());
389+
391390
}
392391

393-
/** Returns the next sequence number and increases sequence with wraparound. */
392+
/** Returns the next sequence number and atomically increases sequence with wraparound. */
394393
private long nextSequence() {
395394
return Integer.toUnsignedLong(sequence.getAndIncrement());
396395
}
397-
398396

399-
/** Selects index of next backend server. */
397+
public long getSequence() {
398+
return Integer.toUnsignedLong(sequence.get());
399+
}
400+
401+
/*
402+
* Selects index of next backend server.
403+
* <p>
404+
* A 2D array is compactly represented where the row represents the generation and the column
405+
* represents the backend index. The value of an element is a boolean value which indicates
406+
* whether or not a backend should be picked now. An atomically incremented counter keeps track
407+
* of our backend and generation through modular arithmetic within the pick() method.
408+
* An offset is also included to minimize consecutive non-picks of a backend.
409+
*/
400410
int pick() {
401411
while (true) {
402412
long sequence = this.nextSequence();
403-
long backendIndex = sequence % this.sizeDivisor;
413+
int backendIndex = (int) (sequence % this.sizeDivisor);
404414
long generation = sequence / this.sizeDivisor;
405-
long weight = this.scaledWeights[(int) backendIndex];
406-
long offset = backendIndex * K_MAX_WEIGHT / 2;
407-
if ((weight * generation + offset) % K_MAX_WEIGHT < K_MAX_WEIGHT - weight) {
415+
long weight = this.scaledWeights[backendIndex];
416+
long offset = (long) K_MAX_WEIGHT / 2 * backendIndex;
417+
if ((weight * generation + offset) % K_MAX_WEIGHT < K_MAX_WEIGHT - weight) {
408418
continue;
409419
}
410-
return (int) backendIndex;
420+
return backendIndex;
411421
}
412422
}
413423
}

0 commit comments

Comments
 (0)