Fix test sensor retry in ExecutionMode.WATCHER#2658
Conversation
There was a problem hiding this comment.
Pull request overview
This PR fixes Airflow-level retries/manual clears for DbtTestWatcherOperator when running in ExecutionMode.WATCHER by removing the previous “tests cannot be retried” guard and adding a concrete non-watcher fallback that re-runs tests locally for the watched model.
Changes:
- Removes the
AirflowExceptionguard that prevented retries for test watcher sensors in the generic watcher fallback path. - Implements a
DbtTestWatcherOperatorfallback that runsdbt test --select <model>locally (and setsbase_cmd = ["test"]to support this). - Updates/extends watcher operator tests to assert the new retry/fallback behavior.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
cosmos/operators/_watcher/base.py |
Removes the test-sensor retry guard and updates fallback docstring to allow subclass-specific commands. |
cosmos/operators/watcher.py |
Adds dbt test fallback behavior for DbtTestWatcherOperator and documents retry semantics. |
tests/operators/test_watcher.py |
Updates tests to expect local dbt test fallback on retry and ensures producer flags aren’t forwarded. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #2658 +/- ##
=======================================
Coverage 98.01% 98.01%
=======================================
Files 104 104
Lines 7797 7803 +6
=======================================
+ Hits 7642 7648 +6
Misses 155 155 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| if try_number > 1: | ||
| logger.info( | ||
| "Retry attempt #%s – Running tests for model '%s' from project '%s'", | ||
| try_number - 1, | ||
| self.model_unique_id, | ||
| self.project_dir, | ||
| ) | ||
| else: | ||
| logger.info( | ||
| "Falling back to running tests for model '%s' from project '%s'", | ||
| self.model_unique_id, | ||
| self.project_dir, | ||
| ) |
There was a problem hiding this comment.
Why is it relevant to log a different log message if try_number > 1? Could we have a single log message that includes the try-number and handles the rendering of this information by itself, without the need for an if?
There was a problem hiding this comment.
This logging is consistent with what we have in the BaseConsumerSensor's _fallback_to_non_watcher_run method https://github.com/astronomer/astronomer-cosmos/blame/c23411b2b8a198849e26daf7abf7d338f1dc5733/cosmos/operators/_watcher/base.py#L429 added via PR #2559.
There was a problem hiding this comment.
But I think it may be worth having just a single log, so I will make this change here. Not sure why we had that in the base sensor, maybe it's better that we make it consistent there too to have a single log line.
tatiana
left a comment
There was a problem hiding this comment.
Hi @pankajkoti , thanks for fixing this behaviour. Retries are the most complicated aspect of the watcher mode, and this helps stabilise it. I left some inline comments. Please address them before merging this PR.
Previously, clearing a DbtTestWatcherOperator task from the Airflow UI
(or hitting an Airflow-level retry) raised:
Test re-execution is not yet supported in watcher mode.
Tests for model '<model_unique_id>' cannot be retried.
Model/seed/snapshot consumer sensors already fall back to running dbt
locally for their node on retry; tests had no such path.
Remove the AirflowException guard in BaseConsumerSensor and add a
test-sensor-specific fallback: DbtTestWatcherOperator now sets
base_cmd = ["test"] and overrides _fallback_to_non_watcher_run to
issue `dbt test --select <model>` for the watched model.
This addresses part 1 of #2598. The producer-internal test-retry race
(part 2) is unchanged.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Use `split(".", 2)[2]` to derive the dbt selector so versioned model
unique_ids (e.g. `model.pkg.my_model.v1`) select the full
`my_model.v1` resource instead of just `v1`. Matches the convention
already used by DbtSourceWatcherOperator.
- Drop the "on retry" wording from the success log line; the fallback
also runs on the first attempt when the producer is skipped or has
failed.
- Mirror the parsing change in the existing assertions and add a test
covering a versioned unique_id to lock in the behaviour.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The fallback can fire on the first attempt (try_number == 1) when the producer was skipped or failed; only the retry branch was exercised by the existing tests. Add a test asserting the "Falling back" log is emitted on the first attempt so patch coverage reaches the new branch. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Collapse the if/else log in DbtTestWatcherOperator._fallback_to_non_watcher_run into a single log line that includes try_number. - Document why base_cmd is hardcoded instead of inheriting from DbtTestMixin (the mixin's __init__ forwards select/exclude/selector kwargs that the sensor's MRO via BaseSensorOperator rejects). - Drive the fallback tests through sensor.poke(context) instead of calling _fallback_to_non_watcher_run directly, and add a mock with side_effect to explicitly assert the fallback is invoked during retries. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
3fb629f to
f6198ea
Compare
|
Going ahead with the merge here @tatiana . I have addressed the comments. Please let me know if you have further comments, and happy to address those in a follow-up PR. |
## Summary
- `BaseConsumerSensor._fallback_to_non_watcher_run` used
`model_unique_id.split(".")[-1]` to build the dbt `--select` value,
which strips the version segment for versioned models (e.g.
`model.pkg.my_model.v1` became `v1`).
- Switched to `split(".", 2)[2]`, matching `DbtNode.resource_name` in
`cosmos/dbt/graph.py` — the canonical parsing used throughout Cosmos for
dbt unique_ids (per the [dbt manifest
spec](https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details),
`resource_type` and `package` cannot contain dots, so everything after
the second dot is the full resource name including any version suffix).
- Added a regression test
(`test_fallback_selector_preserves_version_suffix`) covering a versioned
`unique_id`.
Surfaced by Copilot review on #2658.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
## Summary - Removes the `AirflowException` guard in `BaseConsumerSensor._fallback_to_non_watcher_run` that blocked manual clears and Airflow-level retries on `DbtTestWatcherOperator`. - Adds a test-sensor fallback that runs `dbt test --select <model>` locally for the watched model: `DbtTestWatcherOperator` now sets `base_cmd = ["test"]` and overrides `_fallback_to_non_watcher_run`. Addresses part 1 of #2598. The producer-internal test-retry race condition (part 2) is unchanged and remains tracked there. 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
## Summary
- `BaseConsumerSensor._fallback_to_non_watcher_run` used
`model_unique_id.split(".")[-1]` to build the dbt `--select` value,
which strips the version segment for versioned models (e.g.
`model.pkg.my_model.v1` became `v1`).
- Switched to `split(".", 2)[2]`, matching `DbtNode.resource_name` in
`cosmos/dbt/graph.py` — the canonical parsing used throughout Cosmos for
dbt unique_ids (per the [dbt manifest
spec](https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details),
`resource_type` and `package` cannot contain dots, so everything after
the second dot is the full resource name including any version suffix).
- Added a regression test
(`test_fallback_selector_preserves_version_suffix`) covering a versioned
`unique_id`.
Surfaced by Copilot review on #2658.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@pankajkoti added #2658 and #2659 to the Cosmos 1.14.2 milestone after the previous CHANGELOG refresh. Cherry-pick both onto the release branch and add their entries under Bug Fixes: * #2658 -- Fix test sensor retry behaviour in ``ExecutionMode.WATCHER`` * #2659 -- Fix watcher fallback selector for versioned dbt models Both PRs auto-merged cleanly on top of the existing watcher fixes (#2683 XCom-key sanitization, #2684 BOSS-401 rewrite, #2673 helper extraction). 1495 watcher-suite tests pass locally after the picks. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
## CHANGELOG entry 1.14.2 (2026-05-21) ------------------- Behaviour Changes These changes adjust observable behaviour of the ``ExecutionMode.WATCHER`` execution mode. None of them breaks the public Cosmos API, but users relying on undocumented internals (graph wiring assertions, XCom backup Variable names, retry-on-recovery semantics, or retry log format) should review before upgrading. * ``ExecutionMode.WATCHER`` + ``depends_on_past=True``: when the producer task has ``depends_on_past=True`` (typically set via ``default_args``), the producer-done gateway task inside ``DbtTaskGroup`` is now wired downstream of every consumer task, in addition to the producer. This is required so that ``wait_for_downstream`` gating behaves correctly across DAG runs and the task group acts as a single unit that must fully succeed before the next run starts. Users with ``depends_on_past=False`` (the default) see no topology change. See #2615. * ``ExecutionMode.WATCHER`` downstream retry on upstream recovery: dbt models that were skipped after an upstream-failure event are now retried in the same DAG run when the upstream task succeeds on retry. Previously these models remained skipped for the run. See #2684. * ``ExecutionMode.WATCHER`` consumer-retry log format: the consumer's fallback ``dbt`` invocation no longer inherits the producer's internal ``--log-format json`` flag, so retry task logs now default to dbt's normal text format. Users who relied on JSON output in retry logs can opt in via ``operator_args={"dbt_cmd_flags": ["--log-format", "json"]}``. See #2713. * ``ExecutionMode.WATCHER`` XCom-backup Variable key scheme: the per-model XCom backup Variable key now includes the full task-group path and sanitises disallowed characters (``+`` / ``:``) from ``run_id``. External monitoring or cleanup scripts that match the old key pattern will need updating. See #2629 and #2683. Bug Fixes * Sanitize disallowed characters from XCom backup variable key by @MichaelRBlack in #2629 * Prevent watcher producers from colliding on one XCom-backup key by @tatiana in #2683 * Retry watcher downstream models on upstream-failure recovery by @tatiana in #2684 * Fix ``ExecutionMode.WATCHER`` interaction with ``depends_on_past`` by @johnhoran in #2615 * Strip ``--log-format`` from producer flags on watcher consumer retry by @tatiana in #2713 * Fix duplicate ``deferrable`` kwarg in ``DbtRunAirflowAsyncBigqueryOperator`` by @pankajastro in #2616 * Fix dbt docs iframe ``src`` missing deployment path prefix by @pankajastro in #2640 * Defer ``TaskInstance`` import in cluster policy to fix Sentry init crash by @pankajastro in #2662 * Restore type hints broken by lazy imports in ``cosmos/__init__.py`` by @pankajastro in #2647 * Fix ``ExecutionMode.WATCHER`` non-dbt stdout being suppressed from logs by @pankajastro in #2654 * Fix test sensor retry behaviour in ``ExecutionMode.WATCHER`` by @pankajkoti in #2658 * Fix watcher fallback selector for versioned dbt models by @pankajkoti in #2659 * Break out of iframe from Airflow 2 dbt Docs 404 link by @pankajastro in #2685 Docs * Document source freshness aware execution for ``ExecutionMode.WATCHER`` by @pankajastro in #2617 * Add reference docs for ``DbtRunLocalOperator``, ``DbtTestLocalOperator``, ``DbtSnapshotLocalOperator`` and ``DbtBuildLocalOperator`` by @pankajastro in #2643 * Add watcher retry behaviour history documentation by @tatiana in #2600 * Add Apache Airflow® trademark on first prominent mention by @pankajkoti in #2624 * Sentence-case section headings by @pankajkoti in #2630 * Use ``-`` for bullet points by @pankajkoti in #2631 * Drop decorative separator lines by @pankajkoti in #2632 * Normalize heading underlines in ``docs/guides/`` and ``docs/index.rst`` by @pankajkoti in #2664 * Fix broken cross-directory doc links by @pankajastro in #2694 * Fix broken external links in hand-written docs by @pankajastro in #2696 * Document support for Airflow 3.2 in the compatibility policy by @pankajastro in #2652 * Refresh the dbt/Airflow conflicts table to match the compatibility policy by @pankajastro in #2653 * Document incremental model limitation for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajastro in #2642 Others * Import ``ParamValidationError`` from ``airflow.sdk`` to silence deprecation warning by @pankajastro in #2645 * Import ``DAG`` from ``airflow.sdk`` to silence deprecation warning by @pankajastro in #2644 * Enforce docs style guide via pre-commit hook by @pankajkoti and @tatiana in #2633 * Add Airflow 3.2 to the test matrix in ``CLAUDE.md`` by @pankajastro in #2646 * Document the lazy-logging standard in ``CLAUDE.md`` by @pankajastro in #2679 * Extract watcher XCom-key helpers and inline single-use bindings by @pankajastro in #2673 * Remove leftover ``scripts/airflow3`` directory by @pankajastro in #2661 * Fix ``altered_jaffle_shop`` seed-dep CTE references by @pankajastro in #2690 * Skip Airflow 3.0 integration test stuck on ``example_watcher_with_freshness`` by @pankajastro in #2692 * Fix typo "constrantis" → "constraints" in tests env comment by @pankajastro in #2669 ## Summary Drafts the **Cosmos 1.14.2** release. Latest alpha cut is **1.14.2a4** — refreshed from `1.14.2a3` after maintainers (@pankajastro, @pankajkoti) added 11 more PRs to the milestone: - @pankajastro: #2646, #2652, #2653, #2661, #2669, #2673, #2679, #2690, #2692 - @pankajkoti: #2658, #2659 All 11 picks applied cleanly on top of the existing branch — no additional manual conflict resolution needed. **Excluded:** #2618 ("Improve glossary") — modifies `docs/reference/glossary.rst`, which doesn't exist on `release-1.14` (added on main by #2461, never backported). Deferred to 1.15.0. 33 PRs cherry-picked total (22 in the initial a3 cut + 11 in this a4 refresh); two PRs (#2575, #2618) deliberately held back as 1.15.0 content. ## Milestone [Cosmos 1.14.2](https://github.com/astronomer/astronomer-cosmos/milestone/48) — 33 merged PRs across Bug Fixes, Docs, and Others. ## Inclusion provenance | Path | PRs | Notes | |---|---|---| | **Originally in milestone — a3 cut (12)** | #2629, #2616, #2640, #2662, #2654, #2683, #2684, #2615, #2694, #2696, #2645, #2644 | Assigned by maintainers before the initial release-draft run | | **Pulled in via closed-issue link — a3 cut (1)** | #2647 | Closes milestone issue #2634 ("Typehinting broken with lazy imports") but the PR itself was never assigned to the milestone — included via `closedByPullRequestsReferences` | | **Added during cherry-pick conflict resolution — a3 cut (9)** | #2631, #2624, #2630, #2632, #2664, #2633, #2617, #2600, #2643 | Docs PRs whose absence caused `release-1.14` ↔ `main` textual drift. #2631 caused #2696's conflict; the rest were transitive dependencies (especially #2664, on top of the bullet/heading/trademark sweeps). #2643 was added to unblock #2664's operator-docs conflict | | **Added to milestone after a3 — included in a4 (11)** | #2646, #2652, #2653, #2658, #2659, #2661, #2669, #2673, #2679, #2690, #2692 | Added by @pankajastro and @pankajkoti after the initial draft. All applied cleanly on top of the a3 cherry-picks | | **Deliberately excluded (2)** | #2575, #2618 | #2575: documents `DbtDocsS3KubernetesOperator` with `.. versionadded:: 1.15.0` (already in the `Cosmos 1.15.0` milestone). #2618: improves a glossary file that doesn't exist on `release-1.14` (the stub was added by #2461, not backported) | ### Manual conflict resolution Cherry-picks that needed manual fix-up. Reviewers should double-check the files listed below: | PR | File(s) | Resolution | |---|---|---| | **#2664** | `docs/guides/dbt_docs/generating-docs.rst` | **Substantive exclusion** — manually removed the entire `Upload to S3 from Kubernetes` section (lines ~46–77 of the incoming diff) that documents `DbtDocsS3KubernetesOperator` (1.15.0 feature, PR #2575). Kept HEAD (no S3-from-Kubernetes section). | | **#2664** | `docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst` | Took incoming side for "Example 1" / "Example 2" heading underlines (`++++` style — matches #2664's normalization across the rest of the file). Also took incoming for em-dash → colon ("Example 1 —" → "Example 1:"). | | **#2664** | `docs/guides/run_dbt/operators/operators.rst` | Auto-resolved once #2643 was cherry-picked first (added missing Run/Test/Snapshot/Build operator reference docs that #2664 expected to be present). | | **#2633** | `docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst` | Initially took `'''` (Example 1/2 underlines) per #2633's incoming side — this broke the file's heading hierarchy and prevented sphinx from registering the `_watcher-source-freshness:` label, causing the docs build to fail. **Fixed in a follow-up commit** by reverting to `+++` to match main and the rest of the file's level-3 sections. | | **#2617** | `docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst` | **Substantive trim** — #2617 documented both the 1.14.0 source-freshness execution path AND the 1.15.0 `freshness_callback` override (which ships with #2586, not in this release line). Removed the `literalinclude` of `dev/dags/watcher_with_freshness_check.py` (1.15.0 example DAG, missing on release-1.14) and the surrounding override section so the 1.14.2 docs cover only what the 1.14.x line supports. Surfaced as a `-W` (warnings-as-errors) docs build failure on the first CI run. | | **#2684** | `cosmos/operators/_watcher/state.py` | Took incoming side — added two new frozensets (`DBT_UPSTREAM_FAILURE_SKIP_EVENT_NAMES`, `DBT_SOURCE_FRESHNESS_STALE_STATUSES`) at lines 28–37. HEAD had nothing; incoming had the additions. | | **#2615** | `tests/airflow/test_graph.py` | Took incoming side — added two new tests at the end of the file: `test_add_watcher_producer_task_passes_freshness_callback_via_setup_operator_args` and `test_watcher_dependency_wiring`. HEAD had nothing; incoming had the additions. | All a4 cherry-picks (11) applied without manual intervention. ## Test plan > Long-term goal: automate. For now, please pick the slice relevant to your environment and report deviations as a comment on this PR. ### Watcher mode (Postgres) - [x] @pankajkoti `example_watcher` (`dev/dags/example_watcher.py`) — default watcher run; exercises **#2629** (XCom backup key sanitization triggers on the `+` in any default Airflow run_id) - [x] (@tatiana) **[NEW]** `example_watcher_xcom_collision` (`dev/failed_dags/example_watcher_xcom_collision.py`) — validates **#2683** - [x] @tatiana **[NEW]** `example_watcher_recovers_skipped_downstream` (`dev/failed_dags/example_watcher_recovers_skipped_downstream.py`) — validates **#2684** - [x] @tatiana `example_watcher` with `default_args={"depends_on_past": True}` and ≥2 consecutive runs — validates **#2615** (manual edit; no dedicated example DAG) - [x] @pankajkoti watcher DAG with at least one model that has tests + Airflow retries enabled on the test sensor — validates **#2658** (test sensor retry path) - [x] @pankajkoti watcher DAG referencing a versioned dbt model (e.g. `models/foo_v2.sql`) and triggering the fallback selector path — validates **#2659** ### BigQuery (async) - [x] @tatiana `simple_dag_async` (`dev/dags/simple_dag_async.py`) — validates **#2616** (duplicate `deferrable` kwarg fix) ### dbt docs plugin - [x] (@pankajkoti) `docs_dag` (`dev/dags/dbt_docs.py`) + open the Cosmos dbt docs URL in the Airflow UI under a non-root deployment path — validates **#2640** (iframe `src` deployment prefix) ### Cross-cutting scenarios (no dedicated DAG) - [x] @pankajastro **#2662** — boot Airflow with Cosmos cluster policy + Sentry init; verify no `TaskInstance`-import crash on startup - [x] @pankajastro **#2654** — run a watcher DAG that prints non-dbt stdout (e.g., Snowflake `externalbrowser` auth URL); confirm output reaches task logs - [x] (@pankajkoti) **#2647** — `mypy` / IDE inspection of `from cosmos import DbtDag, ProjectConfig, ProfileConfig, RenderConfig, ExecutionConfig` resolves attributes - [x] @pankajastro **#2645, #2644** — boot Airflow 3 with `airflow.sdk` available; no `ParamValidationError` / `DAG` deprecation warnings from Cosmos imports - [x] @pankajastro **#2673** — verify watcher XCom-key extraction left no behavioural drift (run the existing watcher integration suite end-to-end on Postgres + Airflow 2.10) ### Docs build (covers all docs PRs) - [x] (@pankajkoti) `sphinx-build -W -b html docs docs/_build` succeeds with no warnings (#2617, #2624, #2630, #2631, #2632, #2643, #2600, #2664, #2694, #2696, #2652, #2653) ### Tooling - [x] (@pankajkoti) `pre-commit run check-docs-style --all-files` passes (#2633) ## Reviewer checklist - [x] CHANGELOG section assignments reviewed - [x] Entry wording reviewed - [x] `cosmos/__init__.py` bumped to `1.14.2a4` - [x] Cosmetic docs PRs (#2624, #2630, #2631, #2632, #2633, #2664) confirmed acceptable in patch line - [x] **Manual conflict resolutions** (table above) reviewed file-by-file - [x] Test plan executed on at least Postgres + one warehouse - [x] Ready to mark non-draft --------- Co-authored-by: Michael Black <4128408+MichaelRBlack@users.noreply.github.com> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Co-authored-by: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com> Co-authored-by: John Horan <jhoran@zendesk.com> Co-authored-by: Copilot <copilot@github.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
|
🚀 Released in Cosmos 1.14.2 (PyPI). |

Summary
AirflowExceptionguard inBaseConsumerSensor._fallback_to_non_watcher_runthat blocked manual clears and Airflow-level retries onDbtTestWatcherOperator.dbt test --select <model>locally for the watched model:DbtTestWatcherOperatornow setsbase_cmd = ["test"]and overrides_fallback_to_non_watcher_run.Addresses part 1 of #2598. The producer-internal test-retry race condition (part 2) is unchanged and remains tracked there.
🤖 Generated with Claude Code