diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java index 4383b42e9f8d..fac507cb7db6 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java @@ -181,9 +181,11 @@ private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) { */ public static List toHiveFieldSchema(TableSchema schema, boolean withOperationField) { List columns = new ArrayList<>(); - Collection metaFields = withOperationField - ? HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION // caution that the set may break sequence - : HoodieRecord.HOODIE_META_COLUMNS; + Collection metaFields = new ArrayList<>(HoodieRecord.HOODIE_META_COLUMNS); + if (withOperationField) { + metaFields.add(HoodieRecord.OPERATION_METADATA_FIELD); + } + for (String metaField : metaFields) { columns.add(new FieldSchema(metaField, "string", null)); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java index 8644435b5abe..bb00bee50e63 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java @@ -596,7 +596,7 @@ private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table, serdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(!useRealTimeInputFormat)); serdeProperties.put("serialization.format", "1"); - serdeProperties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark(catalogTable, hiveConf, properties, partitionKeys)); + serdeProperties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark(catalogTable, hiveConf, properties, partitionKeys, withOperationField)); sd.setSerdeInfo(new SerDeInfo(null, serDeClassName, serdeProperties)); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java index a0864bbf3773..6e327bdc6120 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java @@ -19,6 +19,7 @@ package org.apache.hudi.table.catalog; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.configuration.FlinkOptions; @@ -28,6 +29,8 @@ import org.apache.avro.Schema; import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -39,7 +42,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -49,6 +54,7 @@ import java.util.stream.Collectors; import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; +import static org.apache.hudi.common.model.HoodieRecord.OPERATION_METADATA_FIELD; import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME; /** @@ -168,8 +174,10 @@ public static Map translateFlinkTableProperties2Spark( CatalogTable catalogTable, Configuration hadoopConf, Map properties, - List partitionKeys) { - Schema schema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType()); + List partitionKeys, + boolean withOperationField) { + RowType rowType = supplementMetaFields((RowType) catalogTable.getSchema().toPhysicalRowDataType().getLogicalType(), withOperationField); + Schema schema = AvroSchemaConverter.convertToSchema(rowType); MessageType messageType = TableSchemaResolver.convertAvroSchemaToParquet(schema, hadoopConf); String sparkVersion = catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION); Map sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties( @@ -184,6 +192,19 @@ public static Map translateFlinkTableProperties2Spark( e -> e.getKey().equalsIgnoreCase(FlinkOptions.TABLE_TYPE.key()) ? VALUE_MAPPING.get(e.getValue()) : e.getValue())); } + private static RowType supplementMetaFields(RowType rowType, boolean withOperationField) { + Collection metaFields = new ArrayList<>(HoodieRecord.HOODIE_META_COLUMNS); + if (withOperationField) { + metaFields.add(OPERATION_METADATA_FIELD); + } + ArrayList rowFields = new ArrayList<>(); + for (String metaField : metaFields) { + rowFields.add(new RowType.RowField(metaField, new VarCharType(10000))); + } + rowFields.addAll(rowType.getFields()); + return new RowType(false, rowFields); + } + public static Map translateSparkTableProperties2Flink(Map options) { if (options.containsKey(CONNECTOR.key())) { return options; diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java index 5d27cdadbbb3..c697cb92509f 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java @@ -139,6 +139,21 @@ public void testCreateAndGetHoodieTable(HoodieTableType tableType) throws Except .collect(Collectors.joining(",")); assertEquals("par1:string", partitionSchema); + // validate spark schema properties + String avroSchemaStr = hiveTable.getParameters().get("spark.sql.sources.schema.part.0"); + String expectedAvroSchemaStr = "" + + "{\"type\":\"struct\",\"fields\":[{\"name\":\"_hoodie_commit_time\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"_hoodie_commit_seqno\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"_hoodie_record_key\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"_hoodie_partition_path\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"_hoodie_file_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"uuid\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}}," + + "{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"ts\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"par1\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"; + assertEquals(expectedAvroSchemaStr, avroSchemaStr); + // validate catalog table CatalogBaseTable table1 = hoodieCatalog.getTable(tablePath); assertEquals("hudi", table1.getOptions().get(CONNECTOR.key()));