25 changes: 13 additions & 12 deletions hudi-flink-datasource/hudi-flink/pom.xml
@@ -269,18 +269,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>jta</artifactId>
<version>1.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>javax.transaction-api</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-metastore</artifactId>
@@ -421,5 +409,18 @@
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<!-- Hive dependencies -->
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>jta</artifactId>
<version>1.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>javax.transaction-api</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
@@ -19,11 +19,13 @@
package org.apache.hudi.table.catalog;

import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
@@ -37,7 +39,6 @@
import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -60,9 +61,8 @@ public static org.apache.flink.table.api.Schema convertTableSchema(Table hiveTab
allCols.addAll(hiveTable.getPartitionKeys());

String pkConstraintName = hiveTable.getParameters().get(TableOptionProperties.PK_CONSTRAINT_NAME);
List<String> primaryColNames = StringUtils.isNullOrEmpty(pkConstraintName)
? Collections.EMPTY_LIST
: StringUtils.split(hiveTable.getParameters().get(TableOptionProperties.PK_COLUMNS),",");
String pkColumnStr = hiveTable.getParameters().getOrDefault(FlinkOptions.RECORD_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.defaultValue());
List<String> pkColumns = StringUtils.split(pkColumnStr,",");

String[] colNames = new String[allCols.size()];
DataType[] colTypes = new DataType[allCols.size()];
@@ -73,14 +73,16 @@ public static org.apache.flink.table.api.Schema convertTableSchema(Table hiveTab
colNames[i] = fs.getName();
colTypes[i] =
toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(fs.getType()));
if (primaryColNames.contains(colNames[i])) {
if (pkColumns.contains(colNames[i])) {
colTypes[i] = colTypes[i].notNull();
}
}

org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder().fromFields(colNames, colTypes);
if (!StringUtils.isNullOrEmpty(pkConstraintName)) {
builder.primaryKeyNamed(pkConstraintName, primaryColNames);
builder.primaryKeyNamed(pkConstraintName, pkColumns);
} else {
builder.primaryKey(pkColumns);
}

return builder.build();
@@ -152,7 +154,8 @@ private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {
case DATE:
return DataTypes.DATE();
case TIMESTAMP:
return DataTypes.TIMESTAMP(9);
// see org.apache.hudi.hive.util.HiveSchemaUtil#convertField for details.
return DataTypes.TIMESTAMP(6);
case BINARY:
return DataTypes.BYTES();
case DECIMAL:
@@ -168,16 +171,18 @@ private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {

/** Create Hive columns from Flink TableSchema. */
public static List<FieldSchema> createHiveColumns(TableSchema schema) {
String[] fieldNames = schema.getFieldNames();
DataType[] fieldTypes = schema.getFieldDataTypes();
final DataType dataType = schema.toPersistedRowDataType();
final RowType rowType = (RowType) dataType.getLogicalType();
final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
final DataType[] fieldTypes = dataType.getChildren().toArray(new DataType[0]);

List<FieldSchema> columns = new ArrayList<>(fieldNames.length);

for (int i = 0; i < fieldNames.length; i++) {
columns.add(
new FieldSchema(
fieldNames[i],
toHiveTypeInfo(fieldTypes[i], true).getTypeName(),
toHiveTypeInfo(fieldTypes[i]).getTypeName(),
null));
}

@@ -191,13 +196,12 @@ public static List<FieldSchema> createHiveColumns(TableSchema schema) {
* checkPrecision is true.
*
* @param dataType a Flink DataType
* @param checkPrecision whether to fail the conversion if the precision of the DataType is not
* supported by Hive
*
* @return the corresponding Hive data type
*/
public static TypeInfo toHiveTypeInfo(DataType dataType, boolean checkPrecision) {
public static TypeInfo toHiveTypeInfo(DataType dataType) {
checkNotNull(dataType, "type cannot be null");
LogicalType logicalType = dataType.getLogicalType();
return logicalType.accept(new TypeInfoLogicalTypeVisitor(dataType, checkPrecision));
return logicalType.accept(new TypeInfoLogicalTypeVisitor(dataType));
}
}
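
To see the effect of the schema conversion changes above, here is a minimal, hedged sketch of calling `HiveSchemaUtils.convertTableSchema` against a metastore table; the metastore setup, database, table, and column names are illustrative assumptions, not code from this patch:

```java
import org.apache.flink.table.api.Schema;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hudi.table.catalog.HiveSchemaUtils;

public class ConvertSchemaSketch {
  public static void main(String[] args) throws Exception {
    // Connects to the metastore configured on the classpath (hive-site.xml).
    HiveMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
    Table hiveTable = client.getTable("default", "hudi_orders");

    // With hoodie.datasource.write.recordkey.field = 'order_id' in the table parameters,
    // the converted schema marks order_id NOT NULL and declares it as the primary key
    // (named if a pk constraint name is stored, unnamed otherwise). Hive TIMESTAMP
    // columns now map to TIMESTAMP(6) instead of TIMESTAMP(9).
    Schema schema = HiveSchemaUtils.convertTableSchema(hiveTable);
    System.out.println(schema);

    client.close();
  }
}
```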
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java
@@ -34,7 +34,6 @@
import java.util.Set;

import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;

/**
* A catalog factory impl that creates {@link HoodieCatalog}.
@@ -59,6 +58,7 @@ public Catalog createCatalog(Context context) {
case "hms":
return new HoodieHiveCatalog(
context.getName(),
helper.getOptions().get(CatalogOptions.CATALOG_PATH),
helper.getOptions().get(CatalogOptions.DEFAULT_DATABASE),
helper.getOptions().get(CatalogOptions.HIVE_CONF_DIR));
case "dfs":
@@ -82,7 +82,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(PROPERTY_VERSION);
options.add(CatalogOptions.HIVE_CONF_DIR);
options.add(CatalogOptions.MODE);
options.add(CATALOG_PATH);
options.add(CatalogOptions.CATALOG_PATH);
return options;
}
}
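
With `catalog.path` wired through the factory, the catalog can be registered from SQL. A hedged usage sketch; the `'hudi'` identifier, paths, and database name are illustrative assumptions, and the option keys follow `CatalogOptions` as referenced in this patch:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RegisterCatalogSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    // 'catalog.path' is now passed to HoodieHiveCatalog; if omitted, the catalog falls
    // back to hive.metastore.warehouse.dir for database/table path inference.
    tEnv.executeSql(
        "CREATE CATALOG hoodie_catalog WITH (\n"
            + "  'type' = 'hudi',\n"
            + "  'mode' = 'hms',\n"
            + "  'catalog.path' = '/user/hive/warehouse',\n"
            + "  'hive.conf.dir' = '/etc/hive/conf',\n"
            + "  'default-database' = 'default'\n"
            + ")");
    tEnv.executeSql("USE CATALOG hoodie_catalog");
  }
}
```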
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -18,7 +18,6 @@

package org.apache.hudi.table.catalog;

import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -69,6 +68,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -92,7 +92,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.ALTER_DATABASE_OP;
import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner.DATABASE_OWNER_NAME;
@@ -104,7 +103,6 @@
import static org.apache.hudi.configuration.FlinkOptions.PATH;
import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DB;
import static org.apache.hudi.table.catalog.TableOptionProperties.COMMENT;
import static org.apache.hudi.table.catalog.TableOptionProperties.PK_COLUMNS;
import static org.apache.hudi.table.catalog.TableOptionProperties.PK_CONSTRAINT_NAME;
import static org.apache.hudi.table.catalog.TableOptionProperties.SPARK_SOURCE_PROVIDER;

@@ -117,12 +115,22 @@ public class HoodieHiveCatalog extends AbstractCatalog {
private final HiveConf hiveConf;
private IMetaStoreClient client;

public HoodieHiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir) {
this(catalogName, defaultDatabase, HoodieCatalogUtil.createHiveConf(hiveConfDir), false);
// optional catalog base path: used for db/table path inference.
private final String catalogPath;

public HoodieHiveCatalog(String catalogName, String catalogPath, String defaultDatabase, String hiveConfDir) {
this(catalogName, catalogPath, defaultDatabase, HoodieCatalogUtil.createHiveConf(hiveConfDir), false);
}

public HoodieHiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf, boolean allowEmbedded) {
public HoodieHiveCatalog(
String catalogName,
String catalogPath,
String defaultDatabase,
HiveConf hiveConf,
boolean allowEmbedded) {
super(catalogName, defaultDatabase == null ? DEFAULT_DB : defaultDatabase);
// fallback to hive.metastore.warehouse.dir if catalog path is not specified
this.catalogPath = catalogPath == null ? hiveConf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE) : catalogPath;
this.hiveConf = hiveConf;
if (!allowEmbedded) {
checkArgument(
@@ -145,7 +153,7 @@ public void open() throws CatalogException {
}
if (!databaseExists(getDefaultDatabase())) {
LOG.info("{} does not exist, will be created.", getDefaultDatabase());
CatalogDatabase database = new CatalogDatabaseImpl(Collections.EMPTY_MAP, "default database");
CatalogDatabase database = new CatalogDatabaseImpl(Collections.emptyMap(), "default database");
try {
createDatabase(getDefaultDatabase(), database, true);
} catch (DatabaseAlreadyExistException e) {
@@ -227,6 +235,10 @@ public void createDatabase(
Map<String, String> properties = database.getProperties();

String dbLocationUri = properties.remove(SqlCreateHiveDatabase.DATABASE_LOCATION_URI);
if (dbLocationUri == null && this.catalogPath != null) {
// infer default location uri
dbLocationUri = new Path(this.catalogPath, databaseName).toString();
}

Database hiveDatabase =
new Database(databaseName, database.getComment(), dbLocationUri, properties);
@@ -381,8 +393,7 @@ private Table translateSparkTable2Flink(ObjectPath tablePath, Table hiveTable) {
@Override
public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
checkNotNull(tablePath, "Table path cannot be null");
Table hiveTable = getHiveTable(tablePath);
hiveTable = translateSparkTable2Flink(tablePath, hiveTable);
Table hiveTable = translateSparkTable2Flink(tablePath, getHiveTable(tablePath));
String path = hiveTable.getSd().getLocation();
Map<String, String> parameters = hiveTable.getParameters();
Schema latestTableSchema = StreamerUtil.getLatestTableSchema(path, hiveConf);
@@ -391,16 +402,21 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder()
.fromRowDataType(AvroSchemaConverter.convertToDataType(latestTableSchema));
String pkConstraintName = parameters.get(PK_CONSTRAINT_NAME);
String pkColumns = parameters.get(FlinkOptions.RECORD_KEY_FIELD.key());
if (!StringUtils.isNullOrEmpty(pkConstraintName)) {
builder.primaryKeyNamed(pkConstraintName, StringUtils.split(parameters.get(PK_COLUMNS), ","));
// pkColumns is expected to be non-null when a pk constraint name is present
builder.primaryKeyNamed(pkConstraintName, StringUtils.split(pkColumns, ","));
} else if (pkColumns != null) {
builder.primaryKey(StringUtils.split(pkColumns, ","));
}
schema = builder.build();
} else {
LOG.warn("{} does not have any hoodie schema, and use hive table schema to infer the table schema", tablePath);
schema = HiveSchemaUtils.convertTableSchema(hiveTable);
}
Map<String, String> options = supplementOptions(tablePath, parameters);
return CatalogTable.of(schema, parameters.get(COMMENT),
HiveSchemaUtils.getFieldNames(hiveTable.getPartitionKeys()), parameters);
HiveSchemaUtils.getFieldNames(hiveTable.getPartitionKeys()), options);
}

@Override
@@ -439,8 +455,8 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
}

private void initTableIfNotExists(ObjectPath tablePath, CatalogTable catalogTable) {
Configuration flinkConf = Configuration.fromMap(applyOptionsHook(catalogTable.getOptions()));
final String avroSchema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType()).toString();
Configuration flinkConf = Configuration.fromMap(catalogTable.getOptions());
final String avroSchema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPersistedRowDataType().getLogicalType()).toString();
flinkConf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchema);

// stores two copies of options:
@@ -449,15 +465,13 @@ private void initTableIfNotExists(ObjectPath tablePath, CatalogTable catalogTabl
// because the HoodieTableMetaClient is a heavy impl, we try to avoid initializing it
// when calling #getTable.

if (catalogTable.getUnresolvedSchema().getPrimaryKey().isPresent()) {
if (catalogTable.getUnresolvedSchema().getPrimaryKey().isPresent()
&& !flinkConf.contains(FlinkOptions.RECORD_KEY_FIELD)) {
final String pkColumns = String.join(",", catalogTable.getUnresolvedSchema().getPrimaryKey().get().getColumnNames());
String recordKey = flinkConf.get(FlinkOptions.RECORD_KEY_FIELD);
if (!Objects.equals(pkColumns, recordKey)) {
throw new HoodieCatalogException(String.format("%s and %s are the different", pkColumns, recordKey));
}
flinkConf.setString(FlinkOptions.RECORD_KEY_FIELD, pkColumns);
}

if (catalogTable.isPartitioned()) {
if (catalogTable.isPartitioned() && !flinkConf.contains(FlinkOptions.PARTITION_PATH_FIELD)) {
final String partitions = String.join(",", catalogTable.getPartitionKeys());
flinkConf.setString(FlinkOptions.PARTITION_PATH_FIELD, partitions);
}
@@ -468,7 +482,7 @@ private void initTableIfNotExists(ObjectPath tablePath, CatalogTable catalogTabl

flinkConf.setString(FlinkOptions.TABLE_NAME, tablePath.getObjectName());
try {
StreamerUtil.initTableIfNotExists(flinkConf);
StreamerUtil.initTableIfNotExists(flinkConf, hiveConf);
} catch (IOException e) {
throw new HoodieCatalogException("Initialize table exception.", e);
}
@@ -487,20 +501,6 @@ private String inferTablePath(ObjectPath tablePath, CatalogBaseTable table) {
return location;
}

private Map<String, String> applyOptionsHook(Map<String, String> options) {
Map<String, String> properties = new HashMap<>(options);
if (!options.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())) {
properties.put(FlinkOptions.RECORD_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.defaultValue());
}
if (!options.containsKey(FlinkOptions.PRECOMBINE_FIELD.key())) {
properties.put(FlinkOptions.PRECOMBINE_FIELD.key(), FlinkOptions.PRECOMBINE_FIELD.defaultValue());
}
if (!options.containsKey(FlinkOptions.TABLE_TYPE.key())) {
properties.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE.defaultValue());
}
return properties;
}

private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table, String location, boolean useRealTimeInputFormat) throws IOException {
// let Hive set default parameters for us, e.g. serialization.format
Table hiveTable =
Expand All @@ -510,7 +510,7 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table,
hiveTable.setOwner(UserGroupInformation.getCurrentUser().getUserName());
hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));

Map<String, String> properties = applyOptionsHook(table.getOptions());
Map<String, String> properties = new HashMap<>(table.getOptions());

if (Boolean.parseBoolean(table.getOptions().get(CatalogOptions.TABLE_EXTERNAL.key()))) {
hiveTable.setTableType(TableType.EXTERNAL_TABLE.toString());
@@ -523,17 +523,11 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table,
}

//set pk
if (table.getUnresolvedSchema().getPrimaryKey().isPresent()) {
if (table.getUnresolvedSchema().getPrimaryKey().isPresent()
&& !properties.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())) {
String pkColumns = String.join(",", table.getUnresolvedSchema().getPrimaryKey().get().getColumnNames());
String recordKey = properties.getOrDefault(FlinkOptions.RECORD_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.defaultValue());
if (!Objects.equals(pkColumns, recordKey)) {
throw new HoodieCatalogException(
String.format("Primary key [%s] and record key [%s] should be the the same.",
pkColumns,
recordKey));
}
properties.put(PK_CONSTRAINT_NAME, table.getUnresolvedSchema().getPrimaryKey().get().getConstraintName());
properties.put(PK_COLUMNS, pkColumns);
properties.put(FlinkOptions.RECORD_KEY_FIELD.key(), pkColumns);
}

if (!properties.containsKey(FlinkOptions.PATH.key())) {
@@ -896,4 +890,22 @@ public void alterPartitionColumnStatistics(
throws PartitionNotExistException, CatalogException {
throw new HoodieCatalogException("Not supported.");
}

private Map<String, String> supplementOptions(
ObjectPath tablePath,
Map<String, String> options) {
if (HoodieCatalogUtil.isEmbeddedMetastore(hiveConf)) {
return options;
} else {
Map<String, String> newOptions = new HashMap<>(options);
// set up hive sync options
newOptions.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
newOptions.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
newOptions.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
newOptions.putIfAbsent(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
newOptions.computeIfAbsent(FlinkOptions.HIVE_SYNC_DB.key(), k -> tablePath.getDatabaseName());
newOptions.computeIfAbsent(FlinkOptions.HIVE_SYNC_TABLE.key(), k -> tablePath.getObjectName());
return newOptions;
}
}
}
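
End to end, the catalog changes are easiest to see from the DDL side. A hedged sketch continuing the `tEnv` from the previous example; the table and column names are illustrative:

```java
// The record key is now inferred from the PRIMARY KEY clause when
// hoodie.datasource.write.recordkey.field is not set explicitly, instead of requiring
// both to be declared and to match exactly.
tEnv.executeSql(
    "CREATE TABLE orders (\n"
        + "  order_id STRING,\n"
        + "  amount DOUBLE,\n"
        + "  ts TIMESTAMP(6),\n"
        + "  PRIMARY KEY (order_id) NOT ENFORCED\n"
        + ") WITH ('table.type' = 'MERGE_ON_READ')");

// When the metastore is remote (non-embedded), getTable supplements the returned options
// with hive sync settings (sync enabled, hms mode, the metastore URIs, and the target
// database/table), so writes through the catalog keep Hive in sync.
```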