From 4871c93376740dfc1d53ed7942d4eb96d8c1f0b7 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Tue, 7 Dec 2021 11:06:43 +0530 Subject: [PATCH 1/2] [HUDI-2936] Add data count checks in async clustering tests --- .../functional/TestHoodieDeltaStreamer.java | 52 ++++++++++++------- 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index b3cf7b28fa20..23fe54c27f6a 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -257,6 +257,12 @@ static void assertRecordCount(long expected, String tablePath, SQLContext sqlCon assertEquals(expected, recordCount); } + static void assertDistinctRecordCount(long expected, String tablePath, SQLContext sqlContext) { + sqlContext.clearCache(); + long recordCount = sqlContext.read().format("org.apache.hudi").load(tablePath).select("_hoodie_record_key").distinct().count(); + assertEquals(expected, recordCount); + } + static List countsPerCommit(String tablePath, SQLContext sqlContext) { sqlContext.clearCache(); List rows = sqlContext.read().format("org.apache.hudi").load(tablePath) @@ -358,12 +364,12 @@ static void assertAtLeastNReplaceCommits(int minExpected, String tablePath, File assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); } - static void assertNoReplaceCommits(int expected, String tablePath, FileSystem fs) { + static void assertNoReplaceCommits(String tablePath, FileSystem fs) { HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build(); HoodieTimeline timeline = meta.getActiveTimeline().getCompletedReplaceTimeline(); LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList())); int numDeltaCommits = (int) timeline.getInstants().count(); - assertEquals(expected, numDeltaCommits, "Got=" + numDeltaCommits + ", exp =" + expected); + assertEquals(0, numDeltaCommits, "Got=" + numDeltaCommits + ", exp =" + 0); } static void assertAtLeastNReplaceRequests(int minExpected, String tablePath, FileSystem fs) { @@ -916,19 +922,22 @@ public void testHoodieAsyncClusteringJob(boolean shouldPassInClusteringInstantTi public void testAsyncClusteringService() throws Exception { String tableBasePath = dfsBasePath + "/asyncClustering"; // Keep it higher than batch-size to test continuous mode - int totalRecords = 3000; + int totalRecords = 2000; // Initial bulk insert HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT); cfg.continuousMode = true; cfg.tableType = HoodieTableType.COPY_ON_WRITE.name(); - cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "2")); + cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "3")); HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); deltaStreamerTestRunner(ds, cfg, (r) -> { - TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs); - TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs); + TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs); return true; }); + // There should be 4 commits, one of which should be a replace commit + TestHelpers.assertAtLeastNCommits(4, tableBasePath, dfs); + TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs); + TestHelpers.assertDistinctRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext); } /** @@ -941,40 +950,45 @@ public void testAsyncClusteringService() throws Exception { public void testAsyncClusteringServiceWithConflicts() throws Exception { String tableBasePath = dfsBasePath + "/asyncClusteringWithConflicts"; // Keep it higher than batch-size to test continuous mode - int totalRecords = 3000; + int totalRecords = 2000; // Initial bulk insert HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT); cfg.continuousMode = true; cfg.tableType = HoodieTableType.COPY_ON_WRITE.name(); - cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "2")); + cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "3")); HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); deltaStreamerTestRunner(ds, cfg, (r) -> { - TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs); - TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs); + TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs); return true; }); + // There should be 4 commits, one of which should be a replace commit + TestHelpers.assertAtLeastNCommits(4, tableBasePath, dfs); + TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs); + TestHelpers.assertDistinctRecordCount(1900, tableBasePath + "/*/*.parquet", sqlContext); } - @ParameterizedTest - @ValueSource(strings = {"true", "false"}) - public void testAsyncClusteringServiceWithCompaction(String preserveCommitMetadata) throws Exception { + @Test + public void testAsyncClusteringServiceWithCompaction() throws Exception { String tableBasePath = dfsBasePath + "/asyncClusteringCompaction"; // Keep it higher than batch-size to test continuous mode - int totalRecords = 3000; + int totalRecords = 2000; // Initial bulk insert HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT); cfg.continuousMode = true; cfg.tableType = HoodieTableType.MERGE_ON_READ.name(); - cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "2", preserveCommitMetadata)); + cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "3")); HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); deltaStreamerTestRunner(ds, cfg, (r) -> { - TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs); TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, dfs); - TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs); + TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs); return true; }); + // There should be 4 commits, one of which should be a replace commit + TestHelpers.assertAtLeastNCommits(4, tableBasePath, dfs); + TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs); + TestHelpers.assertDistinctRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext); } @ParameterizedTest @@ -1057,11 +1071,11 @@ public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String runningMod } case HoodieClusteringJob.SCHEDULE: { TestHelpers.assertAtLeastNReplaceRequests(2, tableBasePath, dfs); - TestHelpers.assertNoReplaceCommits(0, tableBasePath, dfs); + TestHelpers.assertNoReplaceCommits(tableBasePath, dfs); return true; } case HoodieClusteringJob.EXECUTE: { - TestHelpers.assertNoReplaceCommits(0, tableBasePath, dfs); + TestHelpers.assertNoReplaceCommits(tableBasePath, dfs); return true; } default: From d95c19d29bedcd1b86fc4def758813cde25e1d09 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Fri, 10 Dec 2021 14:08:22 +0530 Subject: [PATCH 2/2] No two basePaths should be same for clustering tests --- .../hudi/utilities/functional/TestHoodieDeltaStreamer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 23fe54c27f6a..78ac5a5b08b9 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -887,7 +887,7 @@ private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePa @ParameterizedTest @ValueSource(booleans = {true, false}) public void testHoodieAsyncClusteringJob(boolean shouldPassInClusteringInstantTime) throws Exception { - String tableBasePath = dfsBasePath + "/asyncClustering"; + String tableBasePath = dfsBasePath + "/asyncClusteringJob"; HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "true");