diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 2b496cfb551e0..04fc02b740dd0 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -49,6 +49,7 @@ private[hudi] trait SparkVersionsSupport {
   def gteqSpark3_1_3: Boolean = getSparkVersion >= "3.1.3"
   def gteqSpark3_2: Boolean = getSparkVersion >= "3.2"
   def gteqSpark3_2_1: Boolean = getSparkVersion >= "3.2.1"
+  def gteqSpark3_2_2: Boolean = getSparkVersion >= "3.2.2"
 
   def gteqSpark3_3: Boolean = getSparkVersion >= "3.3"
 }
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
index 6ad0ee57e081f..552eb320161ff 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql
 
 import org.apache.hudi.spark3.internal.ReflectUtil
 import org.apache.spark.sql.catalyst.analysis.{TableOutputResolver, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan}
 import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
@@ -27,17 +27,23 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.{ExtendedMode, SimpleMode}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
 
 trait HoodieSpark3CatalystPlanUtils extends HoodieCatalystPlansUtils {
 
-  def resolveOutputColumns(tableName: String,
+  /**
+   * Instantiates [[ProjectionOverSchema]] utility
+   */
+  def projectOverSchema(schema: StructType, output: AttributeSet): ProjectionOverSchema
+
+  override def resolveOutputColumns(tableName: String,
                            expected: Seq[Attribute],
                            query: LogicalPlan,
                            byName: Boolean,
                            conf: SQLConf): LogicalPlan =
     TableOutputResolver.resolveOutputColumns(tableName, expected, query, byName, conf)
 
-  def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan =
+  override def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan =
     ExplainCommand(plan, mode = if (extended) ExtendedMode else SimpleMode)
 
   override def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier = {
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
index 668619e30bfd3..57864004df834 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
@@ -18,8 +18,9 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.StructType
 
 object HoodieSpark31CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
 
@@ -28,4 +29,6 @@ object HoodieSpark31CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
   override def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])] = {
     throw new IllegalStateException(s"Should not call getRelationTimeTravel for Spark <= 3.2.x")
   }
+
+  override def projectOverSchema(schema: StructType, output: AttributeSet): ProjectionOverSchema = ProjectionOverSchema(schema)
 }
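Context for the shim above: `ProjectionOverSchema` is a Catalyst-internal utility whose constructor gained a second `output: AttributeSet` parameter in Spark 3.3.0, and, as the version gates in this patch indicate, that change was back-ported to Spark 3.2.2. The common trait therefore exposes a two-argument `projectOverSchema`, and each Spark profile maps it onto whichever constructor its API actually has; the Spark 3.1.x implementation simply drops the `output` argument. A hedged caller-side sketch of how version-agnostic code reaches the right implementation, mirroring the pruning rules later in this diff (`ProjectionShimSketch` and `buildProjection` are illustrative names, not Hudi APIs):

```scala
import org.apache.hudi.SparkAdapterSupport
import org.apache.spark.sql.HoodieSpark3CatalystPlanUtils
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, ProjectionOverSchema}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.StructType

object ProjectionShimSketch {
  // Resolve the version-specific shim through the Hudi adapter, then let it
  // pick whichever ProjectionOverSchema constructor the running Spark has.
  def buildProjection(prunedSchema: StructType, plan: LogicalPlan): ProjectionOverSchema = {
    val planUtils = SparkAdapterSupport.sparkAdapter.getCatalystPlanUtils
      .asInstanceOf[HoodieSpark3CatalystPlanUtils]
    planUtils.projectOverSchema(prunedSchema, AttributeSet(plan.output))
  }
}
```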
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark31NestedSchemaPruning.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark31NestedSchemaPruning.scala
index b731699963df4..1b29c428bb8a9 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark31NestedSchemaPruning.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark31NestedSchemaPruning.scala
@@ -17,13 +17,12 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import org.apache.hudi.HoodieBaseRelation
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Expression, NamedExpression, ProjectionOverSchema}
+import org.apache.hudi.{HoodieBaseRelation, SparkAdapterSupport}
+import org.apache.spark.sql.HoodieSpark3CatalystPlanUtils
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, NamedExpression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
 import org.apache.spark.sql.util.SchemaUtils.restoreOriginalOutputNames
@@ -87,8 +86,10 @@ class Spark31NestedSchemaPruning extends Rule[LogicalPlan] {
       // each schemata, assuming the fields in prunedDataSchema are a subset of the fields
       // in dataSchema.
       if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+        val planUtils = SparkAdapterSupport.sparkAdapter.getCatalystPlanUtils.asInstanceOf[HoodieSpark3CatalystPlanUtils]
+
         val prunedRelation = outputRelationBuilder(prunedDataSchema)
-        val projectionOverSchema = ProjectionOverSchema(prunedDataSchema)
+        val projectionOverSchema = planUtils.projectOverSchema(prunedDataSchema, AttributeSet(output))
 
         Some(buildNewProjection(projects, normalizedProjects, normalizedFilters, prunedRelation,
           projectionOverSchema))
diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
index 5cd995a7d9815..19025ce0d5d6c 100644
--- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
@@ -18,8 +18,11 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.common.util.ValidationUtils.checkArgument
+import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TimeTravelRelation}
+import org.apache.spark.sql.types.StructType
 
 object HoodieSpark32CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
 
@@ -35,4 +38,20 @@ object HoodieSpark32CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
       None
     }
   }
+
+  override def projectOverSchema(schema: StructType, output: AttributeSet): ProjectionOverSchema = {
+    val klass = classOf[ProjectionOverSchema]
+    checkArgument(klass.getConstructors.length == 1)
+    val ctor = klass.getConstructors.head
+
+    val p = if (HoodieSparkUtils.gteqSpark3_2_2) {
+      // Spark >= 3.2.2
+      ctor.newInstance(schema, output)
+    } else {
+      // Spark 3.2.0 and 3.2.1
+      ctor.newInstance(schema) // ProjectionOverSchema(schema)
+    }
+
+    p.asInstanceOf[ProjectionOverSchema]
+  }
 }
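The reflection above exists because a single hudi-spark3.2.x artifact must run on Spark 3.2.0, 3.2.1, and 3.2.2, while `ProjectionOverSchema` has a different constructor arity on either side of 3.2.2: a direct call compiles against only one of the two APIs and would throw `NoSuchMethodError` on the other at runtime. Going through `Constructor.newInstance` defers the choice to runtime, and the `checkArgument` guard fails fast if a future Spark release ever grows a second public constructor. A hedged alternative sketch that dispatches on constructor arity instead of a version flag (`ArityDispatchSketch` is illustrative, not part of this PR):

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, ProjectionOverSchema}
import org.apache.spark.sql.types.StructType

object ArityDispatchSketch {
  // Illustrative alternative: inspect the lone public constructor's arity
  // instead of consulting HoodieSparkUtils.gteqSpark3_2_2.
  def projectOverSchemaByArity(schema: StructType, output: AttributeSet): ProjectionOverSchema = {
    val ctor = classOf[ProjectionOverSchema].getConstructors.head
    val args: Array[AnyRef] = ctor.getParameterCount match {
      case 2 => Array(schema, output) // Spark >= 3.2.2: (StructType, AttributeSet)
      case _ => Array(schema)         // Spark 3.2.0 / 3.2.1: (StructType)
    }
    ctor.newInstance(args: _*).asInstanceOf[ProjectionOverSchema]
  }
}
```

The version-flag approach taken by the PR has the advantage of stating intent explicitly, at the cost of having to keep the `gteqSpark3_2_2` gate in sync with where the upstream change actually landed.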
diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark32NestedSchemaPruning.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark32NestedSchemaPruning.scala
index 8d82e0b96b5f6..7a6cb20c84952 100644
--- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark32NestedSchemaPruning.scala
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark32NestedSchemaPruning.scala
@@ -17,13 +17,12 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import org.apache.hudi.HoodieBaseRelation
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Expression, NamedExpression, ProjectionOverSchema}
+import org.apache.hudi.{HoodieBaseRelation, SparkAdapterSupport}
+import org.apache.spark.sql.HoodieSpark3CatalystPlanUtils
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, NamedExpression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
 import org.apache.spark.sql.util.SchemaUtils.restoreOriginalOutputNames
@@ -87,8 +86,10 @@ class Spark32NestedSchemaPruning extends Rule[LogicalPlan] {
       // each schemata, assuming the fields in prunedDataSchema are a subset of the fields
       // in dataSchema.
       if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+        val planUtils = SparkAdapterSupport.sparkAdapter.getCatalystPlanUtils.asInstanceOf[HoodieSpark3CatalystPlanUtils]
+
         val prunedRelation = outputRelationBuilder(prunedDataSchema)
-        val projectionOverSchema = ProjectionOverSchema(prunedDataSchema)
+        val projectionOverSchema = planUtils.projectOverSchema(prunedDataSchema, AttributeSet(output))
 
         Some(buildNewProjection(projects, normalizedProjects, normalizedFilters, prunedRelation,
           projectionOverSchema))
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
index adeecfc814584..4d4921bc03472 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
@@ -18,8 +18,9 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TimeTravelRelation}
+import org.apache.spark.sql.types.StructType
 
 object HoodieSpark33CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
 
@@ -35,4 +36,7 @@ object HoodieSpark33CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
       None
     }
   }
+
+  override def projectOverSchema(schema: StructType, output: AttributeSet): ProjectionOverSchema =
+    ProjectionOverSchema(schema, output)
 }
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark33NestedSchemaPruning.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark33NestedSchemaPruning.scala
index e6b19b7195b81..cb390642bb599 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark33NestedSchemaPruning.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark33NestedSchemaPruning.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import org.apache.hudi.HoodieBaseRelation
+import org.apache.hudi.{HoodieBaseRelation, SparkAdapterSupport}
+import org.apache.spark.sql.HoodieSpark3CatalystPlanUtils
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, NamedExpression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
@@ -85,8 +86,10 @@ class Spark33NestedSchemaPruning extends Rule[LogicalPlan] {
       // each schemata, assuming the fields in prunedDataSchema are a subset of the fields
       // in dataSchema.
       if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+        val planUtils = SparkAdapterSupport.sparkAdapter.getCatalystPlanUtils.asInstanceOf[HoodieSpark3CatalystPlanUtils]
+
         val prunedRelation = outputRelationBuilder(prunedDataSchema)
-        val projectionOverSchema = ProjectionOverSchema(prunedDataSchema,AttributeSet(output))
+        val projectionOverSchema = planUtils.projectOverSchema(prunedDataSchema, AttributeSet(output))
 
         Some(buildNewProjection(projects, normalizedProjects, normalizedFilters, prunedRelation,
           projectionOverSchema))
diff --git a/pom.xml b/pom.xml
index 5daef106da0ad..07a9ffdae3d0b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -146,7 +146,7 @@
     <flink.connector.kafka.artifactId>flink-connector-kafka</flink.connector.kafka.artifactId>
     <flink.hadoop.compatibility.artifactId>flink-hadoop-compatibility_2.12</flink.hadoop.compatibility.artifactId>
    <spark31.version>3.1.3</spark31.version>
-    <spark32.version>3.2.1</spark32.version>
+    <spark32.version>3.2.2</spark32.version>
     <spark33.version>3.3.0</spark33.version>
     <hudi.spark.module>hudi-spark2</hudi.spark.module>
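With the pom bump, the hudi-spark3.2.x profile now compiles against Spark 3.2.2 (the two-argument constructor), while the new `gteqSpark3_2_2` runtime gate keeps the same artifact working on 3.2.0 and 3.2.1 via the reflective fallback. The gate relies on plain lexicographic string comparison, which is sound for the single-digit version components compared here (it would misorder a hypothetical two-digit component like "3.10.0", but no such Spark line is in scope). A minimal self-contained sketch of its semantics (standalone function for illustration; the real method lives on `SparkVersionsSupport` and compares the running Spark's version string):

```scala
object VersionGateSketch extends App {
  // Standalone mirror of the gate added in HoodieSparkUtils above.
  def gteqSpark3_2_2(sparkVersion: String): Boolean = sparkVersion >= "3.2.2"

  assert(gteqSpark3_2_2("3.2.2"))   // exact match
  assert(gteqSpark3_2_2("3.3.0"))   // later release
  assert(!gteqSpark3_2_2("3.2.1"))  // older 3.2 patch releases take the reflective fallback
  assert(!gteqSpark3_2_2("3.1.3"))  // earlier Spark line
}
```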