diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index c4a1606fd52f1..1297f40f852ae 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -287,7 +287,6 @@
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.binary.version}</artifactId>
       <version>${kafka.version}</version>
-      <scope>test</scope>
     </dependency>
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
index 43f2ff27d2eff..604a267ccb61d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
@@ -40,8 +40,9 @@ public class FilebasedSchemaProvider extends SchemaProvider {
   /**
    * Configs supported.
    */
   public static class Config {
-    private static final String SOURCE_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider.source.schema.file";
+    // public so that registry-free consumers (AvroKafkaSource, HoodieKafkaAvroDecoder) can validate it is set
+    public static final String SOURCE_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider.source.schema.file";
     private static final String TARGET_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider.target.schema.file";
   }
 
   private final FileSystem fs;
@@ -49,14 +50,16 @@ public static class Config {
   private final Schema sourceSchema;
   private Schema targetSchema;
+  private TypedProperties props;
 
   public FilebasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
     super(props, jssc);
+    this.props = props;
     DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SOURCE_SCHEMA_FILE_PROP));
     String sourceFile = props.getString(Config.SOURCE_SCHEMA_FILE_PROP);
     this.fs = FSUtils.getFs(sourceFile, jssc.hadoopConfiguration(), true);
     try {
       this.sourceSchema = new Schema.Parser().parse(this.fs.open(new Path(sourceFile)));
       if (props.containsKey(Config.TARGET_SCHEMA_FILE_PROP)) {
         this.targetSchema = new Schema.Parser().parse(fs.open(new Path(props.getString(Config.TARGET_SCHEMA_FILE_PROP))));
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
new file mode 100644
index 0000000000000..aaae059b37070
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+import static org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig.SCHEMA_PROVIDER_CLASS_PROP;
+
+public class AbstractHoodieKafkaAvroDeserializer {
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private boolean useSpecificAvroReader = false;
+  private Schema sourceSchema;
+  private Schema targetSchema;
+
+  public AbstractHoodieKafkaAvroDeserializer(VerifiableProperties properties) {
+    TypedProperties typedProperties = new TypedProperties();
+    copyProperties(typedProperties, properties.props());
+    try {
+      // note: no JavaSparkContext is available inside a Kafka deserializer, so the
+      // configured SchemaProvider must be constructible with a null context
+      SchemaProvider schemaProvider = UtilHelpers.createSchemaProvider(
+          typedProperties.getString(SCHEMA_PROVIDER_CLASS_PROP), typedProperties, null);
+      this.sourceSchema = Objects.requireNonNull(schemaProvider).getSourceSchema();
+      this.targetSchema = schemaProvider.getTargetSchema();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to instantiate the schema provider for Kafka Avro deserialization", e);
+    }
+  }
+
+  private void copyProperties(TypedProperties typedProperties, Properties properties) {
+    for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+      typedProperties.put(entry.getKey(), entry.getValue());
+    }
+  }
+
+  protected void configure(HoodieKafkaAvroDeserializationConfig config) {
+    useSpecificAvroReader = config
+        .getBoolean(HoodieKafkaAvroDeserializationConfig.SPECIFIC_AVRO_READER_CONFIG);
+  }
+
+  protected Object deserialize(byte[] payload) throws SerializationException {
+    return deserialize(null, null, payload, targetSchema);
+  }
+
+  /**
+   * Just like the single-parameter version, but accepts an Avro schema to use for reading.
+   *
+   * @param payload serialized data
+   * @param readerSchema schema to use for the Avro read (optional, enables Avro projection)
+   * @return the deserialized object
+   */
+  protected Object deserialize(byte[] payload, Schema readerSchema) throws SerializationException {
+    return deserialize(null, null, payload, readerSchema);
+  }
+
+  protected Object deserialize(String topic, Boolean isKey, byte[] payload, Schema readerSchema) {
+    try {
+      ByteBuffer buffer = this.getByteBuffer(payload);
+      // Confluent wire format: one magic byte, a 4-byte schema id, then the Avro payload.
+      // The schema id is only read to advance the buffer; the writer schema comes from the schema provider.
+      int id = buffer.getInt();
+      int length = buffer.limit() - 1 - 4;
+      Object result;
+      if (sourceSchema.getType().equals(Schema.Type.BYTES)) {
+        byte[] bytes = new byte[length];
+        buffer.get(bytes, 0, length);
+        result = bytes;
+      } else {
+        int start = buffer.position() + buffer.arrayOffset();
+        DatumReader<Object> reader = this.getDatumReader(sourceSchema, readerSchema);
+        Object object = reader.read(null, this.decoderFactory.binaryDecoder(buffer.array(), start, length, null));
+        if (sourceSchema.getType().equals(Schema.Type.STRING)) {
+          object = object.toString();
+        }
+        result = object;
+      }
+      return result;
+    } catch (IOException | RuntimeException e) {
+      throw new SerializationException("Error deserializing Avro payload", e);
+    }
+  }
+
+  private ByteBuffer getByteBuffer(byte[] payload) {
+    ByteBuffer buffer = ByteBuffer.wrap(payload);
+    if (buffer.get() != 0) {
+      throw new SerializationException("Unknown magic byte!");
+    } else {
+      return buffer;
+    }
+  }
+
+  private DatumReader<Object> getDatumReader(Schema writerSchema, Schema readerSchema) {
+    if (this.useSpecificAvroReader) {
+      return new SpecificDatumReader<>(writerSchema, readerSchema);
+    } else {
+      return new GenericDatumReader<>(writerSchema, readerSchema);
+    }
+  }
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/HoodieKafkaAvroDecoder.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/HoodieKafkaAvroDecoder.java
new file mode 100644
index 0000000000000..1a0e950534078
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/HoodieKafkaAvroDecoder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import kafka.serializer.Decoder;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.kafka.common.errors.SerializationException;
+
+public class HoodieKafkaAvroDecoder extends AbstractHoodieKafkaAvroDeserializer implements Decoder<Object> {
+
+  public HoodieKafkaAvroDecoder(VerifiableProperties properties) {
+    super(properties);
+    configure(new HoodieKafkaAvroDeserializationConfig(properties.props()));
+  }
+
+  @Override
+  public Object fromBytes(byte[] bytes) {
+    return deserialize(bytes);
+  }
+
+  @Override
+  protected Object deserialize(String topic, Boolean isKey, byte[] payload, Schema readerSchema) throws SerializationException {
+    return super.deserialize(topic, isKey, payload, readerSchema);
+  }
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/config/HoodieKafkaAvroDeserializationConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/config/HoodieKafkaAvroDeserializationConfig.java
new file mode 100644
index 0000000000000..1457c6a6e9ce5
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/config/HoodieKafkaAvroDeserializationConfig.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde.config;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+import java.util.Map;
+
+public class HoodieKafkaAvroDeserializationConfig extends AbstractConfig {
+
+  public static final String SPECIFIC_AVRO_READER_CONFIG = "specific.avro.reader";
+  public static final boolean SPECIFIC_AVRO_READER_DEFAULT = false;
+  public static final String SPECIFIC_AVRO_READER_DOC = "If true, tries to look up the SpecificRecord class.";
+
+  public static final String SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class";
+
+  private static ConfigDef config;
+
+  static {
+    config = new ConfigDef()
+        .define(SPECIFIC_AVRO_READER_CONFIG, ConfigDef.Type.BOOLEAN, SPECIFIC_AVRO_READER_DEFAULT,
+            ConfigDef.Importance.LOW, SPECIFIC_AVRO_READER_DOC);
+  }
+
+  public HoodieKafkaAvroDeserializationConfig(Map<?, ?> props) {
+    super(config, props);
+  }
+
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 256516bd2026f..17a8b00610618 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -18,10 +18,13 @@
 
 package org.apache.hudi.utilities.sources;
 
+import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.serde.HoodieKafkaAvroDecoder;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
@@ -37,6 +40,9 @@
 import org.apache.spark.streaming.kafka010.LocationStrategies;
 import org.apache.spark.streaming.kafka010.OffsetRange;
 
+import java.util.Collections;
+
 /**
- * Reads avro serialized Kafka data, based on the confluent schema-registry.
+ * Reads Avro-serialized Kafka data. Records are deserialized via the Confluent schema registry
+ * by default, or against a {@link FilebasedSchemaProvider} schema file when the registry is disabled.
 */
@@ -45,15 +51,23 @@ public class AvroKafkaSource extends AvroSource {
 
   private static final Logger LOG = LogManager.getLogger(AvroKafkaSource.class);
 
   private final KafkaOffsetGen offsetGen;
   private final HoodieDeltaStreamerMetrics metrics;
+  private static final String USE_SCHEMA_REGISTRY_PROP = "hoodie.deltastreamer.source.avro.serde.useSchemaRegistry";
 
   public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
       SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
     super(props, sparkContext, sparkSession, schemaProvider);
     this.metrics = metrics;
+    boolean useSchemaRegistryForDeserialization = props.getBoolean(USE_SCHEMA_REGISTRY_PROP, true);
     props.put("key.deserializer", StringDeserializer.class);
-    props.put("value.deserializer", KafkaAvroDeserializer.class);
+    if (useSchemaRegistryForDeserialization) {
+      props.put("value.deserializer", KafkaAvroDeserializer.class);
+    } else {
+      // registry-free path: the decoder reads the writer schema from the configured schema file
+      DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_FILE_PROP));
+      props.put("value.deserializer", HoodieKafkaAvroDecoder.class);
+    }
     offsetGen = new KafkaOffsetGen(props);
   }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/serde/TestHoodieKafkaAvroDecoder.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/serde/TestHoodieKafkaAvroDecoder.java
new file mode 100644
index 0000000000000..d96a3cbd5f049
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/serde/TestHoodieKafkaAvroDecoder.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestHoodieKafkaAvroDecoder extends UtilitiesTestBase {
+
+  private static KafkaAvroSerializer avroSerializer;
+  private static KafkaAvroDeserializer avroDeserializer;
+  private static String topic;
+
+  @BeforeAll
+  public static void init() throws Exception {
+    UtilitiesTestBase.initClass();
+    Map<String, Object> defaultConfig = new HashMap<>();
+    defaultConfig.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "bogus");
+    // serializer and deserializer share a mock registry client, so no real schema registry is needed
+    SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient();
+    avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
+    avroSerializer = new KafkaAvroSerializer(schemaRegistryClient, defaultConfig);
+    topic = "test_topic";
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_uber.avsc", dfs, dfsBasePath + "/source_uber.avsc");
+  }
+
+  @BeforeEach
+  public void setup() throws Exception {
+    super.setup();
+  }
+
+  @AfterEach
+  public void teardown() throws Exception {
+    super.teardown();
+  }
+
+  @AfterAll
+  public static void cleanupClass() {
+    UtilitiesTestBase.cleanupClass();
+  }
+
+  private GenericRecord createAvroRecord(Schema schema) {
+    GenericRecord avroRecord = new GenericData.Record(schema);
+    avroRecord.put("_row_key", "testUser");
+    avroRecord.put("timestamp", 0.0);
+    avroRecord.put("rider", "dummy_rider");
+    avroRecord.put("driver", "dummy_driver");
+    avroRecord.put("fare", 50.0);
+    avroRecord.put("_hoodie_is_deleted", false);
+    return avroRecord;
+  }
+
+  @Test
+  public void testKafkaAvroDecoder() {
+    TypedProperties typedProperties = new TypedProperties();
+    typedProperties.setProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_FILE_PROP, dfsBasePath + "/source_uber.avsc");
+    typedProperties.setProperty(HoodieKafkaAvroDeserializationConfig.SCHEMA_PROVIDER_CLASS_PROP, FilebasedSchemaProvider.class.getName());
+    VerifiableProperties verifiableProperties = new VerifiableProperties(typedProperties);
+    FilebasedSchemaProvider schemaProvider = new FilebasedSchemaProvider(typedProperties, jsc);
+    GenericRecord record = createAvroRecord(schemaProvider.getSourceSchema());
+    byte[] bytes = avroSerializer.serialize(topic, record);
+    HoodieKafkaAvroDecoder kafkaAvroDecoder = new HoodieKafkaAvroDecoder(verifiableProperties);
+    Assertions.assertEquals(record, avroDeserializer.deserialize(topic, bytes));
+    Assertions.assertEquals(record, kafkaAvroDecoder.deserialize(bytes));
+  }
+}
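
Reviewer note, not part of the diff: a minimal sketch of how the registry-free path is wired end to end, mirroring TestHoodieKafkaAvroDecoder above. The schema path "hdfs:///schemas/source.avsc" and the payload argument are illustrative placeholders, and the sketch assumes the FilebasedSchemaProvider can resolve the schema file in the caller's environment:

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
    import org.apache.hudi.utilities.serde.HoodieKafkaAvroDecoder;
    import org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;

    import kafka.utils.VerifiableProperties;
    import org.apache.avro.generic.GenericRecord;

    public class RegistryFreeDecodeSketch {

      public static GenericRecord decode(byte[] confluentFramedPayload) {
        TypedProperties props = new TypedProperties();
        // consumed by AvroKafkaSource: opts out of the Confluent registry deserializer (defaults to true)
        props.setProperty("hoodie.deltastreamer.source.avro.serde.useSchemaRegistry", "false");
        // the decoder instantiates this provider to resolve the writer/reader schemas
        props.setProperty(HoodieKafkaAvroDeserializationConfig.SCHEMA_PROVIDER_CLASS_PROP,
            FilebasedSchemaProvider.class.getName());
        props.setProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_FILE_PROP,
            "hdfs:///schemas/source.avsc"); // illustrative path
        HoodieKafkaAvroDecoder decoder = new HoodieKafkaAvroDecoder(new VerifiableProperties(props));
        // payload must be Confluent-framed Avro bytes: magic byte + 4-byte schema id + Avro body
        return (GenericRecord) decoder.fromBytes(confluentFramedPayload);
      }
    }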