diff --git a/core/trino-main/src/main/java/io/trino/operator/PrecomputedHashGenerator.java b/core/trino-main/src/main/java/io/trino/operator/PrecomputedHashGenerator.java index 95c5ec91f2e1..b88802291832 100644 --- a/core/trino-main/src/main/java/io/trino/operator/PrecomputedHashGenerator.java +++ b/core/trino-main/src/main/java/io/trino/operator/PrecomputedHashGenerator.java @@ -14,7 +14,7 @@ package io.trino.operator; import io.trino.spi.Page; -import io.trino.spi.type.BigintType; +import io.trino.spi.block.Block; import static com.google.common.base.MoreObjects.toStringHelper; @@ -31,7 +31,8 @@ public PrecomputedHashGenerator(int hashChannel) @Override public long hashPosition(int position, Page page) { - return BigintType.BIGINT.getLong(page.getBlock(hashChannel), position); + Block hashBlock = page.getBlock(hashChannel); + return hashBlock.getLong(position, 0); } @Override diff --git a/core/trino-main/src/main/java/io/trino/operator/output/BytePositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/BytePositionsAppender.java new file mode 100644 index 000000000000..fb065efa2794 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/operator/output/BytePositionsAppender.java @@ -0,0 +1,187 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.operator.output; + +import io.trino.spi.block.Block; +import io.trino.spi.block.ByteArrayBlock; +import io.trino.spi.block.RunLengthEncodedBlock; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import org.openjdk.jol.info.ClassLayout; + +import java.util.Arrays; +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.sizeOf; +import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetSize; +import static io.trino.operator.output.PositionsAppenderUtil.calculateNewArraySize; +import static java.lang.Math.max; + +public class BytePositionsAppender + implements PositionsAppender +{ + private static final int INSTANCE_SIZE = ClassLayout.parseClass(BytePositionsAppender.class).instanceSize(); + private static final Block NULL_VALUE_BLOCK = new ByteArrayBlock(1, Optional.of(new boolean[] {true}), new byte[1]); + + private boolean initialized; + private int initialEntryCount; + + private int positionCount; + private boolean hasNullValue; + private boolean hasNonNullValue; + + // it is assumed that these arrays are the same length + private boolean[] valueIsNull = new boolean[0]; + private byte[] values = new byte[0]; + + private long retainedSizeInBytes; + private long sizeInBytes; + + public BytePositionsAppender(int expectedEntries) + { + this.initialEntryCount = max(expectedEntries, 1); + + updateRetainedSize(); + } + + @Override + public void append(IntArrayList positions, Block block) + { + if (positions.isEmpty()) { + return; + } + // performance of this method depends on block being always the same, flat type + checkArgument(block instanceof ByteArrayBlock); + int[] positionArray = positions.elements(); + int positionsSize = positions.size(); + ensureCapacity(positionCount + positionsSize); + + if (block.mayHaveNull()) { + for (int i = 0; i < positionsSize; i++) { + int position = positionArray[i]; + boolean isNull = block.isNull(position); + 
int positionIndex = positionCount + i; + if (isNull) { + valueIsNull[positionIndex] = true; + hasNullValue = true; + } + else { + values[positionIndex] = block.getByte(position, 0); + hasNonNullValue = true; + } + } + positionCount += positionsSize; + } + else { + for (int i = 0; i < positionsSize; i++) { + int position = positionArray[i]; + values[positionCount + i] = block.getByte(position, 0); + } + positionCount += positionsSize; + hasNonNullValue = true; + } + + updateSize(positionsSize); + } + + @Override + public void appendRle(RunLengthEncodedBlock block) + { + int rlePositionCount = block.getPositionCount(); + if (rlePositionCount == 0) { + return; + } + int sourcePosition = 0; + ensureCapacity(positionCount + rlePositionCount); + if (block.isNull(sourcePosition)) { + Arrays.fill(valueIsNull, positionCount, positionCount + rlePositionCount, true); + hasNullValue = true; + } + else { + byte value = block.getByte(sourcePosition, 0); + Arrays.fill(values, positionCount, positionCount + rlePositionCount, value); + hasNonNullValue = true; + } + positionCount += rlePositionCount; + + updateSize(rlePositionCount); + } + + @Override + public Block build() + { + if (!hasNonNullValue) { + return new RunLengthEncodedBlock(NULL_VALUE_BLOCK, positionCount); + } + ByteArrayBlock result = new ByteArrayBlock(positionCount, hasNullValue ? 
Optional.of(valueIsNull) : Optional.empty(), values); + reset(); + return result; + } + + @Override + public long getRetainedSizeInBytes() + { + return retainedSizeInBytes; + } + + @Override + public long getSizeInBytes() + { + return sizeInBytes; + } + + private void reset() + { + initialEntryCount = calculateBlockResetSize(positionCount); + initialized = false; + valueIsNull = new boolean[0]; + values = new byte[0]; + positionCount = 0; + sizeInBytes = 0; + hasNonNullValue = false; + hasNullValue = false; + updateRetainedSize(); + } + + private void ensureCapacity(int capacity) + { + if (values.length >= capacity) { + return; + } + + int newSize; + if (initialized) { + newSize = calculateNewArraySize(values.length); + } + else { + newSize = initialEntryCount; + initialized = true; + } + newSize = Math.max(newSize, capacity); + + valueIsNull = Arrays.copyOf(valueIsNull, newSize); + values = Arrays.copyOf(values, newSize); + updateRetainedSize(); + } + + private void updateSize(long positionsSize) + { + sizeInBytes += ByteArrayBlock.SIZE_IN_BYTES_PER_POSITION * positionsSize; + } + + private void updateRetainedSize() + { + retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values); + } +} diff --git a/core/trino-main/src/main/java/io/trino/operator/output/Int128PositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/Int128PositionsAppender.java new file mode 100644 index 000000000000..4046beb8d97f --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/operator/output/Int128PositionsAppender.java @@ -0,0 +1,200 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.operator.output; + +import io.trino.spi.block.Block; +import io.trino.spi.block.Int128ArrayBlock; +import io.trino.spi.block.RunLengthEncodedBlock; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import org.openjdk.jol.info.ClassLayout; + +import java.util.Arrays; +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.SIZE_OF_LONG; +import static io.airlift.slice.SizeOf.sizeOf; +import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetSize; +import static io.trino.operator.output.PositionsAppenderUtil.calculateNewArraySize; +import static java.lang.Math.max; + +public class Int128PositionsAppender + implements PositionsAppender +{ + private static final int INSTANCE_SIZE = ClassLayout.parseClass(Int128PositionsAppender.class).instanceSize(); + private static final Block NULL_VALUE_BLOCK = new Int128ArrayBlock(1, Optional.of(new boolean[] {true}), new long[2]); + + private boolean initialized; + private int initialEntryCount; + + private int positionCount; + private boolean hasNullValue; + private boolean hasNonNullValue; + + // it is assumed that these arrays are the same length + private boolean[] valueIsNull = new boolean[0]; + private long[] values = new long[0]; + + private long retainedSizeInBytes; + private long sizeInBytes; + + public Int128PositionsAppender(int expectedEntries) + { + this.initialEntryCount = max(expectedEntries, 1); + + updateRetainedSize(); + } + + @Override + public void append(IntArrayList 
positions, Block block) + { + if (positions.isEmpty()) { + return; + } + // performance of this method depends on block being always the same, flat type + checkArgument(block instanceof Int128ArrayBlock); + int[] positionArray = positions.elements(); + int positionsSize = positions.size(); + ensureCapacity(positionCount + positionsSize); + + if (block.mayHaveNull()) { + int positionIndex = positionCount * 2; + for (int i = 0; i < positionsSize; i++) { + int position = positionArray[i]; + boolean isNull = block.isNull(position); + + if (isNull) { + valueIsNull[positionCount + i] = true; + hasNullValue = true; + } + else { + values[positionIndex] = block.getLong(position, 0); + values[positionIndex + 1] = block.getLong(position, SIZE_OF_LONG); + hasNonNullValue = true; + } + positionIndex += 2; + } + positionCount += positionsSize; + } + else { + int positionIndex = positionCount * 2; + for (int i = 0; i < positionsSize; i++) { + int position = positionArray[i]; + values[positionIndex] = block.getLong(position, 0); + values[positionIndex + 1] = block.getLong(position, SIZE_OF_LONG); + positionIndex += 2; + } + positionCount += positionsSize; + hasNonNullValue = true; + } + + updateSize(positionsSize); + } + + @Override + public void appendRle(RunLengthEncodedBlock block) + { + int rlePositionCount = block.getPositionCount(); + if (rlePositionCount == 0) { + return; + } + int sourcePosition = 0; + ensureCapacity(positionCount + rlePositionCount); + if (block.isNull(sourcePosition)) { + Arrays.fill(valueIsNull, positionCount, positionCount + rlePositionCount, true); + hasNullValue = true; + } + else { + long valueHigh = block.getLong(sourcePosition, 0); + long valueLow = block.getLong(sourcePosition, SIZE_OF_LONG); + int positionIndex = positionCount * 2; + for (int i = 0; i < rlePositionCount; i++) { + values[positionIndex] = valueHigh; + values[positionIndex + 1] = valueLow; + positionIndex += 2; + } + hasNonNullValue = true; + } + positionCount += rlePositionCount; 
+ + updateSize(rlePositionCount); + } + + @Override + public Block build() + { + if (!hasNonNullValue) { + return new RunLengthEncodedBlock(NULL_VALUE_BLOCK, positionCount); + } + Int128ArrayBlock result = new Int128ArrayBlock(positionCount, hasNullValue ? Optional.of(valueIsNull) : Optional.empty(), values); + reset(); + return result; + } + + @Override + public long getRetainedSizeInBytes() + { + return retainedSizeInBytes; + } + + @Override + public long getSizeInBytes() + { + return sizeInBytes; + } + + private void reset() + { + initialEntryCount = calculateBlockResetSize(positionCount); + initialized = false; + valueIsNull = new boolean[0]; + values = new long[0]; + positionCount = 0; + sizeInBytes = 0; + hasNonNullValue = false; + hasNullValue = false; + updateRetainedSize(); + } + + private void ensureCapacity(int capacity) + { + if (valueIsNull.length >= capacity) { + return; + } + + int newSize; + if (initialized) { + newSize = calculateNewArraySize(valueIsNull.length); + } + else { + newSize = initialEntryCount; + initialized = true; + } + newSize = Math.max(newSize, capacity); + + valueIsNull = Arrays.copyOf(valueIsNull, newSize); + values = Arrays.copyOf(values, newSize * 2); + updateRetainedSize(); + } + + private void updateSize(long positionsSize) + { + sizeInBytes += Int128ArrayBlock.SIZE_IN_BYTES_PER_POSITION * positionsSize; + } + + private void updateRetainedSize() + { + retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values); + } +} diff --git a/core/trino-main/src/main/java/io/trino/operator/output/Int96PositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/Int96PositionsAppender.java new file mode 100644 index 000000000000..721006f95b0e --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/operator/output/Int96PositionsAppender.java @@ -0,0 +1,197 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.operator.output; + +import io.trino.spi.block.Block; +import io.trino.spi.block.Int96ArrayBlock; +import io.trino.spi.block.RunLengthEncodedBlock; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import org.openjdk.jol.info.ClassLayout; + +import java.util.Arrays; +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.SIZE_OF_LONG; +import static io.airlift.slice.SizeOf.sizeOf; +import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetSize; +import static io.trino.operator.output.PositionsAppenderUtil.calculateNewArraySize; +import static java.lang.Math.max; + +public class Int96PositionsAppender + implements PositionsAppender +{ + private static final int INSTANCE_SIZE = ClassLayout.parseClass(Int96PositionsAppender.class).instanceSize(); + private static final Block NULL_VALUE_BLOCK = new Int96ArrayBlock(1, Optional.of(new boolean[] {true}), new long[1], new int[1]); + + private boolean initialized; + private int initialEntryCount; + + private int positionCount; + private boolean hasNullValue; + private boolean hasNonNullValue; + + // it is assumed that these arrays are the same length + private boolean[] valueIsNull = new boolean[0]; + private long[] high = new long[0]; + private int[] low = new int[0]; + + private long retainedSizeInBytes; + private long sizeInBytes; + + public Int96PositionsAppender(int expectedEntries) + { + this.initialEntryCount = max(expectedEntries, 1); + + updateRetainedSize(); + } + + @Override 
+ public void append(IntArrayList positions, Block block) + { + if (positions.isEmpty()) { + return; + } + // performance of this method depends on block being always the same, flat type + checkArgument(block instanceof Int96ArrayBlock); + int[] positionArray = positions.elements(); + int positionsSize = positions.size(); + ensureCapacity(positionCount + positionsSize); + + if (block.mayHaveNull()) { + for (int i = 0; i < positionsSize; i++) { + int position = positionArray[i]; + boolean isNull = block.isNull(position); + int positionIndex = positionCount + i; + if (isNull) { + valueIsNull[positionIndex] = true; + hasNullValue = true; + } + else { + high[positionIndex] = block.getLong(position, 0); + low[positionIndex] = block.getInt(position, SIZE_OF_LONG); + hasNonNullValue = true; + } + } + positionCount += positionsSize; + } + else { + for (int i = 0; i < positionsSize; i++) { + int position = positionArray[i]; + high[positionCount + i] = block.getLong(position, 0); + low[positionCount + i] = block.getInt(position, SIZE_OF_LONG); + } + positionCount += positionsSize; + hasNonNullValue = true; + } + + updateSize(positionsSize); + } + + @Override + public void appendRle(RunLengthEncodedBlock block) + { + int rlePositionCount = block.getPositionCount(); + if (rlePositionCount == 0) { + return; + } + int sourcePosition = 0; + ensureCapacity(positionCount + rlePositionCount); + if (block.isNull(sourcePosition)) { + Arrays.fill(valueIsNull, positionCount, positionCount + rlePositionCount, true); + hasNullValue = true; + } + else { + long valueHigh = block.getLong(sourcePosition, 0); + int valueLow = block.getInt(sourcePosition, SIZE_OF_LONG); + for (int i = 0; i < rlePositionCount; i++) { + high[positionCount + i] = valueHigh; + low[positionCount + i] = valueLow; + } + hasNonNullValue = true; + } + positionCount += rlePositionCount; + + updateSize(rlePositionCount); + } + + @Override + public Block build() + { + if (!hasNonNullValue) { + return new 
RunLengthEncodedBlock(NULL_VALUE_BLOCK, positionCount); + } + Int96ArrayBlock result = new Int96ArrayBlock(positionCount, hasNullValue ? Optional.of(valueIsNull) : Optional.empty(), high, low); + reset(); + return result; + } + + @Override + public long getRetainedSizeInBytes() + { + return retainedSizeInBytes; + } + + @Override + public long getSizeInBytes() + { + return sizeInBytes; + } + + private void reset() + { + initialEntryCount = calculateBlockResetSize(positionCount); + initialized = false; + valueIsNull = new boolean[0]; + high = new long[0]; + low = new int[0]; + positionCount = 0; + sizeInBytes = 0; + hasNonNullValue = false; + hasNullValue = false; + updateRetainedSize(); + } + + private void ensureCapacity(int capacity) + { + if (valueIsNull.length >= capacity) { + return; + } + + int newSize; + if (initialized) { + newSize = calculateNewArraySize(valueIsNull.length); + } + else { + newSize = initialEntryCount; + initialized = true; + } + newSize = Math.max(newSize, capacity); + + valueIsNull = Arrays.copyOf(valueIsNull, newSize); + high = Arrays.copyOf(high, newSize); + low = Arrays.copyOf(low, newSize); + updateRetainedSize(); + } + + private void updateSize(long positionsSize) + { + sizeInBytes += Int96ArrayBlock.SIZE_IN_BYTES_PER_POSITION * positionsSize; + } + + private void updateRetainedSize() + { + retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(high) + sizeOf(low); + } +} diff --git a/core/trino-main/src/main/java/io/trino/operator/output/IntPositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/IntPositionsAppender.java new file mode 100644 index 000000000000..76f0c45840ac --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/operator/output/IntPositionsAppender.java @@ -0,0 +1,187 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.operator.output; + +import io.trino.spi.block.Block; +import io.trino.spi.block.IntArrayBlock; +import io.trino.spi.block.RunLengthEncodedBlock; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import org.openjdk.jol.info.ClassLayout; + +import java.util.Arrays; +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.sizeOf; +import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetSize; +import static io.trino.operator.output.PositionsAppenderUtil.calculateNewArraySize; +import static java.lang.Math.max; + +public class IntPositionsAppender + implements PositionsAppender +{ + private static final int INSTANCE_SIZE = ClassLayout.parseClass(IntPositionsAppender.class).instanceSize(); + private static final Block NULL_VALUE_BLOCK = new IntArrayBlock(1, Optional.of(new boolean[] {true}), new int[1]); + + private boolean initialized; + private int initialEntryCount; + + private int positionCount; + private boolean hasNullValue; + private boolean hasNonNullValue; + + // it is assumed that these arrays are the same length + private boolean[] valueIsNull = new boolean[0]; + private int[] values = new int[0]; + + private long retainedSizeInBytes; + private long sizeInBytes; + + public IntPositionsAppender(int expectedEntries) + { + this.initialEntryCount = max(expectedEntries, 1); + + updateRetainedSize(); + } + + @Override + public void append(IntArrayList positions, Block block) + { + if (positions.isEmpty()) { + return; + } + // 
performance of this method depends on block being always the same, flat type + checkArgument(block instanceof IntArrayBlock); + int[] positionArray = positions.elements(); + int positionsSize = positions.size(); + ensureCapacity(positionCount + positionsSize); + + if (block.mayHaveNull()) { + for (int i = 0; i < positionsSize; i++) { + int position = positionArray[i]; + boolean isNull = block.isNull(position); + int positionIndex = positionCount + i; + if (isNull) { + valueIsNull[positionIndex] = true; + hasNullValue = true; + } + else { + values[positionIndex] = block.getInt(position, 0); + hasNonNullValue = true; + } + } + positionCount += positionsSize; + } + else { + for (int i = 0; i < positionsSize; i++) { + int position = positionArray[i]; + values[positionCount + i] = block.getInt(position, 0); + } + positionCount += positionsSize; + hasNonNullValue = true; + } + + updateSize(positionsSize); + } + + @Override + public void appendRle(RunLengthEncodedBlock block) + { + int rlePositionCount = block.getPositionCount(); + if (rlePositionCount == 0) { + return; + } + int sourcePosition = 0; + ensureCapacity(positionCount + rlePositionCount); + if (block.isNull(sourcePosition)) { + Arrays.fill(valueIsNull, positionCount, positionCount + rlePositionCount, true); + hasNullValue = true; + } + else { + int value = block.getInt(sourcePosition, 0); + Arrays.fill(values, positionCount, positionCount + rlePositionCount, value); + hasNonNullValue = true; + } + positionCount += rlePositionCount; + + updateSize(rlePositionCount); + } + + @Override + public Block build() + { + if (!hasNonNullValue) { + return new RunLengthEncodedBlock(NULL_VALUE_BLOCK, positionCount); + } + IntArrayBlock result = new IntArrayBlock(positionCount, hasNullValue ? 
Optional.of(valueIsNull) : Optional.empty(), values); + reset(); + return result; + } + + @Override + public long getRetainedSizeInBytes() + { + return retainedSizeInBytes; + } + + @Override + public long getSizeInBytes() + { + return sizeInBytes; + } + + private void reset() + { + initialEntryCount = calculateBlockResetSize(positionCount); + initialized = false; + valueIsNull = new boolean[0]; + values = new int[0]; + positionCount = 0; + sizeInBytes = 0; + hasNonNullValue = false; + hasNullValue = false; + updateRetainedSize(); + } + + private void ensureCapacity(int capacity) + { + if (values.length >= capacity) { + return; + } + + int newSize; + if (initialized) { + newSize = calculateNewArraySize(values.length); + } + else { + newSize = initialEntryCount; + initialized = true; + } + newSize = Math.max(newSize, capacity); + + valueIsNull = Arrays.copyOf(valueIsNull, newSize); + values = Arrays.copyOf(values, newSize); + updateRetainedSize(); + } + + private void updateSize(long positionsSize) + { + sizeInBytes += IntArrayBlock.SIZE_IN_BYTES_PER_POSITION * positionsSize; + } + + private void updateRetainedSize() + { + retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values); + } +} diff --git a/core/trino-main/src/main/java/io/trino/operator/output/LongPositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/LongPositionsAppender.java new file mode 100644 index 000000000000..2ae9ff6237f2 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/operator/output/LongPositionsAppender.java @@ -0,0 +1,187 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.operator.output; + +import io.trino.spi.block.Block; +import io.trino.spi.block.LongArrayBlock; +import io.trino.spi.block.RunLengthEncodedBlock; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import org.openjdk.jol.info.ClassLayout; + +import java.util.Arrays; +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.sizeOf; +import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetSize; +import static io.trino.operator.output.PositionsAppenderUtil.calculateNewArraySize; +import static java.lang.Math.max; + +public class LongPositionsAppender + implements PositionsAppender +{ + private static final int INSTANCE_SIZE = ClassLayout.parseClass(LongPositionsAppender.class).instanceSize(); + private static final Block NULL_VALUE_BLOCK = new LongArrayBlock(1, Optional.of(new boolean[] {true}), new long[1]); + + private boolean initialized; + private int initialEntryCount; + + private int positionCount; + private boolean hasNullValue; + private boolean hasNonNullValue; + + // it is assumed that these arrays are the same length + private boolean[] valueIsNull = new boolean[0]; + private long[] values = new long[0]; + + private long retainedSizeInBytes; + private long sizeInBytes; + + public LongPositionsAppender(int expectedEntries) + { + this.initialEntryCount = max(expectedEntries, 1); + + updateRetainedSize(); + } + + @Override + public void append(IntArrayList positions, Block block) + { + if (positions.isEmpty()) { + return; 
+ } + // performance of this method depends on block being always the same, flat type + checkArgument(block instanceof LongArrayBlock); + int[] positionArray = positions.elements(); + int positionsSize = positions.size(); + ensureCapacity(positionCount + positionsSize); + + if (block.mayHaveNull()) { + for (int i = 0; i < positionsSize; i++) { + int position = positionArray[i]; + int positionIndex = positionCount + i; + boolean isNull = block.isNull(position); + if (isNull) { + valueIsNull[positionIndex] = true; + hasNullValue = true; + } + else { + values[positionIndex] = block.getLong(position, 0); + hasNonNullValue = true; + } + } + positionCount += positionsSize; + } + else { + for (int i = 0; i < positionsSize; i++) { + int position = positionArray[i]; + values[positionCount + i] = block.getLong(position, 0); + } + positionCount += positionsSize; + hasNonNullValue = true; + } + + updateSize(positionsSize); + } + + @Override + public void appendRle(RunLengthEncodedBlock block) + { + int rlePositionCount = block.getPositionCount(); + if (rlePositionCount == 0) { + return; + } + int sourcePosition = 0; + ensureCapacity(positionCount + rlePositionCount); + if (block.isNull(sourcePosition)) { + Arrays.fill(valueIsNull, positionCount, positionCount + rlePositionCount, true); + hasNullValue = true; + } + else { + long value = block.getLong(sourcePosition, 0); + Arrays.fill(values, positionCount, positionCount + rlePositionCount, value); + hasNonNullValue = true; + } + positionCount += rlePositionCount; + + updateSize(rlePositionCount); + } + + @Override + public Block build() + { + if (!hasNonNullValue) { + return new RunLengthEncodedBlock(NULL_VALUE_BLOCK, positionCount); + } + LongArrayBlock result = new LongArrayBlock(positionCount, hasNullValue ? 
Optional.of(valueIsNull) : Optional.empty(), values); + reset(); + return result; + } + + @Override + public long getRetainedSizeInBytes() + { + return retainedSizeInBytes; + } + + @Override + public long getSizeInBytes() + { + return sizeInBytes; + } + + private void reset() + { + initialEntryCount = calculateBlockResetSize(positionCount); + initialized = false; + valueIsNull = new boolean[0]; + values = new long[0]; + positionCount = 0; + sizeInBytes = 0; + hasNonNullValue = false; + hasNullValue = false; + updateRetainedSize(); + } + + private void ensureCapacity(int capacity) + { + if (values.length >= capacity) { + return; + } + + int newSize; + if (initialized) { + newSize = calculateNewArraySize(values.length); + } + else { + newSize = initialEntryCount; + initialized = true; + } + newSize = Math.max(newSize, capacity); + + valueIsNull = Arrays.copyOf(valueIsNull, newSize); + values = Arrays.copyOf(values, newSize); + updateRetainedSize(); + } + + private void updateSize(long positionsSize) + { + sizeInBytes += LongArrayBlock.SIZE_IN_BYTES_PER_POSITION * positionsSize; + } + + private void updateRetainedSize() + { + retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values); + } +} diff --git a/core/trino-main/src/main/java/io/trino/operator/output/PagePartitioner.java b/core/trino-main/src/main/java/io/trino/operator/output/PagePartitioner.java index a62072256d36..4f611b5d985a 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/PagePartitioner.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/PagePartitioner.java @@ -26,7 +26,6 @@ import io.trino.spi.Page; import io.trino.spi.PageBuilder; import io.trino.spi.block.Block; -import io.trino.spi.block.BlockBuilder; import io.trino.spi.block.DictionaryBlock; import io.trino.spi.block.RunLengthEncodedBlock; import io.trino.spi.predicate.NullableValue; @@ -57,7 +56,7 @@ public class PagePartitioner { - private static final int COLUMNAR_STRATEGY_COEFFICIENT = 4; + 
private static final int COLUMNAR_STRATEGY_COEFFICIENT = 2; private final OutputBuffer outputBuffer; private final Type[] sourceTypes; private final PartitionFunction partitionFunction; @@ -66,13 +65,12 @@ public class PagePartitioner private final Block[] partitionConstantBlocks; // when null, no constants are present. Only non-null elements are constants private final PagesSerde serde; private final PageBuilder[] pageBuilders; + private final PositionsAppenderPageBuilder[] positionsAppenders; private final boolean replicatesAnyRow; private final int nullChannel; // when >= 0, send the position to every partition if this channel is null private final AtomicLong rowsAdded = new AtomicLong(); private final AtomicLong pagesAdded = new AtomicLong(); private final OperatorContext operatorContext; - private final PositionsAppenderFactory positionsAppenderFactory; - private final PositionsAppender[] positionsAppenders; private boolean hasAnyRowBeenReplicated; @@ -91,7 +89,7 @@ public PagePartitioner( { this.partitionFunction = requireNonNull(partitionFunction, "partitionFunction is null"); this.partitionChannels = Ints.toArray(requireNonNull(partitionChannels, "partitionChannels is null")); - this.positionsAppenderFactory = requireNonNull(positionsAppenderFactory, "positionsAppenderFactory is null"); + requireNonNull(positionsAppenderFactory, "positionsAppenderFactory is null"); Block[] partitionConstantBlocks = requireNonNull(partitionConstants, "partitionConstants is null").stream() .map(constant -> constant.map(NullableValue::asBlock).orElse(null)) .toArray(Block[]::new); @@ -120,11 +118,14 @@ public PagePartitioner( int pageSize = toIntExact(min(DEFAULT_MAX_PAGE_SIZE_IN_BYTES, maxMemory.toBytes() / partitionCount)); pageSize = max(1, pageSize); + this.positionsAppenders = new PositionsAppenderPageBuilder[partitionCount]; + for (int i = 0; i < partitionCount; i++) { + positionsAppenders[i] = PositionsAppenderPageBuilder.withMaxPageSize(pageSize, sourceTypes, 
positionsAppenderFactory); + } this.pageBuilders = new PageBuilder[partitionCount]; for (int i = 0; i < partitionCount; i++) { pageBuilders[i] = PageBuilder.withMaxPageSize(pageSize, sourceTypes); } - positionsAppenders = new PositionsAppender[sourceTypes.size()]; } public ListenableFuture isFull() @@ -137,6 +138,9 @@ public long getSizeInBytes() // We use a foreach loop instead of streams // as it has much better performance. long sizeInBytes = 0; + for (PositionsAppenderPageBuilder pageBuilder : positionsAppenders) { + sizeInBytes += pageBuilder.getSizeInBytes(); + } for (PageBuilder pageBuilder : pageBuilders) { sizeInBytes += pageBuilder.getSizeInBytes(); } @@ -149,6 +153,9 @@ public long getSizeInBytes() public long getRetainedSizeInBytes() { long sizeInBytes = 0; + for (PositionsAppenderPageBuilder pageBuilder : positionsAppenders) { + sizeInBytes += pageBuilder.getRetainedSizeInBytes(); + } for (PageBuilder pageBuilder : pageBuilders) { sizeInBytes += pageBuilder.getRetainedSizeInBytes(); } @@ -230,7 +237,7 @@ public void partitionPageByRow(Page page) } } - flush(false); + flushPageBuilders(false); } private void appendRow(PageBuilder pageBuilder, Page page, int position) @@ -247,25 +254,15 @@ public void partitionPageByColumn(Page page) { IntArrayList[] partitionedPositions = partitionPositions(page); - PositionsAppender[] positionsAppenders = getAppenders(page); - for (int i = 0; i < partitionFunction.getPartitionCount(); i++) { IntArrayList partitionPositions = partitionedPositions[i]; if (!partitionPositions.isEmpty()) { - appendToOutputPartition(pageBuilders[i], page, partitionPositions, positionsAppenders); + positionsAppenders[i].appendToOutputPartition(page, partitionPositions); partitionPositions.clear(); } } - flush(false); - } - - private PositionsAppender[] getAppenders(Page page) - { - for (int i = 0; i < positionsAppenders.length; i++) { - positionsAppenders[i] = positionsAppenderFactory.create(sourceTypes[i], page.getBlock(i).getClass()); - } 
- return positionsAppenders; + flushPositionsAppenders(false); } private IntArrayList[] partitionPositions(Page page) @@ -301,17 +298,6 @@ else if (partitionFunctionArgs.getChannelCount() == 1 && isDictionaryProcessingF return partitionPositions; } - private void appendToOutputPartition(PageBuilder outputPartition, Page page, IntArrayList positions, PositionsAppender[] positionsAppenders) - { - outputPartition.declarePositions(positions.size()); - - for (int channel = 0; channel < positionsAppenders.length; channel++) { - Block partitionBlock = page.getBlock(channel); - BlockBuilder target = outputPartition.getBlockBuilder(channel); - positionsAppenders[channel].appendTo(positions, partitionBlock, target); - } - } - private IntArrayList[] initPositions(Page page) { // We allocate new arrays for every page (instead of caching them) because we don't @@ -448,9 +434,15 @@ private IntArrayList[] partitionNullablePositions(Page page, int position, IntAr private IntArrayList[] partitionNotNullPositions(Page page, int startingPosition, IntArrayList[] partitionPositions, IntUnaryOperator partitionFunction) { - for (int position = startingPosition; position < page.getPositionCount(); position++) { + int positionCount = page.getPositionCount(); + int[] partitionPerPosition = new int[positionCount]; + for (int position = startingPosition; position < positionCount; position++) { int partition = partitionFunction.applyAsInt(position); - partitionPositions[partition].add(position); + partitionPerPosition[position] = partition; + } + + for (int position = startingPosition; position < positionCount; position++) { + partitionPositions[partitionPerPosition[position]].add(position); } return partitionPositions; @@ -476,7 +468,13 @@ private Page getPartitionFunctionArguments(Page page) return new Page(page.getPositionCount(), blocks); } - public void flush(boolean force) + public void forceFlush() + { + flushPositionsAppenders(true); + flushPageBuilders(true); + } + + private void 
flushPageBuilders(boolean force) { try (PagesSerde.PagesSerdeContext context = serde.newContext()) { // add all full pages to output buffer @@ -486,16 +484,35 @@ public void flush(boolean force) Page pagePartition = partitionPageBuilder.build(); partitionPageBuilder.reset(); - operatorContext.recordOutput(pagePartition.getSizeInBytes(), pagePartition.getPositionCount()); + enqueuePage(pagePartition, partition, context); + } + } + } + } - outputBuffer.enqueue(partition, splitAndSerializePage(context, pagePartition)); - pagesAdded.incrementAndGet(); - rowsAdded.addAndGet(pagePartition.getPositionCount()); + private void flushPositionsAppenders(boolean force) + { + try (PagesSerde.PagesSerdeContext context = serde.newContext()) { + // add all full pages to output buffer + for (int partition = 0; partition < positionsAppenders.length; partition++) { + PositionsAppenderPageBuilder partitionPageBuilder = positionsAppenders[partition]; + if (!partitionPageBuilder.isEmpty() && (force || partitionPageBuilder.isFull())) { + Page pagePartition = partitionPageBuilder.build(); + enqueuePage(pagePartition, partition, context); } } } } + private void enqueuePage(Page pagePartition, int partition, PagesSerde.PagesSerdeContext context) + { + operatorContext.recordOutput(pagePartition.getSizeInBytes(), pagePartition.getPositionCount()); + + outputBuffer.enqueue(partition, splitAndSerializePage(context, pagePartition)); + pagesAdded.incrementAndGet(); + rowsAdded.addAndGet(pagePartition.getPositionCount()); + } + private List splitAndSerializePage(PagesSerde.PagesSerdeContext context, Page page) { List split = splitPage(page, DEFAULT_MAX_PAGE_SIZE_IN_BYTES); diff --git a/core/trino-main/src/main/java/io/trino/operator/output/PartitionedOutputOperator.java b/core/trino-main/src/main/java/io/trino/operator/output/PartitionedOutputOperator.java index 355340b8c71c..16b55c35601d 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/PartitionedOutputOperator.java +++ 
b/core/trino-main/src/main/java/io/trino/operator/output/PartitionedOutputOperator.java @@ -245,7 +245,7 @@ public OperatorContext getOperatorContext() public void finish() { finished = true; - partitionFunction.flush(true); + partitionFunction.forceFlush(); } @Override diff --git a/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppender.java index 0b5aa8414f8b..8a8697b77f04 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppender.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppender.java @@ -11,37 +11,37 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package io.trino.operator.output; import io.trino.spi.block.Block; -import io.trino.spi.block.BlockBuilder; -import io.trino.spi.type.Type; +import io.trino.spi.block.RunLengthEncodedBlock; import it.unimi.dsi.fastutil.ints.IntArrayList; -import static java.util.Objects.requireNonNull; - public interface PositionsAppender { - void appendTo(IntArrayList positions, Block source, BlockBuilder target); + void append(IntArrayList positions, Block source); + + /** + * Appends value from the {@code rleBlock} to this appender {@link RunLengthEncodedBlock#getPositionCount()} times. + * The result is the same as with using {@link PositionsAppender#append(IntArrayList, Block)} with + * positions list [0...{@link RunLengthEncodedBlock#getPositionCount()} -1] + * but with possible performance optimizations for {@link RunLengthEncodedBlock}. + */ + void appendRle(RunLengthEncodedBlock rleBlock); - class TypedPositionsAppender - implements PositionsAppender - { - private final Type type; + /** + * Creates the block from the appender data. + * After this, appender is reset to the initial state, and it is ready to build a new block. 
+ */ + Block build(); - public TypedPositionsAppender(Type type) - { - this.type = requireNonNull(type, "type is null"); - } + /** + * Returns number of bytes retained by this instance in memory including over-allocations. + */ + long getRetainedSizeInBytes(); - @Override - public void appendTo(IntArrayList positions, Block source, BlockBuilder target) - { - int[] positionArray = positions.elements(); - for (int i = 0; i < positions.size(); i++) { - type.appendTo(source, positionArray[i], target); - } - } - } + /** + * Returns the size of memory in bytes used by this appender. + */ + long getSizeInBytes(); } diff --git a/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderFactory.java b/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderFactory.java index 41021d98b70e..a87c6eb60c6a 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderFactory.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderFactory.java @@ -13,341 +13,60 @@ */ package io.trino.operator.output; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import io.airlift.bytecode.DynamicClassLoader; -import io.trino.collect.cache.NonEvictableLoadingCache; -import io.trino.operator.output.PositionsAppender.TypedPositionsAppender; -import io.trino.spi.block.Block; -import io.trino.spi.block.BlockBuilder; import io.trino.spi.block.Int128ArrayBlock; import io.trino.spi.block.Int96ArrayBlock; import io.trino.spi.type.FixedWidthType; import io.trino.spi.type.Type; import io.trino.spi.type.VariableWidthType; -import io.trino.sql.gen.IsolatedClass; -import it.unimi.dsi.fastutil.ints.IntArrayList; +import io.trino.type.BlockTypeOperators; -import java.util.Objects; -import java.util.Optional; - -import static io.airlift.slice.SizeOf.SIZE_OF_LONG; -import static io.trino.collect.cache.SafeCaches.buildNonEvictableCache; import static java.util.Objects.requireNonNull; -/** - 
* Isolates the {@code PositionsAppender} class per type and block tuples. - * Type specific {@code PositionsAppender} implementations manually inline {@code Type#appendTo} method inside the loop - * to avoid virtual(mega-morphic) calls and force jit to inline the {@code Block} and {@code BlockBuilder} methods. - * Ideally, {@code TypedPositionsAppender} could work instead of type specific {@code PositionsAppender}s, - * but in practice jit falls back to virtual calls in some cases (e.g. {@link Block#isNull}). - */ public class PositionsAppenderFactory { - private final NonEvictableLoadingCache cache; + private final BlockTypeOperators blockTypeOperators; - public PositionsAppenderFactory() + public PositionsAppenderFactory(BlockTypeOperators blockTypeOperators) { - this.cache = buildNonEvictableCache( - CacheBuilder.newBuilder().maximumSize(1000), - CacheLoader.from(key -> createAppender(key.type))); + this.blockTypeOperators = requireNonNull(blockTypeOperators, "blockTypeOperators is null"); } - public PositionsAppender create(Type type, Class blockClass) + public PositionsAppender create(Type type, int expectedPositions, long maxPageSizeInBytes) { - return cache.getUnchecked(new CacheKey(type, blockClass)); - } + if (!type.isComparable()) { + return new UnnestingPositionsAppender(createPrimitiveAppender(type, expectedPositions, maxPageSizeInBytes)); + } - private PositionsAppender createAppender(Type type) - { - return Optional.ofNullable(findDedicatedAppenderClassFor(type)) - .map(this::isolateAppender) - .orElseGet(() -> isolateTypeAppender(type)); + return new UnnestingPositionsAppender( + new RleAwarePositionsAppender( + blockTypeOperators.getEqualOperator(type), + createPrimitiveAppender(type, expectedPositions, maxPageSizeInBytes))); } - private Class findDedicatedAppenderClassFor(Type type) + private PositionsAppender createPrimitiveAppender(Type type, int expectedPositions, long maxPageSizeInBytes) { if (type instanceof FixedWidthType) { switch 
(((FixedWidthType) type).getFixedSize()) { case Byte.BYTES: - return BytePositionsAppender.class; + return new BytePositionsAppender(expectedPositions); case Short.BYTES: - return SmallintPositionsAppender.class; + return new ShortPositionsAppender(expectedPositions); case Integer.BYTES: - return IntPositionsAppender.class; + return new IntPositionsAppender(expectedPositions); case Long.BYTES: - return LongPositionsAppender.class; + return new LongPositionsAppender(expectedPositions); case Int96ArrayBlock.INT96_BYTES: - return Int96PositionsAppender.class; + return new Int96PositionsAppender(expectedPositions); case Int128ArrayBlock.INT128_BYTES: - return Int128PositionsAppender.class; + return new Int128PositionsAppender(expectedPositions); default: // size not supported directly, fallback to the generic appender } } else if (type instanceof VariableWidthType) { - return SlicePositionsAppender.class; - } - - return null; - } - - private PositionsAppender isolateTypeAppender(Type type) - { - Class isolatedAppenderClass = isolateAppenderClass(TypedPositionsAppender.class); - try { - return isolatedAppenderClass.getConstructor(Type.class).newInstance(type); - } - catch (ReflectiveOperationException e) { - throw new RuntimeException(e); - } - } - - private PositionsAppender isolateAppender(Class appenderClass) - { - Class isolatedAppenderClass = isolateAppenderClass(appenderClass); - try { - return isolatedAppenderClass.getConstructor().newInstance(); - } - catch (ReflectiveOperationException e) { - throw new RuntimeException(e); - } - } - - private Class isolateAppenderClass(Class appenderClass) - { - DynamicClassLoader dynamicClassLoader = new DynamicClassLoader(PositionsAppender.class.getClassLoader()); - - Class isolatedBatchPositionsTransferClass = IsolatedClass.isolateClass( - dynamicClassLoader, - PositionsAppender.class, - appenderClass); - return isolatedBatchPositionsTransferClass; - } - - public static class LongPositionsAppender - implements 
PositionsAppender - { - @Override - public void appendTo(IntArrayList positions, Block block, BlockBuilder blockBuilder) - { - int[] positionArray = positions.elements(); - if (block.mayHaveNull()) { - for (int i = 0; i < positions.size(); i++) { - int position = positionArray[i]; - if (block.isNull(position)) { - blockBuilder.appendNull(); - } - else { - blockBuilder.writeLong(block.getLong(position, 0)).closeEntry(); - } - } - } - else { - for (int i = 0; i < positions.size(); i++) { - blockBuilder.writeLong(block.getLong(positionArray[i], 0)).closeEntry(); - } - } - } - } - - public static class IntPositionsAppender - implements PositionsAppender - { - @Override - public void appendTo(IntArrayList positions, Block block, BlockBuilder blockBuilder) - { - int[] positionArray = positions.elements(); - if (block.mayHaveNull()) { - for (int i = 0; i < positions.size(); i++) { - int position = positionArray[i]; - if (block.isNull(position)) { - blockBuilder.appendNull(); - } - else { - blockBuilder.writeInt(block.getInt(position, 0)).closeEntry(); - } - } - } - else { - for (int i = 0; i < positions.size(); i++) { - blockBuilder.writeInt(block.getInt(positionArray[i], 0)).closeEntry(); - } - } - } - } - - public static class BytePositionsAppender - implements PositionsAppender - { - @Override - public void appendTo(IntArrayList positions, Block block, BlockBuilder blockBuilder) - { - int[] positionArray = positions.elements(); - if (block.mayHaveNull()) { - for (int i = 0; i < positions.size(); i++) { - int position = positionArray[i]; - if (block.isNull(position)) { - blockBuilder.appendNull(); - } - else { - blockBuilder.writeByte(block.getByte(position, 0)).closeEntry(); - } - } - } - else { - for (int i = 0; i < positions.size(); i++) { - blockBuilder.writeByte(block.getByte(positionArray[i], 0)).closeEntry(); - } - } - } - } - - public static class SlicePositionsAppender - implements PositionsAppender - { - @Override - public void appendTo(IntArrayList positions, 
Block block, BlockBuilder blockBuilder) - { - int[] positionArray = positions.elements(); - if (block.mayHaveNull()) { - for (int i = 0; i < positions.size(); i++) { - int position = positionArray[i]; - if (block.isNull(position)) { - blockBuilder.appendNull(); - } - else { - block.writeBytesTo(position, 0, block.getSliceLength(position), blockBuilder); - blockBuilder.closeEntry(); - } - } - } - else { - for (int i = 0; i < positions.size(); i++) { - int position = positionArray[i]; - block.writeBytesTo(position, 0, block.getSliceLength(position), blockBuilder); - blockBuilder.closeEntry(); - } - } - } - } - - public static class SmallintPositionsAppender - implements PositionsAppender - { - @Override - public void appendTo(IntArrayList positions, Block block, BlockBuilder blockBuilder) - { - int[] positionArray = positions.elements(); - if (block.mayHaveNull()) { - for (int i = 0; i < positions.size(); i++) { - int position = positionArray[i]; - if (block.isNull(position)) { - blockBuilder.appendNull(); - } - else { - blockBuilder.writeShort(block.getShort(position, 0)).closeEntry(); - } - } - } - else { - for (int i = 0; i < positions.size(); i++) { - blockBuilder.writeShort(block.getShort(positionArray[i], 0)).closeEntry(); - } - } - } - } - - public static class Int96PositionsAppender - implements PositionsAppender - { - @Override - public void appendTo(IntArrayList positions, Block block, BlockBuilder blockBuilder) - { - int[] positionArray = positions.elements(); - if (block.mayHaveNull()) { - for (int i = 0; i < positions.size(); i++) { - int position = positionArray[i]; - if (block.isNull(position)) { - blockBuilder.appendNull(); - } - else { - blockBuilder.writeLong(block.getLong(position, 0)); - blockBuilder.writeInt(block.getInt(position, SIZE_OF_LONG)); - blockBuilder.closeEntry(); - } - } - } - else { - for (int i = 0; i < positions.size(); i++) { - int position = positionArray[i]; - blockBuilder.writeLong(block.getLong(position, 0)); - 
blockBuilder.writeInt(block.getInt(position, SIZE_OF_LONG)); - blockBuilder.closeEntry(); - } - } - } - } - - public static class Int128PositionsAppender - implements PositionsAppender - { - @Override - public void appendTo(IntArrayList positions, Block block, BlockBuilder blockBuilder) - { - int[] positionArray = positions.elements(); - if (block.mayHaveNull()) { - for (int i = 0; i < positions.size(); i++) { - int position = positionArray[i]; - if (block.isNull(position)) { - blockBuilder.appendNull(); - } - else { - blockBuilder.writeLong(block.getLong(position, 0)); - blockBuilder.writeLong(block.getLong(position, SIZE_OF_LONG)); - blockBuilder.closeEntry(); - } - } - } - else { - for (int i = 0; i < positions.size(); i++) { - int position = positionArray[i]; - blockBuilder.writeLong(block.getLong(position, 0)); - blockBuilder.writeLong(block.getLong(position, SIZE_OF_LONG)); - blockBuilder.closeEntry(); - } - } - } - } - - private static class CacheKey - { - private final Type type; - private final Class blockClass; - - private CacheKey(Type type, Class blockClass) - { - this.type = requireNonNull(type, "type is null"); - this.blockClass = requireNonNull(blockClass, "blockClass is null"); + return new SlicePositionsAppender(expectedPositions, maxPageSizeInBytes); } - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - CacheKey cacheKey = (CacheKey) o; - return type.equals(cacheKey.type) && blockClass.equals(cacheKey.blockClass); - } - - @Override - public int hashCode() - { - return Objects.hash(type, blockClass); - } + return new TypedPositionsAppender(type, expectedPositions); } } diff --git a/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderPageBuilder.java b/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderPageBuilder.java new file mode 100644 index 000000000000..7c232be59797 --- /dev/null +++ 
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.operator.output;

import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.type.Type;
import it.unimi.dsi.fastutil.ints.IntArrayList;

import java.util.List;

import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

/**
 * Accumulates selected positions from source pages into one {@link PositionsAppender}
 * per output channel, and materializes the accumulated data as an output {@link Page}.
 * Plays the role of {@link io.trino.spi.PageBuilder} for the appender-based
 * (columnar) partitioning path.
 */
public class PositionsAppenderPageBuilder
{
    private static final int DEFAULT_INITIAL_EXPECTED_ENTRIES = 8;

    private final PositionsAppender[] channelAppenders;
    private final int maxPageSizeInBytes;
    // Number of positions declared since the last build(); all channel appenders
    // are expected to contain exactly this many entries when build() is called.
    private int declaredPositions;

    /**
     * Creates a builder whose {@link #isFull()} triggers once the accumulated data
     * reaches {@code maxPageBytes}.
     */
    public static PositionsAppenderPageBuilder withMaxPageSize(int maxPageBytes, List<Type> sourceTypes, PositionsAppenderFactory positionsAppenderFactory)
    {
        return new PositionsAppenderPageBuilder(DEFAULT_INITIAL_EXPECTED_ENTRIES, maxPageBytes, sourceTypes, positionsAppenderFactory);
    }

    private PositionsAppenderPageBuilder(
            int initialExpectedEntries,
            int maxPageSizeInBytes,
            List<Type> types,
            PositionsAppenderFactory positionsAppenderFactory)
    {
        requireNonNull(types, "types is null");
        requireNonNull(positionsAppenderFactory, "positionsAppenderFactory is null");

        this.maxPageSizeInBytes = maxPageSizeInBytes;
        channelAppenders = new PositionsAppender[types.size()];
        for (int i = 0; i < channelAppenders.length; i++) {
            channelAppenders[i] = positionsAppenderFactory.create(types.get(i), initialExpectedEntries, maxPageSizeInBytes);
        }
    }

    /**
     * Appends the given {@code positions} of every channel of {@code page} to this builder.
     */
    public void appendToOutputPartition(Page page, IntArrayList positions)
    {
        declarePositions(positions.size());

        for (int channel = 0; channel < channelAppenders.length; channel++) {
            Block block = page.getBlock(channel);
            channelAppenders[channel].append(positions, block);
        }
    }

    /**
     * Returns retained memory (including over-allocations) of all channel appenders.
     */
    public long getRetainedSizeInBytes()
    {
        // We use a foreach loop instead of streams
        // as it has much better performance.
        long retainedSizeInBytes = 0;
        for (PositionsAppender positionsAppender : channelAppenders) {
            retainedSizeInBytes += positionsAppender.getRetainedSizeInBytes();
        }
        return retainedSizeInBytes;
    }

    /**
     * Returns the logical data size of all channel appenders.
     */
    public long getSizeInBytes()
    {
        long sizeInBytes = 0;
        for (PositionsAppender positionsAppender : channelAppenders) {
            sizeInBytes += positionsAppender.getSizeInBytes();
        }
        return sizeInBytes;
    }

    public void declarePositions(int positions)
    {
        declaredPositions += positions;
    }

    public boolean isFull()
    {
        return declaredPositions == Integer.MAX_VALUE || getSizeInBytes() >= maxPageSizeInBytes;
    }

    public boolean isEmpty()
    {
        return declaredPositions == 0;
    }

    /**
     * Builds a {@link Page} out of the accumulated data and resets this builder
     * so it can be reused for the next page.
     */
    public Page build()
    {
        if (channelAppenders.length == 0) {
            // Bug fix: the zero-channel path previously returned without reset(),
            // leaving declaredPositions to leak into the next page built by this instance.
            Page page = new Page(declaredPositions);
            reset();
            return page;
        }

        Block[] blocks = new Block[channelAppenders.length];
        for (int i = 0; i < blocks.length; i++) {
            blocks[i] = channelAppenders[i].build();
            checkState(blocks[i].getPositionCount() == declaredPositions, "Declared positions (%s) does not match block %s's number of entries (%s)", declaredPositions, i, blocks[i].getPositionCount());
        }

        Page page = new Page(declaredPositions, blocks);
        reset();
        return page;
    }

    private void reset()
    {
        declaredPositions = 0;
    }
}
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import static java.lang.Math.ceil;
import static java.lang.String.format;

// Copied from io.trino.spi.block.BlockUtil
/**
 * Sizing helpers shared by the {@code PositionsAppender} implementations.
 * All results are clamped to sensible bounds so callers can use them directly
 * as array capacities.
 */
final class PositionsAppenderUtil
{
    private static final double BLOCK_RESET_SKEW = 1.25;
    private static final int DEFAULT_CAPACITY = 64;
    // See java.util.ArrayList for an explanation
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    private PositionsAppenderUtil()
    {
    }

    // Copied from io.trino.spi.block.BlockUtil#calculateNewArraySize
    /**
     * Next capacity for a growing array: 1.5x the current size, clamped to
     * [DEFAULT_CAPACITY, MAX_ARRAY_SIZE].
     *
     * @throws IllegalArgumentException if the array is already at MAX_ARRAY_SIZE and cannot grow further
     */
    static int calculateNewArraySize(int currentSize)
    {
        // grow by 50% (ArrayList-style growth); long math avoids int overflow
        long grownSize = currentSize + (long) (currentSize >> 1);
        if (grownSize < DEFAULT_CAPACITY) {
            return DEFAULT_CAPACITY;
        }
        if (grownSize <= MAX_ARRAY_SIZE) {
            return (int) grownSize;
        }
        if (currentSize == MAX_ARRAY_SIZE) {
            throw new IllegalArgumentException(format("Cannot grow array beyond '%s'", MAX_ARRAY_SIZE));
        }
        return MAX_ARRAY_SIZE;
    }

    // Copied from io.trino.spi.block.BlockUtil#calculateBlockResetSize
    /**
     * Initial capacity to use after a reset: current size skewed up by 25%,
     * clamped to [DEFAULT_CAPACITY, MAX_ARRAY_SIZE].
     */
    static int calculateBlockResetSize(int currentSize)
    {
        long target = (long) ceil(currentSize * BLOCK_RESET_SKEW);
        return (int) Math.min(Math.max(target, DEFAULT_CAPACITY), MAX_ARRAY_SIZE);
    }

    // Copied from io.trino.spi.block.BlockUtil#calculateBlockResetBytes
    /**
     * Byte-capacity to use after a reset: current bytes skewed up by 25%,
     * capped at MAX_ARRAY_SIZE (no lower bound, unlike {@link #calculateBlockResetSize}).
     */
    static int calculateBlockResetBytes(int currentBytes)
    {
        return (int) Math.min((long) ceil(currentBytes * BLOCK_RESET_SKEW), MAX_ARRAY_SIZE);
    }
}
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.operator.output;

import io.trino.spi.block.Block;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.type.BlockTypeOperators.BlockPositionEqual;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.openjdk.jol.info.ClassLayout;

import javax.annotation.Nullable;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

/**
 * {@link PositionsAppender} that will produce {@link RunLengthEncodedBlock} output if possible,
 * that is, when all appended inputs are {@link RunLengthEncodedBlock} blocks with the same value.
 * <p>
 * Three states, tracked by ({@code rleValue}, {@code rlePositionCount}):
 * <ul>
 * <li>empty: {@code rleValue == null}, {@code rlePositionCount == 0} — nothing appended yet</li>
 * <li>RLE: {@code rleValue != null}, {@code rlePositionCount > 0} — only matching RLE values seen so far</li>
 * <li>flat: {@code rleValue == null}, {@code rlePositionCount == NO_RLE} — data delegated to the flat appender</li>
 * </ul>
 * The RLE state degrades to flat (via {@link #switchToFlat()}) on the first flat append
 * or on the first RLE value that does not match the accumulated one.
 */
public class RleAwarePositionsAppender
        implements PositionsAppender
{
    private static final int INSTANCE_SIZE = ClassLayout.parseClass(RleAwarePositionsAppender.class).instanceSize();
    private static final int NO_RLE = -1;

    // used to compare the single RLE value of an incoming block with the accumulated one
    private final BlockPositionEqual equalOperator;
    // flat appender that receives the data once RLE accumulation is no longer possible
    private final PositionsAppender delegate;

    // non-null only in the RLE state; holds the single-position value block being accumulated
    @Nullable
    private Block rleValue;

    // NO_RLE means flat state, 0 means initial empty state, positive means RLE state and the current RLE position count.
    private int rlePositionCount;

    public RleAwarePositionsAppender(BlockPositionEqual equalOperator, PositionsAppender delegate)
    {
        this.delegate = requireNonNull(delegate, "delegate is null");
        this.equalOperator = requireNonNull(equalOperator, "equalOperator is null");
    }

    @Override
    public void append(IntArrayList positions, Block source)
    {
        // This method must only ever see flat (non-RLE) blocks; RLE inputs go through appendRle.
        // The factory wraps this appender in UnnestingPositionsAppender, which guarantees that.
        // NOTE(review): the original comment referenced "FlatteningPositionsAppender" — that name
        // does not match the wrapper actually created in PositionsAppenderFactory; confirm intent.
        checkArgument(!(source instanceof RunLengthEncodedBlock));
        switchToFlat();
        delegate.append(positions, source);
    }

    @Override
    public void appendRle(RunLengthEncodedBlock source)
    {
        if (source.getPositionCount() == 0) {
            return;
        }

        if (rlePositionCount == 0) {
            // initial empty state, switch to RLE state
            rleValue = source.getValue();
            rlePositionCount = source.getPositionCount();
        }
        else if (rleValue != null) {
            // we are in the RLE state
            if (equalOperator.equalNullSafe(rleValue, 0, source.getValue(), 0)) {
                // the values match. we can just add positions.
                this.rlePositionCount += source.getPositionCount();
                return;
            }
            // RLE values do not match. switch to flat state
            switchToFlat();
            delegate.appendRle(source);
        }
        else {
            // flat state
            delegate.appendRle(source);
        }
    }

    @Override
    public Block build()
    {
        // In the RLE state the output is a single RunLengthEncodedBlock; in the empty or
        // flat state the delegate produces the (possibly empty) flat block.
        Block result;
        if (rleValue != null) {
            result = new RunLengthEncodedBlock(rleValue, rlePositionCount);
        }
        else {
            result = delegate.build();
        }

        reset();
        return result;
    }

    // returns to the initial empty state so the appender can be reused for the next page
    private void reset()
    {
        rleValue = null;
        rlePositionCount = 0;
    }

    @Override
    public long getRetainedSizeInBytes()
    {
        long retainedRleSize = rleValue != null ? rleValue.getRetainedSizeInBytes() : 0;
        return INSTANCE_SIZE + retainedRleSize + delegate.getRetainedSizeInBytes();
    }

    @Override
    public long getSizeInBytes()
    {
        // in the RLE state only the single accumulated value contributes to the size
        long rleSize = rleValue != null ? rleValue.getSizeInBytes() : 0;
        return rleSize + delegate.getSizeInBytes();
    }

    // Leaves the RLE state: pushes any accumulated RLE run into the flat delegate
    // and marks this appender as flat (rlePositionCount = NO_RLE).
    private void switchToFlat()
    {
        if (rleValue != null) {
            // we are in the RLE state, flatten all RLE blocks
            delegate.appendRle(new RunLengthEncodedBlock(rleValue, rlePositionCount));
            rleValue = null;
        }
        rlePositionCount = NO_RLE;
    }
}
+ */ +package io.trino.operator.output; + +import io.trino.spi.block.Block; +import io.trino.spi.block.RunLengthEncodedBlock; +import io.trino.spi.block.ShortArrayBlock; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import org.openjdk.jol.info.ClassLayout; + +import java.util.Arrays; +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.sizeOf; +import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetSize; +import static io.trino.operator.output.PositionsAppenderUtil.calculateNewArraySize; +import static java.lang.Math.max; + +public class ShortPositionsAppender + implements PositionsAppender +{ + private static final int INSTANCE_SIZE = ClassLayout.parseClass(ShortPositionsAppender.class).instanceSize(); + private static final Block NULL_VALUE_BLOCK = new ShortArrayBlock(1, Optional.of(new boolean[] {true}), new short[1]); + + private boolean initialized; + private int initialEntryCount; + + private int positionCount; + private boolean hasNullValue; + private boolean hasNonNullValue; + + // it is assumed that these arrays are the same length + private boolean[] valueIsNull = new boolean[0]; + private short[] values = new short[0]; + + private long retainedSizeInBytes; + private long sizeInBytes; + + public ShortPositionsAppender(int expectedEntries) + { + this.initialEntryCount = max(expectedEntries, 1); + + updateRetainedSize(); + } + + @Override + public void append(IntArrayList positions, Block block) + { + if (positions.isEmpty()) { + return; + } + // performance of this method depends on block being always the same, flat type + checkArgument(block instanceof ShortArrayBlock); + int[] positionArray = positions.elements(); + int positionsSize = positions.size(); + ensureCapacity(positionCount + positionsSize); + + if (block.mayHaveNull()) { + for (int i = 0; i < positionsSize; i++) { + int position = positionArray[i]; + boolean isNull = 
block.isNull(position); + int positionIndex = positionCount + i; + if (isNull) { + valueIsNull[positionIndex] = true; + hasNullValue = true; + } + else { + values[positionIndex] = block.getShort(position, 0); + hasNonNullValue = true; + } + } + positionCount += positionsSize; + } + else { + for (int i = 0; i < positionsSize; i++) { + int position = positionArray[i]; + values[positionCount + i] = block.getShort(position, 0); + } + positionCount += positionsSize; + hasNonNullValue = true; + } + + updateSize(positionsSize); + } + + @Override + public void appendRle(RunLengthEncodedBlock block) + { + int rlePositionCount = block.getPositionCount(); + if (rlePositionCount == 0) { + return; + } + int sourcePosition = 0; + ensureCapacity(positionCount + rlePositionCount); + if (block.isNull(sourcePosition)) { + Arrays.fill(valueIsNull, positionCount, positionCount + rlePositionCount, true); + hasNullValue = true; + } + else { + short value = block.getShort(sourcePosition, 0); + Arrays.fill(values, positionCount, positionCount + rlePositionCount, value); + hasNonNullValue = true; + } + positionCount += rlePositionCount; + + updateSize(rlePositionCount); + } + + @Override + public Block build() + { + if (!hasNonNullValue) { + return new RunLengthEncodedBlock(NULL_VALUE_BLOCK, positionCount); + } + ShortArrayBlock result = new ShortArrayBlock(positionCount, hasNullValue ? 
Optional.of(valueIsNull) : Optional.empty(), values); + reset(); + return result; + } + + @Override + public long getRetainedSizeInBytes() + { + return retainedSizeInBytes; + } + + @Override + public long getSizeInBytes() + { + return sizeInBytes; + } + + private void reset() + { + initialEntryCount = calculateBlockResetSize(positionCount); + initialized = false; + valueIsNull = new boolean[0]; + values = new short[0]; + positionCount = 0; + sizeInBytes = 0; + hasNonNullValue = false; + hasNullValue = false; + updateRetainedSize(); + } + + private void ensureCapacity(int capacity) + { + if (values.length >= capacity) { + return; + } + + int newSize; + if (initialized) { + newSize = calculateNewArraySize(values.length); + } + else { + newSize = initialEntryCount; + initialized = true; + } + newSize = Math.max(newSize, capacity); + + valueIsNull = Arrays.copyOf(valueIsNull, newSize); + values = Arrays.copyOf(values, newSize); + updateRetainedSize(); + } + + private void updateSize(long positionsSize) + { + sizeInBytes += ShortArrayBlock.SIZE_IN_BYTES_PER_POSITION * positionsSize; + } + + private void updateRetainedSize() + { + retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values); + } +} diff --git a/core/trino-main/src/main/java/io/trino/operator/output/SlicePositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/SlicePositionsAppender.java new file mode 100644 index 000000000000..5210b04e2d85 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/operator/output/SlicePositionsAppender.java @@ -0,0 +1,271 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.operator.output; + +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import io.trino.spi.block.Block; +import io.trino.spi.block.RunLengthEncodedBlock; +import io.trino.spi.block.VariableWidthBlock; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import org.openjdk.jol.info.ClassLayout; + +import java.util.Arrays; +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.SIZE_OF_BYTE; +import static io.airlift.slice.SizeOf.SIZE_OF_INT; +import static io.airlift.slice.SizeOf.sizeOf; +import static io.airlift.slice.Slices.EMPTY_SLICE; +import static io.trino.operator.output.PositionsAppenderUtil.MAX_ARRAY_SIZE; +import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetBytes; +import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetSize; +import static io.trino.operator.output.PositionsAppenderUtil.calculateNewArraySize; +import static java.lang.Math.min; + +public class SlicePositionsAppender + implements PositionsAppender +{ + private static final int EXPECTED_BYTES_PER_ENTRY = 32; + private static final int INSTANCE_SIZE = ClassLayout.parseClass(SlicePositionsAppender.class).instanceSize(); + private static final Block NULL_VALUE_BLOCK = new VariableWidthBlock(1, EMPTY_SLICE, new int[] {0, 0}, Optional.of(new boolean[] {true})); + + private boolean initialized; + private int initialEntryCount; + private int initialBytesSize; + + private byte[] bytes = new byte[0]; + private int 
currentOffset; + + private boolean hasNullValue; + private boolean hasNonNullValue; + // it is assumed that the offsets array is one position longer than the valueIsNull array + private boolean[] valueIsNull = new boolean[0]; + private int[] offsets = new int[1]; + + private int positionCount; + + private long retainedSizeInBytes; + private long sizeInBytes; + + public SlicePositionsAppender(int expectedEntries, long maxPageSizeInBytes) + { + this(expectedEntries, getExpectedBytes(maxPageSizeInBytes, expectedEntries)); + } + + public SlicePositionsAppender(int expectedEntries, int expectedBytes) + { + initialEntryCount = expectedEntries; + initialBytesSize = min(expectedBytes, MAX_ARRAY_SIZE); + + updateRetainedSize(); + } + + @Override + public void append(IntArrayList positions, Block block) + { + if (positions.isEmpty()) { + return; + } + // performance of this method depends on block being always the same, flat type + checkArgument(block instanceof VariableWidthBlock); + ensurePositionCapacity(positionCount + positions.size()); + int[] positionArray = positions.elements(); + int newByteCount = 0; + int[] lengths = new int[positions.size()]; + + if (block.mayHaveNull()) { + for (int i = 0; i < positions.size(); i++) { + int position = positionArray[i]; + if (block.isNull(position)) { + offsets[positionCount + i + 1] = offsets[positionCount + i]; + valueIsNull[positionCount + i] = true; + hasNullValue = true; + } + else { + int length = block.getSliceLength(position); + lengths[i] = length; + newByteCount += length; + offsets[positionCount + i + 1] = offsets[positionCount + i] + length; + hasNonNullValue = true; + } + } + } + else { + for (int i = 0; i < positions.size(); i++) { + int position = positionArray[i]; + int length = block.getSliceLength(position); + lengths[i] = length; + newByteCount += length; + offsets[positionCount + i + 1] = offsets[positionCount + i] + length; + } + hasNonNullValue = true; + } + copyBytes(block, lengths, positionArray, 
positions.size(), offsets, positionCount, newByteCount); + } + + @Override + public void appendRle(RunLengthEncodedBlock block) + { + int rlePositionCount = block.getPositionCount(); + if (rlePositionCount == 0) { + return; + } + int sourcePosition = 0; + ensurePositionCapacity(positionCount + rlePositionCount); + if (block.isNull(sourcePosition)) { + int offset = offsets[positionCount]; + Arrays.fill(valueIsNull, positionCount, positionCount + rlePositionCount, true); + Arrays.fill(offsets, positionCount + 1, positionCount + rlePositionCount + 1, offset); + positionCount += rlePositionCount; + + hasNullValue = true; + updateSize(rlePositionCount, 0); + } + else { + int startOffset = offsets[positionCount]; + hasNonNullValue = true; + duplicateBytes(block.getValue(), sourcePosition, rlePositionCount, startOffset); + } + } + + @Override + public Block build() + { + if (!hasNonNullValue) { + return new RunLengthEncodedBlock(NULL_VALUE_BLOCK, positionCount); + } + VariableWidthBlock result = new VariableWidthBlock( + positionCount, + Slices.wrappedBuffer(bytes, 0, currentOffset), + offsets, + hasNullValue ? 
Optional.of(valueIsNull) : Optional.empty()); + reset(); + return result; + } + + @Override + public long getRetainedSizeInBytes() + { + return retainedSizeInBytes; + } + + @Override + public long getSizeInBytes() + { + return sizeInBytes; + } + + private void copyBytes(Block block, int[] lengths, int[] positions, int count, int[] targetOffsets, int targetOffsetsIndex, int newByteCount) + { + ensureBytesCapacity(currentOffset + newByteCount); + + for (int i = 0; i < count; i++) { + int position = positions[i]; + if (!block.isNull(position)) { + int length = lengths[i]; + Slice slice = block.getSlice(position, 0, length); + slice.getBytes(0, bytes, targetOffsets[targetOffsetsIndex + i], length); + } + } + + positionCount += count; + currentOffset += newByteCount; + updateSize(count, newByteCount); + } + + /** + * Copy {@code length} bytes from {@code block}, at position {@code position} to {@code count} consecutive positions in the {@link #bytes} array. + */ + private void duplicateBytes(Block block, int position, int count, int startOffset) + { + int length = block.getSliceLength(position); + int newByteCount = count * length; + ensureBytesCapacity(currentOffset + newByteCount); + + Slice slice = block.getSlice(position, 0, length); + for (int i = 0; i < count; i++) { + slice.getBytes(0, bytes, startOffset + (i * length), length); + offsets[positionCount + i + 1] = startOffset + ((i + 1) * length); + } + + positionCount += count; + currentOffset += newByteCount; + updateSize(count, newByteCount); + } + + private void reset() + { + initialEntryCount = calculateBlockResetSize(positionCount); + initialBytesSize = calculateBlockResetBytes(currentOffset); + initialized = false; + valueIsNull = new boolean[0]; + offsets = new int[1]; + bytes = new byte[0]; + positionCount = 0; + currentOffset = 0; + sizeInBytes = 0; + hasNonNullValue = false; + hasNullValue = false; + updateRetainedSize(); + } + + private void updateSize(long positionsSize, int bytesWritten) + { + 
sizeInBytes += (SIZE_OF_BYTE + SIZE_OF_INT) * positionsSize + bytesWritten; + } + + private void ensureBytesCapacity(int bytesCapacity) + { + if (bytes.length < bytesCapacity) { + int newBytesLength = Math.max(bytes.length, initialBytesSize); + if (bytesCapacity > newBytesLength) { + newBytesLength = Math.max(bytesCapacity, calculateNewArraySize(newBytesLength)); + } + bytes = Arrays.copyOf(bytes, newBytesLength); + updateRetainedSize(); + } + } + + private void ensurePositionCapacity(int capacity) + { + if (valueIsNull.length < capacity) { + int newSize; + if (initialized) { + newSize = calculateNewArraySize(valueIsNull.length); + } + else { + newSize = initialEntryCount; + initialized = true; + } + newSize = Math.max(newSize, capacity); + + valueIsNull = Arrays.copyOf(valueIsNull, newSize); + offsets = Arrays.copyOf(offsets, newSize + 1); + updateRetainedSize(); + } + } + + private void updateRetainedSize() + { + retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(offsets) + sizeOf(bytes); + } + + private static int getExpectedBytes(long maxPageSizeInBytes, int expectedPositions) + { + // it is guaranteed Math.min will not overflow; safe to cast + return (int) min((long) expectedPositions * EXPECTED_BYTES_PER_ENTRY, maxPageSizeInBytes); + } +} diff --git a/core/trino-main/src/main/java/io/trino/operator/output/TypedPositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/TypedPositionsAppender.java new file mode 100644 index 000000000000..4ab5028c7847 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/operator/output/TypedPositionsAppender.java @@ -0,0 +1,82 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.operator.output; + +import io.trino.spi.block.Block; +import io.trino.spi.block.BlockBuilder; +import io.trino.spi.block.RunLengthEncodedBlock; +import io.trino.spi.type.Type; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import org.openjdk.jol.info.ClassLayout; + +import static java.util.Objects.requireNonNull; + +class TypedPositionsAppender + implements PositionsAppender +{ + private static final int INSTANCE_SIZE = ClassLayout.parseClass(TypedPositionsAppender.class).instanceSize(); + + private final Type type; + private BlockBuilder blockBuilder; + + TypedPositionsAppender(Type type, int expectedPositions) + { + this( + type, + requireNonNull(type, "type is null").createBlockBuilder(null, expectedPositions)); + } + + TypedPositionsAppender(Type type, BlockBuilder blockBuilder) + { + this.type = requireNonNull(type, "type is null"); + this.blockBuilder = requireNonNull(blockBuilder, "blockBuilder is null"); + } + + @Override + public void append(IntArrayList positions, Block source) + { + int[] positionArray = positions.elements(); + for (int i = 0; i < positions.size(); i++) { + type.appendTo(source, positionArray[i], blockBuilder); + } + } + + @Override + public void appendRle(RunLengthEncodedBlock block) + { + for (int i = 0; i < block.getPositionCount(); i++) { + type.appendTo(block, 0, blockBuilder); + } + } + + @Override + public Block build() + { + Block result = blockBuilder.build(); + blockBuilder = blockBuilder.newBlockBuilderLike(null); + return result; + } + + @Override + public long 
getRetainedSizeInBytes() + { + return INSTANCE_SIZE + blockBuilder.getRetainedSizeInBytes(); + } + + @Override + public long getSizeInBytes() + { + return blockBuilder.getSizeInBytes(); + } +} diff --git a/core/trino-main/src/main/java/io/trino/operator/output/UnnestingPositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/UnnestingPositionsAppender.java new file mode 100644 index 000000000000..86bbbb98bee3 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/operator/output/UnnestingPositionsAppender.java @@ -0,0 +1,123 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.operator.output; + +import io.trino.spi.block.Block; +import io.trino.spi.block.DictionaryBlock; +import io.trino.spi.block.RunLengthEncodedBlock; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import org.openjdk.jol.info.ClassLayout; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +/** + * Dispatches the {@link #append} and {@link #appendRle} methods to the {@link #delegate} depending on the input {@link Block} class. + * The {@link Block} is flattened if necessary so that the {@link #delegate} {@link PositionsAppender#append(IntArrayList, Block)} + * always gets flat {@link Block} and {@link PositionsAppender#appendRle(RunLengthEncodedBlock)} always gets {@link RunLengthEncodedBlock} + * with {@link RunLengthEncodedBlock#getValue()} being flat {@link Block}. 
+ */ +public class UnnestingPositionsAppender + implements PositionsAppender +{ + private static final int INSTANCE_SIZE = ClassLayout.parseClass(UnnestingPositionsAppender.class).instanceSize(); + + private final PositionsAppender delegate; + + public UnnestingPositionsAppender(PositionsAppender delegate) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + } + + @Override + public void append(IntArrayList positions, Block source) + { + if (positions.isEmpty()) { + return; + } + if (source instanceof RunLengthEncodedBlock) { + delegate.appendRle(flatten((RunLengthEncodedBlock) source, positions.size())); + } + else if (source instanceof DictionaryBlock) { + appendDictionary(positions, (DictionaryBlock) source); + } + else { + delegate.append(positions, source); + } + } + + @Override + public void appendRle(RunLengthEncodedBlock source) + { + if (source.getPositionCount() == 0) { + return; + } + delegate.appendRle(flatten(source, source.getPositionCount())); + } + + @Override + public Block build() + { + return delegate.build(); + } + + @Override + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + delegate.getRetainedSizeInBytes(); + } + + @Override + public long getSizeInBytes() + { + return delegate.getSizeInBytes(); + } + + private void appendDictionary(IntArrayList positions, DictionaryBlock source) + { + Block dictionary = source.getDictionary(); + + while (dictionary instanceof RunLengthEncodedBlock || dictionary instanceof DictionaryBlock) { + if (dictionary instanceof RunLengthEncodedBlock) { + // if at some level dictionary contains only a single value then it can be flattened to rle + appendRle(new RunLengthEncodedBlock(((RunLengthEncodedBlock) dictionary).getValue(), positions.size())); + return; + } + + // dictionary is a nested dictionary. 
we need to remap the ids + DictionaryBlock nestedDictionary = (DictionaryBlock) dictionary; + positions = mapPositions(positions, source); + dictionary = nestedDictionary.getDictionary(); + source = nestedDictionary; + } + delegate.append(mapPositions(positions, source), dictionary); + } + + private RunLengthEncodedBlock flatten(RunLengthEncodedBlock source, int positionCount) + { + checkArgument(positionCount > 0); + Block value = source.getValue().getSingleValueBlock(0); + checkArgument(!(value instanceof DictionaryBlock) && !(value instanceof RunLengthEncodedBlock), "value must be flat but got %s", value); + return new RunLengthEncodedBlock(value, positionCount); + } + + private IntArrayList mapPositions(IntArrayList positions, DictionaryBlock block) + { + int[] positionArray = new int[positions.size()]; + for (int i = 0; i < positions.size(); i++) { + positionArray[i] = block.getId(positions.getInt(i)); + } + return IntArrayList.wrap(positionArray); + } +} diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java index c9cec05ec5af..40d34bbfe7cc 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java @@ -400,7 +400,7 @@ public class LocalExecutionPlanner private final BlockTypeOperators blockTypeOperators; private final TableExecuteContextManager tableExecuteContextManager; private final ExchangeManagerRegistry exchangeManagerRegistry; - private final PositionsAppenderFactory positionsAppenderFactory = new PositionsAppenderFactory(); + private final PositionsAppenderFactory positionsAppenderFactory; private final NonEvictableCache accumulatorFactoryCache = buildNonEvictableCache(CacheBuilder.newBuilder() .maximumSize(1000) @@ -464,6 +464,7 @@ public LocalExecutionPlanner( this.blockTypeOperators = requireNonNull(blockTypeOperators, 
"blockTypeOperators is null"); this.tableExecuteContextManager = requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null"); this.exchangeManagerRegistry = requireNonNull(exchangeManagerRegistry, "exchangeManagerRegistry is null"); + this.positionsAppenderFactory = new PositionsAppenderFactory(blockTypeOperators); } public LocalExecutionPlan plan( diff --git a/core/trino-main/src/test/java/io/trino/block/BlockAssertions.java b/core/trino-main/src/test/java/io/trino/block/BlockAssertions.java index cc92a63a59dc..c5b03b194f09 100644 --- a/core/trino-main/src/test/java/io/trino/block/BlockAssertions.java +++ b/core/trino-main/src/test/java/io/trino/block/BlockAssertions.java @@ -120,7 +120,7 @@ public static void assertBlockEquals(Type type, Block actual, Block expected) { assertEquals(actual.getPositionCount(), expected.getPositionCount()); for (int position = 0; position < actual.getPositionCount(); position++) { - assertEquals(type.getObjectValue(SESSION, actual, position), type.getObjectValue(SESSION, expected, position)); + assertEquals(type.getObjectValue(SESSION, actual, position), type.getObjectValue(SESSION, expected, position), "position " + position); } } @@ -524,6 +524,12 @@ public static Block createLongDecimalsBlock(Iterable values) return builder.build(); } + public static Block createLongTimestampBlock(TimestampType type, LongTimestamp... values) + { + requireNonNull(values, "values is null"); + return createLongTimestampBlock(type, Arrays.asList(values)); + } + public static Block createLongTimestampBlock(TimestampType type, Iterable values) { BlockBuilder builder = type.createBlockBuilder(null, 100); @@ -545,6 +551,30 @@ public static Block createCharsBlock(CharType charType, List values) return createBlock(charType, charType::writeString, values); } + public static Block createTinyintsBlock(Integer... 
values) + { + requireNonNull(values, "values is null"); + + return createTinyintsBlock(Arrays.asList(values)); + } + + public static Block createTinyintsBlock(Iterable values) + { + return createBlock(TINYINT, (ValueWriter) TINYINT::writeLong, values); + } + + public static Block createSmallintsBlock(Integer... values) + { + requireNonNull(values, "values is null"); + + return createSmallintsBlock(Arrays.asList(values)); + } + + public static Block createSmallintsBlock(Iterable values) + { + return createBlock(SMALLINT, (ValueWriter) SMALLINT::writeLong, values); + } + public static Block createIntsBlock(Integer... values) { requireNonNull(values, "values is null"); diff --git a/core/trino-main/src/test/java/io/trino/operator/output/BenchmarkPartitionedOutputOperator.java b/core/trino-main/src/test/java/io/trino/operator/output/BenchmarkPartitionedOutputOperator.java index 0eb4f4d73d7a..cf9db82fc12a 100644 --- a/core/trino-main/src/test/java/io/trino/operator/output/BenchmarkPartitionedOutputOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/output/BenchmarkPartitionedOutputOperator.java @@ -50,6 +50,7 @@ import io.trino.sql.planner.HashBucketFunction; import io.trino.sql.planner.plan.PlanNodeId; import io.trino.testing.TestingTaskContext; +import io.trino.type.BlockTypeOperators; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -110,7 +111,7 @@ @BenchmarkMode(Mode.AverageTime) public class BenchmarkPartitionedOutputOperator { - private static final PositionsAppenderFactory POSITIONS_APPENDER_FACTORY = new PositionsAppenderFactory(); + private static final PositionsAppenderFactory POSITIONS_APPENDER_FACTORY = new PositionsAppenderFactory(new BlockTypeOperators()); @Benchmark public void addPage(BenchmarkData data) diff --git a/core/trino-main/src/test/java/io/trino/operator/output/TestPagePartitioner.java 
b/core/trino-main/src/test/java/io/trino/operator/output/TestPagePartitioner.java new file mode 100644 index 000000000000..25fded9678b4 --- /dev/null +++ b/core/trino-main/src/test/java/io/trino/operator/output/TestPagePartitioner.java @@ -0,0 +1,786 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.operator.output; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Multimap; +import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.slice.Slice; +import io.airlift.units.DataSize; +import io.trino.execution.StateMachine; +import io.trino.execution.buffer.BufferResult; +import io.trino.execution.buffer.BufferState; +import io.trino.execution.buffer.OutputBuffer; +import io.trino.execution.buffer.OutputBufferInfo; +import io.trino.execution.buffer.OutputBuffers; +import io.trino.execution.buffer.PagesSerde; +import io.trino.execution.buffer.PagesSerdeFactory; +import io.trino.operator.BucketPartitionFunction; +import io.trino.operator.DriverContext; +import io.trino.operator.OperatorContext; +import io.trino.operator.OutputFactory; +import io.trino.operator.PartitionFunction; +import io.trino.spi.Page; +import io.trino.spi.block.Block; +import io.trino.spi.block.DictionaryBlock; +import io.trino.spi.block.RunLengthEncodedBlock; +import io.trino.spi.block.TestingBlockEncodingSerde; +import io.trino.spi.predicate.NullableValue; 
+import io.trino.spi.type.AbstractType; +import io.trino.spi.type.ArrayType; +import io.trino.spi.type.Decimals; +import io.trino.spi.type.TimestampType; +import io.trino.spi.type.Type; +import io.trino.sql.planner.plan.PlanNodeId; +import io.trino.testing.TestingTaskContext; +import io.trino.type.BlockTypeOperators; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Function; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static io.airlift.units.DataSize.Unit.MEGABYTE; +import static io.trino.SessionTestUtils.TEST_SESSION; +import static io.trino.block.BlockAssertions.createLongDictionaryBlock; +import static io.trino.block.BlockAssertions.createLongSequenceBlock; +import static io.trino.block.BlockAssertions.createLongsBlock; +import static io.trino.block.BlockAssertions.createRLEBlock; +import static io.trino.block.BlockAssertions.createRandomBlockForType; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.BooleanType.BOOLEAN; +import static io.trino.spi.type.CharType.createCharType; +import static io.trino.spi.type.DecimalType.createDecimalType; +import static io.trino.spi.type.DoubleType.DOUBLE; +import static io.trino.spi.type.IntegerType.INTEGER; +import static io.trino.spi.type.SmallintType.SMALLINT; +import static io.trino.spi.type.TinyintType.TINYINT; +import static io.trino.spi.type.UuidType.UUID; +import static 
io.trino.spi.type.VarbinaryType.VARBINARY; +import static io.trino.spi.type.VarcharType.createUnboundedVarcharType; +import static io.trino.sql.planner.SystemPartitioningHandle.SystemPartitionFunction.ROUND_ROBIN; +import static io.trino.type.IpAddressType.IPADDRESS; +import static java.lang.Math.toIntExact; +import static java.util.Collections.nCopies; +import static java.util.Collections.unmodifiableList; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.Executors.newCachedThreadPool; +import static java.util.concurrent.Executors.newScheduledThreadPool; +import static org.assertj.core.api.Assertions.assertThat; + +// Unit tests for PagePartitioner: verifies partitioned output for plain, RLE, dictionary and null-bearing blocks, in both row-wise and columnar partitioning modes (see PartitioningMode). +@Test(singleThreaded = true) +public class TestPagePartitioner +{ + private static final DataSize MAX_MEMORY = DataSize.of(50, MEGABYTE); + private static final DataSize PARTITION_MAX_MEMORY = DataSize.of(5, MEGABYTE); + + private static final int POSITIONS_PER_PAGE = 8; + private static final int PARTITION_COUNT = 2; + + private static final PagesSerdeFactory PAGES_SERDE_FACTORY = new PagesSerdeFactory(new TestingBlockEncodingSerde(), false); + private static final PagesSerde PAGES_SERDE = PAGES_SERDE_FACTORY.createPagesSerde(); + + private ExecutorService executor; + private ScheduledExecutorService scheduledExecutor; + private TestOutputBuffer outputBuffer; + + @BeforeClass + public void setUpClass() + { + executor = newCachedThreadPool(daemonThreadsNamed(getClass().getSimpleName() + "-executor-%s")); + scheduledExecutor = newScheduledThreadPool(1, daemonThreadsNamed(getClass().getSimpleName() + "-scheduledExecutor-%s")); + } + + @AfterClass(alwaysRun = true) + public void tearDownClass() + { + executor.shutdownNow(); + executor = null; + scheduledExecutor.shutdownNow(); + scheduledExecutor = null; + } + + @BeforeMethod + public void setUp() + { + outputBuffer = new TestOutputBuffer(); + } + + @Test + public void testOutputForEmptyPage() + { + PagePartitioner pagePartitioner = pagePartitioner(BIGINT).build(); + Page 
page = new Page(createLongsBlock(ImmutableList.of())); + + pagePartitioner.partitionPage(page); + pagePartitioner.forceFlush(); + + List partitioned = readLongs(outputBuffer.getEnqueuedDeserialized(), 0); + assertThat(partitioned).isEmpty(); + } + + @Test(dataProvider = "partitioningMode") + public void testOutputEqualsInput(PartitioningMode partitioningMode) + { + PagePartitioner pagePartitioner = pagePartitioner(BIGINT).build(); + Page page = new Page(createLongSequenceBlock(0, POSITIONS_PER_PAGE)); + List expected = readLongs(Stream.of(page), 0); + + processPages(pagePartitioner, partitioningMode, page); + + List partitioned = readLongs(outputBuffer.getEnqueuedDeserialized(), 0); + assertThat(partitioned).containsExactlyInAnyOrderElementsOf(expected); // order is different due to 2 partitions joined + } + + @Test(dataProvider = "partitioningMode") + public void testOutputForPageWithNoBlockPartitionFunction(PartitioningMode partitioningMode) + { + PagePartitioner pagePartitioner = pagePartitioner(BIGINT) + .withPartitionFunction(new BucketPartitionFunction( + ROUND_ROBIN.createBucketFunction(null, false, PARTITION_COUNT, null), + IntStream.range(0, PARTITION_COUNT).toArray())) + .withPartitionChannels(ImmutableList.of()) + .build(); + Page page = new Page(createLongSequenceBlock(0, POSITIONS_PER_PAGE)); + + processPages(pagePartitioner, partitioningMode, page); + + List partition0 = readLongs(outputBuffer.getEnqueuedDeserialized(0), 0); + assertThat(partition0).containsExactly(0L, 2L, 4L, 6L); + List partition1 = readLongs(outputBuffer.getEnqueuedDeserialized(1), 0); + assertThat(partition1).containsExactly(1L, 3L, 5L, 7L); + } + + @Test(dataProvider = "partitioningMode") + public void testOutputForMultipleSimplePages(PartitioningMode partitioningMode) + { + PagePartitioner pagePartitioner = pagePartitioner(BIGINT).build(); + Page page1 = new Page(createLongSequenceBlock(0, POSITIONS_PER_PAGE)); + Page page2 = new Page(createLongSequenceBlock(1, 
POSITIONS_PER_PAGE)); + Page page3 = new Page(createLongSequenceBlock(2, POSITIONS_PER_PAGE)); + List expected = readLongs(Stream.of(page1, page2, page3), 0); + + processPages(pagePartitioner, partitioningMode, page1, page2, page3); + + List partitioned = readLongs(outputBuffer.getEnqueuedDeserialized(), 0); + assertThat(partitioned).containsExactlyInAnyOrderElementsOf(expected); // order is different due to 2 partitions joined + } + + @Test(dataProvider = "partitioningMode") + public void testOutputForSimplePageWithReplication(PartitioningMode partitioningMode) + { + PagePartitioner pagePartitioner = pagePartitioner(BIGINT).replicate().build(); + Page page = new Page(createLongsBlock(0L, 1L, 2L, 3L, null)); + + processPages(pagePartitioner, partitioningMode, page); + + List partition0 = readLongs(outputBuffer.getEnqueuedDeserialized(0), 0); + assertThat(partition0).containsExactly(0L, 2L, null); + List partition1 = readLongs(outputBuffer.getEnqueuedDeserialized(1), 0); + assertThat(partition1).containsExactly(0L, 1L, 3L); // position 0 copied to all partitions + } + + @Test(dataProvider = "partitioningMode") + public void testOutputForSimplePageWithNullChannel(PartitioningMode partitioningMode) + { + PagePartitioner pagePartitioner = pagePartitioner(BIGINT).withNullChannel(0).build(); + Page page = new Page(createLongsBlock(0L, 1L, 2L, 3L, null)); + + processPages(pagePartitioner, partitioningMode, page); + + List partition0 = readLongs(outputBuffer.getEnqueuedDeserialized(0), 0); + assertThat(partition0).containsExactlyInAnyOrder(0L, 2L, null); + List partition1 = readLongs(outputBuffer.getEnqueuedDeserialized(1), 0); + assertThat(partition1).containsExactlyInAnyOrder(1L, 3L, null); // null copied to all partitions + } + + @Test(dataProvider = "partitioningMode") + public void testOutputForSimplePageWithPartitionConstant(PartitioningMode partitioningMode) + { + PagePartitioner pagePartitioner = pagePartitioner(BIGINT) + 
.withPartitionConstants(ImmutableList.of(Optional.of(new NullableValue(BIGINT, 1L)))) + .withPartitionChannels(-1) + .build(); + Page page = new Page(createLongsBlock(0L, 1L, 2L, 3L, null)); + List allValues = readLongs(Stream.of(page), 0); + + processPages(pagePartitioner, partitioningMode, page); + + List partition0 = readLongs(outputBuffer.getEnqueuedDeserialized(0), 0); + assertThat(partition0).isEmpty(); + List partition1 = readLongs(outputBuffer.getEnqueuedDeserialized(1), 0); + assertThat(partition1).containsExactlyElementsOf(allValues); + } + + @Test(dataProvider = "partitioningMode") + public void testOutputForSimplePageWithPartitionConstantAndHashBlock(PartitioningMode partitioningMode) + { + PagePartitioner pagePartitioner = pagePartitioner(BIGINT) + .withPartitionConstants(ImmutableList.of(Optional.empty(), Optional.of(new NullableValue(BIGINT, 1L)))) + .withPartitionChannels(0, -1) // use first block and constant block at index 1 as input to partitionFunction + .withHashChannels(0, 1) // use both channels to calculate partition (a+b) mod 2 + .build(); + Page page = new Page(createLongsBlock(0L, 1L, 2L, 3L)); + + processPages(pagePartitioner, partitioningMode, page); + + List partition0 = readLongs(outputBuffer.getEnqueuedDeserialized(0), 0); + assertThat(partition0).containsExactly(1L, 3L); + List partition1 = readLongs(outputBuffer.getEnqueuedDeserialized(1), 0); + assertThat(partition1).containsExactly(0L, 2L); + } + + @Test(dataProvider = "partitioningMode") + public void testPartitionPositionsWithRleNotNull(PartitioningMode partitioningMode) + { + PagePartitioner pagePartitioner = pagePartitioner(BIGINT, BIGINT).build(); + Page page = new Page(createRLEBlock(0, POSITIONS_PER_PAGE), createLongSequenceBlock(0, POSITIONS_PER_PAGE)); + + processPages(pagePartitioner, partitioningMode, page); + + List partition0 = readLongs(outputBuffer.getEnqueuedDeserialized(0), 1); + assertThat(partition0).containsExactlyElementsOf(readLongs(Stream.of(page), 1)); + 
List partition0HashBlock = readLongs(outputBuffer.getEnqueuedDeserialized(0), 0); + assertThat(partition0HashBlock).containsOnly(0L).hasSize(POSITIONS_PER_PAGE); + assertThat(outputBuffer.getEnqueuedDeserialized(1)).isEmpty(); + } + + @Test(dataProvider = "partitioningMode") + public void testPartitionPositionsWithRleNotNullWithReplication(PartitioningMode partitioningMode) + { + PagePartitioner pagePartitioner = pagePartitioner(BIGINT, BIGINT).replicate().build(); + Page page = new Page(createRLEBlock(0, POSITIONS_PER_PAGE), createLongSequenceBlock(0, POSITIONS_PER_PAGE)); + + processPages(pagePartitioner, partitioningMode, page); + + List partition0 = readLongs(outputBuffer.getEnqueuedDeserialized(0), 1); + assertThat(partition0).containsExactlyElementsOf(readLongs(Stream.of(page), 1)); + List partition1 = readLongs(outputBuffer.getEnqueuedDeserialized(1), 1); + assertThat(partition1).containsExactly(0L); // position 0 copied to all partitions + } + + @Test(dataProvider = "partitioningMode") + public void testPartitionPositionsWithRleNullWithNullChannel(PartitioningMode partitioningMode) + { + PagePartitioner pagePartitioner = pagePartitioner(BIGINT, BIGINT).withNullChannel(0).build(); + Page page = new Page(new RunLengthEncodedBlock(createLongsBlock((Long) null), POSITIONS_PER_PAGE), createLongSequenceBlock(0, POSITIONS_PER_PAGE)); + + processPages(pagePartitioner, partitioningMode, page); + + List partition0 = readLongs(outputBuffer.getEnqueuedDeserialized(0), 1); + assertThat(partition0).containsExactlyElementsOf(readLongs(Stream.of(page), 1)); + List partition1 = readLongs(outputBuffer.getEnqueuedDeserialized(1), 1); + assertThat(partition1).containsExactlyElementsOf(readLongs(Stream.of(page), 1)); + } + + @Test(dataProvider = "partitioningMode") + public void testOutputForDictionaryBlock(PartitioningMode partitioningMode) + { + PagePartitioner pagePartitioner = pagePartitioner(BIGINT).build(); + Page page = new Page(createLongDictionaryBlock(0, 10)); // must 
have at least 10 position to have non-trivial dict + + processPages(pagePartitioner, partitioningMode, page); + + List partition0 = readLongs(outputBuffer.getEnqueuedDeserialized(0), 0); + assertThat(partition0).containsExactlyElementsOf(nCopies(5, 0L)); + List partition1 = readLongs(outputBuffer.getEnqueuedDeserialized(1), 0); + assertThat(partition1).containsExactlyElementsOf(nCopies(5, 1L)); + } + + @Test(dataProvider = "partitioningMode") + public void testOutputForOneValueDictionaryBlock(PartitioningMode partitioningMode) + { + PagePartitioner pagePartitioner = pagePartitioner(BIGINT).build(); + Page page = new Page(new DictionaryBlock(createLongsBlock(0), new int[] {0, 0, 0, 0})); + + processPages(pagePartitioner, partitioningMode, page); + + List partition0 = readLongs(outputBuffer.getEnqueuedDeserialized(0), 0); + assertThat(partition0).containsExactlyElementsOf(nCopies(4, 0L)); + List partition1 = readLongs(outputBuffer.getEnqueuedDeserialized(1), 0); + assertThat(partition1).isEmpty(); + } + + @Test(dataProvider = "partitioningMode") + public void testOutputForViewDictionaryBlock(PartitioningMode partitioningMode) + { + PagePartitioner pagePartitioner = pagePartitioner(BIGINT).build(); + Page page = new Page(new DictionaryBlock(createLongSequenceBlock(4, 8), new int[] {1, 0, 3, 2})); + + processPages(pagePartitioner, partitioningMode, page); + + List partition0 = readLongs(outputBuffer.getEnqueuedDeserialized(0), 0); + assertThat(partition0).containsExactlyInAnyOrder(4L, 6L); + List partition1 = readLongs(outputBuffer.getEnqueuedDeserialized(1), 0); + assertThat(partition1).containsExactlyInAnyOrder(5L, 7L); + } + + @Test(dataProvider = "typesWithPartitioningMode") + public void testOutputForSimplePageWithType(Type type, PartitioningMode partitioningMode) + { + PagePartitioner pagePartitioner = pagePartitioner(BIGINT, type).build(); + Page page = new Page( + createLongSequenceBlock(0, POSITIONS_PER_PAGE), // partition block + createBlockForType(type, 
POSITIONS_PER_PAGE)); + List expected = readChannel(Stream.of(page), 1, type); + + processPages(pagePartitioner, partitioningMode, page); + + List partitioned = readChannel(outputBuffer.getEnqueuedDeserialized(), 1, type); + assertThat(partitioned).containsExactlyInAnyOrderElementsOf(expected); // order is different due to 2 partitions joined + } + + @Test(dataProvider = "types") + public void testOutputWithMixedRowWiseAndColumnarPartitioning(Type type) + { + testOutputEqualsInput(type, PartitioningMode.COLUMNAR, PartitioningMode.ROW_WISE); + testOutputEqualsInput(type, PartitioningMode.ROW_WISE, PartitioningMode.COLUMNAR); + } + + private void testOutputEqualsInput(Type type, PartitioningMode mode1, PartitioningMode mode2) + { + PagePartitionerBuilder pagePartitionerBuilder = pagePartitioner(BIGINT, type, type); + PagePartitioner pagePartitioner = pagePartitionerBuilder.build(); + Page input = new Page( + createLongSequenceBlock(0, POSITIONS_PER_PAGE), // partition block + createBlockForType(type, POSITIONS_PER_PAGE), + createBlockForType(type, POSITIONS_PER_PAGE)); + + List expected = readChannel(Stream.of(input, input), 1, type); + + mode1.partitionPage(pagePartitioner, input); + mode2.partitionPage(pagePartitioner, input); + + pagePartitioner.forceFlush(); + + List partitioned = readChannel(outputBuffer.getEnqueuedDeserialized(), 1, type); + assertThat(partitioned).containsExactlyInAnyOrderElementsOf(expected); // output of the PagePartitioner can be reordered + outputBuffer.clear(); + } + + @DataProvider(name = "partitioningMode") + public static Object[][] partitioningMode() + { + return new Object[][] {{PartitioningMode.ROW_WISE}, {PartitioningMode.COLUMNAR}}; + } + + @DataProvider(name = "types") + public static Object[][] types() + { + return getTypes().stream().map(type -> new Object[] {type}).toArray(Object[][]::new); + } + + @DataProvider(name = "typesWithPartitioningMode") + public static Object[][] typesWithPartitioningMode() + { + return 
getTypes().stream() + .flatMap(type -> Stream.of(PartitioningMode.values()) + .map(partitioningMode -> new Object[] {type, partitioningMode})) + .toArray(Object[][]::new); + } + + private static ImmutableList getTypes() + { + return ImmutableList.of( + BIGINT, + BOOLEAN, + INTEGER, + createCharType(10), + createUnboundedVarcharType(), + DOUBLE, + SMALLINT, + TINYINT, + UUID, + VARBINARY, + createDecimalType(1), + createDecimalType(Decimals.MAX_SHORT_PRECISION + 1), + new ArrayType(BIGINT), + TimestampType.createTimestampType(9), + TimestampType.createTimestampType(3), + IPADDRESS); + } + + private Block createBlockForType(Type type, int positionsPerPage) + { + return createRandomBlockForType(type, positionsPerPage, 0.2F); + } + + private static void processPages(PagePartitioner pagePartitioner, PartitioningMode partitioningMode, Page... pages) + { + for (Page page : pages) { + partitioningMode.partitionPage(pagePartitioner, page); + } + pagePartitioner.forceFlush(); + } + + private static List readLongs(Stream pages, int channel) + { + return readChannel(pages, channel, BIGINT); + } + + // Reads one channel from every page into a flat list, preserving nulls, for assertions. + private static List readChannel(Stream pages, int channel, Type type) + { + List result = new ArrayList<>(); + + pages.forEach(page -> { + Block block = page.getBlock(channel); + for (int i = 0; i < block.getPositionCount(); i++) { + if (block.isNull(i)) { + result.add(null); + } + else { + result.add(type.getObjectValue(null, block, i)); + } + } + }); + return unmodifiableList(result); + } + + private PagePartitionerBuilder pagePartitioner(Type... 
types) + { + return pagePartitioner(ImmutableList.copyOf(types)); + } + + private PagePartitionerBuilder pagePartitioner(List types) + { + return pagePartitioner().withTypes(types); + } + + private PagePartitionerBuilder pagePartitioner() + { + return new PagePartitionerBuilder(executor, scheduledExecutor, outputBuffer); + } + + // How a test feeds pages to the PagePartitioner: one position at a time or whole columns. + private enum PartitioningMode + { + ROW_WISE { + @Override + public void partitionPage(PagePartitioner pagePartitioner, Page page) + { + pagePartitioner.partitionPageByRow(page); + } + }, + COLUMNAR { + @Override + public void partitionPage(PagePartitioner pagePartitioner, Page page) + { + pagePartitioner.partitionPageByColumn(page); + } + }; + + public abstract void partitionPage(PagePartitioner pagePartitioner, Page page); + } + + // Fluent test builder assembling a PagePartitioner (or a full PartitionedOutputOperator) with overridable partitioning configuration. + public static class PagePartitionerBuilder + { + public static final PositionsAppenderFactory POSITIONS_APPENDER_FACTORY = new PositionsAppenderFactory(new BlockTypeOperators()); + private final ExecutorService executor; + private final ScheduledExecutorService scheduledExecutor; + private final OutputBuffer outputBuffer; + + private ImmutableList partitionChannels = ImmutableList.of(0); + private List> partitionConstants = ImmutableList.of(); + private PartitionFunction partitionFunction = new SumModuloPartitionFunction(PARTITION_COUNT, 0); + private boolean shouldReplicate; + private OptionalInt nullChannel = OptionalInt.empty(); + private List types; + + PagePartitionerBuilder(ExecutorService executor, ScheduledExecutorService scheduledExecutor, OutputBuffer outputBuffer) + { + this.executor = requireNonNull(executor, "executor is null"); + this.scheduledExecutor = requireNonNull(scheduledExecutor, "scheduledExecutor is null"); + this.outputBuffer = requireNonNull(outputBuffer, "outputBuffer is null"); + } + + public PagePartitionerBuilder withPartitionChannels(Integer... 
partitionChannels) + { + return withPartitionChannels(ImmutableList.copyOf(partitionChannels)); + } + + public PagePartitionerBuilder withPartitionChannels(ImmutableList partitionChannels) + { + this.partitionChannels = partitionChannels; + return this; + } + + public PagePartitionerBuilder withPartitionConstants(List> partitionConstants) + { + this.partitionConstants = partitionConstants; + return this; + } + + public PagePartitionerBuilder withHashChannels(int... hashChannels) + { + return withPartitionFunction(new SumModuloPartitionFunction(PARTITION_COUNT, hashChannels)); + } + + public PagePartitionerBuilder withPartitionFunction(PartitionFunction partitionFunction) + { + this.partitionFunction = partitionFunction; + return this; + } + + public PagePartitionerBuilder replicate() + { + return withShouldReplicate(true); + } + + public PagePartitionerBuilder withShouldReplicate(boolean shouldReplicate) + { + this.shouldReplicate = shouldReplicate; + return this; + } + + public PagePartitionerBuilder withNullChannel(int nullChannel) + { + return withNullChannel(OptionalInt.of(nullChannel)); + } + + public PagePartitionerBuilder withNullChannel(OptionalInt nullChannel) + { + this.nullChannel = nullChannel; + return this; + } + + public PagePartitionerBuilder withTypes(Type... 
types) + { + return withTypes(ImmutableList.copyOf(types)); + } + + public PagePartitionerBuilder withTypes(List types) + { + this.types = types; + return this; + } + + public PartitionedOutputOperator buildPartitionedOutputOperator() + { + DriverContext driverContext = buildDriverContext(); + + OutputFactory operatorFactory = new PartitionedOutputOperator.PartitionedOutputFactory( + partitionFunction, + partitionChannels, + partitionConstants, + shouldReplicate, + nullChannel, + outputBuffer, + PARTITION_MAX_MEMORY, + POSITIONS_APPENDER_FACTORY); + + return (PartitionedOutputOperator) operatorFactory + .createOutputOperator(0, new PlanNodeId("plan-node-0"), types, Function.identity(), PAGES_SERDE_FACTORY) + .createOperator(driverContext); + } + + public PagePartitioner build() + { + DriverContext driverContext = buildDriverContext(); + + OperatorContext operatorContext = driverContext.addOperatorContext(0, new PlanNodeId("plan-node-0"), PartitionedOutputOperator.class.getSimpleName()); + + return new PagePartitioner( + partitionFunction, + partitionChannels, + partitionConstants, + shouldReplicate, + nullChannel, + outputBuffer, + PAGES_SERDE_FACTORY, + types, + PARTITION_MAX_MEMORY, + operatorContext, + POSITIONS_APPENDER_FACTORY); + } + + private DriverContext buildDriverContext() + { + return TestingTaskContext.builder(executor, scheduledExecutor, TEST_SESSION) + .setMemoryPoolSize(MAX_MEMORY) + .build() + .addPipelineContext(0, true, true, false) + .addDriverContext(); + } + } + + // Minimal OutputBuffer stub: records enqueued serialized pages per partition so tests can read them back; all other OutputBuffer methods are no-ops or defaults. + public static class TestOutputBuffer + implements OutputBuffer + { + private final Multimap enqueued = ArrayListMultimap.create(); + + public Stream getEnqueuedDeserialized() + { + return getEnqueued().stream().map(PAGES_SERDE::deserialize); + } + + public List getEnqueued() + { + return ImmutableList.copyOf(enqueued.values()); + } + + public void clear() + { + enqueued.clear(); + } + + public Stream getEnqueuedDeserialized(int partition) + { + return 
getEnqueued(partition).stream().map(PAGES_SERDE::deserialize); + } + + public List getEnqueued(int partition) + { + Collection serializedPages = enqueued.get(partition); + return serializedPages == null ? ImmutableList.of() : ImmutableList.copyOf(serializedPages); + } + + @Override + public void enqueue(int partition, List pages) + { + enqueued.putAll(partition, pages); + } + + @Override + public OutputBufferInfo getInfo() + { + return null; + } + + @Override + public BufferState getState() + { + return BufferState.NO_MORE_BUFFERS; + } + + @Override + public double getUtilization() + { + return 0; + } + + @Override + public boolean isOverutilized() + { + return false; + } + + @Override + public void addStateChangeListener(StateMachine.StateChangeListener stateChangeListener) + { + } + + @Override + public void setOutputBuffers(OutputBuffers newOutputBuffers) + { + } + + @Override + public ListenableFuture get(OutputBuffers.OutputBufferId bufferId, long token, DataSize maxSize) + { + return null; + } + + @Override + public void acknowledge(OutputBuffers.OutputBufferId bufferId, long token) + { + } + + @Override + public void destroy(OutputBuffers.OutputBufferId bufferId) + { + } + + @Override + public ListenableFuture isFull() + { + return null; + } + + @Override + public void enqueue(List pages) + { + } + + @Override + public void setNoMorePages() + { + } + + @Override + public void destroy() + { + } + + @Override + public void abort() + { + } + + @Override + public long getPeakMemoryUsage() + { + return 0; + } + + @Override + public Optional getFailureCause() + { + return Optional.empty(); + } + } + + // Deterministic partition function: partition = |sum of longs in hashChannels| mod partitionCount. + private static class SumModuloPartitionFunction + implements PartitionFunction + { + private final int[] hashChannels; + private final int partitionCount; + + SumModuloPartitionFunction(int partitionCount, int... 
hashChannels) + { + checkArgument(partitionCount > 0); + this.partitionCount = partitionCount; + this.hashChannels = hashChannels; + } + + @Override + public int getPartitionCount() + { + return partitionCount; + } + + @Override + public int getPartition(Page page, int position) + { + long value = 0; + for (int i = 0; i < hashChannels.length; i++) { + value += page.getBlock(hashChannels[i]).getLong(position, 0); + } + + return toIntExact(Math.abs(value) % partitionCount); + } + } +} diff --git a/core/trino-main/src/test/java/io/trino/operator/output/TestPartitionedOutputOperator.java b/core/trino-main/src/test/java/io/trino/operator/output/TestPartitionedOutputOperator.java index b5d591b38f32..cb885a485cd1 100644 --- a/core/trino-main/src/test/java/io/trino/operator/output/TestPartitionedOutputOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/output/TestPartitionedOutputOperator.java @@ -13,98 +13,28 @@ */ package io.trino.operator.output; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Multimap; -import com.google.common.util.concurrent.ListenableFuture; -import io.airlift.slice.Slice; -import io.airlift.units.DataSize; -import io.trino.execution.StateMachine; -import io.trino.execution.buffer.BufferResult; -import io.trino.execution.buffer.BufferState; -import io.trino.execution.buffer.OutputBuffer; -import io.trino.execution.buffer.OutputBufferInfo; -import io.trino.execution.buffer.OutputBuffers; -import io.trino.execution.buffer.PagesSerde; -import io.trino.execution.buffer.PagesSerdeFactory; -import io.trino.operator.BucketPartitionFunction; -import io.trino.operator.DriverContext; import io.trino.operator.OperatorContext; -import io.trino.operator.OutputFactory; -import io.trino.operator.PartitionFunction; +import io.trino.operator.output.TestPagePartitioner.PagePartitionerBuilder; +import io.trino.operator.output.TestPagePartitioner.TestOutputBuffer; 
import io.trino.spi.Page; -import io.trino.spi.block.Block; -import io.trino.spi.block.DictionaryBlock; -import io.trino.spi.block.RunLengthEncodedBlock; -import io.trino.spi.block.TestingBlockEncodingSerde; -import io.trino.spi.predicate.NullableValue; -import io.trino.spi.type.ArrayType; -import io.trino.spi.type.Decimals; -import io.trino.spi.type.TimestampType; -import io.trino.spi.type.Type; -import io.trino.sql.planner.plan.PlanNodeId; -import io.trino.testing.TestingTaskContext; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; -import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Optional; -import java.util.OptionalInt; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; -import java.util.function.Function; -import java.util.stream.IntStream; -import java.util.stream.Stream; -import static com.google.common.base.Preconditions.checkArgument; import static io.airlift.concurrent.Threads.daemonThreadsNamed; -import static io.airlift.units.DataSize.Unit.MEGABYTE; -import static io.trino.SessionTestUtils.TEST_SESSION; -import static io.trino.block.BlockAssertions.createLongDictionaryBlock; import static io.trino.block.BlockAssertions.createLongSequenceBlock; -import static io.trino.block.BlockAssertions.createLongsBlock; -import static io.trino.block.BlockAssertions.createRLEBlock; -import static io.trino.block.BlockAssertions.createRandomBlockForType; -import static io.trino.execution.buffer.OutputBuffers.BufferType.PARTITIONED; import static io.trino.spi.type.BigintType.BIGINT; -import static io.trino.spi.type.BooleanType.BOOLEAN; -import static io.trino.spi.type.CharType.createCharType; -import static io.trino.spi.type.DecimalType.createDecimalType; -import static io.trino.spi.type.DoubleType.DOUBLE; -import 
static io.trino.spi.type.IntegerType.INTEGER; -import static io.trino.spi.type.SmallintType.SMALLINT; -import static io.trino.spi.type.TinyintType.TINYINT; -import static io.trino.spi.type.UuidType.UUID; -import static io.trino.spi.type.VarbinaryType.VARBINARY; -import static io.trino.spi.type.VarcharType.createUnboundedVarcharType; -import static io.trino.sql.planner.SystemPartitioningHandle.SystemPartitionFunction.ROUND_ROBIN; -import static io.trino.type.IpAddressType.IPADDRESS; -import static java.lang.Math.toIntExact; -import static java.util.Collections.nCopies; -import static java.util.Collections.unmodifiableList; -import static java.util.Objects.requireNonNull; import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.concurrent.Executors.newScheduledThreadPool; -import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; @Test(singleThreaded = true) public class TestPartitionedOutputOperator { - private static final DataSize MAX_MEMORY = DataSize.of(50, MEGABYTE); - private static final DataSize PARTITION_MAX_MEMORY = DataSize.of(5, MEGABYTE); - - private static final int POSITIONS_PER_PAGE = 8; - private static final int PARTITION_COUNT = 2; - - private static final PagesSerdeFactory PAGES_SERDE_FACTORY = new PagesSerdeFactory(new TestingBlockEncodingSerde(), false); - private static final PagesSerde PAGES_SERDE = PAGES_SERDE_FACTORY.createPagesSerde(); - private ExecutorService executor; private ScheduledExecutorService scheduledExecutor; private TestOutputBuffer outputBuffer; @@ -132,561 +62,17 @@ public void setUp() } @Test - public void testOutputForSimplePage() + public void testOperatorContextStats() { - PartitionedOutputOperator partitionedOutputOperator = partitionedOutputOperator(BIGINT).build(); - Page page = new Page(createLongSequenceBlock(0, POSITIONS_PER_PAGE)); - List expected = readLongs(Stream.of(page), 0); + PartitionedOutputOperator partitionedOutputOperator = 
new PagePartitionerBuilder(executor, scheduledExecutor, outputBuffer) + .withTypes(BIGINT).buildPartitionedOutputOperator(); + Page page = new Page(createLongSequenceBlock(0, 8)); - processPages(partitionedOutputOperator, page); + partitionedOutputOperator.addInput(page); + partitionedOutputOperator.finish(); - List partitioned = readLongs(outputBuffer.getEnqueuedDeserialized(), 0); - assertThat(partitioned).containsExactlyInAnyOrderElementsOf(expected); // order is different due to 2 partitions joined OperatorContext operatorContext = partitionedOutputOperator.getOperatorContext(); assertEquals(operatorContext.getOutputDataSize().getTotalCount(), page.getSizeInBytes()); assertEquals(operatorContext.getOutputPositions().getTotalCount(), page.getPositionCount()); } - - @Test - public void testOutputForEmptyPage() - { - PartitionedOutputOperator partitionedOutputOperator = partitionedOutputOperator(BIGINT).build(); - Page page = new Page(createLongsBlock(ImmutableList.of())); - - processPages(partitionedOutputOperator, page); - - List partitioned = readLongs(outputBuffer.getEnqueuedDeserialized(), 0); - assertThat(partitioned).isEmpty(); - } - - @Test - public void testOutputForPageWithNoBlockPartitionFunction() - { - PartitionedOutputOperator partitionedOutputOperator = partitionedOutputOperator(BIGINT) - .withPartitionFunction(new BucketPartitionFunction( - ROUND_ROBIN.createBucketFunction(null, false, PARTITION_COUNT, null), - IntStream.range(0, PARTITION_COUNT).toArray())) - .withPartitionChannels(ImmutableList.of()) - .build(); - Page page = new Page(createLongSequenceBlock(0, POSITIONS_PER_PAGE)); - - processPages(partitionedOutputOperator, page); - - List partition0 = readLongs(outputBuffer.getEnqueuedDeserialized(0), 0); - assertThat(partition0).containsExactly(0L, 2L, 4L, 6L); - List partition1 = readLongs(outputBuffer.getEnqueuedDeserialized(1), 0); - assertThat(partition1).containsExactly(1L, 3L, 5L, 7L); - } - - @Test - public void 
testOutputForMultipleSimplePages() - { - PartitionedOutputOperator partitionedOutputOperator = partitionedOutputOperator(BIGINT).build(); - Page page1 = new Page(createLongSequenceBlock(0, POSITIONS_PER_PAGE)); - Page page2 = new Page(createLongSequenceBlock(1, POSITIONS_PER_PAGE)); - Page page3 = new Page(createLongSequenceBlock(2, POSITIONS_PER_PAGE)); - List expected = readLongs(Stream.of(page1, page2, page3), 0); - - processPages(partitionedOutputOperator, page1, page2, page3); - - List partitioned = readLongs(outputBuffer.getEnqueuedDeserialized(), 0); - assertThat(partitioned).containsExactlyInAnyOrderElementsOf(expected); // order is different due to 2 partitions joined - } - - @Test - public void testOutputForSimplePageWithReplication() - { - PartitionedOutputOperator partitionedOutputOperator = partitionedOutputOperator(BIGINT).replicate().build(); - Page page = new Page(createLongsBlock(0L, 1L, 2L, 3L, null)); - - processPages(partitionedOutputOperator, page); - - List partition0 = readLongs(outputBuffer.getEnqueuedDeserialized(0), 0); - assertThat(partition0).containsExactly(0L, 2L, null); - List partition1 = readLongs(outputBuffer.getEnqueuedDeserialized(1), 0); - assertThat(partition1).containsExactly(0L, 1L, 3L); // position 0 copied to all partitions - } - - @Test - public void testOutputForSimplePageWithNullChannel() - { - PartitionedOutputOperator partitionedOutputOperator = partitionedOutputOperator(BIGINT).withNullChannel(0).build(); - Page page = new Page(createLongsBlock(0L, 1L, 2L, 3L, null)); - - processPages(partitionedOutputOperator, page); - - List partition0 = readLongs(outputBuffer.getEnqueuedDeserialized(0), 0); - assertThat(partition0).containsExactlyInAnyOrder(0L, 2L, null); - List partition1 = readLongs(outputBuffer.getEnqueuedDeserialized(1), 0); - assertThat(partition1).containsExactlyInAnyOrder(1L, 3L, null); // null copied to all partitions - } - - @Test - public void testOutputForSimplePageWithPartitionConstant() - { - 
PartitionedOutputOperator partitionedOutputOperator = partitionedOutputOperator(BIGINT) - .withPartitionConstants(ImmutableList.of(Optional.of(new NullableValue(BIGINT, 1L)))) - .withPartitionChannels(-1) - .build(); - Page page = new Page(createLongsBlock(0L, 1L, 2L, 3L, null)); - List allValues = readLongs(Stream.of(page), 0); - - processPages(partitionedOutputOperator, page); - - List partition0 = readLongs(outputBuffer.getEnqueuedDeserialized(0), 0); - assertThat(partition0).isEmpty(); - List partition1 = readLongs(outputBuffer.getEnqueuedDeserialized(1), 0); - assertThat(partition1).containsExactlyElementsOf(allValues); - } - - @Test - public void testOutputForSimplePageWithPartitionConstantAndHashBlock() - { - PartitionedOutputOperator partitionedOutputOperator = partitionedOutputOperator(BIGINT) - .withPartitionConstants(ImmutableList.of(Optional.empty(), Optional.of(new NullableValue(BIGINT, 1L)))) - .withPartitionChannels(0, -1) // use first block and constant block at index 1 as input to partitionFunction - .withHashChannels(0, 1) // use both channels to calculate partition (a+b) mod 2 - .build(); - Page page = new Page(createLongsBlock(0L, 1L, 2L, 3L)); - - processPages(partitionedOutputOperator, page); - - List partition0 = readLongs(outputBuffer.getEnqueuedDeserialized(0), 0); - assertThat(partition0).containsExactly(1L, 3L); - List partition1 = readLongs(outputBuffer.getEnqueuedDeserialized(1), 0); - assertThat(partition1).containsExactly(0L, 2L); - } - - @Test - public void testPartitionPositionsWithRleNotNull() - { - PartitionedOutputOperator partitionedOutputOperator = partitionedOutputOperator(BIGINT, BIGINT).build(); - Page page = new Page(createRLEBlock(0, POSITIONS_PER_PAGE), createLongSequenceBlock(0, POSITIONS_PER_PAGE)); - - processPages(partitionedOutputOperator, page); - - List partition0 = readLongs(outputBuffer.getEnqueuedDeserialized(0), 1); - assertThat(partition0).containsExactlyElementsOf(readLongs(Stream.of(page), 1)); - List 
partition0HashBlock = readLongs(outputBuffer.getEnqueuedDeserialized(0), 0); - assertThat(partition0HashBlock).containsOnly(0L).hasSize(POSITIONS_PER_PAGE); - assertThat(outputBuffer.getEnqueuedDeserialized(1)).isEmpty(); - } - - @Test - public void testPartitionPositionsWithRleNotNullWithReplication() - { - PartitionedOutputOperator partitionedOutputOperator = partitionedOutputOperator(BIGINT, BIGINT).replicate().build(); - Page page = new Page(createRLEBlock(0, POSITIONS_PER_PAGE), createLongSequenceBlock(0, POSITIONS_PER_PAGE)); - - processPages(partitionedOutputOperator, page); - - List partition0 = readLongs(outputBuffer.getEnqueuedDeserialized(0), 1); - assertThat(partition0).containsExactlyElementsOf(readLongs(Stream.of(page), 1)); - List partition1 = readLongs(outputBuffer.getEnqueuedDeserialized(1), 1); - assertThat(partition1).containsExactly(0L); // position 0 copied to all partitions - } - - @Test - public void testPartitionPositionsWithRleNullWithNullChannel() - { - PartitionedOutputOperator partitionedOutputOperator = partitionedOutputOperator(BIGINT, BIGINT).withNullChannel(0).build(); - Page page = new Page(new RunLengthEncodedBlock(createLongsBlock((Long) null), POSITIONS_PER_PAGE), createLongSequenceBlock(0, POSITIONS_PER_PAGE)); - - processPages(partitionedOutputOperator, page); - - List partition0 = readLongs(outputBuffer.getEnqueuedDeserialized(0), 1); - assertThat(partition0).containsExactlyElementsOf(readLongs(Stream.of(page), 1)); - List partition1 = readLongs(outputBuffer.getEnqueuedDeserialized(1), 1); - assertThat(partition1).containsExactlyElementsOf(readLongs(Stream.of(page), 1)); - } - - @Test - public void testOutputForDictionaryBlock() - { - PartitionedOutputOperator partitionedOutputOperator = partitionedOutputOperator(BIGINT).build(); - Page page = new Page(createLongDictionaryBlock(0, 10)); // must have at least 10 position to have non-trivial dict - - processPages(partitionedOutputOperator, page); - - List partition0 = 
readLongs(outputBuffer.getEnqueuedDeserialized(0), 0); - assertThat(partition0).containsExactlyElementsOf(nCopies(5, 0L)); - List partition1 = readLongs(outputBuffer.getEnqueuedDeserialized(1), 0); - assertThat(partition1).containsExactlyElementsOf(nCopies(5, 1L)); - } - - @Test - public void testOutputForOneValueDictionaryBlock() - { - PartitionedOutputOperator partitionedOutputOperator = partitionedOutputOperator(BIGINT).build(); - Page page = new Page(new DictionaryBlock(createLongsBlock(0), new int[] {0, 0, 0, 0})); - - processPages(partitionedOutputOperator, page); - - List partition0 = readLongs(outputBuffer.getEnqueuedDeserialized(0), 0); - assertThat(partition0).containsExactlyElementsOf(nCopies(4, 0L)); - List partition1 = readLongs(outputBuffer.getEnqueuedDeserialized(1), 0); - assertThat(partition1).isEmpty(); - } - - @Test - public void testOutputForViewDictionaryBlock() - { - PartitionedOutputOperator partitionedOutputOperator = partitionedOutputOperator(BIGINT).build(); - Page page = new Page(new DictionaryBlock(createLongSequenceBlock(4, 8), new int[] {1, 0, 3, 2})); - - processPages(partitionedOutputOperator, page); - - List partition0 = readLongs(outputBuffer.getEnqueuedDeserialized(0), 0); - assertThat(partition0).containsExactlyInAnyOrder(4L, 6L); - List partition1 = readLongs(outputBuffer.getEnqueuedDeserialized(1), 0); - assertThat(partition1).containsExactlyInAnyOrder(5L, 7L); - } - - @Test(dataProvider = "types") - public void testOutputForSimplePageWithType(Type type) - { - PartitionedOutputOperator partitionedOutputOperator = partitionedOutputOperator(BIGINT, type).build(); - Page page = new Page( - createLongSequenceBlock(0, POSITIONS_PER_PAGE), // partition block - createBlockForType(type, POSITIONS_PER_PAGE)); - List expected = readChannel(Stream.of(page), 1, type); - - processPages(partitionedOutputOperator, page); - - List partitioned = readChannel(outputBuffer.getEnqueuedDeserialized(), 1, type); - 
assertThat(partitioned).containsExactlyInAnyOrderElementsOf(expected); // order is different due to 2 partitions joined - } - - @DataProvider(name = "types") - public static Object[][] types() - { - return new Object[][] - { - {BIGINT}, - {BOOLEAN}, - {INTEGER}, - {createCharType(10)}, - {createUnboundedVarcharType()}, - {DOUBLE}, - {SMALLINT}, - {TINYINT}, - {UUID}, - {VARBINARY}, - {createDecimalType(1)}, - {createDecimalType(Decimals.MAX_SHORT_PRECISION + 1)}, - {new ArrayType(BIGINT)}, - {TimestampType.createTimestampType(9)}, - {TimestampType.createTimestampType(3)}, - {IPADDRESS} - }; - } - - private Block createBlockForType(Type type, int positionsPerPage) - { - return createRandomBlockForType(type, positionsPerPage, 0.2F); - } - - private static void processPages(PartitionedOutputOperator partitionedOutputOperator, Page... pages) - { - for (Page page : pages) { - partitionedOutputOperator.addInput(page); - } - partitionedOutputOperator.finish(); - } - - private static List readLongs(Stream pages, int channel) - { - return readChannel(pages, channel, BIGINT); - } - - private static List readChannel(Stream pages, int channel, Type type) - { - List result = new ArrayList<>(); - - pages.forEach(page -> { - Block block = page.getBlock(channel); - for (int i = 0; i < block.getPositionCount(); i++) { - if (block.isNull(i)) { - result.add(null); - } - else { - result.add(type.getObjectValue(null, block, i)); - } - } - }); - return unmodifiableList(result); - } - - private PartitionedOutputOperatorBuilder partitionedOutputOperator(Type... 
types) - { - return partitionedOutputOperator(ImmutableList.copyOf(types)); - } - - private PartitionedOutputOperatorBuilder partitionedOutputOperator(List types) - { - return partitionedOutputOperator().withTypes(types); - } - - private PartitionedOutputOperatorBuilder partitionedOutputOperator() - { - return new PartitionedOutputOperatorBuilder(executor, scheduledExecutor, outputBuffer); - } - - static class PartitionedOutputOperatorBuilder - { - public static final PositionsAppenderFactory POSITIONS_APPENDER_FACTORY = new PositionsAppenderFactory(); - private final ExecutorService executor; - private final ScheduledExecutorService scheduledExecutor; - private final OutputBuffer outputBuffer; - - private ImmutableList partitionChannels = ImmutableList.of(0); - private List> partitionConstants = ImmutableList.of(); - private PartitionFunction partitionFunction = new SumModuloPartitionFunction(PARTITION_COUNT, 0); - private boolean shouldReplicate; - private OptionalInt nullChannel = OptionalInt.empty(); - private List types; - - PartitionedOutputOperatorBuilder(ExecutorService executor, ScheduledExecutorService scheduledExecutor, OutputBuffer outputBuffer) - { - this.executor = requireNonNull(executor, "executor is null"); - this.scheduledExecutor = requireNonNull(scheduledExecutor, "scheduledExecutor is null"); - this.outputBuffer = requireNonNull(outputBuffer, "outputBuffer is null"); - } - - public PartitionedOutputOperatorBuilder withPartitionChannels(Integer... partitionChannels) - { - return withPartitionChannels(ImmutableList.copyOf(partitionChannels)); - } - - public PartitionedOutputOperatorBuilder withPartitionChannels(ImmutableList partitionChannels) - { - this.partitionChannels = partitionChannels; - return this; - } - - public PartitionedOutputOperatorBuilder withPartitionConstants(List> partitionConstants) - { - this.partitionConstants = partitionConstants; - return this; - } - - public PartitionedOutputOperatorBuilder withHashChannels(int... 
hashChannels) - { - return withPartitionFunction(new SumModuloPartitionFunction(PARTITION_COUNT, hashChannels)); - } - - public PartitionedOutputOperatorBuilder withPartitionFunction(PartitionFunction partitionFunction) - { - this.partitionFunction = partitionFunction; - return this; - } - - public PartitionedOutputOperatorBuilder replicate() - { - return withShouldReplicate(true); - } - - public PartitionedOutputOperatorBuilder withShouldReplicate(boolean shouldReplicate) - { - this.shouldReplicate = shouldReplicate; - return this; - } - - public PartitionedOutputOperatorBuilder withNullChannel(int nullChannel) - { - return withNullChannel(OptionalInt.of(nullChannel)); - } - - public PartitionedOutputOperatorBuilder withNullChannel(OptionalInt nullChannel) - { - this.nullChannel = nullChannel; - return this; - } - - public PartitionedOutputOperatorBuilder withTypes(List types) - { - this.types = types; - return this; - } - - public PartitionedOutputOperator build() - { - DriverContext driverContext = TestingTaskContext.builder(executor, scheduledExecutor, TEST_SESSION) - .setMemoryPoolSize(MAX_MEMORY) - .build() - .addPipelineContext(0, true, true, false) - .addDriverContext(); - - OutputBuffers buffers = OutputBuffers.createInitialEmptyOutputBuffers(PARTITIONED); - for (int partition = 0; partition < PARTITION_COUNT; partition++) { - buffers = buffers.withBuffer(new OutputBuffers.OutputBufferId(partition), partition); - } - - OutputFactory operatorFactory = new PartitionedOutputOperator.PartitionedOutputFactory( - partitionFunction, - partitionChannels, - partitionConstants, - shouldReplicate, - nullChannel, - outputBuffer, - PARTITION_MAX_MEMORY, - POSITIONS_APPENDER_FACTORY); - - return (PartitionedOutputOperator) operatorFactory - .createOutputOperator(0, new PlanNodeId("plan-node-0"), types, Function.identity(), PAGES_SERDE_FACTORY) - .createOperator(driverContext); - } - } - - private static class TestOutputBuffer - implements OutputBuffer - { - private 
final Multimap enqueued = ArrayListMultimap.create(); - - public Stream getEnqueuedDeserialized() - { - return getEnqueued().stream().map(PAGES_SERDE::deserialize); - } - - public List getEnqueued() - { - return ImmutableList.copyOf(enqueued.values()); - } - - public Stream getEnqueuedDeserialized(int partition) - { - return getEnqueued(partition).stream().map(PAGES_SERDE::deserialize); - } - - public List getEnqueued(int partition) - { - Collection serializedPages = enqueued.get(partition); - return serializedPages == null ? ImmutableList.of() : ImmutableList.copyOf(serializedPages); - } - - @Override - public void enqueue(int partition, List pages) - { - enqueued.putAll(partition, pages); - } - - @Override - public OutputBufferInfo getInfo() - { - return null; - } - - @Override - public BufferState getState() - { - return BufferState.NO_MORE_BUFFERS; - } - - @Override - public double getUtilization() - { - return 0; - } - - @Override - public boolean isOverutilized() - { - return false; - } - - @Override - public void addStateChangeListener(StateMachine.StateChangeListener stateChangeListener) - { - } - - @Override - public void setOutputBuffers(OutputBuffers newOutputBuffers) - { - } - - @Override - public ListenableFuture get(OutputBuffers.OutputBufferId bufferId, long token, DataSize maxSize) - { - return null; - } - - @Override - public void acknowledge(OutputBuffers.OutputBufferId bufferId, long token) - { - } - - @Override - public void destroy(OutputBuffers.OutputBufferId bufferId) - { - } - - @Override - public ListenableFuture isFull() - { - return null; - } - - @Override - public void enqueue(List pages) - { - } - - @Override - public void setNoMorePages() - { - } - - @Override - public void destroy() - { - } - - @Override - public void abort() - { - } - - @Override - public long getPeakMemoryUsage() - { - return 0; - } - - @Override - public Optional getFailureCause() - { - return Optional.empty(); - } - } - - private static class 
SumModuloPartitionFunction - implements PartitionFunction - { - private final int[] hashChannels; - private final int partitionCount; - - SumModuloPartitionFunction(int partitionCount, int... hashChannels) - { - checkArgument(partitionCount > 0); - this.partitionCount = partitionCount; - this.hashChannels = hashChannels; - } - - @Override - public int getPartitionCount() - { - return partitionCount; - } - - @Override - public int getPartition(Page page, int position) - { - long value = 0; - for (int i = 0; i < hashChannels.length; i++) { - value += page.getBlock(hashChannels[i]).getLong(position, 0); - } - - return toIntExact(Math.abs(value) % partitionCount); - } - } } diff --git a/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppender.java b/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppender.java new file mode 100644 index 000000000000..7d65d107f2e3 --- /dev/null +++ b/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppender.java @@ -0,0 +1,363 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.operator.output; + +import com.google.common.collect.ImmutableList; +import io.airlift.slice.Slices; +import io.trino.spi.block.Block; +import io.trino.spi.block.BlockBuilder; +import io.trino.spi.block.BlockBuilderStatus; +import io.trino.spi.block.DictionaryBlock; +import io.trino.spi.block.PageBuilderStatus; +import io.trino.spi.block.RunLengthEncodedBlock; +import io.trino.spi.type.ArrayType; +import io.trino.spi.type.Decimals; +import io.trino.spi.type.LongTimestamp; +import io.trino.spi.type.Type; +import io.trino.type.BlockTypeOperators; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.stream.IntStream; + +import static io.airlift.testing.Assertions.assertGreaterThanOrEqual; +import static io.airlift.testing.Assertions.assertInstanceOf; +import static io.trino.block.BlockAssertions.assertBlockEquals; +import static io.trino.block.BlockAssertions.createArrayBigintBlock; +import static io.trino.block.BlockAssertions.createBooleansBlock; +import static io.trino.block.BlockAssertions.createDoublesBlock; +import static io.trino.block.BlockAssertions.createIntsBlock; +import static io.trino.block.BlockAssertions.createLongDecimalsBlock; +import static io.trino.block.BlockAssertions.createLongTimestampBlock; +import static io.trino.block.BlockAssertions.createLongsBlock; +import static io.trino.block.BlockAssertions.createRandomBlockForType; +import static io.trino.block.BlockAssertions.createRandomDictionaryBlock; +import static io.trino.block.BlockAssertions.createSlicesBlock; +import static io.trino.block.BlockAssertions.createSmallintsBlock; +import static io.trino.block.BlockAssertions.createStringsBlock; +import static io.trino.block.BlockAssertions.createTinyintsBlock; +import static io.trino.spi.block.DictionaryId.randomDictionaryId; +import static 
io.trino.spi.block.PageBuilderStatus.DEFAULT_MAX_PAGE_SIZE_IN_BYTES; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.BooleanType.BOOLEAN; +import static io.trino.spi.type.CharType.createCharType; +import static io.trino.spi.type.DecimalType.createDecimalType; +import static io.trino.spi.type.DoubleType.DOUBLE; +import static io.trino.spi.type.IntegerType.INTEGER; +import static io.trino.spi.type.SmallintType.SMALLINT; +import static io.trino.spi.type.TimestampType.createTimestampType; +import static io.trino.spi.type.TinyintType.TINYINT; +import static io.trino.spi.type.VarbinaryType.VARBINARY; +import static io.trino.spi.type.VarcharType.createUnboundedVarcharType; +import static java.util.Objects.requireNonNull; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +public class TestPositionsAppender +{ + private static final PositionsAppenderFactory POSITIONS_APPENDER_FACTORY = new PositionsAppenderFactory(new BlockTypeOperators()); + + @Test(dataProvider = "types") + public void testMixedBlockTypes(Type type) + { + List input = ImmutableList.of( + input(emptyBlock(type)), + input(nullBlock(type, 3), 0, 2), + input(notNullBlock(type, 3), 1, 2), + input(partiallyNullBlock(type, 4), 0, 1, 2, 3), + input(partiallyNullBlock(type, 4)), // empty position list + input(rleBlock(type, 4), 0, 2), + input(rleBlock(type, 2), 0, 1), // rle all positions + input(nullRleBlock(type, 4), 1, 2), + input(dictionaryBlock(type, 4, 2, 0), 0, 3), // dict not null + input(dictionaryBlock(type, 8, 4, 0.5F), 1, 3, 5), // dict mixed + input(dictionaryBlock(type, 8, 4, 1), 1, 3, 5), // dict null + input(rleBlock(dictionaryBlock(type, 1, 2, 0), 3), 2), // rle -> dict + input(rleBlock(dictionaryBlock(notNullBlock(type, 2), new int[] {1}), 3), 2), // rle -> dict with position 0 mapped to > 0 + input(rleBlock(dictionaryBlock(rleBlock(type, 4), 1), 3), 1), // rle -> dict -> rle + 
input(dictionaryBlock(dictionaryBlock(type, 5, 4, 0.5F), 3), 2), // dict -> dict + input(dictionaryBlock(dictionaryBlock(dictionaryBlock(type, 5, 4, 0.5F), 3), 3), 2), // dict -> dict -> dict + input(dictionaryBlock(rleBlock(type, 4), 3), 0, 2)); // dict -> rle + + testAppend(type, input); + } + + @Test(dataProvider = "nullRleTypes") + public void testNullRle(Type type) + { + testNullRle(type, nullBlock(type, 2)); + testNullRle(type, nullRleBlock(type, 2)); + } + + @Test(dataProvider = "types") + public void testRleSwitchToFlat(Type type) + { + List inputs = ImmutableList.of( + input(rleBlock(type, 3), 0, 1), + input(notNullBlock(type, 2), 0, 1)); + testAppend(type, inputs); + + List dictionaryInputs = ImmutableList.of( + input(rleBlock(type, 3), 0, 1), + input(dictionaryBlock(type, 2, 4, 0.5F), 0, 1)); + testAppend(type, dictionaryInputs); + } + + @Test(dataProvider = "types") + public void testFlatAppendRle(Type type) + { + List inputs = ImmutableList.of( + input(notNullBlock(type, 2), 0, 1), + input(rleBlock(type, 3), 0, 1)); + testAppend(type, inputs); + + List dictionaryInputs = ImmutableList.of( + input(dictionaryBlock(type, 2, 4, 0.5F), 0, 1), + input(rleBlock(type, 3), 0, 1)); + testAppend(type, dictionaryInputs); + } + + @Test(dataProvider = "differentValues") + public void testMultipleRleBlocksWithDifferentValues(Type type, Block value1, Block value2) + { + List input = ImmutableList.of( + input(rleBlock(value1, 3), 0, 1), + input(rleBlock(value2, 3), 0, 1)); + testAppend(type, input); + } + + @DataProvider(name = "differentValues") + public static Object[][] differentValues() + { + return new Object[][] + { + {BIGINT, createLongsBlock(0), createLongsBlock(1)}, + {BOOLEAN, createBooleansBlock(true), createBooleansBlock(false)}, + {INTEGER, createIntsBlock(0), createIntsBlock(1)}, + {createCharType(10), createStringsBlock("0"), createStringsBlock("1")}, + {createUnboundedVarcharType(), createStringsBlock("0"), createStringsBlock("1")}, + {DOUBLE, 
createDoublesBlock(0D), createDoublesBlock(1D)}, + {SMALLINT, createSmallintsBlock(0), createSmallintsBlock(1)}, + {TINYINT, createTinyintsBlock(0), createTinyintsBlock(1)}, + {VARBINARY, createSlicesBlock(Slices.wrappedLongArray(0)), createSlicesBlock(Slices.wrappedLongArray(1))}, + {createDecimalType(Decimals.MAX_SHORT_PRECISION + 1), createLongDecimalsBlock("0"), createLongDecimalsBlock("1")}, + {new ArrayType(BIGINT), createArrayBigintBlock(ImmutableList.of(ImmutableList.of(0L))), createArrayBigintBlock(ImmutableList.of(ImmutableList.of(1L)))}, + { + createTimestampType(9), + createLongTimestampBlock(createTimestampType(9), new LongTimestamp(0, 0)), + createLongTimestampBlock(createTimestampType(9), new LongTimestamp(1, 0))} + }; + } + + @Test(dataProvider = "types") + public void testMultipleRleWithTheSameValueProduceRle(Type type) + { + PositionsAppender positionsAppender = POSITIONS_APPENDER_FACTORY.create(type, 10, DEFAULT_MAX_PAGE_SIZE_IN_BYTES); + + Block value = notNullBlock(type, 1); + positionsAppender.append(allPositions(3), rleBlock(value, 3)); + positionsAppender.append(allPositions(2), rleBlock(value, 2)); + + Block actual = positionsAppender.build(); + assertEquals(actual.getPositionCount(), 5); + assertInstanceOf(actual, RunLengthEncodedBlock.class); + } + + @DataProvider(name = "nullRleTypes") + public static Object[][] nullRleTypes() + { + return new Object[][] + { + {BIGINT}, + {BOOLEAN}, + {INTEGER}, + {createCharType(10)}, + {createUnboundedVarcharType()}, + {DOUBLE}, + {SMALLINT}, + {TINYINT}, + {VARBINARY}, + {createDecimalType(Decimals.MAX_SHORT_PRECISION + 1)}, + {createTimestampType(9)} + }; + } + + @DataProvider(name = "types") + public static Object[][] types() + { + return new Object[][] + { + {BIGINT}, + {BOOLEAN}, + {INTEGER}, + {createCharType(10)}, + {createUnboundedVarcharType()}, + {DOUBLE}, + {SMALLINT}, + {TINYINT}, + {VARBINARY}, + {createDecimalType(Decimals.MAX_SHORT_PRECISION + 1)}, + {new ArrayType(BIGINT)}, + 
{createTimestampType(9)} + }; + } + + private IntArrayList allPositions(int count) + { + return new IntArrayList(IntStream.range(0, count).toArray()); + } + + private BlockView input(Block block, int... positions) + { + return new BlockView(block, new IntArrayList(positions)); + } + + private DictionaryBlock dictionaryBlock(Block dictionary, int positionCount) + { + return createRandomDictionaryBlock(dictionary, positionCount); + } + + private DictionaryBlock dictionaryBlock(Block dictionary, int[] ids) + { + return new DictionaryBlock(0, ids.length, dictionary, ids, false, randomDictionaryId()); + } + + private DictionaryBlock dictionaryBlock(Type type, int positionCount, int dictionarySize, float nullRate) + { + Block dictionary = createRandomBlockForType(type, dictionarySize, nullRate); + return createRandomDictionaryBlock(dictionary, positionCount); + } + + private RunLengthEncodedBlock rleBlock(Block value, int positionCount) + { + return new RunLengthEncodedBlock(value, positionCount); + } + + private RunLengthEncodedBlock rleBlock(Type type, int positionCount) + { + Block rleValue = createRandomBlockForType(type, 1, 0); + return new RunLengthEncodedBlock(rleValue, positionCount); + } + + private RunLengthEncodedBlock nullRleBlock(Type type, int positionCount) + { + Block rleValue = nullBlock(type, 1); + return new RunLengthEncodedBlock(rleValue, positionCount); + } + + private Block partiallyNullBlock(Type type, int positionCount) + { + return createRandomBlockForType(type, positionCount, 0.5F); + } + + private Block notNullBlock(Type type, int positionCount) + { + return createRandomBlockForType(type, positionCount, 0); + } + + private Block nullBlock(Type type, int positionCount) + { + BlockBuilder blockBuilder = type.createBlockBuilder(null, positionCount); + for (int i = 0; i < positionCount; i++) { + blockBuilder.appendNull(); + } + return blockBuilder.build(); + } + + private Block emptyBlock(Type type) + { + return type.createBlockBuilder(null, 
0).build(); + } + + private void testNullRle(Type type, Block source) + { + PositionsAppender positionsAppender = POSITIONS_APPENDER_FACTORY.create(type, 10, DEFAULT_MAX_PAGE_SIZE_IN_BYTES); + + // append twice to trigger RleAwarePositionsAppender.equalOperator call + positionsAppender.append(new IntArrayList(IntStream.range(0, source.getPositionCount()).toArray()), source); + positionsAppender.append(new IntArrayList(IntStream.range(0, source.getPositionCount()).toArray()), source); + Block actual = positionsAppender.build(); + assertTrue(actual.isNull(0)); + assertEquals(actual.getPositionCount(), source.getPositionCount() * 2); + assertInstanceOf(actual, RunLengthEncodedBlock.class); + } + + private void testAppend(Type type, List inputs) + { + PositionsAppender positionsAppender = POSITIONS_APPENDER_FACTORY.create(type, 10, DEFAULT_MAX_PAGE_SIZE_IN_BYTES); + long initialRetainedSize = positionsAppender.getRetainedSizeInBytes(); + + inputs.forEach(input -> positionsAppender.append(input.getPositions(), input.getBlock())); + long sizeInBytes = positionsAppender.getSizeInBytes(); + assertGreaterThanOrEqual(positionsAppender.getRetainedSizeInBytes(), sizeInBytes); + Block actual = positionsAppender.build(); + + assertBlockIsValid(actual, sizeInBytes, type, inputs); + // verify positionsAppender reset + assertEquals(positionsAppender.getSizeInBytes(), 0); + assertEquals(positionsAppender.getRetainedSizeInBytes(), initialRetainedSize); + Block secondBlock = positionsAppender.build(); + assertEquals(secondBlock.getPositionCount(), 0); + } + + private void assertBlockIsValid(Block actual, long sizeInBytes, Type type, List inputs) + { + PageBuilderStatus pageBuilderStatus = new PageBuilderStatus(); + BlockBuilderStatus blockBuilderStatus = pageBuilderStatus.createBlockBuilderStatus(); + Block expected = buildBlock(type, inputs, blockBuilderStatus); + + assertBlockEquals(type, actual, expected); + assertEquals(sizeInBytes, pageBuilderStatus.getSizeInBytes()); + } + + 
private Block buildBlock(Type type, List inputs, BlockBuilderStatus blockBuilderStatus) + { + BlockBuilder blockBuilder = type.createBlockBuilder(blockBuilderStatus, 10); + for (BlockView input : inputs) { + for (int position : input.getPositions()) { + type.appendTo(input.getBlock(), position, blockBuilder); + } + } + return blockBuilder.build(); + } + + private static class BlockView + { + private final Block block; + private final IntArrayList positions; + + private BlockView(Block block, IntArrayList positions) + { + this.block = requireNonNull(block, "block is null"); + this.positions = requireNonNull(positions, "positions is null"); + } + + public Block getBlock() + { + return block; + } + + public IntArrayList getPositions() + { + return positions; + } + + public void appendTo(PositionsAppender positionsAppender) + { + positionsAppender.append(getPositions(), getBlock()); + } + } +}