
@yihua yihua commented Feb 16, 2022

What is the purpose of the pull request

In continuous mode with async table services (such as compaction and clustering) turned on, Deltastreamer does not properly shut down the async table services and the delta sync service when a commit action outside the table services fails. The Spark application then hangs without doing any work and never exits. The root cause is cyclic shutdown logic: the delta sync service and the async table service each wait for the other to terminate. This PR fixes the shutdown logic so that after any failure, whether from the commit action or from an async table service, Deltastreamer shuts down all services properly and the Spark app can exit.

Specifically, Remote JVM Debug and YourKit Java profiling show the following sequence of events during shutdown, which leads to endless waiting and leaves the executor of DeltaSyncService running:

BaseHoodieWriteClient
-> public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline, boolean skipLocking) throws HoodieIOException
 -> throw new IllegalMonitorStateException("Simulate lock exception");

HoodieDeltaStreamer (HoodieDeltaStreamer$DeltaSyncService)
-> protected Pair<CompletableFuture, ExecutorService> startService() 
 -> LOG.error("Shutting down delta-sync due to exception", e);
 -> shutdownAsyncServices(error);

HoodieAsyncService (SparkAsyncCompactService)
-> public void shutdown(boolean force)
 -> executor.shutdown();
 -> executor.awaitTermination(24, TimeUnit.HOURS);

AsyncCompactService (SparkAsyncCompactService)
-> protected Pair<CompletableFuture, ExecutorService> startService()
 -> return true;

HoodieAsyncService (HoodieDeltaStreamer$DeltaSyncService)
-> public void shutdown(boolean force)
 -> executor.shutdown();
 -> executor.awaitTermination(24, TimeUnit.HOURS);
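The cycle in the trace above can be reproduced in miniature. The following is only a sketch under stated assumptions: the class `CyclicShutdownSketch`, the helper `shutdownAndAwait`, and the two plain single-thread executors are hypothetical stand-ins, not Hudi code. Each worker calls `shutdown()` and `awaitTermination(24, TimeUnit.HOURS)` on the other pool, so neither pool can terminate until something interrupts the workers.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for two services that each wait for the other to terminate.
class CyclicShutdownSketch {

  // Same shutdown()/awaitTermination() pair as in the trace above.
  static void shutdownAndAwait(ExecutorService executor) throws InterruptedException {
    executor.shutdown();
    executor.awaitTermination(24, TimeUnit.HOURS);
  }

  // Returns {stuckWhileCycleHeld, terminatedAfterShutdownNow}.
  static boolean[] demo() {
    ExecutorService deltaSync = Executors.newSingleThreadExecutor();
    ExecutorService tableService = Executors.newSingleThreadExecutor();
    CountDownLatch deltaSyncFailed = new CountDownLatch(1);
    try {
      // Table service: once delta sync has failed, its callback waits for delta sync.
      tableService.submit(() -> {
        deltaSyncFailed.await();
        shutdownAndAwait(deltaSync);
        return null;
      });
      // Delta sync: the loop "fails", then waits for the table service.
      deltaSync.submit(() -> {
        deltaSyncFailed.countDown();
        shutdownAndAwait(tableService);
        return null;
      });

      // Each worker is blocked on the other pool, so neither pool terminates.
      boolean deltaDone = deltaSync.awaitTermination(300, TimeUnit.MILLISECONDS);
      boolean tableDone = tableService.awaitTermination(300, TimeUnit.MILLISECONDS);
      boolean stuck = !deltaDone && !tableDone;

      // Interrupting the blocked workers breaks the cycle.
      deltaSync.shutdownNow();
      tableService.shutdownNow();
      boolean terminated = deltaSync.awaitTermination(5, TimeUnit.SECONDS)
          && tableService.awaitTermination(5, TimeUnit.SECONDS);
      return new boolean[] {stuck, terminated};
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      // Safety net so the sketch never leaves worker threads behind.
      deltaSync.shutdownNow();
      tableService.shutdownNow();
    }
  }

  public static void main(String[] args) {
    boolean[] result = demo();
    System.out.println("stuck=" + result[0] + ", terminatedAfterShutdownNow=" + result[1]);
  }
}
```

The 300 ms observation window and the 5 s limits are arbitrary values chosen for the sketch; only the 24-hour wait comes from the trace.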

Brief change log

  • Fix the shutdown callback of async table services and the shutdown logic in HoodieDeltaStreamer, and introduce other triggers for exiting in HoodieAsyncService.

Verify this pull request

Three failure cases were tested locally, and Deltastreamer exits in all of them:

  • Case 1: Deltastreamer continuous mode writing MOR table, async cleaner enabled, clean fails
  • Case 2: Deltastreamer continuous mode writing COW table, async clustering enabled, clustering execution fails
  • Case 3: Deltastreamer continuous mode writing MOR table, async cleaner enabled, delta commit fails
    (The code changes used to trigger each failure are listed at the end.)

Sequence of events in Case 1 after the fix:

BaseHoodieWriteClient
-> public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline, boolean skipLocking) throws HoodieIOException
 -> throw new IllegalMonitorStateException("Simulate lock exception");

HoodieDeltaStreamer (HoodieDeltaStreamer$DeltaSyncService)
-> protected Pair<CompletableFuture, ExecutorService> startService() 
 -> LOG.error("Shutting down delta-sync due to exception", e);
 -> shutdownAsyncServices(error);
 -> executor.shutdownNow();

HoodieAsyncService (HoodieDeltaStreamer$DeltaSyncService)
-> public void waitForShutdown() throws ExecutionException, InterruptedException 
 -> LOG.error("Service shutdown with error", ex);

HoodieDeltaStreamer
-> public void sync() throws Exception 
 -> throw new HoodieException(e.getMessage(), e);

HoodieDeltaStreamer
-> public static void main(String[] args) throws Exception
 -> jssc.stop();

HoodieDeltaStreamer
-> public void close()
 -> deltaSync.close();
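To illustrate the flow in Case 1, here is a minimal sketch, assuming the service loop runs inside a `CompletableFuture` on its own single-thread executor. `FailurePropagationSketch` and `runAndReport` are hypothetical names and the loop body is a placeholder. The `finally` block stands in for `shutdownAsyncServices(error)` followed by `executor.shutdownNow()`, and `future.get()` plays the role of `waitForShutdown()`, where the failure surfaces as an `ExecutionException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical miniature of a service whose failure must reach the caller.
class FailurePropagationSketch {

  // Returns the message of the failure seen by the caller, or null if the loop succeeded.
  static String runAndReport() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
      try {
        for (int round = 1; round <= 3; round++) {
          if (round == 2) {
            // Stand-in for the failing clean call in Case 1.
            throw new IllegalMonitorStateException("Simulate lock exception");
          }
        }
        return true;
      } finally {
        // Stand-in for shutdownAsyncServices(error) followed by executor.shutdownNow().
        executor.shutdownNow();
      }
    }, executor);

    try {
      future.get(); // plays the role of waitForShutdown()
      return null;
    } catch (ExecutionException e) {
      // The caller can now rethrow, as sync() does in the trace.
      return e.getCause().getMessage();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("caller saw: " + runAndReport());
  }
}
```

Because the executor is shut down inside the failing task itself, nothing in this sketch waits on another service, which is the property the fix relies on.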

Sequence of events in Case 2 after the fix:

SparkExecuteClusteringCommitActionExecutor
-> public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute()
 -> throw new HoodieException("Simulate clustering error");

HoodieDeltaStreamer (HoodieDeltaStreamer$DeltaSyncService)
-> protected Boolean onInitializingWriteClient(SparkRDDWriteClient writeClient) {
 -> in callback: asyncClusteringService.get().start((error) -> true);

HoodieDeltaStreamer (HoodieDeltaStreamer$DeltaSyncService)
-> protected Pair<CompletableFuture, ExecutorService> startService() 
 -> throw new HoodieException("Async clustering failed.  Shutting down Delta Sync...");
 -> LOG.error("Shutting down delta-sync due to exception", e);
 -> shutdownAsyncServices(error);
 -> executor.shutdownNow();

HoodieAsyncService (HoodieDeltaStreamer$DeltaSyncService)
-> public void waitForShutdown() throws ExecutionException, InterruptedException 
 -> LOG.error("Service shutdown with error", ex);

HoodieDeltaStreamer
-> public void sync() throws Exception 
 -> throw new HoodieException(e.getMessage(), e);

HoodieDeltaStreamer
-> public static void main(String[] args) throws Exception
 -> jssc.stop();

HoodieDeltaStreamer
-> public void close()
 -> deltaSync.close();
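Case 2 differs in that the failure starts in the async service and has to reach the delta sync loop. One way to picture this is a failure flag that the service's shutdown callback sets and the loop polls. This is only a sketch: the `AtomicBoolean` flag, the latch, and the class `AsyncFailureSignalSketch` are assumptions for illustration and are not claimed to match how HoodieAsyncService tracks errors.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical miniature: an async service reports failure, the main loop reacts.
class AsyncFailureSignalSketch {

  // Returns the message of the exception that stops the main loop.
  static String runLoop() {
    ExecutorService clusteringExecutor = Executors.newSingleThreadExecutor();
    AtomicBoolean clusteringFailed = new AtomicBoolean(false);
    CountDownLatch clusteringFinished = new CountDownLatch(1);

    clusteringExecutor.submit(() -> {
      try {
        throw new IllegalStateException("Simulate clustering error");
      } catch (RuntimeException e) {
        // Hypothetical shutdown callback: record the failure, do not block.
        clusteringFailed.set(true);
      } finally {
        clusteringFinished.countDown();
      }
    });

    try {
      for (int round = 0; round < 1000; round++) {
        if (clusteringFailed.get()) {
          throw new IllegalStateException("Async clustering failed.  Shutting down Delta Sync...");
        }
        // Stand-in for one ingest round; returns early once clustering has finished.
        clusteringFinished.await(10, TimeUnit.MILLISECONDS);
      }
      return "loop ended without seeing the failure";
    } catch (IllegalStateException e) {
      return e.getMessage();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      clusteringExecutor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(runLoop());
  }
}
```

The callback in this sketch only records the failure and returns, so the async service never waits on the delta sync executor.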

Sequence of events in Case 3 after the fix:

BaseHoodieWriteClient
-> public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
                             String commitActionType, Map<String, List<String>> partitionToReplaceFileIds) 
 -> after preCommit(inflightInstant, metadata), throw new IOException("Simulate IOException");

HoodieDeltaStreamer (HoodieDeltaStreamer$DeltaSyncService)
-> protected Pair<CompletableFuture, ExecutorService> startService()
 -> LOG.error("Shutting down delta-sync due to exception", e);
 -> shutdownAsyncServices(error);
 -> executor.shutdownNow();

HoodieAsyncService (HoodieDeltaStreamer$DeltaSyncService)
-> public void waitForShutdown() throws ExecutionException, InterruptedException 
 -> LOG.error("Service shutdown with error", ex);

HoodieDeltaStreamer
-> public void sync() throws Exception 
 -> throw new HoodieException(e.getMessage(), e);

HoodieDeltaStreamer
-> public static void main(String[] args) throws Exception
 -> jssc.stop();

HoodieDeltaStreamer
-> public void close()
 -> deltaSync.close();

Simulate failures:
Clean: BaseHoodieWriteClient.java

@@ -775,6 +778,8 @@ public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline
CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
        HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));
+    throw new IllegalMonitorStateException("Simulate lock exception");
+    /*
...

Clustering: SparkExecuteClusteringCommitActionExecutor.java

@@ -81,6 +80,8 @@ public SparkExecuteClusteringCommitActionExecutor(HoodieEngineContext context,
JavaRDD<WriteStatus> writeStatusRDD = writeMetadata.getWriteStatuses();
+    throw new HoodieException("Simulate clustering error");
+    /*
...

Commit: BaseHoodieWriteClient.java

@@ -198,16 +196,19 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
preCommit(inflightInstant, metadata);
+      throw new IOException("Simulate IOException");
+      /*
...

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@xushiyan xushiyan self-assigned this Feb 16, 2022
@xushiyan xushiyan added the priority:critical Production degraded; pipelines stalled label Feb 16, 2022

@zhangyue19921010 zhangyue19921010 left a comment


LGTM just a minor question :) Thanks for this contribution.

}
} finally {
shutdownAsyncServices(error);
executor.shutdownNow();

@zhangyue19921010 zhangyue19921010 Feb 16, 2022


Hi Ethan, we could use shutdownAsyncServices(error) to close async compaction and async clustering properly, and then quit the loop and finish the thread directly. Is it still necessary to call executor.shutdownNow(); here?

Contributor Author


I tried to remove it, but then the DeltaStreamer Spark app would not exit. This is to shut down the DeltaSync service's executor thread pool.
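A plausible reading of this, offered as an assumption rather than a confirmed diagnosis: a pool's worker thread stays alive after its task has finished, and if that thread is non-daemon it keeps the JVM from exiting until the pool is shut down. The sketch below uses a plain `Executors.newSingleThreadExecutor` with a capturing thread factory. `LingeringExecutorSketch` and `observeWorker` are hypothetical names, and the real DeltaSync executor may be configured differently.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo: an idle pool thread outlives its finished task.
class LingeringExecutorSketch {

  // Returns {aliveAfterTaskDone, isDaemon, aliveAfterShutdownNow}.
  static boolean[] observeWorker() {
    AtomicReference<Thread> worker = new AtomicReference<>();
    ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
      // Capture the worker thread so its state can be inspected later.
      Thread thread = Executors.defaultThreadFactory().newThread(runnable);
      worker.set(thread);
      return thread;
    });
    try {
      executor.submit(() -> { }).get(); // the task finishes right away
      boolean aliveAfterTask = worker.get().isAlive();
      boolean daemon = worker.get().isDaemon();

      executor.shutdownNow();
      executor.awaitTermination(5, TimeUnit.SECONDS);
      worker.get().join(5000);
      boolean aliveAfterShutdown = worker.get().isAlive();
      return new boolean[] {aliveAfterTask, daemon, aliveAfterShutdown};
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    boolean[] r = observeWorker();
    System.out.println("aliveAfterTask=" + r[0] + ", daemon=" + r[1]
        + ", aliveAfterShutdownNow=" + r[2]);
  }
}
```

In this sketch the worker is still alive and non-daemon once the task is done, and it only exits after `shutdownNow()`.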


@xushiyan xushiyan left a comment


LGTM


@nsivabalan nsivabalan left a comment


one nit clarification. really good triaging and fix.
