diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/ArbitraryDistributionSplitAssigner.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/ArbitraryDistributionSplitAssigner.java index 4dfed047bdf3..83469e8472cd 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/ArbitraryDistributionSplitAssigner.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/ArbitraryDistributionSplitAssigner.java @@ -143,6 +143,13 @@ private AssignmentResult assignReplicatedSplits(PlanNodeId planNodeId, List taskDescriptors = callback.getTaskDescriptors(); assertThat(taskDescriptors).hasSize(1); @@ -82,6 +84,7 @@ public void testEmpty() splitAssigner = createSplitAssigner(ImmutableSet.of(), ImmutableSet.of(REPLICATED_1), 100, false); callback = new TestingTaskSourceCallback(); splitAssigner.assign(REPLICATED_1, ImmutableListMultimap.of(), true).update(callback); + assertTrue(callback.isNoMoreSplits(0, REPLICATED_1)); splitAssigner.finish().update(callback); taskDescriptors = callback.getTaskDescriptors(); assertThat(taskDescriptors).hasSize(1); @@ -91,7 +94,11 @@ public void testEmpty() splitAssigner = createSplitAssigner(ImmutableSet.of(PARTITIONED_1), ImmutableSet.of(REPLICATED_1), 100, true); callback = new TestingTaskSourceCallback(); splitAssigner.assign(REPLICATED_1, ImmutableListMultimap.of(), true).update(callback); + assertFalse(callback.isNoMoreSplits(0, PARTITIONED_1)); + assertFalse(callback.isNoMoreSplits(0, REPLICATED_1)); splitAssigner.assign(PARTITIONED_1, ImmutableListMultimap.of(), true).update(callback); + assertTrue(callback.isNoMoreSplits(0, PARTITIONED_1)); + assertTrue(callback.isNoMoreSplits(0, REPLICATED_1)); splitAssigner.finish().update(callback); taskDescriptors = callback.getTaskDescriptors(); assertThat(taskDescriptors).hasSize(1); @@ -100,7 +107,11 @@ public void testEmpty() splitAssigner = createSplitAssigner(ImmutableSet.of(PARTITIONED_1), ImmutableSet.of(REPLICATED_1), 100, true); callback = new TestingTaskSourceCallback(); splitAssigner.assign(PARTITIONED_1, ImmutableListMultimap.of(), true).update(callback); + assertFalse(callback.isNoMoreSplits(0, PARTITIONED_1)); + assertFalse(callback.isNoMoreSplits(0, REPLICATED_1)); splitAssigner.assign(REPLICATED_1, ImmutableListMultimap.of(), true).update(callback); + assertTrue(callback.isNoMoreSplits(0, PARTITIONED_1)); + assertTrue(callback.isNoMoreSplits(0, REPLICATED_1)); splitAssigner.finish().update(callback); taskDescriptors = callback.getTaskDescriptors(); assertThat(taskDescriptors).hasSize(1); @@ -111,7 +122,16 @@ public void testEmpty() splitAssigner.assign(REPLICATED_1, ImmutableListMultimap.of(), true).update(callback); splitAssigner.assign(PARTITIONED_1, ImmutableListMultimap.of(), true).update(callback); splitAssigner.assign(PARTITIONED_2, ImmutableListMultimap.of(), true).update(callback); + assertFalse(callback.isNoMoreSplits(0, PARTITIONED_1)); + assertFalse(callback.isNoMoreSplits(0, REPLICATED_1)); + assertFalse(callback.isNoMoreSplits(0, PARTITIONED_2)); + assertFalse(callback.isNoMoreSplits(0, REPLICATED_2)); splitAssigner.assign(REPLICATED_2, ImmutableListMultimap.of(), true).update(callback); + assertTrue(callback.isNoMoreSplits(0, PARTITIONED_1)); + assertTrue(callback.isNoMoreSplits(0, REPLICATED_1)); + assertTrue(callback.isNoMoreSplits(0, PARTITIONED_2)); + assertTrue(callback.isNoMoreSplits(0, REPLICATED_2)); + splitAssigner.finish().update(callback); taskDescriptors = callback.getTaskDescriptors(); assertThat(taskDescriptors).hasSize(1);