Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ public GroupByIdBlock getResult()

return new GroupByIdBlock(
nextGroupId,
new RunLengthEncodedBlock(
RunLengthEncodedBlock.create(
BIGINT.createFixedSizeBlockBuilder(1).writeLong(groupId).build(),
block.getPositionCount()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public Page transformPage(Page inputPage)
Block operationChannelBlock = mergeRow.getField(mergeRow.getFieldCount() - 2);
builder.add(operationChannelBlock);
builder.add(inputPage.getBlock(rowIdChannel));
builder.add(new RunLengthEncodedBlock(INSERT_FROM_UPDATE_BLOCK, positionCount));
builder.add(RunLengthEncodedBlock.create(INSERT_FROM_UPDATE_BLOCK, positionCount));

Page result = new Page(builder.toArray(Block[]::new));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,14 @@ private Page generateNextPage()

for (int i = 0; i < groupingSetInputs[currentGroupingSet].length; i++) {
if (groupingSetInputs[currentGroupingSet][i] == -1) {
outputBlocks[i] = new RunLengthEncodedBlock(nullBlocks[i], currentPage.getPositionCount());
outputBlocks[i] = RunLengthEncodedBlock.create(nullBlocks[i], currentPage.getPositionCount());
}
else {
outputBlocks[i] = currentPage.getBlock(groupingSetInputs[currentGroupingSet][i]);
}
}

outputBlocks[outputBlocks.length - 1] = new RunLengthEncodedBlock(groupIdBlocks[currentGroupingSet], currentPage.getPositionCount());
outputBlocks[outputBlocks.length - 1] = RunLengthEncodedBlock.create(groupIdBlocks[currentGroupingSet], currentPage.getPositionCount());
currentGroupingSet = (currentGroupingSet + 1) % groupingSetInputs.length;
Page outputPage = new Page(currentPage.getPositionCount(), outputBlocks);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ private Block processNextGroupIds(GroupByIdBlock ids)
// must have > 1 positions to benefit from using a RunLengthEncoded block
if (nextDistinctId == ids.getGroupCount()) {
// no new distinct positions
return new RunLengthEncodedBlock(BooleanType.createBlockForSingleNonNullValue(false), positions);
return RunLengthEncodedBlock.create(BooleanType.createBlockForSingleNonNullValue(false), positions);
}
if (nextDistinctId + positions == ids.getGroupCount()) {
// all positions are distinct
nextDistinctId = ids.getGroupCount();
return new RunLengthEncodedBlock(BooleanType.createBlockForSingleNonNullValue(true), positions);
return RunLengthEncodedBlock.create(BooleanType.createBlockForSingleNonNullValue(true), positions);
}
}
byte[] distinctMask = new byte[positions];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ public GroupByIdBlock getResult()

return new GroupByIdBlock(
nextGroupId,
new RunLengthEncodedBlock(
RunLengthEncodedBlock.create(
BIGINT.createFixedSizeBlockBuilder(1).writeLong(groupId).build(),
page.getPositionCount()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ public Page next()
// For the page with less rows, create RLE blocks and add them to the blocks array
for (int i = 0; i < smallPageOutputBlocks.length; i++) {
Block block = smallPageOutputBlocks[i].getSingleValueBlock(rowIndex);
resultBlockBuffer[indexForRleBlocks + i] = new RunLengthEncodedBlock(block, largePagePositionCount);
resultBlockBuffer[indexForRleBlocks + i] = RunLengthEncodedBlock.create(block, largePagePositionCount);
}
// Page constructor will create a copy of the block buffer (and must for correctness)
return new Page(largePagePositionCount, resultBlockBuffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,8 @@ public void append(IntArrayList positions, Block block)
}

@Override
public void appendRle(RunLengthEncodedBlock block)
public void appendRle(Block block, int rlePositionCount)
{
int rlePositionCount = block.getPositionCount();
if (rlePositionCount == 0) {
return;
}
Expand Down Expand Up @@ -124,7 +123,7 @@ public Block build()
result = new ByteArrayBlock(positionCount, hasNullValue ? Optional.of(valueIsNull) : Optional.empty(), values);
}
else {
result = new RunLengthEncodedBlock(NULL_VALUE_BLOCK, positionCount);
result = RunLengthEncodedBlock.create(NULL_VALUE_BLOCK, positionCount);
}
reset();
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,8 @@ public void append(IntArrayList positions, Block block)
}

@Override
public void appendRle(RunLengthEncodedBlock block)
public void appendRle(Block block, int rlePositionCount)
{
int rlePositionCount = block.getPositionCount();
if (rlePositionCount == 0) {
return;
}
Expand Down Expand Up @@ -137,7 +136,7 @@ public Block build()
result = new Int128ArrayBlock(positionCount, hasNullValue ? Optional.of(valueIsNull) : Optional.empty(), values);
}
else {
result = new RunLengthEncodedBlock(NULL_VALUE_BLOCK, positionCount);
result = RunLengthEncodedBlock.create(NULL_VALUE_BLOCK, positionCount);
}
reset();
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,8 @@ public void append(IntArrayList positions, Block block)
}

@Override
public void appendRle(RunLengthEncodedBlock block)
public void appendRle(Block block, int rlePositionCount)
{
int rlePositionCount = block.getPositionCount();
if (rlePositionCount == 0) {
return;
}
Expand Down Expand Up @@ -132,7 +131,7 @@ public Block build()
result = new Int96ArrayBlock(positionCount, hasNullValue ? Optional.of(valueIsNull) : Optional.empty(), high, low);
}
else {
result = new RunLengthEncodedBlock(NULL_VALUE_BLOCK, positionCount);
result = RunLengthEncodedBlock.create(NULL_VALUE_BLOCK, positionCount);
}
reset();
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,8 @@ public void append(IntArrayList positions, Block block)
}

@Override
public void appendRle(RunLengthEncodedBlock block)
public void appendRle(Block block, int rlePositionCount)
{
int rlePositionCount = block.getPositionCount();
if (rlePositionCount == 0) {
return;
}
Expand Down Expand Up @@ -124,7 +123,7 @@ public Block build()
result = new IntArrayBlock(positionCount, hasNullValue ? Optional.of(valueIsNull) : Optional.empty(), values);
}
else {
result = new RunLengthEncodedBlock(NULL_VALUE_BLOCK, positionCount);
result = RunLengthEncodedBlock.create(NULL_VALUE_BLOCK, positionCount);
}
reset();
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,8 @@ public void append(IntArrayList positions, Block block)
}

@Override
public void appendRle(RunLengthEncodedBlock block)
public void appendRle(Block block, int rlePositionCount)
{
int rlePositionCount = block.getPositionCount();
if (rlePositionCount == 0) {
return;
}
Expand Down Expand Up @@ -124,7 +123,7 @@ public Block build()
result = new LongArrayBlock(positionCount, hasNullValue ? Optional.of(valueIsNull) : Optional.empty(), values);
}
else {
result = new RunLengthEncodedBlock(NULL_VALUE_BLOCK, positionCount);
result = RunLengthEncodedBlock.create(NULL_VALUE_BLOCK, positionCount);
}
reset();
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ private Page getPartitionFunctionArguments(Page page)
for (int i = 0; i < blocks.length; i++) {
int channel = partitionChannels[i];
if (channel < 0) {
blocks[i] = new RunLengthEncodedBlock(partitionConstantBlocks[i], page.getPositionCount());
blocks[i] = RunLengthEncodedBlock.create(partitionConstantBlocks[i], page.getPositionCount());
}
else {
blocks[i] = page.getBlock(channel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,18 @@
package io.trino.operator.output;

import io.trino.spi.block.Block;
import io.trino.spi.block.RunLengthEncodedBlock;
import it.unimi.dsi.fastutil.ints.IntArrayList;

public interface PositionsAppender
{
void append(IntArrayList positions, Block source);

/**
* Appends value from the {@code rleBlock} to this appender {@link RunLengthEncodedBlock#getPositionCount()} times.
* Appends the specified value positionCount times.
* The result is the same as with using {@link PositionsAppender#append(IntArrayList, Block)} with
* positions list [0...{@link RunLengthEncodedBlock#getPositionCount()} -1]
* but with possible performance optimizations for {@link RunLengthEncodedBlock}.
* positions list [0...positionCount -1] but with possible performance optimizations.
*/
void appendRle(RunLengthEncodedBlock rleBlock);
void appendRle(Block value, int rlePositionCount);

/**
* Creates the block from the appender data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,31 +60,32 @@ public void append(IntArrayList positions, Block source)
}

@Override
public void appendRle(RunLengthEncodedBlock source)
public void appendRle(Block value, int positionCount)
{
if (source.getPositionCount() == 0) {
if (positionCount == 0) {
return;
}
checkArgument(value.getPositionCount() == 1, "Expected value to contain a single position but has %d positions".formatted(value.getPositionCount()));

if (rlePositionCount == 0) {
// initial empty state, switch to RLE state
rleValue = source.getValue();
rlePositionCount = source.getPositionCount();
rleValue = value;
rlePositionCount = positionCount;
}
else if (rleValue != null) {
// we are in the RLE state
if (equalOperator.equalNullSafe(rleValue, 0, source.getValue(), 0)) {
if (equalOperator.equalNullSafe(rleValue, 0, value, 0)) {
// the values match. we can just add positions.
this.rlePositionCount += source.getPositionCount();
this.rlePositionCount += positionCount;
return;
}
// RLE values do not match. switch to flat state
switchToFlat();
delegate.appendRle(source);
delegate.appendRle(value, positionCount);
}
else {
// flat state
delegate.appendRle(source);
delegate.appendRle(value, positionCount);
}
}

Expand All @@ -93,7 +94,7 @@ public Block build()
{
Block result;
if (rleValue != null) {
result = new RunLengthEncodedBlock(rleValue, rlePositionCount);
result = RunLengthEncodedBlock.create(rleValue, rlePositionCount);
}
else {
result = delegate.build();
Expand Down Expand Up @@ -127,7 +128,7 @@ private void switchToFlat()
{
if (rleValue != null) {
// we are in the RLE state, flatten all RLE blocks
delegate.appendRle(new RunLengthEncodedBlock(rleValue, rlePositionCount));
delegate.appendRle(rleValue, rlePositionCount);
rleValue = null;
}
rlePositionCount = NO_RLE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,10 @@ public void append(IntArrayList positions, Block block)
}

@Override
public void appendRle(RunLengthEncodedBlock rleBlock)
public void appendRle(Block value, int rlePositionCount)
{
int rlePositionCount = rleBlock.getPositionCount();
ensureCapacity(rlePositionCount);
AbstractRowBlock sourceRowBlock = (AbstractRowBlock) rleBlock.getValue();
AbstractRowBlock sourceRowBlock = (AbstractRowBlock) value;
if (sourceRowBlock.isNull(0)) {
// append rlePositionCount nulls
Arrays.fill(rowIsNull, positionCount, positionCount + rlePositionCount, true);
Expand All @@ -111,7 +110,7 @@ public void appendRle(RunLengthEncodedBlock rleBlock)
List<Block> fieldBlocks = sourceRowBlock.getChildren();
int fieldPosition = sourceRowBlock.getFieldBlockOffset(0);
for (int i = 0; i < fieldAppenders.length; i++) {
fieldAppenders[i].appendRle(new RunLengthEncodedBlock(fieldBlocks.get(i).getSingleValueBlock(fieldPosition), rlePositionCount));
fieldAppenders[i].appendRle(fieldBlocks.get(i).getSingleValueBlock(fieldPosition), rlePositionCount);
}
hasNonNullRow = true;
}
Expand All @@ -132,7 +131,7 @@ public Block build()
}
else {
Block nullRowBlock = fromFieldBlocks(1, Optional.of(new boolean[] {true}), fieldBlocks);
result = new RunLengthEncodedBlock(nullRowBlock, positionCount);
result = RunLengthEncodedBlock.create(nullRowBlock, positionCount);
}

reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,8 @@ public void append(IntArrayList positions, Block block)
}

@Override
public void appendRle(RunLengthEncodedBlock block)
public void appendRle(Block block, int rlePositionCount)
{
int rlePositionCount = block.getPositionCount();
if (rlePositionCount == 0) {
return;
}
Expand Down Expand Up @@ -124,7 +123,7 @@ public Block build()
result = new ShortArrayBlock(positionCount, hasNullValue ? Optional.of(valueIsNull) : Optional.empty(), values);
}
else {
result = new RunLengthEncodedBlock(NULL_VALUE_BLOCK, positionCount);
result = RunLengthEncodedBlock.create(NULL_VALUE_BLOCK, positionCount);
}
reset();
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,8 @@ public void append(IntArrayList positions, Block block)
}

@Override
public void appendRle(RunLengthEncodedBlock block)
public void appendRle(Block block, int rlePositionCount)
{
int rlePositionCount = block.getPositionCount();
if (rlePositionCount == 0) {
return;
}
Expand Down Expand Up @@ -154,7 +153,7 @@ public Block build()
hasNullValue ? Optional.of(valueIsNull) : Optional.empty());
}
else {
result = new RunLengthEncodedBlock(NULL_VALUE_BLOCK, positionCount);
result = RunLengthEncodedBlock.create(NULL_VALUE_BLOCK, positionCount);
}
reset();
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.type.Type;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.openjdk.jol.info.ClassLayout;
Expand Down Expand Up @@ -53,9 +52,9 @@ public void append(IntArrayList positions, Block source)
}

@Override
public void appendRle(RunLengthEncodedBlock block)
public void appendRle(Block block, int rlePositionCount)
{
for (int i = 0; i < block.getPositionCount(); i++) {
for (int i = 0; i < rlePositionCount; i++) {
type.appendTo(block, 0, blockBuilder);
}
}
Expand Down
Loading