From 131ca8c5e9b75312dd220dd29bc9bd9bb7bf692c Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 5 Feb 2026 17:24:13 +0530 Subject: [PATCH 1/5] Initialise DagBag with None dag_folder --- .github/workflows/test.yml | 2 +- tests/listeners/test_dag_run_listener.py | 4 +++- tests/utils.py | 4 +++- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 082e2a9d71..f1efdde751 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,7 +2,7 @@ name: test on: push: # Run on pushes to the default branch - branches: [main] + branches: [main,optimise-airflow-3-1-IT] # Also run on pull requests originating from forks. Although this is insecure by default, we need it to run # integration tests on forked PRs. As a guardrail, we’ve added an Authorize step to each job, which requires manually # approving the workflow run for each pushed commit. Approval only happens after a careful code review of the changes. diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py index 22fea2a5a5..0f6c8eeb52 100644 --- a/tests/listeners/test_dag_run_listener.py +++ b/tests/listeners/test_dag_run_listener.py @@ -143,7 +143,9 @@ def create_dag_run(dag: DAG, run_id: str, run_after: datetime) -> DagRun: session.commit() # This creates both DagModel and DagVersion records - dagbag = DagBag(include_examples=False) + # NOTE: dag_folder=None is required to create an empty DagBag instead of + # loading all DAGs from the default DAGS_FOLDER (which would be slow) + dagbag = DagBag(dag_folder=None, include_examples=False) dagbag.bag_dag(dag) sync_bag_to_db(dagbag, bundle_name="test_bundle_listener", bundle_version="1") diff --git a/tests/utils.py b/tests/utils.py index 7c30e02666..3ac15ce1d9 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -57,7 +57,9 @@ def new_test_dag(dag: DAG) -> DagRun: session.commit() # This creates both DagModel and DagVersion records - dagbag = DagBag(include_examples=False) + # NOTE: dag_folder=None is required to create an empty DagBag instead of + # loading all DAGs from the default DAGS_FOLDER (which would be slow) + dagbag = DagBag(dag_folder=None, include_examples=False) dagbag.bag_dag(dag) sync_bag_to_db(dagbag, bundle_name="test_bundle", bundle_version="1") dr = dag.test(logical_date=timezone.utcnow()) From 9398783ad1910725cc213afeb48fb831d16aad0f Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 5 Feb 2026 18:11:11 +0530 Subject: [PATCH 2/5] Disable telemetry for tests and add timing --- tests/conftest.py | 23 +++++++++++++++++++++++ tests/utils.py | 25 +++++++++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/tests/conftest.py b/tests/conftest.py index ec067649a6..c343f16d2b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ import json +import os from unittest.mock import patch import pytest @@ -7,6 +8,12 @@ from cosmos.constants import AIRFLOW_VERSION +# Disable telemetry during tests to avoid network overhead from HTTP calls. +# Each telemetry call has a 1s timeout and fires on every task success/failure, +# which can add significant overhead (20+ seconds for DAGs with many tasks). +# Set this BEFORE importing cosmos.settings to ensure it takes effect. +os.environ["DO_NOT_TRACK"] = "1" + if AIRFLOW_VERSION >= Version("3.1"): # Change introduced in Airflow 3.1.0 # https://github.com/apache/airflow/pull/55722/files @@ -15,6 +22,22 @@ base_operator_get_connection_path = "airflow.hooks.base.BaseHook.get_connection" +@pytest.fixture(scope="session", autouse=True) +def disable_telemetry_for_tests(): + """ + Disable telemetry for all tests to avoid network overhead. + + The telemetry listeners make HTTP calls on every task success/failure, + each with a 1s timeout. For DAGs with many tasks, this can overhead per test. + """ + import cosmos.settings + + original_value = cosmos.settings.do_not_track + cosmos.settings.do_not_track = True + yield + cosmos.settings.do_not_track = original_value + + @pytest.fixture() def mock_bigquery_conn(): # type: ignore """ diff --git a/tests/utils.py b/tests/utils.py index 3ac15ce1d9..d4f03e6750 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -2,6 +2,7 @@ import logging import sys +import time import warnings from datetime import datetime from typing import Any @@ -49,6 +50,8 @@ def new_test_dag(dag: DAG) -> DagRun: from airflow.models.dagbundle import DagBundleModel from airflow.utils.session import create_session + t0 = time.perf_counter() + # Create DagBundle if it doesn't exist (required for DagModel foreign key) # This mimics what get_bagged_dag does via manager.sync_bundles_to_db() with create_session() as session: @@ -56,17 +59,39 @@ def new_test_dag(dag: DAG) -> DagRun: session.merge(dag_bundle) session.commit() + t1 = time.perf_counter() + log.info("[TIMING] DagBundle creation: %.3fs", t1 - t0) + # This creates both DagModel and DagVersion records # NOTE: dag_folder=None is required to create an empty DagBag instead of # loading all DAGs from the default DAGS_FOLDER (which would be slow) dagbag = DagBag(dag_folder=None, include_examples=False) dagbag.bag_dag(dag) + + t2 = time.perf_counter() + log.info("[TIMING] DagBag creation + bag_dag: %.3fs", t2 - t1) + sync_bag_to_db(dagbag, bundle_name="test_bundle", bundle_version="1") + + t3 = time.perf_counter() + log.info("[TIMING] sync_bag_to_db: %.3fs", t3 - t2) + dr = dag.test(logical_date=timezone.utcnow()) + + t4 = time.perf_counter() + log.info("[TIMING] dag.test() execution: %.3fs", t4 - t3) + log.info("[TIMING] Total new_test_dag for %s: %.3fs", dag.dag_id, t4 - t0) + elif AIRFLOW_VERSION >= version.Version("3.0"): + t0 = time.perf_counter() dr = dag.test(logical_date=timezone.utcnow()) + t1 = time.perf_counter() + log.info("[TIMING] dag.test() execution for %s: %.3fs", dag.dag_id, t1 - t0) else: + t0 = time.perf_counter() dr = dag.test() + t1 = time.perf_counter() + log.info("[TIMING] dag.test() execution for %s: %.3fs", dag.dag_id, t1 - t0) return dr From 176fad76298ff77f1e79c09518586f8a8c5e0efe Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 5 Feb 2026 19:45:40 +0530 Subject: [PATCH 3/5] log timestamps --- tests/conftest.py | 27 ++++++--------------------- tests/utils.py | 14 +++++++------- 2 files changed, 13 insertions(+), 28 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index c343f16d2b..7118bb0971 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,5 @@ import json -import os +from datetime import datetime from unittest.mock import patch import pytest @@ -8,12 +8,6 @@ from cosmos.constants import AIRFLOW_VERSION -# Disable telemetry during tests to avoid network overhead from HTTP calls. -# Each telemetry call has a 1s timeout and fires on every task success/failure, -# which can add significant overhead (20+ seconds for DAGs with many tasks). -# Set this BEFORE importing cosmos.settings to ensure it takes effect. -os.environ["DO_NOT_TRACK"] = "1" - if AIRFLOW_VERSION >= Version("3.1"): # Change introduced in Airflow 3.1.0 # https://github.com/apache/airflow/pull/55722/files @@ -22,20 +16,11 @@ base_operator_get_connection_path = "airflow.hooks.base.BaseHook.get_connection" -@pytest.fixture(scope="session", autouse=True) -def disable_telemetry_for_tests(): - """ - Disable telemetry for all tests to avoid network overhead. - - The telemetry listeners make HTTP calls on every task success/failure, - each with a 1s timeout. For DAGs with many tasks, this can overhead per test. - """ - import cosmos.settings - - original_value = cosmos.settings.do_not_track - cosmos.settings.do_not_track = True - yield - cosmos.settings.do_not_track = original_value +def pytest_runtest_logreport(report): + """Log timestamp when tests pass/fail to help diagnose slow transitions.""" + if report.when == "call": # Only log for the actual test call, not setup/teardown + timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3] + print(f" [{timestamp}] {report.nodeid} {report.outcome.upper()}") @pytest.fixture() diff --git a/tests/utils.py b/tests/utils.py index d4f03e6750..319440a5b8 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -60,7 +60,7 @@ def new_test_dag(dag: DAG) -> DagRun: session.commit() t1 = time.perf_counter() - log.info("[TIMING] DagBundle creation: %.3fs", t1 - t0) + print(f"[TIMING] DagBundle creation: {t1 - t0:.3f}s") # This creates both DagModel and DagVersion records # NOTE: dag_folder=None is required to create an empty DagBag instead of @@ -69,29 +69,29 @@ def new_test_dag(dag: DAG) -> DagRun: dagbag.bag_dag(dag) t2 = time.perf_counter() - log.info("[TIMING] DagBag creation + bag_dag: %.3fs", t2 - t1) + print(f"[TIMING] DagBag creation + bag_dag: {t2 - t1:.3f}s") sync_bag_to_db(dagbag, bundle_name="test_bundle", bundle_version="1") t3 = time.perf_counter() - log.info("[TIMING] sync_bag_to_db: %.3fs", t3 - t2) + print(f"[TIMING] sync_bag_to_db: {t3 - t2:.3f}s") dr = dag.test(logical_date=timezone.utcnow()) t4 = time.perf_counter() - log.info("[TIMING] dag.test() execution: %.3fs", t4 - t3) - log.info("[TIMING] Total new_test_dag for %s: %.3fs", dag.dag_id, t4 - t0) + print(f"[TIMING] dag.test() execution: {t4 - t3:.3f}s") + print(f"[TIMING] Total new_test_dag for {dag.dag_id}: {t4 - t0:.3f}s") elif AIRFLOW_VERSION >= version.Version("3.0"): t0 = time.perf_counter() dr = dag.test(logical_date=timezone.utcnow()) t1 = time.perf_counter() - log.info("[TIMING] dag.test() execution for %s: %.3fs", dag.dag_id, t1 - t0) + print(f"[TIMING] dag.test() execution for {dag.dag_id}: {t1 - t0:.3f}s") else: t0 = time.perf_counter() dr = dag.test() t1 = time.perf_counter() - log.info("[TIMING] dag.test() execution for %s: %.3fs", dag.dag_id, t1 - t0) + print(f"[TIMING] dag.test() execution for {dag.dag_id}: {t1 - t0:.3f}s") return dr From b1fce66001cb65d9b1cea5900ba713aad9d6575f Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 9 Feb 2026 16:45:52 +0530 Subject: [PATCH 4/5] Check task status --- scripts/test/integration-setup.sh | 2 +- tests/utils.py | 13 ++++++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/scripts/test/integration-setup.sh b/scripts/test/integration-setup.sh index 99c277fe8d..d4c0df8fc0 100644 --- a/scripts/test/integration-setup.sh +++ b/scripts/test/integration-setup.sh @@ -47,7 +47,7 @@ if [ "$AIRFLOW_VERSION" = "2.6.0" ] ; then pip freeze | grep -i pydantic fi -pip install -U openlineage-airflow apache-airflow==$AIRFLOW_VERSION +pip install -U openlineage-airflow apache-airflow-providers-google apache-airflow==$AIRFLOW_VERSION if [ "$AIRFLOW_VERSION" = "3.1.0" ] ; then # This error was happening only in Airflow 3.1: diff --git a/tests/utils.py b/tests/utils.py index 319440a5b8..b0812f6366 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -35,7 +35,18 @@ def check_dag_success(dag_run: DagRun | None, expect_success: bool = True) -> bo """Check if a DAG was successful, if that Airflow version allows it.""" if dag_run is not None: if expect_success: - return dag_run.state == DagRunState.SUCCESS + if dag_run.state != DagRunState.SUCCESS: + return False + # Verify all task instances actually reached a terminal state. + for ti in dag_run.get_task_instances(): + if ti.state not in ("success", "skipped", "upstream_failed"): + log.error( + "Task %s is in unexpected state '%s' (expected success, skipped, or upstream_failed)", + ti.task_id, + ti.state, + ) + return False + return True else: return dag_run.state == DagRunState.FAILED return True From e60f3159e764aec0472e60b178a881cebb1402aa Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 9 Feb 2026 20:24:12 +0530 Subject: [PATCH 5/5] Skip running example DAGs --- scripts/test/integration.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scripts/test/integration.sh b/scripts/test/integration.sh index a844e37357..f7d3d90ccb 100644 --- a/scripts/test/integration.sh +++ b/scripts/test/integration.sh @@ -27,4 +27,6 @@ pytest -vv \ --ignore=tests/test_async_example_dag.py \ --ignore=tests/test_example_k8s_dags.py \ --ignore=tests/operators/test_watcher_kubernetes_integration.py \ + --ignore=tests/test_example_dags.py \ + --ignore=tests/test_example_dags_no_connections.py \ -k 'not (simple_dag_async or example_cosmos_python_models or example_virtualenv or jaffle_shop_kubernetes or jaffle_shop_watcher_kubernetes)'