@@ -480,6 +480,13 @@ public class HoodieWriteConfig extends HoodieConfig {
.sinceVersion("0.11.0")
.withDocumentation("Auto adjust lock configurations when metadata table is enabled and for async table services.");

public static final ConfigProperty<Boolean> SKIP_DEFAULT_PARTITION_VALIDATION = ConfigProperty
.key("hoodie.skip.default.partition.validation")
.defaultValue(false)
.sinceVersion("0.12.0")
.withDocumentation("When table is upgraded from pre 0.12 to 0.12, we check for \"default\" partition and fail if found one. "
+ "Users are expected to rewrite the data in those partitions. Enabling this config will bypass this validation");

private ConsistencyGuardConfig consistencyGuardConfig;
private FileSystemRetryConfig fileSystemRetryConfig;

@@ -2038,6 +2045,11 @@ public WriteConcurrencyMode getWriteConcurrencyMode() {
return WriteConcurrencyMode.fromValue(getString(WRITE_CONCURRENCY_MODE));
}

// misc configs
public Boolean doSkipDefaultPartitionValidation() {
return getBoolean(SKIP_DEFAULT_PARTITION_VALIDATION);
}

/**
* Are any table services configured to run inline for both scheduling and execution?
*
@@ -2517,6 +2529,11 @@ public Builder withAutoAdjustLockConfigs(boolean autoAdjustLockConfigs) {
return this;
}

public Builder doSkipDefaultPartitionValidation(boolean skipDefaultPartitionValidation) {
writeConfig.setValue(SKIP_DEFAULT_PARTITION_VALIDATION, String.valueOf(skipDefaultPartitionValidation));
return this;
}

protected void setDefaults() {
writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType));
// Check for mandatory properties
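Reviewer note: a minimal, hypothetical sketch of how a writer would opt out of the new check, using the builder method added in this patch. The base path is a placeholder, and the newBuilder()/withPath() entry points are assumed from the existing HoodieWriteConfig API rather than shown in this diff.

// Sketch only: "/tmp/hudi_table" is a placeholder; newBuilder()/withPath() are assumed
// to be the usual HoodieWriteConfig.Builder entry points.
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hudi_table")
    .doSkipDefaultPartitionValidation(true) // bypass the "default" partition check during the 4 -> 5 upgrade
    .build();

// Equivalent raw property when options are passed as key/value pairs:
// hoodie.skip.default.partition.validation=true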
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.table.upgrade;

import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;

import java.util.HashMap;
import java.util.Map;

/**
 * Downgrade handler to downgrade Hudi's table version from 5 to 4. This is a no-op, since the
 * 4 to 5 upgrade only validates that the deprecated "default" partition is absent and writes no new metadata.
 */
public class FiveToFourDowngradeHandler implements DowngradeHandler {

@Override
public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
return new HashMap<>();
}
}
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.table.upgrade;

import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
import static org.apache.hudi.common.util.PartitionPathEncodeUtils.DEPRECATED_DEFAULT_PARTITION_PATH;

/**
* Upgrade handler to upgrade Hudi's table version from 4 to 5.
*/
public class FourToFiveUpgradeHandler implements UpgradeHandler {

private static final Logger LOG = LogManager.getLogger(FourToFiveUpgradeHandler.class);

@Override
public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
try {
FileSystem fs = new Path(config.getBasePath()).getFileSystem(context.getHadoopConf().get());
if (!config.doSkipDefaultPartitionValidation() && fs.exists(new Path(config.getBasePath() + "/" + DEPRECATED_DEFAULT_PARTITION_PATH))) {
LOG.error(String.format("\"%s\" partition detected. From 0.12, we are changing the default partition in hudi to %s "
+ " Please read and write back the data in \"%s\" partition in hudi to new partition path \"%s\". \"\n"
+ " Sample spark command to use to re-write the data: \n\n"
+ " val df = spark.read.format(\"hudi\").load(HUDI_TABLE_PATH).filter(col(\"PARTITION_PATH_COLUMN\") === \"%s\"); \t \n\n"
+ " df.drop(\"_hoodie_commit_time\").drop(\"_hoodie_commit_seqno\").drop(\"_hoodie_record_key\")\"\n"
+ " .drop(\"_hoodie_partition_path\").drop(\"_hoodie_file_name\").withColumn(PARTITION_PATH_COLUMN,\"%s\")\"\n"
+ " .write.options(writeOptions).mode(Append).save(HUDI_TABLE_PATH);\t\n\"\n"
+ " Please fix values for PARTITION_PATH_COLUMN, HUDI_TABLE_PATH and set all write configs in above command before running. "
+ " Also do delete the records in old partition once above command succeeds. "
+ " Sample spark command to delete old partition records: \n\n"
+ " val df = spark.read.format(\"hudi\").load(HUDI_TABLE_PATH).filter(col(\"PARTITION_PATH_COLUMN\") === \"%s\"); \t \n\n"
+ " df.write.option(\"hoodie.datasource.write.operation\",\"delete\").options(writeOptions).mode(Append).save(HUDI_TABLE_PATH);\t\n\"\n",
DEPRECATED_DEFAULT_PARTITION_PATH, DEFAULT_PARTITION_PATH, DEPRECATED_DEFAULT_PARTITION_PATH, DEFAULT_PARTITION_PATH,
DEPRECATED_DEFAULT_PARTITION_PATH, DEFAULT_PARTITION_PATH, DEPRECATED_DEFAULT_PARTITION_PATH));
throw new HoodieException(String.format("Old deprecated \"%s\" partition found in hudi table. This needs a migration step before we can upgrade ",
DEPRECATED_DEFAULT_PARTITION_PATH));
}
} catch (IOException e) {
LOG.error("Fetching file system instance failed", e);
throw new HoodieException("Fetching FileSystem instance failed ", e);
}
return new HashMap<>();
}
}
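For reviewers, the migration the error message asks for can be sketched with the Spark Java API as below. The table path, partition column name, and write options are placeholders the user must supply, and the sequence simply mirrors the spark-shell commands embedded in the log message; it is an untested sketch, not part of this patch.

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

// All names below are placeholders for illustration, not values shipped in this patch.
SparkSession spark = SparkSession.builder().appName("default-partition-migration").getOrCreate();
String hudiTablePath = "/path/to/hudi_table";        // placeholder base path
String partitionCol = "partition_path";              // placeholder: the table's partition path column
Map<String, String> writeOptions = new HashMap<>();  // fill in record key, partition path, table name, etc.

// Read only the records sitting in the deprecated "default" partition.
Dataset<Row> oldPartition = spark.read().format("hudi").load(hudiTablePath)
    .filter(col(partitionCol).equalTo("default"));

// Re-write them under the new default partition path "__HIVE_DEFAULT_PARTITION__".
oldPartition
    .drop("_hoodie_commit_time").drop("_hoodie_commit_seqno").drop("_hoodie_record_key")
    .drop("_hoodie_partition_path").drop("_hoodie_file_name")
    .withColumn(partitionCol, lit("__HIVE_DEFAULT_PARTITION__"))
    .write().format("hudi").options(writeOptions).mode(SaveMode.Append).save(hudiTablePath);

// After the rewrite succeeds, issue deletes for the records still in the old partition.
oldPartition.write().format("hudi")
    .option("hoodie.datasource.write.operation", "delete")
    .options(writeOptions).mode(SaveMode.Append).save(hudiTablePath);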
@@ -145,6 +145,8 @@ protected Map<ConfigProperty, String> upgrade(HoodieTableVersion fromVersion, Ho
return new TwoToThreeUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper);
} else if (fromVersion == HoodieTableVersion.THREE && toVersion == HoodieTableVersion.FOUR) {
return new ThreeToFourUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper);
} else if (fromVersion == HoodieTableVersion.FOUR && toVersion == HoodieTableVersion.FIVE) {
return new FourToFiveUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper);
} else {
throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), true);
}
@@ -159,6 +161,8 @@ protected Map<ConfigProperty, String> downgrade(HoodieTableVersion fromVersion,
return new ThreeToTwoDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper);
} else if (fromVersion == HoodieTableVersion.FOUR && toVersion == HoodieTableVersion.THREE) {
return new FourToThreeDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper);
} else if (fromVersion == HoodieTableVersion.FIVE && toVersion == HoodieTableVersion.FOUR) {
return new FiveToFourDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper);
} else {
throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), false);
}
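For reference, these handlers are not invoked directly; callers drive them through UpgradeDowngrade.run, as the tests further down in this diff do. A minimal sketch, assuming metaClient, writeConfig and context are already initialized:

// Sketch mirroring the test usage later in this diff; metaClient, writeConfig and context are assumed.
new UpgradeDowngrade(metaClient, writeConfig, context, SparkUpgradeDowngradeHelper.getInstance())
    .run(HoodieTableVersion.FIVE, null); // upgrade the table to version five (0.12 format)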
@@ -2015,7 +2015,7 @@ public void testUpgradeDowngrade() throws IOException {
assertTrue(currentStatus.getModificationTime() > prevStatus.getModificationTime());

initMetaClient();
assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.FOUR.versionCode());
assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.FIVE.versionCode());
assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist");
FileStatus newStatus = fs.getFileStatus(new Path(metadataTableBasePath));
assertTrue(oldStatus.getModificationTime() < newStatus.getModificationTime());
@@ -2095,7 +2095,7 @@ public void testRollbackDuringUpgradeForDoubleLocking() throws IOException, Inte
}

initMetaClient();
assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.FOUR.versionCode());
assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.FIVE.versionCode());
assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist");
FileStatus newStatus = fs.getFileStatus(new Path(metadataTableBasePath));
assertTrue(oldStatus.getModificationTime() < newStatus.getModificationTime());
@@ -38,6 +38,7 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.keygen.TimestampBasedKeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -82,8 +83,10 @@
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
import static org.apache.hudi.common.util.MarkerUtils.MARKERS_FILENAME_PREFIX;
import static org.apache.hudi.common.util.PartitionPathEncodeUtils.DEPRECATED_DEFAULT_PARTITION_PATH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
@@ -326,10 +329,78 @@ public void testUpgradeDowngradeBetweenThreeAndCurrentVersion() throws IOExcepti
assertEquals(checksum, metaClient.getTableConfig().getProps().getString(HoodieTableConfig.TABLE_CHECKSUM.key()));
}

@Test
public void testUpgradeFourToFive() throws Exception {
testUpgradeFourToFiveInternal(false, false);
}

@Test
public void testUpgradeFourToFiveWithDefaultPartition() throws Exception {
testUpgradeFourToFiveInternal(true, false);
}

@Test
public void testUpgradeFourToFiveWithDefaultPartitionWithSkipValidation() throws Exception {
testUpgradeFourToFiveInternal(true, true);
}

private void testUpgradeFourToFiveInternal(boolean assertDefaultPartition, boolean skipDefaultPartitionValidation) throws Exception {
String tableName = metaClient.getTableConfig().getTableName();
// clean up and re-instantiate the meta client with the right table props
cleanUp();
initSparkContexts();
initPath();
initTestDataGenerator();

Map<String, String> params = new HashMap<>();
addNewTableParamsToProps(params, tableName);
Properties properties = new Properties();
params.forEach((k,v) -> properties.setProperty(k, v));

initMetaClient(getTableType(), properties);
// init config, table and client.
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(false)
.doSkipDefaultPartitionValidation(skipDefaultPartitionValidation).withProps(params).build();
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
// Write inserts
doInsert(client);

if (assertDefaultPartition) {
doInsertWithDefaultPartition(client);
}

// downgrade table props
downgradeTableConfigsFromFiveToFour(cfg);

// perform upgrade
if (assertDefaultPartition && !skipDefaultPartitionValidation) {
// if "default" partition is present, upgrade should fail
assertThrows(HoodieException.class, () -> new UpgradeDowngrade(metaClient, cfg, context, SparkUpgradeDowngradeHelper.getInstance())
.run(HoodieTableVersion.FIVE, null), "Upgrade from 4 to 5 is expected to fail if \"default\" partition is present.");
} else {
new UpgradeDowngrade(metaClient, cfg, context, SparkUpgradeDowngradeHelper.getInstance())
.run(HoodieTableVersion.FIVE, null);

// verify hoodie.table.version got upgraded
metaClient = HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(cfg.getBasePath()).build();
assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.FIVE.versionCode());
assertTableVersionFromPropertyFile(HoodieTableVersion.FIVE);

// verify table props
assertTableProps(cfg);
}
}

private void addNewTableParamsToProps(Map<String, String> params) {
addNewTableParamsToProps(params, metaClient.getTableConfig().getTableName());
}

private void addNewTableParamsToProps(Map<String, String> params, String tableName) {
params.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid");
params.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "uuid");
params.put(HoodieTableConfig.PARTITION_FIELDS.key(), "partition_path");
params.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition_path");
params.put(HoodieTableConfig.NAME.key(), metaClient.getTableConfig().getTableName());
params.put(HoodieTableConfig.NAME.key(), tableName);
params.put(BASE_FILE_FORMAT.key(), BASE_FILE_FORMAT.defaultValue().name());
}

@@ -342,6 +413,16 @@ private void doInsert(SparkRDDWriteClient client) {
client.insert(writeRecords, commit1).collect();
}

private void doInsertWithDefaultPartition(SparkRDDWriteClient client) {
// insert a batch of records into the deprecated "default" partition
dataGen = new HoodieTestDataGenerator(new String[]{DEPRECATED_DEFAULT_PARTITION_PATH});
String commit1 = "005";
client.startCommitWithTime(commit1);
List<HoodieRecord> records = dataGen.generateInserts(commit1, 100);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
client.insert(writeRecords, commit1).collect();
}

private void downgradeTableConfigsFromTwoToOne(HoodieWriteConfig cfg) throws IOException {
Properties properties = new Properties(cfg.getProps());
properties.remove(HoodieTableConfig.RECORDKEY_FIELDS.key());
@@ -368,6 +449,15 @@ private void downgradeTableConfigsFromThreeToTwo(HoodieWriteConfig cfg) throws I
metaClient.getTableConfig().setTableVersion(HoodieTableVersion.TWO);
}

private void downgradeTableConfigsFromFiveToFour(HoodieWriteConfig cfg) throws IOException {
Properties properties = new Properties();
cfg.getProps().forEach((k,v) -> properties.setProperty((String) k, (String) v));
properties.setProperty(HoodieTableConfig.VERSION.key(), "4");
metaClient = HoodieTestUtils.init(hadoopConf, basePath, getTableType(), properties);
// set hoodie.table.version to 4 in hoodie.properties file
metaClient.getTableConfig().setTableVersion(HoodieTableVersion.FOUR);
}

private void assertTableProps(HoodieWriteConfig cfg) {
HoodieTableConfig tableConfig = metaClient.getTableConfig();
Properties originalProps = cfg.getProps();
@@ -36,7 +36,9 @@ public enum HoodieTableVersion {
// 0.10.0 onwards
THREE(3),
// 0.11.0 onwards
FOUR(4);
FOUR(4),
// 0.12.0 onwards
FIVE(5);

private final int versionCode;

@@ -49,7 +51,7 @@ public int versionCode() {
}

public static HoodieTableVersion current() {
return FOUR;
return FIVE;
}

public static HoodieTableVersion versionFromCode(int versionCode) {
@@ -25,6 +25,7 @@
*/
public class PartitionPathEncodeUtils {

public static final String DEPRECATED_DEFAULT_PARTITION_PATH = "default";
public static final String DEFAULT_PARTITION_PATH = "__HIVE_DEFAULT_PARTITION__";

static BitSet charToEscape = new BitSet(128);
@@ -56,10 +56,10 @@ class TestUpgradeOrDowngradeProcedure extends HoodieSparkSqlTestBase {
.build

// verify hoodie.table.version of the original table
assertResult(HoodieTableVersion.FOUR.versionCode) {
assertResult(HoodieTableVersion.FIVE.versionCode) {
metaClient.getTableConfig.getTableVersion.versionCode()
}
assertTableVersionFromPropertyFile(metaClient, HoodieTableVersion.FOUR.versionCode)
assertTableVersionFromPropertyFile(metaClient, HoodieTableVersion.FIVE.versionCode)

// downgrade table to ZERO
checkAnswer(s"""call downgrade_table(table => '$tableName', to_version => 'ZERO')""")(Seq(true))