diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 045c75f0e655..1c83e4961e47 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -174,7 +174,10 @@ corresponds to that version. | Version | Date | Pull Request | Subject | |:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 0.40.7 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz hearbeat. | +| 0.40.8 | 2024-06-25 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator | +| 0.38.3 | 2024-06-25 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | (backport) Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator | +| 0.40.7 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz hearbeat. | +| ~~0.40.6~~ | | | (this version does not exist) | | 0.40.5 | 2024-06-26 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging | | 0.35.16 | 2024-06-25 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | (backport) JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging | | 0.40.4 | 2024-06-18 | [\#40254](https://github.com/airbytehq/airbyte/pull/40254) | Destinations: Do not throw on unrecognized airbyte message type (ignore message instead) | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index f2abf075b53b..17a0c4aa56d6 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.40.7 +version=0.40.8 diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt index f30c69b7f0d5..4ccc77d75256 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.node.ObjectNode import io.airbyte.cdk.db.jdbc.JdbcDatabase import io.airbyte.cdk.integrations.base.JavaBaseConstants +import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil.getResultsOrLogAndThrowFirst @@ -50,7 +51,8 @@ abstract class JdbcDestinationHandler( protected val catalogName: String?, protected val jdbcDatabase: JdbcDatabase, protected val rawTableNamespace: String, - private val dialect: SQLDialect + private val dialect: SQLDialect, + private val columns: DestinationColumns = DestinationColumns.V2_WITH_GENERATION, ) : DestinationHandler { protected val dslContext: DSLContext get() = DSL.using(dialect) @@ -363,6 +365,14 @@ abstract class JdbcDestinationHandler( ) } + protected open fun isAirbyteGenerationColumnMatch(existingTable: TableDefinition): Boolean { + return toJdbcTypeName(AirbyteProtocolType.INTEGER) + .equals( + existingTable.columns.getValue(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID).type, + ignoreCase = true, + ) + } + open protected fun existingSchemaMatchesStreamConfig( stream: StreamConfig?, existingTable: TableDefinition @@ -375,7 +385,11 @@ abstract class JdbcDestinationHandler( JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT ) && isAirbyteExtractedAtColumnMatch(existingTable)) || !(existingTable.columns.containsKey(JavaBaseConstants.COLUMN_NAME_AB_META) && - isAirbyteMetaColumnMatch(existingTable)) + isAirbyteMetaColumnMatch(existingTable)) || + (columns == DestinationColumns.V2_WITH_GENERATION && + !(existingTable.columns.containsKey( + JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID + ) && isAirbyteGenerationColumnMatch(existingTable))) ) { // Missing AB meta columns from final table, we need them to do proper T+D so trigger // soft-reset diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt index b66b003aef0e..656408b43bbb 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt @@ -5,6 +5,7 @@ package io.airbyte.cdk.integrations.destination.jdbc.typing_deduping import com.google.common.annotations.VisibleForTesting import io.airbyte.cdk.integrations.base.JavaBaseConstants +import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns import io.airbyte.cdk.integrations.destination.NamingConventionTransformer import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType @@ -23,12 +24,11 @@ import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf import io.airbyte.protocol.models.v0.DestinationSyncMode import java.sql.Timestamp import java.time.Instant -import java.util.* -import kotlin.Any -import kotlin.Boolean -import kotlin.IllegalArgumentException +import java.util.Locale +import java.util.Optional import kotlin.Int import org.jooq.Condition +import org.jooq.CreateTableColumnStep import org.jooq.DSLContext import org.jooq.DataType import org.jooq.Field @@ -37,6 +37,7 @@ import org.jooq.Name import org.jooq.Record import org.jooq.SQLDialect import org.jooq.SelectConditionStep +import org.jooq.SelectFieldOrAsterisk import org.jooq.conf.ParamType import org.jooq.impl.DSL import org.jooq.impl.SQLDataType @@ -45,7 +46,9 @@ abstract class JdbcSqlGenerator @JvmOverloads constructor( protected val namingTransformer: NamingConventionTransformer, - private val cascadeDrop: Boolean = false + private val cascadeDrop: Boolean = false, + @VisibleForTesting + internal val columns: DestinationColumns = DestinationColumns.V2_WITH_GENERATION, ) : SqlGenerator { protected val cdcDeletedAtColumn: ColumnId = buildColumnId("_ab_cdc_deleted_at") @@ -199,6 +202,9 @@ constructor( SQLDataType.VARCHAR(36).nullable(false) metaColumns[JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT] = timestampWithTimeZoneType.nullable(false) + if (columns == DestinationColumns.V2_WITH_GENERATION) { + metaColumns[JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID] = SQLDataType.BIGINT + } if (includeMetaColumn) metaColumns[JavaBaseConstants.COLUMN_NAME_AB_META] = structType.nullable(false) return metaColumns @@ -332,38 +338,50 @@ constructor( rawTableName: Name, namespace: String, tableName: String - ) = - dslContext - .createTable(rawTableName) - .column( - JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, - SQLDataType.VARCHAR(36).nullable(false), - ) - .column( - JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, - timestampWithTimeZoneType.nullable(false), - ) - .column( - JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, - timestampWithTimeZoneType.nullable(true), - ) - .column(JavaBaseConstants.COLUMN_NAME_DATA, structType.nullable(false)) - .column(JavaBaseConstants.COLUMN_NAME_AB_META, structType.nullable(true)) - .`as`( - DSL.select( - DSL.field(JavaBaseConstants.COLUMN_NAME_AB_ID) - .`as`(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID), - DSL.field(JavaBaseConstants.COLUMN_NAME_EMITTED_AT) - .`as`(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT), - DSL.cast(null, timestampWithTimeZoneType) - .`as`(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT), - DSL.field(JavaBaseConstants.COLUMN_NAME_DATA) - .`as`(JavaBaseConstants.COLUMN_NAME_DATA), - DSL.cast(null, structType).`as`(JavaBaseConstants.COLUMN_NAME_AB_META), - ) - .from(DSL.table(DSL.name(namespace, tableName))), + ): String { + val hasGenerationId = columns == DestinationColumns.V2_WITH_GENERATION + + val createTable: CreateTableColumnStep = + dslContext + .createTable(rawTableName) + .column( + JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, + SQLDataType.VARCHAR(36).nullable(false), + ) + .column( + JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, + timestampWithTimeZoneType.nullable(false), + ) + .column( + JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, + timestampWithTimeZoneType.nullable(true), + ) + .column(JavaBaseConstants.COLUMN_NAME_DATA, structType.nullable(false)) + .column(JavaBaseConstants.COLUMN_NAME_AB_META, structType.nullable(true)) + if (hasGenerationId) { + createTable.column(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, SQLDataType.BIGINT) + } + + val selectColumns: MutableList = + mutableListOf( + DSL.field(JavaBaseConstants.COLUMN_NAME_AB_ID) + .`as`(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID), + DSL.field(JavaBaseConstants.COLUMN_NAME_EMITTED_AT) + .`as`(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT), + DSL.cast(null, timestampWithTimeZoneType) + .`as`(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT), + DSL.field(JavaBaseConstants.COLUMN_NAME_DATA) + .`as`(JavaBaseConstants.COLUMN_NAME_DATA), + DSL.cast(null, structType).`as`(JavaBaseConstants.COLUMN_NAME_AB_META), ) + if (hasGenerationId) { + selectColumns += DSL.value(0).`as`(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID) + } + + return createTable + .`as`(DSL.select(selectColumns).from(DSL.table(DSL.name(namespace, tableName)))) .getSQL(ParamType.INLINED) + } override fun clearLoadedAt(streamId: StreamId): Sql { return of( diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt index 40406c36cdeb..c8350282cfbc 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt @@ -7,12 +7,14 @@ import com.fasterxml.jackson.databind.JsonNode import io.airbyte.cdk.db.jdbc.JdbcDatabase import io.airbyte.cdk.integrations.base.JavaBaseConstants import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT +import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_ID import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_META import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_RAW_ID import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_DATA import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_EMITTED_AT +import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns import io.airbyte.cdk.integrations.base.JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType @@ -90,7 +92,7 @@ abstract class JdbcSqlGeneratorIntegrationTest) { insertRecords( DSL.name(streamId.rawNamespace, streamId.rawName), - JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES, + sqlGenerator.columns.rawColumns, records, COLUMN_NAME_DATA, COLUMN_NAME_AB_META @@ -143,9 +147,12 @@ abstract class JdbcSqlGeneratorIntegrationTest, generationId: Long, ) { - // TODO handle generation ID val columnNames = - if (includeCdcDeletedAt) FINAL_TABLE_COLUMN_NAMES_CDC else FINAL_TABLE_COLUMN_NAMES + (if (includeCdcDeletedAt) FINAL_TABLE_COLUMN_NAMES_CDC else FINAL_TABLE_COLUMN_NAMES) + .toMutableList() + if (sqlGenerator.columns == DestinationColumns.V2_WITH_GENERATION) { + columnNames += COLUMN_NAME_AB_GENERATION_ID + } insertRecords( DSL.name(streamId.finalNamespace, streamId.finalName + suffix), columnNames,