From 5324482b4a90551d4afa2af535e778c0c56ff8d5 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 2 Jun 2026 17:42:59 +0530 Subject: [PATCH 1/5] Use a version-aware EmptyOperator import path via a shared constant Add a centralized EMPTY_OPERATOR_CLASS constant that resolves to airflow.providers.standard.operators.empty.EmptyOperator on Airflow 3 and the legacy airflow.operators.empty.EmptyOperator on Airflow 2. Use it for the source-without-freshness rendering path, which previously hardcoded the legacy path and emitted a DeprecatedImportWarning on Airflow 3. --- cosmos/airflow/graph.py | 3 ++- cosmos/constants.py | 8 ++++++++ tests/airflow/test_graph.py | 3 ++- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 755893f5d8..1160765217 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -29,6 +29,7 @@ DBT_SETUP_ASYNC_TASK_ID, DBT_TEARDOWN_ASYNC_TASK_ID, DEFAULT_DBT_RESOURCES, + EMPTY_OPERATOR_CLASS, PRODUCER_WATCHER_DONE_TASK_ID, PRODUCER_WATCHER_TASK_ID, SUPPORTED_BUILD_RESOURCES, @@ -417,7 +418,7 @@ def create_task_metadata( # noqa: C901 args = {"task_display_name": args["task_display_name"]} else: args = {} - return TaskMetadata(id=task_id, operator_class="airflow.operators.empty.EmptyOperator", arguments=args) + return TaskMetadata(id=task_id, operator_class=EMPTY_OPERATOR_CLASS, arguments=args) else: # DbtResourceType.MODEL, DbtResourceType.SEED and DbtResourceType.SNAPSHOT if node.fqn and len(node.fqn) > 0: args[models_select_key] = f"fqn:{'.'.join(node.fqn)}" diff --git a/cosmos/constants.py b/cosmos/constants.py index 94f22de9fc..d7e3fec299 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -9,6 +9,14 @@ AIRFLOW_VERSION = Version(airflow.__version__) +# The EmptyOperator import path changed in Airflow 3: it moved to the standard provider. The legacy +# ``airflow.operators.empty`` path still works in Airflow 3 but emits a DeprecatedImportWarning. +EMPTY_OPERATOR_CLASS = ( + "airflow.operators.empty.EmptyOperator" + if AIRFLOW_VERSION < Version("3.0") + else "airflow.providers.standard.operators.empty.EmptyOperator" +) + BIGQUERY_PROFILE_TYPE = "bigquery" DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml") DBT_PROJECT_FILENAME = "dbt_project.yml" diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 388fd78350..68f8d63975 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -34,6 +34,7 @@ ) from cosmos.config import ProfileConfig, RenderConfig from cosmos.constants import ( + EMPTY_OPERATOR_CLASS, SUPPORTED_BUILD_RESOURCES, DbtResourceType, ExecutionMode, @@ -691,7 +692,7 @@ def test_create_task_metadata_model_use_task_group(caplog): False, SOURCE_RENDERING_BEHAVIOR, "my_source_source", - "airflow.operators.empty.EmptyOperator", + EMPTY_OPERATOR_CLASS, ), ( f"{DbtResourceType.SOURCE.value}.my_folder.my_source", From fe4c60d01a616ac55fab0b43fb33ec50d4600894 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 3 Jun 2026 16:41:38 +0530 Subject: [PATCH 2/5] Centralize version-aware EmptyOperator imports in a compatibility module Add cosmos/airflow/compatibility.py, a lazily resolved (PEP 562) module that exports EmptyOperator under its real name on both Airflow 2 and 3, picking the standard-provider path on Airflow 3 to avoid the DeprecatedImportWarning emitted by the legacy airflow.operators.empty path. Use sites now reference the operator by name instead of a hardcoded path: - watcher_kubernetes.py and _watcher/base.py import EmptyOperator from the module rather than duplicating try/except import blocks. - graph.py derives the dotted operator_class string via get_version_aware_operator_class_path(EmptyOperator). Remove the EMPTY_OPERATOR_CLASS constant from constants.py. --- cosmos/airflow/compatibility.py | 59 ++++++++++++++++++++++++++ cosmos/airflow/graph.py | 8 +++- cosmos/constants.py | 8 ---- cosmos/operators/_watcher/base.py | 7 +-- cosmos/operators/watcher_kubernetes.py | 6 +-- tests/airflow/test_graph.py | 4 +- 6 files changed, 70 insertions(+), 22 deletions(-) create mode 100644 cosmos/airflow/compatibility.py diff --git a/cosmos/airflow/compatibility.py b/cosmos/airflow/compatibility.py new file mode 100644 index 0000000000..140ae0d8f1 --- /dev/null +++ b/cosmos/airflow/compatibility.py @@ -0,0 +1,59 @@ +"""Version-aware imports for Airflow objects whose import path differs across Airflow 2 and 3. + +Names exported here are resolved lazily on first attribute access (PEP 562), so importing +this module is free: the underlying Airflow object is only imported when the name is actually +used. Use sites keep referencing the object by its real name (e.g. ``EmptyOperator``) rather +than a version-specific dotted path. +""" + +from __future__ import annotations + +import importlib +from typing import TYPE_CHECKING + +from packaging.version import Version + +from cosmos.constants import AIRFLOW_VERSION + +if TYPE_CHECKING: + # Resolved for type checkers / IDEs only; no import happens at runtime here. The standard + # provider path is tried first so the type resolves to the real class on Airflow 3 (the + # legacy module is a deprecated shim typed as ``Any`` there). + try: + from airflow.providers.standard.operators.empty import EmptyOperator as EmptyOperator + except ImportError: + from airflow.operators.empty import EmptyOperator as EmptyOperator # type: ignore[no-redef] + +# Single source of truth for where ``EmptyOperator`` lives. The operator moved to the standard +# provider in Airflow 3; the legacy ``airflow.operators.empty`` path still resolves there but +# emits a ``DeprecatedImportWarning``, so on Airflow 3 we select the standard provider path. +_EMPTY_OPERATOR_MODULE = ( + "airflow.operators.empty" if AIRFLOW_VERSION < Version("3.0") else "airflow.providers.standard.operators.empty" +) + +# Maps the exported name to the module it should be imported from for this Airflow version. +_LAZY_IMPORTS = { + "EmptyOperator": _EMPTY_OPERATOR_MODULE, +} + + +def __getattr__(name: str) -> object: + module_path = _LAZY_IMPORTS.get(name) + if module_path is None: + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + return getattr(importlib.import_module(module_path), name) + + +def get_version_aware_operator_class_path(operator: type) -> str: + """Return the fully qualified import path for the given operator class. + + The path is read from the class itself (``__module__`` + ``__name__``), so it reflects + the actual module the class lives in for the running Airflow version. Pair it with the + version-aware classes exported from this module, e.g.:: + + get_version_aware_operator_class_path(EmptyOperator) + + Used where Cosmos stores the operator as a dotted string for later dynamic import + (e.g. ``Task.operator_class``) instead of referencing the class directly. + """ + return f"{operator.__module__}.{operator.__name__}" diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 1160765217..0f4050308a 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -24,12 +24,12 @@ from airflow.utils.task_group import TaskGroup from cosmos import settings +from cosmos.airflow.compatibility import EmptyOperator, get_version_aware_operator_class_path from cosmos.config import ExecutionConfig, RenderConfig from cosmos.constants import ( DBT_SETUP_ASYNC_TASK_ID, DBT_TEARDOWN_ASYNC_TASK_ID, DEFAULT_DBT_RESOURCES, - EMPTY_OPERATOR_CLASS, PRODUCER_WATCHER_DONE_TASK_ID, PRODUCER_WATCHER_TASK_ID, SUPPORTED_BUILD_RESOURCES, @@ -418,7 +418,11 @@ def create_task_metadata( # noqa: C901 args = {"task_display_name": args["task_display_name"]} else: args = {} - return TaskMetadata(id=task_id, operator_class=EMPTY_OPERATOR_CLASS, arguments=args) + return TaskMetadata( + id=task_id, + operator_class=get_version_aware_operator_class_path(EmptyOperator), + arguments=args, + ) else: # DbtResourceType.MODEL, DbtResourceType.SEED and DbtResourceType.SNAPSHOT if node.fqn and len(node.fqn) > 0: args[models_select_key] = f"fqn:{'.'.join(node.fqn)}" diff --git a/cosmos/constants.py b/cosmos/constants.py index d7e3fec299..94f22de9fc 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -9,14 +9,6 @@ AIRFLOW_VERSION = Version(airflow.__version__) -# The EmptyOperator import path changed in Airflow 3: it moved to the standard provider. The legacy -# ``airflow.operators.empty`` path still works in Airflow 3 but emits a DeprecatedImportWarning. -EMPTY_OPERATOR_CLASS = ( - "airflow.operators.empty.EmptyOperator" - if AIRFLOW_VERSION < Version("3.0") - else "airflow.providers.standard.operators.empty.EmptyOperator" -) - BIGQUERY_PROFILE_TYPE = "bigquery" DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml") DBT_PROJECT_FILENAME = "dbt_project.yml" diff --git a/cosmos/operators/_watcher/base.py b/cosmos/operators/_watcher/base.py index 37853cdaec..8615b0c16a 100644 --- a/cosmos/operators/_watcher/base.py +++ b/cosmos/operators/_watcher/base.py @@ -51,7 +51,7 @@ from airflow.sdk import DAG except ImportError: from airflow.models.dag import DAG # type: ignore[assignment] - from airflow.operators.empty import EmptyOperator + from cosmos.airflow.compatibility import EmptyOperator try: from airflow.sdk import TaskGroup @@ -779,10 +779,7 @@ def create_producer_done_task(dag: DAG, task_group: TaskGroup, task_id: str) -> is skipped on retry, this task still succeeds (trigger_rule=NONE_FAILED), preventing the skip from propagating to tasks downstream of the group. """ - try: - from airflow.providers.standard.operators.empty import EmptyOperator - except ImportError: - from airflow.operators.empty import EmptyOperator # type: ignore[no-redef] + from cosmos.airflow.compatibility import EmptyOperator try: from airflow.task.trigger_rule import TriggerRule diff --git a/cosmos/operators/watcher_kubernetes.py b/cosmos/operators/watcher_kubernetes.py index 2538622138..d3cedb3df8 100644 --- a/cosmos/operators/watcher_kubernetes.py +++ b/cosmos/operators/watcher_kubernetes.py @@ -16,12 +16,8 @@ from airflow.exceptions import AirflowException, AirflowSkipException from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback, client_type -try: - from airflow.providers.standard.operators.empty import EmptyOperator -except ImportError: # pragma: no cover - from airflow.operators.empty import EmptyOperator # type: ignore[no-redef] - from cosmos.airflow._override import CosmosKubernetesPodManager +from cosmos.airflow.compatibility import EmptyOperator from cosmos.log import get_logger from cosmos.operators._watcher.base import BaseConsumerSensor, store_dbt_resource_status_from_log from cosmos.operators._watcher.xcom import ( diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 68f8d63975..3200e5f8e1 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -18,6 +18,7 @@ from airflow.operators.empty import EmptyOperator from airflow.utils.task_group import TaskGroup +from cosmos.airflow.compatibility import get_version_aware_operator_class_path from cosmos.airflow.graph import ( _add_teardown_task, _add_watcher_producer_task, @@ -34,7 +35,6 @@ ) from cosmos.config import ProfileConfig, RenderConfig from cosmos.constants import ( - EMPTY_OPERATOR_CLASS, SUPPORTED_BUILD_RESOURCES, DbtResourceType, ExecutionMode, @@ -692,7 +692,7 @@ def test_create_task_metadata_model_use_task_group(caplog): False, SOURCE_RENDERING_BEHAVIOR, "my_source_source", - EMPTY_OPERATOR_CLASS, + get_version_aware_operator_class_path(EmptyOperator), ), ( f"{DbtResourceType.SOURCE.value}.my_folder.my_source", From 93b8b57cc1bf4b0aba5beba3bcb09fde8a970aa2 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Wed, 3 Jun 2026 16:58:28 +0530 Subject: [PATCH 3/5] Compare on Airflow major version and cache lazy imports Select the EmptyOperator module via AIRFLOW_VERSION.major < _AIRFLOW3_MAJOR_VERSION instead of comparing against Version("3.0"), so Airflow 3 pre-releases (e.g. 3.0.0rc1) are treated as Airflow 3 and the standard-provider path is chosen. Cache each resolved symbol in the module namespace from __getattr__ so repeated attribute access skips the lazy lookup. --- cosmos/airflow/compatibility.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/cosmos/airflow/compatibility.py b/cosmos/airflow/compatibility.py index 140ae0d8f1..f0bdd9f916 100644 --- a/cosmos/airflow/compatibility.py +++ b/cosmos/airflow/compatibility.py @@ -11,9 +11,7 @@ import importlib from typing import TYPE_CHECKING -from packaging.version import Version - -from cosmos.constants import AIRFLOW_VERSION +from cosmos.constants import _AIRFLOW3_MAJOR_VERSION, AIRFLOW_VERSION if TYPE_CHECKING: # Resolved for type checkers / IDEs only; no import happens at runtime here. The standard @@ -27,8 +25,11 @@ # Single source of truth for where ``EmptyOperator`` lives. The operator moved to the standard # provider in Airflow 3; the legacy ``airflow.operators.empty`` path still resolves there but # emits a ``DeprecatedImportWarning``, so on Airflow 3 we select the standard provider path. +# Compare on the major version so Airflow 3 pre-releases (e.g. 3.0.0rc1) are treated as Airflow 3. _EMPTY_OPERATOR_MODULE = ( - "airflow.operators.empty" if AIRFLOW_VERSION < Version("3.0") else "airflow.providers.standard.operators.empty" + "airflow.operators.empty" + if AIRFLOW_VERSION.major < _AIRFLOW3_MAJOR_VERSION + else "airflow.providers.standard.operators.empty" ) # Maps the exported name to the module it should be imported from for this Airflow version. @@ -41,7 +42,11 @@ def __getattr__(name: str) -> object: module_path = _LAZY_IMPORTS.get(name) if module_path is None: raise AttributeError(f"module {__name__!r} has no attribute {name!r}") - return getattr(importlib.import_module(module_path), name) + # Cache the resolved symbol in the module namespace so subsequent attribute access skips + # __getattr__ entirely (PEP 562 only invokes it for names missing from globals()). + resolved = getattr(importlib.import_module(module_path), name) + globals()[name] = resolved + return resolved def get_version_aware_operator_class_path(operator: type) -> str: From b91b95fa4d4cf50c8d02f6688c31dfd8409d6572 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 4 Jun 2026 16:51:24 +0530 Subject: [PATCH 4/5] Resolve EmptyOperator via a direct version-aware import Replace the lazy PEP 562 compatibility module with a plain version-gated import of EmptyOperator and an EMPTY_OPERATOR_CLASS_PATH constant derived from the imported class. The class is a real importable name, so type checkers resolve it natively and graph.py serializes the operator path via the constant instead of a runtime helper. --- cosmos/airflow/compatibility.py | 72 +++++++++------------------------ cosmos/airflow/graph.py | 4 +- tests/airflow/test_graph.py | 4 +- 3 files changed, 22 insertions(+), 58 deletions(-) diff --git a/cosmos/airflow/compatibility.py b/cosmos/airflow/compatibility.py index f0bdd9f916..f5b2644607 100644 --- a/cosmos/airflow/compatibility.py +++ b/cosmos/airflow/compatibility.py @@ -1,64 +1,28 @@ """Version-aware imports for Airflow objects whose import path differs across Airflow 2 and 3. -Names exported here are resolved lazily on first attribute access (PEP 562), so importing -this module is free: the underlying Airflow object is only imported when the name is actually -used. Use sites keep referencing the object by its real name (e.g. ``EmptyOperator``) rather -than a version-specific dotted path. +``EmptyOperator`` moved to the standard provider in Airflow 3; the legacy +``airflow.operators.empty`` path still resolves there but emits a ``DeprecatedImportWarning``, +so on Airflow 3 we import it from the standard provider. Compare on the major version so that +Airflow 3 pre-releases (e.g. 3.0.0rc1) are treated as Airflow 3. """ from __future__ import annotations -import importlib -from typing import TYPE_CHECKING - from cosmos.constants import _AIRFLOW3_MAJOR_VERSION, AIRFLOW_VERSION -if TYPE_CHECKING: - # Resolved for type checkers / IDEs only; no import happens at runtime here. The standard - # provider path is tried first so the type resolves to the real class on Airflow 3 (the - # legacy module is a deprecated shim typed as ``Any`` there). +if AIRFLOW_VERSION.major >= _AIRFLOW3_MAJOR_VERSION: try: from airflow.providers.standard.operators.empty import EmptyOperator as EmptyOperator - except ImportError: - from airflow.operators.empty import EmptyOperator as EmptyOperator # type: ignore[no-redef] - -# Single source of truth for where ``EmptyOperator`` lives. The operator moved to the standard -# provider in Airflow 3; the legacy ``airflow.operators.empty`` path still resolves there but -# emits a ``DeprecatedImportWarning``, so on Airflow 3 we select the standard provider path. -# Compare on the major version so Airflow 3 pre-releases (e.g. 3.0.0rc1) are treated as Airflow 3. -_EMPTY_OPERATOR_MODULE = ( - "airflow.operators.empty" - if AIRFLOW_VERSION.major < _AIRFLOW3_MAJOR_VERSION - else "airflow.providers.standard.operators.empty" -) - -# Maps the exported name to the module it should be imported from for this Airflow version. -_LAZY_IMPORTS = { - "EmptyOperator": _EMPTY_OPERATOR_MODULE, -} - - -def __getattr__(name: str) -> object: - module_path = _LAZY_IMPORTS.get(name) - if module_path is None: - raise AttributeError(f"module {__name__!r} has no attribute {name!r}") - # Cache the resolved symbol in the module namespace so subsequent attribute access skips - # __getattr__ entirely (PEP 562 only invokes it for names missing from globals()). - resolved = getattr(importlib.import_module(module_path), name) - globals()[name] = resolved - return resolved - - -def get_version_aware_operator_class_path(operator: type) -> str: - """Return the fully qualified import path for the given operator class. - - The path is read from the class itself (``__module__`` + ``__name__``), so it reflects - the actual module the class lives in for the running Airflow version. Pair it with the - version-aware classes exported from this module, e.g.:: - - get_version_aware_operator_class_path(EmptyOperator) - - Used where Cosmos stores the operator as a dotted string for later dynamic import - (e.g. ``Task.operator_class``) instead of referencing the class directly. - """ - return f"{operator.__module__}.{operator.__name__}" + except ImportError as exc: + raise ImportError( + "Cosmos on Airflow 3 requires `apache-airflow-providers-standard` to import `EmptyOperator`." + ) from exc +else: + # The redundant ``as EmptyOperator`` alias marks the name as an explicit re-export for type + # checkers; the ``no-redef`` ignore silences the duplicate binding mypy sees across branches. + from airflow.operators.empty import EmptyOperator as EmptyOperator # type: ignore[no-redef] + +# Dotted import path for the version-appropriate EmptyOperator, for the places that store the +# operator as a string for later dynamic import (e.g. ``Task.operator_class``) rather than +# referencing the class. Derived from the class itself, so it always matches the imported one. +EMPTY_OPERATOR_CLASS_PATH = f"{EmptyOperator.__module__}.{EmptyOperator.__name__}" diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 0f4050308a..d7370abb88 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -24,7 +24,7 @@ from airflow.utils.task_group import TaskGroup from cosmos import settings -from cosmos.airflow.compatibility import EmptyOperator, get_version_aware_operator_class_path +from cosmos.airflow.compatibility import EMPTY_OPERATOR_CLASS_PATH from cosmos.config import ExecutionConfig, RenderConfig from cosmos.constants import ( DBT_SETUP_ASYNC_TASK_ID, @@ -420,7 +420,7 @@ def create_task_metadata( # noqa: C901 args = {} return TaskMetadata( id=task_id, - operator_class=get_version_aware_operator_class_path(EmptyOperator), + operator_class=EMPTY_OPERATOR_CLASS_PATH, arguments=args, ) else: # DbtResourceType.MODEL, DbtResourceType.SEED and DbtResourceType.SNAPSHOT diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 3200e5f8e1..7a7c2a3711 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -18,7 +18,7 @@ from airflow.operators.empty import EmptyOperator from airflow.utils.task_group import TaskGroup -from cosmos.airflow.compatibility import get_version_aware_operator_class_path +from cosmos.airflow.compatibility import EMPTY_OPERATOR_CLASS_PATH from cosmos.airflow.graph import ( _add_teardown_task, _add_watcher_producer_task, @@ -692,7 +692,7 @@ def test_create_task_metadata_model_use_task_group(caplog): False, SOURCE_RENDERING_BEHAVIOR, "my_source_source", - get_version_aware_operator_class_path(EmptyOperator), + EMPTY_OPERATOR_CLASS_PATH, ), ( f"{DbtResourceType.SOURCE.value}.my_folder.my_source", From c0830b2f63937c5192c755ed686c6cfa1f3a5231 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 4 Jun 2026 17:23:52 +0530 Subject: [PATCH 5/5] Exclude the unreachable provider-missing guard from coverage --- cosmos/airflow/compatibility.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/airflow/compatibility.py b/cosmos/airflow/compatibility.py index f5b2644607..9ee15816e6 100644 --- a/cosmos/airflow/compatibility.py +++ b/cosmos/airflow/compatibility.py @@ -13,7 +13,7 @@ if AIRFLOW_VERSION.major >= _AIRFLOW3_MAJOR_VERSION: try: from airflow.providers.standard.operators.empty import EmptyOperator as EmptyOperator - except ImportError as exc: + except ImportError as exc: # pragma: no cover raise ImportError( "Cosmos on Airflow 3 requires `apache-airflow-providers-standard` to import `EmptyOperator`." ) from exc