From e8cd37ca38a784a75fdbeda73873794606efa2db Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Fri, 17 Dec 2021 08:05:22 -0800 Subject: [PATCH 1/2] Adding some test fixes to continuous mode multi writer tests --- .../functional/TestHoodieDeltaStreamer.java | 12 ++- ...estHoodieDeltaStreamerWithMultiWriter.java | 98 +++++++++++++++++-- 2 files changed, 101 insertions(+), 9 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 78ac5a5b08b91..f12abfbe56b21 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -313,7 +313,7 @@ static void assertAtleastNCompactionCommitsAfterCommit(int minExpected, String l static void assertAtleastNDeltaCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath, FileSystem fs) { HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); - HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants(); + HoodieTimeline timeline = meta.reloadActiveTimeline().getDeltaCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants(); LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList())); int numDeltaCommits = (int) timeline.getInstants().count(); assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); @@ -336,7 +336,7 @@ static void waitTillCondition(Function condition, Future dsFut boolean ret = false; while (!ret && !dsFuture.isDone()) { try { - Thread.sleep(5000); + Thread.sleep(3000); ret = condition.apply(true); } catch (Throwable error) { LOG.warn("Got error :", error); @@ -713,15 +713,21 @@ private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir } static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.Config cfg, Function condition) throws Exception { + deltaStreamerTestRunner(ds, cfg, condition, "single_ds_job"); + } + + static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.Config cfg, Function condition, String jobId) throws Exception { Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> { try { ds.sync(); } catch (Exception ex) { + LOG.warn("DS continuous job failed, hence not proceeding with condition check for " + jobId); throw new RuntimeException(ex.getMessage(), ex); } }); - + LOG.warn("waiting on condition " + condition.toString() + " for 360 secs for " + jobId); TestHelpers.waitTillCondition(condition, dsFuture, 360); + LOG.warn("XXX going to call shutdown gracefully for continuous job for " + jobId); ds.shutdownGracefully(); dsFuture.get(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java index 39ca00ac000a2..3ce6a4c48de81 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; @@ -35,6 +36,8 @@ import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs; import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.params.ParameterizedTest; @@ -50,7 +53,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import java.util.stream.Collectors; import static org.apache.hudi.common.testutils.FixtureUtils.prepareFixtureTable; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; @@ -73,6 +78,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctionalTestHarness { private static final String COW_TEST_TABLE_NAME = "testtable_COPY_ON_WRITE"; + private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamerWithMultiWriter.class); String basePath; String propsFilePath; @@ -105,19 +111,37 @@ void testUpsertsContinuousModeWithMultipleWriters(HoodieTableType tableType) thr cfgBackfillJob.continuousMode = false; HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build(); HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + LOG.warn("XXX total commits just before starting " + meta.reloadActiveTimeline().getCommitsTimeline() + .filterCompletedInstants().getInstants().count() + ", last :: " + + meta.getActiveTimeline().filterCompletedInstants().lastInstant().get().toString()); + List instants = meta.getActiveTimeline().filterCompletedInstants().getInstants().collect(Collectors.toList()); + for (HoodieInstant instant: instants) { + LOG.warn("timeline instant " + instant.toString()); + } HoodieCommitMetadata commitMetadata = HoodieCommitMetadata .fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class); cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY); cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc()); + LOG.warn(":::: Checkpoint set for backfilling one time sync " + cfgBackfillJob.checkpoint); // re-init ingestion job to start sync service HoodieDeltaStreamer ingestionJob2 = new HoodieDeltaStreamer(cfgIngestionJob, jsc()); // run ingestion & backfill in parallel, create conflict and fail one runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob2, - cfgIngestionJob, backfillJob, cfgBackfillJob, true); + cfgIngestionJob, backfillJob, cfgBackfillJob, true, "batch111"); + + System.out.println("\n\n\nXXX Stage 1 complete "); + + LOG.warn("YYY total commits after 1st stage" + meta.reloadActiveTimeline().getCommitsTimeline() + .filterCompletedInstants().getInstants().count() + ", last :: " + + meta.getActiveTimeline().filterCompletedInstants().lastInstant().get().toString()); + instants = meta.getActiveTimeline().filterCompletedInstants().getInstants().collect(Collectors.toList()); + for (HoodieInstant instant: instants) { + LOG.warn("timeline instant " + instant.toString()); + } // create new ingestion & backfill job config to generate only INSERTS to avoid conflict props = prepareMultiWriterProps(fs(), basePath, propsFilePath); @@ -148,7 +172,15 @@ void testUpsertsContinuousModeWithMultipleWriters(HoodieTableType tableType) thr // run ingestion & backfill in parallel, avoid conflict and succeed both runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob3, - cfgIngestionJob, backfillJob2, cfgBackfillJob, false); + cfgIngestionJob, backfillJob2, cfgBackfillJob, false, "batch222"); + + LOG.warn("ZZZ total commits after 2nd stage" + meta.reloadActiveTimeline().getCommitsTimeline() + .filterCompletedInstants().getInstants().count() + ", last :: " + + meta.getActiveTimeline().filterCompletedInstants().lastInstant().get().toString()); + instants = meta.getActiveTimeline().filterCompletedInstants().getInstants().collect(Collectors.toList()); + for (HoodieInstant instant: instants) { + LOG.warn("timeline instant " + instant.toString()); + } } @ParameterizedTest @@ -301,11 +333,12 @@ private void setUpTestTable(HoodieTableType tableType) throws IOException { private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, int totalRecords, HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer.Config cfgIngestionJob, HoodieDeltaStreamer backfillJob, - HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict) throws Exception { + HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict, String jobId) throws Exception { ExecutorService service = Executors.newFixedThreadPool(2); HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build(); HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); String lastSuccessfulCommit = timeline.lastInstant().get().getTimestamp(); + LOG.warn("::: last successfull commit " + lastSuccessfulCommit + ", 3 expected more than this. "); // Condition for parallel ingestion job Function conditionForRegularIngestion = (r) -> { if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { @@ -318,37 +351,90 @@ private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, return true; }; + AtomicBoolean continousFailed = new AtomicBoolean(false); + AtomicBoolean backfillFailed = new AtomicBoolean(false); + AtomicBoolean callShutdown = new AtomicBoolean(true); try { Future regularIngestionJobFuture = service.submit(() -> { try { - deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, conditionForRegularIngestion); - } catch (Exception ex) { + deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, conditionForRegularIngestion, jobId); + } catch (Throwable ex) { + continousFailed.set(true); + LOG.warn("XXX Continuous job failed " + ex.getMessage()); throw new RuntimeException(ex); } }); Future backfillJobFuture = service.submit(() -> { try { + // trigger backfill atleast after 1 requested entry is added to timline from continuous job. If not, there is a chance that backfill will complete even before + // continous job starts. + awaitCondition(new GetCommitsAfterInstant(tableBasePath, lastSuccessfulCommit)); backfillJob.sync(); - } catch (Exception ex) { + } catch (Throwable ex) { + LOG.warn("XXX backfilling job failed " + ex.getMessage()); + backfillFailed.set(true); throw new RuntimeException(ex); } }); backfillJobFuture.get(); regularIngestionJobFuture.get(); + callShutdown.set(false); if (expectConflict) { Assertions.fail("Failed to handle concurrent writes"); } } catch (Exception e) { + callShutdown.set(false); /* * Need to perform getMessage().contains since the exception coming * from {@link org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.DeltaSyncService} gets wrapped many times into RuntimeExceptions. */ if (expectConflict && e.getCause().getMessage().contains(ConcurrentModificationException.class.getName())) { // expected ConcurrentModificationException since ingestion & backfill will have overlapping writes + LOG.warn("Expected conflict happend and hence no-op"); + if (backfillFailed.get()) { + // if backfill job failed, shutdown the continuous job. + LOG.warn("Calling shutdown on ingestion job since the backfill job has failed for " + jobId); + ingestionJob.shutdownGracefully(); + } } else { + LOG.warn("------ Conflict happend, but not expected " + e.getCause().getMessage()); throw e; } + } finally { + LOG.warn("XXXX Continuous " + continousFailed.get() + ", backkfull " + backfillFailed.get()); + } + } + + class GetCommitsAfterInstant { + + String basePath; + String lastSuccessfulCommit; + HoodieTableMetaClient meta; + GetCommitsAfterInstant(String basePath, String lastSuccessfulCommit) { + this.basePath = basePath; + this.lastSuccessfulCommit = lastSuccessfulCommit; + meta = HoodieTableMetaClient.builder().setConf(fs().getConf()).setBasePath(basePath).build(); + } + + long getCommitsAfterInstant() { + HoodieTimeline timeline1 = meta.reloadActiveTimeline().getAllCommitsTimeline().findInstantsAfter(lastSuccessfulCommit); + // LOG.info("Timeline Instants=" + meta1.getActiveTimeline().getInstants().collect(Collectors.toList())); + return timeline1.getInstants().count(); + } + } + + private static void awaitCondition(GetCommitsAfterInstant callback) throws InterruptedException { + long startTime = System.currentTimeMillis(); + long soFar = 0; + while (soFar <= 5000) { + if (callback.getCommitsAfterInstant() > 0) { + break; + } else { + Thread.sleep(500); + soFar += 500; + } } + LOG.warn("Awaiting completed in " + (System.currentTimeMillis() - startTime)); } } From 4d0c615aa128499237fe15ca7907f53b0df9f4d5 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Fri, 17 Dec 2021 11:01:06 -0800 Subject: [PATCH 2/2] Removing debug logging --- .../functional/TestHoodieDeltaStreamer.java | 2 - ...estHoodieDeltaStreamerWithMultiWriter.java | 70 +++++-------------- 2 files changed, 17 insertions(+), 55 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index f12abfbe56b21..be965512460cc 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -725,9 +725,7 @@ static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer. throw new RuntimeException(ex.getMessage(), ex); } }); - LOG.warn("waiting on condition " + condition.toString() + " for 360 secs for " + jobId); TestHelpers.waitTillCondition(condition, dsFuture, 360); - LOG.warn("XXX going to call shutdown gracefully for continuous job for " + jobId); ds.shutdownGracefully(); dsFuture.get(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java index 3ce6a4c48de81..7307dec2673d3 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; @@ -55,7 +54,6 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; -import java.util.stream.Collectors; import static org.apache.hudi.common.testutils.FixtureUtils.prepareFixtureTable; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; @@ -91,7 +89,6 @@ void testUpsertsContinuousModeWithMultipleWriters(HoodieTableType tableType) thr // NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts setUpTestTable(tableType); prepareInitialConfigs(fs(), basePath, "foo"); - // enable carrying forward latest checkpoint TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath); props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); @@ -110,38 +107,20 @@ void testUpsertsContinuousModeWithMultipleWriters(HoodieTableType tableType) thr propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName())); cfgBackfillJob.continuousMode = false; HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build(); - HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - LOG.warn("XXX total commits just before starting " + meta.reloadActiveTimeline().getCommitsTimeline() - .filterCompletedInstants().getInstants().count() + ", last :: " - + meta.getActiveTimeline().filterCompletedInstants().lastInstant().get().toString()); - List instants = meta.getActiveTimeline().filterCompletedInstants().getInstants().collect(Collectors.toList()); - for (HoodieInstant instant: instants) { - LOG.warn("timeline instant " + instant.toString()); - } + HoodieTimeline timeline = meta.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants(); HoodieCommitMetadata commitMetadata = HoodieCommitMetadata .fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class); cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY); cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc()); - LOG.warn(":::: Checkpoint set for backfilling one time sync " + cfgBackfillJob.checkpoint); // re-init ingestion job to start sync service HoodieDeltaStreamer ingestionJob2 = new HoodieDeltaStreamer(cfgIngestionJob, jsc()); // run ingestion & backfill in parallel, create conflict and fail one runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob2, - cfgIngestionJob, backfillJob, cfgBackfillJob, true, "batch111"); - - System.out.println("\n\n\nXXX Stage 1 complete "); - - LOG.warn("YYY total commits after 1st stage" + meta.reloadActiveTimeline().getCommitsTimeline() - .filterCompletedInstants().getInstants().count() + ", last :: " - + meta.getActiveTimeline().filterCompletedInstants().lastInstant().get().toString()); - instants = meta.getActiveTimeline().filterCompletedInstants().getInstants().collect(Collectors.toList()); - for (HoodieInstant instant: instants) { - LOG.warn("timeline instant " + instant.toString()); - } + cfgIngestionJob, backfillJob, cfgBackfillJob, true, "batch1"); // create new ingestion & backfill job config to generate only INSERTS to avoid conflict props = prepareMultiWriterProps(fs(), basePath, propsFilePath); @@ -149,38 +128,30 @@ void testUpsertsContinuousModeWithMultipleWriters(HoodieTableType tableType) thr props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); props.setProperty("hoodie.test.source.generate.inserts", "true"); UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER); - cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.INSERT, + HoodieDeltaStreamer.Config cfgBackfillJob2 = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.INSERT, propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName())); - cfgBackfillJob.continuousMode = false; + cfgBackfillJob2.continuousMode = false; meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build(); timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); commitMetadata = HoodieCommitMetadata .fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class); - cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY); - cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); - cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); + cfgBackfillJob2.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY); + cfgBackfillJob2.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); + cfgBackfillJob2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); - cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT, + HoodieDeltaStreamer.Config cfgIngestionJob2 = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT, propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName())); - cfgIngestionJob.continuousMode = true; - cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); - cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); + cfgIngestionJob2.continuousMode = true; + cfgIngestionJob2.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); + cfgIngestionJob2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); // re-init ingestion job - HoodieDeltaStreamer ingestionJob3 = new HoodieDeltaStreamer(cfgIngestionJob, jsc()); + HoodieDeltaStreamer ingestionJob3 = new HoodieDeltaStreamer(cfgIngestionJob2, jsc()); // re-init backfill job - HoodieDeltaStreamer backfillJob2 = new HoodieDeltaStreamer(cfgBackfillJob, jsc()); + HoodieDeltaStreamer backfillJob2 = new HoodieDeltaStreamer(cfgBackfillJob2, jsc()); // run ingestion & backfill in parallel, avoid conflict and succeed both runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob3, - cfgIngestionJob, backfillJob2, cfgBackfillJob, false, "batch222"); - - LOG.warn("ZZZ total commits after 2nd stage" + meta.reloadActiveTimeline().getCommitsTimeline() - .filterCompletedInstants().getInstants().count() + ", last :: " - + meta.getActiveTimeline().filterCompletedInstants().lastInstant().get().toString()); - instants = meta.getActiveTimeline().filterCompletedInstants().getInstants().collect(Collectors.toList()); - for (HoodieInstant instant: instants) { - LOG.warn("timeline instant " + instant.toString()); - } + cfgIngestionJob2, backfillJob2, cfgBackfillJob2, false, "batch2"); } @ParameterizedTest @@ -338,7 +309,6 @@ private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build(); HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); String lastSuccessfulCommit = timeline.lastInstant().get().getTimestamp(); - LOG.warn("::: last successfull commit " + lastSuccessfulCommit + ", 3 expected more than this. "); // Condition for parallel ingestion job Function conditionForRegularIngestion = (r) -> { if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { @@ -353,14 +323,13 @@ private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, AtomicBoolean continousFailed = new AtomicBoolean(false); AtomicBoolean backfillFailed = new AtomicBoolean(false); - AtomicBoolean callShutdown = new AtomicBoolean(true); try { Future regularIngestionJobFuture = service.submit(() -> { try { deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, conditionForRegularIngestion, jobId); } catch (Throwable ex) { continousFailed.set(true); - LOG.warn("XXX Continuous job failed " + ex.getMessage()); + LOG.error("Continuous job failed " + ex.getMessage()); throw new RuntimeException(ex); } }); @@ -371,37 +340,32 @@ private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, awaitCondition(new GetCommitsAfterInstant(tableBasePath, lastSuccessfulCommit)); backfillJob.sync(); } catch (Throwable ex) { - LOG.warn("XXX backfilling job failed " + ex.getMessage()); + LOG.error("Backfilling job failed " + ex.getMessage()); backfillFailed.set(true); throw new RuntimeException(ex); } }); backfillJobFuture.get(); regularIngestionJobFuture.get(); - callShutdown.set(false); if (expectConflict) { Assertions.fail("Failed to handle concurrent writes"); } } catch (Exception e) { - callShutdown.set(false); /* * Need to perform getMessage().contains since the exception coming * from {@link org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.DeltaSyncService} gets wrapped many times into RuntimeExceptions. */ if (expectConflict && e.getCause().getMessage().contains(ConcurrentModificationException.class.getName())) { // expected ConcurrentModificationException since ingestion & backfill will have overlapping writes - LOG.warn("Expected conflict happend and hence no-op"); if (backfillFailed.get()) { // if backfill job failed, shutdown the continuous job. LOG.warn("Calling shutdown on ingestion job since the backfill job has failed for " + jobId); ingestionJob.shutdownGracefully(); } } else { - LOG.warn("------ Conflict happend, but not expected " + e.getCause().getMessage()); + LOG.error("Conflict happened, but not expected " + e.getCause().getMessage()); throw e; } - } finally { - LOG.warn("XXXX Continuous " + continousFailed.get() + ", backkfull " + backfillFailed.get()); } }