Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -979,8 +979,9 @@ private StateChangeListener<TaskStatus> createExchangeSinkInstanceHandleUpdateRe

private void loadMoreTaskDescriptorsIfNecessary()
{
if (schedulingQueue.getNonSpeculativeTaskCount() + nodeAcquisitions.size() < maxTasksWaitingForExecution) {
for (StageExecution stageExecution : stageExecutions.values()) {
boolean schedulingQueueIsFull = schedulingQueue.getNonSpeculativeTaskCount() >= maxTasksWaitingForExecution;
for (StageExecution stageExecution : stageExecutions.values()) {
if (!schedulingQueueIsFull || stageExecution.hasOpenTaskRunning()) {
stageExecution.loadMoreTaskDescriptors().ifPresent(future -> Futures.addCallback(future, new FutureCallback<>()
{
@Override
Expand Down Expand Up @@ -1147,6 +1148,7 @@ private static class StageExecution
private final Int2ObjectMap<StagePartition> partitions = new Int2ObjectOpenHashMap<>();
private boolean noMorePartitions;

private final IntSet runningPartitions = new IntOpenHashSet();
private final IntSet remainingPartitions = new IntOpenHashSet();

private ExchangeSourceOutputSelector.Builder sinkOutputSelectorBuilder;
Expand Down Expand Up @@ -1348,10 +1350,33 @@ public Optional<RemoteTask> schedule(int partitionId, ExchangeSinkInstanceHandle
splits,
noMoreSplits,
Optional.of(partition.getMemoryRequirements().getRequiredMemory()));
task.ifPresent(remoteTask -> partition.addTask(remoteTask, outputBuffers));
task.ifPresent(remoteTask -> {
partition.addTask(remoteTask, outputBuffers);
runningPartitions.add(partitionId);
});
return task;
}

/**
 * Returns true if this stage has at least one running task whose partition
 * is not yet sealed (i.e. an "open" task). Always false for a finished stage.
 */
public boolean hasOpenTaskRunning()
{
    // A finished stage, or one with no running partitions, has no open tasks
    if (getState().isDone() || runningPartitions.isEmpty()) {
        return false;
    }

    // open = running but not yet sealed
    boolean openTaskFound = false;
    for (int runningPartitionId : runningPartitions) {
        StagePartition candidate = getStagePartition(runningPartitionId);
        if (!candidate.isSealed()) {
            openTaskFound = true;
            break;
        }
    }
    return openTaskFound;
}

public Optional<ListenableFuture<AssignmentResult>> loadMoreTaskDescriptors()
{
if (getState().isDone() || taskDescriptorLoadingActive) {
Expand Down Expand Up @@ -1428,6 +1453,10 @@ public void taskFinished(TaskId taskId, TaskStatus taskStatus)
exchange.sinkFinished(partition.getExchangeSinkHandle(), taskId.getAttemptId());
SpoolingOutputStats.Snapshot outputStats = partition.taskFinished(taskId);

if (!partition.isRunning()) {
runningPartitions.remove(partitionId);
}

if (!remainingPartitions.remove(partitionId)) {
// a different task for the same partition finished before
return;
Expand Down Expand Up @@ -1476,6 +1505,10 @@ public List<PrioritizedScheduledTask> taskFailed(TaskId taskId, ExecutionFailure
StagePartition partition = getStagePartition(partitionId);
partition.taskFailed(taskId);

if (!partition.isRunning()) {
runningPartitions.remove(partitionId);
}

RuntimeException failure = failureInfo.toException();
ErrorCode errorCode = failureInfo.getErrorCode();
partitionMemoryEstimator.registerPartitionFinished(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.plugin.hive.HiveQueryRunner;
import io.trino.testing.FaultTolerantExecutionConnectorTestHelper;
import io.trino.testing.QueryRunner;
import org.testng.annotations.Test;

import static io.trino.plugin.exchange.filesystem.containers.MinioStorage.getExchangeManagerProperties;
import static io.trino.testing.TestingNames.randomNameSuffix;
Expand Down Expand Up @@ -56,4 +57,35 @@ protected Session getSession()
.setCatalogSessionProperty(session.getCatalog().orElseThrow(), "non_transactional_optimize_enabled", "true")
.build();
}

/**
 * Regression test for scheduler deadlocks: runs one query containing six
 * scalar subqueries over the same highly partitioned/bucketed table so that
 * many stages are scheduled concurrently. If the scheduler deadlocks, the
 * query never completes and the 120s timeout fails the test.
 */
@Test(timeOut = 120_000)
public void testPotentialDeadlocks()
{
// create a highly granular table to ensure the number of splits is high
assertUpdate("""
CREATE TABLE lineitem_bucketed_partitioned
WITH (format = 'TEXTFILE', partitioned_by = ARRAY['p'], bucketed_by=array['b'], bucket_count=3)
AS
SELECT *, partkey b, orderkey % 100 p
FROM tpch.tiny.lineitem
""",
// expected number of rows inserted from tpch.tiny.lineitem
60175);
// execute a query that schedules many concurrent stages in parallel to detect potential scheduler deadlocks
try {
assertQuery(
"""
SELECT
(SELECT count(orderkey) FROM lineitem_bucketed_partitioned) +
(SELECT count(linenumber) FROM lineitem_bucketed_partitioned) +
(SELECT count(quantity) FROM lineitem_bucketed_partitioned) +
(SELECT count(extendedprice) FROM lineitem_bucketed_partitioned) +
(SELECT count(DISTINCT partkey) FROM lineitem_bucketed_partitioned) +
(SELECT count(DISTINCT suppkey) FROM lineitem_bucketed_partitioned) c
""",
"SELECT 242800");
}
finally {
// always clean up the table, even when the query above fails or times out
assertUpdate("DROP TABLE lineitem_bucketed_partitioned");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ public static Map<String, String> getExtraProperties()
.put("query.executor-pool-size", "10")
// enable exchange compression to follow production deployment recommendations
.put("exchange.compression-enabled", "true")
.put("max-tasks-waiting-for-execution-per-query", "2")
.put("max-tasks-waiting-for-node-per-stage", "2")
.put("query.schedule-split-batch-size", "2")
.buildOrThrow();
}
}