Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public class PagePartitioner
private final PositionsAppenderPageBuilder[] positionsAppenders;
private final boolean replicatesAnyRow;
private final int nullChannel; // when >= 0, send the position to every partition if this channel is null
private final long partitionsInitialRetainedSize;
private PartitionedOutputInfoSupplier partitionedOutputInfoSupplier;

private boolean hasAnyRowBeenReplicated;
Expand Down Expand Up @@ -123,8 +122,7 @@ public PagePartitioner(
positionsAppenders[i] = PositionsAppenderPageBuilder.withMaxPageSize(pageSize, requireNonNull(sourceTypes, "sourceTypes is null"), positionsAppenderFactory);
}
this.memoryContext = aggregatedMemoryContext.newLocalMemoryContext(PagePartitioner.class.getSimpleName());
this.partitionsInitialRetainedSize = getRetainedSizeInBytes();
this.memoryContext.setBytes(partitionsInitialRetainedSize);
updateMemoryUsage();
}

// sets up this partitioner for the new operator
Expand Down Expand Up @@ -154,17 +152,6 @@ public void partitionPage(Page page)
updateMemoryUsage();
}

private void updateMemoryUsage()
{
// We use getSizeInBytes() here instead of getRetainedSizeInBytes() for an approximation of
// the amount of memory used by the pageBuilders, because calculating the retained
// size can be expensive especially for complex types.
long partitionsSizeInBytes = getSizeInBytes();

// We also add partitionsInitialRetainedSize as an approximation of the object overhead of the partitions.
memoryContext.setBytes(partitionsSizeInBytes + partitionsInitialRetainedSize);
}

public void partitionPageByRow(Page page)
{
requireNonNull(page, "page is null");
Expand Down Expand Up @@ -469,28 +456,14 @@ private List<Slice> splitAndSerializePage(Page page)
return builder.build();
}

private long getSizeInBytes()
{
// We use a foreach loop instead of streams
// as it has much better performance.
long sizeInBytes = 0;
for (PositionsAppenderPageBuilder pageBuilder : positionsAppenders) {
sizeInBytes += pageBuilder.getSizeInBytes();
}
return sizeInBytes;
}

/**
* This method can be expensive for complex types.
*/
private long getRetainedSizeInBytes()
private void updateMemoryUsage()
{
long sizeInBytes = 0;
long retainedSizeInBytes = 0;
for (PositionsAppenderPageBuilder pageBuilder : positionsAppenders) {
sizeInBytes += pageBuilder.getRetainedSizeInBytes();
retainedSizeInBytes += pageBuilder.getRetainedSizeInBytes();
}
sizeInBytes += serializer.getRetainedSizeInBytes();
return sizeInBytes;
retainedSizeInBytes += serializer.getRetainedSizeInBytes();
memoryContext.setBytes(retainedSizeInBytes);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public class RowPositionsAppender
private boolean hasNullRow;
private boolean hasNonNullRow;
private boolean[] rowIsNull = new boolean[0];
private long retainedSizeInBytes;
private long sizeInBytes;
private long retainedSizeInBytes = -1;
private long sizeInBytes = -1;

public static RowPositionsAppender createRowAppender(
PositionsAppenderFactory positionsAppenderFactory,
Expand All @@ -63,7 +63,7 @@ private RowPositionsAppender(PositionsAppender[] fieldAppenders, int expectedPos
{
this.fieldAppenders = requireNonNull(fieldAppenders, "fields is null");
this.initialEntryCount = expectedPositions;
updateRetainedSize();
resetSize();
}

@Override
Expand Down Expand Up @@ -103,7 +103,7 @@ else if (allPositionsNull(positions, block)) {
}

positionCount += positions.size();
updateSize();
resetSize();
}

@Override
Expand Down Expand Up @@ -135,7 +135,7 @@ else if (value.isNull(0)) {
throw new IllegalArgumentException("unsupported block type: " + value);
}
positionCount += rlePositionCount;
updateSize();
resetSize();
}

@Override
Expand Down Expand Up @@ -165,7 +165,7 @@ else if (value.isNull(position)) {
throw new IllegalArgumentException("unsupported block type: " + value);
}
positionCount++;
updateSize();
resetSize();
}

@Override
Expand All @@ -191,16 +191,32 @@ public Block build()
@Override
public long getRetainedSizeInBytes()
{
long size = retainedSizeInBytes;
if (retainedSizeInBytes != -1) {
return retainedSizeInBytes;
}

long size = INSTANCE_SIZE + sizeOf(rowIsNull);
for (PositionsAppender field : fieldAppenders) {
size += field.getRetainedSizeInBytes();
}

retainedSizeInBytes = size;
return size;
}

@Override
public long getSizeInBytes()
{
if (sizeInBytes != -1) {
return sizeInBytes;
}

long size = (Integer.BYTES + Byte.BYTES) * (long) positionCount;
for (PositionsAppender field : fieldAppenders) {
size += field.getSizeInBytes();
}

sizeInBytes = size;
return sizeInBytes;
}

Expand All @@ -210,10 +226,9 @@ private void reset()
initialized = false;
rowIsNull = new boolean[0];
positionCount = 0;
sizeInBytes = 0;
hasNonNullRow = false;
hasNullRow = false;
updateRetainedSize();
resetSize();
}

private boolean allPositionsNull(IntArrayList positions, Block block)
Expand Down Expand Up @@ -265,21 +280,13 @@ private void ensureCapacity(int additionalCapacity)

int newCapacity = Math.max(newSize, positionCount + additionalCapacity);
rowIsNull = Arrays.copyOf(rowIsNull, newCapacity);
updateRetainedSize();
resetSize();
}
}

private void updateSize()
{
long size = (Integer.BYTES + Byte.BYTES) * (long) positionCount;
for (PositionsAppender field : fieldAppenders) {
size += field.getSizeInBytes();
}
sizeInBytes = size;
}

private void updateRetainedSize()
private void resetSize()
{
retainedSizeInBytes = INSTANCE_SIZE + sizeOf(rowIsNull);
sizeInBytes = -1;
retainedSizeInBytes = -1;
}
}