Prevent watcher producer skip propagating to downstream tasks via gateway task #2597
Pull request overview
This PR addresses an Airflow watcher-mode edge case where the WATCHER producer task can be skipped on retry inside a DbtTaskGroup, causing Airflow’s default all_success trigger rule to skip tasks downstream of the TaskGroup even when all consumer tasks succeed.
Changes:
- Adds an internal “gateway” `EmptyOperator` (`dbt_producer_watcher_done`) downstream of the watcher producer inside `DbtTaskGroup` to absorb producer skip propagation.
- Updates integration tests and documentation to reflect the new TaskGroup topology and watcher retry behavior.
- Adds a new repro DAG and dbt project under `dev/` to demonstrate/validate the downstream-not-skipped behavior.
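The skip propagation described in the overview can be illustrated with a simplified model of Airflow's trigger rules. This is only a sketch, not Airflow's scheduler code: the `State` enum and the `resolve` function are hypothetical names introduced here, and the model assumes that any task allowed to run finishes successfully.

```python
from enum import Enum
from typing import Sequence


class State(str, Enum):
    SUCCESS = "success"
    SKIPPED = "skipped"
    UPSTREAM_FAILED = "upstream_failed"
    FAILED = "failed"


def resolve(trigger_rule: str, upstream: Sequence[State]) -> State:
    """Simplified, hypothetical model of how a trigger rule maps upstream states
    to the state of a task (assuming the task succeeds whenever it runs)."""
    any_failed = any(s in (State.FAILED, State.UPSTREAM_FAILED) for s in upstream)
    any_skipped = State.SKIPPED in upstream
    any_success = State.SUCCESS in upstream

    if any_failed:
        # All three rules modelled here refuse to run after an upstream failure.
        return State.UPSTREAM_FAILED
    if trigger_rule == "all_success":
        # A single skipped upstream is enough to skip the task.
        return State.SKIPPED if any_skipped else State.SUCCESS
    if trigger_rule == "none_failed":
        # Skipped upstreams are tolerated.
        return State.SUCCESS
    if trigger_rule == "none_failed_min_one_success":
        return State.SUCCESS if any_success else State.SKIPPED
    raise ValueError(f"Trigger rule not modelled: {trigger_rule}")


consumers = [State.SUCCESS, State.SUCCESS]
producer = State.SKIPPED  # producer skipped on retry

# Before: the producer is a leaf of the group, so the downstream task sees its skip.
before = resolve("all_success", consumers + [producer])

# After: the gateway absorbs the skip and the downstream task sees the gateway instead.
gateway = resolve("none_failed", [producer])
after = resolve("all_success", consumers + [gateway])

print(before.value, gateway.value, after.value)  # prints: skipped success success
```

The same model shows why the gateway still lets genuine producer failures through: `resolve("none_failed", [State.FAILED])` yields `upstream_failed`, not `success`.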
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `cosmos/airflow/graph.py` | Creates and wires the new producer “done” gateway task for watcher TaskGroups; updates watcher dependency loop. |
| `cosmos/operators/_watcher/base.py` | Introduces `create_producer_done_task()` helper that builds the gateway `EmptyOperator` with `TriggerRule.NONE_FAILED`. |
| `tests/operators/test_watcher.py` | Updates integration assertions for the additional gateway task and producer downstream dependencies. |
| `docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst` | Documents updated producer retry behavior and the new TaskGroup-only gateway task. |
| `dev/failed_dags/example_watcher_downstream_not_skipped.py` | Adds a repro DAG to demonstrate downstream tasks are not skipped when producer is skipped on retry. |
| `dev/dags/dbt/watcher_downstream_not_skipped/dbt_project.yml` | Adds a dbt project used by the new repro DAG. |
| `dev/dags/dbt/watcher_downstream_not_skipped/models/model_a.sql` | Adds a simple base model for the repro project. |
| `dev/dags/dbt/watcher_downstream_not_skipped/models/model_retry.sql` | Adds a model that intentionally fails once to force the watcher retry/skip path. |
Codecov Report

✅ All modified and coverable lines are covered by tests.

Coverage Diff

| | main | #2597 | +/- |
|---|---|---|---|
| Coverage | 98.04% | 98.05% | |
| Files | 104 | 104 | |
| Lines | 7737 | 7752 | +15 |
| Hits | 7586 | 7601 | +15 |
| Misses | 151 | 151 | |
pankajkoti left a comment:

LGTM. I believe this is the right direction. Thanks a lot for finding a simpler approach than #2591.
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated no new comments.
…eway task (#2597)

Co-authored-by: John Horan <jhoran@zendesk.com>

When using `ExecutionMode.WATCHER` with `DbtTaskGroup`, the producer task may be skipped on retry (via `AirflowSkipException`) since #2559. Because the producer is a leaf task inside the `TaskGroup`, Airflow's default `trigger_rule="all_success"` causes any tasks downstream of the group to be skipped as well - even when all consumer tasks succeeded. **This makes `ExecutionMode.WATCHER` behave differently from `ExecutionMode.LOCAL`** when used with `DbtTaskGroup`. This issue does not happen in `DbtDag`.

This PR introduces a downstream gateway task (`dbt_producer_watcher_done`) for the producer when in watcher mode. This ensures the producer's skip state does not propagate outside the task group, since the gateway task has `trigger_rule=none_failed`.

This solution was originally proposed by @johnhoran in #2430. The code has changed significantly since, so we're opening a new PR and adding him as a co-author. It is an alternative approach to #2591.

**Behaviour before and after**

The newly introduced DAG `example_watcher_downstream_not_skipped` illustrates the problematic behaviour this PR aims to address.

Video before this change: https://github.com/user-attachments/assets/3cd03975-3c97-44c5-bf6f-8f56796815a9

Video after this change: https://github.com/user-attachments/assets/3a50b882-9ed0-4d16-b5ae-662bec76b4e3

**Alternative approaches**

- Cosmos changing the `trigger_rule` of the downstream tasks as implemented in #2591
- Accept that `ExecutionMode.LOCAL` and `ExecutionMode.WATCHER` are not fully compatible and document that users should set `trigger_rule="none_failed_min_one_success"` manually

**Implications**

- The DAG topology is changed to include a new `EmptyOperator` task (`dbt_producer_watcher_done`)

**Unsolved problem: `depends_on_past`**

This PR does not solve the problem: #2596.

PR #2430 also doesn't solve the `depends_on_past` race condition. The PR fixes the retry/skip mechanism (backup XCom, restore on retry, skip producer on second attempt), but it doesn't prevent Run 2's producer from starting while Run 1's consumers are still running their fallback dbt commands.

Closes #2594

(cherry picked from commit 3a138fd)
## Summary

- Extend the `skipif` on `test_dbt_task_group_watcher_gateway_prevents_downstream_skip` to also cover Airflow 3.0.
- Unblocks main, which started failing in the `Run-Integration-Tests (3.10/3.11/3.12, 3.0, 1.11, 2)` jobs right after #2597 merged.

This PR is scoped purely to a test skip to restore CI. It does not change production code.

## Why the test broke on main

The test was added in #2597. Its PR CI was green because #2597's head was branched off main *before* #2592 merged. The two changes were merged together without a re-run, so main is the first place they run as a pair.

On Airflow 3.0, `dag.test()` runs tasks inline via `ti._run_raw_task` without starting the task SDK supervisor, so `SUPERVISOR_COMMS` is never initialised. `RuntimeTaskInstance.get_task_states` raises `NameError`, and `fetch_state_airflow3` in `cosmos/operators/_watcher/state.py` returns `None`. Under the retry behaviour introduced in #2592, an unknown producer state makes the consumer sensor keep polling instead of falling back to a local dbt run. The sensor re-reads the stale `"error"` status from the producer's first attempt and exhausts retries, so the DAG finishes `failed`.

Airflow 3.2 rebuilt `dag.test()` on top of `run_task_in_process`, which sets up an in-process supervisor, so the same code path works there. Airflow 3.1.x is already skipped because of the unrelated `SetRenderedFields` retry crash.

## Follow-up (not in this PR)

A more durable fix would be to make `fetch_state_airflow3` fall back to a direct DB query when `SUPERVISOR_COMMS` is unavailable, mirroring the Airflow 2 path. That would let this test run on Airflow 3.0 again. It is out of scope here.

## Test plan

- [x] `hatch run tests.py3.11-3.0-1.9:pytest -v -m integration tests/operators/test_watcher.py::test_dbt_task_group_watcher_gateway_prevents_downstream_skip` → SKIPPED locally
- [ ] CI green on Airflow 2.10, 2.11, 3.0, 3.1, 3.2

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
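The follow-up idea (fall back to another lookup when `SUPERVISOR_COMMS` is unavailable) could look roughly like the sketch below. Everything here is hypothetical and for illustration only: `fetch_state_with_fallback`, `broken_supervisor_fetch` and `db_fetch` are stand-in names, not Cosmos or Airflow functions, and the real `fetch_state_airflow3` may be structured differently.

```python
from typing import Callable, Optional


def fetch_state_with_fallback(
    fetch_via_supervisor: Callable[[], Optional[str]],
    fetch_via_db: Callable[[], Optional[str]],
) -> Optional[str]:
    """Hypothetical helper: try the supervisor-backed lookup first and use the
    fallback lookup only when the first one is unavailable or yields no state."""
    try:
        state = fetch_via_supervisor()
    except NameError:
        # Models the case described above, where SUPERVISOR_COMMS was never initialised.
        state = None
    if state is not None:
        return state
    return fetch_via_db()


def broken_supervisor_fetch() -> Optional[str]:
    # Stand-in for the Airflow 3.0 dag.test() situation.
    raise NameError("name 'SUPERVISOR_COMMS' is not defined")


def db_fetch() -> Optional[str]:
    # Stand-in for a direct metadata DB query; the returned value is illustrative.
    return "skipped"


state = fetch_state_with_fallback(broken_supervisor_fetch, db_fetch)
print(state)  # prints: skipped
```

With a known producer state, the consumer sensor would no longer have to treat the state as unknown and keep polling.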
(cherry picked from commit 238bc55)
Bug Fixes

* Fix ``ExecutionMode.WATCHER`` producer retry behaviour by @tatiana in #2559
* Prevent watcher producer skip propagating to downstream tasks via gateway task by @johnhoran and @tatiana in #2597
* Keep watcher sensor polling when producer is still running by @pankajkoti in #2592
* Fix circular import error in Cosmos plugin discovery under Astro Runtime by @tatiana in #2538
* Fix ``CosmosRichLogger`` crash on ``None`` log message by @tatiana in #2540
* Enable inlets and outlets using dbt Fusion on Airflow 3 by @ichirotakami in #2561
* Fix incorrectly skipped source downstream tasks in ``ExecutionMode.WATCHER`` by @pankajastro in #2563
* Fix duplicate logs in ``dbt build`` when source freshness is enabled by @pankajastro in #2564
* Warn and normalize when ``source_rendering_behavior=None`` is passed by @pankajastro in #2570
* Gracefully handle ``Variable.set()`` failures on Astro Remote Execution by @hkc-8010 in #2573
* Skip malformed YAML selectors instead of failing entirely by @YourRoyalLinus in #2577

Docs

* Update watcher test behavior docs for Cosmos 1.14.0 by @tatiana in #2549
* Add redirect for moved partial-parsing docs page by @tatiana in #2550
* Document ``ExecutionMode.WATCHER`` and ``depends_on_past`` limitation by @tatiana in #2602
* Restore memory-optimised imports docs for Cosmos < 1.14.0 by @pankajkoti in #2604

Others

* Speed up Airflow 3.1+ integration tests by caching InProcessExecutionAPI by @pankajkoti in #2547
* Improve stability of cache hash unit tests by @tatiana in #2539
* Fix mypy 1.20.0 type check failures by @pankajkoti in #2546
* Fix CI failures caused by docs build memory exhaustion by @pankajkoti in #2580
* Fix dbt Fusion broken integration tests by @tatiana in #2581
* Fix flaky ``cosmos_manifest_selectors_example`` DAG in CI by @pankajkoti in #2593
* Reduce pre-commit autoupdate frequency PRs by @tatiana in #2544
* Bump ``reviewdog/action-actionlint`` from 1.71.0 to 1.72.0 by @dependabot in #2542
* Skip watcher gateway test on Airflow 3.0 by @tatiana in #2607

closes: astronomer/oss-integrations-private#381
When using `ExecutionMode.WATCHER` with `DbtTaskGroup`, the producer task may be skipped on retry (via `AirflowSkipException`) since #2559. Because the producer is a leaf task inside the `TaskGroup`, Airflow's default `trigger_rule="all_success"` causes any tasks downstream of the group to be skipped as well - even when all consumer tasks succeeded. **This makes `ExecutionMode.WATCHER` behave differently from `ExecutionMode.LOCAL`** when used with `DbtTaskGroup`. This issue does not happen in `DbtDag`.

This PR introduces a downstream gateway task (`dbt_producer_watcher_done`) for the producer when in watcher mode. This ensures the producer's skip state does not propagate outside the task group, since the gateway task has `trigger_rule=none_failed`.

This solution was originally proposed by @johnhoran in #2430. The code has changed significantly since, so we're opening a new PR and adding him as a co-author. It is an alternative approach to #2591.

Co-authored-by: John Horan <jhoran@zendesk.com>

**Behaviour before and after**

The newly introduced DAG `example_watcher_downstream_not_skipped` illustrates the problematic behaviour this PR aims to address.

Video before this change: cosmos-1.14.1-downstream-tasks-before-fix.mp4

Video after this change: cosmos-1.14.1-downstream-tasks-after-fix-gateway.mov

**Alternative approaches**

- Cosmos changing the `trigger_rule` of the downstream tasks as implemented in Prevent watcher producer skip propagating to downstream tasks via trigger_rule #2591
- Accept that `ExecutionMode.LOCAL` and `ExecutionMode.WATCHER` are not fully compatible and document that users should set `trigger_rule="none_failed_min_one_success"` manually

**Implications**

- The DAG topology is changed to include a new `EmptyOperator` task (`dbt_producer_watcher_done`)

**Unsolved problem: `depends_on_past`**

This PR does not solve the problem: #2596.

PR #2430 also doesn't solve the `depends_on_past` race condition. The PR fixes the retry/skip mechanism (backup XCom, restore on retry, skip producer on second attempt), but it doesn't prevent Run 2's producer from starting while Run 1's consumers are still running their fallback dbt commands.

Closes #2594
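
To see why the topology change matters, recall that tasks downstream of a task group depend on the group's leaf tasks, that is, the tasks with no downstream task inside the group. The sketch below computes those leaves for a toy group before and after adding the gateway. It is a simplified illustration: `group_leaves` is a hypothetical helper, the task id `producer` and the `model_a -> model_retry` dependency are assumptions made for the example, and only `dbt_producer_watcher_done` is taken from this PR.

```python
from typing import Iterable, Set, Tuple


def group_leaves(group_tasks: Set[str], edges: Iterable[Tuple[str, str]]) -> Set[str]:
    """Return the tasks of a group that have no downstream task inside the group."""
    has_downstream_in_group = {
        upstream
        for upstream, downstream in edges
        if upstream in group_tasks and downstream in group_tasks
    }
    return group_tasks - has_downstream_in_group


# Before: the producer has no downstream task inside the group, so it is a leaf.
tasks_before = {"producer", "model_a", "model_retry"}
edges_before = [("model_a", "model_retry")]
leaves_before = group_leaves(tasks_before, edges_before)

# After: the gateway sits downstream of the producer and takes its place as a leaf.
tasks_after = tasks_before | {"dbt_producer_watcher_done"}
edges_after = edges_before + [("producer", "dbt_producer_watcher_done")]
leaves_after = group_leaves(tasks_after, edges_after)

print(sorted(leaves_before))  # prints: ['model_retry', 'producer']
print(sorted(leaves_after))   # prints: ['dbt_producer_watcher_done', 'model_retry']
```

Because the gateway uses `trigger_rule=none_failed`, the leaf that downstream tasks now depend on can succeed even when the producer was skipped.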