Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
Expand Down Expand Up @@ -130,7 +129,7 @@ public void start(Function<Boolean, Boolean> onShutdownCallback) {
future = res.getKey();
executor = res.getValue();
started = true;
monitorThreads(onShutdownCallback);
shutdownCallback(onShutdownCallback);
}

/**
Expand All @@ -141,34 +140,15 @@ public void start(Function<Boolean, Boolean> onShutdownCallback) {
protected abstract Pair<CompletableFuture, ExecutorService> startService();

/**
* A monitor thread is started which would trigger a callback if the service is shutdown.
* Add shutdown callback for the completable future.
*
* @param onShutdownCallback
* @param callback The callback
*/
private void monitorThreads(Function<Boolean, Boolean> onShutdownCallback) {
LOG.info("Submitting monitor thread !!");
Executors.newSingleThreadExecutor(r -> {
Thread t = new Thread(r, "Monitor Thread");
t.setDaemon(isRunInDaemonMode());
return t;
}).submit(() -> {
boolean error = false;
try {
LOG.info("Monitoring thread(s) !!");
future.get();
} catch (ExecutionException ex) {
LOG.error("Monitor noticed one or more threads failed. Requesting graceful shutdown of other threads", ex);
error = true;
} catch (InterruptedException ie) {
LOG.error("Got interrupted Monitoring threads", ie);
error = true;
} finally {
// Mark as shutdown
shutdown = true;
if (null != onShutdownCallback) {
onShutdownCallback.apply(error);
}
shutdown(false);
@SuppressWarnings("unchecked")
private void shutdownCallback(Function<Boolean, Boolean> callback) {
future.whenComplete((resp, error) -> {
if (null != callback) {
callback.apply(null != error);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,11 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp
HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType);
this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
if (null == this.asyncCleanerService) {
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
} else {
this.asyncCleanerService.start(null);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,28 +37,26 @@ class AsyncCleanerService extends HoodieAsyncService {
private static final Logger LOG = LogManager.getLogger(AsyncCleanerService.class);

private final AbstractHoodieWriteClient writeClient;
private final String cleanInstantTime;
private final transient ExecutorService executor = Executors.newSingleThreadExecutor();

protected AsyncCleanerService(AbstractHoodieWriteClient writeClient, String cleanInstantTime) {
protected AsyncCleanerService(AbstractHoodieWriteClient writeClient) {
this.writeClient = writeClient;
this.cleanInstantTime = cleanInstantTime;
}

@Override
protected Pair<CompletableFuture, ExecutorService> startService() {
String instantTime = HoodieActiveTimeline.createNewInstantTime();
LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime);
return Pair.of(CompletableFuture.supplyAsync(() -> {
writeClient.clean(cleanInstantTime);
writeClient.clean(instantTime);
return true;
}), executor);
}, executor), executor);
}

public static AsyncCleanerService startAsyncCleaningIfEnabled(AbstractHoodieWriteClient writeClient) {
AsyncCleanerService asyncCleanerService = null;
if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) {
String instantTime = HoodieActiveTimeline.createNewInstantTime();
LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime);
asyncCleanerService = new AsyncCleanerService(writeClient, instantTime);
asyncCleanerService = new AsyncCleanerService(writeClient);
asyncCleanerService.start(null);
} else {
LOG.info("Async auto cleaning is not enabled. Not running cleaner now");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,11 @@ public void initMetadataWriter() {
* checkpoint finish.
*/
public void startAsyncCleaning() {
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
if (this.asyncCleanerService == null) {
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
} else {
this.asyncCleanerService.start(null);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,11 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
public void initializeState(FunctionInitializationContext context) throws Exception {
// no operation
}

@Override
public void close() throws Exception {
if (this.writeClient != null) {
this.writeClient.close();
}
}
}