Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,34 @@ def create_task_metadata( # noqa: C901
# `AIRFLOW__COSMOS__PRE_DBT_FUSION=1`.
models_select_key = "models" if settings.pre_dbt_fusion else "select"

if node.has_ephemeral_materialization and render_config.ephemeral_models_as_empty_operator:
# Ephemeral models are inlined as CTEs into downstream models and never written to the
# warehouse, so running them via a dbt operator (whether `dbt run` or `dbt build`) is a
# no-op. Render them as empty operators while keeping the node in the graph so the
# dependency chain passing through it is preserved. This check sits before the
# TestBehavior.BUILD branch so it is honored regardless of the test behavior.
is_build = (
render_config.test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES
)
task_id, args = _get_task_id_and_args(
node=node,
args=args,
use_task_group=use_task_group,
normalize_task_id=render_config.normalize_task_id,
normalize_task_display_name=render_config.normalize_task_display_name,
resource_suffix=resource_suffix,
include_resource_type=is_build,
execution_mode=execution_mode,
)
# EmptyOperator does not accept custom dbt parameters (e.g. profile_args); keep only the display name.
args = {"task_display_name": args["task_display_name"]} if "task_display_name" in args else {}
return TaskMetadata(
id=task_id,
owner=node.owner if render_config.enable_owner_inheritance else "",
operator_class=EMPTY_OPERATOR_CLASS_PATH,
arguments=args,
)

if render_config.test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES:
if node.fqn and len(node.fqn) > 0:
args[models_select_key] = f"fqn:{'.'.join(node.fqn)}"
Expand Down
2 changes: 2 additions & 0 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class RenderConfig:
:param normalize_task_display_name: A callable that takes a dbt node as input and returns the task display name. This allows users to assign a custom task display name separate from the node ID.
:param should_detach_multiple_parents_tests: A boolean that allows users to decide whether to run tests with multiple parent dependencies in separate tasks.
:param enable_owner_inheritance: A boolean that allows users to enable the owner inheritance from dbt models to airflow tasks. Defaults to True.
:param ephemeral_models_as_empty_operator: A boolean that controls how ephemeral models are rendered. Ephemeral models are inlined as CTEs into downstream models and never written to the warehouse, so running them via a dbt operator is effectively a no-op. When True (default), they are rendered as ``EmptyOperator`` tasks, which preserves the dependency chain that passes through them while avoiding wasted dbt invocations and decluttering the DAG. Set to False to render them as regular dbt run tasks.
"""

emit_datasets: bool = True
Expand Down Expand Up @@ -106,6 +107,7 @@ class RenderConfig:
normalize_task_display_name: Callable[..., Any] | None = None
should_detach_multiple_parents_tests: bool = False
enable_owner_inheritance: bool | None = True
ephemeral_models_as_empty_operator: bool = True

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
if self.env_vars:
Expand Down
3 changes: 3 additions & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

AIRFLOW_VERSION = Version(airflow.__version__)

# dbt materialization for models that are inlined as a CTE into downstream models and never written to the warehouse.
DBT_EPHEMERAL_MATERIALIZATION = "ephemeral"

BIGQUERY_PROFILE_TYPE = "bigquery"
DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml")
DBT_PROJECT_FILENAME = "dbt_project.yml"
Expand Down
6 changes: 6 additions & 0 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
)
from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import (
DBT_EPHEMERAL_MATERIALIZATION,
DBT_LOG_DIR_NAME,
DBT_LOG_FILENAME,
DBT_LOG_PATH_ENVVAR,
Expand Down Expand Up @@ -112,6 +113,11 @@ def file_path(self) -> Path:
"""Combined path to the node's file (path_base / original_file_path)."""
return self.path_base / self.original_file_path

@property
def has_ephemeral_materialization(self) -> bool:
"""Whether the node is materialized as ephemeral (inlined as a CTE, never written to the warehouse)."""
return str(self.config.get("materialized") or "").lower() == DBT_EPHEMERAL_MATERIALIZATION

@property
def meta(self) -> dict[str, Any]:
"""
Expand Down
3 changes: 3 additions & 0 deletions docs/guides/run_dbt/callbacks/callbacks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ Callbacks
.. note::
Feature available when using ``ExecutionMode.LOCAL`` and ``ExecutionMode.VIRTUALENV``.

.. note::
Since Cosmos v1.15.0, ephemeral dbt models are rendered as ``EmptyOperator`` tasks by default (``RenderConfig.ephemeral_models_as_empty_operator=True``). Because no dbt command runs for these tasks, callbacks are not invoked for ephemeral models. Set ``ephemeral_models_as_empty_operator=False`` to render them as regular dbt run tasks if you rely on callbacks for them.

Most dbt commands output `one or more artifacts <https://docs.getdbt.com/reference/artifacts/dbt-artifacts>`_
such as ``semantic_manifest.json``, ``manifest.json``, ``catalog.json``, ``run_results.json``, and ``sources.json`` in the target folder, which by default resides in the dbt project's root folder.
However, since Cosmos creates temporary folders to run each dbt command, this folder vanishes by the end of the Cosmos task execution,
Expand Down
4 changes: 4 additions & 0 deletions docs/guides/run_dbt/customization/scheduling.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ By default, if using a version between Airflow 2.4 or higher, Cosmos emits `Airf

This feature is only available for ``ExecutionMode.LOCAL``, ``ExecutionMode.VIRTUALENV``, ``ExecutionMode.WATCHER`` and ``ExecutionMode.AIRFLOW_ASYNC``.

.. note::

Since Cosmos v1.15.0, ephemeral dbt models are rendered as ``EmptyOperator`` tasks by default (``RenderConfig.ephemeral_models_as_empty_operator=True``). These tasks do not run dbt and therefore do not emit datasets, so a DAG scheduled on an ephemeral model's dataset will not be triggered by it. Set ``ephemeral_models_as_empty_operator=False`` to render ephemeral models as regular dbt run tasks and keep emitting their datasets.

Cosmos calculates these URIs during the task execution, by using the library `OpenLineage Integration Common <https://pypi.org/project/openlineage-integration-common/>`_.

This block illustrates a Cosmos-generated dataset for Postgres:
Expand Down
1 change: 1 addition & 0 deletions docs/guides/translate_dbt_to_airflow/render-config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ The ``RenderConfig`` class takes the following arguments:
- ``normalize_task_display_name``: This function allows users to set a custom user-defined function to alter the display name independently of the model name. This way, the task_id can be preserved while the model display name is modified.
- ``should_detach_multiple_parents_tests``: A boolean to control if tests that depend on multiple parents should be run as standalone tasks. See `Testing Behavior <testing-behavior.html>`_ for more information.
- ``enable_owner_inheritance``: (introduced in 1.10.2) A boolean to control if dbt owners should be imported as part of the airflow DAG owners. Defaults to True.
- ``ephemeral_models_as_empty_operator``: (new in v1.15.0) A boolean to control how ephemeral models are rendered. Ephemeral models are inlined as CTEs into downstream models and never written to the warehouse, so running them via a dbt operator is effectively a no-op. When ``True`` (default), they are rendered as ``EmptyOperator`` tasks, which preserves the dependency chain that passes through them while avoiding wasted dbt invocations and decluttering the DAG. Because the ``EmptyOperator`` does not run dbt, behaviour tied to the ephemeral model's run task no longer occurs: no Airflow Dataset/Asset is emitted for it (so dataset-scheduled DAGs keyed on it are not triggered), task callbacks are not invoked, no OpenLineage events are produced, and per-node operator arguments (e.g. ``profile_args``) do not apply. Set to ``False`` to render them as regular dbt run tasks.
- ``node_conversion_by_task_group``: A boolean to control if node_converters are used at the task group level (ex. converting models with test_behavior=AFTER_EACH means the entire task group is converted including the run task and the test task), or the individual task level (gives more granularity for converting just the run tasks or just the test tasks). Defaults to True.
Comment thread
pankajkoti marked this conversation as resolved.
- ``group_nodes_by_folder``: When enabled, groups nodes by folder structure, creating a ``TaskGroup`` per resource type and folder. Disabled by default.

Expand Down
79 changes: 79 additions & 0 deletions tests/airflow/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,85 @@ def test_create_task_metadata_model_use_task_group(caplog):
assert metadata.id == "run"


def _ephemeral_node(owner=None):
config = {"materialized": "ephemeral"}
if owner is not None:
config["meta"] = {"owner": owner}
return DbtNode(
unique_id=f"{DbtResourceType.MODEL.value}.my_folder.my_ephemeral",
resource_type=DbtResourceType.MODEL,
depends_on=[],
path_base=Path("."),
original_file_path=Path("."),
tags=[],
config=config,
)


def test_create_task_metadata_ephemeral_model_as_empty_operator_by_default():
"""Ephemeral models are rendered as EmptyOperator by default, keeping the task id of the run task."""
metadata = create_task_metadata(
_ephemeral_node(), execution_mode=ExecutionMode.LOCAL, args={}, dbt_dag_task_group_identifier=""
)
assert metadata.id == "my_ephemeral_run"
assert metadata.operator_class == EMPTY_OPERATOR_CLASS_PATH
assert metadata.arguments == {}


def test_create_task_metadata_ephemeral_model_disabled_renders_dbt_run():
"""With the flag disabled, ephemeral models render as regular dbt run tasks."""
metadata = create_task_metadata(
_ephemeral_node(),
execution_mode=ExecutionMode.LOCAL,
args={},
dbt_dag_task_group_identifier="",
render_config=RenderConfig(ephemeral_models_as_empty_operator=False),
)
assert metadata.id == "my_ephemeral_run"
assert metadata.operator_class == "cosmos.operators.local.DbtRunLocalOperator"
assert metadata.arguments == {"select": "my_ephemeral"}

Comment thread
pankajkoti marked this conversation as resolved.

def test_create_task_metadata_ephemeral_model_as_empty_operator_in_build_mode():
"""Ephemeral models render as EmptyOperator even under TestBehavior.BUILD, where they would
otherwise be rendered as dbt build tasks."""
metadata = create_task_metadata(
_ephemeral_node(),
execution_mode=ExecutionMode.LOCAL,
args={},
dbt_dag_task_group_identifier="",
render_config=RenderConfig(test_behavior=TestBehavior.BUILD),
)
assert metadata.id == "my_ephemeral_model_build"
assert metadata.operator_class == EMPTY_OPERATOR_CLASS_PATH
assert metadata.arguments == {}


def test_create_task_metadata_ephemeral_model_disabled_renders_dbt_build_in_build_mode():
"""With the flag disabled under TestBehavior.BUILD, ephemeral models render as dbt build tasks."""
metadata = create_task_metadata(
_ephemeral_node(),
execution_mode=ExecutionMode.LOCAL,
args={},
dbt_dag_task_group_identifier="",
render_config=RenderConfig(test_behavior=TestBehavior.BUILD, ephemeral_models_as_empty_operator=False),
)
assert metadata.id == "my_ephemeral_model_build"
assert metadata.operator_class == "cosmos.operators.local.DbtBuildLocalOperator"


def test_create_task_metadata_ephemeral_empty_operator_inherits_owner():
"""The ephemeral EmptyOperator inherits the dbt model owner when owner inheritance is enabled (default)."""
metadata = create_task_metadata(
_ephemeral_node(owner="dbt-owner"),
execution_mode=ExecutionMode.LOCAL,
args={},
dbt_dag_task_group_identifier="",
)
assert metadata.operator_class == EMPTY_OPERATOR_CLASS_PATH
assert metadata.owner == "dbt-owner"


@pytest.mark.parametrize(
"unique_id, resource_type, has_freshness, source_rendering_behavior, expected_id, expected_operator_class",
[
Expand Down
5 changes: 5 additions & 0 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,11 @@ def test_render_config_source_rendering_behavior_none_warns_and_normalizes():
assert config.source_rendering_behavior == SourceRenderingBehavior.NONE


def test_render_config_ephemeral_models_as_empty_operator_defaults_to_true():
"""Ephemeral models are rendered as EmptyOperator by default."""
assert RenderConfig().ephemeral_models_as_empty_operator is True


@pytest.mark.parametrize(
"execution_mode, invocation_mode, expectation",
[
Expand Down
Loading