Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions cosmos/airflow/compatibility.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""Version-aware imports for Airflow objects whose import path differs across Airflow 2 and 3.

``EmptyOperator`` moved to the standard provider in Airflow 3; the legacy
``airflow.operators.empty`` path still resolves there but emits a ``DeprecatedImportWarning``,
so on Airflow 3 we import it from the standard provider. Compare on the major version so that
Airflow 3 pre-releases (e.g. 3.0.0rc1) are treated as Airflow 3.
"""

from __future__ import annotations

from cosmos.constants import _AIRFLOW3_MAJOR_VERSION, AIRFLOW_VERSION

if AIRFLOW_VERSION.major >= _AIRFLOW3_MAJOR_VERSION:
try:
from airflow.providers.standard.operators.empty import EmptyOperator as EmptyOperator
except ImportError as exc: # pragma: no cover
raise ImportError(
"Cosmos on Airflow 3 requires `apache-airflow-providers-standard` to import `EmptyOperator`."
) from exc
else:
# The redundant ``as EmptyOperator`` alias marks the name as an explicit re-export for type
# checkers; the ``no-redef`` ignore silences the duplicate binding mypy sees across branches.
from airflow.operators.empty import EmptyOperator as EmptyOperator # type: ignore[no-redef]

# Dotted import path for the version-appropriate EmptyOperator, for the places that store the
# operator as a string for later dynamic import (e.g. ``Task.operator_class``) rather than
# referencing the class. Derived from the class itself, so it always matches the imported one.
EMPTY_OPERATOR_CLASS_PATH = f"{EmptyOperator.__module__}.{EmptyOperator.__name__}"
Comment thread
pankajkoti marked this conversation as resolved.
7 changes: 6 additions & 1 deletion cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from airflow.utils.task_group import TaskGroup

from cosmos import settings
from cosmos.airflow.compatibility import EMPTY_OPERATOR_CLASS_PATH
from cosmos.config import ExecutionConfig, RenderConfig
from cosmos.constants import (
DBT_SETUP_ASYNC_TASK_ID,
Expand Down Expand Up @@ -417,7 +418,11 @@ def create_task_metadata( # noqa: C901
args = {"task_display_name": args["task_display_name"]}
else:
args = {}
return TaskMetadata(id=task_id, operator_class="airflow.operators.empty.EmptyOperator", arguments=args)
return TaskMetadata(
id=task_id,
operator_class=EMPTY_OPERATOR_CLASS_PATH,
arguments=args,
)
else: # DbtResourceType.MODEL, DbtResourceType.SEED and DbtResourceType.SNAPSHOT
if node.fqn and len(node.fqn) > 0:
args[models_select_key] = f"fqn:{'.'.join(node.fqn)}"
Expand Down
7 changes: 2 additions & 5 deletions cosmos/operators/_watcher/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from airflow.sdk import DAG
except ImportError:
from airflow.models.dag import DAG # type: ignore[assignment]
from airflow.operators.empty import EmptyOperator
from cosmos.airflow.compatibility import EmptyOperator
Comment thread
pankajkoti marked this conversation as resolved.

try:
from airflow.sdk import TaskGroup
Expand Down Expand Up @@ -779,10 +779,7 @@ def create_producer_done_task(dag: DAG, task_group: TaskGroup, task_id: str) ->
is skipped on retry, this task still succeeds (trigger_rule=NONE_FAILED), preventing
the skip from propagating to tasks downstream of the group.
"""
try:
from airflow.providers.standard.operators.empty import EmptyOperator
except ImportError:
from airflow.operators.empty import EmptyOperator # type: ignore[no-redef]
from cosmos.airflow.compatibility import EmptyOperator

try:
from airflow.task.trigger_rule import TriggerRule
Expand Down
6 changes: 1 addition & 5 deletions cosmos/operators/watcher_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,8 @@
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback, client_type

try:
from airflow.providers.standard.operators.empty import EmptyOperator
except ImportError: # pragma: no cover
from airflow.operators.empty import EmptyOperator # type: ignore[no-redef]

from cosmos.airflow._override import CosmosKubernetesPodManager
from cosmos.airflow.compatibility import EmptyOperator
from cosmos.log import get_logger
from cosmos.operators._watcher.base import BaseConsumerSensor, store_dbt_resource_status_from_log
from cosmos.operators._watcher.xcom import (
Expand Down
3 changes: 2 additions & 1 deletion tests/airflow/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

from cosmos.airflow.compatibility import EMPTY_OPERATOR_CLASS_PATH
from cosmos.airflow.graph import (
_add_teardown_task,
_add_watcher_producer_task,
Expand Down Expand Up @@ -691,7 +692,7 @@ def test_create_task_metadata_model_use_task_group(caplog):
False,
SOURCE_RENDERING_BEHAVIOR,
"my_source_source",
"airflow.operators.empty.EmptyOperator",
EMPTY_OPERATOR_CLASS_PATH,
),
(
f"{DbtResourceType.SOURCE.value}.my_folder.my_source",
Expand Down
Loading