diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/HashDistributionSplitAssigner.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/HashDistributionSplitAssigner.java index 254c25162124..f7775abe553d 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/HashDistributionSplitAssigner.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/HashDistributionSplitAssigner.java @@ -56,28 +56,26 @@ class HashDistributionSplitAssigner private final Set replicatedSources; private final Set allSources; private final FaultTolerantPartitioningScheme sourcePartitioningScheme; - private final Map outputPartitionToTaskPartition; + private final Map sourcePartitionToTaskPartition; private final Set createdTaskPartitions = new HashSet<>(); private final Set completedSources = new HashSet<>(); private final ListMultimap replicatedSplits = ArrayListMultimap.create(); - private int nextTaskPartitionId; + private boolean allTaskPartitionsCreated; public static HashDistributionSplitAssigner create( Optional catalogRequirement, Set partitionedSources, Set replicatedSources, FaultTolerantPartitioningScheme sourcePartitioningScheme, - Map outputDataSizeEstimates, + Map sourceDataSizeEstimates, PlanFragment fragment, long targetPartitionSizeInBytes, int targetMaxTaskCount) { if (fragment.getPartitioning().equals(SCALED_WRITER_HASH_DISTRIBUTION)) { - verify( - - fragment.getPartitionedSources().isEmpty() && fragment.getRemoteSourceNodes().size() == 1, + verify(fragment.getPartitionedSources().isEmpty() && fragment.getRemoteSourceNodes().size() == 1, "SCALED_WRITER_HASH_DISTRIBUTION fragments are expected to have exactly one remote source and no table scans"); } return new HashDistributionSplitAssigner( @@ -85,10 +83,10 @@ public static HashDistributionSplitAssigner create( partitionedSources, replicatedSources, sourcePartitioningScheme, - createOutputPartitionToTaskPartition( + createSourcePartitionToTaskPartition( sourcePartitioningScheme, partitionedSources, - outputDataSizeEstimates, + sourceDataSizeEstimates, targetPartitionSizeInBytes, targetMaxTaskCount, sourceId -> fragment.getPartitioning().equals(SCALED_WRITER_HASH_DISTRIBUTION), @@ -102,16 +100,16 @@ public static HashDistributionSplitAssigner create( Set partitionedSources, Set replicatedSources, FaultTolerantPartitioningScheme sourcePartitioningScheme, - Map outputPartitionToTaskPartition) + Map sourcePartitionToTaskPartition) { this.catalogRequirement = requireNonNull(catalogRequirement, "catalogRequirement is null"); this.replicatedSources = ImmutableSet.copyOf(requireNonNull(replicatedSources, "replicatedSources is null")); - allSources = ImmutableSet.builder() + this.allSources = ImmutableSet.builder() .addAll(partitionedSources) .addAll(replicatedSources) .build(); this.sourcePartitioningScheme = requireNonNull(sourcePartitioningScheme, "sourcePartitioningScheme is null"); - this.outputPartitionToTaskPartition = ImmutableMap.copyOf(requireNonNull(outputPartitionToTaskPartition, "outputPartitionToTaskPartition is null")); + this.sourcePartitionToTaskPartition = ImmutableMap.copyOf(requireNonNull(sourcePartitionToTaskPartition, "sourcePartitionToTaskPartition is null")); } @Override @@ -119,6 +117,33 @@ public AssignmentResult assign(PlanNodeId planNodeId, ListMultimap hostRequirement = sourcePartitioningScheme.getNodeRequirement(sourcePartitionId) + .map(InternalNode::getHostAndPort) + .map(ImmutableSet::of) + .orElse(ImmutableSet.of()); + assignment.addPartition(new Partition( + taskPartitionId, + new NodeRequirements(catalogRequirement, hostRequirement))); + createdTaskPartitions.add(taskPartitionId); + } + } + } + assignment.setNoMorePartitions(); + + allTaskPartitionsCreated = true; + } + if (replicatedSources.contains(planNodeId)) { replicatedSplits.putAll(planNodeId, splits.values()); for (Integer partitionId : createdTaskPartitions) { @@ -126,9 +151,9 @@ public AssignmentResult assign(PlanNodeId planNodeId, ListMultimap { - TaskPartition taskPartition = outputPartitionToTaskPartition.get(outputPartitionId); - verify(taskPartition != null, "taskPartition not found for outputPartitionId: %s", outputPartitionId); + splits.forEach((sourcePartitionId, split) -> { + TaskPartition taskPartition = sourcePartitionToTaskPartition.get(sourcePartitionId); + verify(taskPartition != null, "taskPartition not found for sourcePartitionId: %s", sourcePartitionId); List subPartitions; if (taskPartition.getSplitBy().isPresent() && taskPartition.getSplitBy().get().equals(planNodeId)) { @@ -139,27 +164,6 @@ public AssignmentResult assign(PlanNodeId planNodeId, ListMultimap hostRequirement = sourcePartitioningScheme.getNodeRequirement(outputPartitionId) - .map(InternalNode::getHostAndPort) - .map(ImmutableSet::of) - .orElse(ImmutableSet.of()); - assignment.addPartition(new Partition( - taskPartitionId, - new NodeRequirements(catalogRequirement, hostRequirement))); - for (PlanNodeId replicatedSource : replicatedSplits.keySet()) { - assignment.updatePartition(new PartitionUpdate(taskPartitionId, replicatedSource, replicatedSplits.get(replicatedSource), completedSources.contains(replicatedSource))); - } - for (PlanNodeId completedSource : completedSources) { - assignment.updatePartition(new PartitionUpdate(taskPartitionId, completedSource, ImmutableList.of(), true)); - } - createdTaskPartitions.add(taskPartitionId); - } - assignment.updatePartition(new PartitionUpdate(subPartition.getId(), planNodeId, ImmutableList.of(split), false)); } }); @@ -170,23 +174,11 @@ public AssignmentResult assign(PlanNodeId planNodeId, ListMultimap createOutputPartitionToTaskPartition( + static Map createSourcePartitionToTaskPartition( FaultTolerantPartitioningScheme sourcePartitioningScheme, Set partitionedSources, - Map outputDataSizeEstimates, + Map sourceDataSizeEstimates, long targetPartitionSizeInBytes, int targetMaxTaskCount, Predicate canSplit, @@ -214,14 +206,14 @@ static Map createOutputPartitionToTaskPartition( int partitionCount = sourcePartitioningScheme.getPartitionCount(); if (sourcePartitioningScheme.isExplicitPartitionToNodeMappingPresent() || partitionedSources.isEmpty() || - !outputDataSizeEstimates.keySet().containsAll(partitionedSources)) { + !sourceDataSizeEstimates.keySet().containsAll(partitionedSources)) { // if bucket scheme is set explicitly or if estimates are missing create one task partition per output partition return IntStream.range(0, partitionCount) .boxed() .collect(toImmutableMap(Function.identity(), (key) -> new TaskPartition(1, Optional.empty()))); } - List partitionedSourcesEstimates = outputDataSizeEstimates.entrySet().stream() + List partitionedSourcesEstimates = sourceDataSizeEstimates.entrySet().stream() .filter(entry -> partitionedSources.contains(entry.getKey())) .map(Map.Entry::getValue) .collect(toImmutableList()); @@ -249,7 +241,7 @@ static Map createOutputPartitionToTaskPartition( partitionSizeInBytes, targetPartitionSizeInBytes, partitionedSources, - outputDataSizeEstimates, + sourceDataSizeEstimates, partitionId, canSplit); result.put(partitionId, taskPartition); @@ -268,13 +260,13 @@ private static TaskPartition createTaskPartition( long partitionSizeInBytes, long targetPartitionSizeInBytes, Set partitionedSources, - Map outputDataSizeEstimates, + Map sourceDataSizeEstimates, int partitionId, Predicate canSplit) { if (partitionSizeInBytes > targetPartitionSizeInBytes) { // try to assign multiple sub-partitions if possible - Map sourceSizes = getSourceSizes(partitionedSources, outputDataSizeEstimates, partitionId); + Map sourceSizes = getSourceSizes(partitionedSources, sourceDataSizeEstimates, partitionId); PlanNodeId largestSource = sourceSizes.entrySet().stream() .max(Map.Entry.comparingByValue()) .map(Map.Entry::getKey) @@ -289,10 +281,10 @@ private static TaskPartition createTaskPartition( return new TaskPartition(1, Optional.empty()); } - private static Map getSourceSizes(Set partitionedSources, Map outputDataSizeEstimates, int partitionId) + private static Map getSourceSizes(Set partitionedSources, Map sourceDataSizeEstimates, int partitionId) { return partitionedSources.stream() - .collect(toImmutableMap(Function.identity(), source -> outputDataSizeEstimates.get(source).getPartitionSizeInBytes(partitionId))); + .collect(toImmutableMap(Function.identity(), source -> sourceDataSizeEstimates.get(source).getPartitionSizeInBytes(partitionId))); } private record PartitionAssignment(TaskPartition taskPartition, long assignedDataSizeInBytes) diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestHashDistributionSplitAssigner.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestHashDistributionSplitAssigner.java index e21eb34ad1cc..73641ce1d665 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestHashDistributionSplitAssigner.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestHashDistributionSplitAssigner.java @@ -46,7 +46,7 @@ import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.ImmutableSetMultimap.toImmutableSetMultimap; -import static io.trino.execution.scheduler.HashDistributionSplitAssigner.createOutputPartitionToTaskPartition; +import static io.trino.execution.scheduler.HashDistributionSplitAssigner.createSourcePartitionToTaskPartition; import static io.trino.testing.TestingHandles.TEST_CATALOG_HANDLE; import static java.util.Objects.requireNonNull; import static org.assertj.core.api.Assertions.assertThat; @@ -73,14 +73,14 @@ public void testEmpty() .withSplitPartitionCount(10) .withTargetPartitionSizeInBytes(1024) .withMergeAllowed(true) - .withExpectedTaskCount(1) + .withExpectedTaskCount(10) .run(); testAssigner() .withReplicatedSources(REPLICATED_1) .withSplits(new SplitBatch(REPLICATED_1, ImmutableListMultimap.of(), true)) .withSplitPartitionCount(1) .withTargetPartitionSizeInBytes(1024) - .withOutputDataSizeEstimates(ImmutableMap.of(REPLICATED_1, new OutputDataSizeEstimate(ImmutableLongArray.builder().add(0).build()))) + .withSourceDataSizeEstimates(ImmutableMap.of(REPLICATED_1, new OutputDataSizeEstimate(ImmutableLongArray.builder().add(0).build()))) .withMergeAllowed(true) .withExpectedTaskCount(1) .run(); @@ -93,7 +93,7 @@ public void testEmpty() .withSplitPartitionCount(10) .withTargetPartitionSizeInBytes(1024) .withMergeAllowed(true) - .withExpectedTaskCount(1) + .withExpectedTaskCount(10) .run(); testAssigner() .withPartitionedSources(PARTITIONED_1, PARTITIONED_2) @@ -106,7 +106,7 @@ public void testEmpty() .withSplitPartitionCount(10) .withTargetPartitionSizeInBytes(1024) .withMergeAllowed(true) - .withExpectedTaskCount(1) + .withExpectedTaskCount(10) .run(); } @@ -121,7 +121,7 @@ public void testExplicitPartitionToNodeMap() .withSplitPartitionCount(3) .withPartitionToNodeMap(Optional.of(ImmutableList.of(NODE_1, NODE_2, NODE_3))) .withTargetPartitionSizeInBytes(1000) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) .withMergeAllowed(true) .withExpectedTaskCount(3) .run(); @@ -133,9 +133,9 @@ public void testExplicitPartitionToNodeMap() .withSplitPartitionCount(3) .withPartitionToNodeMap(Optional.of(ImmutableList.of(NODE_1, NODE_2, NODE_3))) .withTargetPartitionSizeInBytes(1000) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) .withMergeAllowed(true) - .withExpectedTaskCount(1) + .withExpectedTaskCount(3) .run(); // no splits testAssigner() @@ -145,9 +145,9 @@ public void testExplicitPartitionToNodeMap() .withSplitPartitionCount(3) .withPartitionToNodeMap(Optional.of(ImmutableList.of(NODE_1, NODE_2, NODE_3))) .withTargetPartitionSizeInBytes(1000) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) .withMergeAllowed(true) - .withExpectedTaskCount(1) + .withExpectedTaskCount(3) .run(); } @@ -161,7 +161,7 @@ public void testMergeNotAllowed() new SplitBatch(PARTITIONED_1, createSplitMap(createSplit(2, 1), createSplit(3, 2)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(1000) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) .withMergeAllowed(false) .withExpectedTaskCount(3) .run(); @@ -172,9 +172,9 @@ public void testMergeNotAllowed() new SplitBatch(PARTITIONED_1, createSplitMap(createSplit(0, 0), createSplit(1, 0)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(1000) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) .withMergeAllowed(false) - .withExpectedTaskCount(1) + .withExpectedTaskCount(3) .run(); // no splits testAssigner() @@ -183,9 +183,9 @@ public void testMergeNotAllowed() new SplitBatch(PARTITIONED_1, ImmutableListMultimap.of(), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(1000) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) .withMergeAllowed(false) - .withExpectedTaskCount(1) + .withExpectedTaskCount(3) .run(); } @@ -212,7 +212,7 @@ public void testMissingEstimates() .withPartitionToNodeMap(Optional.of(ImmutableList.of(NODE_1, NODE_2, NODE_3))) .withTargetPartitionSizeInBytes(1000) .withMergeAllowed(true) - .withExpectedTaskCount(1) + .withExpectedTaskCount(3) .run(); // no splits testAssigner() @@ -223,7 +223,7 @@ public void testMissingEstimates() .withPartitionToNodeMap(Optional.of(ImmutableList.of(NODE_1, NODE_2, NODE_3))) .withTargetPartitionSizeInBytes(1000) .withMergeAllowed(true) - .withExpectedTaskCount(1) + .withExpectedTaskCount(3) .run(); } @@ -237,7 +237,7 @@ public void testHappyPath() new SplitBatch(PARTITIONED_1, createSplitMap(createSplit(2, 1), createSplit(3, 2)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(3) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) .withMergeAllowed(true) .withExpectedTaskCount(1) .run(); @@ -251,7 +251,7 @@ public void testHappyPath() new SplitBatch(PARTITIONED_1, createSplitMap(createSplit(6, 1), createSplit(7, 2)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(3) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) .withMergeAllowed(true) .withExpectedTaskCount(1) .run(); @@ -265,7 +265,7 @@ public void testHappyPath() new SplitBatch(PARTITIONED_1, createSplitMap(createSplit(6, 1), createSplit(7, 2)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(1) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) .withMergeAllowed(true) .withExpectedTaskCount(3) .run(); @@ -279,7 +279,7 @@ public void testHappyPath() new SplitBatch(PARTITIONED_1, createSplitMap(createSplit(6, 1), createSplit(7, 2)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(1) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) .withMergeAllowed(true) .withExpectedTaskCount(3) .run(); @@ -294,7 +294,7 @@ public void testHappyPath() new SplitBatch(PARTITIONED_1, createSplitMap(createSplit(6, 1), createSplit(7, 2)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(1) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) .withMergeAllowed(true) .withExpectedTaskCount(3) .run(); @@ -310,7 +310,7 @@ public void testHappyPath() new SplitBatch(PARTITIONED_1, createSplitMap(createSplit(6, 1), createSplit(7, 2)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(1) - .withOutputDataSizeEstimates(ImmutableMap.of( + .withSourceDataSizeEstimates(ImmutableMap.of( PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)), PARTITIONED_2, new OutputDataSizeEstimate(ImmutableLongArray.of(1, 1, 1)))) .withMergeAllowed(true) @@ -329,10 +329,10 @@ public void testPartitionSplitting() new SplitBatch(PARTITIONED_1, createSplitMap(createSplit(2, 0), createSplit(3, 0)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(3) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(5, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(5, 1, 1)))) .withSplittableSources(PARTITIONED_1) .withMergeAllowed(true) - .withExpectedTaskCount(2) + .withExpectedTaskCount(3) .run(); // largest source is not splittable @@ -343,9 +343,9 @@ public void testPartitionSplitting() new SplitBatch(PARTITIONED_1, createSplitMap(createSplit(2, 0), createSplit(3, 0)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(3) - .withOutputDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(5, 1, 1)))) + .withSourceDataSizeEstimates(ImmutableMap.of(PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(5, 1, 1)))) .withMergeAllowed(true) - .withExpectedTaskCount(1) + .withExpectedTaskCount(2) .run(); // multiple sources @@ -357,7 +357,7 @@ public void testPartitionSplitting() new SplitBatch(PARTITIONED_2, createSplitMap(createSplit(4, 0), createSplit(5, 1)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(30) - .withOutputDataSizeEstimates(ImmutableMap.of( + .withSourceDataSizeEstimates(ImmutableMap.of( PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(50, 1, 1)), PARTITIONED_2, new OutputDataSizeEstimate(ImmutableLongArray.of(2, 1, 1)))) .withSplittableSources(PARTITIONED_1) @@ -372,7 +372,7 @@ PARTITIONED_2, new OutputDataSizeEstimate(ImmutableLongArray.of(2, 1, 1)))) new SplitBatch(PARTITIONED_2, createSplitMap(createSplit(4, 0), createSplit(5, 1)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(30) - .withOutputDataSizeEstimates(ImmutableMap.of( + .withSourceDataSizeEstimates(ImmutableMap.of( PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(50, 1, 1)), PARTITIONED_2, new OutputDataSizeEstimate(ImmutableLongArray.of(2, 1, 1)))) .withSplittableSources(PARTITIONED_1, PARTITIONED_2) @@ -387,12 +387,12 @@ PARTITIONED_2, new OutputDataSizeEstimate(ImmutableLongArray.of(2, 1, 1)))) new SplitBatch(PARTITIONED_2, createSplitMap(createSplit(4, 0), createSplit(5, 0)), true)) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(30) - .withOutputDataSizeEstimates(ImmutableMap.of( + .withSourceDataSizeEstimates(ImmutableMap.of( PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(50, 1, 1)), PARTITIONED_2, new OutputDataSizeEstimate(ImmutableLongArray.of(2, 1, 1)))) .withSplittableSources(PARTITIONED_2) .withMergeAllowed(true) - .withExpectedTaskCount(1) + .withExpectedTaskCount(2) .run(); // targetPartitionSizeInBytes re-adjustment based on taskTargetMaxCount @@ -405,7 +405,7 @@ PARTITIONED_2, new OutputDataSizeEstimate(ImmutableLongArray.of(2, 1, 1)))) .withSplitPartitionCount(3) .withTargetPartitionSizeInBytes(30) .withTaskTargetMaxCount(10) - .withOutputDataSizeEstimates(ImmutableMap.of( + .withSourceDataSizeEstimates(ImmutableMap.of( PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1000, 1, 1)), PARTITIONED_2, new OutputDataSizeEstimate(ImmutableLongArray.of(2, 1, 1)))) .withSplittableSources(PARTITIONED_1, PARTITIONED_2) @@ -420,7 +420,7 @@ public void testCreateOutputPartitionToTaskPartition() testPartitionMapping() .withSplitPartitionCount(3) .withPartitionedSources(PARTITIONED_1) - .withOutputDataSizeEstimates(ImmutableMap.of( + .withSourceDataSizeEstimates(ImmutableMap.of( PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(50, 1, 1)))) .withTargetPartitionSizeInBytes(25) .withSplittableSources(PARTITIONED_1) @@ -432,7 +432,7 @@ PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(50, 1, 1)))) testPartitionMapping() .withSplitPartitionCount(3) .withPartitionedSources(PARTITIONED_1) - .withOutputDataSizeEstimates(ImmutableMap.of( + .withSourceDataSizeEstimates(ImmutableMap.of( PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(50, 1, 1)))) .withTargetPartitionSizeInBytes(25) .withMergeAllowed(true) @@ -443,7 +443,7 @@ PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(50, 1, 1)))) testPartitionMapping() .withSplitPartitionCount(3) .withPartitionedSources(PARTITIONED_1) - .withOutputDataSizeEstimates(ImmutableMap.of( + .withSourceDataSizeEstimates(ImmutableMap.of( PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(50, 1, 1)))) .withTargetPartitionSizeInBytes(25) .withMergeAllowed(false) @@ -455,7 +455,7 @@ PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(50, 1, 1)))) testPartitionMapping() .withSplitPartitionCount(3) .withPartitionedSources(PARTITIONED_1) - .withOutputDataSizeEstimates(ImmutableMap.of( + .withSourceDataSizeEstimates(ImmutableMap.of( PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(50, 1, 1)))) .withTargetPartitionSizeInBytes(25) .withMergeAllowed(false) @@ -468,7 +468,7 @@ PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(50, 1, 1)))) testPartitionMapping() .withSplitPartitionCount(4) .withPartitionedSources(PARTITIONED_1) - .withOutputDataSizeEstimates(ImmutableMap.of( + .withSourceDataSizeEstimates(ImmutableMap.of( PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(0, 0, 0, 60)))) .withTargetPartitionSizeInBytes(25) .withMergeAllowed(false) @@ -544,7 +544,7 @@ private static class AssignerTester private Optional> partitionToNodeMap = Optional.empty(); private long targetPartitionSizeInBytes; private int taskTargetMaxCount = Integer.MAX_VALUE; - private Map outputDataSizeEstimates = ImmutableMap.of(); + private Map sourceDataSizeEstimates = ImmutableMap.of(); private Set splittableSources = ImmutableSet.of(); private boolean mergeAllowed; private int expectedTaskCount; @@ -591,9 +591,9 @@ public AssignerTester withTaskTargetMaxCount(int taskTargetMaxCount) return this; } - public AssignerTester withOutputDataSizeEstimates(Map outputDataSizeEstimates) + public AssignerTester withSourceDataSizeEstimates(Map sourceDataSizeEstimates) { - this.outputDataSizeEstimates = outputDataSizeEstimates; + this.sourceDataSizeEstimates = sourceDataSizeEstimates; return this; } @@ -618,10 +618,10 @@ public AssignerTester withExpectedTaskCount(int expectedTaskCount) public void run() { FaultTolerantPartitioningScheme partitioningScheme = createPartitioningScheme(splitPartitionCount, partitionToNodeMap); - Map outputPartitionToTaskPartition = createOutputPartitionToTaskPartition( + Map sourcePartitionToTaskPartition = createSourcePartitionToTaskPartition( partitioningScheme, partitionedSources, - outputDataSizeEstimates, + sourceDataSizeEstimates, targetPartitionSizeInBytes, taskTargetMaxCount, splittableSources::contains, @@ -631,7 +631,7 @@ public void run() partitionedSources, replicatedSources, partitioningScheme, - outputPartitionToTaskPartition); + sourcePartitionToTaskPartition); SplitAssignerTester tester = new SplitAssignerTester(); Map> partitionedSplitIds = new HashMap<>(); Set replicatedSplitIds = new HashSet<>(); @@ -675,7 +675,7 @@ public void run() // validate partitioned splits partitionedSplitIds.forEach((partitionId, sourceSplits) -> { sourceSplits.forEach((source, splitId) -> { - List descriptors = outputPartitionToTaskPartition.get(partitionId).getSubPartitions().stream() + List descriptors = sourcePartitionToTaskPartition.get(partitionId).getSubPartitions().stream() .filter(HashDistributionSplitAssigner.SubPartition::isIdAssigned) .map(HashDistributionSplitAssigner.SubPartition::getId) .map(taskDescriptors::get) @@ -710,7 +710,7 @@ private static class PartitionMappingTester private int splitPartitionCount; private Optional> partitionToNodeMap = Optional.empty(); private long targetPartitionSizeInBytes; - private Map outputDataSizeEstimates = ImmutableMap.of(); + private Map sourceDataSizeEstimates = ImmutableMap.of(); private Set splittableSources = ImmutableSet.of(); private boolean mergeAllowed; private Set expectedMappings = ImmutableSet.of(); @@ -739,9 +739,9 @@ public PartitionMappingTester withTargetPartitionSizeInBytes(long targetPartitio return this; } - public PartitionMappingTester withOutputDataSizeEstimates(Map outputDataSizeEstimates) + public PartitionMappingTester withSourceDataSizeEstimates(Map sourceDataSizeEstimates) { - this.outputDataSizeEstimates = outputDataSizeEstimates; + this.sourceDataSizeEstimates = sourceDataSizeEstimates; return this; } @@ -766,10 +766,10 @@ public PartitionMappingTester withExpectedMappings(PartitionMapping... mappings) public void run() { FaultTolerantPartitioningScheme partitioningScheme = createPartitioningScheme(splitPartitionCount, partitionToNodeMap); - Map actual = createOutputPartitionToTaskPartition( + Map actual = createSourcePartitionToTaskPartition( partitioningScheme, partitionedSources, - outputDataSizeEstimates, + sourceDataSizeEstimates, targetPartitionSizeInBytes, Integer.MAX_VALUE, splittableSources::contains, @@ -778,9 +778,9 @@ public void run() assertEquals(actualGroups, expectedMappings); } - private static Set extractMappings(Map outputPartitionToTaskPartition) + private static Set extractMappings(Map sourcePartitionToTaskPartition) { - SetMultimap grouped = outputPartitionToTaskPartition.entrySet().stream() + SetMultimap grouped = sourcePartitionToTaskPartition.entrySet().stream() .collect(toImmutableSetMultimap(Map.Entry::getValue, Map.Entry::getKey)); return Multimaps.asMap(grouped).entrySet().stream() .map(entry -> new PartitionMapping(entry.getValue(), entry.getKey().getSubPartitions().size()))