Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.44.16'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
useLocalCdk = true
}

//remove once upgrading the CDK version to 0.4.x or later
Expand All @@ -43,8 +43,9 @@ application {

dependencies {
// Databricks JDBC driver
implementation 'com.databricks:databricks-jdbc:2.6.36'
implementation 'com.databricks:databricks-sdk-java:0.23.0'
// We're using the OSS version https://docs.gcp.databricks.com/en/integrations/jdbc/oss.html
// which has support for GCP databricks instances.
implementation 'com.databricks:databricks-jdbc:0.9.2-oss'
}

test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,25 @@ object DatabricksConnectorClientsFactory {
// EnableArrow=0 flag is undocumented and disables ArrowBuf when reading data
// Destinations only reads data for metadata or for comparison of actual data in tests. so
// we don't need it to be optimized.
val jdbcUrl =
"jdbc:databricks://${config.hostname}:${config.port}/${config.database};transportMode=http;httpPath=${config.httpPath};EnableArrow=0"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gisripa do you remember what the /${config.database} thing is supposed to do? It sounds like we're passing in the database (i.e. unity catalog name?)

but the oss driver's docs sound like this is actually supposed to be the schema, which sounds super weird. I.e. their url format is jdbc:databricks://<server-hostname>:<port>/<schema>;

(I'm seeing some weird test failures with ConcurrentModificationException and am very confused about how that could be happening, no idea if this is related)

datasource.host = config.hostname
datasource.port = config.port
datasource.httpPath = config.httpPath
datasource.properties["catalog"] = config.database
datasource.properties["schema"] = config.schema
datasource.properties["EnableArrow"] = 0
// TODO this is supposed to be the default???
datasource.properties["statement_timeout"] = 172800
when (config.authentication) {
is BasicAuthentication -> {
datasource.setURL(
"$jdbcUrl;AuthMech=3;UID=token;PWD=${config.authentication.personalAccessToken}"
)
datasource.properties["AuthMech"] = 3
datasource.username = "token"
datasource.password = config.authentication.personalAccessToken
}
is OAuth2Authentication -> {
datasource.setURL(
"$jdbcUrl;AuthMech=11;Auth_Flow=1;OAuth2ClientId=${config.authentication.clientId};OAuth2Secret=${config.authentication.secret}"
)
datasource.properties["AuthMech"] = 11
datasource.properties["Auth_Flow"] = 1
datasource.properties["OAuth2ClientId"] = config.authentication.clientId
datasource.properties["OAuth2Secret"] = config.authentication.secret
}
}
return datasource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,17 +238,14 @@ class DatabricksDestinationHandler(
.bufferedResultSetQuery(
{ connection: Connection -> connection.createStatement().executeQuery(query) },
{ resultSet: ResultSet ->
resultSet.getObject("last_loaded_at", LocalDateTime::class.java)
resultSet.getString("last_loaded_at")?.let {
Instant.parse(it)
}
},
)
.stream()
.filter { ts: LocalDateTime? -> Objects.nonNull(ts) }
.filter { ts: Instant? -> Objects.nonNull(ts) }
.findFirst()
.map {
// Databricks doesn't have offset stored, so we always use UTC as the supposed
// timezone
it.toInstant(ZoneOffset.UTC)
}
}

private fun getInitialRawTableState(id: StreamId, suffix: String): InitialRawTableStatus {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000", "_airbyte_generation_id": 42, "array": ["foo"], "struct": {"foo": "bar"}, "string": "foo", "number": 42.1, "integer": 42, "boolean": true, "timestamp_with_timezone": "2023-01-23T12:34:56", "timestamp_without_timezone": "2023-01-23T12:34:56", "time_with_timezone": "12:34:56Z", "time_without_timezone": "12:34:56", "date": "2023-01-23", "unknown": {}, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_meta": {"changes": []}}
{"id1": 2, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_meta": {"changes": []}}
{"id1": 3, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_meta": {"sync_id": 42, "changes": [{"field": "string", "change": "NULLED", "reason": "SOURCE_SERIALIZATION_ERROR"}]}}
{"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000Z", "_airbyte_generation_id": 42, "array": ["foo"], "struct": {"foo": "bar"}, "string": "foo", "number": 42.1, "integer": 42, "boolean": true, "timestamp_with_timezone": "2023-01-23T12:34:56.000000Z", "timestamp_without_timezone": "2023-01-23T12:34:56.000", "time_with_timezone": "12:34:56Z", "time_without_timezone": "12:34:56", "date": "2023-01-23", "unknown": {}, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_meta": {"changes": []}}
{"id1": 2, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000Z", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_meta": {"changes": []}}
{"id1": 3, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000Z", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_meta": {"sync_id": 42, "changes": [{"field": "string", "change": "NULLED", "reason": "SOURCE_SERIALIZATION_ERROR"}]}}
// In databricks we don't care about struct/array as they are stored as strings and can be safely queried using jsonpath, time&timetz are non-existent and stored as strings too
{"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000", "_airbyte_generation_id": 42, "array":{}, "struct": [], "time_with_timezone": "{}", "time_without_timezone": "{}", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_meta": {"changes":[{"field":"number","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"integer","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"boolean","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"timestamp_with_timezone","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"timestamp_without_timezone","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"date","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"}]}}
{"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000Z", "_airbyte_generation_id": 42, "array":{}, "struct": [], "time_with_timezone": "{}", "time_without_timezone": "{}", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_meta": {"changes":[{"field":"number","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"integer","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"boolean","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"timestamp_with_timezone","change":"NULLED","reason.000000Z":"DESTINATION_TYPECAST_ERROR"},{"field":"timestamp_without_timezone","change":"NULLED","reason.000":"DESTINATION_TYPECAST_ERROR"},{"field":"date","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"}]}}
// Note that for numbers where we parse the value to JSON (struct, array, unknown) we lose precision.
// But for numbers where we create a NUMBER column, we do not lose precision (see the `number` column).
{"id1": 5, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000", "_airbyte_generation_id": 42, "number": 67.174118, "struct": {"nested_number": 67.174118}, "array": [67.174118], "unknown": 67.174118, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_meta": {"changes": []}}
{"id1": 5, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000Z", "_airbyte_generation_id": 42, "number": 67.174118, "struct": {"nested_number": 67.174118}, "array": [67.174118], "unknown": 67.174118, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_meta": {"changes": []}}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": ["foo"], "struct": {"foo": "bar"}, "string": "foo", "number": 42.1, "integer": 42, "boolean": true, "timestamp_with_timezone": "2023-01-23T12:34:56Z", "timestamp_without_timezone": "2023-01-23T12:34:56", "time_with_timezone": "12:34:56Z", "time_without_timezone": "12:34:56", "date": "2023-01-23", "unknown": {}}}
{"_airbyte_raw_id": "53ce75a5-5bcc-47a3-b45c-96c2015cfe35", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": null, "struct": null, "string": null, "number": null, "integer": null, "boolean": null, "timestamp_with_timezone": null, "timestamp_without_timezone": null, "time_with_timezone": null, "time_without_timezone": null, "date": null, "unknown": null}}
{"_airbyte_raw_id": "7e1fac0c-017e-4ad6-bc78-334a34d64fbe", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_data": {"id1": 3, "id2": 100, "updated_at": "2023-01-01T01:00:00Z"}, "_airbyte_meta": {"sync_id": 42, "changes": [{"field": "string", "change": "NULLED", "reason": "SOURCE_SERIALIZATION_ERROR"}]}}
{"_airbyte_raw_id": "84242b60-3a34-4531-ad75-a26702960a9a", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": {}, "struct": [], "string": null, "number": "foo", "integer": "bar", "boolean": "fizz", "timestamp_with_timezone": {}, "timestamp_without_timezone": {}, "time_with_timezone": {}, "time_without_timezone": {}, "date": "airbyte", "unknown": null}}
{"_airbyte_raw_id": "a4a783b5-7729-4d0b-b659-48ceb08713f1", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_data": {"id1": 5, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "number": 67.174118, "struct": {"nested_number": 67.174118}, "array": [67.174118], "unknown": 67.174118}}
{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": ["foo"], "struct": {"foo": "bar"}, "string": "foo", "number": 42.1, "integer": 42, "boolean": true, "timestamp_with_timezone": "2023-01-23T12:34:56Z", "timestamp_without_timezone": "2023-01-23T12:34:56", "time_with_timezone": "12:34:56Z", "time_without_timezone": "12:34:56", "date": "2023-01-23", "unknown": {}}}
{"_airbyte_raw_id": "53ce75a5-5bcc-47a3-b45c-96c2015cfe35", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": null, "struct": null, "string": null, "number": null, "integer": null, "boolean": null, "timestamp_with_timezone": null, "timestamp_without_timezone": null, "time_with_timezone": null, "time_without_timezone": null, "date": null, "unknown": null}}
{"_airbyte_raw_id": "7e1fac0c-017e-4ad6-bc78-334a34d64fbe", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_data": {"id1": 3, "id2": 100, "updated_at": "2023-01-01T01:00:00Z"}, "_airbyte_meta": {"sync_id": 42, "changes": [{"field": "string", "change": "NULLED", "reason": "SOURCE_SERIALIZATION_ERROR"}]}}
{"_airbyte_raw_id": "84242b60-3a34-4531-ad75-a26702960a9a", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": {}, "struct": [], "string": null, "number": "foo", "integer": "bar", "boolean": "fizz", "timestamp_with_timezone": {}, "timestamp_without_timezone": {}, "time_with_timezone": {}, "time_without_timezone": {}, "date": "airbyte", "unknown": null}}
{"_airbyte_raw_id": "a4a783b5-7729-4d0b-b659-48ceb08713f1", "_airbyte_generation_id": 42, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_data": {"id1": 5, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "number": 67.174118, "struct": {"nested_number": 67.174118}, "array": [67.174118], "unknown": 67.174118}}
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"_airbyte_raw_id": "80c99b54-54b4-43bd-b51b-1f67dafa2c52", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_meta": {"changes": []}, "id1": 1, "id2": 100, "updated_at": "2023-01-01T02:00:00.000000", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84}
{"_airbyte_raw_id": "b9ac9f01-abc1-4e7c-89e5-eac9223d5726", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_meta": {"changes": [{"field":"integer","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"}]}, "id1": 2, "id2": 100, "updated_at": "2023-01-01T03:00:01", "string": "Bob"}
{"_airbyte_raw_id": "80c99b54-54b4-43bd-b51b-1f67dafa2c52", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_meta": {"changes": []}, "id1": 1, "id2": 100, "updated_at": "2023-01-01T02:00:00.000000Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84}
{"_airbyte_raw_id": "b9ac9f01-abc1-4e7c-89e5-eac9223d5726", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_meta": {"changes": [{"field":"integer","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"}]}, "id1": 2, "id2": 100, "updated_at": "2023-01-01T03:00:01.000000Z", "string": "Bob"}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{"_airbyte_raw_id": "d7b81af0-01da-4846-a650-cc398986bc99", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "string": "Alice", "struct": {"city": "San Francisco", "state": "CA"}, "integer": 42}}
{"_airbyte_raw_id": "80c99b54-54b4-43bd-b51b-1f67dafa2c52", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T02:00:00Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84}}
{"_airbyte_raw_id": "ad690bfb-c2c2-4172-bd73-a16c86ccbb67", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob", "integer": 126}}
{"_airbyte_raw_id": "b9ac9f01-abc1-4e7c-89e5-eac9223d5726", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T03:00:01Z", "string": "Bob", "integer": "oops"}}
{"_airbyte_raw_id": "d7b81af0-01da-4846-a650-cc398986bc99", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "string": "Alice", "struct": {"city": "San Francisco", "state": "CA"}, "integer": 42}}
{"_airbyte_raw_id": "80c99b54-54b4-43bd-b51b-1f67dafa2c52", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T02:00:00Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84}}
{"_airbyte_raw_id": "ad690bfb-c2c2-4172-bd73-a16c86ccbb67", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob", "integer": 126}}
{"_airbyte_raw_id": "b9ac9f01-abc1-4e7c-89e5-eac9223d5726", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T03:00:01Z", "string": "Bob", "integer": "oops"}}
Loading