From 4cf96a99d0aca6888285c4a6670ed6a68a724d49 Mon Sep 17 00:00:00 2001 From: Zebing Lin Date: Sun, 21 May 2023 10:37:48 -0400 Subject: [PATCH 1/2] Rename outputPartition to sourcePartition in HashDistributionSplitAssigner --- .../HashDistributionSplitAssigner.java | 44 +++++------ .../TestHashDistributionSplitAssigner.java | 78 +++++++++---------- 2 files changed, 60 insertions(+), 62 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/HashDistributionSplitAssigner.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/HashDistributionSplitAssigner.java index 254c25162124..f823feefd32c 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/HashDistributionSplitAssigner.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/HashDistributionSplitAssigner.java @@ -56,7 +56,7 @@ class HashDistributionSplitAssigner private final Set replicatedSources; private final Set allSources; private final FaultTolerantPartitioningScheme sourcePartitioningScheme; - private final Map outputPartitionToTaskPartition; + private final Map sourcePartitionToTaskPartition; private final Set createdTaskPartitions = new HashSet<>(); private final Set completedSources = new HashSet<>(); @@ -69,15 +69,13 @@ public static HashDistributionSplitAssigner create( Set partitionedSources, Set replicatedSources, FaultTolerantPartitioningScheme sourcePartitioningScheme, - Map outputDataSizeEstimates, + Map sourceDataSizeEstimates, PlanFragment fragment, long targetPartitionSizeInBytes, int targetMaxTaskCount) { if (fragment.getPartitioning().equals(SCALED_WRITER_HASH_DISTRIBUTION)) { - verify( - - fragment.getPartitionedSources().isEmpty() && fragment.getRemoteSourceNodes().size() == 1, + verify(fragment.getPartitionedSources().isEmpty() && fragment.getRemoteSourceNodes().size() == 1, "SCALED_WRITER_HASH_DISTRIBUTION fragments are expected to have exactly one remote source and no table scans"); } return new HashDistributionSplitAssigner( @@ -85,10 +83,10 @@ public static HashDistributionSplitAssigner create( partitionedSources, replicatedSources, sourcePartitioningScheme, - createOutputPartitionToTaskPartition( + createSourcePartitionToTaskPartition( sourcePartitioningScheme, partitionedSources, - outputDataSizeEstimates, + sourceDataSizeEstimates, targetPartitionSizeInBytes, targetMaxTaskCount, sourceId -> fragment.getPartitioning().equals(SCALED_WRITER_HASH_DISTRIBUTION), @@ -102,16 +100,16 @@ public static HashDistributionSplitAssigner create( Set partitionedSources, Set replicatedSources, FaultTolerantPartitioningScheme sourcePartitioningScheme, - Map outputPartitionToTaskPartition) + Map sourcePartitionToTaskPartition) { this.catalogRequirement = requireNonNull(catalogRequirement, "catalogRequirement is null"); this.replicatedSources = ImmutableSet.copyOf(requireNonNull(replicatedSources, "replicatedSources is null")); - allSources = ImmutableSet.builder() + this.allSources = ImmutableSet.builder() .addAll(partitionedSources) .addAll(replicatedSources) .build(); this.sourcePartitioningScheme = requireNonNull(sourcePartitioningScheme, "sourcePartitioningScheme is null"); - this.outputPartitionToTaskPartition = ImmutableMap.copyOf(requireNonNull(outputPartitionToTaskPartition, "outputPartitionToTaskPartition is null")); + this.sourcePartitionToTaskPartition = ImmutableMap.copyOf(requireNonNull(sourcePartitionToTaskPartition, "sourcePartitionToTaskPartition is null")); } @Override @@ -126,9 +124,9 @@ public AssignmentResult assign(PlanNodeId planNodeId, ListMultimap { - TaskPartition taskPartition = outputPartitionToTaskPartition.get(outputPartitionId); - verify(taskPartition != null, "taskPartition not found for outputPartitionId: %s", outputPartitionId); + splits.forEach((sourcePartitionId, split) -> { + TaskPartition taskPartition = sourcePartitionToTaskPartition.get(sourcePartitionId); + verify(taskPartition != null, "taskPartition not found for sourcePartitionId: %s", sourcePartitionId); List subPartitions; if (taskPartition.getSplitBy().isPresent() && taskPartition.getSplitBy().get().equals(planNodeId)) { @@ -144,7 +142,7 @@ public AssignmentResult assign(PlanNodeId planNodeId, ListMultimap hostRequirement = sourcePartitioningScheme.getNodeRequirement(outputPartitionId) + Set hostRequirement = sourcePartitioningScheme.getNodeRequirement(sourcePartitionId) .map(InternalNode::getHostAndPort) .map(ImmutableSet::of) .orElse(ImmutableSet.of()); @@ -202,10 +200,10 @@ public AssignmentResult finish() } @VisibleForTesting - static Map createOutputPartitionToTaskPartition( + static Map createSourcePartitionToTaskPartition( FaultTolerantPartitioningScheme sourcePartitioningScheme, Set partitionedSources, - Map outputDataSizeEstimates, + Map sourceDataSizeEstimates, long targetPartitionSizeInBytes, int targetMaxTaskCount, Predicate canSplit, @@ -214,14 +212,14 @@ static Map createOutputPartitionToTaskPartition( int partitionCount = sourcePartitioningScheme.getPartitionCount(); if (sourcePartitioningScheme.isExplicitPartitionToNodeMappingPresent() || partitionedSources.isEmpty() || - !outputDataSizeEstimates.keySet().containsAll(partitionedSources)) { + !sourceDataSizeEstimates.keySet().containsAll(partitionedSources)) { // if bucket scheme is set explicitly or if estimates are missing create one task partition per output partition return IntStream.range(0, partitionCount) .boxed() .collect(toImmutableMap(Function.identity(), (key) -> new TaskPartition(1, Optional.empty()))); } - List partitionedSourcesEstimates = outputDataSizeEstimates.entrySet().stream() + List partitionedSourcesEstimates = sourceDataSizeEstimates.entrySet().stream() .filter(entry -> partitionedSources.contains(entry.getKey())) .map(Map.Entry::getValue) .collect(toImmutableList()); @@ -249,7 +247,7 @@ static Map createOutputPartitionToTaskPartition( partitionSizeInBytes, targetPartitionSizeInBytes, partitionedSources, - outputDataSizeEstimates, + sourceDataSizeEstimates, partitionId, canSplit); result.put(partitionId, taskPartition); @@ -268,13 +266,13 @@ private static TaskPartition createTaskPartition( long partitionSizeInBytes, long targetPartitionSizeInBytes, Set partitionedSources, - Map outputDataSizeEstimates, + Map sourceDataSizeEstimates, int partitionId, Predicate canSplit) { if (partitionSizeInBytes > targetPartitionSizeInBytes) { // try to assign multiple sub-partitions if possible - Map sourceSizes = getSourceSizes(partitionedSources, outputDataSizeEstimates, partitionId); + Map sourceSizes = getSourceSizes(partitionedSources, sourceDataSizeEstimates, partitionId); PlanNodeId largestSource = sourceSizes.entrySet().stream() .max(Map.Entry.comparingByValue()) .map(Map.Entry::getKey) @@ -289,10 +287,10 @@ private static TaskPartition createTaskPartition( return new TaskPartition(1, Optional.empty()); } - private static Map getSourceSizes(Set partitionedSources, Map outputDataSizeEstimates, int partitionId) + private static Map getSourceSizes(Set partitionedSources, Map sourceDataSizeEstimates, int partitionId) { return partitionedSources.stream() - .collect(toImmutableMap(Function.identity(), source -> outputDataSizeEstimates.get(source).getPartitionSizeInBytes(partitionId))); + .collect(toImmutableMap(Function.identity(), source -> sourceDataSizeEstimates.get(source).getPartitionSizeInBytes(partitionId))); } private record PartitionAssignment(TaskPartition taskPartition, long assignedDataSizeInBytes) diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestHashDistributionSplitAssigner.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestHashDistributionSplitAssigner.java index e21eb34ad1cc..6b5fe1ceba1b 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestHashDistributionSplitAssigner.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestHashDistributionSplitAssigner.java @@ -46,7 +46,7 @@ import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.ImmutableSetMultimap.toImmutableSetMultimap; -import static io.trino.execution.scheduler.HashDistributionSplitAssigner.createOutputPartitionToTaskPartition; +import static io.trino.execution.scheduler.HashDistributionSplitAssigner.createSourcePartitionToTaskPartition; import static io.trino.testing.TestingHandles.TEST_CATALOG_HANDLE; import static java.util.Objects.requireNonNull; import static org.assertj.core.api.Assertions.assertThat; @@ -80,7 +80,7 @@ public void testEmpty() .withSplits(new SplitBatch(REPLICATED_1, ImmutableListMultimap.of(), true)) .withSplitPartitionCount(1) .withTargetPartitionSizeInBytes(1024) - .withOutputDataSizeEstimates(ImmutableMap.of(REPLICATED_1, new OutputDataSizeEstimate(ImmutableLongArray.builder().add(0).build()))) + .withSourceDataSizeEstimates(ImmutableMap.of(REPLICATED_1, new OutputDataSizeEstimate(ImmutableLongArray.builder().add(0).build()))) .withMergeAllowed(true) .withExpectedTaskCount(1) .run(); @@ -121,7 +121,7 @@ public void testExplicitPartitionToNodeMap() .withSplitPartitionCount(3) .withPartitionToNodeMap(Optional.of(ImmutableList.of(NODE_1, NODE_2, NODE_3))) .withTargetPartitionSizeInBytes(1000) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) .withMergeAllowed(true) .withExpectedTaskCount(3) .run(); @@ -133,7 +133,7 @@ public void testExplicitPartitionToNodeMap() .withSplitPartitionCount(3) .withPartitionToNodeMap(Optional.of(ImmutableList.of(NODE_1, NODE_2, NODE_3))) .withTargetPartitionSizeInBytes(1000) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) .withMergeAllowed(true) .withExpectedTaskCount(1) .run(); @@ -145,7 +145,7 @@ public void testExplicitPartitionToNodeMap() .withSplitPartitionCount(3) .withPartitionToNodeMap(Optional.of(ImmutableList.of(NODE_1, NODE_2, NODE_3))) .withTargetPartitionSizeInBytes(1000) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) .withMergeAllowed(true) .withExpectedTaskCount(1) .run(); @@ -161,7 +161,7 @@ public void testMergeNotAllowed() new SplitBatch(PARTITIONED_1, createSplitMap(createSplit(2, 1), createSplit(3, 2)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(1000) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) .withMergeAllowed(false) .withExpectedTaskCount(3) .run(); @@ -172,7 +172,7 @@ public void testMergeNotAllowed() new SplitBatch(PARTITIONED_1, createSplitMap(createSplit(0, 0), createSplit(1, 0)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(1000) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) .withMergeAllowed(false) .withExpectedTaskCount(1) .run(); @@ -183,7 +183,7 @@ public void testMergeNotAllowed() new SplitBatch(PARTITIONED_1, ImmutableListMultimap.of(), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(1000) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) .withMergeAllowed(false) .withExpectedTaskCount(1) .run(); @@ -237,7 +237,7 @@ public void testHappyPath() new SplitBatch(PARTITIONED_1, createSplitMap(createSplit(2, 1), createSplit(3, 2)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(3) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) .withMergeAllowed(true) .withExpectedTaskCount(1) .run(); @@ -251,7 +251,7 @@ public void testHappyPath() new SplitBatch(PARTITIONED_1, createSplitMap(createSplit(6, 1), createSplit(7, 2)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(3) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) .withMergeAllowed(true) .withExpectedTaskCount(1) .run(); @@ -265,7 +265,7 @@ public void testHappyPath() new SplitBatch(PARTITIONED_1, createSplitMap(createSplit(6, 1), createSplit(7, 2)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(1) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) .withMergeAllowed(true) .withExpectedTaskCount(3) .run(); @@ -279,7 +279,7 @@ public void testHappyPath() new SplitBatch(PARTITIONED_1, createSplitMap(createSplit(6, 1), createSplit(7, 2)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(1) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) .withMergeAllowed(true) .withExpectedTaskCount(3) .run(); @@ -294,7 +294,7 @@ public void testHappyPath() new SplitBatch(PARTITIONED_1, createSplitMap(createSplit(6, 1), createSplit(7, 2)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(1) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) .withMergeAllowed(true) .withExpectedTaskCount(3) .run(); @@ -310,7 +310,7 @@ public void testHappyPath() new SplitBatch(PARTITIONED_1, createSplitMap(createSplit(6, 1), createSplit(7, 2)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(1) - .withOutputDataSizeEstimates(ImmutableMap.of( + .withSourceDataSizeEstimates(ImmutableMap.of( PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)), PARTITIONED_2, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) .withMergeAllowed(true) @@ -329,7 +329,7 @@ public void testPartitionSplitting() new SplitBatch(PARTITIONED_1, createSplitMap(createSplit(2, 0), createSplit(3, 0)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(3) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(5, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(5, 1, 1)))) .withSplittableSources(PARTITIONED_1) .withMergeAllowed(true) .withExpectedTaskCount(2) @@ -343,7 +343,7 @@ public void testPartitionSplitting() new SplitBatch(PARTITIONED_1, createSplitMap(createSplit(2, 0), createSplit(3, 0)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(3) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(5, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(5, 1, 1)))) .withMergeAllowed(true) .withExpectedTaskCount(1) .run(); @@ -357,7 +357,7 @@ public void testPartitionSplitting() new SplitBatch(PARTITIONED_2, createSplitMap(createSplit(4, 0), createSplit(5, 1)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(30) - .withOutputDataSizeEstimates(ImmutableMap.of( + .withSourceDataSizeEstimates(ImmutableMap.of( PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(50, 1, 1)), PARTITIONED_2, new OutputDataSizeEstimate(ImmutableLongArray.of(2, 1, 1)))) .withSplittableSources(PARTITIONED_1) @@ -372,7 +372,7 @@ PARTITIONED_2, new OutputDataSizeEstimate(ImmutableLongArray.of(2, 1, 1)))) new SplitBatch(PARTITIONED_2, createSplitMap(createSplit(4, 0), createSplit(5, 1)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(30) - .withOutputDataSizeEstimates(ImmutableMap.of( + .withSourceDataSizeEstimates(ImmutableMap.of( PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(50, 1, 1)), PARTITIONED_2, new OutputDataSizeEstimate(ImmutableLongArray.of(2, 1, 1)))) .withSplittableSources(PARTITIONED_1, PARTITIONED_2) @@ -387,7 +387,7 @@ PARTITIONED_2, new OutputDataSizeEstimate(ImmutableLongArray.of(2, 1, 1)))) new SplitBatch(PARTITIONED_2, createSplitMap(createSplit(4, 0), createSplit(5, 0)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(30) - .withOutputDataSizeEstimates(ImmutableMap.of( + .withSourceDataSizeEstimates(ImmutableMap.of( PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(50, 1, 1)), PARTITIONED_2, new OutputDataSizeEstimate(ImmutableLongArray.of(2, 1, 1)))) .withSplittableSources(PARTITIONED_2) @@ -405,7 +405,7 @@ PARTITIONED_2, new OutputDataSizeEstimate(ImmutableLongArray.of(2, 1, 1)))) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(30) .withTaskTargetMaxCount(10) - .withOutputDataSizeEstimates(ImmutableMap.of( + .withSourceDataSizeEstimates(ImmutableMap.of( PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1000, 1, 1)), PARTITIONED_2, new OutputDataSizeEstimate(ImmutableLongArray.of(2, 1, 1)))) .withSplittableSources(PARTITIONED_1, PARTITIONED_2) @@ -420,7 +420,7 @@ public void testCreateOutputPartitionToTaskPartition() testPartitionMapping() .withSplitPartitionCount(3) .withPartitionedSources(PARTITIONED_1) - .withOutputDataSizeEstimates(ImmutableMap.of( + .withSourceDataSizeEstimates(ImmutableMap.of( PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(50, 1, 1)))) .withTargetPartitionSizeInBytes(25) .withSplittableSources(PARTITIONED_1) @@ -432,7 +432,7 @@ PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(50, 1, 1)))) testPartitionMapping() .withSplitPartitionCount(3) .withPartitionedSources(PARTITIONED_1) - .withOutputDataSizeEstimates(ImmutableMap.of( + .withSourceDataSizeEstimates(ImmutableMap.of( PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(50, 1, 1)))) .withTargetPartitionSizeInBytes(25) .withMergeAllowed(true) @@ -443,7 +443,7 @@ PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(50, 1, 1)))) testPartitionMapping() .withSplitPartitionCount(3) .withPartitionedSources(PARTITIONED_1) - .withOutputDataSizeEstimates(ImmutableMap.of( + .withSourceDataSizeEstimates(ImmutableMap.of( PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(50, 1, 1)))) .withTargetPartitionSizeInBytes(25) .withMergeAllowed(false) @@ -455,7 +455,7 @@ PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(50, 1, 1)))) testPartitionMapping() .withSplitPartitionCount(3) .withPartitionedSources(PARTITIONED_1) - .withOutputDataSizeEstimates(ImmutableMap.of( + .withSourceDataSizeEstimates(ImmutableMap.of( PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(50, 1, 1)))) .withTargetPartitionSizeInBytes(25) .withMergeAllowed(false) @@ -468,7 +468,7 @@ PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(50, 1, 1)))) testPartitionMapping() .withSplitPartitionCount(4) .withPartitionedSources(PARTITIONED_1) - .withOutputDataSizeEstimates(ImmutableMap.of( + .withSourceDataSizeEstimates(ImmutableMap.of( PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(0, 0, 0, 60)))) .withTargetPartitionSizeInBytes(25) .withMergeAllowed(false) @@ -544,7 +544,7 @@ private static class AssignerTester private Optional> partitionToNodeMap = Optional.empty(); private long targetPartitionSizeInBytes; private int taskTargetMaxCount = Integer.MAX_VALUE; - private Map outputDataSizeEstimates = ImmutableMap.of(); + private Map sourceDataSizeEstimates = ImmutableMap.of(); private Set splittableSources = ImmutableSet.of(); private boolean mergeAllowed; private int expectedTaskCount; @@ -591,9 +591,9 @@ public AssignerTester withTaskTargetMaxCount(int taskTargetMaxCount) return this; } - public AssignerTester withOutputDataSizeEstimates(Map outputDataSizeEstimates) + public AssignerTester withSourceDataSizeEstimates(Map sourceDataSizeEstimates) { - this.outputDataSizeEstimates = outputDataSizeEstimates; + this.sourceDataSizeEstimates = sourceDataSizeEstimates; return this; } @@ -618,10 +618,10 @@ public AssignerTester withExpectedTaskCount(int expectedTaskCount) public void run() { FaultTolerantPartitioningScheme partitioningScheme = createPartitioningScheme(splitPartitionCount, partitionToNodeMap); - Map outputPartitionToTaskPartition = createOutputPartitionToTaskPartition( + Map sourcePartitionToTaskPartition = createSourcePartitionToTaskPartition( partitioningScheme, partitionedSources, - outputDataSizeEstimates, + sourceDataSizeEstimates, targetPartitionSizeInBytes, taskTargetMaxCount, splittableSources::contains, @@ -631,7 +631,7 @@ public void run() partitionedSources, replicatedSources, partitioningScheme, - outputPartitionToTaskPartition); + sourcePartitionToTaskPartition); SplitAssignerTester tester = new SplitAssignerTester(); Map> partitionedSplitIds = new HashMap<>(); Set replicatedSplitIds = new HashSet<>(); @@ -675,7 +675,7 @@ public void run() // validate partitioned splits partitionedSplitIds.forEach((partitionId, sourceSplits) -> { sourceSplits.forEach((source, splitId) -> { - List descriptors = outputPartitionToTaskPartition.get(partitionId).getSubPartitions().stream() + List descriptors = sourcePartitionToTaskPartition.get(partitionId).getSubPartitions().stream() .filter(HashDistributionSplitAssigner.SubPartition::isIdAssigned) .map(HashDistributionSplitAssigner.SubPartition::getId) .map(taskDescriptors::get) @@ -710,7 +710,7 @@ private static class PartitionMappingTester private int splitPartitionCount; private Optional> partitionToNodeMap = Optional.empty(); private long targetPartitionSizeInBytes; - private Map outputDataSizeEstimates = ImmutableMap.of(); + private Map sourceDataSizeEstimates = ImmutableMap.of(); private Set splittableSources = ImmutableSet.of(); private boolean mergeAllowed; private Set expectedMappings = ImmutableSet.of(); @@ -739,9 +739,9 @@ public PartitionMappingTester withTargetPartitionSizeInBytes(long targetPartitio return this; } - public PartitionMappingTester withOutputDataSizeEstimates(Map outputDataSizeEstimates) + public PartitionMappingTester withSourceDataSizeEstimates(Map sourceDataSizeEstimates) { - this.outputDataSizeEstimates = outputDataSizeEstimates; + this.sourceDataSizeEstimates = sourceDataSizeEstimates; return this; } @@ -766,10 +766,10 @@ public PartitionMappingTester withExpectedMappings(PartitionMapping... mappings) public void run() { FaultTolerantPartitioningScheme partitioningScheme = createPartitioningScheme(splitPartitionCount, partitionToNodeMap); - Map actual = createOutputPartitionToTaskPartition( + Map actual = createSourcePartitionToTaskPartition( partitioningScheme, partitionedSources, - outputDataSizeEstimates, + sourceDataSizeEstimates, targetPartitionSizeInBytes, Integer.MAX_VALUE, splittableSources::contains, @@ -778,9 +778,9 @@ public void run() assertEquals(actualGroups, expectedMappings); } - private static Set extractMappings(Map outputPartitionToTaskPartition) + private static Set extractMappings(Map sourcePartitionToTaskPartition) { - SetMultimap grouped = outputPartitionToTaskPartition.entrySet().stream() + SetMultimap grouped = sourcePartitionToTaskPartition.entrySet().stream() .collect(toImmutableSetMultimap(Map.Entry::getValue, Map.Entry::getKey)); return Multimaps.asMap(grouped).entrySet().stream() .map(entry -> new PartitionMapping(entry.getValue(), entry.getKey().getSubPartitions().size())) From 156660f1e5442bbde344d7522876fb8115054470 Mon Sep 17 00:00:00 2001 From: Zebing Lin Date: Sun, 21 May 2023 22:24:55 -0400 Subject: [PATCH 2/2] Create tasks all at once on first assignment of HashDistributionSplitAssigner --- .../HashDistributionSplitAssigner.java | 64 +++++++++---------- .../TestHashDistributionSplitAssigner.java | 24 +++---- 2 files changed, 41 insertions(+), 47 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/HashDistributionSplitAssigner.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/HashDistributionSplitAssigner.java index f823feefd32c..f7775abe553d 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/HashDistributionSplitAssigner.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/HashDistributionSplitAssigner.java @@ -62,7 +62,7 @@ class HashDistributionSplitAssigner private final Set completedSources = new HashSet<>(); private final ListMultimap replicatedSplits = ArrayListMultimap.create(); - private int nextTaskPartitionId; + private boolean allTaskPartitionsCreated; public static HashDistributionSplitAssigner create( Optional catalogRequirement, @@ -117,6 +117,33 @@ public AssignmentResult assign(PlanNodeId planNodeId, ListMultimap hostRequirement = sourcePartitioningScheme.getNodeRequirement(sourcePartitionId) + .map(InternalNode::getHostAndPort) + .map(ImmutableSet::of) + .orElse(ImmutableSet.of()); + assignment.addPartition(new Partition( + taskPartitionId, + new NodeRequirements(catalogRequirement, hostRequirement))); + createdTaskPartitions.add(taskPartitionId); + } + } + } + assignment.setNoMorePartitions(); + + allTaskPartitionsCreated = true; + } + if (replicatedSources.contains(planNodeId)) { replicatedSplits.putAll(planNodeId, splits.values()); for (Integer partitionId : createdTaskPartitions) { @@ -137,27 +164,6 @@ public AssignmentResult assign(PlanNodeId planNodeId, ListMultimap hostRequirement = sourcePartitioningScheme.getNodeRequirement(sourcePartitionId) - .map(InternalNode::getHostAndPort) - .map(ImmutableSet::of) - .orElse(ImmutableSet.of()); - assignment.addPartition(new Partition( - taskPartitionId, - new NodeRequirements(catalogRequirement, hostRequirement))); - for (PlanNodeId replicatedSource : replicatedSplits.keySet()) { - assignment.updatePartition(new PartitionUpdate(taskPartitionId, replicatedSource, replicatedSplits.get(replicatedSource), completedSources.contains(replicatedSource))); - } - for (PlanNodeId completedSource : completedSources) { - assignment.updatePartition(new PartitionUpdate(taskPartitionId, completedSource, ImmutableList.of(), true)); - } - createdTaskPartitions.add(taskPartitionId); - } - assignment.updatePartition(new PartitionUpdate(subPartition.getId(), planNodeId, ImmutableList.of(split), false)); } }); @@ -168,23 +174,11 @@ public AssignmentResult assign(PlanNodeId planNodeId, ListMultimap