-
Notifications
You must be signed in to change notification settings - Fork 5.5k
Arrow Connector : Preserve TIME and TIMESTAMP as Wall-Clock (Align with JDBC Connector) #25901
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c631eab
8159a3d
c5f4adb
4415b48
d290d44
af5f46e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -73,7 +73,11 @@ | |
| import java.math.BigDecimal; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.time.Duration; | ||
| import java.time.Instant; | ||
| import java.time.LocalDateTime; | ||
| import java.time.LocalTime; | ||
| import java.time.ZoneId; | ||
| import java.time.ZoneOffset; | ||
| import java.util.List; | ||
| import java.util.Optional; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
@@ -432,11 +436,25 @@ public void assignBlockFromTimeStampMilliVector(TimeStampMilliVector vector, Typ | |
| } | ||
| else { | ||
| long millis = vector.get(i); | ||
| type.writeLong(builder, millis); | ||
| long localMillis = getLocalMillisFromUTCMillis(millis); | ||
| type.writeLong(builder, localMillis); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private static long getLocalMillisFromUTCMillis(long millis) | ||
| { | ||
| // Interpret Arrow millis as if they were "local wall-clock time" in datasource | ||
| LocalDateTime localDateTime = Instant.ofEpochMilli(millis) | ||
| .atZone(ZoneOffset.UTC) // treat raw millis as UTC | ||
| .toLocalDateTime(); | ||
|
|
||
| // Rebuild millis in local epoch (wall-clock, no shift) | ||
| return localDateTime.atZone(ZoneId.systemDefault()) | ||
|
lithinwxd marked this conversation as resolved.
|
||
| .toInstant() | ||
| .toEpochMilli(); | ||
| } | ||
|
|
||
| public void assignBlockFromFloat8Vector(Float8Vector vector, Type type, BlockBuilder builder, int startIndex, int endIndex) | ||
| { | ||
| for (int i = startIndex; i < endIndex; i++) { | ||
|
|
@@ -559,7 +577,9 @@ public void assignBlockFromTimeMilliVector(TimeMilliVector vector, Type type, Bl | |
| } | ||
| else { | ||
| long millis = vector.get(i); | ||
| type.writeLong(builder, millis); | ||
| // Interpret Arrow millis as if they were "local wall-clock time" in datasource | ||
| long localMillis = getLocalMillisFromUTCMillis(millis); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A |
||
| type.writeLong(builder, localMillis); | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -639,7 +659,9 @@ public void assignBlockFromTimeMilliTZVector(TimeStampMilliTZVector vector, Type | |
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A |
||
| else { | ||
| long millis = vector.get(i); | ||
| type.writeLong(builder, millis); | ||
| // Interpret Arrow millis as if they were "local wall-clock time" in datasource | ||
| long localMillis = getLocalMillisFromUTCMillis(millis); | ||
| type.writeLong(builder, localMillis); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -66,7 +66,11 @@ | |
| import java.math.BigDecimal; | ||
| import java.math.BigInteger; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.time.Instant; | ||
| import java.time.LocalDate; | ||
| import java.time.LocalDateTime; | ||
| import java.time.ZoneId; | ||
| import java.time.ZoneOffset; | ||
| import java.util.Arrays; | ||
| import java.util.List; | ||
| import java.util.Optional; | ||
|
|
@@ -726,6 +730,15 @@ public void testAssignTimestampType() | |
|
|
||
| Block block = builder.build(); | ||
| long result = timestampType.getLong(block, 0); | ||
| // Recompute expected according to current reinterpretation logic | ||
| LocalDateTime localDateTime = Instant.ofEpochMilli(value) | ||
| .atZone(ZoneOffset.UTC) // interpret Arrow millis as UTC | ||
| .toLocalDateTime(); | ||
|
|
||
| value = localDateTime | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: don't reuse |
||
| .atZone(ZoneId.systemDefault()) // reinterpret in system default | ||
| .toInstant() | ||
| .toEpochMilli(); | ||
| assertEquals(result, value); | ||
| } | ||
| } | ||
|
|
@@ -748,6 +761,15 @@ public void testAssignTimestampTypeWithSqlTimestamp() | |
|
|
||
| Block block = builder.build(); | ||
| long result = timestampType.getLong(block, 0); | ||
| // Recompute expected according to current reinterpretation logic | ||
| LocalDateTime localDateTime = Instant.ofEpochMilli(expectedMillis) | ||
| .atZone(ZoneOffset.UTC) // interpret Arrow millis as UTC | ||
| .toLocalDateTime(); | ||
|
|
||
| expectedMillis = localDateTime | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: change the definition of |
||
| .atZone(ZoneId.systemDefault()) // reinterpret in system default | ||
| .toInstant() | ||
| .toEpochMilli(); | ||
| assertEquals(result, expectedMillis); | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -102,7 +102,7 @@ public void getStream(CallContext callContext, Ticket ticket, ServerStreamListen | |
|
|
||
| try (ResultSet resultSet = stmt.executeQuery(query.toUpperCase())) { | ||
| JdbcToArrowConfig config = new JdbcToArrowConfigBuilder().setAllocator(allocator).setTargetBatchSize(2048) | ||
| .setCalendar(Calendar.getInstance(TimeZone.getDefault())).build(); | ||
| .setCalendar(Calendar.getInstance(TimeZone.getTimeZone("UTC"))).build(); | ||
|
lithinwxd marked this conversation as resolved.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This is wrong, an Arrow You shouldn't need to change this in the test, it is saying that our datasource is generating localtime data with a timezone, resulting in a |
||
| Schema schema = jdbcToArrowSchema(resultSet.getMetaData(), config); | ||
| try (VectorSchemaRoot streamRoot = VectorSchemaRoot.create(schema, allocator)) { | ||
| VectorLoader loader = new VectorLoader(streamRoot); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about
assignBlockFromTimeStampMicroVector?