Skip to content

Commit c019d19

Browse files
committed
implement generation id
1 parent 221b105 commit c019d19

File tree

33 files changed

+181
-150
lines changed

33 files changed

+181
-150
lines changed
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
testExecutionConcurrency=-1
2-
# minimum 3 minutes timeout required during parallel workload on our small warehouse
3-
JunitMethodExecutionTimeout=3 m
2+
# increased timeout required during parallel workload on our small warehouse
3+
JunitMethodExecutionTimeout=10 m

airbyte-integrations/connectors/destination-databricks/metadata.yaml

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ data:
22
connectorSubtype: database
33
connectorType: destination
44
definitionId: 072d5540-f236-4294-ba7c-ade8fd918496
5-
dockerImageTag: 2.0.0
5+
dockerImageTag: 3.0.0
66
dockerRepository: airbyte/destination-databricks
77
githubIssueLabel: destination-databricks
88
icon: databricks.svg
@@ -22,10 +22,15 @@ data:
2222
message: >
2323
**This is a private preview version. Do not upgrade until you review the changes**.\n
2424
This version is a rewrite of the community connector with support for Unity catalog, and staging files using Unity catalog volumes.
25-
This version also introduces [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2),
26-
which provides better error handling, incremental delivery of data for large syncs, and improved final table structures.
25+
This version also introduces [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2),
26+
which provides better error handling, incremental delivery of data for large syncs, and improved final table structures.
2727
To review the breaking changes, see [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#quick-start-to-upgrading).
28-
upgradeDeadline: "2024-12-31"
28+
upgradeDeadline: "2025-01-31"
29+
3.0.0:
30+
message: >
31+
This version adds an `_airbyte_generation_id` column to the raw and final tables. If you ran a sync using 2.0.0, you MUST manually drop the
32+
raw and final tables and then clear (reset) your connection; this release will not automatically upgrade your tables.
33+
upgradeDeadline: "2025-01-31"
2934
documentationUrl: https://docs.airbyte.com/integrations/destinations/databricks
3035
tags:
3136
- language:java

airbyte-integrations/connectors/destination-databricks/src/main/kotlin/io/airbyte/integrations/destination/databricks/DatabricksDestination.kt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,9 @@ class DatabricksDestination : BaseConnector(), Destination {
148148

149149
val sqlGenerator =
150150
DatabricksSqlGenerator(DatabricksNamingTransformer(), connectorConfig.database)
151-
val catalogParser = CatalogParser(sqlGenerator, connectorConfig.rawSchemaOverride)
151+
val defaultNamespace = connectorConfig.schema
152+
val catalogParser =
153+
CatalogParser(sqlGenerator, defaultNamespace, connectorConfig.rawSchemaOverride)
152154
val parsedCatalog = catalogParser.parseCatalog(catalog)
153155
val workspaceClient =
154156
DatabricksConnectorClientsFactory.createWorkspaceClient(
@@ -177,7 +179,7 @@ class DatabricksDestination : BaseConnector(), Destination {
177179
DefaultSyncOperation(
178180
parsedCatalog,
179181
destinationHandler,
180-
connectorConfig.schema,
182+
defaultNamespace,
181183
DatabricksStreamOperationFactory(storageOperations),
182184
listOf()
183185
)
@@ -192,7 +194,7 @@ class DatabricksDestination : BaseConnector(), Destination {
192194
catalog = catalog,
193195
bufferManager =
194196
BufferManager(
195-
defaultNamespace = connectorConfig.schema,
197+
defaultNamespace = defaultNamespace,
196198
(Runtime.getRuntime().maxMemory() * BufferManager.MEMORY_LIMIT_RATIO).toLong(),
197199
),
198200
airbyteMessageDeserializer = AirbyteMessageDeserializer(),

airbyte-integrations/connectors/destination-databricks/src/main/kotlin/io/airbyte/integrations/destination/databricks/jdbc/DatabricksDestinationHandler.kt

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,13 @@ package io.airbyte.integrations.destination.databricks.jdbc
66

77
import io.airbyte.cdk.db.jdbc.JdbcDatabase
88
import io.airbyte.cdk.integrations.base.JavaBaseConstants
9+
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT
10+
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID
11+
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_META
12+
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_RAW_ID
913
import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition
1014
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition
15+
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType
1116
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType.STRING
1217
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE
1318
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
@@ -35,9 +40,6 @@ class DatabricksDestinationHandler(
3540
) : DestinationHandler<MinimumDestinationState.Impl> {
3641

3742
private val log = KotlinLogging.logger {}
38-
private val abRawId = DatabricksSqlGenerator.AB_RAW_ID
39-
private val abExtractedAt = DatabricksSqlGenerator.AB_EXTRACTED_AT
40-
private val abMeta = DatabricksSqlGenerator.AB_META
4143

4244
override fun execute(sql: Sql) {
4345
val transactions: List<List<String>> = sql.transactions
@@ -169,26 +171,36 @@ class DatabricksDestinationHandler(
169171
tableDefinition: TableDefinition
170172
): Boolean {
171173
val isAbRawIdMatch =
172-
tableDefinition.columns.contains(abRawId) &&
174+
tableDefinition.columns.contains(COLUMN_NAME_AB_RAW_ID) &&
173175
DatabricksSqlGenerator.toDialectType(STRING) ==
174-
tableDefinition.columns[abRawId]?.type
176+
tableDefinition.columns[COLUMN_NAME_AB_RAW_ID]?.type
175177
val isAbExtractedAtMatch =
176-
tableDefinition.columns.contains(abExtractedAt) &&
178+
tableDefinition.columns.contains(COLUMN_NAME_AB_EXTRACTED_AT) &&
177179
DatabricksSqlGenerator.toDialectType(TIMESTAMP_WITH_TIMEZONE) ==
178-
tableDefinition.columns[abExtractedAt]?.type
180+
tableDefinition.columns[COLUMN_NAME_AB_EXTRACTED_AT]?.type
179181
val isAbMetaMatch =
180-
tableDefinition.columns.contains(abMeta) &&
182+
tableDefinition.columns.contains(COLUMN_NAME_AB_META) &&
181183
DatabricksSqlGenerator.toDialectType(STRING) ==
182-
tableDefinition.columns[abMeta]?.type
183-
if (!isAbRawIdMatch || !isAbExtractedAtMatch || !isAbMetaMatch) return false
184+
tableDefinition.columns[COLUMN_NAME_AB_META]?.type
185+
val isAbGenerationMatch =
186+
tableDefinition.columns.contains(COLUMN_NAME_AB_GENERATION_ID) &&
187+
DatabricksSqlGenerator.toDialectType(AirbyteProtocolType.INTEGER) ==
188+
tableDefinition.columns[COLUMN_NAME_AB_GENERATION_ID]?.type
189+
if (!isAbRawIdMatch || !isAbExtractedAtMatch || !isAbMetaMatch || !isAbGenerationMatch)
190+
return false
184191

185192
val expectedColumns =
186193
streamConfig.columns.entries.associate {
187194
it.key.name to DatabricksSqlGenerator.toDialectType(it.value)
188195
}
189196
val actualColumns =
190197
tableDefinition.columns.entries
191-
.filter { (it.key != abRawId && it.key != abExtractedAt && it.key != abMeta) }
198+
.filter {
199+
(it.key != COLUMN_NAME_AB_RAW_ID &&
200+
it.key != COLUMN_NAME_AB_EXTRACTED_AT &&
201+
it.key != COLUMN_NAME_AB_META &&
202+
it.key != COLUMN_NAME_AB_GENERATION_ID)
203+
}
192204
.associate {
193205
it.key to if (it.value.type != "DECIMAL") it.value.type else "DECIMAL(38, 10)"
194206
}
@@ -249,13 +261,13 @@ class DatabricksDestinationHandler(
249261

250262
val minExtractedAtLoadedNotNullQuery =
251263
"""
252-
|SELECT min(`${JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT}`) as last_loaded_at
264+
|SELECT min(`$COLUMN_NAME_AB_EXTRACTED_AT`) as last_loaded_at
253265
|FROM $databaseName.${id.rawTableId(DatabricksSqlGenerator.QUOTE)}
254266
|WHERE ${JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT} IS NULL
255267
|""".trimMargin()
256268
val maxExtractedAtQuery =
257269
"""
258-
|SELECT max(`${JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT}`) as last_loaded_at
270+
|SELECT max(`$COLUMN_NAME_AB_EXTRACTED_AT`) as last_loaded_at
259271
|FROM $databaseName.${id.rawTableId(DatabricksSqlGenerator.QUOTE)}
260272
""".trimMargin()
261273

airbyte-integrations/connectors/destination-databricks/src/main/kotlin/io/airbyte/integrations/destination/databricks/jdbc/DatabricksSqlGenerator.kt

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,12 @@
44

55
package io.airbyte.integrations.destination.databricks.jdbc
66

7-
import io.airbyte.cdk.integrations.base.JavaBaseConstants as constants
7+
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT as AB_EXTRACTED_AT
8+
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID as AB_GENERATION
9+
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT as AB_LOADED_AT
10+
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_META as AB_META
11+
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_RAW_ID as AB_RAW_ID
12+
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_DATA as AB_DATA
813
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
914
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType
1015
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType
@@ -17,34 +22,29 @@ import io.airbyte.integrations.base.destination.typing_deduping.StreamId
1722
import io.airbyte.integrations.base.destination.typing_deduping.Struct
1823
import io.airbyte.integrations.base.destination.typing_deduping.Union
1924
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf
20-
import io.airbyte.protocol.models.AirbyteRecordMessageMetaChange.*
25+
import io.airbyte.protocol.models.AirbyteRecordMessageMetaChange.Change
26+
import io.airbyte.protocol.models.AirbyteRecordMessageMetaChange.Reason
2127
import io.airbyte.protocol.models.v0.DestinationSyncMode
22-
import io.github.oshai.kotlinlogging.KotlinLogging
2328
import java.time.Instant
24-
import java.util.*
29+
import java.util.Optional
2530

2631
class DatabricksSqlGenerator(
2732
private val namingTransformer: NamingConventionTransformer,
2833
private val unityCatalogName: String,
2934
) : SqlGenerator {
3035

31-
private val log = KotlinLogging.logger {}
3236
private val cdcDeletedColumn = buildColumnId(CDC_DELETED_COLUMN_NAME)
3337
private val metaColumnTypeMap =
3438
mapOf(
3539
buildColumnId(AB_RAW_ID) to AirbyteProtocolType.STRING,
3640
buildColumnId(AB_EXTRACTED_AT) to AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE,
37-
buildColumnId(AB_META) to AirbyteProtocolType.STRING
41+
buildColumnId(AB_META) to AirbyteProtocolType.STRING,
42+
buildColumnId(AB_GENERATION) to AirbyteProtocolType.INTEGER,
3843
)
3944

4045
companion object {
4146
const val QUOTE = "`"
4247
const val CDC_DELETED_COLUMN_NAME = "_ab_cdc_deleted_at"
43-
const val AB_RAW_ID = constants.COLUMN_NAME_AB_RAW_ID
44-
const val AB_EXTRACTED_AT = constants.COLUMN_NAME_AB_EXTRACTED_AT
45-
const val AB_LOADED_AT = constants.COLUMN_NAME_AB_LOADED_AT
46-
const val AB_DATA = constants.COLUMN_NAME_DATA
47-
const val AB_META = constants.COLUMN_NAME_AB_META
4848

4949
fun toDialectType(type: AirbyteType): String {
5050
return when (type) {
@@ -110,7 +110,8 @@ class DatabricksSqlGenerator(
110110
$AB_EXTRACTED_AT TIMESTAMP,
111111
$AB_LOADED_AT TIMESTAMP,
112112
$AB_DATA STRING,
113-
$AB_META STRING
113+
$AB_META STRING,
114+
$AB_GENERATION BIGINT
114115
)
115116
""".trimIndent(),
116117
)
@@ -129,8 +130,8 @@ class DatabricksSqlGenerator(
129130
return Sql.of(
130131
"""
131132
| UPDATE $unityCatalogName.${streamId.rawTableId(QUOTE)}
132-
| SET ${constants.COLUMN_NAME_AB_LOADED_AT} = CURRENT_TIMESTAMP
133-
| WHERE ${constants.COLUMN_NAME_AB_LOADED_AT} IS NULL
133+
| SET $AB_LOADED_AT = CURRENT_TIMESTAMP
134+
| WHERE $AB_LOADED_AT IS NULL
134135
| $extractedAtCondition
135136
| """.trimMargin()
136137
)
@@ -360,6 +361,8 @@ class DatabricksSqlGenerator(
360361
"""
361362
|to_json(
362363
| named_struct(
364+
| "sync_id",
365+
| _airbyte_meta.sync_id,
363366
| "changes",
364367
| array_union(
365368
| _airbyte_type_errors,
@@ -374,7 +377,7 @@ class DatabricksSqlGenerator(
374377
val selectFromRawTable =
375378
"""SELECT
376379
|${projectionColumns.replaceIndent(" ")},
377-
| from_json($AB_META, 'STRUCT<`changes` : ARRAY<STRUCT<`field`: STRING, `change`: STRING, `reason`: STRING>>>') as `_airbyte_meta`,
380+
| from_json($AB_META, 'STRUCT<`sync_id` : BIGINT, `changes` : ARRAY<STRUCT<`field`: STRING, `change`: STRING, `reason`: STRING>>>') as `_airbyte_meta`,
378381
|${typeCastErrorsArray.replaceIndent(" ")} as `_airbyte_type_errors`
379382
|FROM
380383
| $unityCatalogName.${stream.id.rawTableId(QUOTE)}

airbyte-integrations/connectors/destination-databricks/src/main/kotlin/io/airbyte/integrations/destination/databricks/operation/DatabricksStorageOperation.kt

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,18 @@ class DatabricksStorageOperation(
4242
workspaceClient.files().upload(stagedFile, data.inputStream)
4343
destinationHandler.execute(
4444
Sql.of(
45+
// schema inference sees _airbyte_generation_id as an int (int32),
46+
// which can't be loaded into a bigint (int64) column.
47+
// So we have to explicitly cast it to a bigint.
4548
"""
46-
COPY INTO `$database`.`${streamId.rawNamespace}`.`${streamId.rawName}`
47-
FROM '$stagedFile'
48-
FILEFORMAT = CSV
49-
FORMAT_OPTIONS ('header'='true', 'inferSchema'='true', 'escape'='"');
50-
""".trimIndent(),
49+
COPY INTO `$database`.`${streamId.rawNamespace}`.`${streamId.rawName}`
50+
FROM (
51+
SELECT _airbyte_generation_id :: bigint, * except (_airbyte_generation_id)
52+
FROM '$stagedFile'
53+
)
54+
FILEFORMAT = CSV
55+
FORMAT_OPTIONS ('header'='true', 'inferSchema'='true', 'escape'='"');
56+
""".trimIndent(),
5157
),
5258
)
5359
// Databricks recommends that partners delete files in the staging directory once the data

airbyte-integrations/connectors/destination-databricks/src/main/kotlin/io/airbyte/integrations/destination/databricks/staging/DatabricksFileBufferFactory.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ object DatabricksFileBufferFactory {
3636
JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,
3737
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
3838
JavaBaseConstants.COLUMN_NAME_DATA,
39-
JavaBaseConstants.COLUMN_NAME_AB_META
39+
JavaBaseConstants.COLUMN_NAME_AB_META,
40+
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
4041
)
4142
}
4243

@@ -56,14 +57,14 @@ object DatabricksFileBufferFactory {
5657
formattedString: String,
5758
emittedAt: Long,
5859
formattedAirbyteMetaString: String,
59-
// TODO use this value
6060
generationId: Long,
6161
): List<Any> {
6262
return listOf(
6363
id,
6464
Instant.ofEpochMilli(emittedAt),
6565
formattedString,
66-
formattedAirbyteMetaString
66+
formattedAirbyteMetaString,
67+
generationId,
6768
)
6869
}
6970
}

airbyte-integrations/connectors/destination-databricks/src/test-integration/kotlin/io/airbyte/integrations/destination/databricks/typededupe/DatabricksSqlGeneratorIntegrationTest.kt

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ class DatabricksSqlGeneratorIntegrationTest :
9292
JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,
9393
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
9494
JavaBaseConstants.COLUMN_NAME_DATA,
95-
JavaBaseConstants.COLUMN_NAME_AB_META
95+
JavaBaseConstants.COLUMN_NAME_AB_META,
96+
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
9697
)
9798
val tableIdentifier = streamId.rawTableId(DatabricksSqlGenerator.QUOTE)
9899
insertRecords(columnNames, tableIdentifier, records)
@@ -137,14 +138,19 @@ class DatabricksSqlGeneratorIntegrationTest :
137138
streamId: StreamId,
138139
suffix: String?,
139140
records: List<JsonNode>,
140-
// TODO
141141
generationId: Long,
142142
) {
143143
val columnNames =
144144
if (includeCdcDeletedAt) FINAL_TABLE_COLUMN_NAMES_CDC else FINAL_TABLE_COLUMN_NAMES
145145
val tableIdentifier =
146146
streamId.finalTableId(DatabricksSqlGenerator.QUOTE, suffix?.lowercase() ?: "")
147-
insertRecords(columnNames, tableIdentifier, records)
147+
insertRecords(
148+
columnNames,
149+
tableIdentifier,
150+
records.map {
151+
(it as ObjectNode).put(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, generationId)
152+
},
153+
)
148154
}
149155

150156
private fun insertRecords(
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_meta": "{\"changes\":[]}", "id1": 1, "id2": 200, "old_cursor": 1, "name": "Alice", "address": "{\"city\":\"Los Angeles\",\"state\":\"CA\"}"}
2-
{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_meta": "{\"changes\":[]}", "id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": "{\"city\":\"Boston\",\"state\":\"MA\"}"}
3-
{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_meta": "{\"changes\":[]}", "id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}
1+
{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_generation_id": 43, "_airbyte_meta": "{\"sync_id\":42,\"changes\":[]}", "id1": 1, "id2": 200, "old_cursor": 1, "name": "Alice", "address": "{\"city\":\"Los Angeles\",\"state\":\"CA\"}"}
2+
{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_generation_id": 43, "_airbyte_meta": "{\"sync_id\":42,\"changes\":[]}", "id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": "{\"city\":\"Boston\",\"state\":\"MA\"}"}
3+
{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_generation_id": 43, "_airbyte_meta": "{\"sync_id\":42,\"changes\":[]}", "id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}

0 commit comments

Comments
 (0)