diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java index 4da675ea8200..c7cb544aec94 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java @@ -22,12 +22,14 @@ import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.metadata.MetadataPartitionType; import java.util.Hashtable; import java.util.Map; +import static org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME; import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM; import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists; @@ -40,6 +42,10 @@ public class ThreeToFourUpgradeHandler implements UpgradeHandler { @Override public Map upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) { Map tablePropsToAdd = new Hashtable<>(); + String database = config.getString(DATABASE_NAME); + if (StringUtils.nonEmpty(database)) { + tablePropsToAdd.put(DATABASE_NAME, database); + } tablePropsToAdd.put(TABLE_CHECKSUM, String.valueOf(HoodieTableConfig.generateChecksum(config.getProps()))); // if metadata is enabled and files partition exist then update TABLE_METADATA_INDEX_COMPLETED // schema for the files partition is same between the two versions diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala index 0ae413040bc1..b94f09665750 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala @@ -20,16 +20,18 @@ package org.apache.spark.sql.hudi.command.procedures import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion -import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion} +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion} import org.apache.hudi.common.util.Option import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig, HoodieCleanConfig} import org.apache.hudi.index.HoodieIndex import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, UpgradeDowngrade} +import org.apache.hudi.HoodieCLIUtils import org.apache.spark.internal.Logging import org.apache.spark.sql.Row import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} import java.util.function.Supplier +import scala.collection.JavaConverters._ import scala.util.{Failure, Success, Try} class UpgradeOrDowngradeProcedure extends BaseProcedure with ProcedureBuilder with Logging { @@ -51,9 +53,8 @@ class UpgradeOrDowngradeProcedure extends BaseProcedure with ProcedureBuilder wi val tableName = getArgValueOrDefault(args, PARAMETERS(0)) val toVersion = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String] - val basePath = getBasePath(tableName) - - val config = getWriteConfigWithTrue(basePath) + val config = getWriteConfigWithTrue(tableName) + val basePath = config.getBasePath val metaClient = HoodieTableMetaClient.builder .setConf(jsc.hadoopConfiguration) .setBasePath(config.getBasePath) @@ -78,12 +79,16 @@ class UpgradeOrDowngradeProcedure extends BaseProcedure with ProcedureBuilder wi Seq(Row(result)) } - private def getWriteConfigWithTrue(basePath: String) = { + private def getWriteConfigWithTrue(tableOpt: scala.Option[Any]) = { + val basePath = getBasePath(tableOpt) + val (tableName, database) = HoodieCLIUtils.getTableIdentifier(tableOpt.get.asInstanceOf[String]) HoodieWriteConfig.newBuilder + .forTable(tableName) .withPath(basePath) .withRollbackUsingMarkers(true) .withCleanConfig(HoodieCleanConfig.newBuilder.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER).build) .withIndexConfig(HoodieIndexConfig.newBuilder.withIndexType(HoodieIndex.IndexType.BLOOM).build) + .withProps(Map(HoodieTableConfig.DATABASE_NAME.key -> database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase)).asJava) .build } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala index ff4b5aa92ead..1bd29cabc400 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala @@ -82,6 +82,33 @@ class TestUpgradeOrDowngradeProcedure extends HoodieSparkProcedureTestBase { } } + test("Test Call upgrade_table from version three") { + withTempDir { tmp => + val tableName = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$tableName" + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '$tablePath' + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + + // downgrade table to THREE + checkAnswer(s"""call downgrade_table(table => '$tableName', to_version => 'THREE')""")(Seq(true)) + // upgrade table to FOUR + checkAnswer(s"""call upgrade_table(table => '$tableName', to_version => 'FOUR')""")(Seq(true)) + } + } + @throws[IOException] private def assertTableVersionFromPropertyFile(metaClient: HoodieTableMetaClient, versionCode: Int): Unit = { val propertyFile = new Path(metaClient.getMetaPath + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE)