HiveSchemaUtils.java
@@ -19,6 +19,7 @@
package org.apache.hudi.table.catalog;

import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;

import org.apache.flink.table.api.DataTypes;
@@ -40,6 +41,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkNotNull;

@@ -49,11 +51,7 @@
public class HiveSchemaUtils {
/** Get field names from field schemas. */
public static List<String> getFieldNames(List<FieldSchema> fieldSchemas) {
List<String> names = new ArrayList<>(fieldSchemas.size());
for (FieldSchema fs : fieldSchemas) {
names.add(fs.getName());
}
return names;
return fieldSchemas.stream().map(FieldSchema::getName).collect(Collectors.toList());
}

public static org.apache.flink.table.api.Schema convertTableSchema(Table hiveTable) {
@@ -204,4 +202,27 @@ public static TypeInfo toHiveTypeInfo(DataType dataType) {
LogicalType logicalType = dataType.getLogicalType();
return logicalType.accept(new TypeInfoLogicalTypeVisitor(dataType));
}

/**
* Splits the field schemas by the given partition keys.
*
* @param fieldSchemas The Hive field schemas.
* @param partitionKeys The partition keys.
*
* @return The pair of (regular columns, partition columns) field schemas.
*/
public static Pair<List<FieldSchema>, List<FieldSchema>> splitSchemaByPartitionKeys(
List<FieldSchema> fieldSchemas,
List<String> partitionKeys) {
List<FieldSchema> regularColumns = new ArrayList<>();
List<FieldSchema> partitionColumns = new ArrayList<>();
for (FieldSchema fieldSchema : fieldSchemas) {
if (partitionKeys.contains(fieldSchema.getName())) {
partitionColumns.add(fieldSchema);
} else {
regularColumns.add(fieldSchema);
}
}
return Pair.of(regularColumns, partitionColumns);
}
}
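The new helper classifies columns by name rather than by position, so a partition column that is not declared last still lands on the partition side. A minimal usage sketch (the example class and column values are hypothetical, assuming the patched HiveSchemaUtils is on the classpath):

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.table.catalog.HiveSchemaUtils;

public class SplitSchemaExample {
  public static void main(String[] args) {
    // "par1" is the partition column, but it is not the last column in the schema.
    List<FieldSchema> allColumns = Arrays.asList(
        new FieldSchema("uuid", "int", null),
        new FieldSchema("par1", "string", null),
        new FieldSchema("ts", "bigint", null));

    Pair<List<FieldSchema>, List<FieldSchema>> split =
        HiveSchemaUtils.splitSchemaByPartitionKeys(allColumns, Arrays.asList("par1"));

    // Left: regular columns uuid and ts; right: partition column par1.
    split.getLeft().forEach(fs -> System.out.println("regular: " + fs.getName()));
    split.getRight().forEach(fs -> System.out.println("partition: " + fs.getName()));
  }
}

The split preserves declaration order within each side, which is what instantiateHiveTable relies on when it sets the storage descriptor columns and the Hive partition keys.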
HoodieCatalogUtil.java
@@ -18,8 +18,10 @@

package org.apache.hudi.table.catalog;

import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;

import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -33,6 +35,10 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
import static org.apache.hudi.table.catalog.CatalogOptions.HIVE_SITE_FILE;
@@ -93,4 +99,18 @@ public static HiveConf createHiveConf(@Nullable String hiveConfDir) {
public static boolean isEmbeddedMetastore(HiveConf hiveConf) {
return isNullOrWhitespaceOnly(hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
}

/**
* Returns the partition key list of the given table.
*/
public static List<String> getPartitionKeys(CatalogTable table) {
// the PARTITIONED BY syntax always has higher priority than the option FlinkOptions#PARTITION_PATH_FIELD
if (table.isPartitioned()) {
return table.getPartitionKeys();
} else if (table.getOptions().containsKey(FlinkOptions.PARTITION_PATH_FIELD.key())) {
return Arrays.stream(table.getOptions().get(FlinkOptions.PARTITION_PATH_FIELD.key()).split(","))
.collect(Collectors.toList());
}
return Collections.emptyList();
}
}
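A minimal sketch of the resolution order (the example class is hypothetical, assuming the patched HoodieCatalogUtil is on the classpath): PARTITIONED BY keys win when present; otherwise the comma-separated FlinkOptions#PARTITION_PATH_FIELD option is used; otherwise the list is empty.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.catalog.HoodieCatalogUtil;

public class PartitionKeysExample {
  public static void main(String[] args) {
    TableSchema schema = TableSchema.builder()
        .field("uuid", DataTypes.INT().notNull())
        .field("par1", DataTypes.STRING())
        .build();

    // No PARTITIONED BY keys, but the partition path option is set on the table.
    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "par1");
    CatalogTable table = new CatalogTableImpl(schema, Collections.emptyList(), options, "demo");

    // Falls back to the option and prints [par1].
    System.out.println(HoodieCatalogUtil.getPartitionKeys(table));
  }
}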
HoodieHiveCatalog.java
@@ -22,6 +22,7 @@
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieCatalogException;
@@ -86,7 +87,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -539,25 +539,19 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table,
List<FieldSchema> allColumns = HiveSchemaUtils.createHiveColumns(table.getSchema());

// Table columns and partition keys
if (table instanceof CatalogTable) {
CatalogTable catalogTable = (CatalogTable) table;

if (catalogTable.isPartitioned()) {
int partitionKeySize = catalogTable.getPartitionKeys().size();
List<FieldSchema> regularColumns =
allColumns.subList(0, allColumns.size() - partitionKeySize);
List<FieldSchema> partitionColumns =
allColumns.subList(
allColumns.size() - partitionKeySize, allColumns.size());

sd.setCols(regularColumns);
hiveTable.setPartitionKeys(partitionColumns);
} else {
sd.setCols(allColumns);
hiveTable.setPartitionKeys(new ArrayList<>());
}
CatalogTable catalogTable = (CatalogTable) table;

final List<String> partitionKeys = HoodieCatalogUtil.getPartitionKeys(catalogTable);
if (partitionKeys.size() > 0) {
Pair<List<FieldSchema>, List<FieldSchema>> splitSchemas = HiveSchemaUtils.splitSchemaByPartitionKeys(allColumns, partitionKeys);
List<FieldSchema> regularColumns = splitSchemas.getLeft();
List<FieldSchema> partitionColumns = splitSchemas.getRight();

sd.setCols(regularColumns);
hiveTable.setPartitionKeys(partitionColumns);
} else {
sd.setCols(allColumns);
hiveTable.setPartitionKeys(Collections.emptyList());
}

HoodieFileFormat baseFileFormat = HoodieFileFormat.PARQUET;
@@ -572,7 +566,7 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table,
serdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(!useRealTimeInputFormat));
serdeProperties.put("serialization.format", "1");

serdeProperties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark((CatalogTable)table, hiveConf, properties));
serdeProperties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark(catalogTable, hiveConf, properties, partitionKeys));

sd.setSerdeInfo(new SerDeInfo(null, serDeClassName, serdeProperties));

TableOptionProperties.java
@@ -164,12 +164,16 @@ public static Map<String, String> getTableOptions(Map<String, String> options) {
return copied;
}

public static Map<String, String> translateFlinkTableProperties2Spark(CatalogTable catalogTable, Configuration hadoopConf, Map<String, String> properties) {
public static Map<String, String> translateFlinkTableProperties2Spark(
CatalogTable catalogTable,
Configuration hadoopConf,
Map<String, String> properties,
List<String> partitionKeys) {
Schema schema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType());
MessageType messageType = TableSchemaResolver.convertAvroSchemaToParquet(schema, hadoopConf);
String sparkVersion = catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION);
Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(
catalogTable.getPartitionKeys(),
partitionKeys,
sparkVersion,
4000,
messageType);
TestHoodieHiveCatalog.java
@@ -66,8 +66,8 @@ public class TestHoodieHiveCatalog {
.field("uuid", DataTypes.INT().notNull())
.field("name", DataTypes.STRING())
.field("age", DataTypes.INT())
.field("ts", DataTypes.BIGINT())
.field("par1", DataTypes.STRING())
.field("ts", DataTypes.BIGINT())
.primaryKey("uuid")
.build();
List<String> partitions = Collections.singletonList("par1");
@@ -95,21 +95,29 @@ public static void closeCatalog() {
@ParameterizedTest
@EnumSource(value = HoodieTableType.class)
public void testCreateAndGetHoodieTable(HoodieTableType tableType) throws Exception {
Map<String, String> originOptions = new HashMap<>();
originOptions.put(FactoryUtil.CONNECTOR.key(), "hudi");
originOptions.put(FlinkOptions.TABLE_TYPE.key(), tableType.toString());
Map<String, String> options = new HashMap<>();
options.put(FactoryUtil.CONNECTOR.key(), "hudi");
options.put(FlinkOptions.TABLE_TYPE.key(), tableType.toString());

CatalogTable table =
new CatalogTableImpl(schema, partitions, originOptions, "hudi table");
new CatalogTableImpl(schema, partitions, options, "hudi table");
hoodieCatalog.createTable(tablePath, table, false);

CatalogBaseTable table1 = hoodieCatalog.getTable(tablePath);
assertEquals(table1.getOptions().get(CONNECTOR.key()), "hudi");
assertEquals(table1.getOptions().get(FlinkOptions.TABLE_TYPE.key()), tableType.toString());
assertEquals(table1.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()), "uuid");
assertEquals("hudi", table1.getOptions().get(CONNECTOR.key()));
assertEquals(tableType.toString(), table1.getOptions().get(FlinkOptions.TABLE_TYPE.key()));
assertEquals("uuid", table1.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()));
assertNull(table1.getOptions().get(FlinkOptions.PRECOMBINE_FIELD.key()), "preCombine key is not declared");
assertEquals(table1.getUnresolvedSchema().getPrimaryKey().get().getColumnNames(), Collections.singletonList("uuid"));
assertEquals(((CatalogTable)table1).getPartitionKeys(), Collections.singletonList("par1"));
assertEquals(Collections.singletonList("uuid"), table1.getUnresolvedSchema().getPrimaryKey().get().getColumnNames());
assertEquals(Collections.singletonList("par1"), ((CatalogTable)table1).getPartitionKeys());

// test explicit primary key
options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "id");
table = new CatalogTableImpl(schema, partitions, options, "hudi table");
hoodieCatalog.alterTable(tablePath, table, true);

CatalogBaseTable table2 = hoodieCatalog.getTable(tablePath);
assertEquals("id", table2.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()));
}

@ParameterizedTest