@@ -46,8 +46,10 @@
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringUpdateException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.utilities.HiveIncrementalPuller;
import org.apache.hudi.utilities.IdentitySplitter;
@@ -649,6 +651,8 @@ protected Pair<CompletableFuture, ExecutorService> startService() {
+ toSleepMs + " ms.");
Thread.sleep(toSleepMs);
}
} catch (HoodieUpsertException ue) {
handleUpsertException(ue);
} catch (Exception e) {
LOG.error("Shutting down delta-sync due to exception", e);
error = true;
@@ -662,6 +666,21 @@ protected Pair<CompletableFuture, ExecutorService> startService() {
}, executor), executor);
}

private void handleUpsertException(HoodieUpsertException ue) {
if (ue.getCause() instanceof HoodieClusteringUpdateException) {
LOG.warn("Write rejected due to conflicts with pending clustering operation. Going to retry after 1 min with the hope "
+ "that clustering will complete by then.", ue);
try {
Thread.sleep(60000); // Intentionally not using cfg.minSyncIntervalSeconds, since it could be too high or it could be 0.
// Once the delta streamer gets past this clustering update exception, regular syncs will honor cfg.minSyncIntervalSeconds.
} catch (InterruptedException e) {
throw new HoodieException("Deltastreamer interrupted while waiting for next round ", e);
}
} else {
throw ue;
}
}

/**
* Shutdown async services like compaction/clustering as DeltaSync is shutdown.
*/
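The retry behaviour added above can be read as the following standalone sketch (not part of this PR): a write rejected because it conflicts with a pending clustering plan is retried after a fixed pause, while any other upsert failure still propagates and stops the pipeline. ingestOneBatch() is a hypothetical stand-in for one delta-sync round, and the 60-second pause mirrors the hard-coded wait in handleUpsertException.

import org.apache.hudi.exception.HoodieClusteringUpdateException;
import org.apache.hudi.exception.HoodieUpsertException;

/**
 * Minimal sketch (not part of this PR) of the retry-on-clustering-conflict pattern
 * implemented by the delta-sync service above.
 */
public class ClusteringConflictRetryExample {

  private static final long RETRY_PAUSE_MS = 60_000L;

  public void runContinuously() throws InterruptedException {
    while (true) {
      try {
        ingestOneBatch();
      } catch (HoodieUpsertException ue) {
        if (ue.getCause() instanceof HoodieClusteringUpdateException) {
          // Conflict with an in-flight clustering instant: wait and retry the same batch.
          Thread.sleep(RETRY_PAUSE_MS);
        } else {
          throw ue; // unrelated upsert failures should still fail fast
        }
      }
    }
  }

  private void ingestOneBatch() {
    // hypothetical: pull the next batch from the source and upsert it into the Hudi table
  }
}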
@@ -145,7 +145,7 @@ protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, S

protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, Boolean runSchedule, String scheduleAndExecute, Boolean retryLastFailedClusteringJob) {
HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
clusteringInstantTime, runSchedule, scheduleAndExecute, retryLastFailedClusteringJob);
clusteringInstantTime, runSchedule, scheduleAndExecute, retryLastFailedClusteringJob);
return new HoodieClusteringJob(jsc, scheduleClusteringConfig);
}

@@ -924,6 +924,31 @@ public void testAsyncClusteringService() throws Exception {
});
}

/**
 * When a deltastreamer write clashes with pending clustering, the deltastreamer should keep retrying and eventually
 * succeed (once clustering completes) without failing midway.
*
* @throws Exception
*/
@Test
public void testAsyncClusteringServiceWithConflicts() throws Exception {
String tableBasePath = dfsBasePath + "/asyncClusteringWithConflicts";
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;

// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "2"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
return true;
});
}

@ParameterizedTest
@ValueSource(strings = {"true", "false"})
public void testAsyncClusteringServiceWithCompaction(String preserveCommitMetadata) throws Exception {
@@ -1312,7 +1337,7 @@ private void testORCDFSSource(boolean useSchemaProvider, List<String> transforme

// Properties used for testing delta-streamer with orc source
orcProps.setProperty("include", "base.properties");
orcProps.setProperty("hoodie.embed.timeline.server","false");
orcProps.setProperty("hoodie.embed.timeline.server", "false");
orcProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
orcProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
if (useSchemaProvider) {
@@ -1326,9 +1351,9 @@ private void testORCDFSSource(boolean useSchemaProvider, List<String> transforme

String tableBasePath = dfsBasePath + "/test_orc_source_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ORCDFSSource.class.getName(),
transformerClassNames, PROPS_FILENAME_TEST_ORC, false,
useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ORCDFSSource.class.getName(),
transformerClassNames, PROPS_FILENAME_TEST_ORC, false,
useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(ORC_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
testNum++;
@@ -1797,8 +1822,8 @@ public Schema getTargetSchema() {
private static Stream<Arguments> testORCDFSSource() {
// arg1 boolean useSchemaProvider, arg2 List<String> transformerClassNames
return Stream.of(
arguments(false, null),
arguments(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()))
arguments(false, null),
arguments(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()))
);
}
