diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java index ec53be8ca0492..65f07d7c7a83e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java @@ -62,9 +62,7 @@ public CleanFunction(Configuration conf) { public void open(Configuration parameters) throws Exception { super.open(parameters); if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) { - // do not use the remote filesystem view because the async cleaning service - // local timeline is very probably to fall behind with the remote one. - this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext(), false); + this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build(); if (conf.getString(FlinkOptions.OPERATION).equals(WriteOperationType.INSERT_OVERWRITE_TABLE.value())) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index f3253e48a23b5..65f9eec9aa41a 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -32,6 +32,7 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.event.CommitAckEvent; import org.apache.hudi.sink.event.WriteMetadataEvent; +import org.apache.hudi.sink.meta.CkpMetadata; import org.apache.hudi.sink.utils.HiveSyncContext; import org.apache.hudi.sink.utils.NonThrownExecutor; import org.apache.hudi.util.CompactionUtil; @@ -137,6 +138,16 @@ public class StreamWriteOperatorCoordinator */ private transient TableState tableState; + /** + * The checkpoint metadata. + */ + private CkpMetadata ckpMetadata; + + /** + * Current checkpoint. + */ + private long checkpointId = -1; + /** * Constructs a StreamingSinkOperatorCoordinator. * @@ -175,6 +186,8 @@ public void start() throws Exception { if (tableState.syncMetadata) { initMetadataSync(); } + this.ckpMetadata = CkpMetadata.getInstance(this.metaClient.getFs(), metaClient.getBasePath()); + this.ckpMetadata.bootstrap(this.metaClient); } @Override @@ -192,10 +205,14 @@ public void close() throws Exception { writeClient.close(); } this.eventBuffer = null; + if (this.ckpMetadata != null) { + this.ckpMetadata.close(); + } } @Override public void checkpointCoordinator(long checkpointId, CompletableFuture result) { + this.checkpointId = checkpointId; executor.execute( () -> { try { @@ -238,6 +255,15 @@ public void notifyCheckpointComplete(long checkpointId) { ); } + @Override + public void notifyCheckpointAborted(long checkpointId) { + if (checkpointId == this.checkpointId) { + executor.execute(() -> { + this.ckpMetadata.abortInstant(this.instant); + }, "abort instant %s", this.instant); + } + } + @Override public void resetToCheckpoint(long checkpointID, byte[] checkpointData) { // no operation @@ -340,6 +366,7 @@ private void startInstant() { // because the instant request from write task is asynchronous. this.instant = this.writeClient.startCommit(); this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, this.instant); + this.ckpMetadata.startInstant(this.instant); LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant, this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE)); } @@ -488,6 +515,7 @@ private void doCommit(String instant, List writeResults) { tableState.commitAction, partitionToReplacedFileIds); if (success) { reset(); + this.ckpMetadata.commitInstant(instant); LOG.info("Commit instant [{}] success!", instant); } else { throw new HoodieException(String.format("Commit instant [%s] failed!", instant)); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java index 0f944c56577e2..cdb378f88885d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java @@ -24,7 +24,6 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordGlobalLocation; -import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -37,6 +36,7 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction; +import org.apache.hudi.sink.meta.CkpMetadata; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.format.FormatUtils; import org.apache.hudi.util.FlinkTables; @@ -83,6 +83,8 @@ public class BootstrapOperator> protected HoodieTable hoodieTable; + private CkpMetadata ckpMetadata; + protected final Configuration conf; protected transient org.apache.hadoop.conf.Configuration hadoopConf; @@ -101,8 +103,7 @@ public BootstrapOperator(Configuration conf) { @Override public void snapshotState(StateSnapshotContext context) throws Exception { - HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(this.conf); - lastInstantTime = StreamerUtil.getLastPendingInstant(metaClient); + lastInstantTime = this.ckpMetadata.lastPendingInstant(); instantState.update(Collections.singletonList(lastInstantTime)); } @@ -124,6 +125,7 @@ public void initializeState(StateInitializationContext context) throws Exception this.hadoopConf = StreamerUtil.getHadoopConf(); this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true); this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext()); + this.ckpMetadata = CkpMetadata.getInstance(hoodieTable.getMetaClient().getFs(), this.writeConfig.getBasePath()); this.aggregateManager = getRuntimeContext().getGlobalAggregateManager(); preLoadIndexRecords(); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java index 4089907243c87..9b34c3edcd800 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java @@ -21,11 +21,11 @@ import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.StreamWriteOperatorCoordinator; import org.apache.hudi.sink.common.AbstractWriteFunction; import org.apache.hudi.sink.event.WriteMetadataEvent; +import org.apache.hudi.sink.meta.CkpMetadata; import org.apache.hudi.sink.utils.TimeWait; import org.apache.hudi.util.StreamerUtil; @@ -79,11 +79,6 @@ public class BulkInsertWriteFunction */ private int taskID; - /** - * Meta Client. - */ - private transient HoodieTableMetaClient metaClient; - /** * Write Client. */ @@ -99,6 +94,11 @@ public class BulkInsertWriteFunction */ private transient OperatorEventGateway eventGateway; + /** + * Checkpoint metadata. + */ + private CkpMetadata ckpMetadata; + /** * Constructs a StreamingSinkFunction. * @@ -112,9 +112,9 @@ public BulkInsertWriteFunction(Configuration config, RowType rowType) { @Override public void open(Configuration parameters) throws IOException { this.taskID = getRuntimeContext().getIndexOfThisSubtask(); - this.metaClient = StreamerUtil.createMetaClient(this.config); this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext()); - this.initInstant = StreamerUtil.getLastPendingInstant(this.metaClient, false); + this.ckpMetadata = CkpMetadata.getInstance(config.getString(FlinkOptions.PATH)); + this.initInstant = lastPendingInstant(); sendBootstrapEvent(); initWriterHelper(); } @@ -187,7 +187,7 @@ private void sendBootstrapEvent() { * Returns the last pending instant time. */ protected String lastPendingInstant() { - return StreamerUtil.getLastPendingInstant(this.metaClient); + return this.ckpMetadata.lastPendingInstant(); } private String instantToWrite() { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java index 0e7300591286f..f9cf938e44aee 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java @@ -26,6 +26,7 @@ import org.apache.hudi.sink.StreamWriteOperatorCoordinator; import org.apache.hudi.sink.event.CommitAckEvent; import org.apache.hudi.sink.event.WriteMetadataEvent; +import org.apache.hudi.sink.meta.CkpMetadata; import org.apache.hudi.sink.utils.TimeWait; import org.apache.hudi.util.StreamerUtil; @@ -114,6 +115,11 @@ public abstract class AbstractStreamWriteFunction */ protected List writeStatuses; + /** + * The checkpoint metadata. + */ + private transient CkpMetadata ckpMetadata; + /** * Constructs a StreamWriteFunctionBase. * @@ -135,6 +141,7 @@ public void initializeState(FunctionInitializationContext context) throws Except TypeInformation.of(WriteMetadataEvent.class) )); + this.ckpMetadata = CkpMetadata.getInstance(this.metaClient.getFs(), this.metaClient.getBasePath()); this.currentInstant = lastPendingInstant(); if (context.isRestored()) { restoreWriteMetadata(); @@ -217,7 +224,7 @@ public void handleOperatorEvent(OperatorEvent event) { * Returns the last pending instant time. */ protected String lastPendingInstant() { - return StreamerUtil.getLastPendingInstant(this.metaClient); + return this.ckpMetadata.lastPendingInstant(); } /** @@ -238,7 +245,7 @@ protected String instantToWrite(boolean hasData) { // wait condition: // 1. there is no inflight instant // 2. the inflight instant does not change and the checkpoint has buffering data - if (instant == null || (instant.equals(this.currentInstant) && hasData)) { + if (instant == null || (instant.equals(this.currentInstant) && hasData && !this.ckpMetadata.isAborted(instant))) { // sleep for a while timeWait.waitFor(); // refresh the inflight instant diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMessage.java b/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMessage.java new file mode 100644 index 0000000000000..1a27ae05c5190 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMessage.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.meta; + +import org.apache.hudi.common.util.ValidationUtils; + +import org.apache.hadoop.fs.FileStatus; +import org.jetbrains.annotations.NotNull; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; + +/** + * A checkpoint message. + */ +public class CkpMessage implements Serializable, Comparable { + private static final long serialVersionUID = 1L; + + public static final Comparator COMPARATOR = Comparator.comparing(CkpMessage::getInstant) + .thenComparing(CkpMessage::getState); + + private final String instant; // the instant time + private final State state; // the checkpoint state + + public CkpMessage(String instant, String state) { + this.instant = instant; + this.state = State.valueOf(state); + } + + public CkpMessage(FileStatus fileStatus) { + String fileName = fileStatus.getPath().getName(); + String[] nameAndExt = fileName.split("\\."); + ValidationUtils.checkState(nameAndExt.length == 2); + String name = nameAndExt[0]; + String ext = nameAndExt[1]; + + this.instant = name; + this.state = State.valueOf(ext); + } + + public String getInstant() { + return instant; + } + + public State getState() { + return state; + } + + public boolean isAborted() { + return State.ABORTED == this.state; + } + + public boolean isComplete() { + return State.COMPLETED == this.state; + } + + public boolean isInflight() { + return State.INFLIGHT == this.state; + } + + public static String getFileName(String instant, State state) { + return instant + "." + state.name(); + } + + public static List getAllFileNames(String instant) { + return Arrays.stream(State.values()) + .map(state -> getFileName(instant, state)) + .collect(Collectors.toList()); + } + + @Override + public int compareTo(@NotNull CkpMessage o) { + return COMPARATOR.compare(this, o); + } + + /** + * Instant State. + */ + public enum State { + // Inflight instant + INFLIGHT, + // Aborted instant + // An instant can be aborted then be reused again, so it has lower priority + // than COMPLETED + ABORTED, + // Committed instant + COMPLETED + } + + @Override + public String toString() { + return "Ckp{" + "instant='" + instant + '\'' + ", state='" + state + '\'' + '}'; + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java new file mode 100644 index 0000000000000..3fdba7fd6cf53 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.meta; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * The checkpoint metadata for bookkeeping the checkpoint messages. + * + *

Each time the driver starts a new instant, it writes a commit message into the metadata, the write tasks + * then consume the message and unblock the data flushing. + * + *

Why we use the DFS based message queue instead of sending + * the {@link org.apache.flink.runtime.operators.coordination.OperatorEvent} ? + * The write task handles the operator event using the main mailbox executor which has the lowest priority for mails, + * it is also used to process the inputs. When the write task blocks and waits for the operator event to ack the valid instant to write, + * it actually blocks all the subsequent events in the mailbox, the operator event would never be consumed then it causes deadlock. + * + *

The checkpoint metadata is also more lightweight than the active timeline. + * + *

NOTE: should be removed in the future if we have good manner to handle the async notifications from driver. + */ +public class CkpMetadata implements Serializable { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(CkpMetadata.class); + + protected static final int MAX_RETAIN_CKP_NUM = 3; + + // the ckp metadata directory + private static final String CKP_META = "ckp_meta"; + + private final FileSystem fs; + protected final Path path; + + private List messages; + private List instantCache; + + private CkpMetadata(String basePath) { + this(FSUtils.getFs(basePath, StreamerUtil.getHadoopConf()), basePath); + } + + private CkpMetadata(FileSystem fs, String basePath) { + this.fs = fs; + this.path = new Path(ckpMetaPath(basePath)); + } + + public void close() { + this.instantCache = null; + } + + // ------------------------------------------------------------------------- + // WRITE METHODS + // ------------------------------------------------------------------------- + /** + * Initialize the message bus, would clean all the messages and publish the last pending instant. + * + *

This expects to be called by the driver. + */ + public void bootstrap(HoodieTableMetaClient metaClient) throws IOException { + fs.delete(path, true); + fs.mkdirs(path); + metaClient.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction() + .lastInstant().ifPresent(instant -> startInstant(instant.getTimestamp())); + } + + public void startInstant(String instant) { + Path path = fullPath(CkpMessage.getFileName(instant, CkpMessage.State.INFLIGHT)); + try { + fs.createNewFile(path); + } catch (IOException e) { + throw new HoodieException("Exception while adding checkpoint start metadata for instant: " + instant); + } + // cleaning + clean(instant); + } + + private void clean(String newInstant) { + if (this.instantCache == null) { + this.instantCache = new ArrayList<>(); + } + this.instantCache.add(newInstant); + if (instantCache.size() > MAX_RETAIN_CKP_NUM) { + final String instant = instantCache.get(0); + boolean[] error = new boolean[1]; + CkpMessage.getAllFileNames(instant).stream().map(this::fullPath).forEach(path -> { + try { + fs.delete(path, false); + } catch (IOException e) { + error[0] = true; + LOG.warn("Exception while cleaning the checkpoint meta file: " + path); + } + }); + if (!error[0]) { + instantCache.remove(0); + } + } + } + + /** + * Add a checkpoint commit message. + * + * @param instant The committed instant + */ + public void commitInstant(String instant) { + Path path = fullPath(CkpMessage.getFileName(instant, CkpMessage.State.COMPLETED)); + try { + fs.createNewFile(path); + } catch (IOException e) { + throw new HoodieException("Exception while adding checkpoint commit metadata for instant: " + instant); + } + } + + /** + * Add an aborted checkpoint message. + */ + public void abortInstant(String instant) { + Path path = fullPath(CkpMessage.getFileName(instant, CkpMessage.State.ABORTED)); + try { + fs.createNewFile(path); + } catch (IOException e) { + throw new HoodieException("Exception while adding checkpoint abort metadata for instant: " + instant); + } + } + + // ------------------------------------------------------------------------- + // READ METHODS + // ------------------------------------------------------------------------- + + private void load() { + try { + this.messages = scanCkpMetadata(this.path); + } catch (IOException e) { + throw new HoodieException("Exception while scanning the checkpoint meta files under path: " + this.path); + } + } + + @Nullable + public String lastPendingInstant() { + load(); + for (int i = this.messages.size() - 1; i >= 0; i--) { + CkpMessage ckpMsg = this.messages.get(i); + // consider 'aborted' as pending too to reuse the instant + if (!ckpMsg.isComplete()) { + return ckpMsg.getInstant(); + } + } + return null; + } + + public List getMessages() { + load(); + return messages; + } + + public boolean isAborted(String instant) { + ValidationUtils.checkState(this.messages != null, "The checkpoint metadata should #load first"); + return this.messages.stream().anyMatch(ckpMsg -> instant.equals(ckpMsg.getInstant()) && ckpMsg.isAborted()); + } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + public static CkpMetadata getInstance(String basePath) { + return new CkpMetadata(basePath); + } + + public static CkpMetadata getInstance(FileSystem fs, String basePath) { + return new CkpMetadata(fs, basePath); + } + + protected static String ckpMetaPath(String basePath) { + return basePath + Path.SEPARATOR + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + Path.SEPARATOR + CKP_META; + } + + private Path fullPath(String fileName) { + return new Path(path, fileName); + } + + private List scanCkpMetadata(Path ckpMetaPath) throws IOException { + return Arrays.stream(this.fs.listStatus(ckpMetaPath)).map(CkpMessage::new) + .collect(Collectors.groupingBy(CkpMessage::getInstant)).values().stream() + .map(messages -> messages.stream().reduce((x, y) -> { + // Pick the one with the highest state + if (x.getState().compareTo(y.getState()) >= 0) { + return x; + } + return y; + }).get()) + .sorted().collect(Collectors.toList()); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 73009c30f76e1..55d403dc4db8d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -488,7 +488,7 @@ public static String getLastPendingInstant(HoodieTableMetaClient metaClient, boo if (reloadTimeline) { metaClient.reloadActiveTimeline(); } - return metaClient.getCommitsTimeline().filterInflights() + return metaClient.getCommitsTimeline().filterPendingExcludingCompaction() .lastInstant() .map(HoodieInstant::getTimestamp) .orElse(null); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index 35523a8fb426c..08035aff5a167 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -95,8 +95,8 @@ public void testCheckpointFails() throws Exception { .assertEmptyEvent() .checkpointFails(1) .consume(TestData.DATA_SET_INSERT) - .checkpointThrows(2, - "Timeout(1000ms) while waiting for instant initialize") + //.checkpointThrows(2, + // "Timeout(1000ms) while waiting for instant initialize") // do not send the write event and fails the checkpoint, // behaves like the last checkpoint is successful. .checkpointFails(2) diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java index ebe9140adb2ed..e1e86ce32bd82 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java @@ -178,10 +178,11 @@ public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exce options.put(FlinkOptions.CHANGELOG_ENABLED.key(), enableChangelog + ""); String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); tableEnv.executeSql(hoodieTableDDL); - tableEnv.executeSql(TestSQL.INSERT_T1).await(); - // wait for the asynchronous commit to finish - TimeUnit.SECONDS.sleep(5); + // insert dataset + tableEnv.executeSql(TestSQL.INSERT_T1).await(); + // update the dataset + tableEnv.executeSql(TestSQL.UPDATE_INSERT_T1).await(); // Make configuration and setAvroSchema. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -195,8 +196,6 @@ public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exce HoodieFlinkCompactor.AsyncCompactionService asyncCompactionService = new HoodieFlinkCompactor.AsyncCompactionService(cfg, conf, env); asyncCompactionService.start(null); - tableEnv.executeSql(TestSQL.UPDATE_INSERT_T1).await(); - // wait for the asynchronous commit to finish TimeUnit.SECONDS.sleep(5); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java b/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java new file mode 100644 index 0000000000000..c4eecd7e4941b --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.meta; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.utils.TestConfigurations; + +import org.apache.flink.configuration.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.stream.IntStream; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * Test cases for {@link CkpMetadata}. + */ +public class TestCkpMetadata { + + private CkpMetadata metadata; + + @TempDir + File tempFile; + + @BeforeEach + public void beforeEach() throws Exception { + String basePath = tempFile.getAbsolutePath(); + FileSystem fs = FSUtils.getFs(tempFile.getAbsolutePath(), StreamerUtil.getHadoopConf()); + + Configuration conf = TestConfigurations.getDefaultConf(basePath); + StreamerUtil.initTableIfNotExists(conf); + + this.metadata = CkpMetadata.getInstance(fs, basePath); + } + + @Test + void testWriteAndReadMessage() { + // write and read 5 committed checkpoints + IntStream.range(0, 3).forEach(i -> metadata.startInstant(i + "")); + + assertThat(metadata.lastPendingInstant(), is("2")); + metadata.commitInstant("2"); + assertThat(metadata.lastPendingInstant(), is("1")); + + // test cleaning + IntStream.range(3, 6).forEach(i -> metadata.startInstant(i + "")); + assertThat(metadata.getMessages().size(), is(3)); + // commit and abort instant does not trigger cleaning + metadata.commitInstant("6"); + metadata.abortInstant("7"); + assertThat(metadata.getMessages().size(), is(5)); + } +}