diff --git a/scripts/test/integration.sh b/scripts/test/integration.sh index efd6c9ade7..66bfdb8543 100644 --- a/scripts/test/integration.sh +++ b/scripts/test/integration.sh @@ -21,6 +21,8 @@ pytest -vv \ -m 'integration and not dbtfusion' \ --ignore=tests/perf \ --ignore=tests/test_async_example_dag.py \ + --ignore=tests/test_example_dags.py \ + --ignore=tests/test_example_dags_no_connections.py \ --ignore=tests/test_example_k8s_dags.py \ --ignore=tests/operators/test_watcher_kubernetes_integration.py \ --ignore=dev/dags/cross_project_dbt_ls_dag.py \ diff --git a/tests/conftest.py b/tests/conftest.py index ec067649a6..af0a8f3772 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,9 +4,22 @@ import pytest from airflow.models.connection import Connection from packaging.version import Version +from sqlalchemy import Table as SQLAlchemyTable from cosmos.constants import AIRFLOW_VERSION +# Workaround for Airflow 3.0/3.1: avoid "Table 'dag_warning' is already defined for this +# MetaData instance" when tests trigger duplicate table registration (e.g. DagBag, sync_bag_to_db). +if AIRFLOW_VERSION >= Version("3.0"): + _original_table_init = SQLAlchemyTable.__init__ + + def _patched_table_init(self, *args, **kwargs): + if args and args[0] == "dag_warning": + kwargs["extend_existing"] = True + return _original_table_init(self, *args, **kwargs) + + SQLAlchemyTable.__init__ = _patched_table_init + if AIRFLOW_VERSION >= Version("3.1"): # Change introduced in Airflow 3.1.0 # https://github.com/apache/airflow/pull/55722/files