From d8eb6228d039804c5a0f0c4e911ed1580f6374d8 Mon Sep 17 00:00:00 2001 From: Giovanni Corsetti <155465603+corsettigyg@users.noreply.github.com> Date: Fri, 11 Apr 2025 11:28:30 +0200 Subject: [PATCH 1/6] Update graph.py --- cosmos/airflow/graph.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 209a6d9614..5265868495 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -231,7 +231,6 @@ def create_dbt_resource_to_class(test_behavior: TestBehavior) -> dict[str, str]: } return dbt_resource_to_class - def create_task_metadata( node: DbtNode, execution_mode: ExecutionMode, @@ -241,6 +240,7 @@ def create_task_metadata( source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE, normalize_task_id: Callable[..., Any] | None = None, test_behavior: TestBehavior = TestBehavior.AFTER_ALL, + test_indirect_selection: TestIndirectSelection = TestIndirectSelection.EAGER, # Add this parameter on_warning_callback: Callable[..., Any] | None = None, detached_from_parent: dict[str, DbtNode] | None = None, ) -> TaskMetadata | None: @@ -270,7 +270,9 @@ def create_task_metadata( "package_name": node.package_name, } - if test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES: + if test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES + if test_indirect_selection != TestIndirectSelection.EAGER: + args["indirect_selection"] = test_indirect_selection.value args["on_warning_callback"] = on_warning_callback exclude_detached_tests_if_needed(node, args, detached_from_parent) task_id, args = _get_task_id_and_args( @@ -367,6 +369,7 @@ def generate_task_or_group( source_rendering_behavior=source_rendering_behavior, normalize_task_id=normalize_task_id, test_behavior=test_behavior, + test_indirect_selection=test_indirect_selection, on_warning_callback=on_warning_callback, detached_from_parent=detached_from_parent, ) From 6ba87776a864726f4da334b2754e7c9fa7a20346 Mon Sep 17 00:00:00 2001 From: Giovanni Corsetti <155465603+corsettigyg@users.noreply.github.com> Date: Fri, 11 Apr 2025 11:32:05 +0200 Subject: [PATCH 2/6] fix --- cosmos/airflow/graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 5265868495..e2a7b4fbae 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -270,7 +270,7 @@ def create_task_metadata( "package_name": node.package_name, } - if test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES + if test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES: if test_indirect_selection != TestIndirectSelection.EAGER: args["indirect_selection"] = test_indirect_selection.value args["on_warning_callback"] = on_warning_callback From 5cd60d6a8f2348cc80ef81d4fd51c670c46c7eb8 Mon Sep 17 00:00:00 2001 From: GiovanniCorsetti <66136602+CorsettiS@users.noreply.github.com> Date: Fri, 11 Apr 2025 11:35:59 +0200 Subject: [PATCH 3/6] update --- cosmos/airflow/graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index e2a7b4fbae..4c3843e776 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -240,7 +240,7 @@ def create_task_metadata( source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE, normalize_task_id: Callable[..., Any] | None = None, test_behavior: TestBehavior = TestBehavior.AFTER_ALL, - test_indirect_selection: TestIndirectSelection = TestIndirectSelection.EAGER, # Add this parameter + test_indirect_selection: TestIndirectSelection = TestIndirectSelection.EAGER, on_warning_callback: Callable[..., Any] | None = None, detached_from_parent: dict[str, DbtNode] | None = None, ) -> TaskMetadata | None: From 818b109ff851665ad19629d046e9d35d869f4377 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 11 Apr 2025 09:44:34 +0000 Subject: [PATCH 4/6] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20for?= =?UTF-8?q?mat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/airflow/graph.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 4c3843e776..71bff501fc 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -231,6 +231,7 @@ def create_dbt_resource_to_class(test_behavior: TestBehavior) -> dict[str, str]: } return dbt_resource_to_class + def create_task_metadata( node: DbtNode, execution_mode: ExecutionMode, From 7e5548099995ad05bf0757de4110ec3b46c6752f Mon Sep 17 00:00:00 2001 From: corsettigyg Date: Fri, 11 Apr 2025 12:16:44 +0200 Subject: [PATCH 5/6] adding test for indirect_selection when TestBehavior.BUILD --- tests/airflow/test_graph.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 728b86283b..357b194a0a 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -808,6 +808,42 @@ def test_create_task_metadata_normalize_task_id( assert "task_display_name" not in metadata.arguments +@pytest.mark.integration +def test_build_airflow_graph_with_build_and_buildable_indirect_selection(): + with DAG("test-build-buildable", start_date=datetime(2022, 1, 1)) as dag: + task_args = { + "project_dir": SAMPLE_PROJ_PATH, + "conn_id": "fake_conn", + "profile_config": ProfileConfig( + profile_name="default", + target_name="default", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="fake_conn", + profile_args={"schema": "public"}, + ), + ), + } + render_config = RenderConfig( + test_behavior=TestBehavior.BUILD, + ) + build_airflow_graph( + nodes=sample_nodes, + dag=dag, + execution_mode=ExecutionMode.LOCAL, + test_indirect_selection=TestIndirectSelection.BUILDABLE, + task_args=task_args, + dbt_project_name="astro_shop", + render_config=render_config, + ) + + topological_sort = [task.task_id for task in dag.topological_sort()] + expected_sort = ["seed_parent_seed_build", "parent_model_build", "child_model_build", "child2_v2_model_build"] + assert topological_sort == expected_sort + + for task in dag.tasks: + if hasattr(task, 'indirect_selection'): + assert task.indirect_selection == TestIndirectSelection.BUILDABLE.value + @pytest.mark.parametrize( "node_type,node_unique_id,test_indirect_selection,additional_arguments", [ From a2fe50fd183f92839df9d5fa5cd7ec2070ae4ece Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 11 Apr 2025 10:18:05 +0000 Subject: [PATCH 6/6] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20for?= =?UTF-8?q?mat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/airflow/test_graph.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 357b194a0a..1cc2418966 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -841,9 +841,10 @@ def test_build_airflow_graph_with_build_and_buildable_indirect_selection(): assert topological_sort == expected_sort for task in dag.tasks: - if hasattr(task, 'indirect_selection'): + if hasattr(task, "indirect_selection"): assert task.indirect_selection == TestIndirectSelection.BUILDABLE.value + @pytest.mark.parametrize( "node_type,node_unique_id,test_indirect_selection,additional_arguments", [