[HUDI-3184] hudi-flink support timestamp-micros#4548
[HUDI-3184] hudi-flink support timestamp-micros#4548danny0405 merged 9 commits intoapache:masterfrom
Conversation
| switch (unit) { | ||
| case MILLIS: | ||
| chronoUnit = ChronoUnit.MILLIS; | ||
| break; |
There was a problem hiding this comment.
I guess the chronoUnit can be a class member variable right ?
And, we should also fix the non-utc timezone code path.
| case TIMESTAMP_WITHOUT_TIME_ZONE: | ||
| return AvroToRowDataConverters::convertToTimestamp; | ||
| return avroObject -> convertToTimestamp(avroObject, (TimestampType) type); | ||
| case CHAR: |
There was a problem hiding this comment.
We can also instantiate the chronoUnit first based on the type precision.
| + ", it only supports precision less than 6."); | ||
| } | ||
| return TimestampData.fromLocalDateTime( | ||
| LocalDateTime.ofInstant(Instant.ofEpochSecond(0).plus(timestamp, chronoUnit), ZoneId.systemDefault())); |
There was a problem hiding this comment.
We should utc timezone here:
TimestampData.fromInstant(Instant.EPOCH.plus(timestamp, chronoUnit))| final ZoneOffset offset = ZoneOffset.systemDefault().getRules().getOffset(LocalDateTime.now()); | ||
| if (schema.getLogicalType() instanceof LogicalTypes.TimestampMillis) { | ||
| return now.toInstant(offset).toEpochMilli(); | ||
| } else if (schema.getLogicalType() instanceof LogicalTypes.TimestampMicros) { |
There was a problem hiding this comment.
We can instantiate two converters like this:
TimestampType timestampType = (TimestampType) type;
if (timestampType.getPrecision() <= 3) {
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
return ((TimestampData) object).toInstant().toEpochMilli();
}
};
} else if (timestampType.getPrecision() <= 6) {
converter =
new RowDataToAvroConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Schema schema, Object object) {
return ChronoUnit.MICROS.between(Instant.EPOCH, ((TimestampData) object).toInstant());
}
};
} else {
throw new UnsupportedOperationException("Unsupported type: " + type);
}| } | ||
|
|
||
| private static TimestampData convertToTimestamp(long timestamp, TimestampType logicalType) { | ||
| final int precision = logicalType.getPrecision(); |
There was a problem hiding this comment.
Same as RowDataToAvroConverters, instantiate two converters here.
| switch (unit) { | ||
| case MILLIS: | ||
| return TimeUnit.SECONDS.toMillis(now.toEpochSecond(offset)) | ||
| + now.toInstant(offset).getLong(ChronoField.MILLI_OF_SECOND); |
There was a problem hiding this comment.
Can we convert the TimestampData to Instant first then to the long then ? It is more concise i think.
|
Thanks for the contribution, i have left some comments ~ |
| private static TimestampData int64ToTimestamp( | ||
| boolean utcTimestamp, | ||
| long millionsOfDay, | ||
| ChronoUnit unit) { |
There was a problem hiding this comment.
millionsOfDay => interval
* support both avro and parquet code path * string rowdata conversion is also supported
* support both avro and parquet code path * string rowdata conversion is also supported
* support both avro and parquet code path * string rowdata conversion is also supported
Tips
What is the purpose of the pull request
hudi-flink module support timestamp-micros. (HUDI-3184)
Brief change log
(for example:)
Verify this pull request
(Please pick either of the following options)
This pull request is a trivial rework / code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.