diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 3e6f7e4b464b9..ebe7f3b952992 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -25,6 +25,7 @@ import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.hive.{HiveSyncTool, SlashEncodedDayPartitionValueExtractor}
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.{CustomKeyGenerator, SimpleKeyGenerator}
+
 import org.apache.log4j.LogManager
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils => SparkDataSourceUtils}
 
@@ -462,15 +463,11 @@ object DataSourceWriteOptions {
     .defaultValue("true")
     .withDocumentation("")
 
-  val KAFKA_AVRO_VALUE_DESERIALIZER: ConfigProperty[String] = ConfigProperty
+  val KAFKA_AVRO_VALUE_DESERIALIZER_CLASS: ConfigProperty[String] = ConfigProperty
     .key("hoodie.deltastreamer.source.kafka.value.deserializer.class")
-    .noDefaultValue()
-    .withDocumentation("")
-
-  val KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA: ConfigProperty[String] = ConfigProperty
-    .key("hoodie.deltastreamer.source.kafka.value.deserializer.schema")
-    .noDefaultValue()
-    .withDocumentation("")
+    .defaultValue("io.confluent.kafka.serializers.KafkaAvroDeserializer")
+    .sinceVersion("0.9.0")
+    .withDocumentation("This class is used by kafka client to deserialize the records")
 }
 
 object DataSourceOptionsHelper {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java
index 461837e98519e..32c8e173a21fb 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java
@@ -18,14 +18,14 @@
 
 package org.apache.hudi.utilities.deser;
 
-import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.exception.HoodieException;
 
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import org.apache.avro.Schema;
+import org.apache.hudi.utilities.sources.AvroKafkaSource;
 import org.apache.kafka.common.errors.SerializationException;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 
 import java.util.Map;
 import java.util.Map.Entry;
@@ -48,7 +48,7 @@ public void configure(Map configs, boolean isKey) {
     super.configure(configs, isKey);
     try {
       TypedProperties props = getConvertToTypedProperties(configs);
-      sourceSchema = new Schema.Parser().parse(props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA().key()));
+      sourceSchema = new Schema.Parser().parse(props.getString(AvroKafkaSource.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA));
     } catch (Throwable e) {
       throw new HoodieException(e);
     }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index c34f8748b6943..4cea13d497725 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -24,11 +24,11 @@
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
+import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
 
-import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.log4j.LogManager;
@@ -40,8 +40,8 @@
 import org.apache.spark.streaming.kafka010.LocationStrategies;
 import org.apache.spark.streaming.kafka010.OffsetRange;
 
-import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET;
 import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET;
+import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET;
 
 /**
  * Reads avro serialized Kafka data, based on the confluent schema-registry.
@@ -52,6 +52,10 @@ public class AvroKafkaSource extends AvroSource {
   // these are native kafka's config. do not change the config names.
   private static final String NATIVE_KAFKA_KEY_DESERIALIZER_PROP = "key.deserializer";
   private static final String NATIVE_KAFKA_VALUE_DESERIALIZER_PROP = "value.deserializer";
+  // These are settings used to pass things to KafkaAvroDeserializer
+  public static final String KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX = "hoodie.deltastreamer.source.kafka.value.deserializer.";
+  public static final String KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA = KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX + "schema";
+
   private final KafkaOffsetGen offsetGen;
   private final HoodieDeltaStreamerMetrics metrics;
 
@@ -60,22 +64,21 @@ public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, Spa
     super(props, sparkContext, sparkSession, schemaProvider);
 
     props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
-    String deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER().key(), "");
+    String deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
+        DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());
 
-    if (deserializerClassName.isEmpty()) {
-      props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, KafkaAvroDeserializer.class);
-    } else {
-      try {
-        props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName));
+    try {
+      props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName));
+      if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
         if (schemaProvider == null) {
-          throw new HoodieIOException("SchemaProvider has to be set to use custom Deserializer");
+          throw new HoodieIOException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer");
         }
-        props.put(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA().key(), schemaProvider.getSourceSchema().toString());
-      } catch (ClassNotFoundException e) {
-        String error = "Could not load custom avro kafka deserializer: " + deserializerClassName;
-        LOG.error(error);
-        throw new HoodieException(error, e);
+        props.put(KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA, schemaProvider.getSourceSchema().toString());
       }
+    } catch (ClassNotFoundException e) {
+      String error = "Could not load custom avro kafka deserializer: " + deserializerClassName;
+      LOG.error(error);
+      throw new HoodieException(error, e);
     }
 
     this.metrics = metrics;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 99e960cf55fa5..4378cb195d7ab 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -26,6 +26,7 @@
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
 
+import org.apache.hudi.utilities.sources.AvroKafkaSource;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -310,7 +311,9 @@ private Map excludeHoodieConfigs(TypedProperties props) {
     props.keySet().stream().filter(prop -> {
       // In order to prevent printing unnecessary warn logs, here filter out the hoodie
       // configuration items before passing to kafkaParams
-      return !prop.toString().startsWith("hoodie.");
+      return !prop.toString().startsWith("hoodie.")
+          // We need to pass some properties to kafka client so that KafkaAvroSchemaDeserializer can use it
+          || prop.toString().startsWith(AvroKafkaSource.KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX);
     }).forEach(prop -> {
       kafkaParams.put(prop.toString(), props.get(prop.toString()));
     });
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java
index 2079c75b484c3..da671c6761fcd 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java
@@ -18,8 +18,8 @@
 
 package org.apache.hudi.utilities.deser;
 
-import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.sources.AvroKafkaSource;
 import org.apache.hudi.utilities.sources.helpers.SchemaTestProvider;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 
@@ -100,7 +100,7 @@ private IndexedRecord createExtendUserRecord() {
   public void testKafkaAvroSchemaDeserializer() {
     byte[] bytesOrigRecord;
     IndexedRecord avroRecord = createUserRecord();
-    config.put(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA().key(), origSchema.toString());
+    config.put(AvroKafkaSource.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA, origSchema.toString());
 
     KafkaAvroSchemaDeserializer avroDeserializer = new KafkaAvroSchemaDeserializer(schemaRegistry, new HashMap(config));
     avroDeserializer.configure(new HashMap(config), false);
@@ -112,7 +112,7 @@ public void testKafkaAvroSchemaDeserializer() {
     byte[] bytesExtendedRecord = avroSerializer.serialize(topic, avroRecordWithAllField);
 
     SchemaTestProvider.schemaToReturn.set(evolSchema);
-    config.put(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA().key(), evolSchema.toString());
+    config.put(AvroKafkaSource.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA, evolSchema.toString());
     avroDeserializer = new KafkaAvroSchemaDeserializer(schemaRegistry, new HashMap(config));
     avroDeserializer.configure(new HashMap(config), false);
     // record is serialized w/ evolved schema, and deserialized w/ evolved schema
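
Usage note (not part of the patch): the sketch below illustrates how a DeltaStreamer user would opt in to the schema-aware deserializer after this change. The deserializer.class key, its KafkaAvroDeserializer default, and the deserializer property prefix all come from this patch; the topic name, broker address, and schema-registry URL are hypothetical placeholders, and hoodie.deltastreamer.source.kafka.topic is assumed to be the existing KafkaOffsetGen topic config.

import org.apache.hudi.common.config.TypedProperties;

public class KafkaAvroSchemaDeserializerUsageSketch {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();

    // Hypothetical Kafka / schema-registry coordinates.
    props.put("hoodie.deltastreamer.source.kafka.topic", "impressions");
    props.put("bootstrap.servers", "localhost:9092");
    props.put("schema.registry.url", "http://localhost:8081");

    // Opt in to the schema-aware deserializer. If this key is omitted,
    // AvroKafkaSource now falls back to the config's default,
    // io.confluent.kafka.serializers.KafkaAvroDeserializer.
    props.put("hoodie.deltastreamer.source.kafka.value.deserializer.class",
        "org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer");

    // With that class selected, AvroKafkaSource requires a SchemaProvider and
    // forwards its source schema to the deserializer under
    // "hoodie.deltastreamer.source.kafka.value.deserializer.schema". Because
    // that key starts with the new
    // "hoodie.deltastreamer.source.kafka.value.deserializer." prefix,
    // KafkaOffsetGen#excludeHoodieConfigs keeps it in kafkaParams instead of
    // dropping it like every other "hoodie.*" property.
  }
}

The prefix-based pass-through is the piece that makes this work end to end: the schema travels inside the Kafka consumer config, so KafkaAvroSchemaDeserializer can read it back in configure() without any reference to the Spark-side DataSourceWriteOptions.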