diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 9aad64901d9e..d268fd0e27af 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1009,6 +1009,14 @@ object SQLConf { .booleanConf .createWithDefault(true) + val PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED = + buildConf("spark.sql.parquet.enableNestedColumnVectorizedReader") + .doc("Enables vectorized Parquet decoding for nested columns (e.g., struct, list, map). " + + s"Requires ${PARQUET_VECTORIZED_READER_ENABLED.key} to be enabled.") + .version("3.3.0") + .booleanConf + .createWithDefault(true) + val PARQUET_RECORD_FILTER_ENABLED = buildConf("spark.sql.parquet.recordLevelFilter.enabled") .doc("If true, enables Parquet's native record-level filtering using the pushed down " + "filters. " + @@ -3926,6 +3934,9 @@ class SQLConf extends Serializable with Logging { def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED) + def parquetVectorizedReaderNestedColumnEnabled: Boolean = + getConf(PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED) + def parquetVectorizedReaderBatchSize: Int = getConf(PARQUET_VECTORIZED_READER_BATCH_SIZE) def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java new file mode 100644 index 000000000000..4b29520d30ff --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import com.google.common.base.Preconditions; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.ArrayType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.MapType; +import org.apache.spark.sql.types.StructType; + +/** + * Contains necessary information representing a Parquet column, either of primitive or nested type. 
+ */ +final class ParquetColumnVector { + private final ParquetColumn column; + private final List children; + private final WritableColumnVector vector; + + /** + * Repetition & Definition levels + * These are allocated only for leaf columns; for non-leaf columns, they simply maintain + * references to that of the former. + */ + private WritableColumnVector repetitionLevels; + private WritableColumnVector definitionLevels; + + /** Whether this column is primitive (i.e., leaf column) */ + private final boolean isPrimitive; + + /** Reader for this column - only set if 'isPrimitive' is true */ + private VectorizedColumnReader columnReader; + + ParquetColumnVector( + ParquetColumn column, + WritableColumnVector vector, + int capacity, + MemoryMode memoryMode, + Set missingColumns) { + + DataType sparkType = column.sparkType(); + if (!sparkType.sameType(vector.dataType())) { + throw new IllegalArgumentException("Spark type: " + sparkType + + " doesn't match the type: " + vector.dataType() + " in column vector"); + } + + this.column = column; + this.vector = vector; + this.children = new ArrayList<>(); + this.isPrimitive = column.isPrimitive(); + + if (missingColumns.contains(column)) { + vector.setAllNull(); + return; + } + + if (isPrimitive) { + // TODO: avoid allocating these if not necessary, for instance, the node is of top-level + // and is not repeated, or the node is not top-level but its max repetition level is 0. + repetitionLevels = allocateLevelsVector(capacity, memoryMode); + definitionLevels = allocateLevelsVector(capacity, memoryMode); + } else { + Preconditions.checkArgument(column.children().size() == vector.getNumChildren()); + for (int i = 0; i < column.children().size(); i++) { + ParquetColumnVector childCv = new ParquetColumnVector(column.children().apply(i), + vector.getChild(i), capacity, memoryMode, missingColumns); + children.add(childCv); + + // Only use levels from non-missing child, this can happen if only some but not all + // fields of a struct are missing. + if (!childCv.vector.isAllNull()) { + this.repetitionLevels = childCv.repetitionLevels; + this.definitionLevels = childCv.definitionLevels; + } + } + + // This can happen if all the fields of a struct are missing, in which case we should mark + // the struct itself as a missing column + if (repetitionLevels == null) { + vector.setAllNull(); + } + } + } + + /** + * Returns all the children of this column. + */ + List getChildren() { + return children; + } + + /** + * Returns all the leaf columns in depth-first order. + */ + List getLeaves() { + List result = new ArrayList<>(); + getLeavesHelper(this, result); + return result; + } + + private static void getLeavesHelper(ParquetColumnVector vector, List coll) { + if (vector.isPrimitive) { + coll.add(vector); + } else { + for (ParquetColumnVector child : vector.children) { + getLeavesHelper(child, coll); + } + } + } + + /** + * Assembles this column and calculate collection offsets recursively. + * This is a no-op for primitive columns. 
+ */ + void assemble() { + // nothing to do if the column itself is missing + if (vector.isAllNull()) return; + + DataType type = column.sparkType(); + if (type instanceof ArrayType || type instanceof MapType) { + for (ParquetColumnVector child : children) { + child.assemble(); + } + assembleCollection(); + } else if (type instanceof StructType) { + for (ParquetColumnVector child : children) { + child.assemble(); + } + assembleStruct(); + } + } + + /** + * Resets this Parquet column vector, which includes resetting all the writable column vectors + * (used to store values, definition levels, and repetition levels) for this and all its children. + */ + void reset() { + // nothing to do if the column itself is missing + if (vector.isAllNull()) return; + + vector.reset(); + repetitionLevels.reset(); + definitionLevels.reset(); + for (ParquetColumnVector child : children) { + child.reset(); + } + } + + /** + * Returns the {@link ParquetColumn} of this column vector. + */ + ParquetColumn getColumn() { + return this.column; + } + + /** + * Returns the writable column vector used to store values. + */ + WritableColumnVector getValueVector() { + return this.vector; + } + + /** + * Returns the writable column vector used to store repetition levels. + */ + WritableColumnVector getRepetitionLevelVector() { + return this.repetitionLevels; + } + + /** + * Returns the writable column vector used to store definition levels. + */ + WritableColumnVector getDefinitionLevelVector() { + return this.definitionLevels; + } + + /** + * Returns the column reader for reading a Parquet column. + */ + VectorizedColumnReader getColumnReader() { + return this.columnReader; + } + + /** + * Sets the column vector to 'reader'. Note this can only be called on a primitive Parquet + * column. + */ + void setColumnReader(VectorizedColumnReader reader) { + if (!isPrimitive) { + throw new IllegalStateException("Can't set reader for non-primitive column"); + } + this.columnReader = reader; + } + + /** + * Assemble collections, e.g., array, map. + */ + private void assembleCollection() { + int maxDefinitionLevel = column.definitionLevel(); + int maxElementRepetitionLevel = column.repetitionLevel(); + + // There are 4 cases when calculating definition levels: + // 1. definitionLevel == maxDefinitionLevel + // ==> value is defined and not null + // 2. definitionLevel == maxDefinitionLevel - 1 + // ==> value is null + // 3. definitionLevel < maxDefinitionLevel - 1 + // ==> value doesn't exist since one of its optional parents is null + // 4. definitionLevel > maxDefinitionLevel + // ==> value is a nested element within an array or map + // + // `i` is the index over all leaf elements of this array, while `offset` is the index over + // all top-level elements of this array. + int rowId = 0; + for (int i = 0, offset = 0; i < definitionLevels.getElementsAppended(); + i = getNextCollectionStart(maxElementRepetitionLevel, i)) { + vector.reserve(rowId + 1); + int definitionLevel = definitionLevels.getInt(i); + if (definitionLevel <= maxDefinitionLevel) { + // This means the value is not an array element, but a collection that is either null or + // empty. In this case, we should increase offset to skip it when returning an array + // starting from the offset. 
+ // + // For instance, considering an array of strings with 3 elements like the following: + // null, [], [a, b, c] + // the child array (which is of String type) in this case will be: + // null: 1 1 0 0 0 + // length: 0 0 1 1 1 + // offset: 0 0 0 1 2 + // and the array itself will be: + // null: 1 0 0 + // length: 0 0 3 + // offset: 0 1 2 + // + // It's important that for the third element `[a, b, c]`, the offset in the array + // (not the elements) starts from 2 since otherwise we'd include the first & second null + // element from child array in the result. + offset += 1; + } + if (definitionLevel <= maxDefinitionLevel - 1) { + // Collection is null or one of its optional parents is null + vector.putNull(rowId++); + } else if (definitionLevel == maxDefinitionLevel) { + // Collection is defined but empty + vector.putNotNull(rowId); + vector.putArray(rowId, offset, 0); + rowId++; + } else if (definitionLevel > maxDefinitionLevel) { + // Collection is defined and non-empty: find out how many top elements are there until the + // start of the next array. + vector.putNotNull(rowId); + int length = getCollectionSize(maxElementRepetitionLevel, i); + vector.putArray(rowId, offset, length); + offset += length; + rowId++; + } + } + vector.addElementsAppended(rowId); + } + + private void assembleStruct() { + int maxRepetitionLevel = column.repetitionLevel(); + int maxDefinitionLevel = column.definitionLevel(); + + vector.reserve(definitionLevels.getElementsAppended()); + + int rowId = 0; + boolean hasRepetitionLevels = repetitionLevels.getElementsAppended() > 0; + for (int i = 0; i < definitionLevels.getElementsAppended(); i++) { + // If repetition level > maxRepetitionLevel, the value is a nested element (e.g., an array + // element in struct>), and we should skip the definition level since it doesn't + // represent with the struct. + if (!hasRepetitionLevels || repetitionLevels.getInt(i) <= maxRepetitionLevel) { + if (definitionLevels.getInt(i) <= maxDefinitionLevel - 1) { + // Struct is null + vector.putNull(rowId); + rowId++; + } else if (definitionLevels.getInt(i) >= maxDefinitionLevel) { + vector.putNotNull(rowId); + rowId++; + } + } + } + vector.addElementsAppended(rowId); + } + + private static WritableColumnVector allocateLevelsVector(int capacity, MemoryMode memoryMode) { + switch (memoryMode) { + case ON_HEAP: + return new OnHeapColumnVector(capacity, DataTypes.IntegerType); + case OFF_HEAP: + return new OffHeapColumnVector(capacity, DataTypes.IntegerType); + default: + throw new IllegalArgumentException("Unknown memory mode: " + memoryMode); + } + } + + /** + * For a collection (i.e., array or map) element at index 'idx', returns the starting index of + * the next collection after it. + * + * @param maxRepetitionLevel the maximum repetition level for the elements in this collection + * @param idx the index of this collection in the Parquet column + * @return the starting index of the next collection + */ + private int getNextCollectionStart(int maxRepetitionLevel, int idx) { + idx += 1; + for (; idx < repetitionLevels.getElementsAppended(); idx++) { + if (repetitionLevels.getInt(idx) <= maxRepetitionLevel) { + break; + } + } + return idx; + } + + /** + * Gets the size of a collection (i.e., array or map) element, starting at 'idx'. 
+ * + * @param maxRepetitionLevel the maximum repetition level for the elements in this collection + * @param idx the index of this collection in the Parquet column + * @return the size of this collection + */ + private int getCollectionSize(int maxRepetitionLevel, int idx) { + int size = 1; + for (idx += 1; idx < repetitionLevels.getElementsAppended(); idx++) { + if (repetitionLevels.getInt(idx) <= maxRepetitionLevel) { + break; + } else if (repetitionLevels.getInt(idx) <= maxRepetitionLevel + 1) { + // Only count elements which belong to the current collection + // For instance, suppose we have the following Parquet schema: + // + // message schema { max rl max dl + // optional group col (LIST) { 0 1 + // repeated group list { 1 2 + // optional group element (LIST) { 1 3 + // repeated group list { 2 4 + // required int32 element; 2 4 + // } + // } + // } + // } + // } + // + // For a list such as: [[[0, 1], [2, 3]], [[4, 5], [6, 7]]], the repetition & definition + // levels would be: + // + // repetition levels: [0, 2, 1, 2, 0, 2, 1, 2] + // definition levels: [2, 2, 2, 2, 2, 2, 2, 2] + // + // When calculating collection size for the outer array, we should only count repetition + // levels whose value is <= 1 (which is the max repetition level for the inner array) + size++; + } + } + return size; + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java index b26088753465..bde69402241c 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources.parquet; +import org.apache.parquet.column.ColumnDescriptor; + import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -42,24 +44,52 @@ final class ParquetReadState { /** The current row range */ private RowRange currentRange; + /** Maximum repetition level for the Parquet column */ + final int maxRepetitionLevel; + /** Maximum definition level for the Parquet column */ final int maxDefinitionLevel; + /** Whether this column is required */ + final boolean isRequired; + /** The current index over all rows within the column chunk. This is used to check if the * current row should be skipped by comparing against the row ranges. */ long rowId; - /** The offset in the current batch to put the next value */ - int offset; + /** The offset in the current batch to put the next value in value vector */ + int valueOffset; + + /** The offset in the current batch to put the next value in repetition & definition vector */ + int levelOffset; /** The remaining number of values to read in the current page */ int valuesToReadInPage; - /** The remaining number of values to read in the current batch */ - int valuesToReadInBatch; + /** The remaining number of rows to read in the current batch */ + int rowsToReadInBatch; + + + /* The following fields are only used when reading repeated values */ + + /** When processing repeated values, whether we've found the beginning of the first list after the + * current batch. 
*/ + boolean lastListCompleted; - ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong rowIndexes) { - this.maxDefinitionLevel = maxDefinitionLevel; + /** When processing repeated types, the number of accumulated definition levels to process */ + int numBatchedDefLevels; + + /** When processing repeated types, whether we should skip the current batch of definition + * levels. */ + boolean shouldSkip; + + ParquetReadState( + ColumnDescriptor descriptor, + boolean isRequired, + PrimitiveIterator.OfLong rowIndexes) { + this.maxRepetitionLevel = descriptor.getMaxRepetitionLevel(); + this.maxDefinitionLevel = descriptor.getMaxDefinitionLevel(); + this.isRequired = isRequired; this.rowRanges = constructRanges(rowIndexes); nextRange(); } @@ -101,8 +131,12 @@ private Iterator constructRanges(PrimitiveIterator.OfLong rowIndexes) * Must be called at the beginning of reading a new batch. */ void resetForNewBatch(int batchSize) { - this.offset = 0; - this.valuesToReadInBatch = batchSize; + this.valueOffset = 0; + this.levelOffset = 0; + this.rowsToReadInBatch = batchSize; + this.lastListCompleted = this.maxRepetitionLevel == 0; // always true for non-repeated column + this.numBatchedDefLevels = 0; + this.shouldSkip = false; } /** @@ -127,16 +161,6 @@ long currentRangeEnd() { return currentRange.end; } - /** - * Advance the current offset and rowId to the new values. - */ - void advanceOffsetAndRowId(int newOffset, long newRowId) { - valuesToReadInBatch -= (newOffset - offset); - valuesToReadInPage -= (newRowId - rowId); - offset = newOffset; - rowId = newRowId; - } - /** * Advance to the next range. */ diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java index 5669534cd111..292a0f98af1c 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java @@ -74,6 +74,7 @@ public abstract class SpecificParquetRecordReaderBase extends RecordReader columns) throws IOException } } fileReader.setRequestedSchema(requestedSchema); - this.sparkSchema = new ParquetToSparkSchemaConverter(config).convert(requestedSchema); + this.parquetColumn = new ParquetToSparkSchemaConverter(config) + .convertParquetColumn(requestedSchema, Option.empty()); + this.sparkSchema = (StructType) parquetColumn.sparkType(); this.totalRowCount = fileReader.getFilteredRecordCount(); } @@ -191,7 +198,9 @@ protected void initialize( config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false); config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false); config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false); - this.sparkSchema = new ParquetToSparkSchemaConverter(config).convert(requestedSchema); + this.parquetColumn = new ParquetToSparkSchemaConverter(config) + .convertParquetColumn(requestedSchema, Option.empty()); + this.sparkSchema = (StructType) parquetColumn.sparkType(); this.totalRowCount = totalRowCount; } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index ee09d2b2a3be..c2e85da3884a 100644 --- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -19,7 +19,6 @@ import java.io.IOException; import java.time.ZoneId; -import java.util.PrimitiveIterator; import org.apache.parquet.CorruptDeltaByteArrays; import org.apache.parquet.VersionParser.ParsedVersion; @@ -41,7 +40,6 @@ import org.apache.spark.sql.execution.vectorized.WritableColumnVector; import org.apache.spark.sql.types.Decimal; -import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; @@ -69,6 +67,11 @@ public class VectorizedColumnReader { */ private VectorizedRleValuesReader defColumn; + /** + * Vectorized RLE decoder for repetition levels + */ + private VectorizedRleValuesReader repColumn; + /** * Factory to get type-specific vector updater. */ @@ -93,9 +96,8 @@ public class VectorizedColumnReader { public VectorizedColumnReader( ColumnDescriptor descriptor, - LogicalTypeAnnotation logicalTypeAnnotation, - PageReader pageReader, - PrimitiveIterator.OfLong rowIndexes, + boolean isRequired, + PageReadStore pageReadStore, ZoneId convertTz, String datetimeRebaseMode, String datetimeRebaseTz, @@ -103,9 +105,10 @@ public VectorizedColumnReader( String int96RebaseTz, ParsedVersion writerVersion) throws IOException { this.descriptor = descriptor; - this.pageReader = pageReader; - this.readState = new ParquetReadState(descriptor.getMaxDefinitionLevel(), rowIndexes); - this.logicalTypeAnnotation = logicalTypeAnnotation; + this.pageReader = pageReadStore.getPageReader(descriptor); + this.readState = new ParquetReadState(descriptor, isRequired, + pageReadStore.getRowIndexes().orElse(null)); + this.logicalTypeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation(); this.updaterFactory = new ParquetVectorUpdaterFactory( logicalTypeAnnotation, convertTz, @@ -161,9 +164,13 @@ private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName } /** - * Reads `total` values from this columnReader into column. + * Reads `total` rows from this columnReader into column. */ - void readBatch(int total, WritableColumnVector column) throws IOException { + void readBatch( + int total, + WritableColumnVector column, + WritableColumnVector repetitionLevels, + WritableColumnVector definitionLevels) throws IOException { WritableColumnVector dictionaryIds = null; ParquetVectorUpdater updater = updaterFactory.getUpdater(descriptor, column.dataType()); @@ -174,22 +181,32 @@ void readBatch(int total, WritableColumnVector column) throws IOException { dictionaryIds = column.reserveDictionaryIds(total); } readState.resetForNewBatch(total); - while (readState.valuesToReadInBatch > 0) { + while (readState.rowsToReadInBatch > 0 || !readState.lastListCompleted) { if (readState.valuesToReadInPage == 0) { int pageValueCount = readPage(); + if (pageValueCount < 0) { + // we've read all the pages; this could happen when we're reading a repeated list and we + // don't know where the list will end until we've seen all the pages. + break; + } readState.resetForNewPage(pageValueCount, pageFirstRowIndex); } PrimitiveType.PrimitiveTypeName typeName = descriptor.getPrimitiveType().getPrimitiveTypeName(); if (isCurrentPageDictionaryEncoded) { // Save starting offset in case we need to decode dictionary IDs. 
- int startOffset = readState.offset; + int startOffset = readState.valueOffset; // Save starting row index so we can check if we need to eagerly decode dict ids later long startRowId = readState.rowId; // Read and decode dictionary ids. - defColumn.readIntegers(readState, dictionaryIds, column, - (VectorizedValuesReader) dataColumn); + if (readState.maxRepetitionLevel == 0) { + defColumn.readIntegers(readState, dictionaryIds, column, definitionLevels, + (VectorizedValuesReader) dataColumn); + } else { + repColumn.readIntegersRepeated(readState, repetitionLevels, defColumn, definitionLevels, + dictionaryIds, column, (VectorizedValuesReader) dataColumn); + } // TIMESTAMP_MILLIS encoded as INT64 can't be lazily decoded as we need to post process // the values to add microseconds precision. @@ -220,24 +237,32 @@ void readBatch(int total, WritableColumnVector column) throws IOException { boolean needTransform = castLongToInt || isUnsignedInt32 || isUnsignedInt64; column.setDictionary(new ParquetDictionary(dictionary, needTransform)); } else { - updater.decodeDictionaryIds(readState.offset - startOffset, startOffset, column, + updater.decodeDictionaryIds(readState.valueOffset - startOffset, startOffset, column, dictionaryIds, dictionary); } } else { - if (column.hasDictionary() && readState.offset != 0) { + if (column.hasDictionary() && readState.valueOffset != 0) { // This batch already has dictionary encoded values but this new page is not. The batch // does not support a mix of dictionary and not so we will decode the dictionary. - updater.decodeDictionaryIds(readState.offset, 0, column, dictionaryIds, dictionary); + updater.decodeDictionaryIds(readState.valueOffset, 0, column, dictionaryIds, dictionary); } column.setDictionary(null); VectorizedValuesReader valuesReader = (VectorizedValuesReader) dataColumn; - defColumn.readBatch(readState, column, valuesReader, updater); + if (readState.maxRepetitionLevel == 0) { + defColumn.readBatch(readState, column, definitionLevels, valuesReader, updater); + } else { + repColumn.readBatchRepeated(readState, repetitionLevels, defColumn, definitionLevels, + column, valuesReader, updater); + } } } } private int readPage() { DataPage page = pageReader.readPage(); + if (page == null) { + return -1; + } this.pageFirstRowIndex = page.getFirstRowIndex().orElse(0L); return page.accept(new DataPage.Visitor() { @@ -328,18 +353,18 @@ private int readPageV1(DataPageV1 page) throws IOException { } int pageValueCount = page.getValueCount(); - int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel()); - this.defColumn = new VectorizedRleValuesReader(bitWidth); + int rlBitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxRepetitionLevel()); + this.repColumn = new VectorizedRleValuesReader(rlBitWidth); + + int dlBitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel()); + this.defColumn = new VectorizedRleValuesReader(dlBitWidth); + try { BytesInput bytes = page.getBytes(); ByteBufferInputStream in = bytes.toInputStream(); - // only used now to consume the repetition level data - page.getRlEncoding() - .getValuesReader(descriptor, REPETITION_LEVEL) - .initFromPage(pageValueCount, in); - + repColumn.initFromPage(pageValueCount, in); defColumn.initFromPage(pageValueCount, in); initDataReader(pageValueCount, page.getValueEncoding(), in); return pageValueCount; @@ -350,11 +375,16 @@ private int readPageV1(DataPageV1 page) throws IOException { private int readPageV2(DataPageV2 page) throws IOException { int pageValueCount = 
page.getValueCount(); - int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel()); // do not read the length from the stream. v2 pages handle dividing the page bytes. - defColumn = new VectorizedRleValuesReader(bitWidth, false); + int rlBitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxRepetitionLevel()); + repColumn = new VectorizedRleValuesReader(rlBitWidth, false); + repColumn.initFromPage(pageValueCount, page.getRepetitionLevels().toInputStream()); + + int dlBitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel()); + defColumn = new VectorizedRleValuesReader(dlBitWidth, false); defColumn.initFromPage(pageValueCount, page.getDefinitionLevels().toInputStream()); + try { initDataReader(pageValueCount, page.getDataEncoding(), page.getData().toInputStream()); return pageValueCount; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index da23b5fcec28..80f6f88810a1 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -20,13 +20,17 @@ import java.io.IOException; import java.time.ZoneId; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import scala.collection.JavaConverters; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; @@ -44,7 +48,7 @@ * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the * Parquet column APIs. This is somewhat based on parquet-mr's ColumnReader. * - * TODO: handle complex types, decimal requiring more than 8 bytes, INT96. Schema mismatch. + * TODO: decimal requiring more than 8 bytes, INT96. Schema mismatch. * All of these can be handled efficiently and easily with codegen. * * This class can either return InternalRows or ColumnarBatches. With whole stage codegen @@ -64,10 +68,10 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa private int numBatched = 0; /** - * For each request column, the reader to read this column. This is NULL if this column - * is missing from the file, in which case we populate the attribute with NULL. + * Encapsulate writable column vectors with other Parquet related info such as + * repetition / definition levels. */ - private VectorizedColumnReader[] columnReaders; + private ParquetColumnVector[] columnVectors; /** * The number of rows that have been returned. @@ -80,9 +84,10 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa private long totalCountLoadedSoFar = 0; /** - * For each column, true if the column is missing in the file and we'll instead return NULLs. + * For each leaf column, if it is in the set, it means the column is missing in the file and + * we'll instead return NULLs. */ - private boolean[] missingColumns; + private Set missingColumns; /** * The timezone that timestamp INT96 values should be converted to. 
Null if no conversion. Here to @@ -120,8 +125,6 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa */ private ColumnarBatch columnarBatch; - private WritableColumnVector[] columnVectors; - /** * If true, this class returns batches instead of rows. */ @@ -246,25 +249,25 @@ private void initBatch( } } + WritableColumnVector[] vectors; if (memMode == MemoryMode.OFF_HEAP) { - columnVectors = OffHeapColumnVector.allocateColumns(capacity, batchSchema); + vectors = OffHeapColumnVector.allocateColumns(capacity, batchSchema); } else { - columnVectors = OnHeapColumnVector.allocateColumns(capacity, batchSchema); + vectors = OnHeapColumnVector.allocateColumns(capacity, batchSchema); + } + columnarBatch = new ColumnarBatch(vectors); + + columnVectors = new ParquetColumnVector[sparkSchema.fields().length]; + for (int i = 0; i < columnVectors.length; i++) { + columnVectors[i] = new ParquetColumnVector(parquetColumn.children().apply(i), + vectors[i], capacity, memMode, missingColumns); } - columnarBatch = new ColumnarBatch(columnVectors); + if (partitionColumns != null) { int partitionIdx = sparkSchema.fields().length; for (int i = 0; i < partitionColumns.fields().length; i++) { - ColumnVectorUtils.populate(columnVectors[i + partitionIdx], partitionValues, i); - columnVectors[i + partitionIdx].setIsConstant(); - } - } - - // Initialize missing columns with nulls. - for (int i = 0; i < missingColumns.length; i++) { - if (missingColumns[i]) { - columnVectors[i].putNulls(0, capacity); - columnVectors[i].setIsConstant(); + ColumnVectorUtils.populate(vectors[i + partitionIdx], partitionValues, i); + vectors[i + partitionIdx].setIsConstant(); } } } @@ -298,7 +301,7 @@ public void enableReturningBatches() { * Advances to the next batch of rows. Returns false if there are no more. */ public boolean nextBatch() throws IOException { - for (WritableColumnVector vector : columnVectors) { + for (ParquetColumnVector vector : columnVectors) { vector.reset(); } columnarBatch.setNumRows(0); @@ -306,10 +309,17 @@ public boolean nextBatch() throws IOException { checkEndOfRowGroup(); int num = (int) Math.min(capacity, totalCountLoadedSoFar - rowsReturned); - for (int i = 0; i < columnReaders.length; ++i) { - if (columnReaders[i] == null) continue; - columnReaders[i].readBatch(num, columnVectors[i]); + for (ParquetColumnVector cv : columnVectors) { + for (ParquetColumnVector leafCv : cv.getLeaves()) { + VectorizedColumnReader columnReader = leafCv.getColumnReader(); + if (columnReader != null) { + columnReader.readBatch(num, leafCv.getValueVector(), + leafCv.getRepetitionLevelVector(), leafCv.getDefinitionLevelVector()); + } + } + cv.assemble(); } + rowsReturned += num; columnarBatch.setNumRows(num); numBatched = num; @@ -318,34 +328,61 @@ public boolean nextBatch() throws IOException { } private void initializeInternal() throws IOException, UnsupportedOperationException { - // Check that the requested schema is supported. 
- missingColumns = new boolean[requestedSchema.getFieldCount()]; - List columns = requestedSchema.getColumns(); - List paths = requestedSchema.getPaths(); - for (int i = 0; i < requestedSchema.getFieldCount(); ++i) { - Type t = requestedSchema.getFields().get(i); - if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) { - throw new UnsupportedOperationException("Complex types not supported."); - } + missingColumns = new HashSet<>(); + for (ParquetColumn column : JavaConverters.seqAsJavaList(parquetColumn.children())) { + checkColumn(column); + } + } - String[] colPath = paths.get(i); - if (fileSchema.containsPath(colPath)) { - ColumnDescriptor fd = fileSchema.getColumnDescription(colPath); - if (!fd.equals(columns.get(i))) { + /** + * Check whether a column from requested schema is missing from the file schema, or whether it + * conforms to the type of the file schema. + */ + private void checkColumn(ParquetColumn column) throws IOException { + String[] path = JavaConverters.seqAsJavaList(column.path()).toArray(new String[0]); + if (containsPath(fileSchema, path)) { + if (column.isPrimitive()) { + ColumnDescriptor desc = column.descriptor().get(); + ColumnDescriptor fd = fileSchema.getColumnDescription(desc.getPath()); + if (!fd.equals(desc)) { throw new UnsupportedOperationException("Schema evolution not supported."); } - missingColumns[i] = false; } else { - if (columns.get(i).getMaxDefinitionLevel() == 0) { - // Column is missing in data but the required data is non-nullable. This file is invalid. - throw new IOException("Required column is missing in data file. Col: " + - Arrays.toString(colPath)); + for (ParquetColumn childColumn : JavaConverters.seqAsJavaList(column.children())) { + checkColumn(childColumn); } - missingColumns[i] = true; } + } else { // A missing column which is either primitive or complex + if (column.required()) { + // Column is missing in data but the required data is non-nullable. This file is invalid. + throw new IOException("Required column is missing in data file. Col: " + + Arrays.toString(path)); + } + missingColumns.add(column); } } + /** + * Checks whether the given 'path' exists in 'parquetType'. The difference between this and + * {@link MessageType#containsPath(String[])} is that the latter only support paths to leaf + * nodes, while this support paths both to leaf and non-leaf nodes. + */ + private boolean containsPath(Type parquetType, String[] path) { + return containsPath(parquetType, path, 0); + } + + private boolean containsPath(Type parquetType, String[] path, int depth) { + if (path.length == depth) return true; + if (parquetType instanceof GroupType) { + String fieldName = path[depth]; + GroupType parquetGroupType = (GroupType) parquetType; + if (parquetGroupType.containsField(fieldName)) { + return containsPath(parquetGroupType.getType(fieldName), path, depth + 1); + } + } + return false; + } + private void checkEndOfRowGroup() throws IOException { if (rowsReturned != totalCountLoadedSoFar) return; PageReadStore pages = reader.readNextRowGroup(); @@ -353,23 +390,26 @@ private void checkEndOfRowGroup() throws IOException { throw new IOException("expecting more rows but reached last block. 
Read " + rowsReturned + " out of " + totalRowCount); } - List columns = requestedSchema.getColumns(); - List types = requestedSchema.asGroupType().getFields(); - columnReaders = new VectorizedColumnReader[columns.size()]; - for (int i = 0; i < columns.size(); ++i) { - if (missingColumns[i]) continue; - columnReaders[i] = new VectorizedColumnReader( - columns.get(i), - types.get(i).getLogicalTypeAnnotation(), - pages.getPageReader(columns.get(i)), - pages.getRowIndexes().orElse(null), - convertTz, - datetimeRebaseMode, - datetimeRebaseTz, - int96RebaseMode, - int96RebaseTz, - writerVersion); + for (ParquetColumnVector cv : columnVectors) { + initColumnReader(pages, cv); } totalCountLoadedSoFar += pages.getRowCount(); } + + private void initColumnReader(PageReadStore pages, ParquetColumnVector cv) throws IOException { + if (!missingColumns.contains(cv.getColumn())) { + if (cv.getColumn().isPrimitive()) { + ParquetColumn column = cv.getColumn(); + VectorizedColumnReader reader = new VectorizedColumnReader( + column.descriptor().get(), column.required(), pages, convertTz, datetimeRebaseMode, + datetimeRebaseTz, int96RebaseMode, int96RebaseTz, writerVersion); + cv.setColumnReader(reader); + } else { + // Not in missing columns and is a complex type: this must be a struct + for (ParquetColumnVector childCv : cv.getChildren()) { + initColumnReader(pages, childCv); + } + } + } + } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java index bd7cbc7e1718..2cc763a5b725 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java @@ -96,13 +96,13 @@ public VectorizedRleValuesReader(int bitWidth, boolean readLength) { public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException { this.in = in; if (fixedWidth) { - // initialize for repetition and definition levels + // Initialize for repetition and definition levels if (readLength) { int length = readIntLittleEndian(); this.in = in.sliceStream(length); } } else { - // initialize for values + // Initialize for values if (in.available() > 0) { init(in.read()); } @@ -157,47 +157,52 @@ public int readInteger() { } /** - * Reads a batch of values into vector `values`, using `valueReader`. The related states such - * as row index, offset, number of values left in the batch and page, etc, are tracked by - * `state`. The type-specific `updater` is used to update or skip values. + * Reads a batch of definition levels and values into vector 'defLevels' and 'values' + * respectively. The values are read using 'valueReader'. *
- * This reader reads the definition levels and then will read from `valueReader` for the - * non-null values. If the value is null, `values` will be populated with null value. + * The related states such as row index, offset, number of values left in the batch and page, + * are tracked by 'state'. The type-specific 'updater' is used to update or skip values. + *
+ * This reader reads the definition levels and then will read from 'valueReader' for the + * non-null values. If the value is null, 'values' will be populated with null value. */ public void readBatch( ParquetReadState state, WritableColumnVector values, + WritableColumnVector defLevels, VectorizedValuesReader valueReader, ParquetVectorUpdater updater) { - readBatchInternal(state, values, values, valueReader, updater); + readBatchInternal(state, values, values, defLevels, valueReader, updater); } /** - * Decoding for dictionary ids. The IDs are populated into `values` and the nullability is - * populated into `nulls`. + * Decoding for dictionary ids. The IDs are populated into 'values' and the nullability is + * populated into 'nulls'. */ public void readIntegers( ParquetReadState state, WritableColumnVector values, WritableColumnVector nulls, - VectorizedValuesReader data) { - readBatchInternal(state, values, nulls, data, new ParquetVectorUpdaterFactory.IntegerUpdater()); + WritableColumnVector defLevels, + VectorizedValuesReader valueReader) { + readBatchInternal(state, values, nulls, defLevels, valueReader, + new ParquetVectorUpdaterFactory.IntegerUpdater()); } private void readBatchInternal( ParquetReadState state, WritableColumnVector values, WritableColumnVector nulls, + WritableColumnVector defLevels, VectorizedValuesReader valueReader, ParquetVectorUpdater updater) { - int offset = state.offset; long rowId = state.rowId; - int leftInBatch = state.valuesToReadInBatch; + int leftInBatch = state.rowsToReadInBatch; int leftInPage = state.valuesToReadInPage; while (leftInBatch > 0 && leftInPage > 0) { - if (this.currentCount == 0) this.readNextGroup(); + if (currentCount == 0 && !readNextGroup()) break; int n = Math.min(leftInBatch, Math.min(leftInPage, this.currentCount)); long rangeStart = state.currentRangeStart(); @@ -210,11 +215,11 @@ private void readBatchInternal( } else if (rowId > rangeEnd) { state.nextRange(); } else { - // the range [rowId, rowId + n) overlaps with the current row range in state + // The range [rowId, rowId + n) overlaps with the current row range in state long start = Math.max(rangeStart, rowId); long end = Math.min(rangeEnd, rowId + n - 1); - // skip the part [rowId, start) + // Skip the part [rowId, start) int toSkip = (int) (start - rowId); if (toSkip > 0) { skipValues(toSkip, state, valueReader, updater); @@ -222,36 +227,347 @@ private void readBatchInternal( leftInPage -= toSkip; } - // read the part [start, end] + // Read the part [start, end] n = (int) (end - start + 1); + readValuesN(n, state, defLevels, values, nulls, valueReader, updater); + + state.levelOffset += n; + leftInBatch -= n; + rowId += n; + leftInPage -= n; + currentCount -= n; + defLevels.addElementsAppended(n); + } + } + + state.rowsToReadInBatch = leftInBatch; + state.valuesToReadInPage = leftInPage; + state.rowId = rowId; + } - switch (mode) { - case RLE: - if (currentValue == state.maxDefinitionLevel) { - updater.readValues(n, offset, values, valueReader); + /** + * Reads a batch of repetition levels, definition levels and values into 'repLevels', + * 'defLevels' and 'values' respectively. The definition levels and values are read via + * 'defLevelsReader' and 'valueReader' respectively. + *
+ * The related states such as row index, offset, number of rows left in the batch and page, + * are tracked by 'state'. The type-specific 'updater' is used to update or skip values. + */ + public void readBatchRepeated( + ParquetReadState state, + WritableColumnVector repLevels, + VectorizedRleValuesReader defLevelsReader, + WritableColumnVector defLevels, + WritableColumnVector values, + VectorizedValuesReader valueReader, + ParquetVectorUpdater updater) { + readBatchRepeatedInternal(state, repLevels, defLevelsReader, defLevels, values, values, true, + valueReader, updater); + } + + /** + * Reads a batch of repetition levels, definition levels and integer values into 'repLevels', + * 'defLevels', 'values' and 'nulls' respectively. The definition levels and values are read via + * 'defLevelsReader' and 'valueReader' respectively. + *
+ * The 'values' vector is used to hold non-null values, while 'nulls' vector is used to hold + * null values. + *
+ * The related states such as row index, offset, number of rows left in the batch and page, + * are tracked by 'state'. + *
+ * Unlike 'readBatchRepeated', this is used to decode dictionary indices in dictionary encoding. + */ + public void readIntegersRepeated( + ParquetReadState state, + WritableColumnVector repLevels, + VectorizedRleValuesReader defLevelsReader, + WritableColumnVector defLevels, + WritableColumnVector values, + WritableColumnVector nulls, + VectorizedValuesReader valueReader) { + readBatchRepeatedInternal(state, repLevels, defLevelsReader, defLevels, values, nulls, false, + valueReader, new ParquetVectorUpdaterFactory.IntegerUpdater()); + } + + /** + * Keep reading repetition level values from the page until either: 1) we've read enough + * top-level rows to fill the current batch, or 2) we've drained the data page completely. + * + * @param valuesReused whether 'values' vector is reused for 'nulls' + */ + public void readBatchRepeatedInternal( + ParquetReadState state, + WritableColumnVector repLevels, + VectorizedRleValuesReader defLevelsReader, + WritableColumnVector defLevels, + WritableColumnVector values, + WritableColumnVector nulls, + boolean valuesReused, + VectorizedValuesReader valueReader, + ParquetVectorUpdater updater) { + + int leftInBatch = state.rowsToReadInBatch; + int leftInPage = state.valuesToReadInPage; + long rowId = state.rowId; + + DefLevelProcessor defLevelProcessor = new DefLevelProcessor(defLevelsReader, state, defLevels, + values, nulls, valuesReused, valueReader, updater); + + while ((leftInBatch > 0 || !state.lastListCompleted) && leftInPage > 0) { + if (currentCount == 0 && !readNextGroup()) break; + + // Values to read in the current RLE/PACKED block, must be <= what's left in the page + int valuesLeftInBlock = Math.min(leftInPage, currentCount); + + // The current row range start and end + long rangeStart = state.currentRangeStart(); + long rangeEnd = state.currentRangeEnd(); + + switch (mode) { + case RLE: + // This RLE block is consist of top-level rows, so we'll need to check + // if the rows should be skipped according to row indexes. + if (currentValue == 0) { + if (leftInBatch == 0) { + state.lastListCompleted = true; } else { - nulls.putNulls(offset, n); + // # of rows to read in the block, must be <= what's left in the current batch + int n = Math.min(leftInBatch, valuesLeftInBlock); + + if (rowId + n < rangeStart) { + // Need to skip all rows in [rowId, rowId + n) + defLevelProcessor.skipValues(n); + rowId += n; + currentCount -= n; + leftInPage -= n; + } else if (rowId > rangeEnd) { + // The current row index already beyond the current range: move to the next range + // and repeat + state.nextRange(); + } else { + // The range [rowId, rowId + n) overlaps with the current row range + long start = Math.max(rangeStart, rowId); + long end = Math.min(rangeEnd, rowId + n - 1); + + // Skip the rows in [rowId, start) + int toSkip = (int) (start - rowId); + if (toSkip > 0) { + defLevelProcessor.skipValues(toSkip); + rowId += toSkip; + currentCount -= toSkip; + leftInPage -= toSkip; + } + + // Read the rows in [start, end] + n = (int) (end - start + 1); + + if (n > 0) { + repLevels.appendInts(n, 0); + defLevelProcessor.readValues(n); + } + + rowId += n; + currentCount -= n; + leftInBatch -= n; + leftInPage -= n; + } + } + } else { + // Not a top-level row: just read all the repetition levels in the block if the row + // should be included according to row indexes, else skip the rows. 
+ if (!state.shouldSkip) { + repLevels.appendInts(valuesLeftInBlock, currentValue); } - break; - case PACKED: - for (int i = 0; i < n; ++i) { - if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) { - updater.readValue(offset + i, values, valueReader); + state.numBatchedDefLevels += valuesLeftInBlock; + leftInPage -= valuesLeftInBlock; + currentCount -= valuesLeftInBlock; + } + break; + case PACKED: + int i = 0; + + for (; i < valuesLeftInBlock; i++) { + int currentValue = currentBuffer[currentBufferIdx + i]; + if (currentValue == 0) { + if (leftInBatch == 0) { + state.lastListCompleted = true; + break; + } else if (rowId < rangeStart) { + // This is a top-level row, therefore check if we should skip it with row indexes + // the row is before the current range, skip it + defLevelProcessor.skipValues(1); + } else if (rowId > rangeEnd) { + // The row is after the current range, move to the next range and compare again + state.nextRange(); + break; } else { - nulls.putNull(offset + i); + // The row is in the current range, decrement the row counter and read it + leftInBatch--; + repLevels.appendInt(0); + defLevelProcessor.readValues(1); + } + rowId++; + } else { + if (!state.shouldSkip) { + repLevels.appendInt(currentValue); } + state.numBatchedDefLevels += 1; } - break; + } + + leftInPage -= i; + currentCount -= i; + currentBufferIdx += i; + break; + } + } + + // Process all the batched def levels + defLevelProcessor.finish(); + + state.rowsToReadInBatch = leftInBatch; + state.valuesToReadInPage = leftInPage; + state.rowId = rowId; + } + + private static class DefLevelProcessor { + private final VectorizedRleValuesReader reader; + private final ParquetReadState state; + private final WritableColumnVector defLevels; + private final WritableColumnVector values; + private final WritableColumnVector nulls; + private final boolean valuesReused; + private final VectorizedValuesReader valueReader; + private final ParquetVectorUpdater updater; + + DefLevelProcessor( + VectorizedRleValuesReader reader, + ParquetReadState state, + WritableColumnVector defLevels, + WritableColumnVector values, + WritableColumnVector nulls, + boolean valuesReused, + VectorizedValuesReader valueReader, + ParquetVectorUpdater updater) { + this.reader = reader; + this.state = state; + this.defLevels = defLevels; + this.values = values; + this.nulls = nulls; + this.valuesReused = valuesReused; + this.valueReader = valueReader; + this.updater = updater; + } + + void readValues(int n) { + if (!state.shouldSkip) { + state.numBatchedDefLevels += n; + } else { + reader.skipValues(state.numBatchedDefLevels, state, valueReader, updater); + state.numBatchedDefLevels = n; + state.shouldSkip = false; + } + } + + void skipValues(int n) { + if (state.shouldSkip) { + state.numBatchedDefLevels += n; + } else { + reader.readValues(state.numBatchedDefLevels, state, defLevels, values, nulls, valuesReused, + valueReader, updater); + state.numBatchedDefLevels = n; + state.shouldSkip = true; + } + } + + void finish() { + if (state.numBatchedDefLevels > 0) { + if (state.shouldSkip) { + reader.skipValues(state.numBatchedDefLevels, state, valueReader, updater); + } else { + reader.readValues(state.numBatchedDefLevels, state, defLevels, values, nulls, + valuesReused, valueReader, updater); } - offset += n; - leftInBatch -= n; - rowId += n; - leftInPage -= n; - currentCount -= n; + state.numBatchedDefLevels = 0; } } + } + + /** + * Read the next 'total' values (either null or non-null) from this definition level reader and + * 
'valueReader'. The definition levels are read into 'defLevels'. If a value is not + * null, it is appended to 'values'. Otherwise, a null bit will be set in 'nulls'. + * + * This is only used when reading repeated values. + */ + private void readValues( + int total, + ParquetReadState state, + WritableColumnVector defLevels, + WritableColumnVector values, + WritableColumnVector nulls, + boolean valuesReused, + VectorizedValuesReader valueReader, + ParquetVectorUpdater updater) { + + defLevels.reserveAdditional(total); + values.reserveAdditional(total); + if (!valuesReused) { + // 'nulls' is a separate column vector so we'll have to reserve it separately + nulls.reserveAdditional(total); + } + + int n = total; + int initialValueOffset = state.valueOffset; + while (n > 0) { + if (currentCount == 0 && !readNextGroup()) break; + int num = Math.min(n, this.currentCount); + readValuesN(num, state, defLevels, values, nulls, valueReader, updater); + state.levelOffset += num; + currentCount -= num; + n -= num; + } + + defLevels.addElementsAppended(total); + + int valuesRead = state.valueOffset - initialValueOffset; + values.addElementsAppended(valuesRead); + if (!valuesReused) { + nulls.addElementsAppended(valuesRead); + } + } - state.advanceOffsetAndRowId(offset, rowId); + private void readValuesN( + int n, + ParquetReadState state, + WritableColumnVector defLevels, + WritableColumnVector values, + WritableColumnVector nulls, + VectorizedValuesReader valueReader, + ParquetVectorUpdater updater) { + switch (mode) { + case RLE: + if (currentValue == state.maxDefinitionLevel) { + updater.readValues(n, state.valueOffset, values, valueReader); + } else { + nulls.putNulls(state.valueOffset, n); + } + state.valueOffset += n; + defLevels.putInts(state.levelOffset, n, currentValue); + break; + case PACKED: + for (int i = 0; i < n; ++i) { + int currentValue = currentBuffer[currentBufferIdx++]; + if (currentValue == state.maxDefinitionLevel) { + updater.readValue(state.valueOffset++, values, valueReader); + } else { + nulls.putNull(state.valueOffset++); + } + defLevels.putInt(state.levelOffset + i, currentValue); + } + break; + } } /** @@ -264,11 +580,11 @@ private void skipValues( VectorizedValuesReader valuesReader, ParquetVectorUpdater updater) { while (n > 0) { - if (this.currentCount == 0) this.readNextGroup(); + if (currentCount == 0 && !readNextGroup()) break; int num = Math.min(n, this.currentCount); switch (mode) { case RLE: - // we only need to skip non-null values from `valuesReader` since nulls are represented + // We only need to skip non-null values from `valuesReader` since nulls are represented // via definition levels which are skipped here via decrementing `currentCount`. 
if (currentValue == state.maxDefinitionLevel) { updater.skipValues(num, valuesReader); @@ -276,7 +592,7 @@ private void skipValues( break; case PACKED: for (int i = 0; i < num; ++i) { - // same as above, only skip non-null values from `valuesReader` + // Same as above, only skip non-null values from `valuesReader` if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) { updater.skipValues(1, valuesReader); } @@ -295,7 +611,7 @@ private void skipValues( public void readIntegers(int total, WritableColumnVector c, int rowId) { int left = total; while (left > 0) { - if (this.currentCount == 0) this.readNextGroup(); + if (currentCount == 0 && !readNextGroup()) break; int n = Math.min(left, this.currentCount); switch (mode) { case RLE: @@ -505,9 +821,14 @@ private int readIntLittleEndianPaddedOnBitWidth() throws IOException { } /** - * Reads the next group. + * Reads the next group. Returns false if no more group available. */ - private void readNextGroup() { + private boolean readNextGroup() { + if (in.available() <= 0) { + currentCount = 0; + return false; + } + try { int header = readUnsignedVarInt(); this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED; @@ -515,7 +836,7 @@ private void readNextGroup() { case RLE: this.currentCount = header >>> 1; this.currentValue = readIntLittleEndianPaddedOnBitWidth(); - return; + break; case PACKED: int numGroups = header >>> 1; this.currentCount = numGroups * 8; @@ -531,13 +852,15 @@ private void readNextGroup() { this.packer.unpack8Values(buffer, buffer.position(), this.currentBuffer, valueIndex); valueIndex += 8; } - return; + break; default: throw new ParquetDecodingException("not a valid mode " + this.mode); } } catch (IOException e) { throw new ParquetDecodingException("Failed to read from input stream", e); } + + return true; } /** @@ -546,7 +869,7 @@ private void readNextGroup() { private void skipValues(int n) { int left = n; while (left > 0) { - if (this.currentCount == 0) this.readNextGroup(); + if (this.currentCount == 0 && !readNextGroup()) break; int num = Math.min(left, this.currentCount); switch (mode) { case RLE: diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index d246a3c24e4a..505377bdb683 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -127,7 +127,7 @@ public void putNotNulls(int rowId, int count) { @Override public boolean isNullAt(int rowId) { - return nulls[rowId] == 1; + return isAllNull || nulls[rowId] == 1; } // diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java index ae457a16123d..9ffb733a461a 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java @@ -53,7 +53,7 @@ public abstract class WritableColumnVector extends ColumnVector { * Resets this column for writing. The currently stored values are no longer accessible. 
*/ public void reset() { - if (isConstant) return; + if (isConstant || isAllNull) return; if (childColumns != null) { for (WritableColumnVector c: childColumns) { @@ -83,6 +83,10 @@ public void close() { dictionary = null; } + public void reserveAdditional(int additionalCapacity) { + reserve(elementsAppended + additionalCapacity); + } + public void reserve(int requiredCapacity) { if (requiredCapacity < 0) { throwUnsupportedException(requiredCapacity, null); @@ -117,7 +121,7 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) { @Override public boolean hasNull() { - return numNulls > 0; + return isAllNull || numNulls > 0; } @Override @@ -714,15 +718,47 @@ public WritableColumnVector arrayData() { public WritableColumnVector getChild(int ordinal) { return childColumns[ordinal]; } /** - * Returns the elements appended. + * Returns the number of child vectors. + */ + public int getNumChildren() { + return childColumns.length; + } + + /** + * Returns the elements appended. */ public final int getElementsAppended() { return elementsAppended; } + /** + * Increments the number of elements appended by 'num'. + * + * This is useful when one wants to use the 'putXXX' API to add new elements to the vector, but + * still wants to keep count of how many elements have been added (since the 'putXXX' APIs don't + * increment the count). + */ + public final void addElementsAppended(int num) { + elementsAppended += num; + } + /** * Marks this column as being constant. */ public final void setIsConstant() { isConstant = true; } + /** + * Marks this column as containing only null values. + */ + public final void setAllNull() { + isAllNull = true; + } + + /** + * Whether this column contains only null values. + */ + public final boolean isAllNull() { + return isAllNull; + } + /** * Maximum number of rows that can be stored in this column. */ @@ -745,6 +781,12 @@ public WritableColumnVector arrayData() { */ protected boolean isConstant; + /** + * True if this column contains only nulls. This means the column values never change, even + * across resets. Compared to 'isConstant' above, this doesn't require any allocation of space. + */ + protected boolean isAllNull; + /** * Default size of each array length value. This grows as necessary.
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 18876dedb951..44dc145d36e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -45,6 +45,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.execution.WholeStageCodegenExec import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector} import org.apache.spark.sql.internal.SQLConf @@ -173,8 +174,8 @@ class ParquetFileFormat override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = { val conf = sparkSession.sessionState.conf conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled && - schema.length <= conf.wholeStageMaxNumFields && - schema.forall(_.dataType.isInstanceOf[AtomicType]) + ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && + !WholeStageCodegenExec.isTooManyFields(conf, schema) } override def vectorTypes( @@ -240,8 +241,7 @@ class ParquetFileFormat val sqlConf = sparkSession.sessionState.conf val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled val enableVectorizedReader: Boolean = - sqlConf.parquetVectorizedReaderEnabled && - resultSchema.forall(_.dataType.isInstanceOf[AtomicType]) + ParquetUtils.isBatchReadSupportedForSchema(sqlConf, resultSchema) val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion val capacity = sqlConf.parquetVectorizedReaderBatchSize diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala index 34a4eb8c002d..0e065f19a88a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources.parquet +import java.util.Locale + import org.apache.hadoop.conf.Configuration import org.apache.parquet.io.{ColumnIO, ColumnIOFactory, GroupColumnIO, PrimitiveColumnIO} import org.apache.parquet.schema._ @@ -92,10 +94,16 @@ class ParquetToSparkSchemaConverter( private def convertInternal( groupColumn: GroupColumnIO, sparkReadSchema: Option[StructType] = None): ParquetColumn = { + // First convert the read schema into a map from field name to the field itself, to avoid O(n) + // lookup cost below. 
+ val schemaMapOpt = sparkReadSchema.map { schema => + schema.map(f => normalizeFieldName(f.name) -> f).toMap + } + val converted = (0 until groupColumn.getChildrenCount).map { i => val field = groupColumn.getChild(i) - val fieldFromReadSchema = sparkReadSchema.flatMap { schema => - schema.find(f => isSameFieldName(f.name, field.getName, caseSensitive)) + val fieldFromReadSchema = schemaMapOpt.flatMap { schemaMap => + schemaMap.get(normalizeFieldName(field.getName)) } var fieldReadType = fieldFromReadSchema.map(_.dataType) @@ -146,9 +154,8 @@ class ParquetToSparkSchemaConverter( ParquetColumn(StructType(converted.map(_._1)), groupColumn, converted.map(_._2)) } - private def isSameFieldName(left: String, right: String, caseSensitive: Boolean): Boolean = - if (!caseSensitive) left.equalsIgnoreCase(right) - else left == right + private def normalizeFieldName(name: String): String = + if (caseSensitive) name else name.toLowerCase(Locale.ROOT) /** * Converts a Parquet [[Type]] to a [[ParquetColumn]] which wraps a Spark SQL [[DataType]] with diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala index 2c565c8890e7..9f2e6580ecb4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala @@ -36,8 +36,9 @@ import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Count, CountStar, Max, Min} import org.apache.spark.sql.execution.datasources.AggregatePushDownUtils import org.apache.spark.sql.execution.datasources.v2.V2ColumnUtils +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.{LegacyBehaviorPolicy, PARQUET_AGGREGATE_PUSHDOWN_ENABLED} -import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} +import org.apache.spark.sql.types.{ArrayType, AtomicType, DataType, MapType, StructField, StructType} object ParquetUtils { def inferSchema( @@ -187,6 +188,30 @@ object ParquetUtils { } } + /** + * Whether columnar read is supported for the input `schema`. + */ + def isBatchReadSupportedForSchema(sqlConf: SQLConf, schema: StructType): Boolean = + sqlConf.parquetVectorizedReaderEnabled && + schema.forall(f => isBatchReadSupported(sqlConf, f.dataType)) + + def isBatchReadSupported(sqlConf: SQLConf, dt: DataType): Boolean = dt match { + case _: AtomicType => + true + case at: ArrayType => + sqlConf.parquetVectorizedReaderNestedColumnEnabled && + isBatchReadSupported(sqlConf, at.elementType) + case mt: MapType => + sqlConf.parquetVectorizedReaderNestedColumnEnabled && + isBatchReadSupported(sqlConf, mt.keyType) && + isBatchReadSupported(sqlConf, mt.valueType) + case st: StructType => + sqlConf.parquetVectorizedReaderNestedColumnEnabled && + st.fields.forall(f => isBatchReadSupported(sqlConf, f.dataType)) + case _ => + false + } + /** * When the partial aggregates (Max/Min/Count) are pushed down to Parquet, we don't need to * createRowBaseReader to read data from Parquet and aggregate at Spark layer. 
Instead we want diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index 12b8a631196a..ea4f5e0d287a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -70,8 +70,8 @@ case class ParquetPartitionReaderFactory( private val isCaseSensitive = sqlConf.caseSensitiveAnalysis private val resultSchema = StructType(partitionSchema.fields ++ readDataSchema.fields) private val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled - private val enableVectorizedReader: Boolean = sqlConf.parquetVectorizedReaderEnabled && - resultSchema.forall(_.dataType.isInstanceOf[AtomicType]) + private val enableVectorizedReader: Boolean = + ParquetUtils.isBatchReadSupportedForSchema(sqlConf, resultSchema) private val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled private val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion private val capacity = sqlConf.parquetVectorizedReaderBatchSize diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out index f5e5b46d29ce..f98fb1eb2a57 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out @@ -1125,7 +1125,8 @@ struct -- !query output == Physical Plan == *Filter v#x IN ([a],null) -+- FileScan parquet default.t[v#x] Batched: false, DataFilters: [v#x IN ([a],null)], Format: Parquet, Location [not included in comparison]/{warehouse_dir}/t], PartitionFilters: [], PushedFilters: [In(v, [[a],null])], ReadSchema: struct> ++- *ColumnarToRow + +- FileScan parquet default.t[v#x] Batched: true, DataFilters: [v#x IN ([a],null)], Format: Parquet, Location [not included in comparison]/{warehouse_dir}/t], PartitionFilters: [], PushedFilters: [In(v, [[a],null])], ReadSchema: struct> -- !query diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out index 4e552d51a395..a563eda1e7b0 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out @@ -1067,7 +1067,8 @@ struct -- !query output == Physical Plan == *Filter v#x IN ([a],null) -+- FileScan parquet default.t[v#x] Batched: false, DataFilters: [v#x IN ([a],null)], Format: Parquet, Location [not included in comparison]/{warehouse_dir}/t], PartitionFilters: [], PushedFilters: [In(v, [[a],null])], ReadSchema: struct> ++- *ColumnarToRow + +- FileScan parquet default.t[v#x] Batched: true, DataFilters: [v#x IN ([a],null)], Format: Parquet, Location [not included in comparison]/{warehouse_dir}/t], PartitionFilters: [], PushedFilters: [In(v, [[a],null])], ReadSchema: struct> -- !query diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileBasedDataSourceTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileBasedDataSourceTest.scala index c2dc20b0099a..a154a6566307 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileBasedDataSourceTest.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileBasedDataSourceTest.scala @@ -38,6 +38,8 @@ private[sql] trait FileBasedDataSourceTest extends SQLTestUtils { protected val dataSourceName: String // The SQL config key for enabling vectorized reader. protected val vectorizedReaderEnabledKey: String + // The SQL config key for enabling vectorized reader for nested types. + protected val vectorizedReaderNestedEnabledKey: String /** * Reads data source file from given `path` as `DataFrame` and passes it to given function. @@ -52,8 +54,11 @@ private[sql] trait FileBasedDataSourceTest extends SQLTestUtils { f(spark.read.format(dataSourceName).load(path.toString)) } if (testVectorized) { - withSQLConf(vectorizedReaderEnabledKey -> "true") { - f(spark.read.format(dataSourceName).load(path.toString)) + Seq(true, false).foreach { enableNested => + withSQLConf(vectorizedReaderEnabledKey -> "true", + vectorizedReaderNestedEnabledKey -> enableNested.toString) { + f(spark.read.format(dataSourceName).load(path)) + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala index c36bfd936246..46a7f8d3d90d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala @@ -56,6 +56,8 @@ trait OrcTest extends QueryTest with FileBasedDataSourceTest with BeforeAndAfter override protected val dataSourceName: String = "orc" override protected val vectorizedReaderEnabledKey: String = SQLConf.ORC_VECTORIZED_READER_ENABLED.key + override protected val vectorizedReaderNestedEnabledKey: String = + SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key protected override def beforeAll(): Unit = { super.beforeAll() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1SchemaPruningSuite.scala index 2ce38dae47db..4d33eacecc13 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1SchemaPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV1SchemaPruningSuite.scala @@ -25,6 +25,8 @@ class OrcV1SchemaPruningSuite extends SchemaPruningSuite { override protected val dataSourceName: String = "orc" override protected val vectorizedReaderEnabledKey: String = SQLConf.ORC_VECTORIZED_READER_ENABLED.key + override protected val vectorizedReaderNestedEnabledKey: String = + SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key override protected def sparkConf: SparkConf = super diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV2SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV2SchemaPruningSuite.scala index 47254f4231d5..107a2b791202 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV2SchemaPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcV2SchemaPruningSuite.scala @@ -29,6 +29,8 @@ class OrcV2SchemaPruningSuite extends SchemaPruningSuite with AdaptiveSparkPlanH override protected val dataSourceName: String = "orc" override protected val vectorizedReaderEnabledKey: String = SQLConf.ORC_VECTORIZED_READER_ENABLED.key + override protected val 
vectorizedReaderNestedEnabledKey: String = + SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key override protected def sparkConf: SparkConf = super diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala index bdcc1a4a5b95..64bfcdadcf45 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnIndexSuite.scala @@ -102,4 +102,17 @@ class ParquetColumnIndexSuite extends QueryTest with ParquetTest with SharedSpar }.toDF() checkUnalignedPages(df)(actions: _*) } + + test("reading unaligned pages - struct type") { + val df = (0 until 2000).map(i => Tuple1((i.toLong, i + ":" + "o" * (i / 100)))).toDF("s") + checkUnalignedPages(df)( + df => df.filter("s._1 = 500"), + df => df.filter("s._1 = 500 or s._1 = 1500"), + df => df.filter("s._1 = 500 or s._1 = 501 or s._1 = 1500"), + df => df.filter("s._1 = 500 or s._1 = 501 or s._1 = 1000 or s._1 = 1500"), + // range filter + df => df.filter("s._1 >= 500 and s._1 < 1000"), + df => df.filter("(s._1 >= 500 and s._1 < 1000) or (s._1 >= 1500 and s._1 < 1600)") + ) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala index 4eab5c3a0927..a9a8dacc374f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala @@ -89,6 +89,43 @@ abstract class ParquetFileFormatSuite } } } + + test("support batch reads for schema") { + val testUDT = new TestUDT.MyDenseVectorUDT + Seq(true, false).foreach { enabled => + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> enabled.toString) { + Seq( + Seq(StructField("f1", IntegerType), StructField("f2", BooleanType)) -> true, + Seq(StructField("f1", IntegerType), StructField("f2", ArrayType(IntegerType))) -> enabled, + Seq(StructField("f1", BooleanType), StructField("f2", testUDT)) -> false + ).foreach { case (schema, expected) => + assert(ParquetUtils.isBatchReadSupportedForSchema(conf, StructType(schema)) == expected) + } + } + } + } + + test("support batch reads for data type") { + val testUDT = new TestUDT.MyDenseVectorUDT + Seq(true, false).foreach { enabled => + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> enabled.toString) { + Seq( + IntegerType -> true, + BooleanType -> true, + ArrayType(TimestampType) -> enabled, + StructType(Seq(StructField("f1", DecimalType.SYSTEM_DEFAULT), + StructField("f2", StringType))) -> enabled, + MapType(keyType = LongType, valueType = DateType) -> enabled, + testUDT -> false, + ArrayType(testUDT) -> false, + StructType(Seq(StructField("f1", ByteType), StructField("f2", testUDT))) -> false, + MapType(keyType = testUDT, valueType = BinaryType) -> false + ).foreach { case (dt, expected) => + assert(ParquetUtils.isBatchReadSupported(conf, dt) == expected) + } + } + } + } } class ParquetFileFormatV1Suite extends ParquetFileFormatSuite { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 99b2d9844ed1..4d01db999fba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -358,6 +358,357 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } + test("vectorized reader: array") { + val data = Seq( + Tuple1(null), + Tuple1(Seq()), + Tuple1(Seq("a", "b", "c")), + Tuple1(Seq(null)) + ) + + withParquetFile(data) { file => + readParquetFile(file) { df => + checkAnswer(df.sort("_1"), + Row(null) :: Row(Seq()) :: Row(Seq(null)) :: Row(Seq("a", "b", "c")) :: Nil + ) + } + } + } + + test("vectorized reader: missing array") { + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true") { + val data = Seq( + Tuple1(null), + Tuple1(Seq()), + Tuple1(Seq("a", "b", "c")), + Tuple1(Seq(null)) + ) + + val readSchema = new StructType().add("_2", new ArrayType( + new StructType().add("a", LongType, nullable = true), + containsNull = true) + ) + + withParquetFile(data) { file => + checkAnswer(spark.read.schema(readSchema).parquet(file), + Row(null) :: Row(null) :: Row(null) :: Row(null) :: Nil + ) + } + } + } + + test("vectorized reader: array of array") { + val data = Seq( + Tuple1(Seq(Seq(0, 1), Seq(2, 3))), + Tuple1(Seq(Seq(4, 5), Seq(6, 7))) + ) + + withParquetFile(data) { file => + readParquetFile(file) { df => + checkAnswer(df.sort("_1"), + Row(Seq(Seq(0, 1), Seq(2, 3))) :: Row(Seq(Seq(4, 5), Seq(6, 7))) :: Nil + ) + } + } + } + + test("vectorized reader: struct of array") { + val data = Seq( + Tuple1(Tuple2("a", null)), + Tuple1(null), + Tuple1(Tuple2(null, null)), + Tuple1(Tuple2(null, Seq("b", "c"))), + Tuple1(Tuple2("d", Seq("e", "f"))), + Tuple1(null) + ) + + withParquetFile(data) { file => + readParquetFile(file) { df => + checkAnswer(df, + Row(Row("a", null)) :: Row(null) :: Row(Row(null, null)) :: + Row(Row(null, Seq("b", "c"))) :: Row(Row("d", Seq("e", "f"))) :: Row(null) :: Nil + ) + } + } + } + + test("vectorized reader: array of struct") { + val data = Seq( + Tuple1(null), + Tuple1(Seq()), + Tuple1(Seq(Tuple2("a", null), Tuple2(null, "b"))), + Tuple1(Seq(null)), + Tuple1(Seq(Tuple2(null, null), Tuple2("c", null), null)), + Tuple1(Seq()) + ) + + withParquetFile(data) { file => + readParquetFile(file) { df => + checkAnswer(df, + Row(null) :: + Row(Seq()) :: + Row(Seq(Row("a", null), Row(null, "b"))) :: + Row(Seq(null)) :: + Row(Seq(Row(null, null), Row("c", null), null)) :: + Row(Seq()) :: + Nil) + } + } + } + + + test("vectorized reader: array of nested struct") { + val data = Seq( + Tuple1(Tuple2("a", null)), + Tuple1(Tuple2("b", Seq(Tuple2("c", "d")))), + Tuple1(null), + Tuple1(Tuple2("e", Seq(Tuple2("f", null), Tuple2(null, "g")))), + Tuple1(Tuple2(null, null)), + Tuple1(Tuple2(null, Seq(null))), + Tuple1(Tuple2(null, Seq(Tuple2(null, null), Tuple2("h", null), null))), + Tuple1(Tuple2("i", Seq())), + Tuple1(null) + ) + + withParquetFile(data) { file => + readParquetFile(file) { df => + checkAnswer(df, + Row(Row("a", null)) :: + Row(Row("b", Seq(Row("c", "d")))) :: + Row(null) :: + Row(Row("e", Seq(Row("f", null), Row(null, "g")))) :: + Row(Row(null, null)) :: + Row(Row(null, Seq(null))) :: + Row(Row(null, Seq(Row(null, null), Row("h", null), null))) :: + Row(Row("i", Seq())) :: + Row(null) :: + Nil) + } + } + } + + test("vectorized reader: required array with 
required elements") { + Seq(true, false).foreach { dictionaryEnabled => + def makeRawParquetFile(path: Path, expected: Seq[Seq[String]]): Unit = { + val schemaStr = + """message spark_schema { + | required group _1 (LIST) { + | repeated group list { + | required binary element (UTF8); + | } + | } + |} + """.stripMargin + val schema = MessageTypeParser.parseMessageType(schemaStr) + val writer = createParquetWriter(schema, path, dictionaryEnabled) + + val factory = new SimpleGroupFactory(schema) + expected.foreach { values => + val group = factory.newGroup() + val list = group.addGroup(0) + values.foreach { value => + list.addGroup(0).append("element", value) + } + writer.write(group) + } + writer.close() + } + + // write the following into the Parquet file: + // 0: [ "a", "b" ] + // 1: [ ] + // 2: [ "c", "d" ] + withTempDir { dir => + val path = new Path(dir.toURI.toString, "part-r-0.parquet") + val expected = Seq(Seq("a", "b"), Seq(), Seq("c", "d")) + makeRawParquetFile(path, expected) + readParquetFile(path.toString) { df => checkAnswer(df, expected.map(Row(_))) } + } + } + } + + test("vectorized reader: optional array with required elements") { + Seq(true, false).foreach { dictionaryEnabled => + def makeRawParquetFile(path: Path, expected: Seq[Seq[String]]): Unit = { + val schemaStr = + """message spark_schema { + | optional group _1 (LIST) { + | repeated group list { + | required binary element (UTF8); + | } + | } + |} + """.stripMargin + val schema = MessageTypeParser.parseMessageType(schemaStr) + val writer = createParquetWriter(schema, path, dictionaryEnabled) + + val factory = new SimpleGroupFactory(schema) + expected.foreach { values => + val group = factory.newGroup() + if (values != null) { + val list = group.addGroup(0) + values.foreach { value => + list.addGroup(0).append("element", value) + } + } + writer.write(group) + } + writer.close() + } + + // write the following into the Parquet file: + // 0: [ "a", "b" ] + // 1: null + // 2: [ "c", "d" ] + // 3: [ ] + // 4: [ "e", "f" ] + withTempDir { dir => + val path = new Path(dir.toURI.toString, "part-r-0.parquet") + val expected = Seq(Seq("a", "b"), null, Seq("c", "d"), Seq(), Seq("e", "f")) + makeRawParquetFile(path, expected) + readParquetFile(path.toString) { df => checkAnswer(df, expected.map(Row(_))) } + } + } + } + + test("vectorized reader: required array with optional elements") { + Seq(true, false).foreach { dictionaryEnabled => + def makeRawParquetFile(path: Path, expected: Seq[Seq[String]]): Unit = { + val schemaStr = + """message spark_schema { + | required group _1 (LIST) { + | repeated group list { + | optional binary element (UTF8); + | } + | } + |} + """.stripMargin + val schema = MessageTypeParser.parseMessageType(schemaStr) + val writer = createParquetWriter(schema, path, dictionaryEnabled) + + val factory = new SimpleGroupFactory(schema) + expected.foreach { values => + val group = factory.newGroup() + if (values != null) { + val list = group.addGroup(0) + values.foreach { value => + val group = list.addGroup(0) + if (value != null) group.append("element", value) + } + } + writer.write(group) + } + writer.close() + } + + // write the following into the Parquet file: + // 0: [ "a", null ] + // 3: [ ] + // 4: [ null, "b" ] + withTempDir { dir => + val path = new Path(dir.toURI.toString, "part-r-0.parquet") + val expected = Seq(Seq("a", null), Seq(), Seq(null, "b")) + makeRawParquetFile(path, expected) + readParquetFile(path.toString) { df => checkAnswer(df, expected.map(Row(_))) } + } + } + } + + test("vectorized 
reader: required array with legacy format") { + Seq(true, false).foreach { dictionaryEnabled => + def makeRawParquetFile(path: Path, expected: Seq[Seq[String]]): Unit = { + val schemaStr = + """message spark_schema { + | repeated binary element (UTF8); + |} + """.stripMargin + val schema = MessageTypeParser.parseMessageType(schemaStr) + val writer = createParquetWriter(schema, path, dictionaryEnabled) + + val factory = new SimpleGroupFactory(schema) + expected.foreach { values => + val group = factory.newGroup() + values.foreach(group.append("element", _)) + writer.write(group) + } + writer.close() + } + + // write the following into the Parquet file: + // 0: [ "a", "b" ] + // 1: [ ] + // 2: [ "c", "d" ] + withTempDir { dir => + val path = new Path(dir.toURI.toString, "part-r-0.parquet") + val expected = Seq(Seq("a", "b"), Seq(), Seq("c", "d")) + makeRawParquetFile(path, expected) + readParquetFile(path.toString) { df => checkAnswer(df, expected.map(Row(_))) } + } + } + } + + test("vectorized reader: struct") { + val data = Seq( + Tuple1(null), + Tuple1((1, "a")), + Tuple1((2, null)), + Tuple1((3, "b")), + Tuple1(null) + ) + + withParquetFile(data) { file => + readParquetFile(file) { df => + checkAnswer(df.sort("_1"), + Row(null) :: Row(null) :: Row(Row(1, "a")) :: Row(Row(2, null)) :: Row(Row(3, "b")) :: Nil + ) + } + } + } + + test("vectorized reader: missing all struct fields") { + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true") { + val data = Seq( + Tuple1((1, "a")), + Tuple1((2, null)), + Tuple1(null) + ) + + val readSchema = new StructType().add("_1", + new StructType() + .add("_3", IntegerType, nullable = true) + .add("_4", LongType, nullable = true), + nullable = true) + + withParquetFile(data) { file => + checkAnswer(spark.read.schema(readSchema).parquet(file), + Row(null) :: Row(null) :: Row(null) :: Nil + ) + } + } + } + + test("vectorized reader: missing some struct fields") { + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true") { + val data = Seq( + Tuple1((1, "a")), + Tuple1((2, null)), + Tuple1(null) + ) + + val readSchema = new StructType().add("_1", + new StructType() + .add("_1", IntegerType, nullable = true) + .add("_3", LongType, nullable = true), + nullable = true) + + withParquetFile(data) { file => + checkAnswer(spark.read.schema(readSchema).parquet(file), + Row(null) :: Row(Row(1, null)) :: Row(Row(2, null)) :: Nil + ) + } + } + } + test("SPARK-34817: Support for unsigned Parquet logical types") { val parquetSchema = MessageTypeParser.parseMessageType( """message root { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala index cab93bd96fff..6a93b72472c7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala @@ -31,6 +31,8 @@ abstract class ParquetSchemaPruningSuite extends SchemaPruningSuite with Adaptiv override protected val dataSourceName: String = "parquet" override protected val vectorizedReaderEnabledKey: String = SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key + override protected val vectorizedReaderNestedEnabledKey: String = + SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key } diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala index 18690844d484..9eca308f16fc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala @@ -51,6 +51,8 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest { override protected val dataSourceName: String = "parquet" override protected val vectorizedReaderEnabledKey: String = SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key + override protected val vectorizedReaderNestedEnabledKey: String = + SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key /** * Reads the parquet file at `path` diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorizedSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorizedSuite.scala index 36a52b60688e..94509185cc12 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorizedSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorizedSuite.scala @@ -195,6 +195,290 @@ class ParquetVectorizedSuite extends QueryTest with ParquetTest with SharedSpark } } + test("nested type - single page, no column index") { + (1 to 4).foreach { batchSize => + Seq(true, false).foreach { dictionaryEnabled => + testNestedStringArrayOneLevel(None, None, Seq(4), + Seq(Seq("a", "b", "c", "d")), + Seq(0, 1, 1, 1), Seq(3, 3, 3, 3), Seq("a", "b", "c", "d"), batchSize, + dictionaryEnabled = dictionaryEnabled) + + testNestedStringArrayOneLevel(None, None, Seq(4), + Seq(Seq("a", "b"), Seq("c", "d")), + Seq(0, 1, 0, 1), Seq(3, 3, 3, 3), Seq("a", "b", "c", "d"), batchSize, + dictionaryEnabled = dictionaryEnabled) + + testNestedStringArrayOneLevel(None, None, Seq(4), + Seq(Seq("a"), Seq("b"), Seq("c"), Seq("d")), + Seq(0, 0, 0, 0), Seq(3, 3, 3, 3), Seq("a", "b", "c", "d"), batchSize, + dictionaryEnabled = dictionaryEnabled) + + testNestedStringArrayOneLevel(None, None, Seq(4), + Seq(Seq("a"), Seq(null), Seq("c"), Seq(null)), + Seq(0, 0, 0, 0), Seq(3, 2, 3, 2), Seq("a", null, "c", null), batchSize, + dictionaryEnabled = dictionaryEnabled) + + testNestedStringArrayOneLevel(None, None, Seq(4), + Seq(Seq("a"), Seq(null, null, null)), + Seq(0, 0, 1, 1), Seq(3, 2, 2, 2), Seq("a", null, null, null), batchSize, + dictionaryEnabled = dictionaryEnabled) + + testNestedStringArrayOneLevel(None, None, Seq(6), + Seq(Seq("a"), Seq(null, null, null), null, Seq()), + Seq(0, 0, 1, 1, 0, 0), Seq(3, 2, 2, 2, 0, 1), Seq("a", null, null, null, null, null), + batchSize, dictionaryEnabled = dictionaryEnabled) + + testNestedStringArrayOneLevel(None, None, Seq(8), + Seq(Seq("a"), Seq(), Seq(), null, Seq("b", null, "c"), null), + Seq(0, 0, 0, 0, 0, 1, 1, 0), Seq(3, 1, 1, 0, 3, 2, 3, 0), + Seq("a", null, null, null, "b", null, "c", null), batchSize, + dictionaryEnabled = dictionaryEnabled) + } + } + } + + test("nested type - multiple page, no column index") { + BATCH_SIZE_CONFIGS.foreach { batchSize => + Seq(Seq(2, 3, 2, 3)).foreach { pageSizes => + Seq(true, false).foreach { dictionaryEnabled => + testNestedStringArrayOneLevel(None, None, pageSizes, + Seq(Seq("a"), Seq(), Seq("b", null, "c"), Seq("d", "e"), Seq(null), Seq(), null), + Seq(0, 0, 0, 1, 1, 0, 1, 0, 0, 0), Seq(3, 1, 3, 2, 3, 3, 
3, 2, 1, 0), + Seq("a", null, "b", null, "c", "d", "e", null, null, null), batchSize, + dictionaryEnabled = dictionaryEnabled) + } + } + } + } + + test("nested type - multiple page, no column index, batch span multiple pages") { + (1 to 6).foreach { batchSize => + Seq(true, false).foreach { dictionaryEnabled => + // a list across multiple pages + testNestedStringArrayOneLevel(None, None, Seq(1, 5), + Seq(Seq("a"), Seq("b", "c", "d", "e", "f")), + Seq(0, 0, 1, 1, 1, 1), Seq.fill(6)(3), Seq("a", "b", "c", "d", "e", "f"), batchSize, + dictionaryEnabled = dictionaryEnabled) + + testNestedStringArrayOneLevel(None, None, Seq(1, 3, 2), + Seq(Seq("a"), Seq("b", "c", "d"), Seq("e", "f")), + Seq(0, 0, 1, 1, 0, 1), Seq.fill(6)(3), Seq("a", "b", "c", "d", "e", "f"), batchSize, + dictionaryEnabled = dictionaryEnabled) + + testNestedStringArrayOneLevel(None, None, Seq(2, 2, 2), + Seq(Seq("a", "b"), Seq("c", "d"), Seq("e", "f")), + Seq(0, 1, 0, 1, 0, 1), Seq.fill(6)(3), Seq("a", "b", "c", "d", "e", "f"), batchSize, + dictionaryEnabled = dictionaryEnabled) + } + } + } + + test("nested type - RLE encoding") { + (1 to 8).foreach { batchSize => + Seq(Seq(26), Seq(4, 3, 11, 4, 4), Seq(18, 8)).foreach { pageSizes => + Seq(true, false).foreach { dictionaryEnabled => + testNestedStringArrayOneLevel(None, None, pageSizes, + (0 to 6).map(i => Seq(('a' + i).toChar.toString)) ++ + Seq((7 to 17).map(i => ('a' + i).toChar.toString)) ++ + (18 to 25).map(i => Seq(('a' + i).toChar.toString)), + Seq.fill(8)(0) ++ Seq.fill(10)(1) ++ Seq.fill(8)(0), Seq.fill(26)(3), + batchSize = batchSize, dictionaryEnabled = dictionaryEnabled) + } + } + } + } + + test("nested type - column index with ranges") { + (1 to 8).foreach { batchSize => + Seq(Seq(8), Seq(6, 2), Seq(1, 5, 2)).foreach { pageSizes => + Seq(true, false).foreach { dictionaryEnabled => + var ranges = Seq((1L, 2L)) + testNestedStringArrayOneLevel(None, Some(ranges), pageSizes, + Seq(Seq("b", "c", "d", "e", "f"), Seq("g", "h")), + Seq(0, 0, 1, 1, 1, 1, 0, 1), Seq.fill(8)(3), + Seq("a", "b", "c", "d", "e", "f", "g", "h"), + batchSize, dictionaryEnabled = dictionaryEnabled) + + ranges = Seq((3L, 5L)) + testNestedStringArrayOneLevel(None, Some(ranges), pageSizes, + Seq(), + Seq(0, 0, 1, 1, 1, 1, 0, 1), Seq.fill(8)(3), + Seq("a", "b", "c", "d", "e", "f", "g", "h"), + batchSize, dictionaryEnabled = dictionaryEnabled) + + ranges = Seq((0L, 0L)) + testNestedStringArrayOneLevel(None, Some(ranges), pageSizes, + Seq(Seq("a")), + Seq(0, 0, 1, 1, 1, 1, 0, 1), Seq.fill(8)(3), + Seq("a", "b", "c", "d", "e", "f", "g", "h"), + batchSize, dictionaryEnabled = dictionaryEnabled) + } + } + } + } + + test("nested type - column index with ranges and RLE encoding") { + BATCH_SIZE_CONFIGS.foreach { batchSize => + Seq(Seq(26), Seq(4, 3, 11, 4, 4), Seq(18, 8)).foreach { pageSizes => + Seq(true, false).foreach { dictionaryEnabled => + var ranges = Seq((0L, 2L)) + testNestedStringArrayOneLevel(None, Some(ranges), pageSizes, + Seq(Seq("a"), Seq("b"), Seq("c")), + Seq.fill(8)(0) ++ Seq.fill(10)(1) ++ Seq.fill(8)(0), Seq.fill(26)(3), + batchSize = batchSize, dictionaryEnabled = dictionaryEnabled) + + ranges = Seq((4L, 6L)) + testNestedStringArrayOneLevel(None, Some(ranges), pageSizes, + Seq(Seq("e"), Seq("f"), Seq("g")), + Seq.fill(8)(0) ++ Seq.fill(10)(1) ++ Seq.fill(8)(0), Seq.fill(26)(3), + batchSize = batchSize, dictionaryEnabled = dictionaryEnabled) + + ranges = Seq((6L, 9L)) + testNestedStringArrayOneLevel(None, Some(ranges), pageSizes, + Seq(Seq("g")) ++ Seq((7 to 17).map(i => ('a' + 
i).toChar.toString)) ++ + Seq(Seq("s"), Seq("t")), + Seq.fill(8)(0) ++ Seq.fill(10)(1) ++ Seq.fill(8)(0), Seq.fill(26)(3), + batchSize = batchSize, dictionaryEnabled = dictionaryEnabled) + + ranges = Seq((4L, 6L), (14L, 20L)) + testNestedStringArrayOneLevel(None, Some(ranges), pageSizes, + Seq(Seq("e"), Seq("f"), Seq("g"), Seq("y"), Seq("z")), + Seq.fill(8)(0) ++ Seq.fill(10)(1) ++ Seq.fill(8)(0), Seq.fill(26)(3), + batchSize = batchSize, dictionaryEnabled = dictionaryEnabled) + } + } + } + } + + test("nested type - column index with ranges and nulls") { + BATCH_SIZE_CONFIGS.foreach { batchSize => + Seq(Seq(16), Seq(8, 8), Seq(4, 4, 4, 4), Seq(2, 6, 4, 4)).foreach { pageSizes => + Seq(true, false).foreach { dictionaryEnabled => + testNestedStringArrayOneLevel(None, None, pageSizes, + Seq(Seq("a", null), Seq("c", "d"), Seq(), Seq("f", null, "h"), + Seq("i", "j", "k", null), Seq(), null, null, Seq()), + Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0), + Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1), + (0 to 15), + batchSize = batchSize, dictionaryEnabled = dictionaryEnabled) + + var ranges = Seq((0L, 15L)) + testNestedStringArrayOneLevel(None, Some(ranges), pageSizes, + Seq(Seq("a", null), Seq("c", "d"), Seq(), Seq("f", null, "h"), + Seq("i", "j", "k", null), Seq(), null, null, Seq()), + Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0), + Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1), + (0 to 15), + batchSize = batchSize, dictionaryEnabled = dictionaryEnabled) + + ranges = Seq((0L, 2L)) + testNestedStringArrayOneLevel(None, Some(ranges), pageSizes, + Seq(Seq("a", null), Seq("c", "d"), Seq()), + Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0), + Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1), + (0 to 15), + batchSize = batchSize, dictionaryEnabled = dictionaryEnabled) + + ranges = Seq((3L, 7L)) + testNestedStringArrayOneLevel(None, Some(ranges), pageSizes, + Seq(Seq("f", null, "h"), Seq("i", "j", "k", null), Seq(), null, null), + Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0), + Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1), + (0 to 15), + batchSize = batchSize, dictionaryEnabled = dictionaryEnabled) + + ranges = Seq((5, 12L)) + testNestedStringArrayOneLevel(None, Some(ranges), pageSizes, + Seq(Seq(), null, null, Seq()), + Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0), + Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1), + (0 to 15), + batchSize = batchSize, dictionaryEnabled = dictionaryEnabled) + + ranges = Seq((5, 12L)) + testNestedStringArrayOneLevel(None, Some(ranges), pageSizes, + Seq(Seq(), null, null, Seq()), + Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0), + Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1), + (0 to 15), + batchSize = batchSize, dictionaryEnabled = dictionaryEnabled) + + ranges = Seq((0L, 0L), (2, 3), (5, 7), (8, 10)) + testNestedStringArrayOneLevel(None, Some(ranges), pageSizes, + Seq(Seq("a", null), Seq(), Seq("f", null, "h"), Seq(), null, null, Seq()), + Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0), + Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1), + (0 to 15), + batchSize = batchSize, dictionaryEnabled = dictionaryEnabled) + } + } + } + } + + test("nested type - column index with ranges, nulls and first row indexes") { + BATCH_SIZE_CONFIGS.foreach { batchSize => + Seq(true, false).foreach { dictionaryEnabled => + val pageSizes = Seq(4, 4, 4, 4) + var firstRowIndexes = Seq(10L, 20, 30, 40) + var ranges = Seq((0L, 5L)) + testNestedStringArrayOneLevel(Some(firstRowIndexes), Some(ranges), pageSizes, + 
Seq(), + Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0), + Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1), + (0 to 15), + batchSize = batchSize, dictionaryEnabled = dictionaryEnabled) + + ranges = Seq((5L, 15)) + testNestedStringArrayOneLevel(Some(firstRowIndexes), Some(ranges), pageSizes, + Seq(Seq("a", null), Seq("c", "d")), + Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0), + Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1), + (0 to 15), + batchSize = batchSize, dictionaryEnabled = dictionaryEnabled) + + ranges = Seq((25, 28)) + testNestedStringArrayOneLevel(Some(firstRowIndexes), Some(ranges), pageSizes, + Seq(), + Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0), + Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1), + (0 to 15), + batchSize = batchSize, dictionaryEnabled = dictionaryEnabled) + + ranges = Seq((35, 45)) + testNestedStringArrayOneLevel(Some(firstRowIndexes), Some(ranges), pageSizes, + Seq(Seq(), null, null, Seq()), + Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0), + Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1), + (0 to 15), + batchSize = batchSize, dictionaryEnabled = dictionaryEnabled) + + ranges = Seq((45, 55)) + testNestedStringArrayOneLevel(Some(firstRowIndexes), Some(ranges), pageSizes, + Seq(), + Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0), + Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1), + (0 to 15), + batchSize = batchSize, dictionaryEnabled = dictionaryEnabled) + + ranges = Seq((45, 55)) + testNestedStringArrayOneLevel(Some(firstRowIndexes), Some(ranges), pageSizes, + Seq(), + Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0), + Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1), + (0 to 15), + batchSize = batchSize, dictionaryEnabled = dictionaryEnabled) + + ranges = Seq((15, 29), (31, 35)) + testNestedStringArrayOneLevel(Some(firstRowIndexes), Some(ranges), pageSizes, + Seq(Seq(), Seq("f", null, "h")), + Seq(0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0), + Seq(3, 2, 3, 3, 1, 3, 2, 3, 3, 3, 3, 2, 1, 0, 0, 1), + (0 to 15), + batchSize = batchSize, dictionaryEnabled = dictionaryEnabled) + } + } + } + private def testPrimitiveString( firstRowIndexesOpt: Option[Seq[Long]], rangesOpt: Option[Seq[(Long, Long)]], @@ -236,6 +520,52 @@ class ParquetVectorizedSuite extends QueryTest with ParquetTest with SharedSpark rangesOpt), expectedValues.map(i => Row(i)), batchSize) } + private def testNestedStringArrayOneLevel( + firstRowIndexesOpt: Option[Seq[Long]], + rangesOpt: Option[Seq[(Long, Long)]], + pageSizes: Seq[Int], + expected: Seq[Seq[String]], + rls: Seq[Int], + dls: Seq[Int], + values: Seq[String] = VALUES, + batchSize: Int, + dictionaryEnabled: Boolean = false): Unit = { + assert(pageSizes.sum == rls.length && rls.length == dls.length) + firstRowIndexesOpt.foreach(a => assert(pageSizes.length == a.length)) + + val parquetSchema = MessageTypeParser.parseMessageType( + s"""message root { + | optional group _1 (LIST) { + | repeated group list { + | optional binary a(UTF8); + | } + | } + |} + |""".stripMargin + ) + + val maxRepLevel = 1 + val maxDefLevel = 3 + val ty = parquetSchema.getType("_1", "list", "a").asPrimitiveType() + val cd = new ColumnDescriptor(Seq("_1", "list", "a").toArray, ty, maxRepLevel, maxDefLevel) + + var i = 0 + var numRows = 0 + val memPageStore = new MemPageStore(expected.length) + val pageFirstRowIndexes = ArrayBuffer.empty[Long] + pageSizes.foreach { size => + pageFirstRowIndexes += numRows + numRows += rls.slice(i, i + size).count(_ == 0) + writeDataPage(cd, memPageStore, rls.slice(i, i + 
size), dls.slice(i, i + size), + values.slice(i, i + size), maxDefLevel, dictionaryEnabled) + i += size + } + + checkAnswer(expected.length, parquetSchema, + TestPageReadStore(memPageStore, firstRowIndexesOpt.getOrElse(pageFirstRowIndexes).toSeq, + rangesOpt), expected.map(i => Row(i)), batchSize) + } + /** * Write a single data page using repetition levels, definition levels and values provided. *