diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java
new file mode 100644
index 0000000000000..f7c1cfe696db0
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deser;
+
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import org.apache.avro.Schema;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+/**
+ * Extending {@link KafkaAvroDeserializer} as we need to be able to inject reader schema during deserialization.
+ */
+public class KafkaAvroSchemaDeserializer extends KafkaAvroDeserializer {
+
+  private static final String SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class";
+  private Schema sourceSchema;
+
+  public KafkaAvroSchemaDeserializer() {}
+
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {
+    super.configure(configs, isKey);
+    try {
+      TypedProperties props = getConvertToTypedProperties(configs);
+      String className = props.getString(SCHEMA_PROVIDER_CLASS_PROP);
+      SchemaProvider schemaProvider = (SchemaProvider) ReflectionUtils.loadClass(className, props);
+      sourceSchema = Objects.requireNonNull(schemaProvider).getSourceSchema();
+    } catch (Throwable e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  /**
+   * Pretty much copy-paste from the {@link AbstractKafkaAvroDeserializer} except line 87:
+   * DatumReader reader = new GenericDatumReader(schema, sourceSchema);
+   * <p>
+   * We need to inject reader schema during deserialization or later stages of the pipeline break.
+   *
+   * @param includeSchemaAndVersion
+   * @param topic
+   * @param isKey
+   * @param payload
+   * @param readerSchema
+   * @return
+   * @throws SerializationException
+   */
+  @Override
+  protected Object deserialize(
+      boolean includeSchemaAndVersion,
+      String topic,
+      Boolean isKey,
+      byte[] payload,
+      Schema readerSchema)
+      throws SerializationException {
+    return super.deserialize(includeSchemaAndVersion, topic, isKey, payload, sourceSchema);
+  }
+
+  private TypedProperties getConvertToTypedProperties(Map<String, ?> configs) {
+    TypedProperties typedProperties = new TypedProperties();
+    for (Entry<String, ?> entry : configs.entrySet()) {
+      typedProperties.put(entry.getKey(), entry.getValue());
+    }
+    return typedProperties;
+  }
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
index 47c4c2f81a790..e997e718fefb6 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
@@ -25,6 +25,7 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.Schema;
+import org.apache.hudi.utilities.sources.helpers.AvroKafkaSourceHelpers;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
@@ -42,31 +43,82 @@ public class SchemaRegistryProvider extends SchemaProvider {
    * Configs supported.
    */
   public static class Config {
     private static final String SRC_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url";
-    private static final String TARGET_SCHEMA_REGISTRY_URL_PROP =
-        "hoodie.deltastreamer.schemaprovider.registry.targetUrl";
+    private static final String TARGET_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.targetUrl";
+    private static final String CACHE_SCHEMAS = "hoodie.deltastreamer.schemaprovider.registry.cache_enabled";
   }
 
-  private static String fetchSchemaFromRegistry(String registryUrl) throws IOException {
-    URL registry = new URL(registryUrl);
-    ObjectMapper mapper = new ObjectMapper();
-    JsonNode node = mapper.readTree(registry.openStream());
-    return node.get("schema").asText();
+  private Schema sourceSchema;
+  private Schema targetSchema;
+  private final boolean cacheDisabled;
+  private final boolean injectKafkaFieldSchema;
+  private final String registryUrl;
+  private final String targetRegistryUrl;
+  private final boolean noTargetSchema;
+
+  public SchemaRegistryProvider(TypedProperties props) {
+    this(props, null);
   }
 
   public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
     super(props, jssc);
     DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL_PROP));
+    this.cacheDisabled = !props.getBoolean(Config.CACHE_SCHEMAS, false);
+    this.injectKafkaFieldSchema = props.getBoolean(AvroKafkaSourceHelpers.INJECT_KAFKA_META_FIELDS, false);
+    this.registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+    this.targetRegistryUrl = config.getString(Config.TARGET_SCHEMA_REGISTRY_URL_PROP, registryUrl);
+    this.noTargetSchema = targetRegistryUrl.equals("null");
   }
 
-  private static Schema getSchema(String registryUrl) throws IOException {
-    return new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
+  public String fetchSchemaFromRegistry(String registryUrl) throws IOException {
+    URL registry = new URL(registryUrl);
+    ObjectMapper mapper = new ObjectMapper();
+    JsonNode node = mapper.readTree(registry.openStream());
+    return node.get("schema").asText();
+  }
+
+  private Schema getSchema(String registryUrl) throws IOException {
+    Schema schema = new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
+    if (injectKafkaFieldSchema) {
+      schema = AvroKafkaSourceHelpers.addKafkaMetadataFields(schema);
+    }
+    return schema;
   }
 
   @Override
   public Schema getSourceSchema() {
-    String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+    if (cacheDisabled) {
+      return getSourceSchemaFromRegistry();
+    }
+    if (sourceSchema == null) {
+      synchronized (this) {
+        if (sourceSchema == null) {
+          sourceSchema = getSourceSchemaFromRegistry();
+        }
+      }
+    }
+    return sourceSchema;
+  }
+
+  @Override
+  public Schema getTargetSchema() {
+    if (noTargetSchema) {
+      return null;
+    }
+    if (cacheDisabled) {
+      return getTargetSchemaFromRegistry();
+    }
+    if (targetSchema == null) {
+      synchronized (this) {
+        if (targetSchema == null) {
+          targetSchema = getTargetSchemaFromRegistry();
+        }
+      }
+    }
+    return targetSchema;
+  }
+
+  private Schema getSourceSchemaFromRegistry() {
     try {
       return getSchema(registryUrl);
     } catch (IOException ioe) {
@@ -74,14 +126,11 @@ public Schema getSourceSchema() {
     }
   }
 
-  @Override
-  public Schema getTargetSchema() {
-    String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
-    String targetRegistryUrl = config.getString(Config.TARGET_SCHEMA_REGISTRY_URL_PROP, registryUrl);
+  private Schema getTargetSchemaFromRegistry() {
     try {
       return getSchema(targetRegistryUrl);
     } catch (IOException ioe) {
-      throw new HoodieIOException("Error reading target schema from registry :" + registryUrl, ioe);
+      throw new HoodieIOException("Error reading target schema from registry :" + targetRegistryUrl, ioe);
     }
   }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 256516bd2026f..ae2eafa55f2f7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -20,12 +20,14 @@
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.AvroKafkaSourceHelpers;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
 
-import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.log4j.LogManager;
@@ -42,18 +44,34 @@
  */
 public class AvroKafkaSource extends AvroSource {
 
+  private static final String KAFKA_AVRO_VALUE_DESERIALIZER = "hoodie.deltastreamer.source.value.deserializer.class";
   private static final Logger LOG = LogManager.getLogger(AvroKafkaSource.class);
 
   private final KafkaOffsetGen offsetGen;
   private final HoodieDeltaStreamerMetrics metrics;
+  private final boolean injectKafkaData;
 
   public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
       SchemaProvider schemaProvider, HoodieDeltaStreamerMetrics metrics) {
     super(props, sparkContext, sparkSession, schemaProvider);
-    this.metrics = metrics;
+
     props.put("key.deserializer", StringDeserializer.class);
-    props.put("value.deserializer", KafkaAvroDeserializer.class);
+    String deserializerClassName = props.getString(KAFKA_AVRO_VALUE_DESERIALIZER, "");
+
+    if (deserializerClassName.isEmpty()) {
+      props.put("value.deserializer", KafkaAvroSchemaDeserializer.class);
+    } else {
+      try {
+        props.put("value.deserializer", Class.forName(deserializerClassName));
+      } catch (ClassNotFoundException e) {
+        String error = "Could not load custom avro kafka deserializer: " + deserializerClassName;
+        LOG.error(error);
+        throw new HoodieException(error, e);
+      }
+    }
+
+    this.metrics = metrics;
+    this.injectKafkaData = props.getBoolean(AvroKafkaSourceHelpers.INJECT_KAFKA_META_FIELDS, false);
+
     offsetGen = new KafkaOffsetGen(props);
   }
 
@@ -70,6 +88,10 @@ protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastChe
   }
 
   private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
+    if (injectKafkaData) {
+      return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
+          LocationStrategies.PreferConsistent()).map(AvroKafkaSourceHelpers::addKafkaFields);
+    }
     return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
         LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord) obj.value());
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroKafkaSourceHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroKafkaSourceHelpers.java
new file mode 100644
index 0000000000000..86826b4e91f14
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroKafkaSourceHelpers.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class AvroKafkaSourceHelpers {
+
+  public static final String INJECT_KAFKA_META_FIELDS = "hoodie.deltastreamer.source.inject_kafka_fields";
+
+  public static final String KAFKA_PARTITION_META_FIELD = "_hudi_kafka_partition";
+  public static final String KAFKA_OFFSET_META_FIELD = "_hudi_kafka_offset";
+  public static final String KAFKA_TOPIC_META_FIELD = "_hudi_kafka_topic";
+  public static final String KAFKA_KEY_META_FIELD = "_hudi_kafka_key";
+  public static final String KAFKA_META_FIELDS_PATTERN = "<KAFKA_META_FIELDS>";
+
+  public static final String ALL_KAFKA_META_FIELDS = String.join(
+      ",",
+      AvroKafkaSourceHelpers.KAFKA_PARTITION_META_FIELD,
+      AvroKafkaSourceHelpers.KAFKA_OFFSET_META_FIELD,
+      AvroKafkaSourceHelpers.KAFKA_TOPIC_META_FIELD,
+      AvroKafkaSourceHelpers.KAFKA_KEY_META_FIELD);
+
+  public static String transform(String sql) {
+    return sql.replaceAll(KAFKA_META_FIELDS_PATTERN, ALL_KAFKA_META_FIELDS);
+  }
+
+  public static GenericRecord addKafkaFields(ConsumerRecord<Object, Object> obj) {
+    GenericRecord record = (GenericRecord) obj.value();
+    record.put(AvroKafkaSourceHelpers.KAFKA_OFFSET_META_FIELD, obj.offset());
+    record.put(AvroKafkaSourceHelpers.KAFKA_PARTITION_META_FIELD, obj.partition());
+    record.put(AvroKafkaSourceHelpers.KAFKA_TOPIC_META_FIELD, obj.topic());
+    record.put(AvroKafkaSourceHelpers.KAFKA_KEY_META_FIELD, obj.key());
+    return record;
+  }
+
+  private static boolean isKafkaMetadataField(String fieldName) {
+    return AvroKafkaSourceHelpers.KAFKA_PARTITION_META_FIELD.equals(fieldName)
+        || AvroKafkaSourceHelpers.KAFKA_OFFSET_META_FIELD.equals(fieldName)
+        || AvroKafkaSourceHelpers.KAFKA_TOPIC_META_FIELD.equals(fieldName)
+        || AvroKafkaSourceHelpers.KAFKA_KEY_META_FIELD.equals(fieldName);
+  }
+
+  /**
+   * Adds the Kafka metadata fields to the given schema, so later on the appropriate data can be injected.
+   */
+  public static Schema addKafkaMetadataFields(Schema schema) {
+
+    final List<Schema.Field> parentFields = new ArrayList<>();
+    final List<Schema.Field> schemaFields = schema.getFields();
+
+    for (Schema.Field field : schemaFields) {
+      if (!isKafkaMetadataField(field.name())) {
+        Schema.Field newField = new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal());
+        for (Map.Entry<String, Object> prop : field.getObjectProps().entrySet()) {
+          newField.addProp(prop.getKey(), prop.getValue());
+        }
+        parentFields.add(newField);
+      }
+    }
+
+    final Schema.Field partitionField =
+        new Schema.Field(
+            AvroKafkaSourceHelpers.KAFKA_PARTITION_META_FIELD,
+            Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.INT))),
+            "",
+            Schema.NULL_VALUE);
+    final Schema.Field offsetField =
+        new Schema.Field(
+            AvroKafkaSourceHelpers.KAFKA_OFFSET_META_FIELD,
+            Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.LONG))),
+            "",
+            Schema.NULL_VALUE);
+    final Schema.Field topicField =
+        new Schema.Field(
+            AvroKafkaSourceHelpers.KAFKA_TOPIC_META_FIELD,
+            Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))),
+            "",
+            Schema.NULL_VALUE);
+    final Schema.Field keyField =
+        new Schema.Field(
+            AvroKafkaSourceHelpers.KAFKA_KEY_META_FIELD,
+            Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))),
+            "",
+            Schema.NULL_VALUE);
+
+    parentFields.add(partitionField);
+    parentFields.add(offsetField);
+    parentFields.add(topicField);
+    parentFields.add(keyField);
+
+    final Schema mergedSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false);
+    mergedSchema.setFields(parentFields);
+    return mergedSchema;
+  }
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
index 9429c89d0ebf5..0010e4cbe373c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.transform;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.sources.helpers.AvroKafkaSourceHelpers;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -62,6 +63,9 @@ public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Datas
     LOG.info("Registering tmp table : " + tmpTable);
     rowDataset.registerTempTable(tmpTable);
     String sqlStr = transformerSQL.replaceAll(SRC_PATTERN, tmpTable);
+    if (properties.getBoolean(AvroKafkaSourceHelpers.INJECT_KAFKA_META_FIELDS, false)) {
+      sqlStr = AvroKafkaSourceHelpers.transform(sqlStr);
+    }
     LOG.info("SQL Query for transformation : (" + sqlStr + ")");
     return sparkSession.sql(sqlStr);
   }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java
new file mode 100644
index 0000000000000..10283322d559b
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.sources.helpers.AvroKafkaSourceHelpers;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+class SchemaRegistryProviderTest {
+
+  private static final String AVRO_SCHEMA = "{\n" +
+      "  \"type\": \"record\",\n" +
+      "  \"namespace\": \"com.example\",\n" +
+      "  \"name\": \"FullName\",\n" +
+      "  \"fields\": [\n" +
+      "    { \"name\": \"first\", \"type\": \"string\" },\n" +
+      "    { \"name\": \"last\", \"type\": \"string\" }\n" +
+      "  ]\n" +
+      "}";
+
+  TypedProperties initProps() {
+    TypedProperties tp = new TypedProperties();
+    tp.put("hoodie.deltastreamer.schemaprovider.registry.url", "sourceUrl");
+    tp.put("hoodie.deltastreamer.schemaprovider.registry.targetUrl", "targetUrl");
+    tp.put("hoodie.deltastreamer.schemaprovider.registry.cache_enabled", "false");
+    tp.put(AvroKafkaSourceHelpers.INJECT_KAFKA_META_FIELDS, "false");
+    return tp;
+  }
+
+  @Test
+  void getSourceSchemaNoCacheNoKafka() throws IOException {
+    TypedProperties tp = initProps();
+    SchemaRegistryProvider srp = spy(new SchemaRegistryProvider(tp, null));
+    doReturn(AVRO_SCHEMA).when(srp).fetchSchemaFromRegistry("sourceUrl");
+    Schema sc1 = srp.getSourceSchema();
+    Schema sc2 = srp.getSourceSchema();
+    verify(srp, times(2)).fetchSchemaFromRegistry("sourceUrl");
+    assertEquals(sc1, sc2);
+  }
+
+  @Test
+  void getTargetSchemaNoCacheNoKafka() throws IOException {
+    TypedProperties tp = initProps();
+    SchemaRegistryProvider srp = spy(new SchemaRegistryProvider(tp, null));
+    doReturn(AVRO_SCHEMA).when(srp).fetchSchemaFromRegistry("targetUrl");
+    Schema sc1 = srp.getTargetSchema();
+    Schema sc2 = srp.getTargetSchema();
+    verify(srp, times(2)).fetchSchemaFromRegistry("targetUrl");
+    assertEquals(sc1, sc2);
+  }
+
+  @Test
+  void getSourceSchemaWithCacheNoKafka() throws IOException {
+    TypedProperties tp = initProps();
+    tp.put("hoodie.deltastreamer.schemaprovider.registry.cache_enabled", "true");
+    SchemaRegistryProvider srp = spy(new SchemaRegistryProvider(tp, null));
+    doReturn(AVRO_SCHEMA).when(srp).fetchSchemaFromRegistry("sourceUrl");
+    Schema sc1 = srp.getSourceSchema();
+    Schema sc2 = srp.getSourceSchema();
+    verify(srp, times(1)).fetchSchemaFromRegistry("sourceUrl");
+    assertEquals(sc1, sc2);
+  }
+
+  @Test
+  void getTargetSchemaWithCacheNoKafka() throws IOException {
+    TypedProperties tp = initProps();
tp.put("hoodie.deltastreamer.schemaprovider.registry.cache_enabled", "true"); + SchemaRegistryProvider srp = spy(new SchemaRegistryProvider(tp, null)); + doReturn(AVRO_SCHEMA).when(srp).fetchSchemaFromRegistry("targetUrl"); + Schema sc1 = srp.getTargetSchema(); + Schema sc2 = srp.getTargetSchema(); + verify(srp, times(1)).fetchSchemaFromRegistry("targetUrl"); + assertEquals(sc1, sc2); + } + + void assertFieldExistsInSchema(Schema sc, String fieldName) { + for (Schema.Field f : sc.getFields()) { + if (f.name().equals(fieldName)) { + return; + } + } + fail(String.format("No '%s' field found in schema %s", fieldName, sc)); + } + + @Test + void getSourceSchemaWithCacheAndKafka() throws IOException { + TypedProperties tp = initProps(); + tp.put("hoodie.deltastreamer.schemaprovider.registry.cache_enabled", "true"); + tp.put(AvroKafkaSourceHelpers.INJECT_KAFKA_META_FIELDS, "true"); + SchemaRegistryProvider srp = spy(new SchemaRegistryProvider(tp, null)); + doReturn(AVRO_SCHEMA).when(srp).fetchSchemaFromRegistry("sourceUrl"); + Schema sc1 = srp.getSourceSchema(); + Schema sc2 = srp.getSourceSchema(); + assertEquals(sc1, sc2); + System.out.println(sc1); + verify(srp, times(1)).fetchSchemaFromRegistry("sourceUrl"); + assertFieldExistsInSchema(sc1, AvroKafkaSourceHelpers.KAFKA_KEY_META_FIELD); + assertFieldExistsInSchema(sc1, AvroKafkaSourceHelpers.KAFKA_OFFSET_META_FIELD); + assertFieldExistsInSchema(sc1, AvroKafkaSourceHelpers.KAFKA_PARTITION_META_FIELD); + assertFieldExistsInSchema(sc1, AvroKafkaSourceHelpers.KAFKA_TOPIC_META_FIELD); + } + + @Test + void getTargetSchemaWithCacheAndKafka() throws IOException { + TypedProperties tp = initProps(); + tp.put("hoodie.deltastreamer.schemaprovider.registry.cache_enabled", "true"); + tp.put(AvroKafkaSourceHelpers.INJECT_KAFKA_META_FIELDS, "true"); + SchemaRegistryProvider srp = spy(new SchemaRegistryProvider(tp, null)); + doReturn(AVRO_SCHEMA).when(srp).fetchSchemaFromRegistry("targetUrl"); + Schema sc1 = srp.getTargetSchema(); + Schema sc2 = srp.getTargetSchema(); + assertEquals(sc1, sc2); + System.out.println(sc1); + verify(srp, times(1)).fetchSchemaFromRegistry("targetUrl"); + assertFieldExistsInSchema(sc1, AvroKafkaSourceHelpers.KAFKA_KEY_META_FIELD); + assertFieldExistsInSchema(sc1, AvroKafkaSourceHelpers.KAFKA_OFFSET_META_FIELD); + assertFieldExistsInSchema(sc1, AvroKafkaSourceHelpers.KAFKA_PARTITION_META_FIELD); + assertFieldExistsInSchema(sc1, AvroKafkaSourceHelpers.KAFKA_TOPIC_META_FIELD); + } + + @Test + void getNullTargetSchema() throws IOException { + TypedProperties tp = initProps(); + tp.put("hoodie.deltastreamer.schemaprovider.registry.targetUrl", "null"); + SchemaRegistryProvider srp = spy(new SchemaRegistryProvider(tp, null)); + + Schema sc = srp.getTargetSchema(); + assertNull(sc); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformerTest.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformerTest.java new file mode 100644 index 0000000000000..508a71e12bb96 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformerTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.transform;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.sources.helpers.AvroKafkaSourceHelpers;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.anyString;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class SqlQueryBasedTransformerTest {
+
+  @Test
+  void applyNoKafkaFieldsAreInjected() {
+    Dataset<Row> dataset = mock(Dataset.class);
+    dataset.registerTempTable(anyString());
+
+    SparkSession sparkSession = mock(SparkSession.class);
+    ArgumentCaptor<String> ac = ArgumentCaptor.forClass(String.class);
+
+    SqlQueryBasedTransformer t = new SqlQueryBasedTransformer();
+    TypedProperties props = new TypedProperties();
+    props.put("hoodie.deltastreamer.transformer.sql", "sql query <SRC> <KAFKA_META_FIELDS>");
+    props.put(AvroKafkaSourceHelpers.INJECT_KAFKA_META_FIELDS, "false");
+    Dataset<Row> res = t.apply(null, sparkSession, dataset, props);
+    verify(sparkSession, times(1)).sql(ac.capture());
+
+    assertTrue(ac.getValue().startsWith("sql query HOODIE_SRC_TMP_TABLE_"));
+    assertTrue(ac.getValue().endsWith(AvroKafkaSourceHelpers.KAFKA_META_FIELDS_PATTERN));
+  }
+
+  @Test
+  void applyKafkaFieldsAreInjected() {
+    Dataset<Row> dataset = mock(Dataset.class);
+    dataset.registerTempTable(anyString());
+
+    SparkSession sparkSession = mock(SparkSession.class);
+    ArgumentCaptor<String> ac = ArgumentCaptor.forClass(String.class);
+
+    SqlQueryBasedTransformer t = new SqlQueryBasedTransformer();
+    TypedProperties props = new TypedProperties();
+    props.put("hoodie.deltastreamer.transformer.sql", "sql query <SRC> <KAFKA_META_FIELDS>");
+    props.put(AvroKafkaSourceHelpers.INJECT_KAFKA_META_FIELDS, "true");
+
+    Dataset<Row> res = t.apply(null, sparkSession, dataset, props);
+    verify(sparkSession, times(1)).sql(ac.capture());
+    assertTrue(ac.getValue().startsWith("sql query HOODIE_SRC_TMP_TABLE_"));
+    assertTrue(ac.getValue().endsWith(AvroKafkaSourceHelpers.ALL_KAFKA_META_FIELDS));
+  }
+}