diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
index c9590ff4a26bf..ea965f5c01e20 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table.catalog;
 
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 
 import org.apache.flink.table.api.DataTypes;
@@ -40,6 +41,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -49,11 +51,7 @@ public class HiveSchemaUtils {
 
   /** Get field names from field schemas. */
   public static List<String> getFieldNames(List<FieldSchema> fieldSchemas) {
-    List<String> names = new ArrayList<>(fieldSchemas.size());
-    for (FieldSchema fs : fieldSchemas) {
-      names.add(fs.getName());
-    }
-    return names;
+    return fieldSchemas.stream().map(FieldSchema::getName).collect(Collectors.toList());
   }
 
   public static org.apache.flink.table.api.Schema convertTableSchema(Table hiveTable) {
@@ -204,4 +202,27 @@ public static TypeInfo toHiveTypeInfo(DataType dataType) {
     LogicalType logicalType = dataType.getLogicalType();
     return logicalType.accept(new TypeInfoLogicalTypeVisitor(dataType));
   }
+
+  /**
+   * Split the field schemas by the given partition keys.
+   *
+   * @param fieldSchemas  The Hive field schemas.
+   * @param partitionKeys The partition keys.
+   *
+   * @return The pair of (regular columns, partition columns) schema fields
+   */
+  public static Pair<List<FieldSchema>, List<FieldSchema>> splitSchemaByPartitionKeys(
+      List<FieldSchema> fieldSchemas,
+      List<String> partitionKeys) {
+    List<FieldSchema> regularColumns = new ArrayList<>();
+    List<FieldSchema> partitionColumns = new ArrayList<>();
+    for (FieldSchema fieldSchema : fieldSchemas) {
+      if (partitionKeys.contains(fieldSchema.getName())) {
+        partitionColumns.add(fieldSchema);
+      } else {
+        regularColumns.add(fieldSchema);
+      }
+    }
+    return Pair.of(regularColumns, partitionColumns);
+  }
 }
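For reference, a minimal standalone sketch (not part of the patch; the class name and column names/types are made up) of what the new helper returns. Because it matches columns by name rather than by position, a partition column may appear anywhere in the schema:

```java
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.table.catalog.HiveSchemaUtils;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SplitSchemaSketch {
  public static void main(String[] args) {
    // Partition column "par1" is deliberately not the last column.
    List<FieldSchema> columns = Arrays.asList(
        new FieldSchema("uuid", "int", null),
        new FieldSchema("par1", "string", null),
        new FieldSchema("ts", "bigint", null));

    Pair<List<FieldSchema>, List<FieldSchema>> split =
        HiveSchemaUtils.splitSchemaByPartitionKeys(columns, Collections.singletonList("par1"));

    // Regular columns keep their declaration order: [uuid, ts]
    System.out.println(HiveSchemaUtils.getFieldNames(split.getLeft()));
    // Partition columns: [par1]
    System.out.println(HiveSchemaUtils.getFieldNames(split.getRight()));
  }
}
```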
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
index f546300249ca5..3dc191afb4c50 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
@@ -18,8 +18,10 @@
 
 package org.apache.hudi.table.catalog;
 
+import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 
+import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -33,6 +35,10 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
 import static org.apache.hudi.table.catalog.CatalogOptions.HIVE_SITE_FILE;
@@ -93,4 +99,18 @@ public static HiveConf createHiveConf(@Nullable String hiveConfDir) {
   public static boolean isEmbeddedMetastore(HiveConf hiveConf) {
     return isNullOrWhitespaceOnly(hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
   }
+
+  /**
+   * Returns the partition key list of the given table.
+   */
+  public static List<String> getPartitionKeys(CatalogTable table) {
+    // the PARTITIONED BY syntax always has higher priority than the option FlinkOptions#PARTITION_PATH_FIELD
+    if (table.isPartitioned()) {
+      return table.getPartitionKeys();
+    } else if (table.getOptions().containsKey(FlinkOptions.PARTITION_PATH_FIELD.key())) {
+      return Arrays.stream(table.getOptions().get(FlinkOptions.PARTITION_PATH_FIELD.key()).split(","))
+          .collect(Collectors.toList());
+    }
+    return Collections.emptyList();
+  }
 }
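A hedged sketch of the resolution order implemented by `getPartitionKeys` (the table definitions and class name are hypothetical; `FlinkOptions.PARTITION_PATH_FIELD` resolves to the `hoodie.datasource.write.partitionpath.field` option):

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.catalog.HoodieCatalogUtil;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class PartitionKeysSketch {
  public static void main(String[] args) {
    TableSchema schema = TableSchema.builder()
        .field("uuid", DataTypes.INT().notNull())
        .field("par1", DataTypes.STRING())
        .build();

    Map<String, String> options = new HashMap<>();
    options.put("connector", "hudi");
    options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "par1");

    // No PARTITIONED BY clause: the writer option is consulted (and split on commas).
    CatalogTable optionOnly =
        new CatalogTableImpl(schema, Collections.emptyList(), options, "demo");
    System.out.println(HoodieCatalogUtil.getPartitionKeys(optionOnly)); // [par1]

    // PARTITIONED BY (par1) declared: it takes precedence over the option.
    CatalogTable partitioned =
        new CatalogTableImpl(schema, Collections.singletonList("par1"), options, "demo");
    System.out.println(HoodieCatalogUtil.getPartitionKeys(partitioned)); // [par1]
  }
}
```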
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 1e877b133e1a7..07f62911457ab 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -22,6 +22,7 @@
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieCatalogException;
@@ -86,7 +87,6 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -539,25 +539,19 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table,
     List<FieldSchema> allColumns = HiveSchemaUtils.createHiveColumns(table.getSchema());
 
     // Table columns and partition keys
-    if (table instanceof CatalogTable) {
-      CatalogTable catalogTable = (CatalogTable) table;
-
-      if (catalogTable.isPartitioned()) {
-        int partitionKeySize = catalogTable.getPartitionKeys().size();
-        List<FieldSchema> regularColumns =
-            allColumns.subList(0, allColumns.size() - partitionKeySize);
-        List<FieldSchema> partitionColumns =
-            allColumns.subList(
-                allColumns.size() - partitionKeySize, allColumns.size());
-
-        sd.setCols(regularColumns);
-        hiveTable.setPartitionKeys(partitionColumns);
-      } else {
-        sd.setCols(allColumns);
-        hiveTable.setPartitionKeys(new ArrayList<>());
-      }
+    CatalogTable catalogTable = (CatalogTable) table;
+
+    final List<String> partitionKeys = HoodieCatalogUtil.getPartitionKeys(catalogTable);
+    if (partitionKeys.size() > 0) {
+      Pair<List<FieldSchema>, List<FieldSchema>> splitSchemas = HiveSchemaUtils.splitSchemaByPartitionKeys(allColumns, partitionKeys);
+      List<FieldSchema> regularColumns = splitSchemas.getLeft();
+      List<FieldSchema> partitionColumns = splitSchemas.getRight();
+
+      sd.setCols(regularColumns);
+      hiveTable.setPartitionKeys(partitionColumns);
     } else {
       sd.setCols(allColumns);
+      hiveTable.setPartitionKeys(Collections.emptyList());
     }
 
     HoodieFileFormat baseFileFormat = HoodieFileFormat.PARQUET;
@@ -572,7 +566,7 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table,
     serdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(!useRealTimeInputFormat));
 
     serdeProperties.put("serialization.format", "1");
-    serdeProperties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark((CatalogTable)table, hiveConf, properties));
+    serdeProperties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark(catalogTable, hiveConf, properties, partitionKeys));
 
     sd.setSerdeInfo(new SerDeInfo(null, serDeClassName, serdeProperties));
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
index 9477cd6dafc5f..a0864bbf3773b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
@@ -164,12 +164,16 @@ public static Map<String, String> getTableOptions(Map<String, String> options) {
     return copied;
   }
 
-  public static Map<String, String> translateFlinkTableProperties2Spark(CatalogTable catalogTable, Configuration hadoopConf, Map<String, String> properties) {
+  public static Map<String, String> translateFlinkTableProperties2Spark(
+      CatalogTable catalogTable,
+      Configuration hadoopConf,
+      Map<String, String> properties,
+      List<String> partitionKeys) {
     Schema schema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType());
     MessageType messageType = TableSchemaResolver.convertAvroSchemaToParquet(schema, hadoopConf);
     String sparkVersion = catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION);
     Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(
-        catalogTable.getPartitionKeys(),
+        partitionKeys,
         sparkVersion,
         4000,
         messageType);
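The HoodieHiveCatalog change above is the heart of the fix: the old code assumed the partition columns were the trailing fields of the schema and cut them off with `subList`, which silently registers the wrong columns when a partition column is declared mid-schema. A standalone sketch of the difference (class and column names are made up; the schema mirrors the reordered test fixture below):

```java
import org.apache.hadoop.hive.metastore.api.FieldSchema;

import java.util.Arrays;
import java.util.List;

public class PositionalSplitPitfall {
  public static void main(String[] args) {
    // Partition column "par1" is not declared last.
    List<FieldSchema> all = Arrays.asList(
        new FieldSchema("uuid", "int", null),
        new FieldSchema("par1", "string", null),
        new FieldSchema("ts", "bigint", null));

    // Old positional logic: the last partitionKeys.size() columns were taken
    // as partition columns, yielding [ts] here instead of [par1].
    int partitionKeySize = 1;
    List<FieldSchema> wrong = all.subList(all.size() - partitionKeySize, all.size());
    System.out.println(wrong.get(0).getName()); // ts -> wrong column registered in Hive

    // The new name-based split (HiveSchemaUtils.splitSchemaByPartitionKeys)
    // matches on field names and is order-independent.
  }
}
```

Threading the resolved `partitionKeys` into `translateFlinkTableProperties2Spark` has the same effect on the Spark side: tables partitioned only through the writer option now advertise their partition columns in the Spark data source properties (e.g. keys like `spark.sql.sources.schema.partCol.0`), instead of relying on the possibly empty `CatalogTable#getPartitionKeys()`.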
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
index da6cde4e89ced..66ba520af9587 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
@@ -66,8 +66,8 @@ public class TestHoodieHiveCatalog {
       .field("uuid", DataTypes.INT().notNull())
       .field("name", DataTypes.STRING())
       .field("age", DataTypes.INT())
-      .field("ts", DataTypes.BIGINT())
       .field("par1", DataTypes.STRING())
+      .field("ts", DataTypes.BIGINT())
       .primaryKey("uuid")
       .build();
   List<String> partitions = Collections.singletonList("par1");
@@ -95,21 +95,29 @@ public static void closeCatalog() {
 
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class)
   public void testCreateAndGetHoodieTable(HoodieTableType tableType) throws Exception {
-    Map<String, String> originOptions = new HashMap<>();
-    originOptions.put(FactoryUtil.CONNECTOR.key(), "hudi");
-    originOptions.put(FlinkOptions.TABLE_TYPE.key(), tableType.toString());
+    Map<String, String> options = new HashMap<>();
+    options.put(FactoryUtil.CONNECTOR.key(), "hudi");
+    options.put(FlinkOptions.TABLE_TYPE.key(), tableType.toString());
     CatalogTable table =
-        new CatalogTableImpl(schema, partitions, originOptions, "hudi table");
+        new CatalogTableImpl(schema, partitions, options, "hudi table");
     hoodieCatalog.createTable(tablePath, table, false);
 
     CatalogBaseTable table1 = hoodieCatalog.getTable(tablePath);
-    assertEquals(table1.getOptions().get(CONNECTOR.key()), "hudi");
-    assertEquals(table1.getOptions().get(FlinkOptions.TABLE_TYPE.key()), tableType.toString());
-    assertEquals(table1.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()), "uuid");
+    assertEquals("hudi", table1.getOptions().get(CONNECTOR.key()));
+    assertEquals(tableType.toString(), table1.getOptions().get(FlinkOptions.TABLE_TYPE.key()));
+    assertEquals("uuid", table1.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()));
     assertNull(table1.getOptions().get(FlinkOptions.PRECOMBINE_FIELD.key()), "preCombine key is not declared");
-    assertEquals(table1.getUnresolvedSchema().getPrimaryKey().get().getColumnNames(), Collections.singletonList("uuid"));
-    assertEquals(((CatalogTable)table1).getPartitionKeys(), Collections.singletonList("par1"));
+    assertEquals(Collections.singletonList("uuid"), table1.getUnresolvedSchema().getPrimaryKey().get().getColumnNames());
+    assertEquals(Collections.singletonList("par1"), ((CatalogTable)table1).getPartitionKeys());
+
+    // test explicit record key field
+    options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "id");
+    table = new CatalogTableImpl(schema, partitions, options, "hudi table");
+    hoodieCatalog.alterTable(tablePath, table, true);
+
+    CatalogBaseTable table2 = hoodieCatalog.getTable(tablePath);
+    assertEquals("id", table2.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()));
   }
 
   @ParameterizedTest
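One more case worth covering, shown as a hypothetical sketch rather than part of the patch (it reuses the `schema`, `hoodieCatalog`, and option names from this test class; the test name and `ObjectPath` are made up): a table partitioned only through the writer option, with no `PARTITIONED BY` clause, should still come back from the catalog with `par1` registered as a partition key.

```java
@Test
public void testCreateTablePartitionedByOptionOnly() throws Exception {
  Map<String, String> options = new HashMap<>();
  options.put(FactoryUtil.CONNECTOR.key(), "hudi");
  options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "par1");

  // No PARTITIONED BY clause: partition keys come from the option alone.
  ObjectPath path = new ObjectPath("default", "tb_option_partitioned");
  CatalogTable table =
      new CatalogTableImpl(schema, Collections.emptyList(), options, "hudi table");
  hoodieCatalog.createTable(path, table, false);

  CatalogBaseTable created = hoodieCatalog.getTable(path);
  assertEquals(Collections.singletonList("par1"),
      ((CatalogTable) created).getPartitionKeys());
}
```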