diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java
index 9e6204ec9a6b3..8df72fa01f211 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java
@@ -80,6 +80,16 @@ public static SchemaProvider createSchemaProvider(String schemaProviderClass,
     }
   }
 
+  public static SchemaProvider createSchemaProvider(String schemaProviderClass,
+      TypedProperties cfg) throws IOException {
+    try {
+      return schemaProviderClass == null ? null :
+          (SchemaProvider) ReflectionUtils.loadClass(schemaProviderClass, cfg);
+    } catch (Throwable e) {
+      throw new IOException("Could not load schema provider class " + schemaProviderClass, e);
+    }
+  }
+
   public static Transformer createTransformer(String transformerClass) throws IOException {
     try {
       return transformerClass == null ? null : (Transformer) ReflectionUtils.loadClass(transformerClass);
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/SchemaRegistryProvider.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/SchemaRegistryProvider.java
index 68daf7995c3ae..4f58d316f7145 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/SchemaRegistryProvider.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/SchemaRegistryProvider.java
@@ -57,6 +57,10 @@ private static String fetchSchemaFromRegistry(String registryUrl) throws IOExcep
     return node.get("schema").asText();
   }
 
+  public SchemaRegistryProvider(TypedProperties props) {
+    this(props, null);
+  }
+
   public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
     super(props, jssc);
     DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL_PROP));
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroKafkaSource.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroKafkaSource.java
index baeaaa8d3c2cd..3e51e5708cb92 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroKafkaSource.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/AvroKafkaSource.java
@@ -67,7 +67,7 @@ protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Optional<String> lastC
 
   private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
     JavaRDD<GenericRecord> recordRDD = KafkaUtils
-        .createRDD(sparkContext, String.class, Object.class, StringDecoder.class, KafkaAvroDecoder.class,
+        .createRDD(sparkContext, String.class, Object.class, StringDecoder.class, SourceSchemaKafkaAvroDecoder.class,
         offsetGen.getKafkaParams(), offsetRanges).values().map(obj -> (GenericRecord) obj);
     return recordRDD;
   }
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/SourceSchemaKafkaAvroDecoder.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/SourceSchemaKafkaAvroDecoder.java
new file mode 100644
index 0000000000000..ed5d7caf9d5e5
--- /dev/null
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/SourceSchemaKafkaAvroDecoder.java
@@ -0,0 +1,66 @@
+package com.uber.hoodie.utilities.sources;
+
+import com.uber.hoodie.common.util.TypedProperties;
+import com.uber.hoodie.utilities.UtilHelpers;
+import com.uber.hoodie.utilities.schema.SchemaProvider;
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Properties;
+import kafka.serializer.Decoder;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.kafka.common.errors.SerializationException;
+
+/** A Kafka decoder that uses the source schema for read. */
+public class SourceSchemaKafkaAvroDecoder extends AbstractKafkaAvroDeserializer
+    implements Decoder<Object> {
+
+  private static final String SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class";
+
+  private final Schema sourceSchema;
+
+  public SourceSchemaKafkaAvroDecoder(VerifiableProperties props) {
+    this.configure(new KafkaAvroDeserializerConfig(props.props()));
+
+    TypedProperties typedProperties = new TypedProperties();
+    copyProperties(typedProperties, props.props());
+
+    try {
+      SchemaProvider schemaProvider =
+          UtilHelpers.createSchemaProvider(
+              props.getString(SCHEMA_PROVIDER_CLASS_PROP), typedProperties);
+      sourceSchema = Objects.requireNonNull(schemaProvider).getSourceSchema();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Object fromBytes(byte[] bytes) {
+    return deserialize(bytes);
+  }
+
+  @Override
+  protected Object deserialize(
+      boolean includeSchemaAndVersion,
+      String topic,
+      Boolean isKey,
+      byte[] payload,
+      Schema readerSchema)
+      throws SerializationException {
+    if (readerSchema != null) {
+      return super.deserialize(includeSchemaAndVersion, topic, isKey, payload, readerSchema);
+    }
+
+    return super.deserialize(includeSchemaAndVersion, topic, isKey, payload, sourceSchema);
+  }
+
+  private static void copyProperties(TypedProperties typedProperties, Properties properties) {
+    for (Entry<Object, Object> entry : properties.entrySet()) {
+      typedProperties.put(entry.getKey(), entry.getValue());
+    }
+  }
+}