Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
be8e5d0
Add source_prunning logic to cosmos
corsettigyg Sep 19, 2025
d94314e
add source prunning logic
corsettigyg Sep 19, 2025
25800b7
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Sep 19, 2025
c156dff
Merge branch 'main' into source-prunning
corsettigyg Sep 19, 2025
a61ceb8
add cached property to transverse map
corsettigyg Sep 19, 2025
4a530bd
Merge remote-tracking branch 'origin/source-prunning' into source-pru…
corsettigyg Sep 19, 2025
3f74389
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Sep 19, 2025
2c2510a
Merge branch 'main' into source-prunning
corsettigyg Sep 19, 2025
27c5b74
fix issue
corsettigyg Sep 19, 2025
de3dab1
improve performance
corsettigyg Sep 19, 2025
31b4a45
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Sep 19, 2025
69c7673
fix spelling
corsettigyg Sep 19, 2025
b669066
Merge remote-tracking branch 'origin/source-prunning' into source-pru…
corsettigyg Sep 19, 2025
ddbd32d
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Sep 19, 2025
ea187a6
Update cosmos/config.py
corsettigyg Sep 19, 2025
1758e3d
rename to source pruning
corsettigyg Sep 19, 2025
6427f0d
rename to source pruning v2
corsettigyg Sep 19, 2025
ceee75c
fix spelling in dag_id
corsettigyg Sep 19, 2025
32b7bdd
fix file name
corsettigyg Sep 19, 2025
152bc63
revert changes in node
corsettigyg Sep 20, 2025
b5f32e1
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Sep 20, 2025
8140db7
fix doc
corsettigyg Sep 20, 2025
fc323e1
Merge remote-tracking branch 'origin/source-prunning' into source-pru…
corsettigyg Sep 20, 2025
a3ee387
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Sep 20, 2025
a19fb7e
fix source_pruning not having default value
corsettigyg Sep 20, 2025
3305675
Merge remote-tracking branch 'origin/source-prunning' into source-pru…
corsettigyg Sep 20, 2025
71266ab
add tests
corsettigyg Sep 22, 2025
82ea59d
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Sep 22, 2025
26adfd8
improve documentation
corsettigyg Sep 22, 2025
dfcda48
adding extra node to the test_graph
corsettigyg Sep 22, 2025
7187e42
exclude failing node to fix airflow 3 issue
corsettigyg Sep 23, 2025
3eb6888
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Sep 23, 2025
fcd08e8
Merge branch 'main' into source-prunning
corsettigyg Oct 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,24 @@ def calculate_operator_class(
)


def _is_source_used_by_filtered_nodes(source_node: DbtNode, filtered_nodes: dict[str, DbtNode]) -> bool:
"""
Check if a source node is referenced by any of the filtered nodes.

:param source_node: The source node to check
:param filtered_nodes: Dictionary of filtered nodes
:returns: True if the source is used by any filtered node, False otherwise
"""
source_id = source_node.unique_id

# Check if any filtered node depends on this source
for node in filtered_nodes.values():
if source_id in node.depends_on:
return True

return False


def calculate_leaves(tasks_ids: list[str], nodes: dict[str, DbtNode]) -> list[str]:
"""
Return a list of unique_ids for nodes that are not parents (don't have dependencies on other tasks).
Expand Down Expand Up @@ -257,13 +275,15 @@ def create_task_metadata(
dbt_dag_task_group_identifier: str,
use_task_group: bool = False,
source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE,
source_pruning: bool = False,
normalize_task_id: Callable[..., Any] | None = None,
normalize_task_display_name: Callable[..., Any] | None = None,
test_behavior: TestBehavior = TestBehavior.AFTER_ALL,
test_indirect_selection: TestIndirectSelection = TestIndirectSelection.EAGER,
on_warning_callback: Callable[..., Any] | None = None,
detached_from_parent: dict[str, DbtNode] | None = None,
enable_owner_inheritance: bool | None = None,
filtered_nodes: dict[str, DbtNode] | None = None,
) -> TaskMetadata | None:
"""
Create the metadata that will be used to instantiate the Airflow Task used to run the Dbt node.
Expand Down Expand Up @@ -330,6 +350,8 @@ def create_task_metadata(
):
return None

if source_pruning and filtered_nodes and not _is_source_used_by_filtered_nodes(node, filtered_nodes):
return None
task_id, args = _get_task_id_and_args(
node, args, use_task_group, normalize_task_id, normalize_task_display_name, "source"
)
Expand Down Expand Up @@ -398,11 +420,13 @@ def generate_task_or_group(
test_behavior: TestBehavior,
source_rendering_behavior: SourceRenderingBehavior,
test_indirect_selection: TestIndirectSelection,
Comment thread
corsettigyg marked this conversation as resolved.
on_warning_callback: Callable[..., Any] | None,
source_pruning: bool = False,
on_warning_callback: Callable[..., Any] | None = None,
Comment thread
corsettigyg marked this conversation as resolved.
normalize_task_id: Callable[..., Any] | None = None,
normalize_task_display_name: Callable[..., Any] | None = None,
detached_from_parent: dict[str, DbtNode] | None = None,
enable_owner_inheritance: bool | None = None,
filtered_nodes: dict[str, DbtNode] | None = None,
**kwargs: Any,
) -> BaseOperator | TaskGroup | None:
task_or_group: BaseOperator | TaskGroup | None = None
Expand All @@ -421,13 +445,15 @@ def generate_task_or_group(
dbt_dag_task_group_identifier=_get_dbt_dag_task_group_identifier(dag, task_group),
use_task_group=use_task_group,
source_rendering_behavior=source_rendering_behavior,
source_pruning=source_pruning,
normalize_task_id=normalize_task_id,
normalize_task_display_name=normalize_task_display_name,
test_behavior=test_behavior,
test_indirect_selection=test_indirect_selection,
on_warning_callback=on_warning_callback,
detached_from_parent=detached_from_parent,
enable_owner_inheritance=enable_owner_inheritance,
filtered_nodes=filtered_nodes,
)

# In most cases, we'll map one DBT node to one Airflow task
Expand Down Expand Up @@ -631,6 +657,7 @@ def build_airflow_graph( # noqa: C901 TODO: https://github.com/astronomer/astro
node_converters = render_config.node_converters or {}
test_behavior = render_config.test_behavior
source_rendering_behavior = render_config.source_rendering_behavior
source_pruning = render_config.source_pruning
normalize_task_id = render_config.normalize_task_id
normalize_task_display_name = render_config.normalize_task_display_name
enable_owner_inheritance = render_config.enable_owner_inheritance
Expand Down Expand Up @@ -663,13 +690,15 @@ def build_airflow_graph( # noqa: C901 TODO: https://github.com/astronomer/astro
task_args=task_args,
test_behavior=test_behavior,
source_rendering_behavior=source_rendering_behavior,
source_pruning=source_pruning,
test_indirect_selection=test_indirect_selection,
on_warning_callback=on_warning_callback,
normalize_task_id=normalize_task_id,
normalize_task_display_name=normalize_task_display_name,
node=node,
detached_from_parent=detached_from_parent,
enable_owner_inheritance=enable_owner_inheritance,
filtered_nodes=nodes,
)
if task_or_group is not None:
logger.debug(f"Conversion of <{node.unique_id}> was successful!")
Expand Down
2 changes: 2 additions & 0 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class RenderConfig:
:param dbt_ls_path: Configures the location of an output of ``dbt ls``. Required when using ``load_method=LoadMode.DBT_LS_FILE``.
:param enable_mock_profile: Allows to enable/disable mocking profile. Enabled by default. Mock profiles are useful for parsing Cosmos DAGs in the CI, but should be disabled to benefit from partial parsing (since Cosmos 1.4).
:param source_rendering_behavior: Determines how source nodes are rendered when using cosmos default source node rendering (ALL, NONE, WITH_TESTS_OR_FRESHNESS). Defaults to "NONE" (since Cosmos 1.6).
:param source_pruning: Determines whether source nodes without a corresponding downstream task are removed from the rendered DAG. Defaults to ``False``.
:param airflow_vars_to_purge_dbt_ls_cache: Specify Airflow variables that will affect the LoadMode.DBT_LS cache.
:param normalize_task_id: A callable that takes a dbt node as input and returns the task ID. This allows users to assign a custom node ID separate from the display name.
:param normalize_task_display_name: A callable that takes a dbt node as input and returns the task display name. This allows users to assign a custom task display name separate from the node ID.
Expand All @@ -92,6 +93,7 @@ class RenderConfig:
project_path: Path | None = field(init=False)
enable_mock_profile: bool = True
source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE
source_pruning: bool = False
airflow_vars_to_purge_dbt_ls_cache: list[str] = field(default_factory=list)
normalize_task_id: Callable[..., Any] | None = None
normalize_task_display_name: Callable[..., Any] | None = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ sources:
database: "{{ env_var('POSTGRES_DB') }}"
schema: "{{ env_var('POSTGRES_SCHEMA') }}"
tables:
- name: not_connected_source
- name: raw_customers
columns:
- name: id
Expand Down
54 changes: 54 additions & 0 deletions dev/dags/example_source_pruning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""
An example DAG that uses Cosmos to render a dbt project into an Airflow DAG using Cosmos source rendering, and then
prunes unused source nodes.
"""

import os
from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import SourceRenderingBehavior
from cosmos.profiles import PostgresUserPasswordProfileMapping

# Root directory holding the example dbt projects: a ``dbt`` directory next to
# this file, unless the DBT_ROOT_PATH environment variable points elsewhere.
DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

# dbt profile "default" with target "dev", built through the Postgres
# user/password profile mapping.
# NOTE(review): "example_conn" is presumably an Airflow connection id — confirm
# it exists in the environment where this example runs.
profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="example_conn",
        profile_args={"schema": "public"},
        disable_event_tracking=True,
    ),
)

# [START cosmos_source_node_example]

source_rendering_dag = DbtDag(
    # dbt/cosmos-specific parameters
    project_config=ProjectConfig(
        DBT_ROOT_PATH / "altered_jaffle_shop",
    ),
    profile_config=profile_config,
    operator_args={
        "install_deps": True,  # install any necessary dependencies before running any dbt command
        "full_refresh": True,  # used only in dbt commands that support this flag
    },
    render_config=RenderConfig(
        # Source rendering must be enabled for pruning to have anything to prune:
        # sources are rendered, then those without a downstream dependency in the
        # selected part of the dbt graph are removed.
        source_rendering_behavior=SourceRenderingBehavior.ALL,
        source_pruning=True,
        # Exclude multibyte model to avoid Airflow 3.0 AssetAlias ASCII validation issues
        exclude=["multibyte"],
    ),
    # normal dag parameters
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    dag_id="source_pruning_dag",
    default_args={"retries": 0},
    on_warning_callback=lambda context: print(context),
)
# [END cosmos_source_node_example]
3 changes: 2 additions & 1 deletion docs/configuration/render-config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ The ``RenderConfig`` class takes the following arguments:
- ``dbt_project_path``: Configures the DBT project location accessible on their airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``
- ``airflow_vars_to_purge_cache``: (new in v1.5) Specify Airflow variables that will affect the ``LoadMode.DBT_LS`` cache. See `Caching <./caching.html>`_ for more information.
- ``source_rendering_behavior``: Determines how source nodes are rendered when using cosmos default source node rendering (ALL, NONE, WITH_TESTS_OR_FRESHNESS). Defaults to "NONE" (since Cosmos 1.6). See `Source Nodes Rendering <./source-nodes-rendering.html>`_ for more information.
- ``normalize_task_id``: A callable that takes a dbt node as input and returns the task ID. This function allows users to set a custom task_id independently of the model name, which can be specified as the task’s display_name. This way, task_id can be modified using a user-defined function, while the model name remains as the task’s display name. The display_name parameter is available in Airflow 2.9 and above. See `Task display name <./task-display-name.html>`_ for more information.
- ``source_pruning``: When set to ``True``, automatically removes (or "prunes") any dbt source nodes from your Airflow DAG that do not have any downstream dependencies within the selected portion of the dbt graph. Defaults to ``False``. See `Source Nodes Rendering <./source-nodes-rendering.html>`_ for more information.
- ``normalize_task_id``: A callable that takes a dbt node as input and returns the task ID. This function allows users to set a custom task_id independently of the model name, which can be specified as the task's display_name. This way, task_id can be modified using a user-defined function, while the model name remains as the task's display name. The display_name parameter is available in Airflow 2.9 and above. See `Task display name <./task-display-name.html>`_ for more information.
- ``normalize_task_display_name``: This function allows users to set a custom user-defined function to alter the display name independently of the model name. This way, the task_id can be preserved while the model display name is modified.
- ``should_detach_multiple_parents_tests``: A boolean to control if tests that depend on multiple parents should be run as standalone tasks. See `Parsing Methods <testing-behavior.html>`_ for more information.
- ``enable_owner_inheritance``: (introduced in 1.10.2) A boolean to control if dbt owners should be imported as part of the airflow DAG owners. Defaults to True.
Expand Down
26 changes: 26 additions & 0 deletions docs/configuration/source-nodes-rendering.rst
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,32 @@ Example:
)


Source Pruning
~~~~~~~~~~~~~~

``source_pruning`` is a boolean parameter of ``RenderConfig``.
When set to ``True``, it automatically removes (or "prunes") from your Airflow DAG any dbt source nodes that have no downstream dependencies within the selected portion of the dbt graph.

This is particularly useful for keeping your DAGs clean and focused, especially in large dbt projects where you might be selecting only a subset of models to run.

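By default, this option is set to ``False``. Pruning only applies to source nodes that would otherwise be rendered, so it has no visible effect unless ``source_rendering_behavior`` is set to something other than ``NONE``.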

Example:

.. code-block:: python

from cosmos import DbtTaskGroup, RenderConfig

jaffle_shop_pruned = DbtTaskGroup(
render_config=RenderConfig(
select=["+customers"], # select only customers and its parents
source_pruning=True,
)
)

In this example, if the ``jaffle_shop`` project has multiple sources, but only some of them are upstream of the ``customers`` model, Cosmos will only render the necessary sources and prune the rest.


on_warning_callback Callback
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down
2 changes: 1 addition & 1 deletion tests/dbt/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ def test_load_via_dbt_ls_with_exclude(postgres_profile_config):
@pytest.mark.integration
@pytest.mark.parametrize(
"project_dir,node_count",
[(DBT_PROJECTS_ROOT_DIR / ALTERED_DBT_PROJECT_NAME, 39), (DBT_PROJECTS_ROOT_DIR / "jaffle_shop_python", 28)],
[(DBT_PROJECTS_ROOT_DIR / ALTERED_DBT_PROJECT_NAME, 40), (DBT_PROJECTS_ROOT_DIR / "jaffle_shop_python", 28)],
Comment thread
tatiana marked this conversation as resolved.
)
def test_load_via_dbt_ls_without_exclude(project_dir, node_count, postgres_profile_config):
project_config = ProjectConfig(dbt_project_path=project_dir)
Expand Down
Loading