@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.utilities.deser;

import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.Schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.kafka.common.errors.SerializationException;

import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;

/**
 * Extends {@link KafkaAvroDeserializer} so that a reader schema can be injected during deserialization.
 */
public class KafkaAvroSchemaDeserializer extends KafkaAvroDeserializer {

private static final String SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class";
private Schema sourceSchema;

public KafkaAvroSchemaDeserializer() {}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
super.configure(configs, isKey);
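// Load the configured SchemaProvider reflectively and cache its source schema for use in deserialize().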
try {
TypedProperties props = getConvertToTypedProperties(configs);
String className = props.getString(SCHEMA_PROVIDER_CLASS_PROP);
SchemaProvider schemaProvider = (SchemaProvider) ReflectionUtils.loadClass(className, props);
sourceSchema = Objects.requireNonNull(schemaProvider).getSourceSchema();
} catch (Throwable e) {
throw new HoodieException(e);
}
}

/**
 * Overrides {@link AbstractKafkaAvroDeserializer}'s deserialize so that the source schema obtained from
 * the configured {@link SchemaProvider} is used as the reader schema. Without injecting the reader schema
 * here, later stages of the pipeline break.
 *
 * @throws SerializationException if the payload cannot be deserialized
 */
@Override
protected Object deserialize(
boolean includeSchemaAndVersion,
String topic,
Boolean isKey,
byte[] payload,
Schema readerSchema)
throws SerializationException {
return super.deserialize(includeSchemaAndVersion, topic, isKey, payload, sourceSchema);
}

private TypedProperties getConvertToTypedProperties(Map<String, ?> configs) {
TypedProperties typedProperties = new TypedProperties();
for (Entry<String, ?> entry : configs.entrySet()) {
typedProperties.put(entry.getKey(), entry.getValue());
}
return typedProperties;
}
}
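For orientation, a minimal, hypothetical sketch of the DeltaStreamer properties that would wire this deserializer in. The property keys come from the classes touched by this change; the URLs, topic name and the choice of SchemaRegistryProvider are placeholders, not part of the PR.

// Hypothetical wiring sketch (not part of this change).
import org.apache.hudi.common.config.TypedProperties;

public class KafkaAvroSchemaDeserializerWiringExample {
  public static TypedProperties exampleProps() {
    TypedProperties props = new TypedProperties();
    // Tell AvroKafkaSource to use the schema-provider-aware deserializer.
    props.put("hoodie.deltastreamer.source.value.deserializer.class",
        "org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer");
    // configure() loads this class reflectively to obtain the source schema.
    props.put("hoodie.deltastreamer.schemaprovider.class",
        "org.apache.hudi.utilities.schema.SchemaRegistryProvider");
    props.put("hoodie.deltastreamer.schemaprovider.registry.url",
        "http://localhost:8081/subjects/my-topic-value/versions/latest");  // placeholder URL
    // Needed by the underlying Confluent deserializer.
    props.put("schema.registry.url", "http://localhost:8081");
    return props;
  }
}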
@@ -25,6 +25,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.hudi.utilities.sources.helpers.AvroKafkaSourceHelpers;
import org.apache.spark.api.java.JavaSparkContext;

import java.io.IOException;
@@ -42,46 +43,94 @@ public class SchemaRegistryProvider extends SchemaProvider {
* Configs supported.
*/
public static class Config {

private static final String SRC_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url";
private static final String TARGET_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.targetUrl";
private static final String CACHE_SCHEMAS = "hoodie.deltastreamer.schemaprovider.registry.cache_enabled";
}

private Schema sourceSchema;
private Schema targetSchema;
private final boolean cacheDisabled;
private final boolean injectKafkaFieldSchema;
private final String registryUrl;
private final String targetRegistryUrl;
private final boolean noTargetSchema;

public SchemaRegistryProvider(TypedProperties props) {
this(props, null);
}

public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
super(props, jssc);
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL_PROP));
this.cacheDisabled = !props.getBoolean(Config.CACHE_SCHEMAS, false);
this.injectKafkaFieldSchema = props.getBoolean(AvroKafkaSourceHelpers.INJECT_KAFKA_META_FIELDS, false);
this.registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
this.targetRegistryUrl = config.getString(Config.TARGET_SCHEMA_REGISTRY_URL_PROP, registryUrl);
this.noTargetSchema = targetRegistryUrl.equals("null");
}

public String fetchSchemaFromRegistry(String registryUrl) throws IOException {
URL registry = new URL(registryUrl);
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.readTree(registry.openStream());
return node.get("schema").asText();
}

private Schema getSchema(String registryUrl) throws IOException {
Schema schema = new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
if (injectKafkaFieldSchema) {
schema = AvroKafkaSourceHelpers.addKafkaMetadataFields(schema);
}
return schema;
}

@Override
public Schema getSourceSchema() {
if (cacheDisabled) {
return getSourceSchemaFromRegistry();
}
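// Double-checked locking so that concurrent callers fetch the source schema from the registry at most once.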
if (sourceSchema == null) {
synchronized (this) {
if (sourceSchema == null) {
sourceSchema = getSourceSchemaFromRegistry();
}
}
}
return sourceSchema;
}

@Override
public Schema getTargetSchema() {
if (noTargetSchema) {
return null;
}
if (cacheDisabled) {
return getTargetSchemaFromRegistry();
}
if (targetSchema == null) {
synchronized (this) {
if (targetSchema == null) {
targetSchema = getTargetSchemaFromRegistry();
}
}
}
return targetSchema;
}

private Schema getSourceSchemaFromRegistry() {
try {
return getSchema(registryUrl);
} catch (IOException ioe) {
throw new HoodieIOException("Error reading source schema from registry :" + registryUrl, ioe);
}
}

private Schema getTargetSchemaFromRegistry() {
try {
return getSchema(targetRegistryUrl);
} catch (IOException ioe) {
throw new HoodieIOException("Error reading target schema from registry :" + registryUrl, ioe);
throw new HoodieIOException("Error reading target schema from registry :" + targetRegistryUrl, ioe);
}
}
}
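For context, fetchSchemaFromRegistry expects the registry URL to return a JSON document whose "schema" field holds the Avro schema as a string, which is the shape of Confluent Schema Registry's /subjects/<subject>/versions/latest response. A minimal, self-contained sketch, with a hand-written response standing in for a live registry; the record name and field are invented for the example.

// Illustration only: the JSON literal mimics a Confluent Schema Registry response.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;

public class RegistryResponseExample {
  public static void main(String[] args) throws Exception {
    String response = "{\"subject\":\"my-topic-value\",\"version\":1,\"id\":42,"
        + "\"schema\":\"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"Impression\\\","
        + "\\\"fields\\\":[{\\\"name\\\":\\\"id\\\",\\\"type\\\":\\\"string\\\"}]}\"}";
    JsonNode node = new ObjectMapper().readTree(response);
    // Same parsing steps as fetchSchemaFromRegistry() / getSchema() above.
    Schema schema = new Schema.Parser().parse(node.get("schema").asText());
    System.out.println(schema.getFullName()); // Impression
  }
}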
@@ -20,12 +20,14 @@

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.AvroKafkaSourceHelpers;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
@@ -42,18 +44,34 @@
*/
public class AvroKafkaSource extends AvroSource {

private static final String KAFKA_AVRO_VALUE_DESERIALIZER = "hoodie.deltastreamer.source.value.deserializer.class";
private static final Logger LOG = LogManager.getLogger(AvroKafkaSource.class);

private final KafkaOffsetGen offsetGen;

private final HoodieDeltaStreamerMetrics metrics;
private final boolean injectKafkaData;

public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
super(props, sparkContext, sparkSession, schemaProvider);

props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", KafkaAvroDeserializer.class);
String deserializerClassName = props.getString(KAFKA_AVRO_VALUE_DESERIALIZER, "");

if (deserializerClassName.isEmpty()) {
props.put("value.deserializer", KafkaAvroSchemaDeserializer.class);
} else {
try {
props.put("value.deserializer", Class.forName(deserializerClassName));
} catch (ClassNotFoundException e) {
String error = "Could not load custom avro kafka deserializer: " + deserializerClassName;
LOG.error(error);
throw new HoodieException(error, e);
}
}

this.metrics = metrics;
this.injectKafkaData = props.getBoolean(AvroKafkaSourceHelpers.INJECT_KAFKA_META_FIELDS, false);

offsetGen = new KafkaOffsetGen(props);
}

@@ -70,6 +88,10 @@ protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastChe
}

private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
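// When injection is enabled, map the whole ConsumerRecord through addKafkaFields so offset/partition/topic/key
// land in the record; otherwise just unwrap the Avro value as before.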
if (injectKafkaData) {
return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
LocationStrategies.PreferConsistent()).map(AvroKafkaSourceHelpers::addKafkaFields);
}
return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord) obj.value());
}
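Worth noting: hoodie.deltastreamer.source.inject_kafka_fields is read both here (to decide whether toRDD maps records through AvroKafkaSourceHelpers.addKafkaFields) and in SchemaRegistryProvider (to extend the fetched schema with the matching fields), so a single property enables both halves. A hypothetical sketch of the relevant properties, with a placeholder registry URL:

// Hypothetical sketch: enabling Kafka metadata injection end to end.
import org.apache.hudi.common.config.TypedProperties;

public class KafkaFieldInjectionExample {
  public static TypedProperties exampleProps() {
    TypedProperties props = new TypedProperties();
    // Drives schema extension in SchemaRegistryProvider and per-record population in AvroKafkaSource.
    props.put("hoodie.deltastreamer.source.inject_kafka_fields", "true");
    props.put("hoodie.deltastreamer.schemaprovider.registry.url",
        "http://localhost:8081/subjects/my-topic-value/versions/latest");  // placeholder URL
    return props;
  }
}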
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.utilities.sources.helpers;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class AvroKafkaSourceHelpers {

public static final String INJECT_KAFKA_META_FIELDS = "hoodie.deltastreamer.source.inject_kafka_fields";

public static final String KAFKA_PARTITION_META_FIELD = "_hudi_kafka_partition";
public static final String KAFKA_OFFSET_META_FIELD = "_hudi_kafka_offset";
public static final String KAFKA_TOPIC_META_FIELD = "_hudi_kafka_topic";
public static final String KAFKA_KEY_META_FIELD = "_hudi_kafka_key";
public static final String KAFKA_META_FIELDS_PATTERN = "<KAFKA_FIELDS>";

public static final String ALL_KAFKA_META_FIELDS = String.join(
",",
AvroKafkaSourceHelpers.KAFKA_PARTITION_META_FIELD,
AvroKafkaSourceHelpers.KAFKA_OFFSET_META_FIELD,
AvroKafkaSourceHelpers.KAFKA_TOPIC_META_FIELD,
AvroKafkaSourceHelpers.KAFKA_KEY_META_FIELD);

public static String transform(String sql) {
return sql.replaceAll(KAFKA_META_FIELDS_PATTERN, ALL_KAFKA_META_FIELDS);
}

public static GenericRecord addKafkaFields(ConsumerRecord<Object, Object> obj) {
GenericRecord record = (GenericRecord) obj.value();
record.put(AvroKafkaSourceHelpers.KAFKA_OFFSET_META_FIELD, obj.offset());
record.put(AvroKafkaSourceHelpers.KAFKA_PARTITION_META_FIELD, obj.partition());
record.put(AvroKafkaSourceHelpers.KAFKA_TOPIC_META_FIELD, obj.topic());
record.put(AvroKafkaSourceHelpers.KAFKA_KEY_META_FIELD, obj.key());
return record;
}

private static boolean isKafkaMetadataField(String fieldName) {
return AvroKafkaSourceHelpers.KAFKA_PARTITION_META_FIELD.equals(fieldName)
|| AvroKafkaSourceHelpers.KAFKA_OFFSET_META_FIELD.equals(fieldName)
|| AvroKafkaSourceHelpers.KAFKA_TOPIC_META_FIELD.equals(fieldName)
|| AvroKafkaSourceHelpers.KAFKA_KEY_META_FIELD.equals(fieldName);
}

/**
 * Adds the Kafka metadata fields to the given schema, so later on the appropriate data can be injected.
 * Each metadata field is a nullable union with a null default, so records that do not carry Kafka metadata
 * still conform to the extended schema; any Kafka metadata fields already present are replaced rather than
 * duplicated.
 */
public static Schema addKafkaMetadataFields(Schema schema) {

final List<Schema.Field> parentFields = new ArrayList<>();
final List<Schema.Field> schemaFields = schema.getFields();

for (Schema.Field field : schemaFields) {
if (!isKafkaMetadataField(field.name())) {
Schema.Field newField = new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal());
for (Map.Entry<String, Object> prop : field.getObjectProps().entrySet()) {
newField.addProp(prop.getKey(), prop.getValue());
}
parentFields.add(newField);
}
}

final Schema.Field partitionField =
new Schema.Field(
AvroKafkaSourceHelpers.KAFKA_PARTITION_META_FIELD,
Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.INT))),
"",
Schema.NULL_VALUE);
final Schema.Field offsetField =
new Schema.Field(
AvroKafkaSourceHelpers.KAFKA_OFFSET_META_FIELD,
Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.LONG))),
"",
Schema.NULL_VALUE);
final Schema.Field topicField =
new Schema.Field(
AvroKafkaSourceHelpers.KAFKA_TOPIC_META_FIELD,
Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))),
"",
Schema.NULL_VALUE);
final Schema.Field keyField =
new Schema.Field(
AvroKafkaSourceHelpers.KAFKA_KEY_META_FIELD,
Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))),
"",
Schema.NULL_VALUE);

parentFields.add(partitionField);
parentFields.add(offsetField);
parentFields.add(topicField);
parentFields.add(keyField);

final Schema mergedSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false);
mergedSchema.setFields(parentFields);
return mergedSchema;
}
}
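A hypothetical, self-contained sketch exercising the helpers above: extend a schema with the Kafka metadata fields, populate them from a ConsumerRecord, and expand the <KAFKA_FIELDS> pattern. The topic, record name and values are invented for the example; it only assumes AvroKafkaSourceHelpers is on the classpath.

// Illustration only; not part of this change.
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.utilities.sources.helpers.AvroKafkaSourceHelpers;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class AvroKafkaSourceHelpersExample {
  public static void main(String[] args) {
    Schema base = SchemaBuilder.record("Impression").fields().requiredString("id").endRecord();
    Schema withKafkaFields = AvroKafkaSourceHelpers.addKafkaMetadataFields(base);

    GenericRecord value = new GenericData.Record(withKafkaFields);
    value.put("id", "abc-123");
    ConsumerRecord<Object, Object> consumed = new ConsumerRecord<>("impressions", 3, 42L, "abc-123", value);

    GenericRecord enriched = AvroKafkaSourceHelpers.addKafkaFields(consumed);
    System.out.println(enriched.get(AvroKafkaSourceHelpers.KAFKA_OFFSET_META_FIELD));    // 42
    System.out.println(enriched.get(AvroKafkaSourceHelpers.KAFKA_PARTITION_META_FIELD)); // 3
    System.out.println(enriched.get(AvroKafkaSourceHelpers.KAFKA_TOPIC_META_FIELD));     // impressions

    // transform() expands <KAFKA_FIELDS> into the comma-separated metadata column list.
    System.out.println(AvroKafkaSourceHelpers.transform("SELECT <KAFKA_FIELDS>, id FROM src"));
  }
}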