@@ -18,9 +18,8 @@

package org.apache.hudi

import org.apache.avro.Schema.Type
import org.apache.avro.generic.GenericRecord
import org.apache.avro.{AvroRuntimeException, JsonProperties, Schema}
import org.apache.avro.{JsonProperties, Schema}
import org.apache.hudi.HoodieSparkUtils.sparkAdapter
import org.apache.hudi.avro.AvroSchemaUtils
import org.apache.spark.rdd.RDD
@@ -29,31 +28,9 @@ import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
import org.apache.spark.sql.{Dataset, Row, SparkSession}

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

object AvroConversionUtils {

/**
* Check the nullability of the input Avro type and resolve it when it is nullable. The first
* return value is a [[Boolean]] indicating if the input Avro type is nullable. The second
* return value is either provided Avro type if it's not nullable, or its resolved non-nullable part
* in case it is
*/
def resolveAvroTypeNullability(avroType: Schema): (Boolean, Schema) = {
if (avroType.getType == Type.UNION) {
val fields = avroType.getTypes.asScala
val actualType = fields.filter(_.getType != Type.NULL)
if (fields.length != 2 || actualType.length != 1) {
throw new AvroRuntimeException(
s"Unsupported Avro UNION type $avroType: Only UNION of a null type and a non-null " +
"type is supported")
}
(true, actualType.head)
} else {
(false, avroType)
}
}

/**
* Creates converter to transform Avro payload into Spark's Catalyst one
*
@@ -104,7 +81,7 @@ object AvroConversionUtils {
recordNamespace: String): Row => GenericRecord = {
val serde = sparkAdapter.createSparkRowSerDe(sourceSqlType)
val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(sourceSqlType, structName, recordNamespace)
val (nullable, _) = resolveAvroTypeNullability(avroSchema)
val nullable = AvroSchemaUtils.resolveNullableSchema(avroSchema) != avroSchema

val converter = AvroConversionUtils.createInternalRowToAvroConverter(sourceSqlType, avroSchema, nullable)
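For context on the change above: a nullable Avro type is modeled as a UNION of a null branch and exactly one non-null branch, and resolving it means unwrapping that branch, so "the resolved schema differs from the original" is equivalent to the nullability flag the removed helper used to return. A minimal self-contained sketch of that check (illustrative only; the actual AvroSchemaUtils.resolveNullableSchema implementation may differ in details):

  import org.apache.avro.Schema
  import org.apache.avro.Schema.Type

  import scala.collection.JavaConverters._

  object NullabilityCheckSketch {
    // Unwraps a [null, T] UNION into T; leaves any other schema untouched
    def resolveNullable(schema: Schema): Schema =
      if (schema.getType == Type.UNION) {
        val branches = schema.getTypes.asScala
        val nonNull = branches.filter(_.getType != Type.NULL)
        require(branches.size == 2 && nonNull.size == 1,
          s"Only a UNION of a null type and a single non-null type is supported: $schema")
        nonNull.head
      } else {
        schema
      }

    // The schema is nullable exactly when resolving it yields a different schema,
    // which is what `resolveNullableSchema(avroSchema) != avroSchema` relies on
    def isNullable(schema: Schema): Boolean = resolveNullable(schema) != schema
  }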

@@ -21,7 +21,7 @@ package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.SPARK_VERSION
@@ -84,7 +84,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport {
// making Spark deserialize its internal representation [[InternalRow]] into [[Row]] for subsequent conversion
// (and back)
val sameSchema = writerAvroSchema.equals(readerAvroSchema)
val (nullable, _) = AvroConversionUtils.resolveAvroTypeNullability(writerAvroSchema)
val nullable = AvroSchemaUtils.resolveNullableSchema(writerAvroSchema) != writerAvroSchema

// NOTE: We have to serialize Avro schema, and then subsequently parse it on the executor node, since Spark
// serializer is not able to digest it
@@ -27,6 +27,8 @@ import scala.language.implicitConversions
*/
object JFunction {

def scalaFunction1Noop[T]: T => Unit = _ => {}

////////////////////////////////////////////////////////////
// From Java to Scala
////////////////////////////////////////////////////////////
@@ -17,10 +17,13 @@

package org.apache.spark.sql

import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.SparkAdapterSupport.sparkAdapter
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction}
import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateMutableProjection, GenerateUnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeEq, AttributeReference, Cast, Expression, Like, Literal, MutableProjection, SubqueryExpression, UnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, CreateStruct, Expression, GetStructField, Like, Literal, Projection, SubqueryExpression, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.internal.SQLConf
@@ -44,6 +47,9 @@ trait HoodieCatalystExpressionUtils {
*/
def normalizeExprs(exprs: Seq[Expression], attributes: Seq[Attribute]): Seq[Expression]

// TODO scala-doc
def matchCast(expr: Expression): Option[(Expression, DataType, Option[String])]

/**
* Matches an expression iff
*
@@ -75,7 +81,7 @@
def unapplyCastExpression(expr: Expression): Option[(Expression, DataType, Option[String], Boolean)]
}

object HoodieCatalystExpressionUtils {
object HoodieCatalystExpressionUtils extends SparkAdapterSupport {
Member:
Why does it need to extend SparkAdapterSupport? Is there something that changes across Spark versions?

Contributor (author):
Yes, the adapter is needed to match the Cast expression (see the MatchCast object below).


/**
* Convenience extractor allowing to untuple [[Cast]] across Spark versions
@@ -85,6 +91,12 @@ object HoodieCatalystExpressionUtils {
sparkAdapter.getCatalystExpressionUtils.unapplyCastExpression(expr)
}
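That version dependence is the reason for the SparkAdapterSupport mix-in discussed above: Cast's constructor arguments differ across Spark versions, so untupling it is delegated to the version-specific adapter. A hedged usage sketch of the extractor (assuming it is exposed as MatchCast on this object, per the review exchange; field order follows unapplyCastExpression's signature above):

  import org.apache.spark.sql.HoodieCatalystExpressionUtils.MatchCast
  import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
  import org.apache.spark.sql.types.DataType

  // Illustrative only: strip a Cast wrapped around an attribute, regardless of Spark version
  def unwrapCastedAttribute(expr: Expression): Option[(AttributeReference, DataType)] =
    expr match {
      case MatchCast(attr: AttributeReference, targetType, _, _) => Some((attr, targetType))
      case _ => None
    }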

/**
* Leverages [[AttributeEquals]] predicate on 2 provided [[Attribute]]s
*/
def attributeEquals(one: Attribute, other: Attribute): Boolean =
new AttributeEq(one).equals(new AttributeEq(other))

/**
* Generates instance of [[UnsafeProjection]] projecting row of one [[StructType]] into another [[StructType]]
*
@@ -17,11 +17,10 @@

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.internal.SQLConf

trait HoodieCatalystPlansUtils {
@@ -48,47 +47,21 @@ trait HoodieCatalystPlansUtils {
*/
def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan

/**
* Convert a AliasIdentifier to TableIdentifier.
*/
def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier

/**
* Convert a UnresolvedRelation to TableIdentifier.
*/
def toTableIdentifier(relation: UnresolvedRelation): TableIdentifier

/**
* Create Join logical plan.
*/
def createJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType): Join

/**
* Test if the logical plan is a Insert Into LogicalPlan.
*/
def isInsertInto(plan: LogicalPlan): Boolean

/**
* Get the member of the Insert Into LogicalPlan.
*/
def getInsertIntoChildren(plan: LogicalPlan):
Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)]

/**
* if the logical plan is a TimeTravelRelation LogicalPlan.
*/
def isRelationTimeTravel(plan: LogicalPlan): Boolean

/**
* Get the member of the TimeTravelRelation LogicalPlan.
* Decomposes [[InsertIntoStatement]] into its arguments allowing to accommodate for API
* changes in Spark 3.3
*/
def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])]
def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)]

/**
* Create a Insert Into LogicalPlan.
* Rebases instance of {@code InsertIntoStatement} onto provided instance of {@code targetTable} and {@code query}
*/
def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]],
query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan
def rebaseInsertIntoStatement(iis: LogicalPlan, targetTable: LogicalPlan, query: LogicalPlan): LogicalPlan
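A hedged sketch of how a caller might combine the two methods above, e.g. when re-targeting an INSERT INTO onto a resolved Hudi relation (the getCatalystPlanUtils accessor name is an assumption; this excerpt does not show how the adapter exposes these utils):

  import org.apache.hudi.SparkAdapterSupport.sparkAdapter
  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

  // Illustrative only: decompose an INSERT INTO plan and rebase it onto a resolved table/query.
  // `getCatalystPlanUtils` is an assumed accessor for the adapter's HoodieCatalystPlansUtils.
  def retargetInsertInto(plan: LogicalPlan, resolvedTable: LogicalPlan, rewrittenQuery: LogicalPlan): LogicalPlan =
    sparkAdapter.getCatalystPlanUtils.unapplyInsertIntoStatement(plan) match {
      case Some((table, partitionSpec, query, overwrite, ifPartitionNotExists)) =>
        // partition spec and overwrite flags stay attached to the original statement when rebasing
        sparkAdapter.getCatalystPlanUtils.rebaseInsertIntoStatement(plan, resolvedTable, rewrittenQuery)
      case None =>
        plan
    }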

/**
* Test if the logical plan is a Repair Table LogicalPlan.
@@ -98,6 +71,5 @@
/**
* Get the member of the Repair Table LogicalPlan.
*/
def getRepairTableChildren(plan: LogicalPlan):
Option[(TableIdentifier, Boolean, Boolean, String)]
def getRepairTableChildren(plan: LogicalPlan): Option[(TableIdentifier, Boolean, Boolean, String)]
}
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.expressions

/**
* This class primarily serves as a proxy for [[AttributeEquals]] inaccessible outside
* the current package
*/
class AttributeEq(attr: Attribute) extends AttributeEquals(attr) {}
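The proxy is needed because Spark's AttributeEquals wrapper, which compares attributes by their ExprId rather than by name or other cosmetic properties, is package-private to org.apache.spark.sql.catalyst.expressions; a public subclass declared in that same package makes it reachable from Hudi code (see attributeEquals in HoodieCatalystExpressionUtils above). A hedged usage sketch:

  import org.apache.spark.sql.HoodieCatalystExpressionUtils.attributeEquals
  import org.apache.spark.sql.catalyst.expressions.AttributeReference
  import org.apache.spark.sql.types.IntegerType

  object AttributeEqualityExample extends App {
    // Illustrative only: references sharing an ExprId compare equal even if qualifiers differ
    val id = AttributeReference("id", IntegerType)()
    val qualified = id.withQualifier(Seq("t"))

    assert(attributeEquals(id, qualified))
    // A freshly created attribute with the same name gets a new ExprId, so it is not "equal"
    assert(!attributeEquals(id, AttributeReference("id", IntegerType)()))
  }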
@@ -25,15 +25,21 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.TablePathUtils
import org.apache.spark.sql._
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.parser.HoodieExtendedParserInterface
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.types.{DataType, Metadata, StructType}
import org.apache.spark.storage.StorageLevel

import java.util.Locale
@@ -48,6 +54,12 @@ trait SparkAdapter extends Serializable {
*/
def isColumnarBatchRow(r: InternalRow): Boolean

/**
* Creates Catalyst [[Metadata]] for Hudi's meta-fields (designating these w/
* [[METADATA_COL_ATTR_KEY]] if available (available in Spark >= 3.2)
*/
def createCatalystMetadataForMetaField: Metadata
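A hedged sketch of what the Spark 3.2+ side of this might look like; the "__metadata_col" key is assumed to mirror Spark's METADATA_COL_ATTR_KEY, and older versions would presumably fall back to Metadata.empty:

  import org.apache.spark.sql.types.{Metadata, MetadataBuilder}

  // Illustrative only: flag the meta-field as a metadata column on Spark >= 3.2
  val metaFieldMetadata: Metadata =
    new MetadataBuilder()
      .putBoolean("__metadata_col", true) // assumed value of METADATA_COL_ATTR_KEY
      .build()

  // On Spark < 3.2 there is no metadata-column concept, so Metadata.empty would be returned instead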

/**
* Inject table-valued functions to SparkSessionExtensions
*/
@@ -96,36 +108,31 @@
/**
* Create the hoodie's extended spark sql parser.
*/
def createExtendedSparkParser: Option[(SparkSession, ParserInterface) => ParserInterface] = None
def createExtendedSparkParser(spark: SparkSession, delegate: ParserInterface): HoodieExtendedParserInterface

/**
* Create the SparkParsePartitionUtil.
*/
def getSparkParsePartitionUtil: SparkParsePartitionUtil

/**
* ParserInterface#parseMultipartIdentifier is supported since spark3, for spark2 this should not be called.
*/
def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String]

/**
* Combine [[PartitionedFile]] to [[FilePartition]] according to `maxSplitBytes`.
*/
def getFilePartitions(sparkSession: SparkSession, partitionedFiles: Seq[PartitionedFile],
maxSplitBytes: Long): Seq[FilePartition]

def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean = {
unfoldSubqueryAliases(table) match {
case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table)
// This is to handle the cases when table is loaded by providing
// the path to the Spark DS and not from the catalog
case LogicalRelation(fsr: HadoopFsRelation, _, _, _) =>
fsr.options.get("path").map { pathStr =>
val path = new Path(pathStr)
TablePathUtils.isHoodieTablePath(path.getFileSystem(spark.sparkContext.hadoopConfiguration), path)
} getOrElse(false)

case _ => false
/**
* Checks whether [[LogicalPlan]] refers to Hudi table, and if it's the case extracts
* corresponding [[CatalogTable]]
*/
def resolveHoodieTable(plan: LogicalPlan): Option[CatalogTable] = {
EliminateSubqueryAliases(plan) match {
// First, we need to weed out unresolved plans
case plan if !plan.resolved => None
// NOTE: When resolving Hudi table we allow [[Filter]]s and [[Project]]s be applied
// on top of it
case PhysicalOperation(_, _, LogicalRelation(_, _, Some(table), _)) if isHoodieTable(table) => Some(table)
case _ => None
}
}

@@ -142,15 +149,6 @@
isHoodieTable(table)
}

protected def unfoldSubqueryAliases(plan: LogicalPlan): LogicalPlan = {
plan match {
case SubqueryAlias(_, relation: LogicalPlan) =>
unfoldSubqueryAliases(relation)
case other =>
other
}
}

/**
* Create instance of [[ParquetFileFormat]]
*/
@@ -182,28 +180,12 @@
readDataSchema: StructType,
metadataColumns: Seq[AttributeReference] = Seq.empty): FileScanRDD

/**
* Resolve [[DeleteFromTable]]
* SPARK-38626 condition is no longer Option in Spark 3.3
*/
def resolveDeleteFromTable(deleteFromTable: Command,
resolveExpression: Expression => Expression): LogicalPlan

/**
* Extract condition in [[DeleteFromTable]]
* SPARK-38626 condition is no longer Option in Spark 3.3
*/
def extractDeleteCondition(deleteFromTable: Command): Expression
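For reference, SPARK-38626 changed DeleteFromTable's condition from Option[Expression] to a plain Expression in Spark 3.3, which is what the scaladoc above refers to. A hedged sketch of what a Spark 3.3-style implementation of the condition extraction might look like:

  import org.apache.spark.sql.catalyst.expressions.Expression
  import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable}

  // Illustrative only: on Spark 3.3 the condition is always present (TrueLiteral when omitted)
  def extractDeleteCondition33(deleteFromTable: Command): Expression =
    deleteFromTable.asInstanceOf[DeleteFromTable].condition

  // A pre-3.3 adapter would instead unwrap the Option, e.g.
  //   deleteFromTable.asInstanceOf[DeleteFromTable].condition.orNull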

/**
* Get parseQuery from ExtendedSqlParser, only for Spark 3.3+
*/
def getQueryParserFromExtendedSqlParser(session: SparkSession, delegate: ParserInterface,
sqlText: String): LogicalPlan = {
// unsupported by default
throw new UnsupportedOperationException(s"Unsupported parseQuery method in Spark earlier than Spark 3.3.0")
}

/**
* Converts instance of [[StorageLevel]] to a corresponding string
*/
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.parser

import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

/**
* This trait helps us to bridge compatibility gap of [[ParserInterface]] b/w different
* Spark versions
*/
trait HoodieExtendedParserInterface extends ParserInterface {

def parseQuery(sqlText: String): LogicalPlan = {
throw new UnsupportedOperationException(s"Unsupported, parseQuery is implemented in Spark >= 3.3.0")
}

def parseMultipartIdentifier(sqlText: String): Seq[String] = {
throw new UnsupportedOperationException(s"Unsupported, parseMultipartIdentifier is implemented in Spark >= 3.0.0")
}

}
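Taken together with the reworked createExtendedSparkParser above (which now returns a parser directly instead of an Option of a builder), wiring the extended parser into a session might look roughly like this (illustrative; injectHoodieParser and the extension wiring are assumptions, not part of this diff):

  import org.apache.hudi.SparkAdapterSupport.sparkAdapter
  import org.apache.spark.sql.SparkSessionExtensions

  // Illustrative only: delegate parser construction to the version-specific adapter
  def injectHoodieParser(extensions: SparkSessionExtensions): Unit =
    extensions.injectParser { (session, delegate) =>
      sparkAdapter.createExtendedSparkParser(session, delegate)
    }

  // Spark-version-specific implementations of HoodieExtendedParserInterface override
  // parseQuery (3.3+) and parseMultipartIdentifier (3.0+) instead of relying on these defaults.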