Skip to content

Commit

Permalink
fix for datetime in mysql
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong committed Nov 23, 2024
1 parent 8aec55d commit 053a9ce
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import io.airbyte.cdk.util.Jsons
import io.micronaut.context.annotation.Primary
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.format.DateTimeFormatterBuilder
import java.time.format.DateTimeParseException
import java.time.temporal.ChronoField
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import javax.inject.Singleton
Expand Down Expand Up @@ -312,7 +314,13 @@ class MysqlJdbcPartitionFactory(
val timestampInStatePattern = "yyyy-MM-dd'T'HH:mm:ss"
try {
val formatter: DateTimeFormatter =
DateTimeFormatter.ofPattern(timestampInStatePattern)
DateTimeFormatterBuilder()
.appendPattern(timestampInStatePattern)
.optionalStart()
.appendFraction(ChronoField.NANO_OF_SECOND, 1, 9, true)
.optionalEnd()
.toFormatter()

Jsons.textNode(
LocalDateTime.parse(stateValue, formatter)
.format(LocalDateTimeCodec.formatter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import io.airbyte.cdk.discover.MetaFieldDecorator
import io.airbyte.cdk.jdbc.BinaryStreamFieldType
import io.airbyte.cdk.jdbc.DefaultJdbcConstants
import io.airbyte.cdk.jdbc.IntFieldType
import io.airbyte.cdk.jdbc.LocalDateTimeFieldType
import io.airbyte.cdk.jdbc.OffsetDateTimeFieldType
import io.airbyte.cdk.output.BufferingOutputConsumer
import io.airbyte.cdk.read.ConcurrencyResource
Expand Down Expand Up @@ -89,6 +90,20 @@ class MysqlJdbcPartitionFactoryTest {
configuredCursor = binaryFieldId,
)

val datetimeFieldId = Field("id4", LocalDateTimeFieldType)

val datetimeStream =
Stream(
id =
StreamIdentifier.from(
StreamDescriptor().withNamespace("test").withName("stream4")
),
schema = setOf(datetimeFieldId),
configuredSyncMode = ConfiguredSyncMode.INCREMENTAL,
configuredPrimaryKey = listOf(datetimeFieldId),
configuredCursor = datetimeFieldId,
)

private fun sharedState(
global: Boolean = false,
): DefaultJdbcSharedState {
Expand Down Expand Up @@ -235,6 +250,37 @@ class MysqlJdbcPartitionFactoryTest {
)
}

@Test
fun testResumeFromCompletedCursorBasedReadTimestampWithoutTimezone() {
val incomingStateValue: OpaqueStateValue =
Jsons.readTree(
"""
{
"cursor": "2024-11-21T11:59:57.123",
"version": 2,
"state_type": "cursor_based",
"stream_name": "stream4",
"cursor_field": [
"id4"
],
"stream_namespace": "test",
"cursor_record_count": 1
}
""".trimIndent()
)

val jdbcPartition =
mysqlJdbcPartitionFactory.create(
streamFeedBootstrap(datetimeStream, incomingStateValue)
)
assertTrue(jdbcPartition is MysqlJdbcCursorIncrementalPartition)

assertEquals(
Jsons.valueToTree("2024-11-21T11:59:57.123000"),
(jdbcPartition as MysqlJdbcCursorIncrementalPartition).cursorLowerBound
)
}

@Test
fun testResumeFromCursorBasedReadInitialRead() {
val incomingStateValue: OpaqueStateValue =
Expand Down

0 comments on commit 053a9ce

Please sign in to comment.