diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java index f57484d886c9b..85e008199adca 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java @@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; @@ -129,7 +130,7 @@ public void start(Function onShutdownCallback) { future = res.getKey(); executor = res.getValue(); started = true; - shutdownCallback(onShutdownCallback); + monitorThreads(onShutdownCallback); } /** @@ -140,15 +141,34 @@ public void start(Function onShutdownCallback) { protected abstract Pair startService(); /** - * Add shutdown callback for the completable future. + * A monitor thread is started which would trigger a callback if the service is shutdown. * - * @param callback The callback + * @param onShutdownCallback */ - @SuppressWarnings("unchecked") - private void shutdownCallback(Function callback) { - future.whenComplete((resp, error) -> { - if (null != callback) { - callback.apply(null != error); + private void monitorThreads(Function onShutdownCallback) { + LOG.info("Submitting monitor thread !!"); + Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r, "Monitor Thread"); + t.setDaemon(isRunInDaemonMode()); + return t; + }).submit(() -> { + boolean error = false; + try { + LOG.info("Monitoring thread(s) !!"); + future.get(); + } catch (ExecutionException ex) { + LOG.error("Monitor noticed one or more threads failed. Requesting graceful shutdown of other threads", ex); + error = true; + } catch (InterruptedException ie) { + LOG.error("Got interrupted Monitoring threads", ie); + error = true; + } finally { + // Mark as shutdown + shutdown = true; + if (null != onShutdownCallback) { + onShutdownCallback.apply(error); + } + shutdown(false); } }); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 286e3adbb7824..3fe28c64bc5cc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -425,11 +425,7 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp HoodieTableMetaClient metaClient) { setOperationType(writeOperationType); this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient); - if (null == this.asyncCleanerService) { - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); - } else { - this.asyncCleanerService.start(null); - } + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java index a5a38f2cc5949..2fd4251d2f9f7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java @@ -37,26 +37,28 @@ class AsyncCleanerService extends HoodieAsyncService { private static final Logger LOG = LogManager.getLogger(AsyncCleanerService.class); private final AbstractHoodieWriteClient writeClient; + private final String cleanInstantTime; private final transient ExecutorService executor = Executors.newSingleThreadExecutor(); - protected AsyncCleanerService(AbstractHoodieWriteClient writeClient) { + protected AsyncCleanerService(AbstractHoodieWriteClient writeClient, String cleanInstantTime) { this.writeClient = writeClient; + this.cleanInstantTime = cleanInstantTime; } @Override protected Pair startService() { - String instantTime = HoodieActiveTimeline.createNewInstantTime(); - LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime); return Pair.of(CompletableFuture.supplyAsync(() -> { - writeClient.clean(instantTime); + writeClient.clean(cleanInstantTime); return true; - }, executor), executor); + }), executor); } public static AsyncCleanerService startAsyncCleaningIfEnabled(AbstractHoodieWriteClient writeClient) { AsyncCleanerService asyncCleanerService = null; if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) { - asyncCleanerService = new AsyncCleanerService(writeClient); + String instantTime = HoodieActiveTimeline.createNewInstantTime(); + LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime); + asyncCleanerService = new AsyncCleanerService(writeClient, instantTime); asyncCleanerService.start(null); } else { LOG.info("Async auto cleaning is not enabled. Not running cleaner now"); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 9a56d6da49f42..374dd1226ca25 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -281,11 +281,7 @@ public void initMetadataWriter() { * checkpoint finish. */ public void startAsyncCleaning() { - if (this.asyncCleanerService == null) { - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); - } else { - this.asyncCleanerService.start(null); - } + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); } /** diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java index bb7900624c7e3..77d6630044670 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java @@ -98,11 +98,4 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception { public void initializeState(FunctionInitializationContext context) throws Exception { // no operation } - - @Override - public void close() throws Exception { - if (this.writeClient != null) { - this.writeClient.close(); - } - } }