diff --git a/hudi-client/hudi-flink-client/pom.xml b/hudi-client/hudi-flink-client/pom.xml
index a44bdf1b8381b..eb044312c42c8 100644
--- a/hudi-client/hudi-flink-client/pom.xml
+++ b/hudi-client/hudi-flink-client/pom.xml
@@ -29,6 +29,10 @@
   <artifactId>hudi-flink-client</artifactId>
   <packaging>jar</packaging>
 
+  <properties>
+    <parquet.version>${flink.format.parquet.version}</parquet.version>
+  </properties>
+
@@ -87,6 +91,13 @@
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-avro</artifactId>
+      <version>${parquet.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-column</artifactId>
+      <version>${parquet.version}</version>
     </dependency>
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
index 5da45bf25d3f1..66a39b54a910b 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java
@@ -34,6 +34,7 @@
 import org.apache.flink.table.types.logical.TimestampType;
 
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
@@ -46,6 +47,8 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
+
 /**
  * Schema converter converts Parquet schema to and from Flink internal types.
  *
@@ -436,7 +439,7 @@ private static Type convertField(
               String.format(
                   "Can not convert Flink MapTypeInfo %s to Parquet"
                       + " Map type as key has to be String",
-                  typeInfo.toString()));
+                  typeInfo));
         }
       } else if (typeInfo instanceof ObjectArrayTypeInfo) {
         ObjectArrayTypeInfo objectArrayTypeInfo = (ObjectArrayTypeInfo) typeInfo;
@@ -567,18 +570,16 @@ private static Type convertToParquetType(
         int numBytes = computeMinBytesForDecimalPrecision(precision);
         return Types.primitive(
                 PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition)
-            .precision(precision)
-            .scale(scale)
+            .as(LogicalTypeAnnotation.decimalType(scale, precision))
             .length(numBytes)
-            .as(OriginalType.DECIMAL)
             .named(name);
       case TINYINT:
         return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
-            .as(OriginalType.INT_8)
+            .as(LogicalTypeAnnotation.intType(8, true))
             .named(name);
       case SMALLINT:
         return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
-            .as(OriginalType.INT_16)
+            .as(LogicalTypeAnnotation.intType(16, true))
             .named(name);
       case INTEGER:
         return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
@@ -594,16 +595,17 @@ private static Type convertToParquetType(
             .named(name);
       case DATE:
         return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
-            .as(OriginalType.DATE)
+            .as(LogicalTypeAnnotation.dateType())
             .named(name);
       case TIME_WITHOUT_TIME_ZONE:
         return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
-            .as(OriginalType.TIME_MILLIS)
+            .as(LogicalTypeAnnotation.timeType(true, TimeUnit.MILLIS))
             .named(name);
       case TIMESTAMP_WITHOUT_TIME_ZONE:
         TimestampType timestampType = (TimestampType) type;
         if (timestampType.getPrecision() == 3) {
           return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
+              .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS))
               .named(name);
         } else {
           return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
@@ -613,6 +615,7 @@
         LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) type;
         if (localZonedTimestampType.getPrecision() == 3) {
           return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
+              .as(LogicalTypeAnnotation.timestampType(false, TimeUnit.MILLIS))
               .named(name);
         } else {
           return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
index 5305bcc8aba74..a1a07a65f9931 100644
--- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/io/storage/row/parquet/TestParquetSchemaConverter.java
@@ -51,24 +51,41 @@ void testConvertComplexTypes() {
     final String expected = "message converted {\n"
         + "  optional group f_array (LIST) {\n"
         + "    repeated group list {\n"
-        + "      optional binary element (UTF8);\n"
+        + "      optional binary element (STRING);\n"
         + "    }\n"
         + "  }\n"
         + "  optional group f_map (MAP) {\n"
         + "    repeated group key_value {\n"
         + "      optional int32 key;\n"
-        + "      optional binary value (UTF8);\n"
+        + "      optional binary value (STRING);\n"
         + "    }\n"
         + "  }\n"
         + "  optional group f_row {\n"
         + "    optional int32 f_row_f0;\n"
-        + "    optional binary f_row_f1 (UTF8);\n"
+        + "    optional binary f_row_f1 (STRING);\n"
         + "    optional group f_row_f2 {\n"
         + "      optional int32 f_row_f2_f0;\n"
-        + "      optional binary f_row_f2_f1 (UTF8);\n"
+        + "      optional binary f_row_f2_f1 (STRING);\n"
         + "    }\n"
         + "  }\n"
         + "}\n";
     assertThat(messageType.toString(), is(expected));
   }
+
+  @Test
+  void testConvertTimestampTypes() {
+    DataType dataType = DataTypes.ROW(
+        DataTypes.FIELD("ts_3", DataTypes.TIMESTAMP(3)),
+        DataTypes.FIELD("ts_6", DataTypes.TIMESTAMP(6)),
+        DataTypes.FIELD("ts_9", DataTypes.TIMESTAMP(9)));
+    org.apache.parquet.schema.MessageType messageType =
+        ParquetSchemaConverter.convertToParquetMessageType("converted", (RowType) dataType.getLogicalType());
+    assertThat(messageType.getColumns().size(), is(3));
+    final String expected = "message converted {\n"
+        + "  optional int64 ts_3 (TIMESTAMP(MILLIS,true));\n"
+        + "  optional int96 ts_6;\n"
+        + "  optional int96 ts_9;\n"
+        + "}\n";
+    assertThat(messageType.toString(), is(expected));
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/pom.xml b/hudi-flink-datasource/hudi-flink/pom.xml
index abc458c649afe..97288d19cd35c 100644
--- a/hudi-flink-datasource/hudi-flink/pom.xml
+++ b/hudi-flink-datasource/hudi-flink/pom.xml
@@ -32,7 +32,7 @@
     <main.basedir>${project.parent.parent.basedir}</main.basedir>
-    <parquet.version>1.11.1</parquet.version>
+    <parquet.version>${flink.format.parquet.version}</parquet.version>
compatibility."); diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml index 75b4629d0ed45..584c3871cd449 100644 --- a/packaging/hudi-flink-bundle/pom.xml +++ b/packaging/hudi-flink-bundle/pom.xml @@ -34,8 +34,8 @@ provided org.apache.hudi. 3.1.0 - - 1.11.1 + + ${flink.format.parquet.version} 2.3.1 0.9.3 diff --git a/pom.xml b/pom.xml index 5d67a43dbd62f..29fb187d3dbea 100644 --- a/pom.xml +++ b/pom.xml @@ -129,6 +129,7 @@ flink-runtime flink-table-runtime_${scala.binary.version} flink-table-planner_${scala.binary.version} + 1.12.2 3.1.3 3.2.1 hudi-spark2