@@ -313,7 +313,7 @@ static void assertAtleastNCompactionCommitsAfterCommit(int minExpected, String l

static void assertAtleastNDeltaCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath, FileSystem fs) {
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants();
HoodieTimeline timeline = meta.reloadActiveTimeline().getDeltaCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants();
LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
int numDeltaCommits = (int) timeline.getInstants().count();
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected);
@@ -336,7 +336,7 @@ static void waitTillCondition(Function<Boolean, Boolean> condition, Future dsFut
boolean ret = false;
while (!ret && !dsFuture.isDone()) {
try {
Thread.sleep(5000);
Thread.sleep(3000);
ret = condition.apply(true);
} catch (Throwable error) {
LOG.warn("Got error :", error);
@@ -713,14 +713,18 @@ private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir
}

static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.Config cfg, Function<Boolean, Boolean> condition) throws Exception {
deltaStreamerTestRunner(ds, cfg, condition, "single_ds_job");
}

static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.Config cfg, Function<Boolean, Boolean> condition, String jobId) throws Exception {
Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> {
try {
ds.sync();
} catch (Exception ex) {
LOG.warn("DS continuous job failed, hence not proceeding with condition check for " + jobId);
throw new RuntimeException(ex.getMessage(), ex);
}
});

TestHelpers.waitTillCondition(condition, dsFuture, 360);
ds.shutdownGracefully();
dsFuture.get();

TestHoodieDeltaStreamerWithMultiWriter.java
@@ -35,6 +35,8 @@
import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;

import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
@@ -50,6 +52,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static org.apache.hudi.common.testutils.FixtureUtils.prepareFixtureTable;
@@ -73,6 +76,7 @@
public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctionalTestHarness {

private static final String COW_TEST_TABLE_NAME = "testtable_COPY_ON_WRITE";
private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamerWithMultiWriter.class);

String basePath;
String propsFilePath;
@@ -85,7 +89,6 @@ void testUpsertsContinuousModeWithMultipleWriters(HoodieTableType tableType) thr
// NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
setUpTestTable(tableType);
prepareInitialConfigs(fs(), basePath, "foo");
// enable carrying forward latest checkpoint
TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
@@ -104,7 +107,7 @@ void testUpsertsContinuousModeWithMultipleWriters(HoodieTableType tableType) thr
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
cfgBackfillJob.continuousMode = false;
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieTimeline timeline = meta.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
@@ -117,38 +120,38 @@ void testUpsertsContinuousModeWithMultipleWriters(HoodieTableType tableType) thr

// run ingestion & backfill in parallel, create conflict and fail one
runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob2,
cfgIngestionJob, backfillJob, cfgBackfillJob, true);
cfgIngestionJob, backfillJob, cfgBackfillJob, true, "batch1");

// create new ingestion & backfill job config to generate only INSERTS to avoid conflict
props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
props.setProperty("hoodie.test.source.generate.inserts", "true");
UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER);
cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.INSERT,
HoodieDeltaStreamer.Config cfgBackfillJob2 = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.INSERT,
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName()));
cfgBackfillJob.continuousMode = false;
cfgBackfillJob2.continuousMode = false;
meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build();
timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
commitMetadata = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
cfgBackfillJob2.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
cfgBackfillJob2.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgBackfillJob2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));

cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
HoodieDeltaStreamer.Config cfgIngestionJob2 = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName()));
cfgIngestionJob.continuousMode = true;
cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
cfgIngestionJob2.continuousMode = true;
cfgIngestionJob2.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgIngestionJob2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
// re-init ingestion job
HoodieDeltaStreamer ingestionJob3 = new HoodieDeltaStreamer(cfgIngestionJob, jsc());
HoodieDeltaStreamer ingestionJob3 = new HoodieDeltaStreamer(cfgIngestionJob2, jsc());
// re-init backfill job
HoodieDeltaStreamer backfillJob2 = new HoodieDeltaStreamer(cfgBackfillJob, jsc());
HoodieDeltaStreamer backfillJob2 = new HoodieDeltaStreamer(cfgBackfillJob2, jsc());

// run ingestion & backfill in parallel, avoid conflict and succeed both
runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob3,
cfgIngestionJob, backfillJob2, cfgBackfillJob, false);
cfgIngestionJob2, backfillJob2, cfgBackfillJob2, false, "batch2");
}

@ParameterizedTest
@@ -301,7 +304,7 @@ private void setUpTestTable(HoodieTableType tableType) throws IOException {

private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, int totalRecords,
HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer.Config cfgIngestionJob, HoodieDeltaStreamer backfillJob,
HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict) throws Exception {
HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict, String jobId) throws Exception {
ExecutorService service = Executors.newFixedThreadPool(2);
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
@@ -318,18 +321,27 @@ private void runJobsInParallel(String tableBasePath, HoodieTableType tableType,
return true;
};

AtomicBoolean continousFailed = new AtomicBoolean(false);
AtomicBoolean backfillFailed = new AtomicBoolean(false);
try {
Future regularIngestionJobFuture = service.submit(() -> {
try {
deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, conditionForRegularIngestion);
} catch (Exception ex) {
deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, conditionForRegularIngestion, jobId);
} catch (Throwable ex) {
continousFailed.set(true);
LOG.error("Continuous job failed " + ex.getMessage());
throw new RuntimeException(ex);
}
});
Future backfillJobFuture = service.submit(() -> {
try {
// trigger backfill only after at least 1 requested entry is added to the timeline from the continuous job. If not, there is a chance that the backfill
// will complete even before the continuous job starts.
awaitCondition(new GetCommitsAfterInstant(tableBasePath, lastSuccessfulCommit));
backfillJob.sync();
} catch (Exception ex) {
} catch (Throwable ex) {
LOG.error("Backfilling job failed " + ex.getMessage());
backfillFailed.set(true);
throw new RuntimeException(ex);
}
});
@@ -345,10 +357,48 @@ private void runJobsInParallel(String tableBasePath, HoodieTableType tableType,
*/
if (expectConflict && e.getCause().getMessage().contains(ConcurrentModificationException.class.getName())) {
// expected ConcurrentModificationException since ingestion & backfill will have overlapping writes
if (backfillFailed.get()) {
// if backfill job failed, shutdown the continuous job.
LOG.warn("Calling shutdown on ingestion job since the backfill job has failed for " + jobId);
ingestionJob.shutdownGracefully();
}
} else {
LOG.error("Conflict happened, but not expected " + e.getCause().getMessage());
throw e;
}
}
}

class GetCommitsAfterInstant {

String basePath;
String lastSuccessfulCommit;
HoodieTableMetaClient meta;
GetCommitsAfterInstant(String basePath, String lastSuccessfulCommit) {
this.basePath = basePath;
this.lastSuccessfulCommit = lastSuccessfulCommit;
meta = HoodieTableMetaClient.builder().setConf(fs().getConf()).setBasePath(basePath).build();
}

long getCommitsAfterInstant() {
HoodieTimeline timeline1 = meta.reloadActiveTimeline().getAllCommitsTimeline().findInstantsAfter(lastSuccessfulCommit);
// LOG.info("Timeline Instants=" + meta1.getActiveTimeline().getInstants().collect(Collectors.toList()));
return timeline1.getInstants().count();
}
}

private static void awaitCondition(GetCommitsAfterInstant callback) throws InterruptedException {
long startTime = System.currentTimeMillis();
long soFar = 0;
while (soFar <= 5000) {
if (callback.getCommitsAfterInstant() > 0) {
break;
} else {
Thread.sleep(500);
soFar += 500;
}
}
LOG.warn("Awaiting completed in " + (System.currentTimeMillis() - startTime));
}

}
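
The awaitCondition helper above delays the backfill job until GetCommitsAfterInstant reports at least one new instant after lastSuccessfulCommit, polling every 500 ms and giving up after roughly 5 seconds; this is what keeps the backfill from finishing before the continuous ingestion job has written anything. Below is a minimal standalone sketch of the same poll-and-sleep pattern, assuming only a generic Supplier<Long> instead of the Hudi timeline classes; the PollingAwait class and awaitPositive method are hypothetical names used for illustration and are not part of the Hudi test code.

import java.util.function.Supplier;

/** Hypothetical helper illustrating the poll-until-positive pattern used by awaitCondition. */
final class PollingAwait {

  /**
   * Polls the supplied count until it becomes positive or the timeout elapses.
   * Returns true if the condition was met, false if we gave up.
   */
  static boolean awaitPositive(Supplier<Long> countSupplier, long timeoutMs, long pollIntervalMs)
      throws InterruptedException {
    long waited = 0;
    while (waited <= timeoutMs) {
      if (countSupplier.get() > 0) {
        return true;
      }
      Thread.sleep(pollIntervalMs);
      waited += pollIntervalMs;
    }
    return false;
  }

  public static void main(String[] args) throws InterruptedException {
    long start = System.currentTimeMillis();
    // Example: the polled count becomes positive after roughly 1.5 seconds.
    boolean met = awaitPositive(
        () -> System.currentTimeMillis() - start > 1500 ? 1L : 0L, 5000, 500);
    System.out.println("Condition met=" + met
        + " after " + (System.currentTimeMillis() - start) + " ms");
  }
}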