From 33a660fda44f71dd7118f9a0e9947aff85049bcf Mon Sep 17 00:00:00 2001 From: "j.vos" Date: Mon, 3 Mar 2025 09:37:20 +0100 Subject: [PATCH 1/5] remove unused instance arguments --- cosmos/operators/_asynchronous/bigquery.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/cosmos/operators/_asynchronous/bigquery.py b/cosmos/operators/_asynchronous/bigquery.py index e8879b0fe7..5af15b27ad 100644 --- a/cosmos/operators/_asynchronous/bigquery.py +++ b/cosmos/operators/_asynchronous/bigquery.py @@ -57,7 +57,7 @@ def _configure_bigquery_async_op_args(async_op_obj: Any, **kwargs: Any) -> Any: class DbtRunAirflowAsyncBigqueryOperator(BigQueryInsertJobOperator, AbstractDbtLocalBase): # type: ignore[misc] - template_fields: Sequence[str] = ("gcp_project", "dataset", "location", "compiled_sql") + template_fields: Sequence[str] = ("location", "compiled_sql") template_fields_renderers = { "compiled_sql": "sql", } @@ -73,9 +73,6 @@ def __init__( self.project_dir = project_dir self.profile_config = profile_config self.gcp_conn_id = self.profile_config.profile_mapping.conn_id # type: ignore - profile = self.profile_config.profile_mapping.profile # type: ignore - self.gcp_project = profile["project"] - self.dataset = profile["dataset"] self.extra_context = extra_context or {} self.configuration: dict[str, Any] = {} self.dbt_kwargs = dbt_kwargs or {} From 2de38eb9828811faf6043de87a846de7b1739e46 Mon Sep 17 00:00:00 2001 From: "j.vos" Date: Mon, 3 Mar 2025 10:06:06 +0100 Subject: [PATCH 2/5] adjust test --- tests/operators/_asynchronous/test_bigquery.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/operators/_asynchronous/test_bigquery.py b/tests/operators/_asynchronous/test_bigquery.py index f339c98805..5c121b2eec 100644 --- a/tests/operators/_asynchronous/test_bigquery.py +++ b/tests/operators/_asynchronous/test_bigquery.py @@ -38,8 +38,6 @@ def test_dbt_run_airflow_async_bigquery_operator_init(profile_config_mock): assert operator.project_dir == "/path/to/project" assert operator.profile_config == profile_config_mock assert operator.gcp_conn_id == "google_cloud_default" - assert operator.gcp_project == "test_project" - assert operator.dataset == "test_dataset" def test_dbt_run_airflow_async_bigquery_operator_base_cmd(profile_config_mock): From a307a37854a3eb7d9e4e41fc764a18b43f0bf586 Mon Sep 17 00:00:00 2001 From: "j.vos" Date: Mon, 3 Mar 2025 14:46:49 +0100 Subject: [PATCH 3/5] move back attributes into renamed method --- cosmos/operators/_asynchronous/bigquery.py | 14 ++++++++++---- tests/operators/_asynchronous/test_bigquery.py | 4 ++-- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/cosmos/operators/_asynchronous/bigquery.py b/cosmos/operators/_asynchronous/bigquery.py index 5af15b27ad..769a277790 100644 --- a/cosmos/operators/_asynchronous/bigquery.py +++ b/cosmos/operators/_asynchronous/bigquery.py @@ -57,7 +57,7 @@ def _configure_bigquery_async_op_args(async_op_obj: Any, **kwargs: Any) -> Any: class DbtRunAirflowAsyncBigqueryOperator(BigQueryInsertJobOperator, AbstractDbtLocalBase): # type: ignore[misc] - template_fields: Sequence[str] = ("location", "compiled_sql") + template_fields: Sequence[str] = ("gcp_project", "dataset", "location", "compiled_sql") template_fields_renderers = { "compiled_sql": "sql", } @@ -100,6 +100,8 @@ def __init__( self.async_context["profile_type"] = self.profile_config.get_profile_type() self.async_context["async_operator"] = BigQueryInsertJobOperator self.compiled_sql = "" + self.gcp_project = "" + self.dataset = "" @property def base_cmd(self) -> list[str]: @@ -142,10 +144,10 @@ def execute(self, context: Context, **kwargs: Any) -> None: super().execute(context=context) else: self.build_and_run_cmd(context=context, run_as_async=True, async_context=self.async_context) - self._store_compiled_sql(context=context) + self._store_template_fields(context=context) @provide_session - def _store_compiled_sql(self, context: Context, session: Session = NEW_SESSION) -> None: + def _store_template_fields(self, context: Context, session: Session = NEW_SESSION) -> None: from airflow.models.renderedtifields import RenderedTaskInstanceFields from airflow.models.taskinstance import TaskInstance @@ -156,6 +158,10 @@ def _store_compiled_sql(self, context: Context, session: Session = NEW_SESSION) self.log.debug("Executed SQL is: %s", sql) self.compiled_sql = sql + profile = self.profile_config.profile_mapping.profile # type: ignore + self.gcp_project = profile["project"] + self.dataset = profile["dataset"] + # need to refresh the rendered task field record in the db because Airflow only does this # before executing the task, not after ti = context["ti"] @@ -185,5 +191,5 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> Any: """ job_id = super().execute_complete(context=context, event=event) self.log.info("Configuration is %s", str(self.configuration)) - self._store_compiled_sql(context=context) + self._store_template_fields(context=context) return job_id diff --git a/tests/operators/_asynchronous/test_bigquery.py b/tests/operators/_asynchronous/test_bigquery.py index 5c121b2eec..560ca0f7c9 100644 --- a/tests/operators/_asynchronous/test_bigquery.py +++ b/tests/operators/_asynchronous/test_bigquery.py @@ -132,7 +132,7 @@ def test_store_compiled_sql(mock_rendered_ti, mock_get_remote_sql, profile_confi mock_task_instance.task = operator mock_context = {"ti": mock_task_instance} - operator._store_compiled_sql(mock_context, session=mock_session) + operator._store_template_fields(mock_context, session=mock_session) assert operator.compiled_sql == "SELECT * FROM test_table;" mock_rendered_ti.assert_called_once() @@ -140,7 +140,7 @@ def test_store_compiled_sql(mock_rendered_ti, mock_get_remote_sql, profile_confi mock_session.query().filter().delete.assert_called_once() -@patch("cosmos.operators._asynchronous.bigquery.DbtRunAirflowAsyncBigqueryOperator._store_compiled_sql") +@patch("cosmos.operators._asynchronous.bigquery.DbtRunAirflowAsyncBigqueryOperator._store_template_fields") def test_execute_complete(mock_store_sql, profile_config_mock): mock_context = Mock() mock_event = {"job_id": "test_job"} From 2d40934ae92e25fe235a492c009b460b37a87697 Mon Sep 17 00:00:00 2001 From: "j.vos" Date: Tue, 4 Mar 2025 09:59:58 +0100 Subject: [PATCH 4/5] remove type ignore --- cosmos/operators/_asynchronous/bigquery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/operators/_asynchronous/bigquery.py b/cosmos/operators/_asynchronous/bigquery.py index 769a277790..1d28f5da3e 100644 --- a/cosmos/operators/_asynchronous/bigquery.py +++ b/cosmos/operators/_asynchronous/bigquery.py @@ -158,7 +158,7 @@ def _store_template_fields(self, context: Context, session: Session = NEW_SESSIO self.log.debug("Executed SQL is: %s", sql) self.compiled_sql = sql - profile = self.profile_config.profile_mapping.profile # type: ignore + profile = self.profile_config.profile_mapping.profile self.gcp_project = profile["project"] self.dataset = profile["dataset"] From bb69886283a0552b84b8a67042a45940629cbf0a Mon Sep 17 00:00:00 2001 From: "j.vos" Date: Tue, 4 Mar 2025 10:00:14 +0100 Subject: [PATCH 5/5] assert dataset and gcp project in test --- tests/operators/_asynchronous/test_bigquery.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/operators/_asynchronous/test_bigquery.py b/tests/operators/_asynchronous/test_bigquery.py index 560ca0f7c9..40717cd62c 100644 --- a/tests/operators/_asynchronous/test_bigquery.py +++ b/tests/operators/_asynchronous/test_bigquery.py @@ -133,8 +133,12 @@ def test_store_compiled_sql(mock_rendered_ti, mock_get_remote_sql, profile_confi mock_context = {"ti": mock_task_instance} operator._store_template_fields(mock_context, session=mock_session) + # check if gcp_project and dataset are set after the tasks gets executed assert operator.compiled_sql == "SELECT * FROM test_table;" + assert operator.dataset == "test_dataset" + assert operator.gcp_project == "test_project" + mock_rendered_ti.assert_called_once() mock_session.add.assert_called_once() mock_session.query().filter().delete.assert_called_once()