diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 75e8beaef17c..25b34b11e7cc 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -285,7 +285,8 @@ public void handleEventFromOperator(int i, OperatorEvent operatorEvent) { if (event.isEndInput()) { // handle end input event synchronously - handleEndInputEvent(event); + // wrap handleEndInputEvent in executeSync to preserve the order of events + executor.executeSync(() -> handleEndInputEvent(event), "handle end input event for instant %s", this.instant); } else { executor.execute( () -> { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java index eb89b5ad7ebb..0eb06bdd822f 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java @@ -163,6 +163,18 @@ public boolean isReady(String currentInstant) { return lastBatch && this.instantTime.equals(currentInstant); } + @Override + public String toString() { + return "WriteMetadataEvent{" + + "writeStatusesSize=" + writeStatuses.size() + + ", taskID=" + taskID + + ", instantTime='" + instantTime + '\'' + + ", lastBatch=" + lastBatch + + ", endInput=" + endInput + + ", bootstrap=" + bootstrap + + '}'; + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java index 242b3ee0d8b0..535e05f68763 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java @@ -26,9 +26,11 @@ import javax.annotation.Nullable; import java.util.Objects; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; /** * An executor service that catches all the throwable with logging. @@ -85,25 +87,21 @@ public void execute( final ExceptionHook hook, final String actionName, final Object... actionParams) { + executor.execute(wrapAction(action, hook, actionName, actionParams)); + } - executor.execute( - () -> { - final String actionString = String.format(actionName, actionParams); - try { - action.run(); - logger.info("Executor executes action [{}] success!", actionString); - } catch (Throwable t) { - // if we have a JVM critical error, promote it immediately, there is a good - // chance the - // logging or job failing will not succeed any more - ExceptionUtils.rethrowIfFatalErrorOrOOM(t); - final String errMsg = String.format("Executor executes action [%s] error", actionString); - logger.error(errMsg, t); - if (hook != null) { - hook.apply(errMsg, t); - } - } - }); + /** + * Run the action in a loop and wait for completion. + */ + public void executeSync(ThrowingRunnable action, String actionName, Object... actionParams) { + try { + executor.submit(wrapAction(action, this.exceptionHook, actionName, actionParams)).get(); + } catch (InterruptedException e) { + handleException(e, this.exceptionHook, getActionString(actionName, actionParams)); + } catch (ExecutionException e) { + // nonfatal exceptions are handled by wrapAction + ExceptionUtils.rethrowIfFatalErrorOrOOM(e.getCause()); + } } @Override @@ -120,6 +118,40 @@ public void close() throws Exception { } } + private Runnable wrapAction( + final ThrowingRunnable action, + final ExceptionHook hook, + final String actionName, + final Object... actionParams) { + + return () -> { + final Supplier actionString = getActionString(actionName, actionParams); + try { + action.run(); + logger.info("Executor executes action [{}] success!", actionString.get()); + } catch (Throwable t) { + handleException(t, hook, actionString); + } + }; + } + + private void handleException(Throwable t, ExceptionHook hook, Supplier actionString) { + // if we have a JVM critical error, promote it immediately, there is a good + // chance the + // logging or job failing will not succeed any more + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + final String errMsg = String.format("Executor executes action [%s] error", actionString.get()); + logger.error(errMsg, t); + if (hook != null) { + hook.apply(errMsg, t); + } + } + + private Supplier getActionString(String actionName, Object... actionParams) { + // avoid String.format before OOM rethrown + return () -> String.format(actionName, actionParams); + } + // ------------------------------------------------------------------------- // Inner Class // ------------------------------------------------------------------------- diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index 59a0580e56c5..d5d35f7494f4 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -30,6 +30,7 @@ import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.sink.event.WriteMetadataEvent; import org.apache.hudi.sink.utils.MockCoordinatorExecutor; +import org.apache.hudi.sink.utils.NonThrownExecutor; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestUtils; @@ -46,11 +47,14 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.mockito.Mockito; +import org.slf4j.Logger; import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; @@ -298,6 +302,40 @@ void testSyncMetadataTableWithReusedInstant() throws Exception { assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant)); } + @Test + public void testEndInputIsTheLastEvent() throws Exception { + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 1); + Logger logger = Mockito.mock(Logger.class); // avoid too many logs by executor + NonThrownExecutor executor = NonThrownExecutor.builder(logger).waitForTasksFinish(true).build(); + + try (StreamWriteOperatorCoordinator coordinator = new StreamWriteOperatorCoordinator(conf, context)) { + coordinator.start(); + coordinator.setExecutor(executor); + coordinator.handleEventFromOperator(0, WriteMetadataEvent.emptyBootstrap(0)); + TimeUnit.SECONDS.sleep(5); // wait for handled bootstrap event + + int eventCount = 20_000; // big enough to fill executor's queue + for (int i = 0; i < eventCount; i++) { + coordinator.handleEventFromOperator(0, createOperatorEvent(0, coordinator.getInstant(), "par1", true, 0.1)); + } + + WriteMetadataEvent endInput = WriteMetadataEvent.builder() + .taskID(0) + .instantTime(coordinator.getInstant()) + .writeStatus(Collections.emptyList()) + .endInput(true) + .build(); + coordinator.handleEventFromOperator(0, endInput); + + // wait for submitted events completed + executor.close(); + + // there should be no events after endInput + assertNull(coordinator.getEventBuffer()[0]); + } + } + // ------------------------------------------------------------------------- // Utilities // -------------------------------------------------------------------------