diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 11564d186fb32..0e7e35e7ea328 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -137,7 +137,6 @@ public void processElement(I value, ProcessFunction.Context ctx, Coll @Override public void close() { - super.close(); if (this.writeClient != null) { this.writeClient.cleanHandlesGracefully(); this.writeClient.close(); @@ -402,6 +401,11 @@ private void bufferRecord(HoodieRecord> value) { } } + private boolean hasData() { + return this.buckets.size() > 0 + && this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0); + } + @SuppressWarnings("unchecked, rawtypes") private boolean flushBucket(DataBucket bucket) { String instant = instantToWrite(true); @@ -435,7 +439,7 @@ private boolean flushBucket(DataBucket bucket) { @SuppressWarnings("unchecked, rawtypes") private void flushRemaining(boolean endInput) { - this.currentInstant = instantToWrite(false); + this.currentInstant = instantToWrite(hasData()); if (this.currentInstant == null) { // in case there are empty checkpoints that has no input data throw new HoodieException("No inflight instant when flushing data!"); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index a30d76613591f..feb348fe39b50 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -30,9 +30,8 @@ import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.sink.event.CommitAckEvent; import 
org.apache.hudi.sink.event.WriteMetadataEvent; -import org.apache.hudi.sink.message.MessageBus; -import org.apache.hudi.sink.message.MessageDriver; import org.apache.hudi.sink.utils.HiveSyncContext; import org.apache.hudi.sink.utils.NonThrownExecutor; import org.apache.hudi.util.StreamerUtil; @@ -42,6 +41,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.TaskNotRunningException; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -136,11 +136,6 @@ public class StreamWriteOperatorCoordinator */ private transient TableState tableState; - /** - * The message driver. - */ - private MessageDriver messageDriver; - /** * Constructs a StreamingSinkOperatorCoordinator. * @@ -179,7 +174,6 @@ public void start() throws Exception { if (tableState.syncMetadata) { initMetadataSync(); } - this.messageDriver = MessageBus.getDriver(this.metaClient.getFs(), metaClient.getBasePath()); } @Override @@ -197,9 +191,6 @@ public void close() throws Exception { writeClient.close(); } this.eventBuffer = null; - if (this.messageDriver != null) { - this.messageDriver.close(); - } } @Override @@ -236,7 +227,7 @@ public void notifyCheckpointComplete(long checkpointId) { writeClient.scheduleCompaction(Option.empty()); } // start new instant. - startInstant(checkpointId); + startInstant(); // sync Hive if is enabled syncHiveIfEnabled(); } @@ -246,7 +237,12 @@ public void notifyCheckpointComplete(long checkpointId) { @Override public void notifyCheckpointAborted(long checkpointId) { - this.messageDriver.abortCkp(checkpointId); + // once the checkpoint was aborted, unblock the writer tasks to + // reuse the last instant. 
+ if (!WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) { + executor.execute(() -> sendCommitAckEvents(checkpointId), + "unblock data write with aborted checkpoint %s", checkpointId); + } } @Override @@ -337,19 +333,12 @@ private void addEventToBuffer(WriteMetadataEvent event) { } private void startInstant() { - // the flink checkpoint id starts from 1, - // see AbstractStreamWriteFunction#ackInstant - startInstant(MessageBus.INITIAL_CKP_ID); - } - - private void startInstant(long checkpoint) { final String instant = HoodieActiveTimeline.createNewInstantTime(); this.writeClient.startCommitWithTime(instant, tableState.commitAction); - this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, instant); - this.writeClient.upgradeDowngrade(instant); - this.messageDriver.commitCkp(checkpoint, this.instant, instant); this.instant = instant; - LOG.info("Create instant [{}] for table [{}] with type [{}]", instant, + this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, this.instant); + this.writeClient.upgradeDowngrade(this.instant); + LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant, this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE)); } @@ -408,6 +397,33 @@ private void handleWriteMetaEvent(WriteMetadataEvent event) { addEventToBuffer(event); } + /** + * The coordinator reuses the instant if there is no data for this round of checkpoint, + * sends the commit ack events to unblock the flushing. 
+ */ + private void sendCommitAckEvents(long checkpointId) { + CompletableFuture>[] futures = Arrays.stream(this.gateways).filter(Objects::nonNull) + .map(gw -> gw.sendEvent(CommitAckEvent.getInstance(checkpointId))) + .toArray(CompletableFuture>[]::new); + try { + CompletableFuture.allOf(futures).get(); + } catch (Throwable throwable) { + if (!sendToFinishedTasks(throwable)) { + throw new HoodieException("Error while waiting for the commit ack events to finish sending", throwable); + } + } + } + + /** + * Decides whether the given exception is caused by sending events to FINISHED tasks. + * + *
Fragile implementation: it matches on the exception type and message text, either of which may change in the future.
+ */
+ private static boolean sendToFinishedTasks(Throwable throwable) {
+   // The interesting failure is the wrapped cause. It can be absent (for example
+   // when the waiting thread is interrupted), in which case this is not a
+   // "task not running" failure and the caller should surface the original error.
+   final Throwable cause = throwable.getCause();
+   if (cause == null) {
+     return false;
+   }
+   if (cause instanceof TaskNotRunningException) {
+     return true;
+   }
+   // Fall back to matching the message text; the message itself may be null.
+   final String message = cause.getMessage();
+   return message != null && message.contains("running");
+ }
+
/**
* Commits the instant.
*/
@@ -435,7 +451,8 @@ private boolean commitInstant(String instant, long checkpointId) {
if (writeResults.size() == 0) {
// No data has written, reset the buffer and returns early
reset();
- messageDriver.commitCkp(checkpointId, this.instant, this.instant);
+ // Send commit ack event to the write function to unblock the flushing
+ sendCommitAckEvents(checkpointId);
return false;
}
doCommit(instant, writeResults);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
index f5fda5aa8b07d..f3cfbae667352 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
@@ -21,13 +21,11 @@
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.common.AbstractWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
-import org.apache.hudi.sink.message.MessageBus;
-import org.apache.hudi.sink.message.MessageClient;
import org.apache.hudi.sink.utils.TimeWait;
import org.apache.hudi.util.StreamerUtil;
@@ -40,8 +38,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@@ -83,20 +79,25 @@ public class BulkInsertWriteFunction
*/
private int taskID;
+ /**
+ * Meta Client.
+ */
+ private transient HoodieTableMetaClient metaClient;
+
/**
* Write Client.
*/
private transient HoodieFlinkWriteClient writeClient;
/**
- * Gateway to send operator events to the operator coordinator.
+ * The initial inflight instant when start up.
*/
- private transient OperatorEventGateway eventGateway;
+ private volatile String initInstant;
/**
- * The message client.
+ * Gateway to send operator events to the operator coordinator.
*/
- private MessageClient messageClient;
+ private transient OperatorEventGateway eventGateway;
/**
* Constructs a StreamingSinkFunction.
@@ -111,8 +112,9 @@ public BulkInsertWriteFunction(Configuration config, RowType rowType) {
@Override
public void open(Configuration parameters) throws IOException {
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+ this.metaClient = StreamerUtil.createMetaClient(this.config);
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
- this.messageClient = MessageBus.getClient(config.getString(FlinkOptions.PATH));
+ this.initInstant = StreamerUtil.getLastPendingInstant(this.metaClient, false);
sendBootstrapEvent();
initWriterHelper();
}
@@ -128,9 +130,6 @@ public void close() {
this.writeClient.cleanHandlesGracefully();
this.writeClient.close();
}
- if (this.messageClient != null) {
- this.messageClient.close();
- }
}
/**
@@ -184,17 +183,8 @@ private void sendBootstrapEvent() {
LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
}
- /**
- * Returns the next instant to write from the message bus.
- */
- @Nullable
- private String ackInstant() {
- Option It returns 3 kinds of value:
- * i) normal instant time: the previous checkpoint succeed;
- * ii) 'aborted' instant time: the previous checkpoint has been aborted;
- * ii) null: the checkpoint is till ongoing without any notifications.
- */
- @Nullable
- protected String ackInstant(long checkpointId) {
- Option Each time the driver starts a new instant, it writes a commit message into the bus, the write tasks
- * then consume the message and unblocking the data flush.
- *
- * Why we use the DFS based message queue instead of sending
- * the {@link org.apache.flink.runtime.operators.coordination.OperatorEvent} ?
- * The write task handles the operator event using the main mailbox executor which has the lowest priority for mails,
- * it is also used to process the inputs. When the write task blocks and waits for the operator event to ack the valid instant to write,
- * it actually blocks all the following events in the mailbox, the operator event can never be consumed then it causes deadlock.
- *
- * The message bus is also more lightweight than the active timeline.
- */
-public abstract class MessageBus implements AutoCloseable {
-
- public static final long INITIAL_CKP_ID = 0L;
-
- public static final String ABORTED_CKP_INSTANT = "aborted";
-
- protected static final int MESSAGE_QUEUE_LENGTH = 20;
-
- protected static final int CLIENT_MESSAGE_CACHE_SIZE = 10;
-
- private static final String MESSAGE_BUS = "message_bus";
-
- private static final String COMMIT = "commit";
-
- private static final String COMMIT_EXTENSION = "." + COMMIT;
- private static final String ABORTED_EXTENSION = ".aborted";
-
- protected final FileSystem fs;
- protected final String basePath;
- protected final String messageBusPath;
-
- protected MessageBus(FileSystem fs, String basePath) {
- this.fs = fs;
- this.basePath = basePath;
- this.messageBusPath = messageBusPath(basePath);
- }
-
- public static MessageDriver getDriver(FileSystem fs, String basePath) {
- return MessageDriver.getInstance(fs, basePath);
- }
-
- public static MessageClient getClient(FileSystem fs, String basePath) {
- return MessageClient.getSingleton(fs, basePath);
- }
-
- public static MessageClient getClient(String basePath) {
- FileSystem fs = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf());
- return MessageClient.getSingleton(fs, basePath);
- }
-
- // -------------------------------------------------------------------------
- // Utilities
- // -------------------------------------------------------------------------
- public static boolean canAbort(String instant, long checkpointId) {
- return ABORTED_CKP_INSTANT.equals(instant) && MessageBus.notInitialCkp(checkpointId);
- }
-
- public static boolean notInitialCkp(long checkpointId) {
- return checkpointId != INITIAL_CKP_ID;
- }
-
- protected Path fullFilePath(String fileName) {
- return new Path(messageBusPath, fileName);
- }
-
- protected static String messageBusPath(String basePath) {
- return basePath + Path.SEPARATOR + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + Path.SEPARATOR + MESSAGE_BUS;
- }
-
- protected static String getCommitFileName(long checkpointId) {
- return checkpointId + COMMIT_EXTENSION;
- }
-
- protected static String getAbortedFileName(long checkpointId) {
- return checkpointId + ABORTED_EXTENSION;
- }
-
- // -------------------------------------------------------------------------
- // Inner Class
- // -------------------------------------------------------------------------
-
- /**
- * A checkpoint message.
- */
- public static class CkpMessage {
- private static final String SEPARATOR = ",";
-
- public final boolean committed; // whether the checkpoint is committed
-
- public final long checkpointId;
- public final String commitInstant;
- public final String inflightInstant;
-
- private CkpMessage(long checkpointId, String commitInstant, String inflightInstant) {
- this.committed = true;
- this.checkpointId = checkpointId;
- this.commitInstant = commitInstant;
- this.inflightInstant = inflightInstant;
- }
-
- private CkpMessage(long checkpointId) {
- this.committed = false;
- this.checkpointId = checkpointId;
- this.commitInstant = ABORTED_CKP_INSTANT;
- this.inflightInstant = ABORTED_CKP_INSTANT;
- }
-
- /**
- * Encodes the instants as 'commitInstant,inflightInstant'.
- */
- public static byte[] toBytes(String commitInstant, String inflightInstant) {
- return (commitInstant + SEPARATOR + inflightInstant).getBytes(StandardCharsets.UTF_8);
- }
-
- public static CkpMessage fromBytes(long checkpointId, byte[] bytes) {
- String content = new String(bytes, StandardCharsets.UTF_8);
- String[] splits = content.split(SEPARATOR);
- return new CkpMessage(checkpointId, splits[0], splits[1]);
- }
-
- public static CkpMessage fromPath(FileSystem fs, Path path) throws IOException {
- final String[] splits = path.getName().split("\\.");
- ValidationUtils.checkState(splits.length == 2, "Invalid checkpoint message file name: " + path.getName());
- final long checkpointId = Long.parseLong(splits[0]);
- final String suffix = splits[1];
- if (suffix.equals(COMMIT)) {
- try (FSDataInputStream is = fs.open(path)) {
- byte[] bytes = FileIOUtils.readAsByteArray(is);
- return CkpMessage.fromBytes(checkpointId, bytes);
- }
- } else {
- return new CkpMessage(checkpointId);
- }
- }
- }
-}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageClient.java b/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageClient.java
deleted file mode 100644
index ea893d5367c79..0000000000000
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageClient.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.sink.message;
-
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.HoodieException;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * A client that consumes messages from the {@link MessageBus}.
- */
-public class MessageClient extends MessageBus {
- private static final Logger LOG = LoggerFactory.getLogger(MessageClient.class);
-
- private static final Map This expects to be called by the client.
- *
- * @param fs The filesystem
- * @param basePath The table base path
- * @return The instance of message bus
- */
- private static MessageClient getInstance(FileSystem fs, String basePath) {
- try {
- return new MessageClient(fs, basePath);
- } catch (IOException e) {
- throw new HoodieException("Initialize checkpoint message bus error", e);
- }
- }
-
- /**
- * Returns the singleton message bus instance.
- *
- * This expects to be called by the client.
- *
- * @param fs The filesystem
- * @param basePath The table base path
- * @return The instance of message bus
- */
- public static synchronized MessageClient getSingleton(FileSystem fs, String basePath) {
- return CLIENTS.computeIfAbsent(basePath,
- k -> getInstance(fs, basePath));
- }
-
- public synchronized Option This expects to be called by the driver.
- *
- * @param fs The filesystem
- * @param basePath The table base path
- * @return The instance of message bus
- */
- public static MessageDriver getInstance(FileSystem fs, String basePath) {
- try {
- return new MessageDriver(fs, basePath);
- } catch (IOException e) {
- throw new HoodieException("Initialize checkpoint message bus error", e);
- }
- }
-
- /**
- * Initialize the message bus, would clean all the messages.
- *
- * This expects to be called by the driver.
- */
- private void initialize() throws IOException {
- Path path = new Path(messageBusPath(basePath));
- if (fs.exists(path)) {
- fs.delete(path, true);
- }
- fs.mkdirs(path);
- }
-
- /**
- * Add a checkpoint commit message.
- *
- * @param checkpointId The checkpoint id
- * @param commitInstant The committed instant
- * @param inflightInstant The new inflight instant
- */
- public void commitCkp(long checkpointId, String commitInstant, String inflightInstant) {
- Path path = fullFilePath(getCommitFileName(checkpointId));
-
- try (FSDataOutputStream outputStream = fs.create(path, true)) {
- byte[] bytes = CkpMessage.toBytes(commitInstant, inflightInstant);
- outputStream.write(bytes);
- outputStream.close();
- this.ckpIdCache.put(checkpointId, true);
- clean();
- } catch (Throwable e) {
- throw new HoodieException("Adding committed message error for checkpoint: " + checkpointId, e);
- }
- }
-
- /**
- * Add an aborted checkpoint message.
- *
- * @param checkpointId The checkpoint id
- */
- public void abortCkp(long checkpointId) {
- Path path = fullFilePath(getAbortedFileName(checkpointId));
- try {
- fs.createNewFile(path);
- this.ckpIdCache.put(checkpointId, false);
- clean();
- } catch (Throwable e) {
- throw new HoodieException("Adding aborted message error for checkpoint: " + checkpointId, e);
- }
- }
-
- private void clean() throws IOException {
- int numToClean = this.ckpIdCache.size() - MESSAGE_QUEUE_LENGTH;
- if (numToClean >= 10) {
- for (int i = 0; i < numToClean; i++) {
- Map.Entry