diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index 8690ff1cfb132..1f2a30c26656a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -52,6 +52,7 @@ import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor; import org.apache.hudi.utilities.schema.SparkAvroPostProcessor; import org.apache.hudi.utilities.sources.Source; +import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor; import org.apache.hudi.utilities.transform.ChainedTransformer; import org.apache.hudi.utilities.transform.Transformer; @@ -122,6 +123,15 @@ public static Source createSource(String sourceClass, TypedProperties cfg, JavaS } } + public static JsonKafkaSourcePostProcessor createJsonKafkaSourcePostProcessor(String postProcessorClassName, TypedProperties props) throws IOException { + try { + return StringUtils.isNullOrEmpty(postProcessorClassName) ? 
null + : (JsonKafkaSourcePostProcessor) ReflectionUtils.loadClass(postProcessorClassName, props); + } catch (Throwable e) { + throw new IOException("Could not load json kafka source post processor class " + postProcessorClassName, e); + } + } + public static SchemaProvider createSchemaProvider(String schemaProviderClass, TypedProperties cfg, JavaSparkContext jssc) throws IOException { try { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieSourcePostProcessException.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieSourcePostProcessException.java new file mode 100644 index 0000000000000..123d419e2dfda --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieSourcePostProcessException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.exception; + +import org.apache.hudi.exception.HoodieException; + +/** + * Exception thrown during Kafka source post processing. 
+ */ +public class HoodieSourcePostProcessException extends HoodieException { + + public HoodieSourcePostProcessException(String msg) { + super(msg); + } + + public HoodieSourcePostProcessException(String message, Throwable t) { + super(message, t); + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java index d6152a177f7fd..0b06d986bbf1a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java @@ -21,11 +21,14 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics; +import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException; import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen; import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils; +import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.log4j.LogManager; @@ -37,6 +40,8 @@ import org.apache.spark.streaming.kafka010.LocationStrategies; import org.apache.spark.streaming.kafka010.OffsetRange; +import java.io.IOException; + /** * Read json kafka data. 
*/ @@ -74,12 +79,30 @@ protected InputBatch> fetchNewData(Option lastCheckpoint } private JavaRDD toRDD(OffsetRange[] offsetRanges) { - return KafkaUtils.createRDD(sparkContext, + JavaRDD jsonStringRDD = KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent()) - .filter(x -> !StringUtils.isNullOrEmpty((String)x.value())) + .filter(x -> !StringUtils.isNullOrEmpty((String) x.value())) .map(x -> x.value().toString()); + return postProcess(jsonStringRDD); + } + + private JavaRDD postProcess(JavaRDD jsonStringRDD) { + String postProcessorClassName = this.props.getString(KafkaOffsetGen.Config.JSON_KAFKA_PROCESSOR_CLASS_OPT.key(), null); + // no processor, do nothing + if (StringUtils.isNullOrEmpty(postProcessorClassName)) { + return jsonStringRDD; + } + + JsonKafkaSourcePostProcessor processor; + try { + processor = UtilHelpers.createJsonKafkaSourcePostProcessor(postProcessorClassName, this.props); + } catch (IOException e) { + throw new HoodieSourcePostProcessException("Could not init " + postProcessorClassName, e); + } + + return processor.process(jsonStringRDD); } @Override diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index edf0d85bb32a0..564c5e2058453 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -189,6 +189,12 @@ public static class Config { .defaultValue(KafkaResetOffsetStrategies.LATEST) .withDocumentation("Kafka consumer strategy for reading data."); + public static final ConfigProperty JSON_KAFKA_PROCESSOR_CLASS_OPT = ConfigProperty + .key("hoodie.deltastreamer.source.json.kafka.processor.class") + .noDefaultValue() + .withDocumentation("Json kafka source post processor class name, post process data 
after consuming from " + "source and before giving it to deltastreamer."); + public static final String KAFKA_CHECKPOINT_TYPE_TIMESTAMP = "timestamp"; } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/JsonKafkaSourcePostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/JsonKafkaSourcePostProcessor.java new file mode 100644 index 0000000000000..7756dc5781481 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/JsonKafkaSourcePostProcessor.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources.processor; + +import org.apache.hudi.common.config.TypedProperties; + +import org.apache.spark.api.java.JavaRDD; + +import scala.Serializable; + +/** + * Base class for Json kafka source post processor. User can define their own processor that extends this class to do + * some post processing on the incoming json string records before the records are converted to a Dataset. 
+ */ +public abstract class JsonKafkaSourcePostProcessor implements Serializable { + + protected TypedProperties props; + + public JsonKafkaSourcePostProcessor(TypedProperties props) { + this.props = props; + } + + public abstract JavaRDD process(JavaRDD inputJsonRecords); +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java index ff0a19273f376..87f1774e02d2e 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java @@ -62,12 +62,12 @@ */ public class TestJsonKafkaSource extends SparkClientFunctionalTestHarness { - private static final String TEST_TOPIC_PREFIX = "hoodie_test_"; + protected static final String TEST_TOPIC_PREFIX = "hoodie_test_"; private static final URL SCHEMA_FILE_URL = TestJsonKafkaSource.class.getClassLoader().getResource("delta-streamer-config/source.avsc"); - private static KafkaTestUtils testUtils; + protected static KafkaTestUtils testUtils; - private final HoodieDeltaStreamerMetrics metrics = mock(HoodieDeltaStreamerMetrics.class); - private FilebasedSchemaProvider schemaProvider; + protected final HoodieDeltaStreamerMetrics metrics = mock(HoodieDeltaStreamerMetrics.class); + protected FilebasedSchemaProvider schemaProvider; @BeforeAll public static void initClass() throws Exception { @@ -88,7 +88,7 @@ public void init() throws Exception { schemaProvider = new FilebasedSchemaProvider(props, jsc()); } - private TypedProperties createPropsForJsonSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) { + protected TypedProperties createPropsForJsonSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) { TypedProperties props = new TypedProperties(); props.setProperty("hoodie.deltastreamer.source.kafka.topic", topic); 
props.setProperty("bootstrap.servers", testUtils.brokerAddress()); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java new file mode 100644 index 0000000000000..b53564df3da24 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utilities.sources; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter; +import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException; +import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor; + +import org.apache.avro.generic.GenericRecord; +import org.apache.spark.api.java.JavaRDD; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.JSON_KAFKA_PROCESSOR_CLASS_OPT; +import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +public class TestJsonKafkaSourcePostProcessor extends TestJsonKafkaSource { + + @Test + public void testNoPostProcessor() { + // topic setup. + final String topic = TEST_TOPIC_PREFIX + "testNoPostProcessor"; + testUtils.createTopic(topic, 2); + + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + TypedProperties props = createPropsForJsonSource(topic, null, "earliest"); + + Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics); + SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource); + + testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000))); + InputBatch> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900); + + assertEquals(900, fetch1.getBatch().get().count()); + } + + @Test + public void testSampleJsonKafkaSourcePostProcessor() { + // topic setup. 
+ final String topic = TEST_TOPIC_PREFIX + "testSampleJsonKafkaSourcePostProcessor"; + testUtils.createTopic(topic, 2); + + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + TypedProperties props = createPropsForJsonSource(topic, null, "earliest"); + + // processor class name setup + props.setProperty(JSON_KAFKA_PROCESSOR_CLASS_OPT.key(), SampleJsonKafkaSourcePostProcessor.class.getName()); + + Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics); + SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource); + + testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000))); + InputBatch> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900); + + assertNotEquals(900, fetch1.getBatch().get().count()); + } + + @Test + public void testInvalidJsonKafkaSourcePostProcessor() { + // topic setup. + final String topic = TEST_TOPIC_PREFIX + "testInvalidJsonKafkaSourcePostProcessor"; + testUtils.createTopic(topic, 2); + + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + TypedProperties props = createPropsForJsonSource(topic, null, "earliest"); + + // processor class name setup + props.setProperty(JSON_KAFKA_PROCESSOR_CLASS_OPT.key(), "InvalidJsonKafkaSourcePostProcessor"); + + Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics); + SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource); + testUtils.sendMessages(topic, jsonifyRecords(dataGenerator.generateInserts("000", 1000))); + + Assertions.assertThrows(HoodieSourcePostProcessException.class, + () -> kafkaSource.fetchNewDataInAvroFormat(Option.empty(), 900)); + } + + /** + * JsonKafkaSourcePostProcessor that returns a sub RDD of the incoming data, which it samples from the incoming data using the + * {@link org.apache.spark.api.java.JavaRDD#sample(boolean, double, long)} method. 
+ */ + public static class SampleJsonKafkaSourcePostProcessor extends JsonKafkaSourcePostProcessor { + + public SampleJsonKafkaSourcePostProcessor(TypedProperties props) { + super(props); + } + + @Override + public JavaRDD process(JavaRDD inputJsonRecords) { + return inputJsonRecords.sample(false, 0.5); + } + } + +}