diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/ColumnarArray.java b/core/trino-spi/src/main/java/io/trino/spi/block/ColumnarArray.java index 399903b47ec4..6e61decf0689 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/ColumnarArray.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/ColumnarArray.java @@ -117,7 +117,7 @@ private static ColumnarArray toColumnarArray(RunLengthEncodedBlock rleBlock) private ColumnarArray(Block nullCheckBlock, int offsetsOffset, int[] offsets, Block elementsBlock) { - this.nullCheckBlock = nullCheckBlock; + this.nullCheckBlock = requireNonNull(nullCheckBlock, "nullCheckBlock is null"); this.offsetsOffset = offsetsOffset; this.offsets = offsets; this.elementsBlock = elementsBlock; @@ -128,6 +128,11 @@ public int getPositionCount() return nullCheckBlock.getPositionCount(); } + public boolean mayHaveNull() + { + return nullCheckBlock.mayHaveNull(); + } + public boolean isNull(int position) { return nullCheckBlock.isNull(position); diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/ColumnarMap.java b/core/trino-spi/src/main/java/io/trino/spi/block/ColumnarMap.java index 9ca7878a860d..ee8bbfc584cc 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/ColumnarMap.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/ColumnarMap.java @@ -119,7 +119,7 @@ private static ColumnarMap toColumnarMap(RunLengthEncodedBlock rleBlock) private ColumnarMap(Block nullCheckBlock, int offsetsOffset, int[] offsets, Block keysBlock, Block valuesBlock) { - this.nullCheckBlock = nullCheckBlock; + this.nullCheckBlock = requireNonNull(nullCheckBlock, "nullCheckBlock is null"); this.offsetsOffset = offsetsOffset; this.offsets = offsets; this.keysBlock = keysBlock; @@ -131,6 +131,11 @@ public int getPositionCount() return nullCheckBlock.getPositionCount(); } + public boolean mayHaveNull() + { + return nullCheckBlock.mayHaveNull(); + } + public boolean isNull(int position) { return 
nullCheckBlock.isNull(position); diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ArrayColumnWriter.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ArrayColumnWriter.java index a46263d2f575..2634741980f8 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ArrayColumnWriter.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ArrayColumnWriter.java @@ -14,8 +14,8 @@ package io.trino.parquet.writer; import com.google.common.collect.ImmutableList; -import io.trino.parquet.writer.repdef.DefLevelIterable; -import io.trino.parquet.writer.repdef.DefLevelIterables; +import io.trino.parquet.writer.repdef.DefLevelWriterProvider; +import io.trino.parquet.writer.repdef.DefLevelWriterProviders; import io.trino.parquet.writer.repdef.RepLevelIterable; import io.trino.parquet.writer.repdef.RepLevelIterables; import io.trino.spi.block.ColumnarArray; @@ -49,9 +49,9 @@ public void writeBlock(ColumnChunk columnChunk) ColumnarArray columnarArray = ColumnarArray.toColumnarArray(columnChunk.getBlock()); elementWriter.writeBlock( new ColumnChunk(columnarArray.getElementsBlock(), - ImmutableList.builder() - .addAll(columnChunk.getDefLevelIterables()) - .add(DefLevelIterables.of(columnarArray, maxDefinitionLevel)) + ImmutableList.builder() + .addAll(columnChunk.getDefLevelWriterProviders()) + .add(DefLevelWriterProviders.of(columnarArray, maxDefinitionLevel)) .build(), ImmutableList.builder() .addAll(columnChunk.getRepLevelIterables()) diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ColumnChunk.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ColumnChunk.java index 463e68028864..186e5e77cba7 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ColumnChunk.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ColumnChunk.java @@ -14,7 +14,7 @@ package io.trino.parquet.writer; import com.google.common.collect.ImmutableList; -import 
io.trino.parquet.writer.repdef.DefLevelIterable; +import io.trino.parquet.writer.repdef.DefLevelWriterProvider; import io.trino.parquet.writer.repdef.RepLevelIterable; import io.trino.spi.block.Block; @@ -25,7 +25,7 @@ public class ColumnChunk { private final Block block; - private final List defLevelIterables; + private final List defLevelWriterProviders; private final List repLevelIterables; ColumnChunk(Block block) @@ -33,16 +33,16 @@ public class ColumnChunk this(block, ImmutableList.of(), ImmutableList.of()); } - ColumnChunk(Block block, List defLevelIterables, List repLevelIterables) + ColumnChunk(Block block, List defLevelWriterProviders, List repLevelIterables) { this.block = requireNonNull(block, "block is null"); - this.defLevelIterables = ImmutableList.copyOf(defLevelIterables); + this.defLevelWriterProviders = ImmutableList.copyOf(defLevelWriterProviders); this.repLevelIterables = ImmutableList.copyOf(repLevelIterables); } - List getDefLevelIterables() + List getDefLevelWriterProviders() { - return defLevelIterables; + return defLevelWriterProviders; } List getRepLevelIterables() diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/MapColumnWriter.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/MapColumnWriter.java index 8102259d9218..fa8176f57717 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/MapColumnWriter.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/MapColumnWriter.java @@ -14,8 +14,8 @@ package io.trino.parquet.writer; import com.google.common.collect.ImmutableList; -import io.trino.parquet.writer.repdef.DefLevelIterable; -import io.trino.parquet.writer.repdef.DefLevelIterables; +import io.trino.parquet.writer.repdef.DefLevelWriterProvider; +import io.trino.parquet.writer.repdef.DefLevelWriterProviders; import io.trino.parquet.writer.repdef.RepLevelIterable; import io.trino.parquet.writer.repdef.RepLevelIterables; import io.trino.spi.block.ColumnarMap; @@ -50,16 +50,16 @@ 
public void writeBlock(ColumnChunk columnChunk) { ColumnarMap columnarMap = ColumnarMap.toColumnarMap(columnChunk.getBlock()); - ImmutableList defLevelIterables = ImmutableList.builder() - .addAll(columnChunk.getDefLevelIterables()) - .add(DefLevelIterables.of(columnarMap, maxDefinitionLevel)).build(); + ImmutableList defLevelWriterProviders = ImmutableList.builder() + .addAll(columnChunk.getDefLevelWriterProviders()) + .add(DefLevelWriterProviders.of(columnarMap, maxDefinitionLevel)).build(); ImmutableList repLevelIterables = ImmutableList.builder() .addAll(columnChunk.getRepLevelIterables()) .add(RepLevelIterables.of(columnarMap, maxRepetitionLevel)).build(); - keyWriter.writeBlock(new ColumnChunk(columnarMap.getKeysBlock(), defLevelIterables, repLevelIterables)); - valueWriter.writeBlock(new ColumnChunk(columnarMap.getValuesBlock(), defLevelIterables, repLevelIterables)); + keyWriter.writeBlock(new ColumnChunk(columnarMap.getKeysBlock(), defLevelWriterProviders, repLevelIterables)); + valueWriter.writeBlock(new ColumnChunk(columnarMap.getValuesBlock(), defLevelWriterProviders, repLevelIterables)); } @Override diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/PrimitiveColumnWriter.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/PrimitiveColumnWriter.java index b4581fb19e21..c895cb92b73f 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/PrimitiveColumnWriter.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/PrimitiveColumnWriter.java @@ -15,12 +15,11 @@ import com.google.common.collect.ImmutableList; import io.airlift.slice.Slices; -import io.trino.parquet.writer.repdef.DefLevelIterable; -import io.trino.parquet.writer.repdef.DefLevelIterables; +import io.trino.parquet.writer.repdef.DefLevelWriterProvider; +import io.trino.parquet.writer.repdef.DefLevelWriterProviders; import io.trino.parquet.writer.repdef.RepLevelIterable; import io.trino.parquet.writer.repdef.RepLevelIterables; import 
io.trino.parquet.writer.valuewriter.PrimitiveValueWriter; -import io.trino.spi.block.Block; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Encoding; @@ -50,6 +49,8 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.parquet.writer.ParquetCompressor.getCompressor; import static io.trino.parquet.writer.ParquetDataOutput.createDataOutput; +import static io.trino.parquet.writer.repdef.DefLevelWriterProvider.DefinitionLevelWriter; +import static io.trino.parquet.writer.repdef.DefLevelWriterProvider.getRootDefinitionLevelWriter; import static java.lang.Math.toIntExact; import static java.util.Objects.requireNonNull; @@ -116,38 +117,15 @@ public void writeBlock(ColumnChunk columnChunk) // write values primitiveValueWriter.write(columnChunk.getBlock()); - if (columnChunk.getDefLevelIterables().isEmpty()) { - // write definition levels for flat data types - Block block = columnChunk.getBlock(); - if (!block.mayHaveNull()) { - for (int position = 0; position < block.getPositionCount(); position++) { - definitionLevelWriter.writeInteger(maxDefinitionLevel); - } - } - else { - for (int position = 0; position < block.getPositionCount(); position++) { - byte isNull = (byte) (block.isNull(position) ? 
1 : 0); - definitionLevelWriter.writeInteger(maxDefinitionLevel - isNull); - currentPageNullCounts += isNull; - } - } - valueCount += block.getPositionCount(); - } - else { - // write definition levels for nested data types - Iterator defIterator = DefLevelIterables.getIterator(ImmutableList.builder() - .addAll(columnChunk.getDefLevelIterables()) - .add(DefLevelIterables.of(columnChunk.getBlock(), maxDefinitionLevel)) - .build()); - while (defIterator.hasNext()) { - int next = defIterator.next(); - definitionLevelWriter.writeInteger(next); - if (next != maxDefinitionLevel) { - currentPageNullCounts++; - } - valueCount++; - } - } + List defLevelWriterProviders = ImmutableList.builder() + .addAll(columnChunk.getDefLevelWriterProviders()) + .add(DefLevelWriterProviders.of(columnChunk.getBlock(), maxDefinitionLevel)) + .build(); + DefinitionLevelWriter rootDefinitionLevelWriter = getRootDefinitionLevelWriter(defLevelWriterProviders, definitionLevelWriter); + + DefLevelWriterProvider.ValuesCount valuesCount = rootDefinitionLevelWriter.writeDefinitionLevels(); + currentPageNullCounts += valuesCount.totalValuesCount() - valuesCount.maxDefinitionLevelValuesCount(); + valueCount += valuesCount.totalValuesCount(); if (columnDescriptor.getMaxRepetitionLevel() > 0) { // write repetition levels for nested types diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/StructColumnWriter.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/StructColumnWriter.java index ffcc08a2f921..524e282063f3 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/StructColumnWriter.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/StructColumnWriter.java @@ -14,8 +14,8 @@ package io.trino.parquet.writer; import com.google.common.collect.ImmutableList; -import io.trino.parquet.writer.repdef.DefLevelIterable; -import io.trino.parquet.writer.repdef.DefLevelIterables; +import io.trino.parquet.writer.repdef.DefLevelWriterProvider; +import 
io.trino.parquet.writer.repdef.DefLevelWriterProviders; import io.trino.parquet.writer.repdef.RepLevelIterable; import io.trino.parquet.writer.repdef.RepLevelIterables; import io.trino.spi.block.Block; @@ -50,9 +50,9 @@ public void writeBlock(ColumnChunk columnChunk) ColumnarRow columnarRow = toColumnarRow(columnChunk.getBlock()); checkArgument(columnarRow.getFieldCount() == columnWriters.size(), "ColumnarRow field size %s is not equal to columnWriters size %s", columnarRow.getFieldCount(), columnWriters.size()); - List defLevelIterables = ImmutableList.builder() - .addAll(columnChunk.getDefLevelIterables()) - .add(DefLevelIterables.of(columnarRow, maxDefinitionLevel)) + List defLevelWriterProviders = ImmutableList.builder() + .addAll(columnChunk.getDefLevelWriterProviders()) + .add(DefLevelWriterProviders.of(columnarRow, maxDefinitionLevel)) .build(); List repLevelIterables = ImmutableList.builder() .addAll(columnChunk.getRepLevelIterables()) @@ -62,7 +62,7 @@ public void writeBlock(ColumnChunk columnChunk) for (int i = 0; i < columnWriters.size(); ++i) { ColumnWriter columnWriter = columnWriters.get(i); Block block = columnarRow.getField(i); - columnWriter.writeBlock(new ColumnChunk(block, defLevelIterables, repLevelIterables)); + columnWriter.writeBlock(new ColumnChunk(block, defLevelWriterProviders, repLevelIterables)); } } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/repdef/DefLevelIterable.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/repdef/DefLevelIterable.java deleted file mode 100644 index 7dcc7cbca5bd..000000000000 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/repdef/DefLevelIterable.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.parquet.writer.repdef; - -import com.google.common.collect.AbstractIterator; - -import java.util.OptionalInt; - -public interface DefLevelIterable -{ - DefLevelIterator getIterator(); - - abstract class DefLevelIterator - extends AbstractIterator - { - abstract boolean end(); - } -} diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/repdef/DefLevelIterables.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/repdef/DefLevelIterables.java deleted file mode 100644 index 3710a43c9307..000000000000 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/repdef/DefLevelIterables.java +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.trino.parquet.writer.repdef; - -import com.google.common.collect.AbstractIterator; -import io.trino.parquet.writer.repdef.DefLevelIterable.DefLevelIterator; -import io.trino.spi.block.Block; -import io.trino.spi.block.ColumnarArray; -import io.trino.spi.block.ColumnarMap; -import io.trino.spi.block.ColumnarRow; - -import java.util.Iterator; -import java.util.List; -import java.util.OptionalInt; - -import static com.google.common.base.Preconditions.checkState; -import static com.google.common.collect.ImmutableList.toImmutableList; -import static java.util.Collections.nCopies; -import static java.util.Objects.requireNonNull; - -public class DefLevelIterables -{ - private DefLevelIterables() {} - - public static DefLevelIterable of(Block block, int maxDefinitionLevel) - { - return new PrimitiveDefLevelIterable(block, maxDefinitionLevel); - } - - public static DefLevelIterable of(ColumnarRow columnarRow, int maxDefinitionLevel) - { - return new ColumnRowDefLevelIterable(columnarRow, maxDefinitionLevel); - } - - public static DefLevelIterable of(ColumnarArray columnarArray, int maxDefinitionLevel) - { - return new ColumnArrayDefLevelIterable(columnarArray, maxDefinitionLevel); - } - - public static DefLevelIterable of(ColumnarMap columnarMap, int maxDefinitionLevel) - { - return new ColumnMapDefLevelIterable(columnarMap, maxDefinitionLevel); - } - - public static Iterator getIterator(List iterables) - { - return new NestedDefLevelIterator(iterables); - } - - static class PrimitiveDefLevelIterable - implements DefLevelIterable - { - private final Block block; - private final int maxDefinitionLevel; - - PrimitiveDefLevelIterable(Block block, int maxDefinitionLevel) - { - this.block = requireNonNull(block, "block is null"); - this.maxDefinitionLevel = maxDefinitionLevel; - } - - @Override - public DefLevelIterator getIterator() - { - return new DefLevelIterator() - { - private int position = -1; - - @Override - boolean end() - { - return true; - } - - 
@Override - protected OptionalInt computeNext() - { - position++; - if (position == block.getPositionCount()) { - return endOfData(); - } - if (block.isNull(position)) { - return OptionalInt.of(maxDefinitionLevel - 1); - } - return OptionalInt.of(maxDefinitionLevel); - } - }; - } - } - - static class ColumnRowDefLevelIterable - implements DefLevelIterable - { - private final ColumnarRow columnarRow; - private final int maxDefinitionLevel; - - ColumnRowDefLevelIterable(ColumnarRow columnarRow, int maxDefinitionLevel) - { - this.columnarRow = requireNonNull(columnarRow, "columnarRow is null"); - this.maxDefinitionLevel = maxDefinitionLevel; - } - - @Override - public DefLevelIterator getIterator() - { - return new DefLevelIterator() - { - private int position = -1; - - @Override - boolean end() - { - return true; - } - - @Override - protected OptionalInt computeNext() - { - position++; - if (position == columnarRow.getPositionCount()) { - return endOfData(); - } - if (columnarRow.isNull(position)) { - return OptionalInt.of(maxDefinitionLevel - 1); - } - return OptionalInt.empty(); - } - }; - } - } - - static class ColumnMapDefLevelIterable - implements DefLevelIterable - { - private final ColumnarMap columnarMap; - private final int maxDefinitionLevel; - - ColumnMapDefLevelIterable(ColumnarMap columnarMap, int maxDefinitionLevel) - { - this.columnarMap = requireNonNull(columnarMap, "columnarMap is null"); - this.maxDefinitionLevel = maxDefinitionLevel; - } - - @Override - public DefLevelIterator getIterator() - { - return new DefLevelIterator() - { - private int position = -1; - private Iterator iterator; - - @Override - boolean end() - { - return iterator == null || !iterator.hasNext(); - } - - @Override - protected OptionalInt computeNext() - { - if (iterator != null && iterator.hasNext()) { - return iterator.next(); - } - position++; - if (position == columnarMap.getPositionCount()) { - return endOfData(); - } - if (columnarMap.isNull(position)) { - return 
OptionalInt.of(maxDefinitionLevel - 2); - } - int arrayLength = columnarMap.getEntryCount(position); - if (arrayLength == 0) { - return OptionalInt.of(maxDefinitionLevel - 1); - } - iterator = nCopies(arrayLength, OptionalInt.empty()).iterator(); - return iterator.next(); - } - }; - } - } - - static class ColumnArrayDefLevelIterable - implements DefLevelIterable - { - private final ColumnarArray columnarArray; - private final int maxDefinitionLevel; - - ColumnArrayDefLevelIterable(ColumnarArray columnarArray, int maxDefinitionLevel) - { - this.columnarArray = requireNonNull(columnarArray, "columnarArray is null"); - this.maxDefinitionLevel = maxDefinitionLevel; - } - - @Override - public DefLevelIterator getIterator() - { - return new DefLevelIterator() - { - private int position = -1; - private Iterator iterator; - - @Override - boolean end() - { - return iterator == null || !iterator.hasNext(); - } - - @Override - protected OptionalInt computeNext() - { - if (iterator != null && iterator.hasNext()) { - return iterator.next(); - } - position++; - if (position == columnarArray.getPositionCount()) { - return endOfData(); - } - if (columnarArray.isNull(position)) { - return OptionalInt.of(maxDefinitionLevel - 2); - } - int arrayLength = columnarArray.getLength(position); - if (arrayLength == 0) { - return OptionalInt.of(maxDefinitionLevel - 1); - } - iterator = nCopies(arrayLength, OptionalInt.empty()).iterator(); - return iterator.next(); - } - }; - } - } - - static class NestedDefLevelIterator - extends AbstractIterator - { - private final List iterators; - private int iteratorIndex; - - NestedDefLevelIterator(List iterables) - { - this.iterators = iterables.stream().map(DefLevelIterable::getIterator).collect(toImmutableList()); - } - - @Override - protected Integer computeNext() - { - DefLevelIterator current = iterators.get(iteratorIndex); - while (iteratorIndex > 0 && current.end()) { - iteratorIndex--; - current = iterators.get(iteratorIndex); - } - - while 
(current.hasNext()) { - OptionalInt next = current.next(); - if (next.isPresent()) { - return next.getAsInt(); - } - iteratorIndex++; - current = iterators.get(iteratorIndex); - } - checkState(iterators.stream().noneMatch(AbstractIterator::hasNext)); - return endOfData(); - } - } -} diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/repdef/DefLevelWriterProvider.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/repdef/DefLevelWriterProvider.java new file mode 100644 index 000000000000..8b3bff8e8c97 --- /dev/null +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/repdef/DefLevelWriterProvider.java @@ -0,0 +1,49 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.parquet.writer.repdef; + +import com.google.common.collect.Iterables; +import org.apache.parquet.column.values.ValuesWriter; + +import java.util.List; +import java.util.Optional; + +public interface DefLevelWriterProvider +{ + DefinitionLevelWriter getDefinitionLevelWriter(Optional nestedWriter, ValuesWriter encoder); + + interface DefinitionLevelWriter + { + ValuesCount writeDefinitionLevels(int positionsCount); + + ValuesCount writeDefinitionLevels(); + } + + record ValuesCount(int totalValuesCount, int maxDefinitionLevelValuesCount) + { + } + + static DefinitionLevelWriter getRootDefinitionLevelWriter(List defLevelWriterProviders, ValuesWriter encoder) + { + // Constructs hierarchy of DefinitionLevelWriter from leaf to root + DefinitionLevelWriter rootDefinitionLevelWriter = Iterables.getLast(defLevelWriterProviders) + .getDefinitionLevelWriter(Optional.empty(), encoder); + for (int nestedLevel = defLevelWriterProviders.size() - 2; nestedLevel >= 0; nestedLevel--) { + DefinitionLevelWriter nestedWriter = rootDefinitionLevelWriter; + rootDefinitionLevelWriter = defLevelWriterProviders.get(nestedLevel) + .getDefinitionLevelWriter(Optional.of(nestedWriter), encoder); + } + return rootDefinitionLevelWriter; + } +} diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/repdef/DefLevelWriterProviders.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/repdef/DefLevelWriterProviders.java new file mode 100644 index 000000000000..7de391070c78 --- /dev/null +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/repdef/DefLevelWriterProviders.java @@ -0,0 +1,342 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.parquet.writer.repdef; + +import io.trino.spi.block.Block; +import io.trino.spi.block.ColumnarArray; +import io.trino.spi.block.ColumnarMap; +import io.trino.spi.block.ColumnarRow; +import org.apache.parquet.column.values.ValuesWriter; + +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +public class DefLevelWriterProviders +{ + private DefLevelWriterProviders() {} + + public static DefLevelWriterProvider of(Block block, int maxDefinitionLevel) + { + return new PrimitiveDefLevelWriterProvider(block, maxDefinitionLevel); + } + + public static DefLevelWriterProvider of(ColumnarRow columnarRow, int maxDefinitionLevel) + { + return new ColumnRowDefLevelWriterProvider(columnarRow, maxDefinitionLevel); + } + + public static DefLevelWriterProvider of(ColumnarArray columnarArray, int maxDefinitionLevel) + { + return new ColumnArrayDefLevelWriterProvider(columnarArray, maxDefinitionLevel); + } + + public static DefLevelWriterProvider of(ColumnarMap columnarMap, int maxDefinitionLevel) + { + return new ColumnMapDefLevelWriterProvider(columnarMap, maxDefinitionLevel); + } + + static class PrimitiveDefLevelWriterProvider + implements DefLevelWriterProvider + { + private final Block block; + private final int maxDefinitionLevel; + + PrimitiveDefLevelWriterProvider(Block block, int maxDefinitionLevel) + { + this.block = requireNonNull(block, "block is null"); + this.maxDefinitionLevel = 
maxDefinitionLevel; + } + + @Override + public DefinitionLevelWriter getDefinitionLevelWriter(Optional nestedWriter, ValuesWriter encoder) + { + checkArgument(nestedWriter.isEmpty(), "nestedWriter should be empty for primitive definition level writer"); + return new DefinitionLevelWriter() + { + private int offset; + + @Override + public ValuesCount writeDefinitionLevels() + { + return writeDefinitionLevels(block.getPositionCount()); + } + + @Override + public ValuesCount writeDefinitionLevels(int positionsCount) + { + checkValidPosition(offset, positionsCount, block.getPositionCount()); + int nonNullsCount = 0; + if (!block.mayHaveNull()) { + for (int position = offset; position < offset + positionsCount; position++) { + encoder.writeInteger(maxDefinitionLevel); + } + nonNullsCount = positionsCount; + } + else { + for (int position = offset; position < offset + positionsCount; position++) { + int isNull = block.isNull(position) ? 1 : 0; + encoder.writeInteger(maxDefinitionLevel - isNull); + nonNullsCount += isNull ^ 1; + } + } + offset += positionsCount; + return new ValuesCount(positionsCount, nonNullsCount); + } + }; + } + } + + static class ColumnRowDefLevelWriterProvider + implements DefLevelWriterProvider + { + private final ColumnarRow columnarRow; + private final int maxDefinitionLevel; + + ColumnRowDefLevelWriterProvider(ColumnarRow columnarRow, int maxDefinitionLevel) + { + this.columnarRow = requireNonNull(columnarRow, "columnarRow is null"); + this.maxDefinitionLevel = maxDefinitionLevel; + } + + @Override + public DefinitionLevelWriter getDefinitionLevelWriter(Optional nestedWriterOptional, ValuesWriter encoder) + { + checkArgument(nestedWriterOptional.isPresent(), "nestedWriter should be present for column row definition level writer"); + return new DefinitionLevelWriter() + { + private final DefinitionLevelWriter nestedWriter = nestedWriterOptional.orElseThrow(); + + private int offset; + + @Override + public ValuesCount writeDefinitionLevels() + { + 
return writeDefinitionLevels(columnarRow.getPositionCount()); + } + + @Override + public ValuesCount writeDefinitionLevels(int positionsCount) + { + checkValidPosition(offset, positionsCount, columnarRow.getPositionCount()); + if (!columnarRow.mayHaveNull()) { + offset += positionsCount; + return nestedWriter.writeDefinitionLevels(positionsCount); + } + int maxDefinitionValuesCount = 0; + int totalValuesCount = 0; + for (int position = offset; position < offset + positionsCount; ) { + if (columnarRow.isNull(position)) { + encoder.writeInteger(maxDefinitionLevel - 1); + totalValuesCount++; + position++; + } + else { + int consecutiveNonNullsCount = 1; + position++; + while (position < offset + positionsCount && !columnarRow.isNull(position)) { + position++; + consecutiveNonNullsCount++; + } + ValuesCount valuesCount = nestedWriter.writeDefinitionLevels(consecutiveNonNullsCount); + maxDefinitionValuesCount += valuesCount.maxDefinitionLevelValuesCount(); + totalValuesCount += valuesCount.totalValuesCount(); + } + } + offset += positionsCount; + return new ValuesCount(totalValuesCount, maxDefinitionValuesCount); + } + }; + } + } + + static class ColumnMapDefLevelWriterProvider + implements DefLevelWriterProvider + { + private final ColumnarMap columnarMap; + private final int maxDefinitionLevel; + + ColumnMapDefLevelWriterProvider(ColumnarMap columnarMap, int maxDefinitionLevel) + { + this.columnarMap = requireNonNull(columnarMap, "columnarMap is null"); + this.maxDefinitionLevel = maxDefinitionLevel; + } + + @Override + public DefinitionLevelWriter getDefinitionLevelWriter(Optional nestedWriterOptional, ValuesWriter encoder) + { + checkArgument(nestedWriterOptional.isPresent(), "nestedWriter should be present for column map definition level writer"); + return new DefinitionLevelWriter() + { + private final DefinitionLevelWriter nestedWriter = nestedWriterOptional.orElseThrow(); + + private int offset; + + @Override + public ValuesCount writeDefinitionLevels() + { + 
return writeDefinitionLevels(columnarMap.getPositionCount()); + } + + @Override + public ValuesCount writeDefinitionLevels(int positionsCount) + { + checkValidPosition(offset, positionsCount, columnarMap.getPositionCount()); + int maxDefinitionValuesCount = 0; + int totalValuesCount = 0; + if (!columnarMap.mayHaveNull()) { + for (int position = offset; position < offset + positionsCount; ) { + int mapLength = columnarMap.getEntryCount(position); + if (mapLength == 0) { + encoder.writeInteger(maxDefinitionLevel - 1); + totalValuesCount++; + position++; + } + else { + int consecutiveNonEmptyArrayLength = mapLength; + position++; + while (position < offset + positionsCount) { + mapLength = columnarMap.getEntryCount(position); + if (mapLength == 0) { + break; + } + position++; + consecutiveNonEmptyArrayLength += mapLength; + } + ValuesCount valuesCount = nestedWriter.writeDefinitionLevels(consecutiveNonEmptyArrayLength); + maxDefinitionValuesCount += valuesCount.maxDefinitionLevelValuesCount(); + totalValuesCount += valuesCount.totalValuesCount(); + } + } + } + else { + for (int position = offset; position < offset + positionsCount; position++) { + if (columnarMap.isNull(position)) { + encoder.writeInteger(maxDefinitionLevel - 2); + totalValuesCount++; + continue; + } + int mapLength = columnarMap.getEntryCount(position); + if (mapLength == 0) { + encoder.writeInteger(maxDefinitionLevel - 1); + totalValuesCount++; + } + else { + ValuesCount valuesCount = nestedWriter.writeDefinitionLevels(mapLength); + maxDefinitionValuesCount += valuesCount.maxDefinitionLevelValuesCount(); + totalValuesCount += valuesCount.totalValuesCount(); + } + } + } + offset += positionsCount; + return new ValuesCount(totalValuesCount, maxDefinitionValuesCount); + } + }; + } + } + + static class ColumnArrayDefLevelWriterProvider + implements DefLevelWriterProvider + { + private final ColumnarArray columnarArray; + private final int maxDefinitionLevel; + + 
ColumnArrayDefLevelWriterProvider(ColumnarArray columnarArray, int maxDefinitionLevel) + { + this.columnarArray = requireNonNull(columnarArray, "columnarArray is null"); + this.maxDefinitionLevel = maxDefinitionLevel; + } + + @Override + public DefinitionLevelWriter getDefinitionLevelWriter(Optional nestedWriterOptional, ValuesWriter encoder) + { + checkArgument(nestedWriterOptional.isPresent(), "nestedWriter should be present for column array definition level writer"); + return new DefinitionLevelWriter() + { + private final DefinitionLevelWriter nestedWriter = nestedWriterOptional.orElseThrow(); + + private int offset; + + @Override + public ValuesCount writeDefinitionLevels() + { + return writeDefinitionLevels(columnarArray.getPositionCount()); + } + + @Override + public ValuesCount writeDefinitionLevels(int positionsCount) + { + checkValidPosition(offset, positionsCount, columnarArray.getPositionCount()); + int maxDefinitionValuesCount = 0; + int totalValuesCount = 0; + if (!columnarArray.mayHaveNull()) { + for (int position = offset; position < offset + positionsCount; ) { + int arrayLength = columnarArray.getLength(position); + if (arrayLength == 0) { + encoder.writeInteger(maxDefinitionLevel - 1); + totalValuesCount++; + position++; + } + else { + int consecutiveNonEmptyArrayLength = arrayLength; + position++; + while (position < offset + positionsCount) { + arrayLength = columnarArray.getLength(position); + if (arrayLength == 0) { + break; + } + position++; + consecutiveNonEmptyArrayLength += arrayLength; + } + ValuesCount valuesCount = nestedWriter.writeDefinitionLevels(consecutiveNonEmptyArrayLength); + maxDefinitionValuesCount += valuesCount.maxDefinitionLevelValuesCount(); + totalValuesCount += valuesCount.totalValuesCount(); + } + } + } + else { + for (int position = offset; position < offset + positionsCount; position++) { + if (columnarArray.isNull(position)) { + encoder.writeInteger(maxDefinitionLevel - 2); + totalValuesCount++; + continue; + } + int 
arrayLength = columnarArray.getLength(position); + if (arrayLength == 0) { + encoder.writeInteger(maxDefinitionLevel - 1); + totalValuesCount++; + } + else { + ValuesCount valuesCount = nestedWriter.writeDefinitionLevels(arrayLength); + maxDefinitionValuesCount += valuesCount.maxDefinitionLevelValuesCount(); + totalValuesCount += valuesCount.totalValuesCount(); + } + } + } + offset += positionsCount; + return new ValuesCount(totalValuesCount, maxDefinitionValuesCount); + } + }; + } + } + + private static void checkValidPosition(int offset, int positionsCount, int totalPositionsCount) + { + if (offset < 0 || positionsCount < 0 || offset + positionsCount > totalPositionsCount) { + throw new IndexOutOfBoundsException(format("Invalid offset %s and positionsCount %s in block with %s positions", offset, positionsCount, totalPositionsCount)); + } + } +} diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestDefinitionLevelWriter.java b/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestDefinitionLevelWriter.java new file mode 100644 index 000000000000..8391a5ed6d99 --- /dev/null +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestDefinitionLevelWriter.java @@ -0,0 +1,738 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.parquet.writer;

import com.google.common.collect.ImmutableList;
import io.trino.parquet.writer.repdef.DefLevelWriterProviders;
import io.trino.spi.block.Block;
import io.trino.spi.block.ColumnarArray;
import io.trino.spi.block.ColumnarMap;
import io.trino.spi.block.ColumnarRow;
import io.trino.spi.block.LongArrayBlock;
import io.trino.spi.type.MapType;
import io.trino.spi.type.TypeOperators;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.values.ValuesWriter;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.stream.Stream;

import static io.trino.parquet.writer.repdef.DefLevelWriterProvider.DefinitionLevelWriter;
import static io.trino.parquet.writer.repdef.DefLevelWriterProvider.ValuesCount;
import static io.trino.parquet.writer.repdef.DefLevelWriterProvider.getRootDefinitionLevelWriter;
import static io.trino.spi.block.ArrayBlock.fromElementBlock;
import static io.trino.spi.block.ColumnarArray.toColumnarArray;
import static io.trino.spi.block.ColumnarMap.toColumnarMap;
import static io.trino.spi.block.ColumnarRow.toColumnarRow;
import static io.trino.spi.block.MapBlock.fromKeyValueBlock;
import static io.trino.spi.block.RowBlock.fromFieldBlocks;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.testing.DataProviders.toDataProvider;
import static java.lang.Math.toIntExact;
import static java.util.Collections.nCopies;
import static org.assertj.core.api.Assertions.assertThat;

/**
 * Tests definition level generation for primitive, row, array and map columns,
 * writing either the whole block at once, one position at a time, or in
 * randomly sized groups of positions.
 */
public class TestDefinitionLevelWriter
{
    private static final int POSITIONS = 8096;
    private static final Random RANDOM = new Random(42);
    private static final TypeOperators TYPE_OPERATORS = new TypeOperators();

    private static final boolean[] ALL_NULLS_ARRAY = new boolean[POSITIONS];
    private static final boolean[] RANDOM_NULLS_ARRAY = new boolean[POSITIONS];
    private static final boolean[] GROUPED_NULLS_ARRAY = new boolean[POSITIONS];

    static {
        Arrays.fill(ALL_NULLS_ARRAY, true);
        for (int i = 0; i < POSITIONS; i++) {
            RANDOM_NULLS_ARRAY[i] = RANDOM.nextBoolean();
        }

        // Nulls appearing in runs of up to maxGroupSize positions, to exercise
        // the batched (consecutive non-null) code paths of the writers
        int maxGroupSize = 23;
        int position = 0;
        while (position < POSITIONS) {
            int remaining = POSITIONS - position;
            int groupSize = Math.min(RANDOM.nextInt(maxGroupSize) + 1, remaining);
            Arrays.fill(GROUPED_NULLS_ARRAY, position, position + groupSize, RANDOM.nextBoolean());
            position += groupSize;
        }
    }

    @Test(dataProvider = "primitiveBlockProvider")
    public void testWritePrimitiveDefinitionLevels(PrimitiveBlockProvider blockProvider)
    {
        Block block = blockProvider.getInputBlock();
        int maxDefinitionLevel = 3;
        // Write definition levels for all positions
        assertDefinitionLevels(block, ImmutableList.of(), maxDefinitionLevel);

        // Write definition levels for all positions one-at-a-time
        assertDefinitionLevels(block, nCopies(block.getPositionCount(), 1), maxDefinitionLevel);

        // Write definition levels for all positions with different group sizes
        assertDefinitionLevels(block, generateGroupSizes(block.getPositionCount()), maxDefinitionLevel);
    }

    @DataProvider
    public static Object[][] primitiveBlockProvider()
    {
        return Stream.of(PrimitiveBlockProvider.values())
                .collect(toDataProvider());
    }

    private enum PrimitiveBlockProvider
    {
        NO_NULLS {
            @Override
            Block getInputBlock()
            {
                return new LongArrayBlock(POSITIONS, Optional.empty(), new long[POSITIONS]);
            }
        },
        NO_NULLS_WITH_MAY_HAVE_NULL {
            @Override
            Block getInputBlock()
            {
                // isNull array is present but all-false, so mayHaveNull() is true without actual nulls
                return new LongArrayBlock(POSITIONS, Optional.of(new boolean[POSITIONS]), new long[POSITIONS]);
            }
        },
        ALL_NULLS {
            @Override
            Block getInputBlock()
            {
                return new LongArrayBlock(POSITIONS, Optional.of(ALL_NULLS_ARRAY), new long[POSITIONS]);
            }
        },
        RANDOM_NULLS {
            @Override
            Block getInputBlock()
            {
                return new LongArrayBlock(POSITIONS, Optional.of(RANDOM_NULLS_ARRAY), new long[POSITIONS]);
            }
        },
        GROUPED_NULLS {
            @Override
            Block getInputBlock()
            {
                return new LongArrayBlock(POSITIONS, Optional.of(GROUPED_NULLS_ARRAY), new long[POSITIONS]);
            }
        };

        abstract Block getInputBlock();
    }

    @Test(dataProvider = "rowBlockProvider")
    public void testWriteRowDefinitionLevels(RowBlockProvider blockProvider)
    {
        ColumnarRow columnarRow = toColumnarRow(blockProvider.getInputBlock());
        int fieldMaxDefinitionLevel = 2;
        // Write definition levels for all positions
        for (int field = 0; field < columnarRow.getFieldCount(); field++) {
            assertDefinitionLevels(columnarRow, ImmutableList.of(), field, fieldMaxDefinitionLevel);
        }

        // Write definition levels for all positions one-at-a-time
        for (int field = 0; field < columnarRow.getFieldCount(); field++) {
            assertDefinitionLevels(
                    columnarRow,
                    nCopies(columnarRow.getPositionCount(), 1),
                    field,
                    fieldMaxDefinitionLevel);
        }

        // Write definition levels for all positions with different group sizes
        for (int field = 0; field < columnarRow.getFieldCount(); field++) {
            assertDefinitionLevels(
                    columnarRow,
                    generateGroupSizes(columnarRow.getPositionCount()),
                    field,
                    fieldMaxDefinitionLevel);
        }
    }

    @DataProvider
    public static Object[][] rowBlockProvider()
    {
        return Stream.of(RowBlockProvider.values())
                .collect(toDataProvider());
    }

    private enum RowBlockProvider
    {
        NO_NULLS {
            @Override
            Block getInputBlock()
            {
                return createRowBlock(Optional.empty());
            }
        },
        NO_NULLS_WITH_MAY_HAVE_NULL {
            @Override
            Block getInputBlock()
            {
                return createRowBlock(Optional.of(new boolean[POSITIONS]));
            }
        },
        ALL_NULLS {
            @Override
            Block getInputBlock()
            {
                return createRowBlock(Optional.of(ALL_NULLS_ARRAY));
            }
        },
        RANDOM_NULLS {
            @Override
            Block getInputBlock()
            {
                return createRowBlock(Optional.of(RANDOM_NULLS_ARRAY));
            }
        },
        GROUPED_NULLS {
            @Override
            Block getInputBlock()
            {
                return createRowBlock(Optional.of(GROUPED_NULLS_ARRAY));
            }
        };

        abstract Block getInputBlock();

        private static Block createRowBlock(Optional<boolean[]> rowIsNull)
        {
            // Field blocks of a RowBlock store values only for the non-null row positions
            int fieldPositionCount = POSITIONS - countTrue(rowIsNull);
            int fieldCount = 4;
            Block[] fieldBlocks = new Block[fieldCount];
            // no nulls block
            fieldBlocks[0] = new LongArrayBlock(fieldPositionCount, Optional.empty(), new long[fieldPositionCount]);
            // no nulls with mayHaveNull block
            fieldBlocks[1] = new LongArrayBlock(fieldPositionCount, Optional.of(new boolean[fieldPositionCount]), new long[fieldPositionCount]);
            // all nulls block
            boolean[] allNulls = new boolean[fieldPositionCount];
            Arrays.fill(allNulls, true);
            fieldBlocks[2] = new LongArrayBlock(fieldPositionCount, Optional.of(allNulls), new long[fieldPositionCount]);
            // random nulls block
            fieldBlocks[3] = createLongsBlockWithRandomNulls(fieldPositionCount);

            return fromFieldBlocks(POSITIONS, rowIsNull, fieldBlocks);
        }
    }

    @Test(dataProvider = "arrayBlockProvider")
    public void testWriteArrayDefinitionLevels(ArrayBlockProvider blockProvider)
    {
        ColumnarArray columnarArray = toColumnarArray(blockProvider.getInputBlock());
        int maxDefinitionLevel = 3;
        // Write definition levels for all positions
        assertDefinitionLevels(
                columnarArray,
                ImmutableList.of(),
                maxDefinitionLevel);

        // Write definition levels for all positions one-at-a-time
        assertDefinitionLevels(
                columnarArray,
                nCopies(columnarArray.getPositionCount(), 1),
                maxDefinitionLevel);

        // Write definition levels for all positions with different group sizes
        assertDefinitionLevels(
                columnarArray,
                generateGroupSizes(columnarArray.getPositionCount()),
                maxDefinitionLevel);
    }

    @DataProvider
    public static Object[][] arrayBlockProvider()
    {
        return Stream.of(ArrayBlockProvider.values())
                .collect(toDataProvider());
    }

    private enum ArrayBlockProvider
    {
        NO_NULLS {
            @Override
            Block getInputBlock()
            {
                return createArrayBlock(Optional.empty());
            }
        },
        NO_NULLS_WITH_MAY_HAVE_NULL {
            @Override
            Block getInputBlock()
            {
                return createArrayBlock(Optional.of(new boolean[POSITIONS]));
            }
        },
        ALL_NULLS {
            @Override
            Block getInputBlock()
            {
                return createArrayBlock(Optional.of(ALL_NULLS_ARRAY));
            }
        },
        RANDOM_NULLS {
            @Override
            Block getInputBlock()
            {
                return createArrayBlock(Optional.of(RANDOM_NULLS_ARRAY));
            }
        },
        GROUPED_NULLS {
            @Override
            Block getInputBlock()
            {
                return createArrayBlock(Optional.of(GROUPED_NULLS_ARRAY));
            }
        };

        abstract Block getInputBlock();

        private static Block createArrayBlock(Optional<boolean[]> valueIsNull)
        {
            int[] arrayOffset = generateOffsets(valueIsNull);
            return fromElementBlock(POSITIONS, valueIsNull, arrayOffset, createLongsBlockWithRandomNulls(arrayOffset[POSITIONS]));
        }
    }

    @Test(dataProvider = "mapBlockProvider")
    public void testWriteMapDefinitionLevels(MapBlockProvider blockProvider)
    {
        ColumnarMap columnarMap = toColumnarMap(blockProvider.getInputBlock());
        int keysMaxDefinitionLevel = 2;
        int valuesMaxDefinitionLevel = 3;
        // Write definition levels for all positions
        assertDefinitionLevels(
                columnarMap,
                ImmutableList.of(),
                keysMaxDefinitionLevel,
                valuesMaxDefinitionLevel);

        // Write definition levels for all positions one-at-a-time
        assertDefinitionLevels(
                columnarMap,
                nCopies(columnarMap.getPositionCount(), 1),
                keysMaxDefinitionLevel,
                valuesMaxDefinitionLevel);

        // Write definition levels for all positions with different group sizes
        assertDefinitionLevels(
                columnarMap,
                generateGroupSizes(columnarMap.getPositionCount()),
                keysMaxDefinitionLevel,
                valuesMaxDefinitionLevel);
    }

    @DataProvider
    public static Object[][] mapBlockProvider()
    {
        return Stream.of(MapBlockProvider.values())
                .collect(toDataProvider());
    }

    private enum MapBlockProvider
    {
        NO_NULLS {
            @Override
            Block getInputBlock()
            {
                return createMapBlock(Optional.empty());
            }
        },
        NO_NULLS_WITH_MAY_HAVE_NULL {
            @Override
            Block getInputBlock()
            {
                return createMapBlock(Optional.of(new boolean[POSITIONS]));
            }
        },
        ALL_NULLS {
            @Override
            Block getInputBlock()
            {
                return createMapBlock(Optional.of(ALL_NULLS_ARRAY));
            }
        },
        RANDOM_NULLS {
            @Override
            Block getInputBlock()
            {
                return createMapBlock(Optional.of(RANDOM_NULLS_ARRAY));
            }
        },
        GROUPED_NULLS {
            @Override
            Block getInputBlock()
            {
                return createMapBlock(Optional.of(GROUPED_NULLS_ARRAY));
            }
        };

        abstract Block getInputBlock();

        private static Block createMapBlock(Optional<boolean[]> mapIsNull)
        {
            int[] offsets = generateOffsets(mapIsNull);
            int positionCount = offsets[POSITIONS];
            Block keyBlock = new LongArrayBlock(positionCount, Optional.empty(), new long[positionCount]);
            Block valueBlock = createLongsBlockWithRandomNulls(positionCount);
            return fromKeyValueBlock(mapIsNull, offsets, keyBlock, valueBlock, new MapType(BIGINT, BIGINT, TYPE_OPERATORS));
        }
    }

    /**
     * Minimal ValuesWriter that records the definition levels it is asked to write,
     * so tests can compare them against independently computed expectations.
     */
    private static class TestingValuesWriter
            extends ValuesWriter
    {
        private final IntList values = new IntArrayList();

        @Override
        public long getBufferedSize()
        {
            throw new UnsupportedOperationException();
        }

        @Override
        public BytesInput getBytes()
        {
            throw new UnsupportedOperationException();
        }

        @Override
        public Encoding getEncoding()
        {
            throw new UnsupportedOperationException();
        }

        @Override
        public void reset()
        {
            throw new UnsupportedOperationException();
        }

        @Override
        public long getAllocatedSize()
        {
            throw new UnsupportedOperationException();
        }

        @Override
        public String memUsageString(String prefix)
        {
            throw new UnsupportedOperationException();
        }

        @Override
        public void writeInteger(int v)
        {
            values.add(v);
        }

        List<Integer> getWrittenValues()
        {
            return values;
        }
    }

    private static void assertDefinitionLevels(Block block, List<Integer> writePositionCounts, int maxDefinitionLevel)
    {
        // Write definition levels
        TestingValuesWriter valuesWriter = new TestingValuesWriter();
        DefinitionLevelWriter primitiveDefLevelWriter = DefLevelWriterProviders.of(block, maxDefinitionLevel)
                .getDefinitionLevelWriter(Optional.empty(), valuesWriter);
        ValuesCount primitiveValuesCount;
        if (writePositionCounts.isEmpty()) {
            primitiveValuesCount = primitiveDefLevelWriter.writeDefinitionLevels();
        }
        else {
            int totalValuesCount = 0;
            int maxDefinitionLevelValuesCount = 0;
            for (int positionsCount : writePositionCounts) {
                ValuesCount valuesCount = primitiveDefLevelWriter.writeDefinitionLevels(positionsCount);
                totalValuesCount += valuesCount.totalValuesCount();
                maxDefinitionLevelValuesCount += valuesCount.maxDefinitionLevelValuesCount();
            }
            primitiveValuesCount = new ValuesCount(totalValuesCount, maxDefinitionLevelValuesCount);
        }

        // Verify written definition levels
        int maxDefinitionValuesCount = 0;
        ImmutableList.Builder<Integer> expectedDefLevelsBuilder = ImmutableList.builder();
        for (int position = 0; position < block.getPositionCount(); position++) {
            if (block.isNull(position)) {
                expectedDefLevelsBuilder.add(maxDefinitionLevel - 1);
            }
            else {
                expectedDefLevelsBuilder.add(maxDefinitionLevel);
                maxDefinitionValuesCount++;
            }
        }
        assertThat(primitiveValuesCount.totalValuesCount()).isEqualTo(block.getPositionCount());
        assertThat(primitiveValuesCount.maxDefinitionLevelValuesCount()).isEqualTo(maxDefinitionValuesCount);
        assertThat(valuesWriter.getWrittenValues()).isEqualTo(expectedDefLevelsBuilder.build());
    }

    private static void assertDefinitionLevels(
            ColumnarRow columnarRow,
            List<Integer> writePositionCounts,
            int field,
            int maxDefinitionLevel)
    {
        // Write definition levels
        TestingValuesWriter valuesWriter = new TestingValuesWriter();
        DefinitionLevelWriter fieldRootDefLevelWriter = getRootDefinitionLevelWriter(
                ImmutableList.of(
                        DefLevelWriterProviders.of(columnarRow, maxDefinitionLevel - 1),
                        DefLevelWriterProviders.of(columnarRow.getField(field), maxDefinitionLevel)),
                valuesWriter);
        ValuesCount fieldValuesCount;
        if (writePositionCounts.isEmpty()) {
            fieldValuesCount = fieldRootDefLevelWriter.writeDefinitionLevels();
        }
        else {
            int totalValuesCount = 0;
            int maxDefinitionLevelValuesCount = 0;
            for (int positionsCount : writePositionCounts) {
                ValuesCount valuesCount = fieldRootDefLevelWriter.writeDefinitionLevels(positionsCount);
                totalValuesCount += valuesCount.totalValuesCount();
                maxDefinitionLevelValuesCount += valuesCount.maxDefinitionLevelValuesCount();
            }
            fieldValuesCount = new ValuesCount(totalValuesCount, maxDefinitionLevelValuesCount);
        }

        // Verify written definition levels
        int maxDefinitionValuesCount = 0;
        ImmutableList.Builder<Integer> expectedDefLevelsBuilder = ImmutableList.builder();
        // Field blocks contain entries only for non-null rows, so they are tracked with a separate offset
        int fieldOffset = 0;
        for (int position = 0; position < columnarRow.getPositionCount(); position++) {
            if (columnarRow.isNull(position)) {
                expectedDefLevelsBuilder.add(maxDefinitionLevel - 2);
                continue;
            }
            Block fieldBlock = columnarRow.getField(field);
            if (fieldBlock.isNull(fieldOffset)) {
                expectedDefLevelsBuilder.add(maxDefinitionLevel - 1);
            }
            else {
                expectedDefLevelsBuilder.add(maxDefinitionLevel);
                maxDefinitionValuesCount++;
            }
            fieldOffset++;
        }
        assertThat(fieldValuesCount.totalValuesCount()).isEqualTo(columnarRow.getPositionCount());
        assertThat(fieldValuesCount.maxDefinitionLevelValuesCount()).isEqualTo(maxDefinitionValuesCount);
        assertThat(valuesWriter.getWrittenValues()).isEqualTo(expectedDefLevelsBuilder.build());
    }

    private static void assertDefinitionLevels(
            ColumnarArray columnarArray,
            List<Integer> writePositionCounts,
            int maxDefinitionLevel)
    {
        // Write definition levels
        TestingValuesWriter valuesWriter = new TestingValuesWriter();
        DefinitionLevelWriter elementsRootDefLevelWriter = getRootDefinitionLevelWriter(
                ImmutableList.of(
                        DefLevelWriterProviders.of(columnarArray, maxDefinitionLevel - 1),
                        DefLevelWriterProviders.of(columnarArray.getElementsBlock(), maxDefinitionLevel)),
                valuesWriter);
        ValuesCount elementsValuesCount;
        if (writePositionCounts.isEmpty()) {
            elementsValuesCount = elementsRootDefLevelWriter.writeDefinitionLevels();
        }
        else {
            int totalValuesCount = 0;
            int maxDefinitionLevelValuesCount = 0;
            for (int positionsCount : writePositionCounts) {
                ValuesCount valuesCount = elementsRootDefLevelWriter.writeDefinitionLevels(positionsCount);
                totalValuesCount += valuesCount.totalValuesCount();
                maxDefinitionLevelValuesCount += valuesCount.maxDefinitionLevelValuesCount();
            }
            elementsValuesCount = new ValuesCount(totalValuesCount, maxDefinitionLevelValuesCount);
        }

        // Verify written definition levels
        int maxDefinitionValuesCount = 0;
        int totalValuesCount = 0;
        ImmutableList.Builder<Integer> expectedDefLevelsBuilder = ImmutableList.builder();
        int elementsOffset = 0;
        for (int position = 0; position < columnarArray.getPositionCount(); position++) {
            if (columnarArray.isNull(position)) {
                // Null array: one value at maxDefinitionLevel - 3
                expectedDefLevelsBuilder.add(maxDefinitionLevel - 3);
                totalValuesCount++;
                continue;
            }
            int arrayLength = columnarArray.getLength(position);
            if (arrayLength == 0) {
                // Empty array: one value at maxDefinitionLevel - 2
                expectedDefLevelsBuilder.add(maxDefinitionLevel - 2);
                totalValuesCount++;
                continue;
            }
            totalValuesCount += arrayLength;
            Block elementsBlock = columnarArray.getElementsBlock();
            for (int i = elementsOffset; i < elementsOffset + arrayLength; i++) {
                if (elementsBlock.isNull(i)) {
                    expectedDefLevelsBuilder.add(maxDefinitionLevel - 1);
                }
                else {
                    expectedDefLevelsBuilder.add(maxDefinitionLevel);
                    maxDefinitionValuesCount++;
                }
            }
            elementsOffset += arrayLength;
        }
        assertThat(elementsValuesCount.totalValuesCount()).isEqualTo(totalValuesCount);
        assertThat(elementsValuesCount.maxDefinitionLevelValuesCount()).isEqualTo(maxDefinitionValuesCount);
        assertThat(valuesWriter.getWrittenValues()).isEqualTo(expectedDefLevelsBuilder.build());
    }

    private static void assertDefinitionLevels(
            ColumnarMap columnarMap,
            List<Integer> writePositionCounts,
            int keysMaxDefinitionLevel,
            int valuesMaxDefinitionLevel)
    {
        // Write definition levels for map keys
        TestingValuesWriter keysWriter = new TestingValuesWriter();
        DefinitionLevelWriter keysRootDefLevelWriter = getRootDefinitionLevelWriter(
                ImmutableList.of(
                        DefLevelWriterProviders.of(columnarMap, keysMaxDefinitionLevel),
                        DefLevelWriterProviders.of(columnarMap.getKeysBlock(), keysMaxDefinitionLevel)),
                keysWriter);
        ValuesCount keysValueCount;
        if (writePositionCounts.isEmpty()) {
            keysValueCount = keysRootDefLevelWriter.writeDefinitionLevels();
        }
        else {
            int totalValuesCount = 0;
            int maxDefinitionLevelValuesCount = 0;
            for (int positionsCount : writePositionCounts) {
                ValuesCount valuesCount = keysRootDefLevelWriter.writeDefinitionLevels(positionsCount);
                totalValuesCount += valuesCount.totalValuesCount();
                maxDefinitionLevelValuesCount += valuesCount.maxDefinitionLevelValuesCount();
            }
            keysValueCount = new ValuesCount(totalValuesCount, maxDefinitionLevelValuesCount);
        }

        // Write definition levels for map values
        TestingValuesWriter valuesWriter = new TestingValuesWriter();
        DefinitionLevelWriter valuesRootDefLevelWriter = getRootDefinitionLevelWriter(
                ImmutableList.of(
                        DefLevelWriterProviders.of(columnarMap, keysMaxDefinitionLevel),
                        DefLevelWriterProviders.of(columnarMap.getValuesBlock(), valuesMaxDefinitionLevel)),
                valuesWriter);
        ValuesCount valuesValueCount;
        if (writePositionCounts.isEmpty()) {
            valuesValueCount = valuesRootDefLevelWriter.writeDefinitionLevels();
        }
        else {
            int totalValuesCount = 0;
            int maxDefinitionLevelValuesCount = 0;
            for (int positionsCount : writePositionCounts) {
                ValuesCount valuesCount = valuesRootDefLevelWriter.writeDefinitionLevels(positionsCount);
                totalValuesCount += valuesCount.totalValuesCount();
                maxDefinitionLevelValuesCount += valuesCount.maxDefinitionLevelValuesCount();
            }
            valuesValueCount = new ValuesCount(totalValuesCount, maxDefinitionLevelValuesCount);
        }

        // Verify written definition levels
        int maxDefinitionKeysCount = 0;
        int maxDefinitionValuesCount = 0;
        int totalValuesCount = 0;
        ImmutableList.Builder<Integer> keysExpectedDefLevelsBuilder = ImmutableList.builder();
        ImmutableList.Builder<Integer> valuesExpectedDefLevelsBuilder = ImmutableList.builder();
        int valuesOffset = 0;
        for (int position = 0; position < columnarMap.getPositionCount(); position++) {
            if (columnarMap.isNull(position)) {
                keysExpectedDefLevelsBuilder.add(keysMaxDefinitionLevel - 2);
                valuesExpectedDefLevelsBuilder.add(valuesMaxDefinitionLevel - 3);
                totalValuesCount++;
                continue;
            }
            int mapLength = columnarMap.getEntryCount(position);
            if (mapLength == 0) {
                keysExpectedDefLevelsBuilder.add(keysMaxDefinitionLevel - 1);
                valuesExpectedDefLevelsBuilder.add(valuesMaxDefinitionLevel - 2);
                totalValuesCount++;
                continue;
            }
            totalValuesCount += mapLength;
            // Map keys cannot be null
            keysExpectedDefLevelsBuilder.addAll(nCopies(mapLength, keysMaxDefinitionLevel));
            maxDefinitionKeysCount += mapLength;
            Block valuesBlock = columnarMap.getValuesBlock();
            for (int i = valuesOffset; i < valuesOffset + mapLength; i++) {
                if (valuesBlock.isNull(i)) {
                    valuesExpectedDefLevelsBuilder.add(valuesMaxDefinitionLevel - 1);
                }
                else {
                    valuesExpectedDefLevelsBuilder.add(valuesMaxDefinitionLevel);
                    maxDefinitionValuesCount++;
                }
            }
            valuesOffset += mapLength;
        }
        assertThat(keysValueCount.totalValuesCount()).isEqualTo(totalValuesCount);
        assertThat(keysValueCount.maxDefinitionLevelValuesCount()).isEqualTo(maxDefinitionKeysCount);
        assertThat(keysWriter.getWrittenValues()).isEqualTo(keysExpectedDefLevelsBuilder.build());

        assertThat(valuesValueCount.totalValuesCount()).isEqualTo(totalValuesCount);
        assertThat(valuesValueCount.maxDefinitionLevelValuesCount()).isEqualTo(maxDefinitionValuesCount);
        assertThat(valuesWriter.getWrittenValues()).isEqualTo(valuesExpectedDefLevelsBuilder.build());
    }

    /**
     * Partitions {@code positionsCount} into randomly sized chunks summing to the total.
     */
    private static List<Integer> generateGroupSizes(int positionsCount)
    {
        int maxGroupSize = 17;
        int offset = 0;
        ImmutableList.Builder<Integer> groupsBuilder = ImmutableList.builder();
        while (offset < positionsCount) {
            int remaining = positionsCount - offset;
            int groupSize = Math.min(RANDOM.nextInt(maxGroupSize) + 1, remaining);
            groupsBuilder.add(groupSize);
            offset += groupSize;
        }
        return groupsBuilder.build();
    }

    private static int[] generateOffsets(Optional<boolean[]> valueIsNull)
    {
        int maxCardinality = 7; // array length or map size at the current position
        int[] offsets = new int[POSITIONS + 1];
        for (int position = 0; position < POSITIONS; position++) {
            if (valueIsNull.isPresent() && valueIsNull.get()[position]) {
                // Null entries occupy no child positions
                offsets[position + 1] = offsets[position];
            }
            else {
                offsets[position + 1] = offsets[position] + RANDOM.nextInt(maxCardinality);
            }
        }
        return offsets;
    }

    private static Block createLongsBlockWithRandomNulls(int positionCount)
    {
        boolean[] valueIsNull = new boolean[positionCount];
        for (int i = 0; i < positionCount; i++) {
            valueIsNull[i] = RANDOM.nextBoolean();
        }
        return new LongArrayBlock(positionCount, Optional.of(valueIsNull), new long[positionCount]);
    }

    /**
     * Returns the number of {@code true} flags, i.e. the null count when the array is an isNull mask.
     */
    private static int countTrue(Optional<boolean[]> flags)
    {
        int count = 0;
        if (flags.isPresent()) {
            for (boolean flag : flags.get()) {
                if (flag) {
                    count++;
                }
            }
        }
        return count;
    }
}