diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 31ce05173ac3c..d18238fa4b6b8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -175,11 +175,6 @@ public class HoodieWriteConfig extends HoodieConfig { .withDocumentation("Schema string representing the latest schema of the table. Hudi passes this to " + "implementations of evolution of schema"); - public static final ConfigProperty SCHEMA_EVOLUTION_ENABLE = ConfigProperty - .key("hoodie.schema.on.read.enable") - .defaultValue(false) - .withDocumentation("enable full schema evolution for hoodie"); - public static final ConfigProperty ENABLE_INTERNAL_SCHEMA_CACHE = ConfigProperty .key("hoodie.schema.cache.enable") .defaultValue(false) @@ -929,11 +924,11 @@ public void setInternalSchemaCacheEnable(boolean enable) { } public boolean getSchemaEvolutionEnable() { - return getBoolean(SCHEMA_EVOLUTION_ENABLE); + return getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE); } public void setSchemaEvolutionEnable(boolean enable) { - setValue(SCHEMA_EVOLUTION_ENABLE, String.valueOf(enable)); + setValue(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE, String.valueOf(enable)); } /** @@ -2175,7 +2170,7 @@ public Builder withSchema(String schemaStr) { } public Builder withSchemaEvolutionEnable(boolean enable) { - writeConfig.setValue(SCHEMA_EVOLUTION_ENABLE, String.valueOf(enable)); + writeConfig.setValue(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE, String.valueOf(enable)); return this; } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala new file mode 100644 index 0000000000000..4a7dca840876d --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.util + +/** + * Utility allowing for seamless conversion b/w Java/Scala functional primitives + */ +object JFunction { + + def toScala[T, R](f: java.util.function.Function[T, R]): T => R = + (t: T) => f.apply(t) + + def toJava[T](f: T => Unit): java.util.function.Consumer[T] = + new java.util.function.Consumer[T] { + override def accept(t: T): Unit = f.apply(t) + } + +} \ No newline at end of file diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index a97743e62fac8..1823e61b22d3e 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -28,14 +28,13 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.datasources.{FilePartition, LogicalRelation, PartitionedFile, SparkParsePartitionUtil} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.DataType import org.apache.spark.sql.{HoodieCatalystExpressionUtils, Row, SparkSession} -import org.apache.spark.sql.{Row, SparkSession} -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import java.util.Locale @@ -141,8 +140,8 @@ trait SparkAdapter extends Serializable { maxSplitBytes: Long): Seq[FilePartition] def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean = { - tripAlias(table) match { - case LogicalRelation(_, _, Some(tbl), _) => isHoodieTable(tbl) + unfoldSubqueryAliases(table) match { + case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table) case relation: UnresolvedRelation => isHoodieTable(toTableIdentifier(relation), spark) case _=> false @@ -162,20 +161,15 @@ trait SparkAdapter extends Serializable { isHoodieTable(table) } - def tripAlias(plan: LogicalPlan): LogicalPlan = { + protected def unfoldSubqueryAliases(plan: LogicalPlan): LogicalPlan = { plan match { case SubqueryAlias(_, relation: LogicalPlan) => - tripAlias(relation) + unfoldSubqueryAliases(relation) case other => other } } - /** - * Create customresolutionRule to deal with alter command for hudi. 
- */ - def createResolveHudiAlterTableCommand(sparkSession: SparkSession): Rule[LogicalPlan] - /** * Create instance of [[ParquetFileFormat]] */ diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index d0365dced199e..1b69d7db4ec69 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hudi.HoodieConversionUtils; import org.apache.hudi.avro.model.HoodieActionInstant; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; @@ -73,12 +74,14 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.WorkloadStat; import org.apache.hudi.timeline.service.TimelineService; +import org.apache.hudi.util.JFunction; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.SparkSessionExtensions; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -98,6 +101,7 @@ import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -145,6 +149,10 @@ public static void tearDownAll() throws IOException { FileSystem.closeAll(); } + protected Option<Consumer<SparkSessionExtensions>> getSparkSessionExtensionsInjector() { + return Option.empty(); + } + @BeforeEach public void setTestMethodName(TestInfo testInfo) { if (testInfo.getTestMethod().isPresent()) { @@ -186,16 +194,32 @@ public void cleanupResources() throws IOException { * @param appName The specified application name.
*/ protected void initSparkContexts(String appName) { + Option<Consumer<SparkSessionExtensions>> sparkSessionExtensionsInjector = + getSparkSessionExtensionsInjector(); + + if (sparkSessionExtensionsInjector.isPresent()) { + // In case we need to inject extensions into Spark Session, we have + // to stop any session that might still be active, since Spark will try + // to re-use it + HoodieConversionUtils.toJavaOption(SparkSession.getActiveSession()) + .ifPresent(SparkSession::stop); + } + // Initialize a local spark env jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName + "#" + testMethodName)); jsc.setLogLevel("ERROR"); - hadoopConf = jsc.hadoopConfiguration(); - // SQLContext stuff - sqlContext = new SQLContext(jsc); + hadoopConf = jsc.hadoopConfiguration(); context = new HoodieSparkEngineContext(jsc); - hadoopConf = context.getHadoopConf().get(); - sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate(); + + sparkSession = SparkSession.builder() + .withExtensions(JFunction.toScala(sparkSessionExtensions -> { + sparkSessionExtensionsInjector.ifPresent(injector -> injector.accept(sparkSessionExtensions)); + return null; + })) + .config(jsc.getConf()) + .getOrCreate(); + sqlContext = new SQLContext(sparkSession); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java index 6be92af9adcd4..cc62bcc32824f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java @@ -31,6 +31,11 @@ description = "The following set of configurations are common across Hudi.") public class HoodieCommonConfig extends HoodieConfig { + public static final ConfigProperty<Boolean> SCHEMA_EVOLUTION_ENABLE = ConfigProperty + .key("hoodie.schema.on.read.enable") + .defaultValue(false) + .withDocumentation("Enables support for Schema Evolution feature"); + public static final ConfigProperty SPILLABLE_DISK_MAP_TYPE = ConfigProperty .key("hoodie.common.spillable.diskmap.type") .defaultValue(ExternalSpillableMap.DiskMapType.BITCASK) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index a62a402b6ac22..0102870e92bbe 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -19,7 +19,7 @@ package org.apache.hudi import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL} import org.apache.hudi.HoodieConversionUtils.toScalaOption -import org.apache.hudi.common.config.{ConfigProperty, HoodieConfig} +import org.apache.hudi.common.config.{ConfigProperty, HoodieCommonConfig, HoodieConfig} import org.apache.hudi.common.fs.ConsistencyGuardConfig import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType} import org.apache.hudi.common.table.HoodieTableConfig @@ -142,6 +142,9 @@ object DataSourceReadOptions { .key("hoodie.datasource.read.incr.fallback.fulltablescan.enable") .defaultValue("false") .withDocumentation("When doing an incremental query whether we should fall back to full table scans if file does not exist.") + + val SCHEMA_EVOLUTION_ENABLED: ConfigProperty[Boolean] =
HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE + /** @deprecated Use {@link QUERY_TYPE} and its methods instead */ @Deprecated val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key() diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index eee5a4881c9b9..71c38f765509d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -25,11 +25,10 @@ import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ} import org.apache.hudi.common.table.timeline.HoodieInstant import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} -import org.apache.hudi.config.HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE import org.apache.hudi.exception.HoodieException -import org.apache.hudi.internal.schema.InternalSchema import org.apache.log4j.LogManager import org.apache.spark.sql.execution.streaming.{Sink, Source} +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog import org.apache.spark.sql.hudi.streaming.HoodieStreamSource import org.apache.spark.sql.sources._ import org.apache.spark.sql.streaming.OutputMode @@ -100,9 +99,18 @@ class DefaultSource extends RelationProvider val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent val tableType = metaClient.getTableType val queryType = parameters(QUERY_TYPE.key) - val userSchema = if (schema == null) Option.empty[StructType] else Some(schema) + // NOTE: In cases when Hive Metastore is used as catalog and the table is partitioned, schema in the HMS might contain + // Hive-specific partitioning columns created specifically for HMS to handle partitioning appropriately. 
In that + // case we opt in to not be providing catalog's schema, and instead force Hudi relations to fetch the schema + // from the table itself + val userSchema = if (isUsingHiveCatalog(sqlContext.sparkSession)) { + None + } else { + Option(schema) + } log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType") + if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) { new EmptyRelation(sqlContext, metaClient) } else { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 4a12256c4330a..47e391a560a22 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -34,7 +34,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.util.StringUtils import org.apache.hudi.common.util.ValidationUtils.checkState -import org.apache.hudi.internal.schema.InternalSchema +import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema} import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter import org.apache.hudi.io.storage.HoodieHFileReader import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex @@ -74,7 +74,7 @@ case class HoodieTableState(tablePath: String, abstract class HoodieBaseRelation(val sqlContext: SQLContext, val metaClient: HoodieTableMetaClient, val optParams: Map[String, String], - userSchema: Option[StructType]) + schemaSpec: Option[StructType]) extends BaseRelation with FileRelation with PrunedFilteredScan @@ -128,24 +128,28 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, */ protected lazy val (tableAvroSchema: Schema, internalSchema: InternalSchema) = { val schemaResolver = new TableSchemaResolver(metaClient) - val avroSchema = Try(schemaResolver.getTableAvroSchema) match { - case Success(schema) => schema - case Failure(e) => - logWarning("Failed to fetch schema from the table", e) - // If there is no commit in the table, we can't get the schema - // t/h [[TableSchemaResolver]], fallback to the provided [[userSchema]] instead. 
- userSchema match { - case Some(s) => convertToAvroSchema(s) - case _ => throw new IllegalArgumentException("User-provided schema is required in case the table is empty") - } + val avroSchema: Schema = schemaSpec.map(convertToAvroSchema).getOrElse { + Try(schemaResolver.getTableAvroSchema) match { + case Success(schema) => schema + case Failure(e) => + logError("Failed to fetch schema from the table", e) + throw new HoodieSchemaException("Failed to fetch schema from the table") + } } - // try to find internalSchema - val internalSchemaFromMeta = try { - schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema) - } catch { - case _: Exception => InternalSchema.getEmptyInternalSchema + + val internalSchema: InternalSchema = if (!isSchemaEvolutionEnabled) { + InternalSchema.getEmptyInternalSchema + } else { + Try(schemaResolver.getTableInternalSchemaFromCommitMetadata) match { + case Success(internalSchemaOpt) => + toScalaOption(internalSchemaOpt).getOrElse(InternalSchema.getEmptyInternalSchema) + case Failure(e) => + logWarning("Failed to fetch internal-schema from the table", e) + InternalSchema.getEmptyInternalSchema + } } - (avroSchema, internalSchemaFromMeta) + + (avroSchema, internalSchema) } protected lazy val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema) @@ -503,6 +507,15 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, private def prunePartitionColumns(dataStructSchema: StructType): StructType = StructType(dataStructSchema.filterNot(f => partitionColumns.contains(f.name))) + + private def isSchemaEvolutionEnabled = { + // NOTE: Schema evolution could be configured both t/h optional parameters vehicle as well as + // t/h Spark Session configuration (for ex, for Spark SQL) + optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, + DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean || + sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, + DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean + } } object HoodieBaseRelation extends SparkAdapterSupport { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index da2736e59bdda..84280559e9552 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -26,7 +26,7 @@ import org.apache.hudi.HoodieConversionUtils.toProperties import org.apache.hudi.HoodieWriterUtils._ import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient} -import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties} +import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieConfig, HoodieMetadataConfig, TypedProperties} import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model._ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline @@ -338,7 +338,7 @@ object HoodieSparkSqlWriter { def addSchemaEvolutionParameters(parameters: Map[String, String], internalSchemaOpt: Option[InternalSchema]): Map[String, String] = { val schemaEvolutionEnable = if (internalSchemaOpt.isDefined) "true" else "false" parameters ++ 
Map(HoodieWriteConfig.INTERNAL_SCHEMA_STRING.key() -> SerDeHelper.toJson(internalSchemaOpt.getOrElse(null)), - HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key() -> schemaEvolutionEnable) + HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key() -> schemaEvolutionEnable) } /** diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala index 60428415861be..63f1a7afc2cad 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala @@ -18,11 +18,10 @@ package org.apache.hudi import java.util.Properties - import org.apache.hudi.DataSourceOptionsHelper.allAlternatives import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE -import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieConfig, TypedProperties} +import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, HoodieConfig, TypedProperties} import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieException @@ -163,9 +162,9 @@ object HoodieWriterUtils { // Check schema evolution for bootstrap table. // now we do not support bootstrap table. if (params.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL) - && params.getOrElse(HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key(), "false").toBoolean) { + && params.getOrElse(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), "false").toBoolean) { throw new HoodieException(String - .format("now schema evolution cannot support bootstrap table, pls set %s to false", HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key())) + .format("now schema evolution cannot support bootstrap table, pls set %s to false", HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key())) } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala index fa01ba37e9d7a..e69d0d5293af1 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala @@ -253,8 +253,11 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport { .filterKeys(_.startsWith("hoodie.")) } - def isEnableHive(sparkSession: SparkSession): Boolean = - "hive" == sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION) + /** + * Checks whether Spark is using Hive as Session's Catalog + */ + def isUsingHiveCatalog(sparkSession: SparkSession): Boolean = + sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive" /** * Convert different query instant time format to the commit time format. 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala index eca73be0bb39d..3f67d5017fc9c 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala @@ -33,7 +33,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.hive.HiveExternalCatalog -import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isEnableHive, withSparkConf} +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isUsingHiveCatalog, withSparkConf} import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyPayload} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType @@ -57,7 +57,7 @@ trait ProvidesHoodieConfig extends Logging { require(hoodieCatalogTable.primaryKeys.nonEmpty, s"There are no primary key in table ${hoodieCatalogTable.table.identifier}, cannot execute update operator") - val enableHive = isEnableHive(sparkSession) + val enableHive = isUsingHiveCatalog(sparkSession) val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf) @@ -174,7 +174,7 @@ trait ProvidesHoodieConfig extends Logging { logInfo(s"Insert statement use write operation type: $operation, payloadClass: $payloadClassName") - val enableHive = isEnableHive(sparkSession) + val enableHive = isUsingHiveCatalog(sparkSession) withSparkConf(sparkSession, catalogProperties) { Map( @@ -213,7 +213,7 @@ trait ProvidesHoodieConfig extends Logging { hoodieCatalogTable: HoodieCatalogTable, partitionsToDrop: String): Map[String, String] = { val partitionFields = hoodieCatalogTable.partitionFields.mkString(",") - val enableHive = isEnableHive(sparkSession) + val enableHive = isUsingHiveCatalog(sparkSession) val catalogProperties = hoodieCatalogTable.catalogProperties val tableConfig = hoodieCatalogTable.tableConfig @@ -259,7 +259,7 @@ trait ProvidesHoodieConfig extends Logging { val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable) val options = hoodieCatalogTable.catalogProperties - val enableHive = isEnableHive(sparkSession) + val enableHive = isUsingHiveCatalog(sparkSession) withSparkConf(sparkSession, options) { Map( diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala index 9bf1d721525c3..75803fd77903b 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps import org.apache.spark.sql.hive.HiveClientUtils import org.apache.spark.sql.hive.HiveExternalCatalog._ -import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isEnableHive +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog import 
org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils} import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD import org.apache.spark.sql.types.StructType @@ -144,7 +144,7 @@ object CreateHoodieTableCommand { ) // Create table in the catalog - val enableHive = isEnableHive(sparkSession) + val enableHive = isUsingHiveCatalog(sparkSession) if (enableHive) { createHiveDataSourceTable(sparkSession, newTable, ignoreIfExists) } else { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala index fff44bb7f570b..783875296cf60 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala @@ -33,17 +33,13 @@ class HoodieSparkSessionExtension extends (SparkSessionExtensions => Unit) new HoodieCommonSqlParser(session, parser) } - HoodieAnalysis.customResolutionRules().foreach { rule => + HoodieAnalysis.customResolutionRules.foreach { ruleBuilder => extensions.injectResolutionRule { session => - rule(session) + ruleBuilder(session) } } - extensions.injectResolutionRule { session => - sparkAdapter.createResolveHudiAlterTableCommand(session) - } - - HoodieAnalysis.customPostHocResolutionRules().foreach { rule => + HoodieAnalysis.customPostHocResolutionRules.foreach { rule => extensions.injectPostHocResolutionRule { session => rule(session) } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala index dcacbef3a26fa..97e453ff7e939 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala @@ -39,45 +39,69 @@ import org.apache.spark.sql.{AnalysisException, SparkSession} import java.util import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer object HoodieAnalysis { - def customResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] = - Seq( + type RuleBuilder = SparkSession => Rule[LogicalPlan] + + def customResolutionRules: Seq[RuleBuilder] = { + val rules: ListBuffer[RuleBuilder] = ListBuffer( + // Default rules session => HoodieResolveReferences(session), session => HoodieAnalysis(session) - ) ++ extraResolutionRules() - - def customPostHocResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] = - Seq( - session => HoodiePostAnalysisRule(session) - ) ++ extraPostHocResolutionRules() + ) - def extraResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] = { if (HoodieSparkUtils.gteqSpark3_2) { + val dataSourceV2ToV1FallbackClass = "org.apache.spark.sql.hudi.analysis.HoodieDataSourceV2ToV1Fallback" + val dataSourceV2ToV1Fallback: RuleBuilder = + session => ReflectionUtils.loadClass(dataSourceV2ToV1FallbackClass, session).asInstanceOf[Rule[LogicalPlan]] + val spark3AnalysisClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3Analysis" - val spark3Analysis: SparkSession => Rule[LogicalPlan] = + val spark3Analysis: RuleBuilder = session => ReflectionUtils.loadClass(spark3AnalysisClass, session).asInstanceOf[Rule[LogicalPlan]] - val spark3ResolveReferences = 
"org.apache.spark.sql.hudi.analysis.HoodieSpark3ResolveReferences" - val spark3References: SparkSession => Rule[LogicalPlan] = - session => ReflectionUtils.loadClass(spark3ResolveReferences, session).asInstanceOf[Rule[LogicalPlan]] + val spark3ResolveReferencesClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3ResolveReferences" + val spark3ResolveReferences: RuleBuilder = + session => ReflectionUtils.loadClass(spark3ResolveReferencesClass, session).asInstanceOf[Rule[LogicalPlan]] - Seq(spark3Analysis, spark3References) - } else { - Seq.empty + val spark32ResolveAlterTableCommandsClass = "org.apache.spark.sql.hudi.ResolveHudiAlterTableCommandSpark32" + val spark32ResolveAlterTableCommands: RuleBuilder = + session => ReflectionUtils.loadClass(spark32ResolveAlterTableCommandsClass, session).asInstanceOf[Rule[LogicalPlan]] + + // NOTE: PLEASE READ CAREFULLY + // + // It's critical for this rules to follow in this order, so that DataSource V2 to V1 fallback + // is performed prior to other rules being evaluated + rules ++= Seq(dataSourceV2ToV1Fallback, spark3Analysis, spark3ResolveReferences, spark32ResolveAlterTableCommands) + + } else if (HoodieSparkUtils.gteqSpark3_1) { + val spark31ResolveAlterTableCommandsClass = "org.apache.spark.sql.hudi.ResolveHudiAlterTableCommand312" + val spark31ResolveAlterTableCommands: RuleBuilder = + session => ReflectionUtils.loadClass(spark31ResolveAlterTableCommandsClass, session).asInstanceOf[Rule[LogicalPlan]] + + rules ++= Seq(spark31ResolveAlterTableCommands) } + + rules } - def extraPostHocResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] = + def customPostHocResolutionRules: Seq[RuleBuilder] = { + val rules: ListBuffer[RuleBuilder] = ListBuffer( + // Default rules + session => HoodiePostAnalysisRule(session) + ) + if (HoodieSparkUtils.gteqSpark3_2) { val spark3PostHocResolutionClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3PostAnalysisRule" - val spark3PostHocResolution: SparkSession => Rule[LogicalPlan] = + val spark3PostHocResolution: RuleBuilder = session => ReflectionUtils.loadClass(spark3PostHocResolutionClass, session).asInstanceOf[Rule[LogicalPlan]] - Seq(spark3PostHocResolution) - } else { - Seq.empty + rules += spark3PostHocResolution } + + rules + } + } /** diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index f7c62adc6578e..636599ce0cf48 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -453,7 +453,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable) // Enable the hive sync by default if spark have enable the hive metastore. 
- val enableHive = isEnableHive(sparkSession) + val enableHive = isUsingHiveCatalog(sparkSession) withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) { Map( "path" -> path, diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 7c86da0c9e362..fea1ec357145b 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -18,20 +18,24 @@ package org.apache.hudi.functional import org.apache.hadoop.fs.FileSystem +import org.apache.hudi.HoodieConversionUtils.toJavaOption import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.table.timeline.HoodieInstant import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings} +import org.apache.hudi.common.util import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.{HoodieException, HoodieUpsertException} import org.apache.hudi.keygen._ import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config import org.apache.hudi.testutils.HoodieClientTestBase +import org.apache.hudi.util.JFunction import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} import org.apache.spark.sql._ import org.apache.spark.sql.functions.{col, concat, lit, udf} +import org.apache.spark.sql.hudi.HoodieSparkSessionExtension import org.apache.spark.sql.types._ import org.joda.time.DateTime import org.joda.time.format.DateTimeFormat @@ -42,6 +46,7 @@ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{CsvSource, ValueSource} import java.sql.{Date, Timestamp} +import java.util.function.Consumer import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -67,6 +72,12 @@ class TestCOWDataSource extends HoodieClientTestBase { val verificationCol: String = "driver" val updatedVerificationVal: String = "driver_update" + override def getSparkSessionExtensionsInjector: util.Option[Consumer[SparkSessionExtensions]] = + toJavaOption( + Some( + JFunction.toJava((receiver: SparkSessionExtensions) => new HoodieSparkSessionExtension().apply(receiver))) + ) + @BeforeEach override def setUp() { initPath() initSparkContexts() diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala index 68fc6d7c41d89..6736f44799168 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala @@ -25,6 +25,7 @@ import org.apache.spark.SparkConf import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.util.Utils +import org.joda.time.DateTimeZone import org.scalactic.source import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag} @@ -40,7 +41,10 @@ class 
HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll { dir } - TimeZone.setDefault(DateTimeUtils.getTimeZone("CTT")) + // NOTE: We have to fix the timezone to make sure all date-/timestamp-bound utilities output + // is consistent with the fixtures + DateTimeZone.setDefault(DateTimeZone.UTC) + TimeZone.setDefault(DateTimeUtils.getTimeZone("UTC")) protected lazy val spark: SparkSession = SparkSession.builder() .master("local[1]") .appName("hoodie sql test") @@ -50,7 +54,7 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll { .config("hoodie.upsert.shuffle.parallelism", "4") .config("hoodie.delete.shuffle.parallelism", "4") .config("spark.sql.warehouse.dir", sparkWareHouse.getCanonicalPath) - .config("spark.sql.session.timeZone", "CTT") + .config("spark.sql.session.timeZone", "UTC") .config(sparkConf()) .getOrCreate() diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index 0e74c997d7ee4..27c7d0c44531d 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -159,12 +159,6 @@ class Spark2Adapter extends SparkAdapter { throw new IllegalStateException(s"Should not call getRelationTimeTravel for spark2") } - override def createResolveHudiAlterTableCommand(sparkSession: SparkSession): Rule[LogicalPlan] = { - new Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = plan - } - } - override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = { Some(new Spark24HoodieParquetFileFormat(appendPartitionValues)) } diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala index e5f4476cc5a98..c47fbdead2b37 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala @@ -112,8 +112,8 @@ abstract class BaseSpark3Adapter extends SparkAdapter { } override def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean = { - tripAlias(table) match { - case LogicalRelation(_, _, Some(tbl), _) => isHoodieTable(tbl) + unfoldSubqueryAliases(table) match { + case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table) case relation: UnresolvedRelation => isHoodieTable(toTableIdentifier(relation), spark) case DataSourceV2Relation(table: Table, _, _, _, _) => isHoodieTable(table.properties()) diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala index 22431cb2574a3..9dcf53062178a 100644 --- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala @@ -41,19 +41,6 @@ class Spark3_1Adapter extends BaseSpark3Adapter { override def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializer = new 
HoodieSpark3_1AvroDeserializer(rootAvroType, rootCatalystType) - override def createResolveHudiAlterTableCommand(sparkSession: SparkSession): Rule[LogicalPlan] = { - if (SPARK_VERSION.startsWith("3.1")) { - val loadClassName = "org.apache.spark.sql.hudi.ResolveHudiAlterTableCommand312" - val clazz = Class.forName(loadClassName, true, Thread.currentThread().getContextClassLoader) - val ctor = clazz.getConstructors.head - ctor.newInstance(sparkSession).asInstanceOf[Rule[LogicalPlan]] - } else { - new Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = plan - } - } - } - override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = { Some(new Spark31HoodieParquetFileFormat(appendPartitionValues)) } diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/ResolveHudiAlterTableCommand312.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/ResolveHudiAlterTableCommand312.scala index 522cecdaaf07f..11dff7eb868b3 100644 --- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/ResolveHudiAlterTableCommand312.scala +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/ResolveHudiAlterTableCommand312.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.sql.hudi +import org.apache.hudi.common.config.HoodieCommonConfig + import java.util.Locale import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.internal.schema.action.TableChange.ColumnChangeID @@ -114,8 +116,9 @@ case class ResolveHudiAlterTableCommand312(sparkSession: SparkSession) extends R } } - private def schemaEvolutionEnabled(): Boolean = sparkSession - .sessionState.conf.getConfString(HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key(), "false").toBoolean + private def schemaEvolutionEnabled(): Boolean = + sparkSession.sessionState.conf.getConfString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key, + HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue.toString).toBoolean private def isHoodieTable(table: CatalogTable): Boolean = table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi" diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3DefaultSource.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3DefaultSource.scala index d94fee1f410ae..3bc3446d1f120 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3DefaultSource.scala @@ -17,19 +17,19 @@ package org.apache.hudi -import org.apache.hudi.exception.HoodieException -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.connector.catalog.{Table, TableProvider} -import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.hudi.catalog.HoodieInternalV2Table import org.apache.spark.sql.sources.DataSourceRegister -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.util.CaseInsensitiveStringMap -class Spark3DefaultSource extends DefaultSource with DataSourceRegister with TableProvider { +/** + * NOTE: PLEASE READ CAREFULLY + * All of Spark DataSourceV2 APIs are deliberately disabled to make sure + * there are no regressions in performance + * Please check out HUDI-4178 for more details + */ +class Spark3DefaultSource extends DefaultSource with DataSourceRegister /* with TableProvider */ { override def shortName(): String = "hudi" + /* 
def inferSchema: StructType = new StructType() override def inferSchema(options: CaseInsensitiveStringMap): StructType = inferSchema @@ -43,4 +43,5 @@ class Spark3DefaultSource extends DefaultSource with DataSourceRegister with Tab HoodieInternalV2Table(SparkSession.active, path) } + */ } diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala index 15624c741130c..1b045f6654207 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala @@ -67,19 +67,6 @@ class Spark3_2Adapter extends BaseSpark3Adapter { ) } - override def createResolveHudiAlterTableCommand(sparkSession: SparkSession): Rule[LogicalPlan] = { - if (SPARK_VERSION.startsWith("3.2")) { - val loadClassName = "org.apache.spark.sql.hudi.ResolveHudiAlterTableCommandSpark32" - val clazz = Class.forName(loadClassName, true, Thread.currentThread().getContextClassLoader) - val ctor = clazz.getConstructors.head - ctor.newInstance(sparkSession).asInstanceOf[Rule[LogicalPlan]] - } else { - new Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = plan - } - } - } - override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = { Some(new Spark32HoodieParquetFileFormat(appendPartitionValues)) } diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/ResolveHudiAlterTableCommandSpark32.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/ResolveHudiAlterTableCommandSpark32.scala index 96d919cf0a5b5..f6f18261565e8 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/ResolveHudiAlterTableCommandSpark32.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/ResolveHudiAlterTableCommandSpark32.scala @@ -17,12 +17,12 @@ package org.apache.spark.sql.hudi +import org.apache.hudi.common.config.HoodieCommonConfig import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.internal.schema.action.TableChange.ColumnChangeID -import org.apache.spark.sql.catalyst.analysis.ResolvedTable -import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, DropColumns, LogicalPlan, RenameColumn, ReplaceColumns, SetTableProperties, UnsetTableProperties} +import org.apache.spark.sql.catalyst.analysis.ResolvedTable +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.hudi.catalog.HoodieInternalV2Table import org.apache.spark.sql.hudi.command.{AlterTableCommand => HudiAlterTableCommand} @@ -33,33 +33,38 @@ import org.apache.spark.sql.hudi.command.{AlterTableCommand => HudiAlterTableCom */ class ResolveHudiAlterTableCommandSpark32(sparkSession: SparkSession) extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { - case set @ SetTableProperties(asTable(table), _) if schemaEvolutionEnabled && set.resolved => - HudiAlterTableCommand(table, set.changes, ColumnChangeID.PROPERTY_CHANGE) - case unSet @ UnsetTableProperties(asTable(table), _, _) if schemaEvolutionEnabled && unSet.resolved => - HudiAlterTableCommand(table, 
unSet.changes, ColumnChangeID.PROPERTY_CHANGE) - case drop @ DropColumns(asTable(table), _) if schemaEvolutionEnabled && drop.resolved => - HudiAlterTableCommand(table, drop.changes, ColumnChangeID.DELETE) - case add @ AddColumns(asTable(table), _) if schemaEvolutionEnabled && add.resolved => - HudiAlterTableCommand(table, add.changes, ColumnChangeID.ADD) - case renameColumn @ RenameColumn(asTable(table), _, _) if schemaEvolutionEnabled && renameColumn.resolved=> - HudiAlterTableCommand(table, renameColumn.changes, ColumnChangeID.UPDATE) - case alter @ AlterColumn(asTable(table), _, _, _, _, _) if schemaEvolutionEnabled && alter.resolved => - HudiAlterTableCommand(table, alter.changes, ColumnChangeID.UPDATE) - case replace @ ReplaceColumns(asTable(table), _) if schemaEvolutionEnabled && replace.resolved => - HudiAlterTableCommand(table, replace.changes, ColumnChangeID.REPLACE) + def apply(plan: LogicalPlan): LogicalPlan = { + if (schemaEvolutionEnabled) { + plan.resolveOperatorsUp { + case set@SetTableProperties(ResolvedHoodieV2TablePlan(t), _) if set.resolved => + HudiAlterTableCommand(t.v1Table, set.changes, ColumnChangeID.PROPERTY_CHANGE) + case unSet@UnsetTableProperties(ResolvedHoodieV2TablePlan(t), _, _) if unSet.resolved => + HudiAlterTableCommand(t.v1Table, unSet.changes, ColumnChangeID.PROPERTY_CHANGE) + case drop@DropColumns(ResolvedHoodieV2TablePlan(t), _) if drop.resolved => + HudiAlterTableCommand(t.v1Table, drop.changes, ColumnChangeID.DELETE) + case add@AddColumns(ResolvedHoodieV2TablePlan(t), _) if add.resolved => + HudiAlterTableCommand(t.v1Table, add.changes, ColumnChangeID.ADD) + case renameColumn@RenameColumn(ResolvedHoodieV2TablePlan(t), _, _) if renameColumn.resolved => + HudiAlterTableCommand(t.v1Table, renameColumn.changes, ColumnChangeID.UPDATE) + case alter@AlterColumn(ResolvedHoodieV2TablePlan(t), _, _, _, _, _) if alter.resolved => + HudiAlterTableCommand(t.v1Table, alter.changes, ColumnChangeID.UPDATE) + case replace@ReplaceColumns(ResolvedHoodieV2TablePlan(t), _) if replace.resolved => + HudiAlterTableCommand(t.v1Table, replace.changes, ColumnChangeID.REPLACE) + } + } else { + plan + } } - private def schemaEvolutionEnabled(): Boolean = sparkSession - .sessionState.conf.getConfString(HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key(), "false").toBoolean + private def schemaEvolutionEnabled: Boolean = + sparkSession.sessionState.conf.getConfString(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key, + HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue.toString).toBoolean - object asTable { - def unapply(a: LogicalPlan): Option[CatalogTable] = { - a match { - case ResolvedTable(_, _, table: HoodieInternalV2Table, _) => - table.catalogTable - case _ => - None + object ResolvedHoodieV2TablePlan { + def unapply(plan: LogicalPlan): Option[HoodieInternalV2Table] = { + plan match { + case ResolvedTable(_, _, v2Table: HoodieInternalV2Table, _) => Some(v2Table) + case _ => None } } } diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala index 4c77733b144aa..e351174587245 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala @@ -17,72 +17,77 @@ package org.apache.spark.sql.hudi.analysis -import 
org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.{DefaultSource, SparkAdapterSupport} +import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, UnresolvedPartitionSpec} +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper +import org.apache.spark.sql.connector.catalog.{Table, V1Table} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.PreWriteCheck.failAnalysis import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, V2SessionCatalog} -import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig} import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{castIfNeeded, getTableLocation, removeMetaFields, tableExistsInPath} import org.apache.spark.sql.hudi.catalog.{HoodieCatalog, HoodieInternalV2Table} import org.apache.spark.sql.hudi.command.{AlterHoodieTableDropPartitionCommand, ShowHoodieTablePartitionsCommand, TruncateHoodieTableCommand} +import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{AnalysisException, SQLContext, SparkSession} import scala.collection.JavaConverters.mapAsJavaMapConverter /** - * Rule for convert the logical plan to command. - * @param sparkSession + * NOTE: PLEASE READ CAREFULLY + * + * Since Hudi relations don't currently implement DS V2 Read API, we have to fallback to V1 here. + * Such fallback will have considerable performance impact, therefore it's only performed in cases + * where V2 API have to be used. 
Currently only such use-case is using of Schema Evolution feature + * + * Check out HUDI-4178 for more details */ -case class HoodieSpark3Analysis(sparkSession: SparkSession) extends Rule[LogicalPlan] - with SparkAdapterSupport with ProvidesHoodieConfig { +class HoodieDataSourceV2ToV1Fallback(sparkSession: SparkSession) extends Rule[LogicalPlan] + with ProvidesHoodieConfig { override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown { - case dsv2 @ DataSourceV2Relation(d: HoodieInternalV2Table, _, _, _, _) => - val output = dsv2.output - val catalogTable = if (d.catalogTable.isDefined) { - Some(d.v1Table) - } else { - None - } + case v2r @ DataSourceV2Relation(v2Table: HoodieInternalV2Table, _, _, _, _) => + val output = v2r.output + val catalogTable = v2Table.catalogTable.map(_ => v2Table.v1Table) val relation = new DefaultSource().createRelation(new SQLContext(sparkSession), - buildHoodieConfig(d.hoodieCatalogTable)) + buildHoodieConfig(v2Table.hoodieCatalogTable), v2Table.hoodieCatalogTable.tableSchema) + LogicalRelation(relation, output, catalogTable, isStreaming = false) - case a @ InsertIntoStatement(r: DataSourceV2Relation, partitionSpec, _, _, _, _) if a.query.resolved && - r.table.isInstanceOf[HoodieInternalV2Table] && - needsSchemaAdjustment(a.query, r.table.asInstanceOf[HoodieInternalV2Table], partitionSpec, r.schema) => - val projection = resolveQueryColumnsByOrdinal(a.query, r.output) - if (projection != a.query) { - a.copy(query = projection) - } else { - a - } + } +} + +class HoodieSpark3Analysis(sparkSession: SparkSession) extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown { + case s @ InsertIntoStatement(r @ DataSourceV2Relation(v2Table: HoodieInternalV2Table, _, _, _, _), partitionSpec, _, _, _, _) + if s.query.resolved && needsSchemaAdjustment(s.query, v2Table.hoodieCatalogTable.table, partitionSpec, r.schema) => + val projection = resolveQueryColumnsByOrdinal(s.query, r.output) + if (projection != s.query) { + s.copy(query = projection) + } else { + s + } } /** * Need to adjust schema based on the query and relation schema, for example, * if using insert into xx select 1, 2 here need to map to column names - * @param query - * @param hoodieTable - * @param partitionSpec - * @param schema - * @return */ private def needsSchemaAdjustment(query: LogicalPlan, - hoodieTable: HoodieInternalV2Table, + table: CatalogTable, partitionSpec: Map[String, Option[String]], schema: StructType): Boolean = { val output = query.output val queryOutputWithoutMetaFields = removeMetaFields(output) - val partitionFields = hoodieTable.hoodieCatalogTable.partitionFields - val partitionSchema = hoodieTable.hoodieCatalogTable.partitionSchema + val hoodieCatalogTable = HoodieCatalogTable(sparkSession, table) + + val partitionFields = hoodieCatalogTable.partitionFields + val partitionSchema = hoodieCatalogTable.partitionSchema val staticPartitionValues = partitionSpec.filter(p => p._2.isDefined).mapValues(_.get) assert(staticPartitionValues.isEmpty || @@ -91,8 +96,8 @@ case class HoodieSpark3Analysis(sparkSession: SparkSession) extends Rule[Logical s"is: ${staticPartitionValues.mkString("," + "")}") assert(staticPartitionValues.size + queryOutputWithoutMetaFields.size - == hoodieTable.hoodieCatalogTable.tableSchemaWithoutMetaFields.size, - s"Required select columns count: ${hoodieTable.hoodieCatalogTable.tableSchemaWithoutMetaFields.size}, " + + == hoodieCatalogTable.tableSchemaWithoutMetaFields.size, + s"Required select 
columns count: ${hoodieCatalogTable.tableSchemaWithoutMetaFields.size}, " + s"Current select columns(including static partition column) count: " + s"${staticPartitionValues.size + queryOutputWithoutMetaFields.size},columns: " + s"(${(queryOutputWithoutMetaFields.map(_.name) ++ staticPartitionValues.keys).mkString(",")})") @@ -126,7 +131,6 @@ case class HoodieSpark3Analysis(sparkSession: SparkSession) extends Rule[Logical /** * Rule for resolve hoodie's extended syntax or rewrite some logical plan. - * @param sparkSession */ case class HoodieSpark3ResolveReferences(sparkSession: SparkSession) extends Rule[LogicalPlan] with SparkAdapterSupport with ProvidesHoodieConfig { @@ -173,28 +177,26 @@ case class HoodieSpark3ResolveReferences(sparkSession: SparkSession) extends Rul } /** - * Rule for rewrite some spark commands to hudi's implementation. - * @param sparkSession + * Rule replacing resolved Spark's commands (not working for Hudi tables out-of-the-box) with + * corresponding Hudi implementations */ case class HoodieSpark3PostAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { plan match { - case ShowPartitions(ResolvedTable(_, idt, _: HoodieInternalV2Table, _), specOpt, _) => + case ShowPartitions(ResolvedTable(_, id, HoodieV1OrV2Table(_), _), specOpt, _) => ShowHoodieTablePartitionsCommand( - idt.asTableIdentifier, specOpt.map(s => s.asInstanceOf[UnresolvedPartitionSpec].spec)) + id.asTableIdentifier, specOpt.map(s => s.asInstanceOf[UnresolvedPartitionSpec].spec)) // Rewrite TruncateTableCommand to TruncateHoodieTableCommand - case TruncateTable(ResolvedTable(_, idt, _: HoodieInternalV2Table, _)) => - TruncateHoodieTableCommand(idt.asTableIdentifier, None) + case TruncateTable(ResolvedTable(_, id, HoodieV1OrV2Table(_), _)) => + TruncateHoodieTableCommand(id.asTableIdentifier, None) - case TruncatePartition( - ResolvedTable(_, idt, _: HoodieInternalV2Table, _), - partitionSpec: UnresolvedPartitionSpec) => - TruncateHoodieTableCommand(idt.asTableIdentifier, Some(partitionSpec.spec)) + case TruncatePartition(ResolvedTable(_, id, HoodieV1OrV2Table(_), _), partitionSpec: UnresolvedPartitionSpec) => + TruncateHoodieTableCommand(id.asTableIdentifier, Some(partitionSpec.spec)) - case DropPartitions(ResolvedTable(_, idt, _: HoodieInternalV2Table, _), specs, ifExists, purge) => + case DropPartitions(ResolvedTable(_, id, HoodieV1OrV2Table(_), _), specs, ifExists, purge) => AlterHoodieTableDropPartitionCommand( - idt.asTableIdentifier, + id.asTableIdentifier, specs.seq.map(f => f.asInstanceOf[UnresolvedPartitionSpec]).map(s => s.spec), ifExists, purge, @@ -205,3 +207,12 @@ case class HoodieSpark3PostAnalysisRule(sparkSession: SparkSession) extends Rule } } } + +private[sql] object HoodieV1OrV2Table extends SparkAdapterSupport { + def unapply(table: Table): Option[CatalogTable] = table match { + case V1Table(catalogTable) if sparkAdapter.isHoodieTable(catalogTable) => Some(catalogTable) + case v2: HoodieInternalV2Table => v2.catalogTable + case _ => None + } +} + diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala index e1c2f228fa311..2b3b7a0782c94 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala +++ 
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
index e1c2f228fa311..2b3b7a0782c94 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.sql.InsertMode
 import org.apache.hudi.sync.common.util.ConfigUtils
-import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, SparkAdapterSupport}
 import org.apache.spark.sql.HoodieSpark3SqlUtils.convertTransforms
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException, UnresolvedAttribute}
@@ -33,6 +33,7 @@ import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChan
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.hudi.analysis.HoodieV1OrV2Table
 import org.apache.spark.sql.hudi.command._
 import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig}
 import org.apache.spark.sql.types.{StructField, StructType}
@@ -105,12 +106,30 @@ class HoodieCatalog extends DelegatingCatalogExtension
           case _ => catalogTable0
         }
-        HoodieInternalV2Table(
+
+        val v2Table = HoodieInternalV2Table(
           spark = spark,
           path = catalogTable.location.toString,
           catalogTable = Some(catalogTable),
           tableIdentifier = Some(ident.toString))
-      case o => o
+
+        val schemaEvolutionEnabled: Boolean = spark.sessionState.conf.getConfString(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
+          DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
+
+        // NOTE: PLEASE READ CAREFULLY
+        //
+        // Since Hudi relations don't currently implement the DS V2 Read API, we fall back to V1 here by default.
+        // Such a fallback has a considerable performance impact, therefore it's only performed in cases where
+        // the V2 API has to be used. Currently, the only such use-case is the Schema Evolution feature
+        //
+        // Check out HUDI-4178 for more details
+        if (schemaEvolutionEnabled) {
+          v2Table
+        } else {
+          v2Table.v1TableWrapper
+        }
+
+      case t => t
     }
   }
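
Whether loadTable above hands back the DataSource V2 table or its V1 fallback wrapper is driven entirely by the schema-evolution read option checked in the new code. A minimal sketch of toggling it from the session side; the session setup is illustrative, and only DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED is taken from the patch itself:

    import org.apache.hudi.DataSourceReadOptions
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("hudi-schema-evolution-toggle-sketch")
      .master("local[1]")
      .getOrCreate()

    // Default (false): HoodieCatalog.loadTable returns v2Table.v1TableWrapper,
    // keeping reads on the better-performing V1 path.
    spark.conf.set(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, "false")

    // Opting into schema-on-read evolution keeps the HoodieInternalV2Table,
    // since that feature requires the V2 code path.
    spark.conf.set(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, "true")
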
@@ -132,7 +151,7 @@ class HoodieCatalog extends DelegatingCatalogExtension
   override def dropTable(ident: Identifier): Boolean = {
     val table = loadTable(ident)
     table match {
-      case _: HoodieInternalV2Table =>
+      case HoodieV1OrV2Table(_) =>
         DropHoodieTableCommand(ident.asTableIdentifier, ifExists = true, isView = false, purge = false).run(spark)
         true
       case _ => super.dropTable(ident)
@@ -142,7 +161,7 @@ class HoodieCatalog extends DelegatingCatalogExtension
   override def purgeTable(ident: Identifier): Boolean = {
     val table = loadTable(ident)
     table match {
-      case _: HoodieInternalV2Table =>
+      case HoodieV1OrV2Table(_) =>
         DropHoodieTableCommand(ident.asTableIdentifier, ifExists = true, isView = false, purge = true).run(spark)
         true
       case _ => super.purgeTable(ident)
@@ -153,56 +172,53 @@ class HoodieCatalog extends DelegatingCatalogExtension
   @throws[TableAlreadyExistsException]
   override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
     loadTable(oldIdent) match {
-      case _: HoodieInternalV2Table =>
+      case HoodieV1OrV2Table(_) =>
         AlterHoodieTableRenameCommand(oldIdent.asTableIdentifier, newIdent.asTableIdentifier, false).run(spark)
       case _ => super.renameTable(oldIdent, newIdent)
     }
   }
 
   override def alterTable(ident: Identifier, changes: TableChange*): Table = {
-    val tableIdent = TableIdentifier(ident.name(), ident.namespace().lastOption)
-    // scalastyle:off
-    val table = loadTable(ident) match {
-      case hoodieTable: HoodieInternalV2Table => hoodieTable
-      case _ => return super.alterTable(ident, changes: _*)
-    }
-    // scalastyle:on
-
-    val grouped = changes.groupBy(c => c.getClass)
-
-    grouped.foreach {
-      case (t, newColumns) if t == classOf[AddColumn] =>
-        AlterHoodieTableAddColumnsCommand(
-          tableIdent,
-          newColumns.asInstanceOf[Seq[AddColumn]].map { col =>
-            StructField(
-              col.fieldNames()(0),
-              col.dataType(),
-              col.isNullable)
-          }).run(spark)
-      case (t, columnChanges) if classOf[ColumnChange].isAssignableFrom(t) =>
-        columnChanges.foreach {
-          case dataType: UpdateColumnType =>
-            val colName = UnresolvedAttribute(dataType.fieldNames()).name
-            val newDataType = dataType.newDataType()
-            val structField = StructField(colName, newDataType)
-            AlterHoodieTableChangeColumnCommand(tableIdent, colName, structField).run(spark)
-          case dataType: UpdateColumnComment =>
-            val newComment = dataType.newComment()
-            val colName = UnresolvedAttribute(dataType.fieldNames()).name
-            val fieldOpt = table.schema().findNestedField(dataType.fieldNames(), includeCollections = true,
-              spark.sessionState.conf.resolver).map(_._2)
-            val field = fieldOpt.getOrElse {
-              throw new AnalysisException(
-                s"Couldn't find column $colName in:\n${table.schema().treeString}")
+    loadTable(ident) match {
+      case HoodieV1OrV2Table(table) => {
+        val tableIdent = TableIdentifier(ident.name(), ident.namespace().lastOption)
+        changes.groupBy(c => c.getClass).foreach {
+          case (t, newColumns) if t == classOf[AddColumn] =>
+            AlterHoodieTableAddColumnsCommand(
+              tableIdent,
+              newColumns.asInstanceOf[Seq[AddColumn]].map { col =>
+                StructField(
+                  col.fieldNames()(0),
+                  col.dataType(),
+                  col.isNullable)
+              }).run(spark)
+
+          case (t, columnChanges) if classOf[ColumnChange].isAssignableFrom(t) =>
+            columnChanges.foreach {
+              case dataType: UpdateColumnType =>
+                val colName = UnresolvedAttribute(dataType.fieldNames()).name
+                val newDataType = dataType.newDataType()
+                val structField = StructField(colName, newDataType)
+                AlterHoodieTableChangeColumnCommand(tableIdent, colName, structField).run(spark)
+              case dataType: UpdateColumnComment =>
+                val newComment = dataType.newComment()
+                val colName = UnresolvedAttribute(dataType.fieldNames()).name
+                val fieldOpt = table.schema.findNestedField(dataType.fieldNames(), includeCollections = true,
+                  spark.sessionState.conf.resolver).map(_._2)
+                val field = fieldOpt.getOrElse {
+                  throw new AnalysisException(
+                    s"Couldn't find column $colName in:\n${table.schema.treeString}")
+                }
+                AlterHoodieTableChangeColumnCommand(tableIdent, colName, field.withComment(newComment)).run(spark)
             }
-            AlterHoodieTableChangeColumnCommand(tableIdent, colName, field.withComment(newComment)).run(spark)
+          case (t, _) =>
+            throw new UnsupportedOperationException(s"not supported table change: ${t.getClass}")
         }
-      case (t, _) =>
-        throw new UnsupportedOperationException(s"not supported table change: ${t.getClass}")
-    }
-    loadTable(ident)
+        loadTable(ident)
+      }
+      case _ => super.alterTable(ident, changes: _*)
+    }
   }
 
   private def deduceTableLocationURIAndTableType(
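
The alterTable rewrite above groups the incoming TableChange instances by class: AddColumn batches are routed to AlterHoodieTableAddColumnsCommand, while column type and comment updates go through the ColumnChange branch. A sketch of SQL that exercises those two branches; the table and column names are hypothetical, and it assumes a session where HoodieCatalog is the active catalog:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("hudi-alter-table-sketch")
      .master("local[1]")
      .getOrCreate()

    // Arrives as TableChange.AddColumn and is handled by AlterHoodieTableAddColumnsCommand
    spark.sql("ALTER TABLE hudi_tbl ADD COLUMNS (category STRING)")

    // Arrives as TableChange.UpdateColumnComment and is handled in the ColumnChange branch
    spark.sql("ALTER TABLE hudi_tbl ALTER COLUMN category COMMENT 'product category'")
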
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
index 848925aafe417..9eb4a773f8d4f 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala
@@ -21,7 +21,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
 import org.apache.spark.sql.connector.catalog.TableCapability._
-import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, V2TableWithV1Fallback}
+import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, V1Table, V2TableWithV1Fallback}
 import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
 import org.apache.spark.sql.connector.write._
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
@@ -74,6 +74,8 @@ case class HoodieInternalV2Table(spark: SparkSession,
 
   override def v1Table: CatalogTable = hoodieCatalogTable.table
 
+  def v1TableWrapper: V1Table = V1Table(v1Table)
+
   override def partitioning(): Array[Transform] = {
     hoodieCatalogTable.partitionFields.map { col =>
       new IdentityTransform(new FieldReference(Seq(col)))