Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ public BroadcastExchanger(List<Consumer<PageReference>> buffers, LocalExchangeMe
@Override
public void accept(Page page)
{
memoryManager.updateMemoryUsage(page.getRetainedSizeInBytes());

PageReference pageReference = new PageReference(page, buffers.size(), onPageReleased);
memoryManager.updateMemoryUsage(pageReference.getRetainedSizeInBytes());

for (Consumer<PageReference> buffer : buffers) {
buffer.accept(pageReference);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public Page removePage()

// dereference the page outside of lock, since may trigger a callback
Page page = pageReference.removePage();
bufferedBytes.addAndGet(-page.getRetainedSizeInBytes());
bufferedBytes.addAndGet(-pageReference.getRetainedSizeInBytes());

checkFinished();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,30 @@ final class PageReference
private volatile int referenceCount;
private final Page page;
private final PageReleasedListener onPageReleased;
private final long retainedSizeInBytes;

public PageReference(Page page, int referenceCount, PageReleasedListener onPageReleased)
{
this.page = requireNonNull(page, "page is null");
this.onPageReleased = requireNonNull(onPageReleased, "onPageReleased is null");
checkArgument(referenceCount >= 1, "referenceCount must be at least 1");
this.referenceCount = referenceCount;
// retained size will be accessed at least once (typically more) so storing the value into this
// field eagerly avoids repeated volatile reads after this point
this.retainedSizeInBytes = page.getRetainedSizeInBytes();
Comment thread
pettyjamesm marked this conversation as resolved.
Outdated
}

public long getRetainedSizeInBytes()
{
return page.getRetainedSizeInBytes();
return retainedSizeInBytes;
}

public Page removePage()
{
int referenceCount = REFERENCE_COUNT_UPDATER.decrementAndGet(this);
checkArgument(referenceCount >= 0, "Page reference count is negative");
if (referenceCount == 0) {
onPageReleased.onPageReleased(page.getRetainedSizeInBytes());
onPageReleased.onPageReleased(retainedSizeInBytes);
}
return page;
}
Expand All @@ -59,7 +63,7 @@ public Page removePage()
public String toString()
{
return toStringHelper(this)
.add("size", getRetainedSizeInBytes())
.add("size", retainedSizeInBytes)
.add("referenceCount", referenceCount)
.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import io.trino.spi.Page;
import it.unimi.dsi.fastutil.ints.IntArrayList;

import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
Expand All @@ -33,6 +36,7 @@ class PartitioningExchanger
private final LocalExchangeMemoryManager memoryManager;
private final Function<Page, Page> partitionedPagePreparer;
private final PartitionFunction partitionFunction;
@GuardedBy("this")
private final IntArrayList[] partitionAssignments;
private final PageReleasedListener onPageReleased;

Expand All @@ -57,10 +61,17 @@ public PartitioningExchanger(
@Override
public void accept(Page page)
{
partitionPage(page, partitionedPagePreparer.apply(page));
Consumer<PageReference> wholePagePartition = partitionPageOrFindWholePagePartition(page, partitionedPagePreparer.apply(page));
if (wholePagePartition != null) {
// whole input page will go to this partition, compact the input page avoid over-retaining memory and to
// match the behavior of sub-partitioned pages that copy positions out
page.compact();
sendPageToPartition(wholePagePartition, page);
}
}

private synchronized void partitionPage(Page page, Page partitionPage)
@Nullable
private synchronized Consumer<PageReference> partitionPageOrFindWholePagePartition(Page page, Page partitionPage)
{
// assign each row to a partition. The assignments lists are all expected to cleared by the previous iterations
for (int position = 0; position < partitionPage.getPositionCount(); position++) {
Expand All @@ -81,19 +92,23 @@ private synchronized void partitionPage(Page page, Page partitionPage)
int[] positions = positionsList.elements();
positionsList.clear();

Page pageSplit;
if (partitionSize == page.getPositionCount()) {
// entire page will be sent to this partition, just compact the page to avoid over-retaining
// memory and match the behavior of the sub-partitioned case
page.compact();
pageSplit = page;
}
else {
pageSplit = page.copyPositions(positions, 0, partitionSize);
// entire page will be sent to this partition, compact and send the page after releasing the lock
return buffers.get(partition);
}
memoryManager.updateMemoryUsage(pageSplit.getRetainedSizeInBytes());
buffers.get(partition).accept(new PageReference(pageSplit, 1, onPageReleased));
Page pageSplit = page.copyPositions(positions, 0, partitionSize);
sendPageToPartition(buffers.get(partition), pageSplit);
}
// No single partition receives the entire input page
return null;
}

// This is safe to call without synchronizing because the partition buffers are themselves threadsafe
private void sendPageToPartition(Consumer<PageReference> buffer, Page pageSplit)
{
PageReference pageReference = new PageReference(pageSplit, 1, onPageReleased);
memoryManager.updateMemoryUsage(pageReference.getRetainedSizeInBytes());
buffer.accept(pageReference);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ public PassthroughExchanger(LocalExchangeSource localExchangeSource, long buffer
@Override
public void accept(Page page)
{
long retainedSizeInBytes = page.getRetainedSizeInBytes();
PageReference pageReference = new PageReference(page, 1, onPageReleased);
long retainedSizeInBytes = pageReference.getRetainedSizeInBytes();
bufferMemoryManager.updateMemoryUsage(retainedSizeInBytes);
memoryTracker.accept(retainedSizeInBytes);

localExchangeSource.addPage(new PageReference(page, 1, onPageReleased));
localExchangeSource.addPage(pageReference);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ public RandomExchanger(List<Consumer<PageReference>> buffers, LocalExchangeMemor
@Override
public void accept(Page page)
{
memoryManager.updateMemoryUsage(page.getRetainedSizeInBytes());
PageReference pageReference = new PageReference(page, 1, onPageReleased);
memoryManager.updateMemoryUsage(pageReference.getRetainedSizeInBytes());
int randomIndex = ThreadLocalRandom.current().nextInt(buffers.size());
buffers.get(randomIndex).accept(new PageReference(page, 1, onPageReleased));
buffers.get(randomIndex).accept(pageReference);
}

@Override
Expand Down