From 367461cdd5d160050e8c90dd51f6658ce6da08ef Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 27 May 2025 20:05:04 +0530 Subject: [PATCH 01/12] Fix async tests --- cosmos/operators/_asynchronous/__init__.py | 6 +++++- cosmos/operators/local.py | 2 +- tests/utils.py | 17 +++++++++++++---- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/cosmos/operators/_asynchronous/__init__.py b/cosmos/operators/_asynchronous/__init__.py index cbbb51f07e..3e31d7bdd8 100644 --- a/cosmos/operators/_asynchronous/__init__.py +++ b/cosmos/operators/_asynchronous/__init__.py @@ -42,7 +42,11 @@ def run_subprocess(self, command: list[str], env: dict[str, str], cwd: str) -> F return super().run_subprocess(command, env, cwd) def execute(self, context: Context, **kwargs: Any) -> None: - async_context = {"profile_type": self.profile_config.get_profile_type(), "run_id": context["run_id"]} + async_context = { + "setup_task": True, + "profile_type": self.profile_config.get_profile_type(), + "run_id": context["run_id"], + } self.build_and_run_cmd( context=context, cmd_flags=self.dbt_cmd_flags, run_as_async=True, async_context=async_context ) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index a4c2098803..25ea83cc53 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -569,7 +569,7 @@ def _handle_async_execution(self, tmp_project_dir: str, context: Context, async_ self._delete_sql_files() return - if settings.enable_setup_async_task: + if async_context.get("setup_task") and settings.enable_setup_async_task: self._upload_sql_files(tmp_project_dir, "run") else: sql = self._read_run_sql_from_target_dir(tmp_project_dir, async_context) diff --git a/tests/utils.py b/tests/utils.py index f75a155afb..3c815cf6cc 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -32,11 +32,14 @@ def run_dag(dag: DAG, conn_file_path: str | None = None) -> DagRun: def test_dag(dag, conn_file_path: str | None = None, custom_tester: bool = False) -> DagRun: + dr = None if custom_tester: - return test_old_dag(dag, conn_file_path) + dr = test_old_dag(dag, conn_file_path) + assert dr.state == DagRunState.SUCCESS, f"Dag {dag.dag_id} did not run successfully. State: {dr.state}. " elif AIRFLOW_VERSION >= version.Version("2.5"): if AIRFLOW_VERSION not in (Version("2.10.0"), Version("2.10.1"), Version("2.10.2")): - return dag.test() + dr = dag.test() + assert dr.state == DagRunState.SUCCESS, f"Dag {dag.dag_id} did not run successfully. State: {dr.state}. " else: # This is a work around until we fix the issue in Airflow: # https://github.com/apache/airflow/issues/42495 @@ -49,13 +52,19 @@ def test_dag(dag, conn_file_path: str | None = None, custom_tester: bool = False FAILED tests/test_example_dags.py::test_example_dag[user_defined_profile] """ try: - dag.test() + dr = dag.test() + assert ( + dr.state == DagRunState.SUCCESS + ), f"Dag {dag.dag_id} did not run successfully. State: {dr.state}. " except sqlalchemy.exc.PendingRollbackError: warnings.warn( "Early versions of Airflow 2.10 have issues when running the test command with DatasetAlias / Datasets" ) else: - return test_old_dag(dag, conn_file_path) + dr = test_old_dag(dag, conn_file_path) + assert dr.state == DagRunState.SUCCESS, f"Dag {dag.dag_id} did not run successfully. State: {dr.state}. " + + return dr # DAG.test() was added in Airflow version 2.5.0. And to test on older Airflow versions, we need to copy the From 4714e64a4c279b851c216006704aa46d7f2037af Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 27 May 2025 20:21:40 +0530 Subject: [PATCH 02/12] Use monkeypath --- tests/test_example_dags.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 144f11e062..3169fd0c4c 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -2,7 +2,6 @@ import sys from pathlib import Path -from unittest.mock import patch try: from functools import cache @@ -125,9 +124,12 @@ def test_example_dag(session, dag_id: str): reason="See PR: https://github.com/apache/airflow/pull/34585 and Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs, and Cosmos errors if `emit_datasets` is not False", ) @pytest.mark.integration -@patch("cosmos.operators.local.settings.enable_setup_async_task", False) -@patch("cosmos.operators.local.settings.enable_teardown_async_task", False) -@patch("cosmos.operators._asynchronous.bigquery.settings.enable_setup_async_task", False) -def test_async_example_dag_without_setup_task(session): +# @patch("cosmos.operators.local.settings.enable_setup_async_task", False) +# @patch("cosmos.operators.local.settings.enable_teardown_async_task", False) +# @patch("cosmos.operators._asynchronous.bigquery.settings.enable_setup_async_task", False) +def test_async_example_dag_without_setup_task(monkeypatch): + monkeypatch.setattr("cosmos.operators.local.settings.enable_setup_async_task", False) + monkeypatch.setattr("cosmos.operators.local.settings.enable_teardown_async_task", False) + monkeypatch.setattr("cosmos.operators._asynchronous.bigquery.settings.enable_setup_async_task", False) for dag_id in async_dag_ids: run_dag(dag_id) From 51b1207035396f685a3f98567a32da3929dfb3b3 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 27 May 2025 20:47:20 +0530 Subject: [PATCH 03/12] Use monkeypath --- tests/test_example_dags.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 3169fd0c4c..f30f25de56 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -131,5 +131,7 @@ def test_async_example_dag_without_setup_task(monkeypatch): monkeypatch.setattr("cosmos.operators.local.settings.enable_setup_async_task", False) monkeypatch.setattr("cosmos.operators.local.settings.enable_teardown_async_task", False) monkeypatch.setattr("cosmos.operators._asynchronous.bigquery.settings.enable_setup_async_task", False) + monkeypatch.setattr("cosmos.settings.enable_setup_async_task", False) + monkeypatch.setattr("cosmos.settings.enable_teardown_async_task", False) for dag_id in async_dag_ids: run_dag(dag_id) From 2ee655a7fef3a1400214f5342e2dfdae3d06648a Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 27 May 2025 21:06:56 +0530 Subject: [PATCH 04/12] Use monkeypath --- cosmos/operators/local.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 25ea83cc53..75afa62401 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -572,6 +572,7 @@ def _handle_async_execution(self, tmp_project_dir: str, context: Context, async_ if async_context.get("setup_task") and settings.enable_setup_async_task: self._upload_sql_files(tmp_project_dir, "run") else: + self.log.info(f"task-id: {self.task_id}") sql = self._read_run_sql_from_target_dir(tmp_project_dir, async_context) profile_type = async_context["profile_type"] module_path = f"cosmos.operators._asynchronous.{profile_type}" From 4bad384fe45b56d39839d6f08788b15029f43d41 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 28 May 2025 12:54:27 +0530 Subject: [PATCH 05/12] Use monkeypath --- tests/test_example_dags.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index f30f25de56..8a72dd56c4 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -89,7 +89,7 @@ def get_dag_bag() -> DagBag: print(AIRFLOW_IGNORE_FILE.read_text()) db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False) assert db.dags - assert not db.import_errors + #assert not db.import_errors return db @@ -127,11 +127,13 @@ def test_example_dag(session, dag_id: str): # @patch("cosmos.operators.local.settings.enable_setup_async_task", False) # @patch("cosmos.operators.local.settings.enable_teardown_async_task", False) # @patch("cosmos.operators._asynchronous.bigquery.settings.enable_setup_async_task", False) -def test_async_example_dag_without_setup_task(monkeypatch): +def test_async_example_dag_without_setup_task(session ,monkeypatch): monkeypatch.setattr("cosmos.operators.local.settings.enable_setup_async_task", False) monkeypatch.setattr("cosmos.operators.local.settings.enable_teardown_async_task", False) monkeypatch.setattr("cosmos.operators._asynchronous.bigquery.settings.enable_setup_async_task", False) monkeypatch.setattr("cosmos.settings.enable_setup_async_task", False) monkeypatch.setattr("cosmos.settings.enable_teardown_async_task", False) + monkeypatch.setattr("cosmos.airflow.graph.enable_setup_async_task", False) + monkeypatch.setattr("cosmos.airflow.graph.enable_teardown_async_task", False) for dag_id in async_dag_ids: run_dag(dag_id) From 6e6e2707027e1944e2bd809ca0abd9da851106d2 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 28 May 2025 13:22:40 +0530 Subject: [PATCH 06/12] Use monkeypath --- tests/test_example_dags.py | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 8a72dd56c4..50adef9428 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -124,16 +124,9 @@ def test_example_dag(session, dag_id: str): reason="See PR: https://github.com/apache/airflow/pull/34585 and Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs, and Cosmos errors if `emit_datasets` is not False", ) @pytest.mark.integration -# @patch("cosmos.operators.local.settings.enable_setup_async_task", False) -# @patch("cosmos.operators.local.settings.enable_teardown_async_task", False) -# @patch("cosmos.operators._asynchronous.bigquery.settings.enable_setup_async_task", False) def test_async_example_dag_without_setup_task(session ,monkeypatch): - monkeypatch.setattr("cosmos.operators.local.settings.enable_setup_async_task", False) - monkeypatch.setattr("cosmos.operators.local.settings.enable_teardown_async_task", False) - monkeypatch.setattr("cosmos.operators._asynchronous.bigquery.settings.enable_setup_async_task", False) - monkeypatch.setattr("cosmos.settings.enable_setup_async_task", False) - monkeypatch.setattr("cosmos.settings.enable_teardown_async_task", False) - monkeypatch.setattr("cosmos.airflow.graph.enable_setup_async_task", False) - monkeypatch.setattr("cosmos.airflow.graph.enable_teardown_async_task", False) + monkeypatch.setenv("AIRFLOW__COSMOS__ENABLE_SETUP_ASYNC_TASK", "false") + monkeypatch.setenv("AIRFLOW__COSMOS__ENABLE_TEARDOWN_ASYNC_TASK", "false") + for dag_id in async_dag_ids: run_dag(dag_id) From d492021442aa0e842c5583825d6a691df89d30c8 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 28 May 2025 07:54:32 +0000 Subject: [PATCH 07/12] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/test_example_dags.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 50adef9428..9e587e2323 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -89,7 +89,7 @@ def get_dag_bag() -> DagBag: print(AIRFLOW_IGNORE_FILE.read_text()) db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False) assert db.dags - #assert not db.import_errors + # assert not db.import_errors return db @@ -124,7 +124,7 @@ def test_example_dag(session, dag_id: str): reason="See PR: https://github.com/apache/airflow/pull/34585 and Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs, and Cosmos errors if `emit_datasets` is not False", ) @pytest.mark.integration -def test_async_example_dag_without_setup_task(session ,monkeypatch): +def test_async_example_dag_without_setup_task(session, monkeypatch): monkeypatch.setenv("AIRFLOW__COSMOS__ENABLE_SETUP_ASYNC_TASK", "false") monkeypatch.setenv("AIRFLOW__COSMOS__ENABLE_TEARDOWN_ASYNC_TASK", "false") From afb2b82935cfb71e16f06037ca95c45a5fda798e Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Wed, 28 May 2025 13:42:23 +0530 Subject: [PATCH 08/12] Update cosmos/operators/local.py --- cosmos/operators/local.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 75afa62401..c99de1bbeb 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -569,7 +569,7 @@ def _handle_async_execution(self, tmp_project_dir: str, context: Context, async_ self._delete_sql_files() return - if async_context.get("setup_task") and settings.enable_setup_async_task: + if settings.enable_setup_async_task: self._upload_sql_files(tmp_project_dir, "run") else: self.log.info(f"task-id: {self.task_id}") From 7b21d7da367832c6818ded7606ef4a3ac5db95d8 Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Wed, 28 May 2025 13:42:46 +0530 Subject: [PATCH 09/12] Update cosmos/operators/_asynchronous/__init__.py --- cosmos/operators/_asynchronous/__init__.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/cosmos/operators/_asynchronous/__init__.py b/cosmos/operators/_asynchronous/__init__.py index 3e31d7bdd8..cbbb51f07e 100644 --- a/cosmos/operators/_asynchronous/__init__.py +++ b/cosmos/operators/_asynchronous/__init__.py @@ -42,11 +42,7 @@ def run_subprocess(self, command: list[str], env: dict[str, str], cwd: str) -> F return super().run_subprocess(command, env, cwd) def execute(self, context: Context, **kwargs: Any) -> None: - async_context = { - "setup_task": True, - "profile_type": self.profile_config.get_profile_type(), - "run_id": context["run_id"], - } + async_context = {"profile_type": self.profile_config.get_profile_type(), "run_id": context["run_id"]} self.build_and_run_cmd( context=context, cmd_flags=self.dbt_cmd_flags, run_as_async=True, async_context=async_context ) From 265db016607cde9b0c3dcb4813a50a2de68aef73 Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Wed, 28 May 2025 13:43:00 +0530 Subject: [PATCH 10/12] Update cosmos/operators/local.py --- cosmos/operators/local.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index c99de1bbeb..a4c2098803 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -572,7 +572,6 @@ def _handle_async_execution(self, tmp_project_dir: str, context: Context, async_ if settings.enable_setup_async_task: self._upload_sql_files(tmp_project_dir, "run") else: - self.log.info(f"task-id: {self.task_id}") sql = self._read_run_sql_from_target_dir(tmp_project_dir, async_context) profile_type = async_context["profile_type"] module_path = f"cosmos.operators._asynchronous.{profile_type}" From be27783af9d15b12fb3e2f9a549f3b157e376503 Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Wed, 28 May 2025 13:43:15 +0530 Subject: [PATCH 11/12] Update tests/test_example_dags.py --- tests/test_example_dags.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 9e587e2323..669ab667c7 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -89,7 +89,7 @@ def get_dag_bag() -> DagBag: print(AIRFLOW_IGNORE_FILE.read_text()) db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False) assert db.dags - # assert not db.import_errors + assert not db.import_errors return db From ab8530329f5021c44c6a7fe6a25adf6daf835d2a Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 28 May 2025 13:44:44 +0530 Subject: [PATCH 12/12] Use monkeypath --- tests/utils.py | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/tests/utils.py b/tests/utils.py index 3c815cf6cc..f75a155afb 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -32,14 +32,11 @@ def run_dag(dag: DAG, conn_file_path: str | None = None) -> DagRun: def test_dag(dag, conn_file_path: str | None = None, custom_tester: bool = False) -> DagRun: - dr = None if custom_tester: - dr = test_old_dag(dag, conn_file_path) - assert dr.state == DagRunState.SUCCESS, f"Dag {dag.dag_id} did not run successfully. State: {dr.state}. " + return test_old_dag(dag, conn_file_path) elif AIRFLOW_VERSION >= version.Version("2.5"): if AIRFLOW_VERSION not in (Version("2.10.0"), Version("2.10.1"), Version("2.10.2")): - dr = dag.test() - assert dr.state == DagRunState.SUCCESS, f"Dag {dag.dag_id} did not run successfully. State: {dr.state}. " + return dag.test() else: # This is a work around until we fix the issue in Airflow: # https://github.com/apache/airflow/issues/42495 @@ -52,19 +49,13 @@ def test_dag(dag, conn_file_path: str | None = None, custom_tester: bool = False FAILED tests/test_example_dags.py::test_example_dag[user_defined_profile] """ try: - dr = dag.test() - assert ( - dr.state == DagRunState.SUCCESS - ), f"Dag {dag.dag_id} did not run successfully. State: {dr.state}. " + dag.test() except sqlalchemy.exc.PendingRollbackError: warnings.warn( "Early versions of Airflow 2.10 have issues when running the test command with DatasetAlias / Datasets" ) else: - dr = test_old_dag(dag, conn_file_path) - assert dr.state == DagRunState.SUCCESS, f"Dag {dag.dag_id} did not run successfully. State: {dr.state}. " - - return dr + return test_old_dag(dag, conn_file_path) # DAG.test() was added in Airflow version 2.5.0. And to test on older Airflow versions, we need to copy the