diff --git a/core/trino-main/src/test/java/io/trino/block/BlockAssertions.java b/core/trino-main/src/test/java/io/trino/block/BlockAssertions.java index fc6b4423efa5..cc92a63a59dc 100644 --- a/core/trino-main/src/test/java/io/trino/block/BlockAssertions.java +++ b/core/trino-main/src/test/java/io/trino/block/BlockAssertions.java @@ -355,6 +355,16 @@ public static List generateListWithNulls(int positionCount, float nullRat return unmodifiableList(result); } + public static Set chooseNullPositions(int positionCount, float nullRate) + { + int nullCount = (int) (positionCount * nullRate); + if (nullCount == 0) { + verify(nullRate == 0 || positionCount == 0, "position count %s too small to have at least one null with rate %s", (Object) positionCount, nullRate); + return ImmutableSet.of(); + } + return chooseRandomUnique(positionCount, nullCount); + } + public static Block createStringsBlock(String... values) { requireNonNull(values, "values is null"); @@ -871,16 +881,6 @@ private interface ValueWriter void write(BlockBuilder builder, T value); } - private static Set chooseNullPositions(int positionCount, float nullRate) - { - int nullCount = (int) (positionCount * nullRate); - if (nullCount == 0) { - verify(nullRate == 0, "position count %s too small to have at least one null with rate %s", (Object) positionCount, nullRate); - return ImmutableSet.of(); - } - return chooseRandomUnique(positionCount, nullCount); - } - private static Set chooseRandomUnique(int bound, int count) { if (count < bound / 10) { diff --git a/core/trino-main/src/test/java/io/trino/operator/PageTestUtils.java b/core/trino-main/src/test/java/io/trino/operator/PageTestUtils.java index 76ab4e6970c2..283e27a02735 100644 --- a/core/trino-main/src/test/java/io/trino/operator/PageTestUtils.java +++ b/core/trino-main/src/test/java/io/trino/operator/PageTestUtils.java @@ -75,24 +75,32 @@ public static Page createRandomPage( float nullRate, Optional wrapping) { - int channelCount = types.size(); - 
ImmutableList.Builder blocks = ImmutableList.builder(); + List blocks = types.stream() + .map(type -> { + Block block = createRandomBlockForType(type, positionCount, nullRate); + return wrapping.map(w -> w.wrap(block, positionCount)).orElse(block); + }) + .collect(toImmutableList()); - for (int i = 0; i < channelCount; i++) { - Block block = createRandomBlockForType(types.get(i), positionCount, nullRate); - blocks.add(wrapping.map(w -> w.wrap(block, positionCount)).orElse(block)); - } + return createPage(types, positionCount, hashChannels, blocks); + } - hashChannels.ifPresent(channels -> { - ImmutableList blocksWithoutHash = blocks.build(); + public static Page createPage( + List types, + int positionCount, + Optional> hashChannels, + List blocks) + { + ImmutableList.Builder finalBlocks = ImmutableList.builder().addAll(blocks); - blocks.add(getHashBlock( + hashChannels.ifPresent(channels -> { + finalBlocks.add(getHashBlock( channels.stream() .map(types::get) .collect(toImmutableList()), - channels.stream().map(blocksWithoutHash::get).toArray(Block[]::new))); + channels.stream().map(blocks::get).toArray(Block[]::new))); }); - return new Page(positionCount, blocks.build().toArray(Block[]::new)); + return new Page(positionCount, finalBlocks.build().toArray(Block[]::new)); } } diff --git a/core/trino-main/src/test/java/io/trino/operator/output/BenchmarkPartitionedOutputOperator.java b/core/trino-main/src/test/java/io/trino/operator/output/BenchmarkPartitionedOutputOperator.java index 74c815face77..0eb4f4d73d7a 100644 --- a/core/trino-main/src/test/java/io/trino/operator/output/BenchmarkPartitionedOutputOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/output/BenchmarkPartitionedOutputOperator.java @@ -27,12 +27,14 @@ import io.trino.memory.context.SimpleLocalMemoryContext; import io.trino.operator.BucketPartitionFunction; import io.trino.operator.DriverContext; +import io.trino.operator.PageTestUtils; import io.trino.operator.PartitionFunction; 
import io.trino.operator.PrecomputedHashGenerator; import io.trino.operator.output.PartitionedOutputOperator.PartitionedOutputFactory; import io.trino.spi.Page; import io.trino.spi.QueryId; import io.trino.spi.block.Block; +import io.trino.spi.block.RowBlock; import io.trino.spi.block.RunLengthEncodedBlock; import io.trino.spi.block.TestingBlockEncodingSerde; import io.trino.spi.type.ArrayType; @@ -66,6 +68,7 @@ import java.util.List; import java.util.Optional; import java.util.OptionalInt; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @@ -79,6 +82,7 @@ import static io.airlift.units.DataSize.Unit.BYTE; import static io.airlift.units.DataSize.Unit.MEGABYTE; import static io.trino.SessionTestUtils.TEST_SESSION; +import static io.trino.block.BlockAssertions.chooseNullPositions; import static io.trino.block.BlockAssertions.createLongDictionaryBlock; import static io.trino.block.BlockAssertions.createLongsBlock; import static io.trino.block.BlockAssertions.createRLEBlock; @@ -87,9 +91,6 @@ import static io.trino.execution.buffer.OutputBuffers.BufferType.PARTITIONED; import static io.trino.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; -import static io.trino.operator.PageTestUtils.createRandomDictionaryPage; -import static io.trino.operator.PageTestUtils.createRandomPage; -import static io.trino.operator.PageTestUtils.createRandomRlePage; import static io.trino.operator.output.BenchmarkPartitionedOutputOperator.BenchmarkData.TestType; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.DecimalType.createDecimalType; @@ -151,30 +152,68 @@ public static class BenchmarkData private int positionCount = DEFAULT_POSITION_COUNT; @Param({ + // Flat BIGINT data channel, flat BIGINT partition channel. 
"BIGINT", - "BIGINT_SKEWED_HASH", + // Flat BIGINT data channel, flat BIGINT partition channel with only 2 values. + "BIGINT_PARTITION_CHANNEL_SKEWED", + // Dictionary BIGINT data channel, flat BIGINT partition channel. "DICTIONARY_BIGINT", + // Rle BIGINT data channel, flat BIGINT partition channel. "RLE_BIGINT", + // Flat BIGINT data channel, flat BIGINT partition channel with number of distinct values equal to 20% of data page size. "BIGINT_PARTITION_CHANNEL_20_PERCENT", - "BIGINT_DICTIONARY_PARTITION_CHANNEL_20_PERCENT", - "BIGINT_DICTIONARY_PARTITION_CHANNEL_50_PERCENT", - "BIGINT_DICTIONARY_PARTITION_CHANNEL_80_PERCENT", - "BIGINT_DICTIONARY_PARTITION_CHANNEL_100_PERCENT", - "BIGINT_DICTIONARY_PARTITION_CHANNEL_100_PERCENT_MINUS_1", - "RLE_PARTITION_BIGINT", - "RLE_PARTITION_NULL_BIGINT", + // Flat BIGINT data channel, dictionary BIGINT partition channel with dictionary size equal to 20% of data page size. + // To be compared with BIGINT_PARTITION_CHANNEL_20_PERCENT. + "BIGINT_PARTITION_CHANNEL_DICTIONARY_20_PERCENT", + // Flat BIGINT data channel, dictionary BIGINT partition channel with dictionary size equal to 50% of data page size. + "BIGINT_PARTITION_CHANNEL_DICTIONARY_50_PERCENT", + // Flat BIGINT data channel, dictionary BIGINT partition channel with dictionary size equal to 80% of data page size. + "BIGINT_PARTITION_CHANNEL_DICTIONARY_80_PERCENT", + // Flat BIGINT data channel, dictionary BIGINT partition channel with dictionary size equal to data page size. + "BIGINT_PARTITION_CHANNEL_DICTIONARY_100_PERCENT", + // Flat BIGINT data channel, dictionary BIGINT partition channel with dictionary size equal to data page size - 1. + // To be compared with BIGINT_PARTITION_CHANNEL_DICTIONARY_100_PERCENT. + "BIGINT_PARTITION_CHANNEL_DICTIONARY_100_PERCENT_MINUS_1", + // Flat BIGINT data channel, rle BIGINT partition channel with not null value. + "BIGINT_PARTITION_CHANNEL_RLE", + // Flat BIGINT data channel, rle BIGINT partition channel with null value. 
+ "BIGINT_PARTITION_CHANNEL_RLE_NULL", + // Flat LONG_DECIMAL data channel, flat BIGINT partition channel. "LONG_DECIMAL", + // Dictionary LONG_DECIMAL data channel, flat BIGINT partition channel. + "DICTIONARY_LONG_DECIMAL", + // Flat INTEGER data channel, flat BIGINT partition channel. "INTEGER", + // Dictionary INTEGER data channel, flat BIGINT partition channel. + "DICTIONARY_INTEGER", + // Flat SMALLINT data channel, flat BIGINT partition channel. "SMALLINT", + // Dictionary SMALLINT data channel, flat BIGINT partition channel. + "DICTIONARY_SMALLINT", + // Flat BOOLEAN data channel, flat BIGINT partition channel. "BOOLEAN", + // Dictionary BOOLEAN data channel, flat BIGINT partition channel. + "DICTIONARY_BOOLEAN", + // Flat VARCHAR data channel, flat BIGINT partition channel. "VARCHAR", + // Dictionary VARCHAR data channel, flat BIGINT partition channel. + "DICTIONARY_VARCHAR", + // Flat array of BIGINT data channel, flat BIGINT partition channel. "ARRAY_BIGINT", + // Flat array of VARCHAR data channel, flat BIGINT partition channel. "ARRAY_VARCHAR", + // Flat array of array of BIGINT data channel, flat BIGINT partition channel. "ARRAY_ARRAY_BIGINT", + // Flat map<BIGINT, BIGINT> data channel, flat BIGINT partition channel. "MAP_BIGINT_BIGINT", + // Flat map<BIGINT, map<BIGINT, BIGINT>> data channel, flat BIGINT partition channel. "MAP_BIGINT_MAP_BIGINT_BIGINT", + // Flat RowType with two BIGINT fields data channel, flat BIGINT partition channel. "ROW_BIGINT_BIGINT", - "ROW_ARRAY_BIGINT_ARRAY_BIGINT" + // Flat RowType with two array of BIGINT fields data channel, flat BIGINT partition channel. + "ROW_ARRAY_BIGINT_ARRAY_BIGINT", + // Flat RowType with rle BIGINT and flat BIGINT fields data channel, flat BIGINT partition channel. 
+ "ROW_RLE_BIGINT_BIGINT", }) private TestType type = TestType.BIGINT; @@ -191,152 +230,114 @@ public static class BenchmarkData public enum TestType { BIGINT(BigintType.BIGINT, 5000), - BIGINT_SKEWED_HASH(BigintType.BIGINT, 5000) { - @Override - public Page createPage(List types, int positionCount, float nullRate) - { - return page( - positionCount, - types.size(), - () -> createRandomBlockForType(BigintType.BIGINT, positionCount, nullRate), - createRandomLongsBlock(positionCount, 2)); - } - }, - DICTIONARY_BIGINT(BigintType.BIGINT, 3000) { - @Override - public Page createPage(List types, int positionCount, float nullRate) - { - return createRandomDictionaryPage(types, positionCount, nullRate); - } - }, - RLE_BIGINT(BigintType.BIGINT, 3000) { - @Override - public Page createPage(List types, int positionCount, float nullRate) - { - return createRandomRlePage(types, positionCount, nullRate); - } - }, - BIGINT_PARTITION_CHANNEL_20_PERCENT(BigintType.BIGINT, 3000) { - @Override - public Page createPage(List types, int positionCount, float nullRate) - { - return page( - positionCount, - types.size(), - () -> createRandomBlockForType(BigintType.BIGINT, positionCount, nullRate), - createLongsBlock(LongStream.range(0, positionCount) - .mapToObj(value -> value % (positionCount / 5)) - .collect(toImmutableList()))); - } - }, - BIGINT_DICTIONARY_PARTITION_CHANNEL_20_PERCENT(BigintType.BIGINT, 3000) { - @Override - public Page createPage(List types, int positionCount, float nullRate) - { - return page( - positionCount, - types.size(), - () -> createRandomBlockForType(BigintType.BIGINT, positionCount, nullRate), - createLongDictionaryBlock(0, positionCount, positionCount / 5)); - } - }, - BIGINT_DICTIONARY_PARTITION_CHANNEL_50_PERCENT(BigintType.BIGINT, 3000) { - @Override - public Page createPage(List types, int positionCount, float nullRate) - { - return page( - positionCount, - types.size(), - () -> createRandomBlockForType(BigintType.BIGINT, positionCount, nullRate), - 
createLongDictionaryBlock(0, positionCount, positionCount / 2)); - } - }, - BIGINT_DICTIONARY_PARTITION_CHANNEL_80_PERCENT(BigintType.BIGINT, 3000) { - @Override - public Page createPage(List types, int positionCount, float nullRate) - { - return page( - positionCount, - types.size(), - () -> createRandomBlockForType(BigintType.BIGINT, positionCount, nullRate), - createLongDictionaryBlock(0, positionCount, (int) (positionCount * 0.8))); - } - }, - BIGINT_DICTIONARY_PARTITION_CHANNEL_100_PERCENT(BigintType.BIGINT, 3000) { - @Override - public Page createPage(List types, int positionCount, float nullRate) - { - return page( - positionCount, - types.size(), - () -> createRandomBlockForType(BigintType.BIGINT, positionCount, nullRate), - createLongDictionaryBlock(0, positionCount, positionCount)); - } - }, - BIGINT_DICTIONARY_PARTITION_CHANNEL_100_PERCENT_MINUS_1(BigintType.BIGINT, 3000) { - @Override - public Page createPage(List types, int positionCount, float nullRate) - { - return page( - positionCount, - types.size(), - () -> createRandomBlockForType(BigintType.BIGINT, positionCount, nullRate), - createLongDictionaryBlock(0, positionCount, positionCount - 1)); - } - }, - RLE_PARTITION_BIGINT(BigintType.BIGINT, 5000) { - @Override - public Page createPage(List types, int positionCount, float nullRate) - { - return page( - positionCount, - types.size(), - () -> createRandomBlockForType(BigintType.BIGINT, positionCount, nullRate), - createRLEBlock(42, positionCount)); - } - }, - RLE_PARTITION_NULL_BIGINT(BigintType.BIGINT, 20) { - @Override - public Page createPage(List types, int positionCount, float nullRate) - { - return page( - positionCount, - types.size(), - () -> createRandomBlockForType(BigintType.BIGINT, positionCount, nullRate), - new RunLengthEncodedBlock(createLongsBlock((Long) null), positionCount)); - } - - @Override - public OptionalInt getNullChannel() - { - return OptionalInt.of(1); - } - }, + BIGINT_PARTITION_CHANNEL_SKEWED(BigintType.BIGINT, 5000, 
(types, positionCount, nullRate) -> { + return page( + positionCount, + types.size(), + () -> createRandomBlockForType(BigintType.BIGINT, positionCount, nullRate), + createRandomLongsBlock(positionCount, 2)); + }), + DICTIONARY_BIGINT(BigintType.BIGINT, 5000, PageTestUtils::createRandomDictionaryPage), + RLE_BIGINT(BigintType.BIGINT, 3000, PageTestUtils::createRandomRlePage), + BIGINT_PARTITION_CHANNEL_20_PERCENT(BigintType.BIGINT, 3000, (types, positionCount, nullRate) -> { + return page( + positionCount, + types.size(), + () -> createRandomBlockForType(BigintType.BIGINT, positionCount, nullRate), + createLongsBlock(LongStream.range(0, positionCount) + .mapToObj(value -> value % (positionCount / 5)) + .collect(toImmutableList()))); + }), + BIGINT_PARTITION_CHANNEL_DICTIONARY_20_PERCENT(BigintType.BIGINT, 3000, (types, positionCount, nullRate) -> + createDictionaryPartitionChannelPage(types, positionCount, nullRate, positionCount / 5)), + BIGINT_PARTITION_CHANNEL_DICTIONARY_50_PERCENT(BigintType.BIGINT, 3000, (types, positionCount, nullRate) -> + createDictionaryPartitionChannelPage(types, positionCount, nullRate, positionCount / 2)), + BIGINT_PARTITION_CHANNEL_DICTIONARY_80_PERCENT(BigintType.BIGINT, 3000, (types, positionCount, nullRate) -> + createDictionaryPartitionChannelPage(types, positionCount, nullRate, (int) (positionCount * 0.8))), + BIGINT_PARTITION_CHANNEL_DICTIONARY_100_PERCENT(BigintType.BIGINT, 3000, (types, positionCount, nullRate) -> + createDictionaryPartitionChannelPage(types, positionCount, nullRate, positionCount)), + BIGINT_PARTITION_CHANNEL_DICTIONARY_100_PERCENT_MINUS_1(BigintType.BIGINT, 3000, (types, positionCount, nullRate) -> + createDictionaryPartitionChannelPage(types, positionCount, nullRate, positionCount - 1)), + BIGINT_PARTITION_CHANNEL_RLE(BigintType.BIGINT, 5000, (types, positionCount, nullRate) -> { + return page( + positionCount, + types.size(), + () -> createRandomBlockForType(BigintType.BIGINT, positionCount, nullRate), + 
createRLEBlock(42, positionCount)); + }), + BIGINT_PARTITION_CHANNEL_RLE_NULL(BigintType.BIGINT, 20, (types, positionCount, nullRate) -> { + return page( + positionCount, + types.size(), + () -> createRandomBlockForType(BigintType.BIGINT, positionCount, nullRate), + new RunLengthEncodedBlock(createLongsBlock((Long) null), positionCount)); + }), LONG_DECIMAL(createDecimalType(MAX_SHORT_PRECISION + 1), 5000), + DICTIONARY_LONG_DECIMAL(createDecimalType(MAX_SHORT_PRECISION + 1), 5000, PageTestUtils::createRandomDictionaryPage), INTEGER(IntegerType.INTEGER, 5000), + DICTIONARY_INTEGER(IntegerType.INTEGER, 5000, PageTestUtils::createRandomDictionaryPage), SMALLINT(SmallintType.SMALLINT, 5000), + DICTIONARY_SMALLINT(SmallintType.SMALLINT, 5000, PageTestUtils::createRandomDictionaryPage), BOOLEAN(BooleanType.BOOLEAN, 5000), + DICTIONARY_BOOLEAN(BooleanType.BOOLEAN, 5000, PageTestUtils::createRandomDictionaryPage), VARCHAR(VarcharType.VARCHAR, 5000), + DICTIONARY_VARCHAR(VarcharType.VARCHAR, 5000, PageTestUtils::createRandomDictionaryPage), ARRAY_BIGINT(new ArrayType(BigintType.BIGINT), 1000), ARRAY_VARCHAR(new ArrayType(VarcharType.VARCHAR), 1000), ARRAY_ARRAY_BIGINT(new ArrayType(new ArrayType(BigintType.BIGINT)), 1000), MAP_BIGINT_BIGINT(createMapType(BigintType.BIGINT, BigintType.BIGINT), 1000), MAP_BIGINT_MAP_BIGINT_BIGINT(createMapType(BigintType.BIGINT, createMapType(BigintType.BIGINT, BigintType.BIGINT)), 1000), ROW_BIGINT_BIGINT(rowTypeWithDefaultFieldNames(ImmutableList.of(BigintType.BIGINT, BigintType.BIGINT)), 1000), - ROW_ARRAY_BIGINT_ARRAY_BIGINT(rowTypeWithDefaultFieldNames(ImmutableList.of(new ArrayType(BigintType.BIGINT), new ArrayType(BigintType.BIGINT))), 1000); + ROW_ARRAY_BIGINT_ARRAY_BIGINT(rowTypeWithDefaultFieldNames(ImmutableList.of(new ArrayType(BigintType.BIGINT), new ArrayType(BigintType.BIGINT))), 1000), + ROW_RLE_BIGINT_BIGINT(rowTypeWithDefaultFieldNames(ImmutableList.of(BigintType.BIGINT, BigintType.BIGINT)), 1000, (types, positionCount, 
nullRate) -> { + return PageTestUtils.createPage( + types, + positionCount, + Optional.of(ImmutableList.of(0)), + types.stream() + .map(type -> { + boolean[] isNull = null; + int nullPositionCount = 0; + if (nullRate > 0) { + isNull = new boolean[positionCount]; + Set nullPositions = chooseNullPositions(positionCount, nullRate); + for (int nullPosition : nullPositions) { + isNull[nullPosition] = true; + } + nullPositionCount = nullPositions.size(); + } + + int notNullPositionsCount = positionCount - nullPositionCount; + return RowBlock.fromFieldBlocks( + positionCount, + Optional.ofNullable(isNull), + new Block[] { + new RunLengthEncodedBlock(createLongsBlock(-65128734213L), notNullPositionsCount), + createRandomLongsBlock(notNullPositionsCount, nullRate)}); + }) + .collect(toImmutableList())); + }); private final Type type; private final int pageCount; + private final PageGenerator pageGenerator; + TestType(Type type, int pageCount) + { + this(type, pageCount, PageTestUtils::createRandomPage); + } + + TestType(Type type, int pageCount, PageGenerator pageGenerator) { this.type = requireNonNull(type, "type is null"); this.pageCount = pageCount; + this.pageGenerator = requireNonNull(pageGenerator, "pageGenerator is null"); } - public Page createPage(List types, int positionCount, float nullRate) + public PageGenerator getPageGenerator() { - return createRandomPage(types, positionCount, nullRate); + return pageGenerator; } public int getPageCount() @@ -353,6 +354,20 @@ public List getTypes(int channelCount) { return nCopies(channelCount, type); } + + interface PageGenerator + { + Page createPage(List types, int positionCount, float nullRate); + } + + private static Page createDictionaryPartitionChannelPage(List types, int positionCount, float nullRate, int dictionarySize) + { + return page( + positionCount, + types.size(), + () -> createRandomBlockForType(types.get(0), positionCount, nullRate), + createLongDictionaryBlock(0, positionCount, dictionarySize)); + } } 
public int getPageCount() @@ -365,6 +380,16 @@ public void setPageCount(int pageCount) this.pageCount = pageCount; } + public void setPartitionCount(int partitionCount) + { + this.partitionCount = partitionCount; + } + + public void setPositionCount(int positionCount) + { + this.positionCount = positionCount; + } + public void setType(TestType type) { this.type = requireNonNull(type, "type is null"); @@ -388,7 +413,7 @@ private void setupData(Blackhole blackhole) // and in case of unit test it will be null this.blackhole = blackhole; types = type.getTypes(channelCount); - dataPage = type.createPage(types, positionCount, nullRate); + dataPage = type.getPageGenerator().createPage(types, positionCount, nullRate); pageCount = type.getPageCount(); nullChannel = type.getNullChannel(); types = ImmutableList.builder() @@ -535,6 +560,14 @@ private static void pollute() data.setupData(null); data.setPageCount(1); benchmark.addPage(data); + // pollute row-wise processing + data = new BenchmarkData(); + data.setType(type); + data.setPartitionCount(256); + data.setPositionCount(256); + data.setupData(null); + data.setPageCount(50); + benchmark.addPage(data); }); } catch (Throwable throwable) {