From 7ca029f78e6119ef5bba716ff9dbc8bc9601188b Mon Sep 17 00:00:00 2001 From: Kyle Wigley Date: Thu, 22 Oct 2020 15:48:44 -0400 Subject: [PATCH 01/10] add odbc connection type with all-purpose/virtual cluster support --- .gitignore | 1 + dbt/adapters/spark/connections.py | 66 +++++++++++++++++++++++++++++-- requirements.txt | 1 + setup.py | 1 + 4 files changed, 65 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index 85c98e1ca..d6f5c9d02 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,4 @@ dist/ dbt-integration-tests test/integration/.user.yml .DS_Store +.vscode diff --git a/dbt/adapters/spark/connections.py b/dbt/adapters/spark/connections.py index 3ec6abf4a..3d8aeefea 100644 --- a/dbt/adapters/spark/connections.py +++ b/dbt/adapters/spark/connections.py @@ -10,6 +10,7 @@ from TCLIService.ttypes import TOperationState as ThriftState from thrift.transport import THttpClient from pyhive import hive +import pyodbc from datetime import datetime from hologram.helpers import StrEnum @@ -25,6 +26,12 @@ class SparkConnectionMethod(StrEnum): THRIFT = 'thrift' HTTP = 'http' + ODBC = 'odbc' + + +class SparkClusterType(StrEnum): + ALL_PURPOSE = "all-purpose" + VIRTUAL = "virtual" @dataclass @@ -33,6 +40,8 @@ class SparkCredentials(Credentials): method: SparkConnectionMethod schema: str database: Optional[str] + driver: Optional[str] = None + cluster_type: Optional[SparkClusterType] = SparkClusterType.ALL_PURPOSE cluster: Optional[str] = None token: Optional[str] = None user: Optional[str] = None @@ -62,10 +71,10 @@ def type(self): return 'spark' def _connection_keys(self): - return 'host', 'port', 'cluster', 'schema', 'organization' + return 'host', 'port', 'cluster', 'cluster_type', 'schema', 'organization' -class ConnectionWrapper(object): +class PyhiveConnectionWrapper(object): """Wrap a Spark connection in a way that no-ops transactions""" # https://forums.databricks.com/questions/2157/in-apache-spark-sql-can-we-roll-back-the-transacti.html # noqa @@ -177,11 +186,27 @@ def description(self): return self._cursor.description +class PyodbcConnectionWrapper(PyhiveConnectionWrapper): + + def execute(self, sql, bindings=None): + if sql.strip().endswith(";"): + sql = sql.strip()[:-1] + + # pyodbc does not handle a None type binding! 
+ if bindings is None: + self._cursor.execute(sql) + else: + + self._cursor.execute(sql, bindings) + + class SparkConnectionManager(SQLConnectionManager): TYPE = 'spark' + SPARK_CLUSTER_HTTP_PATH = "sql/protocolv1/o/{organization}/{cluster}" + SPARK_VIRTUAL_CLUSTER_HTTP_PATH = "/sql/1.0/endpoints/{cluster}" SPARK_CONNECTION_URL = ( - "https://{host}:{port}/sql/protocolv1/o/{organization}/{cluster}" + "https://{host}:{port}/" + SPARK_CLUSTER_HTTP_PATH ) @contextmanager @@ -265,6 +290,7 @@ def open(cls, connection): }) conn = hive.connect(thrift_transport=transport) + handle = PyhiveConnectionWrapper(conn) elif creds.method == 'thrift': cls.validate_creds(creds, ['host', 'port', 'user', 'schema']) @@ -274,6 +300,39 @@ def open(cls, connection): username=creds.user, auth=creds.auth, kerberos_service_name=creds.kerberos_service_name) # noqa + handle = PyhiveConnectionWrapper(conn) + elif creds.method == 'odbc': + required_fields = ['driver', 'host', 'port', 'token', + 'organization', 'cluster', 'cluster_type'] # noqa + cls.validate_creds(creds, required_fields) + + http_path = None + + if creds.cluster_type == SparkClusterType.ALL_PURPOSE: + http_path = cls.SPARK_CLUSTER_HTTP_PATH.format( + organization=creds.organization, + cluster=creds.cluster + ) + elif creds.cluster_type == SparkClusterType.VIRTUAL: + http_path = cls.SPARK_VIRTUAL_CLUSTER_HTTP_PATH.format( + cluster=creds.cluster + ) + + connection_params = [] + connection_params.append(f"DRIVER={creds.driver}") + connection_params.append(f"Host={creds.host}") + connection_params.append(f"PORT={creds.port}") + connection_params.append("UID=token") + connection_params.append(f"PWD={creds.token}") + connection_params.append(f"HTTPPath={http_path}") + connection_params.append("AuthMech=3") + connection_params.append("ThriftTransport=2") + connection_params.append("SSL=1") + + connection_str = ";".join(connection_params) + + conn = pyodbc.connect(connection_str, autocommit=True) + handle = PyodbcConnectionWrapper(conn) else: raise dbt.exceptions.DbtProfileError( f"invalid credential method: {creds.method}" @@ -304,7 +363,6 @@ def open(cls, connection): else: raise exc - handle = ConnectionWrapper(conn) connection.handle = handle connection.state = ConnectionState.OPEN return connection diff --git a/requirements.txt b/requirements.txt index b1d6e5c2c..c3770d21c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ dbt-core==0.18.0 PyHive[hive]>=0.6.0,<0.7.0 thrift>=0.11.0,<0.12.0 +pyodbc>=4.0.30 diff --git a/setup.py b/setup.py index 5738c57ab..a6ef60f47 100644 --- a/setup.py +++ b/setup.py @@ -63,5 +63,6 @@ def _dbt_spark_version(): f'dbt-core=={dbt_version}', 'PyHive[hive]>=0.6.0,<0.7.0', 'thrift>=0.11.0,<0.12.0', + 'pyodbc>=4.0.30', ] ) From 25bfc61a3eaf5e6cabe3d51db5de235b741cd06b Mon Sep 17 00:00:00 2001 From: Kyle Wigley Date: Mon, 26 Oct 2020 13:05:20 -0400 Subject: [PATCH 02/10] fix SQL parameter style for pyodbc --- dbt/adapters/spark/connections.py | 46 +++++++++++++------ requirements.txt | 3 +- setup.py | 3 +- .../integration/spark-databricks-odbc.dbtspec | 35 ++++++++++++++ tox.ini | 10 ++++ 5 files changed, 80 insertions(+), 17 deletions(-) create mode 100644 test/integration/spark-databricks-odbc.dbtspec diff --git a/dbt/adapters/spark/connections.py b/dbt/adapters/spark/connections.py index 3d8aeefea..fc9751a5c 100644 --- a/dbt/adapters/spark/connections.py +++ b/dbt/adapters/spark/connections.py @@ -6,12 +6,14 @@ from dbt.contracts.connection import ConnectionState from dbt.logger import GLOBAL_LOGGER as logger 
from dbt.utils import DECIMALS +from dbt.adapters.spark import __version__ from TCLIService.ttypes import TOperationState as ThriftState from thrift.transport import THttpClient from pyhive import hive import pyodbc from datetime import datetime +import sqlparams from hologram.helpers import StrEnum from dataclasses import dataclass @@ -23,6 +25,10 @@ NUMBERS = DECIMALS + (int, float) +def _build_odbc_connnection_string(**kwargs) -> str: + return ";".join([f"{k}={v}" for k, v in kwargs.items()]) + + class SparkConnectionMethod(StrEnum): THRIFT = 'thrift' HTTP = 'http' @@ -71,7 +77,8 @@ def type(self): return 'spark' def _connection_keys(self): - return 'host', 'port', 'cluster', 'cluster_type', 'schema', 'organization' + return ('host', 'port', 'cluster', + 'cluster_type', 'schema', 'organization') class PyhiveConnectionWrapper(object): @@ -192,12 +199,14 @@ def execute(self, sql, bindings=None): if sql.strip().endswith(";"): sql = sql.strip()[:-1] + query = sqlparams.SQLParams('format', 'qmark') # pyodbc does not handle a None type binding! if bindings is None: + sql, bindings = query.format(sql, []) self._cursor.execute(sql) else: - - self._cursor.execute(sql, bindings) + sql, bindings = query.format(sql, bindings) + self._cursor.execute(sql, *bindings) class SparkConnectionManager(SQLConnectionManager): @@ -317,19 +326,26 @@ def open(cls, connection): http_path = cls.SPARK_VIRTUAL_CLUSTER_HTTP_PATH.format( cluster=creds.cluster ) + else: + raise dbt.exceptions.DbtProfileError( + f"invalid custer type: {creds.cluster_type}" + ) - connection_params = [] - connection_params.append(f"DRIVER={creds.driver}") - connection_params.append(f"Host={creds.host}") - connection_params.append(f"PORT={creds.port}") - connection_params.append("UID=token") - connection_params.append(f"PWD={creds.token}") - connection_params.append(f"HTTPPath={http_path}") - connection_params.append("AuthMech=3") - connection_params.append("ThriftTransport=2") - connection_params.append("SSL=1") - - connection_str = ";".join(connection_params) + dbt_spark_version = __version__.version + user_agent_entry = f"fishtown-analytics-dbt-spark/{dbt_spark_version} (Databricks)" # noqa + + connection_str = _build_odbc_connnection_string( + DRIVER=creds.driver, + HOST=creds.host, + PORT=creds.port, + UID="token", + PWD=creds.token, + HTTPPath=http_path, + AuthMech=3, + ThriftTransport=2, + SSL=1, + UserAgentEntry=user_agent_entry, + ) conn = pyodbc.connect(connection_str, autocommit=True) handle = PyodbcConnectionWrapper(conn) diff --git a/requirements.txt b/requirements.txt index c3770d21c..f60496bc7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ dbt-core==0.18.0 PyHive[hive]>=0.6.0,<0.7.0 -thrift>=0.11.0,<0.12.0 pyodbc>=4.0.30 +sqlparams>=3.0.0 +thrift>=0.11.0,<0.12.0 diff --git a/setup.py b/setup.py index a6ef60f47..b4104baa4 100644 --- a/setup.py +++ b/setup.py @@ -62,7 +62,8 @@ def _dbt_spark_version(): install_requires=[ f'dbt-core=={dbt_version}', 'PyHive[hive]>=0.6.0,<0.7.0', - 'thrift>=0.11.0,<0.12.0', 'pyodbc>=4.0.30', + 'sqlparams>=3.0.0', + 'thrift>=0.11.0,<0.12.0' ] ) diff --git a/test/integration/spark-databricks-odbc.dbtspec b/test/integration/spark-databricks-odbc.dbtspec new file mode 100644 index 000000000..df77dc3f7 --- /dev/null +++ b/test/integration/spark-databricks-odbc.dbtspec @@ -0,0 +1,35 @@ +target: + type: spark + host: "{{ env_var('DBT_DATABRICKS_HOST_NAME') }}" + cluster: "{{ env_var('DBT_DATABRICKS_CLUSTER_NAME') }}" + token: "{{ env_var('DBT_DATABRICKS_TOKEN') }}" + method: odbc 
+ driver: /Library/simba/spark/lib/libsparkodbc_sbu.dylib + port: 443 + schema: "analytics_{{ var('_dbt_random_suffix') }}" + connect_retries: 5 + connect_timeout: 60 +projects: + - overrides: incremental + paths: + "models/incremental.sql": + materialized: incremental + body: "select * from {{ source('raw', 'seed') }}" + facts: + base: + rowcount: 10 + extended: + rowcount: 20 + - overrides: snapshot_strategy_check_cols + dbt_project_yml: &file_format_delta + # we're going to UPDATE the seed tables as part of testing, so we must make them delta format + seeds: + dbt_test_project: + file_format: delta + snapshots: + dbt_test_project: + file_format: delta + - overrides: snapshot_strategy_timestamp + dbt_project_yml: *file_format_delta +sequences: + test_dbt_incremental: incremental diff --git a/tox.ini b/tox.ini index c37457074..f631d060e 100644 --- a/tox.ini +++ b/tox.ini @@ -27,6 +27,16 @@ deps = -r{toxinidir}/dev_requirements.txt -e. +[testenv:integration-spark-databricks-odbc] +basepython = python3 +commands = /bin/bash -c '{envpython} -m pytest -v test/integration/spark-databricks-odbc.dbtspec' +passenv = DBT_DATABRICKS_HOST_NAME DBT_DATABRICKS_CLUSTER_NAME DBT_DATABRICKS_TOKEN DBT_INVOCATION_ENV +deps = + -r{toxinidir}/requirements.txt + -r{toxinidir}/dev_requirements.txt + -e. + + [testenv:integration-spark-thrift] basepython = python3 commands = /bin/bash -c '{envpython} -m pytest -v test/integration/spark.dbtspec' From 9366d2e4b978cbf146dc73c58c98dded97594f70 Mon Sep 17 00:00:00 2001 From: Kyle Wigley Date: Thu, 29 Oct 2020 11:42:19 -0400 Subject: [PATCH 03/10] testing --- test/integration/spark-databricks-odbc.dbtspec | 2 +- tox.ini | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/integration/spark-databricks-odbc.dbtspec b/test/integration/spark-databricks-odbc.dbtspec index df77dc3f7..abed9e7b0 100644 --- a/test/integration/spark-databricks-odbc.dbtspec +++ b/test/integration/spark-databricks-odbc.dbtspec @@ -4,7 +4,7 @@ target: cluster: "{{ env_var('DBT_DATABRICKS_CLUSTER_NAME') }}" token: "{{ env_var('DBT_DATABRICKS_TOKEN') }}" method: odbc - driver: /Library/simba/spark/lib/libsparkodbc_sbu.dylib + driver: "{{ env_var('ODBC_DRIVER') }}" port: 443 schema: "analytics_{{ var('_dbt_random_suffix') }}" connect_retries: 5 diff --git a/tox.ini b/tox.ini index f631d060e..06735a686 100644 --- a/tox.ini +++ b/tox.ini @@ -30,7 +30,7 @@ deps = [testenv:integration-spark-databricks-odbc] basepython = python3 commands = /bin/bash -c '{envpython} -m pytest -v test/integration/spark-databricks-odbc.dbtspec' -passenv = DBT_DATABRICKS_HOST_NAME DBT_DATABRICKS_CLUSTER_NAME DBT_DATABRICKS_TOKEN DBT_INVOCATION_ENV +passenv = DBT_DATABRICKS_HOST_NAME DBT_DATABRICKS_CLUSTER_NAME DBT_DATABRICKS_TOKEN DBT_INVOCATION_ENV ODBC_DRIVER deps = -r{toxinidir}/requirements.txt -r{toxinidir}/dev_requirements.txt From c2dc0fd43a6c6d50b35879cb41f0ff8f89127208 Mon Sep 17 00:00:00 2001 From: Kyle Wigley Date: Thu, 29 Oct 2020 12:07:27 -0400 Subject: [PATCH 04/10] update credentials schema and integration tests --- .circleci/config.yml | 19 ++++++- dbt/adapters/spark/connections.py | 51 ++++++++++--------- dev_requirements.txt | 2 +- ....dbtspec => spark-databricks-http.dbtspec} | 2 +- ... 
=> spark-databricks-odbc-cluster.dbtspec} | 10 +++- ...spark-databricks-odbc-sql-endpoint.dbtspec | 44 ++++++++++++++++ .../{spark.dbtspec => spark-thrift.dbtspec} | 2 +- tox.ini | 19 +++++-- 8 files changed, 115 insertions(+), 34 deletions(-) rename test/integration/{spark-databricks.dbtspec => spark-databricks-http.dbtspec} (98%) rename test/integration/{spark-databricks-odbc.dbtspec => spark-databricks-odbc-cluster.dbtspec} (74%) create mode 100644 test/integration/spark-databricks-odbc-sql-endpoint.dbtspec rename test/integration/{spark.dbtspec => spark-thrift.dbtspec} (98%) diff --git a/.circleci/config.yml b/.circleci/config.yml index 200c8e0ff..fe73dae88 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -55,11 +55,25 @@ jobs: - checkout - run: name: Run integration tests - command: tox -e integration-spark-databricks + command: tox -e integration-spark-databricks-http no_output_timeout: 1h - store_artifacts: path: ./logs + # integration-spark-databricks-odbc: + # environment: + # DBT_INVOCATION_ENV: circle + # docker: + # - image: kwigley/spark-test-container:1 + # steps: + # - checkout + # - run: + # name: Run integration tests + # command: ODBC_DRIVER=Simba tox -e integration-spark-databricks-odbc-cluster,integration-spark-databricks-odbc-sql-endpoint + # no_output_timeout: 1h + # - store_artifacts: + # path: ./logs + workflows: version: 2 test-everything: @@ -71,3 +85,6 @@ workflows: - integration-spark-databricks: requires: - unit + # - integration-spark-databricks-odbc: + # requires: + # - unit diff --git a/dbt/adapters/spark/connections.py b/dbt/adapters/spark/connections.py index fc9751a5c..bd0c44622 100644 --- a/dbt/adapters/spark/connections.py +++ b/dbt/adapters/spark/connections.py @@ -35,11 +35,6 @@ class SparkConnectionMethod(StrEnum): ODBC = 'odbc' -class SparkClusterType(StrEnum): - ALL_PURPOSE = "all-purpose" - VIRTUAL = "virtual" - - @dataclass class SparkCredentials(Credentials): host: str @@ -47,8 +42,8 @@ class SparkCredentials(Credentials): schema: str database: Optional[str] driver: Optional[str] = None - cluster_type: Optional[SparkClusterType] = SparkClusterType.ALL_PURPOSE cluster: Optional[str] = None + endpoint: Optional[str] = None token: Optional[str] = None user: Optional[str] = None port: int = 443 @@ -78,7 +73,7 @@ def type(self): def _connection_keys(self): return ('host', 'port', 'cluster', - 'cluster_type', 'schema', 'organization') + 'endpoint', 'schema', 'organization') class PyhiveConnectionWrapper(object): @@ -198,13 +193,12 @@ class PyodbcConnectionWrapper(PyhiveConnectionWrapper): def execute(self, sql, bindings=None): if sql.strip().endswith(";"): sql = sql.strip()[:-1] - - query = sqlparams.SQLParams('format', 'qmark') # pyodbc does not handle a None type binding! if bindings is None: - sql, bindings = query.format(sql, []) self._cursor.execute(sql) else: + # pyodbc only supports `qmark` sql params! 
+ query = sqlparams.SQLParams('format', 'qmark') sql, bindings = query.format(sql, bindings) self._cursor.execute(sql, *bindings) @@ -213,7 +207,7 @@ class SparkConnectionManager(SQLConnectionManager): TYPE = 'spark' SPARK_CLUSTER_HTTP_PATH = "sql/protocolv1/o/{organization}/{cluster}" - SPARK_VIRTUAL_CLUSTER_HTTP_PATH = "/sql/1.0/endpoints/{cluster}" + SPARK_SQL_ENDPOINT_HTTP_PATH = "/sql/1.0/endpoints/{endpoint}" SPARK_CONNECTION_URL = ( "https://{host}:{port}/" + SPARK_CLUSTER_HTTP_PATH ) @@ -277,7 +271,7 @@ def open(cls, connection): for i in range(1 + creds.connect_retries): try: - if creds.method == 'http': + if creds.method == SparkConnectionMethod.HTTP: cls.validate_creds(creds, ['token', 'host', 'port', 'cluster', 'organization']) @@ -300,7 +294,7 @@ def open(cls, connection): conn = hive.connect(thrift_transport=transport) handle = PyhiveConnectionWrapper(conn) - elif creds.method == 'thrift': + elif creds.method == SparkConnectionMethod.THRIFT: cls.validate_creds(creds, ['host', 'port', 'user', 'schema']) @@ -310,30 +304,38 @@ def open(cls, connection): auth=creds.auth, kerberos_service_name=creds.kerberos_service_name) # noqa handle = PyhiveConnectionWrapper(conn) - elif creds.method == 'odbc': - required_fields = ['driver', 'host', 'port', 'token', - 'organization', 'cluster', 'cluster_type'] # noqa - cls.validate_creds(creds, required_fields) - + elif creds.method == SparkConnectionMethod.ODBC: http_path = None - - if creds.cluster_type == SparkClusterType.ALL_PURPOSE: + if creds.cluster and creds.endpoint: + raise dbt.exceptions.DbtProfileError( + "`cluster` and `endpoint` cannot both be set when" + " using the odbc method to connect to Spark" + ) + elif creds.cluster is not None: + required_fields = ['driver', 'host', 'port', 'token', + 'organization', 'cluster'] http_path = cls.SPARK_CLUSTER_HTTP_PATH.format( organization=creds.organization, cluster=creds.cluster ) - elif creds.cluster_type == SparkClusterType.VIRTUAL: - http_path = cls.SPARK_VIRTUAL_CLUSTER_HTTP_PATH.format( - cluster=creds.cluster + elif creds.endpoint is not None: + required_fields = ['driver', 'host', 'port', 'token', + 'endpoint'] + http_path = cls.SPARK_SQL_ENDPOINT_HTTP_PATH.format( + endpoint=creds.endpoint ) else: raise dbt.exceptions.DbtProfileError( - f"invalid custer type: {creds.cluster_type}" + "Either `cluster` or `endpoint` must set when" + " using the odbc method to connect to Spark" ) + cls.validate_creds(creds, required_fields) + dbt_spark_version = __version__.version user_agent_entry = f"fishtown-analytics-dbt-spark/{dbt_spark_version} (Databricks)" # noqa + # https://www.simba.com/products/Spark/doc/v2/ODBC_InstallGuide/unix/content/odbc/options/driver.htm connection_str = _build_odbc_connnection_string( DRIVER=creds.driver, HOST=creds.host, @@ -342,6 +344,7 @@ def open(cls, connection): PWD=creds.token, HTTPPath=http_path, AuthMech=3, + SparkServerType=3, ThriftTransport=2, SSL=1, UserAgentEntry=user_agent_entry, diff --git a/dev_requirements.txt b/dev_requirements.txt index 1eadc8f50..c657c54fa 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -10,6 +10,6 @@ pytest-xdist>=2.1.0,<3 flaky>=3.5.3,<4 # Test requirements -pytest-dbt-adapter==0.2.0 +pytest-dbt-adapter==0.3.0 sasl==0.2.1 thrift_sasl==0.4.1 diff --git a/test/integration/spark-databricks.dbtspec b/test/integration/spark-databricks-http.dbtspec similarity index 98% rename from test/integration/spark-databricks.dbtspec rename to test/integration/spark-databricks-http.dbtspec index 72c36df7c..c20e4242b 100644 --- 
a/test/integration/spark-databricks.dbtspec +++ b/test/integration/spark-databricks-http.dbtspec @@ -17,7 +17,7 @@ projects: facts: base: rowcount: 10 - extended: + added: rowcount: 20 - overrides: snapshot_strategy_check_cols dbt_project_yml: &file_format_delta diff --git a/test/integration/spark-databricks-odbc.dbtspec b/test/integration/spark-databricks-odbc-cluster.dbtspec similarity index 74% rename from test/integration/spark-databricks-odbc.dbtspec rename to test/integration/spark-databricks-odbc-cluster.dbtspec index abed9e7b0..8dc4975ea 100644 --- a/test/integration/spark-databricks-odbc.dbtspec +++ b/test/integration/spark-databricks-odbc-cluster.dbtspec @@ -18,7 +18,7 @@ projects: facts: base: rowcount: 10 - extended: + added: rowcount: 20 - overrides: snapshot_strategy_check_cols dbt_project_yml: &file_format_delta @@ -32,4 +32,12 @@ projects: - overrides: snapshot_strategy_timestamp dbt_project_yml: *file_format_delta sequences: + test_dbt_empty: empty + test_dbt_base: base + test_dbt_ephemeral: ephemeral test_dbt_incremental: incremental + test_dbt_snapshot_strategy_timestamp: snapshot_strategy_timestamp + test_dbt_snapshot_strategy_check_cols: snapshot_strategy_check_cols + test_dbt_data_test: data_test + test_dbt_ephemeral_data_tests: data_test_ephemeral_models + test_dbt_schema_test: schema_test diff --git a/test/integration/spark-databricks-odbc-sql-endpoint.dbtspec b/test/integration/spark-databricks-odbc-sql-endpoint.dbtspec new file mode 100644 index 000000000..d0e23dabc --- /dev/null +++ b/test/integration/spark-databricks-odbc-sql-endpoint.dbtspec @@ -0,0 +1,44 @@ +target: + type: spark + host: "{{ env_var('DBT_DATABRICKS_HOST_NAME') }}" + endpoint: "{{ env_var('DBT_DATABRICKS_ENDPOINT') }}" + token: "{{ env_var('DBT_DATABRICKS_TOKEN') }}" + method: odbc + driver: "{{ env_var('ODBC_DRIVER') }}" + port: 443 + schema: "analytics_{{ var('_dbt_random_suffix') }}" + connect_retries: 5 + connect_timeout: 60 +projects: + - overrides: incremental + paths: + "models/incremental.sql": + materialized: incremental + body: "select * from {{ source('raw', 'seed') }}" + facts: + base: + rowcount: 10 + added: + rowcount: 20 + - overrides: snapshot_strategy_check_cols + dbt_project_yml: &file_format_delta + # we're going to UPDATE the seed tables as part of testing, so we must make them delta format + seeds: + dbt_test_project: + file_format: delta + snapshots: + dbt_test_project: + file_format: delta + - overrides: snapshot_strategy_timestamp + dbt_project_yml: *file_format_delta +sequences: + test_dbt_empty: empty + test_dbt_base: base + test_dbt_ephemeral: ephemeral + # The SQL Endpoint does not support `create temporary view` + # test_dbt_incremental: incremental + test_dbt_snapshot_strategy_timestamp: snapshot_strategy_timestamp + test_dbt_snapshot_strategy_check_cols: snapshot_strategy_check_cols + test_dbt_data_test: data_test + test_dbt_ephemeral_data_tests: data_test_ephemeral_models + test_dbt_schema_test: schema_test diff --git a/test/integration/spark.dbtspec b/test/integration/spark-thrift.dbtspec similarity index 98% rename from test/integration/spark.dbtspec rename to test/integration/spark-thrift.dbtspec index 68b3c919f..58f5a9065 100644 --- a/test/integration/spark.dbtspec +++ b/test/integration/spark-thrift.dbtspec @@ -16,7 +16,7 @@ projects: facts: base: rowcount: 10 - extended: + added: rowcount: 20 sequences: test_dbt_empty: empty diff --git a/tox.ini b/tox.ini index 06735a686..f865309f6 100644 --- a/tox.ini +++ b/tox.ini @@ -18,28 +18,37 @@ deps = 
-r{toxinidir}/requirements.txt -r{toxinidir}/dev_requirements.txt -[testenv:integration-spark-databricks] +[testenv:integration-spark-databricks-http] basepython = python3 -commands = /bin/bash -c '{envpython} -m pytest -v test/integration/spark-databricks.dbtspec' +commands = /bin/bash -c '{envpython} -m pytest -v test/integration/spark-databricks-http.dbtspec' passenv = DBT_DATABRICKS_HOST_NAME DBT_DATABRICKS_CLUSTER_NAME DBT_DATABRICKS_TOKEN DBT_INVOCATION_ENV deps = -r{toxinidir}/requirements.txt -r{toxinidir}/dev_requirements.txt -e. -[testenv:integration-spark-databricks-odbc] +[testenv:integration-spark-databricks-odbc-cluster] basepython = python3 -commands = /bin/bash -c '{envpython} -m pytest -v test/integration/spark-databricks-odbc.dbtspec' +commands = /bin/bash -c '{envpython} -m pytest -v test/integration/spark-databricks-odbc-cluster.dbtspec' passenv = DBT_DATABRICKS_HOST_NAME DBT_DATABRICKS_CLUSTER_NAME DBT_DATABRICKS_TOKEN DBT_INVOCATION_ENV ODBC_DRIVER deps = -r{toxinidir}/requirements.txt -r{toxinidir}/dev_requirements.txt -e. +[testenv:integration-spark-databricks-odbc-sql-endpoint] +basepython = python3 +commands = /bin/bash -c '{envpython} -m pytest -v test/integration/spark-databricks-odbc-sql-endpoint.dbtspec' +passenv = DBT_DATABRICKS_HOST_NAME DBT_DATABRICKS_ENDPOINT DBT_DATABRICKS_TOKEN DBT_INVOCATION_ENV ODBC_DRIVER +deps = + -r{toxinidir}/requirements.txt + -r{toxinidir}/dev_requirements.txt + -e. + [testenv:integration-spark-thrift] basepython = python3 -commands = /bin/bash -c '{envpython} -m pytest -v test/integration/spark.dbtspec' +commands = /bin/bash -c '{envpython} -m pytest -v test/integration/spark-thrift.dbtspec' passenv = DBT_INVOCATION_ENV deps = -r{toxinidir}/requirements.txt From a18be085900f8c3f0c19943d8606c49e5f7ed63c Mon Sep 17 00:00:00 2001 From: Kyle Wigley Date: Fri, 30 Oct 2020 13:43:48 -0400 Subject: [PATCH 05/10] set up integration tests --- .circleci/config.yml | 51 ++++++++++++++++++------------- dbt/adapters/spark/connections.py | 13 +++++++- setup.py | 6 ++-- 3 files changed, 46 insertions(+), 24 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index fe73dae88..4f79c7076 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -5,7 +5,7 @@ jobs: environment: DBT_INVOCATION_ENV: circle docker: - - image: fishtownanalytics/test-container:9 + - image: fishtownanalytics/test-container:10 steps: - checkout - run: tox -e flake8,unit @@ -14,7 +14,7 @@ jobs: environment: DBT_INVOCATION_ENV: circle docker: - - image: fishtownanalytics/test-container:9 + - image: fishtownanalytics/test-container:10 - image: godatadriven/spark:2 environment: WAIT_FOR: localhost:5432 @@ -46,11 +46,11 @@ jobs: - store_artifacts: path: ./logs - integration-spark-databricks: + integration-spark-databricks-http: environment: DBT_INVOCATION_ENV: circle docker: - - image: fishtownanalytics/test-container:9 + - image: fishtownanalytics/test-container:10 steps: - checkout - run: @@ -60,19 +60,28 @@ jobs: - store_artifacts: path: ./logs - # integration-spark-databricks-odbc: - # environment: - # DBT_INVOCATION_ENV: circle - # docker: - # - image: kwigley/spark-test-container:1 - # steps: - # - checkout - # - run: - # name: Run integration tests - # command: ODBC_DRIVER=Simba tox -e integration-spark-databricks-odbc-cluster,integration-spark-databricks-odbc-sql-endpoint - # no_output_timeout: 1h - # - store_artifacts: - # path: ./logs + integration-spark-databricks-odbc: + environment: + DBT_INVOCATION_ENV: circle + ODBC_DRIVER: Simba + 
ODBC_DRIVER_URL: https://databricks.com/wp-content/uploads/drivers-2020/SimbaSparkODBC-2.6.16.1019-Debian-64bit.zip + docker: + - image: fishtownanalytics/test-container:10 + command: | + curl -L $ODBC_DRIVER_URL > /tmp/simba_odbc.zip + unzip /tmp/simba_odbc.zip -d /tmp/ + dpkg -i /tmp/SimbaSparkODBC-*/*.deb + echo "[$ODBC_DRIVER]\nDriver = /opt/simba/spark/lib/64/libsparkodbc_sb64.so" >> /etc/odbcinst.ini + rm /tmp/simba_odbc.zip + rm -rf /tmp/SimbaSparkODBC* + steps: + - checkout + - run: + name: Run integration tests + command: tox -e integration-spark-databricks-odbc-cluster,integration-spark-databricks-odbc-sql-endpoint + no_output_timeout: 1h + - store_artifacts: + path: ./logs workflows: version: 2 @@ -82,9 +91,9 @@ workflows: - integration-spark-thrift: requires: - unit - - integration-spark-databricks: + - integration-spark-databricks-http: + requires: + - unit + - integration-spark-databricks-odbc: requires: - unit - # - integration-spark-databricks-odbc: - # requires: - # - unit diff --git a/dbt/adapters/spark/connections.py b/dbt/adapters/spark/connections.py index bd0c44622..3dc06470c 100644 --- a/dbt/adapters/spark/connections.py +++ b/dbt/adapters/spark/connections.py @@ -11,7 +11,10 @@ from TCLIService.ttypes import TOperationState as ThriftState from thrift.transport import THttpClient from pyhive import hive -import pyodbc +try: + import pyodbc +except ImportError: + pyodbc = None from datetime import datetime import sqlparams @@ -67,6 +70,14 @@ def __post_init__(self): ) self.database = None + if self.method == SparkConnectionMethod.ODBC and pyodbc is None: + raise dbt.exceptions.RuntimeException( + f"{self.method} connection method requires " + "additional dependencies. \n" + "Install the additional required dependencies with " + "`pip install dbt-spark[ODBC]`" + ) + @property def type(self): return 'spark' diff --git a/setup.py b/setup.py index b4104baa4..152a7b699 100644 --- a/setup.py +++ b/setup.py @@ -62,8 +62,10 @@ def _dbt_spark_version(): install_requires=[ f'dbt-core=={dbt_version}', 'PyHive[hive]>=0.6.0,<0.7.0', - 'pyodbc>=4.0.30', 'sqlparams>=3.0.0', 'thrift>=0.11.0,<0.12.0' - ] + ], + extra_requires={ + "ODBC": ['pyodbc>=4.0.30'], + } ) From 165d83b6adb18c282b6f4ce1ce4371e023e16e53 Mon Sep 17 00:00:00 2001 From: Kyle Wigley Date: Mon, 2 Nov 2020 11:46:37 -0500 Subject: [PATCH 06/10] add unit test and update integration test image for ODBC tests --- .circleci/config.yml | 15 ++-- dbt/adapters/spark/connections.py | 21 +++-- test/unit/test_adapter.py | 127 ++++++++++++++++++++++++++++-- 3 files changed, 138 insertions(+), 25 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 4f79c7076..508eb0ab5 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -63,17 +63,12 @@ jobs: integration-spark-databricks-odbc: environment: DBT_INVOCATION_ENV: circle - ODBC_DRIVER: Simba - ODBC_DRIVER_URL: https://databricks.com/wp-content/uploads/drivers-2020/SimbaSparkODBC-2.6.16.1019-Debian-64bit.zip + ODBC_DRIVER: Simba # TODO: move to env var to test image docker: - - image: fishtownanalytics/test-container:10 - command: | - curl -L $ODBC_DRIVER_URL > /tmp/simba_odbc.zip - unzip /tmp/simba_odbc.zip -d /tmp/ - dpkg -i /tmp/SimbaSparkODBC-*/*.deb - echo "[$ODBC_DRIVER]\nDriver = /opt/simba/spark/lib/64/libsparkodbc_sb64.so" >> /etc/odbcinst.ini - rm /tmp/simba_odbc.zip - rm -rf /tmp/SimbaSparkODBC* + - image: 828731156495.dkr.ecr.us-east-1.amazonaws.com/dbt-spark-odbc-test-container:latest + aws_auth: + aws_access_key_id: 
$AWS_ACCESS_KEY_ID_STAGING + aws_secret_access_key: $AWS_SECRET_ACCESS_KEY_STAGING steps: - checkout - run: diff --git a/dbt/adapters/spark/connections.py b/dbt/adapters/spark/connections.py index 3dc06470c..27c999cd3 100644 --- a/dbt/adapters/spark/connections.py +++ b/dbt/adapters/spark/connections.py @@ -78,6 +78,16 @@ def __post_init__(self): "`pip install dbt-spark[ODBC]`" ) + if ( + self.method == SparkConnectionMethod.ODBC and + self.cluster and + self.endpoint + ): + raise dbt.exceptions.RuntimeException( + "`cluster` and `endpoint` cannot both be set when" + f" using {self.method} method to connect to Spark" + ) + @property def type(self): return 'spark' @@ -217,10 +227,10 @@ def execute(self, sql, bindings=None): class SparkConnectionManager(SQLConnectionManager): TYPE = 'spark' - SPARK_CLUSTER_HTTP_PATH = "sql/protocolv1/o/{organization}/{cluster}" + SPARK_CLUSTER_HTTP_PATH = "/sql/protocolv1/o/{organization}/{cluster}" SPARK_SQL_ENDPOINT_HTTP_PATH = "/sql/1.0/endpoints/{endpoint}" SPARK_CONNECTION_URL = ( - "https://{host}:{port}/" + SPARK_CLUSTER_HTTP_PATH + "https://{host}:{port}" + SPARK_CLUSTER_HTTP_PATH ) @contextmanager @@ -317,12 +327,7 @@ def open(cls, connection): handle = PyhiveConnectionWrapper(conn) elif creds.method == SparkConnectionMethod.ODBC: http_path = None - if creds.cluster and creds.endpoint: - raise dbt.exceptions.DbtProfileError( - "`cluster` and `endpoint` cannot both be set when" - " using the odbc method to connect to Spark" - ) - elif creds.cluster is not None: + if creds.cluster is not None: required_fields = ['driver', 'host', 'port', 'token', 'organization', 'cluster'] http_path = cls.SPARK_CLUSTER_HTTP_PATH.format( diff --git a/test/unit/test_adapter.py b/test/unit/test_adapter.py index 70bc955f5..2cbd7810d 100644 --- a/test/unit/test_adapter.py +++ b/test/unit/test_adapter.py @@ -73,6 +73,42 @@ def _get_target_thrift_kerberos(self, project): }, 'target': 'test' }) + + def _get_target_odbc_cluster(self, project): + return config_from_parts_or_dicts(project, { + 'outputs': { + 'test': { + 'type': 'spark', + 'method': 'odbc', + 'schema': 'analytics', + 'host': 'myorg.sparkhost.com', + 'port': 443, + 'token': 'abc123', + 'organization': '0123456789', + 'cluster': '01234-23423-coffeetime', + 'driver': 'Simba', + } + }, + 'target': 'test' + }) + + def _get_target_odbc_sql_endpoint(self, project): + return config_from_parts_or_dicts(project, { + 'outputs': { + 'test': { + 'type': 'spark', + 'method': 'odbc', + 'schema': 'analytics', + 'host': 'myorg.sparkhost.com', + 'port': 443, + 'token': 'abc123', + 'endpoint': '012342342393920a', + 'driver': 'Simba', + } + }, + 'target': 'test' + }) + def test_http_connection(self): config = self._get_target_http(self.project_cfg) adapter = SparkAdapter(config) @@ -81,7 +117,8 @@ def hive_http_connect(thrift_transport): self.assertEqual(thrift_transport.scheme, 'https') self.assertEqual(thrift_transport.port, 443) self.assertEqual(thrift_transport.host, 'myorg.sparkhost.com') - self.assertEqual(thrift_transport.path, '/sql/protocolv1/o/0123456789/01234-23423-coffeetime') + self.assertEqual( + thrift_transport.path, '/sql/protocolv1/o/0123456789/01234-23423-coffeetime') # with mock.patch.object(hive, 'connect', new=hive_http_connect): with mock.patch('dbt.adapters.spark.connections.hive.connect', new=hive_http_connect): @@ -90,7 +127,8 @@ def hive_http_connect(thrift_transport): self.assertEqual(connection.state, 'open') self.assertIsNotNone(connection.handle) - self.assertEqual(connection.credentials.cluster, 
'01234-23423-coffeetime') + self.assertEqual(connection.credentials.cluster, + '01234-23423-coffeetime') self.assertEqual(connection.credentials.token, 'abc123') self.assertEqual(connection.credentials.schema, 'analytics') self.assertIsNone(connection.credentials.database) @@ -135,6 +173,56 @@ def hive_thrift_connect(host, port, username, auth, kerberos_service_name): self.assertEqual(connection.credentials.schema, 'analytics') self.assertIsNone(connection.credentials.database) + def test_odbc_cluster_connection(self): + config = self._get_target_odbc_cluster(self.project_cfg) + adapter = SparkAdapter(config) + + def pyodbc_connect(connection_str, autocommit): + self.assertTrue(autocommit) + self.assertIn('driver=simba;', connection_str.lower()) + self.assertIn('port=443;', connection_str.lower()) + self.assertIn('host=myorg.sparkhost.com;', + connection_str.lower()) + self.assertIn( + 'httppath=/sql/protocolv1/o/0123456789/01234-23423-coffeetime;', connection_str.lower()) # noqa + + with mock.patch('dbt.adapters.spark.connections.pyodbc.connect', new=pyodbc_connect): # noqa + connection = adapter.acquire_connection('dummy') + connection.handle # trigger lazy-load + + self.assertEqual(connection.state, 'open') + self.assertIsNotNone(connection.handle) + self.assertEqual(connection.credentials.cluster, + '01234-23423-coffeetime') + self.assertEqual(connection.credentials.token, 'abc123') + self.assertEqual(connection.credentials.schema, 'analytics') + self.assertIsNone(connection.credentials.database) + + def test_odbc_endpoint_connection(self): + config = self._get_target_odbc_sql_endpoint(self.project_cfg) + adapter = SparkAdapter(config) + + def pyodbc_connect(connection_str, autocommit): + self.assertTrue(autocommit) + self.assertIn('driver=simba;', connection_str.lower()) + self.assertIn('port=443;', connection_str.lower()) + self.assertIn('host=myorg.sparkhost.com;', + connection_str.lower()) + self.assertIn( + 'httppath=/sql/1.0/endpoints/012342342393920a;', connection_str.lower()) # noqa + + with mock.patch('dbt.adapters.spark.connections.pyodbc.connect', new=pyodbc_connect): # noqa + connection = adapter.acquire_connection('dummy') + connection.handle # trigger lazy-load + + self.assertEqual(connection.state, 'open') + self.assertIsNotNone(connection.handle) + self.assertEqual(connection.credentials.endpoint, + '012342342393920a') + self.assertEqual(connection.credentials.token, 'abc123') + self.assertEqual(connection.credentials.schema, 'analytics') + self.assertIsNone(connection.credentials.database) + def test_parse_relation(self): self.maxDiff = None rel_type = SparkRelation.get_relation_type.Table @@ -169,10 +257,12 @@ def test_parse_relation(self): ('Partition Provider', 'Catalog') ] - input_cols = [Row(keys=['col_name', 'data_type'], values=r) for r in plain_rows] + input_cols = [Row(keys=['col_name', 'data_type'], values=r) + for r in plain_rows] config = self._get_target_http(self.project_cfg) - rows = SparkAdapter(config).parse_describe_extended(relation, input_cols) + rows = SparkAdapter(config).parse_describe_extended( + relation, input_cols) self.assertEqual(len(rows), 3) self.assertEqual(rows[0].to_dict(omit_none=False), { 'table_database': None, @@ -247,10 +337,12 @@ def test_parse_relation_with_statistics(self): ('Partition Provider', 'Catalog') ] - input_cols = [Row(keys=['col_name', 'data_type'], values=r) for r in plain_rows] + input_cols = [Row(keys=['col_name', 'data_type'], values=r) + for r in plain_rows] config = self._get_target_http(self.project_cfg) - 
rows = SparkAdapter(config).parse_describe_extended(relation, input_cols) + rows = SparkAdapter(config).parse_describe_extended( + relation, input_cols) self.assertEqual(len(rows), 1) self.assertEqual(rows[0].to_dict(omit_none=False), { 'table_database': None, @@ -283,7 +375,8 @@ def test_relation_with_database(self): adapter.Relation.create(schema='different', identifier='table') with self.assertRaises(RuntimeException): # not fine - database set - adapter.Relation.create(database='something', schema='different', identifier='table') + adapter.Relation.create( + database='something', schema='different', identifier='table') def test_profile_with_database(self): profile = { @@ -305,3 +398,23 @@ def test_profile_with_database(self): } with self.assertRaises(RuntimeException): config_from_parts_or_dicts(self.project_cfg, profile) + + def test_profile_with_cluster_and_sql_endpoint(self): + profile = { + 'outputs': { + 'test': { + 'type': 'spark', + 'method': 'odbc', + 'schema': 'analytics', + 'host': 'myorg.sparkhost.com', + 'port': 443, + 'token': 'abc123', + 'organization': '0123456789', + 'cluster': '01234-23423-coffeetime', + 'endpoint': '0123412341234e', + } + }, + 'target': 'test' + } + with self.assertRaises(RuntimeException): + config_from_parts_or_dicts(self.project_cfg, profile) From 9db1e6d2fce4c37fbc0e027f9ce857a24fda5be3 Mon Sep 17 00:00:00 2001 From: Kyle Wigley Date: Mon, 2 Nov 2020 13:22:40 -0500 Subject: [PATCH 07/10] update docs --- .circleci/config.yml | 1 + README.md | 74 +++++++++++++++++++++++++++++++------------- 2 files changed, 53 insertions(+), 22 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 508eb0ab5..bcbeceef1 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -65,6 +65,7 @@ jobs: DBT_INVOCATION_ENV: circle ODBC_DRIVER: Simba # TODO: move to env var to test image docker: + # image based on `fishtownanalytics/test-container` w/ Simba ODBC Spark driver installed - image: 828731156495.dkr.ecr.us-east-1.amazonaws.com/dbt-spark-odbc-test-container:latest aws_auth: aws_access_key_id: $AWS_ACCESS_KEY_ID_STAGING diff --git a/README.md b/README.md index d01494fe3..8c70a09cc 100644 --- a/README.md +++ b/README.md @@ -27,11 +27,21 @@ For more information on using Spark with dbt, consult the dbt documentation: ### Installation This plugin can be installed via pip: -``` +```bash # Install dbt-spark from PyPi: $ pip install dbt-spark ``` +dbt-spark also supports connections via ODBC driver, but it requires [`pyodbc`](https://github.com/mkleehammer/pyodbc). You can install it seperately or via pip as well: + +```bash +# Install dbt-spark from PyPi: +$ pip install "dbt-spark[ODBC]" +``` + +See https://github.com/mkleehammer/pyodbc/wiki/Install for more info about installing `pyodbc`. + + ### Configuring your profile **Connection Method** @@ -40,18 +50,20 @@ Connections can be made to Spark in two different modes. The `http` mode is used A dbt profile can be configured to run against Spark using the following configuration: -| Option | Description | Required? 
| Example | -|---------|----------------------------------------------------|-------------------------|--------------------------| -| method | Specify the connection method (`thrift` or `http`) | Required | `http` | -| schema | Specify the schema (database) to build models into | Required | `analytics` | -| host | The hostname to connect to | Required | `yourorg.sparkhost.com` | -| port | The port to connect to the host on | Optional (default: 443 for `http`, 10001 for `thrift`) | `443` | -| token | The token to use for authenticating to the cluster | Required for `http` | `abc123` | -| organization | The id of the Azure Databricks workspace being used; only for Azure Databricks | See Databricks Note | `1234567891234567` | -| cluster | The name of the cluster to connect to | Required for `http` | `01234-23423-coffeetime` | -| user | The username to use to connect to the cluster | Optional | `hadoop` | -| connect_timeout | The number of seconds to wait before retrying to connect to a Pending Spark cluster | Optional (default: 10) | `60` | -| connect_retries | The number of times to try connecting to a Pending Spark cluster before giving up | Optional (default: 0) | `5` | +| Option | Description | Required? | Example | +| --------------- | ----------------------------------------------------------------------------------- | ------------------------------------------------------------------ | ---------------------------------------------- | +| method | Specify the connection method (`thrift` or `http` or `odbc`) | Required | `http` | +| schema | Specify the schema (database) to build models into | Required | `analytics` | +| host | The hostname to connect to | Required | `yourorg.sparkhost.com` | +| port | The port to connect to the host on | Optional (default: 443 for `http` and `odbc`, 10001 for `thrift`) | `443` | +| token | The token to use for authenticating to the cluster | Required for `http` and `odbc` | `abc123` | +| organization | The id of the Azure Databricks workspace being used; only for Azure Databricks | See Databricks Note | `1234567891234567` | +| cluster | The name of the cluster to connect to | Required for `http` and `odbc` if connecting to a specific cluster | `01234-23423-coffeetime` | +| endpoint | The ID of the SQL endpoint to connect to | Required for `odbc` if connecting to SQL endpoint | `1234567891234a` | +| driver | Path of ODBC driver installed or name of ODBC DSN configured | Required for `odbc` | `/opt/simba/spark/lib/64/libsparkodbc_sb64.so` | +| user | The username to use to connect to the cluster | Optional | `hadoop` | +| connect_timeout | The number of seconds to wait before retrying to connect to a Pending Spark cluster | Optional (default: 10) | `60` | +| connect_retries | The number of times to try connecting to a Pending Spark cluster before giving up | Optional (default: 0) | `5` | **Databricks Note** @@ -104,6 +116,24 @@ your_profile_name: connect_timeout: 60 ``` +**ODBC connection** +``` +your_profile_name: + target: dev + outputs: + dev: + method: odbc + type: spark + schema: analytics + host: yourorg.sparkhost.com + organization: 1234567891234567 # Azure Databricks ONLY + port: 443 + token: abc123 + cluster: 01234-23423-coffeetime + driver: path/to/driver + connect_retries: 5 + connect_timeout: 60 +``` ### Usage Notes @@ -113,15 +143,15 @@ your_profile_name: The following configurations can be supplied to models run with the dbt-spark plugin: -| Option | Description | Required? 
| Example | -|---------|----------------------------------------------------|-------------------------|--------------------------| -| file_format | The file format to use when creating tables (`parquet`, `delta`, `csv`, `json`, `text`, `jdbc`, `orc`, `hive` or `libsvm`). | Optional | `parquet`| -| location_root | The created table uses the specified directory to store its data. The table alias is appended to it. | Optional | `/mnt/root` | -| partition_by | Partition the created table by the specified columns. A directory is created for each partition. | Optional | `partition_1` | -| clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. | Optional | `cluster_1` | -| buckets | The number of buckets to create while clustering | Required if `clustered_by` is specified | `8` | -| incremental_strategy | The strategy to use for incremental models (`insert_overwrite` or `merge`). Note `merge` requires `file_format` = `delta` and `unique_key` to be specified. | Optional (default: `insert_overwrite`) | `merge` | -| persist_docs | Whether dbt should include the model description as a table `comment` | Optional | `{'relation': true}` | +| Option | Description | Required? | Example | +| -------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------------------- | -------------------- | +| file_format | The file format to use when creating tables (`parquet`, `delta`, `csv`, `json`, `text`, `jdbc`, `orc`, `hive` or `libsvm`). | Optional | `parquet` | +| location_root | The created table uses the specified directory to store its data. The table alias is appended to it. | Optional | `/mnt/root` | +| partition_by | Partition the created table by the specified columns. A directory is created for each partition. | Optional | `partition_1` | +| clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. | Optional | `cluster_1` | +| buckets | The number of buckets to create while clustering | Required if `clustered_by` is specified | `8` | +| incremental_strategy | The strategy to use for incremental models (`insert_overwrite` or `merge`). Note `merge` requires `file_format` = `delta` and `unique_key` to be specified. | Optional (default: `insert_overwrite`) | `merge` | +| persist_docs | Whether dbt should include the model description as a table `comment` | Optional | `{'relation': true}` | **Incremental Models** From 95e0d72ffccfe3ea92b38f3ccbae36510b6dbe59 Mon Sep 17 00:00:00 2001 From: Kyle Wigley Date: Mon, 2 Nov 2020 13:26:11 -0500 Subject: [PATCH 08/10] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 8c70a09cc..e72ea13b4 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ $ pip install dbt-spark dbt-spark also supports connections via ODBC driver, but it requires [`pyodbc`](https://github.com/mkleehammer/pyodbc). 
You can install it seperately or via pip as well: ```bash -# Install dbt-spark from PyPi: +# Install dbt-spark w/ pyodbc from PyPi: $ pip install "dbt-spark[ODBC]" ``` From 534486e3237b4a62e63eab712a267eff67ecc053 Mon Sep 17 00:00:00 2001 From: Kyle Wigley Date: Wed, 4 Nov 2020 09:21:26 -0500 Subject: [PATCH 09/10] Update .circleci/config.yml --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index bcbeceef1..a0d9ac493 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -63,7 +63,7 @@ jobs: integration-spark-databricks-odbc: environment: DBT_INVOCATION_ENV: circle - ODBC_DRIVER: Simba # TODO: move to env var to test image + ODBC_DRIVER: Simba # TODO: move env var to Docker image docker: # image based on `fishtownanalytics/test-container` w/ Simba ODBC Spark driver installed - image: 828731156495.dkr.ecr.us-east-1.amazonaws.com/dbt-spark-odbc-test-container:latest From f882e157814d68901efab06430bf3429d6403d0c Mon Sep 17 00:00:00 2001 From: Kyle Wigley Date: Thu, 5 Nov 2020 15:38:36 -0500 Subject: [PATCH 10/10] Apply suggestions from code review Co-authored-by: Jeremy Cohen --- README.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index e72ea13b4..e253b7efe 100644 --- a/README.md +++ b/README.md @@ -129,10 +129,14 @@ your_profile_name: organization: 1234567891234567 # Azure Databricks ONLY port: 443 token: abc123 + + # one of: cluster: 01234-23423-coffeetime + endpoint: coffee01234time + driver: path/to/driver - connect_retries: 5 - connect_timeout: 60 + connect_retries: 5 # cluster only + connect_timeout: 60 # cluster only ```
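
For readers skimming the series, the sketch below is an illustrative summary (not part of the patches themselves) of what the new `odbc` method does at runtime: it assembles a Simba-style ODBC connection string the same way the patch's `_build_odbc_connnection_string` helper does, and it rewrites `format`-style SQL bindings into the `qmark` style that pyodbc expects via `sqlparams`. The helper name `build_odbc_connection_string`, the driver path, host, token, organization, and cluster values are all placeholders for illustration; the actual connect call is left as a comment because it needs a live Databricks workspace.

```python
# Illustrative sketch only -- mirrors behaviour added in
# dbt/adapters/spark/connections.py; all credential values are placeholders.
import sqlparams


def build_odbc_connection_string(**kwargs) -> str:
    # Same shape as the patch's _build_odbc_connnection_string():
    # "KEY1=value1;KEY2=value2;..."
    return ";".join(f"{k}={v}" for k, v in kwargs.items())


# HTTPPath for an all-purpose cluster follows
# /sql/protocolv1/o/{organization}/{cluster}; a SQL endpoint would use
# /sql/1.0/endpoints/{endpoint} instead.
conn_str = build_odbc_connection_string(
    DRIVER="/opt/simba/spark/lib/64/libsparkodbc_sb64.so",  # placeholder path
    HOST="yourorg.sparkhost.com",                           # placeholder host
    PORT=443,
    UID="token",
    PWD="dapi0123456789",                                   # placeholder token
    HTTPPath="/sql/protocolv1/o/0123456789/01234-23423-coffeetime",
    AuthMech=3,
    SparkServerType=3,
    ThriftTransport=2,
    SSL=1,
)
print(conn_str)
# The adapter opens its handle with:
#   conn = pyodbc.connect(conn_str, autocommit=True)

# pyodbc only accepts qmark-style parameters, so PyodbcConnectionWrapper
# converts the format-style SQL produced by dbt before executing it:
converter = sqlparams.SQLParams("format", "qmark")
sql, bindings = converter.format("select * from t where id = %s", [42])
print(sql)       # -> select * from t where id = ?
print(bindings)  # -> [42]
```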