From 36990031f0067209798fca4c64b39e99de019b1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Wed, 26 Oct 2022 22:07:16 +0200 Subject: [PATCH 1/2] Add missing synchronization --- .../io/trino/execution/scheduler/EventDrivenTaskSource.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenTaskSource.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenTaskSource.java index d6ffa62e943f..f782a0e06dbb 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenTaskSource.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenTaskSource.java @@ -273,7 +273,9 @@ private int getSplitPartition(Split split) private void fail(Throwable failure) { - callback.failed(failure); + synchronized (assignerLock) { + callback.failed(failure); + } close(); } From 3beb2c81d6e2c47700f8a1f3422705e540b8398e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Thu, 27 Oct 2022 11:45:48 +0200 Subject: [PATCH 2/2] Fix propagation of noMoreSplits flag to running tasks Without the fix, the update with noMoreSplits=true was not called on an open partition in the case when source fragments did not generate any splits. As a result of that, if tasks consuming this partition were scheduled before the partition was sealed they never learned that there will not be any more splits for the partition, and never finished. As a result, the query hung. The common case was that query was completed successfully as usually tasks only started after the partition was already sealed which implied noMoreSplits=true, even if not set explicitly. --- .../ArbitraryDistributionSplitAssigner.java | 15 ++++++++++++++ ...estArbitraryDistributionSplitAssigner.java | 20 +++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/ArbitraryDistributionSplitAssigner.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/ArbitraryDistributionSplitAssigner.java index 4dfed047bdf3..83469e8472cd 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/ArbitraryDistributionSplitAssigner.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/ArbitraryDistributionSplitAssigner.java @@ -143,6 +143,13 @@ private AssignmentResult assignReplicatedSplits(PlanNodeId planNodeId, List taskDescriptors = callback.getTaskDescriptors(); assertThat(taskDescriptors).hasSize(1); @@ -82,6 +84,7 @@ public void testEmpty() splitAssigner = createSplitAssigner(ImmutableSet.of(), ImmutableSet.of(REPLICATED_1), 100, false); callback = new TestingTaskSourceCallback(); splitAssigner.assign(REPLICATED_1, ImmutableListMultimap.of(), true).update(callback); + assertTrue(callback.isNoMoreSplits(0, REPLICATED_1)); splitAssigner.finish().update(callback); taskDescriptors = callback.getTaskDescriptors(); assertThat(taskDescriptors).hasSize(1); @@ -91,7 +94,11 @@ public void testEmpty() splitAssigner = createSplitAssigner(ImmutableSet.of(PARTITIONED_1), ImmutableSet.of(REPLICATED_1), 100, true); callback = new TestingTaskSourceCallback(); splitAssigner.assign(REPLICATED_1, ImmutableListMultimap.of(), true).update(callback); + assertFalse(callback.isNoMoreSplits(0, PARTITIONED_1)); + assertFalse(callback.isNoMoreSplits(0, REPLICATED_1)); splitAssigner.assign(PARTITIONED_1, ImmutableListMultimap.of(), true).update(callback); + assertTrue(callback.isNoMoreSplits(0, PARTITIONED_1)); + assertTrue(callback.isNoMoreSplits(0, REPLICATED_1)); splitAssigner.finish().update(callback); taskDescriptors = callback.getTaskDescriptors(); assertThat(taskDescriptors).hasSize(1); @@ -100,7 +107,11 @@ public void testEmpty() splitAssigner = createSplitAssigner(ImmutableSet.of(PARTITIONED_1), ImmutableSet.of(REPLICATED_1), 100, true); callback = new TestingTaskSourceCallback(); splitAssigner.assign(PARTITIONED_1, ImmutableListMultimap.of(), true).update(callback); + assertFalse(callback.isNoMoreSplits(0, PARTITIONED_1)); + assertFalse(callback.isNoMoreSplits(0, REPLICATED_1)); splitAssigner.assign(REPLICATED_1, ImmutableListMultimap.of(), true).update(callback); + assertTrue(callback.isNoMoreSplits(0, PARTITIONED_1)); + assertTrue(callback.isNoMoreSplits(0, REPLICATED_1)); splitAssigner.finish().update(callback); taskDescriptors = callback.getTaskDescriptors(); assertThat(taskDescriptors).hasSize(1); @@ -111,7 +122,16 @@ public void testEmpty() splitAssigner.assign(REPLICATED_1, ImmutableListMultimap.of(), true).update(callback); splitAssigner.assign(PARTITIONED_1, ImmutableListMultimap.of(), true).update(callback); splitAssigner.assign(PARTITIONED_2, ImmutableListMultimap.of(), true).update(callback); + assertFalse(callback.isNoMoreSplits(0, PARTITIONED_1)); + assertFalse(callback.isNoMoreSplits(0, REPLICATED_1)); + assertFalse(callback.isNoMoreSplits(0, PARTITIONED_2)); + assertFalse(callback.isNoMoreSplits(0, REPLICATED_2)); splitAssigner.assign(REPLICATED_2, ImmutableListMultimap.of(), true).update(callback); + assertTrue(callback.isNoMoreSplits(0, PARTITIONED_1)); + assertTrue(callback.isNoMoreSplits(0, REPLICATED_1)); + assertTrue(callback.isNoMoreSplits(0, PARTITIONED_2)); + assertTrue(callback.isNoMoreSplits(0, REPLICATED_2)); + splitAssigner.finish().update(callback); taskDescriptors = callback.getTaskDescriptors(); assertThat(taskDescriptors).hasSize(1);