@@ -18,6 +18,7 @@

package org.apache.hudi.common.config;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;

import java.io.Serializable;
@@ -78,6 +79,10 @@ public String getString(String property, String defaultValue) {
return containsKey(property) ? getProperty(property) : defaultValue;
}

public Option<String> getNonEmptyStringOpt(String property, String defaultValue) {
return Option.ofNullable(StringUtils.emptyToNull(getString(property, defaultValue)));
}

public List<String> getStringList(String property, String delimiter, List<String> defaultVal) {
if (!containsKey(property)) {
return defaultVal;
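The new TypedProperties#getNonEmptyStringOpt helper treats an empty value the same as a missing key. A minimal usage sketch, assuming hudi-common on the classpath; the key name below is arbitrary:

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;

public class NonEmptyStringOptSketch {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();

    // Missing key and empty value both resolve to Option.empty().
    Option<String> missing = props.getNonEmptyStringOpt("hypothetical.key", null);
    props.setProperty("hypothetical.key", "");
    Option<String> empty = props.getNonEmptyStringOpt("hypothetical.key", null);

    props.setProperty("hypothetical.key", "some-value");
    Option<String> present = props.getNonEmptyStringOpt("hypothetical.key", null);

    System.out.println(missing.isPresent()); // false
    System.out.println(empty.isPresent());   // false
    System.out.println(present.get());       // some-value
  }
}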
@@ -114,6 +114,7 @@ public class GcsEventsHoodieIncrSource extends HoodieIncrSource {
private final CloudDataFetcher gcsObjectDataFetcher;
private final QueryRunner queryRunner;
private final Option<SchemaProvider> schemaProvider;
private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;


public static final String GCS_OBJECT_KEY = "name";
@@ -145,6 +146,7 @@ public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, Sp
this.gcsObjectDataFetcher = gcsObjectDataFetcher;
this.queryRunner = queryRunner;
this.schemaProvider = Option.ofNullable(schemaProvider);
this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props);

LOG.info("srcPath: " + srcPath);
LOG.info("missingCheckpointStrategy: " + missingCheckpointStrategy);
@@ -171,8 +173,9 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastChec
return Pair.of(Option.empty(), queryInfo.getStartInstant());
}

Dataset<Row> cloudObjectMetadataDF = queryRunner.run(queryInfo);
Dataset<Row> filteredSourceData = gcsObjectMetadataFetcher.applyFilter(cloudObjectMetadataDF);
Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair = queryRunner.run(queryInfo, snapshotLoadQuerySplitter);
Dataset<Row> filteredSourceData = gcsObjectMetadataFetcher.applyFilter(queryInfoDatasetPair.getRight());
queryInfo = queryInfoDatasetPair.getLeft();
LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit);
Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset =
IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
@@ -25,7 +25,6 @@
import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -56,7 +55,6 @@
import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
import static org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter.Config.SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME;
import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.generateQueryInfo;
import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getHollowCommitHandleMode;

@@ -150,9 +148,7 @@ public HoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, Sp
}
}

this.snapshotLoadQuerySplitter = Option.ofNullable(props.getString(SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME, null))
.map(className -> (SnapshotLoadQuerySplitter) ReflectionUtils.loadClass(className,
new Class<?>[] {TypedProperties.class}, props));
this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props);
}

@Override
@@ -80,6 +80,8 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {

private final Option<SchemaProvider> schemaProvider;

private final Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitter;

public static class Config {
// control whether we do existence check for files before consuming them
@Deprecated
@@ -138,6 +140,7 @@ public S3EventsHoodieIncrSource(
this.queryRunner = queryRunner;
this.cloudDataFetcher = cloudDataFetcher;
this.schemaProvider = Option.ofNullable(schemaProvider);
this.snapshotLoadQuerySplitter = SnapshotLoadQuerySplitter.getInstance(props);
}

@Override
@@ -158,9 +161,9 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastChec
LOG.warn("Already caught up. No new data to process");
return Pair.of(Option.empty(), queryInfo.getEndInstant());
}

Dataset<Row> source = queryRunner.run(queryInfo);
Dataset<Row> filteredSourceData = applyFilter(source, fileFormat);
Pair<QueryInfo, Dataset<Row>> queryInfoDatasetPair = queryRunner.run(queryInfo, snapshotLoadQuerySplitter);
queryInfo = queryInfoDatasetPair.getLeft();
Dataset<Row> filteredSourceData = applyFilter(queryInfoDatasetPair.getRight(), fileFormat);

LOG.info("Adjusting end checkpoint:" + queryInfo.getEndInstant() + " based on sourceLimit :" + sourceLimit);
Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> checkPointAndDataset =
@@ -20,10 +20,13 @@

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import static org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter.Config.SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME;

/**
* Abstract splitter responsible for managing the snapshot load query operations.
*/
@@ -75,4 +78,10 @@ public QueryInfo getNextCheckpoint(Dataset<Row> df, QueryInfo queryInfo) {
.map(checkpoint -> queryInfo.withUpdatedEndInstant(checkpoint))
.orElse(queryInfo);
}

public static Option<SnapshotLoadQuerySplitter> getInstance(TypedProperties props) {
return props.getNonEmptyStringOpt(SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME, null)
.map(className -> (SnapshotLoadQuerySplitter) ReflectionUtils.loadClass(className,
new Class<?>[] {TypedProperties.class}, props));
}
}
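A sketch of how the new getInstance factory is wired. It assumes the example class sits in the same package so it can reference the Config constant; "com.example.MySplitter" is a hypothetical subclass, not part of this change:

package org.apache.hudi.utilities.sources;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;

public class SnapshotLoadQuerySplitterWiringSketch {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();

    // Property absent (or set to an empty string): no splitter is instantiated.
    Option<SnapshotLoadQuerySplitter> none = SnapshotLoadQuerySplitter.getInstance(props);
    System.out.println(none.isPresent()); // false

    // With a concrete subclass configured, getInstance reflectively invokes its
    // (TypedProperties) constructor. "com.example.MySplitter" is hypothetical and
    // would need to be on the classpath before uncommenting:
    // props.setProperty(SnapshotLoadQuerySplitter.Config.SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME,
    //     "com.example.MySplitter");
    // Option<SnapshotLoadQuerySplitter> splitter = SnapshotLoadQuerySplitter.getInstance(props);
  }
}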
@@ -21,9 +21,12 @@
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;

import org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -62,16 +65,14 @@ public QueryRunner(SparkSession sparkSession, TypedProperties props) {
* @param queryInfo all meta info about the query to be executed.
* @return the output of the query as Dataset < Row >.
*/
public Dataset<Row> run(QueryInfo queryInfo) {
Dataset<Row> dataset = null;
public Pair<QueryInfo, Dataset<Row>> run(QueryInfo queryInfo, Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitterOption) {
if (queryInfo.isIncremental()) {
dataset = runIncrementalQuery(queryInfo);
return runIncrementalQuery(queryInfo);
} else if (queryInfo.isSnapshot()) {
dataset = runSnapshotQuery(queryInfo);
return runSnapshotQuery(queryInfo, snapshotLoadQuerySplitterOption);
} else {
throw new HoodieException("Unknown query type " + queryInfo.getQueryType());
}
return dataset;
}

public static Dataset<Row> applyOrdering(Dataset<Row> dataset, List<String> orderByColumns) {
@@ -82,26 +83,34 @@ public static Dataset<Row> applyOrdering(Dataset<Row> dataset, List<String> orde
return dataset;
}

public Dataset<Row> runIncrementalQuery(QueryInfo queryInfo) {
public Pair<QueryInfo, Dataset<Row>> runIncrementalQuery(QueryInfo queryInfo) {
LOG.info("Running incremental query");
return sparkSession.read().format("org.apache.hudi")
return Pair.of(queryInfo, sparkSession.read().format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType())
.option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), queryInfo.getPreviousInstant())
.option(DataSourceReadOptions.END_INSTANTTIME().key(), queryInfo.getEndInstant())
.option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(),
props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(),
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().defaultValue()))
.load(sourcePath);
.load(sourcePath));
}

public Dataset<Row> runSnapshotQuery(QueryInfo queryInfo) {
public Pair<QueryInfo, Dataset<Row>> runSnapshotQuery(QueryInfo queryInfo, Option<SnapshotLoadQuerySplitter> snapshotLoadQuerySplitterOption) {
LOG.info("Running snapshot query");
return sparkSession.read().format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType()).load(sourcePath)
Dataset<Row> snapshot = sparkSession.read().format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE().key(), queryInfo.getQueryType()).load(sourcePath);
QueryInfo snapshotQueryInfo = snapshotLoadQuerySplitterOption
.map(snapshotLoadQuerySplitter -> snapshotLoadQuerySplitter.getNextCheckpoint(snapshot, queryInfo))
.orElse(queryInfo);
return Pair.of(snapshotQueryInfo, applySnapshotQueryFilters(snapshot, snapshotQueryInfo));
}

public Dataset<Row> applySnapshotQueryFilters(Dataset<Row> snapshot, QueryInfo snapshotQueryInfo) {
return snapshot
// add filtering so that only interested records are returned.
.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
queryInfo.getStartInstant()))
snapshotQueryInfo.getStartInstant()))
.filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
queryInfo.getEndInstant()));
snapshotQueryInfo.getEndInstant()));
}
}
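Caller-side sketch of the new run contract, mirroring how the GCS/S3 sources above use it: downstream checkpointing must rely on the QueryInfo returned in the pair, since a configured splitter may have narrowed the end instant of a snapshot query. The helper method and its names are illustrative, not part of the change:

import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.sources.SnapshotLoadQuerySplitter;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class QueryRunnerCallerSketch {

  // Illustrative helper: runs the query and returns the rows, while the adjusted
  // QueryInfo is what should drive checkpoint generation.
  static Dataset<Row> fetch(QueryRunner queryRunner, QueryInfo queryInfo,
                            Option<SnapshotLoadQuerySplitter> splitter) {
    Pair<QueryInfo, Dataset<Row>> result = queryRunner.run(queryInfo, splitter);
    QueryInfo adjusted = result.getLeft();   // end instant may differ from queryInfo's for snapshot queries
    Dataset<Row> rows = result.getRight();   // snapshot results are already filtered to [start, end]
    // Emit the next checkpoint from adjusted.getEndInstant(), not queryInfo.getEndInstant().
    return rows;
  }
}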
@@ -40,6 +40,7 @@
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;
import org.apache.hudi.utilities.sources.helpers.gcs.GcsObjectMetadataFetcher;

@@ -56,6 +57,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
@@ -93,6 +96,8 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn

@Mock
QueryRunner queryRunner;
@Mock
QueryInfo queryInfo;

protected Option<SchemaProvider> schemaProvider;
private HoodieTableMetaClient metaClient;
@@ -142,7 +147,7 @@ public void shouldFetchDataIfCommitTimeForReadsLessThanForWrites() throws IOExce
filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));
Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);

when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
setMockQueryRunner(inputDs);

readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json");
}
@@ -160,7 +165,8 @@ public void testTwoFilesAndContinueInSameCommit() throws IOException {
filePathSizeAndCommitTime.add(Triple.of("path/to/file3.json", 200L, "1"));

Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
when(queryRunner.run(Mockito.any())).thenReturn(inputDs);

setMockQueryRunner(inputDs);

readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 250L, "1#path/to/file2.json");
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 250L, "1#path/to/file3.json");
@@ -183,7 +189,7 @@ public void largeBootstrapWithFilters() throws IOException {

Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);

when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
setMockQueryRunner(inputDs);

readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 250L, "1#path/to/file10006.json");
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file10006.json"), 250L, "1#path/to/file10007.json");
@@ -205,18 +211,76 @@ public void testTwoFilesAndContinueAcrossCommits() throws IOException {

Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);

when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
setMockQueryRunner(inputDs);

readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json");
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"), 100L, "1#path/to/file2.json");
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"), 1000L, "2#path/to/file5.json");
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 100L, "1#path/to/file1.json");
}

@ParameterizedTest
@CsvSource({
"1,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,1",
"2,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,2",
"3,3#path/to/file5.json,3,1#path/to/file1.json,3"
})
public void testSplitSnapshotLoad(String snapshotCheckPoint, String expected1, String expected2, String expected3, String expected4) throws IOException {

writeGcsMetadataRecords("1");
writeGcsMetadataRecords("2");
writeGcsMetadataRecords("3");

List<Triple<String, Long, String>> filePathSizeAndCommitTime = new ArrayList<>();
// Add file paths and sizes to the list
filePathSizeAndCommitTime.add(Triple.of("path/to/file1.json", 50L, "1"));
filePathSizeAndCommitTime.add(Triple.of("path/to/file2.json", 50L, "1"));
filePathSizeAndCommitTime.add(Triple.of("path/to/skip1.json", 50L, "2"));
filePathSizeAndCommitTime.add(Triple.of("path/to/skip2.json", 50L, "2"));
filePathSizeAndCommitTime.add(Triple.of("path/to/file5.json", 50L, "3"));
filePathSizeAndCommitTime.add(Triple.of("path/to/file4.json", 50L, "3"));

Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);

setMockQueryRunner(inputDs, Option.of(snapshotCheckPoint));
TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
typedProperties.setProperty("hoodie.deltastreamer.source.cloud.data.ignore.relpath.prefix", "path/to/skip");
//1. snapshot query, read all records
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, expected1, typedProperties);
//2. incremental query, as commit is present in timeline
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(expected1), 10L, expected2, typedProperties);
//3. snapshot query with source limit less than first commit size
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, expected3, typedProperties);
typedProperties.setProperty("hoodie.deltastreamer.source.cloud.data.ignore.relpath.prefix", "path/to");
//4. As the snapshot query returns the configured checkpoint, the same value is returned as nextCheckpoint (dataset is empty due to the ignore prefix).
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, expected4, typedProperties);
}

private void setMockQueryRunner(Dataset<Row> inputDs) {
setMockQueryRunner(inputDs, Option.empty());
}

private void setMockQueryRunner(Dataset<Row> inputDs, Option<String> nextCheckPointOpt) {

when(queryRunner.run(Mockito.any(QueryInfo.class), Mockito.any())).thenAnswer(invocation -> {
QueryInfo queryInfo = invocation.getArgument(0);
QueryInfo updatedQueryInfo = nextCheckPointOpt.map(nextCheckPoint ->
queryInfo.withUpdatedEndInstant(nextCheckPoint))
.orElse(queryInfo);
if (updatedQueryInfo.isSnapshot()) {
return Pair.of(updatedQueryInfo,
inputDs.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
updatedQueryInfo.getStartInstant()))
.filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
updatedQueryInfo.getEndInstant())));
}
return Pair.of(updatedQueryInfo, inputDs);
});
}

private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) {
TypedProperties typedProperties = setProps(missingCheckpointStrategy);
typedProperties.put("hoodie.deltastreamer.source.hoodieincr.file.format", "json");
Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint,
TypedProperties typedProperties) {

GcsEventsHoodieIncrSource incrSource = new GcsEventsHoodieIncrSource(typedProperties, jsc(),
spark(), schemaProvider.orElse(null), new GcsObjectMetadataFetcher(typedProperties, "json"), gcsObjectDataFetcher, queryRunner);
@@ -230,6 +294,13 @@ private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingChe
Assertions.assertEquals(expectedCheckpoint, nextCheckPoint);
}

private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) {
TypedProperties typedProperties = setProps(missingCheckpointStrategy);
typedProperties.put("hoodie.deltastreamer.source.hoodieincr.file.format", "json");
readAndAssert(missingCheckpointStrategy, checkpointToPull, sourceLimit, expectedCheckpoint, typedProperties);
}

private HoodieRecord getGcsMetadataRecord(String commitTime, String filename, String bucketName, String generation) {
String partitionPath = bucketName;
