diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java index e6b15788fe79e..9b61ecb44efee 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java @@ -37,6 +37,7 @@ import org.apache.flink.table.types.logical.TinyIntType; import org.apache.flink.table.types.logical.VarBinaryType; import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; @@ -144,6 +145,17 @@ public TypeInfo visit(TimestampType timestampType) { } } + + @Override + public TypeInfo visit(LocalZonedTimestampType localZonedTimestampType) { + int precision = localZonedTimestampType.getPrecision(); + if (precision >=0 && precision <= 9){ + return TypeInfoFactory.timestampTypeInfo; + } else { + return TypeInfoFactory.longTypeInfo; + } + } + @Override public TypeInfo visit(ArrayType arrayType) { LogicalType elementType = arrayType.getElementType(); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java index 5d27cdadbbb35..0421bd0b069fd 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java @@ -83,6 +83,7 @@ public class TestHoodieHiveCatalog { .field("age", DataTypes.INT()) .field("par1", DataTypes.STRING()) .field("ts", DataTypes.BIGINT()) + .field("update_time", DataTypes.TIMESTAMP_LTZ()) .primaryKey("uuid") .build(); List partitions = Collections.singletonList("par1"); @@ -132,7 +133,8 @@ public void testCreateAndGetHoodieTable(HoodieTableType tableType) throws Except + "uuid:int," + "name:string," + "age:int," - + "ts:bigint"; + + "ts:bigint," + + "update_time:timestamp"; assertEquals(expectedFieldSchema, fieldSchema); String partitionSchema = hiveTable.getPartitionKeys().stream() .map(f -> f.getName() + ":" + f.getType()) @@ -148,7 +150,7 @@ public void testCreateAndGetHoodieTable(HoodieTableType tableType) throws Except String tableSchema = table1.getUnresolvedSchema().getColumns().stream() .map(Schema.UnresolvedColumn::toString) .collect(Collectors.joining(",")); - String expectedTableSchema = "`uuid` INT NOT NULL,`name` STRING,`age` INT,`par1` STRING,`ts` BIGINT"; + String expectedTableSchema = "`uuid` INT NOT NULL,`name` STRING,`age` INT,`par1` STRING,`ts` BIGINT,`update_time` TIMESTAMP_LTZ(6)"; assertEquals(expectedTableSchema, tableSchema); assertEquals(Collections.singletonList("uuid"), table1.getUnresolvedSchema().getPrimaryKey().get().getColumnNames()); assertEquals(Collections.singletonList("par1"), ((CatalogTable) table1).getPartitionKeys()); diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java index 50a6aabd2bbd9..141a71f5bcc93 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java @@ -232,6 +232,8 @@ private static String convertField(final Type parquetType, boolean supportTimest return field.append("DATE").toString(); } else if (supportTimestamp && originalType == OriginalType.TIMESTAMP_MICROS) { return field.append("TIMESTAMP").toString(); + } else if (supportTimestamp && originalType == OriginalType.TIMESTAMP_MILLIS) { + return field.append("TIMESTAMP_LTZ").toString(); } // TODO - fix the method naming here