Commit 8a62070

[HUDI-4098] Support HMS for Flink HudiCatalog
1 parent: 50eac6a

13 files changed: 360 additions & 370 deletions

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java

Lines changed: 9 additions & 0 deletions

@@ -798,6 +798,15 @@ private FlinkOptions() {
       .noDefaultValue()
       .withDescription("The hive configuration directory, where the hive-site.xml lies in, the file should be put on the client machine");
 
+  // ------------------------------------------------------------------------
+  //  Create Table Options With Catalog
+  // ------------------------------------------------------------------------
+  public static final ConfigOption<Boolean> HIVE_IS_EXTERNAL = ConfigOptions
+      .key("hive.is-external")
+      .booleanType()
+      .defaultValue(true)
+      .withDescription("Whether the table is external, default true");
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
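The new hive.is-external flag controls whether tables registered through the catalog end up as external tables in Hive. A minimal sketch of how a catalog implementation might consume the option; this helper class is illustrative, not part of the commit:

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

// Illustrative only: map the flag onto the Hive metastore table type.
public class ExternalFlagSketch {
  public static String hiveTableType(Configuration tableConf) {
    // HIVE_IS_EXTERNAL defaults to true, so tables become EXTERNAL unless disabled.
    boolean isExternal = tableConf.getBoolean(FlinkOptions.HIVE_IS_EXTERNAL);
    return isExternal ? "EXTERNAL_TABLE" : "MANAGED_TABLE";
  }
}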

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java

Lines changed: 0 additions & 42 deletions

@@ -23,9 +23,6 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.Path;
 
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -57,45 +54,6 @@ public static org.apache.hadoop.conf.Configuration getHadoopConf(Configuration c
     return hadoopConf;
   }
 
-  /**
-   * Returns a new hadoop configuration that is initialized with the given hadoopConfDir.
-   *
-   * @param hadoopConfDir Hadoop conf directory path.
-   * @return A Hadoop configuration instance.
-   */
-  public static org.apache.hadoop.conf.Configuration getHadoopConfiguration(String hadoopConfDir) {
-    if (new File(hadoopConfDir).exists()) {
-      List<File> possiableConfFiles = new ArrayList<File>();
-      File coreSite = new File(hadoopConfDir, "core-site.xml");
-      if (coreSite.exists()) {
-        possiableConfFiles.add(coreSite);
-      }
-      File hdfsSite = new File(hadoopConfDir, "hdfs-site.xml");
-      if (hdfsSite.exists()) {
-        possiableConfFiles.add(hdfsSite);
-      }
-      File yarnSite = new File(hadoopConfDir, "yarn-site.xml");
-      if (yarnSite.exists()) {
-        possiableConfFiles.add(yarnSite);
-      }
-      // Add mapred-site.xml. We need to read configurations like compression codec.
-      File mapredSite = new File(hadoopConfDir, "mapred-site.xml");
-      if (mapredSite.exists()) {
-        possiableConfFiles.add(mapredSite);
-      }
-      if (possiableConfFiles.isEmpty()) {
-        return null;
-      } else {
-        org.apache.hadoop.conf.Configuration hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
-        for (File confFile : possiableConfFiles) {
-          hadoopConfiguration.addResource(new Path(confFile.getAbsolutePath()));
-        }
-        return hadoopConfiguration;
-      }
-    }
-    return null;
-  }
-
   /**
    * Creates a Hive configuration with configured dir path or empty if no Hive conf dir is set.
   */
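The removed getHadoopConfiguration helper probed a directory for core/hdfs/yarn/mapred site files. For the HMS catalog the file that matters is hive-site.xml, and a minimal sketch of the same probing pattern applied to it looks like the following; the class and method names are illustrative, not the commit's code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.File;

// Illustrative: load hive-site.xml from a configured directory, mirroring the
// removed helper's convention of returning null when nothing is found.
public class HiveSiteSketch {
  public static Configuration loadHiveSite(String hiveConfDir) {
    File hiveSite = new File(hiveConfDir, "hive-site.xml");
    if (!hiveSite.exists()) {
      return null;
    }
    Configuration conf = new Configuration();
    conf.addResource(new Path(hiveSite.getAbsolutePath()));
    return conf;
  }
}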

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java

Lines changed: 9 additions & 0 deletions

@@ -27,6 +27,7 @@
 import org.apache.flink.configuration.Configuration;
 
 import java.util.Locale;
+import java.util.Map;
 
 /**
  * Tool helping to resolve the flink options {@link FlinkOptions}.
@@ -66,6 +67,14 @@ public static boolean isMorTable(Configuration conf) {
         .equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
   }
 
+  /**
+   * Returns whether it is a MERGE_ON_READ table.
+   */
+  public static boolean isMorTable(Map<String, String> options) {
+    return options.getOrDefault(FlinkOptions.TABLE_TYPE.key(),
+        FlinkOptions.TABLE_TYPE.defaultValue()).equalsIgnoreCase(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+  }
+
   /**
    * Returns whether it is a COPY_ON_WRITE table.
   */
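The new overload lets callers that only hold a plain options map, such as table properties read back from the metastore, do the MOR check without first building a Configuration. A usage sketch, assuming the standard "table.type" key and COPY_ON_WRITE default from FlinkOptions:

import org.apache.hudi.configuration.OptionsResolver;

import java.util.HashMap;
import java.util.Map;

public class MorCheckSketch {
  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    options.put("table.type", "MERGE_ON_READ"); // FlinkOptions.TABLE_TYPE.key()
    System.out.println(OptionsResolver.isMorTable(options)); // true

    options.remove("table.type"); // falls back to the default (COPY_ON_WRITE)
    System.out.println(OptionsResolver.isMorTable(options)); // false
  }
}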

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/CatalogOptions.java

Lines changed: 12 additions & 0 deletions

@@ -29,6 +29,8 @@
  * Hoodie catalog options.
  */
 public class CatalogOptions {
+  public static final String HIVE_SITE_FILE = "hive-site.xml";
+  public static final String DEFAULT_DB = "default";
 
   public static final ConfigOption<String> CATALOG_PATH =
       ConfigOptions.key("catalog.path")
@@ -42,6 +44,16 @@ public class CatalogOptions {
           .stringType()
           .defaultValue("default");
 
+  public static final ConfigOption<String> HIVE_CONF_DIR = ConfigOptions
+      .key("hive.conf.dir")
+      .stringType()
+      .noDefaultValue();
+
+  public static final ConfigOption<String> MODE = ConfigOptions
+      .key("mode")
+      .stringType()
+      .defaultValue("dfs");
+
   /**
    * Returns all the common table options that can be shared.
   *
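Together these options select the catalog backend: mode defaults to dfs (metadata kept on the filesystem under catalog.path), while mode=hms plus hive.conf.dir points the catalog at a Hive metastore. A sketch of a property set a user might pass; the paths are placeholders and the default-database key is assumed from the DEFAULT_DATABASE option above:

import java.util.HashMap;
import java.util.Map;

public class CatalogPropsSketch {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("mode", "hms");                     // CatalogOptions.MODE, default "dfs"
    props.put("hive.conf.dir", "/etc/hive/conf"); // directory containing hive-site.xml
    props.put("default-database", "default");     // assumed key of DEFAULT_DATABASE
    props.put("catalog.path", "/warehouse/hudi"); // CatalogOptions.CATALOG_PATH
    // In dfs mode (the default) the hive.* options are simply left unset.
  }
}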
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveTableOptions.java

Lines changed: 203 additions & 0 deletions

@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.catalog;
+
+import org.apache.hudi.common.util.StringUtils;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * hive table options.
+ */
+public class HiveTableOptions {
+  /** Get field names from field schemas. */
+  public static List<String> getFieldNames(List<FieldSchema> fieldSchemas) {
+    List<String> names = new ArrayList<>(fieldSchemas.size());
+    for (FieldSchema fs : fieldSchemas) {
+      names.add(fs.getName());
+    }
+    return names;
+  }
+
+  public static org.apache.flink.table.api.Schema convertTableSchema(Table hiveTable) {
+    List<FieldSchema> allCols = new ArrayList<>(hiveTable.getSd().getCols());
+    allCols.addAll(hiveTable.getPartitionKeys());
+
+    String pkConstraintName = hiveTable.getParameters().get(TableOptionProperties.PK_CONSTRAINT_NAME);
+    List<String> primaryColNames = StringUtils.isNullOrEmpty(pkConstraintName)
+        ? Collections.EMPTY_LIST
+        : StringUtils.split(hiveTable.getParameters().get(TableOptionProperties.PK_COLUMNS),",");
+
+    String[] colNames = new String[allCols.size()];
+    DataType[] colTypes = new DataType[allCols.size()];
+
+    for (int i = 0; i < allCols.size(); i++) {
+      FieldSchema fs = allCols.get(i);
+
+      colNames[i] = fs.getName();
+      colTypes[i] =
+          toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(fs.getType()));
+      if (primaryColNames.contains(colNames[i])) {
+        colTypes[i] = colTypes[i].notNull();
+      }
+    }
+
+    org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder().fromFields(colNames, colTypes);
+    if (!StringUtils.isNullOrEmpty(pkConstraintName)) {
+      builder.primaryKeyNamed(pkConstraintName, primaryColNames);
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Convert Hive data type to a Flink data type.
+   *
+   * @param hiveType a Hive data type
+   * @return the corresponding Flink data type
+   */
+  public static DataType toFlinkType(TypeInfo hiveType) {
+    checkNotNull(hiveType, "hiveType cannot be null");
+
+    switch (hiveType.getCategory()) {
+      case PRIMITIVE:
+        return toFlinkPrimitiveType((PrimitiveTypeInfo) hiveType);
+      case LIST:
+        ListTypeInfo listTypeInfo = (ListTypeInfo) hiveType;
+        return DataTypes.ARRAY(toFlinkType(listTypeInfo.getListElementTypeInfo()));
+      case MAP:
+        MapTypeInfo mapTypeInfo = (MapTypeInfo) hiveType;
+        return DataTypes.MAP(
+            toFlinkType(mapTypeInfo.getMapKeyTypeInfo()),
+            toFlinkType(mapTypeInfo.getMapValueTypeInfo()));
+      case STRUCT:
+        StructTypeInfo structTypeInfo = (StructTypeInfo) hiveType;
+
+        List<String> names = structTypeInfo.getAllStructFieldNames();
+        List<TypeInfo> typeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+
+        DataTypes.Field[] fields = new DataTypes.Field[names.size()];
+
+        for (int i = 0; i < fields.length; i++) {
+          fields[i] = DataTypes.FIELD(names.get(i), toFlinkType(typeInfos.get(i)));
+        }
+
+        return DataTypes.ROW(fields);
+      default:
+        throw new UnsupportedOperationException(
+            String.format("Flink doesn't support Hive data type %s yet.", hiveType));
+    }
+  }
+
+  private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {
+    checkNotNull(hiveType, "hiveType cannot be null");
+
+    switch (hiveType.getPrimitiveCategory()) {
+      case CHAR:
+        return DataTypes.CHAR(((CharTypeInfo) hiveType).getLength());
+      case VARCHAR:
+        return DataTypes.VARCHAR(((VarcharTypeInfo) hiveType).getLength());
+      case STRING:
+        return DataTypes.STRING();
+      case BOOLEAN:
+        return DataTypes.BOOLEAN();
+      case BYTE:
+        return DataTypes.TINYINT();
+      case SHORT:
+        return DataTypes.SMALLINT();
+      case INT:
+        return DataTypes.INT();
+      case LONG:
+        return DataTypes.BIGINT();
+      case FLOAT:
+        return DataTypes.FLOAT();
+      case DOUBLE:
+        return DataTypes.DOUBLE();
+      case DATE:
+        return DataTypes.DATE();
+      case TIMESTAMP:
+        return DataTypes.TIMESTAMP(9);
+      case BINARY:
+        return DataTypes.BYTES();
+      case DECIMAL:
+        DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) hiveType;
+        return DataTypes.DECIMAL(
+            decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
+      default:
+        throw new UnsupportedOperationException(
+            String.format(
+                "Flink doesn't support Hive primitive type %s yet", hiveType));
+    }
+  }
+
+  /** Create Hive columns from Flink TableSchema. */
+  public static List<FieldSchema> createHiveColumns(TableSchema schema) {
+    String[] fieldNames = schema.getFieldNames();
+    DataType[] fieldTypes = schema.getFieldDataTypes();
+
+    List<FieldSchema> columns = new ArrayList<>(fieldNames.length);
+
+    for (int i = 0; i < fieldNames.length; i++) {
+      columns.add(
+          new FieldSchema(
+              fieldNames[i],
+              toHiveTypeInfo(fieldTypes[i], true).getTypeName(),
+              null));
+    }
+
+    return columns;
+  }
+
+  /**
+   * Convert Flink DataType to Hive TypeInfo. For types with a precision parameter, e.g.
+   * timestamp, the supported precisions in Hive and Flink can be different. Therefore the
+   * conversion will fail for those types if the precision is not supported by Hive and
+   * checkPrecision is true.
+   *
+   * @param dataType a Flink DataType
+   * @param checkPrecision whether to fail the conversion if the precision of the DataType is not
+   *     supported by Hive
+   * @return the corresponding Hive data type
+   */
+  public static TypeInfo toHiveTypeInfo(DataType dataType, boolean checkPrecision) {
+    checkNotNull(dataType, "type cannot be null");
+    LogicalType logicalType = dataType.getLogicalType();
+    return logicalType.accept(new TypeInfoLogicalTypeVisitor(dataType, checkPrecision));
+  }
+}
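A short usage sketch for the converters above, round-tripping a Hive type string through a Flink DataType; the printed forms are indicative, not guaranteed output:

import org.apache.hudi.table.catalog.HiveTableOptions;

import org.apache.flink.table.types.DataType;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class TypeMappingSketch {
  public static void main(String[] args) {
    // Hive type string -> Flink DataType
    DataType flinkType = HiveTableOptions.toFlinkType(
        TypeInfoUtils.getTypeInfoFromTypeString("map<string,decimal(10,2)>"));
    System.out.println(flinkType); // e.g. MAP<STRING, DECIMAL(10, 2)>

    // Flink DataType -> Hive type name, failing on unsupported precision
    System.out.println(HiveTableOptions.toHiveTypeInfo(flinkType, true).getTypeName());
  }
}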

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java

Lines changed: 17 additions & 17 deletions

@@ -30,6 +30,7 @@
 
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Locale;
 import java.util.Set;
 
 import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
@@ -53,19 +54,19 @@ public Catalog createCatalog(Context context) {
     final FactoryUtil.CatalogFactoryHelper helper =
         FactoryUtil.createCatalogFactoryHelper(this, context);
     helper.validate();
-
-    if (helper.getOptions().get(HoodieCatalogFactoryOptions.MODE).equalsIgnoreCase("hms")) {
-      return new HoodieHiveCatalog(
-          context.getName(),
-          helper.getOptions().get(HoodieCatalogFactoryOptions.DEFAULT_DATABASE),
-          helper.getOptions().get(HoodieCatalogFactoryOptions.HIVE_CONF_DIR),
-          helper.getOptions().get(HoodieCatalogFactoryOptions.INIT_FS_TABLE));
-    } else if (helper.getOptions().get(HoodieCatalogFactoryOptions.MODE).equalsIgnoreCase("dfs")) {
-      return new HoodieCatalog(
-          context.getName(),
-          (Configuration) helper.getOptions());
-    } else {
-      throw new HoodieCatalogException("hoodie catalog supports only the hms and dfs modes.");
+    String mode = helper.getOptions().get(CatalogOptions.MODE);
+    switch (mode.toLowerCase(Locale.ROOT)) {
+      case "hms":
+        return new HoodieHiveCatalog(
+            context.getName(),
+            helper.getOptions().get(CatalogOptions.DEFAULT_DATABASE),
+            helper.getOptions().get(CatalogOptions.HIVE_CONF_DIR));
+      case "dfs":
+        return new HoodieCatalog(
+            context.getName(),
+            (Configuration) helper.getOptions());
+      default:
+        throw new HoodieCatalogException(String.format("Invalid catalog mode: %s, supported modes: [hms, dfs].", mode));
     }
   }
 
@@ -77,12 +78,11 @@ public Set<ConfigOption<?>> requiredOptions() {
   @Override
   public Set<ConfigOption<?>> optionalOptions() {
     final Set<ConfigOption<?>> options = new HashSet<>();
-    options.add(HoodieCatalogFactoryOptions.DEFAULT_DATABASE);
+    options.add(CatalogOptions.DEFAULT_DATABASE);
     options.add(PROPERTY_VERSION);
-    options.add(HoodieCatalogFactoryOptions.HIVE_CONF_DIR);
-    options.add(HoodieCatalogFactoryOptions.MODE);
+    options.add(CatalogOptions.HIVE_CONF_DIR);
+    options.add(CatalogOptions.MODE);
     options.add(CATALOG_PATH);
-    options.add(HoodieCatalogFactoryOptions.INIT_FS_TABLE);
     return options;
   }
 }
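With the factory dispatching on mode, an HMS-backed catalog can be declared from a Flink job. A sketch, assuming the factory identifier is 'hudi' and using a placeholder conf dir; an unrecognized mode value would surface as the HoodieCatalogException above:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HmsCatalogSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inBatchMode().build());
    // Declare the catalog in hms mode; in dfs mode the hive.* options are dropped.
    tEnv.executeSql(
        "CREATE CATALOG hoodie_catalog WITH (\n"
            + "  'type' = 'hudi',\n"
            + "  'mode' = 'hms',\n"
            + "  'hive.conf.dir' = '/etc/hive/conf',\n"
            + "  'default-database' = 'default'\n"
            + ")");
    tEnv.useCatalog("hoodie_catalog");
  }
}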
