From da79f399138a9ed0f7996244fb0108a8881ab885 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 2 Jun 2026 17:42:59 +0530 Subject: [PATCH 1/5] Use a version-aware EmptyOperator import path via a shared constant Add a centralized EMPTY_OPERATOR_CLASS constant that resolves to airflow.providers.standard.operators.empty.EmptyOperator on Airflow 3 and the legacy airflow.operators.empty.EmptyOperator on Airflow 2. Use it for the source-without-freshness rendering path, which previously hardcoded the legacy path and emitted a DeprecatedImportWarning on Airflow 3. --- cosmos/airflow/graph.py | 3 ++- cosmos/constants.py | 8 ++++++++ tests/airflow/test_graph.py | 3 ++- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index ee27054889..c6527a17a0 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -29,6 +29,7 @@ DBT_SETUP_ASYNC_TASK_ID, DBT_TEARDOWN_ASYNC_TASK_ID, DEFAULT_DBT_RESOURCES, + EMPTY_OPERATOR_CLASS, PRODUCER_WATCHER_DONE_TASK_ID, PRODUCER_WATCHER_TASK_ID, SUPPORTED_BUILD_RESOURCES, @@ -417,7 +418,7 @@ def create_task_metadata( # noqa: C901 args = {"task_display_name": args["task_display_name"]} else: args = {} - return TaskMetadata(id=task_id, operator_class="airflow.operators.empty.EmptyOperator", arguments=args) + return TaskMetadata(id=task_id, operator_class=EMPTY_OPERATOR_CLASS, arguments=args) else: # DbtResourceType.MODEL, DbtResourceType.SEED and DbtResourceType.SNAPSHOT if node.fqn and len(node.fqn) > 0: args[models_select_key] = f"fqn:{'.'.join(node.fqn)}" diff --git a/cosmos/constants.py b/cosmos/constants.py index 77ab291479..1bb1b36060 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -9,6 +9,14 @@ AIRFLOW_VERSION = Version(airflow.__version__) +# The EmptyOperator import path changed in Airflow 3: it moved to the standard provider. The legacy +# ``airflow.operators.empty`` path still works in Airflow 3 but emits a DeprecatedImportWarning. +EMPTY_OPERATOR_CLASS = ( + "airflow.operators.empty.EmptyOperator" + if AIRFLOW_VERSION < Version("3.0") + else "airflow.providers.standard.operators.empty.EmptyOperator" +) + BIGQUERY_PROFILE_TYPE = "bigquery" DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml") DBT_PROJECT_FILENAME = "dbt_project.yml" diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 388fd78350..68f8d63975 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -34,6 +34,7 @@ ) from cosmos.config import ProfileConfig, RenderConfig from cosmos.constants import ( + EMPTY_OPERATOR_CLASS, SUPPORTED_BUILD_RESOURCES, DbtResourceType, ExecutionMode, @@ -691,7 +692,7 @@ def test_create_task_metadata_model_use_task_group(caplog): False, SOURCE_RENDERING_BEHAVIOR, "my_source_source", - "airflow.operators.empty.EmptyOperator", + EMPTY_OPERATOR_CLASS, ), ( f"{DbtResourceType.SOURCE.value}.my_folder.my_source", From b3630ca32ec6565be6b0db629438f3c10dd22abc Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 2 Jun 2026 17:43:31 +0530 Subject: [PATCH 2/5] Render ephemeral dbt models as EmptyOperator by default Ephemeral models are inlined as CTEs into downstream models and are never written to the warehouse, so running them through a dbt operator is a no-op that still spends worker time and issues warehouse metadata queries. Add RenderConfig.ephemeral_models_as_empty_operator (default True) so that ephemeral models render as EmptyOperator tasks. The node stays in the graph, preserving the dependency chain that passes through it, while avoiding the wasted dbt invocation and decluttering the DAG. Set it to False to keep rendering ephemeral models as regular dbt run tasks. --- cosmos/airflow/graph.py | 9 +++++ cosmos/config.py | 2 ++ cosmos/constants.py | 3 ++ cosmos/dbt/graph.py | 6 ++++ .../render-config.rst | 1 + tests/airflow/test_graph.py | 36 +++++++++++++++++++ tests/test_config.py | 5 +++ 7 files changed, 62 insertions(+) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index c6527a17a0..f7ccbb1713 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -434,6 +434,15 @@ def create_task_metadata( # noqa: C901 execution_mode=execution_mode, ) + if node.has_ephemeral_materialization and render_config.ephemeral_models_as_empty_operator: + # Ephemeral models are inlined as CTEs into downstream models and never written to the + # warehouse, so running them via a dbt operator is a no-op. Render them as empty operators + # to avoid wasted dbt invocations while keeping the node in the graph so that the dependency + # chain passing through it is preserved. EmptyOperator does not accept custom parameters + # (e.g. profile_args), so recreate the args keeping only the display name when present. + args = {"task_display_name": args["task_display_name"]} if "task_display_name" in args else {} + return TaskMetadata(id=task_id, operator_class=EMPTY_OPERATOR_CLASS, arguments=args) + _override_profile_if_needed(args, node.profile_config_to_override) task_owner = node.owner diff --git a/cosmos/config.py b/cosmos/config.py index deb5f67bbd..4b337ca51e 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -79,6 +79,7 @@ class RenderConfig: :param normalize_task_display_name: A callable that takes a dbt node as input and returns the task display name. This allows users to assign a custom task display name separate from the node ID. :param should_detach_multiple_parents_tests: A boolean that allows users to decide whether to run tests with multiple parent dependencies in separate tasks. :param enable_owner_inheritance: A boolean that allows users to enable the owner inheritance from dbt models to airflow tasks. Defaults to True. + :param ephemeral_models_as_empty_operator: A boolean that controls how ephemeral models are rendered. Ephemeral models are inlined as CTEs into downstream models and never written to the warehouse, so running them via a dbt operator is effectively a no-op. When True (default), they are rendered as ``EmptyOperator`` tasks, which preserves the dependency chain that passes through them while avoiding wasted dbt invocations and decluttering the DAG. Set to False to render them as regular dbt run tasks. """ emit_datasets: bool = True @@ -106,6 +107,7 @@ class RenderConfig: normalize_task_display_name: Callable[..., Any] | None = None should_detach_multiple_parents_tests: bool = False enable_owner_inheritance: bool | None = True + ephemeral_models_as_empty_operator: bool = True def __post_init__(self, dbt_project_path: str | Path | None) -> None: if self.env_vars: diff --git a/cosmos/constants.py b/cosmos/constants.py index 1bb1b36060..8003cb37d3 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -17,6 +17,9 @@ else "airflow.providers.standard.operators.empty.EmptyOperator" ) +# dbt materialization for models that are inlined as a CTE into downstream models and never written to the warehouse. +DBT_EPHEMERAL_MATERIALIZATION = "ephemeral" + BIGQUERY_PROFILE_TYPE = "bigquery" DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml") DBT_PROJECT_FILENAME = "dbt_project.yml" diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 37ee6daa25..31e3db07d4 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -43,6 +43,7 @@ ) from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig from cosmos.constants import ( + DBT_EPHEMERAL_MATERIALIZATION, DBT_LOG_DIR_NAME, DBT_LOG_FILENAME, DBT_LOG_PATH_ENVVAR, @@ -112,6 +113,11 @@ def file_path(self) -> Path: """Combined path to the node's file (path_base / original_file_path).""" return self.path_base / self.original_file_path + @property + def has_ephemeral_materialization(self) -> bool: + """Whether the node is materialized as ephemeral (inlined as a CTE, never written to the warehouse).""" + return str(self.config.get("materialized") or "").lower() == DBT_EPHEMERAL_MATERIALIZATION + @property def meta(self) -> dict[str, Any]: """ diff --git a/docs/guides/translate_dbt_to_airflow/render-config.rst b/docs/guides/translate_dbt_to_airflow/render-config.rst index da4c1b673e..608b112c35 100644 --- a/docs/guides/translate_dbt_to_airflow/render-config.rst +++ b/docs/guides/translate_dbt_to_airflow/render-config.rst @@ -31,6 +31,7 @@ The ``RenderConfig`` class takes the following arguments: - ``normalize_task_display_name``: This function allows users to set a custom user-defined function to alter the display name independently of the model name. This way, the task_id can be preserved while the model display name is modified. - ``should_detach_multiple_parents_tests``: A boolean to control if tests that depend on multiple parents should be run as standalone tasks. See `Testing Behavior `_ for more information. - ``enable_owner_inheritance``: (introduced in 1.10.2) A boolean to control if dbt owners should be imported as part of the airflow DAG owners. Defaults to True. +- ``ephemeral_models_as_empty_operator``: (new in v1.15.0) A boolean to control how ephemeral models are rendered. Ephemeral models are inlined as CTEs into downstream models and never written to the warehouse, so running them via a dbt operator is effectively a no-op. When ``True`` (default), they are rendered as ``EmptyOperator`` tasks, which preserves the dependency chain that passes through them while avoiding wasted dbt invocations and decluttering the DAG. Set to ``False`` to render them as regular dbt run tasks. - ``node_conversion_by_task_group``: A boolean to control if node_converters are used at the task group level (ex. converting models with test_behavior=AFTER_EACH means the entire task group is converted including the run task and the test task), or the individual task level (gives more granularity for converting just the run tasks or just the test tasks). Defaults to True. - ``group_nodes_by_folder``: When enabled, groups nodes by folder structure, creating a ``TaskGroup`` per resource type and folder. Disabled by default. diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 68f8d63975..d05558faae 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -675,6 +675,42 @@ def test_create_task_metadata_model_use_task_group(caplog): assert metadata.id == "run" +def _ephemeral_node(): + return DbtNode( + unique_id=f"{DbtResourceType.MODEL.value}.my_folder.my_ephemeral", + resource_type=DbtResourceType.MODEL, + depends_on=[], + path_base=Path("."), + original_file_path=Path("."), + tags=[], + config={"materialized": "ephemeral"}, + ) + + +def test_create_task_metadata_ephemeral_model_as_empty_operator_by_default(): + """Ephemeral models are rendered as EmptyOperator by default, keeping the task id of the run task.""" + metadata = create_task_metadata( + _ephemeral_node(), execution_mode=ExecutionMode.LOCAL, args={}, dbt_dag_task_group_identifier="" + ) + assert metadata.id == "my_ephemeral_run" + assert metadata.operator_class == EMPTY_OPERATOR_CLASS + assert metadata.arguments == {} + + +def test_create_task_metadata_ephemeral_model_disabled_renders_dbt_run(): + """With the flag disabled, ephemeral models render as regular dbt run tasks.""" + metadata = create_task_metadata( + _ephemeral_node(), + execution_mode=ExecutionMode.LOCAL, + args={}, + dbt_dag_task_group_identifier="", + render_config=RenderConfig(ephemeral_models_as_empty_operator=False), + ) + assert metadata.id == "my_ephemeral_run" + assert metadata.operator_class == "cosmos.operators.local.DbtRunLocalOperator" + assert metadata.arguments == {"select": "my_ephemeral"} + + @pytest.mark.parametrize( "unique_id, resource_type, has_freshness, source_rendering_behavior, expected_id, expected_operator_class", [ diff --git a/tests/test_config.py b/tests/test_config.py index 66240189d8..109c18ce69 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -278,6 +278,11 @@ def test_render_config_source_rendering_behavior_none_warns_and_normalizes(): assert config.source_rendering_behavior == SourceRenderingBehavior.NONE +def test_render_config_ephemeral_models_as_empty_operator_defaults_to_true(): + """Ephemeral models are rendered as EmptyOperator by default.""" + assert RenderConfig().ephemeral_models_as_empty_operator is True + + @pytest.mark.parametrize( "execution_mode, invocation_mode, expectation", [ From 553e12b80d4722441bb78181c0978dfb4ac198a6 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 4 Jun 2026 18:42:59 +0530 Subject: [PATCH 3/5] Document side effects of rendering ephemeral models as EmptyOperator With ephemeral_models_as_empty_operator defaulting to True, ephemeral models no longer run dbt, so they do not emit Airflow datasets, do not invoke task callbacks, and do not produce OpenLineage events. Note this in the render config, data-aware scheduling, and callbacks guides, and point to the opt-out. --- docs/guides/run_dbt/callbacks/callbacks.rst | 3 +++ docs/guides/run_dbt/customization/scheduling.rst | 4 ++++ docs/guides/translate_dbt_to_airflow/render-config.rst | 2 +- 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/docs/guides/run_dbt/callbacks/callbacks.rst b/docs/guides/run_dbt/callbacks/callbacks.rst index 9f305208df..13e3f1185e 100644 --- a/docs/guides/run_dbt/callbacks/callbacks.rst +++ b/docs/guides/run_dbt/callbacks/callbacks.rst @@ -6,6 +6,9 @@ Callbacks .. note:: Feature available when using ``ExecutionMode.LOCAL`` and ``ExecutionMode.VIRTUALENV``. +.. note:: + Since Cosmos v1.15.0, ephemeral dbt models are rendered as ``EmptyOperator`` tasks by default (``RenderConfig.ephemeral_models_as_empty_operator=True``). Because no dbt command runs for these tasks, callbacks are not invoked for ephemeral models. Set ``ephemeral_models_as_empty_operator=False`` to render them as regular dbt run tasks if you rely on callbacks for them. + Most dbt commands output `one or more artifacts `_ such as ``semantic_manifest.json``, ``manifest.json``, ``catalog.json``, ``run_results.json``, and ``sources.json`` in the target folder, which by default resides in the dbt project's root folder. However, since Cosmos creates temporary folders to run each dbt command, this folder vanishes by the end of the Cosmos task execution, diff --git a/docs/guides/run_dbt/customization/scheduling.rst b/docs/guides/run_dbt/customization/scheduling.rst index 6746dac77b..09ee50ff77 100644 --- a/docs/guides/run_dbt/customization/scheduling.rst +++ b/docs/guides/run_dbt/customization/scheduling.rst @@ -33,6 +33,10 @@ By default, if using a version between Airflow 2.4 or higher, Cosmos emits `Airf This feature is only available for ``ExecutionMode.LOCAL``, ``ExecutionMode.VIRTUALENV``, ``ExecutionMode.WATCHER`` and ``ExecutionMode.AIRFLOW_ASYNC``. +.. note:: + + Since Cosmos v1.15.0, ephemeral dbt models are rendered as ``EmptyOperator`` tasks by default (``RenderConfig.ephemeral_models_as_empty_operator=True``). These tasks do not run dbt and therefore do not emit datasets, so a DAG scheduled on an ephemeral model's dataset will not be triggered by it. Set ``ephemeral_models_as_empty_operator=False`` to render ephemeral models as regular dbt run tasks and keep emitting their datasets. + Cosmos calculates these URIs during the task execution, by using the library `OpenLineage Integration Common `_. This block illustrates a Cosmos-generated dataset for Postgres: diff --git a/docs/guides/translate_dbt_to_airflow/render-config.rst b/docs/guides/translate_dbt_to_airflow/render-config.rst index 608b112c35..50e80ba7a7 100644 --- a/docs/guides/translate_dbt_to_airflow/render-config.rst +++ b/docs/guides/translate_dbt_to_airflow/render-config.rst @@ -31,7 +31,7 @@ The ``RenderConfig`` class takes the following arguments: - ``normalize_task_display_name``: This function allows users to set a custom user-defined function to alter the display name independently of the model name. This way, the task_id can be preserved while the model display name is modified. - ``should_detach_multiple_parents_tests``: A boolean to control if tests that depend on multiple parents should be run as standalone tasks. See `Testing Behavior `_ for more information. - ``enable_owner_inheritance``: (introduced in 1.10.2) A boolean to control if dbt owners should be imported as part of the airflow DAG owners. Defaults to True. -- ``ephemeral_models_as_empty_operator``: (new in v1.15.0) A boolean to control how ephemeral models are rendered. Ephemeral models are inlined as CTEs into downstream models and never written to the warehouse, so running them via a dbt operator is effectively a no-op. When ``True`` (default), they are rendered as ``EmptyOperator`` tasks, which preserves the dependency chain that passes through them while avoiding wasted dbt invocations and decluttering the DAG. Set to ``False`` to render them as regular dbt run tasks. +- ``ephemeral_models_as_empty_operator``: (new in v1.15.0) A boolean to control how ephemeral models are rendered. Ephemeral models are inlined as CTEs into downstream models and never written to the warehouse, so running them via a dbt operator is effectively a no-op. When ``True`` (default), they are rendered as ``EmptyOperator`` tasks, which preserves the dependency chain that passes through them while avoiding wasted dbt invocations and decluttering the DAG. Because the ``EmptyOperator`` does not run dbt, behaviour tied to the ephemeral model's run task no longer occurs: no Airflow Dataset/Asset is emitted for it (so dataset-scheduled DAGs keyed on it are not triggered), task callbacks are not invoked, no OpenLineage events are produced, and per-node operator arguments (e.g. ``profile_args``) do not apply. Set to ``False`` to render them as regular dbt run tasks. - ``node_conversion_by_task_group``: A boolean to control if node_converters are used at the task group level (ex. converting models with test_behavior=AFTER_EACH means the entire task group is converted including the run task and the test task), or the individual task level (gives more granularity for converting just the run tasks or just the test tasks). Defaults to True. - ``group_nodes_by_folder``: When enabled, groups nodes by folder structure, creating a ``TaskGroup`` per resource type and folder. Disabled by default. From 5ec85a1a4ab53b746be1ba855f52241fc95755e1 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 4 Jun 2026 19:07:47 +0530 Subject: [PATCH 4/5] Render ephemeral models as EmptyOperator under BUILD test behavior The ephemeral check ran only in the non-BUILD branch, so with test_behavior=BUILD an ephemeral model was rendered as a dbt build task instead of an EmptyOperator, contradicting the documented default. Move the check ahead of the BUILD branch so it is honored regardless of test behavior, and add regression tests for BUILD mode with the flag enabled and disabled. --- cosmos/airflow/graph.py | 32 +++++++++++++++++++++++--------- tests/airflow/test_graph.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 9 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 8f56b7bc87..6ecbceb438 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -366,6 +366,29 @@ def create_task_metadata( # noqa: C901 # `AIRFLOW__COSMOS__PRE_DBT_FUSION=1`. models_select_key = "models" if settings.pre_dbt_fusion else "select" + if node.has_ephemeral_materialization and render_config.ephemeral_models_as_empty_operator: + # Ephemeral models are inlined as CTEs into downstream models and never written to the + # warehouse, so running them via a dbt operator (whether `dbt run` or `dbt build`) is a + # no-op. Render them as empty operators while keeping the node in the graph so the + # dependency chain passing through it is preserved. This check sits before the + # TestBehavior.BUILD branch so it is honored regardless of the test behavior. + is_build = ( + render_config.test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES + ) + task_id, args = _get_task_id_and_args( + node=node, + args=args, + use_task_group=use_task_group, + normalize_task_id=render_config.normalize_task_id, + normalize_task_display_name=render_config.normalize_task_display_name, + resource_suffix=resource_suffix, + include_resource_type=is_build, + execution_mode=execution_mode, + ) + # EmptyOperator does not accept custom dbt parameters (e.g. profile_args); keep only the display name. + args = {"task_display_name": args["task_display_name"]} if "task_display_name" in args else {} + return TaskMetadata(id=task_id, operator_class=EMPTY_OPERATOR_CLASS_PATH, arguments=args) + if render_config.test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES: if node.fqn and len(node.fqn) > 0: args[models_select_key] = f"fqn:{'.'.join(node.fqn)}" @@ -438,15 +461,6 @@ def create_task_metadata( # noqa: C901 execution_mode=execution_mode, ) - if node.has_ephemeral_materialization and render_config.ephemeral_models_as_empty_operator: - # Ephemeral models are inlined as CTEs into downstream models and never written to the - # warehouse, so running them via a dbt operator is a no-op. Render them as empty operators - # to avoid wasted dbt invocations while keeping the node in the graph so that the dependency - # chain passing through it is preserved. EmptyOperator does not accept custom parameters - # (e.g. profile_args), so recreate the args keeping only the display name when present. - args = {"task_display_name": args["task_display_name"]} if "task_display_name" in args else {} - return TaskMetadata(id=task_id, operator_class=EMPTY_OPERATOR_CLASS_PATH, arguments=args) - _override_profile_if_needed(args, node.profile_config_to_override) task_owner = node.owner diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 6aa198e424..14f6562bd2 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -711,6 +711,34 @@ def test_create_task_metadata_ephemeral_model_disabled_renders_dbt_run(): assert metadata.arguments == {"select": "my_ephemeral"} +def test_create_task_metadata_ephemeral_model_as_empty_operator_in_build_mode(): + """Ephemeral models render as EmptyOperator even under TestBehavior.BUILD, where they would + otherwise be rendered as dbt build tasks.""" + metadata = create_task_metadata( + _ephemeral_node(), + execution_mode=ExecutionMode.LOCAL, + args={}, + dbt_dag_task_group_identifier="", + render_config=RenderConfig(test_behavior=TestBehavior.BUILD), + ) + assert metadata.id == "my_ephemeral_model_build" + assert metadata.operator_class == EMPTY_OPERATOR_CLASS_PATH + assert metadata.arguments == {} + + +def test_create_task_metadata_ephemeral_model_disabled_renders_dbt_build_in_build_mode(): + """With the flag disabled under TestBehavior.BUILD, ephemeral models render as dbt build tasks.""" + metadata = create_task_metadata( + _ephemeral_node(), + execution_mode=ExecutionMode.LOCAL, + args={}, + dbt_dag_task_group_identifier="", + render_config=RenderConfig(test_behavior=TestBehavior.BUILD, ephemeral_models_as_empty_operator=False), + ) + assert metadata.id == "my_ephemeral_model_build" + assert metadata.operator_class == "cosmos.operators.local.DbtBuildLocalOperator" + + @pytest.mark.parametrize( "unique_id, resource_type, has_freshness, source_rendering_behavior, expected_id, expected_operator_class", [ From 0259bb3a6cd1f2607b0121bf60840e63bfb7bcec Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Thu, 4 Jun 2026 19:22:39 +0530 Subject: [PATCH 5/5] Inherit the dbt model owner on ephemeral EmptyOperator tasks The ephemeral empty-operator path returned TaskMetadata without an owner, bypassing RenderConfig.enable_owner_inheritance and dropping the dbt model owner even though EmptyOperator accepts it. Set the owner the same way the regular task path does. --- cosmos/airflow/graph.py | 7 ++++++- tests/airflow/test_graph.py | 19 +++++++++++++++++-- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 6ecbceb438..5287a60762 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -387,7 +387,12 @@ def create_task_metadata( # noqa: C901 ) # EmptyOperator does not accept custom dbt parameters (e.g. profile_args); keep only the display name. args = {"task_display_name": args["task_display_name"]} if "task_display_name" in args else {} - return TaskMetadata(id=task_id, operator_class=EMPTY_OPERATOR_CLASS_PATH, arguments=args) + return TaskMetadata( + id=task_id, + owner=node.owner if render_config.enable_owner_inheritance else "", + operator_class=EMPTY_OPERATOR_CLASS_PATH, + arguments=args, + ) if render_config.test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES: if node.fqn and len(node.fqn) > 0: diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 14f6562bd2..010dd6f7e8 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -675,7 +675,10 @@ def test_create_task_metadata_model_use_task_group(caplog): assert metadata.id == "run" -def _ephemeral_node(): +def _ephemeral_node(owner=None): + config = {"materialized": "ephemeral"} + if owner is not None: + config["meta"] = {"owner": owner} return DbtNode( unique_id=f"{DbtResourceType.MODEL.value}.my_folder.my_ephemeral", resource_type=DbtResourceType.MODEL, @@ -683,7 +686,7 @@ def _ephemeral_node(): path_base=Path("."), original_file_path=Path("."), tags=[], - config={"materialized": "ephemeral"}, + config=config, ) @@ -739,6 +742,18 @@ def test_create_task_metadata_ephemeral_model_disabled_renders_dbt_build_in_buil assert metadata.operator_class == "cosmos.operators.local.DbtBuildLocalOperator" +def test_create_task_metadata_ephemeral_empty_operator_inherits_owner(): + """The ephemeral EmptyOperator inherits the dbt model owner when owner inheritance is enabled (default).""" + metadata = create_task_metadata( + _ephemeral_node(owner="dbt-owner"), + execution_mode=ExecutionMode.LOCAL, + args={}, + dbt_dag_task_group_identifier="", + ) + assert metadata.operator_class == EMPTY_OPERATOR_CLASS_PATH + assert metadata.owner == "dbt-owner" + + @pytest.mark.parametrize( "unique_id, resource_type, has_freshness, source_rendering_behavior, expected_id, expected_operator_class", [