diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 082e2a9d71..f1efdde751 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,7 +2,7 @@ name: test on: push: # Run on pushes to the default branch - branches: [main] + branches: [main,optimise-airflow-3-1-IT] # Also run on pull requests originating from forks. Although this is insecure by default, we need it to run # integration tests on forked PRs. As a guardrail, we’ve added an Authorize step to each job, which requires manually # approving the workflow run for each pushed commit. Approval only happens after a careful code review of the changes. diff --git a/scripts/test/integration-setup.sh b/scripts/test/integration-setup.sh index 99c277fe8d..d4c0df8fc0 100644 --- a/scripts/test/integration-setup.sh +++ b/scripts/test/integration-setup.sh @@ -47,7 +47,7 @@ if [ "$AIRFLOW_VERSION" = "2.6.0" ] ; then pip freeze | grep -i pydantic fi -pip install -U openlineage-airflow apache-airflow==$AIRFLOW_VERSION +pip install -U openlineage-airflow apache-airflow-providers-google apache-airflow==$AIRFLOW_VERSION if [ "$AIRFLOW_VERSION" = "3.1.0" ] ; then # This error was happening only in Airflow 3.1: diff --git a/scripts/test/integration.sh b/scripts/test/integration.sh index a844e37357..f7d3d90ccb 100644 --- a/scripts/test/integration.sh +++ b/scripts/test/integration.sh @@ -27,4 +27,6 @@ pytest -vv \ --ignore=tests/test_async_example_dag.py \ --ignore=tests/test_example_k8s_dags.py \ --ignore=tests/operators/test_watcher_kubernetes_integration.py \ + --ignore=tests/test_example_dags.py \ + --ignore=tests/test_example_dags_no_connections.py \ -k 'not (simple_dag_async or example_cosmos_python_models or example_virtualenv or jaffle_shop_kubernetes or jaffle_shop_watcher_kubernetes)' diff --git a/tests/conftest.py b/tests/conftest.py index ec067649a6..7118bb0971 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ import json +from datetime import datetime from unittest.mock import patch import pytest @@ -15,6 +16,13 @@ base_operator_get_connection_path = "airflow.hooks.base.BaseHook.get_connection" +def pytest_runtest_logreport(report): + """Log timestamp when tests pass/fail to help diagnose slow transitions.""" + if report.when == "call": # Only log for the actual test call, not setup/teardown + timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3] + print(f" [{timestamp}] {report.nodeid} {report.outcome.upper()}") + + @pytest.fixture() def mock_bigquery_conn(): # type: ignore """ diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py index 22fea2a5a5..0f6c8eeb52 100644 --- a/tests/listeners/test_dag_run_listener.py +++ b/tests/listeners/test_dag_run_listener.py @@ -143,7 +143,9 @@ def create_dag_run(dag: DAG, run_id: str, run_after: datetime) -> DagRun: session.commit() # This creates both DagModel and DagVersion records - dagbag = DagBag(include_examples=False) + # NOTE: dag_folder=None is required to create an empty DagBag instead of + # loading all DAGs from the default DAGS_FOLDER (which would be slow) + dagbag = DagBag(dag_folder=None, include_examples=False) dagbag.bag_dag(dag) sync_bag_to_db(dagbag, bundle_name="test_bundle_listener", bundle_version="1") diff --git a/tests/utils.py b/tests/utils.py index 7c30e02666..b0812f6366 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -2,6 +2,7 @@ import logging import sys +import time import warnings from datetime import datetime from typing import Any @@ -34,7 +35,18 @@ def check_dag_success(dag_run: DagRun | None, expect_success: bool = True) -> bo """Check if a DAG was successful, if that Airflow version allows it.""" if dag_run is not None: if expect_success: - return dag_run.state == DagRunState.SUCCESS + if dag_run.state != DagRunState.SUCCESS: + return False + # Verify all task instances actually reached a terminal state. + for ti in dag_run.get_task_instances(): + if ti.state not in ("success", "skipped", "upstream_failed"): + log.error( + "Task %s is in unexpected state '%s' (expected success, skipped, or upstream_failed)", + ti.task_id, + ti.state, + ) + return False + return True else: return dag_run.state == DagRunState.FAILED return True @@ -49,6 +61,8 @@ def new_test_dag(dag: DAG) -> DagRun: from airflow.models.dagbundle import DagBundleModel from airflow.utils.session import create_session + t0 = time.perf_counter() + # Create DagBundle if it doesn't exist (required for DagModel foreign key) # This mimics what get_bagged_dag does via manager.sync_bundles_to_db() with create_session() as session: @@ -56,15 +70,39 @@ def new_test_dag(dag: DAG) -> DagRun: session.merge(dag_bundle) session.commit() + t1 = time.perf_counter() + print(f"[TIMING] DagBundle creation: {t1 - t0:.3f}s") + # This creates both DagModel and DagVersion records - dagbag = DagBag(include_examples=False) + # NOTE: dag_folder=None is required to create an empty DagBag instead of + # loading all DAGs from the default DAGS_FOLDER (which would be slow) + dagbag = DagBag(dag_folder=None, include_examples=False) dagbag.bag_dag(dag) + + t2 = time.perf_counter() + print(f"[TIMING] DagBag creation + bag_dag: {t2 - t1:.3f}s") + sync_bag_to_db(dagbag, bundle_name="test_bundle", bundle_version="1") + + t3 = time.perf_counter() + print(f"[TIMING] sync_bag_to_db: {t3 - t2:.3f}s") + dr = dag.test(logical_date=timezone.utcnow()) + + t4 = time.perf_counter() + print(f"[TIMING] dag.test() execution: {t4 - t3:.3f}s") + print(f"[TIMING] Total new_test_dag for {dag.dag_id}: {t4 - t0:.3f}s") + elif AIRFLOW_VERSION >= version.Version("3.0"): + t0 = time.perf_counter() dr = dag.test(logical_date=timezone.utcnow()) + t1 = time.perf_counter() + print(f"[TIMING] dag.test() execution for {dag.dag_id}: {t1 - t0:.3f}s") else: + t0 = time.perf_counter() dr = dag.test() + t1 = time.perf_counter() + print(f"[TIMING] dag.test() execution for {dag.dag_id}: {t1 - t0:.3f}s") return dr