@@ -50,8 +50,6 @@ public class KafkaOffsetGen {

private static volatile Logger log = LogManager.getLogger(KafkaOffsetGen.class);

private static long DEFAULT_MAX_EVENTS_TO_READ = 1000000; // 1M events max

public static class CheckpointUtils {

/**
@@ -170,10 +168,13 @@ enum KafkaResetOffsetStrategies {
/**
* Configs to be passed for this source. All standard Kafka consumer configs are also respected
*/
static class Config {
public static class Config {

private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic";
private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents";
private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LARGEST;
public static final long defaultMaxEventsFromKafkaSource = 5000000;
public static long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = defaultMaxEventsFromKafkaSource;
}

private final HashMap<String, String> kafkaParams;
@@ -229,7 +230,11 @@ public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long
new HashMap(ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get()));

// Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
long numEvents = Math.min(DEFAULT_MAX_EVENTS_TO_READ, sourceLimit);
long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP,
Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE);
maxEventsToReadFromKafka = (maxEventsToReadFromKafka == Long.MAX_VALUE || maxEventsToReadFromKafka == Integer.MAX_VALUE)
? Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE : maxEventsToReadFromKafka;
long numEvents = sourceLimit == Long.MAX_VALUE ? maxEventsToReadFromKafka : sourceLimit;
OffsetRange[] offsetRanges = CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);

return offsetRanges;
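The hunk above replaces the hard-coded 1M DEFAULT_MAX_EVENTS_TO_READ cap with a configurable hoodie.deltastreamer.kafka.source.maxEvents property (default 5,000,000), and only lets that cap apply when the caller passes an unbounded sourceLimit. Below is a minimal standalone sketch of that resolution logic; the class and helper names (EventCapSketch, resolveNumEvents) are illustrative and not part of the patch.

// Standalone sketch of the cap-resolution logic above (illustrative only).
public class EventCapSketch {

  // Mirrors Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE (5M) from the hunk above.
  private static final long DEFAULT_MAX_EVENTS = 5_000_000L;

  // Returns how many events the next batch may read, given the configured cap and the
  // sourceLimit passed to getNextOffsetRanges.
  static long resolveNumEvents(long configuredMax, long sourceLimit) {
    // A "no limit" sentinel in the config falls back to the default cap.
    long cap = (configuredMax == Long.MAX_VALUE || configuredMax == Integer.MAX_VALUE)
        ? DEFAULT_MAX_EVENTS : configuredMax;
    // An explicit sourceLimit always wins; only an unbounded sourceLimit defers to the cap.
    return sourceLimit == Long.MAX_VALUE ? cap : sourceLimit;
  }

  public static void main(String[] args) {
    System.out.println(resolveNumEvents(Long.MAX_VALUE, Long.MAX_VALUE)); // 5000000 (default cap)
    System.out.println(resolveNumEvents(500L, Long.MAX_VALUE));           // 500 (configured cap)
    System.out.println(resolveNumEvents(500L, 900L));                     // 900 (sourceLimit wins)
  }
}

These three cases line up with the expectations in the tests below: 500 when only the configured cap bounds an unbounded sourceLimit, 900 when an explicit sourceLimit is passed.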
@@ -32,6 +32,7 @@
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -78,18 +79,26 @@ public void teardown() throws Exception {
testUtils.teardown();
}

@Test
public void testJsonKafkaSource() throws IOException {

// topic setup.
testUtils.createTopic(TEST_TOPIC_NAME, 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSource) {
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
props.setProperty("metadata.broker.list", testUtils.brokerAddress());
props.setProperty("auto.offset.reset", "smallest");
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
String.valueOf(Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE));
return props;
}

@Test
public void testJsonKafkaSource() throws IOException {

// topic setup.
testUtils.createTopic(TEST_TOPIC_NAME, 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties props = createPropsForJsonSource(null);

Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
@@ -131,6 +140,78 @@ public void testJsonKafkaSource() throws IOException {
assertEquals(Option.empty(), fetch4AsRows.getBatch());
}

@Test
public void testJsonKafkaSourceWithDefaultUpperCap() throws IOException {
// topic setup.
testUtils.createTopic(TEST_TOPIC_NAME, 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE);

Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 500;

/*
1. Extract without any checkpoint => get all the data, respecting default upper cap since both sourceLimit and
maxEventsFromKafkaSourceProp are set to Long.MAX_VALUE
*/
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
assertEquals(500, fetch1.getBatch().get().count());

// 2. Produce new data, extract new data based on sourceLimit
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
InputBatch<Dataset<Row>> fetch2 =
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 1500);
assertEquals(1500, fetch2.getBatch().get().count());

// reset the value back since it is a static variable
Config.DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = Config.defaultMaxEventsFromKafkaSource;
}

@Test
public void testJsonKafkaSourceWithConfigurableUpperCap() throws IOException {
// topic setup.
testUtils.createTopic(TEST_TOPIC_NAME, 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties props = createPropsForJsonSource(500L);

Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);

// 1. Extract without any checkpoint => get all the data, respecting sourceLimit
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
assertEquals(900, fetch1.getBatch().get().count());

// 2. Produce new data, extract new data based on upper cap
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
InputBatch<Dataset<Row>> fetch2 =
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(500, fetch2.getBatch().get().count());

// fetch data respecting source limit where upper cap > sourceLimit
InputBatch<JavaRDD<GenericRecord>> fetch3 =
kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), 400);
assertEquals(400, fetch3.getBatch().get().count());

// fetch data respecting source limit where upper cap < sourceLimit
InputBatch<JavaRDD<GenericRecord>> fetch4 =
kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), 600);
assertEquals(600, fetch4.getBatch().get().count());

// 3. Extract with previous checkpoint => gives same data back (idempotent)
InputBatch<JavaRDD<GenericRecord>> fetch5 =
kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(fetch2.getBatch().get().count(), fetch5.getBatch().get().count());
assertEquals(fetch2.getCheckpointForNextBatch(), fetch5.getCheckpointForNextBatch());

// 4. Extract with latest checkpoint => no new data returned
InputBatch<JavaRDD<GenericRecord>> fetch6 =
kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch4.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(Option.empty(), fetch6.getBatch());
}

private static HashMap<TopicAndPartition, LeaderOffset> makeOffsetMap(int[] partitions, long[] offsets) {
HashMap<TopicAndPartition, LeaderOffset> map = new HashMap<>();
for (int i = 0; i < partitions.length; i++) {
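For completeness, a hedged usage sketch of the new property from the caller side, mirroring createPropsForJsonSource above. Plain java.util.Properties is used only to keep the sketch self-contained; in Hudi these keys would be set on the TypedProperties handed to DeltaStreamer, and the topic name here is illustrative.

import java.util.Properties;

public class MaxEventsConfigExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Illustrative topic name for hoodie.deltastreamer.source.kafka.topic.
    props.setProperty("hoodie.deltastreamer.source.kafka.topic", "impressions");
    // Cap each batch at 500k Kafka events. If the key is absent, or set to Long.MAX_VALUE /
    // Integer.MAX_VALUE, KafkaOffsetGen falls back to the 5,000,000 default in Config.
    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", "500000");
    props.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}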