diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java b/hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java
index 3db8210cadee..86b7f4cc4573 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.config;
 
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 
 import java.io.Serializable;
@@ -78,6 +79,10 @@ public String getString(String property, String defaultValue) {
     return containsKey(property) ? getProperty(property) : defaultValue;
   }
 
+  public Option<String> getNonEmptyStringOpt(String property, String defaultValue) {
+    return Option.ofNullable(StringUtils.emptyToNull(getString(property, defaultValue)));
+  }
+
   public List<String> getStringList(String property, String delimiter, List<String> defaultVal) {
     if (!containsKey(property)) {
       return defaultVal;
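Not part of the patch, just for orientation: a minimal sketch of how the new TypedProperties.getNonEmptyStringOpt helper behaves compared to plain getString, since SnapshotLoadQuerySplitter.getInstance below relies on it treating empty values like missing ones. The property keys and class name here are made up.

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.common.util.Option;

    public class NonEmptyStringOptSketch {
      public static void main(String[] args) {
        TypedProperties props = new TypedProperties();
        props.setProperty("some.key", "");                                        // hypothetical key, explicitly set to empty

        String raw = props.getString("some.key", null);                           // "" -> the empty string comes back as-is
        Option<String> set = props.getNonEmptyStringOpt("some.key", null);        // Option.empty(): empty collapses to absent
        Option<String> missing = props.getNonEmptyStringOpt("other.key", null);   // Option.empty(): key not present at all

        System.out.println(raw.isEmpty() + " " + set.isPresent() + " " + missing.isPresent());
      }
    }
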
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
index d09bad719167..a06130d39728 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java
@@ -114,6 +114,7 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
   private final CloudDataFetcher gcsObjectDataFetcher;
   private final QueryRunner queryRunner;
   private final Option<SchemaProvider> schemaProvider;
+  private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;
 
   public static final String GCS_OBJECT_KEY = "name";
 
@@ -145,6 +146,7 @@ public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, Sp
     this.gcsObjectDataFetcher = gcsObjectDataFetcher;
     this.queryRunner = queryRunner;
     this.schemaProvider = Option.ofNullable(schemaProvider);
+    this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props);
 
     LOG.info("srcPath: " + srcPath);
     LOG.info("missingCheckpointStrategy: " + missingCheckpointStrategy);
@@ -171,8 +173,9 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastChec
       return Pair.of(Option.empty(), queryInfo.getStartInstant());
     }
 
-    Dataset<Row> cloudObjectMetadataDF = queryRunner.run(queryInfo);
-    Dataset<Row> filteredSourceData = gcsObjectMetadataFetcher.applyFilter(cloudObjectMetadataDF);
+    Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair = queryRunner.run(queryInfo, snapshotLoadQuerySplitter);
+    Dataset<Row> filteredSourceData = gcsObjectMetadataFetcher.applyFilter(queryInfoDatasetPair.getRight());
+    queryInfo = queryInfoDatasetPair.getLeft();
 
     LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit);
     Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset =
         IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 1d302fa106b6..f87e5c231bf3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -25,7 +25,6 @@
 import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
 import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -56,7 +55,6 @@
 import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
-import static org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter.Config.SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME;
 import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.generateQueryInfo;
 import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode;
@@ -150,9 +148,7 @@ public HoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, Sp
       }
     }
 
-    this.snapshotLoadQuerySplitter = Option.ofNullable(props.getString(SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME, null))
-        .map(className -> (SnapshotLoadQuerySplitter) ReflectionUtils.loadClass(className,
-            new Class[] {TypedProperties.class}, props));
+    this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props);
   }
 
   @Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index 4b9be847c756..325e494e0abe 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -80,6 +80,8 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
 
   private final Option<SchemaProvider> schemaProvider;
 
+  private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;
+
   public static class Config {
     // control whether we do existence check for files before consuming them
     @Deprecated
@@ -138,6 +140,7 @@ public S3EventsHoodieIncrSource(
     this.queryRunner = queryRunner;
     this.cloudDataFetcher = cloudDataFetcher;
     this.schemaProvider = Option.ofNullable(schemaProvider);
+    this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props);
   }
 
   @Override
@@ -158,9 +161,9 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastChec
       LOG.warn("Already caught up. No new data to process");
       return Pair.of(Option.empty(), queryInfo.getEndInstant());
     }
-
-    Dataset<Row> source = queryRunner.run(queryInfo);
-    Dataset<Row> filteredSourceData = applyFilter(source, fileFormat);
+    Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair = queryRunner.run(queryInfo, snapshotLoadQuerySplitter);
+    queryInfo = queryInfoDatasetPair.getLeft();
+    Dataset<Row> filteredSourceData = applyFilter(queryInfoDatasetPair.getRight(), fileFormat);
 
     LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit);
 
     Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset =
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java
index 6a13607b1d5e..ca299122ec72 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/SnapshotLoadQuerySplitter.java
@@ -20,10 +20,13 @@
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
+import static org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter.Config.SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME;
+
 /**
  * Abstract splitter responsible for managing the snapshot load query operations.
  */
@@ -75,4 +78,10 @@ public QueryInfo getNextCheckpoint(Dataset<Row> df, QueryInfo queryInfo) {
         .map(checkpoint -> queryInfo.withUpdatedEndInstant(checkpoint))
         .orElse(queryInfo);
   }
+
+  public static Option<SnapshotLoadQuerySplitter> getInstance(TypedProperties props) {
+    return props.getNonEmptyStringOpt(SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME, null)
+        .map(className -> (SnapshotLoadQuerySplitter) ReflectionUtils.loadClass(className,
+            new Class[] {TypedProperties.class}, props));
+  }
 }
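Not part of the patch: a small sketch of how the new getInstance factory is driven purely by configuration. The only thing assumed beyond this diff is the placeholder class name; the Config constant and the reflective (TypedProperties) constructor requirement come from the code above.

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter;

    public class SplitterWiringSketch {
      public static void main(String[] args) {
        TypedProperties props = new TypedProperties();

        // Nothing configured (or an empty value): getInstance returns Option.empty() and
        // QueryRunner falls back to the plain snapshot query path.
        Option<SnapshotLoadQuerySplitter> none = SnapshotLoadQuerySplitter.getInstance(props);
        System.out.println("splitter configured: " + none.isPresent());

        // With the class-name property set, getInstance loads the class reflectively and
        // invokes its (TypedProperties) constructor. "com.example.MySplitter" is a placeholder,
        // not a class that exists, so actually calling getInstance with it would fail to load.
        props.setProperty(SnapshotLoadQuerySplitter.Config.SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME,
            "com.example.MySplitter");
      }
    }
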
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
index ef903d7c647e..2f0a8bf488e8 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java
@@ -21,9 +21,12 @@
 import org.apache.hudi.DataSourceReadOptions;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
+import org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter;
 
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -62,16 +65,14 @@ public QueryRunner(SparkSession sparkSession, TypedProperties props) {
    * @param queryInfo all meta info about the query to be executed.
    * @return the output of the query as Dataset < Row >.
    */
-  public Dataset<Row> run(QueryInfo queryInfo) {
-    Dataset<Row> dataset = null;
+  public Pair<QueryInfo, Dataset<Row>> run(QueryInfo queryInfo, Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitterOption) {
     if (queryInfo.isIncremental()) {
-      dataset = runIncrementalQuery(queryInfo);
+      return runIncrementalQuery(queryInfo);
     } else if (queryInfo.isSnapshot()) {
-      dataset = runSnapshotQuery(queryInfo);
+      return runSnapshotQuery(queryInfo, snapshotLoadQuerySplitterOption);
     } else {
       throw new HoodieException("Unknown query type " + queryInfo.getQueryType());
     }
-    return dataset;
   }
 
   public static Dataset<Row> applyOrdering(Dataset<Row> dataset, List<String> orderByColumns) {
@@ -82,26 +83,34 @@ public static Dataset<Row> applyOrdering(Dataset<Row> dataset, List<String> orde
     return dataset;
   }
 
-  public Dataset<Row> runIncrementalQuery(QueryInfo queryInfo) {
+  public Pair<QueryInfo, Dataset<Row>> runIncrementalQuery(QueryInfo queryInfo) {
     LOG.info("Running incremental query");
-    return sparkSession.read().format("org.apache.hudi")
+    return Pair.of(queryInfo, sparkSession.read().format("org.apache.hudi")
         .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType())
        .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), queryInfo.getPreviousInstant())
        .option(DataSourceReadOptions.END_INSTANTTIME().key(), queryInfo.getEndInstant())
        .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(),
            props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(),
                DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().defaultValue()))
-        .load(sourcePath);
+        .load(sourcePath));
   }
 
-  public Dataset<Row> runSnapshotQuery(QueryInfo queryInfo) {
+  public Pair<QueryInfo, Dataset<Row>> runSnapshotQuery(QueryInfo queryInfo, Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitterOption) {
     LOG.info("Running snapshot query");
-    return sparkSession.read().format("org.apache.hudi")
-        .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType()).load(sourcePath)
+    Dataset<Row> snapshot = sparkSession.read().format("org.apache.hudi")
+        .option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType()).load(sourcePath);
+    QueryInfo snapshotQueryInfo = snapshotLoadQuerySplitterOption
+        .map(snapshotLoadQuerySplitter -> snapshotLoadQuerySplitter.getNextCheckpoint(snapshot, queryInfo))
+        .orElse(queryInfo);
+    return Pair.of(snapshotQueryInfo, applySnapshotQueryFilters(snapshot, snapshotQueryInfo));
+  }
+
+  public Dataset<Row> applySnapshotQueryFilters(Dataset<Row> snapshot, QueryInfo snapshotQueryInfo) {
+    return snapshot
         // add filtering so that only interested records are returned.
         .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-            queryInfo.getStartInstant()))
+            snapshotQueryInfo.getStartInstant()))
         .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-            queryInfo.getEndInstant()));
+            snapshotQueryInfo.getEndInstant()));
   }
 }
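Not part of the patch: a caller-side sketch of the new QueryRunner contract, since run() now hands back the (possibly narrowed) QueryInfo together with the data and callers are expected to pick up the adjusted end instant, as the GCS/S3 sources above do. The helper method itself is hypothetical; the types and calls come from this diff.

    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.common.util.collection.Pair;
    import org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter;
    import org.apache.hudi.utilities.sources.helpers.QueryInfo;
    import org.apache.hudi.utilities.sources.helpers.QueryRunner;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    public class QueryRunnerCallSketch {
      // queryRunner, queryInfo and the splitter option are assumed to be built the same way
      // the incremental sources in this diff build them; only the call pattern is shown.
      static Dataset<Row> fetch(QueryRunner queryRunner, QueryInfo queryInfo,
                                Option<SnapshotLoadQuerySplitter> splitter) {
        Pair<QueryInfo, Dataset<Row>> result = queryRunner.run(queryInfo, splitter);
        QueryInfo effectiveQueryInfo = result.getLeft();   // end instant may have been moved by the splitter
        Dataset<Row> rows = result.getRight();             // snapshot path: already filtered to the effective range
        System.out.println("Reading up to instant " + effectiveQueryInfo.getEndInstant());
        return rows;
      }
    }
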
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index 5c31f310800b..bc2906d251fc 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -40,6 +40,7 @@
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
+import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 import org.apache.hudi.utilities.sources.helpers.QueryRunner;
 import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;
 
@@ -56,6 +57,8 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
@@ -93,6 +96,8 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
   @Mock
   QueryRunner queryRunner;
+  @Mock
+  QueryInfo queryInfo;
 
   protected Option<SchemaProvider> schemaProvider;
   private HoodieTableMetaClient metaClient;
@@ -142,7 +147,7 @@ public void shouldFetchDataIfCommitTimeForReadsLessThanForWrites() throws IOExce
     filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-    when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json");
   }
@@ -160,7 +165,8 @@ public void testTwoFilesAndContinueInSameCommit() throws IOException {
     filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-    when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
+
+    setMockQueryRunner(inputDs);
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 250L, "1#path/to/file2.json");
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 250L, "1#path/to/file3.json");
@@ -183,7 +189,7 @@ public void largeBootstrapWithFilters() throws IOException {
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-    when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 250L, "1#path/to/file10006.json");
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file10006.json"), 250L, "1#path/to/file10007.json");
@@ -205,7 +211,7 @@ public void testTwoFilesAndContinueAcrossCommits() throws IOException {
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-    when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
 
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json");
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"), 100L, "1#path/to/file2.json");
@@ -213,10 +219,68 @@ public void testTwoFilesAndContinueAcrossCommits() throws IOException {
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json");
   }
 
+  @ParameterizedTest
+  @CsvSource({
+      "1,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,1",
+      "2,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,2",
+      "3,3#path/to/file5.json,3,1#path/to/file1.json,3"
+  })
+  public void testSplitSnapshotLoad(String snapshotCheckPoint, String expected1, String expected2, String expected3, String expected4) throws IOException {
+
+    writeGcsMetadataRecords("1");
+    writeGcsMetadataRecords("2");
+    writeGcsMetadataRecords("3");
+
+    List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
+    // Add file paths and sizes to the list
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 50L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 50L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/skip1.json", 50L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/skip2.json", 50L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 50L, "3"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L, "3"));
+
+    Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+    setMockQueryRunner(inputDs, Option.of(snapshotCheckPoint));
+    TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
+    typedProperties.setProperty("hoodie.deltastreamer.source.cloud.data.ignore.relpath.prefix", "path/to/skip");
+    //1. snapshot query, read all records
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, expected1, typedProperties);
+    //2. incremental query, as commit is present in timeline
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(expected1), 10L, expected2, typedProperties);
+    //3. snapshot query with source limit less than first commit size
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, expected3, typedProperties);
+    typedProperties.setProperty("hoodie.deltastreamer.source.cloud.data.ignore.relpath.prefix", "path/to");
+    //4. As the snapshot query will return checkpoint 1, the same would be returned as nextCheckpoint (dataset is empty due to the ignore prefix).
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, expected4, typedProperties);
+  }
+
+  private void setMockQueryRunner(Dataset<Row> inputDs) {
+    setMockQueryRunner(inputDs, Option.empty());
+  }
+
+  private void setMockQueryRunner(Dataset<Row> inputDs, Option<String> nextCheckPointOpt) {
+
+    when(queryRunner.run(Mockito.any(QueryInfo.class), Mockito.any())).thenAnswer(invocation -> {
+      QueryInfo queryInfo = invocation.getArgument(0);
+      QueryInfo updatedQueryInfo = nextCheckPointOpt.map(nextCheckPoint ->
+              queryInfo.withUpdatedEndInstant(nextCheckPoint))
+          .orElse(queryInfo);
+      if (updatedQueryInfo.isSnapshot()) {
+        return Pair.of(updatedQueryInfo,
+            inputDs.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+                    updatedQueryInfo.getStartInstant()))
+                .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+                    updatedQueryInfo.getEndInstant())));
+      }
+      return Pair.of(updatedQueryInfo, inputDs);
+    });
+  }
+
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
-                             Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) {
-    TypedProperties typedProperties = setProps(missingCheckpointStrategy);
-    typedProperties.put("hoodie.deltastreamer.source.hoodieincr.file.format", "json");
+                             Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint,
+                             TypedProperties typedProperties) {
     GcsEventsHoodieIncrSource incrSource = new GcsEventsHoodieIncrSource(typedProperties, jsc(), spark(),
         schemaProvider.orElse(null),
         new GcsObjectMetadataFetcher(typedProperties, "json"), gcsObjectDataFetcher, queryRunner);
@@ -230,6 +294,13 @@ private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingChe
     Assertions.assertEquals(expectedCheckpoint, nextCheckPoint);
   }
 
+  private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
+                             Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) {
+    TypedProperties typedProperties = setProps(missingCheckpointStrategy);
+    typedProperties.put("hoodie.deltastreamer.source.hoodieincr.file.format", "json");
+    readAndAssert(missingCheckpointStrategy, checkpointToPull, sourceLimit, expectedCheckpoint, typedProperties);
+  }
+
   private HoodieRecord getGcsMetadataRecord(String commitTime, String filename, String bucketName, String generation) {
     String partitionPath = bucketName;
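The tests above emulate the splitter through the mocked QueryRunner. Purely as an illustration of what a user-supplied splitter could look like (none ships with this patch), here is a sketch; it assumes the abstract base keeps a (TypedProperties) constructor matching the reflective call in getInstance and that overriding the public getNextCheckpoint(Dataset<Row>, QueryInfo) shown earlier is sufficient. The class name and the earliest-commit heuristic are made up.

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter;
    import org.apache.hudi.utilities.sources.helpers.QueryInfo;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.functions;

    // Hypothetical splitter: never reads past the earliest commit still ahead of the start
    // instant, so a large snapshot bootstrap is consumed roughly one commit per round.
    public class EarliestCommitSnapshotLoadQuerySplitter extends SnapshotLoadQuerySplitter {

      public EarliestCommitSnapshotLoadQuerySplitter(TypedProperties properties) {
        super(properties); // assumption: the base class exposes a (TypedProperties) constructor
      }

      @Override
      public QueryInfo getNextCheckpoint(Dataset<Row> df, QueryInfo queryInfo) {
        // Aggregate the minimum commit time at or after the current start instant.
        Row min = df.filter(functions.col(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
                .geq(queryInfo.getStartInstant()))
            .agg(functions.min(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).first();
        // Empty snapshot: keep the original query info; otherwise cap the end instant.
        return min.isNullAt(0) ? queryInfo : queryInfo.withUpdatedEndInstant(min.getString(0));
      }
    }
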
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
index 7d58d21d874f..e0af8d73e269 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -40,6 +40,7 @@
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
+import org.apache.hudi.utilities.sources.helpers.QueryInfo;
 import org.apache.hudi.utilities.sources.helpers.QueryRunner;
 import org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelectorCommon;
 
@@ -56,6 +57,8 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
@@ -88,6 +91,8 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
   QueryRunner mockQueryRunner;
   @Mock
   CloudDataFetcher mockCloudDataFetcher;
+  @Mock
+  QueryInfo queryInfo;
 
   private JavaSparkContext jsc;
   private HoodieTableMetaClient metaClient;
@@ -248,7 +253,7 @@ public void testOneFileInCommit() throws IOException {
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-    when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
     when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
         .thenReturn(Option.empty());
 
@@ -273,7 +278,7 @@ public void testTwoFilesAndContinueInSameCommit() throws IOException {
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-    when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
     when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
         .thenReturn(Option.empty());
 
@@ -301,7 +306,7 @@ public void testTwoFilesAndContinueAcrossCommits() throws IOException {
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-    when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
     when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
         .thenReturn(Option.empty());
 
@@ -329,7 +334,7 @@ public void testEmptyDataAfterFilter() throws IOException {
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-    when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
     typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip");
 
@@ -361,7 +366,7 @@ public void testFilterAnEntireCommit() throws IOException {
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-    when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
     when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
         .thenReturn(Option.empty());
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
@@ -393,7 +398,7 @@ public void testFilterAnEntireMiddleCommit() throws IOException {
 
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
-    when(mockQueryRunner.run(Mockito.any())).thenReturn(inputDs);
+    setMockQueryRunner(inputDs);
     when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
         .thenReturn(Option.empty());
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
@@ -407,6 +412,45 @@ public void testFilterAnEntireMiddleCommit() throws IOException {
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 50L, "3#path/to/file4.json", typedProperties);
   }
 
+  @ParameterizedTest
+  @CsvSource({
+      "1,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,1",
+      "2,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,2",
+      "3,3#path/to/file5.json,3,1#path/to/file1.json,3"
+  })
+  public void testSplitSnapshotLoad(String snapshotCheckPoint, String expected1, String expected2, String expected3, String expected4) throws IOException {
+
+    writeS3MetadataRecords("1");
+    writeS3MetadataRecords("2");
+    writeS3MetadataRecords("3");
+
+    List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
+    // Add file paths and sizes to the list
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 50L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 50L, "1"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/skip1.json", 50L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/skip2.json", 50L, "2"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 50L, "3"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L, "3"));
+
+    Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
+
+    setMockQueryRunner(inputDs, Option.of(snapshotCheckPoint));
+    when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
+        .thenReturn(Option.empty());
+    TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
+    typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip");
+    //1. snapshot query, read all records
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, expected1, typedProperties);
+    //2. incremental query, as commit is present in timeline
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(expected1), 10L, expected2, typedProperties);
+    //3. snapshot query with source limit less than first commit size
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, expected3, typedProperties);
+    typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to");
+    //4. As the snapshot query will return checkpoint 1, the same would be returned as nextCheckpoint (dataset is empty due to the ignore prefix).
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, expected4, typedProperties);
+  }
+
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
                              Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint,
                              TypedProperties typedProperties) {
@@ -422,6 +466,28 @@ private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingChe
     Assertions.assertEquals(expectedCheckpoint, nextCheckPoint);
   }
 
+  private void setMockQueryRunner(Dataset<Row> inputDs) {
+    setMockQueryRunner(inputDs, Option.empty());
+  }
+
+  private void setMockQueryRunner(Dataset<Row> inputDs, Option<String> nextCheckPointOpt) {
+
+    when(mockQueryRunner.run(Mockito.any(QueryInfo.class), Mockito.any())).thenAnswer(invocation -> {
+      QueryInfo queryInfo = invocation.getArgument(0);
+      QueryInfo updatedQueryInfo = nextCheckPointOpt.map(nextCheckPoint ->
+              queryInfo.withUpdatedEndInstant(nextCheckPoint))
+          .orElse(queryInfo);
+      if (updatedQueryInfo.isSnapshot()) {
+        return Pair.of(updatedQueryInfo,
+            inputDs.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+                    updatedQueryInfo.getStartInstant()))
+                .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+                    updatedQueryInfo.getEndInstant())));
+      }
+      return Pair.of(updatedQueryInfo, inputDs);
+    });
+  }
+
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
                              Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) {
     TypedProperties typedProperties = setProps(missingCheckpointStrategy);