Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ public void addInput(Page page)
{
requireNonNull(page, "page is null");
page = pagePreprocessor.apply(page);
operatorContext.recordOutput(page.getSizeInBytes(), page.getPositionCount());
sink.addPage(page);
operatorContext.recordOutput(page.getSizeInBytes(), page.getPositionCount());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@
public class PageChannelSelector
implements Function<Page, Page>
{
// No channels need to be remapped, only ensure that all page blocks are loaded
private static final Function<Page, Page> GET_LOADED_PAGE = Page::getLoadedPage;

private final int[] channels;

public PageChannelSelector(int... channels)
Expand All @@ -38,12 +35,6 @@ public PageChannelSelector(int... channels)
@Override
public Page apply(Page page)
{
// Ensure the channels that are emitted are fully loaded and in the correct order
return requireNonNull(page, "page is null").getLoadedPage(channels);
}

public static Function<Page, Page> identitySelection()
{
return GET_LOADED_PAGE;
return requireNonNull(page, "page is null").extractChannels(channels);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ListenableFuture;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;

import java.util.List;
import java.util.Optional;
Expand All @@ -34,7 +35,7 @@ class PartitioningExchanger
private final LocalExchangeMemoryManager memoryManager;
private final PartitionFunction partitionFunction;
private final int[] partitioningChannels;
private final int hashChannel; // when >= 0, this is the precomputed raw hash column to partition on
private final Optional<Integer> hashChannel;
private final IntArrayList[] partitionAssignments;
private final PageReleasedListener onPageReleased;

Expand All @@ -49,7 +50,7 @@ public PartitioningExchanger(
this.memoryManager = requireNonNull(memoryManager, "memoryManager is null");
this.partitionFunction = requireNonNull(partitionFunction, "partitionFunction is null");
this.partitioningChannels = Ints.toArray(requireNonNull(partitioningChannels, "partitioningChannels is null"));
this.hashChannel = requireNonNull(hashChannel, "hashChannel is null").orElse(-1);
this.hashChannel = requireNonNull(hashChannel, "hashChannel is null");
this.onPageReleased = PageReleasedListener.forLocalExchangeMemoryManager(memoryManager);

partitionAssignments = new IntArrayList[partitions.size()];
Expand All @@ -59,46 +60,36 @@ public PartitioningExchanger(
}

@Override
public void accept(Page page)
public synchronized void accept(Page page)
{
// extract the partitioning channel before entering the critical section
partitionPage(page, extractPartitioningChannels(page));
}
// reset the assignment lists
for (IntList partitionAssignment : partitionAssignments) {
partitionAssignment.clear();
}

private synchronized void partitionPage(Page page, Page partitioningChannelsPage)
{
// assign each row to a partition. The assignments lists are all expected to cleared by the previous iterations
// assign each row to a partition
Page partitioningChannelsPage = extractPartitioningChannels(page);
for (int position = 0; position < partitioningChannelsPage.getPositionCount(); position++) {
int partition = partitionFunction.getPartition(partitioningChannelsPage, position);
partitionAssignments[partition].add(position);
}

// build a page for each partition
for (int partition = 0; partition < partitionAssignments.length; partition++) {
for (int partition = 0; partition < buffers.size(); partition++) {
IntArrayList positions = partitionAssignments[partition];
int partitionSize = positions.size();
if (partitionSize == 0) {
continue;
}
Page pageSplit;
if (partitionSize == page.getPositionCount()) {
pageSplit = page; // entire page will be sent to this partition, no copies necessary
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that the issue is that these pages were retaining more memory as a result of not having all of their positions explicitly copied out? If this partitioning exchanger was used to do a local exchange after a remote exchange (fairly common), then the issue with VariableWidthBlocks retaining their entire SerializedPage slice could indeed yield a much higher retained size than before.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry about the delayed reply. @aweisberg do you have more context on this since you mentioned this PR in our discussion about the local memory issues?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I talked about this with James in Slack, but we didn't pick the correct way to follow up and get this performance improvement without also sometimes regressing on memory.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since I had also made the regressing change to Trino, I followed up there with a fix and a longer description of the problem / alternatives. See: trinodb/trino#9327 (with a follow-up PR in trinodb/trino#9379)

As for what to do in PrestoDB, there are some unknowns about reintroducing this change with a fix similar to the one on the Trino side, as I do not have a comparable testing environment to ensure that calling Page#compact() explicitly will perform comparably to Page#copyPositions in terms of throughput or reported memory usage for all workloads. Trino has made a variety of other localized changes to related classes (like adding explicit calls to Page#compact() in other places, revised operator and memory tracking implementations, etc.). On the Athena side, we chose to take the performance hit of eagerly copying VariableWidthBlock instances out of their input slice during deserialization, which means we didn't have the same issue, but that's an expensive hit to take to throughput.

So as I see it, we can either:

  1. Take a harsh performance hit to VariableWidthBlock deserialization throughput, but address this basic memory tracking bug more thoroughly (i.e., right now the memory tracking bug isn't typically reported because there is usually a partitioning local exchange after a remote exchange that forces the copy, but not always)
  2. I can put together a PR with this change reintroduced, but with comparable fixes to the ones I made on the Trino side to mitigate the potential issue and let someone with time and realistic production workloads test them out.
  3. Leave the change reverted (I have no objection to this option)

}
else {
pageSplit = page.copyPositions(positions.elements(), 0, partitionSize);
if (!positions.isEmpty()) {
Page pageSplit = page.copyPositions(positions.elements(), 0, positions.size());
memoryManager.updateMemoryUsage(pageSplit.getRetainedSizeInBytes());
buffers.get(partition).accept(new PageReference(pageSplit, 1, onPageReleased));
}
// clear the assigned positions list for the next iteration to start empty
positions.clear();
memoryManager.updateMemoryUsage(pageSplit.getRetainedSizeInBytes());
buffers.get(partition).accept(new PageReference(pageSplit, 1, onPageReleased));
}
}

private Page extractPartitioningChannels(Page inputPage)
{
// hash value is pre-computed, only needs to extract that channel
if (hashChannel >= 0) {
return inputPage.extractChannel(hashChannel);
if (hashChannel.isPresent()) {
return new Page(inputPage.getBlock(hashChannel.get()));
}

// extract partitioning channels
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ public LocalExecutionPlan plan(
LocalExecutionPlanContext context = new LocalExecutionPlanContext(taskContext, tableWriteInfo);
PhysicalOperation physicalOperation = plan.accept(new Visitor(session, stageExecutionDescriptor, remoteSourceFactory, pageSinkCommitRequired), context);

Function<Page, Page> pagePreprocessor = enforceLoadedLayoutProcessor(outputLayout, physicalOperation.getLayout());
Function<Page, Page> pagePreprocessor = enforceLayoutProcessor(outputLayout, physicalOperation.getLayout());

List<Type> outputTypes = outputLayout.stream()
.map(VariableReferenceExpression::getType)
Expand Down Expand Up @@ -2813,7 +2813,7 @@ private PhysicalOperation createLocalMerge(ExchangeNode node, LocalExecutionPlan

List<OperatorFactory> operatorFactories = new ArrayList<>(source.getOperatorFactories());
List<VariableReferenceExpression> expectedLayout = node.getInputs().get(0);
Function<Page, Page> pagePreprocessor = enforceLoadedLayoutProcessor(expectedLayout, source.getLayout());
Function<Page, Page> pagePreprocessor = enforceLayoutProcessor(expectedLayout, source.getLayout());
operatorFactories.add(new LocalExchangeSinkOperatorFactory(
exchangeFactory,
subContext.getNextOperatorId(),
Expand Down Expand Up @@ -2900,7 +2900,7 @@ else if (context.getDriverInstanceCount().isPresent()) {
LocalExecutionPlanContext subContext = driverFactoryParameters.getSubContext();

List<VariableReferenceExpression> expectedLayout = node.getInputs().get(i);
Function<Page, Page> pagePreprocessor = enforceLoadedLayoutProcessor(expectedLayout, source.getLayout());
Function<Page, Page> pagePreprocessor = enforceLayoutProcessor(expectedLayout, source.getLayout());
List<OperatorFactory> operatorFactories = new ArrayList<>(source.getOperatorFactories());

operatorFactories.add(new LocalExchangeSinkOperatorFactory(
Expand Down Expand Up @@ -3222,16 +3222,16 @@ else if (target instanceof RefreshMaterializedViewHandle) {
};
}

private static Function<Page, Page> enforceLoadedLayoutProcessor(List<VariableReferenceExpression> expectedLayout, Map<VariableReferenceExpression, Integer> inputLayout)
private static Function<Page, Page> enforceLayoutProcessor(List<VariableReferenceExpression> expectedLayout, Map<VariableReferenceExpression, Integer> inputLayout)
{
int[] channels = expectedLayout.stream()
.peek(variable -> checkArgument(inputLayout.containsKey(variable), "channel not found for variable: %s", variable))
.mapToInt(inputLayout::get)
.toArray();

if (Arrays.equals(channels, range(0, inputLayout.size()).toArray())) {
// this is an identity mapping, simply ensuring that the page is fully loaded is sufficient
return PageChannelSelector.identitySelection();
// this is an identity mapping
return Function.identity();
}

return new PageChannelSelector(channels);
Expand Down