diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 4f99926887692..5fc989e2e5185 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -23,6 +23,7 @@
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -261,6 +262,11 @@ private MessageType getTableParquetSchemaFromDataFile() {
}
}
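+ /**
+ * Converts the given Avro schema to a Parquet {@link MessageType} with the given hadoop configuration.
+ * Exposed statically so callers that do not hold a meta client (e.g. the Flink catalog in this patch) can reuse the conversion.
+ */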
+ public static MessageType convertAvroSchemaToParquet(Schema schema, Configuration hadoopConf) {
+ AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(hadoopConf);
+ return avroSchemaConverter.convert(schema);
+ }
+
private Schema convertParquetSchemaToAvro(MessageType parquetSchema) {
AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getHadoopConf());
return avroSchemaConverter.convert(parquetSchema);
diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieCatalogException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieCatalogException.java
new file mode 100644
index 0000000000000..ccfef909096b5
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieCatalogException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+/**
+ * Exception thrown for Hoodie Catalog errors.
+ */
+public class HoodieCatalogException extends RuntimeException {
+
+ public HoodieCatalogException() {
+ super();
+ }
+
+ public HoodieCatalogException(String message) {
+ super(message);
+ }
+
+ public HoodieCatalogException(String message, Throwable t) {
+ super(message, t);
+ }
+
+ public HoodieCatalogException(Throwable t) {
+ super(t);
+ }
+
+}
diff --git a/hudi-flink-datasource/hudi-flink/pom.xml b/hudi-flink-datasource/hudi-flink/pom.xml
index 5ad323f93442d..77e8a77cf52a8 100644
--- a/hudi-flink-datasource/hudi-flink/pom.xml
+++ b/hudi-flink-datasource/hudi-flink/pom.xml
@@ -269,7 +269,42 @@
-
+    <dependency>
+      <groupId>javax.transaction</groupId>
+      <artifactId>jta</artifactId>
+      <version>1.1</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>javax.transaction</groupId>
+      <artifactId>javax.transaction-api</artifactId>
+      <version>1.3</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>${hive.groupid}</groupId>
+      <artifactId>hive-metastore</artifactId>
+      <version>${hive.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.transaction</groupId>
+          <artifactId>jta</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.transaction</groupId>
+          <artifactId>javax.transaction-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.mail</groupId>
+          <artifactId>mail</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jetty.aggregate</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 4ce77afc9f15c..00ebf094261b6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -27,6 +27,7 @@
import org.apache.flink.configuration.Configuration;
import java.util.Locale;
+import java.util.Map;
/**
* Tool helping to resolve the flink options {@link FlinkOptions}.
@@ -66,6 +67,14 @@ public static boolean isMorTable(Configuration conf) {
.equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
}
+ /**
+ * Returns whether it is a MERGE_ON_READ table.
+ */
+ public static boolean isMorTable(Map<String, String> options) {
+ return options.getOrDefault(FlinkOptions.TABLE_TYPE.key(),
+ FlinkOptions.TABLE_TYPE.defaultValue()).equalsIgnoreCase(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+ }
+
/**
* Returns whether it is a COPY_ON_WRITE table.
*/
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/CatalogOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/CatalogOptions.java
index 842f2757af39c..080153c1ba250 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/CatalogOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/CatalogOptions.java
@@ -29,6 +29,8 @@
* Hoodie catalog options.
*/
public class CatalogOptions {
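+ // name of the hive site configuration file and the fallback database used when none is specified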
+ public static final String HIVE_SITE_FILE = "hive-site.xml";
+ public static final String DEFAULT_DB = "default";
 public static final ConfigOption<String> CATALOG_PATH =
ConfigOptions.key("catalog.path")
@@ -42,6 +44,22 @@ public class CatalogOptions {
.stringType()
.defaultValue("default");
+ public static final ConfigOption<String> HIVE_CONF_DIR = ConfigOptions
+ .key("hive.conf.dir")
+ .stringType()
+ .noDefaultValue();
+
+ public static final ConfigOption<String> MODE = ConfigOptions
+ .key("mode")
+ .stringType()
+ .defaultValue("dfs");
+
+ public static final ConfigOption<Boolean> TABLE_EXTERNAL = ConfigOptions
+ .key("table.external")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether the table is external, default false");
+
/**
* Returns all the common table options that can be shared.
*
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
new file mode 100644
index 0000000000000..36503c152c3c8
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.catalog;
+
+import org.apache.hudi.common.util.StringUtils;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utilities for Hive field schema.
+ */
+public class HiveSchemaUtils {
+ /** Get field names from field schemas. */
+ public static List<String> getFieldNames(List<FieldSchema> fieldSchemas) {
+ List<String> names = new ArrayList<>(fieldSchemas.size());
+ for (FieldSchema fs : fieldSchemas) {
+ names.add(fs.getName());
+ }
+ return names;
+ }
+
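+ /**
+ * Converts the Hive table columns plus partition keys into a Flink {@code Schema},
+ * marking primary key columns as NOT NULL and restoring the primary key constraint from the table parameters.
+ */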
+ public static org.apache.flink.table.api.Schema convertTableSchema(Table hiveTable) {
+ List<FieldSchema> allCols = new ArrayList<>(hiveTable.getSd().getCols());
+ allCols.addAll(hiveTable.getPartitionKeys());
+
+ String pkConstraintName = hiveTable.getParameters().get(TableOptionProperties.PK_CONSTRAINT_NAME);
+ List<String> primaryColNames = StringUtils.isNullOrEmpty(pkConstraintName)
+ ? Collections.EMPTY_LIST
+ : StringUtils.split(hiveTable.getParameters().get(TableOptionProperties.PK_COLUMNS), ",");
+
+ String[] colNames = new String[allCols.size()];
+ DataType[] colTypes = new DataType[allCols.size()];
+
+ for (int i = 0; i < allCols.size(); i++) {
+ FieldSchema fs = allCols.get(i);
+
+ colNames[i] = fs.getName();
+ colTypes[i] =
+ toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(fs.getType()));
+ if (primaryColNames.contains(colNames[i])) {
+ colTypes[i] = colTypes[i].notNull();
+ }
+ }
+
+ org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder().fromFields(colNames, colTypes);
+ if (!StringUtils.isNullOrEmpty(pkConstraintName)) {
+ builder.primaryKeyNamed(pkConstraintName, primaryColNames);
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Convert Hive data type to a Flink data type.
+ *
+ * @param hiveType a Hive data type
+ * @return the corresponding Flink data type
+ */
+ public static DataType toFlinkType(TypeInfo hiveType) {
+ checkNotNull(hiveType, "hiveType cannot be null");
+
+ switch (hiveType.getCategory()) {
+ case PRIMITIVE:
+ return toFlinkPrimitiveType((PrimitiveTypeInfo) hiveType);
+ case LIST:
+ ListTypeInfo listTypeInfo = (ListTypeInfo) hiveType;
+ return DataTypes.ARRAY(toFlinkType(listTypeInfo.getListElementTypeInfo()));
+ case MAP:
+ MapTypeInfo mapTypeInfo = (MapTypeInfo) hiveType;
+ return DataTypes.MAP(
+ toFlinkType(mapTypeInfo.getMapKeyTypeInfo()),
+ toFlinkType(mapTypeInfo.getMapValueTypeInfo()));
+ case STRUCT:
+ StructTypeInfo structTypeInfo = (StructTypeInfo) hiveType;
+
+ List<String> names = structTypeInfo.getAllStructFieldNames();
+ List<TypeInfo> typeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+
+ DataTypes.Field[] fields = new DataTypes.Field[names.size()];
+
+ for (int i = 0; i < fields.length; i++) {
+ fields[i] = DataTypes.FIELD(names.get(i), toFlinkType(typeInfos.get(i)));
+ }
+
+ return DataTypes.ROW(fields);
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Flink doesn't support Hive data type %s yet.", hiveType));
+ }
+ }
+
+ private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {
+ checkNotNull(hiveType, "hiveType cannot be null");
+
+ switch (hiveType.getPrimitiveCategory()) {
+ case CHAR:
+ return DataTypes.CHAR(((CharTypeInfo) hiveType).getLength());
+ case VARCHAR:
+ return DataTypes.VARCHAR(((VarcharTypeInfo) hiveType).getLength());
+ case STRING:
+ return DataTypes.STRING();
+ case BOOLEAN:
+ return DataTypes.BOOLEAN();
+ case BYTE:
+ return DataTypes.TINYINT();
+ case SHORT:
+ return DataTypes.SMALLINT();
+ case INT:
+ return DataTypes.INT();
+ case LONG:
+ return DataTypes.BIGINT();
+ case FLOAT:
+ return DataTypes.FLOAT();
+ case DOUBLE:
+ return DataTypes.DOUBLE();
+ case DATE:
+ return DataTypes.DATE();
+ case TIMESTAMP:
+ return DataTypes.TIMESTAMP(9);
+ case BINARY:
+ return DataTypes.BYTES();
+ case DECIMAL:
+ DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) hiveType;
+ return DataTypes.DECIMAL(
+ decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Flink doesn't support Hive primitive type %s yet", hiveType));
+ }
+ }
+
+ /** Create Hive columns from Flink TableSchema. */
+ public static List<FieldSchema> createHiveColumns(TableSchema schema) {
+ String[] fieldNames = schema.getFieldNames();
+ DataType[] fieldTypes = schema.getFieldDataTypes();
+
+ List<FieldSchema> columns = new ArrayList<>(fieldNames.length);
+
+ for (int i = 0; i < fieldNames.length; i++) {
+ columns.add(
+ new FieldSchema(
+ fieldNames[i],
+ toHiveTypeInfo(fieldTypes[i], true).getTypeName(),
+ null));
+ }
+
+ return columns;
+ }
+
+ /**
+ * Convert Flink DataType to Hive TypeInfo. For types with a precision parameter, e.g.
+ * timestamp, the supported precisions in Hive and Flink can be different. Therefore the
+ * conversion will fail for those types if the precision is not supported by Hive and
+ * checkPrecision is true.
+ *
+ * @param dataType a Flink DataType
+ * @param checkPrecision whether to fail the conversion if the precision of the DataType is not
+ * supported by Hive
+ * @return the corresponding Hive data type
+ */
+ public static TypeInfo toHiveTypeInfo(DataType dataType, boolean checkPrecision) {
+ checkNotNull(dataType, "type cannot be null");
+ LogicalType logicalType = dataType.getLogicalType();
+ return logicalType.accept(new TypeInfoLogicalTypeVisitor(dataType, checkPrecision));
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java
index 8ab632ba5a5aa..4a63b7a26ba09 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java
@@ -18,6 +18,8 @@
package org.apache.hudi.table.catalog;
+import org.apache.hudi.exception.HoodieCatalogException;
+
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.Catalog;
@@ -28,10 +30,11 @@
import java.util.Collections;
import java.util.HashSet;
+import java.util.Locale;
import java.util.Set;
+import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
-import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
/**
* A catalog factory impl that creates {@link HoodieCatalog}.
@@ -51,22 +54,35 @@ public Catalog createCatalog(Context context) {
final FactoryUtil.CatalogFactoryHelper helper =
FactoryUtil.createCatalogFactoryHelper(this, context);
helper.validate();
-
- return new HoodieCatalog(
- context.getName(),
- (Configuration) helper.getOptions());
+ String mode = helper.getOptions().get(CatalogOptions.MODE);
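+ // 'hms' mode syncs the table metadata into the Hive metastore, 'dfs' mode keeps the catalog metadata on the file system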
+ switch (mode.toLowerCase(Locale.ROOT)) {
+ case "hms":
+ return new HoodieHiveCatalog(
+ context.getName(),
+ helper.getOptions().get(CatalogOptions.DEFAULT_DATABASE),
+ helper.getOptions().get(CatalogOptions.HIVE_CONF_DIR));
+ case "dfs":
+ return new HoodieCatalog(
+ context.getName(),
+ (Configuration) helper.getOptions());
+ default:
+ throw new HoodieCatalogException(String.format("Invalid catalog mode: %s, supported modes: [hms, dfs].", mode));
+ }
}
@Override
 public Set<ConfigOption<?>> requiredOptions() {
- Set<ConfigOption<?>> options = new HashSet<>();
- options.add(CATALOG_PATH);
- options.add(DEFAULT_DATABASE);
- return options;
+ return Collections.emptySet();
}
@Override
 public Set<ConfigOption<?>> optionalOptions() {
- return Collections.emptySet();
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(CatalogOptions.DEFAULT_DATABASE);
+ options.add(PROPERTY_VERSION);
+ options.add(CatalogOptions.HIVE_CONF_DIR);
+ options.add(CatalogOptions.MODE);
+ options.add(CATALOG_PATH);
+ return options;
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
new file mode 100644
index 0000000000000..f546300249ca5
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.catalog;
+
+import org.apache.hudi.configuration.HadoopConfigurations;
+
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+import static org.apache.hudi.table.catalog.CatalogOptions.HIVE_SITE_FILE;
+
+/**
+ * Utilities for Hoodie Catalog.
+ */
+public class HoodieCatalogUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(HoodieCatalogUtil.class);
+
+ /**
+ * Returns a new {@code HiveConf}.
+ *
+ * @param hiveConfDir Hive conf directory path.
+ * @return A HiveConf instance.
+ */
+ public static HiveConf createHiveConf(@Nullable String hiveConfDir) {
+ // create HiveConf from hadoop configuration with hadoop conf directory configured.
+ Configuration hadoopConf = HadoopConfigurations.getHadoopConf(new org.apache.flink.configuration.Configuration());
+
+ // ignore all the static conf file URLs that HiveConf may have set
+ HiveConf.setHiveSiteLocation(null);
+ HiveConf.setLoadMetastoreConfig(false);
+ HiveConf.setLoadHiveServer2Config(false);
+ HiveConf hiveConf = new HiveConf(hadoopConf, HiveConf.class);
+
+ LOG.info("Setting hive conf dir as {}", hiveConfDir);
+
+ if (hiveConfDir != null) {
+ Path hiveSite = new Path(hiveConfDir, HIVE_SITE_FILE);
+ if (!hiveSite.toUri().isAbsolute()) {
+ // treat relative URI as local file to be compatible with previous behavior
+ hiveSite = new Path(new File(hiveSite.toString()).toURI());
+ }
+ try (InputStream inputStream = hiveSite.getFileSystem(hadoopConf).open(hiveSite)) {
+ hiveConf.addResource(inputStream, hiveSite.toString());
+ // trigger a read from the conf so that the input stream is read
+ isEmbeddedMetastore(hiveConf);
+ } catch (IOException e) {
+ throw new CatalogException(
+ "Failed to load hive-site.xml from specified path:" + hiveSite, e);
+ }
+ } else {
+ // user doesn't provide hive conf dir, we try to find it in classpath
+ URL hiveSite =
+ Thread.currentThread().getContextClassLoader().getResource(HIVE_SITE_FILE);
+ if (hiveSite != null) {
+ LOG.info("Found {} in classpath: {}", HIVE_SITE_FILE, hiveSite);
+ hiveConf.addResource(hiveSite);
+ }
+ }
+ return hiveConf;
+ }
+
+ /**
+ * Returns whether the {@code hive.metastore.uris} option is empty, i.e. the metastore runs embedded.
+ */
+ public static boolean isEmbeddedMetastore(HiveConf hiveConf) {
+ return isNullOrWhitespaceOnly(hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
new file mode 100644
index 0000000000000..ff80a7004ba31
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -0,0 +1,899 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.catalog;
+
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieCatalogException;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.sync.common.util.ConfigUtils;
+import org.apache.hudi.table.format.FilePathUtils;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.avro.Schema;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase;
+import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner;
+import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogPropertiesUtil;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.ALTER_DATABASE_OP;
+import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner.DATABASE_OWNER_NAME;
+import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner.DATABASE_OWNER_TYPE;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+import static org.apache.hudi.configuration.FlinkOptions.PATH;
+import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DB;
+import static org.apache.hudi.table.catalog.TableOptionProperties.COMMENT;
+import static org.apache.hudi.table.catalog.TableOptionProperties.PK_COLUMNS;
+import static org.apache.hudi.table.catalog.TableOptionProperties.PK_CONSTRAINT_NAME;
+import static org.apache.hudi.table.catalog.TableOptionProperties.SPARK_SOURCE_PROVIDER;
+
+/**
+ * A catalog implementation for Hoodie based on MetaStore.
+ */
+public class HoodieHiveCatalog extends AbstractCatalog {
+ private static final Logger LOG = LoggerFactory.getLogger(HoodieHiveCatalog.class);
+
+ private final HiveConf hiveConf;
+ private IMetaStoreClient client;
+
+ public HoodieHiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir) {
+ this(catalogName, defaultDatabase, HoodieCatalogUtil.createHiveConf(hiveConfDir), false);
+ }
+
+ public HoodieHiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf, boolean allowEmbedded) {
+ super(catalogName, defaultDatabase == null ? DEFAULT_DB : defaultDatabase);
+ this.hiveConf = hiveConf;
+ if (!allowEmbedded) {
+ checkArgument(
+ !HoodieCatalogUtil.isEmbeddedMetastore(this.hiveConf),
+ "Embedded metastore is not allowed. Make sure you have set a valid value for "
+ + HiveConf.ConfVars.METASTOREURIS);
+ }
+ LOG.info("Created Hoodie Catalog '{}' in hms mode", catalogName);
+ }
+
+ @Override
+ public void open() throws CatalogException {
+ if (this.client == null) {
+ try {
+ this.client = Hive.get(hiveConf).getMSC();
+ } catch (Exception e) {
+ throw new HoodieCatalogException("Failed to create hive metastore client", e);
+ }
+ LOG.info("Connected to Hive metastore");
+ }
+ if (!databaseExists(getDefaultDatabase())) {
+ LOG.info("{} does not exist, will be created.", getDefaultDatabase());
+ CatalogDatabase database = new CatalogDatabaseImpl(Collections.EMPTY_MAP, "default database");
+ try {
+ createDatabase(getDefaultDatabase(), database, true);
+ } catch (DatabaseAlreadyExistException e) {
+ throw new HoodieCatalogException(getName(), e);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws CatalogException {
+ if (client != null) {
+ client.close();
+ client = null;
+ LOG.info("Disconnect to hive metastore");
+ }
+ }
+
+ public HiveConf getHiveConf() {
+ return hiveConf;
+ }
+
+ // ------ databases ------
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ try {
+ return client.getAllDatabases();
+ } catch (TException e) {
+ throw new HoodieCatalogException(
+ String.format("Failed to list all databases in %s", getName()), e);
+ }
+ }
+
+ private Database getHiveDatabase(String databaseName) throws DatabaseNotExistException {
+ try {
+ return client.getDatabase(databaseName);
+ } catch (NoSuchObjectException e) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ } catch (TException e) {
+ throw new HoodieCatalogException(
+ String.format("Failed to get database %s from %s", databaseName, getName()), e);
+ }
+ }
+
+ @Override
+ public CatalogDatabase getDatabase(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ Database hiveDatabase = getHiveDatabase(databaseName);
+
+ Map<String, String> properties = new HashMap<>(hiveDatabase.getParameters());
+
+ properties.put(SqlCreateHiveDatabase.DATABASE_LOCATION_URI, hiveDatabase.getLocationUri());
+
+ return new CatalogDatabaseImpl(properties, hiveDatabase.getDescription());
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException {
+ try {
+ return client.getDatabase(databaseName) != null;
+ } catch (NoSuchObjectException e) {
+ return false;
+ } catch (TException e) {
+ throw new HoodieCatalogException(
+ String.format(
+ "Failed to determine whether database %s exists or not", databaseName),
+ e);
+ }
+ }
+
+ @Override
+ public void createDatabase(
+ String databaseName, CatalogDatabase database, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ checkArgument(
+ !isNullOrWhitespaceOnly(databaseName), "Database name can not null or empty");
+ checkNotNull(database, "database cannot be null");
+
+ Map<String, String> properties = database.getProperties();
+
+ String dbLocationUri = properties.remove(SqlCreateHiveDatabase.DATABASE_LOCATION_URI);
+
+ Database hiveDatabase =
+ new Database(databaseName, database.getComment(), dbLocationUri, properties);
+
+ try {
+ client.createDatabase(hiveDatabase);
+ } catch (AlreadyExistsException e) {
+ if (!ignoreIfExists) {
+ throw new DatabaseAlreadyExistException(getName(), hiveDatabase.getName());
+ }
+ } catch (TException e) {
+ throw new HoodieCatalogException(
+ String.format("Failed to create database %s", hiveDatabase.getName()), e);
+ }
+ }
+
+ @Override
+ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+ throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+ try {
+ client.dropDatabase(name, true, ignoreIfNotExists, cascade);
+ } catch (NoSuchObjectException e) {
+ if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), name);
+ }
+ } catch (InvalidOperationException e) {
+ throw new DatabaseNotEmptyException(getName(), name);
+ } catch (TException e) {
+ throw new HoodieCatalogException(String.format("Failed to drop database %s", name), e);
+ }
+ }
+
+ @Override
+ public void alterDatabase(
+ String databaseName, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+ throws DatabaseNotExistException, CatalogException {
+ checkArgument(
+ !isNullOrWhitespaceOnly(databaseName), "Database name cannot be null or empty");
+ checkNotNull(newDatabase, "New database cannot be null");
+
+ // client.alterDatabase doesn't throw any exception if there is no existing database
+ Database hiveDB;
+ try {
+ hiveDB = getHiveDatabase(databaseName);
+ } catch (DatabaseNotExistException e) {
+ if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ }
+
+ return;
+ }
+
+ try {
+ client.alterDatabase(databaseName, alterDatabase(hiveDB, newDatabase));
+ } catch (TException e) {
+ throw new HoodieCatalogException(
+ String.format("Failed to alter database %s", databaseName), e);
+ }
+ }
+
+ private static Database alterDatabase(Database hiveDB, CatalogDatabase newDatabase) {
+ Map<String, String> newParams = newDatabase.getProperties();
+ String opStr = newParams.remove(ALTER_DATABASE_OP);
+ if (opStr == null) {
+ // by default is to alter db properties
+ opStr = SqlAlterHiveDatabase.AlterHiveDatabaseOp.CHANGE_PROPS.name();
+ }
+ String newLocation = newParams.remove(SqlCreateHiveDatabase.DATABASE_LOCATION_URI);
+ SqlAlterHiveDatabase.AlterHiveDatabaseOp op =
+ SqlAlterHiveDatabase.AlterHiveDatabaseOp.valueOf(opStr);
+ switch (op) {
+ case CHANGE_PROPS:
+ hiveDB.setParameters(newParams);
+ break;
+ case CHANGE_LOCATION:
+ hiveDB.setLocationUri(newLocation);
+ break;
+ case CHANGE_OWNER:
+ String ownerName = newParams.remove(DATABASE_OWNER_NAME);
+ String ownerType = newParams.remove(DATABASE_OWNER_TYPE);
+ hiveDB.setOwnerName(ownerName);
+ switch (ownerType) {
+ case SqlAlterHiveDatabaseOwner.ROLE_OWNER:
+ hiveDB.setOwnerType(PrincipalType.ROLE);
+ break;
+ case SqlAlterHiveDatabaseOwner.USER_OWNER:
+ hiveDB.setOwnerType(PrincipalType.USER);
+ break;
+ default:
+ throw new CatalogException("Unsupported database owner type: " + ownerType);
+ }
+ break;
+ default:
+ throw new CatalogException("Unsupported alter database op:" + opStr);
+ }
+ // is_generic is deprecated, remove it
+ if (hiveDB.getParameters() != null) {
+ hiveDB.getParameters().remove(CatalogPropertiesUtil.IS_GENERIC);
+ }
+ return hiveDB;
+ }
+
+ // ------ tables ------
+
+ private Table isHoodieTable(Table hiveTable) {
+ if (!hiveTable.getParameters().getOrDefault(SPARK_SOURCE_PROVIDER, "").equalsIgnoreCase("hudi")
+ && !isFlinkHoodieTable(hiveTable)) {
+ throw new HoodieCatalogException(String.format("the %s is not hoodie table", hiveTable.getTableName()));
+ }
+ return hiveTable;
+ }
+
+ private boolean isFlinkHoodieTable(Table hiveTable) {
+ return hiveTable.getParameters().getOrDefault(CONNECTOR.key(), "").equalsIgnoreCase("hudi");
+ }
+
+ @VisibleForTesting
+ public Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
+ try {
+ Table hiveTable = client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+ return isHoodieTable(hiveTable);
+ } catch (NoSuchObjectException e) {
+ throw new TableNotExistException(getName(), tablePath);
+ } catch (TException e) {
+ throw new HoodieCatalogException(String.format("Failed to get table %s from Hive metastore", tablePath.getObjectName()));
+ }
+ }
+
+ private Table translateSparkTable2Flink(ObjectPath tablePath, Table hiveTable) {
+ if (!isFlinkHoodieTable(hiveTable)) {
+ try {
+ Map<String, String> parameters = hiveTable.getParameters();
+ parameters.putAll(TableOptionProperties.translateSparkTableProperties2Flink(hiveTable));
+ String path = hiveTable.getSd().getLocation();
+ parameters.put(PATH.key(), path);
+ if (!parameters.containsKey(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
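+ // probe the first-level folders under the table path (skipping '.hoodie' and the 'default' partition) to detect hive style partitioning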
+ Path hoodieTablePath = new Path(path);
+ boolean hiveStyle = Arrays.stream(FSUtils.getFs(hoodieTablePath, hiveConf).listStatus(hoodieTablePath))
+ .map(fileStatus -> fileStatus.getPath().getName())
+ .filter(f -> !f.equals(".hoodie") && !f.equals("default"))
+ .anyMatch(FilePathUtils::isHiveStylePartitioning);
+ parameters.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), String.valueOf(hiveStyle));
+ }
+ client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), hiveTable);
+ } catch (Exception e) {
+ throw new HoodieCatalogException("Failed to update table schema", e);
+ }
+ }
+ return hiveTable;
+ }
+
+ @Override
+ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+ checkNotNull(tablePath, "Table path cannot be null");
+ Table hiveTable = getHiveTable(tablePath);
+ hiveTable = translateSparkTable2Flink(tablePath, hiveTable);
+ String path = hiveTable.getSd().getLocation();
+ Map<String, String> parameters = hiveTable.getParameters();
+ Schema latestTableSchema = StreamerUtil.getLatestTableSchema(path, hiveConf);
+ org.apache.flink.table.api.Schema schema;
+ if (latestTableSchema != null) {
+ org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder()
+ .fromRowDataType(AvroSchemaConverter.convertToDataType(latestTableSchema));
+ String pkConstraintName = parameters.get(PK_CONSTRAINT_NAME);
+ if (!StringUtils.isNullOrEmpty(pkConstraintName)) {
+ builder.primaryKeyNamed(pkConstraintName, StringUtils.split(parameters.get(PK_COLUMNS), ","));
+ }
+ schema = builder.build();
+ } else {
+ LOG.warn("{} does not have any hoodie schema, and use hive table schema to infer the table schema", tablePath);
+ schema = HiveSchemaUtils.convertTableSchema(hiveTable);
+ }
+ return CatalogTable.of(schema, parameters.get(COMMENT),
+ HiveSchemaUtils.getFieldNames(hiveTable.getPartitionKeys()), parameters);
+ }
+
+ @Override
+ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+ checkNotNull(tablePath, "Table path cannot be null");
+ checkNotNull(table, "Table cannot be null");
+
+ if (!databaseExists(tablePath.getDatabaseName())) {
+ throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
+ }
+
+ if (!table.getOptions().getOrDefault(CONNECTOR.key(), "").equalsIgnoreCase("hudi")) {
+ throw new HoodieCatalogException(String.format("The %s is not hoodie table", tablePath.getObjectName()));
+ }
+
+ if (table instanceof CatalogView) {
+ throw new HoodieCatalogException("CREATE VIEW is not supported.");
+ }
+
+ try {
+ boolean isMorTable = OptionsResolver.isMorTable(table.getOptions());
+ Table hiveTable = instantiateHiveTable(tablePath, table, inferTablePath(tablePath, table), isMorTable);
+ //create hive table
+ client.createTable(hiveTable);
+ //init hoodie metaClient
+ initTableIfNotExists(tablePath, (CatalogTable)table);
+ } catch (AlreadyExistsException e) {
+ if (!ignoreIfExists) {
+ throw new TableAlreadyExistException(getName(), tablePath, e);
+ }
+ } catch (Exception e) {
+ throw new HoodieCatalogException(
+ String.format("Failed to create table %s", tablePath.getFullName()), e);
+ }
+ }
+
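+ /**
+ * Initializes the hoodie table under the inferred base path (creating the hoodie meta files) if it does not exist yet,
+ * passing through the schema, primary key, partition and path options resolved from the catalog table.
+ */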
+ private void initTableIfNotExists(ObjectPath tablePath, CatalogTable catalogTable) {
+ Configuration flinkConf = Configuration.fromMap(applyOptionsHook(catalogTable.getOptions()));
+ final String avroSchema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType()).toString();
+ flinkConf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchema);
+
+ // stores two copies of options:
+ // - partition keys
+ // - primary keys
+ // because the HoodieTableMetaClient is heavyweight, we try to avoid initializing it
+ // when calling #getTable.
+
+ if (catalogTable.getUnresolvedSchema().getPrimaryKey().isPresent()) {
+ final String pkColumns = String.join(",", catalogTable.getUnresolvedSchema().getPrimaryKey().get().getColumnNames());
+ String recordKey = flinkConf.get(FlinkOptions.RECORD_KEY_FIELD);
+ if (!Objects.equals(pkColumns, recordKey)) {
+ throw new HoodieCatalogException(String.format("%s and %s are the different", pkColumns, recordKey));
+ }
+ }
+
+ if (catalogTable.isPartitioned()) {
+ final String partitions = String.join(",", catalogTable.getPartitionKeys());
+ flinkConf.setString(FlinkOptions.PARTITION_PATH_FIELD, partitions);
+ }
+
+ if (!flinkConf.getOptional(PATH).isPresent()) {
+ flinkConf.setString(PATH, inferTablePath(tablePath, catalogTable));
+ }
+
+ flinkConf.setString(FlinkOptions.TABLE_NAME, tablePath.getObjectName());
+ try {
+ StreamerUtil.initTableIfNotExists(flinkConf);
+ } catch (IOException e) {
+ throw new HoodieCatalogException("Initialize table exception.", e);
+ }
+ }
+
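+ /**
+ * Returns the explicit 'path' option if it is set, otherwise resolves the table location under the database location URI.
+ */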
+ private String inferTablePath(ObjectPath tablePath, CatalogBaseTable table) {
+ String location = table.getOptions().getOrDefault(PATH.key(), "");
+ if (StringUtils.isNullOrEmpty(location)) {
+ try {
+ Path dbLocation = new Path(client.getDatabase(tablePath.getDatabaseName()).getLocationUri());
+ location = new Path(dbLocation, tablePath.getObjectName()).toString();
+ } catch (TException e) {
+ throw new HoodieCatalogException(String.format("Failed to infer hoodie table path for table %s", tablePath), e);
+ }
+ }
+ return location;
+ }
+
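+ /**
+ * Supplements the table options with defaults for the record key, precombine field and table type when they are not specified.
+ */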
+ private Map<String, String> applyOptionsHook(Map<String, String> options) {
+ Map<String, String> properties = new HashMap<>(options);
+ if (!options.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())) {
+ properties.put(FlinkOptions.RECORD_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.defaultValue());
+ }
+ if (!options.containsKey(FlinkOptions.PRECOMBINE_FIELD.key())) {
+ properties.put(FlinkOptions.PRECOMBINE_FIELD.key(), FlinkOptions.PRECOMBINE_FIELD.defaultValue());
+ }
+ if (!options.containsKey(FlinkOptions.TABLE_TYPE.key())) {
+ properties.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE.defaultValue());
+ }
+ return properties;
+ }
+
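+ /**
+ * Builds the Hive table descriptor for the catalog table: columns, partition keys, the storage descriptor
+ * (input/output format and serde) and the table parameters translated for Spark compatibility.
+ */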
+ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table, String location, boolean useRealTimeInputFormat) throws IOException {
+ // let Hive set default parameters for us, e.g. serialization.format
+ Table hiveTable =
+ org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(
+ tablePath.getDatabaseName(), tablePath.getObjectName());
+
+ hiveTable.setOwner(UserGroupInformation.getCurrentUser().getUserName());
+ hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
+
+ Map<String, String> properties = applyOptionsHook(table.getOptions());
+
+ if (Boolean.parseBoolean(table.getOptions().get(CatalogOptions.TABLE_EXTERNAL.key()))) {
+ hiveTable.setTableType(TableType.EXTERNAL_TABLE.toString());
+ properties.put("EXTERNAL", "TRUE");
+ }
+
+ // Table comment
+ if (table.getComment() != null) {
+ properties.put(COMMENT, table.getComment());
+ }
+
+ //set pk
+ if (table.getUnresolvedSchema().getPrimaryKey().isPresent()) {
+ String pkColumns = String.join(",", table.getUnresolvedSchema().getPrimaryKey().get().getColumnNames());
+ String recordKey = properties.getOrDefault(FlinkOptions.RECORD_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.defaultValue());
+ if (!Objects.equals(pkColumns, recordKey)) {
+ throw new HoodieCatalogException(
+ String.format("Primary key [%s] and record key [%s] should be the the same.",
+ pkColumns,
+ recordKey));
+ }
+ properties.put(PK_CONSTRAINT_NAME, table.getUnresolvedSchema().getPrimaryKey().get().getConstraintName());
+ properties.put(PK_COLUMNS, pkColumns);
+ }
+
+ if (!properties.containsKey(FlinkOptions.PATH.key())) {
+ properties.put(FlinkOptions.PATH.key(), location);
+ }
+
+ //set sd
+ StorageDescriptor sd = new StorageDescriptor();
+ List<FieldSchema> allColumns = HiveSchemaUtils.createHiveColumns(table.getSchema());
+
+ // Table columns and partition keys
+ if (table instanceof CatalogTable) {
+ CatalogTable catalogTable = (CatalogTable) table;
+
+ if (catalogTable.isPartitioned()) {
+ int partitionKeySize = catalogTable.getPartitionKeys().size();
+ List<FieldSchema> regularColumns =
+ allColumns.subList(0, allColumns.size() - partitionKeySize);
+ List<FieldSchema> partitionColumns =
+ allColumns.subList(
+ allColumns.size() - partitionKeySize, allColumns.size());
+
+ sd.setCols(regularColumns);
+ hiveTable.setPartitionKeys(partitionColumns);
+ } else {
+ sd.setCols(allColumns);
+ hiveTable.setPartitionKeys(new ArrayList<>());
+ }
+ } else {
+ sd.setCols(allColumns);
+ }
+
+ HoodieFileFormat baseFileFormat = HoodieFileFormat.PARQUET;
+ //ignore uber input Format
+ String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(baseFileFormat, useRealTimeInputFormat);
+ String outputFormatClassName = HoodieInputFormatUtils.getOutputFormatClassName(baseFileFormat);
+ String serDeClassName = HoodieInputFormatUtils.getSerDeClassName(baseFileFormat);
+ sd.setInputFormat(inputFormatClassName);
+ sd.setOutputFormat(outputFormatClassName);
+ Map<String, String> serdeProperties = new HashMap<>();
+ serdeProperties.put("path", location);
+ serdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(!useRealTimeInputFormat));
+ serdeProperties.put("serialization.format", "1");
+
+ serdeProperties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark((CatalogTable)table, hiveConf, properties));
+
+ sd.setSerdeInfo(new SerDeInfo(null, serDeClassName, serdeProperties));
+
+ sd.setLocation(location);
+ hiveTable.setSd(sd);
+
+ hiveTable.setParameters(properties);
+ return hiveTable;
+ }
+
+ @Override
+ public List<String> listTables(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ checkArgument(
+ !isNullOrWhitespaceOnly(databaseName), "Database name cannot be null or empty");
+
+ try {
+ return client.getAllTables(databaseName);
+ } catch (UnknownDBException e) {
+ throw new DatabaseNotExistException(getName(), databaseName);
+ } catch (TException e) {
+ throw new HoodieCatalogException(
+ String.format("Failed to list tables in database %s", databaseName), e);
+ }
+ }
+
+ @Override
+ public List<String> listViews(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ throw new HoodieCatalogException("Hoodie catalog does not support to listViews");
+ }
+
+ @Override
+ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+ checkNotNull(tablePath, "Table path cannot be null");
+
+ try {
+ return client.tableExists(tablePath.getDatabaseName(), tablePath.getObjectName());
+ } catch (UnknownDBException e) {
+ return false;
+ } catch (TException e) {
+ throw new CatalogException(
+ String.format(
+ "Failed to check whether table %s exists or not.",
+ tablePath.getFullName()),
+ e);
+ }
+ }
+
+ @Override
+ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ checkNotNull(tablePath, "Table path cannot be null");
+
+ try {
+ client.dropTable(
+ tablePath.getDatabaseName(),
+ tablePath.getObjectName(),
+ // Indicate whether associated data should be deleted.
+ // Set to 'true' for now because Flink tables shouldn't have data in Hive. Can
+ // be changed later if necessary
+ true,
+ ignoreIfNotExists);
+ } catch (NoSuchObjectException e) {
+ if (!ignoreIfNotExists) {
+ throw new TableNotExistException(getName(), tablePath);
+ }
+ } catch (TException e) {
+ throw new HoodieCatalogException(
+ String.format("Failed to drop table %s", tablePath.getFullName()), e);
+ }
+ }
+
+ @Override
+ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+ throws TableNotExistException, TableAlreadyExistException, CatalogException {
+ checkNotNull(tablePath, "Table path cannot be null");
+ checkArgument(
+ !isNullOrWhitespaceOnly(newTableName), "New table name cannot be null or empty");
+
+ try {
+ // alter_table() doesn't throw a clear exception when target table doesn't exist.
+ // Thus, check the table existence explicitly
+ if (tableExists(tablePath)) {
+ ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName);
+ // alter_table() doesn't throw a clear exception when new table already exists.
+ // Thus, check the table existence explicitly
+ if (tableExists(newPath)) {
+ throw new TableAlreadyExistException(getName(), newPath);
+ } else {
+ Table hiveTable = getHiveTable(tablePath);
+
+ //update hoodie
+ StorageDescriptor sd = hiveTable.getSd();
+ String location = sd.getLocation();
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(location).setConf(hiveConf).build();
+ //Init table with new name
+ HoodieTableMetaClient.withPropertyBuilder().fromProperties(metaClient.getTableConfig().getProps())
+ .setTableName(newTableName)
+ .initTable(hiveConf, location);
+
+ hiveTable.setTableName(newTableName);
+ client.alter_table(
+ tablePath.getDatabaseName(), tablePath.getObjectName(), hiveTable);
+ }
+ } else if (!ignoreIfNotExists) {
+ throw new TableNotExistException(getName(), tablePath);
+ }
+ } catch (Exception e) {
+ throw new HoodieCatalogException(
+ String.format("Failed to rename table %s", tablePath.getFullName()), e);
+ }
+ }
+
+ private boolean sameOptions(Map<String, String> existingOptions, Map<String, String> newOptions, ConfigOption<?> option) {
+ return existingOptions.getOrDefault(option.key(), String.valueOf(option.defaultValue()))
+ .equalsIgnoreCase(newOptions.getOrDefault(option.key(), String.valueOf(option.defaultValue())));
+ }
+
+ @Override
+ public void alterTable(
+ ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ checkNotNull(tablePath, "Table path cannot be null");
+ checkNotNull(newCatalogTable, "New catalog table cannot be null");
+
+ if (!newCatalogTable.getOptions().getOrDefault(CONNECTOR.key(), "").equalsIgnoreCase("hudi")) {
+ throw new HoodieCatalogException(String.format("The %s is not hoodie table", tablePath.getObjectName()));
+ }
+ if (newCatalogTable instanceof CatalogView) {
+ throw new HoodieCatalogException("Hoodie catalog does not support to ALTER VIEW");
+ }
+
+ try {
+ Table hiveTable = getHiveTable(tablePath);
+ if (!sameOptions(hiveTable.getParameters(), newCatalogTable.getOptions(), FlinkOptions.TABLE_TYPE)
+ || !sameOptions(hiveTable.getParameters(), newCatalogTable.getOptions(), FlinkOptions.INDEX_TYPE)) {
+ throw new HoodieCatalogException("Hoodie catalog does not support to alter table type and index type");
+ }
+ } catch (TableNotExistException e) {
+ if (!ignoreIfNotExists) {
+ throw e;
+ }
+ return;
+ }
+
+ try {
+ boolean isMorTable = OptionsResolver.isMorTable(newCatalogTable.getOptions());
+ Table hiveTable = instantiateHiveTable(tablePath, newCatalogTable, inferTablePath(tablePath, newCatalogTable), isMorTable);
+ //alter hive table
+ client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), hiveTable);
+ } catch (Exception e) {
+ LOG.error("Failed to alter table {}", tablePath.getObjectName(), e);
+ throw new HoodieCatalogException(String.format("Failed to alter table %s", tablePath.getObjectName()), e);
+ }
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+ throws TableNotExistException, TableNotPartitionedException, CatalogException {
+ throw new HoodieCatalogException("Not supported.");
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitions(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws TableNotExistException, TableNotPartitionedException,
+ PartitionSpecInvalidException, CatalogException {
+ throw new HoodieCatalogException("Not supported.");
+ }
+
+ @Override
+ public List<CatalogPartitionSpec> listPartitionsByFilter(
+ ObjectPath tablePath, List<Expression> expressions)
+ throws TableNotExistException, TableNotPartitionedException, CatalogException {
+ throw new HoodieCatalogException("Not supported.");
+ }
+
+ @Override
+ public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws PartitionNotExistException, CatalogException {
+ throw new HoodieCatalogException("Not supported.");
+ }
+
+ @Override
+ public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws CatalogException {
+ throw new HoodieCatalogException("Not supported.");
+ }
+
+ @Override
+ public void createPartition(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogPartition partition,
+ boolean ignoreIfExists)
+ throws TableNotExistException, TableNotPartitionedException,
+ PartitionSpecInvalidException, PartitionAlreadyExistsException,
+ CatalogException {
+ throw new HoodieCatalogException("Not supported.");
+ }
+
+ @Override
+ public void dropPartition(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
+ throws PartitionNotExistException, CatalogException {
+ throw new HoodieCatalogException("Not supported.");
+ }
+
+ @Override
+ public void alterPartition(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogPartition newPartition,
+ boolean ignoreIfNotExists)
+ throws PartitionNotExistException, CatalogException {
+ throw new HoodieCatalogException("Not supported.");
+ }
+
+ @Override
+ public List<String> listFunctions(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public CatalogFunction getFunction(ObjectPath functionPath)
+ throws FunctionNotExistException, CatalogException {
+ throw new FunctionNotExistException(getName(), functionPath);
+ }
+
+ @Override
+ public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+ return false;
+ }
+
+ @Override
+ public void createFunction(
+ ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
+ throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
+ throw new HoodieCatalogException("Not supported.");
+ }
+
+ @Override
+ public void alterFunction(
+ ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
+ throws FunctionNotExistException, CatalogException {
+ throw new HoodieCatalogException("Not supported.");
+ }
+
+ @Override
+ public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
+ throws FunctionNotExistException, CatalogException {
+ throw new HoodieCatalogException("Not supported.");
+ }
+
+ @Override
+ public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
+ throws TableNotExistException, CatalogException {
+ return CatalogTableStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
+ throws TableNotExistException, CatalogException {
+ return CatalogColumnStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogTableStatistics getPartitionStatistics(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws PartitionNotExistException, CatalogException {
+ return CatalogTableStatistics.UNKNOWN;
+ }
+
+ @Override
+ public CatalogColumnStatistics getPartitionColumnStatistics(
+ ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+ throws PartitionNotExistException, CatalogException {
+ return CatalogColumnStatistics.UNKNOWN;
+ }
+
+ @Override
+ public void alterTableStatistics(
+ ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ throw new HoodieCatalogException("Not supported.");
+ }
+
+ @Override
+ public void alterTableColumnStatistics(
+ ObjectPath tablePath,
+ CatalogColumnStatistics columnStatistics,
+ boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException, TablePartitionedException {
+ throw new HoodieCatalogException("Not supported.");
+ }
+
+ @Override
+ public void alterPartitionStatistics(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogTableStatistics partitionStatistics,
+ boolean ignoreIfNotExists)
+ throws PartitionNotExistException, CatalogException {
+ throw new HoodieCatalogException("Not supported.");
+ }
+
+ @Override
+ public void alterPartitionColumnStatistics(
+ ObjectPath tablePath,
+ CatalogPartitionSpec partitionSpec,
+ CatalogColumnStatistics columnStatistics,
+ boolean ignoreIfNotExists)
+ throws PartitionNotExistException, CatalogException {
+ throw new HoodieCatalogException("Not supported.");
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
index ba6ca4efd726c..9477cd6dafc5f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
@@ -19,13 +19,22 @@
package org.apache.hudi.table.catalog;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.sync.common.util.SparkDataSourceTableUtils;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.avro.Schema;
+import org.apache.flink.table.catalog.CatalogTable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +48,7 @@
import java.util.Properties;
import java.util.stream.Collectors;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME;
/**
@@ -47,6 +57,12 @@
public class TableOptionProperties {
private static final Logger LOG = LoggerFactory.getLogger(TableOptionProperties.class);
+ public static final String SPARK_SOURCE_PROVIDER = "spark.sql.sources.provider";
+ public static final String SPARK_VERSION = "spark.version";
+ public static final String DEFAULT_SPARK_VERSION = "spark2.4.4";
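+ // Mappings between Spark SQL table property keys/values and their Flink option
+ // counterparts, populated in both directions so translation works either way.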
+ static final Map<String, String> VALUE_MAPPING = new HashMap<>();
+ static final Map<String, String> KEY_MAPPING = new HashMap<>();
+
private static final String FILE_NAME = "table_option.properties";
public static final String PK_CONSTRAINT_NAME = "pk.constraint.name";
@@ -56,6 +72,25 @@ public class TableOptionProperties {
public static final List<String> NON_OPTION_KEYS = Arrays.asList(PK_CONSTRAINT_NAME, PK_COLUMNS, COMMENT, PARTITION_COLUMNS);
+ static {
+ VALUE_MAPPING.put("mor", HoodieTableType.MERGE_ON_READ.name());
+ VALUE_MAPPING.put("cow", HoodieTableType.COPY_ON_WRITE.name());
+
+ VALUE_MAPPING.put(HoodieTableType.MERGE_ON_READ.name(), "mor");
+ VALUE_MAPPING.put(HoodieTableType.COPY_ON_WRITE.name(), "cow");
+
+ KEY_MAPPING.put("type", FlinkOptions.TABLE_TYPE.key());
+ KEY_MAPPING.put("primaryKey", FlinkOptions.RECORD_KEY_FIELD.key());
+ KEY_MAPPING.put("preCombineField", FlinkOptions.PRECOMBINE_FIELD.key());
+ KEY_MAPPING.put("payloadClass", FlinkOptions.PAYLOAD_CLASS_NAME.key());
+ KEY_MAPPING.put(SPARK_SOURCE_PROVIDER, CONNECTOR.key());
+ KEY_MAPPING.put(FlinkOptions.KEYGEN_CLASS_NAME.key(), FlinkOptions.KEYGEN_CLASS_NAME.key());
+ KEY_MAPPING.put(FlinkOptions.TABLE_TYPE.key(), "type");
+ KEY_MAPPING.put(FlinkOptions.RECORD_KEY_FIELD.key(), "primaryKey");
+ KEY_MAPPING.put(FlinkOptions.PRECOMBINE_FIELD.key(), "preCombineField");
+ KEY_MAPPING.put(FlinkOptions.PAYLOAD_CLASS_NAME.key(), "payloadClass");
+ }
+
/**
* Initialize the {@link #FILE_NAME} meta file.
*/
@@ -128,4 +163,33 @@ public static Map<String, String> getTableOptions(Map<String, String> options) {
NON_OPTION_KEYS.forEach(copied::remove);
return copied;
}
+
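+ /**
+ * Derives the Spark DataSource schema properties from the catalog table, merges them into
+ * {@code properties}, and returns the options that have a Spark counterpart (e.g. table type,
+ * record key, pre-combine field) rewritten to the Spark property names and values.
+ */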
+ public static Map<String, String> translateFlinkTableProperties2Spark(CatalogTable catalogTable, Configuration hadoopConf, Map<String, String> properties) {
+ Schema schema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType());
+ MessageType messageType = TableSchemaResolver.convertAvroSchemaToParquet(schema, hadoopConf);
+ String sparkVersion = catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION);
+ Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(
+ catalogTable.getPartitionKeys(),
+ sparkVersion,
+ 4000,
+ messageType);
+ properties.putAll(sparkTableProperties);
+ return properties.entrySet().stream()
+ .filter(e -> KEY_MAPPING.containsKey(e.getKey()) && !catalogTable.getOptions().containsKey(KEY_MAPPING.get(e.getKey())))
+ .collect(Collectors.toMap(e -> KEY_MAPPING.get(e.getKey()),
+ e -> e.getKey().equalsIgnoreCase(FlinkOptions.TABLE_TYPE.key()) ? VALUE_MAPPING.get(e.getValue()) : e.getValue()));
+ }
+
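+ /**
+ * Translates Spark table properties to Flink table options. If the options already contain
+ * a 'connector' key they are treated as Flink options and returned as is.
+ */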
+ public static Map<String, String> translateSparkTableProperties2Flink(Map<String, String> options) {
+ if (options.containsKey(CONNECTOR.key())) {
+ return options;
+ }
+ return options.entrySet().stream().filter(e -> KEY_MAPPING.containsKey(e.getKey()))
+ .collect(Collectors.toMap(e -> KEY_MAPPING.get(e.getKey()),
+ e -> e.getKey().equalsIgnoreCase("type") ? VALUE_MAPPING.get(e.getValue()) : e.getValue()));
+ }
+
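+ /**
+ * Translates the parameters of the given Hive metastore table to Flink table options.
+ */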
+ public static Map<String, String> translateSparkTableProperties2Flink(Table hiveTable) {
+ return translateSparkTableProperties2Flink(hiveTable.getParameters());
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java
new file mode 100644
index 0000000000000..d6cfe3ed723a7
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.catalog;
+
+import org.apache.hudi.exception.HoodieCatalogException;
+
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
+import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A visitor that translates Flink {@link LogicalType}s into Hive {@link TypeInfo}s
+ * for the Hoodie catalog.
+ */
+public class TypeInfoLogicalTypeVisitor extends LogicalTypeDefaultVisitor<TypeInfo> {
+ private final LogicalType type;
+ // whether to check type precision
+ private final boolean checkPrecision;
+
+ TypeInfoLogicalTypeVisitor(DataType dataType, boolean checkPrecision) {
+ this(dataType.getLogicalType(), checkPrecision);
+ }
+
+ TypeInfoLogicalTypeVisitor(LogicalType type, boolean checkPrecision) {
+ this.type = type;
+ this.checkPrecision = checkPrecision;
+ }
+
+ @Override
+ public TypeInfo visit(CharType charType) {
+ // Flink and Hive have different length limits for CHAR. Promote it to STRING if it
+ // exceeds the Hive limit and we're told not to check precision. This can be useful
+ // when calling Hive UDFs to process data.
+ if (charType.getLength() > HiveChar.MAX_CHAR_LENGTH || charType.getLength() < 1) {
+ if (checkPrecision) {
+ throw new HoodieCatalogException(
+ String.format(
+ "HiveCatalog doesn't support char type with length of '%d'. "
+ + "The supported length is [%d, %d]",
+ charType.getLength(), 1, HiveChar.MAX_CHAR_LENGTH));
+ } else {
+ return TypeInfoFactory.stringTypeInfo;
+ }
+ }
+ return TypeInfoFactory.getCharTypeInfo(charType.getLength());
+ }
+
+ @Override
+ public TypeInfo visit(VarCharType varCharType) {
+ // Flink's StringType is defined as VARCHAR(Integer.MAX_VALUE)
+ // We don't have more information in LogicalTypeRoot to distinguish StringType and a
+ // VARCHAR(Integer.MAX_VALUE) instance
+ // Thus always treat VARCHAR(Integer.MAX_VALUE) as StringType
+ if (varCharType.getLength() == Integer.MAX_VALUE) {
+ return TypeInfoFactory.stringTypeInfo;
+ }
+ // Flink and Hive have different length limits for VARCHAR. Promote it to STRING if it
+ // exceeds the Hive limit and we're told not to check precision. This can be useful
+ // when calling Hive UDFs to process data.
+ if (varCharType.getLength() > HiveVarchar.MAX_VARCHAR_LENGTH
+ || varCharType.getLength() < 1) {
+ if (checkPrecision) {
+ throw new HoodieCatalogException(
+ String.format(
+ "HiveCatalog doesn't support varchar type with length of '%d'. "
+ + "The supported length is [%d, %d]",
+ varCharType.getLength(), 1, HiveVarchar.MAX_VARCHAR_LENGTH));
+ } else {
+ return TypeInfoFactory.stringTypeInfo;
+ }
+ }
+ return TypeInfoFactory.getVarcharTypeInfo(varCharType.getLength());
+ }
+
+ @Override
+ public TypeInfo visit(BooleanType booleanType) {
+ return TypeInfoFactory.booleanTypeInfo;
+ }
+
+ @Override
+ public TypeInfo visit(VarBinaryType varBinaryType) {
+ // Flink's BytesType is defined as VARBINARY(Integer.MAX_VALUE)
+ // We don't have more information in LogicalTypeRoot to distinguish BytesType and a
+ // VARBINARY(Integer.MAX_VALUE) instance
+ // Thus always treat VARBINARY(Integer.MAX_VALUE) as BytesType
+ if (varBinaryType.getLength() == VarBinaryType.MAX_LENGTH) {
+ return TypeInfoFactory.binaryTypeInfo;
+ }
+ return defaultMethod(varBinaryType);
+ }
+
+ @Override
+ public TypeInfo visit(DecimalType decimalType) {
+ // Flink and Hive share the same precision and scale range
+ // Flink already validates the type so we don't need to validate again here
+ return TypeInfoFactory.getDecimalTypeInfo(
+ decimalType.getPrecision(), decimalType.getScale());
+ }
+
+ @Override
+ public TypeInfo visit(TinyIntType tinyIntType) {
+ return TypeInfoFactory.byteTypeInfo;
+ }
+
+ @Override
+ public TypeInfo visit(SmallIntType smallIntType) {
+ return TypeInfoFactory.shortTypeInfo;
+ }
+
+ @Override
+ public TypeInfo visit(IntType intType) {
+ return TypeInfoFactory.intTypeInfo;
+ }
+
+ @Override
+ public TypeInfo visit(BigIntType bigIntType) {
+ return TypeInfoFactory.longTypeInfo;
+ }
+
+ @Override
+ public TypeInfo visit(FloatType floatType) {
+ return TypeInfoFactory.floatTypeInfo;
+ }
+
+ @Override
+ public TypeInfo visit(DoubleType doubleType) {
+ return TypeInfoFactory.doubleTypeInfo;
+ }
+
+ @Override
+ public TypeInfo visit(DateType dateType) {
+ return TypeInfoFactory.dateTypeInfo;
+ }
+
+ @Override
+ public TypeInfo visit(TimestampType timestampType) {
+ if (checkPrecision && timestampType.getPrecision() == 9) {
+ throw new HoodieCatalogException(
+ "HoodieCatalog currently does not support timestamp of precision 9");
+ }
+ return TypeInfoFactory.timestampTypeInfo;
+ }
+
+ @Override
+ public TypeInfo visit(ArrayType arrayType) {
+ LogicalType elementType = arrayType.getElementType();
+ TypeInfo elementTypeInfo = elementType.accept(this);
+ if (null != elementTypeInfo) {
+ return TypeInfoFactory.getListTypeInfo(elementTypeInfo);
+ } else {
+ return defaultMethod(arrayType);
+ }
+ }
+
+ @Override
+ public TypeInfo visit(MapType mapType) {
+ LogicalType keyType = mapType.getKeyType();
+ LogicalType valueType = mapType.getValueType();
+ TypeInfo keyTypeInfo = keyType.accept(this);
+ TypeInfo valueTypeInfo = valueType.accept(this);
+ if (null == keyTypeInfo || null == valueTypeInfo) {
+ return defaultMethod(mapType);
+ } else {
+ return TypeInfoFactory.getMapTypeInfo(keyTypeInfo, valueTypeInfo);
+ }
+ }
+
+ @Override
+ public TypeInfo visit(RowType rowType) {
+ List<String> names = rowType.getFieldNames();
+ List<TypeInfo> typeInfos = new ArrayList<>(names.size());
+ for (String name : names) {
+ TypeInfo typeInfo = rowType.getTypeAt(rowType.getFieldIndex(name)).accept(this);
+ if (null != typeInfo) {
+ typeInfos.add(typeInfo);
+ } else {
+ return defaultMethod(rowType);
+ }
+ }
+ return TypeInfoFactory.getStructTypeInfo(names, typeInfos);
+ }
+
+ @Override
+ public TypeInfo visit(NullType nullType) {
+ return TypeInfoFactory.voidTypeInfo;
+ }
+
+ @Override
+ protected TypeInfo defaultMethod(LogicalType logicalType) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Flink doesn't support converting type %s to Hive type yet.",
+ type.toString()));
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java
index d03c5aac272e8..0da1aca0e243c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java
@@ -442,4 +442,8 @@ public static String[] extractHivePartitionFields(org.apache.flink.configuration
}
return conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS).split(",");
}
+
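+ /**
+ * Returns whether the given partition path matches the Hive style
+ * 'partition_key=partition_value' naming pattern.
+ */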
+ public static boolean isHiveStylePartitioning(String path) {
+ return HIVE_PARTITION_NAME_PATTERN.matcher(path).matches();
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index a8c5bd51ce931..d817f537cbddd 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -37,6 +37,7 @@
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
@@ -515,6 +516,20 @@ public static Schema getTableAvroSchema(HoodieTableMetaClient metaClient, boolea
return schemaUtil.getTableAvroSchema(includeMetadataFields);
}
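+ /**
+ * Returns the latest Avro schema of the table under the given base path,
+ * or null if the path is empty, the table does not exist, or the schema cannot be resolved.
+ */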
+ public static Schema getLatestTableSchema(String path, org.apache.hadoop.conf.Configuration hadoopConf) {
+ if (StringUtils.isNullOrEmpty(path) || !StreamerUtil.tableExists(path, hadoopConf)) {
+ return null;
+ }
+
+ try {
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(path, hadoopConf);
+ return getTableAvroSchema(metaClient, false);
+ } catch (Exception e) {
+ LOG.warn("Error while resolving the latest table schema", e);
+ }
+ return null;
+ }
+
public static boolean fileExists(FileSystem fs, Path path) {
try {
return fs.exists(path);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index e8794b0d3b759..9fd7e2f912b70 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -23,6 +23,8 @@
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.table.catalog.HoodieHiveCatalog;
+import org.apache.hudi.table.catalog.HoodieCatalogTestUtils;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
@@ -1321,6 +1323,46 @@ void testWriteAndReadWithDataSkipping() {
+ "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
}
+ @Test
+ void testBuiltinFunctionWithHMSCatalog() {
+ TableEnvironment tableEnv = batchTableEnv;
+
+ HoodieHiveCatalog hoodieCatalog = HoodieCatalogTestUtils.createHiveCatalog("hudi_catalog");
+
+ tableEnv.registerCatalog("hudi_catalog", hoodieCatalog);
+ tableEnv.executeSql("use catalog hudi_catalog");
+
+ String dbName = "hudi";
+ tableEnv.executeSql("create database " + dbName);
+ tableEnv.executeSql("use " + dbName);
+
+ String hoodieTableDDL = sql("t1")
+ .field("f_int int")
+ .field("f_date DATE")
+ .field("f_par string")
+ .pkField("f_int")
+ .partitionField("f_par")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath() + "/" + dbName + "/" + "t1")
+ .option(FlinkOptions.RECORD_KEY_FIELD, "f_int")
+ .option(FlinkOptions.PRECOMBINE_FIELD, "f_date")
+ .end();
+ tableEnv.executeSql(hoodieTableDDL);
+
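+ // TO_DATE is a Flink built-in function; it must still resolve while the current
+ // catalog is the Hudi HMS catalog instead of the default in-memory catalog.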
+ String insertSql = "insert into t1 values (1, TO_DATE('2022-02-02'), '1'), (2, DATE '2022-02-02', '1')";
+ execInsertSql(tableEnv, insertSql);
+
+ List<Row> result = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ final String expected = "["
+ + "+I[1, 2022-02-02, 1], "
+ + "+I[2, 2022-02-02, 1]]";
+ assertRowsEquals(result, expected);
+
+ List<Row> partitionResult = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1 where f_int = 1").execute().collect());
+ assertRowsEquals(partitionResult, "[+I[1, 2022-02-02, 1]]");
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/HoodieCatalogTestUtils.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/HoodieCatalogTestUtils.java
new file mode 100644
index 0000000000000..6a077ec7c46b1
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/HoodieCatalogTestUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.catalog;
+
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.io.IOException;
+
+/** Test utils for Hoodie catalog. */
+public class HoodieCatalogTestUtils {
+ private static final String HIVE_WAREHOUSE_URI_FORMAT =
+ "jdbc:derby:;databaseName=%s;create=true";
+
+ private static final String TEST_CATALOG_NAME = "test_catalog";
+
+ private static final org.junit.rules.TemporaryFolder TEMPORARY_FOLDER = new org.junit.rules.TemporaryFolder();
+
+ /** Creates a {@link HoodieHiveCatalog} with an embedded Hive Metastore. */
+ public static HoodieHiveCatalog createHiveCatalog() {
+ return createHiveCatalog(TEST_CATALOG_NAME);
+ }
+
+ public static HoodieHiveCatalog createHiveCatalog(String name) {
+ return new HoodieHiveCatalog(
+ name,
+ null,
+ createHiveConf(),
+ true);
+ }
+
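+ /**
+ * Creates a HiveConf backed by an embedded Derby metastore under a fresh temporary folder,
+ * so the tests do not need an external Hive Metastore.
+ */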
+ public static HiveConf createHiveConf() {
+ ClassLoader classLoader = HoodieCatalogTestUtils.class.getClassLoader();
+ try {
+ TEMPORARY_FOLDER.create();
+ String warehouseDir = TEMPORARY_FOLDER.newFolder().getAbsolutePath() + "/metastore_db";
+ String warehouseUri = String.format(HIVE_WAREHOUSE_URI_FORMAT, warehouseDir);
+
+ HiveConf.setHiveSiteLocation(classLoader.getResource(CatalogOptions.HIVE_SITE_FILE));
+ HiveConf hiveConf = new HiveConf();
+ hiveConf.setVar(
+ HiveConf.ConfVars.METASTOREWAREHOUSE,
+ TEMPORARY_FOLDER.newFolder("hive_warehouse").getAbsolutePath());
+ hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, warehouseUri);
+ return hiveConf;
+ } catch (IOException e) {
+ throw new CatalogException("Failed to create test HiveConf to HiveCatalog.", e);
+ }
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalogFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalogFactory.java
new file mode 100644
index 0000000000000..aa4df11ad29a1
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalogFactory.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.catalog;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
+import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test cases for {@link HoodieCatalogFactory}.
+ */
+public class TestHoodieCatalogFactory {
+ private static final URL CONF_DIR =
+ Thread.currentThread().getContextClassLoader().getResource("test-catalog-factory-conf");
+
+ @TempDir
+ File tempFile;
+
+ @Test
+ public void testCreateHMSCatalog() {
+ final String catalogName = "mycatalog";
+
+ final HoodieHiveCatalog expectedCatalog = HoodieCatalogTestUtils.createHiveCatalog(catalogName);
+
+ final Map<String, String> options = new HashMap<>();
+ options.put(CommonCatalogOptions.CATALOG_TYPE.key(), HoodieCatalogFactory.IDENTIFIER);
+ options.put(CatalogOptions.HIVE_CONF_DIR.key(), CONF_DIR.getPath());
+ options.put(CatalogOptions.MODE.key(), "hms");
+
+ final Catalog actualCatalog =
+ FactoryUtil.createCatalog(
+ catalogName, options, null, Thread.currentThread().getContextClassLoader());
+
+ assertEquals(
+ ((HoodieHiveCatalog) actualCatalog)
+ .getHiveConf()
+ .getVar(HiveConf.ConfVars.METASTOREURIS), "dummy-hms");
+ checkEquals(expectedCatalog, (HoodieHiveCatalog) actualCatalog);
+ }
+
+ @Test
+ public void testCreateDFSCatalog() {
+ final String catalogName = "mycatalog";
+
+ Map<String, String> catalogOptions = new HashMap<>();
+ catalogOptions.put(CATALOG_PATH.key(), tempFile.getAbsolutePath());
+ catalogOptions.put(DEFAULT_DATABASE.key(), "test_db");
+ HoodieCatalog expectedCatalog = new HoodieCatalog(catalogName, Configuration.fromMap(catalogOptions));
+
+ final Map<String, String> options = new HashMap<>();
+ options.put(CommonCatalogOptions.CATALOG_TYPE.key(), HoodieCatalogFactory.IDENTIFIER);
+ options.put(CATALOG_PATH.key(), tempFile.getAbsolutePath());
+ options.put(DEFAULT_DATABASE.key(), "test_db");
+ options.put(CatalogOptions.MODE.key(), "dfs");
+
+ final Catalog actualCatalog =
+ FactoryUtil.createCatalog(
+ catalogName, options, null, Thread.currentThread().getContextClassLoader());
+
+ checkEquals(expectedCatalog, (AbstractCatalog)actualCatalog);
+ }
+
+ private static void checkEquals(AbstractCatalog c1, AbstractCatalog c2) {
+ // Only assert a few selected properties for now
+ assertEquals(c2.getName(), c1.getName());
+ assertEquals(c2.getDefaultDatabase(), c1.getDefaultDatabase());
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
new file mode 100644
index 0000000000000..4d9b4e051888e
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.catalog;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieCatalogException;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hudi.util.StreamerUtil;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link HoodieHiveCatalog}.
+ */
+public class TestHoodieHiveCatalog {
+ TableSchema schema =
+ TableSchema.builder()
+ .field("uuid", DataTypes.INT().notNull())
+ .field("name", DataTypes.STRING())
+ .field("age", DataTypes.INT())
+ .field("ts", DataTypes.BIGINT())
+ .field("par1", DataTypes.STRING())
+ .primaryKey("uuid")
+ .build();
+ List<String> partitions = Collections.singletonList("par1");
+ private static HoodieHiveCatalog hoodieCatalog;
+ private final ObjectPath tablePath = new ObjectPath("default", "test");
+
+ @BeforeAll
+ public static void createCatalog() {
+ hoodieCatalog = HoodieCatalogTestUtils.createHiveCatalog();
+ hoodieCatalog.open();
+ }
+
+ @AfterEach
+ public void dropTable() throws TableNotExistException {
+ hoodieCatalog.dropTable(tablePath, true);
+ }
+
+ @AfterAll
+ public static void closeCatalog() {
+ if (hoodieCatalog != null) {
+ hoodieCatalog.close();
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ public void testCreateAndGetHoodieTable(HoodieTableType tableType) throws Exception {
+ Map<String, String> originOptions = new HashMap<>();
+ originOptions.put(FactoryUtil.CONNECTOR.key(), "hudi");
+ originOptions.put(FlinkOptions.TABLE_TYPE.key(), tableType.toString());
+
+ CatalogTable table =
+ new CatalogTableImpl(schema, partitions, originOptions, "hudi table");
+ hoodieCatalog.createTable(tablePath, table, false);
+
+ CatalogBaseTable table1 = hoodieCatalog.getTable(tablePath);
+ assertEquals(table1.getOptions().get(CONNECTOR.key()), "hudi");
+ assertEquals(table1.getOptions().get(FlinkOptions.TABLE_TYPE.key()), tableType.toString());
+ assertEquals(table1.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()), "uuid");
+ assertEquals(table1.getOptions().get(FlinkOptions.PRECOMBINE_FIELD.key()), "ts");
+ assertEquals(table1.getUnresolvedSchema().getPrimaryKey().get().getColumnNames(), Collections.singletonList("uuid"));
+ assertEquals(((CatalogTable)table1).getPartitionKeys(), Collections.singletonList("par1"));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testCreateExternalTable(boolean isExternal) throws TableAlreadyExistException, DatabaseNotExistException, TableNotExistException, IOException {
+ Map<String, String> originOptions = new HashMap<>();
+ originOptions.put(FactoryUtil.CONNECTOR.key(), "hudi");
+ originOptions.put(CatalogOptions.TABLE_EXTERNAL.key(), String.valueOf(isExternal));
+ CatalogTable table =
+ new CatalogTableImpl(schema, originOptions, "hudi table");
+ hoodieCatalog.createTable(tablePath, table, false);
+ Table table1 = hoodieCatalog.getHiveTable(tablePath);
+ if (isExternal) {
+ assertTrue(Boolean.parseBoolean(table1.getParameters().get(CatalogOptions.TABLE_EXTERNAL.key())));
+ assertEquals("EXTERNAL_TABLE", table1.getTableType());
+ } else {
+ assertFalse(Boolean.parseBoolean(table1.getParameters().get(CatalogOptions.TABLE_EXTERNAL.key())));
+ assertEquals("MANAGED_TABLE", table1.getTableType());
+ }
+
+ hoodieCatalog.dropTable(tablePath, false);
+ Path path = new Path(table1.getParameters().get(FlinkOptions.PATH.key()));
+ boolean exists = StreamerUtil.fileExists(FileSystem.getLocal(new Configuration()), path);
+ assertTrue(isExternal && exists || !isExternal && !exists);
+ }
+
+ @Test
+ public void testCreateNonHoodieTable() throws TableAlreadyExistException, DatabaseNotExistException {
+ CatalogTable table =
+ new CatalogTableImpl(schema, Collections.emptyMap(), "hudi table");
+ try {
+ hoodieCatalog.createTable(tablePath, table, false);
+ } catch (HoodieCatalogException e) {
+ assertEquals(String.format("The %s is not hoodie table", tablePath.getObjectName()), e.getMessage());
+ }
+ }
+
+ @Test
+ public void testAlterTable() throws Exception {
+ Map<String, String> originOptions = new HashMap<>();
+ originOptions.put(FactoryUtil.CONNECTOR.key(), "hudi");
+ CatalogTable originTable =
+ new CatalogTableImpl(schema, partitions, originOptions, "hudi table");
+ hoodieCatalog.createTable(tablePath, originTable, false);
+
+ Table hiveTable = hoodieCatalog.getHiveTable(tablePath);
+ Map<String, String> newOptions = hiveTable.getParameters();
+ newOptions.put("k", "v");
+ CatalogTable newTable = new CatalogTableImpl(schema, partitions, newOptions, "alter hudi table");
+ hoodieCatalog.alterTable(tablePath, newTable, false);
+
+ hiveTable = hoodieCatalog.getHiveTable(tablePath);
+ assertEquals(hiveTable.getParameters().get(CONNECTOR.key()), "hudi");
+ assertEquals(hiveTable.getParameters().get("k"), "v");
+ }
+
+ @Test
+ public void testRenameTable() throws Exception {
+ Map<String, String> originOptions = new HashMap<>();
+ originOptions.put(FactoryUtil.CONNECTOR.key(), "hudi");
+ CatalogTable originTable =
+ new CatalogTableImpl(schema, partitions, originOptions, "hudi table");
+ hoodieCatalog.createTable(tablePath, originTable, false);
+
+ hoodieCatalog.renameTable(tablePath, "test1", false);
+
+ assertEquals(hoodieCatalog.getHiveTable(new ObjectPath("default", "test1")).getTableName(), "test1");
+
+ hoodieCatalog.renameTable(new ObjectPath("default", "test1"), "test", false);
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/resources/hive-site.xml b/hudi-flink-datasource/hudi-flink/src/test/resources/hive-site.xml
new file mode 100644
index 0000000000000..073f7f865d2cd
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/resources/hive-site.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<configuration>
+  <property>
+    <name>hive.metastore.schema.verification</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>datanucleus.schema.autoCreateTables</name>
+    <value>true</value>
+  </property>
+</configuration>
diff --git a/hudi-flink-datasource/hudi-flink/src/test/resources/test-catalog-factory-conf/hive-site.xml b/hudi-flink-datasource/hudi-flink/src/test/resources/test-catalog-factory-conf/hive-site.xml
new file mode 100644
index 0000000000000..08bd2ec210b3f
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/resources/test-catalog-factory-conf/hive-site.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements.  See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership.  The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<configuration>
+  <property>
+    <name>hive.metastore.uris</name>
+    <value>dummy-hms</value>
+  </property>
+</configuration>