Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from cosmos.config import RenderConfig
from cosmos.constants import (
AIRFLOW_EMPTY_OPERATOR_CLASS_IMPORT_PATH,
DBT_SETUP_ASYNC_TASK_ID,
DBT_TEARDOWN_ASYNC_TASK_ID,
DEFAULT_DBT_RESOURCES,
Expand Down Expand Up @@ -232,6 +233,15 @@
return dbt_resource_to_class


def _create_empty_operator_task_metadata(task_id: str, args: dict[str, Any]) -> TaskMetadata:
# empty operator does not accept custom parameters (e.g., profile_args). recreate the args.
if "task_display_name" in args:
args = {"task_display_name": args["task_display_name"]}
else:
args = {}
return TaskMetadata(id=task_id, operator_class=AIRFLOW_EMPTY_OPERATOR_CLASS_IMPORT_PATH, arguments=args)


def create_task_metadata(
node: DbtNode,
execution_mode: ExecutionMode,
Expand Down Expand Up @@ -278,6 +288,9 @@
)
elif node.resource_type == DbtResourceType.MODEL:
task_id, args = _get_task_id_and_args(node, args, use_task_group, normalize_task_id, "run")
if node.config.get("materialized") == "ephemeral":
# render models with ephemeral materialization as empty operators
return _create_empty_operator_task_metadata(task_id, args)

Check warning on line 293 in cosmos/airflow/graph.py

View check run for this annotation

Codecov / codecov/patch

cosmos/airflow/graph.py#L293

Added line #L293 was not covered by tests
elif node.resource_type == DbtResourceType.SOURCE:
args["on_warning_callback"] = on_warning_callback

Expand All @@ -290,14 +303,12 @@
args["select"] = f"source:{node.resource_name}"
args.pop("models")
task_id, args = _get_task_id_and_args(node, args, use_task_group, normalize_task_id, "source")
if node.has_freshness is False and source_rendering_behavior == SourceRenderingBehavior.ALL:
if node.has_freshness is False and (
source_rendering_behavior == SourceRenderingBehavior.ALL
or (SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS and node.has_test is True)
):
# render sources without freshness as empty operators
# empty operator does not accept custom parameters (e.g., profile_args). recreate the args.
if "task_display_name" in args:
args = {"task_display_name": args["task_display_name"]}
else:
args = {}
return TaskMetadata(id=task_id, operator_class="airflow.operators.empty.EmptyOperator", arguments=args)
return _create_empty_operator_task_metadata(task_id, args)
else:
task_id, args = _get_task_id_and_args(
node, args, use_task_group, normalize_task_id, node.resource_type.value
Expand Down
1 change: 1 addition & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import aenum
from packaging.version import Version

AIRFLOW_EMPTY_OPERATOR_CLASS_IMPORT_PATH = "airflow.operators.empty.EmptyOperator"
BIGQUERY_PROFILE_TYPE = "bigquery"
DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml")
DEFAULT_DBT_PROFILE_NAME = "cosmos_profile"
Expand Down
7 changes: 6 additions & 1 deletion cosmos/core/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup

from cosmos.constants import AIRFLOW_EMPTY_OPERATOR_CLASS_IMPORT_PATH
from cosmos.core.graph.entities import Task
from cosmos.log import get_logger

Expand Down Expand Up @@ -37,7 +38,11 @@ def get_airflow_task(task: Task, dag: DAG, task_group: TaskGroup | None = None)
task_id=task.id,
dag=dag,
task_group=task_group,
**({} if class_name == "EmptyOperator" else {"extra_context": task.extra_context}),
**(
{}
if class_name == AIRFLOW_EMPTY_OPERATOR_CLASS_IMPORT_PATH.split(".")[-1]
else {"extra_context": task.extra_context}
),
**task_kwargs,
)

Expand Down
3 changes: 2 additions & 1 deletion cosmos/core/graph/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from dataclasses import dataclass, field
from typing import Any, Dict, List

from cosmos.constants import AIRFLOW_EMPTY_OPERATOR_CLASS_IMPORT_PATH
from cosmos.log import get_logger

logger = get_logger(__name__)
Expand Down Expand Up @@ -58,6 +59,6 @@ class Task(CosmosEntity):
"""

owner: str = ""
operator_class: str = "airflow.operators.empty.EmptyOperator"
operator_class: str = AIRFLOW_EMPTY_OPERATOR_CLASS_IMPORT_PATH
arguments: Dict[str, Any] = field(default_factory=dict)
extra_context: Dict[str, Any] = field(default_factory=dict)