Skip to content

Commit e959d1a

Browse files
committed
Add tests that exercise exactly the canDrop and inverseCanDrop code paths.
1 parent 4f25a33 commit e959d1a

File tree

3 files changed

+38
-7
lines changed

3 files changed

+38
-7
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,8 @@ class ParquetFileFormat
344344
sparkSession.sessionState.conf.parquetFilterPushDown
345345
// Whole stage codegen (PhysicalRDD) is able to deal with batches directly
346346
val returningBatch = supportBatch(sparkSession, resultSchema)
347+
val pushDownDate = sqlConf.parquetFilterPushDownDate
348+
val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
347349

348350
(file: PartitionedFile) => {
349351
assert(file.partitionValues.numFields == partitionSchema.size)
@@ -354,7 +356,8 @@ class ParquetFileFormat
354356
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
355357
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
356358
// is used here.
357-
.flatMap(new ParquetFilters().createFilter(requiredSchema, _))
359+
.flatMap(new ParquetFilters(pushDownDate, pushDownStringStartWith)
360+
.createFilter(requiredSchema, _))
358361
.reduceOption(FilterApi.and)
359362
} else {
360363
None

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,7 @@ import org.apache.spark.unsafe.types.UTF8String
3434
/**
3535
* Some utility function to convert Spark data source filters to Parquet filters.
3636
*/
37-
private[parquet] class ParquetFilters() {
38-
39-
val sqlConf: SQLConf = SQLConf.get
40-
val pushDownDate = sqlConf.parquetFilterPushDownDate
41-
val pushDownStartWith = sqlConf.parquetFilterPushDownStringStartWith
37+
private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: Boolean) {
4238

4339
private def dateToDays(date: Date): SQLDate = {
4440
DateTimeUtils.fromJavaDate(date)

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
5555
*/
5656
class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
5757

58-
private lazy val parquetFilters = new ParquetFilters()
58+
private lazy val parquetFilters =
59+
new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownStringStartWith)
5960

6061
override def beforeEach(): Unit = {
6162
super.beforeEach()
@@ -141,6 +142,31 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
141142
checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
142143
}
143144

145+
// This function tests cases that go exactly through the `canDrop` and `inverseCanDrop` paths.
146+
private def testStringStartsWith(dataFrame: DataFrame, filter: String): Unit = {
147+
withTempPath { dir =>
148+
val path = dir.getCanonicalPath
149+
dataFrame.write.option("parquet.block.size", 512).parquet(path)
150+
Seq(true, false).foreach { enablePushDown =>
151+
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> enablePushDown.toString) {
152+
val accu = new NumRowGroupsAcc
153+
sparkContext.register(accu)
154+
155+
val df = spark.read.parquet(path).filter(filter)
156+
df.foreachPartition((it: Iterator[Row]) => it.foreach(v => accu.add(0)))
157+
df.collect
158+
if (enablePushDown) {
159+
assert(accu.value == 0)
160+
} else {
161+
assert(accu.value > 0)
162+
}
163+
164+
AccumulatorContext.remove(accu.id)
165+
}
166+
}
167+
}
168+
}
169+
144170
test("filter pushdown - boolean") {
145171
withParquetDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit df =>
146172
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
@@ -710,6 +736,12 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
710736
sources.StringStartsWith("_1", null))
711737
}
712738
}
739+
740+
import testImplicits._
741+
// Test canDrop()
742+
testStringStartsWith(spark.range(1024).map(_.toString).toDF, "value like 'a%'")
743+
// Test inverseCanDrop()
744+
testStringStartsWith(spark.range(1024).map(c => "100").toDF, "value not like '100'")
713745
}
714746
}
715747

0 commit comments

Comments
 (0)