Skip to content

Commit 0470106

Browse files
committed
Reduce contribution of RLE blocks size in page splitting
Consider only the underlying value block size for RLE blocks in the page splitting calculation. This prevents Pages from being split up merely because getSizeInBytes of an RLE block is proportional to its position count.
1 parent 3ac0769 commit 0470106

File tree

2 files changed

+37
-97
lines changed

2 files changed

+37
-97
lines changed

core/trino-main/src/main/java/io/trino/execution/buffer/PageSplitterUtil.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@
1616
import com.google.common.collect.ImmutableList;
1717
import io.airlift.slice.Slice;
1818
import io.trino.spi.Page;
19+
import io.trino.spi.block.Block;
20+
import io.trino.spi.block.DictionaryBlock;
21+
import io.trino.spi.block.RunLengthEncodedBlock;
22+
import io.trino.spi.block.ValueBlock;
1923

2024
import java.util.List;
2125

@@ -51,21 +55,41 @@ private static List<Page> splitPage(Page page, long maxPageSizeInBytes, long pre
5155
// the recursion would only terminate when page.getPositionCount() == 1
5256
// and potentially create a large number of Pages of size 1. So we check here that
5357
// if the size of the page doesn't improve from the previous call we terminate the recursion.
54-
if (page.getSizeInBytes() == previousPageSize || page.getSizeInBytes() <= maxPageSizeInBytes || page.getPositionCount() == 1) {
58+
long currentPageSize = getPageSizeForSplit(page);
59+
if (currentPageSize == previousPageSize || currentPageSize <= maxPageSizeInBytes || page.getPositionCount() == 1) {
5560
return ImmutableList.of(page);
5661
}
5762

5863
ImmutableList.Builder<Page> outputPages = ImmutableList.builder();
59-
long previousSize = page.getSizeInBytes();
6064
int positionCount = page.getPositionCount();
6165
int half = positionCount / 2;
6266

6367
Page leftHalf = page.getRegion(0, half);
64-
outputPages.addAll(splitPage(leftHalf, maxPageSizeInBytes, previousSize));
68+
outputPages.addAll(splitPage(leftHalf, maxPageSizeInBytes, currentPageSize));
6569

6670
Page rightHalf = page.getRegion(half, positionCount - half);
67-
outputPages.addAll(splitPage(rightHalf, maxPageSizeInBytes, previousSize));
71+
outputPages.addAll(splitPage(rightHalf, maxPageSizeInBytes, currentPageSize));
6872

6973
return outputPages.build();
7074
}
75+
76+
/**
77+
* Returns the size of the page in bytes.
78+
* This is used to determine if the page should be split.
79+
* This differs from {@link Page#getSizeInBytes()} in that it purposely calculates the size of RLE blocks
80+
* as the size of the underlying value block. This is because we want to avoid creating a large number
81+
* of smaller Pages when the source has produced RLE-only Pages with a large position count.
82+
*/
83+
private static long getPageSizeForSplit(Page page)
84+
{
85+
long size = 0;
86+
for (int channel = 0; channel < page.getChannelCount(); channel++) {
87+
Block block = page.getBlock(channel);
88+
switch (block) {
89+
case RunLengthEncodedBlock rleBlock -> size += rleBlock.getUnderlyingValueBlock().getSizeInBytes();
90+
case DictionaryBlock _, ValueBlock _ -> size += block.getSizeInBytes();
91+
}
92+
}
93+
return size;
94+
}
7195
}

core/trino-main/src/test/java/io/trino/execution/TestPageSplitterUtil.java

Lines changed: 9 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,15 @@
1515

1616
import com.google.common.collect.ImmutableList;
1717
import io.airlift.slice.Slice;
18-
import io.trino.spi.Experimental;
1918
import io.trino.spi.Page;
20-
import io.trino.spi.block.ByteArrayBlock;
21-
import io.trino.spi.block.ValueBlock;
19+
import io.trino.spi.block.Block;
20+
import io.trino.spi.block.RunLengthEncodedBlock;
2221
import io.trino.spi.block.VariableWidthBlockBuilder;
2322
import io.trino.spi.type.Type;
2423
import io.trino.testing.MaterializedResult;
2524
import org.junit.jupiter.api.Test;
2625

2726
import java.util.List;
28-
import java.util.Optional;
29-
import java.util.function.ObjLongConsumer;
3027

3128
import static io.airlift.slice.Slices.wrappedBuffer;
3229
import static io.trino.SequencePageBuilder.createSequencePage;
@@ -79,13 +76,13 @@ public void testSplitPageNonDecreasingPageSize()
7976
{
8077
int positionCount = 100;
8178
int maxPageSizeInBytes = 1;
79+
List<Type> types = ImmutableList.of(VARCHAR);
8280

8381
Slice expectedValue = wrappedBuffer("test".getBytes(UTF_8));
8482
VariableWidthBlockBuilder blockBuilder = VARCHAR.createBlockBuilder(null, 1, expectedValue.length());
85-
for (int i = 0; i < positionCount; i++) {
86-
blockBuilder.writeEntry(expectedValue);
87-
}
88-
Page initialPage = new Page(new FixedDataSizeBlock(expectedValue.length(), blockBuilder.buildValueBlock()));
83+
blockBuilder.writeEntry(expectedValue);
84+
Block rleBlock = RunLengthEncodedBlock.create(blockBuilder.build(), positionCount);
85+
Page initialPage = new Page(rleBlock);
8986
List<Page> pages = splitPage(initialPage, maxPageSizeInBytes);
9087

9188
// the page should only be split in half as the recursion should terminate
@@ -98,89 +95,8 @@ public void testSplitPageNonDecreasingPageSize()
9895
assertThat((int) first.getSizeInBytes()).isGreaterThan(maxPageSizeInBytes);
9996
assertThat((int) second.getSizeInBytes()).isGreaterThan(maxPageSizeInBytes);
10097
assertPositionCount(pages, positionCount);
101-
}
102-
103-
// Fake block that retains a fixed size when split
104-
private record FixedDataSizeBlock(long fixedSize, ValueBlock delegate)
105-
implements ValueBlock
106-
{
107-
@Override
108-
public ValueBlock copyPositions(int[] positions, int offset, int length)
109-
{
110-
return new FixedDataSizeBlock(fixedSize, delegate.copyPositions(positions, offset, length));
111-
}
112-
113-
@Override
114-
public ValueBlock getRegion(int positionOffset, int length)
115-
{
116-
return new FixedDataSizeBlock(fixedSize, delegate.getRegion(positionOffset, length));
117-
}
118-
119-
@Override
120-
public ValueBlock copyRegion(int position, int length)
121-
{
122-
return new FixedDataSizeBlock(fixedSize, delegate.copyRegion(position, length));
123-
}
124-
125-
@Override
126-
public ValueBlock copyWithAppendedNull()
127-
{
128-
return new FixedDataSizeBlock(fixedSize, delegate.copyWithAppendedNull());
129-
}
130-
131-
@Experimental(eta = "2025-01-01")
132-
@Override
133-
public Optional<ByteArrayBlock> getNulls()
134-
{
135-
return delegate.getNulls();
136-
}
137-
138-
@Override
139-
public ValueBlock getSingleValueBlock(int position)
140-
{
141-
return delegate.getSingleValueBlock(position);
142-
}
143-
144-
@Override
145-
public int getPositionCount()
146-
{
147-
return delegate.getPositionCount();
148-
}
149-
150-
@Override
151-
public long getSizeInBytes()
152-
{
153-
return fixedSize;
154-
}
155-
156-
@Override
157-
public long getRegionSizeInBytes(int position, int length)
158-
{
159-
return delegate.getRegionSizeInBytes(position, length);
160-
}
161-
162-
@Override
163-
public long getRetainedSizeInBytes()
164-
{
165-
return delegate.getRetainedSizeInBytes();
166-
}
167-
168-
@Override
169-
public long getEstimatedDataSizeForStats(int position)
170-
{
171-
return delegate.getEstimatedDataSizeForStats(position);
172-
}
173-
174-
@Override
175-
public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
176-
{
177-
delegate.retainedBytesForEachPart(consumer);
178-
}
179-
180-
@Override
181-
public boolean isNull(int position)
182-
{
183-
return delegate.isNull(position);
184-
}
98+
MaterializedResult actual = toMaterializedResult(TEST_SESSION, types, pages);
99+
MaterializedResult expected = toMaterializedResult(TEST_SESSION, types, ImmutableList.of(initialPage));
100+
assertThat(actual).containsExactlyElementsOf(expected);
185101
}
186102
}

0 commit comments

Comments
 (0)