Skip to content

Commit 221b105 — "upgrade cdk; fix compiler errors"

Browse files
committed
1 parent: 355ac15 · commit: 221b105

File tree

8 files changed

+53
-28
lines changed

8 files changed

+53
-28
lines changed

airbyte-integrations/connectors/destination-databricks/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ plugins {
1818
}
1919

2020
airbyteJavaConnector {
21-
cdkVersionRequired = '0.35.6'
21+
cdkVersionRequired = '0.38.3'
2222
features = ['db-destinations', 's3-destinations', 'typing-deduping']
2323
useLocalCdk = false
2424
}

airbyte-integrations/connectors/destination-databricks/src/main/kotlin/io/airbyte/integrations/destination/databricks/DatabricksDestination.kt

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer
1515
import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager
1616
import io.airbyte.cdk.integrations.destination.async.deser.AirbyteMessageDeserializer
1717
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
18-
import io.airbyte.cdk.integrations.util.addDefaultNamespaceToStreams
1918
import io.airbyte.integrations.base.destination.operation.DefaultFlush
2019
import io.airbyte.integrations.base.destination.operation.DefaultSyncOperation
2120
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser
21+
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
2222
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
2323
import io.airbyte.integrations.destination.databricks.jdbc.DatabricksDestinationHandler
2424
import io.airbyte.integrations.destination.databricks.jdbc.DatabricksNamingTransformer
@@ -60,7 +60,7 @@ class DatabricksDestination : BaseConnector(), Destination {
6060
val datasource = DatabricksConnectorClientsFactory.createDataSource(connectorConfig)
6161
val jdbcDatabase = DefaultJdbcDatabase(datasource)
6262
val destinationHandler =
63-
DatabricksDestinationHandler(connectorConfig.database, jdbcDatabase)
63+
DatabricksDestinationHandler(sqlGenerator, connectorConfig.database, jdbcDatabase)
6464
val workspaceClient =
6565
DatabricksConnectorClientsFactory.createWorkspaceClient(
6666
connectorConfig.hostname,
@@ -85,6 +85,17 @@ class DatabricksDestination : BaseConnector(), Destination {
8585
dummyNamespace,
8686
dummyName
8787
)
88+
val streamConfig =
89+
StreamConfig(
90+
id = streamId,
91+
destinationSyncMode = DestinationSyncMode.OVERWRITE,
92+
primaryKey = listOf(),
93+
cursor = Optional.empty(),
94+
columns = linkedMapOf(),
95+
generationId = 0,
96+
minimumGenerationId = 0,
97+
syncId = 0
98+
)
8899

89100
try {
90101
storageOperations.prepareStage(streamId, DestinationSyncMode.OVERWRITE)
@@ -98,9 +109,14 @@ class DatabricksDestination : BaseConnector(), Destination {
98109
try {
99110
val writeBuffer = DatabricksFileBufferFactory.createBuffer(FileUploadFormat.CSV)
100111
writeBuffer.use {
101-
it.accept("{\"airbyte_check\":\"passed\"}", "{}", System.currentTimeMillis())
112+
it.accept(
113+
"{\"airbyte_check\":\"passed\"}",
114+
"{}",
115+
generationId = 0,
116+
System.currentTimeMillis()
117+
)
102118
it.flush()
103-
storageOperations.writeToStage(streamId, writeBuffer)
119+
storageOperations.writeToStage(streamConfig, writeBuffer)
104120
}
105121
} catch (e: Exception) {
106122
log.error(e) { "Failed to write to stage as part of CHECK" }
@@ -129,9 +145,6 @@ class DatabricksDestination : BaseConnector(), Destination {
129145

130146
// TODO: Deserialization should be taken care by connector runner framework later
131147
val connectorConfig = DatabricksConnectorConfig.deserialize(config)
132-
// TODO: This abomination continues to stay, this call should be implicit in ParsedCatalog
133-
// with defaultNamespace injected
134-
addDefaultNamespaceToStreams(catalog, connectorConfig.schema)
135148

136149
val sqlGenerator =
137150
DatabricksSqlGenerator(DatabricksNamingTransformer(), connectorConfig.database)
@@ -145,7 +158,7 @@ class DatabricksDestination : BaseConnector(), Destination {
145158
val datasource = DatabricksConnectorClientsFactory.createDataSource(connectorConfig)
146159
val jdbcDatabase = DefaultJdbcDatabase(datasource)
147160
val destinationHandler =
148-
DatabricksDestinationHandler(connectorConfig.database, jdbcDatabase)
161+
DatabricksDestinationHandler(sqlGenerator, connectorConfig.database, jdbcDatabase)
149162

150163
// Minimum surface area for AsyncConsumer's lifecycle functions to call.
151164
val storageOperations =
@@ -179,9 +192,9 @@ class DatabricksDestination : BaseConnector(), Destination {
179192
catalog = catalog,
180193
bufferManager =
181194
BufferManager(
195+
defaultNamespace = connectorConfig.schema,
182196
(Runtime.getRuntime().maxMemory() * BufferManager.MEMORY_LIMIT_RATIO).toLong(),
183197
),
184-
defaultNamespace = Optional.of(connectorConfig.schema),
185198
airbyteMessageDeserializer = AirbyteMessageDeserializer(),
186199
)
187200
}

airbyte-integrations/connectors/destination-databricks/src/main/kotlin/io/airbyte/integrations/destination/databricks/jdbc/DatabricksDestinationHandler.kt

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import java.util.*
2929
import kotlin.streams.asSequence
3030

3131
class DatabricksDestinationHandler(
32+
private val sqlGenerator: DatabricksSqlGenerator,
3233
private val databaseName: String,
3334
private val jdbcDatabase: JdbcDatabase,
3435
) : DestinationHandler<MinimumDestinationState.Impl> {
@@ -111,6 +112,13 @@ class DatabricksDestinationHandler(
111112
.toList()
112113
}
113114

115+
override fun createNamespaces(schemas: Set<String>) {
116+
for (schema in schemas) {
117+
// TODO: Optimize by running SHOW SCHEMAS; rather than CREATE SCHEMA if not exists
118+
execute(sqlGenerator.createSchema(schema))
119+
}
120+
}
121+
114122
private fun findExistingTable(
115123
streamIds: List<StreamId>
116124
): Map<String, LinkedHashMap<String, TableDefinition>> {
@@ -182,7 +190,7 @@ class DatabricksDestinationHandler(
182190
tableDefinition.columns.entries
183191
.filter { (it.key != abRawId && it.key != abExtractedAt && it.key != abMeta) }
184192
.associate {
185-
it.key!! to if (it.value.type != "DECIMAL") it.value.type else "DECIMAL(38, 10)"
193+
it.key to if (it.value.type != "DECIMAL") it.value.type else "DECIMAL(38, 10)"
186194
}
187195
return actualColumns == expectedColumns
188196
}
@@ -221,7 +229,7 @@ class DatabricksDestinationHandler(
221229
// Handle resultset call in the function which will be closed
222230
// after the scope is exited
223231
val resultSet =
224-
metadata?.getTables(
232+
metadata.getTables(
225233
databaseName,
226234
id.rawNamespace,
227235
id.rawName,

airbyte-integrations/connectors/destination-databricks/src/main/kotlin/io/airbyte/integrations/destination/databricks/operation/DatabricksStorageOperation.kt

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ class DatabricksStorageOperation(
3636
// Hoist them to SqlGenerator interface in CDK, until then using concrete instance.
3737
private val databricksSqlGenerator = sqlGenerator as DatabricksSqlGenerator
3838

39-
override fun writeToStage(streamId: StreamId, data: SerializableBuffer) {
39+
override fun writeToStage(streamConfig: StreamConfig, data: SerializableBuffer) {
40+
val streamId = streamConfig.id
4041
val stagedFile = "${stagingDirectory(streamId, database)}/${data.filename}"
4142
workspaceClient.files().upload(stagedFile, data.inputStream)
4243
destinationHandler.execute(
@@ -122,12 +123,6 @@ class DatabricksStorageOperation(
122123
}
123124
}
124125

125-
override fun createFinalNamespace(streamId: StreamId) {
126-
val finalSchema = streamId.finalNamespace
127-
// TODO: Optimize by running SHOW SCHEMAS; rather than CREATE SCHEMA if not exists
128-
destinationHandler.execute(sqlGenerator.createSchema(finalSchema))
129-
}
130-
131126
override fun createFinalTable(streamConfig: StreamConfig, suffix: String, replace: Boolean) {
132127
// The table doesn't exist. Create it. Don't force.
133128
destinationHandler.execute(

airbyte-integrations/connectors/destination-databricks/src/main/kotlin/io/airbyte/integrations/destination/databricks/operation/DatabricksStreamOperation.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@ class DatabricksStreamOperation(
2727
private val storageOperation: StorageOperation<SerializableBuffer>,
2828
destinationInitialStatus: DestinationInitialStatus<MinimumDestinationState.Impl>,
2929
private val fileUploadFormat: FileUploadFormat,
30+
disableTypeDedupe: Boolean,
3031
) :
3132
AbstractStreamOperation<MinimumDestinationState.Impl, SerializableBuffer>(
3233
storageOperation,
3334
destinationInitialStatus,
35+
disableTypeDedupe = disableTypeDedupe
3436
) {
3537
private val log = KotlinLogging.logger {}
3638
override fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
@@ -40,6 +42,7 @@ class DatabricksStreamOperation(
4042
it.accept(
4143
record.serialized!!,
4244
Jsons.serialize(record.record!!.meta),
45+
streamConfig.generationId,
4346
record.record!!.emittedAt,
4447
)
4548
}
@@ -51,7 +54,7 @@ class DatabricksStreamOperation(
5154
)
5255
}) to staging"
5356
}
54-
storageOperation.writeToStage(streamConfig.id, writeBuffer)
57+
storageOperation.writeToStage(streamConfig, writeBuffer)
5558
}
5659
}
5760
}

airbyte-integrations/connectors/destination-databricks/src/main/kotlin/io/airbyte/integrations/destination/databricks/operation/DatabricksStreamOperationFactory.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@ import io.airbyte.integrations.base.destination.typing_deduping.migrators.Minimu
1313
class DatabricksStreamOperationFactory(private val storageOperations: DatabricksStorageOperation) :
1414
StreamOperationFactory<MinimumDestinationState.Impl> {
1515
override fun createInstance(
16-
destinationInitialStatus: DestinationInitialStatus<MinimumDestinationState.Impl>
16+
destinationInitialStatus: DestinationInitialStatus<MinimumDestinationState.Impl>,
17+
disableTypeDedupe: Boolean,
1718
): StreamOperation<MinimumDestinationState.Impl> {
1819
return DatabricksStreamOperation(
1920
storageOperations,
2021
destinationInitialStatus,
21-
FileUploadFormat.CSV
22+
FileUploadFormat.CSV,
23+
disableTypeDedupe,
2224
)
2325
}
2426
}

airbyte-integrations/connectors/destination-databricks/src/main/kotlin/io/airbyte/integrations/destination/databricks/staging/DatabricksFileBufferFactory.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ object DatabricksFileBufferFactory {
5555
id: UUID,
5656
formattedString: String,
5757
emittedAt: Long,
58-
formattedAirbyteMetaString: String
58+
formattedAirbyteMetaString: String,
59+
// TODO use this value
60+
generationId: Long,
5961
): List<Any> {
6062
return listOf(
6163
id,

airbyte-integrations/connectors/destination-databricks/src/test-integration/kotlin/io/airbyte/integrations/destination/databricks/typededupe/DatabricksSqlGeneratorIntegrationTest.kt

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,23 +61,23 @@ class DatabricksSqlGeneratorIntegrationTest :
6161
}
6262
}
6363

64+
override val sqlGenerator: SqlGenerator
65+
get() = DatabricksSqlGenerator(DatabricksNamingTransformer(), connectorConfig.database)
66+
private val databricksSqlGenerator = sqlGenerator as DatabricksSqlGenerator
6467
override val destinationHandler: DestinationHandler<MinimumDestinationState.Impl>
6568
get() =
6669
DatabricksDestinationHandler(
70+
databricksSqlGenerator,
6771
connectorConfig.database,
6872
jdbcDatabase,
6973
)
70-
override val sqlGenerator: SqlGenerator
71-
get() = DatabricksSqlGenerator(DatabricksNamingTransformer(), connectorConfig.database)
7274
override val supportsSafeCast: Boolean
7375
get() = true
7476

7577
override fun createNamespace(namespace: String) {
7678
destinationHandler.execute(sqlGenerator.createSchema(namespace))
7779
}
7880

79-
private val databricksSqlGenerator = sqlGenerator as DatabricksSqlGenerator
80-
8181
override fun createRawTable(streamId: StreamId) {
8282
destinationHandler.execute(databricksSqlGenerator.createRawTable(streamId))
8383
}
@@ -136,7 +136,9 @@ class DatabricksSqlGeneratorIntegrationTest :
136136
includeCdcDeletedAt: Boolean,
137137
streamId: StreamId,
138138
suffix: String?,
139-
records: List<JsonNode>
139+
records: List<JsonNode>,
140+
// TODO
141+
generationId: Long,
140142
) {
141143
val columnNames =
142144
if (includeCdcDeletedAt) FINAL_TABLE_COLUMN_NAMES_CDC else FINAL_TABLE_COLUMN_NAMES

0 commit comments

Comments (0)