Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ public static SchemaProvider createSchemaProvider(String schemaProviderClass,
}
}

/**
 * Instantiates the configured {@link SchemaProvider} implementation via reflection.
 *
 * @param schemaProviderClass fully-qualified class name of the provider, or null for none
 * @param cfg                 properties handed to the provider's constructor
 * @return a new provider instance, or null when no class name was given
 * @throws IOException if the class cannot be loaded or instantiated
 */
public static SchemaProvider createSchemaProvider(String schemaProviderClass,
    TypedProperties cfg) throws IOException {
  if (schemaProviderClass == null) {
    return null;
  }
  try {
    return (SchemaProvider) ReflectionUtils.loadClass(schemaProviderClass, cfg);
  } catch (Throwable e) {
    throw new IOException("Could not load schema provider class " + schemaProviderClass, e);
  }
}

public static Transformer createTransformer(String transformerClass) throws IOException {
try {
return transformerClass == null ? null : (Transformer) ReflectionUtils.loadClass(transformerClass);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ private static String fetchSchemaFromRegistry(String registryUrl) throws IOExcep
return node.get("schema").asText();
}

/**
 * Convenience constructor for use without a Spark context (e.g. when instantiated inside a
 * Kafka decoder); delegates with a null {@code JavaSparkContext}.
 *
 * @param props configuration; must contain the schema registry URL property
 */
public SchemaRegistryProvider(TypedProperties props) {
this(props, null);
}

public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
super(props, jssc);
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL_PROP));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Optional<String> lastC

private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
JavaRDD<GenericRecord> recordRDD = KafkaUtils
.createRDD(sparkContext, String.class, Object.class, StringDecoder.class, KafkaAvroDecoder.class,
.createRDD(sparkContext, String.class, Object.class, StringDecoder.class, SourceSchemaKafkaAvroDecoder.class,
offsetGen.getKafkaParams(), offsetRanges).values().map(obj -> (GenericRecord) obj);
return recordRDD;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.uber.hoodie.utilities.sources;

import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.utilities.UtilHelpers;
import com.uber.hoodie.utilities.schema.SchemaProvider;
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.io.IOException;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Properties;
import kafka.serializer.Decoder;
import kafka.utils.VerifiableProperties;
import org.apache.avro.Schema;
import org.apache.kafka.common.errors.SerializationException;

/** A Kafka decoder that uses the source schema for read. */
public class SourceSchemaKafkaAvroDecoder extends AbstractKafkaAvroDeserializer
implements Decoder<Object> {

private static final String SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class";

private final Schema sourceSchema;

public SourceSchemaKafkaAvroDecoder(VerifiableProperties props) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any specific reason why you are using VerifiableProperties here and not TypedProperties?

this.configure(new KafkaAvroDeserializerConfig(props.props()));

TypedProperties typedProperties = new TypedProperties();
copyProperties(typedProperties, props.props());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we do away with this function?


try {
SchemaProvider schemaProvider =
UtilHelpers.createSchemaProvider(
props.getString(SCHEMA_PROVIDER_CLASS_PROP), typedProperties);
sourceSchema = Objects.requireNonNull(schemaProvider).getSourceSchema();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public Object fromBytes(byte[] bytes) {
return deserialize(bytes);
}

@Override
protected Object deserialize(
boolean includeSchemaAndVersion,
String topic,
Boolean isKey,
byte[] payload,
Schema readerSchema)
throws SerializationException {
if (readerSchema != null) {
return super.deserialize(includeSchemaAndVersion, topic, isKey, payload, readerSchema);
}

return super.deserialize(includeSchemaAndVersion, topic, isKey, payload, sourceSchema);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@haiminh87 I was thinking if we could make this configurable in sense that have a boolean like readUsingLatestSchema with a default value of true and can be overridden via TypedProperties instance.

}

private static void copyProperties(TypedProperties typedProperties, Properties properties) {
for (Entry<Object, Object> entry : properties.entrySet()) {
typedProperties.put(entry.getKey(), entry.getValue());
}
}
}