From 208e59e7c6125fb4b499ac8bbcf45ac690d2b6bf Mon Sep 17 00:00:00 2001 From: liujinhui1994 <965147871@qq.com> Date: Mon, 2 Nov 2020 22:49:25 +0800 Subject: [PATCH 1/9] hudi-1367 --- .../sources/helpers/KafkaOffsetGen.java | 32 +++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index fc7ba79096bf4..ab418f6d63d1d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -40,6 +40,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; /** @@ -156,15 +158,18 @@ public static class Config { private final HashMap kafkaParams; private final TypedProperties props; protected final String topicName; + private final KafkaResetOffsetStrategies autoResetValue; public KafkaOffsetGen(TypedProperties props) { this.props = props; + kafkaParams = new HashMap<>(); for (Object prop : props.keySet()) { kafkaParams.put(prop.toString(), props.get(prop.toString())); } DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME)); topicName = props.getString(Config.KAFKA_TOPIC_NAME); + autoResetValue = KafkaResetOffsetStrategies.valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase()); } public OffsetRange[] getNextOffsetRanges(Option lastCheckpointStr, long sourceLimit, HoodieDeltaStreamerMetrics metrics) { @@ -186,8 +191,6 @@ public OffsetRange[] getNextOffsetRanges(Option lastCheckpointStr, long fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions); metrics.updateDeltaStreamerKafkaDelayCountMetrics(delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer)); } else { - KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies - .valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase()); switch (autoResetValue) { case EARLIEST: fromOffsets = consumer.beginningOffsets(topicPartitions); @@ -227,12 +230,23 @@ public OffsetRange[] getNextOffsetRanges(Option lastCheckpointStr, long // else return earliest offsets private Map checkupValidOffsets(KafkaConsumer consumer, Option lastCheckpointStr, Set topicPartitions) { - Map checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get()); Map earliestOffsets = consumer.beginningOffsets(topicPartitions); + if (checkTopicCheckPoint(lastCheckpointStr)) { + Map checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get()); + boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream() + .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey())); + return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets; + } + + switch (autoResetValue) { + case EARLIEST: + return earliestOffsets; + case LATEST: + return consumer.endOffsets(topicPartitions); + default: + throw new HoodieNotSupportedException("Auto reset value must be one of 'earliest' or 'latest' "); + } - boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream() - .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey())); - return checkpointOffsetReseter ? 
earliestOffsets : checkpointOffsets; } private Long delayOffsetCalculation(Option lastCheckpointStr, Set topicPartitions, KafkaConsumer consumer) { @@ -257,6 +271,12 @@ public boolean checkTopicExists(KafkaConsumer consumer) { return result.containsKey(topicName); } + public boolean checkTopicCheckPoint(Option lastCheckpointStr) { + Pattern DIRECTORY_PATTERN = Pattern.compile(".*=.*"); + Matcher matcher = DIRECTORY_PATTERN.matcher(lastCheckpointStr.get()); + return matcher.matches(); + } + public String getTopicName() { return topicName; } From 3a5fd9b91b20cfc245855d2e7d0c5902fada60e6 Mon Sep 17 00:00:00 2001 From: liujinhui1994 <965147871@qq.com> Date: Tue, 3 Nov 2020 09:18:00 +0800 Subject: [PATCH 2/9] hudi-1367 --- .../hudi/utilities/sources/helpers/KafkaOffsetGen.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index ab418f6d63d1d..a9cb99e60d6fe 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -51,6 +51,8 @@ public class KafkaOffsetGen { private static final Logger LOG = LogManager.getLogger(KafkaOffsetGen.class); + private final Pattern pattern = Pattern.compile(".*=.*"); + public static class CheckpointUtils { /** @@ -272,8 +274,7 @@ public boolean checkTopicExists(KafkaConsumer consumer) { } public boolean checkTopicCheckPoint(Option lastCheckpointStr) { - Pattern DIRECTORY_PATTERN = Pattern.compile(".*=.*"); - Matcher matcher = DIRECTORY_PATTERN.matcher(lastCheckpointStr.get()); + Matcher matcher = pattern.matcher(lastCheckpointStr.get()); return matcher.matches(); } From b998d8547208490753d0fdf0b15f12121a128636 Mon Sep 17 00:00:00 2001 From: liujinhui1994 <965147871@qq.com> Date: Tue, 3 Nov 2020 10:43:55 +0800 Subject: [PATCH 3/9] hudi-1367 --- .../apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index a9cb99e60d6fe..f96e462ef34b1 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -51,7 +51,7 @@ public class KafkaOffsetGen { private static final Logger LOG = LogManager.getLogger(KafkaOffsetGen.class); - private final Pattern pattern = Pattern.compile(".*=.*"); + private final Pattern pattern = Pattern.compile(".*,.*:.*"); public static class CheckpointUtils { From e2a0ce91bf7f784bfb294c9574e78115131cfb92 Mon Sep 17 00:00:00 2001 From: liujinhui1994 <965147871@qq.com> Date: Wed, 11 Nov 2020 10:19:23 +0800 Subject: [PATCH 4/9] hudi-1367 Modify as suggested: add more KafkaResetOffsetStrategies enum values --- .../apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index f96e462ef34b1..80b8fee09062e 100644 --- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -142,7 +142,7 @@ public static long totalNewMessages(OffsetRange[] ranges) { * Kafka reset offset strategies. */ enum KafkaResetOffsetStrategies { - LATEST, EARLIEST + LATEST, EARLIEST, NONE } /** From 33444fdb84f3f1b9de22ed90a7db6ebc8eeb0b2b Mon Sep 17 00:00:00 2001 From: liujinhui1994 <965147871@qq.com> Date: Sat, 20 Feb 2021 17:31:11 +0800 Subject: [PATCH 5/9] Fix some defects --- .../hudi/utilities/sources/helpers/KafkaOffsetGen.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index 80b8fee09062e..45fad63fb54cb 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -51,6 +51,9 @@ public class KafkaOffsetGen { private static final Logger LOG = LogManager.getLogger(KafkaOffsetGen.class); + /** + * Kafka checkpoint pattern. + */ private final Pattern pattern = Pattern.compile(".*,.*:.*"); public static class CheckpointUtils { @@ -142,7 +145,7 @@ public static long totalNewMessages(OffsetRange[] ranges) { * Kafka reset offset strategies. */ enum KafkaResetOffsetStrategies { - LATEST, EARLIEST, NONE + LATEST, EARLIEST } /** @@ -233,7 +236,7 @@ public OffsetRange[] getNextOffsetRanges(Option lastCheckpointStr, long private Map checkupValidOffsets(KafkaConsumer consumer, Option lastCheckpointStr, Set topicPartitions) { Map earliestOffsets = consumer.beginningOffsets(topicPartitions); - if (checkTopicCheckPoint(lastCheckpointStr)) { + if (checkTopicCheckpoint(lastCheckpointStr)) { Map checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get()); boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream() .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey())); @@ -273,7 +276,7 @@ public boolean checkTopicExists(KafkaConsumer consumer) { return result.containsKey(topicName); } - public boolean checkTopicCheckPoint(Option lastCheckpointStr) { + private boolean checkTopicCheckpoint(Option lastCheckpointStr) { Matcher matcher = pattern.matcher(lastCheckpointStr.get()); return matcher.matches(); } From d531fb1d2d97797fbe2ea9faf7e8a774498550a8 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Sat, 20 Feb 2021 19:18:29 -0500 Subject: [PATCH 6/9] Adding tests to delta streamer to test transition from parquet source to kafka source --- .../functional/TestHoodieDeltaStreamer.java | 141 +++++++++++++++++- .../testutils/UtilitiesTestBase.java | 17 ++- 2 files changed, 146 insertions(+), 12 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 616d039ed6600..eff8707634d22 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -48,6 +48,7 @@ import org.apache.hudi.utilities.sources.CsvDFSSource; import org.apache.hudi.utilities.sources.HoodieIncrSource; import 
org.apache.hudi.utilities.sources.InputBatch; +import org.apache.hudi.utilities.sources.JsonKafkaSource; import org.apache.hudi.utilities.sources.ParquetDFSSource; import org.apache.hudi.utilities.sources.TestDataSource; import org.apache.hudi.utilities.testutils.UtilitiesTestBase; @@ -56,10 +57,12 @@ import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer; import org.apache.hudi.utilities.transform.Transformer; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.kafka.common.errors.TopicExistsException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; @@ -116,9 +119,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties"; private static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties"; private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties"; + private static final String PROPS_FILENAME_TEST_JSON_KAFKA = "test-json-kafka-dfs-source.properties"; private static String PARQUET_SOURCE_ROOT; + private static String JSON_KAFKA_SOURCE_ROOT; private static final int PARQUET_NUM_RECORDS = 5; private static final int CSV_NUM_RECORDS = 3; + private static final int JSON_KAFKA_NUM_RECORDS = 5; // Required fields private static final String TGT_BASE_PATH_PARAM = "--target-base-path"; private static final String TGT_BASE_PATH_VALUE = "s3://mybucket/blah"; @@ -143,6 +149,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { public static void initClass() throws Exception { UtilitiesTestBase.initClass(true); PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles"; + JSON_KAFKA_SOURCE_ROOT = dfsBasePath + "/jsonKafkaFiles"; testUtils = new KafkaTestUtils(); testUtils.setup(); @@ -966,27 +973,55 @@ public void testDistributedTestDataSource() { } private static void prepareParquetDFSFiles(int numRecords) throws IOException { - String path = PARQUET_SOURCE_ROOT + "/1.parquet"; + prepareParquetDFSFiles(numRecords, "1.parquet", false, null, null); + } + + private static void prepareParquetDFSFiles(int numRecords, String fileName, boolean useCustomSchema, + String schemaStr, Schema schema) throws IOException { + String path = PARQUET_SOURCE_ROOT + "/" + fileName; HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); - Helpers.saveParquetToDFS(Helpers.toGenericRecords( - dataGenerator.generateInserts("000", numRecords)), new Path(path)); + if (useCustomSchema) { + Helpers.saveParquetToDFS(Helpers.toGenericRecords( + dataGenerator.generateInsertsAsPerSchema("000", numRecords, schemaStr), + schema), new Path(path), HoodieTestDataGenerator.AVRO_TRIP_SCHEMA); + } else { + Helpers.saveParquetToDFS(Helpers.toGenericRecords( + dataGenerator.generateInserts("000", numRecords)), new Path(path)); + } + } + + private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic) throws IOException { + if (createTopic) { + try { + testUtils.createTopic("topic1", 2); + } catch (TopicExistsException e) { + // no op + } + } + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + testUtils.sendMessages("topic1", Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", numRecords, HoodieTestDataGenerator.TRIP_SCHEMA))); } private void prepareParquetDFSSource(boolean 
useSchemaProvider, boolean hasTransformer) throws IOException { + prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", + PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT); + } + + private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile, + String propsFileName, String parquetSourceRoot) throws IOException { // Properties used for testing delta-streamer with Parquet source TypedProperties parquetProps = new TypedProperties(); parquetProps.setProperty("include", "base.properties"); parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); if (useSchemaProvider) { - parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc"); + parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + sourceSchemaFile); if (hasTransformer) { - parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc"); + parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/" + targetSchemaFile); } } - parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", PARQUET_SOURCE_ROOT); - - UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET); + parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", parquetSourceRoot); + UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + propsFileName); } private void testParquetDFSSource(boolean useSchemaProvider, List transformerClassNames) throws Exception { @@ -1001,6 +1036,96 @@ private void testParquetDFSSource(boolean useSchemaProvider, List transf testNum++; } + private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue) throws IOException { + // Properties used for testing delta-streamer with JsonKafka source + TypedProperties props = new TypedProperties(); + populateCommonProps(props); + props.setProperty("include", "base.properties"); + props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); + props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); + props.setProperty("hoodie.deltastreamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT); + props.setProperty("hoodie.deltastreamer.source.kafka.topic","topic1"); + props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc"); + props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc"); + props.setProperty("auto.offset.reset", autoResetValue); + + UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + propsFileName); + } + + /** + * Tests DeltaStreamer with a Parquet DFS source transitioning to a JsonKafkaSource. + * @param autoResetToLatest true if the auto reset value should be set to LATEST; false to leave it as the default (i.e. 
EARLIEST) + * @throws Exception + */ + private void testDeltaStreamerWithParquetSourceAndTransitionToKafkaSource(boolean autoResetToLatest) throws Exception { + // prep parquet source + PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFilesDfsToKafka" + testNum; + int parquetRecords = 10; + prepareParquetDFSFiles(parquetRecords,"1.parquet", true, HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA); + + prepareParquetDFSSource(true, false,"source_uber.avsc", "target_uber.avsc", PROPS_FILENAME_TEST_PARQUET, + PARQUET_SOURCE_ROOT); + // delta streamer w/ parquet source + String tableBasePath = dfsBasePath + "/test_dfs_to_kafka" + testNum; + HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer( + TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(), + Collections.EMPTY_LIST, PROPS_FILENAME_TEST_PARQUET, false, + false, 100000, false, null, null, "timestamp"), jsc); + deltaStreamer.sync(); + TestHelpers.assertRecordCount(parquetRecords, tableBasePath + "/*/*.parquet", sqlContext); + deltaStreamer.shutdownGracefully(); + + // prep json kafka source + prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true); + prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, autoResetToLatest ? "latest" : "earliest"); + // delta streamer w/ json kafka source + deltaStreamer = new HoodieDeltaStreamer( + TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), + Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false, + true, 100000, false, null, null, "timestamp"), jsc); + deltaStreamer.sync(); + // if auto reset value is set to LATEST, all kafka records so far may not be synced. + int totalExpectedRecords = parquetRecords + ((autoResetToLatest) ? 0 : JSON_KAFKA_NUM_RECORDS); + TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath + "/*/*.parquet", sqlContext); + + // verify 2nd batch to test LATEST auto reset value. 
+ prepareJsonKafkaDFSFiles(20, false); + totalExpectedRecords += 20; + deltaStreamer.sync(); + TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath + "/*/*.parquet", sqlContext); + testNum++; + } + + @Test + public void testJsonKafkaDFSSource() throws Exception { + prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true); + prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest"); + String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum; + HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer( + TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), + Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false, + true, 100000, false, null, null, "timestamp"), jsc); + deltaStreamer.sync(); + TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext); + + int totalRecords = JSON_KAFKA_NUM_RECORDS; + int records = 10; + totalRecords += records; + prepareJsonKafkaDFSFiles(records, false); + deltaStreamer.sync(); + TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext); + } + + @Test + public void testParquetSourceToKafkaSourceEarliestAutoResetValue() throws Exception { + testDeltaStreamerWithParquetSourceAndTransitionToKafkaSource(false); + } + + @Test + public void testParquetSourceToKafkaSourceLatestAutoResetValue() throws Exception { + testDeltaStreamerWithParquetSourceAndTransitionToKafkaSource(true); + } + @Test public void testParquetDFSSourceWithoutSchemaProviderAndNoTransformer() throws Exception { testParquetDFSSource(false, null); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java index 0bbdb23466984..b83fa7890fff9 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java @@ -46,6 +46,7 @@ import com.fasterxml.jackson.dataformat.csv.CsvMapper; import com.fasterxml.jackson.dataformat.csv.CsvSchema; import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileSystem; @@ -279,8 +280,12 @@ public static void saveCsvToDFS( } public static void saveParquetToDFS(List records, Path targetFile) throws IOException { + saveParquetToDFS(records, targetFile, HoodieTestDataGenerator.AVRO_SCHEMA); + } + + public static void saveParquetToDFS(List records, Path targetFile, Schema schema) throws IOException { try (ParquetWriter writer = AvroParquetWriter.builder(targetFile) - .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA) + .withSchema(schema) .withConf(HoodieTestUtils.getDefaultHadoopConf()) .withWriteMode(Mode.OVERWRITE) .build()) { @@ -308,9 +313,9 @@ public static TypedProperties setupSchemaOnDFSWithAbsoluteScope(String scope, St return props; } - public static GenericRecord toGenericRecord(HoodieRecord hoodieRecord) { + public static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, Schema schema) { try { - Option recordOpt = hoodieRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA); + Option recordOpt = hoodieRecord.getData().getInsertValue(schema); return (GenericRecord) recordOpt.get(); } catch (IOException e) { return null; @@ -318,9 +323,13 @@ public static GenericRecord 
toGenericRecord(HoodieRecord hoodieRecord) { } public static List toGenericRecords(List hoodieRecords) { + return toGenericRecords(hoodieRecords, HoodieTestDataGenerator.AVRO_SCHEMA); + } + + public static List toGenericRecords(List hoodieRecords, Schema schema) { List records = new ArrayList<>(); for (HoodieRecord hoodieRecord : hoodieRecords) { - records.add(toGenericRecord(hoodieRecord)); + records.add(toGenericRecord(hoodieRecord, schema)); } return records; } From 39adecbf70449aaa02b083146662f2507dec3355 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Tue, 23 Feb 2021 20:47:55 -0500 Subject: [PATCH 7/9] Fixing test failures --- .../functional/TestHoodieDeltaStreamer.java | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index eff8707634d22..44a6145d388d5 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -142,6 +142,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { private static final String HOODIE_CONF_VALUE2 = "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3"; private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class); public static KafkaTestUtils testUtils; + private static String topicName; private static int testNum = 1; @@ -152,6 +153,7 @@ public static void initClass() throws Exception { JSON_KAFKA_SOURCE_ROOT = dfsBasePath + "/jsonKafkaFiles"; testUtils = new KafkaTestUtils(); testUtils.setup(); + topicName = "topic" + testNum; // prepare the configs. 
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties"); @@ -990,16 +992,16 @@ private static void prepareParquetDFSFiles(int numRecords, String fileName, bool } } - private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic) throws IOException { + private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) throws IOException { if (createTopic) { try { - testUtils.createTopic("topic1", 2); + testUtils.createTopic(topicName, 2); } catch (TopicExistsException e) { // no op } } HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); - testUtils.sendMessages("topic1", Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", numRecords, HoodieTestDataGenerator.TRIP_SCHEMA))); + testUtils.sendMessages(topicName, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", numRecords, HoodieTestDataGenerator.TRIP_SCHEMA))); } private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException { @@ -1012,6 +1014,7 @@ private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTrans // Properties used for testing delta-streamer with Parquet source TypedProperties parquetProps = new TypedProperties(); parquetProps.setProperty("include", "base.properties"); + parquetProps.setProperty("hoodie.embed.timeline.server","false"); parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); if (useSchemaProvider) { @@ -1036,15 +1039,16 @@ private void testParquetDFSSource(boolean useSchemaProvider, List transf testNum++; } - private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue) throws IOException { + private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue, String topicName) throws IOException { // Properties used for testing delta-streamer with JsonKafka source TypedProperties props = new TypedProperties(); populateCommonProps(props); props.setProperty("include", "base.properties"); + props.setProperty("hoodie.embed.timeline.server","false"); props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); props.setProperty("hoodie.deltastreamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT); - props.setProperty("hoodie.deltastreamer.source.kafka.topic","topic1"); + props.setProperty("hoodie.deltastreamer.source.kafka.topic",topicName); props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc"); props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc"); props.setProperty("auto.offset.reset", autoResetValue); @@ -1057,7 +1061,7 @@ private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetVal * @param autoResetToLatest true if the auto reset value should be set to LATEST; false to leave it as the default (i.e. 
EARLIEST) * @throws Exception */ - private void testDeltaStreamerWithParquetSourceAndTransitionToKafkaSource(boolean autoResetToLatest) throws Exception { + private void testDeltaStreamerTransitionFromParquetToKafkaSource(boolean autoResetToLatest) throws Exception { // prep parquet source PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFilesDfsToKafka" + testNum; int parquetRecords = 10; @@ -1076,8 +1080,9 @@ private void testDeltaStreamerWithParquetSourceAndTransitionToKafkaSource(boolea deltaStreamer.shutdownGracefully(); // prep json kafka source - prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true); - prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, autoResetToLatest ? "latest" : "earliest"); + topicName = "topic" + testNum; + prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName); + prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, autoResetToLatest ? "latest" : "earliest", topicName); // delta streamer w/ json kafka source deltaStreamer = new HoodieDeltaStreamer( TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), @@ -1089,7 +1094,7 @@ private void testDeltaStreamerWithParquetSourceAndTransitionToKafkaSource(boolea TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath + "/*/*.parquet", sqlContext); // verify 2nd batch to test LATEST auto reset value. - prepareJsonKafkaDFSFiles(20, false); + prepareJsonKafkaDFSFiles(20, false, topicName); totalExpectedRecords += 20; deltaStreamer.sync(); TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath + "/*/*.parquet", sqlContext); @@ -1098,8 +1103,9 @@ private void testDeltaStreamerWithParquetSourceAndTransitionToKafkaSource(boolea @Test public void testJsonKafkaDFSSource() throws Exception { - prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true); - prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest"); + topicName = "topic" + testNum; + prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName); + prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest",topicName); String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum; HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer( TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(), @@ -1111,19 +1117,19 @@ public void testJsonKafkaDFSSource() throws Exception { int totalRecords = JSON_KAFKA_NUM_RECORDS; int records = 10; totalRecords += records; - prepareJsonKafkaDFSFiles(records, false); + prepareJsonKafkaDFSFiles(records, false, topicName); deltaStreamer.sync(); TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext); } @Test public void testParquetSourceToKafkaSourceEarliestAutoResetValue() throws Exception { - testDeltaStreamerWithParquetSourceAndTransitionToKafkaSource(false); + testDeltaStreamerTransitionFromParquetToKafkaSource(false); } @Test public void testParquetSourceToKafkaSourceLatestAutoResetValue() throws Exception { - testDeltaStreamerWithParquetSourceAndTransitionToKafkaSource(true); + testDeltaStreamerTransitionFromParquetToKafkaSource(true); } @Test From f14edeb95d5079c4c168533b4409a0f911bdf260 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Wed, 24 Feb 2021 10:23:25 -0500 Subject: [PATCH 8/9] Fixing failing test --- .../functional/TestHoodieDeltaStreamer.java | 4 ++-- .../TestHoodieMultiTableDeltaStreamer.java | 21 ++++++++++++------- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 44a6145d388d5..94394b574130a 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -142,9 +142,9 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { private static final String HOODIE_CONF_VALUE2 = "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3"; private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class); public static KafkaTestUtils testUtils; - private static String topicName; + protected static String topicName; - private static int testNum = 1; + protected static int testNum = 1; @BeforeAll public static void initClass() throws Exception { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java index a6f4edfb57c22..ad1b75395aad3 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java @@ -119,12 +119,14 @@ public void testInvalidIngestionProps() { @Test //0 corresponds to fg public void testMultiTableExecution() throws IOException { //create topics for each table - testUtils.createTopic("topic1", 2); - testUtils.createTopic("topic2", 2); + String topicName1 = "topic" + testNum++; + String topicName2 = "topic" + testNum; + testUtils.createTopic(topicName1, 2); + testUtils.createTopic(topicName2, 2); HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); - testUtils.sendMessages("topic1", Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA))); - testUtils.sendMessages("topic2", Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA))); + testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA))); + testUtils.sendMessages(topicName2, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA))); HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1,dfsBasePath + "/config", JsonKafkaSource.class.getName(), false); HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc); @@ -132,21 +134,23 @@ public void testMultiTableExecution() throws IOException { TypedProperties properties = executionContexts.get(1).getProperties(); properties.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc"); properties.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc"); + properties.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName2); executionContexts.get(1).setProperties(properties); TypedProperties properties1 = executionContexts.get(0).getProperties(); properties1.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_short_trip_uber.avsc"); 
properties1.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_short_trip_uber.avsc"); + properties1.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName1); executionContexts.get(0).setProperties(properties1); - String targetBasePath1 = executionContexts.get(1).getConfig().targetBasePath; - String targetBasePath2 = executionContexts.get(0).getConfig().targetBasePath; + String targetBasePath1 = executionContexts.get(0).getConfig().targetBasePath; + String targetBasePath2 = executionContexts.get(1).getConfig().targetBasePath; streamer.sync(); TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1 + "/*/*.parquet", sqlContext); TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2 + "/*/*.parquet", sqlContext); //insert updates for already existing records in kafka topics - testUtils.sendMessages("topic1", Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, HoodieTestDataGenerator.TRIP_SCHEMA))); - testUtils.sendMessages("topic2", Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA))); + testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, HoodieTestDataGenerator.TRIP_SCHEMA))); + testUtils.sendMessages(topicName2, Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA))); streamer.sync(); assertEquals(2, streamer.getSuccessTables().size()); assertTrue(streamer.getFailedTables().isEmpty()); @@ -154,5 +158,6 @@ public void testMultiTableExecution() throws IOException { //assert the record count matches now TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1 + "/*/*.parquet", sqlContext); TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2 + "/*/*.parquet", sqlContext); + testNum++; } } From 3ea040e1ec261cac4a1f894bc5197c34774e717c Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Wed, 24 Feb 2021 22:13:20 -0500 Subject: [PATCH 9/9] Addressing feedback --- .../sources/helpers/KafkaOffsetGen.java | 20 ++++++++++++++++--- .../functional/TestHoodieDeltaStreamer.java | 4 ++-- .../utilities/sources/TestKafkaSource.java | 2 +- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index 45fad63fb54cb..e37ec0a11435e 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -21,6 +21,7 @@ import org.apache.hudi.DataSourceUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieDeltaStreamerException; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieNotSupportedException; @@ -53,6 +54,7 @@ public class KafkaOffsetGen { /** * Kafka checkpoint pattern. + * Format: topic_name,partition_num:offset,partition_num:offset,... 
*/ private final Pattern pattern = Pattern.compile(".*,.*:.*"); @@ -155,7 +157,8 @@ public static class Config { private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic"; private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents"; - private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LATEST; + private static final String KAFKA_AUTO_RESET_OFFSETS = "hoodie.deltastreamer.source.kafka.auto.reset.offsets"; + private static final KafkaResetOffsetStrategies DEFAULT_KAFKA_AUTO_RESET_OFFSETS = KafkaResetOffsetStrategies.LATEST; public static final long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 5000000; public static long maxEventsFromKafkaSource = DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE; } @@ -163,7 +166,7 @@ public static class Config { private final HashMap kafkaParams; private final TypedProperties props; protected final String topicName; - private final KafkaResetOffsetStrategies autoResetValue; + private KafkaResetOffsetStrategies autoResetValue; public KafkaOffsetGen(TypedProperties props) { this.props = props; @@ -174,7 +177,18 @@ public KafkaOffsetGen(TypedProperties props) { } DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME)); topicName = props.getString(Config.KAFKA_TOPIC_NAME); - autoResetValue = KafkaResetOffsetStrategies.valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase()); + String kafkaAutoResetOffsetsStr = props.getString(Config.KAFKA_AUTO_RESET_OFFSETS, Config.DEFAULT_KAFKA_AUTO_RESET_OFFSETS.name()); + boolean found = false; + for (KafkaResetOffsetStrategies entry : KafkaResetOffsetStrategies.values()) { + if (entry.name().equalsIgnoreCase(kafkaAutoResetOffsetsStr)) { + found = true; + autoResetValue = entry; + break; + } + } + if (!found) { + throw new HoodieDeltaStreamerException(Config.KAFKA_AUTO_RESET_OFFSETS + " config set to unknown value " + kafkaAutoResetOffsetsStr); + } } public OffsetRange[] getNextOffsetRanges(Option lastCheckpointStr, long sourceLimit, HoodieDeltaStreamerMetrics metrics) { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 94394b574130a..7fb5b1862dc9a 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -245,7 +245,7 @@ private static void populateCommonProps(TypedProperties props) { //Kafka source properties props.setProperty("bootstrap.servers", testUtils.brokerAddress()); - props.setProperty("auto.offset.reset", "earliest"); + props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", "earliest"); props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(5000)); @@ -1051,7 +1051,7 @@ private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetVal props.setProperty("hoodie.deltastreamer.source.kafka.topic",topicName); props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc"); 
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc"); - props.setProperty("auto.offset.reset", autoResetValue); + props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", autoResetValue); UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + propsFileName); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java index e8cb2a6f1e5f9..9004c661bcd76 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java @@ -88,7 +88,7 @@ private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSo TypedProperties props = new TypedProperties(); props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME); props.setProperty("bootstrap.servers", testUtils.brokerAddress()); - props.setProperty("auto.offset.reset", resetStrategy); + props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", resetStrategy); props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) : String.valueOf(Config.maxEventsFromKafkaSource));