[SPARK-36645][SQL] Aggregate (Min/Max/Count) push down for Parquet #33639
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
02af3fa
a03b960
2d5aeb1
0cf2180
d3afeb1
8deeaaf
4e3c69a
c5f8c49
eafbe1f
4417fc4
8b515ed
9ebe889
0aadd50
a63e34c
358d1ce
9e7560b
df6ca86
51ef0f2
File: `ParquetUtils.scala`

```scala
@@ -16,11 +16,28 @@
 */
package org.apache.spark.sql.execution.datasources.parquet

import java.util

import scala.collection.mutable
import scala.language.existentials

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.metadata.{ColumnChunkMetaData, ParquetMetadata}
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.{PrimitiveType, Types}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

import org.apache.spark.SparkException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Count, CountStar, Max, Min}
import org.apache.spark.sql.execution.RowToColumnConverter
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
import org.apache.spark.sql.internal.SQLConf.{LegacyBehaviorPolicy, PARQUET_AGGREGATE_PUSHDOWN_ENABLED}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

object ParquetUtils {
  def inferSchema(
```
```scala
@@ -127,4 +144,214 @@ object ParquetUtils {
    file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
      file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
  }

  /**
   * When the partial aggregates (Max/Min/Count) are pushed down to Parquet, we don't need to
   * createRowBaseReader to read data from Parquet and aggregate at Spark layer. Instead we want
   * to get the partial aggregates (Max/Min/Count) result using the statistics information
   * from Parquet footer file, and then construct an InternalRow from these aggregate results.
   *
   * @return Aggregate results in the format of InternalRow
   */
  private[sql] def createAggInternalRowFromFooter(
      footer: ParquetMetadata,
      filePath: String,
      dataSchema: StructType,
      partitionSchema: StructType,
      aggregation: Aggregation,
      aggSchema: StructType,
      datetimeRebaseMode: LegacyBehaviorPolicy.Value,
      isCaseSensitive: Boolean): InternalRow = {
    val (primitiveTypes, values) = getPushedDownAggResult(
      footer, filePath, dataSchema, partitionSchema, aggregation, isCaseSensitive)

    val builder = Types.buildMessage
    primitiveTypes.foreach(t => builder.addField(t))
    val parquetSchema = builder.named("root")

    val schemaConverter = new ParquetToSparkSchemaConverter
    val converter = new ParquetRowConverter(schemaConverter, parquetSchema, aggSchema,
      None, datetimeRebaseMode, LegacyBehaviorPolicy.CORRECTED, NoopUpdater)
    val primitiveTypeNames = primitiveTypes.map(_.getPrimitiveTypeName)
    primitiveTypeNames.zipWithIndex.foreach {
      case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
        val v = values(i).asInstanceOf[Boolean]
        converter.getConverter(i).asPrimitiveConverter.addBoolean(v)
      case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
        val v = values(i).asInstanceOf[Integer]
        converter.getConverter(i).asPrimitiveConverter.addInt(v)
      case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
        val v = values(i).asInstanceOf[Long]
        converter.getConverter(i).asPrimitiveConverter.addLong(v)
      case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
        val v = values(i).asInstanceOf[Float]
        converter.getConverter(i).asPrimitiveConverter.addFloat(v)
      case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
        val v = values(i).asInstanceOf[Double]
        converter.getConverter(i).asPrimitiveConverter.addDouble(v)
      case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
        val v = values(i).asInstanceOf[Binary]
        converter.getConverter(i).asPrimitiveConverter.addBinary(v)
      case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
        val v = values(i).asInstanceOf[Binary]
        converter.getConverter(i).asPrimitiveConverter.addBinary(v)
      case (_, i) =>
        throw new SparkException("Unexpected parquet type name: " + primitiveTypeNames(i))
    }
    converter.currentRecord
  }
```
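Not part of this diff: a minimal, self-contained sketch of the footer metadata this method consumes (per-row-group min/max and null counts). The helper name `printFooterStats` and its argument are made up; the parquet-hadoop classes are the same ones used above.

```scala
// Hypothetical helper, only to illustrate the footer statistics relied on above.
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

def printFooterStats(file: String): Unit = {
  val reader = ParquetFileReader.open(
    HadoopInputFile.fromPath(new Path(file), new Configuration()))
  try {
    // Each block is one row group; min/max/count can be folded across row groups
    // without reading any data pages.
    reader.getFooter.getBlocks.asScala.foreach { block =>
      block.getColumns.asScala.foreach { chunk =>
        val stats = chunk.getStatistics
        println(s"${chunk.getPath}: rows=${block.getRowCount}, " +
          s"min=${stats.genericGetMin}, max=${stats.genericGetMax}, nulls=${stats.getNumNulls}")
      }
    }
  } finally {
    reader.close()
  }
}
```

The method above then feeds these per-row-group values through `ParquetRowConverter`, so the aggregate lands in the `InternalRow` with the Spark type the query expects.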
```scala
  /**
   * When the aggregates (Max/Min/Count) are pushed down to Parquet, in the case of
   * PARQUET_VECTORIZED_READER_ENABLED sets to true, we don't need buildColumnarReader
   * to read data from Parquet and aggregate at Spark layer. Instead we want
   * to get the aggregates (Max/Min/Count) result using the statistics information
   * from Parquet footer file, and then construct a ColumnarBatch from these aggregate results.
   *
   * @return Aggregate results in the format of ColumnarBatch
   */
  private[sql] def createAggColumnarBatchFromFooter(
      footer: ParquetMetadata,
      filePath: String,
      dataSchema: StructType,
      partitionSchema: StructType,
      aggregation: Aggregation,
      aggSchema: StructType,
      offHeap: Boolean,
      datetimeRebaseMode: LegacyBehaviorPolicy.Value,
      isCaseSensitive: Boolean): ColumnarBatch = {
    val row = createAggInternalRowFromFooter(
      footer,
      filePath,
      dataSchema,
      partitionSchema,
      aggregation,
      aggSchema,
      datetimeRebaseMode,
      isCaseSensitive)
    val converter = new RowToColumnConverter(aggSchema)
    val columnVectors = if (offHeap) {
      OffHeapColumnVector.allocateColumns(1, aggSchema)
    } else {
      OnHeapColumnVector.allocateColumns(1, aggSchema)
    }
    converter.convert(row, columnVectors.toArray)
    new ColumnarBatch(columnVectors.asInstanceOf[Array[ColumnVector]], 1)
  }
```
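For intuition about the shape of the result on the vectorized path, here is a hand-rolled sketch (assumed schema and value, not code from this PR) that builds the same kind of one-row `ColumnarBatch` for a pushed-down `count(*)`:

```scala
// Sketch only: a single-row batch carrying one aggregated value, assuming a
// count(*) result of 42 for the file. The real code converts the InternalRow
// produced by createAggInternalRowFromFooter instead of writing values directly.
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.{LongType, StructType}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

val aggSchema = new StructType().add("count(*)", LongType)
val vectors = OnHeapColumnVector.allocateColumns(1, aggSchema) // capacity = 1 row
vectors(0).putLong(0, 42L)
val batch = new ColumnarBatch(vectors.asInstanceOf[Array[ColumnVector]], 1)
assert(batch.numRows() == 1 && batch.column(0).getLong(0) == 42L)
```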
```scala
  /**
   * Calculate the pushed down aggregates (Max/Min/Count) result using the statistics
   * information from Parquet footer file.
   *
   * @return A tuple of `Array[PrimitiveType]` and Array[Any].
   *         The first element is the Parquet PrimitiveType of the aggregate column,
   *         and the second element is the aggregated value.
   */
  private[sql] def getPushedDownAggResult(
      footer: ParquetMetadata,
      filePath: String,
      dataSchema: StructType,
      partitionSchema: StructType,
      aggregation: Aggregation,
      isCaseSensitive: Boolean)
    : (Array[PrimitiveType], Array[Any]) = {
    val footerFileMetaData = footer.getFileMetaData
    val fields = footerFileMetaData.getSchema.getFields
    val blocks = footer.getBlocks
    val primitiveTypeBuilder = mutable.ArrayBuilder.make[PrimitiveType]
    val valuesBuilder = mutable.ArrayBuilder.make[Any]
```
Member: Maybe we should assert

Contributor (Author): Added. Thanks
```scala
    assert(aggregation.groupByColumns.length == 0, "group by shouldn't be pushed down")
    aggregation.aggregateExpressions.foreach { agg =>
      var value: Any = None
      var rowCount = 0L
      var isCount = false
      var index = 0
      var schemaName = ""
      blocks.forEach { block =>
        val blockMetaData = block.getColumns
        agg match {
          case max: Max =>
            val colName = max.column.fieldNames.head
            index = dataSchema.fieldNames.toList.indexOf(colName)
            schemaName = "max(" + colName + ")"
            val currentMax = getCurrentBlockMaxOrMin(filePath, blockMetaData, index, true)
            if (value == None || currentMax.asInstanceOf[Comparable[Any]].compareTo(value) > 0) {
              value = currentMax
            }
          case min: Min =>
            val colName = min.column.fieldNames.head
            index = dataSchema.fieldNames.toList.indexOf(colName)
            schemaName = "min(" + colName + ")"
            val currentMin = getCurrentBlockMaxOrMin(filePath, blockMetaData, index, false)
            if (value == None || currentMin.asInstanceOf[Comparable[Any]].compareTo(value) < 0) {
              value = currentMin
            }
          case count: Count =>
            schemaName = "count(" + count.column.fieldNames.head + ")"
            rowCount += block.getRowCount
            var isPartitionCol = false
            if (partitionSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive))
              .toSet.contains(count.column.fieldNames.head)) {
              isPartitionCol = true
            }
```

Comment on lines +290 to +294

Member: Is the partition column also stored in Parquet file?

Contributor (Author): for count(column), I actually get the …

```scala
            isCount = true
            if (!isPartitionCol) {
              index = dataSchema.fieldNames.toList.indexOf(count.column.fieldNames.head)
              // Count(*) includes the null values, but Count(colName) doesn't.
              rowCount -= getNumNulls(filePath, blockMetaData, index)
            }
          case _: CountStar =>
            schemaName = "count(*)"
            rowCount += block.getRowCount
            isCount = true
          case _ =>
        }
      }
      if (isCount) {
        valuesBuilder += rowCount
        primitiveTypeBuilder += Types.required(PrimitiveTypeName.INT64).named(schemaName);
      } else {
        valuesBuilder += value
        val field = fields.get(index)
        primitiveTypeBuilder += Types.required(field.asPrimitiveType.getPrimitiveTypeName)
          .as(field.getLogicalTypeAnnotation)
          .length(field.asPrimitiveType.getTypeLength)
          .named(schemaName)
      }
    }
    (primitiveTypeBuilder.result, valuesBuilder.result)
  }
```
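To make the null handling in the `Count` branch concrete, here is a tiny worked example with made-up row-group numbers (two row groups, a data column `col` with a few nulls, and a partition column that is not stored in the data file):

```scala
// Made-up statistics for two row groups of one Parquet file.
val rowCountPerBlock = Seq(100L, 50L)   // block.getRowCount
val numNullsPerBlock = Seq(3L, 2L)      // null count of `col` per row group

val countStar = rowCountPerBlock.sum                  // 150: count(*) keeps nulls
val countCol = rowCountPerBlock.zip(numNullsPerBlock)
  .map { case (rows, nulls) => rows - nulls }.sum     // 145: count(col) drops nulls
val countPartitionCol = rowCountPerBlock.sum          // 150: a partition column has no
                                                      // column-chunk stats, so use row count
```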
```scala
  /**
   * Get the Max or Min value for ith column in the current block
   *
   * @return the Max or Min value
   */
  private def getCurrentBlockMaxOrMin(
      filePath: String,
      columnChunkMetaData: util.List[ColumnChunkMetaData],
      i: Int,
      isMax: Boolean): Any = {
    val statistics = columnChunkMetaData.get(i).getStatistics
    if (!statistics.hasNonNullValue) {
      throw new UnsupportedOperationException(s"No min/max found for Parquet file $filePath. " +
        s"Set SQLConf ${PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key} to false and execute again")
    } else {
      if (isMax) statistics.genericGetMax else statistics.genericGetMin
    }
  }

  private def getNumNulls(
      filePath: String,
      columnChunkMetaData: util.List[ColumnChunkMetaData],
      i: Int): Long = {
    val statistics = columnChunkMetaData.get(i).getStatistics
    if (!statistics.isNumNullsSet) {
      throw new UnsupportedOperationException(s"Number of nulls not set for Parquet file" +
        s" $filePath. Set SQLConf ${PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key} to false and execute" +
        s" again")
    }
    statistics.getNumNulls;
  }
}
```
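Finally, a hedged end-to-end usage sketch of the feature this file serves. The output path is made up, and the exact conf wiring is an assumption (the push down applies on the DS v2 Parquet path when there are no filters or group-by columns); the config keys are referenced via their SQLConf constants or names rather than hard-coded behavior.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

val spark = SparkSession.builder().master("local[*]").getOrCreate()
// Enable the new aggregate push down; routing parquet through the v2 source is
// assumed to be required here (clearing spark.sql.sources.useV1SourceList).
spark.conf.set(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key, "true")
spark.conf.set("spark.sql.sources.useV1SourceList", "")

spark.range(0, 1000).toDF("id").write.mode("overwrite").parquet("/tmp/agg_pushdown_demo")
// MIN/MAX/COUNT over an unfiltered scan can now be answered from footer statistics;
// the physical plan should show the pushed aggregation instead of a full data scan.
spark.read.parquet("/tmp/agg_pushdown_demo")
  .selectExpr("min(id)", "max(id)", "count(id)", "count(*)")
  .explain()
```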