Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

/**
* Timestamp {@link org.apache.flink.formats.parquet.vector.reader.ColumnReader} that supports INT64 8 bytes,
Expand All @@ -40,12 +42,25 @@ public class Int64TimestampColumnReader extends AbstractColumnReader<WritableTim

private final boolean utcTimestamp;

private final ChronoUnit chronoUnit;

/**
 * Creates a reader for INT64-encoded Parquet timestamp columns.
 *
 * @param utcTimestamp whether values are interpreted as UTC instants (true) or
 *     converted through the local time zone (false)
 * @param descriptor the Parquet column descriptor; must be of primitive type INT64
 * @param pageReader the page reader for this column chunk
 * @param precision SQL timestamp precision; 0..3 selects millisecond encoding,
 *     4..6 selects microsecond encoding
 * @throws IOException if the underlying page reader fails
 * @throws IllegalArgumentException if {@code precision} is greater than 6
 */
public Int64TimestampColumnReader(
        boolean utcTimestamp,
        ColumnDescriptor descriptor,
        PageReader pageReader,
        int precision) throws IOException {
    super(descriptor, pageReader);
    this.utcTimestamp = utcTimestamp;
    // Map the declared SQL precision onto the unit the INT64 values were written in.
    if (precision <= 3) {
        this.chronoUnit = ChronoUnit.MILLIS;
    } else if (precision <= 6) {
        this.chronoUnit = ChronoUnit.MICROS;
    } else {
        // Note: this is the Parquet INT64 reader (the original message was a
        // copy-paste from the Avro converter), and the supported bound is
        // inclusive: precision up to and including 6 is accepted.
        throw new IllegalArgumentException(
                "Parquet INT64 timestamp does not support TIMESTAMP type with precision: "
                        + precision
                        + ", it only supports precision less than or equal to 6.");
    }
    checkTypeName(PrimitiveType.PrimitiveTypeName.INT64);
}

Expand All @@ -59,7 +74,7 @@ protected void readBatch(int rowId, int num, WritableTimestampVector column) {
for (int i = 0; i < num; i++) {
if (runLenDecoder.readInteger() == maxDefLevel) {
ByteBuffer buffer = readDataBuffer(8);
column.setTimestamp(rowId + i, int64ToTimestamp(utcTimestamp, buffer.getLong()));
column.setTimestamp(rowId + i, int64ToTimestamp(utcTimestamp, buffer.getLong(), chronoUnit));
} else {
column.setNullAt(rowId + i);
}
Expand All @@ -75,25 +90,30 @@ protected void readBatchFromDictionaryIds(
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
column.setTimestamp(i, decodeInt64ToTimestamp(
utcTimestamp, dictionary, dictionaryIds.getInt(i)));
utcTimestamp, dictionary, dictionaryIds.getInt(i), chronoUnit));
}
}
}

public static TimestampData decodeInt64ToTimestamp(
boolean utcTimestamp,
org.apache.parquet.column.Dictionary dictionary,
int id) {
int id,
ChronoUnit unit) {
long value = dictionary.decodeToLong(id);
return int64ToTimestamp(utcTimestamp, value);
return int64ToTimestamp(utcTimestamp, value, unit);
}

private static TimestampData int64ToTimestamp(boolean utcTimestamp, long millionsOfDay) {
private static TimestampData int64ToTimestamp(
boolean utcTimestamp,
long interval,
ChronoUnit unit) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

millionsOfDay => interval

final Instant instant = Instant.EPOCH.plus(interval, unit);
if (utcTimestamp) {
return TimestampData.fromEpochMillis(millionsOfDay, 0);
return TimestampData.fromInstant(instant);
} else {
Timestamp timestamp = new Timestamp(millionsOfDay);
return TimestampData.fromTimestamp(timestamp);
// this applies the local timezone
return TimestampData.fromTimestamp(Timestamp.from(instant));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -329,7 +330,7 @@ private static ColumnReader createColumnReader(
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
case INT64:
return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader);
return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, ((TimestampType)fieldType).getPrecision());
case INT96:
return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,13 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
org.apache.avro.LogicalType avroLogicalType;
if (precision <= 3) {
avroLogicalType = LogicalTypes.timestampMillis();
} else if (precision <= 6) {
avroLogicalType = LogicalTypes.timestampMicros();
} else {
throw new IllegalArgumentException(
"Avro does not support TIMESTAMP type "
+ "with precision: "
"Avro does not support TIMESTAMP type with precision: "
+ precision
+ ", it only supports precision less than 3.");
+ ", it only supports precision less than 6.");
}
Schema timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
return nullable ? nullableSchema(timestamp) : timestamp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeFieldType;
Expand All @@ -45,6 +46,7 @@
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -125,7 +127,7 @@ private static AvroToRowDataConverter createConverter(LogicalType type) {
case TIME_WITHOUT_TIME_ZONE:
return AvroToRowDataConverters::convertToTime;
case TIMESTAMP_WITHOUT_TIME_ZONE:
return AvroToRowDataConverters::convertToTimestamp;
return createTimestampConverter(((TimestampType) type).getPrecision());
case CHAR:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can also instantiate the chronoUnit first based on the type precision.

case VARCHAR:
return avroObject -> StringData.fromString(avroObject.toString());
Expand Down Expand Up @@ -200,22 +202,36 @@ private static AvroToRowDataConverter createMapConverter(LogicalType type) {
};
}

private static TimestampData convertToTimestamp(Object object) {
final long millis;
if (object instanceof Long) {
millis = (Long) object;
} else if (object instanceof Instant) {
millis = ((Instant) object).toEpochMilli();
private static AvroToRowDataConverter createTimestampConverter(int precision) {
final ChronoUnit chronoUnit;
if (precision <= 3) {
chronoUnit = ChronoUnit.MILLIS;
} else if (precision <= 6) {
chronoUnit = ChronoUnit.MICROS;
} else {
JodaConverter jodaConverter = JodaConverter.getConverter();
if (jodaConverter != null) {
millis = jodaConverter.convertTimestamp(object);
throw new IllegalArgumentException(
"Avro does not support TIMESTAMP type with precision: "
+ precision
+ ", it only supports precision less than 6.");
}
return avroObject -> {
final Instant instant;
if (avroObject instanceof Long) {
instant = Instant.EPOCH.plus((Long) avroObject, chronoUnit);
} else if (avroObject instanceof Instant) {
instant = (Instant) avroObject;
} else {
throw new IllegalArgumentException(
"Unexpected object type for TIMESTAMP logical type. Received: " + object);
JodaConverter jodaConverter = JodaConverter.getConverter();
if (jodaConverter != null) {
// joda time has only millisecond precision
instant = Instant.ofEpochMilli(jodaConverter.convertTimestamp(avroObject));
} else {
throw new IllegalArgumentException(
"Unexpected object type for TIMESTAMP logical type. Received: " + avroObject);
}
}
}
return TimestampData.fromEpochMillis(millis);
return TimestampData.fromInstant(instant);
};
}

private static int convertToDate(Object object) {
Expand Down Expand Up @@ -272,10 +288,9 @@ private static byte[] convertToBytes(Object object) {
static class JodaConverter {

private static JodaConverter instance;
private static boolean instantiated = false;

public static JodaConverter getConverter() {
if (instantiated) {
if (instance != null) {
return instance;
}

Expand All @@ -287,8 +302,6 @@ public static JodaConverter getConverter() {
instance = new JodaConverter();
} catch (ClassNotFoundException e) {
instance = null;
} finally {
instantiated = true;
}
return instance;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -151,15 +154,30 @@ public Object convert(Schema schema, Object object) {
};
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
final int precision = ((TimestampType) type).getPrecision();
if (precision <= 3) {
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;

@Override
public Object convert(Schema schema, Object object) {
return ((TimestampData) object).toInstant().toEpochMilli();
}
};
@Override
public Object convert(Schema schema, Object object) {
return ((TimestampData) object).toInstant().toEpochMilli();
}
};
} else if (precision <= 6) {
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;

@Override
public Object convert(Schema schema, Object object) {
return ChronoUnit.MICROS.between(Instant.EPOCH, ((TimestampData) object).toInstant());
}
};
} else {
throw new UnsupportedOperationException("Unsupported timestamp precision: " + precision);
}
break;
case DECIMAL:
converter =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.util;

import org.apache.flink.table.types.logical.TimestampType;
import org.apache.hudi.common.util.ValidationUtils;

import org.apache.flink.annotation.Internal;
Expand All @@ -29,7 +30,9 @@

import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;

/**
Expand Down Expand Up @@ -85,7 +88,14 @@ private static Converter getConverter(LogicalType logicalType) {
// see HoodieAvroUtils#convertValueForAvroLogicalTypes
return field -> (int) LocalDate.parse(field).toEpochDay();
case TIMESTAMP_WITHOUT_TIME_ZONE:
return field -> TimestampData.fromEpochMillis(Long.parseLong(field));
final int precision = ((TimestampType) logicalType).getPrecision();
if (precision <= 3) {
return field -> TimestampData.fromInstant(Instant.EPOCH.plus(Long.parseLong(field), ChronoUnit.MILLIS));
} else if (precision <= 6) {
return field -> TimestampData.fromInstant(Instant.EPOCH.plus(Long.parseLong(field), ChronoUnit.MICROS));
} else {
throw new UnsupportedOperationException("Unsupported type: " + logicalType);
}
case CHAR:
case VARCHAR:
return StringData::fromString;
Expand All @@ -100,8 +110,7 @@ private static Converter getConverter(LogicalType logicalType) {
decimalType.getPrecision(),
decimalType.getScale());
default:
throw new UnsupportedOperationException(
"Unsupported type " + logicalType.getTypeRoot() + " for " + StringToRowDataConverter.class.getName());
throw new UnsupportedOperationException("Unsupported type: " + logicalType);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,42 @@ void testWriteAndReadParMiddle(ExecMode execMode) throws Exception {
assertRowsEquals(result2, expected);
}

@ParameterizedTest
@EnumSource(value = ExecMode.class)
// Round-trip test for timestamp(6) columns: verifies that microsecond-precision
// fractional seconds survive a Hudi write followed by a read, in both BATCH and
// STREAM execution modes.
void testWriteAndReadWithTimestampMicros(ExecMode execMode) throws Exception {
boolean streaming = execMode == ExecMode.STREAM;
// Table with a timestamp(6) field so values carry 6 fractional digits;
// non-partitioned with "id" as the primary key.
String hoodieTableDDL = sql("t1")
.field("id int")
.field("name varchar(10)")
.field("ts timestamp(6)")
.pkField("id")
.noPartition()
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.READ_AS_STREAMING, streaming)
.end();
streamTableEnv.executeSql(hoodieTableDDL);
// Each literal uses a distinct microsecond component (.100001 .. .400004) so a
// precision loss to millis would be visible in the expected output.
String insertInto = "insert into t1 values\n"
+ "(1,'Danny',TIMESTAMP '2021-12-01 01:02:01.100001'),\n"
+ "(2,'Stephen',TIMESTAMP '2021-12-02 03:04:02.200002'),\n"
+ "(3,'Julian',TIMESTAMP '2021-12-03 13:14:03.300003'),\n"
+ "(4,'Fabian',TIMESTAMP '2021-12-04 15:16:04.400004')";
execInsertSql(streamTableEnv, insertInto);

final String expected = "["
+ "+I[1, Danny, 2021-12-01T01:02:01.100001], "
+ "+I[2, Stephen, 2021-12-02T03:04:02.200002], "
+ "+I[3, Julian, 2021-12-03T13:14:03.300003], "
+ "+I[4, Fabian, 2021-12-04T15:16:04.400004]]";

List<Row> result = execSelectSql(streamTableEnv, "select * from t1", execMode);
assertRowsEquals(result, expected);

// insert another batch of data
// Re-inserting the same keys upserts; the result set must be unchanged.
execInsertSql(streamTableEnv, insertInto);
List<Row> result2 = execSelectSql(streamTableEnv, "select * from t1", execMode);
assertRowsEquals(result2, expected);
}

@ParameterizedTest
@EnumSource(value = ExecMode.class)
void testInsertOverwrite(ExecMode execMode) {
Expand Down
Loading