From 67a1a487b2dfbd35c6681c46e46e9a4052721b16 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Thu, 11 Nov 2021 12:07:40 +0530 Subject: [PATCH 1/3] [HUDI-2731] Make clustering work regardless of whether there are base files --- .../MultipleSparkJobExecutionStrategy.java | 30 +++-- ...HoodieSparkMergeOnReadTableClustering.java | 119 +++++++++++++++--- .../SparkClientFunctionalTestHarness.java | 4 +- 3 files changed, 125 insertions(+), 28 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index 044b77362010..9d3b7d5e031d 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -36,6 +36,7 @@ import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieClusteringException; @@ -63,6 +64,7 @@ import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -70,6 +72,7 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.stream.StreamSupport; import static org.apache.hudi.common.table.log.HoodieFileSliceReader.getFileSliceReader; import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS; @@ -191,7 +194,6 @@ private JavaRDD> readRecordsForGroupWithLogs(JavaSparkContext js LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction); try { Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); - HoodieFileReader baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())); HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder() .withFileSystem(table.getMetaClient().getFs()) .withBasePath(table.getMetaClient().getBasePath()) @@ -205,12 +207,26 @@ private JavaRDD> readRecordsForGroupWithLogs(JavaSparkContext js .withSpillableMapBasePath(config.getSpillableMapBasePath()) .build(); - HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig(); - recordIterators.add(getFileSliceReader(baseFileReader, scanner, readerSchema, - tableConfig.getPayloadClass(), - tableConfig.getPreCombineField(), - tableConfig.populateMetaFields() ? 
Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(), - tableConfig.getPartitionFieldProp())))); + if (!StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())) { + HoodieFileReader baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())); + HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig(); + recordIterators.add(getFileSliceReader(baseFileReader, scanner, readerSchema, + tableConfig.getPayloadClass(), + tableConfig.getPreCombineField(), + tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(), + tableConfig.getPartitionFieldProp())))); + } else { + // Since there is no base file, fall back to reading log files + Iterable> iterable = () -> scanner.iterator(); + recordIterators.add(StreamSupport.stream(iterable.spliterator(), false) + .map(e -> { + try { + return transform((IndexedRecord) e.getData().getInsertValue(readerSchema).get()); + } catch (IOException io) { + throw new UncheckedIOException(io); + } + }).iterator()); + } } catch (IOException e) { throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + " and " + clusteringOp.getDeltaFilePaths(), e); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java index 8f7a500d13f1..c0dc561e34a6 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java @@ -144,28 +144,109 @@ void testClustering(boolean doUpdates, boolean populateMetaFields, boolean prese assertEquals(allFiles.length, hoodieTable.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getLeft).count()); // Do the clustering and validate - client.cluster(clusteringCommitTime, true); + doClusteringAndValidate(client, clusteringCommitTime, metaClient, cfg, dataGen); + } + } - metaClient = HoodieTableMetaClient.reload(metaClient); - final HoodieTable clusteredTable = HoodieSparkTable.create(cfg, context(), metaClient); - clusteredTable.getHoodieView().sync(); - Stream dataFilesToRead = Arrays.stream(dataGen.getPartitionPaths()) - .flatMap(p -> clusteredTable.getBaseFileOnlyView().getLatestBaseFiles(p)); - // verify there should be only one base file per partition after clustering. 
- assertEquals(dataGen.getPartitionPaths().length, dataFilesToRead.count()); - - HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants(); - assertEquals(1, timeline.findInstantsAfter("003", Integer.MAX_VALUE).countInstants(), - "Expecting a single commit."); - assertEquals(clusteringCommitTime, timeline.lastInstant().get().getTimestamp()); - assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, timeline.lastInstant().get().getAction()); - if (cfg.populateMetaFields()) { - assertEquals(400, HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline, Option.of("000")), - "Must contain 200 records"); - } else { - assertEquals(400, HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline, Option.empty())); + private static Stream testClusteringWithNoBaseFiles() { + return Stream.of( + Arguments.of(true, true), + Arguments.of(true, false), + Arguments.of(false, true), + Arguments.of(false, false) + ); + } + + @ParameterizedTest + @MethodSource + void testClusteringWithNoBaseFiles(boolean doUpdates, boolean preserveCommitMetadata) throws Exception { + // set low compaction small File Size to generate more file groups. + HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.newBuilder() + .forTable("test-trip-table") + .withPath(basePath()) + .withSchema(TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2) + .withDeleteParallelism(2) + .withAutoCommit(true) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .compactionSmallFileSize(10L) + .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build()) + .withStorageConfig(HoodieStorageConfig.newBuilder() + .hfileMaxFileSize(1024 * 1024 * 1024) + .parquetMaxFileSize(1024 * 1024 * 1024).build()) + .withEmbeddedTimelineServerEnabled(true) + .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder() + .withEnableBackupForRemoteFileSystemView(false).build()) + // set index type to INMEMORY so that log files can be indexed, and it is safe to send + // inserts straight to the log to produce file slices with only log files and no data files + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()) + .withClusteringConfig(HoodieClusteringConfig.newBuilder() + .withClusteringMaxNumGroups(10) + .withClusteringTargetPartitions(0) + .withInlineClustering(true) + .withInlineClusteringNumCommits(1) + .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build()) + .withRollbackUsingMarkers(false); + HoodieWriteConfig cfg = cfgBuilder.build(); + HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, cfg.getProps()); + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + + try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) { + // test 2 inserts + String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + List records = dataGen.generateInserts(newCommitTime, 400); + insertRecords(metaClient, records.subList(0, 200), client, cfg, newCommitTime); + newCommitTime = "002"; + client.startCommitWithTime(newCommitTime); + insertRecords(metaClient, records.subList(200, 400), client, cfg, newCommitTime); + // run updates + if (doUpdates) { + newCommitTime = "003"; + client.startCommitWithTime(newCommitTime); + records = dataGen.generateUpdates(newCommitTime, 100); + updateRecords(metaClient, records, client, cfg, newCommitTime); } + + HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); + 
hoodieTable.getHoodieView().sync(); + FileStatus[] allBaseFiles = listAllBaseFilesInPath(hoodieTable); + // expect 0 base files for each partition + assertEquals(0, allBaseFiles.length); + + String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString(); + metaClient = HoodieTableMetaClient.reload(metaClient); + hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); + // verify log files are included in clustering plan for each partition. + assertEquals(dataGen.getPartitionPaths().length, hoodieTable.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getLeft).count()); + + // do the clustering and validate + doClusteringAndValidate(client, clusteringCommitTime, metaClient, cfg, dataGen); } } + private void doClusteringAndValidate(SparkRDDWriteClient client, + String clusteringCommitTime, + HoodieTableMetaClient metaClient, + HoodieWriteConfig cfg, + HoodieTestDataGenerator dataGen) { + client.cluster(clusteringCommitTime, true); + metaClient = HoodieTableMetaClient.reload(metaClient); + final HoodieTable clusteredTable = HoodieSparkTable.create(cfg, context(), metaClient); + clusteredTable.getHoodieView().sync(); + Stream dataFilesToRead = Arrays.stream(dataGen.getPartitionPaths()) + .flatMap(p -> clusteredTable.getBaseFileOnlyView().getLatestBaseFiles(p)); + assertEquals(dataGen.getPartitionPaths().length, dataFilesToRead.count()); + HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants(); + assertEquals(1, timeline.findInstantsAfter("003", Integer.MAX_VALUE).countInstants(), + "Expecting a single commit."); + assertEquals(clusteringCommitTime, timeline.lastInstant().get().getTimestamp()); + assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, timeline.lastInstant().get().getAction()); + if (cfg.populateMetaFields()) { + assertEquals(400, HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline, Option.of("000")), + "Must contain 200 records"); + } else { + assertEquals(400, HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline, Option.empty())); + } + } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java index 79fbdcaad93a..389a3e7bd469 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java @@ -228,8 +228,8 @@ protected void insertRecords(HoodieTableMetaClient metaClient, List records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException { From 17fa13ed0fc6589f6ed7b06bd743c0caa73a79aa Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Thu, 18 Nov 2021 21:46:13 +0530 Subject: [PATCH 2/3] Refactor and make file slice reader more reusable --- .../MultipleSparkJobExecutionStrategy.java | 31 ++++--------- ...HoodieSparkMergeOnReadTableClustering.java | 17 ++----- .../table/log/HoodieFileSliceReader.java | 46 +++++++++++++------ 3 files changed, 46 insertions(+), 48 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index 
9d3b7d5e031d..69e21bec9dde 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -64,7 +64,6 @@ import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -72,7 +71,6 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.Stream; -import java.util.stream.StreamSupport; import static org.apache.hudi.common.table.log.HoodieFileSliceReader.getFileSliceReader; import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS; @@ -207,26 +205,15 @@ private JavaRDD> readRecordsForGroupWithLogs(JavaSparkContext js .withSpillableMapBasePath(config.getSpillableMapBasePath()) .build(); - if (!StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())) { - HoodieFileReader baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())); - HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig(); - recordIterators.add(getFileSliceReader(baseFileReader, scanner, readerSchema, - tableConfig.getPayloadClass(), - tableConfig.getPreCombineField(), - tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(), - tableConfig.getPartitionFieldProp())))); - } else { - // Since there is no base file, fall back to reading log files - Iterable> iterable = () -> scanner.iterator(); - recordIterators.add(StreamSupport.stream(iterable.spliterator(), false) - .map(e -> { - try { - return transform((IndexedRecord) e.getData().getInsertValue(readerSchema).get()); - } catch (IOException io) { - throw new UncheckedIOException(io); - } - }).iterator()); - } + Option baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath()) + ? Option.empty() + : Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()))); + HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig(); + recordIterators.add(getFileSliceReader(baseFileReader, scanner, readerSchema, + tableConfig.getPayloadClass(), + tableConfig.getPreCombineField(), + tableConfig.populateMetaFields() ? 
Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(), + tableConfig.getPartitionFieldProp())))); } catch (IOException e) { throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + " and " + clusteringOp.getDeltaFilePaths(), e); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java index c0dc561e34a6..11f6485104bd 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java @@ -45,6 +45,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import java.util.Arrays; import java.util.List; @@ -148,18 +149,9 @@ void testClustering(boolean doUpdates, boolean populateMetaFields, boolean prese } } - private static Stream testClusteringWithNoBaseFiles() { - return Stream.of( - Arguments.of(true, true), - Arguments.of(true, false), - Arguments.of(false, true), - Arguments.of(false, false) - ); - } - @ParameterizedTest - @MethodSource - void testClusteringWithNoBaseFiles(boolean doUpdates, boolean preserveCommitMetadata) throws Exception { + @ValueSource(booleans = {true, false}) + void testClusteringWithNoBaseFiles(boolean doUpdates) throws Exception { // set low compaction small File Size to generate more file groups. HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.newBuilder() .forTable("test-trip-table") @@ -184,8 +176,7 @@ void testClusteringWithNoBaseFiles(boolean doUpdates, boolean preserveCommitMeta .withClusteringMaxNumGroups(10) .withClusteringTargetPartitions(0) .withInlineClustering(true) - .withInlineClusteringNumCommits(1) - .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build()) + .withInlineClusteringNumCommits(1).build()) .withRollbackUsingMarkers(false); HoodieWriteConfig cfg = cfgBuilder.build(); HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, cfg.getProps()); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java index f7a7acfa9022..d8d00acb1ae0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java @@ -23,33 +23,53 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; import java.io.IOException; import java.util.Iterator; +import java.util.stream.StreamSupport; /** * Reads records from base file and merges any updates from log files and provides iterable over all records in the file slice. 
*/ public class HoodieFileSliceReader implements Iterator> { - private Iterator> recordsIterator; + private final Iterator> recordsIterator; - public static HoodieFileSliceReader getFileSliceReader( - HoodieFileReader baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass, - String preCombineField, Option> simpleKeyGenFieldsOpt) throws IOException { - Iterator baseIterator = baseFileReader.getRecordIterator(schema); - while (baseIterator.hasNext()) { - GenericRecord record = (GenericRecord) baseIterator.next(); - HoodieRecord hoodieRecord = simpleKeyGenFieldsOpt.isPresent() - ? SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, preCombineField, simpleKeyGenFieldsOpt.get(), scanner.isWithOperationField()) - : SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, preCombineField, scanner.isWithOperationField()); - scanner.processNextRecord(hoodieRecord); + public static HoodieFileSliceReader getFileSliceReader( + Option baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass, + String preCombineField, Option> simpleKeyGenFieldsOpt) throws IOException { + if (baseFileReader.isPresent()) { + Iterator baseIterator = baseFileReader.get().getRecordIterator(schema); + while (baseIterator.hasNext()) { + GenericRecord record = (GenericRecord) baseIterator.next(); + HoodieRecord hoodieRecord = transform(record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt); + scanner.processNextRecord(hoodieRecord); + } + return new HoodieFileSliceReader(scanner.iterator()); + } else { + Iterable> iterable = () -> scanner.iterator(); + return new HoodieFileSliceReader(StreamSupport.stream(iterable.spliterator(), false) + .map(e -> { + try { + GenericRecord record = (GenericRecord) e.getData().getInsertValue(schema).get(); + return transform(record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt); + } catch (IOException io) { + throw new HoodieIOException("Error while creating reader for file slice with no base file.", io); + } + }).iterator()); } - return new HoodieFileSliceReader(scanner.iterator()); + } + + private static HoodieRecord transform( + GenericRecord record, HoodieMergedLogRecordScanner scanner, String payloadClass, + String preCombineField, Option> simpleKeyGenFieldsOpt) { + return simpleKeyGenFieldsOpt.isPresent() + ? 
SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, preCombineField, simpleKeyGenFieldsOpt.get(), scanner.isWithOperationField()) + : SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, preCombineField, scanner.isWithOperationField()); } private HoodieFileSliceReader(Iterator> recordsItr) { From c72130d3c750d33d2245811752c5041a159ab26d Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Fri, 19 Nov 2021 12:36:55 +0530 Subject: [PATCH 3/3] Revert a test change and assert data files in tests Fix checkstyle violation --- .../TestHoodieSparkMergeOnReadTableClustering.java | 13 +++++++++---- ...tHoodieSparkMergeOnReadTableIncrementalRead.java | 8 ++++++-- ...odieSparkMergeOnReadTableInsertUpdateDelete.java | 3 ++- .../testutils/SparkClientFunctionalTestHarness.java | 6 +++--- 4 files changed, 20 insertions(+), 10 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java index 11f6485104bd..f0ece849c058 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java @@ -53,6 +53,7 @@ import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; @Tag("functional") class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTestHarness { @@ -112,7 +113,8 @@ void testClustering(boolean doUpdates, boolean populateMetaFields, boolean prese client.startCommitWithTime(newCommitTime); List records = dataGen.generateInserts(newCommitTime, 400); - insertRecords(metaClient, records.subList(0, 200), client, cfg, newCommitTime); + Stream dataFiles = insertRecords(metaClient, records.subList(0, 200), client, cfg, newCommitTime); + assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit"); /* * Write 2 (more inserts to create new files) @@ -120,7 +122,8 @@ void testClustering(boolean doUpdates, boolean populateMetaFields, boolean prese // we already set small file size to small number to force inserts to go into new file. 
newCommitTime = "002"; client.startCommitWithTime(newCommitTime); - insertRecords(metaClient, records.subList(200, 400), client, cfg, newCommitTime); + dataFiles = insertRecords(metaClient, records.subList(200, 400), client, cfg, newCommitTime); + assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit"); if (doUpdates) { /* @@ -187,10 +190,12 @@ void testClusteringWithNoBaseFiles(boolean doUpdates) throws Exception { String newCommitTime = "001"; client.startCommitWithTime(newCommitTime); List records = dataGen.generateInserts(newCommitTime, 400); - insertRecords(metaClient, records.subList(0, 200), client, cfg, newCommitTime); + Stream dataFiles = insertRecords(metaClient, records.subList(0, 200), client, cfg, newCommitTime); + assertTrue(!dataFiles.findAny().isPresent(), "should not have any base files"); newCommitTime = "002"; client.startCommitWithTime(newCommitTime); - insertRecords(metaClient, records.subList(200, 400), client, cfg, newCommitTime); + dataFiles = insertRecords(metaClient, records.subList(200, 400), client, cfg, newCommitTime); + assertTrue(!dataFiles.findAny().isPresent(), "should not have any base files"); // run updates if (doUpdates) { newCommitTime = "003"; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java index fd2f63a26c63..db55d36876a0 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java @@ -21,6 +21,7 @@ import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; @@ -57,6 +58,7 @@ import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -94,7 +96,8 @@ public void testIncrementalReadsWithCompaction() throws Exception { client.startCommitWithTime(commitTime1); List records001 = dataGen.generateInserts(commitTime1, 200); - insertRecords(metaClient, records001, client, cfg, commitTime1); + Stream dataFiles = insertRecords(metaClient, records001, client, cfg, commitTime1); + assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit"); // verify only one base file shows up with commit time 001 FileStatus[] snapshotROFiles = getROSnapshotFiles(partitionPath); @@ -142,7 +145,8 @@ public void testIncrementalReadsWithCompaction() throws Exception { String insertsTime = "006"; List records006 = dataGen.generateInserts(insertsTime, 200); client.startCommitWithTime(insertsTime); - insertRecords(metaClient, records006, client, cfg, insertsTime); + dataFiles = insertRecords(metaClient, records006, client, cfg, insertsTime); + assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit"); // verify new write shows up in snapshot mode even though there is pending compaction 
snapshotROFiles = getROSnapshotFiles(partitionPath); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java index fb44c14f59ad..254d75779f99 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java @@ -95,7 +95,8 @@ public void testSimpleInsertAndUpdate(HoodieFileFormat fileFormat, boolean popul client.startCommitWithTime(newCommitTime); List records = dataGen.generateInserts(newCommitTime, 200); - insertRecords(metaClient, records, client, cfg, newCommitTime); + Stream dataFiles = insertRecords(metaClient, records, client, cfg, newCommitTime); + assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit"); /* * Write 2 (updates) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java index 389a3e7bd469..40ef54b14cb6 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java @@ -203,7 +203,8 @@ protected JavaRDD updateLocation( index.updateLocation(HoodieJavaRDD.of(writeStatus), context, table)); } - protected void insertRecords(HoodieTableMetaClient metaClient, List records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException { + protected Stream insertRecords(HoodieTableMetaClient metaClient, List records, + SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException { HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient); JavaRDD writeRecords = jsc().parallelize(records, 1); @@ -228,8 +229,7 @@ protected void insertRecords(HoodieTableMetaClient metaClient, List records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
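
Gist of the series: clustering's file-slice reader no longer assumes every file slice has a base file. When ClusteringOperation.getDataFilePath() is empty, getFileSliceReader receives Option.empty() and serves records straight from the merged log-record scanner instead of first merging a base-file iterator into it. Below is a minimal, self-contained sketch of that fallback pattern; the names in it (FileSliceIteratorSketch, LogScannerSketch, open) are illustrative stand-ins written for this note, not Hudi APIs, and the toy scanner just collects records rather than doing the real key-based merge.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

// Illustrative stand-ins only; names and types are not part of the Hudi codebase.
final class FileSliceIteratorSketch<R> implements Iterator<R> {

  /** Minimal stand-in for HoodieMergedLogRecordScanner; the real scanner merges by record key, this toy just collects. */
  interface LogScannerSketch<R> {
    void processNextRecord(R record);
    Iterator<R> iterator();
  }

  private final Iterator<R> merged;

  private FileSliceIteratorSketch(Iterator<R> merged) {
    this.merged = merged;
  }

  // Mirrors the patched getFileSliceReader contract: the base-file iterator is optional.
  // If present, base records are pushed into the log scanner so they can be merged with
  // log updates (the real scanner does the key-based merge); if absent (a file slice made
  // only of log files), the scanner's own iterator is used as-is.
  static <R> FileSliceIteratorSketch<R> open(Optional<Iterator<R>> baseRecords, LogScannerSketch<R> scanner) {
    if (baseRecords.isPresent()) {
      Iterator<R> base = baseRecords.get();
      while (base.hasNext()) {
        scanner.processNextRecord(base.next());
      }
    }
    return new FileSliceIteratorSketch<>(scanner.iterator());
  }

  @Override
  public boolean hasNext() {
    return merged.hasNext();
  }

  @Override
  public R next() {
    return merged.next();
  }

  public static void main(String[] args) {
    // A toy scanner that already holds two "log" records, as if read from delta log files.
    List<String> logRecords = new ArrayList<>();
    LogScannerSketch<String> scanner = new LogScannerSketch<String>() {
      @Override
      public void processNextRecord(String record) {
        logRecords.add(record);
      }

      @Override
      public Iterator<String> iterator() {
        return logRecords.iterator();
      }
    };
    logRecords.add("log-record-1");
    logRecords.add("log-record-2");

    // No base file for this slice: pass Optional.empty(), which is the new code path the series adds.
    Iterator<String> slice = FileSliceIteratorSketch.open(Optional.empty(), scanner);
    slice.forEachRemaining(System.out::println);
  }
}

Keeping the optional base reader inside the reader itself (patch 2) rather than branching at the call site (patch 1) is what lets MultipleSparkJobExecutionStrategy retain a single code path for both kinds of file slices.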