diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMessage.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/CkpMessage.java
similarity index 90%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMessage.java
rename to hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/CkpMessage.java
index 1a27ae05c5190..34f1d6f7fc551 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMessage.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/CkpMessage.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.sink.meta;
+package org.apache.hudi.metadata;
 
 import org.apache.hudi.common.util.ValidationUtils;
 
@@ -73,6 +73,10 @@ public boolean isComplete() {
     return State.COMPLETED == this.state;
   }
 
+  public boolean isCancelled() {
+    return State.CANCELLED == this.state;
+  }
+
   public boolean isInflight() {
     return State.INFLIGHT == this.state;
   }
@@ -103,7 +107,10 @@ public enum State {
     // than COMPLETED
     ABORTED,
     // Committed instant
-    COMPLETED
+    COMPLETED,
+    // In some conditions like rollback, an instant may be deleted, which means it has been dropped.
+    // So we need to mark it as CANCELLED with the highest priority.
+    CANCELLED
   }
 
   @Override
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/CkpMetadata.java
similarity index 60%
rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
rename to hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/CkpMetadata.java
index 6d0174069f205..f707378890c78 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/CkpMetadata.java
@@ -16,16 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.sink.meta;
+package org.apache.hudi.metadata;
 
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.exception.HoodieException;
 
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -39,6 +39,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * The checkpoint metadata for bookkeeping the checkpoint messages.
@@ -69,20 +70,28 @@ public class CkpMetadata implements Serializable {
   private static final String CKP_META = "ckp_meta";
 
   private final FileSystem fs;
+  private final Path basePath;
   protected final Path path;
 
   private List<CkpMessage> messages;
   private List<String> instantCache;
 
-  private CkpMetadata(Configuration config) {
-    this(FSUtils.getFs(config.getString(FlinkOptions.PATH), HadoopConfigurations.getHadoopConf(config)), config.getString(FlinkOptions.PATH));
+  private CkpMetadata(String basePath, Configuration hadoopConf) {
+    this(FSUtils.getFs(basePath, hadoopConf), basePath);
   }
 
   private CkpMetadata(FileSystem fs, String basePath) {
     this.fs = fs;
+    this.basePath = new Path(basePath);
     this.path = new Path(ckpMetaPath(basePath));
   }
 
+  // for the first time initialization in the driver
+  private CkpMetadata(HoodieTableMetaClient metaClient) throws IOException {
+    this(metaClient.getFs(), metaClient.getBasePath());
+    bootstrap(metaClient);
+  }
+
   public void close() {
     this.instantCache = null;
   }
@@ -96,9 +105,16 @@ public void close() {
    *
    * <p>This expects to be called by the driver.
    */
-  public void bootstrap() throws IOException {
+  private void bootstrap(HoodieTableMetaClient metaClient) throws IOException {
     fs.delete(path, true);
     fs.mkdirs(path);
+    // The last pending instant (excluding compaction and replacecommit) should be started again
+    // so that the last inflight instant can be re-committed when its write metadata was checkpointed
+    // successfully but the instant was not committed due to some rare cases.
+    metaClient.getActiveTimeline().reload().getCommitsTimeline().filterPendingExcludingCompaction()
+        .filter(instant -> !HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction()))
+        .lastInstant()
+        .ifPresent(instant -> startInstant(instant.getTimestamp()));
   }
 
   public void startInstant(String instant) {
@@ -165,6 +181,18 @@ public void abortInstant(String instant) {
     }
   }
 
+  /**
+   * Add a cancelled checkpoint message for a deleted instant.
+   */
+  public void deleteInstant(String instant) {
+    Path path = fullPath(CkpMessage.getFileName(instant, CkpMessage.State.CANCELLED));
+    try {
+      fs.createNewFile(path);
+    } catch (IOException e) {
+      throw new HoodieException("Exception while adding checkpoint delete metadata for instant: " + instant, e);
+    }
+  }
+
   // -------------------------------------------------------------------------
   //  READ METHODS
   // -------------------------------------------------------------------------
@@ -182,8 +210,23 @@ public String lastPendingInstant() {
     load();
     if (this.messages.size() > 0) {
       CkpMessage ckpMsg = this.messages.get(this.messages.size() - 1);
-      // consider 'aborted' as pending too to reuse the instant
-      if (!ckpMsg.isComplete()) {
+      if (validatePendingInstant(ckpMsg, false)) {
+        return ckpMsg.getInstant();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * For some Hadoop-compatible file systems, we may need to supply an eagerly aligned option for the user.
+   */
+  @VisibleForTesting
+  @Nullable
+  public String latestPendingInstant() {
+    load();
+    if (this.messages.size() > 0) {
+      CkpMessage ckpMsg = this.messages.get(this.messages.size() - 1);
+      if (validatePendingInstant(ckpMsg, true)) {
         return ckpMsg.getInstant();
       }
     }
     return null;
   }
@@ -203,18 +246,42 @@ public boolean isAborted(String instant) {
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
-  public static CkpMetadata getInstance(Configuration config) {
-    return new CkpMetadata(config);
+  public static CkpMetadata getInstance(String basePath, Configuration hadoopConf) {
+    return new CkpMetadata(basePath, hadoopConf);
   }
 
   public static CkpMetadata getInstance(FileSystem fs, String basePath) {
     return new CkpMetadata(fs, basePath);
   }
 
+  // for the first time initialization in the driver
+  public static CkpMetadata getInstanceAtFirstTime(HoodieTableMetaClient metaClient)
+      throws IOException {
+    return new CkpMetadata(metaClient);
+  }
+
   protected static String ckpMetaPath(String basePath) {
     return basePath + Path.SEPARATOR + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + Path.SEPARATOR + CKP_META;
   }
 
+  private static List<Path> getRequestOrInflightCommitFile(String instantTime, Path metaFolder) {
+    return Stream.of(
+            HoodieTimeline.makeRequestedCommitFileName(instantTime),
+            HoodieTimeline.makeInflightCommitFileName(instantTime),
+            HoodieTimeline.makeRequestedDeltaFileName(instantTime),
+            HoodieTimeline.makeInflightDeltaFileName(instantTime))
+        .map(fileName -> new Path(metaFolder, fileName))
+        .collect(Collectors.toList());
+  }
+
+  private static List<Path> getCompleteCommitFile(String instantTime, Path metaFolder) {
+    return Stream.of(
+            HoodieTimeline.makeCommitFileName(instantTime),
+            HoodieTimeline.makeDeltaFileName(instantTime))
+        .map(fileName -> new Path(metaFolder, fileName))
+        .collect(Collectors.toList());
+  }
+
   private Path fullPath(String fileName) {
     return new Path(path, fileName);
   }
@@ -231,4 +298,46 @@ private List scanCkpMetadata(Path ckpMetaPath) throws IOException {
       }).get())
         .sorted().collect(Collectors.toList());
   }
+
+  /**
+   * Aligned eagerly means that the timeline instant state is validated quickly by checking
+   * the instant files directly rather than loading the timeline, to avoid a fs scan.
+   */
+  private boolean validatePendingInstant(CkpMessage ckpMsg, boolean alignedEagerly) {
+    // consider 'aborted' as pending too to reuse the instant
+    if (!ckpMsg.isComplete() && !ckpMsg.isCancelled()) {
+      if (alignedEagerly) {
+        String pendingInstant = ckpMsg.getInstant();
+        Path metaPathDir = new Path(this.basePath, HoodieTableMetaClient.METAFOLDER_NAME);
+        boolean isCompleted =
+            getCompleteCommitFile(pendingInstant, metaPathDir).stream()
+                .anyMatch(this::checkFileExists);
+        boolean isRequestedOrInflight =
+            getRequestOrInflightCommitFile(pendingInstant, metaPathDir).stream()
+                .anyMatch(this::checkFileExists);
+        boolean isValid = !isCompleted && isRequestedOrInflight;
+        boolean notExist = !isCompleted && !isRequestedOrInflight;
+        if (notExist) {
+          throw new HoodieException(
+              String.format(
+                  "ckpMsg is: %s but timeline instant: %s doesn't exist, isRequestedOrInflight: %s, isCompleted: %s",
+                  ckpMsg, pendingInstant, isRequestedOrInflight, isCompleted));
+        }
+
+        return isValid;
+      }
+
+      return true;
+    }
+
+    return false;
+  }
+
+  private boolean checkFileExists(Path fullPath) {
+    try {
+      return this.fs.isFile(fullPath);
+    } catch (IOException e) {
+      throw new HoodieException("Exception while checking instant file existence", e);
+    }
+  }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 8e671b2ec5e56..cdc58000a8b48 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -64,7 +64,8 @@
 import org.apache.hudi.table.action.commit.FlinkUpsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.commit.HoodieMergeHelper;
 import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
-import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
+import org.apache.hudi.table.action.rollback.FlinkCopyOnWriteRollbackActionExecutor;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -325,7 +326,7 @@ public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstan
   @Override
   public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants, boolean skipLocking) {
-    return new CopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute();
+    return new FlinkCopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute();
   }
 
   @Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
index 79261da2f583e..16d07511d7d5c 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
@@ -39,7 +39,7 @@
 import org.apache.hudi.table.action.compact.RunCompactionActionExecutor;
 import org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor;
 import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
-import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
+import org.apache.hudi.table.action.rollback.FlinkMergeOnReadRollbackActionExecutor;
 
 import java.util.List;
 import java.util.Map;
@@ -124,7 +124,7 @@ public Option scheduleRollback(HoodieEngineContext context,
   @Override
   public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants, boolean skipLocking) {
-    return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants,
+    return new FlinkMergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants,
         skipLocking).execute();
   }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java
new file mode 100644
index 0000000000000..0b2bf54d36f6b
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkCopyOnWriteRollbackActionExecutor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.CkpMetadata;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+public class FlinkCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload, I, K, O>
+    extends CopyOnWriteRollbackActionExecutor<T, I, K, O> {
+
+  private static final Logger LOG =
+      LogManager.getLogger(FlinkCopyOnWriteRollbackActionExecutor.class);
+
+  public FlinkCopyOnWriteRollbackActionExecutor(
+      HoodieEngineContext context,
+      HoodieWriteConfig config,
+      HoodieTable<T, I, K, O> table,
+      String instantTime,
+      HoodieInstant commitInstant,
+      boolean deleteInstants,
+      boolean skipLocking) {
+
+    super(context, config, table, instantTime, commitInstant, deleteInstants, skipLocking);
+  }
+
+  public FlinkCopyOnWriteRollbackActionExecutor(
+      HoodieEngineContext context,
+      HoodieWriteConfig config,
+      HoodieTable<T, I, K, O> table,
+      String instantTime,
+      HoodieInstant commitInstant,
+      boolean deleteInstants,
+      boolean skipTimelinePublish,
+      boolean skipLocking) {
+    super(
+        context,
+        config,
+        table,
+        instantTime,
+        commitInstant,
+        deleteInstants,
+        skipTimelinePublish,
+        skipLocking);
+  }
+
+  @Override
+  protected void deleteInflightAndRequestedInstant(
+      boolean deleteInstant,
+      HoodieActiveTimeline activeTimeline,
+      HoodieInstant instantToBeDeleted) {
+    super.deleteInflightAndRequestedInstant(deleteInstant, activeTimeline, instantToBeDeleted);
+    // an uncompleted resolved instant means that it will be deleted during rollback
+    if (deleteInstant && !instantToBeDeleted.isCompleted()) {
+      CkpMetadata ckpMetadata =
+          CkpMetadata.getInstance(
+              table.getMetaClient().getFs(), table.getMetaClient().getBasePath());
+      ckpMetadata.deleteInstant(instantToBeDeleted.getTimestamp());
+      LOG.info("Delete checkpoint metadata for rollback instant: " + instantToBeDeleted);
+    }
+  }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java
new file mode 100644
index 0000000000000..0b469edb8003b
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMergeOnReadRollbackActionExecutor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.CkpMetadata;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+public class FlinkMergeOnReadRollbackActionExecutor<T extends HoodieRecordPayload, I, K, O>
+    extends MergeOnReadRollbackActionExecutor<T, I, K, O> {
+
+  private static final Logger LOG =
+      LogManager.getLogger(FlinkMergeOnReadRollbackActionExecutor.class);
+
+  public FlinkMergeOnReadRollbackActionExecutor(
+      HoodieEngineContext context,
+      HoodieWriteConfig config,
+      HoodieTable<T, I, K, O> table,
+      String instantTime,
+      HoodieInstant commitInstant,
+      boolean deleteInstants,
+      boolean skipLocking) {
+    super(context, config, table, instantTime, commitInstant, deleteInstants, skipLocking);
+  }
+
+  public FlinkMergeOnReadRollbackActionExecutor(
+      HoodieEngineContext context,
+      HoodieWriteConfig config,
+      HoodieTable<T, I, K, O> table,
+      String instantTime,
+      HoodieInstant commitInstant,
+      boolean deleteInstants,
+      boolean skipTimelinePublish,
+      boolean skipLocking) {
+    super(
+        context,
+        config,
+        table,
+        instantTime,
+        commitInstant,
+        deleteInstants,
+        skipTimelinePublish,
+        skipLocking);
+  }
+
+  @Override
+  protected void deleteInflightAndRequestedInstant(
+      boolean deleteInstant,
+      HoodieActiveTimeline activeTimeline,
+      HoodieInstant instantToBeDeleted) {
+    super.deleteInflightAndRequestedInstant(deleteInstant, activeTimeline, instantToBeDeleted);
+    // an uncompleted resolved instant means that it will be deleted during rollback
+    if (deleteInstant && !instantToBeDeleted.isCompleted()) {
+      CkpMetadata ckpMetadata =
+          CkpMetadata.getInstance(
+              table.getMetaClient().getFs(), table.getMetaClient().getBasePath());
+      ckpMetadata.deleteInstant(instantToBeDeleted.getTimestamp());
+      LOG.info("Delete checkpoint metadata for rollback instant: " + instantToBeDeleted);
+    }
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 17b789e2f0dcc..24fb4046cd525 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -32,9 +32,9 @@
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.CkpMetadata;
 import org.apache.hudi.sink.event.CommitAckEvent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
-import org.apache.hudi.sink.meta.CkpMetadata;
 import org.apache.hudi.sink.utils.HiveSyncContext;
 import org.apache.hudi.sink.utils.NonThrownExecutor;
 import org.apache.hudi.util.ClusteringUtil;
@@ -341,9 +341,7 @@ private static void initMetadataTable(HoodieFlinkWriteClient writeClient) {
   }
 
   private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient) throws IOException {
-    CkpMetadata ckpMetadata = CkpMetadata.getInstance(metaClient.getFs(), metaClient.getBasePath());
-    ckpMetadata.bootstrap();
-    return ckpMetadata;
+    return CkpMetadata.getInstanceAtFirstTime(metaClient);
   }
 
   private void reset() {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index 3eaa47e3b6278..1f1c6c476c9b7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -37,8 +37,8 @@
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.CkpMetadata;
 import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
-import org.apache.hudi.sink.meta.CkpMetadata;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.format.FormatUtils;
 import org.apache.hudi.util.FlinkTables;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
index 9fbdbdd8e1afc..9994d723f8b79 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
@@ -22,10 +22,11 @@
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.metadata.CkpMetadata;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
-import org.apache.hudi.sink.meta.CkpMetadata;
 import org.apache.hudi.sink.utils.TimeWait;
 import org.apache.hudi.util.FlinkWriteClients;
 
@@ -113,7 +114,7 @@ public BulkInsertWriteFunction(Configuration config, RowType rowType) {
   public void open(Configuration parameters) throws IOException {
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
     this.writeClient = FlinkWriteClients.createWriteClient(this.config, getRuntimeContext());
-    this.ckpMetadata = CkpMetadata.getInstance(config);
+    this.ckpMetadata = CkpMetadata.getInstance(config.getString(FlinkOptions.PATH), HadoopConfigurations.getHadoopConf(config));
     this.initInstant = lastPendingInstant();
     sendBootstrapEvent();
     initWriterHelper();
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index fa4c3db86eac1..6ebd299b82056 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -23,10 +23,10 @@
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.metadata.CkpMetadata;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.event.CommitAckEvent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
-import org.apache.hudi.sink.meta.CkpMetadata;
 import org.apache.hudi.sink.utils.TimeWait;
 import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.util.StreamerUtil;
@@ -195,6 +195,8 @@ public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
   private void restoreWriteMetadata() throws Exception {
     boolean eventSent = false;
     for (WriteMetadataEvent event : this.writeMetadataState.get()) {
+      LOG.info("restoreWriteMetadata send event, instant {}, pending instant {}, task[{}].",
+          event.getInstantTime(), this.currentInstant, taskID);
       if (Objects.equals(this.currentInstant, event.getInstantTime())) {
         // Reset taskID for event
         event.setTaskID(taskID);
@@ -237,6 +239,8 @@ private void reloadWriteMetaState() throws Exception {
         .bootstrap(true)
         .build();
     this.writeMetadataState.add(event);
+    LOG.info("reloadWriteMetaState send event, instant {}, task[{}].",
+        event.getInstantTime(), taskID);
     writeStatuses.clear();
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index f2a8a49b5ebcc..4015e440648c6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -28,6 +28,7 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.metadata.CkpMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
@@ -165,6 +166,40 @@ public void testCheckpointCompleteWithPartialEvents() {
     assertThat("Commits the instant with partial events anyway", lastCompleted, is(instant));
   }
 
+  @Test
+  public void testRecommitWithCheckpointCompleteException() throws Exception {
+    // uncompleted meta events case
+    final CompletableFuture<byte[]> future = new CompletableFuture<>();
+    final String instant = coordinator.getInstant();
+    coordinator.checkpointCoordinator(1, future);
+    // do not notify checkpoint complete, to imitate a failed commit even though the checkpoint succeeded
+    OperatorEvent event1 = createOperatorEvent(0, instant, "par1", false, 0.2);
+    coordinator.handleEventFromOperator(0, event1);
+    OperatorEvent event2 = createOperatorEvent(1, instant, "par2", false, 0.2);
+    coordinator.handleEventFromOperator(1, event2);
+
+    // recover from last successful checkpoint
+    OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 2);
+    coordinator = new StreamWriteOperatorCoordinator(
+        TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), context);
+    coordinator.start();
+    coordinator.setExecutor(new MockCoordinatorExecutor(context));
+    // send bootstrap event based on CkpMetadata
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(HadoopConfigurations.getHadoopConf(new Configuration())).setBasePath(tempFile.getAbsolutePath()).build();
+    CkpMetadata ckpMetadata = CkpMetadata.getInstance(metaClient.getFs(), metaClient.getBasePathV2().toString());
+    String lastPendingInstant = StreamerUtil.getLastPendingInstant(metaClient);
+    String lastPendingInstantCached = ckpMetadata.lastPendingInstant();
+    assertThat("Pending instant to be recommitted", instant.equals(lastPendingInstant) && instant.equals(lastPendingInstantCached));
+    OperatorEvent event3 = createBootstrapEvent(0, lastPendingInstantCached, "par1", false, 0.2);
+    OperatorEvent event4 = createBootstrapEvent(1, lastPendingInstantCached, "par2", false, 0.2);
+    coordinator.handleEventFromOperator(0, event3);
+    coordinator.handleEventFromOperator(1, event4);
+    metaClient.reloadActiveTimeline();
+    String lastCompleted = StreamerUtil.getLastCompletedInstant(metaClient);
+    assertThat("Recommits the instant with bootstrap events from checkpoint metadata", lastCompleted, is(instant));
+  }
+
   @Test
   public void testHiveSyncInvoked() throws Exception {
     // reset
@@ -413,6 +448,33 @@ private static WriteMetadataEvent createOperatorEvent(
         .build();
   }
 
+  private static WriteMetadataEvent createBootstrapEvent(
+      int taskId,
+      String instant,
+      String partitionPath,
+      boolean trackSuccessRecords,
+      double failureFraction) {
+    final WriteStatus writeStatus = new WriteStatus(trackSuccessRecords, failureFraction);
+    writeStatus.setPartitionPath(partitionPath);
+
+    HoodieWriteStat writeStat = new HoodieWriteStat();
+    writeStat.setPartitionPath(partitionPath);
+    writeStat.setFileId("fileId123");
+    writeStat.setPath("path123");
+    writeStat.setFileSizeInBytes(123);
+    writeStat.setTotalWriteBytes(123);
+    writeStat.setNumWrites(1);
+
+    writeStatus.setStat(writeStat);
+
+    return WriteMetadataEvent.builder()
+        .taskID(taskId)
+        .instantTime(instant)
+        .writeStatus(Collections.singletonList(writeStatus))
+        .bootstrap(true)
+        .build();
+  }
+
   private void reset() throws Exception {
     FileUtils.cleanDirectory(tempFile);
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
index fe7ce3f9478d6..47dd6ed65ac6b 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
@@ -20,13 +20,22 @@
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.CkpMetadata;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
@@ -39,6 +48,7 @@
 /**
  * Test cases for {@link CkpMetadata}.
  */
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 public class TestCkpMetadata {
 
   private CkpMetadata metadata;
@@ -46,32 +56,102 @@ public class TestCkpMetadata {
   @TempDir
   File tempFile;
 
+  private HoodieTableMetaClient metaClient;
+
   @BeforeEach
   public void beforeEach() throws Exception {
     String basePath = tempFile.getAbsolutePath();
     FileSystem fs = FSUtils.getFs(tempFile.getAbsolutePath(), HadoopConfigurations.getHadoopConf(new Configuration()));
 
     Configuration conf = TestConfigurations.getDefaultConf(basePath);
-    StreamerUtil.initTableIfNotExists(conf);
+    this.metaClient = StreamerUtil.initTableIfNotExists(conf);
 
     this.metadata = CkpMetadata.getInstance(fs, basePath);
   }
 
+  @Order(1)
   @Test
   void testWriteAndReadMessage() {
     // write and read 5 committed checkpoints
-    IntStream.range(0, 3).forEach(i -> metadata.startInstant(i + ""));
+    IntStream.range(0, 3).forEach(i -> startInstant(i + ""));
     assertThat(metadata.lastPendingInstant(), is("2"));
     metadata.commitInstant("2");
     assertThat(metadata.lastPendingInstant(), equalTo(null));
 
     // test cleaning
-    IntStream.range(3, 6).forEach(i -> metadata.startInstant(i + ""));
+    IntStream.range(3, 6).forEach(i -> startInstant(i + ""));
     assertThat(metadata.getMessages().size(), is(3));
     // commit and abort instant does not trigger cleaning
     metadata.commitInstant("6");
     metadata.abortInstant("7");
     assertThat(metadata.getMessages().size(), is(5));
+
+    // test rollback instant
+    startInstant("8");
+    assertThat(metadata.lastPendingInstant(), is("8"));
+    deleteInstant("8");
+    assertThat(metadata.lastPendingInstant(), equalTo(null));
+  }
+
+  @Order(2)
+  @Test
+  void testBootstrap() throws Exception {
+    IntStream.range(0, 3).forEach(i -> startInstant(i + ""));
+    assertThat(this.metadata.getMessages().size(), is(3));
+    // just keep the last pending instant
+    this.metadata = CkpMetadata.getInstanceAtFirstTime(this.metaClient);
+    assertThat(this.metadata.getMessages().size(), is(1));
+    assertThat(metadata.lastPendingInstant(), is("2"));
+  }
+
+  @Order(3)
+  @Test
+  void testLatestPendingInstant() {
+    // start 3 checkpoints without completing them on the timeline
+    IntStream.range(0, 3).forEach(i -> startInstant(i + ""));
+
+    assertThat(metadata.lastPendingInstant(), is("2"));
+    assertThat(metadata.latestPendingInstant(), is("2"));
+    metaClient.reloadActiveTimeline().saveAsComplete(
+        new HoodieInstant(true, HoodieActiveTimeline.DELTA_COMMIT_ACTION, "2"), Option.empty());
+    assertThat(metadata.lastPendingInstant(), is("2"));
+    assertThat(metadata.latestPendingInstant(), equalTo(null));
+    startInstant("3");
+    metadata.commitInstant("3");
+    assertThat(metadata.lastPendingInstant(), equalTo(null));
+    assertThat(metadata.latestPendingInstant(), equalTo(null));
+  }
+
+  private void startInstant(String instant) {
+    HoodieActiveTimeline timeline = this.metaClient.getActiveTimeline().reload();
+    HoodieInstant requestInstant =
+        new HoodieInstant(
+            HoodieInstant.State.REQUESTED, HoodieTimeline.DELTA_COMMIT_ACTION, instant);
+    timeline.createNewInstant(requestInstant);
+    timeline.transitionRequestedToInflight(requestInstant, Option.empty());
+    this.metadata.startInstant(instant);
+  }
+
+  private void commitInstant(String instant) {
+    HoodieActiveTimeline timeline = this.metaClient.getActiveTimeline().reload();
+    HoodieInstant inflightInstant =
+        new HoodieInstant(
+            HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, instant);
+    timeline.saveAsComplete(inflightInstant, Option.empty());
+    this.metadata.commitInstant(instant);
+  }
+
+  private void deleteInstant(String instant) {
+    HoodieActiveTimeline timeline = this.metaClient.getActiveTimeline().reload();
+    HoodieInstant inflightInstant =
+        new HoodieInstant(
+            HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, instant);
+    if (timeline.containsInstant(inflightInstant)) {
+      timeline.deletePending(inflightInstant);
+    }
+    timeline.deletePending(
+        new HoodieInstant(
+            HoodieInstant.State.REQUESTED, HoodieTimeline.DELTA_COMMIT_ACTION, instant));
+    this.metadata.deleteInstant(instant);
+  }
 }
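Usage sketch (illustrative only, not part of the patch): the snippet below strings together the CkpMetadata entry points touched by this change -- driver-side bootstrap via getInstanceAtFirstTime, the start/commit life cycle, and the new deleteInstant hook that the rollback executors use to write a CANCELLED message. The base path and Hadoop configuration are hypothetical placeholders and the table is assumed to already exist.

// Illustrative sketch, assuming an existing Hudi table at basePath.
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.metadata.CkpMetadata;

import java.io.IOException;

public class CkpMetadataUsageSketch {
  public static void main(String[] args) throws IOException {
    String basePath = "/tmp/hoodie_table";        // hypothetical table path
    Configuration hadoopConf = new Configuration();

    // Driver side: bootstrap once; this also re-starts the last pending delta commit so a
    // checkpointed-but-uncommitted instant can be re-committed after recovery.
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(hadoopConf).setBasePath(basePath).build();
    CkpMetadata ckpMetadata = CkpMetadata.getInstanceAtFirstTime(metaClient);

    // Coordinator side: drive the instant life cycle through checkpoint messages.
    ckpMetadata.startInstant("20230101000000");
    String pending = ckpMetadata.lastPendingInstant();   // "20230101000000"
    ckpMetadata.commitInstant(pending);

    // Rollback side (see Flink*RollbackActionExecutor): a rolled-back, uncompleted instant
    // is marked CANCELLED so writers no longer treat it as pending.
    ckpMetadata.startInstant("20230101000001");
    ckpMetadata.deleteInstant("20230101000001");

    // Task side: a fresh handle can be built from just the base path and Hadoop conf.
    CkpMetadata taskView = CkpMetadata.getInstance(basePath, hadoopConf);
    taskView.lastPendingInstant();                        // null after commit/delete above

    ckpMetadata.close();
  }
}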