diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index 2ed4c42582c3c..ff0a19273f376 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -23,11 +23,11 @@
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
 import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
-import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -39,17 +39,19 @@
 import org.apache.spark.sql.Row;
 import org.apache.spark.streaming.kafka010.KafkaTestUtils;
 import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 
 import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET;
+import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -58,41 +60,37 @@
 /**
  * Tests against {@link JsonKafkaSource}.
  */
-public class TestJsonKafkaSource extends UtilitiesTestBase {
+public class TestJsonKafkaSource extends SparkClientFunctionalTestHarness {
 
-  private static String TEST_TOPIC_NAME = "hoodie_test";
+  private static final String TEST_TOPIC_PREFIX = "hoodie_test_";
+  private static final URL SCHEMA_FILE_URL = TestJsonKafkaSource.class.getClassLoader().getResource("delta-streamer-config/source.avsc");
+  private static KafkaTestUtils testUtils;
 
+  private final HoodieDeltaStreamerMetrics metrics = mock(HoodieDeltaStreamerMetrics.class);
   private FilebasedSchemaProvider schemaProvider;
-  private KafkaTestUtils testUtils;
-  private HoodieDeltaStreamerMetrics metrics = mock(HoodieDeltaStreamerMetrics.class);
 
   @BeforeAll
   public static void initClass() throws Exception {
-    UtilitiesTestBase.initClass(false);
+    testUtils = new KafkaTestUtils();
+    testUtils.setup();
   }
 
   @AfterAll
   public static void cleanupClass() {
-    UtilitiesTestBase.cleanupClass();
+    testUtils.teardown();
   }
 
   @BeforeEach
-  public void setup() throws Exception {
-    super.setup();
-    schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
-    testUtils = new KafkaTestUtils();
-    testUtils.setup();
-  }
-
-  @AfterEach
-  public void teardown() throws Exception {
-    super.teardown();
-    testUtils.teardown();
+  public void init() throws Exception {
+    String schemaFilePath = Objects.requireNonNull(SCHEMA_FILE_URL).toURI().getPath();
+    TypedProperties props = new TypedProperties();
+    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
+    schemaProvider = new FilebasedSchemaProvider(props, jsc());
   }
 
-  private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSource, String resetStrategy) {
+  private TypedProperties createPropsForJsonSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) {
     TypedProperties props = new TypedProperties();
-    props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME);
+    props.setProperty("hoodie.deltastreamer.source.kafka.topic", topic);
     props.setProperty("bootstrap.servers", testUtils.brokerAddress());
     props.setProperty("auto.offset.reset", resetStrategy);
     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
@@ -107,16 +105,17 @@ private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSo
   public void testJsonKafkaSource() {
 
     // topic setup.
-    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSource";
+    testUtils.createTopic(topic, 2);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    TypedProperties props = createPropsForJsonSource(null, "earliest");
+    TypedProperties props = createPropsForJsonSource(topic, null, "earliest");
 
-    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
+    Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
 
     // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
     assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
     InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
     assertEquals(900, fetch1.getBatch().get().count());
     // Test Avro To DataFrame<Row> path
@@ -125,7 +124,7 @@ public void testJsonKafkaSource() {
     assertEquals(900, fetch1AsRows.count());
 
     // 2. Produce new data, extract new data
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
     InputBatch<Dataset<Row>> fetch2 = kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
     assertEquals(1100, fetch2.getBatch().get().count());
 
@@ -155,19 +154,20 @@ public void testJsonKafkaSource() {
   @Test
   public void testJsonKafkaSourceFilterNullMsg() {
     // topic setup.
-    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceFilterNullMsg";
+    testUtils.createTopic(topic, 2);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    TypedProperties props = createPropsForJsonSource(null, "earliest");
+    TypedProperties props = createPropsForJsonSource(topic, null, "earliest");
 
-    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
+    Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
 
     // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
     assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
     // Send 1000 non-null messages to Kafka
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
     // Send 100 null messages to Kafka
-    testUtils.sendMessages(TEST_TOPIC_NAME,new String[100]);
+    testUtils.sendMessages(topic, new String[100]);
     InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
     // Verify that messages with null values are filtered
     assertEquals(1000, fetch1.getBatch().get().count());
@@ -177,15 +177,16 @@ public void testJsonKafkaSourceFilterNullMsg() {
   @Test
   public void testJsonKafkaSourceResetStrategy() {
     // topic setup.
-    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceResetStrategy";
+    testUtils.createTopic(topic, 2);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
 
-    TypedProperties earliestProps = createPropsForJsonSource(null, "earliest");
-    Source earliestJsonSource = new JsonKafkaSource(earliestProps, jsc, sparkSession, schemaProvider, metrics);
+    TypedProperties earliestProps = createPropsForJsonSource(topic, null, "earliest");
+    Source earliestJsonSource = new JsonKafkaSource(earliestProps, jsc(), spark(), schemaProvider, metrics);
     SourceFormatAdapter earliestKafkaSource = new SourceFormatAdapter(earliestJsonSource);
 
-    TypedProperties latestProps = createPropsForJsonSource(null, "latest");
-    Source latestJsonSource = new JsonKafkaSource(latestProps, jsc, sparkSession, schemaProvider, metrics);
+    TypedProperties latestProps = createPropsForJsonSource(topic, null, "latest");
+    Source latestJsonSource = new JsonKafkaSource(latestProps, jsc(), spark(), schemaProvider, metrics);
     SourceFormatAdapter latestKafkaSource = new SourceFormatAdapter(latestJsonSource);
 
     // 1. Extract with a none data kafka checkpoint
@@ -195,7 +196,7 @@ public void testJsonKafkaSourceResetStrategy() {
     assertEquals(earFetch0.getBatch(), latFetch0.getBatch());
     assertEquals(earFetch0.getCheckpointForNextBatch(), latFetch0.getCheckpointForNextBatch());
 
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
 
     // 2. Extract new checkpoint with a null / empty string pre checkpoint
     // => earliest fetch with max source limit will get all of data and a end offset checkpoint
@@ -209,23 +210,24 @@ public void testJsonKafkaSourceResetStrategy() {
   @Test
   public void testJsonKafkaSourceWithDefaultUpperCap() {
     // topic setup.
-    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceWithDefaultUpperCap";
+    testUtils.createTopic(topic, 2);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE, "earliest");
+    TypedProperties props = createPropsForJsonSource(topic, Long.MAX_VALUE, "earliest");
 
-    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
+    Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
 
     /*
     1. Extract without any checkpoint => get all the data, respecting default upper cap since both sourceLimit and
    maxEventsFromKafkaSourceProp are set to Long.MAX_VALUE
     */
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
     InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
     assertEquals(1000, fetch1.getBatch().get().count());
 
     // 2. Produce new data, extract new data based on sourceLimit
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
     InputBatch<Dataset<Row>> fetch2 = kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 1500);
     assertEquals(1000, fetch2.getBatch().get().count());
 
@@ -234,11 +236,12 @@ public void testJsonKafkaSourceWithDefaultUpperCap() {
   @Test
   public void testJsonKafkaSourceInsertRecordsLessSourceLimit() {
     // topic setup.
-    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceInsertRecordsLessSourceLimit";
+    testUtils.createTopic(topic, 2);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    TypedProperties props = createPropsForJsonSource(Long.MAX_VALUE, "earliest");
+    TypedProperties props = createPropsForJsonSource(topic, Long.MAX_VALUE, "earliest");
 
-    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
+    Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
     props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", "500");
 
@@ -246,7 +249,7 @@ public void testJsonKafkaSourceInsertRecordsLessSourceLimit() {
     1. maxEventsFromKafkaSourceProp set to more than generated insert records
    and sourceLimit less than the generated insert records num.
     */
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 400)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 400)));
     InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 300);
     assertEquals(300, fetch1.getBatch().get().count());
 
@@ -254,7 +257,7 @@ public void testJsonKafkaSourceInsertRecordsLessSourceLimit() {
     2. Produce new data, extract new data based on sourceLimit
    and sourceLimit less than the generated insert records num.
     */
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 600)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("001", 600)));
     InputBatch<Dataset<Row>> fetch2 = kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), 300);
     assertEquals(300, fetch2.getBatch().get().count());
 
@@ -263,20 +266,21 @@ public void testJsonKafkaSourceInsertRecordsLessSourceLimit() {
   @Test
   public void testJsonKafkaSourceWithConfigurableUpperCap() {
     // topic setup.
-    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    final String topic = TEST_TOPIC_PREFIX + "testJsonKafkaSourceWithConfigurableUpperCap";
+    testUtils.createTopic(topic, 2);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    TypedProperties props = createPropsForJsonSource(500L, "earliest");
+    TypedProperties props = createPropsForJsonSource(topic, 500L, "earliest");
 
-    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
+    Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
 
     // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
     InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900);
     assertEquals(900, fetch1.getBatch().get().count());
 
     // 2. Produce new data, extract new data based on upper cap
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("001", 1000)));
     InputBatch<Dataset<Row>> fetch2 = kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
     assertEquals(500, fetch2.getBatch().get().count());
 
@@ -306,22 +310,23 @@ public void testJsonKafkaSourceWithConfigurableUpperCap() {
   @Test
   public void testCommitOffsetToKafka() {
     // topic setup.
-    testUtils.createTopic(TEST_TOPIC_NAME, 2);
+    final String topic = TEST_TOPIC_PREFIX + "testCommitOffsetToKafka";
+    testUtils.createTopic(topic, 2);
     List<TopicPartition> topicPartitions = new ArrayList<>();
-    TopicPartition topicPartition0 = new TopicPartition(TEST_TOPIC_NAME, 0);
+    TopicPartition topicPartition0 = new TopicPartition(topic, 0);
     topicPartitions.add(topicPartition0);
-    TopicPartition topicPartition1 = new TopicPartition(TEST_TOPIC_NAME, 1);
+    TopicPartition topicPartition1 = new TopicPartition(topic, 1);
     topicPartitions.add(topicPartition1);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    TypedProperties props = createPropsForJsonSource(null, "earliest");
+    TypedProperties props = createPropsForJsonSource(topic, null, "earliest");
     props.put(ENABLE_KAFKA_COMMIT_OFFSET.key(), "true");
 
-    Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
+    Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
 
     // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
     assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
     InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 599);
 
     // commit to kafka after first batch
@@ -340,7 +345,7 @@ public void testCommitOffsetToKafka() {
     assertEquals(500L, endOffsets.get(topicPartition0));
     assertEquals(500L, endOffsets.get(topicPartition1));
 
-    testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 500)));
+    testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("001", 500)));
     InputBatch<Dataset<Row>> fetch2 = kafkaSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);