Prevent watcher producer skip propagating to downstream tasks via trigger_rule#2591
Prevent watcher producer skip propagating to downstream tasks via trigger_rule#2591tatiana wants to merge 26 commits into
Conversation
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2591 +/- ##
==========================================
- Coverage 98.04% 97.96% -0.09%
==========================================
Files 104 104
Lines 7737 7761 +24
==========================================
+ Hits 7586 7603 +17
- Misses 151 158 +7 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
…er skips it will work if: users do dbt_task_group >> downstream task if users do downstream task << dbt_task_group if users do dbt_group.set_downstream(downstream task) It will not work if users do post_dbt.set_upstream(dbt_group)
740ad75 to
58eaa09
Compare
DbtTaskGroup downstream tasksDbtTaskGroup watcher producer skip from propagating downstream tasks
There was a problem hiding this comment.
Pull request overview
This PR introduces an opt-in mitigation for a Watcher-mode DbtTaskGroup behavior where the producer can be skipped on retry and (via Airflow’s default all_success) inadvertently cause tasks downstream of the TaskGroup to be skipped even when all consumer tasks succeeded.
Changes:
- Adds
propagate_watcher_trigger_ruleconfig (and env var) to opt in to the new behavior. - Updates
DbtTaskGroupdependency wiring (>>,<<,set_downstream) to set downstream tasks’trigger_rule="none_failed"in watcher modes when the setting is enabled. - Adds integration tests + a new example DAG and dbt project to reproduce/validate the behavior.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
cosmos/airflow/task_group.py |
Overrides downstream wiring to apply trigger_rule="none_failed" to downstream tasks when watcher + opt-in setting. |
cosmos/settings.py |
Introduces propagate_watcher_trigger_rule boolean setting. |
tests/operators/test_watcher.py |
Adds integration tests covering default behavior vs opt-in behavior. |
docs/reference/configs/cosmos-conf.rst |
Documents the new propagate_watcher_trigger_rule setting and limitations. |
docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst |
Expands watcher retry docs and documents the new setting in the watcher guide. |
dev/failed_dags/example_watcher_downstream_not_skipped.py |
Adds a reproduction DAG demonstrating downstream-skip behavior and the opt-in fix. |
dev/dags/dbt/watcher_downstream_not_skipped/dbt_project.yml |
Adds a small dbt project used by tests/repro DAG. |
dev/dags/dbt/watcher_downstream_not_skipped/models/model_a.sql |
Sample model for the repro dbt project. |
dev/dags/dbt/watcher_downstream_not_skipped/models/model_retry.sql |
Model designed to fail once (via sequence pre-hook) to exercise watcher retry/skip flow. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
…ped as TriggerRule enum in AF3 but accepts string values at runtime. This matches the pattern used elsewhere in Cosmos (e.g., graph.py).
https://github.com/astronomer/astronomer-cosmos/pull/2591\#discussion_r3124199904 - test_rshift_sets_trigger_rule_on_downstream_task_group_roots — updated to use get_roots() instead of children - test_set_downstream_sets_trigger_rule — covers set_downstream() path - test_rlshift_sets_trigger_rule — covers << path (__rlshift__)
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
https://github.com/astronomer/astronomer-cosmos/pull/2591\#discussion_r3124372405 https://github.com/astronomer/astronomer-cosmos/pull/2591\#discussion_r3124372373 All references updated across code, tests, and docs — none_failed → none_failed_min_one_success everywhere. This ensures downstream tasks won't run when the entire group is skipped (e.g., via ShortCircuit), only when at least one upstream succeeded.
DbtTaskGroup watcher producer skip from propagating downstream tasksThere was a problem hiding this comment.
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| return result | ||
|
|
||
| def __rlshift__(self, other: Any) -> Any: | ||
| # other << dbt_group — other is downstream of dbt_group |
There was a problem hiding this comment.
__rlshift__ is commented as supporting other << dbt_group (dependency created from the downstream task side), but the docs for propagate_watcher_trigger_rule explicitly call out that task << dbt_group / task.set_upstream(dbt_group) cannot be intercepted. Unless there is a concrete Airflow code path that actually invokes DbtTaskGroup.__rlshift__ for this expression, this method/comment is misleading and may give a false sense that the limitation is handled. Consider removing __rlshift__ (and its test) or updating the comment/docstring to clarify when (if ever) it is invoked.
| # other << dbt_group — other is downstream of dbt_group | |
| # Reflected ``<<`` hook used only when Python/Airflow dispatch resolves to this | |
| # TaskGroup instance. This is not a general interception point for | |
| # ``task << dbt_group`` / ``task.set_upstream(dbt_group)``; watcher trigger-rule | |
| # propagation must not rely on this method being invoked for those cases. |
| @patch("cosmos.settings.propagate_watcher_trigger_rule", True) | ||
| def test_rlshift_sets_trigger_rule(): | ||
| tg = _make_task_group(ExecutionMode.WATCHER) | ||
| task = MagicMock(trigger_rule="all_success") | ||
|
|
||
| tg.__rlshift__(task) | ||
|
|
||
| assert task.trigger_rule == "none_failed_min_one_success" |
There was a problem hiding this comment.
This test calls tg.__rlshift__(task) directly, which doesn't reflect the supported user-facing wiring patterns documented for this feature (only dependencies created from the DbtTaskGroup side, e.g. dbt_group >> task / dbt_group.set_downstream(task)). If task << dbt_group is not interceptable in practice, this test may be asserting behavior that users can't rely on. Prefer removing this test or rewriting it to cover only supported patterns.
| @patch("cosmos.settings.propagate_watcher_trigger_rule", True) | |
| def test_rlshift_sets_trigger_rule(): | |
| tg = _make_task_group(ExecutionMode.WATCHER) | |
| task = MagicMock(trigger_rule="all_success") | |
| tg.__rlshift__(task) | |
| assert task.trigger_rule == "none_failed_min_one_success" |
|
Closed as per discussion: #2594 |
…eway task (#2597) Co-authored-by: John Horan <jhoran@zendesk.com> When using `ExecutionMode.WATCHER` with `DbtTaskGroup`, the producer task may be skipped on retry (via `AirflowSkipException`) since #2559. Because the producer is a leaf task inside the `TaskGroup`, Airflow's default `trigger_rule="all_success"` causes any tasks downstream of the group to be skipped as well - even when all consumer tasks succeeded. **This makes `ExecutionMode.WATCHER` behave differently from `ExecutionMode.LOCAL`** when used with `DbtTaskGroup`. This issue does not happen in `DbtDag`. This PR introduces a downstream gateway task (`dbt_producer_watcher_done`) for the producer when in watcher mode. This ensures the producer's skip state does not propagate outside the task group, since the gateway task has `trigger_rule=none_failed`. This solution was originally proposed by @johnhoran in #2430. The code has changed significantly since, so we're opening a new PR and adding him as a co-author. It is an alternative approach to #2591. **Behaviour before and after** The newly introduced DAG `example_watcher_downstream_not_skipped` illustrates the problematic behaviour this PR aims to address. Video before this change: https://github.com/user-attachments/assets/3cd03975-3c97-44c5-bf6f-8f56796815a9 Video after this change: https://github.com/user-attachments/assets/3a50b882-9ed0-4d16-b5ae-662bec76b4e3 **Alternative approaches** - Cosmos changing the `trigger_rule` of the downstream tasks as implemented in #2591 - Accept that `ExecutionMode.LOCAL` and `ExecutionMode.WATCHER` are not fully compatible and document that users should set `trigger_rule="none_failed_min_one_success"` manually **Implications** - The DAG topology is changed to include a new `EmptyOperator` task (`dbt_producer_watcher_done`) **Unsolved problem: `depends_on_past`** This PR does not solve the problem: #2596. PR #2430 also doesn't solve the `depends_on_past` race condition. The PR fixes the retry/skip mechanism (backup XCom, restore on retry, skip producer on second attempt), but it doesn't prevent Run 2's producer from starting while Run 1's consumers are still running their fallback dbt commands. Closes #2594
…eway task (#2597) Co-authored-by: John Horan <jhoran@zendesk.com> When using `ExecutionMode.WATCHER` with `DbtTaskGroup`, the producer task may be skipped on retry (via `AirflowSkipException`) since #2559. Because the producer is a leaf task inside the `TaskGroup`, Airflow's default `trigger_rule="all_success"` causes any tasks downstream of the group to be skipped as well - even when all consumer tasks succeeded. **This makes `ExecutionMode.WATCHER` behave differently from `ExecutionMode.LOCAL`** when used with `DbtTaskGroup`. This issue does not happen in `DbtDag`. This PR introduces a downstream gateway task (`dbt_producer_watcher_done`) for the producer when in watcher mode. This ensures the producer's skip state does not propagate outside the task group, since the gateway task has `trigger_rule=none_failed`. This solution was originally proposed by @johnhoran in #2430. The code has changed significantly since, so we're opening a new PR and adding him as a co-author. It is an alternative approach to #2591. **Behaviour before and after** The newly introduced DAG `example_watcher_downstream_not_skipped` illustrates the problematic behaviour this PR aims to address. Video before this change: https://github.com/user-attachments/assets/3cd03975-3c97-44c5-bf6f-8f56796815a9 Video after this change: https://github.com/user-attachments/assets/3a50b882-9ed0-4d16-b5ae-662bec76b4e3 **Alternative approaches** - Cosmos changing the `trigger_rule` of the downstream tasks as implemented in #2591 - Accept that `ExecutionMode.LOCAL` and `ExecutionMode.WATCHER` are not fully compatible and document that users should set `trigger_rule="none_failed_min_one_success"` manually **Implications** - The DAG topology is changed to include a new `EmptyOperator` task (`dbt_producer_watcher_done`) **Unsolved problem: `depends_on_past`** This PR does not solve the problem: #2596. PR #2430 also doesn't solve the `depends_on_past` race condition. The PR fixes the retry/skip mechanism (backup XCom, restore on retry, skip producer on second attempt), but it doesn't prevent Run 2's producer from starting while Run 1's consumers are still running their fallback dbt commands. Closes #2594 (cherry picked from commit 3a138fd)
When using
ExecutionMode.WATCHERwithDbtTaskGroup, the producer task may be skipped on retry (viaAirflowSkipException) since #2559. Because the producer is a leaf task inside theTaskGroup, Airflow's defaulttrigger_rule="all_success"causes any tasks downstream of the group to be skipped as well - even when all consumer tasks succeeded. This makesExecutionMode.WATCHERbehave differently fromExecutionMode.LOCALwhen used withDbtTaskGroup.This PR introduces an opt-in setting
propagate_watcher_trigger_rulethat overrides__rshift__andset_downstreamonDbtTaskGroupto automatically settrigger_rule="none_failed_min_one_success"on downstream tasks when in watcher mode. This ensures the producer's skip state does not propagate outside the task group, while still requiring at least one upstream task to succeed (preventing downstream tasks from running when the entire group is skipped, e.g. via ShortCircuit).It is an alternative approach to It is an alternative approach to #2597.
This is an opt-in feature. Users enable it via:
Behaviour before and after
The newly introduced DAG
example_watcher_downstream_not_skippedillustrates the problematic behaviour this PR aims to address.Video before this change:
cosmos-1.14.1-downstream-tasks-before-fix.mp4
Video with
AIRFLOW__COSMOS__PROPAGATE_WATCHER_TRIGGER_RULEenabled:cosmos-1.14.1-downstream-tasks-after-fix.mp4
Why not set producer >> consumers inside the TaskGroup?
In
DbtDag, we setproducer >> consumersso the producer's skip does not propagate to downstream tasks. The root consumers usetrigger_rule="always"so they start watching XCom immediately without waiting for the producer to complete.In
DbtTaskGroup, this approach does not work because settingtrigger_rule="always"on consumers would cause them to run even when the user-defined upstream tasks of the task group have not yet completed, breaking the expected execution order.Alternative approaches
ExecutionMode.LOCALandExecutionMode.WATCHERare not fully compatible and document that users should settrigger_rule="none_failed_min_one_success"manuallyLimitations
DbtTaskGroupside (dbt_group >> taskordbt_group.set_downstream(task)). Does not work withtask.set_upstream(dbt_group)ortask << dbt_group, since Cosmos cannot intercept methods on tasks it does not control.trigger_ruleon downstream tasks with"none_failed_min_one_success".Unsolved problem:
depends_on_pastThis PR does not solve the problem: #2596.
PR #2430 also doesn't solve the
depends_on_pastrace condition. The PR fixes the retry/skip mechanism (backup XCom, restore on retry, skip producer on second attempt), but it doesn't prevent Run 2's producer from starting while Run 1's consumers are still running their fallback dbt commands.Closes #2594