From e6f28951342df00adff443ea2aac2461a1387b0e Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 26 Nov 2025 16:36:30 +0530 Subject: [PATCH 1/7] Drop Airflow 2.5 --- .github/workflows/test.yml | 16 +--- pyproject.toml | 4 +- scripts/test/pre-install-airflow.sh | 2 +- tests/_triggers/test_watcher.py | 2 +- tests/operators/test_virtualenv.py | 4 - tests/utils.py | 118 +++++----------------------- 6 files changed, 26 insertions(+), 120 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b447fe3fee..582d55220f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,7 +2,7 @@ name: test on: push: # Run on pushes to the default branch - branches: [main] + branches: [main, drop_airflow2.5] # Also run on pull requests originating from forks. Although this is insecure by default, we need it to run # integration tests on forked PRs. As a guardrail, we’ve added an Authorize step to each job, which requires manually # approving the workflow run for each pushed commit. Approval only happens after a careful code review of the changes. @@ -55,17 +55,13 @@ jobs: fail-fast: false matrix: python-version: ["3.10", "3.11", "3.12", "3.13"] - airflow-version: ["2.5", "2.6", "2.7", "2.8", "2.9", "2.10", "2.11", "3.0", "3.1"] + airflow-version: ["2.6", "2.7", "2.8", "2.9", "2.10", "2.11", "3.0", "3.1"] dbt-version: ["1.10"] exclude: - - python-version: "3.11" - airflow-version: "2.5" # Apache Airflow versions prior to 2.9.0 have not been tested with Python 3.12. # Official support for Python 3.12 and the corresponding constraints.txt are available only for Apache Airflow >= 2.9.0. # See: https://github.com/apache/airflow/tree/2.9.0?tab=readme-ov-file#requirements # See: https://github.com/apache/airflow/tree/2.8.4?tab=readme-ov-file#requirements - - python-version: "3.12" - airflow-version: "2.5" - python-version: "3.12" airflow-version: "2.6" - python-version: "3.12" @@ -73,8 +69,6 @@ jobs: - python-version: "3.12" airflow-version: "2.8" # Apache Airflow versions prior to 3.1.0 have not been tested with Python 3.13. - - python-version: "3.13" - airflow-version: "2.5" - python-version: "3.13" airflow-version: "2.6" - python-version: "3.13" @@ -133,14 +127,10 @@ jobs: fail-fast: false matrix: python-version: ["3.10", "3.11", "3.13"] - airflow-version: ["2.5", "2.6", "2.7", "2.8", "2.9", "2.10", "2.11", "3.0", "3.1"] + airflow-version: ["2.6", "2.7", "2.8", "2.9", "2.10", "2.11", "3.0", "3.1"] dbt-version: ["1.10"] exclude: - - python-version: "3.11" - airflow-version: "2.5" # Apache Airflow versions prior to 3.1.0 have not been tested with Python 3.13. - - python-version: "3.13" - airflow-version: "2.5" - python-version: "3.13" airflow-version: "2.6" - python-version: "3.13" diff --git a/pyproject.toml b/pyproject.toml index 98833da545..f985253ec0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,7 @@ classifiers = [ dependencies = [ "aenum", "attrs", - "apache-airflow>=2.5.0", + "apache-airflow>=2.6.0", "deprecation", # Python 3.13 exposes a deprecated operator, we can remove this dependency in the future "Jinja2>=3.0.0", "msgpack", @@ -169,7 +169,7 @@ pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} [[tool.hatch.envs.tests.matrix]] python = ["3.10", "3.11", "3.12", "3.13"] -airflow = ["2.5", "2.6", "2.7", "2.8", "2.9", "2.10", "2.11", "3.0", "3.1"] +airflow = ["2.6", "2.7", "2.8", "2.9", "2.10", "2.11", "3.0", "3.1"] dbt = ["1.5", "1.6", "1.7", "1.8", "1.9", "1.10", "2.0"] [tool.hatch.envs.tests.overrides] diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh index 552c398c93..dd576db1fe 100755 --- a/scripts/test/pre-install-airflow.sh +++ b/scripts/test/pre-install-airflow.sh @@ -41,7 +41,7 @@ uv pip install "apache-airflow==$AIRFLOW_VERSION" apache-airflow-providers-docke uv pip install "gcsfs<2025.3.0" -if [ "$AIRFLOW_VERSION" = "2.5" ] || [ "$AIRFLOW_VERSION" = "2.6" ] ; then +if [ "$AIRFLOW_VERSION" = "2.6" ] ; then uv pip install "apache-airflow-providers-amazon" "apache-airflow==$AIRFLOW_VERSION" "urllib3<2" uv pip install "apache-airflow-providers-cncf-kubernetes" "apache-airflow==$AIRFLOW_VERSION" uv pip install "apache-airflow-providers-google<10.11" "apache-airflow==$AIRFLOW_VERSION" diff --git a/tests/_triggers/test_watcher.py b/tests/_triggers/test_watcher.py index 5a2b344ac5..be37d088f6 100644 --- a/tests/_triggers/test_watcher.py +++ b/tests/_triggers/test_watcher.py @@ -98,7 +98,7 @@ async def test_parse_node_status(self, use_event, xcom_val, expected_status): @pytest.mark.parametrize( "airflow_version, expected_val", [ - (Version("2.5.0"), "af2"), # Airflow < 3 uses get_xcom_val_af2 + (Version("2.11.0"), "af2"), # Airflow < 3 uses get_xcom_val_af2 (Version("3.0.0"), "af3"), # Airflow >= 3 uses get_xcom_val_af3 ], ) diff --git a/tests/operators/test_virtualenv.py b/tests/operators/test_virtualenv.py index 560c75999c..f2eaa352a3 100644 --- a/tests/operators/test_virtualenv.py +++ b/tests/operators/test_virtualenv.py @@ -394,10 +394,6 @@ def test__release_venv_lock_current_process(tmpdir): assert not lockfile.exists() -@pytest.mark.skipif( - AIRFLOW_VERSION < Version("2.5"), - reason="This error is only reproducible with dag.test, which was introduced in Airflow 2.5", -) @pytest.mark.integration def test_integration_virtualenv_operator(caplog): """ diff --git a/tests/utils.py b/tests/utils.py index 02bf6bc29c..8902ba359b 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -7,16 +7,12 @@ from typing import Any import sqlalchemy -from airflow.configuration import secrets_backend_list from airflow.exceptions import AirflowSkipException from airflow.models.dag import DAG from airflow.models.dagrun import DagRun from airflow.models.taskinstance import TaskInstance -from airflow.secrets.local_filesystem import LocalFilesystemBackend from airflow.utils import timezone -from airflow.utils.session import NEW_SESSION, provide_session -from airflow.utils.state import DagRunState, State -from airflow.utils.types import DagRunType +from airflow.utils.state import DagRunState from packaging import version from packaging.version import Version from sqlalchemy.orm.session import Session @@ -72,105 +68,29 @@ def test_dag( dag, conn_file_path: str | None = None, custom_tester: bool = False, expect_success: bool = True ) -> DagRun: dr = None - if custom_tester: - dr = test_old_dag(dag, conn_file_path) + if AIRFLOW_VERSION not in (Version("2.10.0"), Version("2.10.1"), Version("2.10.2"), Version("2.11.0")): + dr = new_test_dag(dag) assert check_dag_success(dr, expect_success), f"Dag {dag.dag_id} did not run successfully. State: {dr.state}. " - elif AIRFLOW_VERSION >= version.Version("2.5"): - if AIRFLOW_VERSION not in (Version("2.10.0"), Version("2.10.1"), Version("2.10.2"), Version("2.11.0")): + else: + # This is a work around until we fix the issue in Airflow: + # https://github.com/apache/airflow/issues/42495 + """ + FAILED tests/test_example_dags.py::test_example_dag[example_model_version] - sqlalchemy.exc.PendingRollbackError: + This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). + Original exception was: Can't flush None value found in collection DatasetModel.aliases (Background on this error at: https://sqlalche.me/e/14/7s2a) + FAILED tests/test_example_dags.py::test_example_dag[basic_cosmos_dag] + FAILED tests/test_example_dags.py::test_example_dag[cosmos_profile_mapping] + FAILED tests/test_example_dags.py::test_example_dag[user_defined_profile] + """ + try: dr = new_test_dag(dag) assert check_dag_success( dr, expect_success ), f"Dag {dag.dag_id} did not run successfully. State: {dr.state}. " - else: - # This is a work around until we fix the issue in Airflow: - # https://github.com/apache/airflow/issues/42495 - """ - FAILED tests/test_example_dags.py::test_example_dag[example_model_version] - sqlalchemy.exc.PendingRollbackError: - This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). - Original exception was: Can't flush None value found in collection DatasetModel.aliases (Background on this error at: https://sqlalche.me/e/14/7s2a) - FAILED tests/test_example_dags.py::test_example_dag[basic_cosmos_dag] - FAILED tests/test_example_dags.py::test_example_dag[cosmos_profile_mapping] - FAILED tests/test_example_dags.py::test_example_dag[user_defined_profile] - """ - try: - dr = new_test_dag(dag) - assert check_dag_success( - dr, expect_success - ), f"Dag {dag.dag_id} did not run successfully. State: {dr.state}. " - except sqlalchemy.exc.PendingRollbackError: - warnings.warn( - "Early versions of Airflow 2.10 and Airflow 2.11 have issues when running the test command with DatasetAlias / Datasets" - ) - else: - dr = test_old_dag(dag, conn_file_path) - assert check_dag_success(dr), f"Dag {dag.dag_id} did not run successfully. State: {dr.state}. " - - return dr - - -# DAG.test() was added in Airflow version 2.5.0. And to test on older Airflow versions, we need to copy the -# implementation here. -@provide_session -def test_old_dag( - dag, - execution_date: datetime | None = None, - run_conf: dict[str, Any] | None = None, - conn_file_path: str | None = None, - variable_file_path: str | None = None, - session: Session = NEW_SESSION, -) -> DagRun: - """ - Execute one single DagRun for a given DAG and execution date. - - :param execution_date: execution date for the DAG run - :param run_conf: configuration to pass to newly created dagrun - :param conn_file_path: file path to a connection file in either yaml or json - :param variable_file_path: file path to a variable file in either yaml or json - :param session: database connection (optional) - """ - - if conn_file_path or variable_file_path: - local_secrets = LocalFilesystemBackend( - variables_file_path=variable_file_path, connections_file_path=conn_file_path - ) - secrets_backend_list.insert(0, local_secrets) - - execution_date = execution_date or timezone.utcnow() - - dag.log.debug("Clearing existing task instances for execution date %s", execution_date) - dag.clear( - start_date=execution_date, - end_date=execution_date, - dag_run_state=False, - session=session, - ) - dag.log.debug("Getting dagrun for dag %s", dag.dag_id) - dr: DagRun = _get_or_create_dagrun( - dag=dag, - start_date=execution_date, - execution_date=execution_date, - run_id=DagRun.generate_run_id(DagRunType.MANUAL, execution_date), - session=session, - conf=run_conf, - ) - - tasks = dag.task_dict - dag.log.debug("starting dagrun") - # Instead of starting a scheduler, we run the minimal loop possible to check - # for task readiness and dependency management. This is notably faster - # than creating a BackfillJob and allows us to surface logs to the user - while dr.state == State.RUNNING: - schedulable_tis, _ = dr.update_state(session=session) - for ti in schedulable_tis: - add_logger_if_needed(dag, ti) - ti.task = tasks[ti.task_id] - _run_task(ti, session=session) - if conn_file_path or variable_file_path: - # Remove the local variables we have added to the secrets_backend_list - secrets_backend_list.pop(0) - - print("conn_file_path", conn_file_path) - + except sqlalchemy.exc.PendingRollbackError: + warnings.warn( + "Early versions of Airflow 2.10 and Airflow 2.11 have issues when running the test command with DatasetAlias / Datasets" + ) return dr From d757dcbc577f45689a193d280a4ab00963c6e069 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 26 Nov 2025 16:40:06 +0530 Subject: [PATCH 2/7] Add changelog --- CHANGELOG.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 3782ce1b98..e9cf24ad60 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -12,6 +12,7 @@ Breaking changes - ``create_task_metadata`` now expects the ``node_converters`` argument * Drop Python 3.9 Support by @pankajastro in #2118 * Drop Airflow 2.4 support by @pankajastro in #2161 +* Drop Airflow 2.5 support by @pankajastro in #2165 Features From 0b2b84c1d04d826c0fc158e0538cdc7ec6c01a14 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 26 Nov 2025 17:13:01 +0530 Subject: [PATCH 3/7] Fix tests --- tests/operators/test_local.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 7088c1c2ea..a8390fec1a 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -514,7 +514,7 @@ def test_run_operator_dataset_inlets_and_outlets_airflow_210(caplog): assert test_operator.outlets == [AssetAliasModel(name="test_id_1__test")] with pytest.raises(FlushError): - run_test_dag(dag, custom_tester=True) + run_test_dag(dag) # This is a known limitation of Airflow 2.10.0 and 2.10.1 # https://github.com/apache/airflow/issues/42495 From 0f3f0453555095da69d1cbfc8830800fc2d99e57 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 26 Nov 2025 19:49:40 +0530 Subject: [PATCH 4/7] bring back test_old_dag --- tests/utils.py | 76 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 74 insertions(+), 2 deletions(-) diff --git a/tests/utils.py b/tests/utils.py index 8902ba359b..e956224040 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -7,12 +7,16 @@ from typing import Any import sqlalchemy +from airflow.configuration import secrets_backend_list from airflow.exceptions import AirflowSkipException from airflow.models.dag import DAG from airflow.models.dagrun import DagRun from airflow.models.taskinstance import TaskInstance +from airflow.secrets.local_filesystem import LocalFilesystemBackend from airflow.utils import timezone -from airflow.utils.state import DagRunState +from airflow.utils.session import NEW_SESSION, provide_session +from airflow.utils.state import DagRunState, State +from airflow.utils.types import DagRunType from packaging import version from packaging.version import Version from sqlalchemy.orm.session import Session @@ -68,7 +72,9 @@ def test_dag( dag, conn_file_path: str | None = None, custom_tester: bool = False, expect_success: bool = True ) -> DagRun: dr = None - if AIRFLOW_VERSION not in (Version("2.10.0"), Version("2.10.1"), Version("2.10.2"), Version("2.11.0")): + if custom_tester: + dr = test_old_dag(dag, conn_file_path) + elif AIRFLOW_VERSION not in (Version("2.10.0"), Version("2.10.1"), Version("2.10.2"), Version("2.11.0")): dr = new_test_dag(dag) assert check_dag_success(dr, expect_success), f"Dag {dag.dag_id} did not run successfully. State: {dr.state}. " else: @@ -94,6 +100,72 @@ def test_dag( return dr +# TODO: Test operators/test_local.py::test_run_operator_dataset_inlets_and_outlets_airflow_210 +# still depends on this utility. Remove this once that test is fixed. +@provide_session +def test_old_dag( + dag, + execution_date: datetime | None = None, + run_conf: dict[str, Any] | None = None, + conn_file_path: str | None = None, + variable_file_path: str | None = None, + session: Session = NEW_SESSION, +) -> DagRun: + """ + Execute one single DagRun for a given DAG and execution date. + + :param execution_date: execution date for the DAG run + :param run_conf: configuration to pass to newly created dagrun + :param conn_file_path: file path to a connection file in either yaml or json + :param variable_file_path: file path to a variable file in either yaml or json + :param session: database connection (optional) + """ + + if conn_file_path or variable_file_path: + local_secrets = LocalFilesystemBackend( + variables_file_path=variable_file_path, connections_file_path=conn_file_path + ) + secrets_backend_list.insert(0, local_secrets) + + execution_date = execution_date or timezone.utcnow() + + dag.log.debug("Clearing existing task instances for execution date %s", execution_date) + dag.clear( + start_date=execution_date, + end_date=execution_date, + dag_run_state=False, + session=session, + ) + dag.log.debug("Getting dagrun for dag %s", dag.dag_id) + dr: DagRun = _get_or_create_dagrun( + dag=dag, + start_date=execution_date, + execution_date=execution_date, + run_id=DagRun.generate_run_id(DagRunType.MANUAL, execution_date), + session=session, + conf=run_conf, + ) + + tasks = dag.task_dict + dag.log.debug("starting dagrun") + # Instead of starting a scheduler, we run the minimal loop possible to check + # for task readiness and dependency management. This is notably faster + # than creating a BackfillJob and allows us to surface logs to the user + while dr.state == State.RUNNING: + schedulable_tis, _ = dr.update_state(session=session) + for ti in schedulable_tis: + add_logger_if_needed(dag, ti) + ti.task = tasks[ti.task_id] + _run_task(ti, session=session) + if conn_file_path or variable_file_path: + # Remove the local variables we have added to the secrets_backend_list + secrets_backend_list.pop(0) + + print("conn_file_path", conn_file_path) + + return dr + + def add_logger_if_needed(dag: DAG, ti: TaskInstance): """ Add a formatted logger to the taskinstance so all logs are surfaced to the command line instead From 9ca68e0e394d04f75cbe6d2369f6dacbd5c14fbb Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Wed, 26 Nov 2025 20:15:22 +0530 Subject: [PATCH 5/7] Apply suggestions from code review --- tests/operators/test_local.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index a8390fec1a..7088c1c2ea 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -514,7 +514,7 @@ def test_run_operator_dataset_inlets_and_outlets_airflow_210(caplog): assert test_operator.outlets == [AssetAliasModel(name="test_id_1__test")] with pytest.raises(FlushError): - run_test_dag(dag) + run_test_dag(dag, custom_tester=True) # This is a known limitation of Airflow 2.10.0 and 2.10.1 # https://github.com/apache/airflow/issues/42495 From 518ca844897d98bf63eb66d0aff1c3e4eb738ad8 Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Wed, 26 Nov 2025 20:44:32 +0530 Subject: [PATCH 6/7] Apply suggestions from code review --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 582d55220f..d75be38310 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,7 +2,7 @@ name: test on: push: # Run on pushes to the default branch - branches: [main, drop_airflow2.5] + branches: [main] # Also run on pull requests originating from forks. Although this is insecure by default, we need it to run # integration tests on forked PRs. As a guardrail, we’ve added an Authorize step to each job, which requires manually # approving the workflow run for each pushed commit. Approval only happens after a careful code review of the changes. From e20962d5d58e0465c3350bf1c58e8b4e4f48a6ce Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 26 Nov 2025 21:10:36 +0530 Subject: [PATCH 7/7] Link issue --- tests/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/utils.py b/tests/utils.py index e956224040..7c30e02666 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -102,6 +102,7 @@ def test_dag( # TODO: Test operators/test_local.py::test_run_operator_dataset_inlets_and_outlets_airflow_210 # still depends on this utility. Remove this once that test is fixed. +# https://github.com/astronomer/astronomer-cosmos/issues/2166 @provide_session def test_old_dag( dag,