diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 408a71484d8d4..110b189474966 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -46,8 +46,10 @@
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieClusteringUpdateException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.utilities.HiveIncrementalPuller;
 import org.apache.hudi.utilities.IdentitySplitter;
@@ -649,6 +651,8 @@ protected Pair startService() {
                   + toSleepMs + " ms.");
               Thread.sleep(toSleepMs);
             }
+          } catch (HoodieUpsertException ue) {
+            handleUpsertException(ue);
           } catch (Exception e) {
             LOG.error("Shutting down delta-sync due to exception", e);
             error = true;
@@ -662,6 +666,21 @@ protected Pair startService() {
     }, executor), executor);
   }
 
+  private void handleUpsertException(HoodieUpsertException ue) {
+    if (ue.getCause() instanceof HoodieClusteringUpdateException) {
+      LOG.warn("Write rejected due to conflicts with pending clustering operation. Going to retry after 1 min with the hope "
+          + "that clustering will complete by then.", ue);
+      try {
+        Thread.sleep(60000); // Intentionally not using cfg.minSyncIntervalSeconds, since it could be too high or it could be 0.
+        // Once the delta streamer gets past this clustering update exception, regular syncs will honor cfg.minSyncIntervalSeconds.
+      } catch (InterruptedException e) {
+        throw new HoodieException("Deltastreamer interrupted while waiting for next round ", e);
+      }
+    } else {
+      throw ue;
+    }
+  }
+
   /**
    * Shutdown async services like compaction/clustering as DeltaSync is shutdown.
    */
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 227623eeb5a1e..f595ed04a7503 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -145,7 +145,7 @@ protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, S
   protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, Boolean runSchedule, String scheduleAndExecute,
                                                            Boolean retryLastFailedClusteringJob) {
     HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
-            clusteringInstantTime, runSchedule, scheduleAndExecute, retryLastFailedClusteringJob);
+        clusteringInstantTime, runSchedule, scheduleAndExecute, retryLastFailedClusteringJob);
     return new HoodieClusteringJob(jsc, scheduleClusteringConfig);
   }
 
@@ -924,6 +924,31 @@ public void testAsyncClusteringService() throws Exception {
     });
   }
 
+  /**
+   * When deltastreamer writes clash with pending clustering, deltastreamer should keep retrying and eventually succeed (once clustering completes)
+   * without failing midway.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testAsyncClusteringServiceWithConflicts() throws Exception {
+    String tableBasePath = dfsBasePath + "/asyncClusteringWithConflicts";
+    // Keep it higher than batch-size to test continuous mode
+    int totalRecords = 3000;
+
+    // Initial bulk insert
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
+    cfg.continuousMode = true;
+    cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
+    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "2"));
+    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+    deltaStreamerTestRunner(ds, cfg, (r) -> {
+      TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
+      TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+      return true;
+    });
+  }
+
   @ParameterizedTest
   @ValueSource(strings = {"true", "false"})
   public void testAsyncClusteringServiceWithCompaction(String preserveCommitMetadata) throws Exception {
@@ -1312,7 +1337,7 @@ private void testORCDFSSource(boolean useSchemaProvider, List transforme
 
     // Properties used for testing delta-streamer with orc source
     orcProps.setProperty("include", "base.properties");
-    orcProps.setProperty("hoodie.embed.timeline.server","false");
+    orcProps.setProperty("hoodie.embed.timeline.server", "false");
     orcProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
     orcProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
     if (useSchemaProvider) {
@@ -1326,9 +1351,9 @@ private void testORCDFSSource(boolean useSchemaProvider, List transforme
 
     String tableBasePath = dfsBasePath + "/test_orc_source_table" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
-            TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ORCDFSSource.class.getName(),
-                transformerClassNames, PROPS_FILENAME_TEST_ORC, false,
-                useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ORCDFSSource.class.getName(),
+            transformerClassNames, PROPS_FILENAME_TEST_ORC, false,
+            useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
     deltaStreamer.sync();
     TestHelpers.assertRecordCount(ORC_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
     testNum++;
@@ -1797,8 +1822,8 @@ public Schema getTargetSchema() {
 
   private static Stream<Arguments> testORCDFSSource() {
     // arg1 boolean useSchemaProvider, arg2 List<String> transformerClassNames
     return Stream.of(
-            arguments(false, null),
-            arguments(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()))
+        arguments(false, null),
+        arguments(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()))
     );
   }
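
For review convenience, here is a minimal, self-contained sketch of the retry pattern the first hunk introduces. This is not Hudi's actual code: `ClusteringConflictRetrySketch`, `ClusteringUpdateException`, `UpsertException`, `runSyncLoop`, and `writeBatch` are hypothetical stand-ins for `HoodieClusteringUpdateException`, `HoodieUpsertException`, and the delta-sync loop in `startService()`.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch of the retry-on-clustering-conflict pattern in this patch;
// the nested exception classes stand in for HoodieClusteringUpdateException and
// HoodieUpsertException, and runSyncLoop stands in for the delta-sync loop.
public class ClusteringConflictRetrySketch {

  static final long RETRY_DELAY_MS = 60_000; // fixed 1-min backoff, independent of the regular sync interval

  static class ClusteringUpdateException extends RuntimeException {
    ClusteringUpdateException(String msg) {
      super(msg);
    }
  }

  static class UpsertException extends RuntimeException {
    UpsertException(String msg, Throwable cause) {
      super(msg, cause);
    }
  }

  // Back off and retry only when the failure is a clustering-update conflict;
  // any other upsert failure is rethrown so the service still fails fast.
  static void handleUpsertException(UpsertException ue) throws InterruptedException {
    if (ue.getCause() instanceof ClusteringUpdateException) {
      System.out.println("Write rejected by pending clustering; retrying in " + RETRY_DELAY_MS + " ms");
      Thread.sleep(RETRY_DELAY_MS);
    } else {
      throw ue;
    }
  }

  // Continuous-mode loop: writeBatch returns true to keep running, false to stop.
  static void runSyncLoop(Callable<Boolean> writeBatch) throws Exception {
    boolean keepRunning = true;
    while (keepRunning) {
      try {
        keepRunning = writeBatch.call();
      } catch (UpsertException ue) {
        handleUpsertException(ue); // retry instead of shutting down
      }
    }
  }

  public static void main(String[] args) throws Exception {
    int[] attempts = {0};
    runSyncLoop(() -> {
      if (attempts[0]++ == 0) {
        // first batch collides with a pending clustering plan
        throw new UpsertException("upsert failed",
            new ClusteringUpdateException("file group pending clustering"));
      }
      System.out.println("batch written after clustering completed");
      return false; // stop the demo loop
    });
  }
}
```

The design choice mirrors the patch: the backoff is a fixed 60 seconds rather than `cfg.minSyncIntervalSeconds`, since that setting could be 0 or arbitrarily large; once the conflicting clustering completes, regular syncs honor the configured interval again.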