From b68c77bbb98c09c280a7593b5e082565edd81530 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 28 Mar 2025 12:25:03 +0530 Subject: [PATCH 01/33] Add Airflow 3 tests matrix entry --- .github/workflows/test.yml | 6 +++--- pyproject.toml | 2 +- scripts/test/pre-install-airflow.sh | 19 ++++++++++++++++--- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d14ae5de72..ae9ba15771 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,7 +2,7 @@ name: test on: push: # Run on pushes to the default branch - branches: [main] + branches: [main, add-airflow-3-test-matrix-entry] pull_request_target: # Also run on pull requests originated from forks branches: [main] @@ -39,7 +39,7 @@ jobs: strategy: matrix: python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] - airflow-version: ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10"] + airflow-version: ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10", "3.0"] dbt-version: ["1.9"] exclude: - python-version: "3.11" @@ -106,7 +106,7 @@ jobs: strategy: matrix: python-version: ["3.8", "3.9", "3.10", "3.11"] - airflow-version: ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10"] + airflow-version: ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10", "3.0"] dbt-version: ["1.9"] exclude: - python-version: "3.11" diff --git a/pyproject.toml b/pyproject.toml index 24a00c4962..41381bc7b0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -161,7 +161,7 @@ pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} [[tool.hatch.envs.tests.matrix]] python = ["3.8", "3.9", "3.10", "3.11", "3.12"] -airflow = ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10"] +airflow = ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10", "3.0"] dbt = ["1.5", "1.6", "1.7", "1.8", "1.9"] [tool.hatch.envs.tests.overrides] diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index edc87c9dc0..2974b69e40 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -6,6 +6,7 @@ set -v AIRFLOW_VERSION="$1" PYTHON_VERSION="$2" + # Use this to set the appropriate Python environment in Github Actions, # while also not assuming --system when running locally. if [ "$GITHUB_ACTIONS" = "true" ] && [ -z "${VIRTUAL_ENV}" ]; then @@ -26,9 +27,13 @@ mv /tmp/constraint.txt.tmp /tmp/constraint.txt pip install uv uv pip install pip --upgrade -uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt -uv pip install apache-airflow-providers-docker --constraint /tmp/constraint.txt -uv pip install apache-airflow-providers-postgres --constraint /tmp/constraint.txt +if [ "$AIRFLOW_VERSION" = "3.0" ] ; then + uv pip install -r "$(pwd)/../airflow3/requirements.txt" +else + uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt + uv pip install apache-airflow-providers-docker --constraint /tmp/constraint.txt + uv pip install apache-airflow-providers-postgres --constraint /tmp/constraint.txt +fi # Due to issue https://github.com/fsspec/gcsfs/issues/664 uv pip install "gcsfs<2025.3.0" @@ -44,6 +49,14 @@ elif [ "$AIRFLOW_VERSION" = "2.7" ] ; then uv pip install "apache-airflow-providers-cncf-kubernetes" --constraint /tmp/constraint.txt uv pip install "apache-airflow-providers-google>10.11" "apache-airflow==$AIRFLOW_VERSION" uv pip install apache-airflow-providers-microsoft-azure --constraint /tmp/constraint.txt +elif [ "$AIRFLOW_VERSION" = "3.0"] ; then + uv pip install "apache-airflow-providers-docker" + uv pip install "apache-airflow-providers-postgres" + uv pip install "apache-airflow-providers-amazon[s3fs]" + uv pip install "apache-airflow-providers-cncf-kubernetes" + uv pip install "apache-airflow-providers-google>=10.17.0" + uv pip install "apache-airflow-providers-microsoft-azure>=8.5.0" + uv pip install "apache-airflow==3.0.0b2" --find-links https://dist.apache.org/repos/dist/dev/airflow/3.0.0b2/ else uv pip install "apache-airflow-providers-amazon[s3fs]" --constraint /tmp/constraint.txt uv pip install "apache-airflow-providers-cncf-kubernetes" --constraint /tmp/constraint.txt From c06e538358c5f4efa237558aa0e6a15f96f952ea Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 23 Apr 2025 12:41:13 +0530 Subject: [PATCH 02/33] Try generic install for AF3 --- scripts/test/pre-install-airflow.sh | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index 2974b69e40..7192854d5c 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -27,13 +27,11 @@ mv /tmp/constraint.txt.tmp /tmp/constraint.txt pip install uv uv pip install pip --upgrade -if [ "$AIRFLOW_VERSION" = "3.0" ] ; then - uv pip install -r "$(pwd)/../airflow3/requirements.txt" -else - uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt - uv pip install apache-airflow-providers-docker --constraint /tmp/constraint.txt - uv pip install apache-airflow-providers-postgres --constraint /tmp/constraint.txt -fi + +uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt +uv pip install apache-airflow-providers-docker --constraint /tmp/constraint.txt +uv pip install apache-airflow-providers-postgres --constraint /tmp/constraint.txt + # Due to issue https://github.com/fsspec/gcsfs/issues/664 uv pip install "gcsfs<2025.3.0" @@ -49,14 +47,6 @@ elif [ "$AIRFLOW_VERSION" = "2.7" ] ; then uv pip install "apache-airflow-providers-cncf-kubernetes" --constraint /tmp/constraint.txt uv pip install "apache-airflow-providers-google>10.11" "apache-airflow==$AIRFLOW_VERSION" uv pip install apache-airflow-providers-microsoft-azure --constraint /tmp/constraint.txt -elif [ "$AIRFLOW_VERSION" = "3.0"] ; then - uv pip install "apache-airflow-providers-docker" - uv pip install "apache-airflow-providers-postgres" - uv pip install "apache-airflow-providers-amazon[s3fs]" - uv pip install "apache-airflow-providers-cncf-kubernetes" - uv pip install "apache-airflow-providers-google>=10.17.0" - uv pip install "apache-airflow-providers-microsoft-azure>=8.5.0" - uv pip install "apache-airflow==3.0.0b2" --find-links https://dist.apache.org/repos/dist/dev/airflow/3.0.0b2/ else uv pip install "apache-airflow-providers-amazon[s3fs]" --constraint /tmp/constraint.txt uv pip install "apache-airflow-providers-cncf-kubernetes" --constraint /tmp/constraint.txt From 5ff9779919ea0986c74b61398f386ee94e6220d5 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 23 Apr 2025 12:54:55 +0530 Subject: [PATCH 03/33] Skip python 3.10, airflow 3 --- .github/workflows/test.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index ae9ba15771..1a3e65ac07 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -66,6 +66,8 @@ jobs: airflow-version: "2.7" - python-version: "3.9" airflow-version: "2.7" + - python-version: "3.8" + airflow-version: "3.0" steps: - uses: actions/checkout@v4 with: From 019709636bc7f71346bc6d46e18a351410537d19 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 23 Apr 2025 13:03:40 +0530 Subject: [PATCH 04/33] Skip python 3.10, airflow 3 for ITs --- .github/workflows/test.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 1a3e65ac07..0383538882 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -121,6 +121,8 @@ jobs: airflow-version: "2.7" - python-version: "3.9" airflow-version: "2.7" + - python-version: "3.8" + airflow-version: "3.0" services: postgres: image: postgres From 91e78bb6f0722156b17a94db2051f628d793feb5 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 25 Apr 2025 15:16:00 +0530 Subject: [PATCH 05/33] push progress --- tests/dbt/parser/test_output.py | 6 +- tests/operators/test_local.py | 6 +- tests/test_cache.py | 111 +++++++++++++++++++++++--------- 3 files changed, 90 insertions(+), 33 deletions(-) diff --git a/tests/dbt/parser/test_output.py b/tests/dbt/parser/test_output.py index fd19f8bbcc..8c8fb04b4b 100644 --- a/tests/dbt/parser/test_output.py +++ b/tests/dbt/parser/test_output.py @@ -2,7 +2,11 @@ from unittest.mock import MagicMock import pytest -from airflow.hooks.subprocess import SubprocessResult + +try: # For Airflow 3 + from airflow.providers.standard.hooks.subprocess import SubprocessResult +except ImportError: # For Airflow 2 + from airflow.hooks.subprocess import SubprocessResult from cosmos.dbt.parser.output import ( extract_dbt_runner_issues, diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index a844e3427c..6bc627fbfe 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -11,7 +11,11 @@ from airflow import DAG from airflow import __version__ as airflow_version from airflow.exceptions import AirflowException, AirflowSkipException -from airflow.hooks.subprocess import SubprocessResult + +try: # For Airflow 3 + from airflow.providers.standard.hooks.subprocess import SubprocessResult +except ImportError: # For Airflow 2 + from airflow.hooks.subprocess import SubprocessResult from airflow.models.taskinstance import TaskInstance from airflow.utils.context import Context from packaging import version diff --git a/tests/test_cache.py b/tests/test_cache.py index c4c7e45c90..2eb825185e 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -9,7 +9,8 @@ import pytest from airflow import DAG from airflow.models import DagRun, Variable -from airflow.utils.db import create_session + +# from airflow.utils.db import create_session from airflow.utils.task_group import TaskGroup from cosmos.cache import ( @@ -167,51 +168,99 @@ def test_update_partial_parse_cache(mock_get_partial_parse_path, mock_copyfile): @pytest.fixture def vars_session(): - with create_session() as session: - var1 = Variable(key="cosmos_cache__dag_a", val='{"dag_id": "dag_a"}') - var2 = Variable(key="cosmos_cache__dag_b", val='{"dag_id": "dag_b"}') - var3 = Variable(key="cosmos_cache__dag_c__task_group_1", val='{"dag_id": "dag_c"}') - - dag_run_a = DagRun( + # TODO: Revisit approach here + mock_variables = { + "cosmos_cache__dag_a": '{"dag_id": "dag_a"}', + "cosmos_cache__dag_b": '{"dag_id": "dag_b"}', + "cosmos_cache__dag_c__task_group_1": '{"dag_id": "dag_c"}', + } + + def mock_variable_get(key, default_var=None, deserialize_json=False): + val = mock_variables.get(key, default_var) + if deserialize_json and isinstance(val, str): + import json + + return json.loads(val) + return val + + # Mock DagRun access + mock_dagruns = [ + DagRun( dag_id="dag_a", run_id="dag_a_run_a_week_ago", execution_date=datetime.now(timezone.utc) - timedelta(days=7), state="success", run_type="manual", - ) - dag_run_b = DagRun( + ), + DagRun( dag_id="dag_b", run_id="dag_b_run_yesterday", execution_date=datetime.now(timezone.utc) - timedelta(days=1), state="failed", run_type="manual", - ) - dag_run_c = DagRun( + ), + DagRun( dag_id="dag_c", run_id="dag_c_run_on_hour_ago", execution_date=datetime.now(timezone.utc) - timedelta(hours=1), state="running", run_type="manual", - ) - - session.add(var1) - session.add(var2) - session.add(var3) - session.add(dag_run_a) - session.add(dag_run_b) - session.add(dag_run_c) - session.commit() - - yield session - - session.query(Variable).filter_by(key="cosmos_cache__dag_a").delete() - session.query(Variable).filter_by(key="cosmos_cache__dag_b").delete() - session.query(Variable).filter_by(key="cosmos_cache__dag_c__task_group_1").delete() - - session.query(DagRun).filter_by(dag_id="dag_a", run_id="dag_a_run_a_week_ago").delete() - session.query(DagRun).filter_by(dag_id="dag_b", run_id="dag_b_run_yesterday").delete() - session.query(DagRun).filter_by(dag_id="dag_c", run_id="dag_c_run_on_hour_ago").delete() - session.commit() + ), + ] + + def mock_get_dag_runs(*args, **kwargs): + return mock_dagruns + + with patch("airflow.models.variable.Variable.get", side_effect=mock_variable_get), patch( + "airflow.models.dagrun.DagRun.get_dag_runs", side_effect=mock_get_dag_runs + ): + yield + + # with create_session() as session: + # var1 = Variable(key="cosmos_cache__dag_a", val='{"dag_id": "dag_a"}') + # var2 = Variable(key="cosmos_cache__dag_b", val='{"dag_id": "dag_b"}') + # var3 = Variable(key="cosmos_cache__dag_c__task_group_1", val='{"dag_id": "dag_c"}') + # + # dag_run_a = DagRun( + # dag_id="dag_a", + # run_id="dag_a_run_a_week_ago", + # execution_date=datetime.now(timezone.utc) - timedelta(days=7), + # state="success", + # run_type="manual", + # ) + # dag_run_b = DagRun( + # dag_id="dag_b", + # run_id="dag_b_run_yesterday", + # execution_date=datetime.now(timezone.utc) - timedelta(days=1), + # state="failed", + # run_type="manual", + # ) + # dag_run_c = DagRun( + # dag_id="dag_c", + # run_id="dag_c_run_on_hour_ago", + # execution_date=datetime.now(timezone.utc) - timedelta(hours=1), + # state="running", + # run_type="manual", + # ) + # + # session.add(var1) + # session.add(var2) + # session.add(var3) + # session.add(dag_run_a) + # session.add(dag_run_b) + # session.add(dag_run_c) + # session.commit() + # + # yield session + # + # session.query(Variable).filter_by(key="cosmos_cache__dag_a").delete() + # session.query(Variable).filter_by(key="cosmos_cache__dag_b").delete() + # session.query(Variable).filter_by(key="cosmos_cache__dag_c__task_group_1").delete() + # + # session.query(DagRun).filter_by(dag_id="dag_a", run_id="dag_a_run_a_week_ago").delete() + # session.query(DagRun).filter_by(dag_id="dag_b", run_id="dag_b_run_yesterday").delete() + # session.query(DagRun).filter_by(dag_id="dag_c", run_id="dag_c_run_on_hour_ago").delete() + # session.commit() @pytest.mark.integration From f83d2216e53357893e706c0acd8b6c54840feb2b Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 25 Apr 2025 15:56:11 +0530 Subject: [PATCH 06/33] Update .github/workflows/test.yml --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0383538882..8bfe981ae8 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -4,7 +4,7 @@ on: push: # Run on pushes to the default branch branches: [main, add-airflow-3-test-matrix-entry] pull_request_target: # Also run on pull requests originated from forks - branches: [main] + branches: [] concurrency: group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} From f165827c69cb78597768a9f4a07ddab1ace7ba4b Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 25 Apr 2025 15:57:14 +0530 Subject: [PATCH 07/33] Update .github/workflows/test.yml --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 8bfe981ae8..0383538882 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -4,7 +4,7 @@ on: push: # Run on pushes to the default branch branches: [main, add-airflow-3-test-matrix-entry] pull_request_target: # Also run on pull requests originated from forks - branches: [] + branches: [main] concurrency: group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} From 7a81104423ff2b714c688b7abf07448054870348 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 25 Apr 2025 15:57:46 +0530 Subject: [PATCH 08/33] Update .github/workflows/test.yml --- .github/workflows/test.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0383538882..735671dfb4 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -3,8 +3,8 @@ name: test on: push: # Run on pushes to the default branch branches: [main, add-airflow-3-test-matrix-entry] - pull_request_target: # Also run on pull requests originated from forks - branches: [main] + # pull_request_target: # Also run on pull requests originated from forks + # branches: [main] concurrency: group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} From 448279a63caa36e72258c3320ec380b6f9b2b5e9 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 25 Apr 2025 15:58:49 +0530 Subject: [PATCH 09/33] Update .github/workflows/test.yml --- .github/workflows/test.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 735671dfb4..7244603eb6 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -3,8 +3,8 @@ name: test on: push: # Run on pushes to the default branch branches: [main, add-airflow-3-test-matrix-entry] - # pull_request_target: # Also run on pull requests originated from forks - # branches: [main] + pull_request_target: # Also run on pull requests originated from forks + branches: [main] concurrency: group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} From 15bade9f63b6c2ea4068cf9335b24d1075c6f70e Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 25 Apr 2025 15:59:21 +0530 Subject: [PATCH 10/33] Update .github/workflows/test.yml --- .github/workflows/test.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7244603eb6..0383538882 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -3,8 +3,8 @@ name: test on: push: # Run on pushes to the default branch branches: [main, add-airflow-3-test-matrix-entry] - pull_request_target: # Also run on pull requests originated from forks - branches: [main] + pull_request_target: # Also run on pull requests originated from forks + branches: [main] concurrency: group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} From f8ffa21bbfb3d42a4766774ae4cb1f43ae643289 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 28 Apr 2025 11:06:15 +0530 Subject: [PATCH 11/33] Use airflow db migrate for AF3 db setup --- scripts/test/integration-dbt-1-5-4.sh | 21 +++++++++++++++++++-- scripts/test/integration-dbt-async.sh | 16 +++++++++++++--- scripts/test/integration-setup.sh | 12 +++++++++++- scripts/test/performance-setup.sh | 22 +++++++++++++++++++--- 4 files changed, 62 insertions(+), 9 deletions(-) diff --git a/scripts/test/integration-dbt-1-5-4.sh b/scripts/test/integration-dbt-1-5-4.sh index 3dde76373b..0876f1b84c 100644 --- a/scripts/test/integration-dbt-1-5-4.sh +++ b/scripts/test/integration-dbt-1-5-4.sh @@ -1,8 +1,25 @@ +#!/bin/bash + +set -v +set -x +set -e + + pip uninstall dbt-adapters dbt-common dbt-core dbt-extractor dbt-postgres dbt-semantic-interfaces -y pip install dbt-postgres==1.5.4 dbt-databricks==1.5.4 dbt-bigquery==1.5.4 export SOURCE_RENDERING_BEHAVIOR=all -rm -rf airflow.*; \ -airflow db init; \ +rm -rf airflow.* + +AIRFLOW_VERSION=$(airflow version) +AIRFLOW_MAJOR_VERSION=$(echo "$AIRFLOW_VERSION" | cut -d. -f1) +if [ "$AIRFLOW_MAJOR_VERSION" -ge 3 ]; then + echo "Detected Airflow $AIRFLOW_VERSION. Running 'airflow db migrate'..." + airflow db migrate +else + echo "Detected Airflow $AIRFLOW_VERSION. Running 'airflow db init'..." + airflow db init +fi + pytest -vv \ --cov=cosmos \ --cov-report=term-missing \ diff --git a/scripts/test/integration-dbt-async.sh b/scripts/test/integration-dbt-async.sh index b4ff9e9195..d4f1d49c08 100644 --- a/scripts/test/integration-dbt-async.sh +++ b/scripts/test/integration-dbt-async.sh @@ -1,5 +1,6 @@ #!/bin/bash +set -v set -x set -e @@ -11,8 +12,17 @@ echo "$DBT_VERSION" pip uninstall dbt-adapters dbt-common dbt-core dbt-extractor dbt-postgres dbt-semantic-interfaces -y pip install "dbt-postgres==$DBT_VERSION" "dbt-databricks==$DBT_VERSION" "dbt-bigquery==$DBT_VERSION" export SOURCE_RENDERING_BEHAVIOR=all -rm -rf airflow.*; \ -airflow db init; \ +rm -rf airflow.* + +AIRFLOW_VERSION=$(airflow version) +AIRFLOW_MAJOR_VERSION=$(echo "$AIRFLOW_VERSION" | cut -d. -f1) +if [ "$AIRFLOW_MAJOR_VERSION" -ge 3 ]; then + echo "Detected Airflow $AIRFLOW_VERSION. Running 'airflow db migrate'..." + airflow db migrate +else + echo "Detected Airflow $AIRFLOW_VERSION. Running 'airflow db init'..." + airflow db init +fi if [ "$DBT_VERSION" = "1.7" ]; then # Otherwise, we will get the following error: @@ -21,7 +31,7 @@ if [ "$DBT_VERSION" = "1.7" ]; then pip install protobuf==4.25.6 fi -rm -rf dbt/jaffle_shop/dbt_packages; +rm -rf dbt/jaffle_shop/dbt_packages pytest -vv \ --cov=cosmos \ --cov-report=term-missing \ diff --git a/scripts/test/integration-setup.sh b/scripts/test/integration-setup.sh index 5c2ab3af82..1954149619 100644 --- a/scripts/test/integration-setup.sh +++ b/scripts/test/integration-setup.sh @@ -10,5 +10,15 @@ pip uninstall -y 'dbt-bigquery' 'dbt-databricks' 'dbt-postgres' 'dbt-vertica' 'd rm -rf airflow.* pip freeze | grep airflow airflow db reset -y -airflow db init + +AIRFLOW_VERSION=$(airflow version) +AIRFLOW_MAJOR_VERSION=$(echo "$AIRFLOW_VERSION" | cut -d. -f1) +if [ "$AIRFLOW_MAJOR_VERSION" -ge 3 ]; then + echo "Detected Airflow $AIRFLOW_VERSION. Running 'airflow db migrate'..." + airflow db migrate +else + echo "Detected Airflow $AIRFLOW_VERSION. Running 'airflow db init'..." + airflow db init +fi + pip install 'dbt-databricks!=1.9.0' 'dbt-bigquery' 'dbt-postgres' 'dbt-vertica' 'openlineage-airflow' diff --git a/scripts/test/performance-setup.sh b/scripts/test/performance-setup.sh index 7efb917c1e..417cffbcc2 100644 --- a/scripts/test/performance-setup.sh +++ b/scripts/test/performance-setup.sh @@ -1,4 +1,20 @@ -pip uninstall -y dbt-core dbt-sqlite dbt-postgres openlineage-airflow openlineage-integration-common; \ -rm -rf airflow.*; \ -airflow db init; \ +#!/bin/bash + +set -v +set -x +set -e + +pip uninstall -y dbt-core dbt-sqlite dbt-postgres openlineage-airflow openlineage-integration-common +rm -rf airflow.* + +AIRFLOW_VERSION=$(airflow version) +AIRFLOW_MAJOR_VERSION=$(echo "$AIRFLOW_VERSION" | cut -d. -f1) +if [ "$AIRFLOW_MAJOR_VERSION" -ge 3 ]; then + echo "Detected Airflow $AIRFLOW_VERSION. Running 'airflow db migrate'..." + airflow db migrate +else + echo "Detected Airflow $AIRFLOW_VERSION. Running 'airflow db init'..." + airflow db init +fi + pip install 'dbt-postgres' From 56839a3c7bc56cbf5ab6bddc48f8b21edee9668e Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 28 Apr 2025 11:11:20 +0530 Subject: [PATCH 12/33] Disable fail-fast for Unit tests --- .github/workflows/test.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0383538882..d4da099081 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -37,6 +37,7 @@ jobs: Run-Unit-Tests: runs-on: ubuntu-latest strategy: + fail-fast: false matrix: python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] airflow-version: ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10", "3.0"] From 99b29af2091f6a0424e21569a7fd57f8925686ac Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 28 Apr 2025 11:16:53 +0530 Subject: [PATCH 13/33] Use schedule param instead of schedule_interval --- dev/dags/example_tests_multiple_parents.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/dags/example_tests_multiple_parents.py b/dev/dags/example_tests_multiple_parents.py index fa0ec3f540..8dada5b740 100644 --- a/dev/dags/example_tests_multiple_parents.py +++ b/dev/dags/example_tests_multiple_parents.py @@ -9,7 +9,7 @@ from cosmos import DbtDag, ProfileConfig, ProjectConfig, RenderConfig from cosmos.profiles import PostgresUserPasswordProfileMapping -DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" +DEFAULT_DBT_ROOT_PATH = Path(__file__).resolve().parent / "dbt" DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) profile_config = ProfileConfig( From 98852c2c407e9c02ec78a192f89ca2248811edbc Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 28 Apr 2025 11:53:26 +0530 Subject: [PATCH 14/33] Disable fail-fast for Integration tests --- .github/workflows/test.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d4da099081..222d8503f5 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -107,6 +107,7 @@ jobs: needs: Authorize runs-on: ubuntu-latest strategy: + fail-fast: false matrix: python-version: ["3.8", "3.9", "3.10", "3.11"] airflow-version: ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10", "3.0"] From 47c20c9ccd445da014b8b0d63a0a29f83b5db4a8 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 28 Apr 2025 11:54:15 +0530 Subject: [PATCH 15/33] Revert mocking --- tests/test_cache.py | 111 +++++++++++++------------------------------- 1 file changed, 31 insertions(+), 80 deletions(-) diff --git a/tests/test_cache.py b/tests/test_cache.py index 2eb825185e..c4c7e45c90 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -9,8 +9,7 @@ import pytest from airflow import DAG from airflow.models import DagRun, Variable - -# from airflow.utils.db import create_session +from airflow.utils.db import create_session from airflow.utils.task_group import TaskGroup from cosmos.cache import ( @@ -168,99 +167,51 @@ def test_update_partial_parse_cache(mock_get_partial_parse_path, mock_copyfile): @pytest.fixture def vars_session(): - # TODO: Revisit approach here - mock_variables = { - "cosmos_cache__dag_a": '{"dag_id": "dag_a"}', - "cosmos_cache__dag_b": '{"dag_id": "dag_b"}', - "cosmos_cache__dag_c__task_group_1": '{"dag_id": "dag_c"}', - } - - def mock_variable_get(key, default_var=None, deserialize_json=False): - val = mock_variables.get(key, default_var) - if deserialize_json and isinstance(val, str): - import json - - return json.loads(val) - return val - - # Mock DagRun access - mock_dagruns = [ - DagRun( + with create_session() as session: + var1 = Variable(key="cosmos_cache__dag_a", val='{"dag_id": "dag_a"}') + var2 = Variable(key="cosmos_cache__dag_b", val='{"dag_id": "dag_b"}') + var3 = Variable(key="cosmos_cache__dag_c__task_group_1", val='{"dag_id": "dag_c"}') + + dag_run_a = DagRun( dag_id="dag_a", run_id="dag_a_run_a_week_ago", execution_date=datetime.now(timezone.utc) - timedelta(days=7), state="success", run_type="manual", - ), - DagRun( + ) + dag_run_b = DagRun( dag_id="dag_b", run_id="dag_b_run_yesterday", execution_date=datetime.now(timezone.utc) - timedelta(days=1), state="failed", run_type="manual", - ), - DagRun( + ) + dag_run_c = DagRun( dag_id="dag_c", run_id="dag_c_run_on_hour_ago", execution_date=datetime.now(timezone.utc) - timedelta(hours=1), state="running", run_type="manual", - ), - ] - - def mock_get_dag_runs(*args, **kwargs): - return mock_dagruns - - with patch("airflow.models.variable.Variable.get", side_effect=mock_variable_get), patch( - "airflow.models.dagrun.DagRun.get_dag_runs", side_effect=mock_get_dag_runs - ): - yield - - # with create_session() as session: - # var1 = Variable(key="cosmos_cache__dag_a", val='{"dag_id": "dag_a"}') - # var2 = Variable(key="cosmos_cache__dag_b", val='{"dag_id": "dag_b"}') - # var3 = Variable(key="cosmos_cache__dag_c__task_group_1", val='{"dag_id": "dag_c"}') - # - # dag_run_a = DagRun( - # dag_id="dag_a", - # run_id="dag_a_run_a_week_ago", - # execution_date=datetime.now(timezone.utc) - timedelta(days=7), - # state="success", - # run_type="manual", - # ) - # dag_run_b = DagRun( - # dag_id="dag_b", - # run_id="dag_b_run_yesterday", - # execution_date=datetime.now(timezone.utc) - timedelta(days=1), - # state="failed", - # run_type="manual", - # ) - # dag_run_c = DagRun( - # dag_id="dag_c", - # run_id="dag_c_run_on_hour_ago", - # execution_date=datetime.now(timezone.utc) - timedelta(hours=1), - # state="running", - # run_type="manual", - # ) - # - # session.add(var1) - # session.add(var2) - # session.add(var3) - # session.add(dag_run_a) - # session.add(dag_run_b) - # session.add(dag_run_c) - # session.commit() - # - # yield session - # - # session.query(Variable).filter_by(key="cosmos_cache__dag_a").delete() - # session.query(Variable).filter_by(key="cosmos_cache__dag_b").delete() - # session.query(Variable).filter_by(key="cosmos_cache__dag_c__task_group_1").delete() - # - # session.query(DagRun).filter_by(dag_id="dag_a", run_id="dag_a_run_a_week_ago").delete() - # session.query(DagRun).filter_by(dag_id="dag_b", run_id="dag_b_run_yesterday").delete() - # session.query(DagRun).filter_by(dag_id="dag_c", run_id="dag_c_run_on_hour_ago").delete() - # session.commit() + ) + + session.add(var1) + session.add(var2) + session.add(var3) + session.add(dag_run_a) + session.add(dag_run_b) + session.add(dag_run_c) + session.commit() + + yield session + + session.query(Variable).filter_by(key="cosmos_cache__dag_a").delete() + session.query(Variable).filter_by(key="cosmos_cache__dag_b").delete() + session.query(Variable).filter_by(key="cosmos_cache__dag_c__task_group_1").delete() + + session.query(DagRun).filter_by(dag_id="dag_a", run_id="dag_a_run_a_week_ago").delete() + session.query(DagRun).filter_by(dag_id="dag_b", run_id="dag_b_run_yesterday").delete() + session.query(DagRun).filter_by(dag_id="dag_c", run_id="dag_c_run_on_hour_ago").delete() + session.commit() @pytest.mark.integration From cd8a8491a7ef141ab8ef12473578561c3bbf3bb5 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 28 Apr 2025 12:01:33 +0530 Subject: [PATCH 16/33] Disable tests/test_cache.py test for AF3 --- tests/test_cache.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/test_cache.py b/tests/test_cache.py index c4c7e45c90..ca7155a4b3 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -1,3 +1,14 @@ +from airflow import __version__ as airflow_version +from packaging import version + +from cosmos.constants import _AIRFLOW3_MAJOR_VERSION + +# TODO: Enable and make tests functional in the module for AF 3. Disabling to due to DB access code in this module. +if version.parse(airflow_version).major >= _AIRFLOW3_MAJOR_VERSION: + import pytest + + pytest.skip("Skipping Cache tests on Airflow 3.0+", allow_module_level=True) + import logging import shutil import tempfile From fe349edc3cebe665f9aeb119be42a5cbecefc9d9 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 28 Apr 2025 12:06:05 +0530 Subject: [PATCH 17/33] Add Af3 test matrix entry across tests in CI --- .github/workflows/test.yml | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 222d8503f5..238324b58b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -280,9 +280,10 @@ jobs: needs: Authorize runs-on: ubuntu-latest strategy: + fail-fast: false matrix: python-version: [ "3.11" ] - airflow-version: [ "2.8" ] + airflow-version: [ "2.8", "3.0" ] dbt-version: [ "1.5" ] services: postgres: @@ -352,7 +353,7 @@ jobs: fail-fast: false matrix: python-version: [ "3.11" ] - airflow-version: [ "2.10" ] + airflow-version: [ "2.10", "3.0" ] dbt-version: ["1.5", "1.6", "1.7", "1.8", "1.9"] services: postgres: @@ -419,9 +420,10 @@ jobs: needs: Authorize runs-on: ubuntu-latest strategy: + fail-fast: false matrix: python-version: ["3.11"] - airflow-version: ["2.8"] + airflow-version: ["2.8", "3.0" ] dbt-version: ["1.9"] num-models: [1, 10, 50, 100, 500] services: @@ -489,9 +491,10 @@ jobs: needs: Authorize runs-on: ubuntu-latest strategy: + fail-fast: false matrix: python-version: [ "3.12" ] - airflow-version: [ "2.10" ] + airflow-version: [ "2.10", "3.0" ] dbt-version: [ "1.9" ] steps: - uses: actions/checkout@v4 From 423ce6a495655ca19bbd4931a3eba2759978e051 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 28 Apr 2025 13:23:40 +0530 Subject: [PATCH 18/33] Fix virtualenv unit tests for AF3 --- tests/operators/test_virtualenv.py | 31 ++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/tests/operators/test_virtualenv.py b/tests/operators/test_virtualenv.py index 5b1efabda9..0e2f0c7363 100644 --- a/tests/operators/test_virtualenv.py +++ b/tests/operators/test_virtualenv.py @@ -13,7 +13,7 @@ from packaging.version import Version from cosmos.config import ProfileConfig -from cosmos.constants import InvocationMode +from cosmos.constants import _AIRFLOW3_MAJOR_VERSION, InvocationMode from cosmos.exceptions import CosmosValueError from cosmos.operators.virtualenv import DbtCloneVirtualenvOperator, DbtVirtualenvBaseOperator from cosmos.profiles import PostgresUserPasswordProfileMapping @@ -43,6 +43,13 @@ ) +def patch_execute_in_subprocess(func): + if AIRFLOW_VERSION.major >= _AIRFLOW3_MAJOR_VERSION: + return patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess")(func) + else: + return patch("airflow.utils.python_virtualenv.execute_in_subprocess")(func) + + class ConcreteDbtVirtualenvBaseOperator(DbtVirtualenvBaseOperator): @property @@ -51,7 +58,7 @@ def base_cmd(self) -> list[str]: @patch("cosmos.operators.local.AbstractDbtLocalBase._upload_sql_files") -@patch("airflow.utils.python_virtualenv.execute_in_subprocess") +@patch_execute_in_subprocess @patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.calculate_openlineage_events_completes") @patch("cosmos.operators.virtualenv.DbtLocalBaseOperator._override_rtif") @patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.store_compiled_sql") @@ -100,11 +107,19 @@ def test_run_command_without_virtualenv_dir( assert dbt_cmd["command"][1] == "do-something" assert mock_execute.call_count == 2 virtualenv_call, pip_install_call = mock_execute.call_args_list - assert "python" in virtualenv_call[0][0][0] - assert virtualenv_call[0][0][1] == "-m" - assert virtualenv_call[0][0][2] == "virtualenv" - assert "pip" in pip_install_call[0][0][0] - assert pip_install_call[0][0][1] == "install" + if AIRFLOW_VERSION.major >= _AIRFLOW3_MAJOR_VERSION: + assert "python3" in virtualenv_call[0][0] + assert virtualenv_call[0][0][0] == "uv" + assert virtualenv_call[0][0][1] == "venv" + assert pip_install_call[0][0][0] == "uv" + assert pip_install_call[0][0][1] == "pip" + assert pip_install_call[0][0][2] == "install" + else: + assert "python" in virtualenv_call[0][0][0] + assert virtualenv_call[0][0][1] == "-m" + assert virtualenv_call[0][0][2] == "virtualenv" + assert "pip" in pip_install_call[0][0][0] + assert pip_install_call[0][0][1] == "install" cosmos_venv_dirs = [ f for f in os.listdir("/tmp") if os.path.isdir(os.path.join("/tmp", f)) and f.startswith("cosmos-venv") ] @@ -115,7 +130,7 @@ def test_run_command_without_virtualenv_dir( @patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._is_lock_available") @patch("time.sleep") @patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._release_venv_lock") -@patch("airflow.utils.python_virtualenv.execute_in_subprocess") +@patch_execute_in_subprocess @patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.calculate_openlineage_events_completes") @patch("cosmos.operators.virtualenv.DbtLocalBaseOperator._override_rtif") @patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.store_compiled_sql") From 0aa0da93e83d148254f17e4c90be22578ebb73b9 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 28 Apr 2025 16:17:43 +0530 Subject: [PATCH 19/33] Skip failing kubernetes unit tests, TODO issue #1702 --- tests/operators/test_kubernetes.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/operators/test_kubernetes.py b/tests/operators/test_kubernetes.py index 0562e28ce4..27ddd399db 100644 --- a/tests/operators/test_kubernetes.py +++ b/tests/operators/test_kubernetes.py @@ -151,6 +151,8 @@ def test_dbt_kubernetes_build_command(): ] +# TODO: Add Airflow 3 compatibility: https://github.com/astronomer/astronomer-cosmos/issues/1702 +@pytest.mark.skipif(version.parse(airflow_version).major == 3, reason="Test need to be updated for Airflow 3.0") @pytest.mark.parametrize( "additional_kwargs,expected_results", [ @@ -214,6 +216,8 @@ def test_dbt_test_kubernetes_operator_constructor(additional_kwargs, expected_re assert test_operator.on_finish_action_original == expected_action +# TODO: Add Airflow 3 compatibility: https://github.com/astronomer/astronomer-cosmos/issues/1702 +@pytest.mark.skipif(version.parse(airflow_version).major == 3, reason="Test need to be updated for Airflow 3.0") @pytest.mark.parametrize( "additional_kwargs,expected_results", [ @@ -304,6 +308,8 @@ def read_pod_logs(self, pod, container): return (log.encode("utf-8") for log in log_string.split("\n")) +# TODO: Add Airflow 3 compatibility: https://github.com/astronomer/astronomer-cosmos/issues/1702 +@pytest.mark.skipif(version.parse(airflow_version).major == 3, reason="Test need to be updated for Airflow 3.0") @pytest.mark.skipif( not module_available, reason="Kubernetes module `airflow.providers.cncf.kubernetes.utils.pod_manager` not available" ) @@ -329,6 +335,8 @@ def cleanup(pod: str, remote_pod: str): test_operator._handle_warnings(context) +# TODO: Add Airflow 3 compatibility: https://github.com/astronomer/astronomer-cosmos/issues/1702 +@pytest.mark.skipif(version.parse(airflow_version).major == 3, reason="Test need to be updated for Airflow 3.0") @pytest.mark.skipif( not module_available, reason="Kubernetes module `airflow.providers.cncf.kubernetes.utils.pod_manager` not available" ) From 29014621d92a8ee91d441a55ad47e572946a5564 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 28 Apr 2025 16:27:34 +0530 Subject: [PATCH 20/33] Skip failing example DAGs test. Issue https://github.com/astronomer/astronomer-cosmos/issues/1706 --- tests/test_example_dags.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 2b4398b699..40f3490f7c 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -43,6 +43,17 @@ Version(version): MIN_VER_DAG_FILE[version] for version in sorted(MIN_VER_DAG_FILE, key=Version, reverse=True) } +# TODO: Make following dag tests compatible for AF3. Issue: https://github.com/astronomer/astronomer-cosmos/issues/1706 +AIRFLOW3_IGNORE_DAG_FILES = [ + "basic_cosmos_task_group_different_owners.py", + "cosmos_profile_mapping.py", + "user_defined_profile.py", + "example_taskflow.py", + "cosmos_manifest_example.py", + "example_cosmos_cleanup_dag.py", + "basic_cosmos_task_group.py", +] + @provide_session def get_session(session=None): @@ -77,11 +88,8 @@ def get_dag_bag() -> DagBag: if _PYTHON_VERSION < (3, 9): file.writelines(["example_duckdb_dag.py\n"]) - # Ignore Async DAG for dbt <=1.5 - if DBT_VERSION <= Version("1.5.0"): - file.writelines(["simple_dag_async.py\n"]) - if DBT_VERSION < Version("1.6.0"): + file.writelines(["simple_dag_async.py\n"]) file.writelines(["example_model_version.py\n"]) file.writelines(["example_operators.py\n"]) @@ -91,6 +99,10 @@ def get_dag_bag() -> DagBag: if AIRFLOW_VERSION < Version("2.8.0"): file.writelines("example_cosmos_dbt_build.py\n") + # TODO: Make following dag tests compatible for AF3. Issue: https://github.com/astronomer/astronomer-cosmos/issues/1706 + if AIRFLOW_VERSION.major == 3: + file.writelines([f"{dag_file}\n" for dag_file in AIRFLOW3_IGNORE_DAG_FILES]) + print(".airflowignore contents: ") print(AIRFLOW_IGNORE_FILE.read_text()) db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False) From c9d586b63a9482deb72cb7fb71a37bd58ea340c3 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 28 Apr 2025 16:31:27 +0530 Subject: [PATCH 21/33] Skip AF3 failing per test. Issue #1710 --- tests/perf/test_performance.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/perf/test_performance.py b/tests/perf/test_performance.py index 995c33a740..50e07af742 100644 --- a/tests/perf/test_performance.py +++ b/tests/perf/test_performance.py @@ -12,6 +12,7 @@ from functools import lru_cache as cache import pytest +from airflow import __version__ as airflow_version from airflow.models.dagbag import DagBag from dbt.version import get_installed_version as get_dbt_version from packaging.version import Version @@ -87,6 +88,8 @@ def generate_project( model.unlink() +# TODO: Make test compatible with Airflow 3.0. Issue:https://github.com/astronomer/astronomer-cosmos/issues/1710 +@pytest.mark.skipif(Version(airflow_version).major == 3, reason="Test need to be updated for Airflow 3.0") @pytest.mark.perf def test_perf_dag(): num_models = os.environ.get("MODEL_COUNT", 10) From 92e1febd3540c72b0f6cc7bfd7c4e56bd8f78373 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 28 Apr 2025 16:33:44 +0530 Subject: [PATCH 22/33] Skip AF3 failing virtualenv test. Issue #1707 --- tests/operators/test_virtualenv.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/operators/test_virtualenv.py b/tests/operators/test_virtualenv.py index 0e2f0c7363..9a766808bf 100644 --- a/tests/operators/test_virtualenv.py +++ b/tests/operators/test_virtualenv.py @@ -381,6 +381,8 @@ def test__release_venv_lock_current_process(tmpdir): assert not lockfile.exists() +# TODO: Make test compatible with Airflow 3.0. Issue: https://github.com/astronomer/astronomer-cosmos/issues/1707 +@pytest.mark.skipif(AIRFLOW_VERSION.major == 3, reason="Test need to be updated for Airflow 3.0") @pytest.mark.skipif( AIRFLOW_VERSION < Version("2.5"), reason="This error is only reproducible with dag.test, which was introduced in Airflow 2.5", From e242bac7547afcf49658674391fb3aef549f6729 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 28 Apr 2025 16:37:23 +0530 Subject: [PATCH 23/33] Skip AF3 failing listener tests. Issue #1703 --- tests/listeners/test_dag_run_listener.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py index a547f20ad0..77e30f8466 100644 --- a/tests/listeners/test_dag_run_listener.py +++ b/tests/listeners/test_dag_run_listener.py @@ -5,8 +5,10 @@ from unittest.mock import patch import pytest +from airflow import __version__ as airflow_version from airflow.models import DAG from airflow.utils.state import State +from packaging import version from cosmos import DbtRunLocalOperator, ProfileConfig, ProjectConfig from cosmos.airflow.dag import DbtDag @@ -77,6 +79,8 @@ def test_not_cosmos_dag(): assert total_cosmos_tasks(dag) == 0 +# TODO: Make test compatible with Airflow 3.0. Issue:https://github.com/astronomer/astronomer-cosmos/issues/1703 +@pytest.mark.skipif(version.parse(airflow_version).major == 3, reason="Test need to be updated for Airflow 3.0") @pytest.mark.integration @patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled") def test_on_dag_run_success(mock_emit_usage_metrics_if_enabled, caplog): @@ -102,6 +106,8 @@ def test_on_dag_run_success(mock_emit_usage_metrics_if_enabled, caplog): assert mock_emit_usage_metrics_if_enabled.call_count == 1 +# TODO: Make test compatible with Airflow 3.0. Issue:https://github.com/astronomer/astronomer-cosmos/issues/1703 +@pytest.mark.skipif(version.parse(airflow_version).major == 3, reason="Test need to be updated for Airflow 3.0") @pytest.mark.integration @patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled") def test_on_dag_run_failed(mock_emit_usage_metrics_if_enabled, caplog): From b92330daba48845f7900685ced3e5188e3bddc7b Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 28 Apr 2025 16:41:32 +0530 Subject: [PATCH 24/33] Skip AF3 failing local operator tests. Issue #1705 --- tests/operators/test_local.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 6bc627fbfe..8d320de576 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -574,6 +574,8 @@ def test_run_operator_dataset_emission_is_skipped(caplog): assert run_operator.outlets == [] +# TODO: Make test compatible with Airflow 3.0. Issue:https://github.com/astronomer/astronomer-cosmos/issues/1705 +@pytest.mark.skipif(version.parse(airflow_version).major == 3, reason="Test need to be updated for Airflow 3.0") @pytest.mark.skipif( version.parse(airflow_version) < version.parse("2.4") or version.parse(airflow_version) in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, @@ -605,6 +607,8 @@ def test_run_operator_dataset_url_encoded_names(caplog): ] +# TODO: Make test compatible with Airflow 3.0. Issue:https://github.com/astronomer/astronomer-cosmos/issues/1705 +@pytest.mark.skipif(version.parse(airflow_version).major == 3, reason="Test need to be updated for Airflow 3.0") @pytest.mark.integration def test_run_operator_caches_partial_parsing(caplog, tmp_path): caplog.set_level(logging.DEBUG) @@ -648,6 +652,8 @@ def test_dbt_base_operator_no_partial_parse() -> None: assert "--no-partial-parse" in cmd +# TODO: Make test compatible with Airflow 3.0. Issue:https://github.com/astronomer/astronomer-cosmos/issues/1705 +@pytest.mark.skipif(version.parse(airflow_version).major == 3, reason="Test need to be updated for Airflow 3.0") @pytest.mark.integration @pytest.mark.parametrize("invocation_mode", [InvocationMode.SUBPROCESS, InvocationMode.DBT_RUNNER]) def test_run_test_operator_with_callback(invocation_mode, failing_test_dbt_project): @@ -683,6 +689,8 @@ def test_run_test_operator_with_callback(invocation_mode, failing_test_dbt_proje assert on_warning_callback.called +# TODO: Make test compatible with Airflow 3.0. Issue:https://github.com/astronomer/astronomer-cosmos/issues/1705 +@pytest.mark.skipif(version.parse(airflow_version).major == 3, reason="Test need to be updated for Airflow 3.0") @pytest.mark.integration @pytest.mark.parametrize("invocation_mode", [InvocationMode.SUBPROCESS, InvocationMode.DBT_RUNNER]) def test_run_test_operator_without_callback(invocation_mode): From 1cba9be84c164c7d37fafbfae8a6ba547f1a183d Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 28 Apr 2025 16:48:14 +0530 Subject: [PATCH 25/33] Skip dbt async test for dbt 1.6, 1.7 and 1.9. Issue: #1709, #1708 --- .github/workflows/test.yml | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4e0c9693b9..cf10674c70 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -356,6 +356,19 @@ jobs: python-version: [ "3.11" ] airflow-version: [ "2.10", "3.0" ] dbt-version: ["1.5", "1.6", "1.7", "1.8", "1.9"] + # TODO: Add support for dbt 1.6, 1.7, 1.9 for Airflow 3.0. + # Issue dbt 1.7: https://github.com/astronomer/astronomer-cosmos/issues/1709 + # Issue dbt 1.6 and 1.9: https://github.com/astronomer/astronomer-cosmos/issues/1708 + exclude: + - python-version: "3.11" + airflow-version: "3.0" + dbt-version: "1.6" + - python-version: "3.11" + airflow-version: "3.0" + dbt-version: "1.7" + - python-version: "3.11" + airflow-version: "3.0" + dbt-version: "1.9" services: postgres: image: postgres From 72f62560e2df92772ca4635fd595d703f1b8c8a5 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 28 Apr 2025 16:49:29 +0530 Subject: [PATCH 26/33] Add tracking ticket in TODO comment for tests/test_cache.py for AF3 compatibility --- tests/test_cache.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_cache.py b/tests/test_cache.py index ca7155a4b3..403daccbd9 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -4,6 +4,7 @@ from cosmos.constants import _AIRFLOW3_MAJOR_VERSION # TODO: Enable and make tests functional in the module for AF 3. Disabling to due to DB access code in this module. +# Issue: https://github.com/astronomer/astronomer-cosmos/issues/1711 if version.parse(airflow_version).major >= _AIRFLOW3_MAJOR_VERSION: import pytest From 551b449830e022037936050f284e4f7e3d2b0954 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 28 Apr 2025 16:58:17 +0530 Subject: [PATCH 27/33] Skip failing k8s example dag test. Issue #1705 --- tests/test_example_k8s_dags.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/test_example_k8s_dags.py b/tests/test_example_k8s_dags.py index 2b24fd4665..36432c2f58 100644 --- a/tests/test_example_k8s_dags.py +++ b/tests/test_example_k8s_dags.py @@ -2,9 +2,11 @@ from pathlib import Path import pytest +from airflow import __version__ as airflow_version from airflow.models.dagbag import DagBag from airflow.utils.db import create_default_connections from airflow.utils.session import provide_session +from packaging import version from . import utils as test_utils @@ -35,6 +37,10 @@ def get_all_dag_files(): dag_ignorefile.writelines([f"{file}\n" for file in python_files]) +# TODO: Add compatibility for Airflow 3.0. Issue: #1705. +# Comment: https://github.com/astronomer/astronomer-cosmos/issues/1705#issuecomment-2834926490 +# Hint: Check if we can dag.test instead for Airflow 3.0 here. +@pytest.mark.skipif(version.parse(airflow_version).major == 3, reason="Add compatibility for 3.0") @pytest.mark.integration def test_example_dag_kubernetes(session): get_all_dag_files() From d27d4e4c3e8fe23b4d030ed29ab5ea3ddc9d2e71 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 28 Apr 2025 17:01:56 +0530 Subject: [PATCH 28/33] Comment cat command for perf test. Issue: #1710 --- .github/workflows/test.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index cf10674c70..f1efa01c5c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -482,7 +482,9 @@ jobs: # read the performance results and set them as an env var for the next step # format: NUM_MODELS={num_models}\nTIME={end - start}\n - cat /tmp/performance_results.txt > $GITHUB_STEP_SUMMARY + # TODO: Uncomment below "cat command" when issue https://github.com/astronomer/astronomer-cosmos/issues/1710 + # is resolved. The below command is failing at the moment since corresponding tests are skipped. + # cat /tmp/performance_results.txt > $GITHUB_STEP_SUMMARY env: AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/ AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres From e6d28e745d67b1dfc4b05cc08a0a7691c381d599 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 28 Apr 2025 17:28:20 +0530 Subject: [PATCH 29/33] Skip failing dataset test AF3. Issue #1704 --- tests/operators/test_local.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 8d320de576..fbdc8cb3f3 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -462,6 +462,8 @@ def test_run_operator_dataset_inlets_and_outlets(caplog): assert test_operator.outlets == [] +# TODO: Add compatibility for Airflow 3. Issue: https://github.com/astronomer/astronomer-cosmos/issues/1704. +@pytest.mark.skipif(version.parse(airflow_version).major == 3, reason="Test need to be updated for Airflow 3.0") @pytest.mark.skipif( version.parse(airflow_version) < version.parse("2.10"), reason="From Airflow 2.10 onwards, we started using DatasetAlias, which changed this behaviour.", From a0f8e08c66fafd3dd7d64752fe01b38dc01ac9ab Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 28 Apr 2025 18:40:43 +0530 Subject: [PATCH 30/33] Update .github/workflows/test.yml --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f1efa01c5c..79592e01a2 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,7 +2,7 @@ name: test on: push: # Run on pushes to the default branch - branches: [main, add-airflow-3-test-matrix-entry] + branches: [main] pull_request_target: # Also run on pull requests originated from forks branches: [main] From 4804c899d6933eba99f3d5a833f1cefd5d26466c Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 28 Apr 2025 18:40:55 +0530 Subject: [PATCH 31/33] Update scripts/test/pre-install-airflow.sh --- scripts/test/pre-install-airflow.sh | 1 - 1 file changed, 1 deletion(-) diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index 7192854d5c..a9dac22322 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -6,7 +6,6 @@ set -v AIRFLOW_VERSION="$1" PYTHON_VERSION="$2" - # Use this to set the appropriate Python environment in Github Actions, # while also not assuming --system when running locally. if [ "$GITHUB_ACTIONS" = "true" ] && [ -z "${VIRTUAL_ENV}" ]; then From 48e081d775dbf55f14db4ac6fc4afab1cfb538c8 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 28 Apr 2025 18:41:06 +0530 Subject: [PATCH 32/33] Update scripts/test/pre-install-airflow.sh --- scripts/test/pre-install-airflow.sh | 1 - 1 file changed, 1 deletion(-) diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index a9dac22322..ebbd784c67 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -31,7 +31,6 @@ uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.t uv pip install apache-airflow-providers-docker --constraint /tmp/constraint.txt uv pip install apache-airflow-providers-postgres --constraint /tmp/constraint.txt - # Due to issue https://github.com/fsspec/gcsfs/issues/664 uv pip install "gcsfs<2025.3.0" From 1023f645009b12e0dc55653f92308f9721790f26 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 28 Apr 2025 18:41:15 +0530 Subject: [PATCH 33/33] Update scripts/test/pre-install-airflow.sh --- scripts/test/pre-install-airflow.sh | 1 - 1 file changed, 1 deletion(-) diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index ebbd784c67..edc87c9dc0 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -26,7 +26,6 @@ mv /tmp/constraint.txt.tmp /tmp/constraint.txt pip install uv uv pip install pip --upgrade - uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt uv pip install apache-airflow-providers-docker --constraint /tmp/constraint.txt uv pip install apache-airflow-providers-postgres --constraint /tmp/constraint.txt