diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index 74e0a2565f827..025aff09bdd20 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -73,7 +73,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.FileNotFoundException; import java.io.IOException; import java.text.ParseException; import java.time.Instant; @@ -82,7 +81,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -617,60 +615,9 @@ private boolean deleteArchivedInstants(List archivedInstants, Hoo ); } - // Remove older meta-data from auxiliary path too - Option latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION) - || (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)))).max(Comparator.comparing(HoodieInstant::getTimestamp))); - LOG.info("Latest Committed Instant=" + latestCommitted); - if (latestCommitted.isPresent()) { - return deleteAllInstantsOlderOrEqualsInAuxMetaFolder(latestCommitted.get()); - } return true; } - /** - * Remove older instants from auxiliary meta folder. - * - * @param thresholdInstant Hoodie Instant - * @return success if all eligible file deleted successfully - * @throws IOException in case of error - */ - private boolean deleteAllInstantsOlderOrEqualsInAuxMetaFolder(HoodieInstant thresholdInstant) throws IOException { - List instants = null; - boolean success = true; - try { - instants = - metaClient.scanHoodieInstantsFromFileSystem( - new Path(metaClient.getMetaAuxiliaryPath()), - HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE, - false); - } catch (FileNotFoundException e) { - /* - * On some FSs deletion of all files in the directory can auto remove the directory itself. - * GCS is one example, as it doesn't have real directories and subdirectories. When client - * removes all the files from a "folder" on GCS is has to create a special "/" to keep the folder - * around. If this doesn't happen (timeout, mis configured client, ...) folder will be deleted and - * in this case we should not break when aux folder is not found. - * GCS information: (https://cloud.google.com/storage/docs/gsutil/addlhelp/HowSubdirectoriesWork) - */ - LOG.warn("Aux path not found. 
Skipping: " + metaClient.getMetaAuxiliaryPath()); - return true; - } - - List instantsToBeDeleted = - instants.stream().filter(instant1 -> compareTimestamps(instant1.getTimestamp(), - LESSER_THAN_OR_EQUALS, thresholdInstant.getTimestamp())).collect(Collectors.toList()); - - for (HoodieInstant deleteInstant : instantsToBeDeleted) { - LOG.info("Deleting instant " + deleteInstant + " in auxiliary meta path " + metaClient.getMetaAuxiliaryPath()); - Path metaFile = new Path(metaClient.getMetaAuxiliaryPath(), deleteInstant.getFileName()); - if (metaClient.getFs().exists(metaFile)) { - success &= metaClient.getFs().delete(metaFile, false); - LOG.info("Deleted instant file in auxiliary meta path : " + metaFile); - } - } - return success; - } - public void archive(HoodieEngineContext context, List instants) throws HoodieCommitException { try { Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieUpgradeDowngradeException.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieUpgradeDowngradeException.java index 19d7e7e375422..97796bb71d8e4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieUpgradeDowngradeException.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieUpgradeDowngradeException.java @@ -25,6 +25,10 @@ public HoodieUpgradeDowngradeException(String msg, Throwable t) { } public HoodieUpgradeDowngradeException(int fromVersion, int toVersion, boolean upgrade) { - super(String.format("Cannot %s from version %s -> %s", upgrade ? "upgrade" : "downgrade", fromVersion, toVersion), null); + this(fromVersion, toVersion, upgrade, null); + } + + public HoodieUpgradeDowngradeException(int fromVersion, int toVersion, boolean upgrade, Throwable t) { + super(String.format("Cannot %s from version %s -> %s", upgrade ? "upgrade" : "downgrade", fromVersion, toVersion), t); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FiveToSixUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FiveToSixUpgradeHandler.java new file mode 100644 index 0000000000000..e3346c2f455f0 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/FiveToSixUpgradeHandler.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpgradeDowngradeException;
+import org.apache.hudi.table.HoodieTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Upgrade handler to assist in upgrading a hoodie table from version 5 to 6.
+ * Since we do not write/read the compaction plan from the .aux folder anymore,
+ * the upgrade handler deletes the compaction files left in the .aux folder.
+ */
+public class FiveToSixUpgradeHandler implements UpgradeHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FiveToSixUpgradeHandler.class);
+
+  @Override
+  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    // delete compaction files from .aux
+    HoodieTimeline compactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline()
+        .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
+    compactionTimeline.getInstantsAsStream().forEach(
+        deleteInstant -> {
+          LOG.info("Deleting instant " + deleteInstant + " in auxiliary meta path " + metaClient.getMetaAuxiliaryPath());
+          Path metaFile = new Path(metaClient.getMetaAuxiliaryPath(), deleteInstant.getFileName());
+          try {
+            if (metaClient.getFs().exists(metaFile)) {
+              metaClient.getFs().delete(metaFile, false);
+              LOG.info("Deleted instant file in auxiliary meta path : " + metaFile);
+            }
+          } catch (IOException e) {
+            throw new HoodieUpgradeDowngradeException(HoodieTableVersion.FIVE.versionCode(), HoodieTableVersion.SIX.versionCode(), true, e);
+          }
+        }
+    );
+    return new HashMap<>();
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
new file mode 100644
index 0000000000000..24d8ee697ed52
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Downgrade handler to assist in downgrading a hoodie table from version 6 to 5.
+ * To ensure compatibility, we need to re-create the compaction requested files in
+ * the .aux folder.
+ */
+public class SixToFiveDowngradeHandler implements DowngradeHandler {
+
+  @Override
+  public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    // sync compaction requested file to .aux
+    HoodieTimeline compactionTimeline = new HoodieActiveTimeline(metaClient, false).filterPendingCompactionTimeline()
+        .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
+    compactionTimeline.getInstantsAsStream().forEach(instant -> {
+      String fileName = instant.getFileName();
+      FileIOUtils.copy(metaClient.getFs(),
+          new Path(metaClient.getMetaPath(), fileName),
+          new Path(metaClient.getMetaAuxiliaryPath(), fileName));
+    });
+    return Collections.emptyMap();
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
index 19152af0ec863..8e8b616e57387 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
@@ -87,7 +87,10 @@ public boolean needsUpgradeOrDowngrade(HoodieTableVersion toVersion) {
    * pre 0.6.0 -> v0
    * 0.6.0 to 0.8.0 -> v1
    * 0.9.0 -> v2
-   * 0.10.0 to current -> v3
+   * 0.10.0 -> v3
+   * 0.11.0 -> v4
+   * 0.12.0 to 0.13.0 -> v5
+   * 0.14.0 to current -> v6
    * <p>
    * On a high level, these are the steps performed
    * <p>
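The hunks below register the two new handlers in the upgrade/downgrade dispatch. For reference, a minimal sketch of how a caller triggers the v5 -> v6 migration; this mirrors the Flink test added at the end of this patch, and the metaClient/writeClient/instantTime names and the Flink-specific helper are illustrative only (other engines pass their own SupportsUpgradeDowngrade helper):

    // Runs FiveToSixUpgradeHandler when the table is still on version FIVE,
    // removing leftover compaction plan files from the .aux folder.
    new UpgradeDowngrade(metaClient, writeClient.getConfig(), writeClient.getEngineContext(),
        FlinkUpgradeDowngradeHelper.getInstance())
        .run(HoodieTableVersion.SIX, instantTime);
    // Running it back towards FIVE invokes SixToFiveDowngradeHandler instead, which
    // copies the pending compaction .requested files from .hoodie back into .aux.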
@@ -147,6 +150,8 @@ protected Map upgrade(HoodieTableVersion fromVersion, Ho return new ThreeToFourUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper); } else if (fromVersion == HoodieTableVersion.FOUR && toVersion == HoodieTableVersion.FIVE) { return new FourToFiveUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper); + } else if (fromVersion == HoodieTableVersion.FIVE && toVersion == HoodieTableVersion.SIX) { + return new FiveToSixUpgradeHandler().upgrade(config, context, instantTime, upgradeDowngradeHelper); } else { throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), true); } @@ -163,6 +168,8 @@ protected Map downgrade(HoodieTableVersion fromVersion, return new FourToThreeDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper); } else if (fromVersion == HoodieTableVersion.FIVE && toVersion == HoodieTableVersion.FOUR) { return new FiveToFourDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper); + } else if (fromVersion == HoodieTableVersion.SIX && toVersion == HoodieTableVersion.FIVE) { + return new SixToFiveDowngradeHandler().downgrade(config, context, instantTime, upgradeDowngradeHelper); } else { throw new HoodieUpgradeDowngradeException(fromVersion.versionCode(), toVersion.versionCode(), false); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java index 8a13985d170bf..63d823b6ecdbb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableVersion.java @@ -38,7 +38,9 @@ public enum HoodieTableVersion { // 0.11.0 onwards FOUR(4), // 0.12.0 onwards - FIVE(5); + FIVE(5), + // 0.14.0 onwards + SIX(6); private final int versionCode; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 1372f5adb7dab..fb33da5ec458f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -35,7 +35,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.Serializable; import java.text.ParseException; @@ -374,18 +373,7 @@ public Option readRestoreInfoAsBytes(HoodieInstant instant) { //----------------------------------------------------------------- public Option readCompactionPlanAsBytes(HoodieInstant instant) { - try { - // Reading from auxiliary path first. In future release, we will cleanup compaction management - // to only write to timeline and skip auxiliary and this code will be able to handle it. - return readDataFromPath(new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName())); - } catch (HoodieIOException e) { - // This will be removed in future release. 
See HUDI-546 - if (e.getIOException() instanceof FileNotFoundException) { - return readDataFromPath(new Path(metaClient.getMetaPath(), instant.getFileName())); - } else { - throw e; - } - } + return readDataFromPath(new Path(metaClient.getMetaPath(), instant.getFileName())); } public Option readIndexPlanAsBytes(HoodieInstant instant) { @@ -492,11 +480,6 @@ public HoodieInstant transitionLogCompactionInflightToComplete(HoodieInstant inf return commitInstant; } - private void createFileInAuxiliaryFolder(HoodieInstant instant, Option data) { - // This will be removed in future release. See HUDI-546 - Path fullPath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName()); - FileIOUtils.createFileInPath(metaClient.getFs(), fullPath, data); - } //----------------------------------------------------------------- // END - COMPACTION RELATED META-DATA MANAGEMENT @@ -704,8 +687,6 @@ public void saveToCompactionRequested(HoodieInstant instant, Option cont public void saveToCompactionRequested(HoodieInstant instant, Option content, boolean overwrite) { ValidationUtils.checkArgument(instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)); - // Write workload to auxiliary folder - createFileInAuxiliaryFolder(instant, content); createFileInMetaPath(instant.getFileName(), content, overwrite); } @@ -715,8 +696,6 @@ public void saveToLogCompactionRequested(HoodieInstant instant, Option c public void saveToLogCompactionRequested(HoodieInstant instant, Option content, boolean overwrite) { ValidationUtils.checkArgument(instant.getAction().equals(HoodieTimeline.LOG_COMPACTION_ACTION)); - // Write workload to auxiliary folder - createFileInAuxiliaryFolder(instant, content); createFileInMetaPath(instant.getFileName(), content, overwrite); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java index 69b43d0015856..b7311fcc37c19 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java @@ -140,19 +140,13 @@ public void testLoadingInstantsFromFiles() throws IOException { .setLayoutVersion(Option.of(new TimelineLayoutVersion(VERSION_0))).build()); // Old Timeline writes both to aux and timeline folder oldTimeline.saveToCompactionRequested(instant6, Option.of(dummy)); - // Now use latest timeline version + // Now use the latest timeline version timeline = timeline.reload(); // Ensure aux file is present - assertTrue(metaClient.getFs().exists(new Path(metaClient.getMetaAuxiliaryPath(), instant6.getFileName()))); + assertTrue(metaClient.getFs().exists(new Path(metaClient.getMetaPath(), instant6.getFileName()))); // Read 5 bytes assertEquals(5, timeline.readCompactionPlanAsBytes(instant6).get().length); - // Delete auxiliary file to mimic future release where we stop writing to aux - metaClient.getFs().delete(new Path(metaClient.getMetaAuxiliaryPath(), instant6.getFileName())); - - // Ensure requested instant is not present in aux - assertFalse(metaClient.getFs().exists(new Path(metaClient.getMetaAuxiliaryPath(), instant6.getFileName()))); - // Now read compaction plan again which should not throw exception assertEquals(5, timeline.readCompactionPlanAsBytes(instant6).get().length); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java index 93ba289be1b47..633f06b0e4f2f 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java @@ -309,18 +309,6 @@ private void cluster() throws Exception { } HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstant.getTimestamp()); - HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline(); - if (!pendingClusteringTimeline.containsInstant(instant)) { - // this means that the clustering plan was written to auxiliary path(.tmp) - // but not the meta path(.hoodie), this usually happens when the job crush - // exceptionally. - - // clean the clustering plan in auxiliary path and cancels the clustering. - LOG.warn("The clustering plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n" - + "Clean the clustering plan in auxiliary path and cancels the clustering"); - CompactionUtil.cleanInstant(table.getMetaClient(), instant); - return; - } // get clusteringParallelism. int clusteringParallelism = conf.getInteger(FlinkOptions.CLUSTERING_TASKS) == -1 diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java index 0381a0280e662..63dfd26c4acb5 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java @@ -273,18 +273,6 @@ private void compact() throws Exception { } List instants = compactionInstantTimes.stream().map(HoodieTimeline::getCompactionRequestedInstant).collect(Collectors.toList()); - for (HoodieInstant instant : instants) { - if (!pendingCompactionTimeline.containsInstant(instant)) { - // this means that the compaction plan was written to auxiliary path(.tmp) - // but not the meta path(.hoodie), this usually happens when the job crush - // exceptionally. - // clean the compaction plan in auxiliary path and cancels the compaction. - LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n" - + "Clean the compaction plan in auxiliary path and cancels the compaction"); - CompactionUtil.cleanInstant(table.getMetaClient(), instant); - return; - } - } // get compactionParallelism. 
int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1 diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java index 6f935a818bc78..63a00dd10c38c 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java @@ -28,18 +28,15 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.sink.compact.FlinkCompactionConfig; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.avro.Schema; import org.apache.flink.configuration.Configuration; -import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.Locale; /** @@ -165,25 +162,6 @@ public static void inferMetadataConf(Configuration conf, HoodieTableMetaClient m } } - /** - * Cleans the metadata file for given instant {@code instant}. - */ - public static void cleanInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) { - Path commitFilePath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName()); - try { - if (metaClient.getFs().exists(commitFilePath)) { - boolean deleted = metaClient.getFs().delete(commitFilePath, false); - if (deleted) { - LOG.info("Removed instant " + instant); - } else { - throw new HoodieIOException("Could not delete instant " + instant); - } - } - } catch (IOException e) { - throw new HoodieIOException("Could not remove requested commit " + commitFilePath, e); - } - } - public static void rollbackCompaction(HoodieFlinkTable table, String instantTime) { HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(instantTime); if (table.getMetaClient().reloadActiveTimeline().filterPendingCompactionTimeline().containsInstant(inflightInstant)) { diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java index 0ad78890aad37..bf5c13f366cea 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java @@ -21,6 +21,7 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CompactionUtils; @@ -28,6 +29,8 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper; +import org.apache.hudi.table.upgrade.UpgradeDowngrade; import org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.FlinkWriteClients; import org.apache.hudi.util.StreamerUtil; @@ -169,6 +172,87 @@ public void 
testHoodieFlinkCompactor(boolean enableChangelog) throws Exception { TestData.checkWrittenDataCOW(tempFile, EXPECTED1); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testHoodieFlinkCompactorWithUpgradeAndDowngrade(boolean upgrade) throws Exception { + // Create hoodie table and insert into data. + EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); + TableEnvironment tableEnv = TableEnvironmentImpl.create(settings); + tableEnv.getConfig().getConfiguration() + .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4); + Map options = new HashMap<>(); + options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false"); + options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false"); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ"); + String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); + tableEnv.executeSql(hoodieTableDDL); + tableEnv.executeSql(TestSQL.INSERT_T1).await(); + + // wait for the asynchronous commit to finish + TimeUnit.SECONDS.sleep(3); + + // Make configuration and setAvroSchema. + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + FlinkCompactionConfig cfg = new FlinkCompactionConfig(); + cfg.path = tempFile.getAbsolutePath(); + Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg); + conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ"); + + // create metaClient + HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf); + + // set the table name + conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName()); + + // set table schema + CompactionUtil.setAvroSchema(conf, metaClient); + + // infer changelog mode + CompactionUtil.inferChangelogMode(conf, metaClient); + + HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf); + + String compactionInstantTime = scheduleCompactionPlan(metaClient, writeClient); + + HoodieFlinkTable table = writeClient.getHoodieTable(); + + // try to upgrade or downgrade + if (upgrade) { + metaClient.getTableConfig().setTableVersion(HoodieTableVersion.FIVE); + new UpgradeDowngrade(metaClient, writeClient.getConfig(), writeClient.getEngineContext(), FlinkUpgradeDowngradeHelper.getInstance()).run(HoodieTableVersion.SIX, "none"); + } else { + metaClient.getTableConfig().setTableVersion(HoodieTableVersion.SIX); + new UpgradeDowngrade(metaClient, writeClient.getConfig(), writeClient.getEngineContext(), FlinkUpgradeDowngradeHelper.getInstance()).run(HoodieTableVersion.FIVE, "none"); + } + + // generate compaction plan + // should support configurable commit metadata + HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan( + table.getMetaClient(), compactionInstantTime); + + HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime); + // Mark instant as compaction inflight + table.getActiveTimeline().transitionCompactionRequestedToInflight(instant); + + env.addSource(new CompactionPlanSourceFunction(Collections.singletonList(Pair.of(compactionInstantTime, compactionPlan)))) + .name("compaction_source") + .uid("uid_compaction_source") + .rebalance() + .transform("compact_task", + TypeInformation.of(CompactionCommitEvent.class), + new CompactOperator(conf)) + .setParallelism(FlinkMiniCluster.DEFAULT_PARALLELISM) + .addSink(new CompactionCommitSink(conf)) + .name("clean_commits") + 
.uid("uid_clean_commits") + .setParallelism(1); + + env.execute("flink_hudi_compaction"); + writeClient.close(); + TestData.checkWrittenDataCOW(tempFile, EXPECTED1); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exception {
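With the auxiliary copy gone, pending compaction plans are read from the timeline under .hoodie only. A minimal sketch of how a plan is resolved after this change; it mirrors the CompactionUtils call already used in ITTestHoodieFlinkCompactor above, with metaClient and compactionInstantTime as illustrative variables:

    // Resolves the requested compaction instant and reads its plan from the .hoodie meta path.
    HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, compactionInstantTime);
    // Lower-level equivalent through the active timeline; there is no fallback to .aux anymore.
    Option<byte[]> planBytes = metaClient.getActiveTimeline()
        .readCompactionPlanAsBytes(HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime));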