Merged
DataSourceWriteOptions.scala
@@ -25,6 +25,7 @@ import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.{HiveSyncTool, SlashEncodedDayPartitionValueExtractor}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{CustomKeyGenerator, SimpleKeyGenerator}

import org.apache.log4j.LogManager
import org.apache.spark.sql.execution.datasources.{DataSourceUtils => SparkDataSourceUtils}

@@ -462,15 +463,11 @@ object DataSourceWriteOptions {
.defaultValue("true")
.withDocumentation("")

val KAFKA_AVRO_VALUE_DESERIALIZER: ConfigProperty[String] = ConfigProperty
val KAFKA_AVRO_VALUE_DESERIALIZER_CLASS: ConfigProperty[String] = ConfigProperty
.key("hoodie.deltastreamer.source.kafka.value.deserializer.class")
.noDefaultValue()
.withDocumentation("")

val KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA: ConfigProperty[String] = ConfigProperty
.key("hoodie.deltastreamer.source.kafka.value.deserializer.schema")
.noDefaultValue()
.withDocumentation("")
.defaultValue("io.confluent.kafka.serializers.KafkaAvroDeserializer")
.sinceVersion("0.9.0")
.withDocumentation("This class is used by kafka client to deserialize the records")
}

object DataSourceOptionsHelper {
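For reference, a minimal sketch of how a DeltaStreamer job would opt into the schema-aware deserializer through the renamed config key above. The key name, its new default behavior, and the deserializer's package come from this diff; the surrounding setup (building a TypedProperties by hand) is only illustrative.

import org.apache.hudi.common.config.TypedProperties;

public class DeserializerConfigExample {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // Renamed key; when left unset it now falls back to Confluent's KafkaAvroDeserializer.
    props.setProperty("hoodie.deltastreamer.source.kafka.value.deserializer.class",
        "org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer");
    System.out.println(props.getString("hoodie.deltastreamer.source.kafka.value.deserializer.class"));
  }
}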
KafkaAvroSchemaDeserializer.java
@@ -18,14 +18,14 @@

package org.apache.hudi.utilities.deser;

import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieException;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.Schema;
import org.apache.hudi.utilities.sources.AvroKafkaSource;
import org.apache.kafka.common.errors.SerializationException;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

import java.util.Map;
import java.util.Map.Entry;
@@ -48,7 +48,7 @@ public void configure(Map<String, ?> configs, boolean isKey) {
super.configure(configs, isKey);
try {
TypedProperties props = getConvertToTypedProperties(configs);
sourceSchema = new Schema.Parser().parse(props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA().key()));
sourceSchema = new Schema.Parser().parse(props.getString(AvroKafkaSource.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA));
} catch (Throwable e) {
throw new HoodieException(e);
}
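As a quick illustration of what the rewritten configure(...) above consumes: the reader schema reaches the deserializer as a plain string entry in the Kafka consumer config map and is parsed back into an Avro Schema. A self-contained sketch, with the key name taken from AvroKafkaSource in this PR and a toy schema standing in for the real source schema:

import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;

public class ConfigureSketch {
  public static void main(String[] args) {
    Map<String, Object> configs = new HashMap<>();
    // In the real flow this entry is populated by AvroKafkaSource from the SchemaProvider.
    configs.put("hoodie.deltastreamer.source.kafka.value.deserializer.schema",
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
    Schema sourceSchema = new Schema.Parser().parse(
        configs.get("hoodie.deltastreamer.source.kafka.value.deserializer.schema").toString());
    System.out.println(sourceSchema.getFullName()); // User
  }
}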
AvroKafkaSource.java
@@ -24,11 +24,11 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
@@ -40,8 +40,8 @@
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET;
import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET;
import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.DEFAULT_ENABLE_KAFKA_COMMIT_OFFSET;

/**
* Reads avro serialized Kafka data, based on the confluent schema-registry.
@@ -52,6 +52,10 @@ public class AvroKafkaSource extends AvroSource {
// these are native kafka's config. do not change the config names.
private static final String NATIVE_KAFKA_KEY_DESERIALIZER_PROP = "key.deserializer";
private static final String NATIVE_KAFKA_VALUE_DESERIALIZER_PROP = "value.deserializer";
// These are settings used to pass things to KafkaAvroDeserializer
public static final String KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX = "hoodie.deltastreamer.source.kafka.value.deserializer.";
public static final String KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA = KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX + "schema";

private final KafkaOffsetGen offsetGen;
private final HoodieDeltaStreamerMetrics metrics;

@@ -60,22 +64,21 @@ public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, Spa
super(props, sparkContext, sparkSession, schemaProvider);

props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
String deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER().key(), "");
String deserializerClassName = props.getString(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().key(),
DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS().defaultValue());

if (deserializerClassName.isEmpty()) {
props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, KafkaAvroDeserializer.class);
} else {
try {
props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName));
try {
props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName));
if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
if (schemaProvider == null) {
throw new HoodieIOException("SchemaProvider has to be set to use custom Deserializer");
throw new HoodieIOException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer");
}
props.put(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA().key(), schemaProvider.getSourceSchema().toString());
} catch (ClassNotFoundException e) {
String error = "Could not load custom avro kafka deserializer: " + deserializerClassName;
LOG.error(error);
throw new HoodieException(error, e);
props.put(KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA, schemaProvider.getSourceSchema().toString());
}
} catch (ClassNotFoundException e) {
String error = "Could not load custom avro kafka deserializer: " + deserializerClassName;
LOG.error(error);
throw new HoodieException(error, e);
}

this.metrics = metrics;
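Condensed view of the constructor logic above, pulled out of the diff context: resolve the configured value deserializer (falling back to the new Confluent default), and only when it is KafkaAvroSchemaDeserializer insist on a schema and forward it under the prefixed key. Class and key names are from this diff; error handling and the Hudi exception types are simplified, and invoking it needs the Confluent serializer jar on the classpath.

import java.util.HashMap;
import java.util.Map;

public class ValueDeserializerWiringSketch {

  static Map<String, Object> buildKafkaParams(String configuredClassName, String sourceSchemaJson)
      throws ClassNotFoundException {
    // Fall back to the default introduced for KAFKA_AVRO_VALUE_DESERIALIZER_CLASS.
    String deserializerClassName = (configuredClassName == null || configuredClassName.isEmpty())
        ? "io.confluent.kafka.serializers.KafkaAvroDeserializer"
        : configuredClassName;

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("value.deserializer", Class.forName(deserializerClassName));

    if (deserializerClassName.equals("org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer")) {
      if (sourceSchemaJson == null) {
        throw new IllegalStateException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer");
      }
      // The reader schema rides along in the consumer config under the prefixed key.
      kafkaParams.put("hoodie.deltastreamer.source.kafka.value.deserializer.schema", sourceSchemaJson);
    }
    return kafkaParams;
  }
}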
KafkaOffsetGen.java
@@ -26,6 +26,7 @@
import org.apache.hudi.exception.HoodieNotSupportedException;

import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.sources.AvroKafkaSource;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -310,7 +311,9 @@ private Map<String, Object> excludeHoodieConfigs(TypedProperties props) {
props.keySet().stream().filter(prop -> {
// In order to prevent printing unnecessary warn logs, here filter out the hoodie
// configuration items before passing to kafkaParams
return !prop.toString().startsWith("hoodie.");
return !prop.toString().startsWith("hoodie.")
// We need to pass some properties to kafka client so that KafkaAvroSchemaDeserializer can use it
|| prop.toString().startsWith(AvroKafkaSource.KAFKA_AVRO_VALUE_DESERIALIZER_PROPERTY_PREFIX);
}).forEach(prop -> {
kafkaParams.put(prop.toString(), props.get(prop.toString()));
});
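A toy run of the adjusted filter above: generic "hoodie." options are still kept out of kafkaParams, while anything under the deserializer prefix is now passed through to the Kafka client for KafkaAvroSchemaDeserializer.configure(...). The prefix constant is from AvroKafkaSource in this PR; the sample property values are made up.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class KafkaParamsFilterSketch {
  private static final String DESERIALIZER_PREFIX = "hoodie.deltastreamer.source.kafka.value.deserializer.";

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("hoodie.datasource.write.recordkey.field", "id");          // excluded: plain hoodie config
    props.put(DESERIALIZER_PREFIX + "schema", "{\"type\":\"string\"}");  // kept: deserializer setting
    props.put("bootstrap.servers", "localhost:9092");                    // kept: native kafka config

    Map<String, Object> kafkaParams = new HashMap<>();
    props.keySet().stream()
        .filter(prop -> !prop.toString().startsWith("hoodie.")
            || prop.toString().startsWith(DESERIALIZER_PREFIX))
        .forEach(prop -> kafkaParams.put(prop.toString(), props.get(prop.toString())));

    // Contains only the schema key and bootstrap.servers; the recordkey option is filtered out.
    System.out.println(kafkaParams.keySet());
  }
}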
TestKafkaAvroSchemaDeserializer.java
@@ -18,8 +18,8 @@

package org.apache.hudi.utilities.deser;

import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.sources.AvroKafkaSource;
import org.apache.hudi.utilities.sources.helpers.SchemaTestProvider;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;

@@ -100,7 +100,7 @@ private IndexedRecord createExtendUserRecord() {
public void testKafkaAvroSchemaDeserializer() {
byte[] bytesOrigRecord;
IndexedRecord avroRecord = createUserRecord();
config.put(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA().key(), origSchema.toString());
config.put(AvroKafkaSource.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA, origSchema.toString());

KafkaAvroSchemaDeserializer avroDeserializer = new KafkaAvroSchemaDeserializer(schemaRegistry, new HashMap(config));
avroDeserializer.configure(new HashMap(config), false);
@@ -112,7 +112,7 @@ public void testKafkaAvroSchemaDeserializer() {
byte[] bytesExtendedRecord = avroSerializer.serialize(topic, avroRecordWithAllField);

SchemaTestProvider.schemaToReturn.set(evolSchema);
config.put(DataSourceWriteOptions.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA().key(), evolSchema.toString());
config.put(AvroKafkaSource.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA, evolSchema.toString());
avroDeserializer = new KafkaAvroSchemaDeserializer(schemaRegistry, new HashMap(config));
avroDeserializer.configure(new HashMap(config), false);
// record is serialized w/ evolved schema, and deserialized w/ evolved schema