From 4f5a91ac9638b43ddf85fd4b53a2c4f71d0e9126 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 6 Jan 2019 12:00:17 +0800 Subject: [PATCH 1/4] Fix schema pruning error when selecting one complex field and having is not null predicate on another one. --- .../parquet/ParquetSchemaPruning.scala | 17 +++++++-- .../parquet/ParquetSchemaPruningSuite.scala | 36 +++++++++++++++++++ 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala index 91080b15727d6..26cf6b9cc8125 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala @@ -40,11 +40,19 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { plan } + // `PhysicalOperation` pattern returns relation operator's outputs if there is no + // projects on it. In this case, we don't need to do schema pruning. 
+ private def directOutput(projects: Seq[NamedExpression], plan: LogicalPlan): Boolean = { + projects.length == plan.output.length && projects.zip(plan.output).forall { + case (l, r) => l.name == r.name && l.dataType.sameType(r.dataType) + } + } + private def apply0(plan: LogicalPlan): LogicalPlan = plan transformDown { case op @ PhysicalOperation(projects, filters, l @ LogicalRelation(hadoopFsRelation: HadoopFsRelation, _, _, _)) - if canPruneRelation(hadoopFsRelation) => + if canPruneRelation(hadoopFsRelation) && !directOutput(projects, l) => val (normalizedProjects, normalizedFilters) = normalizeAttributeRefNames(l, projects, filters) val requestedRootFields = identifyRootFields(normalizedProjects, normalizedFilters) @@ -119,7 +127,12 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { .distinct.partition(_.contentAccessed) optRootFields.filter { opt => - !rootFields.exists(_.field.name == opt.field.name) + !rootFields.exists { root => + val rootFieldType = StructType(Array(root.field)) + val optFieldType = StructType(Array(opt.field)) + val merged = optFieldType.merge(rootFieldType) + root.field.name == opt.field.name && merged.sameType(optFieldType) + } } ++ rootFields } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala index 434c4414edeba..966190e12c6ba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.{DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.SchemaPruningTest import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.execution.FileSourceScanExec +import 
org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.StructType @@ -217,6 +218,41 @@ class ParquetSchemaPruningSuite Row("Y.") :: Nil) } + testSchemaPruning("select one complex field and having is null predicate on another " + + "complex field") { + val query = sql("select * from contacts") + .where("name.middle is not null") + .select( + "id", + "name.first", + "name.middle", + "name.last" + ) + .where("last = 'Jones'") + .select(count("id")).toDF() + checkScan(query, + "struct<id:int,name:struct<middle:string,last:string>>") + checkAnswer(query, Row(0) :: Nil) + } + + testSchemaPruning("select one deep nested complex field and having is null predicate on " + + "another deep nested complex field") { + val query = sql("select * from contacts") + .where("employer.company.address is not null") + .selectExpr( + "id", + "name.first", + "name.middle", + "name.last", + "employer.id as employer_id" + ) + .where("employer_id = 0") + .select(count("id")).toDF() + checkScan(query, + "struct<id:int,employer:struct<id:int,company:struct<address:string>>>") + checkAnswer(query, Row(1) :: Nil) + } + private def testSchemaPruning(testName: String)(testThunk: => Unit) { test(s"Spark vectorized reader - without partition data column - $testName") { withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") { From 6dbd753d679b3bdc7e88999019727cbb37e2bc33 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 7 Jan 2019 12:12:57 +0800 Subject: [PATCH 2/4] Address comment. 
--- .../datasources/parquet/ParquetSchemaPruning.scala | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala index 26cf6b9cc8125..34baa2dea16ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala @@ -40,19 +40,11 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { plan } - // `PhysicalOperation` pattern returns relation operator's outputs if there is no - // projects on it. In this case, we don't need to do schema pruning. - private def directOutput(projects: Seq[NamedExpression], plan: LogicalPlan): Boolean = { - projects.length == plan.output.length && projects.zip(plan.output).forall { - case (l, r) => l.name == r.name && l.dataType.sameType(r.dataType) - } - } - private def apply0(plan: LogicalPlan): LogicalPlan = plan transformDown { case op @ PhysicalOperation(projects, filters, l @ LogicalRelation(hadoopFsRelation: HadoopFsRelation, _, _, _)) - if canPruneRelation(hadoopFsRelation) && !directOutput(projects, l) => + if canPruneRelation(hadoopFsRelation) => val (normalizedProjects, normalizedFilters) = normalizeAttributeRefNames(l, projects, filters) val requestedRootFields = identifyRootFields(normalizedProjects, normalizedFilters) From ff1cc85ceba2618101adc5a00f8bb72da956f5dd Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 11 Jan 2019 00:18:13 +0800 Subject: [PATCH 3/4] Address comment. 
--- .../parquet/ParquetSchemaPruning.scala | 35 +++++++++++++------ 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala index 34baa2dea16ba..f2878c7c7e747 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala @@ -116,14 +116,27 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { // For example, for a query `SELECT name.first FROM contacts WHERE name IS NOT NULL`, // we don't need to read nested fields of `name` struct other than `first` field. val (rootFields, optRootFields) = (projectionRootFields ++ filterRootFields) - .distinct.partition(_.contentAccessed) + .distinct.partition(!_.prunedIfAnyChildAccessed) optRootFields.filter { opt => + val optFieldType = StructType(Array(opt.field)) !rootFields.exists { root => - val rootFieldType = StructType(Array(root.field)) - val optFieldType = StructType(Array(opt.field)) - val merged = optFieldType.merge(rootFieldType) - root.field.name == opt.field.name && merged.sameType(optFieldType) + root.field.name == opt.field.name && { + // Checking if current optional root field can be pruned. + // For each required root field, we merge it with the optional root field: + // 1. If this optional root field has nested fields and any nested field of it is used + // in the query, the merged field type must be equal to the optional root field type. + // We can prune this optional root field. For example, for optional root field + // `struct<name:struct<middle:string,last:string>>`, if its field + // `struct<name:struct<last:string>>` is used, we don't need to add this optional + // root field. + // 2. 
If this optional root field has no nested fields, the merged field type equals + // to the optional root field only if they are the same. If they are, we can prune + // this optional root field too. + val rootFieldType = StructType(Array(root.field)) + val merged = optFieldType.merge(rootFieldType) + merged.sameType(optFieldType) + } } } ++ rootFields } @@ -218,11 +231,11 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { // don't actually use any nested fields. These root field accesses might be excluded later // if there are any nested fields accesses in the query plan. case IsNotNull(SelectedField(field)) => - RootField(field, derivedFromAtt = false, contentAccessed = false) :: Nil + RootField(field, derivedFromAtt = false, prunedIfAnyChildAccessed = true) :: Nil case IsNull(SelectedField(field)) => - RootField(field, derivedFromAtt = false, contentAccessed = false) :: Nil + RootField(field, derivedFromAtt = false, prunedIfAnyChildAccessed = true) :: Nil case IsNotNull(_: Attribute) | IsNull(_: Attribute) => - expr.children.flatMap(getRootFields).map(_.copy(contentAccessed = false)) + expr.children.flatMap(getRootFields).map(_.copy(prunedIfAnyChildAccessed = true)) case _ => expr.children.flatMap(getRootFields) } @@ -276,9 +289,9 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { /** * This represents a "root" schema field (aka top-level, no-parent). `field` is the * `StructField` for field name and datatype. `derivedFromAtt` indicates whether it - * was derived from an attribute or had a proper child. `contentAccessed` means whether - * it was accessed with its content by the expressions refer it. + * was derived from an attribute or had a proper child. `prunedIfAnyChildAccessed` means + * whether this root field can be pruned if any of its child fields is used in the query. 
*/ private case class RootField(field: StructField, derivedFromAtt: Boolean, - contentAccessed: Boolean = true) + prunedIfAnyChildAccessed: Boolean = false) } From ff2fa67bfbdf8d520337c8082397209f3e1af741 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 11 Jan 2019 16:34:32 +0800 Subject: [PATCH 4/4] Move optFieldType. --- .../execution/datasources/parquet/ParquetSchemaPruning.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala index f2878c7c7e747..840fcae8c6915 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala @@ -119,7 +119,6 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { .distinct.partition(!_.prunedIfAnyChildAccessed) optRootFields.filter { opt => - val optFieldType = StructType(Array(opt.field)) !rootFields.exists { root => root.field.name == opt.field.name && { // Checking if current optional root field can be pruned. @@ -134,6 +133,7 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { // to the optional root field only if they are the same. If they are, we can prune // this optional root field too. val rootFieldType = StructType(Array(root.field)) + val optFieldType = StructType(Array(opt.field)) val merged = optFieldType.merge(rootFieldType) merged.sameType(optFieldType) }