4444import java .util .HashSet ;
4545import java .util .List ;
4646import java .util .Map ;
47- import java .util .PriorityQueue ;
4847import java .util .Random ;
4948import java .util .concurrent .ScheduledExecutorService ;
5049import java .util .concurrent .TimeUnit ;
50+ import java .util .concurrent .atomic .AtomicInteger ;
5151import java .util .logging .Level ;
5252import java .util .logging .Logger ;
5353
@@ -120,7 +120,7 @@ private final class UpdateWeightTask implements Runnable {
120120 @ Override
121121 public void run () {
122122 if (currentPicker != null && currentPicker instanceof WeightedRoundRobinPicker ) {
123- ((WeightedRoundRobinPicker )currentPicker ).updateWeight ();
123+ ((WeightedRoundRobinPicker ) currentPicker ).updateWeight ();
124124 }
125125 weightUpdateTimer = syncContext .schedule (this , config .weightUpdatePeriodNanos ,
126126 TimeUnit .NANOSECONDS , timeService );
@@ -258,7 +258,7 @@ final class WeightedRoundRobinPicker extends RoundRobinPicker {
258258 new HashMap <>();
259259 private final boolean enableOobLoadReport ;
260260 private final float errorUtilizationPenalty ;
261- private volatile EdfScheduler scheduler ;
261+ private volatile StaticStrideScheduler scheduler ;
262262
263263 WeightedRoundRobinPicker (List <Subchannel > list , boolean enableOobLoadReport ,
264264 float errorUtilizationPenalty ) {
@@ -279,7 +279,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
279279 Subchannel subchannel = list .get (scheduler .pick ());
280280 if (!enableOobLoadReport ) {
281281 return PickResult .withSubchannel (subchannel ,
282- OrcaPerRequestUtil .getInstance ().newOrcaClientStreamTracerFactory (
282+ OrcaPerRequestUtil .getInstance ().newOrcaClientStreamTracerFactory (
283283 subchannelToReportListenerMap .getOrDefault (subchannel ,
284284 ((WrrSubchannel ) subchannel ).new OrcaReportListener (errorUtilizationPenalty ))));
285285 } else {
@@ -288,26 +288,14 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
288288 }
289289
290290 private void updateWeight () {
291- int weightedChannelCount = 0 ;
292- double avgWeight = 0 ;
293- for (Subchannel value : list ) {
294- double newWeight = ((WrrSubchannel ) value ).getWeight ();
295- if (newWeight > 0 ) {
296- avgWeight += newWeight ;
297- weightedChannelCount ++;
298- }
299- }
300- EdfScheduler scheduler = new EdfScheduler (list .size (), random );
301- if (weightedChannelCount >= 1 ) {
302- avgWeight /= 1.0 * weightedChannelCount ;
303- } else {
304- avgWeight = 1 ;
305- }
291+ float [] newWeights = new float [list .size ()];
306292 for (int i = 0 ; i < list .size (); i ++) {
307293 WrrSubchannel subchannel = (WrrSubchannel ) list .get (i );
308294 double newWeight = subchannel .getWeight ();
309- scheduler . add ( i , newWeight > 0 ? newWeight : avgWeight ) ;
295+ newWeights [ i ] = newWeight > 0 ? ( float ) newWeight : 0.0f ;
310296 }
297+
298+ StaticStrideScheduler scheduler = new StaticStrideScheduler (newWeights , random );
311299 this .scheduler = scheduler ;
312300 }
313301
@@ -340,111 +328,125 @@ public boolean isEquivalentTo(RoundRobinPicker picker) {
340328 }
341329 }
342330
343- /**
344- * The earliest deadline first implementation in which each object is
345- * chosen deterministically and periodically with frequency proportional to its weight.
346- *
347- * <p>Specifically, each object added to chooser is given a deadline equal to the multiplicative
348- * inverse of its weight. The place of each object in its deadline is tracked, and each call to
349- * choose returns the object with the least remaining time in its deadline.
350- * (Ties are broken by the order in which the children were added to the chooser.) The deadline
351- * advances by the multiplicative inverse of the object's weight.
352- * For example, if items A and B are added with weights 0.5 and 0.2, successive chooses return:
353- *
354- * <ul>
355- * <li>In the first call, the deadlines are A=2 (1/0.5) and B=5 (1/0.2), so A is returned.
356- * The deadline of A is updated to 4.
357- * <li>Next, the remaining deadlines are A=4 and B=5, so A is returned. The deadline of A (2) is
358- * updated to A=6.
359- * <li>Remaining deadlines are A=6 and B=5, so B is returned. The deadline of B is updated with
360- * with B=10.
361- * <li>Remaining deadlines are A=6 and B=10, so A is returned. The deadline of A is updated with
362- * A=8.
363- * <li>Remaining deadlines are A=8 and B=10, so A is returned. The deadline of A is updated with
364- * A=10.
365- * <li>Remaining deadlines are A=10 and B=10, so A is returned. The deadline of A is updated
366- * with A=12.
367- * <li>Remaining deadlines are A=12 and B=10, so B is returned. The deadline of B is updated
368- * with B=15.
369- * <li>etc.
370- * </ul>
371- *
372- * <p>In short: the entry with the highest weight is preferred.
331+ /*
332+ * The Static Stride Scheduler is an implementation of an earliest deadline first (EDF) scheduler
333+ * in which each object's deadline is the multiplicative inverse of the object's weight.
334+ * <p>
335+ * The way in which this is implemented is through a static stride scheduler.
336+ * The Static Stride Scheduler works by iterating through the list of subchannel weights
337+ * and using modular arithmetic to proportionally distribute picks, favoring entries
338+ * with higher weights. It is based on the observation that the intended sequence generated
339+ * from an EDF scheduler is a periodic one that can be achieved through modular arithmetic.
340+ * The Static Stride Scheduler is more performant than other implementations of the EDF
341+ * Scheduler, as it removes the need for a priority queue (and thus mutex locks).
342+ * <p>
343+ * go/static-stride-scheduler
344+ * <p>
373345 *
374346 * <ul>
375- * <li>add() - O(lg n)
376- * <li>pick() - O(lg n)
377- * </ul>
378- *
347+ * <li>nextSequence() - O(1)
348+ * <li>pick() - O(n)
379349 */
380350 @ VisibleForTesting
381- static final class EdfScheduler {
382- private final PriorityQueue <ObjectState > prioQueue ;
383-
384- /**
385- * Weights below this value will be upped to this minimum weight.
386- */
387- private static final double MINIMUM_WEIGHT = 0.0001 ;
388-
389- private final Object lock = new Object ();
351+ static final class StaticStrideScheduler {
352+ private final short [] scaledWeights ;
353+ private final int sizeDivisor ;
354+ private final AtomicInteger sequence ;
355+ private static final int K_MAX_WEIGHT = 0xFFFF ;
356+
357+ StaticStrideScheduler (float [] weights , Random random ) {
358+ checkArgument (weights .length >= 1 , "Couldn't build scheduler: requires at least one weight" );
359+ int numChannels = weights .length ;
360+ int numWeightedChannels = 0 ;
361+ double sumWeight = 0 ;
362+ float maxWeight = 0 ;
363+ short meanWeight = 0 ;
364+ for (float weight : weights ) {
365+ if (weight > 0 ) {
366+ sumWeight += weight ;
367+ maxWeight = Math .max (weight , maxWeight );
368+ numWeightedChannels ++;
369+ }
370+ }
390371
391- private final Random random ;
372+ double scalingFactor = K_MAX_WEIGHT / maxWeight ;
373+ if (numWeightedChannels > 0 ) {
374+ meanWeight = (short ) Math .round (scalingFactor * sumWeight / numWeightedChannels );
375+ } else {
376+ meanWeight = 1 ;
377+ }
392378
393- /**
394- * Use the item's deadline as the order in the priority queue. If the deadlines are the same,
395- * use the index. Index should be unique.
396- */
397- EdfScheduler (int initialCapacity , Random random ) {
398- this .prioQueue = new PriorityQueue <ObjectState >(initialCapacity , (o1 , o2 ) -> {
399- if (o1 .deadline == o2 .deadline ) {
400- return Integer .compare (o1 .index , o2 .index );
379+ // scales weights s.t. max(weights) == K_MAX_WEIGHT, meanWeight is scaled accordingly
380+ short [] scaledWeights = new short [numChannels ];
381+ for (int i = 0 ; i < numChannels ; i ++) {
382+ if (weights [i ] <= 0 ) {
383+ scaledWeights [i ] = meanWeight ;
401384 } else {
402- return Double . compare ( o1 . deadline , o2 . deadline );
385+ scaledWeights [ i ] = ( short ) Math . round ( weights [ i ] * scalingFactor );
403386 }
404- });
405- this .random = random ;
387+ }
388+
389+ this .scaledWeights = scaledWeights ;
390+ this .sizeDivisor = numChannels ;
391+ this .sequence = new AtomicInteger (random .nextInt ());
392+
406393 }
407394
408- /**
409- * Adds the item in the scheduler. This is not thread safe.
410- *
411- * @param index The field {@link ObjectState#index} to be added
412- * @param weight positive weight for the added object
413- */
414- void add (int index , double weight ) {
415- checkArgument (weight > 0.0 , "Weights need to be positive." );
416- ObjectState state = new ObjectState (Math .max (weight , MINIMUM_WEIGHT ), index );
417- // Randomize the initial deadline.
418- state .deadline = random .nextDouble () * (1 / state .weight );
419- prioQueue .add (state );
395+ /** Returns the next sequence number and atomically increases sequence with wraparound. */
396+ private long nextSequence () {
397+ return Integer .toUnsignedLong (sequence .getAndIncrement ());
420398 }
421399
422- /**
423- * Picks the next WRR object.
400+ @ VisibleForTesting
401+ long getSequence () {
402+ return Integer .toUnsignedLong (sequence .get ());
403+ }
404+
405+ /*
406+ * Selects index of next backend server.
407+ * <p>
408+ * A 2D array is compactly represented as a function of W(backend), where the row
409+ * represents the generation and the column represents the backend index:
410+ * X(backend,generation) | generation ∈ [0,kMaxWeight).
411+ * Each element in the conceptual array is a boolean indicating whether the backend at
412+ * this index should be picked now. If false, the counter is incremented again,
413+ * and the new element is checked. An atomically incremented counter keeps track of our
414+ * backend and generation through modular arithmetic within the pick() method.
415+ * <p>
416+ * Modular arithmetic allows us to evenly distribute picks and skips between
417+ * generations based on W(backend).
418+ * X(backend,generation) = (W(backend) * generation) % kMaxWeight >= kMaxWeight - W(backend)
419+ * If we have the same three backends with weights:
420+ * W(backend) = {2,3,6} scaled to max(W(backend)) = 6, then X(backend,generation) is:
421+ * <p>
422+ * B0 B1 B2
423+ * T T T
424+ * F F T
425+ * F T T
426+ * T F T
427+ * F T T
428+ * F F T
429+ * The sequence of picked backend indices is given by
430+ * walking across and down: {0,1,2,2,1,2,0,2,1,2,2}.
431+ * <p>
432+ * To reduce the variance and spread the wasted work among different picks,
433+ * an offset that varies per backend index is also included to the calculation.
424434 */
425435 int pick () {
426- synchronized (lock ) {
427- ObjectState minObject = prioQueue .remove ();
428- minObject .deadline += 1.0 / minObject .weight ;
429- prioQueue .add (minObject );
430- return minObject .index ;
436+ while (true ) {
437+ long sequence = this .nextSequence ();
438+ int backendIndex = (int ) (sequence % this .sizeDivisor );
439+ long generation = sequence / this .sizeDivisor ;
440+ int weight = Short .toUnsignedInt (this .scaledWeights [backendIndex ]);
441+ long offset = (long ) K_MAX_WEIGHT / 2 * backendIndex ;
442+ if ((weight * generation + offset ) % K_MAX_WEIGHT < K_MAX_WEIGHT - weight ) {
443+ continue ;
444+ }
445+ return backendIndex ;
431446 }
432447 }
433448 }
434449
435- /** Holds the state of the object. */
436- @ VisibleForTesting
437- static class ObjectState {
438- private final double weight ;
439- private final int index ;
440- private volatile double deadline ;
441-
442- ObjectState (double weight , int index ) {
443- this .weight = weight ;
444- this .index = index ;
445- }
446- }
447-
448450 static final class WeightedRoundRobinLoadBalancerConfig {
449451 final long blackoutPeriodNanos ;
450452 final long weightExpirationPeriodNanos ;
0 commit comments