[HUDI-4186] Support Hudi with Spark 3.3.0 #5943
Changes from all commits
|
|
@@ -18,6 +18,7 @@ | |
|
|
||
| package org.apache.hudi.client.bootstrap; | ||
|
|
||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hudi.AvroConversionUtils; | ||
| import org.apache.hudi.avro.HoodieAvroUtils; | ||
| import org.apache.hudi.avro.model.HoodieFileStatus; | ||
|
|
@@ -71,11 +72,20 @@ protected Schema getBootstrapSourceSchema(HoodieEngineContext context, List<Pair | |
| } | ||
|
|
||
| private static Schema getBootstrapSourceSchemaParquet(HoodieWriteConfig writeConfig, HoodieEngineContext context, Path filePath) { | ||
| MessageType parquetSchema = new ParquetUtils().readSchema(context.getHadoopConf().get(), filePath); | ||
| Configuration hadoopConf = context.getHadoopConf().get(); | ||
| MessageType parquetSchema = new ParquetUtils().readSchema(hadoopConf, filePath); | ||
|
|
||
|
Contributor (Author): Change made according to SPARK-36935 (ParquetSchemaConverter change).
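For readers following the SPARK-36935 change, here is a minimal Scala sketch of the same construction path the Java hunk above takes: the converter is built from a Hadoop Configuration that carries the SQLConf defaults it reads, instead of the two-boolean constructor used before. This is an illustration only, not code from the PR.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter
import org.apache.spark.sql.internal.SQLConf

// Mirrors the hunk above: seed the Hadoop conf with the SQLConf defaults the converter
// reads, then build it from the conf rather than from two boolean flags.
def buildSchemaConverter(hadoopConf: Configuration): ParquetToSparkSchemaConverter = {
  hadoopConf.set(SQLConf.PARQUET_BINARY_AS_STRING.key,
    SQLConf.PARQUET_BINARY_AS_STRING.defaultValueString)
  hadoopConf.set(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
    SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValueString)
  hadoopConf.set(SQLConf.CASE_SENSITIVE.key,
    SQLConf.CASE_SENSITIVE.defaultValueString)
  new ParquetToSparkSchemaConverter(hadoopConf)
}
```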
||
| hadoopConf.set( | ||
| SQLConf.PARQUET_BINARY_AS_STRING().key(), | ||
| SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString()); | ||
| hadoopConf.set( | ||
| SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), | ||
| SQLConf.PARQUET_INT96_AS_TIMESTAMP().defaultValueString()); | ||
| hadoopConf.set( | ||
| SQLConf.CASE_SENSITIVE().key(), | ||
| SQLConf.CASE_SENSITIVE().defaultValueString()); | ||
|
||
| ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(hadoopConf); | ||
|
||
|
|
||
| ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter( | ||
| Boolean.parseBoolean(SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString()), | ||
| Boolean.parseBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().defaultValueString())); | ||
| StructType sparkSchema = converter.convert(parquetSchema); | ||
| String tableName = HoodieAvroUtils.sanitizeName(writeConfig.getTableName()); | ||
| String structName = tableName + "_record"; | ||
|
|
||
|
|
@@ -24,17 +24,15 @@ import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConver | |
| import org.apache.spark.sql.catalyst.TableIdentifier | ||
| import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation | ||
| import org.apache.spark.sql.catalyst.catalog.CatalogTable | ||
| import org.apache.spark.sql.catalyst.InternalRow | ||
| import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder | ||
| import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate} | ||
| import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedPredicate} | ||
| import org.apache.spark.sql.catalyst.parser.ParserInterface | ||
| import org.apache.spark.sql.catalyst.plans.JoinType | ||
| import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, SubqueryAlias} | ||
| import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} | ||
| import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} | ||
| import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, SubqueryAlias} | ||
| import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat | ||
| import org.apache.spark.sql.execution.datasources.{FilePartition, LogicalRelation, PartitionedFile, SparkParsePartitionUtil} | ||
| import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, LogicalRelation, PartitionedFile, SparkParsePartitionUtil} | ||
| import org.apache.spark.sql.internal.SQLConf | ||
| import org.apache.spark.sql.types.DataType | ||
| import org.apache.spark.sql.types.{DataType, StructType} | ||
| import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, Row, SparkSession} | ||
| import org.apache.spark.storage.StorageLevel | ||
|
|
||
|
|
@@ -132,8 +130,8 @@ trait SparkAdapter extends Serializable { | |
| } | ||
|
|
||
| /** | ||
| * Create instance of [[ParquetFileFormat]] | ||
| */ | ||
| * Create instance of [[ParquetFileFormat]] | ||
| */ | ||
| def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] | ||
|
|
||
| /** | ||
|
|
@@ -143,6 +141,38 @@ trait SparkAdapter extends Serializable { | |
| */ | ||
| def createInterpretedPredicate(e: Expression): InterpretedPredicate | ||
|
|
||
| /** | ||
| * Create instance of [[HoodieFileScanRDD]] | ||
| * SPARK-37273 FileScanRDD constructor changed in SPARK 3.3 | ||
| */ | ||
| def createHoodieFileScanRDD(sparkSession: SparkSession, | ||
| readFunction: PartitionedFile => Iterator[InternalRow], | ||
| filePartitions: Seq[FilePartition], | ||
| readDataSchema: StructType, | ||
| metadataColumns: Seq[AttributeReference] = Seq.empty): FileScanRDD | ||
|
|
||
| /** | ||
| * Resolve [[DeleteFromTable]] | ||
| * SPARK-38626 condition is no longer Option in Spark 3.3 | ||
| */ | ||
| def resolveDeleteFromTable(deleteFromTable: Command, | ||
| resolveExpression: Expression => Expression): LogicalPlan | ||
|
|
||
| /** | ||
| * Extract condition in [[DeleteFromTable]] | ||
| * SPARK-38626 condition is no longer Option in Spark 3.3 | ||
| */ | ||
| def extractCondition(deleteFromTable: Command): Expression | ||
|
Contributor: nit: rename to
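As an aside for reviewers, a hedged sketch of what the Spark 3.3 side of these two methods could look like, given that SPARK-38626 turned `DeleteFromTable.condition` from `Option[Expression]` into a plain `Expression`. Names and structure here are illustrative, not the PR's actual adapter code.

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable, LogicalPlan}

// Hypothetical Spark 3.3 implementations: condition is a plain Expression (SPARK-38626),
// so no Option unwrapping is needed.
object Spark33DeleteFromTableSketch {
  def resolveDeleteFromTable(deleteFromTable: Command,
                             resolveExpression: Expression => Expression): LogicalPlan =
    deleteFromTable match {
      case DeleteFromTable(table, condition) =>
        DeleteFromTable(table, resolveExpression(condition))
    }

  def extractCondition(deleteFromTable: Command): Expression =
    deleteFromTable.asInstanceOf[DeleteFromTable].condition
}
```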
||
|
|
||
| /** | ||
| * Get parseQuery from ExtendedSqlParser, only for Spark 3.3+ | ||
| */ | ||
| def getQueryParserFromExtendedSqlParser(session: SparkSession, delegate: ParserInterface, | ||
| sqlText: String): LogicalPlan = { | ||
| // unsupported by default | ||
| throw new UnsupportedOperationException(s"Unsupported parseQuery method in Spark earlier than Spark 3.3.0") | ||
|
||
| } | ||
|
|
||
| /** | ||
| * Converts instance of [[StorageLevel]] to a corresponding string | ||
| */ | ||
|
|
||
|
|
@@ -22,6 +22,7 @@ | |
| import org.apache.flink.types.Row; | ||
| import org.apache.hudi.common.model.HoodieTableType; | ||
| import org.junit.jupiter.api.BeforeEach; | ||
| import org.junit.jupiter.api.Disabled; | ||
| import org.junit.jupiter.api.io.TempDir; | ||
| import org.junit.jupiter.params.ParameterizedTest; | ||
| import org.junit.jupiter.params.provider.EnumSource; | ||
|
|
@@ -45,6 +46,7 @@ void beforeEach() { | |
| @TempDir | ||
| File tempFile; | ||
|
|
||
| @Disabled | ||
|
Contributor (Author): Temporarily disabled the Flink quickstart test due to its instability.
||
| @ParameterizedTest | ||
| @EnumSource(value = HoodieTableType.class) | ||
| void testHoodieFlinkQuickstart(HoodieTableType tableType) throws Exception { | ||
|
|
||
|
|
@@ -190,6 +190,12 @@ | |
| <artifactId>spark-sql_${scala.binary.version}</artifactId> | ||
| </dependency> | ||
|
|
||
| <!-- Hadoop --> | ||
| <dependency> | ||
| <groupId>org.apache.hadoop</groupId> | ||
| <artifactId>hadoop-auth</artifactId> | ||
| </dependency> | ||
|
|
||
|
Contributor (Author): SparkQuickStartTest would throw NoSuchMethodError without this.
Member: Good find. So can we now re-enable the Spark 3.2 quickstart test in the GH action? Check out bot.yml.
Contributor (Author): Highly likely. Maybe we need a separate JIRA to track that?
Contributor (Author): Filed this one: https://issues.apache.org/jira/browse/HUDI-4479
||
| <!-- Parquet --> | ||
| <dependency> | ||
| <groupId>org.apache.parquet</groupId> | ||
|
|
||
|
|
@@ -52,6 +52,8 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, | |
| globPaths: Seq[Path]) | ||
| extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport { | ||
|
|
||
| case class HoodieBaseFileSplit(filePartition: FilePartition) extends HoodieFileSplit | ||
|
|
||
| override type FileSplit = HoodieBaseFileSplit | ||
|
|
||
| // TODO(HUDI-3204) this is to override behavior (exclusively) for COW tables to always extract | ||
|
|
@@ -97,7 +99,9 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, | |
| // back into the one expected by the caller | ||
| val projectedReader = projectReader(baseFileReader, requiredSchema.structTypeSchema) | ||
|
|
||
| new HoodieFileScanRDD(sparkSession, projectedReader.apply, fileSplits) | ||
| // SPARK-37273 FileScanRDD constructor changed in SPARK 3.3 | ||
|
||
| sparkAdapter.createHoodieFileScanRDD(sparkSession, projectedReader.apply, fileSplits.map(_.filePartition), requiredSchema.structTypeSchema) | ||
| .asInstanceOf[HoodieUnsafeRDD] | ||
|
Contributor (Author): FileScanRDD API changed (SPARK-37273); have to split.
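A hedged sketch of the constructor difference that forces the indirection (assuming, per SPARK-37273, that Spark 3.3's FileScanRDD takes the read schema and metadata columns as extra arguments; the PR itself returns a Hudi-specific HoodieFileScanRDD rather than a bare FileScanRDD):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
import org.apache.spark.sql.types.StructType

// Sketch only, written against Spark 3.3: the last two arguments were added by SPARK-37273
// for hidden file metadata. On Spark 3.1/3.2 the constructor stops after filePartitions,
// hence the per-version adapter method.
def buildFileScanRDD(spark: SparkSession,
                     readFunction: PartitionedFile => Iterator[InternalRow],
                     filePartitions: Seq[FilePartition],
                     readDataSchema: StructType,
                     metadataColumns: Seq[AttributeReference] = Seq.empty): FileScanRDD =
  new FileScanRDD(spark, readFunction, filePartitions, readDataSchema, metadataColumns)
```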
||
| } | ||
|
|
||
| protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = { | ||
|
|
||
|
|
@@ -44,15 +44,24 @@ import scala.collection.mutable.ListBuffer | |
| object HoodieAnalysis { | ||
| type RuleBuilder = SparkSession => Rule[LogicalPlan] | ||
|
|
||
| def customOptimizerRules: Seq[RuleBuilder] = | ||
| def customOptimizerRules: Seq[RuleBuilder] = { | ||
| if (HoodieSparkUtils.gteqSpark3_1) { | ||
| val nestedSchemaPruningClass = "org.apache.spark.sql.execution.datasources.NestedSchemaPruning" | ||
| val nestedSchemaPruningRule = ReflectionUtils.loadClass(nestedSchemaPruningClass).asInstanceOf[Rule[LogicalPlan]] | ||
| val nestedSchemaPruningClass = | ||
| if (HoodieSparkUtils.gteqSpark3_3) { | ||
| "org.apache.spark.sql.execution.datasources.Spark33NestedSchemaPruning" | ||
| } else if (HoodieSparkUtils.gteqSpark3_2) { | ||
| "org.apache.spark.sql.execution.datasources.Spark32NestedSchemaPruning" | ||
| } else { | ||
| // spark 3.1 | ||
| "org.apache.spark.sql.execution.datasources.Spark31NestedSchemaPruning" | ||
| } | ||
|
|
||
| val nestedSchemaPruningRule = ReflectionUtils.loadClass(nestedSchemaPruningClass).asInstanceOf[Rule[LogicalPlan]] | ||
| Seq(_ => nestedSchemaPruningRule) | ||
| } else { | ||
| Seq.empty | ||
| } | ||
| } | ||
|
|
||
| def customResolutionRules: Seq[RuleBuilder] = { | ||
| val rules: ListBuffer[RuleBuilder] = ListBuffer( | ||
|
|
@@ -74,18 +83,21 @@ object HoodieAnalysis { | |
| val spark3ResolveReferences: RuleBuilder = | ||
| session => ReflectionUtils.loadClass(spark3ResolveReferencesClass, session).asInstanceOf[Rule[LogicalPlan]] | ||
|
|
||
| val spark32ResolveAlterTableCommandsClass = "org.apache.spark.sql.hudi.ResolveHudiAlterTableCommandSpark32" | ||
| val spark32ResolveAlterTableCommands: RuleBuilder = | ||
| session => ReflectionUtils.loadClass(spark32ResolveAlterTableCommandsClass, session).asInstanceOf[Rule[LogicalPlan]] | ||
| val resolveAlterTableCommandsClass = | ||
| if (HoodieSparkUtils.gteqSpark3_3) | ||
| "org.apache.spark.sql.hudi.Spark33ResolveHudiAlterTableCommand" | ||
| else "org.apache.spark.sql.hudi.Spark32ResolveHudiAlterTableCommand" | ||
| val resolveAlterTableCommands: RuleBuilder = | ||
| session => ReflectionUtils.loadClass(resolveAlterTableCommandsClass, session).asInstanceOf[Rule[LogicalPlan]] | ||
|
|
||
| // NOTE: PLEASE READ CAREFULLY | ||
| // | ||
| // It's critical for this rules to follow in this order, so that DataSource V2 to V1 fallback | ||
| // is performed prior to other rules being evaluated | ||
| rules ++= Seq(dataSourceV2ToV1Fallback, spark3Analysis, spark3ResolveReferences, spark32ResolveAlterTableCommands) | ||
| rules ++= Seq(dataSourceV2ToV1Fallback, spark3Analysis, spark3ResolveReferences, resolveAlterTableCommands) | ||
|
Contributor (Author): SPARK-38939: DropColumns syntax change.
||
|
|
||
| } else if (HoodieSparkUtils.gteqSpark3_1) { | ||
| val spark31ResolveAlterTableCommandsClass = "org.apache.spark.sql.hudi.ResolveHudiAlterTableCommand312" | ||
| val spark31ResolveAlterTableCommandsClass = "org.apache.spark.sql.hudi.Spark312ResolveHudiAlterTableCommand" | ||
| val spark31ResolveAlterTableCommands: RuleBuilder = | ||
| session => ReflectionUtils.loadClass(spark31ResolveAlterTableCommandsClass, session).asInstanceOf[Rule[LogicalPlan]] | ||
|
|
||
|
|
@@ -421,12 +433,10 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi | |
| UpdateTable(table, resolvedAssignments, resolvedCondition) | ||
|
|
||
| // Resolve Delete Table | ||
| case DeleteFromTable(table, condition) | ||
| case dft @ DeleteFromTable(table, condition) | ||
| if sparkAdapter.isHoodieTable(table, sparkSession) && table.resolved => | ||
| // Resolve condition | ||
| val resolvedCondition = condition.map(resolveExpressionFrom(table)(_)) | ||
| // Return the resolved DeleteTable | ||
| DeleteFromTable(table, resolvedCondition) | ||
| val resolveExpression = resolveExpressionFrom(table, None)_ | ||
|
Contributor: I'd suggest we keep the syntax as it was (with parentheses).
||
| sparkAdapter.resolveDeleteFromTable(dft, resolveExpression) | ||
|
||
|
|
||
| // Append the meta field to the insert query to walk through the validate for the | ||
| // number of insert fields with the number of the target table fields. | ||
|
|
||
|
|
@@ -21,6 +21,7 @@ import org.apache.hudi.SparkAdapterSupport | |
| import org.apache.spark.sql._ | ||
| import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable | ||
| import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable | ||
| import org.apache.spark.sql.catalyst.expressions.Expression | ||
| import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._ | ||
| import org.apache.spark.sql.hudi.ProvidesHoodieConfig | ||
|
|
||
|
|
@@ -36,9 +37,9 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Hoodie | |
|
|
||
| // Remove meta fields from the data frame | ||
| var df = removeMetaFields(Dataset.ofRows(sparkSession, table)) | ||
| if (deleteTable.condition.isDefined) { | ||
| df = df.filter(Column(deleteTable.condition.get)) | ||
| } | ||
| // SPARK-38626 DeleteFromTable.condition is changed from Option[Expression] to Expression in Spark 3.3 | ||
|
Contributor: nit: the comment can go into the Spark adapter implementation and is not necessary here.
Contributor: This can be addressed in a separate PR.
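To make the null check above concrete: before Spark 3.3 the condition is still an `Option[Expression]`, so a pre-3.3 adapter would plausibly surface an absent WHERE clause as null. A sketch under that assumption (written against Spark 3.1/3.2, not the PR's exact code):

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable}

// Hypothetical pre-3.3 adapter method: DeleteFromTable.condition is Option[Expression] on
// Spark 3.1/3.2, so a DELETE without a WHERE clause comes back as null here, which the
// command above guards against before applying the filter.
def extractCondition(deleteFromTable: Command): Expression =
  deleteFromTable.asInstanceOf[DeleteFromTable].condition.orNull
```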
||
| val condition = sparkAdapter.extractCondition(deleteTable) | ||
| if (condition != null) df = df.filter(Column(condition)) | ||
|
|
||
| val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableId) | ||
| val config = buildHoodieDeleteTableConfig(hoodieCatalogTable, sparkSession) | ||
|
|
||
|
|
@@ -57,6 +57,14 @@ class HoodieCommonSqlParser(session: SparkSession, delegate: ParserInterface) | |
|
|
||
| override def parseDataType(sqlText: String): DataType = delegate.parseDataType(sqlText) | ||
|
|
||
| /* SPARK-37266 Added parseQuery to ParserInterface in Spark 3.3.0. This is a patch to prevent | ||
| hackers from tampering text with persistent view, it won't be called in older Spark | ||
| Don't mark this as override for backward compatibility | ||
| Can't use sparkExtendedParser directly here due to the same reason */ | ||
|
Contributor: Sorry, but I can't understand the javadoc: can you please elaborate on why this is here?
Contributor (Author): Due to the same reason, we can't mark this method with
Contributor: As discussed on Slack, let's instead of doing parsing in
||
| def parseQuery(sqlText: String): LogicalPlan = parse(sqlText) { parser => | ||
|
Contributor: Why are we doing double-parsing?
Contributor (Author): I reused the code flow from
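For what it's worth, a minimal sketch of what the Spark 3.3 override of getQueryParserFromExtendedSqlParser could look like if it simply forwards to the parseQuery method that SPARK-37266 added to ParserInterface; the PR's real 3.3 adapter may instead route through its extended SQL parser.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical Spark 3.3 override: ParserInterface gained parseQuery in 3.3 (SPARK-37266),
// so the adapter can delegate to it directly; pre-3.3 adapters keep the default that throws.
def getQueryParserFromExtendedSqlParser(session: SparkSession,
                                        delegate: ParserInterface,
                                        sqlText: String): LogicalPlan =
  delegate.parseQuery(sqlText)
```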
||
| sparkAdapter.getQueryParserFromExtendedSqlParser(session, delegate, sqlText) | ||
| } | ||
|
|
||
| def parseRawDataType(sqlText : String) : DataType = { | ||
| throw new UnsupportedOperationException(s"Unsupported parseRawDataType method") | ||
| } | ||
|
|
||