Keep watcher sensor polling when producer is still running #2592
Previously, any retry of a watcher consumer sensor (try_number > 1) unconditionally fell back to a non-watcher dbt run. This meant that if the sensor timed out while the producer was still working, the retry would launch a duplicate dbt invocation instead of continuing to wait for the producer's XCom updates. On retry, first check the producer task state. If the producer has reached a terminal state (success, failed, skipped, upstream_failed, removed), fall back to a non-watcher run as before — this covers both producer completion and manual task clears from the UI. Otherwise, keep polling so the sensor can pick up where it left off without spawning a redundant run. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Seed raw_orders and raw_payments in cosmos_manifest_selectors_example so the DAG is self-contained regardless of pytest-split ordering. This mirrors the change in #2593; remove this commit once that PR merges. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Codecov Report
✅ All modified and coverable lines are covered by tests.

Coverage Diff

| | main | #2592 | +/- |
|---|---|---|---|
| Coverage | 98.04% | 98.05% | |
| Files | 104 | 104 | |
| Lines | 7737 | 7747 | +10 |
| Hits | 7586 | 7596 | +10 |
| Misses | 151 | 151 | |
Pull request overview
Adjusts watcher consumer sensor retry behavior to avoid spawning duplicate dbt runs when the watcher producer is still running, by checking the producer task state and only falling back to a non-watcher run when the producer is in a terminal state.
Changes:
- Add producer-terminal state detection (`PRODUCER_TERMINAL_STATES`, `is_producer_task_terminated`) and use it during consumer-sensor retries.
- Update watcher consumer sensor retry flow to keep polling when the producer is still active, otherwise fall back to non-watcher execution.
- Extend unit tests to cover “retry + producer running” vs “retry + producer terminated” scenarios (local and Kubernetes).
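
The retry decision described above can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: `PRODUCER_TERMINAL_STATES` and `is_producer_task_terminated` are named in the overview, but their signatures here are assumed, and `choose_retry_action` is a hypothetical helper introduced only for this example.

```python
from typing import Optional

# Terminal states as listed in the PR description.
PRODUCER_TERMINAL_STATES = frozenset(
    {"success", "failed", "skipped", "upstream_failed", "removed"}
)


def is_producer_task_terminated(state: Optional[str]) -> bool:
    """Return True only when the producer state is known and terminal.

    Assumed signature: the real helper may accept a different type.
    """
    return state is not None and state in PRODUCER_TERMINAL_STATES


def choose_retry_action(try_number: int, producer_state: Optional[str]) -> str:
    """Hypothetical helper: pick what the consumer sensor does on this attempt."""
    if try_number <= 1:
        return "poll"  # first attempt always watches the producer
    if is_producer_task_terminated(producer_state):
        return "fallback"  # producer finished or was cleared: run dbt directly
    return "poll"  # producer still active, or state unknown: keep waiting
```

Under these assumptions, a retry while the producer is `running` keeps polling, while a retry after the producer reached `failed` falls back to a non-watcher run.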
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| `cosmos/operators/_watcher/base.py` | Implements retry handler that conditionally falls back or continues polling based on producer task state. |
| `cosmos/operators/_watcher/state.py` | Defines producer terminal states and helper to classify producer task termination. |
| `tests/operators/_watcher/test_state.py` | Adds unit tests for the new producer-terminal-state helper. |
| `tests/operators/test_watcher.py` | Updates/expands watcher sensor tests to validate retry behavior for running vs terminated producer. |
| `tests/operators/test_watcher_kubernetes_unit.py` | Adds Kubernetes watcher sensor retry test to ensure polling continues when producer is running. |
| `dev/dags/cosmos_manifest_selectors_example.py` | Adjusts example selector list to include additional upstream sources. |
The `pre_condition` task group in `cosmos_manifest_selectors_example`
used `select=["+customers"]`, which left the DAG dependent on state
leaked from other tests. This made the integration test
`test_example_dag[cosmos_manifest_selectors_example]` flaky: it passed
only when `pytest-split` happened to order another jaffle_shop DAG
before it in the same split (which pre-populated the required tables in
Postgres).
**Root cause**
Two gaps in the `+customers` selection:
1. Orphan seeds. In `altered_jaffle_shop`, `stg_orders` and
`stg_payments` read their data via `source('postgres_db', 'raw_orders' |
'raw_payments')`. The corresponding seeds (`raw_orders`,
`raw_payments`) are orphans in the manifest: nothing references them, so
`+` traversal skips them and they never get loaded. `raw_customers` is
pulled in because each staging model has a `force_seed_dep` CTE that does
`select * from {{ ref('raw_customers') }}`.
2. Missing `orders` model. The
`relationships_orders_customer_id__customer_id__ref_customers_` test is
attached to both `customers` and `orders` and queries `public.orders`.
`+customers` pulls the test in (it's a child of `customers`) but doesn't
build the `orders` model, so `customers.test` fails with `relation
"public.orders" does not exist`. This also matters because the downstream
`local_example` / `aws_s3_example` / `gcp_gs_example` /
`azure_abfs_example` task groups all run the `critical_path` selector,
which is the union of `customers` and `orders`, so `pre_condition` needs
to leave both models present.
**Fix**
Change the pre_condition selector to:
`select=["+customers", "+orders", "raw_orders", "raw_payments"]`
- `+customers` / `+orders` build both final models and their upstream
`stg_*` models (and pull in `raw_customers` via `ref`)
- `raw_orders`, `raw_payments` explicitly seed the two orphan seeds so
the `source()` reads in `stg_orders` / `stg_payments` resolve
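
To make the root cause concrete, here is a toy model of `+` ancestor traversal. It is a sketch under stated assumptions: the `PARENTS` map is a simplified guess at the `ref()` edges described above (tests are left out), and `select_with_ancestors` and `resolve` are hypothetical helpers, not dbt or Cosmos APIs.

```python
# Assumed, simplified parent map: node -> nodes it depends on via ref().
# source() reads do not create edges to seeds, which is why two seeds are orphans.
PARENTS = {
    "raw_customers": [],
    "raw_orders": [],    # orphan seed: read via source(), never ref()'d
    "raw_payments": [],  # orphan seed: read via source(), never ref()'d
    "stg_customers": ["raw_customers"],  # force_seed_dep CTE
    "stg_orders": ["raw_customers"],     # force_seed_dep CTE
    "stg_payments": ["raw_customers"],   # force_seed_dep CTE
    "customers": ["stg_customers", "stg_orders", "stg_payments"],
    "orders": ["stg_orders", "stg_payments"],
}


def select_with_ancestors(node, parents=PARENTS):
    """Mimic `+node`: the node plus everything reachable through parent edges."""
    selected, stack = set(), [node]
    while stack:
        current = stack.pop()
        if current not in selected:
            selected.add(current)
            stack.extend(parents[current])
    return selected


def resolve(selectors):
    """Union of all selectors; a leading `+` requests ancestor traversal."""
    selected = set()
    for selector in selectors:
        if selector.startswith("+"):
            selected |= select_with_ancestors(selector[1:])
        else:
            selected.add(selector)
    return selected


before = resolve(["+customers"])
after = resolve(["+customers", "+orders", "raw_orders", "raw_payments"])
```

In this toy graph, `before` lacks `orders`, `raw_orders`, and `raw_payments`, while `after` covers every node, which matches the two gaps and the fix described above.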
related: #2562
related: #2592
---------
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
tatiana
left a comment
Looks great, thanks a lot, @pankajkoti for creating a new PR and rebasing after the many changes introduced in the meantime.
Previously, any retry of a watcher consumer sensor (try_number > 1) unconditionally fell back to a non-watcher dbt run. This meant that if the sensor timed out while the producer was still working, the retry would launch a duplicate dbt invocation instead of continuing to wait for the producer's XCom updates. On retry, first check the producer task state. If the producer has reached a terminal state (success, failed, skipped, upstream_failed, removed), fall back to a non-watcher run as before — this covers both producer completion and manual task clears from the UI. Otherwise, keep polling so the sensor can pick up where it left off without spawning a redundant run. closes: astronomer/oss-integrations-private#359 --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> (cherry picked from commit 8a9e1c9)
The cherry-pick for #2592 was reverted, so drop its entry from the Bug Fixes section to keep the changelog consistent with the branch state. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
## Summary

- Extend the `skipif` on `test_dbt_task_group_watcher_gateway_prevents_downstream_skip` to also cover Airflow 3.0.
- Unblocks main, which started failing in the `Run-Integration-Tests (3.10/3.11/3.12, 3.0, 1.11, 2)` jobs right after #2597 merged.

This PR is scoped purely to a test skip to restore CI. It does not change production code.

## Why the test broke on main

The test was added in #2597. Its PR CI was green because #2597's head was branched off main *before* #2592 merged; the two changes were merged together without a re-run, so main is the first place they run as a pair.

On Airflow 3.0, `dag.test()` runs tasks inline via `ti._run_raw_task` without starting the task SDK supervisor, so `SUPERVISOR_COMMS` is never initialised. `RuntimeTaskInstance.get_task_states` raises `NameError`, and `fetch_state_airflow3` in `cosmos/operators/_watcher/state.py` returns `None`. Under the retry behaviour introduced in #2592, an unknown producer state makes the consumer sensor keep polling instead of falling back to a local dbt run. The sensor re-reads the stale `"error"` status from the producer's first attempt and exhausts retries, so the DAG finishes `failed`.

Airflow 3.2 rebuilt `dag.test()` on top of `run_task_in_process`, which sets up an in-process supervisor, so the same code path works there. Airflow 3.1.x is already skipped because of the unrelated `SetRenderedFields` retry crash.

## Follow-up (not in this PR)

A more durable fix would be to make `fetch_state_airflow3` fall back to a direct DB query when `SUPERVISOR_COMMS` is unavailable, mirroring the Airflow 2 path. That would let this test run on Airflow 3.0 again. It is out of scope here.

## Test plan

- [x] `hatch run tests.py3.11-3.0-1.9:pytest -v -m integration tests/operators/test_watcher.py::test_dbt_task_group_watcher_gateway_prevents_downstream_skip` → SKIPPED locally
- [ ] CI green on Airflow 2.10, 2.11, 3.0, 3.1, 3.2

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
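
The follow-up idea in the description above (fall back to a second lookup when the supervisor channel is unavailable) might look roughly like this. Every name below is hypothetical and stands in for real Airflow or Cosmos calls, which are not reproduced here; the sketch only shows the fallback pattern, not the actual fix.

```python
from typing import Callable, Optional


def fetch_state_with_fallback(
    primary: Callable[[], Optional[str]],
    fallback: Callable[[], Optional[str]],
) -> Optional[str]:
    """Try the primary lookup; on NameError, use the fallback lookup instead."""
    try:
        return primary()
    except NameError:
        # Mirrors the failure mode described above: the comms channel was never set up.
        return fallback()


def supervisor_lookup_without_comms() -> Optional[str]:
    # Stand-in that fails the way the description says the Airflow 3.0 inline path does.
    raise NameError("SUPERVISOR_COMMS is not defined")


def db_lookup() -> Optional[str]:
    # Stand-in for a direct metadata-database query; the value is made up.
    return "failed"


state = fetch_state_with_fallback(supervisor_lookup_without_comms, db_lookup)
```

With a known state such as `failed` instead of `None`, the consumer sensor could take the fallback branch rather than polling until its retries are exhausted.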
## Summary

- Extend the `skipif` on `test_dbt_task_group_watcher_gateway_prevents_downstream_skip` to also cover Airflow 3.0.
- Unblocks main, which started failing in the `Run-Integration-Tests (3.10/3.11/3.12, 3.0, 1.11, 2)` jobs right after #2597 merged.

This PR is scoped purely to a test skip to restore CI. It does not change production code.

## Why the test broke on main

The test was added in #2597. Its PR CI was green because #2597's head was branched off main *before* #2592 merged; the two changes were merged together without a re-run, so main is the first place they run as a pair.

On Airflow 3.0, `dag.test()` runs tasks inline via `ti._run_raw_task` without starting the task SDK supervisor, so `SUPERVISOR_COMMS` is never initialised. `RuntimeTaskInstance.get_task_states` raises `NameError`, and `fetch_state_airflow3` in `cosmos/operators/_watcher/state.py` returns `None`. Under the retry behaviour introduced in #2592, an unknown producer state makes the consumer sensor keep polling instead of falling back to a local dbt run. The sensor re-reads the stale `"error"` status from the producer's first attempt and exhausts retries, so the DAG finishes `failed`.

Airflow 3.2 rebuilt `dag.test()` on top of `run_task_in_process`, which sets up an in-process supervisor, so the same code path works there. Airflow 3.1.x is already skipped because of the unrelated `SetRenderedFields` retry crash.

## Follow-up (not in this PR)

A more durable fix would be to make `fetch_state_airflow3` fall back to a direct DB query when `SUPERVISOR_COMMS` is unavailable, mirroring the Airflow 2 path. That would let this test run on Airflow 3.0 again. It is out of scope here.

## Test plan

- [x] `hatch run tests.py3.11-3.0-1.9:pytest -v -m integration tests/operators/test_watcher.py::test_dbt_task_group_watcher_gateway_prevents_downstream_skip` → SKIPPED locally
- [ ] CI green on Airflow 2.10, 2.11, 3.0, 3.1, 3.2

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

(cherry picked from commit 238bc55)
Bug Fixes

* Fix ``ExecutionMode.WATCHER`` producer retry behaviour by @tatiana in #2559
* Prevent watcher producer skip propagating to downstream tasks via gateway task by @johnhoran and @tatiana in #2597
* Keep watcher sensor polling when producer is still running by @pankajkoti in #2592
* Fix circular import error in Cosmos plugin discovery under Astro Runtime by @tatiana in #2538
* Fix ``CosmosRichLogger`` crash on ``None`` log message by @tatiana in #2540
* Enable inlets and outlets using dbt Fusion on Airflow 3 by @ichirotakami in #2561
* Fix incorrectly skipped source downstream tasks in ``ExecutionMode.WATCHER`` by @pankajastro in #2563
* Fix duplicate logs in ``dbt build`` when source freshness is enabled by @pankajastro in #2564
* Warn and normalize when ``source_rendering_behavior=None`` is passed by @pankajastro in #2570
* Gracefully handle ``Variable.set()`` failures on Astro Remote Execution by @hkc-8010 in #2573
* Skip malformed YAML selectors instead of failing entirely by @YourRoyalLinus in #2577

Docs

* Update watcher test behavior docs for Cosmos 1.14.0 by @tatiana in #2549
* Add redirect for moved partial-parsing docs page by @tatiana in #2550
* Document ``ExecutionMode.WATCHER`` and ``depends_on_past`` limitation by @tatiana in #2602
* Restore memory-optimised imports docs for Cosmos < 1.14.0 by @pankajkoti in #2604

Others

* Speed up Airflow 3.1+ integration tests by caching InProcessExecutionAPI by @pankajkoti in #2547
* Improve stability of cache hash unit tests by @tatiana in #2539
* Fix mypy 1.20.0 type check failures by @pankajkoti in #2546
* Fix CI failures caused by docs build memory exhaustion by @pankajkoti in #2580
* Fix dbt Fusion broken integration tests by @tatiana in #2581
* Fix flaky ``cosmos_manifest_selectors_example`` DAG in CI by @pankajkoti in #2593
* Reduce pre-commit autoupdate frequency PRs by @tatiana in #2544
* Bump ``reviewdog/action-actionlint`` from 1.71.0 to 1.72.0 by @dependabot in #2542
* Skip watcher gateway test on Airflow 3.0 by @tatiana in #2607

closes: astronomer/oss-integrations-private#381
Split the dense single table into one per goal so each dimension can be scanned across versions: DAG/dbt status alignment, task-level retry (consumer and producer), automatic retries, full clear, and avoiding duplicate or concurrent transformation runs. Add the missing stable patch versions (1.11.1, 1.11.3, 1.12.1, 1.13.1) so the chronology is complete, mark non-met goals explicitly, and move the 1.14.1 XCom backup mechanism and its known issues (#2619, #2625) under the producer table, where the backup actually runs. Correct the 1.14.0 issue reference to #2554 and reframe the XCom problem as an Airflow limitation (XCom is not preserved across retries), not a Cosmos bug. Reference #2592 for the consumer sensor retry concurrent-run fix that lands in 1.14.1. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>


Previously, any retry of a watcher consumer sensor (try_number > 1) unconditionally fell back to a non-watcher dbt run. This meant that if the sensor timed out while the producer was still working, the retry would launch a duplicate dbt invocation instead of continuing to wait for the producer's XCom updates.
On retry, first check the producer task state. If the producer has reached a terminal state (success, failed, skipped, upstream_failed, removed), fall back to a non-watcher run as before — this covers both producer completion and manual task clears from the UI. Otherwise, keep polling so the sensor can pick up where it left off without spawning a redundant run.
closes: https://github.com/astronomer/oss-integrations-private/issues/359