Skip to content

Commit 1e95373

Browse files
committed
Add fast path for single partition output
Adds special case handling for single partition output in PagePartitioner such that partition calculations can be skipped when all rows will necessarily go to a single output partition.
1 parent 1c775a0 commit 1e95373

File tree

8 files changed

+385
-34
lines changed

8 files changed

+385
-34
lines changed

core/trino-main/src/main/java/io/trino/operator/output/PagePartitioner.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import java.util.function.IntUnaryOperator;
4545

4646
import static com.google.common.base.Preconditions.checkArgument;
47+
import static com.google.common.base.Preconditions.checkState;
4748
import static com.google.common.base.Verify.verify;
4849
import static io.trino.execution.buffer.PageSplitterUtil.splitAndSerializePage;
4950
import static io.trino.spi.block.PageBuilderStatus.DEFAULT_MAX_PAGE_SIZE_IN_BYTES;
@@ -138,7 +139,16 @@ public void partitionPage(Page page, OperatorContext operatorContext)
138139
}
139140

140141
int outputPositionCount = replicatesAnyRow && !hasAnyRowBeenReplicated ? page.getPositionCount() + positionsAppenders.length - 1 : page.getPositionCount();
141-
if (page.getPositionCount() < partitionFunction.partitionCount() * COLUMNAR_STRATEGY_COEFFICIENT) {
142+
if (positionsAppenders.length == 1) {
143+
// single output partition, skip partition calculation and append the entire page to the output partition
144+
checkState(partitionFunction.partitionCount() == 1, "partitionFunction must be single partition");
145+
// Any output rows are by definition "replicated" when only a single output partition exists
146+
if (replicatesAnyRow && !hasAnyRowBeenReplicated) {
147+
hasAnyRowBeenReplicated = true;
148+
}
149+
positionsAppenders[0].appendToOutputPartition(page);
150+
}
151+
else if (page.getPositionCount() < partitionFunction.partitionCount() * COLUMNAR_STRATEGY_COEFFICIENT) {
142152
// Partition will have on average less than COLUMNAR_STRATEGY_COEFFICIENT rows.
143153
// Doing it column-wise would degrade performance, so we fall back to row-wise approach.
144154
// Performance degradation is the worst in case of skewed hash distribution when only small subset

core/trino-main/src/main/java/io/trino/operator/output/PositionsAppender.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,22 @@
1414
package io.trino.operator.output;
1515

1616
import io.trino.spi.block.Block;
17+
import io.trino.spi.block.DictionaryBlock;
1718
import io.trino.spi.block.ValueBlock;
1819
import it.unimi.dsi.fastutil.ints.IntArrayList;
1920

2021
public sealed interface PositionsAppender
2122
permits RowPositionsAppender, TypedPositionsAppender
2223
{
24+
/**
25+
* Appends the positions from the list, in the specified order. Implementations are not permitted to modify
26+
* the contents of the {@link IntArrayList} argument as it may be passed directly from {@link DictionaryBlock#getRawIds()}
27+
* without a defensive copy.
28+
*/
2329
void append(IntArrayList positions, ValueBlock source);
2430

31+
void appendRange(ValueBlock block, int offset, int length);
32+
2533
/**
2634
* Appends the specified value positionCount times.
2735
* The result is the same as with using {@link PositionsAppender#append(IntArrayList, ValueBlock)} with

core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderPageBuilder.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,17 @@ private PositionsAppenderPageBuilder(
7373
}
7474
}
7575

76+
/**
 * Appends every position of {@code page} to this builder, without a positions list.
 * <p>
 * Declares {@code page.getPositionCount()} new positions and then, channel by channel,
 * hands the range {@code [0, positionCount)} of the page's block to the matching
 * channel appender.
 *
 * @param page the page whose rows are all appended
 */
public void appendToOutputPartition(Page page)
77+
{
78+
int positionCount = page.getPositionCount();
79+
declarePositions(positionCount);
80+
81+
// One appender per channel; the loop is bounded by the appender count, so the page
// is presumably expected to have at least that many channels (verify against callers).
for (int channel = 0; channel < channelAppenders.length; channel++) {
82+
Block block = page.getBlock(channel);
83+
channelAppenders[channel].appendRange(block, 0, positionCount);
84+
}
85+
}
86+
7687
public void appendToOutputPartition(Page page, IntArrayList positions)
7788
{
7889
declarePositions(positions.size());

core/trino-main/src/main/java/io/trino/operator/output/RowPositionsAppender.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,47 @@ public void append(IntArrayList positions, ValueBlock block)
115115
resetSize();
116116
}
117117

118+
/**
 * Appends {@code length} consecutive rows of a {@link RowBlock}, starting at {@code offset}.
 * <p>
 * Each field appender receives the same range of the corresponding raw field block,
 * shifted by the row block's offset base. The row-level null flags for the range are
 * copied into {@code rowIsNull}, and {@code hasNullRow} / {@code hasNonNullRow} are
 * updated to reflect what has been seen.
 *
 * @param block the source block; must be a {@link RowBlock}
 * @param offset first row to append, relative to the start of {@code block}
 * @param length number of rows to append; {@code 0} is a no-op
 * @throws IllegalArgumentException if {@code block} is not a {@link RowBlock}
 */
@Override
119+
public void appendRange(ValueBlock block, int offset, int length)
120+
{
121+
checkArgument(block instanceof RowBlock, "Block must be instance of %s", RowBlock.class);
122+
if (length == 0) {
123+
return;
124+
}
125+
126+
RowBlock sourceRowBlock = (RowBlock) block;
127+
ensureCapacity(length);
128+
129+
// Raw field blocks are indexed from the row block's offset base, so the range is
// translated by startOffset before being passed to the field appenders.
Block[] rawFieldBlocks = sourceRowBlock.getRawFieldBlocks();
130+
int startOffset = sourceRowBlock.getOffsetBase();
131+
132+
for (int i = 0; i < fieldAppenders.length; i++) {
133+
fieldAppenders[i].appendRange(rawFieldBlocks[i], startOffset + offset, length);
134+
}
135+
136+
boolean[] rawRowIsNull = sourceRowBlock.getRawRowIsNull();
137+
if (rawRowIsNull != null) {
138+
// Copy null flags one at a time while still tracking hasNullRow/hasNonNullRow.
// Once both flags are set nothing more can be learned from scanning, so the
// remainder of the range (including the current position) is bulk-copied.
for (int i = 0; i < length; i++) {
139+
boolean isNull = rawRowIsNull[startOffset + offset + i];
140+
hasNullRow |= isNull;
141+
hasNonNullRow |= !isNull;
142+
if (hasNullRow & hasNonNullRow) {
143+
System.arraycopy(rawRowIsNull, startOffset + offset + i, rowIsNull, positionCount + i, length - i);
144+
break;
145+
}
146+
else {
147+
rowIsNull[positionCount + i] = isNull;
148+
}
149+
}
150+
}
151+
else {
152+
// No null array on the source: every appended row is non-null. rowIsNull is not
// written here, so the target slots are presumably already false (TODO confirm).
hasNonNullRow = true;
153+
}
154+
155+
positionCount += length;
156+
resetSize();
157+
}
158+
118159
@Override
119160
public void appendRle(ValueBlock value, int rlePositionCount)
120161
{

core/trino-main/src/main/java/io/trino/operator/output/TypedPositionsAppender.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@ public void append(IntArrayList positions, ValueBlock block)
4444
blockBuilder.appendPositions(block, positions.elements(), 0, positions.size());
4545
}
4646

47+
/**
 * Appends {@code length} consecutive positions of {@code block}, starting at
 * {@code offset}, by delegating directly to the underlying block builder.
 *
 * @param block the source value block
 * @param offset first position to append
 * @param length number of positions to append
 */
@Override
48+
public void appendRange(ValueBlock block, int offset, int length)
49+
{
50+
blockBuilder.appendRange(block, offset, length);
51+
}
52+
4753
@Override
4854
public void appendRle(ValueBlock block, int count)
4955
{

core/trino-main/src/main/java/io/trino/operator/output/UnnestingPositionsAppender.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
2424
import jakarta.annotation.Nullable;
2525

26+
import java.util.Arrays;
2627
import java.util.Optional;
2728

2829
import static com.google.common.base.Preconditions.checkArgument;
@@ -32,6 +33,7 @@
3233
import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetSize;
3334
import static io.trino.operator.output.PositionsAppenderUtil.calculateNewArraySize;
3435
import static java.lang.Math.max;
36+
import static java.util.Objects.checkFromIndexSize;
3537
import static java.util.Objects.requireNonNull;
3638

3739
/**
@@ -68,6 +70,50 @@ public UnnestingPositionsAppender(PositionsAppender delegate, Optional<BlockPosi
6870
this.identicalOperator = identicalOperator.orElse(null);
6971
}
7072

73+
/**
 * Appends {@code length} consecutive positions of {@code source}, starting at
 * {@code offset}, dispatching on the concrete block encoding:
 * <ul>
 * <li>{@link RunLengthEncodedBlock}: forwarded to {@code appendRle} with the RLE value
 * and {@code length}; {@code offset} is not used on this path.</li>
 * <li>{@link DictionaryBlock}: the ids are appended to the dictionary ids builder when
 * this appender is uninitialized or already tracking the same dictionary instance;
 * otherwise the appender transitions to direct mode and the referenced dictionary
 * positions are appended through the delegate.</li>
 * <li>{@link ValueBlock}: the appender transitions to direct mode and the range is
 * appended through the delegate.</li>
 * </ul>
 *
 * @param source the block to read from
 * @param offset first position to append, relative to the start of {@code source}
 * @param length number of positions to append; {@code 0} is a no-op
 */
public void appendRange(Block source, int offset, int length)
74+
{
75+
if (length == 0) {
76+
return;
77+
}
78+
79+
switch (source) {
80+
case RunLengthEncodedBlock rleBlock -> {
81+
appendRle(rleBlock.getValue(), length);
82+
}
83+
case DictionaryBlock dictionaryBlock -> {
84+
ValueBlock dictionary = dictionaryBlock.getDictionary();
85+
if (state == State.UNINITIALIZED) {
86+
// First data seen: start in dictionary mode and remember which dictionary the ids refer to.
state = State.DICTIONARY;
87+
this.dictionary = dictionary;
88+
dictionaryIdsBuilder.appendRange(dictionaryBlock, offset, length);
89+
}
90+
// Reference comparison: ids are only accumulated for the very same dictionary instance.
else if (state == State.DICTIONARY && this.dictionary == dictionary) {
91+
dictionaryIdsBuilder.appendRange(dictionaryBlock, offset, length);
92+
}
93+
else {
94+
transitionToDirect();
95+
96+
// Use the block's ids for the requested range as the positions to read from its dictionary.
int[] rawIds = dictionaryBlock.getRawIds();
97+
int rawOffset = dictionaryBlock.getRawIdsOffset();
98+
checkFromIndexSize(rawOffset + offset, length, rawIds.length);
99+
IntArrayList positionsList;
100+
if (rawOffset + offset == 0) {
101+
// Fast path, no copy necessary
// (the raw ids array is wrapped as-is and limited to the first length entries)
102+
positionsList = IntArrayList.wrap(rawIds, length);
103+
}
104+
else {
105+
// The range does not start at index 0 of the raw array, so the slice is copied out before wrapping.
positionsList = IntArrayList.wrap(Arrays.copyOfRange(rawIds, rawOffset + offset, rawOffset + offset + length));
106+
}
107+
delegate.append(positionsList, dictionary);
108+
}
109+
}
110+
case ValueBlock valueBlock -> {
111+
transitionToDirect();
112+
delegate.appendRange(valueBlock, offset, length);
113+
}
114+
}
115+
}
116+
71117
public void append(IntArrayList positions, Block source)
72118
{
73119
if (positions.isEmpty()) {
@@ -275,6 +321,17 @@ public void appendPositions(IntArrayList positions, DictionaryBlock block)
275321
size += positions.size();
276322
}
277323

324+
/**
 * Appends the dictionary ids for {@code length} consecutive positions of {@code block},
 * starting at {@code offset}, to the end of {@code dictionaryIds}.
 *
 * @param block the dictionary block whose raw ids are copied
 * @param offset first position to copy, relative to the start of {@code block}
 * @param length number of ids to copy; must be positive
 * @throws IllegalArgumentException if {@code length} is not positive
 * @throws IndexOutOfBoundsException if the range falls outside the block's position count
 */
public void appendRange(DictionaryBlock block, int offset, int length)
325+
{
326+
checkArgument(length > 0, "block has no positions");
327+
checkFromIndexSize(offset, length, block.getPositionCount());
328+
ensureCapacity(size + length);
329+
// The block's ids begin at rawIdsOffset within the raw array, so the source index is shifted accordingly.
int[] rawIds = block.getRawIds();
330+
int rawIdsOffset = block.getRawIdsOffset();
331+
System.arraycopy(rawIds, rawIdsOffset + offset, dictionaryIds, size, length);
332+
size += length;
333+
}
334+
278335
public DictionaryIdsBuilder newBuilderLike()
279336
{
280337
if (size == 0) {

0 commit comments

Comments
 (0)