diff --git a/presto-main/src/main/java/com/facebook/presto/operator/exchange/LocalExchangeSinkOperator.java b/presto-main/src/main/java/com/facebook/presto/operator/exchange/LocalExchangeSinkOperator.java index bf65577a175e8..9828f57b818e1 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/exchange/LocalExchangeSinkOperator.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/exchange/LocalExchangeSinkOperator.java @@ -149,8 +149,8 @@ public void addInput(Page page) { requireNonNull(page, "page is null"); page = pagePreprocessor.apply(page); - operatorContext.recordOutput(page.getSizeInBytes(), page.getPositionCount()); sink.addPage(page); + operatorContext.recordOutput(page.getSizeInBytes(), page.getPositionCount()); } @Override diff --git a/presto-main/src/main/java/com/facebook/presto/operator/exchange/PageChannelSelector.java b/presto-main/src/main/java/com/facebook/presto/operator/exchange/PageChannelSelector.java index b8b21c1d5483c..48dfb28b440a3 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/exchange/PageChannelSelector.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/exchange/PageChannelSelector.java @@ -24,9 +24,6 @@ public class PageChannelSelector implements Function { - // No channels need to be remapped, only ensure that all page blocks are loaded - private static final Function GET_LOADED_PAGE = Page::getLoadedPage; - private final int[] channels; public PageChannelSelector(int... channels) @@ -38,12 +35,6 @@ public PageChannelSelector(int... channels) @Override public Page apply(Page page) { - // Ensure the channels that are emitted are fully loaded and in the correct order - return requireNonNull(page, "page is null").getLoadedPage(channels); - } - - public static Function identitySelection() - { - return GET_LOADED_PAGE; + return requireNonNull(page, "page is null").extractChannels(channels); } } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/exchange/PartitioningExchanger.java b/presto-main/src/main/java/com/facebook/presto/operator/exchange/PartitioningExchanger.java index cec9df91c6785..cfa473f5eb72f 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/exchange/PartitioningExchanger.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/exchange/PartitioningExchanger.java @@ -20,6 +20,7 @@ import com.google.common.primitives.Ints; import com.google.common.util.concurrent.ListenableFuture; import it.unimi.dsi.fastutil.ints.IntArrayList; +import it.unimi.dsi.fastutil.ints.IntList; import java.util.List; import java.util.Optional; @@ -34,7 +35,7 @@ class PartitioningExchanger private final LocalExchangeMemoryManager memoryManager; private final PartitionFunction partitionFunction; private final int[] partitioningChannels; - private final int hashChannel; // when >= 0, this is the precomputed raw hash column to partition on + private final Optional hashChannel; private final IntArrayList[] partitionAssignments; private final PageReleasedListener onPageReleased; @@ -49,7 +50,7 @@ public PartitioningExchanger( this.memoryManager = requireNonNull(memoryManager, "memoryManager is null"); this.partitionFunction = requireNonNull(partitionFunction, "partitionFunction is null"); this.partitioningChannels = Ints.toArray(requireNonNull(partitioningChannels, "partitioningChannels is null")); - this.hashChannel = requireNonNull(hashChannel, "hashChannel is null").orElse(-1); + this.hashChannel = requireNonNull(hashChannel, "hashChannel is null"); this.onPageReleased = PageReleasedListener.forLocalExchangeMemoryManager(memoryManager); partitionAssignments = new IntArrayList[partitions.size()]; @@ -59,46 +60,36 @@ public PartitioningExchanger( } @Override - public void accept(Page page) + public synchronized void accept(Page page) { - // extract the partitioning channel before entering the critical section - partitionPage(page, extractPartitioningChannels(page)); - } + // reset the assignment lists + for (IntList partitionAssignment : partitionAssignments) { + partitionAssignment.clear(); + } - private synchronized void partitionPage(Page page, Page partitioningChannelsPage) - { - // assign each row to a partition. The assignments lists are all expected to cleared by the previous iterations + // assign each row to a partition + Page partitioningChannelsPage = extractPartitioningChannels(page); for (int position = 0; position < partitioningChannelsPage.getPositionCount(); position++) { int partition = partitionFunction.getPartition(partitioningChannelsPage, position); partitionAssignments[partition].add(position); } // build a page for each partition - for (int partition = 0; partition < partitionAssignments.length; partition++) { + for (int partition = 0; partition < buffers.size(); partition++) { IntArrayList positions = partitionAssignments[partition]; - int partitionSize = positions.size(); - if (partitionSize == 0) { - continue; - } - Page pageSplit; - if (partitionSize == page.getPositionCount()) { - pageSplit = page; // entire page will be sent to this partition, no copies necessary - } - else { - pageSplit = page.copyPositions(positions.elements(), 0, partitionSize); + if (!positions.isEmpty()) { + Page pageSplit = page.copyPositions(positions.elements(), 0, positions.size()); + memoryManager.updateMemoryUsage(pageSplit.getRetainedSizeInBytes()); + buffers.get(partition).accept(new PageReference(pageSplit, 1, onPageReleased)); } - // clear the assigned positions list for the next iteration to start empty - positions.clear(); - memoryManager.updateMemoryUsage(pageSplit.getRetainedSizeInBytes()); - buffers.get(partition).accept(new PageReference(pageSplit, 1, onPageReleased)); } } private Page extractPartitioningChannels(Page inputPage) { // hash value is pre-computed, only needs to extract that channel - if (hashChannel >= 0) { - return inputPage.extractChannel(hashChannel); + if (hashChannel.isPresent()) { + return new Page(inputPage.getBlock(hashChannel.get())); } // extract partitioning channels diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java index 5a4570be2ac47..a0efb67ada8a1 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java @@ -573,7 +573,7 @@ public LocalExecutionPlan plan( LocalExecutionPlanContext context = new LocalExecutionPlanContext(taskContext, tableWriteInfo); PhysicalOperation physicalOperation = plan.accept(new Visitor(session, stageExecutionDescriptor, remoteSourceFactory, pageSinkCommitRequired), context); - Function pagePreprocessor = enforceLoadedLayoutProcessor(outputLayout, physicalOperation.getLayout()); + Function pagePreprocessor = enforceLayoutProcessor(outputLayout, physicalOperation.getLayout()); List outputTypes = outputLayout.stream() .map(VariableReferenceExpression::getType) @@ -2813,7 +2813,7 @@ private PhysicalOperation createLocalMerge(ExchangeNode node, LocalExecutionPlan List operatorFactories = new ArrayList<>(source.getOperatorFactories()); List expectedLayout = node.getInputs().get(0); - Function pagePreprocessor = enforceLoadedLayoutProcessor(expectedLayout, source.getLayout()); + Function pagePreprocessor = enforceLayoutProcessor(expectedLayout, source.getLayout()); operatorFactories.add(new LocalExchangeSinkOperatorFactory( exchangeFactory, subContext.getNextOperatorId(), @@ -2900,7 +2900,7 @@ else if (context.getDriverInstanceCount().isPresent()) { LocalExecutionPlanContext subContext = driverFactoryParameters.getSubContext(); List expectedLayout = node.getInputs().get(i); - Function pagePreprocessor = enforceLoadedLayoutProcessor(expectedLayout, source.getLayout()); + Function pagePreprocessor = enforceLayoutProcessor(expectedLayout, source.getLayout()); List operatorFactories = new ArrayList<>(source.getOperatorFactories()); operatorFactories.add(new LocalExchangeSinkOperatorFactory( @@ -3222,7 +3222,7 @@ else if (target instanceof RefreshMaterializedViewHandle) { }; } - private static Function enforceLoadedLayoutProcessor(List expectedLayout, Map inputLayout) + private static Function enforceLayoutProcessor(List expectedLayout, Map inputLayout) { int[] channels = expectedLayout.stream() .peek(variable -> checkArgument(inputLayout.containsKey(variable), "channel not found for variable: %s", variable)) @@ -3230,8 +3230,8 @@ private static Function enforceLoadedLayoutProcessor(List