Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.MapType;
Expand Down Expand Up @@ -149,8 +150,12 @@ public static DataType convertToDataType(Schema schema) {
// logical timestamp type
if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
return DataTypes.TIMESTAMP(3).notNull();
} else if (schema.getLogicalType() == LogicalTypes.localTimestampMillis()) {
return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull();
} else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
return DataTypes.TIMESTAMP(6).notNull();
} else if (schema.getLogicalType() == LogicalTypes.localTimestampMicros()) {
return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull();
} else if (schema.getLogicalType() == LogicalTypes.timeMillis()) {
return DataTypes.TIME(3).notNull();
} else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
Expand Down Expand Up @@ -242,19 +247,36 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
// use long to represents Timestamp
final TimestampType timestampType = (TimestampType) logicalType;
precision = timestampType.getPrecision();
org.apache.avro.LogicalType avroLogicalType;
org.apache.avro.LogicalType timestampLogicalType;
if (precision <= 3) {
avroLogicalType = LogicalTypes.timestampMillis();
timestampLogicalType = LogicalTypes.timestampMillis();
} else if (precision <= 6) {
avroLogicalType = LogicalTypes.timestampMicros();
timestampLogicalType = LogicalTypes.timestampMicros();
} else {
throw new IllegalArgumentException(
"Avro does not support TIMESTAMP type with precision: "
+ precision
+ ", it only supports precision less than 6.");
}
Schema timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
Schema timestamp = timestampLogicalType.addToSchema(SchemaBuilder.builder().longType());
return nullable ? nullableSchema(timestamp) : timestamp;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
// use long to represents LocalZonedTimestampType
final LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) logicalType;
precision = localZonedTimestampType.getPrecision();
org.apache.avro.LogicalType localZonedTimestampLogicalType;
if (precision <= 3) {
localZonedTimestampLogicalType = LogicalTypes.localTimestampMillis();
} else if (precision <= 6) {
localZonedTimestampLogicalType = LogicalTypes.localTimestampMicros();
} else {
throw new IllegalArgumentException(
"Avro does not support LOCAL TIMESTAMP type with precision: "
+ precision
+ ", it only supports precision less than 6.");
}
Schema localZonedTimestamp = localZonedTimestampLogicalType.addToSchema(SchemaBuilder.builder().longType());
return nullable ? nullableSchema(localZonedTimestamp) : localZonedTimestamp;
case DATE:
// use int to represents Date
Schema date = LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
Expand Down Expand Up @@ -319,7 +341,6 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
.items(convertToSchema(arrayType.getElementType(), rowName));
return nullable ? nullableSchema(array) : array;
case RAW:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
default:
throw new UnsupportedOperationException(
"Unsupported to derive Schema for type: " + logicalType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
Expand Down Expand Up @@ -127,6 +128,8 @@ public static AvroToRowDataConverter createConverter(LogicalType type) {
return AvroToRowDataConverters::convertToDate;
case TIME_WITHOUT_TIME_ZONE:
return AvroToRowDataConverters::convertToTime;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return createTimestampConverter(((LocalZonedTimestampType) type).getPrecision());
case TIMESTAMP_WITHOUT_TIME_ZONE:
return createTimestampConverter(((TimestampType) type).getPrecision());
case CHAR:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@

package org.apache.hudi.util;

import org.apache.hudi.common.util.ValidationUtils;

import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
Expand All @@ -47,9 +46,13 @@ public static boolean isTimestampType(DataType type) {
* Returns the precision of the given TIMESTAMP type.
*/
public static int precision(LogicalType logicalType) {
ValidationUtils.checkArgument(logicalType instanceof TimestampType);
TimestampType timestampType = (TimestampType) logicalType;
return timestampType.getPrecision();
if (logicalType instanceof TimestampType) {
return ((TimestampType) logicalType).getPrecision();
} else if (logicalType instanceof LocalZonedTimestampType) {
return ((LocalZonedTimestampType) logicalType).getPrecision();
} else {
throw new AssertionError("Unexpected type: " + logicalType);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;

import java.io.Serializable;
import java.math.BigDecimal;
Expand Down Expand Up @@ -157,8 +156,9 @@ public Object convert(Schema schema, Object object) {
}
};
break;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
final int precision = ((TimestampType) type).getPrecision();
final int precision = DataTypeUtils.precision(type);
if (precision <= 3) {
converter =
new RowDataToAvroConverter() {
Expand Down Expand Up @@ -231,7 +231,7 @@ public Object convert(Schema schema, Object object) {
actualSchema = types.get(1);
} else {
throw new IllegalArgumentException(
"The Avro schema is not a nullable type: " + schema.toString());
"The Avro schema is not a nullable type: " + schema);
}
} else {
actualSchema = schema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.junit.jupiter.params.provider.ValueSource;

import java.io.File;
import java.time.ZoneId;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -1438,6 +1439,37 @@ void testWriteReadWithComputedColumns() {
assertRowsEquals(result2, "[+I[3]]");
}

@ParameterizedTest
@EnumSource(value = HoodieTableType.class)
void testWriteReadWithLocalTimestamp(HoodieTableType tableType) {
TableEnvironment tableEnv = batchTableEnv;
tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
String createTable = sql("t1")
.field("f0 int")
.field("f1 varchar(10)")
.field("f2 TIMESTAMP_LTZ(3)")
.field("f4 TIMESTAMP_LTZ(6)")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.PRECOMBINE_FIELD, "f1")
.option(FlinkOptions.TABLE_TYPE, tableType)
.pkField("f0")
.noPartition()
.end();
tableEnv.executeSql(createTable);

String insertInto = "insert into t1 values\n"
+ "(1, 'abc', TIMESTAMP '1970-01-01 08:00:01', TIMESTAMP '1970-01-01 08:00:02'),\n"
+ "(2, 'def', TIMESTAMP '1970-01-01 08:00:03', TIMESTAMP '1970-01-01 08:00:04')";
execInsertSql(tableEnv, insertInto);

List<Row> result = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
final String expected = "["
+ "+I[1, abc, 1970-01-01T00:00:01Z, 1970-01-01T00:00:02Z], "
+ "+I[2, def, 1970-01-01T00:00:03Z, 1970-01-01T00:00:04Z]]";
assertRowsEquals(result, expected);
}

// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hudi.util.AvroSchemaConverter;

import org.apache.avro.Schema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -50,4 +51,41 @@ void testUnionSchemaWithMultipleRecordTypes() {
+ "`isDeleted` BOOLEAN NOT NULL>";
assertThat(dataType.getChildren().get(pos).toString(), is(expected));
}

@Test
void testLocalTimestampType() {
DataType dataType = DataTypes.ROW(
DataTypes.FIELD("f_localtimestamp_millis", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
DataTypes.FIELD("f_localtimestamp_micros", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6))
);
// convert to avro schema
Schema schema = AvroSchemaConverter.convertToSchema(dataType.getLogicalType());
final String expectedSchema = ""
+ "[ \"null\", {\n"
+ " \"type\" : \"record\",\n"
+ " \"name\" : \"record\",\n"
+ " \"fields\" : [ {\n"
+ " \"name\" : \"f_localtimestamp_millis\",\n"
+ " \"type\" : [ \"null\", {\n"
+ " \"type\" : \"long\",\n"
+ " \"logicalType\" : \"local-timestamp-millis\"\n"
+ " } ],\n"
+ " \"default\" : null\n"
+ " }, {\n"
+ " \"name\" : \"f_localtimestamp_micros\",\n"
+ " \"type\" : [ \"null\", {\n"
+ " \"type\" : \"long\",\n"
+ " \"logicalType\" : \"local-timestamp-micros\"\n"
+ " } ],\n"
+ " \"default\" : null\n"
+ " } ]\n"
+ "} ]";
assertThat(schema.toString(true), is(expectedSchema));
// convert it back
DataType convertedDataType = AvroSchemaConverter.convertToDataType(schema);
final String expectedDataType = "ROW<"
+ "`f_localtimestamp_millis` TIMESTAMP_LTZ(3), "
+ "`f_localtimestamp_micros` TIMESTAMP_LTZ(6)>";
assertThat(convertedDataType.toString(), is(expectedDataType));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
Expand Down Expand Up @@ -334,7 +335,10 @@ private static ColumnReader createColumnReader(
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
case INT64:
return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, ((TimestampType) fieldType).getPrecision());
int precision = fieldType instanceof TimestampType
? ((TimestampType) fieldType).getPrecision()
: ((LocalZonedTimestampType) fieldType).getPrecision();
return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, precision);
case INT96:
return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
Expand Down Expand Up @@ -334,7 +335,10 @@ private static ColumnReader createColumnReader(
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
case INT64:
return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, ((TimestampType) fieldType).getPrecision());
int precision = fieldType instanceof TimestampType
? ((TimestampType) fieldType).getPrecision()
: ((LocalZonedTimestampType) fieldType).getPrecision();
return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, precision);
case INT96:
return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
Expand Down Expand Up @@ -334,7 +335,10 @@ private static ColumnReader createColumnReader(
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
case INT64:
return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, ((TimestampType) fieldType).getPrecision());
int precision = fieldType instanceof TimestampType
? ((TimestampType) fieldType).getPrecision()
: ((LocalZonedTimestampType) fieldType).getPrecision();
return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, precision);
case INT96:
return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
default:
Expand Down