From de3b7c2ea12b187ad1ce0bdcdc4b35713bf285bb Mon Sep 17 00:00:00 2001 From: tangjiangling Date: Thu, 15 Sep 2022 19:01:54 +0800 Subject: [PATCH 1/2] Use arrays to initialize the member variables of MultilevelSplitQueue Previously in `MultilevelSplitQueue`, `levelScheduledTime` and `levelMinPriority` were initialized using arrays, while `levelWaitingSplits` and `selectedLevelCounters` were initialized using `List`, this commit unifies the style by making all these variables initialized using array style. --- .../executor/MultilevelSplitQueue.java | 40 +++++++++---------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/executor/MultilevelSplitQueue.java b/core/trino-main/src/main/java/io/trino/execution/executor/MultilevelSplitQueue.java index 8163ebf93850..1d8232cf5e4d 100644 --- a/core/trino-main/src/main/java/io/trino/execution/executor/MultilevelSplitQueue.java +++ b/core/trino-main/src/main/java/io/trino/execution/executor/MultilevelSplitQueue.java @@ -14,7 +14,6 @@ package io.trino.execution.executor; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; import io.airlift.stats.CounterStat; import io.trino.execution.TaskManagerConfig; import org.weakref.jmx.Managed; @@ -24,9 +23,7 @@ import javax.annotation.concurrent.ThreadSafe; import javax.inject.Inject; -import java.util.ArrayList; import java.util.Collection; -import java.util.List; import java.util.PriorityQueue; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; @@ -44,12 +41,12 @@ public class MultilevelSplitQueue static final long LEVEL_CONTRIBUTION_CAP = SECONDS.toNanos(30); @GuardedBy("lock") - private final List> levelWaitingSplits; + private final PriorityQueue[] levelWaitingSplits; - private final AtomicLong[] levelScheduledTime = new AtomicLong[LEVEL_THRESHOLD_SECONDS.length]; + private final AtomicLong[] levelScheduledTime; private final AtomicLong[] levelMinPriority; - private final List selectedLevelCounters; + private final CounterStat[] selectedLevelCounters; private final ReentrantLock lock = new ReentrantLock(); private final Condition notEmpty = lock.newCondition(); @@ -64,19 +61,18 @@ public MultilevelSplitQueue(TaskManagerConfig taskManagerConfig) public MultilevelSplitQueue(double levelTimeMultiplier) { + this.levelScheduledTime = new AtomicLong[LEVEL_THRESHOLD_SECONDS.length]; this.levelMinPriority = new AtomicLong[LEVEL_THRESHOLD_SECONDS.length]; - this.levelWaitingSplits = new ArrayList<>(LEVEL_THRESHOLD_SECONDS.length); - ImmutableList.Builder counters = ImmutableList.builderWithExpectedSize(LEVEL_THRESHOLD_SECONDS.length); + this.levelWaitingSplits = new PriorityQueue[LEVEL_THRESHOLD_SECONDS.length]; + this.selectedLevelCounters = new CounterStat[LEVEL_THRESHOLD_SECONDS.length]; for (int i = 0; i < LEVEL_THRESHOLD_SECONDS.length; i++) { levelScheduledTime[i] = new AtomicLong(); levelMinPriority[i] = new AtomicLong(-1); - levelWaitingSplits.add(new PriorityQueue<>()); - counters.add(new CounterStat()); + levelWaitingSplits[i] = new PriorityQueue<>(); + selectedLevelCounters[i] = new CounterStat(); } - this.selectedLevelCounters = counters.build(); - this.levelTimeMultiplier = levelTimeMultiplier; } @@ -103,7 +99,7 @@ public void offer(PrioritizedSplitRunner split) int level = split.getPriority().getLevel(); lock.lock(); try { - if (levelWaitingSplits.get(level).isEmpty()) { + if (levelWaitingSplits[level].isEmpty()) { // Accesses to levelScheduledTime are not synchronized, so we have a data race // here - our level time math will be off. However, the staleness is bounded by // the fact that only running splits that complete during this computation @@ -114,7 +110,7 @@ public void offer(PrioritizedSplitRunner split) levelScheduledTime[level].addAndGet(delta); } - levelWaitingSplits.get(level).offer(split); + levelWaitingSplits[level].offer(split); notEmpty.signal(); } finally { @@ -140,7 +136,7 @@ public PrioritizedSplitRunner take() int selectedLevel = result.getPriority().getLevel(); levelMinPriority[selectedLevel].set(result.getPriority().getLevelPriority()); - selectedLevelCounters.get(selectedLevel).update(1); + selectedLevelCounters[selectedLevel].update(1); return result; } @@ -165,7 +161,7 @@ private PrioritizedSplitRunner pollSplit() double worstRatio = 1; int selectedLevel = -1; for (int level = 0; level < LEVEL_THRESHOLD_SECONDS.length; level++) { - if (!levelWaitingSplits.get(level).isEmpty()) { + if (!levelWaitingSplits[level].isEmpty()) { long levelTime = levelScheduledTime[level].get(); double ratio = levelTime == 0 ? 0 : targetScheduledTime / (1.0 * levelTime); if (selectedLevel == -1 || ratio > worstRatio) { @@ -181,7 +177,7 @@ private PrioritizedSplitRunner pollSplit() return null; } - PrioritizedSplitRunner result = levelWaitingSplits.get(selectedLevel).poll(); + PrioritizedSplitRunner result = levelWaitingSplits[selectedLevel].poll(); checkState(result != null, "pollSplit cannot return null"); return result; @@ -344,34 +340,34 @@ public long getLevel4Time() @Nested public CounterStat getSelectedCountLevel0() { - return selectedLevelCounters.get(0); + return selectedLevelCounters[0]; } @Managed @Nested public CounterStat getSelectedCountLevel1() { - return selectedLevelCounters.get(1); + return selectedLevelCounters[1]; } @Managed @Nested public CounterStat getSelectedCountLevel2() { - return selectedLevelCounters.get(2); + return selectedLevelCounters[2]; } @Managed @Nested public CounterStat getSelectedCountLevel3() { - return selectedLevelCounters.get(3); + return selectedLevelCounters[3]; } @Managed @Nested public CounterStat getSelectedCountLevel4() { - return selectedLevelCounters.get(4); + return selectedLevelCounters[4]; } } From 52e148d9c444e25614e73ebd3f7a5438ee697fc6 Mon Sep 17 00:00:00 2001 From: tangjiangling Date: Thu, 15 Sep 2022 19:29:52 +0800 Subject: [PATCH 2/2] Rename variables for readability The indexes of these arrays are all "level" related, so we can rename them. --- .../execution/executor/MultilevelSplitQueue.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/executor/MultilevelSplitQueue.java b/core/trino-main/src/main/java/io/trino/execution/executor/MultilevelSplitQueue.java index 1d8232cf5e4d..a012a3d7085a 100644 --- a/core/trino-main/src/main/java/io/trino/execution/executor/MultilevelSplitQueue.java +++ b/core/trino-main/src/main/java/io/trino/execution/executor/MultilevelSplitQueue.java @@ -66,11 +66,11 @@ public MultilevelSplitQueue(double levelTimeMultiplier) this.levelWaitingSplits = new PriorityQueue[LEVEL_THRESHOLD_SECONDS.length]; this.selectedLevelCounters = new CounterStat[LEVEL_THRESHOLD_SECONDS.length]; - for (int i = 0; i < LEVEL_THRESHOLD_SECONDS.length; i++) { - levelScheduledTime[i] = new AtomicLong(); - levelMinPriority[i] = new AtomicLong(-1); - levelWaitingSplits[i] = new PriorityQueue<>(); - selectedLevelCounters[i] = new CounterStat(); + for (int level = 0; level < LEVEL_THRESHOLD_SECONDS.length; level++) { + levelScheduledTime[level] = new AtomicLong(); + levelMinPriority[level] = new AtomicLong(-1); + levelWaitingSplits[level] = new PriorityQueue<>(); + selectedLevelCounters[level] = new CounterStat(); } this.levelTimeMultiplier = levelTimeMultiplier; @@ -291,9 +291,9 @@ public int size() public static int computeLevel(long threadUsageNanos) { long seconds = NANOSECONDS.toSeconds(threadUsageNanos); - for (int i = 0; i < (LEVEL_THRESHOLD_SECONDS.length - 1); i++) { - if (seconds < LEVEL_THRESHOLD_SECONDS[i + 1]) { - return i; + for (int level = 0; level < (LEVEL_THRESHOLD_SECONDS.length - 1); level++) { + if (seconds < LEVEL_THRESHOLD_SECONDS[level + 1]) { + return level; } }