
Conversation

@c21 (Contributor) commented Oct 16, 2021

What changes were proposed in this pull request?

This PR adds the aggregate push down feature for the ORC data source v2 reader.

At a high level, the PR does:

  • The supported aggregate expressions are MIN/MAX/COUNT, the same as for Parquet aggregate push down.
  • BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType and DateType are allowed in MIN/MAX aggregate push down. All other column types are not allowed in MIN/MAX aggregate push down.
  • All column types are supported in COUNT aggregate push down.
  • Sub-fields of nested columns are disallowed in aggregate push down.
  • If the file does not have valid statistics, Spark throws an exception and fails the query.
  • If the aggregate has a filter or group-by column, it will not be pushed down.

At the code level, the PR does:

  • OrcScanBuilder: pushAggregation() checks whether the aggregation can be pushed down. Most of the checking logic is shared between Parquet and ORC and is extracted into AggregatePushDownUtils.getSchemaForPushedAggregation(). OrcScanBuilder creates an OrcScan with the aggregation and the aggregation data schema.
  • OrcScan: createReaderFactory creates an ORC reader factory with the aggregation and schema. Similar to the change in ParquetScan.
  • OrcPartitionReaderFactory: buildReaderWithAggregates creates an ORC reader with aggregate push down (i.e. it reads the ORC file footer to process column statistics, instead of reading the actual data in the file). buildColumnarReaderWithAggregates creates a columnar ORC reader similarly. Both delegate the real work of reading the footer to OrcUtils.createAggInternalRowFromFooter.
  • OrcUtils.createAggInternalRowFromFooter: reads the ORC file footer to process column statistics (the real heavy lifting happens here — see the sketch after this list). Similar to ParquetUtils.createAggInternalRowFromFooter. It leverages utility methods such as OrcFooterReader.readStatistics.
  • OrcFooterReader: readStatistics reads the ORC ColumnStatistics[] into Spark's OrcColumnStatistics. The transformation is needed because ORC stores all column statistics in a flattened array, which is hard to process, whereas Spark's OrcColumnStatistics stores the statistics in a nested tree structure (e.g. like StructType). This is used by OrcUtils.createAggInternalRowFromFooter.
  • OrcColumnStatistics: the easy-to-manipulate structure for ORC ColumnStatistics. This is used by OrcFooterReader.readStatistics.
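
For illustration only, here is a minimal sketch of the footer-only read that this code path performs, written against the core ORC reader API. The helper name footerMinMaxCount and the flat-schema indexing are assumptions for the example, not the PR's actual code:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.orc.{IntegerColumnStatistics, OrcFile}

// Compute MIN/MAX/COUNT for one top-level integer column from the footer
// alone, without reading any rows. Hypothetical helper; flat schema assumed.
def footerMinMaxCount(file: String, column: String): (Long, Long, Long) = {
  val reader = OrcFile.createReader(new Path(file), OrcFile.readerOptions(new Configuration()))
  val rowCount = reader.getNumberOfRows  // answers COUNT(*) directly
  // getStatistics() is a flattened, pre-ordered array indexed by column id;
  // id 0 is the root struct, so the i-th top-level column of a flat schema
  // sits at index i + 1.
  val fieldIndex = reader.getSchema.getFieldNames.indexOf(column)
  reader.getStatistics()(fieldIndex + 1) match {
    case s: IntegerColumnStatistics => (s.getMinimum, s.getMaximum, rowCount)
    case other => throw new IllegalArgumentException(
      s"No integer min/max statistics: ${other.getClass.getName}")
  }
}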

Why are the changes needed?

To improve the performance of queries with aggregates.

Does this PR introduce any user-facing change?

Yes. A user-facing config spark.sql.orc.aggregatePushdown is added to enable or disable aggregate push down for ORC. The feature is disabled by default.
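
For illustration, exercising the feature might look like the following (a hedged sketch: spark is an existing SparkSession, and the path, view and column names are made up):

spark.conf.set("spark.sql.orc.aggregatePushdown", "true")
spark.read.orc("/data/events").createOrReplaceTempView("t")
// With push down enabled, MIN/MAX/COUNT over supported types are answered
// from ORC footer statistics instead of a full scan.
spark.sql("SELECT MIN(col1), MAX(col1), COUNT(*) FROM t").show()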

How was this patch tested?

Added unit tests in FileSourceAggregatePushDownSuite.scala. Refactored all unit tests from #33639 so that they now run against both Parquet and ORC.

github-actions bot added the SQL label Oct 16, 2021
@c21 (Contributor, Author) commented Oct 16, 2021

cc @huaxingao, @sunchao, @viirya, @cloud-fan and @dongjoon-hyun could you help take a look when you have time? Thanks!


private lazy val fieldNamesSet: Set[String] = fieldNames.toSet
private[sql] lazy val nameToField: Map[String, StructField] = fields.map(f => f.name -> f).toMap
private lazy val nameToField: Map[String, StructField] = fields.map(f => f.name -> f).toMap
c21 (Contributor, Author):

This reverts the change in #33639, as we don't need to make it more public.

// Not push down Timestamp because INT96 sort order is undefined,
// Parquet doesn't return statistics for INT96.
// Not push down Binary type as Parquet can truncate the statistics.
case StructType(_) | ArrayType(_, _) | MapType(_, _, _) | TimestampType | BinaryType =>
c21 (Contributor, Author):

Adding this in Parquet to disallow BinaryType here as well. This makes the unit tests between Parquet and ORC easier, and we are discussing disallowing it anyway. cc @huaxingao feel free to let me revert the change if it does not make sense. Thanks.

Contributor:

Looks good. Thanks for adding this.

Member:

We should put StringType here too

c21 (Contributor, Author):

@sunchao - updated.

* this class is used to convert ORC {@link ColumnStatistics}s from array to nested tree structure,
* according to data types. This is used for aggregate push down in ORC.
*/
public class OrcColumnsStatistics {
@viirya (Member) commented Oct 16, 2021:

I'm curious why this is in Java? This doesn't look like an API open to others.

c21 (Contributor, Author):

No fundamental reason, actually. I was following OrcColumnVector, which converts an ORC object to its counterpart in Spark. I can move it if needed.

Member:

OrcColumnsStatistics -> OrcColumnStatistics?

c21 (Contributor, Author):

@sunchao - updated with new name.

// are combined with filter or group by
// e.g. SELECT COUNT(col1) FROM t WHERE col2 = 8
// SELECT COUNT(col1) FROM t GROUP BY col2
// Todo: 1. add support if groupby column is partition col
Member:

@huaxingao Didn't you already add the support?

c21 (Contributor, Author):

@viirya - #34248 is not merged yet, I can do a rebase later once it's merged.

c21 (Contributor, Author):

@viirya - rebased.

return None
}

aggregation.groupByColumns.foreach { col =>
Member:

Hmm, shouldn't aggregation.groupByColumns be empty here?

c21 (Contributor, Author):

Good call. I didn't change it when moving the logic from 128168d to here. Will update.

c21 (Contributor, Author):

@viirya - updated, thanks.

@SparkQA commented Oct 16, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48796/

@SparkQA commented Oct 16, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48796/

@SparkQA commented Oct 16, 2021

Test build #144317 has finished for PR 34298 at commit 73627ed.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 17, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48805/

@SparkQA commented Oct 17, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48805/

@@ -123,36 +126,11 @@ abstract class ParquetAggregatePushDownSuite
}
}

test("aggregate push down - Count(partition Col): push down") {
Contributor:

We still support pushing down Count(partition Col), right? Do we still need this test?

c21 (Contributor, Author):

@huaxingao - yes, we still support it. Sorry, I removed it by mistake when copying the file. Let me add it back.

c21 (Contributor, Author):

@huaxingao - added back.

@huaxingao (Contributor):

@c21 Thanks for working on this! I took a quick look; overall it is good. I will find time to take a closer look.

@SparkQA commented Oct 17, 2021

Test build #144326 has finished for PR 34298 at commit 1f36f12.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sunchao (Member) left a comment:

Thanks @c21! Left some comments.


public static OrcColumnsStatistics readStatistics(Reader orcReader) {
TypeDescription orcSchema = orcReader.getSchema();
ColumnStatistics[] orcStatistics = orcReader.getStatistics();
StructType dataType = OrcUtils.toCatalystSchema(orcSchema);
Member:

nit: maybe rename dataType to sparkSchema.

c21 (Contributor, Author):

@sunchao - updated.

/**
* Columns statistics interface wrapping ORC {@link ColumnStatistics}s.
*
* Because ORC {@link ColumnStatistics}s are stored as a flattened array in the ORC file footer,
Member:

Is it in pre-order, and does it flatten all the nested types? Might be worth mentioning here.

c21 (Contributor, Author):

Yes, added comment.
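
To make the pre-order layout concrete, here is a minimal sketch of rebuilding the tree from the flattened array. StatsNode is a hypothetical stand-in for the PR's OrcColumnStatistics, not its actual definition:

import org.apache.orc.{ColumnStatistics, TypeDescription}

// ORC assigns every type a pre-order column id (root struct = 0), and the
// footer's flattened ColumnStatistics[] is indexed by that id, so the tree
// can be rebuilt by recursing over the schema.
case class StatsNode(stats: ColumnStatistics, children: Seq[StatsNode])

def toTree(schema: TypeDescription, flat: Array[ColumnStatistics]): StatsNode = {
  val childTypes = Option(schema.getChildren)   // null for primitive types
    .map(cs => (0 until cs.size()).map(cs.get))
    .getOrElse(Seq.empty)
  StatsNode(flat(schema.getId), childTypes.map(toTree(_, flat)))
}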

partitionNameSet: Set[String],
dataFilters: Seq[Expression],
isAllowedTypeForMinMaxAggregate: DataType => Boolean,
sparkSession: SparkSession): Option[StructType] = {
Member:

nit: sparkSession is unused.

c21 (Contributor, Author):

@sunchao - sorry, removed.

def getSchemaForPushedAggregation(
aggregation: Aggregation,
schema: StructType,
partitionNameSet: Set[String],
Member:

nit: maybe partitionNames?

c21 (Contributor, Author):

@sunchao - updated.

val nonNullRowsCount = if (isPartitionColumn) {
val topLevelStatistics = columnsStatistics.getStatistics
if (topLevelStatistics.hasNull) {
throw new SparkException(s"Illegal ORC top-level column statistics with NULL " +
Member:

Hm, does this mean we have an invalid ORC file here, or is it a valid file that Spark can't handle?

Member:

I think we should also give an informative error message to users so they know how to fall back.

c21 (Contributor, Author):

@sunchao - here it means the ORC file is invalid. Actually, we don't need this check, since ORC guarantees this, and the error message is quite confusing. Removed.

// Count(*) includes both null and non-null values.
val topLevelStatistics = columnsStatistics.getStatistics
if (topLevelStatistics.hasNull) {
throw new SparkException(s"Illegal ORC top-level column statistics with NULL " +
Member:

Not sure why we should throw an exception here - doesn't count(*) include NULLs?

c21 (Contributor, Author):

@sunchao - yes same as above, this error message is quite confusing. Removed.

override def isSplitable(path: Path): Boolean = {
// If aggregate is pushed down, only the file footer will be read once,
// so file should be not split across multiple tasks.
pushedAggregate.isEmpty
Member:

Seems this is a better approach than what we are doing on the Parquet side, cc @huaxingao. Also, maybe we should change how we measure file weight when combining tasks for aggregate push down, since we can combine multiple large files into a single task when computing stats is much cheaper.

c21 (Contributor, Author):

@sunchao - agreed, that's why I diverged from the Parquet code path for this. We should make sure the file is processed by only one task; splitting the file across multiple tasks is pointless here. I can make the change on the Parquet side after this PR is merged.

Also maybe we should change how we measure file weight when combining tasks for aggregate pushdown, since we can combine multiple large files into a single task as computing stats is much cheaper.

Yes, I thought about this as well. It's not trivial though, as we need to come up with another heuristic to decide how we combine files when an aggregate is pushed down.

Contributor:

Yes, I agree this is a better approach, and Parquet should do it this way too.

c21 (Contributor, Author):

@huaxingao - cool, then I can address Parquet in a follow-up PR; it's not urgent anyway.


dataType match {
// Not push down complex and Timestamp type.
// Not push down Binary type as ORC does not write min/max statistics for it.
case StructType(_) | ArrayType(_, _) | MapType(_, _, _) | TimestampType | BinaryType =>
Member:

Hm, should we add StringType here? How does ORC store stats for long strings?

c21 (Contributor, Author):

@sunchao - yes, updated.

@c21 (Contributor, Author) commented Oct 19, 2021

Thank you @sunchao for the review! Will update shortly.

@c21 (Contributor, Author) left a comment:

Addressed all comments and the PR is ready for review again. Thanks @sunchao, @viirya and @huaxingao.


@SparkQA commented Oct 23, 2021

Test build #144547 has finished for PR 34298 at commit 3341440.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor:

When reading this code again, I realized that we may not need isCaseSensitive, because the column names have already been normalized. We probably don't need to pass isCaseSensitive down at all. I will double-check.

Contributor:

I did a quick test. We don't need isCaseSensitive. I can clean this up if you don't have time.

c21 (Contributor, Author):

@huaxingao - thanks for checking. Removed for ORC. I can do another PR for Parquet to keep this review moving faster, but if you are already on the Parquet code path, feel free to go ahead.

@huaxingao (Contributor):

@c21 Did you have a chance to test large ORC files with multiple partitions? For Parquet, I did some testing using customer data, but I still doubt my testing is sufficient. Please do more testing if possible. Thanks!

@sunchao (Member) left a comment:

Thanks @c21. Overall looks pretty good.

Member:

nit: does this mean COUNT for complex types can be pushed down? Maybe make it more explicit.

c21 (Contributor, Author):

@sunchao - yes, updated the doc.

Member:

nit: add a space at the end

c21 (Contributor, Author):

@sunchao - added.

Member:

Hm, why can we include both null and non-null values when the column is a partition column?

c21 (Contributor, Author):

@sunchao - because for every row, the partition column cannot be NULL (similar reasoning as for Parquet in #33639 (comment)). So for a partition column, every row should be counted. Also updated the unit test FileSourceAggregatePushDownSuite."Count(partition column): push down" to test for null values.

Member:

I see, thanks!

Member:

In case we are using off-heap memory, we might want to check taskContext.isDefined, since otherwise the task completion listener may not be triggered to free up the memory.

c21 (Contributor, Author):

@sunchao - makes sense to me; this is also the existing behavior of ParquetPartitionReaderFactory.createParquetVectorizedReader(). Updated.
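
A hedged sketch of that guard (useOffHeap and offHeapEnabled are illustrative names, not the PR's actual code):

import org.apache.spark.TaskContext

// Only choose the off-heap columnar path when a TaskContext exists, because
// the task-completion listener that frees the off-heap memory is registered
// on it; TaskContext.get() returns null outside a running task.
def useOffHeap(offHeapEnabled: Boolean): Boolean =
  offHeapEnabled && Option(TaskContext.get()).isDefined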

Member:

Hmm, does an ORC file always have stats?

c21 (Contributor, Author):

@sunchao - normally it should. Added code to throw an actionable exception in case the file's statistics are not valid.

@sunchao (Member) commented Oct 27, 2021:

I'm just curious, since from https://github.com/apache/orc/blob/main/proto/orc_proto.proto, min/max are optional fields, and ORC's ColumnStatisticsImpl also doesn't set minimum or maximum if the fields from protobuf are not defined.

c21 (Contributor, Author):

Taking the Spark write code path as an example:

Spark uses OrcOutputWriter to write an ORC file, and internally it depends on ORC's OrcMapreduceRecordWriter to do the actual write.

Writing the file statistics happens during OrcOutputWriter.close() -> OrcMapreduceRecordWriter.close() -> WriterImpl.close() -> WriterImpl.writeFooter() -> TreeWriter.writeFileStatistics(). So writing file statistics is a step of writing the file footer, and an exception is thrown if they cannot be written.

TreeWriter contains an individual writer per column. Let's take IntegerTreeWriter as an example of writing an int column.

TreeWriterBase (the superclass of IntegerTreeWriter) maintains real-time per-row/batch statistics (TreeWriterBase.indexStatistics), per-stripe statistics (TreeWriterBase.stripeColStatistics) and per-file statistics (TreeWriterBase.fileStatistics). TreeWriterBase.writeBatch() updates the count statistics, and IntegerTreeWriter.writeBatch() updates the min/max statistics for the int column as each row is written.
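
A self-contained sketch of that write path, using the core ORC writer directly rather than Spark's OrcOutputWriter (the path and schema are made up), shows the footer statistics being produced as a side effect of close():

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector
import org.apache.orc.{OrcFile, TypeDescription}

val conf = new Configuration()
val schema = TypeDescription.fromString("struct<x:int>")
val path = new Path("/tmp/stats-demo.orc")
val writer = OrcFile.createWriter(path, OrcFile.writerOptions(conf).setSchema(schema))
val batch = schema.createRowBatch()
val x = batch.cols(0).asInstanceOf[LongColumnVector]  // int columns use LongColumnVector
for (i <- 0 until 3) {  // write three rows: 0, 1, 2
  x.vector(batch.size) = i.toLong
  batch.size += 1
}
writer.addRowBatch(batch)
writer.close()  // footer, including file statistics, is written here

val reader = OrcFile.createReader(path, OrcFile.readerOptions(conf))
reader.getStatistics.foreach(println)  // index 0: root struct; index 1: column x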

Member:

I see. Thanks @c21 !

Member:

What if the column has 0 values - will min/max still be defined?

c21 (Contributor, Author):

@sunchao - great catch! Added handling for an empty file (0 values/rows); we should return null instead. Also added a unit test for the empty-file case in FileSourceAggregatePushDownSuite/"aggregate push down - different data types", thanks.
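
A minimal sketch of that guard (minFromStats is a hypothetical helper, not the PR's exact code):

import org.apache.orc.{ColumnStatistics, IntegerColumnStatistics}

// When a column has zero values, the footer's min/max are undefined, so the
// pushed-down MIN must evaluate to null rather than a garbage value.
def minFromStats(statistics: ColumnStatistics): Any = statistics match {
  case s: IntegerColumnStatistics =>
    if (s.getNumberOfValues == 0) null else s.getMinimum
  case other => throw new IllegalArgumentException(
    s"Unsupported statistics type: ${other.getClass.getName}")
}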

Member:

Can you add a few comments about how we store OrcColumnStatistics, especially for map and array types? Although it is understandable by reading convertStatistics, it is better to let readers/callers quickly know the format.

c21 (Contributor, Author):

@viirya - sure, added some comments and an example.

Member:

thank you.

Member:

Do we need this? Can't we use sameResult?

c21 (Contributor, Author):

@viirya - I think so; Aggregation is not a QueryPlan here. By the way, this was introduced in #33639, and I am just refactoring it here.

Member:

Oh, I see.

Member:

getMinMaxFromColumnStatistics

c21 (Contributor, Author):

@viirya - sorry, fixed.

Member:

ditto

Member:

Why not use DateColumnStatistics instead of ${statistics.getClass.getName}?

c21 (Contributor, Author):

Why not use DateColumnStatistics instead of ${statistics.getClass.getName}?

Sorry if it's not clear, but this is the code path for case _, not case s: DateColumnStatistics. I want to print out the class name of the statistics we do not handle.

@c21 (Contributor, Author) commented Oct 26, 2021

Addressed all comments, and the PR is ready for review again, thanks @viirya, @sunchao and @huaxingao.

@SparkQA commented Oct 27, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49102/

@SparkQA commented Oct 27, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49102/

@SparkQA commented Oct 27, 2021

Test build #144632 has finished for PR 34298 at commit 9b8b9ef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@c21 (Contributor, Author) commented Oct 27, 2021

Rebased to latest master.

@SparkQA commented Oct 27, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49145/

@SparkQA commented Oct 27, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49145/

* c3: map<key: int, value: string>
* c4: array<int>
*
* OrcColumnStatistics
Member:

👍


@sunchao (Member) left a comment:

LGTM

@SparkQA commented Oct 28, 2021

Test build #144676 has finished for PR 34298 at commit 8c7c617.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val ORC_AGGREGATE_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.aggregatePushdown")
.doc("If true, aggregates will be pushed down to ORC for optimization. Support MIN, MAX and " +
"COUNT as aggregate expression. For MIN/MAX, support boolean, integer, float and date " +
"type. For COUNT, support all data types.")
Contributor:

We support byte, short and double for MIN/MAX too?

c21 (Contributor, Author):

I thought to just use "integer" to represent all integer types (byte, short, int, long) and "float" to represent all float types (float and double), to be less verbose. We will update the Spark doc on the website with a more detailed explanation of this aggregate push down feature anyway (ideally a sheet).

Contributor:

Sounds good. Let's have a detailed doc later on.

* tree pre-ordering. This is used for aggregate push down in ORC.
*
* For nested data types (array, map and struct), the sub-field statistics are stored recursively
* inside parent column's `children` field. Here is an example of `OrcColumnStatistics`:
Contributor:

nit: I think in Javadoc we are supposed to use {@code} instead of back quotes. There are a couple of other places that use back quotes too.

c21 (Contributor, Author):

@huaxingao - thanks, updated all class-level comments. For method comments and comments in a method's body, {@code} does not work, so those are unchanged. Back quotes are also used in method-body comments elsewhere in our code base.

throw new SparkException(
s"Cannot read columns statistics in file: $filePath. Please consider disabling " +
s"ORC aggregate push down by setting 'spark.sql.orc.aggregatePushdown' to false.", e)
}
Contributor:

I think I need to do the same thing for Parquet too. When the column statistics can't be read, is this guaranteed to be a RuntimeException, or could it be another Exception or Error too?

c21 (Contributor, Author):

I think the only case is that the statistics are missing and OrcFooterReader.convertStatistics(): orcStatistics.remove() throws a RuntimeException. But just to be safe, I changed it to match any Exception here.

@huaxingao (Contributor):

LGTM

@c21 (Contributor, Author) commented Oct 28, 2021

Addressed all comments from @huaxingao. @viirya do you wanna take another look? Thanks.

@SparkQA commented Oct 28, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49194/

@SparkQA commented Oct 28, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49194/

@SparkQA commented Oct 29, 2021

Test build #144724 has finished for PR 34298 at commit d85d4ba.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member) commented Oct 29, 2021

Thanks! Merging to master.

@viirya viirya closed this in 609e749 Oct 29, 2021
@c21 (Contributor, Author) commented Oct 29, 2021

Thank you @viirya, @sunchao and @huaxingao for the review!
