diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index 1e07485d433f..3c023eba38e3 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -71,6 +71,7 @@
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -109,6 +110,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
   private String testMethodName;
   protected transient JavaSparkContext jsc = null;
   protected transient HoodieSparkEngineContext context = null;
+  protected transient SparkSession sparkSession = null;
   protected transient Configuration hadoopConf = null;
   protected transient SQLContext sqlContext;
   protected transient FileSystem fs;
@@ -182,6 +184,7 @@ protected void initSparkContexts(String appName) {
     sqlContext = new SQLContext(jsc);
     context = new HoodieSparkEngineContext(jsc);
     hadoopConf = context.getHadoopConf().get();
+    sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
   }
 
   /**
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index a217e6b7a800..ebb359390be0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -72,12 +72,18 @@ static class Config {
 
     /**
      * {@value #READ_LATEST_INSTANT_ON_MISSING_CKPT} allows delta-streamer to incrementally fetch from latest committed
-     * instant when checkpoint is not provided.
+     * instant when checkpoint is not provided. This config is deprecated. Please refer to {@link #MISSING_CHECKPOINT_STRATEGY}.
      */
+    @Deprecated
     static final String READ_LATEST_INSTANT_ON_MISSING_CKPT = "hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt";
     static final Boolean DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT = false;
 
+    /**
+     * {@value #MISSING_CHECKPOINT_STRATEGY} allows delta-streamer to decide the instant to consume from when checkpoint is not set.
+     */
+    static final String MISSING_CHECKPOINT_STRATEGY = "hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy";
+
     /**
      * {@value #SOURCE_FILE_FORMAT} is passed to the reader while loading dataset. Default value is parquet.
      */
@@ -106,13 +113,18 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
     int numInstantsPerFetch = props.getInteger(Config.NUM_INSTANTS_PER_FETCH, Config.DEFAULT_NUM_INSTANTS_PER_FETCH);
     boolean readLatestOnMissingCkpt = props.getBoolean(Config.READ_LATEST_INSTANT_ON_MISSING_CKPT,
         Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT);
+    IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy = (props.containsKey(Config.MISSING_CHECKPOINT_STRATEGY))
+        ? IncrSourceHelper.MissingCheckpointStrategy.valueOf(props.getString(Config.MISSING_CHECKPOINT_STRATEGY)) : null;
+    if (readLatestOnMissingCkpt) {
+      missingCheckpointStrategy = IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST;
+    }
 
     // Use begin Instant if set and non-empty
     Option<String> beginInstant =
         lastCkptStr.isPresent() ? lastCkptStr.get().isEmpty() ? Option.empty() : lastCkptStr : Option.empty();
 
     Pair<String, String> instantEndpts = IncrSourceHelper.calculateBeginAndEndInstants(sparkContext, srcPath,
-        numInstantsPerFetch, beginInstant, readLatestOnMissingCkpt);
+        numInstantsPerFetch, beginInstant, missingCheckpointStrategy);
 
     if (instantEndpts.getKey().equals(instantEndpts.getValue())) {
       LOG.warn("Already caught up. Begin Checkpoint was :" + instantEndpts.getKey());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index ec789ab28f49..c011157cc06f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -83,6 +83,11 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
     int numInstantsPerFetch = props.getInteger(NUM_INSTANTS_PER_FETCH, DEFAULT_NUM_INSTANTS_PER_FETCH);
     boolean readLatestOnMissingCkpt = props.getBoolean(
         READ_LATEST_INSTANT_ON_MISSING_CKPT, DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT);
+    IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy = (props.containsKey(HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY))
+        ? IncrSourceHelper.MissingCheckpointStrategy.valueOf(props.getString(HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY)) : null;
+    if (readLatestOnMissingCkpt) {
+      missingCheckpointStrategy = IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST;
+    }
 
     // Use begin Instant if set and non-empty
     Option<String> beginInstant =
@@ -92,7 +97,7 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
 
     Pair<String, String> instantEndpts =
         IncrSourceHelper.calculateBeginAndEndInstants(
-            sparkContext, srcPath, numInstantsPerFetch, beginInstant, readLatestOnMissingCkpt);
+            sparkContext, srcPath, numInstantsPerFetch, beginInstant, missingCheckpointStrategy);
 
     if (instantEndpts.getKey().equals(instantEndpts.getValue())) {
       LOG.warn("Already caught up. Begin Checkpoint was :" + instantEndpts.getKey());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
index 8f434a007a0b..a370c314a168 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
@@ -32,6 +32,17 @@ public class IncrSourceHelper {
 
+  private static final String DEFAULT_BEGIN_TIMESTAMP = "000";
+
+  /**
+   * Strategies for deciding the instants to consume when the checkpoint is missing.
+   */
+  public enum MissingCheckpointStrategy {
+    // read from the latest commit in the Hudi source table
+    READ_LATEST,
+    // read everything up to the latest commit
+    READ_UPTO_LATEST_COMMIT
+  }
+
   /**
    * Get a timestamp which is the next value in a descending sequence.
    *
@@ -47,15 +58,15 @@ private static String getStrictlyLowerTimestamp(String timestamp) {
   /**
    * Find begin and end instants to be set for the next fetch.
    *
-   * @param jssc Java Spark Context
-   * @param srcBasePath Base path of Hudi source table
-   * @param numInstantsPerFetch Max Instants per fetch
-   * @param beginInstant Last Checkpoint String
-   * @param readLatestOnMissingBeginInstant when begin instant is missing, allow reading from latest committed instant
+   * @param jssc                      Java Spark Context
+   * @param srcBasePath               Base path of Hudi source table
+   * @param numInstantsPerFetch       Max Instants per fetch
+   * @param beginInstant              Last Checkpoint String
+   * @param missingCheckpointStrategy strategy that decides where to start reading when the begin instant is missing
    * @return begin and end instants
    */
   public static Pair<String, String> calculateBeginAndEndInstants(JavaSparkContext jssc, String srcBasePath,
-      int numInstantsPerFetch, Option<String> beginInstant, boolean readLatestOnMissingBeginInstant) {
+      int numInstantsPerFetch, Option<String> beginInstant, MissingCheckpointStrategy missingCheckpointStrategy) {
     ValidationUtils.checkArgument(numInstantsPerFetch > 0,
         "Make sure the config hoodie.deltastreamer.source.hoodieincr.num_instants is set to a positive value");
     HoodieTableMetaClient srcMetaClient = HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(srcBasePath).setLoadActiveTimelineOnLoad(true).build();
@@ -64,27 +75,38 @@ public static Pair<String, String> calculateBeginAndEndInstants(JavaSparkContext
         srcMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
 
     String beginInstantTime = beginInstant.orElseGet(() -> {
-      if (readLatestOnMissingBeginInstant) {
-        Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
-        return lastInstant.map(hoodieInstant -> getStrictlyLowerTimestamp(hoodieInstant.getTimestamp())).orElse("000");
+      if (missingCheckpointStrategy != null) {
+        if (missingCheckpointStrategy == MissingCheckpointStrategy.READ_LATEST) {
+          Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
+          return lastInstant.map(hoodieInstant -> getStrictlyLowerTimestamp(hoodieInstant.getTimestamp())).orElse(DEFAULT_BEGIN_TIMESTAMP);
+        } else {
+          return DEFAULT_BEGIN_TIMESTAMP;
+        }
       } else {
         throw new IllegalArgumentException("Missing begin instant for incremental pull. For reading from latest "
-            + "committed instant set hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt to true");
+            + "committed instant set hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy to a valid value");
       }
     });
 
-    Option<HoodieInstant> nthInstant = Option.fromJavaOptional(activeCommitTimeline
-        .findInstantsAfter(beginInstantTime, numInstantsPerFetch).getInstants().reduce((x, y) -> y));
-    return Pair.of(beginInstantTime, nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime));
+    if (!beginInstantTime.equals(DEFAULT_BEGIN_TIMESTAMP)) {
+      Option<HoodieInstant> nthInstant = Option.fromJavaOptional(activeCommitTimeline
+          .findInstantsAfter(beginInstantTime, numInstantsPerFetch).getInstants().reduce((x, y) -> y));
+      return Pair.of(beginInstantTime, nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime));
+    } else {
+      // beginInstantTime is DEFAULT_BEGIN_TIMESTAMP only when READ_UPTO_LATEST_COMMIT is in effect
+      // (or the timeline is empty), so consume everything up to the latest commit.
+      Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
+      return Pair.of(beginInstantTime, lastInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime));
+    }
   }
 
   /**
    * Validate instant time seen in the incoming row.
    *
-   * @param row Input Row
-   * @param instantTime Hoodie Instant time of the row
+   * @param row          Input Row
+   * @param instantTime  Hoodie Instant time of the row
    * @param sinceInstant begin instant of the batch
-   * @param endInstant end instant of the batch
+   * @param endInstant   end instant of the batch
    */
   public static void validateInstantTime(Row row, String instantTime, String sinceInstant, String endInstant) {
     Objects.requireNonNull(instantTime);
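For reviewers, a small standalone sketch (plain Java, not part of the patch; the commit times "001"-"003" are hypothetical) of the fetch window each strategy produces when no checkpoint exists, mirroring the branch logic above:

import java.util.Arrays;
import java.util.List;

public class MissingCheckpointStrategyDemo {
  // hypothetical completed commit times on a source timeline
  private static final List<String> COMMITS = Arrays.asList("001", "002", "003");

  public static void main(String[] args) {
    String latest = COMMITS.get(COMMITS.size() - 1);
    // READ_LATEST: begin strictly below the last commit, so only commit 003 is fetched
    System.out.println("READ_LATEST             begin=" + strictlyLower(latest) + " end=" + latest);
    // READ_UPTO_LATEST_COMMIT: begin at the default "000", so commits 001..003 are all fetched
    System.out.println("READ_UPTO_LATEST_COMMIT begin=000 end=" + latest);
  }

  // mirrors IncrSourceHelper.getStrictlyLowerTimestamp for purely numeric instant times
  private static String strictlyLower(String ts) {
    return String.format("%03d", Long.parseLong(ts) - 1);
  }
}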
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
new file mode 100644
index 000000000000..250e288294ac
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
+
+import org.apache.avro.Schema;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieIncrSource extends HoodieClientTestHarness {
+
+  @BeforeEach
+  public void setUp() throws IOException {
+    initResources();
+  }
+
+  @AfterEach
+  public void tearDown() throws IOException {
+    cleanupResources();
+  }
+
+  @Test
+  public void testHoodieIncrSource() throws IOException {
+    HoodieWriteConfig writeConfig = getConfigBuilder(basePath).build();
+
+    SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig);
+    Pair<String, List<HoodieRecord>> inserts = writeRecords(writeClient, true, null);
+    Pair<String, List<HoodieRecord>> inserts2 = writeRecords(writeClient, true, null);
+    Pair<String, List<HoodieRecord>> inserts3 = writeRecords(writeClient, true, null);
+
+    // read everything up to the latest commit
+    readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, 300, inserts3.getKey());
+
+    // read just the latest commit
+    readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, 100, inserts3.getKey());
+  }
+
+  private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, int expectedCount, String expectedCheckpoint) {
+
+    Properties properties = new Properties();
+    properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath);
+    properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy", missingCheckpointStrategy.name());
+    TypedProperties typedProperties = new TypedProperties(properties);
+    HoodieIncrSource incrSource = new HoodieIncrSource(typedProperties, jsc, sparkSession, new TestSchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA));
+
+    // fetch the next batch based on the given missing-checkpoint strategy
+    Pair<Option<Dataset<Row>>, String> batchCheckPoint = incrSource.fetchNextBatch(Option.empty(), 500);
+    Assertions.assertNotNull(batchCheckPoint.getValue());
+    assertEquals(expectedCount, batchCheckPoint.getKey().get().count());
+    Assertions.assertEquals(expectedCheckpoint, batchCheckPoint.getRight());
+  }
+
+  public Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient writeClient, boolean insert, List<HoodieRecord> insertRecords) throws IOException {
+    String commit = writeClient.startCommit();
+    List<HoodieRecord> records = insert ? dataGen.generateInserts(commit, 100) : dataGen.generateUpdates(commit, insertRecords);
+    JavaRDD<WriteStatus> result = writeClient.upsert(jsc.parallelize(records, 1), commit);
+    List<WriteStatus> statuses = result.collect();
+    assertNoWriteErrors(statuses);
+    return Pair.of(commit, records);
+  }
+
+  public HoodieWriteConfig.Builder getConfigBuilder(String basePath) {
+    return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+        .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2)
+        .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+        .forTable("test-hoodie-incr-source");
+  }
+
+  class TestSchemaProvider extends SchemaProvider {
+
+    private final Schema schema;
+
+    public TestSchemaProvider(Schema schema) {
+      super(new TypedProperties());
+      this.schema = schema;
+    }
+
+    @Override
+    public Schema getSourceSchema() {
+      return schema;
+    }
+  }
+}
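As a usage note, a minimal sketch (the table path is hypothetical) of selecting the new strategy through the DeltaStreamer source properties, the same way the test above wires it up:

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;

public class MissingCheckpointConfigExample {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // hypothetical base path of the Hudi table to pull from incrementally
    props.setProperty("hoodie.deltastreamer.source.hoodieincr.path", "/tmp/hudi/source_table");
    // when no checkpoint is stored, consume everything up to the latest commit
    props.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy",
        IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT.name());
    System.out.println(props);
  }
}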