Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,8 @@ public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {

if (event.isEndInput()) {
// handle end input event synchronously
handleEndInputEvent(event);
// wrap handleEndInputEvent in executeSync to preserve the order of events
executor.executeSync(() -> handleEndInputEvent(event), "handle end input event for instant %s", this.instant);
} else {
executor.execute(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,18 @@ public boolean isReady(String currentInstant) {
return lastBatch && this.instantTime.equals(currentInstant);
}

@Override
public String toString() {
return "WriteMetadataEvent{"
+ "writeStatusesSize=" + writeStatuses.size()
+ ", taskID=" + taskID
+ ", instantTime='" + instantTime + '\''
+ ", lastBatch=" + lastBatch
+ ", endInput=" + endInput
+ ", bootstrap=" + bootstrap
+ '}';
}

// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import javax.annotation.Nullable;

import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
* An executor service that catches all the throwable with logging.
Expand Down Expand Up @@ -85,25 +87,21 @@ public void execute(
final ExceptionHook hook,
final String actionName,
final Object... actionParams) {
executor.execute(wrapAction(action, hook, actionName, actionParams));
}

executor.execute(
() -> {
final String actionString = String.format(actionName, actionParams);
try {
action.run();
logger.info("Executor executes action [{}] success!", actionString);
} catch (Throwable t) {
// if we have a JVM critical error, promote it immediately, there is a good
// chance the
// logging or job failing will not succeed any more
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
final String errMsg = String.format("Executor executes action [%s] error", actionString);
logger.error(errMsg, t);
if (hook != null) {
hook.apply(errMsg, t);
}
}
});
/**
* Run the action in a loop and wait for completion.
*/
public void executeSync(ThrowingRunnable<Throwable> action, String actionName, Object... actionParams) {
try {
executor.submit(wrapAction(action, this.exceptionHook, actionName, actionParams)).get();
} catch (InterruptedException e) {
handleException(e, this.exceptionHook, getActionString(actionName, actionParams));
} catch (ExecutionException e) {
// nonfatal exceptions are handled by wrapAction
ExceptionUtils.rethrowIfFatalErrorOrOOM(e.getCause());
}
}

@Override
Expand All @@ -120,6 +118,40 @@ public void close() throws Exception {
}
}

private <E extends Throwable> Runnable wrapAction(
final ThrowingRunnable<E> action,
final ExceptionHook hook,
final String actionName,
final Object... actionParams) {

return () -> {
final Supplier<String> actionString = getActionString(actionName, actionParams);
try {
action.run();
logger.info("Executor executes action [{}] success!", actionString.get());
} catch (Throwable t) {
handleException(t, hook, actionString);
}
};
}

private void handleException(Throwable t, ExceptionHook hook, Supplier<String> actionString) {
// if we have a JVM critical error, promote it immediately, there is a good
// chance the
// logging or job failing will not succeed any more
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
final String errMsg = String.format("Executor executes action [%s] error", actionString.get());
logger.error(errMsg, t);
if (hook != null) {
hook.apply(errMsg, t);
}
}

private Supplier<String> getActionString(String actionName, Object... actionParams) {
// avoid String.format before OOM rethrown
return () -> String.format(actionName, actionParams);
}

// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestUtils;
Expand All @@ -46,11 +47,14 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;
import org.slf4j.Logger;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
Expand Down Expand Up @@ -298,6 +302,40 @@ void testSyncMetadataTableWithReusedInstant() throws Exception {
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant));
}

@Test
public void testEndInputIsTheLastEvent() throws Exception {
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
MockOperatorCoordinatorContext context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
Logger logger = Mockito.mock(Logger.class); // avoid too many logs by executor
NonThrownExecutor executor = NonThrownExecutor.builder(logger).waitForTasksFinish(true).build();

try (StreamWriteOperatorCoordinator coordinator = new StreamWriteOperatorCoordinator(conf, context)) {
coordinator.start();
coordinator.setExecutor(executor);
coordinator.handleEventFromOperator(0, WriteMetadataEvent.emptyBootstrap(0));
TimeUnit.SECONDS.sleep(5); // wait for handled bootstrap event

int eventCount = 20_000; // big enough to fill executor's queue
for (int i = 0; i < eventCount; i++) {
coordinator.handleEventFromOperator(0, createOperatorEvent(0, coordinator.getInstant(), "par1", true, 0.1));
}

WriteMetadataEvent endInput = WriteMetadataEvent.builder()
.taskID(0)
.instantTime(coordinator.getInstant())
.writeStatus(Collections.emptyList())
.endInput(true)
.build();
coordinator.handleEventFromOperator(0, endInput);

// wait for submitted events completed
executor.close();

// there should be no events after endInput
assertNull(coordinator.getEventBuffer()[0]);
}
}

// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
Expand Down