Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
cefaa21
Modifies the node_converter logic to be at the task level instead of …
anyapriya May 8, 2025
c554450
Merge branch 'main' of github.com:anyapriya/astronomer-cosmos into no…
anyapriya May 8, 2025
b8b5cbe
Setting task type
anyapriya May 8, 2025
412dca8
Fixing test coverage
anyapriya May 16, 2025
3201f88
Merge branch 'main' into node_converter_at_task_level
anyapriya May 16, 2025
9cc4e9d
Merge branch 'main' of github.com:anyapriya/astronomer-cosmos into no…
anyapriya Sep 26, 2025
71d7bd5
Allows the ability to set if node_conversion happens at the task or t…
anyapriya Sep 29, 2025
c1184bc
Merge branch 'main' into node_converter_at_task_level
anyapriya Oct 1, 2025
35731e7
Merge branch 'main' of github.com:anyapriya/astronomer-cosmos into no…
anyapriya Oct 13, 2025
35ae44d
Merge branch 'main' into node_converter_at_task_level
anyapriya Oct 15, 2025
dd4282b
Merge branch 'main' into node_converter_at_task_level
tatiana Oct 28, 2025
787b24f
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Oct 28, 2025
0152397
Fixing duplicate import from merging in main
anyapriya Oct 28, 2025
59b2021
Merge branch 'main' into node_converter_at_task_level
tatiana Oct 28, 2025
cd20bff
Clean up extra parameter
anyapriya Oct 28, 2025
13b81b4
Merge branch 'main' into node_converter_at_task_level
tatiana Oct 29, 2025
298bd0f
Merge branch 'main' into node_converter_at_task_level
tatiana Nov 4, 2025
ba289e7
Move converter docs closer together
tatiana Nov 4, 2025
ee4491d
Refactor airflow/graph.py to simplify interfaces since #1759
tatiana Nov 4, 2025
9088cb2
Move node_conversion arguments together
tatiana Nov 4, 2025
9d0826f
Fix unittests
tatiana Nov 4, 2025
7dd880d
Fix integration test
tatiana Nov 4, 2025
784342a
Improve docs
tatiana Nov 4, 2025
b3e3435
Update the docs
tatiana Nov 4, 2025
ea2a250
improve changelog
tatiana Nov 4, 2025
5c8d1da
Add pre-commit to changelog
tatiana Nov 4, 2025
50923d0
Forward arguments introduced in 1.11.0 to conversion_function
tatiana Nov 4, 2025
3d519a5
Add missing properties introduced since https://github.com/astronomer…
tatiana Nov 4, 2025
a75121c
Merge branch 'main' into node_converter_at_task_level
tatiana Nov 6, 2025
8649e1a
Update config.py
tatiana Nov 6, 2025
44af853
Apply suggestions from code review
tatiana Nov 6, 2025
d03ab3f
Remove warning since node_converters is no longer experimental
tatiana Nov 6, 2025
c17c9e1
Update changelog with 1.12.0a1
tatiana Nov 5, 2025
ae3954f
Remove dbt 1.5 from run-integration-tests-dbt-async
tatiana Nov 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,30 @@
Changelog
=========

1.12.0a2 (2025-11-04)
-------------------

Breaking changes

* Introduced in the PR #2080. The following functions are expected to be used internally only to Cosmos, so we hope these won't impact end-users, but we are documenting the changes just in case:
- ``generate_task_or_group`` receives ``render_config`` instead of its individual configurations, such as ``test_behavior``, ``source_rendering_behavior`` and ``enable_owner_inheritance``
- ``create_task_metadata`` receives ``render_config`` instead of its individual configurations, such as ``test_behavior``, ``source_rendering_behavior`` and ``enable_owner_inheritance``
- ``create_task_metadata`` now expects the ``node_converters`` argument

Features

* Support applying ``node_converter`` at a task level instead of task group level by @anyapriya in #1759

Enhancements

* Remove usage of contextmanager in plugins for accessing connections in Airflow >= 3.1.2 by @pankajkoti in #2073
* Refactor ``airflow/graph.py`` to simplify code-base by @tatiana in #2080

Others

* Fix broken CI due to fastapi incompatibility with cadwyn for Airflow 3 by @pankajkoti in #2076
* Pre-commit updates: #2078

1.11.0 (2025-10-29)
---------------------

Expand Down
2 changes: 1 addition & 1 deletion cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from cosmos import settings

__version__ = "1.11.0"
__version__ = "1.12.0a2"

if not settings.enable_memory_optimised_imports:
from cosmos.airflow.dag import DbtDag
Expand Down
275 changes: 108 additions & 167 deletions cosmos/airflow/graph.py

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class RenderConfig:
:param selector: Name of a dbt YAML selector to use for parsing. Only supported when using ``load_method=LoadMode.DBT_LS``.
:param dbt_deps: (deprecated) Configure to run dbt deps when using dbt ls for dag parsing
:param node_converters: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``.
:param node_conversion_by_task_group: A boolean that allows users to do node conversion at the task group level instead of task level. Defaults to True.
:param dbt_executable_path: The path to the dbt executable for dag generation. Defaults to dbt if available on the path.
:param env_vars: (Deprecated since Cosmos 1.3 use ProjectConfig.env_vars) A dictionary of environment variables for rendering. Only supported when using ``LoadMode.DBT_LS``.
:param dbt_project_path: Configures the DBT project location accessible on the airflow controller for DAG rendering. Mutually Exclusive with ProjectConfig.dbt_project_path. Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``.
Expand All @@ -76,7 +77,6 @@ class RenderConfig:
:param normalize_task_display_name: A callable that takes a dbt node as input and returns the task display name. This allows users to assign a custom task display name separate from the node ID.
:param should_detach_multiple_parents_tests: A boolean that allows users to decide whether to run tests with multiple parent dependencies in separate tasks.
:param enable_owner_inheritance: A boolean that allows users to enable the owner inheritance from dbt models to airflow tasks. Defaults to True.
:param node_conversion_by_task_group: A boolean that allows users to do node conversion at the task group level instead of task level. Defaults to True.
"""

emit_datasets: bool = True
Expand All @@ -88,6 +88,7 @@ class RenderConfig:
selector: str | None = None
dbt_deps: bool | None = None
node_converters: dict[DbtResourceType, Callable[..., Any]] | None = None
node_conversion_by_task_group: bool | None = True
dbt_executable_path: str | Path = get_system_dbt()
env_vars: dict[str, str] | None = None
dbt_project_path: InitVar[str | Path | None] = None
Expand All @@ -101,7 +102,6 @@ class RenderConfig:
normalize_task_display_name: Callable[..., Any] | None = None
should_detach_multiple_parents_tests: bool = False
enable_owner_inheritance: bool | None = True
node_conversion_by_task_group: bool | None = True

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
if self.env_vars:
Expand Down
11 changes: 9 additions & 2 deletions docs/configuration/render-config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The ``RenderConfig`` class takes the following arguments:
- ``selector``: (new in v1.3) name of a dbt YAML selector to use for DAG parsing. Only supported when using ``load_method=LoadMode.DBT_LS``. See `Selecting & Excluding <selecting-excluding.html>`_ for more information.
- ``dbt_deps``: (deprecated in v1.9, use ``ProjectConfig.install_dbt_deps`` onwards) A Boolean to run dbt deps when using dbt ls for dag parsing. Default True
- ``node_converters``: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``. Find more information below.
- ``node_conversion_by_task_group``: (new in v1.12.0) A boolean to control if node_converters are used at the task group level (ex. converting models with test_behavior=AFTER_EACH means the entire task group is converted including the run task and the test task), or the individual task level (gives more granularity for converting just the run tasks or just the test tasks). Defaults to True.
- ``dbt_executable_path``: The path to the dbt executable for dag generation. Defaults to dbt if available on the path.
- ``dbt_ls_path``: Should be set when using ``load_method=LoadMode.DBT_LS_OUTPUT``. Path of the user-managed output of ``dbt ls``.
- ``enable_mock_profile``: When using ``LoadMode.DBT_LS`` with a ``ProfileMapping`` class, by default, Cosmos mocks the values of the profile. Defaults to True. In order to leverage partial parsing, this argument should be set to ``False``. Read `Partial parsing <./partial-parsing.html#profile-configuration.html>`_ for more information.
Expand Down Expand Up @@ -50,8 +51,10 @@ Users may opt to use ``InvocationMode.SUBPROCESS`` when they have multiple Pytho
and do not want Cosmos to use the dbt version installed in the same Python Virtualenv as Airflow to parse the DAG.


Customizing how nodes are rendered (experimental)
-------------------------------------------------
Customizing how nodes are rendered
----------------------------------

.. versionadded:: 1.2.0

There are circumstances when choosing specific Airflow operators to represent a dbt node is helpful.
An example could be to use an S3 sensor to represent dbt sources or to create custom operators to handle exposures.
Expand All @@ -65,3 +68,7 @@ The following example illustrates how it is possible to tell Cosmos how to conve
:end-before: [END custom_dbt_nodes]

When defining the mapping for a new type that is not part of Cosmos' ``DbtResourceType`` enumeration, users should use the syntax ``DbtResourceType("new-node-type")`` as opposed to ``DbtResourceType.EXISTING_TYPE``. It will dynamically add the new type to the enumeration ``DbtResourceType`` so that Cosmos can parse these dbt nodes and convert them into the Airflow DAG.

In Cosmos 1.12.0, this feature was further improved by adding the ``RenderConfig.node_conversion_by_task_group`` parameter.
This parameter allows users to control if node_converters are used at the task group level (ex. converting models with test_behavior=AFTER_EACH means the entire task group is converted including the run task and the test task),
or the individual task level (gives more granularity for converting just the run tasks or just the test tasks). Defaults to True.
41 changes: 27 additions & 14 deletions tests/airflow/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,11 @@ def test_create_task_group_for_after_each_supported_nodes(node_type: DbtResource
},
dbt_project_name="astro_shop",
node_converters={},
test_behavior=TestBehavior.AFTER_EACH,
render_config=RenderConfig(
test_behavior=TestBehavior.AFTER_EACH,
source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR,
),
on_warning_callback=None,
source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR,
)
assert isinstance(output, TaskGroup)
assert list(output.children.keys()) == [f"dbt_node.{task_suffix}", "dbt_node.test"]
Expand Down Expand Up @@ -615,7 +617,9 @@ def test_create_task_metadata_source_with_rendering_options(
metadata = create_task_metadata(
child_node,
execution_mode=ExecutionMode.LOCAL,
source_rendering_behavior=source_rendering_behavior,
render_config=RenderConfig(
source_rendering_behavior=source_rendering_behavior,
),
args={},
dbt_dag_task_group_identifier="",
)
Expand Down Expand Up @@ -922,10 +926,12 @@ def test_create_task_metadata_normalize_task_id(
args=args,
dbt_dag_task_group_identifier="",
use_task_group=use_task_group,
normalize_task_id=normalize_task_id,
normalize_task_display_name=normalize_task_display_name,
source_rendering_behavior=SourceRenderingBehavior.ALL,
test_behavior=test_behavior,
render_config=RenderConfig(
normalize_task_id=normalize_task_id,
normalize_task_display_name=normalize_task_display_name,
source_rendering_behavior=SourceRenderingBehavior.ALL,
test_behavior=test_behavior,
),
)
assert metadata.id == expected_node_id
if expected_display_name:
Expand Down Expand Up @@ -1105,10 +1111,12 @@ def test_owner(dbt_extra_config, expected_owner):
},
dbt_project_name="astro_shop",
node_converters={},
test_behavior=TestBehavior.AFTER_EACH,
render_config=RenderConfig(
test_behavior=TestBehavior.AFTER_EACH,
source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR,
enable_owner_inheritance=True,
),
on_warning_callback=None,
source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR,
enable_owner_inheritance=True,
)

assert len(output.leaves) == 1
Expand Down Expand Up @@ -1232,7 +1240,9 @@ def test_create_task_metadata_disable_owner_inheritance(enable_owner_inheritance
execution_mode=ExecutionMode.LOCAL,
args={"project_dir": SAMPLE_PROJ_PATH},
dbt_dag_task_group_identifier="test_dag",
enable_owner_inheritance=enable_owner_inheritance,
render_config=RenderConfig(
enable_owner_inheritance=enable_owner_inheritance,
),
)

assert task_metadata is not None
Expand Down Expand Up @@ -1322,11 +1332,14 @@ def test_generate_task_or_group_disable_owner_inheritance(enable_owner_inheritan
),
),
},
test_behavior=TestBehavior.NONE,
source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR,
render_config=RenderConfig(
test_behavior=TestBehavior.NONE,
source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR,
enable_owner_inheritance=enable_owner_inheritance,
),
test_indirect_selection=TestIndirectSelection.EAGER,
on_warning_callback=None,
enable_owner_inheritance=enable_owner_inheritance,
node_converters={},
)

assert task_or_group is not None
Expand Down
83 changes: 55 additions & 28 deletions tests/dbt/test_pruning.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
create_task_metadata,
generate_task_or_group,
)
from cosmos.config import ProfileConfig
from cosmos.config import ProfileConfig, RenderConfig
from cosmos.constants import (
DbtResourceType,
ExecutionMode,
Expand Down Expand Up @@ -224,9 +224,11 @@ def test_create_task_metadata_source_pruning_disabled(self):
node=self.source_with_downstream,
execution_mode=ExecutionMode.LOCAL,
args=self.task_args,
dbt_dag_task_group_identifier="test",
source_rendering_behavior=SourceRenderingBehavior.ALL,
source_pruning=False, # Disabled
dbt_dag_task_group_identifier="tes t",
render_config=RenderConfig(
source_rendering_behavior=SourceRenderingBehavior.ALL,
source_pruning=False, # Disabled
),
filtered_nodes=self.filtered_nodes_with_model,
)

Expand All @@ -235,8 +237,10 @@ def test_create_task_metadata_source_pruning_disabled(self):
execution_mode=ExecutionMode.LOCAL,
args=self.task_args,
dbt_dag_task_group_identifier="test",
source_rendering_behavior=SourceRenderingBehavior.ALL,
source_pruning=False, # Disabled
render_config=RenderConfig(
source_rendering_behavior=SourceRenderingBehavior.ALL,
source_pruning=False, # Disabled
),
filtered_nodes=self.filtered_nodes_empty,
)

Expand All @@ -254,8 +258,10 @@ def test_create_task_metadata_source_pruning_enabled(self):
execution_mode=ExecutionMode.LOCAL,
args=self.task_args,
dbt_dag_task_group_identifier="test",
source_rendering_behavior=SourceRenderingBehavior.ALL,
source_pruning=True, # Enabled
render_config=RenderConfig(
source_rendering_behavior=SourceRenderingBehavior.ALL,
source_pruning=True, # Enabled
),
filtered_nodes=self.filtered_nodes_with_model,
)

Expand All @@ -265,8 +271,10 @@ def test_create_task_metadata_source_pruning_enabled(self):
execution_mode=ExecutionMode.LOCAL,
args=self.task_args,
dbt_dag_task_group_identifier="test",
source_rendering_behavior=SourceRenderingBehavior.ALL,
source_pruning=True, # Enabled
render_config=RenderConfig(
source_rendering_behavior=SourceRenderingBehavior.ALL,
source_pruning=True, # Enabled
),
filtered_nodes=self.filtered_nodes_with_model,
)

Expand All @@ -282,8 +290,10 @@ def test_create_task_metadata_source_pruning_edge_cases(self):
execution_mode=ExecutionMode.LOCAL,
args=self.task_args,
dbt_dag_task_group_identifier="test",
source_rendering_behavior=SourceRenderingBehavior.ALL,
source_pruning=True,
render_config=RenderConfig(
source_rendering_behavior=SourceRenderingBehavior.ALL,
source_pruning=True,
),
filtered_nodes=None, # None means skip pruning check
)

Expand All @@ -293,8 +303,10 @@ def test_create_task_metadata_source_pruning_edge_cases(self):
execution_mode=ExecutionMode.LOCAL,
args=self.task_args,
dbt_dag_task_group_identifier="test",
source_rendering_behavior=SourceRenderingBehavior.ALL,
source_pruning=True,
render_config=RenderConfig(
source_rendering_behavior=SourceRenderingBehavior.ALL,
source_pruning=True,
),
filtered_nodes={}, # Empty dict means skip pruning check
)

Expand All @@ -311,8 +323,10 @@ def test_create_task_metadata_source_pruning_edge_cases(self):
execution_mode=ExecutionMode.LOCAL,
args=self.task_args,
dbt_dag_task_group_identifier="test",
source_rendering_behavior=SourceRenderingBehavior.ALL,
source_pruning=True,
render_config=RenderConfig(
source_rendering_behavior=SourceRenderingBehavior.ALL,
source_pruning=True,
),
filtered_nodes={unrelated_model.unique_id: unrelated_model}, # Non-empty but doesn't use this source
)

Expand All @@ -333,11 +347,14 @@ def test_generate_task_or_group_source_pruning(self):
node=self.source_with_downstream,
execution_mode=ExecutionMode.LOCAL,
task_args=self.task_args,
test_behavior=TestBehavior.NONE,
source_rendering_behavior=SourceRenderingBehavior.ALL,
source_pruning=True,
render_config=RenderConfig(
test_behavior=TestBehavior.NONE,
source_rendering_behavior=SourceRenderingBehavior.ALL,
source_pruning=True,
),
test_indirect_selection=TestIndirectSelection.EAGER,
filtered_nodes=self.filtered_nodes_with_model,
node_converters={},
)

# Orphaned source should NOT generate a task
Expand All @@ -348,10 +365,13 @@ def test_generate_task_or_group_source_pruning(self):
execution_mode=ExecutionMode.LOCAL,
task_args=self.task_args,
test_behavior=TestBehavior.NONE,
source_rendering_behavior=SourceRenderingBehavior.ALL,
source_pruning=True,
render_config=RenderConfig(
source_rendering_behavior=SourceRenderingBehavior.ALL,
source_pruning=True,
),
test_indirect_selection=TestIndirectSelection.EAGER,
filtered_nodes=self.filtered_nodes_with_model,
node_converters={},
)

assert task_with_downstream is not None
Expand All @@ -375,8 +395,10 @@ def test_source_pruning_with_different_source_rendering_behaviors(self):
execution_mode=ExecutionMode.LOCAL,
args=self.task_args,
dbt_dag_task_group_identifier="test",
source_rendering_behavior=SourceRenderingBehavior.NONE,
source_pruning=True,
render_config=RenderConfig(
source_rendering_behavior=SourceRenderingBehavior.NONE,
source_pruning=True,
),
filtered_nodes=self.filtered_nodes_with_model,
)

Expand All @@ -386,8 +408,10 @@ def test_source_pruning_with_different_source_rendering_behaviors(self):
execution_mode=ExecutionMode.LOCAL,
args=self.task_args,
dbt_dag_task_group_identifier="test",
source_rendering_behavior=SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS,
source_pruning=True,
render_config=RenderConfig(
source_rendering_behavior=SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS,
source_pruning=True,
),
filtered_nodes=self.filtered_nodes_with_model,
)

Expand Down Expand Up @@ -504,11 +528,14 @@ def test_source_pruning_parameter_defaults(self):
node=source,
execution_mode=ExecutionMode.LOCAL,
task_args=task_args,
test_behavior=TestBehavior.NONE,
source_rendering_behavior=SourceRenderingBehavior.ALL,
render_config=RenderConfig(
test_behavior=TestBehavior.NONE,
source_rendering_behavior=SourceRenderingBehavior.ALL,
# source_pruning not specified - should default to False
),
test_indirect_selection=TestIndirectSelection.EAGER,
# source_pruning not specified - should default to False
# filtered_nodes not specified - should default to None
node_converters={},
)

# Should create a task because pruning is disabled by default
Expand Down