Merged
KafkaOffsetGen.java

@@ -21,6 +21,7 @@
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieDeltaStreamerException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;

@@ -40,6 +41,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
@@ -49,6 +52,12 @@ public class KafkaOffsetGen {

private static final Logger LOG = LogManager.getLogger(KafkaOffsetGen.class);

/**
* Kafka checkpoint pattern.
* Format: topic_name,partition_num:offset,partition_num:offset,....
*/
private final Pattern pattern = Pattern.compile(".*,.*:.*");
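
For illustration, here is a minimal, self-contained sketch of how a checkpoint string in this format relates to the pattern above; the topic name and offsets are made up, and only the regex from this change is taken from the diff:

import java.util.regex.Pattern;

public class CheckpointFormatSketch {
  public static void main(String[] args) {
    Pattern pattern = Pattern.compile(".*,.*:.*");
    // Hypothetical Kafka checkpoint: topic "impressions", partition 0 at offset 20, partition 1 at offset 30.
    String kafkaCheckpoint = "impressions,0:20,1:30";
    // A checkpoint left behind by a non-Kafka source (e.g. a plain commit timestamp) has no ","/":" structure.
    String nonKafkaCheckpoint = "20160401010101";
    System.out.println(pattern.matcher(kafkaCheckpoint).matches());    // true  -> resume from these offsets
    System.out.println(pattern.matcher(nonKafkaCheckpoint).matches()); // false -> fall back to the auto reset strategy
  }
}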

public static class CheckpointUtils {

/**
@@ -148,23 +157,38 @@ public static class Config {

private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents";
private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LATEST;
private static final String KAFKA_AUTO_RESET_OFFSETS = "hoodie.deltastreamer.source.kafka.auto.reset.offsets";
private static final KafkaResetOffsetStrategies DEFAULT_KAFKA_AUTO_RESET_OFFSETS = KafkaResetOffsetStrategies.LATEST;
public static final long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 5000000;
public static long maxEventsFromKafkaSource = DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE;
}

private final HashMap<String, Object> kafkaParams;
private final TypedProperties props;
protected final String topicName;
private KafkaResetOffsetStrategies autoResetValue;

public KafkaOffsetGen(TypedProperties props) {
this.props = props;

kafkaParams = new HashMap<>();
for (Object prop : props.keySet()) {
kafkaParams.put(prop.toString(), props.get(prop.toString()));
}
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME));
topicName = props.getString(Config.KAFKA_TOPIC_NAME);
String kafkaAutoResetOffsetsStr = props.getString(Config.KAFKA_AUTO_RESET_OFFSETS, Config.DEFAULT_KAFKA_AUTO_RESET_OFFSETS.name().toLowerCase());
boolean found = false;
for (KafkaResetOffsetStrategies entry: KafkaResetOffsetStrategies.values()) {
if (entry.name().toLowerCase().equals(kafkaAutoResetOffsetsStr)) {
found = true;
autoResetValue = entry;
break;
}
}
if (!found) {
throw new HoodieDeltaStreamerException(Config.KAFKA_AUTO_RESET_OFFSETS + " config set to unknown value " + kafkaAutoResetOffsetsStr);
}
}
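
A minimal usage sketch for the new config key follows; the broker address and topic name are placeholders, and it assumes KafkaOffsetGen is importable from the hudi-utilities sources.helpers package:

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;

public class KafkaOffsetGenUsageSketch {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    props.setProperty("bootstrap.servers", "localhost:9092");               // placeholder broker
    props.setProperty("hoodie.deltastreamer.source.kafka.topic", "topic1"); // placeholder topic
    // The value is compared against the lower-cased enum names, so use lowercase "earliest" or "latest";
    // any other value fails fast in the constructor with HoodieDeltaStreamerException.
    props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", "earliest");
    KafkaOffsetGen offsetGen = new KafkaOffsetGen(props);
    System.out.println(offsetGen.getTopicName()); // topic1
  }
}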

public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit, HoodieDeltaStreamerMetrics metrics) {
@@ -186,8 +210,6 @@ public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long
fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions);
metrics.updateDeltaStreamerKafkaDelayCountMetrics(delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer));
} else {
KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies
.valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
switch (autoResetValue) {
case EARLIEST:
fromOffsets = consumer.beginningOffsets(topicPartitions);
@@ -227,12 +249,23 @@ public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long
// else return earliest offsets
private Map<TopicPartition, Long> checkupValidOffsets(KafkaConsumer consumer,
Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
if (checkTopicCheckpoint(lastCheckpointStr)) {
Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
.anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
}

switch (autoResetValue) {
case EARLIEST:
return earliestOffsets;
case LATEST:
return consumer.endOffsets(topicPartitions);
default:
throw new HoodieNotSupportedException("Auto reset value must be one of 'earliest' or 'latest'");
}

Review thread on the default case:
Contributor: Is it intentional that we throw an exception for NONE? Also, may I know where NONE (KafkaResetOffsetStrategies) is used?
Contributor (author): I also don't think it's necessary to add NONE.

boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream()
.anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
return checkpointOffsetReseter ? earliestOffsets : checkpointOffsets;
}
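
For intuition, the reset decision above reduces to the comparison in the following sketch; the partitions and offsets are invented, and plain HashMaps stand in for what the Kafka consumer would return:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

public class OffsetResetDecisionSketch {
  public static void main(String[] args) {
    // Hypothetical state: retention has already purged everything below offset 15 on partition 0.
    Map<TopicPartition, Long> earliestOffsets = new HashMap<>();
    earliestOffsets.put(new TopicPartition("impressions", 0), 15L);
    earliestOffsets.put(new TopicPartition("impressions", 1), 0L);

    // Checkpoint from a previous run, now partially behind the retained range.
    Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
    checkpointOffsets.put(new TopicPartition("impressions", 0), 10L);
    checkpointOffsets.put(new TopicPartition("impressions", 1), 30L);

    // Same predicate as in checkupValidOffsets: if any checkpointed offset is older than the
    // earliest retained offset, the whole read restarts from the earliest offsets.
    boolean resetToEarliest = checkpointOffsets.entrySet().stream()
        .anyMatch(e -> e.getValue() < earliestOffsets.get(e.getKey()));
    System.out.println(resetToEarliest ? "reset to earliest offsets" : "resume from checkpoint");
  }
}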

private Long delayOffsetCalculation(Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions, KafkaConsumer consumer) {
@@ -257,6 +290,11 @@ public boolean checkTopicExists(KafkaConsumer consumer) {
return result.containsKey(topicName);
}

private boolean checkTopicCheckpoint(Option<String> lastCheckpointStr) {
Matcher matcher = pattern.matcher(lastCheckpointStr.get());
return matcher.matches();
}

public String getTopicName() {
return topicName;
}
TestHoodieDeltaStreamer.java

@@ -48,6 +48,7 @@
import org.apache.hudi.utilities.sources.CsvDFSSource;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonKafkaSource;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
@@ -56,10 +57,12 @@
import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
import org.apache.hudi.utilities.transform.Transformer;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -116,9 +119,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
private static final String PROPS_FILENAME_TEST_CSV = "test-csv-dfs-source.properties";
private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
private static final String PROPS_FILENAME_TEST_JSON_KAFKA = "test-json-kafka-dfs-source.properties";
private static String PARQUET_SOURCE_ROOT;
private static String JSON_KAFKA_SOURCE_ROOT;
private static final int PARQUET_NUM_RECORDS = 5;
private static final int CSV_NUM_RECORDS = 3;
private static final int JSON_KAFKA_NUM_RECORDS = 5;
// Required fields
private static final String TGT_BASE_PATH_PARAM = "--target-base-path";
private static final String TGT_BASE_PATH_VALUE = "s3://mybucket/blah";
@@ -136,15 +142,18 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
private static final String HOODIE_CONF_VALUE2 = "hoodie.datasource.write.recordkey.field=Field1,Field2,Field3";
private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);
public static KafkaTestUtils testUtils;
protected static String topicName;

private static int testNum = 1;
protected static int testNum = 1;

@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initClass(true);
PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
JSON_KAFKA_SOURCE_ROOT = dfsBasePath + "/jsonKafkaFiles";
testUtils = new KafkaTestUtils();
testUtils.setup();
topicName = "topic" + testNum;

// prepare the configs.
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties");
@@ -236,7 +245,7 @@ private static void populateCommonProps(TypedProperties props) {

//Kafka source properties
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
props.setProperty("auto.offset.reset", "earliest");
props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", "earliest");
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(5000));
@@ -966,27 +975,56 @@ public void testDistributedTestDataSource() {
}

private static void prepareParquetDFSFiles(int numRecords) throws IOException {
String path = PARQUET_SOURCE_ROOT + "/1.parquet";
prepareParquetDFSFiles(numRecords, "1.parquet", false, null, null);
}

private static void prepareParquetDFSFiles(int numRecords, String fileName, boolean useCustomSchema,
String schemaStr, Schema schema) throws IOException {
String path = PARQUET_SOURCE_ROOT + "/" + fileName;
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
Helpers.saveParquetToDFS(Helpers.toGenericRecords(
dataGenerator.generateInserts("000", numRecords)), new Path(path));
if (useCustomSchema) {
Helpers.saveParquetToDFS(Helpers.toGenericRecords(
dataGenerator.generateInsertsAsPerSchema("000", numRecords, schemaStr),
schema), new Path(path), HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
} else {
Helpers.saveParquetToDFS(Helpers.toGenericRecords(
dataGenerator.generateInserts("000", numRecords)), new Path(path));
}
}

private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) throws IOException {
if (createTopic) {
try {
testUtils.createTopic(topicName, 2);
} catch (TopicExistsException e) {
// no op
}
}
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
testUtils.sendMessages(topicName, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", numRecords, HoodieTestDataGenerator.TRIP_SCHEMA)));
}

private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException {
prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc",
PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT);
}

private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
String propsFileName, String parquetSourceRoot) throws IOException {
// Properties used for testing delta-streamer with Parquet source
TypedProperties parquetProps = new TypedProperties();
parquetProps.setProperty("include", "base.properties");
parquetProps.setProperty("hoodie.embed.timeline.server","false");
parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
if (useSchemaProvider) {
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + sourceSchemaFile);
if (hasTransformer) {
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/" + targetSchemaFile);
}
}
parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", PARQUET_SOURCE_ROOT);

UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET);
parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", parquetSourceRoot);
UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + propsFileName);
}

private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
@@ -1001,6 +1039,99 @@ private void testParquetDFSSource(boolean useSchemaProvider, List<String> transf
testNum++;
}

private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetValue, String topicName) throws IOException {
// Properties used for testing delta-streamer with JsonKafka source
TypedProperties props = new TypedProperties();
populateCommonProps(props);
props.setProperty("include", "base.properties");
props.setProperty("hoodie.embed.timeline.server","false");
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
props.setProperty("hoodie.deltastreamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT);
props.setProperty("hoodie.deltastreamer.source.kafka.topic",topicName);
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc");
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc");
props.setProperty("hoodie.deltastreamer.source.kafka.auto.reset.offsets", autoResetValue);

UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + propsFileName);
}

/**
* Tests Deltastreamer with parquet dfs source and transitions to JsonKafkaSource.
* @param autoResetToLatest true if the auto reset value should be set to LATEST; false to leave it at the default (i.e. EARLIEST)
* @throws Exception
*/
private void testDeltaStreamerTransitionFromParquetToKafkaSource(boolean autoResetToLatest) throws Exception {
// prep parquet source
PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFilesDfsToKafka" + testNum;
int parquetRecords = 10;
prepareParquetDFSFiles(parquetRecords,"1.parquet", true, HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);

prepareParquetDFSSource(true, false,"source_uber.avsc", "target_uber.avsc", PROPS_FILENAME_TEST_PARQUET,
PARQUET_SOURCE_ROOT);
// delta streamer w/ parquest source
String tableBasePath = dfsBasePath + "/test_dfs_to_kakfa" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
Collections.EMPTY_LIST, PROPS_FILENAME_TEST_PARQUET, false,
false, 100000, false, null, null, "timestamp"), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(parquetRecords, tableBasePath + "/*/*.parquet", sqlContext);
deltaStreamer.shutdownGracefully();

// prep json kafka source
topicName = "topic" + testNum;
prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName);
prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, autoResetToLatest ? "latest" : "earliest", topicName);
// delta streamer w/ json kafka source
deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
true, 100000, false, null, null, "timestamp"), jsc);
deltaStreamer.sync();
// if auto reset value is set to LATEST, the kafka records produced so far will not be synced.
int totalExpectedRecords = parquetRecords + ((autoResetToLatest) ? 0 : JSON_KAFKA_NUM_RECORDS);
TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath + "/*/*.parquet", sqlContext);

// verify 2nd batch to test LATEST auto reset value.
prepareJsonKafkaDFSFiles(20, false, topicName);
totalExpectedRecords += 20;
deltaStreamer.sync();
TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath + "/*/*.parquet", sqlContext);
testNum++;
}

@Test
public void testJsonKafkaDFSSource() throws Exception {
topicName = "topic" + testNum;
prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName);
prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest",topicName);
String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
true, 100000, false, null, null, "timestamp"), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);

int totalRecords = JSON_KAFKA_NUM_RECORDS;
int records = 10;
totalRecords += records;
prepareJsonKafkaDFSFiles(records, false, topicName);
deltaStreamer.sync();
TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
}

@Test
public void testParquetSourceToKafkaSourceEarliestAutoResetValue() throws Exception {
testDeltaStreamerTransitionFromParquetToKafkaSource(false);
}

@Test
public void testParquetSourceToKafkaSourceLatestAutoResetValue() throws Exception {
testDeltaStreamerTransitionFromParquetToKafkaSource(true);
}

@Test
public void testParquetDFSSourceWithoutSchemaProviderAndNoTransformer() throws Exception {
testParquetDFSSource(false, null);