diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 089b7801baf82..1796fb3a67176 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -216,6 +216,13 @@
       <scope>provided</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-hive_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
new file mode 100644
index 0000000000000..219b1ae57886d
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+
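+/**
+ * A {@link SchemaProvider} that reads the source (and optionally the target)
+ * schema from the metadata of a table registered in the Hive metastore,
+ * resolved through the Spark session catalog. Example configuration:
+ *
+ *   hoodie.deltastreamer.schemaprovider.source.schema.hive.database=db1
+ *   hoodie.deltastreamer.schemaprovider.source.schema.hive.table=tbl1
+ *
+ * Both database properties default to "default" when left unset.
+ */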
+public class HiveSchemaProvider extends SchemaProvider {
+
+ /**
+ * Configs supported.
+ */
+ public static class Config {
+ private static final String SOURCE_SCHEMA_DATABASE_PROP = "hoodie.deltastreamer.schemaprovider.source.schema.hive.database";
+ private static final String SOURCE_SCHEMA_TABLE_PROP = "hoodie.deltastreamer.schemaprovider.source.schema.hive.table";
+ private static final String TARGET_SCHEMA_DATABASE_PROP = "hoodie.deltastreamer.schemaprovider.target.schema.hive.database";
+ private static final String TARGET_SCHEMA_TABLE_PROP = "hoodie.deltastreamer.schemaprovider.target.schema.hive.table";
+ }
+
+ private static final Logger LOG = LogManager.getLogger(HiveSchemaProvider.class);
+
+ private final Schema sourceSchema;
+
+ private Schema targetSchema;
+
+ public HiveSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
+ super(props, jssc);
+ DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SOURCE_SCHEMA_TABLE_PROP));
+ String sourceSchemaDBName = props.getString(Config.SOURCE_SCHEMA_DATABASE_PROP, "default");
+ String sourceSchemaTableName = props.getString(Config.SOURCE_SCHEMA_TABLE_PROP);
+ SparkSession spark = SparkSession.builder().config(jssc.getConf()).enableHiveSupport().getOrCreate();
+ try {
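+      // Look up the source table's metadata in the Spark session catalog and
+      // convert its Spark StructType into an Avro schema.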
+ TableIdentifier sourceSchemaTable = new TableIdentifier(sourceSchemaTableName, scala.Option.apply(sourceSchemaDBName));
+      StructType sourceStructType = spark.sessionState().catalog().getTableMetadata(sourceSchemaTable).schema();
+
+      this.sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(
+          sourceStructType,
+          sourceSchemaTableName,
+          "hoodie." + sourceSchemaDBName);
+
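+      // The target table is optional; when it is not configured, getTargetSchema()
+      // falls back to the base implementation.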
+ if (props.containsKey(Config.TARGET_SCHEMA_TABLE_PROP)) {
+ String targetSchemaDBName = props.getString(Config.TARGET_SCHEMA_DATABASE_PROP, "default");
+ String targetSchemaTableName = props.getString(Config.TARGET_SCHEMA_TABLE_PROP);
+ TableIdentifier targetSchemaTable = new TableIdentifier(targetSchemaTableName, scala.Option.apply(targetSchemaDBName));
+        StructType targetStructType = spark.sessionState().catalog().getTableMetadata(targetSchemaTable).schema();
+        this.targetSchema = AvroConversionUtils.convertStructTypeToAvroSchema(
+            targetStructType,
+            targetSchemaTableName,
+            "hoodie." + targetSchemaDBName);
+ }
+ } catch (NoSuchTableException | NoSuchDatabaseException e) {
+      String message = String.format("Can't find Hive table(s): %s,%s", sourceSchemaTableName, props.getString(Config.TARGET_SCHEMA_TABLE_PROP, ""));
+ throw new IllegalArgumentException(message, e);
+ }
+ }
+
+ @Override
+ public Schema getSourceSchema() {
+ return sourceSchema;
+ }
+
+ @Override
+ public Schema getTargetSchema() {
+ if (targetSchema != null) {
+ return targetSchema;
+ } else {
+ return super.getTargetSchema();
+ }
+ }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java
new file mode 100644
index 0000000000000..414ad524a04a2
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.functional;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.HiveSchemaProvider;
+import org.apache.hudi.utilities.testutils.SparkClientFunctionalTestHarnessWithHiveSupport;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Basic tests against {@link HiveSchemaProvider}.
+ */
+@Tag("functional")
+public class TestHiveSchemaProvider extends SparkClientFunctionalTestHarnessWithHiveSupport {
+ private static final Logger LOG = LogManager.getLogger(TestHiveSchemaProvider.class);
+ private static final TypedProperties PROPS = new TypedProperties();
+ private static final String SOURCE_SCHEMA_TABLE_NAME = "schema_registry.source_schema_tab";
+ private static final String TARGET_SCHEMA_TABLE_NAME = "schema_registry.target_schema_tab";
+
+  @BeforeEach
+  public void init() {
+    Pair<String, String> dbAndTableName = parseDBAndTableName(SOURCE_SCHEMA_TABLE_NAME);
+ PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.database", dbAndTableName.getLeft());
+ PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.table", dbAndTableName.getRight());
+ }
+
+ @Test
+ public void testSourceSchema() throws Exception {
+ try {
+ createSchemaTable(SOURCE_SCHEMA_TABLE_NAME);
+ Schema sourceSchema = UtilHelpers.createSchemaProvider(HiveSchemaProvider.class.getName(), PROPS, jsc()).getSourceSchema();
+
+ Schema originalSchema = new Schema.Parser().parse(
+ UtilitiesTestBase.Helpers.readFile("delta-streamer-config/hive_schema_provider_source.avsc")
+ );
+      for (Schema.Field field : sourceSchema.getFields()) {
+        Schema.Field originalField = originalSchema.getField(field.name());
+        assertNotNull(originalField, String.format("Field %s is missing from the expected schema", field.name()));
+      }
+ } catch (HoodieException e) {
+ LOG.error("Failed to get source schema. ", e);
+ throw e;
+ }
+ }
+
+ @Test
+ public void testTargetSchema() throws Exception {
+ try {
+      Pair<String, String> dbAndTableName = parseDBAndTableName(TARGET_SCHEMA_TABLE_NAME);
+ PROPS.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.hive.database", dbAndTableName.getLeft());
+ PROPS.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.hive.table", dbAndTableName.getRight());
+ createSchemaTable(SOURCE_SCHEMA_TABLE_NAME);
+ createSchemaTable(TARGET_SCHEMA_TABLE_NAME);
+ Schema targetSchema = UtilHelpers.createSchemaProvider(HiveSchemaProvider.class.getName(), PROPS, jsc()).getTargetSchema();
+ Schema originalSchema = new Schema.Parser().parse(
+ UtilitiesTestBase.Helpers.readFile("delta-streamer-config/hive_schema_provider_target.avsc"));
+      for (Schema.Field field : targetSchema.getFields()) {
+        Schema.Field originalField = originalSchema.getField(field.name());
+        assertNotNull(originalField, String.format("Field %s is missing from the expected schema", field.name()));
+      }
+ } catch (HoodieException e) {
+ LOG.error("Failed to get source/target schema. ", e);
+ throw e;
+ }
+ }
+
+ @Test
+ public void testNotExistTable() {
+ String wrongName = "wrong_schema_tab";
+ PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.table", wrongName);
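+    // The provider wraps catalog failures in IllegalArgumentException, so unwrap
+    // the cause chain and assert that the root cause is NoSuchTableException.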
+ Assertions.assertThrows(NoSuchTableException.class, () -> {
+ try {
+ UtilHelpers.createSchemaProvider(HiveSchemaProvider.class.getName(), PROPS, jsc()).getSourceSchema();
+ } catch (Throwable exception) {
+ while (exception.getCause() != null) {
+ exception = exception.getCause();
+ }
+ throw exception;
+ }
+ });
+ }
+
+  private static Pair<String, String> parseDBAndTableName(String fullName) {
+ String[] dbAndTableName = fullName.split("\\.");
+ if (dbAndTableName.length > 1) {
+ return new ImmutablePair<>(dbAndTableName[0], dbAndTableName[1]);
+ } else {
+ return new ImmutablePair<>("default", dbAndTableName[0]);
+ }
+ }
+
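+  // Creates the database (if needed) and the schema table, using the DDL stored
+  // under test resources as delta-streamer-config/<db>.<table>.sql.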
+ private void createSchemaTable(String fullName) throws IOException {
+ SparkSession spark = spark();
+ String createTableSQL = UtilitiesTestBase.Helpers.readFile(String.format("delta-streamer-config/%s.sql", fullName));
+    Pair<String, String> dbAndTableName = parseDBAndTableName(fullName);
+ spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", dbAndTableName.getLeft()));
+ spark.sql(createTableSQL);
+ }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SparkClientFunctionalTestHarnessWithHiveSupport.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SparkClientFunctionalTestHarnessWithHiveSupport.java
new file mode 100644
index 0000000000000..fd59d633e7aed
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SparkClientFunctionalTestHarnessWithHiveSupport.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.testutils;
+
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.spark.SparkConf;
+
+import java.util.Collections;
+
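+/**
+ * A {@link SparkClientFunctionalTestHarness} whose Spark session enables the
+ * Hive catalog implementation, so tests can create and resolve Hive tables.
+ */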
+public class SparkClientFunctionalTestHarnessWithHiveSupport extends SparkClientFunctionalTestHarness {
+
+  @Override
+  public SparkConf conf() {
+    return conf(Collections.singletonMap("spark.sql.catalogImplementation", "hive"));
+  }
+}
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/hive_schema_provider_source.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/hive_schema_provider_source.avsc
new file mode 100644
index 0000000000000..5b1c62babfcbc
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/hive_schema_provider_source.avsc
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "type": "record",
+ "name": "source_schema_tab",
+ "namespace": "hoodie.schema_registry",
+ "fields": [
+ {
+ "name": "id",
+ "type": [
+ "long",
+ "null"
+ ]
+ },
+ {
+ "name": "name",
+ "type": [
+ "string",
+ "null"
+ ]
+ },
+ {
+ "name": "num1",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "num2",
+ "type": [
+ "long",
+ "null"
+ ]
+ },
+ {
+ "name": "num3",
+ "type": [
+ {
+ "type": "fixed",
+ "name": "fixed",
+ "namespace": "hoodie.schema_registry.source_schema_tab.num3",
+ "size": 9,
+ "logicalType": "decimal",
+ "precision": 20,
+ "scale": 0
+ },
+ "null"
+ ]
+ },
+ {
+ "name": "num4",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "num5",
+ "type": [
+ "float",
+ "null"
+ ]
+ },
+ {
+ "name": "num6",
+ "type": [
+ "double",
+ "null"
+ ]
+ },
+ {
+ "name": "bool",
+ "type": [
+ "boolean",
+ "null"
+ ]
+ },
+ {
+ "name": "bin",
+ "type": [
+ "bytes",
+ "null"
+ ]
+ }
+ ]
+}
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/hive_schema_provider_target.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/hive_schema_provider_target.avsc
new file mode 100644
index 0000000000000..d3d95ed14b471
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/hive_schema_provider_target.avsc
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "type": "record",
+ "name": "target_schema_tab",
+ "namespace": "hoodie.schema_registry",
+ "fields": [
+ {
+ "name": "id",
+ "type": [
+ "long",
+ "null"
+ ]
+ },
+ {
+ "name": "name",
+ "type": [
+ "string",
+ "null"
+ ]
+ },
+ {
+ "name": "num1",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "num2",
+ "type": [
+ "long",
+ "null"
+ ]
+ },
+ {
+ "name": "num3",
+ "type": [
+ {
+ "type": "fixed",
+ "name": "fixed",
+ "namespace": "hoodie.schema_registry.target_schema_tab.num3",
+ "size": 9,
+ "logicalType": "decimal",
+ "precision": 20,
+ "scale": 0
+ },
+ "null"
+ ]
+ },
+ {
+ "name": "num4",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "num5",
+ "type": [
+ "float",
+ "null"
+ ]
+ },
+ {
+ "name": "num6",
+ "type": [
+ "double",
+ "null"
+ ]
+ },
+ {
+ "name": "bool",
+ "type": [
+ "boolean",
+ "null"
+ ]
+ },
+ {
+ "name": "bin",
+ "type": [
+ "bytes",
+ "null"
+ ]
+ }
+ ]
+}
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/schema_registry.source_schema_tab.sql b/hudi-utilities/src/test/resources/delta-streamer-config/schema_registry.source_schema_tab.sql
new file mode 100644
index 0000000000000..b95ae0f5ee151
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/schema_registry.source_schema_tab.sql
@@ -0,0 +1,12 @@
+CREATE TABLE IF NOT EXISTS `schema_registry`.`source_schema_tab`(
+ `id` BIGINT,
+ `name` STRING,
+ `num1` INT,
+ `num2` BIGINT,
+ `num3` DECIMAL(20,0),
+ `num4` TINYINT,
+ `num5` FLOAT,
+ `num6` DOUBLE,
+ `bool` BOOLEAN,
+ `bin` BINARY
+)
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/schema_registry.target_schema_tab.sql b/hudi-utilities/src/test/resources/delta-streamer-config/schema_registry.target_schema_tab.sql
new file mode 100644
index 0000000000000..07f179f90ed93
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/schema_registry.target_schema_tab.sql
@@ -0,0 +1,12 @@
+CREATE TABLE IF NOT EXISTS `schema_registry`.`target_schema_tab`(
+ `id` BIGINT,
+ `name` STRING,
+ `num1` INT,
+ `num2` BIGINT,
+ `num3` DECIMAL(20,0),
+ `num4` TINYINT,
+ `num5` FLOAT,
+ `num6` DOUBLE,
+ `bool` BOOLEAN,
+ `bin` BINARY
+)