From cfafb7be45b0a6e3f206daf65dc3cb00fcc53ee9 Mon Sep 17 00:00:00 2001 From: yoli22 Date: Sun, 26 Oct 2025 23:20:14 +0900 Subject: [PATCH 1/5] [Feature][Connector-V2] Kafka Source support to read TIMESTAMP_TZ --- .../seatunnel/common/utils/DateTimeUtils.java | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java index b1e0856e1fb..0a6d9fe769a 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java @@ -118,6 +118,13 @@ public class DateTimeUtils { public static final DateTimeFormatter YYYY_MM_DD_HH_MM_SS_14_FORMATTER = DateTimeFormatter.ofPattern(Formatter.YYYY_MM_DD_HH_MM_SS_NO_SPLIT.value); + // offset datetime formatter map + public static final Map OFFSET_DATETIME_FORMATTER_MAP = + new LinkedHashMap<>(); + + public static Set> + OFFSET_DATETIME_FORMATTER_MAP_ENTRY_SET = new LinkedHashSet<>(); + static { YYYY_MM_DD_HH_MM_SS_19_FORMATTER_MAP.put( Pattern.compile("\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2}"), @@ -210,6 +217,42 @@ public class DateTimeUtils { YYYY_M_D_HH_MM_15_FORMATTER_MAP_ENTRY_SET.addAll( YYYY_M_D_HH_MM_15_FORMATTER_MAP.entrySet()); + + OFFSET_DATETIME_FORMATTER_MAP.put( + Pattern.compile("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}Z"), + DateTimeFormatter.ISO_OFFSET_DATE_TIME); + + OFFSET_DATETIME_FORMATTER_MAP.put( + Pattern.compile("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}[+-]\\d{2}:\\d{2}"), + DateTimeFormatter.ISO_OFFSET_DATE_TIME); + + OFFSET_DATETIME_FORMATTER_MAP.put( + Pattern.compile("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{1,9}Z"), + DateTimeFormatter.ISO_OFFSET_DATE_TIME); + + OFFSET_DATETIME_FORMATTER_MAP.put( + Pattern.compile("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{1,9}[+-]\\d{2}:\\d{2}"), + DateTimeFormatter.ISO_OFFSET_DATE_TIME); + + OFFSET_DATETIME_FORMATTER_MAP_ENTRY_SET.addAll( + OFFSET_DATETIME_FORMATTER_MAP.entrySet()); + } + + /** + * gave an offset datetime string and return the {@link DateTimeFormatter} which can be used to + * parse it. + * + * @param dateTime eg: 2020-02-03T12:12:10Z or 2020-02-03T12:12:10+09:00 + * @return the DateTimeFormatter matched, will return null when not matched any pattern + */ + public static DateTimeFormatter matchOffsetDateTimeFormatter(String dateTime) { + for (Map.Entry entry : + OFFSET_DATETIME_FORMATTER_MAP_ENTRY_SET) { + if (entry.getKey().matcher(dateTime).matches()) { + return entry.getValue(); + } + } + return null; } /** From 14f472bdb6a00b293c41e376fdbb25e08d0a5665 Mon Sep 17 00:00:00 2001 From: yoli22 Date: Sun, 26 Oct 2025 23:23:14 +0900 Subject: [PATCH 2/5] [Feature][Connector-V2] Kafka Source support to read TIMESTAMP_TZ --- .../format/avro/AvroToRowConverter.java | 1 + .../format/json/JsonToRowConverters.java | 40 +++++++++++++++++++ .../protobuf/ProtobufToRowConverter.java | 37 ++++++++--------- .../text/TextDeserializationSchema.java | 30 ++++++++++++++ 4 files changed, 90 insertions(+), 18 deletions(-) diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java index 84b10636001..37f53146b8b 100644 --- a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java +++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java @@ -98,6 +98,7 @@ private Object convertField(SeaTunnelDataType dataType, Object val) { case DATE: case DECIMAL: case TIMESTAMP: + case TIMESTAMP_TZ: return val; case BYTES: return ((ByteBuffer) val).array(); diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java index 949392d1724..66cb47829a1 100644 --- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java +++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java @@ -38,9 +38,13 @@ import java.io.Serializable; import java.lang.reflect.Array; import java.math.BigDecimal; +import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; import java.time.temporal.ChronoField; @@ -155,6 +159,13 @@ public Object convert(JsonNode jsonNode, String fieldName) { return convertToLocalDateTime(jsonNode, fieldName); } }; + case TIMESTAMP_TZ: + return new JsonToObjectConverter() { + @Override + public Object convert(JsonNode jsonNode, String fieldName) { + return convertToOffsetDateTime(jsonNode, fieldName); + } + }; case FLOAT: return new JsonToObjectConverter() { @Override @@ -284,6 +295,35 @@ private LocalDateTime convertToLocalDateTime(JsonNode jsonNode, String fieldName return LocalDateTime.of(localDate, localTime); } + private OffsetDateTime convertToOffsetDateTime(JsonNode jsonNode, String fieldName) { + String datetimeStr = jsonNode.asText(); + DateTimeFormatter dateTimeFormatter = fieldFormatterMap.get(fieldName); + + if (dateTimeFormatter == null) { + dateTimeFormatter = DateTimeUtils.matchOffsetDateTimeFormatter(datetimeStr); + fieldFormatterMap.put(fieldName, dateTimeFormatter); + } + + if (dateTimeFormatter == null) { + throw CommonError.formatDateTimeError(datetimeStr, fieldName); + } + + TemporalAccessor parsedTimestamp = dateTimeFormatter.parse(datetimeStr); + LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime()); + LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate()); + ZoneOffset offset = parsedTimestamp.query(TemporalQueries.offset()); + + if (offset == null) { + offset = ZoneId.systemDefault().getRules().getOffset(Instant.now()); + } + + if (localDate == null || localTime == null) { + throw CommonError.formatDateTimeError(datetimeStr, fieldName); + } + + return OffsetDateTime.of(localDate, localTime, offset); + } + private String convertToString(JsonNode jsonNode) { if (jsonNode.isContainerNode()) { return jsonNode.toString(); diff --git a/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/ProtobufToRowConverter.java b/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/ProtobufToRowConverter.java index 3f7d01b3ff6..a620a872b6e 100644 --- a/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/ProtobufToRowConverter.java +++ b/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/ProtobufToRowConverter.java @@ -52,8 +52,8 @@ public Descriptors.Descriptor getDescriptor() { try { descriptor = createDescriptor(); } catch (IOException - | Descriptors.DescriptorValidationException - | InterruptedException e) { + | Descriptors.DescriptorValidationException + | InterruptedException e) { throw new RuntimeException(e); } } @@ -107,6 +107,7 @@ private Object convertField( case DATE: case DECIMAL: case TIMESTAMP: + case TIMESTAMP_TZ: return val; case BYTES: return ((ByteString) val).toByteArray(); @@ -124,22 +125,22 @@ private Object convertField( Map res = ((List) val) .stream() - .collect( - Collectors.toMap( - dm -> - convertField( - descriptor, - dm, - mapType.getKeyType(), - getFieldValue(dm, "key"), - null), - dm -> - convertField( - descriptor, - dm, - mapType.getValueType(), - getFieldValue(dm, "value"), - null))); + .collect( + Collectors.toMap( + dm -> + convertField( + descriptor, + dm, + mapType.getKeyType(), + getFieldValue(dm, "key"), + null), + dm -> + convertField( + descriptor, + dm, + mapType.getValueType(), + getFieldValue(dm, "value"), + null))); return res; case ROW: diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java index d073f09b6b8..5c118fea7fa 100644 --- a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java +++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java @@ -43,9 +43,13 @@ import java.io.IOException; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; import java.time.temporal.ChronoField; @@ -257,6 +261,8 @@ private Object convert( return objectArrayList.toArray(new LocalTime[0]); case TIMESTAMP: return objectArrayList.toArray(new LocalDateTime[0]); + case TIMESTAMP_TZ: + return objectArrayList.toArray(new OffsetDateTime[0]); default: throw new SeaTunnelTextFormatException( CommonErrorCode.UNSUPPORTED_DATA_TYPE, @@ -330,6 +336,30 @@ private Object convert( LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime()); LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate()); return LocalDateTime.of(localDate, localTime); + case TIMESTAMP_TZ: + DateTimeFormatter dateTimeTzFormatter = fieldFormatterMap.get(fieldName); + if (dateTimeTzFormatter == null) { + dateTimeTzFormatter = DateTimeUtils.matchDateTimeFormatter(field); + fieldFormatterMap.put(fieldName, dateTimeTzFormatter); + } + if (dateTimeTzFormatter == null) { + throw CommonError.formatDateTimeError(field, fieldName); + } + + TemporalAccessor parsedTimestampTz = dateTimeTzFormatter.parse(field); + LocalTime localTimeTz = parsedTimestampTz.query(TemporalQueries.localTime()); + LocalDate localDateTz = parsedTimestampTz.query(TemporalQueries.localDate()); + ZoneOffset offset = parsedTimestampTz.query(TemporalQueries.offset()); + + if (offset == null) { + offset = ZoneId.systemDefault().getRules().getOffset(Instant.now()); + } + + if (localDateTz == null || localTimeTz == null) { + throw CommonError.formatDateTimeError(field, fieldName); + } + + return OffsetDateTime.of(localDateTz, localTimeTz, offset); case ROW: Map splitsMap = splitLineBySeaTunnelRowType(field, (SeaTunnelRowType) fieldType, level + 1); From bd6efcc9b22a29a23553896bde79c4e4b48502be Mon Sep 17 00:00:00 2001 From: yoli22 Date: Sun, 26 Oct 2025 23:44:06 +0900 Subject: [PATCH 3/5] add test case --- .../json/JsonRowDataSerDeSchemaTest.java | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java index 8471756b8b5..fa421a12464 100644 --- a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java +++ b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java @@ -28,6 +28,8 @@ import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.LocalTimeType; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import org.apache.seatunnel.api.table.type.MapType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -694,4 +696,35 @@ public void testSerializationWithNumber() { String expected = "{\"id\":1,\"code\":\"1001015\",\"fe_result\":80}"; assertEquals(new String(serialize), expected); } + + @Test + public void testDeserializationWithTimestampTz() throws Exception { + SeaTunnelRowType schema = + new SeaTunnelRowType( + new String[] {"timestamp_tz"}, + new SeaTunnelDataType[] {LocalTimeType.OFFSET_DATE_TIME_TYPE}); + CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "", "", "test", schema); + JsonDeserializationSchema deserializationSchema = + new JsonDeserializationSchema(catalogTables, false, false); + + OffsetDateTime timestampUtc = OffsetDateTime.of(2024, 1, 15, 10, 30, 45, 0, ZoneOffset.UTC); + SeaTunnelRow rowUtc = + deserializationSchema.deserialize( + "{\"timestamp_tz\":\"2024-01-15T10:30:45Z\"}".getBytes()); + assertEquals(timestampUtc, rowUtc.getField(0)); + + OffsetDateTime timestampKst = + OffsetDateTime.of(2024, 1, 15, 10, 30, 45, 0, ZoneOffset.ofHours(9)); + SeaTunnelRow rowKst = + deserializationSchema.deserialize( + "{\"timestamp_tz\":\"2024-01-15T10:30:45+09:00\"}".getBytes()); + assertEquals(timestampKst, rowKst.getField(0)); + + OffsetDateTime timestampMillis = + OffsetDateTime.of(2024, 1, 15, 10, 30, 45, 123000000, ZoneOffset.UTC); + SeaTunnelRow rowMillis = + deserializationSchema.deserialize( + "{\"timestamp_tz\":\"2024-01-15T10:30:45.123Z\"}".getBytes()); + assertEquals(timestampMillis, rowMillis.getField(0)); + } } From bf5d0dbee7c108108dddc5339c7f437f71ecdfa9 Mon Sep 17 00:00:00 2001 From: yoli22 Date: Mon, 27 Oct 2025 00:00:39 +0900 Subject: [PATCH 4/5] trigger workflow From 3970ac6888b8f06d8ffc99c25e2dda09137311d0 Mon Sep 17 00:00:00 2001 From: yoli22 Date: Wed, 29 Oct 2025 00:02:23 +0900 Subject: [PATCH 5/5] fix spotless error --- .../seatunnel/common/utils/DateTimeUtils.java | 6 ++-- .../json/JsonRowDataSerDeSchemaTest.java | 4 +-- .../protobuf/ProtobufToRowConverter.java | 36 +++++++++---------- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java index 0a6d9fe769a..32a58a37066 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java @@ -231,11 +231,11 @@ public class DateTimeUtils { DateTimeFormatter.ISO_OFFSET_DATE_TIME); OFFSET_DATETIME_FORMATTER_MAP.put( - Pattern.compile("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{1,9}[+-]\\d{2}:\\d{2}"), + Pattern.compile( + "\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{1,9}[+-]\\d{2}:\\d{2}"), DateTimeFormatter.ISO_OFFSET_DATE_TIME); - OFFSET_DATETIME_FORMATTER_MAP_ENTRY_SET.addAll( - OFFSET_DATETIME_FORMATTER_MAP.entrySet()); + OFFSET_DATETIME_FORMATTER_MAP_ENTRY_SET.addAll(OFFSET_DATETIME_FORMATTER_MAP.entrySet()); } /** diff --git a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java index fa421a12464..e3c40bf07a6 100644 --- a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java +++ b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java @@ -28,8 +28,6 @@ import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.LocalTimeType; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; import org.apache.seatunnel.api.table.type.MapType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -49,6 +47,8 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.time.temporal.TemporalQueries; import java.util.HashMap; diff --git a/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/ProtobufToRowConverter.java b/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/ProtobufToRowConverter.java index a620a872b6e..6d495cacd08 100644 --- a/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/ProtobufToRowConverter.java +++ b/seatunnel-formats/seatunnel-format-protobuf/src/main/java/org/apache/seatunnel/format/protobuf/ProtobufToRowConverter.java @@ -52,8 +52,8 @@ public Descriptors.Descriptor getDescriptor() { try { descriptor = createDescriptor(); } catch (IOException - | Descriptors.DescriptorValidationException - | InterruptedException e) { + | Descriptors.DescriptorValidationException + | InterruptedException e) { throw new RuntimeException(e); } } @@ -125,22 +125,22 @@ private Object convertField( Map res = ((List) val) .stream() - .collect( - Collectors.toMap( - dm -> - convertField( - descriptor, - dm, - mapType.getKeyType(), - getFieldValue(dm, "key"), - null), - dm -> - convertField( - descriptor, - dm, - mapType.getValueType(), - getFieldValue(dm, "value"), - null))); + .collect( + Collectors.toMap( + dm -> + convertField( + descriptor, + dm, + mapType.getKeyType(), + getFieldValue(dm, "key"), + null), + dm -> + convertField( + descriptor, + dm, + mapType.getValueType(), + getFieldValue(dm, "value"), + null))); return res; case ROW: