Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.trino.execution.executor;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.airlift.stats.CounterStat;
import io.trino.execution.TaskManagerConfig;
import org.weakref.jmx.Managed;
Expand All @@ -24,9 +23,7 @@
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
Expand All @@ -44,12 +41,12 @@ public class MultilevelSplitQueue
static final long LEVEL_CONTRIBUTION_CAP = SECONDS.toNanos(30);

// Waiting splits, one priority queue per level, indexed by level.
@GuardedBy("lock")
private final PriorityQueue<PrioritizedSplitRunner>[] levelWaitingSplits;

// Scheduled time accumulated per level, indexed by level; allocated in the constructor.
private final AtomicLong[] levelScheduledTime;

// Per-level priority marker, indexed by level; each slot starts at -1 and is
// overwritten with the level priority of the split most recently returned by take().
private final AtomicLong[] levelMinPriority;
// Per-level counters, indexed by level; take() bumps the selected level's counter by 1.
private final CounterStat[] selectedLevelCounters;

private final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
Expand All @@ -64,18 +61,17 @@ public MultilevelSplitQueue(TaskManagerConfig taskManagerConfig)

/**
 * Creates a queue with one slot per entry of {@code LEVEL_THRESHOLD_SECONDS} in each
 * of the per-level arrays.
 *
 * @param levelTimeMultiplier value stored in the {@code levelTimeMultiplier} field
 */
public MultilevelSplitQueue(double levelTimeMultiplier)
{
    this.levelScheduledTime = new AtomicLong[LEVEL_THRESHOLD_SECONDS.length];
    this.levelMinPriority = new AtomicLong[LEVEL_THRESHOLD_SECONDS.length];
    // Generic arrays cannot be created directly, hence the raw PriorityQueue[] here;
    // only PriorityQueue<PrioritizedSplitRunner> instances are ever stored in it.
    this.levelWaitingSplits = new PriorityQueue[LEVEL_THRESHOLD_SECONDS.length];
    this.selectedLevelCounters = new CounterStat[LEVEL_THRESHOLD_SECONDS.length];

    for (int level = 0; level < LEVEL_THRESHOLD_SECONDS.length; level++) {
        levelScheduledTime[level] = new AtomicLong();
        // -1 is the initial value until take() records a real level priority
        levelMinPriority[level] = new AtomicLong(-1);
        levelWaitingSplits[level] = new PriorityQueue<>();
        selectedLevelCounters[level] = new CounterStat();
    }

    this.levelTimeMultiplier = levelTimeMultiplier;
}
Expand Down Expand Up @@ -103,7 +99,7 @@ public void offer(PrioritizedSplitRunner split)
int level = split.getPriority().getLevel();
lock.lock();
try {
if (levelWaitingSplits.get(level).isEmpty()) {
if (levelWaitingSplits[level].isEmpty()) {
// Accesses to levelScheduledTime are not synchronized, so we have a data race
// here - our level time math will be off. However, the staleness is bounded by
// the fact that only running splits that complete during this computation
Expand All @@ -114,7 +110,7 @@ public void offer(PrioritizedSplitRunner split)
levelScheduledTime[level].addAndGet(delta);
}

levelWaitingSplits.get(level).offer(split);
levelWaitingSplits[level].offer(split);
notEmpty.signal();
}
finally {
Expand All @@ -140,7 +136,7 @@ public PrioritizedSplitRunner take()

int selectedLevel = result.getPriority().getLevel();
levelMinPriority[selectedLevel].set(result.getPriority().getLevelPriority());
selectedLevelCounters.get(selectedLevel).update(1);
selectedLevelCounters[selectedLevel].update(1);

return result;
}
Expand All @@ -165,7 +161,7 @@ private PrioritizedSplitRunner pollSplit()
double worstRatio = 1;
int selectedLevel = -1;
for (int level = 0; level < LEVEL_THRESHOLD_SECONDS.length; level++) {
if (!levelWaitingSplits.get(level).isEmpty()) {
if (!levelWaitingSplits[level].isEmpty()) {
long levelTime = levelScheduledTime[level].get();
double ratio = levelTime == 0 ? 0 : targetScheduledTime / (1.0 * levelTime);
if (selectedLevel == -1 || ratio > worstRatio) {
Expand All @@ -181,7 +177,7 @@ private PrioritizedSplitRunner pollSplit()
return null;
}

PrioritizedSplitRunner result = levelWaitingSplits.get(selectedLevel).poll();
PrioritizedSplitRunner result = levelWaitingSplits[selectedLevel].poll();
checkState(result != null, "pollSplit cannot return null");

return result;
Expand Down Expand Up @@ -295,9 +291,9 @@ public int size()
public static int computeLevel(long threadUsageNanos)
{
long seconds = NANOSECONDS.toSeconds(threadUsageNanos);
for (int i = 0; i < (LEVEL_THRESHOLD_SECONDS.length - 1); i++) {
if (seconds < LEVEL_THRESHOLD_SECONDS[i + 1]) {
return i;
for (int level = 0; level < (LEVEL_THRESHOLD_SECONDS.length - 1); level++) {
if (seconds < LEVEL_THRESHOLD_SECONDS[level + 1]) {
return level;
}
}

Expand Down Expand Up @@ -344,34 +340,34 @@ public long getLevel4Time()
@Nested
public CounterStat getSelectedCountLevel0()
{
    // Counter that take() bumps each time the split it returns is at level 0.
    return selectedLevelCounters[0];
}

/**
 * Returns the counter that {@code take()} bumps each time the split it returns is at level 1.
 */
@Managed
@Nested
public CounterStat getSelectedCountLevel1()
{
    return selectedLevelCounters[1];
}

/**
 * Returns the counter that {@code take()} bumps each time the split it returns is at level 2.
 */
@Managed
@Nested
public CounterStat getSelectedCountLevel2()
{
    return selectedLevelCounters[2];
}

/**
 * Returns the counter that {@code take()} bumps each time the split it returns is at level 3.
 */
@Managed
@Nested
public CounterStat getSelectedCountLevel3()
{
    return selectedLevelCounters[3];
}

/**
 * Returns the counter that {@code take()} bumps each time the split it returns is at level 4.
 */
@Managed
@Nested
public CounterStat getSelectedCountLevel4()
{
    return selectedLevelCounters[4];
}
}