diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 78ac5a5b08b91..be965512460cc 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -313,7 +313,7 @@ static void assertAtleastNCompactionCommitsAfterCommit(int minExpected, String l
     static void assertAtleastNDeltaCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath, FileSystem fs) {
       HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
-      HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants();
+      HoodieTimeline timeline = meta.reloadActiveTimeline().getDeltaCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants();
       LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
       int numDeltaCommits = (int) timeline.getInstants().count();
       assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected);
     }
@@ -336,7 +336,7 @@ static void waitTillCondition(Function condition, Future dsFut
       boolean ret = false;
       while (!ret && !dsFuture.isDone()) {
         try {
-          Thread.sleep(5000);
+          Thread.sleep(3000);
           ret = condition.apply(true);
         } catch (Throwable error) {
           LOG.warn("Got error :", error);
@@ -713,14 +713,18 @@ private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir
   static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.Config cfg, Function condition) throws Exception {
+    deltaStreamerTestRunner(ds, cfg, condition, "single_ds_job");
+  }
+
+  static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.Config cfg, Function condition, String jobId) throws Exception {
     Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> {
       try {
         ds.sync();
       } catch (Exception ex) {
+        LOG.warn("DS continuous job failed, hence not proceeding with condition check for " + jobId);
         throw new RuntimeException(ex.getMessage(), ex);
       }
     });
-
     TestHelpers.waitTillCondition(condition, dsFuture, 360);
     ds.shutdownGracefully();
     dsFuture.get();
   }
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
index 39ca00ac000a2..7307dec2673d3 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
@@ -35,6 +35,8 @@
 import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -50,6 +52,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
 import static org.apache.hudi.common.testutils.FixtureUtils.prepareFixtureTable;
@@ -73,6 +76,7 @@
 public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctionalTestHarness {
 
   private static final String COW_TEST_TABLE_NAME = "testtable_COPY_ON_WRITE";
+  private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamerWithMultiWriter.class);
 
   String basePath;
   String propsFilePath;
@@ -85,7 +89,6 @@ void testUpsertsContinuousModeWithMultipleWriters(HoodieTableType tableType) thr
     // NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
     setUpTestTable(tableType);
     prepareInitialConfigs(fs(), basePath, "foo");
-    // enable carrying forward latest checkpoint
     TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
     props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
     props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
@@ -104,7 +107,7 @@
         propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
     cfgBackfillJob.continuousMode = false;
     HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build();
-    HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+    HoodieTimeline timeline = meta.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
         .fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
     cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
@@ -117,7 +120,7 @@
     // run ingestion & backfill in parallel, create conflict and fail one
     runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob2,
-        cfgIngestionJob, backfillJob, cfgBackfillJob, true);
+        cfgIngestionJob, backfillJob, cfgBackfillJob, true, "batch1");
 
     // create new ingestion & backfill job config to generate only INSERTS to avoid conflict
     props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
@@ -125,30 +128,30 @@
     props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
     props.setProperty("hoodie.test.source.generate.inserts", "true");
     UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER);
-    cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.INSERT,
+    HoodieDeltaStreamer.Config cfgBackfillJob2 = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.INSERT,
         propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName()));
-    cfgBackfillJob.continuousMode = false;
+    cfgBackfillJob2.continuousMode = false;
     meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build();
     timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     commitMetadata = HoodieCommitMetadata
         .fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
-    cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
-    cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
-    cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+    cfgBackfillJob2.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
+    cfgBackfillJob2.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
+    cfgBackfillJob2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
 
-    cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
+    HoodieDeltaStreamer.Config cfgIngestionJob2 = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
         propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName()));
-    cfgIngestionJob.continuousMode = true;
-    cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
-    cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+    cfgIngestionJob2.continuousMode = true;
+    cfgIngestionJob2.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
+    cfgIngestionJob2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
     // re-init ingestion job
-    HoodieDeltaStreamer ingestionJob3 = new HoodieDeltaStreamer(cfgIngestionJob, jsc());
+    HoodieDeltaStreamer ingestionJob3 = new HoodieDeltaStreamer(cfgIngestionJob2, jsc());
     // re-init backfill job
-    HoodieDeltaStreamer backfillJob2 = new HoodieDeltaStreamer(cfgBackfillJob, jsc());
+    HoodieDeltaStreamer backfillJob2 = new HoodieDeltaStreamer(cfgBackfillJob2, jsc());
     // run ingestion & backfill in parallel, avoid conflict and succeed both
     runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob3,
-        cfgIngestionJob, backfillJob2, cfgBackfillJob, false);
+        cfgIngestionJob2, backfillJob2, cfgBackfillJob2, false, "batch2");
   }
 
   @ParameterizedTest
@@ -301,7 +304,7 @@ private void setUpTestTable(HoodieTableType tableType) throws IOException {
   private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, int totalRecords,
       HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer.Config cfgIngestionJob, HoodieDeltaStreamer backfillJob,
-      HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict) throws Exception {
+      HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict, String jobId) throws Exception {
     ExecutorService service = Executors.newFixedThreadPool(2);
     HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build();
     HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
@@ -318,18 +321,27 @@
       return true;
     };
 
+    AtomicBoolean continuousFailed = new AtomicBoolean(false);
+    AtomicBoolean backfillFailed = new AtomicBoolean(false);
     try {
       Future regularIngestionJobFuture = service.submit(() -> {
         try {
-          deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, conditionForRegularIngestion);
-        } catch (Exception ex) {
+          deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, conditionForRegularIngestion, jobId);
+        } catch (Throwable ex) {
+          continuousFailed.set(true);
+          LOG.error("Continuous job failed " + ex.getMessage());
           throw new RuntimeException(ex);
         }
       });
       Future backfillJobFuture = service.submit(() -> {
         try {
+          // trigger backfill only after at least 1 requested entry is added to the timeline by the continuous job. If not, there is a chance that backfill will complete even before
+          // the continuous job starts.
+          awaitCondition(new GetCommitsAfterInstant(tableBasePath, lastSuccessfulCommit));
           backfillJob.sync();
-        } catch (Exception ex) {
+        } catch (Throwable ex) {
+          LOG.error("Backfilling job failed " + ex.getMessage());
+          backfillFailed.set(true);
           throw new RuntimeException(ex);
         }
       });
@@ -345,10 +357,48 @@
       */
       if (expectConflict && e.getCause().getMessage().contains(ConcurrentModificationException.class.getName())) {
         // expected ConcurrentModificationException since ingestion & backfill will have overlapping writes
+        if (backfillFailed.get()) {
+          // if the backfill job failed, shut down the continuous job.
+          LOG.warn("Calling shutdown on ingestion job since the backfill job has failed for " + jobId);
+          ingestionJob.shutdownGracefully();
+        }
       } else {
+        LOG.error("Conflict happened, but not expected " + e.getCause().getMessage());
         throw e;
       }
     }
   }
 
+  class GetCommitsAfterInstant {
+
+    String basePath;
+    String lastSuccessfulCommit;
+    HoodieTableMetaClient meta;
+
+    GetCommitsAfterInstant(String basePath, String lastSuccessfulCommit) {
+      this.basePath = basePath;
+      this.lastSuccessfulCommit = lastSuccessfulCommit;
+      meta = HoodieTableMetaClient.builder().setConf(fs().getConf()).setBasePath(basePath).build();
+    }
+
+    long getCommitsAfterInstant() {
+      HoodieTimeline timeline1 = meta.reloadActiveTimeline().getAllCommitsTimeline().findInstantsAfter(lastSuccessfulCommit);
+      // LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
+      return timeline1.getInstants().count();
+    }
+  }
+
+  private static void awaitCondition(GetCommitsAfterInstant callback) throws InterruptedException {
+    long startTime = System.currentTimeMillis();
+    long soFar = 0;
+    while (soFar <= 5000) {
+      if (callback.getCommitsAfterInstant() > 0) {
+        break;
+      } else {
+        Thread.sleep(500);
+        soFar += 500;
+      }
+    }
+    LOG.warn("Awaiting completed in " + (System.currentTimeMillis() - startTime));
+  }
 }
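
Note on the awaitCondition helper added above: it is a bounded poll. It checks whether any commit has appeared on the timeline after the recorded instant, sleeps 500 ms between checks, and gives up after roughly 5 seconds, after which the backfill proceeds anyway. A minimal, self-contained Java sketch of that pattern, with a LongSupplier standing in for GetCommitsAfterInstant (the class and method names below are illustrative, not part of the patch):

import java.util.function.LongSupplier;

public class AwaitConditionSketch {

  // Poll the supplier every 500 ms, for up to ~5 s, returning early once
  // at least one commit is visible after the last successful instant.
  static boolean awaitAtLeastOneCommit(LongSupplier commitsAfterInstant) throws InterruptedException {
    long waited = 0;
    while (waited <= 5000) {
      if (commitsAfterInstant.getAsLong() > 0) {
        return true;
      }
      Thread.sleep(500);
      waited += 500;
    }
    return false;
  }

  public static void main(String[] args) throws InterruptedException {
    long start = System.currentTimeMillis();
    // Simulate the continuous job adding its first commit after ~1.2 s.
    boolean seen = awaitAtLeastOneCommit(() -> System.currentTimeMillis() - start > 1200 ? 1L : 0L);
    System.out.println("commit seen = " + seen);
  }
}

The loop falls through silently on timeout, so this guard only narrows the start-order race between the two jobs; it does not eliminate it.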
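
Likewise, the AtomicBoolean flags threaded through runJobsInParallel follow a common coordination pattern for two concurrent test jobs: each task records its own failure before rethrowing, so the main thread can decide whether the surviving continuous job needs an explicit shutdown. A generic sketch under the same assumptions (the Job interface and all names are illustrative only, not the patch's API):

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

public class FailureFlagSketch {

  interface Job {
    void run() throws Exception;
    void shutdownGracefully();
  }

  static void runInParallel(Job continuousJob, Job backfillJob) throws Exception {
    ExecutorService service = Executors.newFixedThreadPool(2);
    AtomicBoolean backfillFailed = new AtomicBoolean(false);
    try {
      Future<?> continuousFuture = service.submit(() -> {
        try {
          continuousJob.run();
        } catch (Throwable t) {
          throw new RuntimeException(t);
        }
      });
      Future<?> backfillFuture = service.submit(() -> {
        try {
          backfillJob.run();
        } catch (Throwable t) {
          backfillFailed.set(true); // record which side failed before rethrowing
          throw new RuntimeException(t);
        }
      });
      backfillFuture.get();
      continuousFuture.get();
    } catch (ExecutionException e) {
      if (backfillFailed.get()) {
        // The continuous job would otherwise keep running; stop it so the caller can finish.
        continuousJob.shutdownGracefully();
      }
      throw e;
    } finally {
      service.shutdown();
    }
  }
}

Calling shutdownGracefully() only when the backfill flag is set mirrors the patch's intent: an expected write conflict kills one job, and the surviving continuous job must be stopped explicitly or the test keeps running until its own timeout.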