Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support connection extra parameters in MsSqlHook #44190

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,14 @@ def _generate_insert_sql(self, table, values, target_fields, replace, **kwargs)
def get_conn(self) -> PymssqlConnection:
"""Return ``pymssql`` connection object."""
conn = self.connection
extra_conn_args = {key: val for key, val in conn.extra_dejson.items() if key != "sqlalchemy_scheme"}
return pymssql.connect(
server=conn.host,
user=conn.login,
password=conn.password,
database=self.schema or conn.schema,
port=str(conn.port),
**extra_conn_args,
)

def set_autocommit(
Expand Down
31 changes: 31 additions & 0 deletions providers/tests/microsoft/mssql/hooks/test_mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@
port=8081,
extra={"SQlalchemy_Scheme": "mssql+testdriver", "myparam": "5@-//*"},
)
PYMSSQL_CONN_WITH_EXTRA = Connection(
conn_type="mssql",
host="test-server",
schema="test-db",
login="test-user",
password="test-password",
port=8081,
extra={"login_timeout": 30, "charset": "utf8", "tds_version": "7.0", "appname": "airflow"},
)


def get_primary_keys(self, table: str) -> list[str]:
Expand All @@ -79,6 +88,28 @@ def test_get_conn_should_return_connection(self, get_connection, mssql_get_conn)
assert mssql_get_conn.return_value == conn
mssql_get_conn.assert_called_once()

@mock.patch("airflow.providers.microsoft.mssql.hooks.mssql.pymssql.connect")
@mock.patch("airflow.providers.common.sql.hooks.sql.DbApiHook.get_connection")
def test_get_conn_with_extra_parameters(self, mock_get_connection, mock_connect):
mock_get_connection.return_value = PYMSSQL_CONN_WITH_EXTRA

hook = MsSqlHook()
hook.get_conn()

mock_connect.assert_called_once_with(
server="test-server",
user="test-user",
password="test-password",
database="test-db",
port="8081",
login_timeout=30,
charset="utf8",
tds_version="7.0",
appname="airflow",
)

assert hook.sqlalchemy_scheme == hook.DEFAULT_SQLALCHEMY_SCHEME

@mock.patch("airflow.providers.microsoft.mssql.hooks.mssql.MsSqlHook.get_conn")
@mock.patch("airflow.providers.common.sql.hooks.sql.DbApiHook.get_connection")
def test_set_autocommit_should_invoke_autocommit(self, get_connection, mssql_get_conn):
Expand Down
Loading