From 9d1580c972cf925755a33947252225fea12608f4 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 5 May 2025 17:18:53 +0530 Subject: [PATCH 01/11] Run listener tests --- .github/workflows/test.yml | 2 +- tests/listeners/test_dag_run_listener.py | 53 ++++++++++++++++++------ 2 files changed, 42 insertions(+), 13 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 93d4bebdbf..20bd6871f4 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,7 +2,7 @@ name: test on: push: # Run on pushes to the default branch - branches: [main] + branches: [main, fix_af3_test] pull_request_target: # Also run on pull requests originated from forks branches: [main] diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py index 77e30f8466..dc9cbe4c76 100644 --- a/tests/listeners/test_dag_run_listener.py +++ b/tests/listeners/test_dag_run_listener.py @@ -1,6 +1,6 @@ import logging import uuid -from datetime import datetime +from datetime import datetime, timedelta, timezone from pathlib import Path from unittest.mock import patch @@ -8,17 +8,21 @@ from airflow import __version__ as airflow_version from airflow.models import DAG from airflow.utils.state import State +from airflow.utils.types import DagRunTriggeredByType, DagRunType from packaging import version from cosmos import DbtRunLocalOperator, ProfileConfig, ProjectConfig from cosmos.airflow.dag import DbtDag from cosmos.airflow.task_group import DbtTaskGroup +from cosmos.constants import _AIRFLOW3_MAJOR_VERSION from cosmos.listeners.dag_run_listener import on_dag_run_failed, on_dag_run_success, total_cosmos_tasks from cosmos.profiles import PostgresUserPasswordProfileMapping DBT_ROOT_PATH = Path(__file__).parent.parent.parent / "dev/dags/dbt" DBT_PROJECT_NAME = "jaffle_shop" +AIRFLOW_VERSION_MAJOR = version.parse(airflow_version).major + profile_config = ProfileConfig( profile_name="default", target_name="dev", @@ -79,8 +83,8 @@ def test_not_cosmos_dag(): assert total_cosmos_tasks(dag) == 0 -# TODO: Make test compatible with Airflow 3.0. Issue:https://github.com/astronomer/astronomer-cosmos/issues/1703 -@pytest.mark.skipif(version.parse(airflow_version).major == 3, reason="Test need to be updated for Airflow 3.0") +# # TODO: Make test compatible with Airflow 3.0. Issue:https://github.com/astronomer/astronomer-cosmos/issues/1703 +# @pytest.mark.skipif(version.parse(airflow_version).major == 3, reason="Test need to be updated for Airflow 3.0") @pytest.mark.integration @patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled") def test_on_dag_run_success(mock_emit_usage_metrics_if_enabled, caplog): @@ -95,10 +99,23 @@ def test_on_dag_run_success(mock_emit_usage_metrics_if_enabled, caplog): dag_id="basic_cosmos_dag", ) run_id = str(uuid.uuid1()) - dag_run = dag.create_dagrun( - state=State.NONE, - run_id=run_id, - ) + + run_after = datetime.now(timezone.utc) - timedelta(seconds=1) + if AIRFLOW_VERSION_MAJOR < _AIRFLOW3_MAJOR_VERSION: + # Airflow 2 + dag_run = dag.create_dagrun( + state=State.NONE, + run_id=run_id, + ) + else: + # Airflow 3 + dag_run = dag.create_dagrun( + state=State.NONE, + run_id=run_id, + run_after=run_after, + run_type=DagRunType.MANUAL, + triggered_by=DagRunTriggeredByType.TIMETABLE, + ) on_dag_run_success(dag_run, msg="test success") assert "Running on_dag_run_success" in caplog.text @@ -107,7 +124,7 @@ def test_on_dag_run_success(mock_emit_usage_metrics_if_enabled, caplog): # TODO: Make test compatible with Airflow 3.0. Issue:https://github.com/astronomer/astronomer-cosmos/issues/1703 -@pytest.mark.skipif(version.parse(airflow_version).major == 3, reason="Test need to be updated for Airflow 3.0") +# @pytest.mark.skipif(version.parse(airflow_version).major == 3, reason="Test need to be updated for Airflow 3.0") @pytest.mark.integration @patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled") def test_on_dag_run_failed(mock_emit_usage_metrics_if_enabled, caplog): @@ -122,10 +139,22 @@ def test_on_dag_run_failed(mock_emit_usage_metrics_if_enabled, caplog): dag_id="basic_cosmos_dag", ) run_id = str(uuid.uuid1()) - dag_run = dag.create_dagrun( - state=State.FAILED, - run_id=run_id, - ) + run_after = datetime.now(timezone.utc) - timedelta(seconds=1) + if AIRFLOW_VERSION_MAJOR < _AIRFLOW3_MAJOR_VERSION: + # Airflow 2 + dag_run = dag.create_dagrun( + state=State.NONE, + run_id=run_id, + ) + else: + # Airflow 3 + dag_run = dag.create_dagrun( + state=State.NONE, + run_id=run_id, + run_after=run_after, + run_type=DagRunType.MANUAL, + triggered_by=DagRunTriggeredByType.TIMETABLE, + ) on_dag_run_failed(dag_run, msg="test failed") assert "Running on_dag_run_failed" in caplog.text From 6a30ea22d7e3e483a8a4797b1ce19da3beb3674f Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 5 May 2025 17:21:57 +0530 Subject: [PATCH 02/11] Run listener tests --- tests/listeners/test_dag_run_listener.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py index dc9cbe4c76..9bff3b2611 100644 --- a/tests/listeners/test_dag_run_listener.py +++ b/tests/listeners/test_dag_run_listener.py @@ -8,7 +8,6 @@ from airflow import __version__ as airflow_version from airflow.models import DAG from airflow.utils.state import State -from airflow.utils.types import DagRunTriggeredByType, DagRunType from packaging import version from cosmos import DbtRunLocalOperator, ProfileConfig, ProjectConfig @@ -109,6 +108,8 @@ def test_on_dag_run_success(mock_emit_usage_metrics_if_enabled, caplog): ) else: # Airflow 3 + from airflow.utils.types import DagRunTriggeredByType, DagRunType + dag_run = dag.create_dagrun( state=State.NONE, run_id=run_id, @@ -148,6 +149,8 @@ def test_on_dag_run_failed(mock_emit_usage_metrics_if_enabled, caplog): ) else: # Airflow 3 + from airflow.utils.types import DagRunTriggeredByType, DagRunType + dag_run = dag.create_dagrun( state=State.NONE, run_id=run_id, From 661b20b26895cd65c9b204f1d992611773248598 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 5 May 2025 19:50:35 +0530 Subject: [PATCH 03/11] Adjust dagrun hash --- cosmos/listeners/dag_run_listener.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index 0314c3474d..f816a568ac 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -1,12 +1,17 @@ from __future__ import annotations +from airflow import __version__ as airflow_version from airflow.listeners import hookimpl from airflow.models.dag import DAG from airflow.models.dagrun import DagRun +from packaging import version from cosmos import telemetry +from cosmos.constants import _AIRFLOW3_MAJOR_VERSION from cosmos.log import get_logger +AIRFLOW_VERSION_MAJOR = version.parse(airflow_version).major + logger = get_logger(__name__) @@ -50,8 +55,13 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None: logger.debug("The DAG does not use Cosmos") return + if AIRFLOW_VERSION_MAJOR < _AIRFLOW3_MAJOR_VERSION: + dag_hash = dag_run.dag_hash + else: + dag_hash = dag_run.__hash__() + additional_telemetry_metrics = { - "dag_hash": dag_run.dag_hash, + "dag_hash": dag_hash, "status": EventStatus.SUCCESS, "task_count": len(serialized_dag.task_ids), "cosmos_task_count": total_cosmos_tasks(serialized_dag), @@ -73,8 +83,13 @@ def on_dag_run_failed(dag_run: DagRun, msg: str) -> None: logger.debug("The DAG does not use Cosmos") return + if AIRFLOW_VERSION_MAJOR < _AIRFLOW3_MAJOR_VERSION: + dag_hash = dag_run.dag_hash + else: + dag_hash = dag_run.__hash__() + additional_telemetry_metrics = { - "dag_hash": dag_run.dag_hash, + "dag_hash": dag_hash, "status": EventStatus.FAILED, "task_count": len(serialized_dag.task_ids), "cosmos_task_count": total_cosmos_tasks(serialized_dag), From 3fce7b820bd4ff3126e94b0704bbfa3733eecf5c Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Mon, 5 May 2025 21:03:20 +0530 Subject: [PATCH 04/11] Update .github/workflows/test.yml --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 20bd6871f4..93d4bebdbf 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,7 +2,7 @@ name: test on: push: # Run on pushes to the default branch - branches: [main, fix_af3_test] + branches: [main] pull_request_target: # Also run on pull requests originated from forks branches: [main] From 515eed0f45660469d26e09090eb7828e8a8494e5 Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Mon, 5 May 2025 21:04:30 +0530 Subject: [PATCH 05/11] Update tests/listeners/test_dag_run_listener.py --- tests/listeners/test_dag_run_listener.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py index 9bff3b2611..0792c2f821 100644 --- a/tests/listeners/test_dag_run_listener.py +++ b/tests/listeners/test_dag_run_listener.py @@ -82,8 +82,6 @@ def test_not_cosmos_dag(): assert total_cosmos_tasks(dag) == 0 -# # TODO: Make test compatible with Airflow 3.0. Issue:https://github.com/astronomer/astronomer-cosmos/issues/1703 -# @pytest.mark.skipif(version.parse(airflow_version).major == 3, reason="Test need to be updated for Airflow 3.0") @pytest.mark.integration @patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled") def test_on_dag_run_success(mock_emit_usage_metrics_if_enabled, caplog): From 56de4508e463dd69fb986b9e23642ce129a53c02 Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Mon, 5 May 2025 21:05:06 +0530 Subject: [PATCH 06/11] Update tests/listeners/test_dag_run_listener.py --- tests/listeners/test_dag_run_listener.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py index 0792c2f821..cdd4dbdf65 100644 --- a/tests/listeners/test_dag_run_listener.py +++ b/tests/listeners/test_dag_run_listener.py @@ -123,7 +123,6 @@ def test_on_dag_run_success(mock_emit_usage_metrics_if_enabled, caplog): # TODO: Make test compatible with Airflow 3.0. Issue:https://github.com/astronomer/astronomer-cosmos/issues/1703 -# @pytest.mark.skipif(version.parse(airflow_version).major == 3, reason="Test need to be updated for Airflow 3.0") @pytest.mark.integration @patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled") def test_on_dag_run_failed(mock_emit_usage_metrics_if_enabled, caplog): From 7f8d9235b6761f2c375983a029cec1d5909abd56 Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Mon, 5 May 2025 21:05:31 +0530 Subject: [PATCH 07/11] Update tests/listeners/test_dag_run_listener.py --- tests/listeners/test_dag_run_listener.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py index cdd4dbdf65..456dea9c1a 100644 --- a/tests/listeners/test_dag_run_listener.py +++ b/tests/listeners/test_dag_run_listener.py @@ -122,7 +122,6 @@ def test_on_dag_run_success(mock_emit_usage_metrics_if_enabled, caplog): assert mock_emit_usage_metrics_if_enabled.call_count == 1 -# TODO: Make test compatible with Airflow 3.0. Issue:https://github.com/astronomer/astronomer-cosmos/issues/1703 @pytest.mark.integration @patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled") def test_on_dag_run_failed(mock_emit_usage_metrics_if_enabled, caplog): From dae30c1bab6d6b1e308b9167a1a24a17d48d4df3 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 5 May 2025 23:29:44 +0530 Subject: [PATCH 08/11] Adjust dagrun hash --- cosmos/listeners/dag_run_listener.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index f816a568ac..ff1f815f24 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -58,7 +58,7 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None: if AIRFLOW_VERSION_MAJOR < _AIRFLOW3_MAJOR_VERSION: dag_hash = dag_run.dag_hash else: - dag_hash = dag_run.__hash__() + dag_hash = dag_run.dag.__hash__() additional_telemetry_metrics = { "dag_hash": dag_hash, @@ -86,7 +86,7 @@ def on_dag_run_failed(dag_run: DagRun, msg: str) -> None: if AIRFLOW_VERSION_MAJOR < _AIRFLOW3_MAJOR_VERSION: dag_hash = dag_run.dag_hash else: - dag_hash = dag_run.__hash__() + dag_hash = dag_run.dag.__hash__() additional_telemetry_metrics = { "dag_hash": dag_hash, From 85ce589ce2a70bd7d6396f2507179690fa878dee Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 6 May 2025 11:18:19 +0530 Subject: [PATCH 09/11] Adjust dagrun hash --- cosmos/listeners/dag_run_listener.py | 20 +++----------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index ff1f815f24..f824df5a17 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -1,17 +1,13 @@ from __future__ import annotations -from airflow import __version__ as airflow_version from airflow.listeners import hookimpl from airflow.models.dag import DAG from airflow.models.dagrun import DagRun -from packaging import version +from airflow.utils.hashlib_wrapper import md5 from cosmos import telemetry -from cosmos.constants import _AIRFLOW3_MAJOR_VERSION from cosmos.log import get_logger -AIRFLOW_VERSION_MAJOR = version.parse(airflow_version).major - logger = get_logger(__name__) @@ -55,13 +51,8 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None: logger.debug("The DAG does not use Cosmos") return - if AIRFLOW_VERSION_MAJOR < _AIRFLOW3_MAJOR_VERSION: - dag_hash = dag_run.dag_hash - else: - dag_hash = dag_run.dag.__hash__() - additional_telemetry_metrics = { - "dag_hash": dag_hash, + "dag_hash": md5(dag_run.dag_id.encode("utf-8")).hexdigest(), "status": EventStatus.SUCCESS, "task_count": len(serialized_dag.task_ids), "cosmos_task_count": total_cosmos_tasks(serialized_dag), @@ -83,13 +74,8 @@ def on_dag_run_failed(dag_run: DagRun, msg: str) -> None: logger.debug("The DAG does not use Cosmos") return - if AIRFLOW_VERSION_MAJOR < _AIRFLOW3_MAJOR_VERSION: - dag_hash = dag_run.dag_hash - else: - dag_hash = dag_run.dag.__hash__() - additional_telemetry_metrics = { - "dag_hash": dag_hash, + "dag_hash": md5(dag_run.dag_id.encode("utf-8")).hexdigest(), "status": EventStatus.FAILED, "task_count": len(serialized_dag.task_ids), "cosmos_task_count": total_cosmos_tasks(serialized_dag), From 0ed073b31a555784eb813d4069f64919c0412676 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 6 May 2025 11:31:57 +0530 Subject: [PATCH 10/11] Adjust dagrun hash --- cosmos/listeners/dag_run_listener.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index f824df5a17..c34bfdd253 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -1,9 +1,10 @@ from __future__ import annotations +import hashlib + from airflow.listeners import hookimpl from airflow.models.dag import DAG from airflow.models.dagrun import DagRun -from airflow.utils.hashlib_wrapper import md5 from cosmos import telemetry from cosmos.log import get_logger @@ -52,7 +53,7 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None: return additional_telemetry_metrics = { - "dag_hash": md5(dag_run.dag_id.encode("utf-8")).hexdigest(), + "dag_hash": hashlib.md5(dag_run.dag_id.encode("utf-8")).hexdigest()[:8], "status": EventStatus.SUCCESS, "task_count": len(serialized_dag.task_ids), "cosmos_task_count": total_cosmos_tasks(serialized_dag), @@ -75,7 +76,7 @@ def on_dag_run_failed(dag_run: DagRun, msg: str) -> None: return additional_telemetry_metrics = { - "dag_hash": md5(dag_run.dag_id.encode("utf-8")).hexdigest(), + "dag_hash": hashlib.md5(dag_run.dag_id.encode("utf-8")).hexdigest()[:8], "status": EventStatus.FAILED, "task_count": len(serialized_dag.task_ids), "cosmos_task_count": total_cosmos_tasks(serialized_dag), From 8934c5344ff8b05c1ec25f165ab57173e42dc003 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 6 May 2025 15:17:10 +0530 Subject: [PATCH 11/11] Restore AF2 prop --- cosmos/listeners/dag_run_listener.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py index c34bfdd253..2abd21e7d1 100644 --- a/cosmos/listeners/dag_run_listener.py +++ b/cosmos/listeners/dag_run_listener.py @@ -2,13 +2,18 @@ import hashlib +from airflow import __version__ as airflow_version from airflow.listeners import hookimpl from airflow.models.dag import DAG from airflow.models.dagrun import DagRun +from packaging import version from cosmos import telemetry +from cosmos.constants import _AIRFLOW3_MAJOR_VERSION from cosmos.log import get_logger +AIRFLOW_VERSION_MAJOR = version.parse(airflow_version).major + logger = get_logger(__name__) @@ -52,8 +57,13 @@ def on_dag_run_success(dag_run: DagRun, msg: str) -> None: logger.debug("The DAG does not use Cosmos") return + if AIRFLOW_VERSION_MAJOR < _AIRFLOW3_MAJOR_VERSION: + dag_hash = dag_run.dag_hash + else: + dag_hash = hashlib.md5(dag_run.dag_id.encode("utf-8")).hexdigest() + additional_telemetry_metrics = { - "dag_hash": hashlib.md5(dag_run.dag_id.encode("utf-8")).hexdigest()[:8], + "dag_hash": dag_hash, "status": EventStatus.SUCCESS, "task_count": len(serialized_dag.task_ids), "cosmos_task_count": total_cosmos_tasks(serialized_dag), @@ -75,8 +85,13 @@ def on_dag_run_failed(dag_run: DagRun, msg: str) -> None: logger.debug("The DAG does not use Cosmos") return + if AIRFLOW_VERSION_MAJOR < _AIRFLOW3_MAJOR_VERSION: + dag_hash = dag_run.dag_hash + else: + dag_hash = hashlib.md5(dag_run.dag_id.encode("utf-8")).hexdigest() + additional_telemetry_metrics = { - "dag_hash": hashlib.md5(dag_run.dag_id.encode("utf-8")).hexdigest()[:8], + "dag_hash": dag_hash, "status": EventStatus.FAILED, "task_count": len(serialized_dag.task_ids), "cosmos_task_count": total_cosmos_tasks(serialized_dag),