@@ -23,11 +23,11 @@
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -39,17 +39,19 @@
import org.apache.spark.sql.Row;
import org.apache.spark.streaming.kafka010.KafkaTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET;
import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -58,41 +60,37 @@
/**
* Tests against {@link JsonKafkaSource}.
*/
public class TestJsonKafkaSource extends UtilitiesTestBase {
public class TestJsonKafkaSource extends SparkClientFunctionalTestHarness {

private static String TEST_TOPIC_NAME = "hoodie_test";
private static final String TEST_TOPIC_PREFIX = "hoodie_test_";
private static final URL SCHEMA_FILE_URL = TestJsonKafkaSource.class.getClassLoader().getResource("delta-streamer-config/source.avsc");
private static KafkaTestUtils testUtils;

private final HoodieDeltaStreamerMetrics metrics = mock(HoodieDeltaStreamerMetrics.class);
private FilebasedSchemaProvider schemaProvider;
private KafkaTestUtils testUtils;
private HoodieDeltaStreamerMetrics metrics = mock(HoodieDeltaStreamerMetrics.class);

@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initClass(false);
testUtils = new KafkaTestUtils();
testUtils.setup();
}

@AfterAll
public static void cleanupClass() {
UtilitiesTestBase.cleanupClass();
testUtils.teardown();
}

@BeforeEach
public void setup() throws Exception {
super.setup();
schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
testUtils = new KafkaTestUtils();
testUtils.setup();
}

@AfterEach
public void teardown() throws Exception {
super.teardown();
testUtils.teardown();
public void init() throws Exception {
String schemaFilePath = Objects.requireNonNull(SCHEMA_FILE_URL).toURI().getPath();
TypedProperties props = new TypedProperties();
props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
schemaProvider = new FilebasedSchemaProvider(props, jsc());
}

private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSource, String resetStrategy) {
private TypedProperties createPropsForJsonSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) {
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
props.setProperty("hoodie.deltastreamer.source.kafka.topic", topic);
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
props.setProperty("auto.offset.reset", resetStrategy);
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
@@ -107,16 +105,17 @@ private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSo
public void testJsonKafkaSource() {

// topic setup.
testUtils.createTopic(TEST_TOPIC_NAME, 2);
final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSource";
testUtils.createTopic(topic, 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties props = createPropsForJsonSource(null, "earliest");
TypedProperties props = createPropsForJsonSource(topic, null, "earliest");

Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);

// 1. Extract without any checkpoint => get all the data, respecting sourceLimit
assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
assertEquals(900, fetch1.getBatch().get().count());
// Test Avro To DataFrame<Row> path
@@ -125,7 +124,7 @@ public void testJsonKafkaSource() {
assertEquals(900, fetch1AsRows.count());

// 2. Produce new data, extract new data
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
InputBatch<Dataset<Row>> fetch2 =
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(1100, fetch2.getBatch().get().count());
@@ -155,19 +154,20 @@ public void testJsonKafkaSource() {
@Test
public void testJsonKafkaSourceFilterNullMsg() {
// topic setup.
testUtils.createTopic(TEST_TOPIC_NAME, 2);
final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceFilterNullMsg";
testUtils.createTopic(topic, 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties props = createPropsForJsonSource(null, "earliest");
TypedProperties props = createPropsForJsonSource(topic, null, "earliest");

Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);

// 1. Extract without any checkpoint => get all the data, respecting sourceLimit
assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
// Send 1000 non-null messages to Kafka
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
// Send 100 null messages to Kafka
testUtils.sendMessages(TEST_TOPIC_NAME,new String[100]);
testUtils.sendMessages(topic, new String[100]);
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
// Verify that messages with null values are filtered
assertEquals(1000, fetch1.getBatch().get().count());
@@ -177,15 +177,16 @@ public void testJsonKafkaSourceFilterNullMsg() {
@Test
public void testJsonKafkaSourceResetStrategy() {
// topic setup.
testUtils.createTopic(TEST_TOPIC_NAME, 2);
final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceResetStrategy";
testUtils.createTopic(topic, 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();

TypedProperties earliestProps = createPropsForJsonSource(null, "earliest");
Source earliestJsonSource = new JsonKafkaSource(earliestProps, jsc, sparkSession, schemaProvider, metrics);
TypedProperties earliestProps = createPropsForJsonSource(topic, null, "earliest");
Source earliestJsonSource = new JsonKafkaSource(earliestProps, jsc(), spark(), schemaProvider, metrics);
SourceFormatAdapter earliestKafkaSource = new SourceFormatAdapter(earliestJsonSource);

TypedProperties latestProps = createPropsForJsonSource(null, "latest");
Source latestJsonSource = new JsonKafkaSource(latestProps, jsc, sparkSession, schemaProvider, metrics);
TypedProperties latestProps = createPropsForJsonSource(topic, null, "latest");
Source latestJsonSource = new JsonKafkaSource(latestProps, jsc(), spark(), schemaProvider, metrics);
SourceFormatAdapter latestKafkaSource = new SourceFormatAdapter(latestJsonSource);

// 1. Extract with a none data kafka checkpoint
@@ -195,7 +196,7 @@ public void testJsonKafkaSourceResetStrategy() {
assertEquals(earFetch0.getBatch(), latFetch0.getBatch());
assertEquals(earFetch0.getCheckpointForNextBatch(), latFetch0.getCheckpointForNextBatch());

testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));

// 2. Extract new checkpoint with a null / empty string pre checkpoint
// => earliest fetch with max source limit will get all of data and a end offset checkpoint
@@ -209,23 +210,24 @@
@Test
public void testJsonKafkaSourceWithDefaultUpperCap() {
// topic setup.
testUtils.createTopic(TEST_TOPIC_NAME, 2);
final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceWithDefaultUpperCap";
testUtils.createTopic(topic, 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE, "earliest");
TypedProperties props = createPropsForJsonSource(topic, Long.MAX_VALUE, "earliest");

Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);

/*
1. Extract without any checkpoint => get all the data, respecting default upper cap since both sourceLimit and
maxEventsFromKafkaSourceProp are set to Long.MAX_VALUE
*/
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
assertEquals(1000, fetch1.getBatch().get().count());

// 2. Produce new data, extract new data based on sourceLimit
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
InputBatch<Dataset<Row>> fetch2 =
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 1500);
assertEquals(1000, fetch2.getBatch().get().count());
@@ -234,27 +236,28 @@ public void testJsonKafkaSourceWithDefaultUpperCap() {
@Test
public void testJsonKafkaSourceInsertRecordsLessSourceLimit() {
// topic setup.
testUtils.createTopic(TEST_TOPIC_NAME, 2);
final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceInsertRecordsLessSourceLimit";
testUtils.createTopic(topic, 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE, "earliest");
TypedProperties props = createPropsForJsonSource(topic, Long.MAX_VALUE, "earliest");

Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", "500");

/*
1. maxEventsFromKafkaSourceProp set to more than generated insert records
and sourceLimit less than the generated insert records num.
*/
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 400)));
testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 400)));
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 300);
assertEquals(300, fetch1.getBatch().get().count());

/*
2. Produce new data, extract new data based on sourceLimit
and sourceLimit less than the generated insert records num.
*/
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 600)));
testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("001", 600)));
InputBatch<Dataset<Row>> fetch2 =
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 300);
assertEquals(300, fetch2.getBatch().get().count());
@@ -263,20 +266,21 @@ public void testJsonKafkaSourceInsertRecordsLessSourceLimit() {
@Test
public void testJsonKafkaSourceWithConfigurableUpperCap() {
// topic setup.
testUtils.createTopic(TEST_TOPIC_NAME, 2);
final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceWithConfigurableUpperCap";
testUtils.createTopic(topic, 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties props = createPropsForJsonSource(500L, "earliest");
TypedProperties props = createPropsForJsonSource(topic, 500L, "earliest");

Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);

// 1. Extract without any checkpoint => get all the data, respecting sourceLimit
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
assertEquals(900, fetch1.getBatch().get().count());

// 2. Produce new data, extract new data based on upper cap
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
InputBatch<Dataset<Row>> fetch2 =
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(500, fetch2.getBatch().get().count());
@@ -306,22 +310,23 @@ public void testJsonKafkaSourceWithConfigurableUpperCap() {
@Test
public void testCommitOffsetToKafka() {
// topic setup.
testUtils.createTopic(TEST_TOPIC_NAME, 2);
final String topic = TEST_TOPIC_PREFIX + "testCommitOffsetToKafka";
testUtils.createTopic(topic, 2);
List<TopicPartition> topicPartitions = new ArrayList<>();
TopicPartition topicPartition0 = new TopicPartition(TEST_TOPIC_NAME, 0);
TopicPartition topicPartition0 = new TopicPartition(topic, 0);
topicPartitions.add(topicPartition0);
TopicPartition topicPartition1 = new TopicPartition(TEST_TOPIC_NAME, 1);
TopicPartition topicPartition1 = new TopicPartition(topic, 1);
topicPartitions.add(topicPartition1);

HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
TypedProperties props = createPropsForJsonSource(null, "earliest");
TypedProperties props = createPropsForJsonSource(topic, null, "earliest");
props.put(ENABLE_KAFKA_COMMIT_OFFSET.key(), "true");
Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);

// 1. Extract without any checkpoint => get all the data, respecting sourceLimit
assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));

InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 599);
// commit to kafka after first batch
@@ -340,7 +345,7 @@ public void testCommitOffsetToKafka() {
assertEquals(500L, endOffsets.get(topicPartition0));
assertEquals(500L, endOffsets.get(topicPartition1));

testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 500)));
testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("001", 500)));
InputBatch<Dataset<Row>> fetch2 =
kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
