From 4e7779abe597672a18ac978e99b5290d20a902eb Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 9 Apr 2026 17:15:50 +0530 Subject: [PATCH 1/5] Speed up Airflow 3.1+ integration tests with DAG sync caching MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Airflow 3.1+ requires DAGs to be serialized to the database before dag.test() can create a DagRun. Previously, every single test created a fresh DagBundleModel, instantiated a new DagBag, and called sync_bag_to_db individually — adding significant per-test overhead that caused integration tests on 3.1/3.2 to take 5-6x longer than on 2.9. Cache DagBundle creation and track synced DAG IDs at module level so each DAG is synced at most once per session. Add a batch pre-sync fixture in test_example_dags.py that syncs all ~31 example DAGs in a single call at module start, letting individual parametrized tests skip the sync entirely. Also add invalidate_dag_sync_cache() for tests that explicitly delete DAG metadata records. Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/test_airflow_versioning.py | 4 +- tests/test_example_dags.py | 12 +++++ tests/utils.py | 90 ++++++++++++++++++++++++-------- 3 files changed, 84 insertions(+), 22 deletions(-) diff --git a/tests/test_airflow_versioning.py b/tests/test_airflow_versioning.py index 3e7cb62cca..fc6c21f8e7 100644 --- a/tests/test_airflow_versioning.py +++ b/tests/test_airflow_versioning.py @@ -5,7 +5,7 @@ from packaging.version import Version from cosmos.constants import AIRFLOW_VERSION -from tests.utils import run_dag +from tests.utils import invalidate_dag_sync_cache, run_dag if AIRFLOW_VERSION < Version("3.1"): pytest.skip("Skipping Airflow versioning tests on Airflow 2.x and 3.0", allow_module_level=True) @@ -66,8 +66,10 @@ def _cleanup(dag_id, session=None): raise _cleanup(test_dag_id) + invalidate_dag_sync_cache(test_dag_id) yield _cleanup(test_dag_id) + invalidate_dag_sync_cache(test_dag_id) @pytest.fixture diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 664e3ddbfd..4088c2ee5c 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -116,6 +116,18 @@ def run_dag(dag_id: str): test_utils.run_dag(dag) +@pytest.fixture(scope="module", autouse=True) +def _presync_example_dags_to_db(): + """Pre-sync all example DAGs to the database in one batch for Airflow 3.1+. + + Without this, each test individually creates a DagBag and calls sync_bag_to_db, + adding significant per-test overhead. Batch-syncing up front lets individual tests + skip the sync entirely. + """ + dag_bag = get_dagbag_depending_on_single_dag() + test_utils.sync_dags_to_db(list(dag_bag.dags.values())) + + @pytest.mark.skipif( AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs, and Cosmos errors if `emit_datasets` is not False", diff --git a/tests/utils.py b/tests/utils.py index 70b408079d..f9a3dc1806 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -25,6 +25,72 @@ log = logging.getLogger(__name__) +# Airflow 3.1+ requires DAGs to be serialized to the database before dag.test() +# can create a DagRun. These module-level caches avoid redundant DB operations +# (DagBundleModel creation and sync_bag_to_db calls) across tests. +_dag_bundle_created = False +_synced_dag_ids: set[str] = set() + + +def _ensure_dag_bundle_exists(): + """Create the DagBundleModel record once (Airflow 3.1+ only). + + Subsequent calls are no-ops thanks to the module-level flag. + """ + global _dag_bundle_created + if _dag_bundle_created: + return + + from airflow.models.dagbundle import DagBundleModel + from airflow.utils.session import create_session + + with create_session() as session: + session.merge(DagBundleModel(name="test_bundle")) + session.commit() + _dag_bundle_created = True + + +def _get_dagbag_and_sync(): + """Return (DagBag class, sync_bag_to_db function), handling import differences across Airflow versions.""" + try: + from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db + except ImportError: + from airflow.models.dagbag import DagBag, sync_bag_to_db + return DagBag, sync_bag_to_db + + +def sync_dags_to_db(dags: list[DAG]): + """Batch-sync multiple DAGs to the database for Airflow 3.1+. + + Syncs all provided DAGs in a single sync_bag_to_db call, + skipping any that were already synced in this test session. + No-op for Airflow versions before 3.1. + """ + if AIRFLOW_VERSION < version.Version("3.1"): + return + + _ensure_dag_bundle_exists() + + new_dags = [dag for dag in dags if dag.dag_id not in _synced_dag_ids] + if not new_dags: + return + + DagBag, sync_bag_to_db = _get_dagbag_and_sync() + dagbag = DagBag(include_examples=False) + for dag in new_dags: + dagbag.bag_dag(dag) + sync_bag_to_db(dagbag, bundle_name="test_bundle", bundle_version="1") + _synced_dag_ids.update(dag.dag_id for dag in new_dags) + + +def invalidate_dag_sync_cache(*dag_ids: str): + """Remove DAG IDs from the sync cache, forcing re-sync on next use. + + Call this when DAG metadata records (DagModel, DagVersion) are deleted + outside the normal test flow, e.g. in versioning test cleanup fixtures. + """ + _synced_dag_ids.difference_update(dag_ids) + def run_dag(dag: DAG, conn_file_path: str | None = None) -> DagRun: return test_dag(dag=dag, conn_file_path=conn_file_path) @@ -43,27 +109,9 @@ def check_dag_success(dag_run: DagRun | None, expect_success: bool = True) -> bo def new_test_dag(dag: DAG) -> DagRun: if AIRFLOW_VERSION >= version.Version("3.1"): # Airflow 3.1+ requires DAG to be serialized to database before calling dag.test() - # because create_dagrun() checks for DagVersion and DagModel records - - try: - from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db - except ImportError: - from airflow.models.dagbag import DagBag, sync_bag_to_db - - from airflow.models.dagbundle import DagBundleModel - from airflow.utils.session import create_session - - # Create DagBundle if it doesn't exist (required for DagModel foreign key) - # This mimics what get_bagged_dag does via manager.sync_bundles_to_db() - with create_session() as session: - dag_bundle = DagBundleModel(name="test_bundle") - session.merge(dag_bundle) - session.commit() - - # This creates both DagModel and DagVersion records - dagbag = DagBag(include_examples=False) - dagbag.bag_dag(dag) - sync_bag_to_db(dagbag, bundle_name="test_bundle", bundle_version="1") + # because create_dagrun() checks for DagVersion and DagModel records. + # sync_dags_to_db caches per dag_id so repeated calls for the same DAG are skipped. + sync_dags_to_db([dag]) dr = dag.test(logical_date=timezone.utcnow()) elif AIRFLOW_VERSION >= version.Version("3.0"): dr = dag.test(logical_date=timezone.utcnow()) From 7f39270f89eba6894f26151b9cff7dfd39fea9d9 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 9 Apr 2026 18:39:16 +0530 Subject: [PATCH 2/5] Cache InProcessExecutionAPI to fix dag.test() per-task overhead MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Airflow 3.x's dag.test() creates a new InProcessExecutionAPI for every task — a full FastAPI app with ASGI middleware, JWT auth, and async event loop. For a 13-task DAG this adds ~80s of overhead (~6-8s per task), explaining why test_example_dag[basic_cosmos_dag] takes 81s on Airflow 3.2 vs 2.5s on 2.10. Add a session-scoped conftest fixture that patches in_process_api_server() to return a cached instance, so the FastAPI app is created once and reused across all tasks and tests. This is the primary bottleneck; the sync caching from the previous commit provides a secondary improvement. Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/conftest.py | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/tests/conftest.py b/tests/conftest.py index 22d03de520..6be51b902b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,6 +12,41 @@ from cosmos.log import CosmosRichLogger +@pytest.fixture(autouse=True, scope="session") +def _cache_airflow_in_process_api(): + """Cache the InProcessExecutionAPI to avoid per-task FastAPI app creation in dag.test(). + + Airflow 3.x's dag.test() creates a new InProcessExecutionAPI — a full FastAPI + application with ASGI middleware, JWT auth, dependency injection, and an async + event loop — for every single task. This adds ~6-8s of overhead per task, + making a 13-task DAG take ~80s instead of ~2s. + + This fixture patches in_process_api_server() to return a cached instance, + so the FastAPI app is created once and reused across all tasks and tests. + """ + if AIRFLOW_VERSION < Version("3.0"): + yield + return + + try: + from airflow.sdk.execution_time import supervisor as supervisor_module + + _original_fn = supervisor_module.in_process_api_server + _cached_api = None + + def cached_in_process_api_server(): + nonlocal _cached_api + if _cached_api is None: + _cached_api = _original_fn() + return _cached_api + + supervisor_module.in_process_api_server = cached_in_process_api_server + except ImportError: + pass + + yield + + @pytest.fixture(autouse=True) def _cleanup_rich_loggers(): """Replace any CosmosRichLogger instances with standard loggers after each test. From 8a72c85ecd8d99f9e2964e45996994f544ddb3e5 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 9 Apr 2026 19:15:55 +0530 Subject: [PATCH 3/5] Handle missing in_process_api_server in Airflow 3.0 Airflow 3.0 has the supervisor module but not the in_process_api_server function (added in 3.1). Catch AttributeError alongside ImportError to make the caching fixture a no-op on 3.0. Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/conftest.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index 6be51b902b..d1f2e0cd00 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -41,7 +41,9 @@ def cached_in_process_api_server(): return _cached_api supervisor_module.in_process_api_server = cached_in_process_api_server - except ImportError: + except (ImportError, AttributeError): + # ImportError: Airflow 2.x doesn't have this module + # AttributeError: Airflow 3.0 has the module but not in_process_api_server pass yield From 3ee7de4bfd24ac8dec72bd00e0b4fce767f333b4 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 9 Apr 2026 19:22:09 +0530 Subject: [PATCH 4/5] Revert "Speed up Airflow 3.1+ integration tests with DAG sync caching" This reverts commit a3f1a04aab60eab407e5cd577116e2ede8925d09. --- tests/test_airflow_versioning.py | 4 +- tests/test_example_dags.py | 12 ----- tests/utils.py | 90 ++++++++------------------------ 3 files changed, 22 insertions(+), 84 deletions(-) diff --git a/tests/test_airflow_versioning.py b/tests/test_airflow_versioning.py index fc6c21f8e7..3e7cb62cca 100644 --- a/tests/test_airflow_versioning.py +++ b/tests/test_airflow_versioning.py @@ -5,7 +5,7 @@ from packaging.version import Version from cosmos.constants import AIRFLOW_VERSION -from tests.utils import invalidate_dag_sync_cache, run_dag +from tests.utils import run_dag if AIRFLOW_VERSION < Version("3.1"): pytest.skip("Skipping Airflow versioning tests on Airflow 2.x and 3.0", allow_module_level=True) @@ -66,10 +66,8 @@ def _cleanup(dag_id, session=None): raise _cleanup(test_dag_id) - invalidate_dag_sync_cache(test_dag_id) yield _cleanup(test_dag_id) - invalidate_dag_sync_cache(test_dag_id) @pytest.fixture diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 4088c2ee5c..664e3ddbfd 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -116,18 +116,6 @@ def run_dag(dag_id: str): test_utils.run_dag(dag) -@pytest.fixture(scope="module", autouse=True) -def _presync_example_dags_to_db(): - """Pre-sync all example DAGs to the database in one batch for Airflow 3.1+. - - Without this, each test individually creates a DagBag and calls sync_bag_to_db, - adding significant per-test overhead. Batch-syncing up front lets individual tests - skip the sync entirely. - """ - dag_bag = get_dagbag_depending_on_single_dag() - test_utils.sync_dags_to_db(list(dag_bag.dags.values())) - - @pytest.mark.skipif( AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS, reason="Airflow 2.9.0 and 2.9.1 have a breaking change in Dataset URIs, and Cosmos errors if `emit_datasets` is not False", diff --git a/tests/utils.py b/tests/utils.py index f9a3dc1806..70b408079d 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -25,72 +25,6 @@ log = logging.getLogger(__name__) -# Airflow 3.1+ requires DAGs to be serialized to the database before dag.test() -# can create a DagRun. These module-level caches avoid redundant DB operations -# (DagBundleModel creation and sync_bag_to_db calls) across tests. -_dag_bundle_created = False -_synced_dag_ids: set[str] = set() - - -def _ensure_dag_bundle_exists(): - """Create the DagBundleModel record once (Airflow 3.1+ only). - - Subsequent calls are no-ops thanks to the module-level flag. - """ - global _dag_bundle_created - if _dag_bundle_created: - return - - from airflow.models.dagbundle import DagBundleModel - from airflow.utils.session import create_session - - with create_session() as session: - session.merge(DagBundleModel(name="test_bundle")) - session.commit() - _dag_bundle_created = True - - -def _get_dagbag_and_sync(): - """Return (DagBag class, sync_bag_to_db function), handling import differences across Airflow versions.""" - try: - from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db - except ImportError: - from airflow.models.dagbag import DagBag, sync_bag_to_db - return DagBag, sync_bag_to_db - - -def sync_dags_to_db(dags: list[DAG]): - """Batch-sync multiple DAGs to the database for Airflow 3.1+. - - Syncs all provided DAGs in a single sync_bag_to_db call, - skipping any that were already synced in this test session. - No-op for Airflow versions before 3.1. - """ - if AIRFLOW_VERSION < version.Version("3.1"): - return - - _ensure_dag_bundle_exists() - - new_dags = [dag for dag in dags if dag.dag_id not in _synced_dag_ids] - if not new_dags: - return - - DagBag, sync_bag_to_db = _get_dagbag_and_sync() - dagbag = DagBag(include_examples=False) - for dag in new_dags: - dagbag.bag_dag(dag) - sync_bag_to_db(dagbag, bundle_name="test_bundle", bundle_version="1") - _synced_dag_ids.update(dag.dag_id for dag in new_dags) - - -def invalidate_dag_sync_cache(*dag_ids: str): - """Remove DAG IDs from the sync cache, forcing re-sync on next use. - - Call this when DAG metadata records (DagModel, DagVersion) are deleted - outside the normal test flow, e.g. in versioning test cleanup fixtures. - """ - _synced_dag_ids.difference_update(dag_ids) - def run_dag(dag: DAG, conn_file_path: str | None = None) -> DagRun: return test_dag(dag=dag, conn_file_path=conn_file_path) @@ -109,9 +43,27 @@ def check_dag_success(dag_run: DagRun | None, expect_success: bool = True) -> bo def new_test_dag(dag: DAG) -> DagRun: if AIRFLOW_VERSION >= version.Version("3.1"): # Airflow 3.1+ requires DAG to be serialized to database before calling dag.test() - # because create_dagrun() checks for DagVersion and DagModel records. - # sync_dags_to_db caches per dag_id so repeated calls for the same DAG are skipped. - sync_dags_to_db([dag]) + # because create_dagrun() checks for DagVersion and DagModel records + + try: + from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db + except ImportError: + from airflow.models.dagbag import DagBag, sync_bag_to_db + + from airflow.models.dagbundle import DagBundleModel + from airflow.utils.session import create_session + + # Create DagBundle if it doesn't exist (required for DagModel foreign key) + # This mimics what get_bagged_dag does via manager.sync_bundles_to_db() + with create_session() as session: + dag_bundle = DagBundleModel(name="test_bundle") + session.merge(dag_bundle) + session.commit() + + # This creates both DagModel and DagVersion records + dagbag = DagBag(include_examples=False) + dagbag.bag_dag(dag) + sync_bag_to_db(dagbag, bundle_name="test_bundle", bundle_version="1") dr = dag.test(logical_date=timezone.utcnow()) elif AIRFLOW_VERSION >= version.Version("3.0"): dr = dag.test(logical_date=timezone.utcnow()) From 322d21d961f90a88ce583bcfb915b4df46279ed7 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 9 Apr 2026 20:06:59 +0530 Subject: [PATCH 5/5] Address review: restore original function in teardown, tighten version guard Tighten version guard from 3.0 to 3.1 since in_process_api_server only exists in Airflow 3.1+, avoiding unnecessary import/exception work on 3.0. Restore the original function in a finally block after yield to prevent leaking the monkeypatch. Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/conftest.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index d1f2e0cd00..3670e14b87 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -24,7 +24,7 @@ def _cache_airflow_in_process_api(): This fixture patches in_process_api_server() to return a cached instance, so the FastAPI app is created once and reused across all tasks and tests. """ - if AIRFLOW_VERSION < Version("3.0"): + if AIRFLOW_VERSION < Version("3.1"): yield return @@ -32,21 +32,23 @@ def _cache_airflow_in_process_api(): from airflow.sdk.execution_time import supervisor as supervisor_module _original_fn = supervisor_module.in_process_api_server - _cached_api = None + except (ImportError, AttributeError): + yield + return - def cached_in_process_api_server(): - nonlocal _cached_api - if _cached_api is None: - _cached_api = _original_fn() - return _cached_api + _cached_api = None - supervisor_module.in_process_api_server = cached_in_process_api_server - except (ImportError, AttributeError): - # ImportError: Airflow 2.x doesn't have this module - # AttributeError: Airflow 3.0 has the module but not in_process_api_server - pass + def cached_in_process_api_server(): + nonlocal _cached_api + if _cached_api is None: + _cached_api = _original_fn() + return _cached_api - yield + supervisor_module.in_process_api_server = cached_in_process_api_server + try: + yield + finally: + supervisor_module.in_process_api_server = _original_fn @pytest.fixture(autouse=True)