
Conversation

@dongjoon-hyun
Member

@dongjoon-hyun dongjoon-hyun commented Oct 1, 2018

What changes were proposed in this pull request?

This PR aims to fix an ORC performance regression in the Spark 2.4.0 RCs compared with Spark 2.3.2. Currently, for column names containing `.`, the pushed predicates are ignored.

Test Data

scala> val df = spark.range(Int.MaxValue).sample(0.2).toDF("col.with.dot")
scala> df.write.mode("overwrite").orc("/tmp/orc")

Spark 2.3.2

scala> spark.sql("set spark.sql.orc.impl=native")
scala> spark.sql("set spark.sql.orc.filterPushdown=true")
scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` < 10").show)
+------------+
|col.with.dot|
+------------+
|           5|
|           7|
|           8|
+------------+

Time taken: 1542 ms

scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` < 10").show)
+------------+
|col.with.dot|
+------------+
|           5|
|           7|
|           8|
+------------+

Time taken: 152 ms

Spark 2.4.0 RC3

scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` < 10").show)
+------------+
|col.with.dot|
+------------+
|           5|
|           7|
|           8|
+------------+

Time taken: 4074 ms

scala> spark.time(spark.read.orc("/tmp/orc").where("`col.with.dot` < 10").show)
+------------+
|col.with.dot|
+------------+
|           5|
|           7|
|           8|
+------------+

Time taken: 1771 ms

How was this patch tested?

Pass the Jenkins with a newly added test case.

@SparkQA

SparkQA commented Oct 1, 2018

Test build #96810 has finished for PR 22597 at commit f6c3dca.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// Since ORC 1.5.0 (ORC-323), we need to quote for column names with `.` characters
// in order to distinguish predicate pushdown for nested columns.
private def quoteAttributeNameIfNeeded(name: String) : String = {
if (!name.contains("`") && name.contains(".")) {
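For readers skimming the diff: the helper shown above presumably completes along the following lines. This is a sketch inferred from the review discussion (quoting dotted names with backticks), not a verbatim copy of the patch.

private def quoteAttributeNameIfNeeded(name: String): String = {
  if (!name.contains("`") && name.contains(".")) {
    // Wrap dotted names in backticks so the ORC SearchArgument builder does not
    // interpret them as nested field paths (ORC-323).
    s"`$name`"
  } else {
    name
  }
}

// Illustrative behavior:
// quoteAttributeNameIfNeeded("col.with.dot")  // "`col.with.dot`"
// quoteAttributeNameIfNeeded("id")            // "id" (unchanged)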
Member

Does this condition take the backtick in column name into account? For instance,

>>> spark.range(1).toDF("abc`.abc").show()
+--------+
|abc`.abc|
+--------+
|       0|
+--------+

Member Author

Thank you for review. I'll consider that, too.

Member Author

@HyukjinKwon . Actually, Spark 2.3.2 ORC (native/hive) doesn't support a backtick character in column names; it fails at write time. And although Spark 2.4.0 broadens the special characters supported in column names, such as `.` and `"`, the backtick character is not handled yet.

So, for that one, I'll proceed in another PR, since it's an improvement rather than a regression fix.

Also, cc @gatorsmile and @dbtsai .

Member Author

For ORC and AVRO improvement, SPARK-25722 is created.

@dbtsai
Member

dbtsai commented Oct 12, 2018

Is it possible to add tests like the Parquet ones, which remove the filter from the Spark SQL plan, to ensure that the predicate is actually pushed down to the reader? Thanks.

@gatorsmile
Member

Yes. Please add a test case.

@dongjoon-hyun
Member Author

dongjoon-hyun commented Oct 12, 2018

Thank you for review, @dbtsai and @gatorsmile .

BTW, what do you mean by removing? The pushed filter doesn't introduce a correctness issue like Parquet did. Since this is a performance slowdown, this PR wants to fix it; we don't want to remove filters in this PR.

Also, for the performance slowdown, we cannot add a test case; we usually create a benchmark to detect this kind of regression. Do we want to add a benchmark case to FilterPushdownBenchmark instead?

@dbtsai
Member

dbtsai commented Oct 12, 2018

In ParquetFilter, the way we test whether a predicate pushdown works is by removing that predicate from the Spark SQL physical plan and relying only on the reader to do the filtering. Thus, if there is a bug in the reader's pushdown filter, Spark will get an incorrect result. This can be used in a test to ensure there is no regression later.

@dongjoon-hyun
Member Author

Thanks. I got it. You mean stripSparkFilter, which is used in both OrcQuerySuite.scala and ParquetFilterSuite.scala. Sure!
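For context, stripSparkFilter lives in SQLTestUtils and is roughly the following; this is a from-memory sketch, so treat the exact body as an approximation. It drops the Spark-side FilterExec so the rows you observe are exactly what the data source returned.

protected def stripSparkFilter(df: DataFrame): DataFrame = {
  val schema = df.schema
  // Remove the Spark-side FilterExec node from the physical plan...
  val withoutFilters = df.queryExecution.sparkPlan.transform {
    case FilterExec(_, child) => child
  }
  // ...and rebuild a DataFrame from the remaining plan, so only the
  // data source's own filtering (e.g. ORC/Parquet pushdown) applies.
  spark.internalCreateDataFrame(withoutFilters.execute(), schema)
}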

@cloud-fan
Contributor

In ParquetFilter, the way we test whether a predicate pushdown works is by removing that predicate from the Spark SQL physical plan and relying only on the reader to do the filtering.

I haven't looked into it, but Parquet record-level filtering is disabled by default, so if we remove predicates from the Spark side, the result can be wrong even if the predicates are pushed to Parquet.

@SparkQA

SparkQA commented Oct 13, 2018

Test build #97329 has finished for PR 22597 at commit 335a39f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

test("SPARK-25579 ORC PPD should support column names with dot") {
import testImplicits._
Member

Can we add a test at OrcFilterSuite too?

Member Author

Ur, this is OrcFilterSuite.

Can we add a test at OrcFilterSuite too?

For HiveOrcFilterSuite, the Hive ORC implementation doesn't support dots in column names.

Member

Okay. One end-to-end test should be enough.

val path = new File(dir, "orc").getCanonicalPath
Seq((1, 2), (3, 4)).toDF("col.dot.1", "col.dot.2").write.orc(path)
val df = spark.read.orc(path).where("`col.dot.1` = 1 and `col.dot.2` = 2")
checkAnswer(stripSparkFilter(df), Row(1, 2))
Member

@dongjoon-hyun, technically shouldn't we test whether the stripes are filtered? I added some tests long ago (stripSparkFilter was added by me as well, FWIW):

test("Support for pushing down filters for decimal types") {
withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
val data = (0 until 10).map(i => Tuple1(BigDecimal.valueOf(i)))
withTempPath { file =>
// It needs to repartition data so that we can have several ORC files
// in order to skip stripes in ORC.
spark.createDataFrame(data).toDF("a").repartition(10)
.write.orc(file.getCanonicalPath)
val df = spark.read.orc(file.getCanonicalPath).where("a == 2")
val actual = stripSparkFilter(df).count()
assert(actual < 10)
}
}
}

Member Author

Like that test, this test also generates two ORC files with one row each and tests whether PPD works.

@HyukjinKwon
Member

HyukjinKwon commented Oct 14, 2018

I haven't looked into it, but Parquet record-level filtering is disabled by default, so if we remove predicates from the Spark side, the result can be wrong even if the predicates are pushed to Parquet.

That's explicitly enabled for the Parquet tests (the default was disabled by me, FWIW). For the ORC tests, since ORC doesn't support record-by-record filtering, they test whether the output is smaller than the original data.

Some Parquet tests do this as well, for instance:

test("Filters should be pushed down for Parquet readers at row group level") {
import testImplicits._
withSQLConf(
// Makes sure disabling 'spark.sql.parquet.recordFilter' still enables
// row group level filtering.
SQLConf.PARQUET_RECORD_FILTER_ENABLED.key -> "false",
SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
withTempPath { path =>
val data = (1 to 1024)
data.toDF("a").coalesce(1)
.write.option("parquet.block.size", 512)
.parquet(path.getAbsolutePath)
val df = spark.read.parquet(path.getAbsolutePath).filter("a == 500")
// Here, we strip the Spark side filter and check the actual results from Parquet.
val actual = stripSparkFilter(df).collect().length
// Since those are filtered at row group level, the result count should be less
// than the total length but should not be a single record.
// Note that, if record level filtering is enabled, it should be a single record.
// If no filter is pushed down to Parquet, it should be the total length of data.
assert(actual > 1 && actual < data.length)
}
}
}

@SparkQA

SparkQA commented Oct 15, 2018

Test build #97366 has finished for PR 22597 at commit 849c7fa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val path = new File(dir, "orc").getCanonicalPath
Seq((1, 2), (3, 4)).toDF("col.dot.1", "col.dot.2").write.orc(path)
val df = spark.read.orc(path).where("`col.dot.1` = 1 and `col.dot.2` = 2")
checkAnswer(stripSparkFilter(df), Row(1, 2))
Contributor

@cloud-fan cloud-fan Oct 15, 2018

To confirm, this only works when (1, 2) and (3, 4) are in different row groups? (Not sure what the terminology is in ORC.)

Member

Yup.

Member Author

Yep. It works when they are in different stripes.

Member

How do we generalize this to nested cases? The parent struct can contain a dot as well.

Contributor

ORC data source doesn't support nested column pruning yet.

Member Author

@dongjoon-hyun dongjoon-hyun Oct 16, 2018

Thank you for review, @dbtsai ! I ignored PPD with nested columns here because Spark 2.4 (and master, until your PR #22573 lands) doesn't push those down. With your PR, Spark 3.0 will support that, and we can update this to handle those cases, too.

@cloud-fan . Actually, ORC 1.5.0 started to support PPD with nested columns (ORC-323), so @dbtsai and I have discussed supporting that before. We are going to support ORC PPD with nested columns in Spark 3.0 without regression.

withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
withTempDir { dir =>
val path = new File(dir, "orc").getCanonicalPath
Seq((1, 2), (3, 4)).toDF("col.dot.1", "col.dot.2").write.orc(path)
Member

How about explicitly repartitioning to make separate output files?

Member Author

@dongjoon-hyun dongjoon-hyun Oct 15, 2018

We are using the default parallelism from TestSparkSession on two rows, and it already generates separate output files.

If you are concerned about possible flakiness, we can increase the number of rows to 10, call repartition(10), and check assert(actual < 10) as you did before. Do you want that?
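A minimal sketch of that more explicit variant (10 rows, an explicit repartition(10), and assert(actual < 10)); the column names and values are illustrative, not necessarily what ends up committed:

withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
  withTempDir { dir =>
    val path = new File(dir, "orc").getCanonicalPath
    // repartition(10) forces multiple ORC files so stripes can actually be skipped.
    (0 until 10).map(i => (i, i)).toDF("col.dot.1", "col.dot.2")
      .repartition(10)
      .write.orc(path)
    val df = spark.read.orc(path).where("`col.dot.1` = 1")
    // With the Spark-side filter stripped, fewer than 10 rows can only survive
    // if the predicate was really pushed down to the ORC reader.
    assert(stripSparkFilter(df).count() < 10)
  }
}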

Contributor

Do not rely on implicit environment values; let's make the test as explicit as possible.

Member Author

Sure. Thank you for confirmation, @cloud-fan and @HyukjinKwon .

df.write.mode(SaveMode.Overwrite).orc(path.getCanonicalPath)
}

protected def checkPredicatePushDown(df: DataFrame, numRows: Int, predicate: String): Unit = {
Member Author

@dongjoon-hyun dongjoon-hyun Oct 16, 2018

@HyukjinKwon . I refactored this since it's now repeated three times. And this function should be here because the existing two instances are in OrcQueryTest and the new instance is in OrcQuerySuite. There is another similar instance, but I skipped it because it doesn't follow the same pattern.
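Assuming the refactoring described above, the shared helper is roughly of this shape; a sketch only (repartition to get several ORC files, strip the Spark-side filter, and check that fewer than numRows rows come back), not a verbatim copy of the committed code.

protected def checkPredicatePushDown(df: DataFrame, numRows: Int, predicate: String): Unit = {
  withTempPath { file =>
    // Repartition so the data spans several ORC files, letting the reader skip stripes.
    df.repartition(numRows).write.orc(file.getCanonicalPath)
    val readBack = spark.read.orc(file.getCanonicalPath).where(predicate)
    // If the predicate was pushed down, the ORC reader alone returns fewer rows
    // than the full dataset, even with the Spark-side filter stripped.
    assert(stripSparkFilter(readBack).count() < numRows)
  }
}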

@cloud-fan
Contributor

LGTM

@SparkQA

SparkQA commented Oct 16, 2018

Test build #97436 has finished for PR 22597 at commit 7686179.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Oct 16, 2018

Test build #97441 has finished for PR 22597 at commit 7686179.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Oct 16, 2018

Test build #97445 has finished for PR 22597 at commit 7686179.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

Merged to master and branch-2.4.

asfgit pushed a commit that referenced this pull request Oct 16, 2018
… predicates

Closes #22597 from dongjoon-hyun/SPARK-25579.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>
(cherry picked from commit 2c664ed)
Signed-off-by: hyukjinkwon <[email protected]>
@asfgit asfgit closed this in 2c664ed Oct 16, 2018
@dongjoon-hyun
Member Author

Thank you all!

@dongjoon-hyun dongjoon-hyun deleted the SPARK-25579 branch October 16, 2018 21:37
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
… predicates

Closes apache#22597 from dongjoon-hyun/SPARK-25579.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>