From 70b4415ec024cb66f3fcc390cfc39e230b4654a2 Mon Sep 17 00:00:00 2001 From: John Horan Date: Mon, 27 Apr 2026 10:33:23 +0100 Subject: [PATCH 01/10] update graph Co-authored-by: Copilot --- cosmos/airflow/graph.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 08cf9a1f7e..917fb58af3 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -745,7 +745,13 @@ def _add_watcher_dependencies( Iterate through the watcher consumer tasks and: - set the producer task ID in all of them - make the producer task to be the parent of the root dbt nodes, without blocking them from sensing XCom + - if the producer task has depends_on_past=True, set wait_for_downstream=True in the producer and all its downstream tasks """ + producer_watcher_done_task = next((t for t in producer_airflow_task.downstream_list if t.task_id.endswith(PRODUCER_WATCHER_DONE_TASK_ID)), None) + needs_wait_for_downstream = producer_watcher_done_task is not None and producer_airflow_task.depends_on_past + if needs_wait_for_downstream: + producer_airflow_task.wait_for_downstream = True + for node_id, task_or_taskgroup in tasks_map.items(): # We do not want to set a dependency between the producer task (or its gate) and itself if node_id in (PRODUCER_WATCHER_TASK_ID, PRODUCER_WATCHER_DONE_TASK_ID): @@ -758,6 +764,11 @@ def _add_watcher_dependencies( ) for task in node_tasks: task.producer_task_id = producer_airflow_task.task_id # type: ignore[union-attr] + if needs_wait_for_downstream: + task.wait_for_downstream = True + + if needs_wait_for_downstream and not task_or_taskgroup.downstream_task_ids: + task_or_taskgroup >> producer_watcher_done_task # Make the producer task to be the parent of the root dbt nodes, without blocking them from sensing XCom # We only managed to do this in the case of DbtDag. @@ -1008,6 +1019,7 @@ def build_airflow_graph( # noqa: C901 TODO: https://github.com/astronomer/astro leaves_ids = calculate_leaves(tasks_ids=list(tasks_map.keys()), nodes=nodes) for leaf_node_id in leaves_ids: tasks_map[leaf_node_id] >> test_task + tasks_map[f"{dbt_project_name}_test"] = test_task elif render_config.test_behavior in (TestBehavior.BUILD, TestBehavior.AFTER_EACH): # Handle detached test nodes for node_id, node in detached_nodes.items(): From b54a38c67c2c7363147dd2c129004cd645389a41 Mon Sep 17 00:00:00 2001 From: John Horan Date: Mon, 27 Apr 2026 10:49:09 +0100 Subject: [PATCH 02/10] add test Co-authored-by: Copilot --- tests/airflow/test_graph.py | 75 +++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 0165ab1a2f..bc49288872 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -2144,3 +2144,78 @@ def test_add_watcher_producer_task_passes_freshness_callback_via_setup_operator_ task_metadata = mock_create_task.call_args[0][0] assert task_metadata.arguments["freshness_callback"] is my_callback + + +@pytest.mark.parametrize("depends_on_past", [False, True]) +@pytest.mark.parametrize("test_behavior", [TestBehavior.NONE, TestBehavior.AFTER_EACH, TestBehavior.AFTER_ALL]) +def test_watcher_dependency_wiring(test_behavior, depends_on_past): + with DAG("test-id", start_date=datetime(2022, 1, 1), default_args={"depends_on_past": depends_on_past}) as dag: + task_args = { + "project_dir": SAMPLE_PROJ_PATH, + "conn_id": "fake_conn", + "profile_config": ProfileConfig( + profile_name="default", + target_name="default", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="fake_conn", + profile_args={"schema": "public"}, + ), + ), + } + + child_2b = DbtNode( + unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.child2.v2_b", + resource_type=DbtResourceType.MODEL, + depends_on=[parent_node.unique_id], + path_base=SAMPLE_PROJ_PATH, + original_file_path=Path("gen3/models/child2_v2.sql"), + tags=["nightly"], + config={"materialized": "table", "meta": {"cosmos": {"operator_kwargs": {"pool": "custom_pool"}}}}, + has_test=True, + has_non_detached_test=True, + ) + child_2b_test = DbtNode( + unique_id=f"{DbtResourceType.TEST.value}.{SAMPLE_PROJ_PATH.stem}.child2.test_v2_b", + resource_type=DbtResourceType.TEST, + depends_on=[child_2b.unique_id], + path_base=Path("."), + original_file_path=Path("."), + ) + + build_airflow_graph( + nodes={child_2b.unique_id: child_2b, child_2b_test.unique_id: child_2b_test, **sample_nodes}, + dag=dag, + execution_mode=ExecutionMode.WATCHER, + test_indirect_selection=TestIndirectSelection.EAGER, + task_args=task_args, + render_config=RenderConfig( + test_behavior=test_behavior, + ), + dbt_project_name="astro_shop", + task_group=TaskGroup("tg", dag=dag) + ) + if not depends_on_past: + assert dag.task_dict["tg.dbt_producer_watcher_done"].upstream_task_ids == {"tg.dbt_producer_watcher"} + assert all(task.wait_for_downstream is False for task in dag.tasks) + return + + assert all(task.wait_for_downstream is True for task in dag.tasks if task.task_id != "tg.dbt_producer_watcher_done") + if test_behavior == TestBehavior.NONE: + assert dag.task_dict["tg.dbt_producer_watcher_done"].upstream_task_ids == { + "tg.child_run", + "tg.dbt_producer_watcher", + "tg.child2_v2_run", + "tg.child2_v2_b_run", + } + if test_behavior == TestBehavior.AFTER_EACH: + assert dag.task_dict["tg.dbt_producer_watcher_done"].upstream_task_ids == { + "tg.child_run", + "tg.dbt_producer_watcher", + "tg.child2_v2_run", + "tg.child2_v2_b.test", + } + if test_behavior == TestBehavior.AFTER_ALL: + assert dag.task_dict["tg.dbt_producer_watcher_done"].upstream_task_ids == { + "tg.dbt_producer_watcher", + "tg.astro_shop_test", + } From 4d52358acf208707c47b38dcb2dc8ff96fe55949 Mon Sep 17 00:00:00 2001 From: John Horan Date: Mon, 27 Apr 2026 12:32:55 +0100 Subject: [PATCH 03/10] add test Co-authored-by: Copilot --- cosmos/airflow/graph.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 917fb58af3..affdbcfec5 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -730,6 +730,7 @@ def _add_watcher_producer_task( task_id=PRODUCER_WATCHER_DONE_TASK_ID, ) producer_airflow_task >> producer_done_task + tasks_map[PRODUCER_WATCHER_DONE_TASK_ID] = producer_done_task return producer_airflow_task @@ -745,7 +746,8 @@ def _add_watcher_dependencies( Iterate through the watcher consumer tasks and: - set the producer task ID in all of them - make the producer task to be the parent of the root dbt nodes, without blocking them from sensing XCom - - if the producer task has depends_on_past=True, set wait_for_downstream=True in the producer and all its downstream tasks + - if the producer task has depends_on_past=True, set wait_for_downstream=True in the producer and all its downstream + tasks so the entire task group behaves as a single unit that needs to succeed for the next run to start. """ producer_watcher_done_task = next((t for t in producer_airflow_task.downstream_list if t.task_id.endswith(PRODUCER_WATCHER_DONE_TASK_ID)), None) needs_wait_for_downstream = producer_watcher_done_task is not None and producer_airflow_task.depends_on_past @@ -763,7 +765,8 @@ def _add_watcher_dependencies( else [task_or_taskgroup] ) for task in node_tasks: - task.producer_task_id = producer_airflow_task.task_id # type: ignore[union-attr] + if hasattr(task, "producer_task_id"): + task.producer_task_id = producer_airflow_task.task_id # type: ignore[union-attr] if needs_wait_for_downstream: task.wait_for_downstream = True From e6b22edb6cc4b789601afab7ee5df43650bc9bb0 Mon Sep 17 00:00:00 2001 From: John Horan Date: Mon, 27 Apr 2026 12:34:35 +0100 Subject: [PATCH 04/10] use task_map --- cosmos/airflow/graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index affdbcfec5..e733dbb746 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -749,7 +749,7 @@ def _add_watcher_dependencies( - if the producer task has depends_on_past=True, set wait_for_downstream=True in the producer and all its downstream tasks so the entire task group behaves as a single unit that needs to succeed for the next run to start. """ - producer_watcher_done_task = next((t for t in producer_airflow_task.downstream_list if t.task_id.endswith(PRODUCER_WATCHER_DONE_TASK_ID)), None) + producer_watcher_done_task = tasks_map.get(PRODUCER_WATCHER_DONE_TASK_ID) needs_wait_for_downstream = producer_watcher_done_task is not None and producer_airflow_task.depends_on_past if needs_wait_for_downstream: producer_airflow_task.wait_for_downstream = True From 271f4f1947f8eb0bca9fb0d9924fe8b70dd59524 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 27 Apr 2026 11:53:46 +0000 Subject: [PATCH 05/10] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/airflow/graph.py | 2 +- tests/airflow/test_graph.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index e733dbb746..309550f70d 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -771,7 +771,7 @@ def _add_watcher_dependencies( task.wait_for_downstream = True if needs_wait_for_downstream and not task_or_taskgroup.downstream_task_ids: - task_or_taskgroup >> producer_watcher_done_task + task_or_taskgroup >> producer_watcher_done_task # Make the producer task to be the parent of the root dbt nodes, without blocking them from sensing XCom # We only managed to do this in the case of DbtDag. diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index bc49288872..f08f179d55 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -2192,7 +2192,7 @@ def test_watcher_dependency_wiring(test_behavior, depends_on_past): test_behavior=test_behavior, ), dbt_project_name="astro_shop", - task_group=TaskGroup("tg", dag=dag) + task_group=TaskGroup("tg", dag=dag), ) if not depends_on_past: assert dag.task_dict["tg.dbt_producer_watcher_done"].upstream_task_ids == {"tg.dbt_producer_watcher"} From 9731143fd1536c16aac5441711563820bd7c4f79 Mon Sep 17 00:00:00 2001 From: John Horan Date: Mon, 27 Apr 2026 14:50:55 +0100 Subject: [PATCH 06/10] refactor around mcabe limit Co-authored-by: Copilot --- cosmos/airflow/graph.py | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 309550f70d..e28b32dcb0 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -776,21 +776,29 @@ def _add_watcher_dependencies( # Make the producer task to be the parent of the root dbt nodes, without blocking them from sensing XCom # We only managed to do this in the case of DbtDag. # The way it is implemented is by setting the trigger_rule to "always" for the consumer tasks, and by having the producer task with a high priority_weight. - if "DbtDag" in dag.__class__.__name__: + + if ( + "DbtDag" in dag.__class__.__name__ + and nodes + and node_id in nodes + and not set(nodes[node_id].depends_on).intersection(nodes) + ): # Is this dbt node a root of the (subset of the) dbt project? # Note: this may happen in one scenarios: # - the dbt node not having any `depends_on` within the user-selected `nodes` - if nodes and node_id in nodes and not set(nodes[node_id].depends_on).intersection(nodes): - producer_airflow_task >> task_or_taskgroup - if isinstance(task_or_taskgroup, TaskGroup): - taskgroup = task_or_taskgroup - always_run_tasks = [ - task for task in node_tasks if not set(task.upstream_task_ids).intersection(taskgroup.children) - ] - else: - always_run_tasks = [task_or_taskgroup] - for task in always_run_tasks: - task.trigger_rule = task_args.get("trigger_rule", "always") # type: ignore[union-attr] + producer_airflow_task >> task_or_taskgroup + always_run_tasks = ( + [ + task + for task in node_tasks + if not set(task.upstream_task_ids).intersection(task_or_taskgroup.children) + ] + if isinstance(task_or_taskgroup, TaskGroup) + else [task_or_taskgroup] + ) + + for task in always_run_tasks: + task.trigger_rule = task_args.get("trigger_rule", "always") # type: ignore[union-attr] def should_create_detached_nodes(render_config: RenderConfig) -> bool: From a5436e09699b038e0a0eac3352ca5972e0c48ab6 Mon Sep 17 00:00:00 2001 From: John Horan Date: Mon, 27 Apr 2026 14:55:26 +0100 Subject: [PATCH 07/10] mypy --- cosmos/airflow/graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index e28b32dcb0..9d2f2fdbaf 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -768,7 +768,7 @@ def _add_watcher_dependencies( if hasattr(task, "producer_task_id"): task.producer_task_id = producer_airflow_task.task_id # type: ignore[union-attr] if needs_wait_for_downstream: - task.wait_for_downstream = True + task.wait_for_downstream = True # type: ignore[union-attr] if needs_wait_for_downstream and not task_or_taskgroup.downstream_task_ids: task_or_taskgroup >> producer_watcher_done_task From 061d706b860e2db1f0b74b8d939a8c2e5a3a7c46 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 27 Apr 2026 13:55:54 +0000 Subject: [PATCH 08/10] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/airflow/graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 9d2f2fdbaf..adf5bef710 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -768,7 +768,7 @@ def _add_watcher_dependencies( if hasattr(task, "producer_task_id"): task.producer_task_id = producer_airflow_task.task_id # type: ignore[union-attr] if needs_wait_for_downstream: - task.wait_for_downstream = True # type: ignore[union-attr] + task.wait_for_downstream = True # type: ignore[union-attr] if needs_wait_for_downstream and not task_or_taskgroup.downstream_task_ids: task_or_taskgroup >> producer_watcher_done_task From 4c0931ce0b106d6fb0d3ce18ea24a060127ea71a Mon Sep 17 00:00:00 2001 From: John Horan Date: Mon, 27 Apr 2026 15:09:22 +0100 Subject: [PATCH 09/10] update comment Co-authored-by: Copilot --- cosmos/airflow/graph.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index adf5bef710..0a1218ea46 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -746,8 +746,9 @@ def _add_watcher_dependencies( Iterate through the watcher consumer tasks and: - set the producer task ID in all of them - make the producer task to be the parent of the root dbt nodes, without blocking them from sensing XCom - - if the producer task has depends_on_past=True, set wait_for_downstream=True in the producer and all its downstream - tasks so the entire task group behaves as a single unit that needs to succeed for the next run to start. + - if the producer task has depends_on_past=True, set wait_for_downstream=True in the producer and all its + watcher tasks, and put them upstream of the producer_watcher_done task so the entire task group + behaves as a single unit that needs to succeed for the next run to start. """ producer_watcher_done_task = tasks_map.get(PRODUCER_WATCHER_DONE_TASK_ID) needs_wait_for_downstream = producer_watcher_done_task is not None and producer_airflow_task.depends_on_past From ec12577f93e520f5d7f55717e2f4cb96e92814aa Mon Sep 17 00:00:00 2001 From: John Horan Date: Tue, 28 Apr 2026 17:33:44 +0100 Subject: [PATCH 10/10] resolve test failure --- tests/test_converter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_converter.py b/tests/test_converter.py index 8c3dba5da1..1ea9a04a0a 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -414,7 +414,7 @@ def test_converter_creates_dag_with_test_with_multiple_parents_test_afterall(): ) tasks = converter.tasks_map - assert len(converter.tasks_map) == 3 + assert len(converter.tasks_map) == 4 assert tasks["model.my_dbt_project.combined_model"].task_id == "combined_model_run" assert tasks["model.my_dbt_project.model_a"].task_id == "model_a_run"