diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java index 85e008199adca..f57484d886c9b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java @@ -29,7 +29,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; @@ -130,7 +129,7 @@ public void start(Function onShutdownCallback) { future = res.getKey(); executor = res.getValue(); started = true; - monitorThreads(onShutdownCallback); + shutdownCallback(onShutdownCallback); } /** @@ -141,34 +140,15 @@ public void start(Function onShutdownCallback) { protected abstract Pair startService(); /** - * A monitor thread is started which would trigger a callback if the service is shutdown. + * Add shutdown callback for the completable future. * - * @param onShutdownCallback + * @param callback The callback */ - private void monitorThreads(Function onShutdownCallback) { - LOG.info("Submitting monitor thread !!"); - Executors.newSingleThreadExecutor(r -> { - Thread t = new Thread(r, "Monitor Thread"); - t.setDaemon(isRunInDaemonMode()); - return t; - }).submit(() -> { - boolean error = false; - try { - LOG.info("Monitoring thread(s) !!"); - future.get(); - } catch (ExecutionException ex) { - LOG.error("Monitor noticed one or more threads failed. Requesting graceful shutdown of other threads", ex); - error = true; - } catch (InterruptedException ie) { - LOG.error("Got interrupted Monitoring threads", ie); - error = true; - } finally { - // Mark as shutdown - shutdown = true; - if (null != onShutdownCallback) { - onShutdownCallback.apply(error); - } - shutdown(false); + @SuppressWarnings("unchecked") + private void shutdownCallback(Function callback) { + future.whenComplete((resp, error) -> { + if (null != callback) { + callback.apply(null != error); } }); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 396023ac68c53..76b10fddd4389 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -425,7 +425,11 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp HoodieTableMetaClient metaClient) { setOperationType(writeOperationType); this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient); - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + if (null == this.asyncCleanerService) { + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + } else { + this.asyncCleanerService.start(null); + } } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java index 2fd4251d2f9f7..a5a38f2cc5949 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java @@ -37,28 +37,26 @@ class AsyncCleanerService extends HoodieAsyncService { private static final Logger LOG = LogManager.getLogger(AsyncCleanerService.class); private final AbstractHoodieWriteClient writeClient; - private final String cleanInstantTime; private final transient ExecutorService executor = Executors.newSingleThreadExecutor(); - protected AsyncCleanerService(AbstractHoodieWriteClient writeClient, String cleanInstantTime) { + protected AsyncCleanerService(AbstractHoodieWriteClient writeClient) { this.writeClient = writeClient; - this.cleanInstantTime = cleanInstantTime; } @Override protected Pair startService() { + String instantTime = HoodieActiveTimeline.createNewInstantTime(); + LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime); return Pair.of(CompletableFuture.supplyAsync(() -> { - writeClient.clean(cleanInstantTime); + writeClient.clean(instantTime); return true; - }), executor); + }, executor), executor); } public static AsyncCleanerService startAsyncCleaningIfEnabled(AbstractHoodieWriteClient writeClient) { AsyncCleanerService asyncCleanerService = null; if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) { - String instantTime = HoodieActiveTimeline.createNewInstantTime(); - LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime); - asyncCleanerService = new AsyncCleanerService(writeClient, instantTime); + asyncCleanerService = new AsyncCleanerService(writeClient); asyncCleanerService.start(null); } else { LOG.info("Async auto cleaning is not enabled. Not running cleaner now"); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 2ed2536c2db7a..4108ba425e8ca 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -281,7 +281,11 @@ public void initMetadataWriter() { * checkpoint finish. */ public void startAsyncCleaning() { - this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + if (this.asyncCleanerService == null) { + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this); + } else { + this.asyncCleanerService.start(null); + } } /** diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java index 13154b217575c..195e430d0b1b3 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java @@ -98,4 +98,11 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception { public void initializeState(FunctionInitializationContext context) throws Exception { // no operation } + + @Override + public void close() throws Exception { + if (this.writeClient != null) { + this.writeClient.close(); + } + } }