diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 2c7397c1ec77..150c6ac38cd0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -325,7 +325,7 @@ object SQLConf {
   val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
     .doc("When true, enable filter pushdown for ORC files.")
     .booleanConf
-    .createWithDefault(false)
+    .createWithDefault(true)
 
   val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath")
     .doc("When true, check all the partition paths under the table\'s root directory " +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index ba0a7605da71..7f8892b4e04c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -65,19 +65,17 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
   test("SPARK-12218: 'Not' is included in ORC filter pushdown") {
     import testImplicits._
 
-    withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
-      withTempPath { dir =>
-        val path = s"${dir.getCanonicalPath}/table1"
-        (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b").write.orc(path)
-
-        checkAnswer(
-          spark.read.orc(path).where("not (a = 2) or not(b in ('1'))"),
-          (1 to 5).map(i => Row(i, (i % 2).toString)))
-
-        checkAnswer(
-          spark.read.orc(path).where("not (a = 2 and b in ('1'))"),
-          (1 to 5).map(i => Row(i, (i % 2).toString)))
-      }
+    withTempPath { dir =>
+      val path = s"${dir.getCanonicalPath}/table1"
+      (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b").write.orc(path)
+
+      checkAnswer(
+        spark.read.orc(path).where("not (a = 2) or not(b in ('1'))"),
+        (1 to 5).map(i => Row(i, (i % 2).toString)))
+
+      checkAnswer(
+        spark.read.orc(path).where("not (a = 2 and b in ('1'))"),
+        (1 to 5).map(i => Row(i, (i % 2).toString)))
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 60ccd996d6d5..d627419d8c07 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -389,53 +389,51 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
 
   test("SPARK-10623 Enable ORC PPD") {
     withTempPath { dir =>
-      withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
-        import testImplicits._
-        val path = dir.getCanonicalPath
-
-        // For field "a", the first column has odds integers. This is to check the filtered count
-        // when `isNull` is performed. For Field "b", `isNotNull` of ORC file filters rows
-        // only when all the values are null (maybe this works differently when the data
-        // or query is complicated). So, simply here a column only having `null` is added.
-        val data = (0 until 10).map { i =>
-          val maybeInt = if (i % 2 == 0) None else Some(i)
-          val nullValue: Option[String] = None
-          (maybeInt, nullValue)
-        }
-        // It needs to repartition data so that we can have several ORC files
-        // in order to skip stripes in ORC.
-        createDataFrame(data).toDF("a", "b").repartition(10).write.orc(path)
-        val df = spark.read.orc(path)
-
-        def checkPredicate(pred: Column, answer: Seq[Row]): Unit = {
-          val sourceDf = stripSparkFilter(df.where(pred))
-          val data = sourceDf.collect().toSet
-          val expectedData = answer.toSet
-
-          // When a filter is pushed to ORC, ORC can apply it to rows. So, we can check
-          // the number of rows returned from the ORC to make sure our filter pushdown work.
-          // A tricky part is, ORC does not process filter rows fully but return some possible
-          // results. So, this checks if the number of result is less than the original count
-          // of data, and then checks if it contains the expected data.
-          assert(
-            sourceDf.count < 10 && expectedData.subsetOf(data),
-            s"No data was filtered for predicate: $pred")
-        }
+      import testImplicits._
+      val path = dir.getCanonicalPath
 
-        checkPredicate('a === 5, List(5).map(Row(_, null)))
-        checkPredicate('a <=> 5, List(5).map(Row(_, null)))
-        checkPredicate('a < 5, List(1, 3).map(Row(_, null)))
-        checkPredicate('a <= 5, List(1, 3, 5).map(Row(_, null)))
-        checkPredicate('a > 5, List(7, 9).map(Row(_, null)))
-        checkPredicate('a >= 5, List(5, 7, 9).map(Row(_, null)))
-        checkPredicate('a.isNull, List(null).map(Row(_, null)))
-        checkPredicate('b.isNotNull, List())
-        checkPredicate('a.isin(3, 5, 7), List(3, 5, 7).map(Row(_, null)))
-        checkPredicate('a > 0 && 'a < 3, List(1).map(Row(_, null)))
-        checkPredicate('a < 1 || 'a > 8, List(9).map(Row(_, null)))
-        checkPredicate(!('a > 3), List(1, 3).map(Row(_, null)))
-        checkPredicate(!('a > 0 && 'a < 3), List(3, 5, 7, 9).map(Row(_, null)))
+      // For field "a", the first column has odds integers. This is to check the filtered count
+      // when `isNull` is performed. For Field "b", `isNotNull` of ORC file filters rows
+      // only when all the values are null (maybe this works differently when the data
+      // or query is complicated). So, simply here a column only having `null` is added.
+      val data = (0 until 10).map { i =>
+        val maybeInt = if (i % 2 == 0) None else Some(i)
+        val nullValue: Option[String] = None
+        (maybeInt, nullValue)
+      }
+      // It needs to repartition data so that we can have several ORC files
+      // in order to skip stripes in ORC.
+      createDataFrame(data).toDF("a", "b").repartition(10).write.orc(path)
+      val df = spark.read.orc(path)
+
+      def checkPredicate(pred: Column, answer: Seq[Row]): Unit = {
+        val sourceDf = stripSparkFilter(df.where(pred))
+        val data = sourceDf.collect().toSet
+        val expectedData = answer.toSet
+
+        // When a filter is pushed to ORC, ORC can apply it to rows. So, we can check
+        // the number of rows returned from the ORC to make sure our filter pushdown work.
+        // A tricky part is, ORC does not process filter rows fully but return some possible
+        // results. So, this checks if the number of result is less than the original count
+        // of data, and then checks if it contains the expected data.
+        assert(
+          sourceDf.count < 10 && expectedData.subsetOf(data),
+          s"No data was filtered for predicate: $pred")
       }
+
+      checkPredicate('a === 5, List(5).map(Row(_, null)))
+      checkPredicate('a <=> 5, List(5).map(Row(_, null)))
+      checkPredicate('a < 5, List(1, 3).map(Row(_, null)))
+      checkPredicate('a <= 5, List(1, 3, 5).map(Row(_, null)))
+      checkPredicate('a > 5, List(7, 9).map(Row(_, null)))
+      checkPredicate('a >= 5, List(5, 7, 9).map(Row(_, null)))
+      checkPredicate('a.isNull, List(null).map(Row(_, null)))
+      checkPredicate('b.isNotNull, List())
+      checkPredicate('a.isin(3, 5, 7), List(3, 5, 7).map(Row(_, null)))
+      checkPredicate('a > 0 && 'a < 3, List(1).map(Row(_, null)))
+      checkPredicate('a < 1 || 'a > 8, List(9).map(Row(_, null)))
+      checkPredicate(!('a > 3), List(1, 3).map(Row(_, null)))
+      checkPredicate(!('a > 0 && 'a < 3), List(3, 5, 7, 9).map(Row(_, null)))
     }
   }
 
@@ -511,63 +509,55 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
   }
 
   test("SPARK-14962 Produce correct results on array type with isnotnull") {
-    withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
-      val data = (0 until 10).map(i => Tuple1(Array(i)))
-      withOrcFile(data) { file =>
-        val actual = spark
-          .read
-          .orc(file)
-          .where("_1 is not null")
-        val expected = data.toDF()
-        checkAnswer(actual, expected)
-      }
+    val data = (0 until 10).map(i => Tuple1(Array(i)))
+    withOrcFile(data) { file =>
+      val actual = spark
+        .read
+        .orc(file)
+        .where("_1 is not null")
+      val expected = data.toDF()
+      checkAnswer(actual, expected)
     }
   }
 
   test("SPARK-15198 Support for pushing down filters for boolean types") {
-    withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
-      val data = (0 until 10).map(_ => (true, false))
-      withOrcFile(data) { file =>
-        val df = spark.read.orc(file).where("_2 == true")
-        val actual = stripSparkFilter(df).count()
-
-        // ORC filter should be applied and the total count should be 0.
-        assert(actual === 0)
-      }
+    val data = (0 until 10).map(_ => (true, false))
+    withOrcFile(data) { file =>
+      val df = spark.read.orc(file).where("_2 == true")
+      val actual = stripSparkFilter(df).count()
+
+      // ORC filter should be applied and the total count should be 0.
+      assert(actual === 0)
     }
   }
 
   test("Support for pushing down filters for decimal types") {
-    withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
-      val data = (0 until 10).map(i => Tuple1(BigDecimal.valueOf(i)))
-      withTempPath { file =>
-        // It needs to repartition data so that we can have several ORC files
-        // in order to skip stripes in ORC.
-        createDataFrame(data).toDF("a").repartition(10).write.orc(file.getCanonicalPath)
-        val df = spark.read.orc(file.getCanonicalPath).where("a == 2")
-        val actual = stripSparkFilter(df).count()
-
-        assert(actual < 10)
-      }
+    val data = (0 until 10).map(i => Tuple1(BigDecimal.valueOf(i)))
+    withTempPath { file =>
+      // It needs to repartition data so that we can have several ORC files
+      // in order to skip stripes in ORC.
+      createDataFrame(data).toDF("a").repartition(10).write.orc(file.getCanonicalPath)
+      val df = spark.read.orc(file.getCanonicalPath).where("a == 2")
+      val actual = stripSparkFilter(df).count()
+
+      assert(actual < 10)
     }
   }
 
   test("Support for pushing down filters for timestamp types") {
-    withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
-      val timeString = "2015-08-20 14:57:00"
-      val data = (0 until 10).map { i =>
-        val milliseconds = Timestamp.valueOf(timeString).getTime + i * 3600
-        Tuple1(new Timestamp(milliseconds))
-      }
-      withTempPath { file =>
-        // It needs to repartition data so that we can have several ORC files
-        // in order to skip stripes in ORC.
-        createDataFrame(data).toDF("a").repartition(10).write.orc(file.getCanonicalPath)
-        val df = spark.read.orc(file.getCanonicalPath).where(s"a == '$timeString'")
-        val actual = stripSparkFilter(df).count()
-
-        assert(actual < 10)
-      }
+    val timeString = "2015-08-20 14:57:00"
+    val data = (0 until 10).map { i =>
+      val milliseconds = Timestamp.valueOf(timeString).getTime + i * 3600
+      Tuple1(new Timestamp(milliseconds))
+    }
+    withTempPath { file =>
+      // It needs to repartition data so that we can have several ORC files
+      // in order to skip stripes in ORC.
+      createDataFrame(data).toDF("a").repartition(10).write.orc(file.getCanonicalPath)
+      val df = spark.read.orc(file.getCanonicalPath).where(s"a == '$timeString'")
+      val actual = stripSparkFilter(df).count()
+
+      assert(actual < 10)
    }
  }
 
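
With spark.sql.orc.filterPushdown now defaulting to true, users who depended on the previous behavior can still disable it explicitly. A minimal sketch, assuming a running SparkSession bound to a value named `spark` (not part of the patch itself):

    // Disable ORC predicate pushdown for the current session,
    // restoring the pre-patch default.
    spark.conf.set("spark.sql.orc.filterPushdown", "false")

    // Or scope it to a single test body, mirroring the wrappers removed above:
    // withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "false") { ... }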