@@ -257,6 +257,12 @@ static void assertRecordCount(long expected, String tablePath, SQLContext sqlCon
assertEquals(expected, recordCount);
}

+ static void assertDistinctRecordCount(long expected, String tablePath, SQLContext sqlContext) {
+   sqlContext.clearCache();
+   long recordCount = sqlContext.read().format("org.apache.hudi").load(tablePath).select("_hoodie_record_key").distinct().count();
+   assertEquals(expected, recordCount);
+ }
+
static List<Row> countsPerCommit(String tablePath, SQLContext sqlContext) {
sqlContext.clearCache();
List<Row> rows = sqlContext.read().format("org.apache.hudi").load(tablePath)
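The new helper asserts on distinct _hoodie_record_key values rather than raw row counts, which is the right invariant once clustering and upserts start rewriting file groups. A minimal standalone version of the same check, assuming a local SparkSession and an illustrative table path (class name and path are hypothetical, not part of this change):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class DistinctKeyCheck {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("distinct-key-check")
            .master("local[2]")
            .getOrCreate();

        // Read the Hudi table the same way the helper does.
        Dataset<Row> df = spark.read().format("org.apache.hudi")
            .load("/tmp/hudi_table/*/*.parquet");

        // Raw rows can exceed unique keys when input batches repeat record keys.
        long total = df.count();
        long distinctKeys = df.select("_hoodie_record_key").distinct().count();
        System.out.printf("total=%d, distinctKeys=%d%n", total, distinctKeys);
        spark.stop();
      }
    }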
@@ -358,12 +364,12 @@ static void assertAtLeastNReplaceCommits(int minExpected, String tablePath, File
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected);
}

- static void assertNoReplaceCommits(int expected, String tablePath, FileSystem fs) {
+ static void assertNoReplaceCommits(String tablePath, FileSystem fs) {
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
HoodieTimeline timeline = meta.getActiveTimeline().getCompletedReplaceTimeline();
LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
int numDeltaCommits = (int) timeline.getInstants().count();
- assertEquals(expected, numDeltaCommits, "Got=" + numDeltaCommits + ", exp =" + expected);
+ assertEquals(0, numDeltaCommits, "Got=" + numDeltaCommits + ", exp =" + 0);
}

static void assertAtLeastNReplaceRequests(int minExpected, String tablePath, FileSystem fs) {
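With the expected count pinned to zero inside the helper, the redundant expected parameter disappears and call sites shrink accordingly (see the schedule-and-execute hunk at the end of this diff):

    // before
    TestHelpers.assertNoReplaceCommits(0, tableBasePath, dfs);
    // after
    TestHelpers.assertNoReplaceCommits(tableBasePath, dfs);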
@@ -881,7 +887,7 @@ private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePa
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testHoodieAsyncClusteringJob(boolean shouldPassInClusteringInstantTime) throws Exception {
- String tableBasePath = dfsBasePath + "/asyncClustering";
+ String tableBasePath = dfsBasePath + "/asyncClusteringJob";

HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "true");
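Renaming the base path to /asyncClusteringJob keeps this test's table directory from colliding with testAsyncClusteringService below, which writes to dfsBasePath + "/asyncClustering".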

@@ -916,19 +922,22 @@ public void testHoodieAsyncClusteringJob(boolean shouldPassInClusteringInstantTi
public void testAsyncClusteringService() throws Exception {
String tableBasePath = dfsBasePath + "/asyncClustering";
// Keep it higher than batch-size to test continuous mode
- int totalRecords = 3000;
+ int totalRecords = 2000;

// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
- cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "2"));
+ cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "3"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
- TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
return true;
});
+ // There should be 4 commits, one of which should be a replace commit
+ TestHelpers.assertAtLeastNCommits(4, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+ TestHelpers.assertDistinctRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
}
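The last two arguments to getAsyncServicesConfigs enable async clustering and set how many commits accumulate before a clustering is scheduled (now every 3 instead of every 2), which is why the in-loop assertion only demands one replace commit before the final-state checks run. A sketch of the properties this plausibly translates to; the exact mapping lives in getAsyncServicesConfigs, so treat the wiring below as an assumption, though both keys are standard Hudi clustering configs:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical reconstruction, not this PR's code: the helper's last two
    // arguments mapped onto Hudi's async clustering switches.
    final class AsyncClusteringConfigSketch {
      static List<String> asyncClusteringConfigs(String enabled, String maxCommits) {
        List<String> configs = new ArrayList<>();
        configs.add("hoodie.clustering.async.enabled=" + enabled);        // "true" starts the async clustering service
        configs.add("hoodie.clustering.async.max.commits=" + maxCommits); // schedule clustering after N commits
        return configs;
      }
    }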

/**
@@ -941,40 +950,45 @@ public void testAsyncClusteringService() throws Exception {
public void testAsyncClusteringServiceWithConflicts() throws Exception {
String tableBasePath = dfsBasePath + "/asyncClusteringWithConflicts";
// Keep it higher than batch-size to test continuous mode
- int totalRecords = 3000;
+ int totalRecords = 2000;

// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
- cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "2"));
+ cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "3"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
- TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
return true;
});
+ // There should be 4 commits, one of which should be a replace commit
+ TestHelpers.assertAtLeastNCommits(4, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+ TestHelpers.assertDistinctRecordCount(1900, tableBasePath + "/*/*.parquet", sqlContext);
}
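The distinct-key expectation is 1900 rather than totalRecords here, presumably because the UPSERT batches from the test data generator reuse some existing record keys, leaving unique _hoodie_record_key values below the 2000 raw records written; the exact figure is a property of the generator that this change pins as a constant.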

- @ParameterizedTest
- @ValueSource(strings = {"true", "false"})
- public void testAsyncClusteringServiceWithCompaction(String preserveCommitMetadata) throws Exception {
+ @Test
+ public void testAsyncClusteringServiceWithCompaction() throws Exception {
String tableBasePath = dfsBasePath + "/asyncClusteringCompaction";
// Keep it higher than batch-size to test continuous mode
- int totalRecords = 3000;
+ int totalRecords = 2000;

// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
- cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "2", preserveCommitMetadata));
+ cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "3"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, dfs);
- TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
return true;
});
+ // There should be 4 commits, one of which should be a replace commit
+ TestHelpers.assertAtLeastNCommits(4, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+ TestHelpers.assertDistinctRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
}
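Dropping the preserveCommitMetadata parameterization means the MOR-plus-compaction test now runs once under Hudi's default commit-metadata handling, and the getAsyncServicesConfigs overload that took the extra flag is no longer exercised here.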

@ParameterizedTest
@@ -1057,11 +1071,11 @@ public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String runningMod
}
case HoodieClusteringJob.SCHEDULE: {
TestHelpers.assertAtLeastNReplaceRequests(2, tableBasePath, dfs);
- TestHelpers.assertNoReplaceCommits(0, tableBasePath, dfs);
+ TestHelpers.assertNoReplaceCommits(tableBasePath, dfs);
return true;
}
case HoodieClusteringJob.EXECUTE: {
- TestHelpers.assertNoReplaceCommits(0, tableBasePath, dfs);
+ TestHelpers.assertNoReplaceCommits(tableBasePath, dfs);
return true;
}
default: