Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,23 @@ public void appendRle(Block block, int rlePositionCount)
updateSize(rlePositionCount);
}

@Override
public void append(int sourcePosition, Block source)
{
    // Make room for exactly one additional position before writing.
    ensureCapacity(positionCount + 1);
    if (!source.isNull(sourcePosition)) {
        values[positionCount] = source.getByte(sourcePosition, 0);
        hasNonNullValue = true;
    }
    else {
        valueIsNull[positionCount] = true;
        hasNullValue = true;
    }
    positionCount++;

    updateSize(1);
}

@Override
public Block build()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,25 @@ public void appendRle(Block block, int rlePositionCount)
updateSize(rlePositionCount);
}

@Override
public void append(int sourcePosition, Block source)
{
    // Make room for exactly one additional position before writing.
    ensureCapacity(positionCount + 1);
    if (!source.isNull(sourcePosition)) {
        // Each position occupies two consecutive long slots (128-bit value).
        int valueIndex = 2 * positionCount;
        values[valueIndex] = source.getLong(sourcePosition, 0);
        values[valueIndex + 1] = source.getLong(sourcePosition, SIZE_OF_LONG);
        hasNonNullValue = true;
    }
    else {
        valueIsNull[positionCount] = true;
        hasNullValue = true;
    }
    positionCount++;

    updateSize(1);
}

@Override
public Block build()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,25 @@ public void appendRle(Block block, int rlePositionCount)
updateSize(rlePositionCount);
}

@Override
public void append(int sourcePosition, Block source)
{
    // Make room for exactly one additional position before writing.
    ensureCapacity(positionCount + 1);
    if (!source.isNull(sourcePosition)) {
        // Value is stored split: a 64-bit high part followed by a 32-bit low part.
        high[positionCount] = source.getLong(sourcePosition, 0);
        low[positionCount] = source.getInt(sourcePosition, SIZE_OF_LONG);
        hasNonNullValue = true;
    }
    else {
        valueIsNull[positionCount] = true;
        hasNullValue = true;
    }
    positionCount++;

    updateSize(1);
}

@Override
public Block build()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,23 @@ public void appendRle(Block block, int rlePositionCount)
updateSize(rlePositionCount);
}

@Override
public void append(int sourcePosition, Block source)
{
    // Make room for exactly one additional position before writing.
    ensureCapacity(positionCount + 1);
    if (!source.isNull(sourcePosition)) {
        values[positionCount] = source.getInt(sourcePosition, 0);
        hasNonNullValue = true;
    }
    else {
        valueIsNull[positionCount] = true;
        hasNullValue = true;
    }
    positionCount++;

    updateSize(1);
}

@Override
public Block build()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,23 @@ public void appendRle(Block block, int rlePositionCount)
updateSize(rlePositionCount);
}

@Override
public void append(int sourcePosition, Block source)
{
    // Make room for exactly one additional position before writing.
    ensureCapacity(positionCount + 1);
    if (!source.isNull(sourcePosition)) {
        values[positionCount] = source.getLong(sourcePosition, 0);
        hasNonNullValue = true;
    }
    else {
        valueIsNull[positionCount] = true;
        hasNullValue = true;
    }
    positionCount++;

    updateSize(1);
}

@Override
public Block build()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.trino.operator.OperatorContext;
import io.trino.operator.PartitionFunction;
import io.trino.spi.Page;
import io.trino.spi.PageBuilder;
import io.trino.spi.block.Block;
import io.trino.spi.block.DictionaryBlock;
import io.trino.spi.block.RunLengthEncodedBlock;
Expand Down Expand Up @@ -62,14 +61,12 @@ public class PagePartitioner
{
private static final int COLUMNAR_STRATEGY_COEFFICIENT = 2;
private final OutputBuffer outputBuffer;
private final Type[] sourceTypes;
private final PartitionFunction partitionFunction;
private final int[] partitionChannels;
private final LocalMemoryContext memoryContext;
@Nullable
private final Block[] partitionConstantBlocks; // when null, no constants are present. Only non-null elements are constants
private final PageSerializer serializer;
private final PageBuilder[] pageBuilders;
private final PositionsAppenderPageBuilder[] positionsAppenders;
private final boolean replicatesAnyRow;
private final int nullChannel; // when >= 0, send the position to every partition if this channel is null
Expand Down Expand Up @@ -107,7 +104,6 @@ public PagePartitioner(
this.replicatesAnyRow = replicatesAnyRow;
this.nullChannel = nullChannel.orElse(-1);
this.outputBuffer = requireNonNull(outputBuffer, "outputBuffer is null");
this.sourceTypes = sourceTypes.toArray(new Type[0]);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems we could just modify partitionPage as:

        ...
        long buildersSizeInBytes = getSizeInBytes();

        // don't exceed average maxPageSize when appender and builders are combined together
        if (buildersSizeInBytes > maxMemory.toBytes()) {
            flushPositionsAppenders(true);
            flushPageBuilders(true);
        }

        updateMemoryUsage();
}

Seems like this change would be much smaller and doesn't rise questions about potential regressions (especially for FTE where there will be a lot of small partitions potentially).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, the if there raises questions about potential regressions.
It can easily produce small pages. For example, if 50% of positions are handled by PositionsAppenders and the other 50% by BlockBuilders, this will cause the average page size to be 50% of the expected page size given enough partitions.
If only 5% of positions go to either one then this will produce a lot of extremely small pages because it will always flush almost empty buffers

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can easily produce small pages. For example, if 50% of positions are handled by PositionsAppenders and the other 50% by BlockBuilders, this will cause the average page size to be 50% of the expected page size given enough partitions.

50% is still big enough too, right?

If only 5% of positions go to either one then this will produce a lot of extremely small pages because it will always flush almost empty buffers

If you have 5% vs 95%, then you have one small page and one large page. It is fine, see Javadoc for io.trino.operator.project.MergePages

this.serializer = serdeFactory.createSerializer(exchangeEncryptionKey.map(Ciphers::deserializeAesEncryptionKey));

// Ensure partition channels align with constant arguments provided
Expand All @@ -124,11 +120,7 @@ public PagePartitioner(

this.positionsAppenders = new PositionsAppenderPageBuilder[partitionCount];
for (int i = 0; i < partitionCount; i++) {
positionsAppenders[i] = PositionsAppenderPageBuilder.withMaxPageSize(pageSize, sourceTypes, positionsAppenderFactory);
}
this.pageBuilders = new PageBuilder[partitionCount];
for (int i = 0; i < partitionCount; i++) {
pageBuilders[i] = PageBuilder.withMaxPageSize(pageSize, sourceTypes);
positionsAppenders[i] = PositionsAppenderPageBuilder.withMaxPageSize(pageSize, requireNonNull(sourceTypes, "sourceTypes is null"), positionsAppenderFactory);
}
this.memoryContext = aggregatedMemoryContext.newLocalMemoryContext(PagePartitioner.class.getSimpleName());
this.partitionsInitialRetainedSize = getRetainedSizeInBytes();
Expand Down Expand Up @@ -183,8 +175,8 @@ public void partitionPageByRow(Page page)
int position;
// Handle "any row" replication outside of the inner loop processing
if (replicatesAnyRow && !hasAnyRowBeenReplicated) {
for (PageBuilder pageBuilder : pageBuilders) {
appendRow(pageBuilder, page, 0);
for (PositionsAppenderPageBuilder pageBuilder : positionsAppenders) {
pageBuilder.appendToOutputPartition(page, 0);
}
hasAnyRowBeenReplicated = true;
position = 1;
Expand All @@ -199,34 +191,24 @@ public void partitionPageByRow(Page page)
Block nullsBlock = page.getBlock(nullChannel);
for (; position < page.getPositionCount(); position++) {
if (nullsBlock.isNull(position)) {
for (PageBuilder pageBuilder : pageBuilders) {
appendRow(pageBuilder, page, position);
for (PositionsAppenderPageBuilder pageBuilder : positionsAppenders) {
pageBuilder.appendToOutputPartition(page, position);
}
}
else {
int partition = partitionFunction.getPartition(partitionFunctionArgs, position);
appendRow(pageBuilders[partition], page, position);
positionsAppenders[partition].appendToOutputPartition(page, position);
}
}
}
else {
for (; position < page.getPositionCount(); position++) {
int partition = partitionFunction.getPartition(partitionFunctionArgs, position);
appendRow(pageBuilders[partition], page, position);
positionsAppenders[partition].appendToOutputPartition(page, position);
}
}

flushPageBuilders(false);
}

private void appendRow(PageBuilder pageBuilder, Page page, int position)
{
pageBuilder.declarePosition();

for (int channel = 0; channel < sourceTypes.length; channel++) {
Type type = sourceTypes[channel];
type.appendTo(page.getBlock(channel), position, pageBuilder.getBlockBuilder(channel));
}
flushPositionsAppenders(false);
}

public void partitionPageByColumn(Page page)
Expand Down Expand Up @@ -451,30 +433,14 @@ public void close()
{
try {
flushPositionsAppenders(true);
flushPageBuilders(true);
}
finally {
// clear buffers before memory release
Arrays.fill(positionsAppenders, null);
Arrays.fill(pageBuilders, null);
memoryContext.close();
}
}

private void flushPageBuilders(boolean force)
{
// add all full pages to output buffer
for (int partition = 0; partition < pageBuilders.length; partition++) {
PageBuilder partitionPageBuilder = pageBuilders[partition];
if (!partitionPageBuilder.isEmpty() && (force || partitionPageBuilder.isFull())) {
Page pagePartition = partitionPageBuilder.build();
partitionPageBuilder.reset();

enqueuePage(pagePartition, partition);
}
}
}

private void flushPositionsAppenders(boolean force)
{
// add all full pages to output buffer
Expand Down Expand Up @@ -511,9 +477,6 @@ private long getSizeInBytes()
for (PositionsAppenderPageBuilder pageBuilder : positionsAppenders) {
sizeInBytes += pageBuilder.getSizeInBytes();
}
for (PageBuilder pageBuilder : pageBuilders) {
sizeInBytes += pageBuilder.getSizeInBytes();
}
return sizeInBytes;
}

Expand All @@ -526,9 +489,6 @@ private long getRetainedSizeInBytes()
for (PositionsAppenderPageBuilder pageBuilder : positionsAppenders) {
sizeInBytes += pageBuilder.getRetainedSizeInBytes();
}
for (PageBuilder pageBuilder : pageBuilders) {
sizeInBytes += pageBuilder.getRetainedSizeInBytes();
}
sizeInBytes += serializer.getRetainedSizeInBytes();
return sizeInBytes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ public interface PositionsAppender
*/
void appendRle(Block value, int rlePositionCount);

/**
Copy link
Copy Markdown
Member

@sopel39 sopel39 Feb 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need a new method TBH. It seems you could just optimize either existing append or appendRle for single row use.

Also comment here should probably highly discourage usage of single row append as appenders won't be efficient in that case.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels like the cost of allocating single position arrays for every row might be significant. @lukasz-stec Do you know what most of the overhead comes from?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I could add an if (positionCount == 1) to appendRle to optimize it, and then most of the overhead would be that if — not only during execution but also during compilation, which could impact the batch append path.
IMO a separate method is a lot cleaner and easier to optimize, and since PositionsAppender is not a generic API to be used anywhere (at least not at the moment), we shouldn't worry about adding a method to it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it and then most of overhead would be that if

I don't think that would be an overhead. It won't be for batched append. And for row-by-row appends this if would evaluate to same value, so it's largely irrelevant.

* Appends the single {@code position} from {@code source}. The implementation must be
* conceptually equal to {@code append(IntArrayList.wrap(new int[] {position}), source)},
* but may be optimized (e.g. to avoid allocating the positions array).
* <p>
* Callers should prefer the batched {@link #append(IntArrayList, Block)} where possible:
* appending positions one by one can be significantly slower and may not support
* features like pushing RLE through the appender.
*/
void append(int position, Block source);

/**
* Creates the block from the appender data.
* After this, appender is reset to the initial state, and it is ready to build a new block.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ public void appendToOutputPartition(Page page, IntArrayList positions)
}
}

/**
 * Appends the single row at {@code position} of {@code page} to this partition's
 * output, copying the value from every channel into its per-channel appender.
 */
public void appendToOutputPartition(Page page, int position)
{
    // Account for exactly one new row in this builder.
    declarePositions(1);

    int channelCount = channelAppenders.length;
    for (int channel = 0; channel < channelCount; channel++) {
        channelAppenders[channel].append(position, page.getBlock(channel));
    }
}

public long getRetainedSizeInBytes()
{
// We use a foreach loop instead of streams
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ else if (rleValue != null) {
}
}

@Override
public void append(int position, Block value)
{
// A single-position append cannot preserve a pending RLE representation, so any
// accumulated RLE state is collapsed into flat storage before delegating.
// NOTE(review): this flattens eagerly even when only one row follows a long RLE
// run — confirm this trade-off is acceptable for the caller's access pattern.
switchToFlat();
Copy link
Member

@sopel39 sopel39 Feb 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like it might not be beneficial (e.g. switch-to-flat for 1000 rows to just append single row). This only makes sense if the you know you will call this method a lot of times from now on.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could handle rle here at some CPU cost but we don't handle it in the blockBuilder case anyway

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but appenders are not block builders nor they should behave like ones. Appenders are optimized to process big batches of positions (including special block types like RLEs and dictionaries). Here we quickly fallback to block-builder behaviour just because we've seen single row append.

// Forward the single-position append to the flat delegate appender.
delegate.append(position, value);
}

@Override
public Block build()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,36 @@ else if (value.isNull(0)) {
updateSize();
}

@Override
public void append(int position, Block value)
{
// NOTE(review): sibling appenders call ensureCapacity(positionCount + 1); here the
// argument is 1 — confirm whether this class's ensureCapacity takes an additional-row
// count rather than a total capacity.
ensureCapacity(1);
if (value instanceof AbstractRowBlock sourceRowBlock) {
if (sourceRowBlock.isNull(position)) {
rowIsNull[positionCount] = true;
hasNullRow = true;
}
else {
// append not null row value
// Recursively append each field of the row: translate the row position into
// the field blocks' position space, then copy that position from every field.
List<Block> fieldBlocks = sourceRowBlock.getChildren();
int fieldPosition = sourceRowBlock.getFieldBlockOffset(position);
for (int i = 0; i < fieldAppenders.length; i++) {
fieldAppenders[i].append(fieldPosition, fieldBlocks.get(i));
}
hasNonNullRow = true;
}
}
else if (value.isNull(position)) {
// Non-row block is only acceptable when the position is null (e.g. an all-null RLE).
rowIsNull[positionCount] = true;
hasNullRow = true;
}
else {
// A non-null value must come from a row block; anything else is a caller bug.
throw new IllegalArgumentException("unsupported block type: " + value);
}
positionCount++;
updateSize();
}

@Override
public Block build()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,23 @@ public void appendRle(Block block, int rlePositionCount)
updateSize(rlePositionCount);
}

@Override
public void append(int sourcePosition, Block source)
{
    // Make room for exactly one additional position before writing.
    ensureCapacity(positionCount + 1);
    if (!source.isNull(sourcePosition)) {
        values[positionCount] = source.getShort(sourcePosition, 0);
        hasNonNullValue = true;
    }
    else {
        valueIsNull[positionCount] = true;
        hasNullValue = true;
    }
    positionCount++;

    updateSize(1);
}

@Override
public Block build()
{
Expand Down
Loading