@@ -49,6 +49,7 @@ private[hudi] trait SparkVersionsSupport {
   def gteqSpark3_1_3: Boolean = getSparkVersion >= "3.1.3"
   def gteqSpark3_2: Boolean = getSparkVersion >= "3.2"
   def gteqSpark3_2_1: Boolean = getSparkVersion >= "3.2.1"
+  def gteqSpark3_2_2: Boolean = getSparkVersion >= "3.2.2"
   def gteqSpark3_3: Boolean = getSparkVersion >= "3.3"
 }
@@ -19,25 +19,31 @@ package org.apache.spark.sql

 import org.apache.hudi.spark3.internal.ReflectUtil
 import org.apache.spark.sql.catalyst.analysis.{TableOutputResolver, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan}
 import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.{ExtendedMode, SimpleMode}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType

 trait HoodieSpark3CatalystPlanUtils extends HoodieCatalystPlansUtils {

-  def resolveOutputColumns(tableName: String,
+  /**
+   * Instantiates [[ProjectionOverSchema]] utility
+   */
+  def projectOverSchema(schema: StructType, output: AttributeSet): ProjectionOverSchema
+
+  override def resolveOutputColumns(tableName: String,
                                     expected: Seq[Attribute],
                                     query: LogicalPlan,
                                     byName: Boolean,
                                     conf: SQLConf): LogicalPlan =
     TableOutputResolver.resolveOutputColumns(tableName, expected, query, byName, conf)

-  def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan =
+  override def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan =
     ExplainCommand(plan, mode = if (extended) ExtendedMode else SimpleMode)

   override def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier = {
@@ -18,8 +18,9 @@

 package org.apache.spark.sql

-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.StructType

 object HoodieSpark31CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {

@@ -28,4 +29,6 @@ object HoodieSpark31CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
   override def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])] = {
     throw new IllegalStateException(s"Should not call getRelationTimeTravel for Spark <= 3.2.x")
   }
+
+  override def projectOverSchema(schema: StructType, output: AttributeSet): ProjectionOverSchema = ProjectionOverSchema(schema)
 }
@@ -17,13 +17,12 @@

 package org.apache.spark.sql.execution.datasources

-import org.apache.hudi.HoodieBaseRelation
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Expression, NamedExpression, ProjectionOverSchema}
+import org.apache.hudi.{HoodieBaseRelation, SparkAdapterSupport}
+import org.apache.spark.sql.HoodieSpark3CatalystPlanUtils
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, NamedExpression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
 import org.apache.spark.sql.util.SchemaUtils.restoreOriginalOutputNames

@@ -87,8 +86,10 @@ class Spark31NestedSchemaPruning extends Rule[LogicalPlan] {
     // each schemata, assuming the fields in prunedDataSchema are a subset of the fields
     // in dataSchema.
     if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+      val planUtils = SparkAdapterSupport.sparkAdapter.getCatalystPlanUtils.asInstanceOf[HoodieSpark3CatalystPlanUtils]
+
       val prunedRelation = outputRelationBuilder(prunedDataSchema)
-      val projectionOverSchema = ProjectionOverSchema(prunedDataSchema)
+      val projectionOverSchema = planUtils.projectOverSchema(prunedDataSchema, AttributeSet(output))

       Some(buildNewProjection(projects, normalizedProjects, normalizedFilters,
         prunedRelation, projectionOverSchema))
@@ -18,8 +18,11 @@

 package org.apache.spark.sql

-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.common.util.ValidationUtils.checkArgument
+import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TimeTravelRelation}
+import org.apache.spark.sql.types.StructType

 object HoodieSpark32CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {

@@ -35,4 +38,20 @@ object HoodieSpark32CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
       None
     }
   }
+
+  override def projectOverSchema(schema: StructType, output: AttributeSet): ProjectionOverSchema = {
+    val klass = classOf[ProjectionOverSchema]
+    checkArgument(klass.getConstructors.length == 1)
+    val ctor = klass.getConstructors.head
+
+    val p = if (HoodieSparkUtils.gteqSpark3_2_2) {
+      // Spark >= 3.2.2
+      ctor.newInstance(schema, output)
+    } else {
+      // Spark 3.2.0 and 3.2.1
+      ctor.newInstance(schema) // ProjectionOverSchema(schema)
+    }
+
+    p.asInstanceOf[ProjectionOverSchema]
+  }
 }
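
Note on the reflective construction above: as the gteqSpark3_2_2 gate implies, ProjectionOverSchema gained a second AttributeSet constructor parameter in Spark 3.2.2, so a single compiled artifact that referenced the constructor directly could not link against both 3.2.0/3.2.1 and 3.2.2 at runtime. The following is a minimal, self-contained sketch of the same technique; LibThing, ReflectiveFactory, and Demo are hypothetical stand-ins (not Hudi or Spark code), and it dispatches on constructor arity rather than on a version flag.

import java.lang.reflect.Constructor

// Stand-in for a library class whose constructor signature changed across
// releases: the newer release's primary constructor takes two arguments,
// while the older release only exposed the one-argument form.
final class LibThing(val schema: String, val output: Set[String]) {
  def this(schema: String) = this(schema, Set.empty)
}

object ReflectiveFactory {
  // Build a LibThing without referencing a constructor signature at compile
  // time, so the same bytecode links against either library version.
  def make(schema: String, output: Set[String]): LibThing = {
    val ctors: Array[Constructor[_]] = classOf[LibThing].getConstructors
    val instance = ctors.find(_.getParameterCount == 2) match {
      case Some(c) => c.newInstance(schema, output) // newer two-arg release
      case None    => ctors.find(_.getParameterCount == 1).get.newInstance(schema) // older one-arg release
    }
    instance.asInstanceOf[LibThing]
  }
}

object Demo extends App {
  println(ReflectiveFactory.make("struct<a:int>", Set("a")).schema)
}

Dispatching on arity keeps the sketch self-contained; the PR instead asserts there is exactly one public constructor and picks the argument list from the HoodieSparkUtils version check, which documents exactly which Spark patch releases map to which signature.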
@@ -17,13 +17,12 @@

 package org.apache.spark.sql.execution.datasources

-import org.apache.hudi.HoodieBaseRelation
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Expression, NamedExpression, ProjectionOverSchema}
+import org.apache.hudi.{HoodieBaseRelation, SparkAdapterSupport}
+import org.apache.spark.sql.HoodieSpark3CatalystPlanUtils
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, NamedExpression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
 import org.apache.spark.sql.util.SchemaUtils.restoreOriginalOutputNames

@@ -87,8 +86,10 @@ class Spark32NestedSchemaPruning extends Rule[LogicalPlan] {
     // each schemata, assuming the fields in prunedDataSchema are a subset of the fields
     // in dataSchema.
     if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+      val planUtils = SparkAdapterSupport.sparkAdapter.getCatalystPlanUtils.asInstanceOf[HoodieSpark3CatalystPlanUtils]
+
       val prunedRelation = outputRelationBuilder(prunedDataSchema)
-      val projectionOverSchema = ProjectionOverSchema(prunedDataSchema)
+      val projectionOverSchema = planUtils.projectOverSchema(prunedDataSchema, AttributeSet(output))

       Some(buildNewProjection(projects, normalizedProjects, normalizedFilters,
         prunedRelation, projectionOverSchema))
@@ -18,8 +18,9 @@

 package org.apache.spark.sql

-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TimeTravelRelation}
+import org.apache.spark.sql.types.StructType

 object HoodieSpark33CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {

@@ -35,4 +36,7 @@ object HoodieSpark33CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
       None
     }
   }
+
+  override def projectOverSchema(schema: StructType, output: AttributeSet): ProjectionOverSchema =
+    ProjectionOverSchema(schema, output)
 }
@@ -17,7 +17,8 @@

 package org.apache.spark.sql.execution.datasources

-import org.apache.hudi.HoodieBaseRelation
+import org.apache.hudi.{HoodieBaseRelation, SparkAdapterSupport}
+import org.apache.spark.sql.HoodieSpark3CatalystPlanUtils
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, NamedExpression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}

@@ -85,8 +86,10 @@ class Spark33NestedSchemaPruning extends Rule[LogicalPlan] {
     // each schemata, assuming the fields in prunedDataSchema are a subset of the fields
     // in dataSchema.
     if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+      val planUtils = SparkAdapterSupport.sparkAdapter.getCatalystPlanUtils.asInstanceOf[HoodieSpark3CatalystPlanUtils]
+
       val prunedRelation = outputRelationBuilder(prunedDataSchema)
-      val projectionOverSchema = ProjectionOverSchema(prunedDataSchema,AttributeSet(output))
+      val projectionOverSchema = planUtils.projectOverSchema(prunedDataSchema, AttributeSet(output))

       Some(buildNewProjection(projects, normalizedProjects, normalizedFilters,
         prunedRelation, projectionOverSchema))
pom.xml (1 addition, 1 deletion)

@@ -146,7 +146,7 @@
     <flink.connector.kafka.artifactId>flink-connector-kafka</flink.connector.kafka.artifactId>
     <flink.hadoop.compatibility.artifactId>flink-hadoop-compatibility_2.12</flink.hadoop.compatibility.artifactId>
     <spark31.version>3.1.3</spark31.version>
-    <spark32.version>3.2.1</spark32.version>
+    <spark32.version>3.2.2</spark32.version>
     <spark33.version>3.3.0</spark33.version>
     <hudi.spark.module>hudi-spark2</hudi.spark.module>
     <!-- NOTE: Different Spark versions might require different number of shared