Skip to content

Commit

Permalink
astronmer#112: add create_temp_table support
Browse files Browse the repository at this point in the history
  • Loading branch information
antelmoa committed Feb 1, 2024
1 parent b0f9880 commit 14c1930
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 1 deletion.
6 changes: 6 additions & 0 deletions great_expectations_provider/operators/great_expectations.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,11 +381,14 @@ def build_snowflake_connection_config_from_hook(self) -> Dict[str, str]:
def build_configured_sql_datasource_config_from_conn_id(
self,
) -> Datasource:
create_temp_table = self.conn.extra_dejson.get("create_temp_table") if self.conn.extra_dejson.get("create_temp_table") is not None else True

datasource_config = {
"name": f"{self.conn.conn_id}_configured_sql_datasource",
"execution_engine": {
"module_name": "great_expectations.execution_engine",
"class_name": "SqlAlchemyExecutionEngine",
"create_temp_table": create_temp_table,
**self.make_connection_configuration(),
},
"data_connectors": {
Expand Down Expand Up @@ -416,11 +419,14 @@ def build_configured_sql_datasource_batch_request(self):
def build_runtime_sql_datasource_config_from_conn_id(
self,
) -> Datasource:
create_temp_table = self.conn.extra_dejson.get("create_temp_table") if self.conn.extra_dejson.get("create_temp_table") is not None else True

datasource_config = {
"name": f"{self.conn.conn_id}_runtime_sql_datasource",
"execution_engine": {
"module_name": "great_expectations.execution_engine",
"class_name": "SqlAlchemyExecutionEngine",
"create_temp_table": create_temp_table,
**self.make_connection_configuration(),
},
"data_connectors": {
Expand Down
64 changes: 63 additions & 1 deletion tests/operators/test_great_expectations.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ def constructed_sql_runtime_datasource():
"execution_engine": {
"module_name": "great_expectations.execution_engine",
"class_name": "SqlAlchemyExecutionEngine",
"create_temp_table": True,
"connection_string": "sqlite:///host",
},
"data_connectors": {
Expand All @@ -201,6 +202,7 @@ def constructed_sql_configured_datasource():
"execution_engine": {
"module_name": "great_expectations.execution_engine",
"class_name": "SqlAlchemyExecutionEngine",
"create_temp_table": True,
"connection_string": "sqlite:///host",
},
"data_connectors": {
Expand All @@ -223,6 +225,7 @@ def constructed_sql_configured_datasource():
@pytest.fixture()
def mock_airflow_conn():
conn = mock.Mock(conn_id="sqlite_conn", schema="my_schema", host="host", conn_type="sqlite")
conn.extra_dejson.get.return_value = True
return conn


Expand Down Expand Up @@ -1067,7 +1070,6 @@ def test_great_expectations_operator__make_connection_string_data_asset_name_sch
assert operator.make_connection_configuration() == test_conn_conf
assert operator.data_asset_name == "test_table"


def test_great_expectations_operator__build_configured_sql_datasource_config_from_conn_id_uses_schema_override():
test_conn_conf = {"connection_string": "sqlite:///host"}
datasource_config = {
Expand All @@ -1076,6 +1078,7 @@ def test_great_expectations_operator__build_configured_sql_datasource_config_fro
"execution_engine": {
"module_name": "great_expectations.execution_engine",
"class_name": "SqlAlchemyExecutionEngine",
"create_temp_table": True,
**test_conn_conf,
},
"data_connectors": {
Expand Down Expand Up @@ -1118,6 +1121,65 @@ def test_great_expectations_operator__build_configured_sql_datasource_config_fro
assert isinstance(constructed_datasource, Datasource)
assert constructed_datasource.config == datasource_config

def test_great_expectations_operator__build_configured_sql_datasource_config_from_conn_id_uses_extra_create_temp_table(
constructed_sql_configured_datasource
):
constructed_sql_configured_datasource["execution_engine"]["create_temp_table"] = False;

operator = GreatExpectationsOperator(
task_id="task_id",
data_context_config=in_memory_data_context_config,
data_asset_name="default_schema.my_sqlite_table",
conn_id="sqlite_conn",
expectation_suite_name="suite",
schema="my_schema",
)
operator.conn = Connection(
conn_id="sqlite_conn",
conn_type="sqlite",
host="host",
login="user",
password="password",
schema="schema",
extra={
"create_temp_table": False
}
)

constructed_datasource = operator.build_configured_sql_datasource_config_from_conn_id()

assert isinstance(constructed_datasource, Datasource)
assert constructed_datasource.config == constructed_sql_configured_datasource

def test_great_expectations_operator__build_runtime_sql_datasource_config_from_conn_id_uses_extra_create_temp_table(
constructed_sql_runtime_datasource
):
constructed_sql_runtime_datasource["execution_engine"]["create_temp_table"] = False;

operator = GreatExpectationsOperator(
task_id="task_id",
data_context_config=in_memory_data_context_config,
data_asset_name="default_schema.my_sqlite_table",
conn_id="sqlite_conn",
expectation_suite_name="suite",
schema="my_schema",
)
operator.conn = Connection(
conn_id="sqlite_conn",
conn_type="sqlite",
host="host",
login="user",
password="password",
schema="schema",
extra={
"create_temp_table": False
}
)

constructed_datasource = operator.build_runtime_sql_datasource_config_from_conn_id()

assert isinstance(constructed_datasource, Datasource)
assert constructed_datasource.config == constructed_sql_runtime_datasource

def test_great_expectations_operator__make_connection_string_raise_error():
operator = GreatExpectationsOperator(
Expand Down

0 comments on commit 14c1930

Please sign in to comment.