Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.airlift.concurrent.ThreadPoolExecutorMBean;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: commit message is incorrect

import io.airlift.log.Logger;
import io.airlift.stats.CounterStat;
import io.airlift.stats.DistributionStat;
import io.airlift.stats.TimeDistribution;
import io.airlift.stats.TimeStat;
import io.airlift.units.Duration;
Expand Down Expand Up @@ -153,6 +154,12 @@ public class TaskExecutor
private final TimeStat blockedQuantaWallTime = new TimeStat(MICROSECONDS);
private final TimeStat unblockedQuantaWallTime = new TimeStat(MICROSECONDS);

private final DistributionStat leafSplitsSize = new DistributionStat();
@GuardedBy("this")
private long lastLeafSplitsSizeRecordTime;
@GuardedBy("this")
private long lastLeafSplitsSize;

private volatile boolean closed;

@Inject
Expand Down Expand Up @@ -210,6 +217,7 @@ public TaskExecutor(
this.maximumNumberOfDriversPerTask = maximumNumberOfDriversPerTask;
this.waitingSplits = requireNonNull(splitQueue, "splitQueue is null");
this.tasks = new LinkedList<>();
this.lastLeafSplitsSizeRecordTime = ticker.read();
}

@PostConstruct
Expand Down Expand Up @@ -279,6 +287,7 @@ public void removeTask(TaskHandle taskHandle)

// replace blocked splits that were terminated
addNewEntrants();
recordLeafSplitsSize();
}

private void doRemoveTask(TaskHandle taskHandle)
Expand All @@ -293,6 +302,7 @@ private void doRemoveTask(TaskHandle taskHandle)
intermediateSplits.removeAll(splits);
blockedSplits.keySet().removeAll(splits);
waitingSplits.removeAll(splits);
recordLeafSplitsSize();
}

// call destroy outside of synchronized block as it is expensive and doesn't need a lock on the task executor
Expand Down Expand Up @@ -343,6 +353,7 @@ else if (intermediate) {

finishedFutures.add(prioritizedSplitRunner.getFinishedFuture());
}
recordLeafSplitsSize();
}
for (PrioritizedSplitRunner split : splitsToDestroy) {
split.destroy();
Expand Down Expand Up @@ -378,6 +389,7 @@ private void splitFinished(PrioritizedSplitRunner split)
scheduleTaskIfNecessary(taskHandle);

addNewEntrants();
recordLeafSplitsSize();
}
// call destroy outside of synchronized block as it is expensive and doesn't need a lock on the task executor
split.destroy();
Expand All @@ -400,6 +412,7 @@ private synchronized void scheduleTaskIfNecessary(TaskHandle taskHandle)
startSplit(split);
splitQueuedTime.add(Duration.nanosSince(split.getCreatedNanos()));
}
recordLeafSplitsSize();
}

private synchronized void addNewEntrants()
Expand Down Expand Up @@ -459,6 +472,17 @@ private synchronized PrioritizedSplitRunner pollNextSplitWorker()
return null;
}

private synchronized void recordLeafSplitsSize()
{
long now = ticker.read();
long timeDifference = now - this.lastLeafSplitsSizeRecordTime;
if (timeDifference > 0) {
this.leafSplitsSize.add(lastLeafSplitsSize, timeDifference);
this.lastLeafSplitsSizeRecordTime = now;
this.lastLeafSplitsSize = allSplits.size() - intermediateSplits.size();
}
}

private class TaskRunner
implements Runnable
{
Expand Down Expand Up @@ -577,6 +601,13 @@ public int getWaitingSplits()
return waitingSplits.size();
}

@Managed
@Nested
public DistributionStat getLeafSplitsSize()
{
return leafSplitsSize;
}

@Managed
public int getRunningSplits()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.OptionalInt;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -498,6 +499,30 @@ public void testMinDriversPerTaskWhenTargetConcurrencyIncreases()
}
}

@Test
public void testLeafSplitsSize()
{
MultilevelSplitQueue splitQueue = new MultilevelSplitQueue(2);
TestingTicker ticker = new TestingTicker();
TaskExecutor taskExecutor = new TaskExecutor(4, 1, 2, 2, splitQueue, ticker);

TaskHandle testTaskHandle = taskExecutor.addTask(new TaskId(new StageId("test", 0), 0, 0), () -> 0, 10, new Duration(1, MILLISECONDS), OptionalInt.empty());
TestingJob driver1 = new TestingJob(ticker, new Phaser(), new Phaser(), new Phaser(), 1, 500);
TestingJob driver2 = new TestingJob(ticker, new Phaser(), new Phaser(), new Phaser(), 1, 1000 / 500);

ticker.increment(1, TimeUnit.SECONDS);
taskExecutor.enqueueSplits(testTaskHandle, false, ImmutableList.of(driver1, driver2));
assertEquals(taskExecutor.getLeafSplitsSize().getAllTime().getMax(), 0.0);

ticker.increment(1, TimeUnit.SECONDS);
taskExecutor.enqueueSplits(testTaskHandle, false, ImmutableList.of(driver1));
assertEquals(taskExecutor.getLeafSplitsSize().getAllTime().getMax(), 2.0);

ticker.increment(1, TimeUnit.SECONDS);
taskExecutor.enqueueSplits(testTaskHandle, true, ImmutableList.of(driver1));
assertEquals(taskExecutor.getLeafSplitsSize().getAllTime().getMax(), 2.0);
}

private void assertSplitStates(int endIndex, TestingJob[] splits)
{
// assert that splits up to and including endIndex are all started
Expand Down