diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index bffd7d2a251ab..36d60a4001225 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -32,6 +32,7 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.sink.event.CommitAckEvent; import org.apache.hudi.sink.event.WriteMetadataEvent; import org.apache.hudi.table.action.commit.FlinkWriteHelper; import org.apache.hudi.util.StreamerUtil; @@ -41,6 +42,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -346,6 +348,12 @@ private void reloadWriteMetaState() throws Exception { writeStatuses.clear(); } + public void handleOperatorEvent(OperatorEvent event) { + ValidationUtils.checkArgument(event instanceof CommitAckEvent, + "The write function can only handle CommitAckEvent"); + this.confirming = false; + } + /** * Represents a data item in the buffer, this is needed to reduce the * memory footprint. @@ -558,14 +566,14 @@ private String instantToWrite(boolean hasData) { String instant = this.writeClient.getLastPendingInstant(this.actionType); // if exactly-once semantics turns on, // waits for the checkpoint notification until the checkpoint timeout threshold hits. - if (confirming) { - long waitingTime = 0L; - long ckpTimeout = config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT); - long interval = 500L; + long waitingTime = 0L; + long ckpTimeout = config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT); + long interval = 500L; + while (confirming) { // wait condition: // 1. there is no inflight instant // 2. the inflight instant does not change and the checkpoint has buffering data - while (instant == null || (instant.equals(this.currentInstant) && hasData)) { + if (instant == null || (instant.equals(this.currentInstant) && hasData)) { // sleep for a while try { if (waitingTime > ckpTimeout) { @@ -578,10 +586,11 @@ private String instantToWrite(boolean hasData) { } // refresh the inflight instant instant = this.writeClient.getLastPendingInstant(this.actionType); + } else { + // the inflight instant changed, which means the last instant was committed + // successfully. + confirming = false; } - // the inflight instant changed, which means the last instant was committed - // successfully. - confirming = false; } return instant; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java index b0f8328c1d56c..c16743e2ace80 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java @@ -42,8 +42,8 @@ public StreamWriteOperator(Configuration conf) { } @Override - public void handleOperatorEvent(OperatorEvent operatorEvent) { - // do nothing + public void handleOperatorEvent(OperatorEvent event) { + this.sinkFunction.handleOperatorEvent(event); } void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 84f3c0b1e4b98..cdcd4e6265554 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.sink.event.CommitAckEvent; import org.apache.hudi.sink.event.WriteMetadataEvent; import org.apache.hudi.sink.utils.CoordinatorExecutor; import org.apache.hudi.sink.utils.HiveSyncContext; @@ -40,6 +41,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.TaskNotRunningException; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +58,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists; @@ -353,6 +356,26 @@ private void handleWriteMetaEvent(WriteMetadataEvent event) { addEventToBuffer(event); } + /** + * The coordinator reuses the instant if there is no data for this round of checkpoint, + * sends the commit ack events to unblock the flushing. + */ + private void sendCommitAckEvents() { + CompletableFuture[] futures = IntStream.range(0, this.parallelism) + .mapToObj(taskID -> { + try { + return this.context.sendEvent(CommitAckEvent.getInstance(), taskID); + } catch (TaskNotRunningException e) { + throw new HoodieException("Error while sending commit ack event to task [" + taskID + "] error", e); + } + }).toArray(CompletableFuture[]::new); + try { + CompletableFuture.allOf(futures).get(); + } catch (Exception e) { + throw new HoodieException("Error while waiting for the commit ack events to finish sending", e); + } + } + /** * Commits the instant. * @@ -373,6 +396,8 @@ private boolean commitInstant(String instant) { if (writeResults.size() == 0) { // No data has written, reset the buffer and returns early reset(); + // Send commit ack event to the write function to unblock the flushing + sendCommitAckEvents(); return false; } doCommit(instant, writeResults); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java b/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java new file mode 100644 index 0000000000000..7c81dee379ba7 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.event; + +import org.apache.flink.runtime.operators.coordination.OperatorEvent; + +/** + * An operator event to mark successful instant commit. + */ +public class CommitAckEvent implements OperatorEvent { + private static final long serialVersionUID = 1L; + + private static final CommitAckEvent INSTANCE = new CommitAckEvent(); + + public static CommitAckEvent getInstance() { + return INSTANCE; + } +}