Fix watcher fallback selector for versioned dbt models#2659
Conversation
`BaseConsumerSensor._fallback_to_non_watcher_run` derived the dbt
selector with `model_unique_id.split(".")[-1]`, which strips the
version segment for versioned models (e.g. `model.pkg.my_model.v1`
became `v1`). Use `split(".", 2)[2]` to match `DbtNode.resource_name`
in cosmos/dbt/graph.py, the canonical parsing used throughout
Cosmos for dbt unique_ids, so the full resource name including any
version suffix is selected on fallback reruns.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Fixes WATCHER-mode retry fallback selector generation so dbt versioned model unique_ids (e.g. model.pkg.my_model.v1) are converted into a correct --select value that preserves the version suffix, aligning the behavior with Cosmos’s canonical DbtNode.resource_name parsing.
Changes:
- Update
_fallback_to_non_watcher_runto derive the selector viaunique_id.split(".", 2)[2](preserves version suffixes). - Adjust the existing fallback test assertion to match the new parsing behavior.
- Add a regression test ensuring versioned model selectors remain intact (e.g.
stg_orders.v1).
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
cosmos/operators/_watcher/base.py |
Fixes selector parsing in watcher fallback retries to preserve version suffixes. |
tests/operators/test_watcher.py |
Updates/extends tests to validate the corrected selector behavior, including a versioned-model regression case. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #2659 +/- ##
=======================================
Coverage 98.01% 98.01%
=======================================
Files 104 104
Lines 7796 7796
=======================================
Hits 7641 7641
Misses 155 155 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| extra_flags = self._filter_flags(raw_flags) | ||
|
|
||
| model_selector = self.model_unique_id.split(".")[-1] | ||
| model_selector = self.model_unique_id.split(".", 2)[2] |
There was a problem hiding this comment.
@pankajkoti, maybe instead of hard-coding index 2, we could retrieve the last index, similar to what we were doing:
| model_selector = self.model_unique_id.split(".", 2)[2] | |
| model_selector = self.model_unique_id.split(".", 2)[-1] |
Do we have any official docs on how dbt forms model_unique_id? It could be worth mentioning what the variations are, that normally there would be two dots, but in some cases there are more, and why it is safe to group the last dots
There was a problem hiding this comment.
On [2] vs [-1]: I'd lean toward keeping [2]. For a well-formed unique_id, they return the same value; the difference is the failure mode. [2] raises IndexError and surfaces malformed input loudly, while [-1] would silently fall back to the package or resource_type and mis-issue the dbt command. It also matches the canonical parse already used in DbtNode.resource_name (cosmos/dbt/graph.py) and in cosmos/operators/watcher.py at lines 340, 546, and 613.
On the docs ask: per the dbt manifest spec, node unique_ids are <resource_type>.<package>.<resource_name>. Both resource_type and package are constrained identifiers that cannot contain dots, so the first two dots are unambiguous separators and everything after the second dot is the full resource name. For versioned models, dbt appends a fourth segment: model.<package>.<resource_name>.<version> (see node_args.py).
split(".", 2)[2] returns the whole remainder (my_model.v1); split(".")[-1] returns just v1, which is the bug this PR fixes as mentioned in the PR description.
I would like to skip adding an inline docstring at this site because the parse is open-coded in multiple places across the codebase that are linked above.
As we agreed, I am following up with a refactor PR that reuses a common method for these sites.
tatiana
left a comment
There was a problem hiding this comment.
Thanks, @pankajkoti , this looks good, minor comment inline. The change doesn't block merging this PR, but I recommend adding a comment explaining the rationale. If you could double-check whether there are any other parts of the code where we're doing this type of handling, that would be great.
The dbt unique_id-to-resource_name parse (split(".", 2)[2]) was
open-coded in several places. PR #2659 added one more such site to fix
a versioned-model bug. Consolidating the parse into a single helper
with a docstring documents the unique_id format in one place and
removes the drift risk between call sites.
Add get_resource_name_from_unique_id in cosmos/dbt/resource.py with a
docstring citing the dbt manifest spec and the versioned-model variant.
Route DbtNode.resource_name through it, then replace the open-coded
splits in cosmos/operators/watcher.py and cosmos/operators/_watcher/
base.py.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
## Summary
- `BaseConsumerSensor._fallback_to_non_watcher_run` used
`model_unique_id.split(".")[-1]` to build the dbt `--select` value,
which strips the version segment for versioned models (e.g.
`model.pkg.my_model.v1` became `v1`).
- Switched to `split(".", 2)[2]`, matching `DbtNode.resource_name` in
`cosmos/dbt/graph.py` — the canonical parsing used throughout Cosmos for
dbt unique_ids (per the [dbt manifest
spec](https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details),
`resource_type` and `package` cannot contain dots, so everything after
the second dot is the full resource name including any version suffix).
- Added a regression test
(`test_fallback_selector_preserves_version_suffix`) covering a versioned
`unique_id`.
Surfaced by Copilot review on #2658.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@pankajkoti added #2658 and #2659 to the Cosmos 1.14.2 milestone after the previous CHANGELOG refresh. Cherry-pick both onto the release branch and add their entries under Bug Fixes: * #2658 -- Fix test sensor retry behaviour in ``ExecutionMode.WATCHER`` * #2659 -- Fix watcher fallback selector for versioned dbt models Both PRs auto-merged cleanly on top of the existing watcher fixes (#2683 XCom-key sanitization, #2684 BOSS-401 rewrite, #2673 helper extraction). 1495 watcher-suite tests pass locally after the picks. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
## CHANGELOG entry 1.14.2 (2026-05-21) ------------------- Behaviour Changes These changes adjust observable behaviour of the ``ExecutionMode.WATCHER`` execution mode. None of them breaks the public Cosmos API, but users relying on undocumented internals (graph wiring assertions, XCom backup Variable names, retry-on-recovery semantics, or retry log format) should review before upgrading. * ``ExecutionMode.WATCHER`` + ``depends_on_past=True``: when the producer task has ``depends_on_past=True`` (typically set via ``default_args``), the producer-done gateway task inside ``DbtTaskGroup`` is now wired downstream of every consumer task, in addition to the producer. This is required so that ``wait_for_downstream`` gating behaves correctly across DAG runs and the task group acts as a single unit that must fully succeed before the next run starts. Users with ``depends_on_past=False`` (the default) see no topology change. See #2615. * ``ExecutionMode.WATCHER`` downstream retry on upstream recovery: dbt models that were skipped after an upstream-failure event are now retried in the same DAG run when the upstream task succeeds on retry. Previously these models remained skipped for the run. See #2684. * ``ExecutionMode.WATCHER`` consumer-retry log format: the consumer's fallback ``dbt`` invocation no longer inherits the producer's internal ``--log-format json`` flag, so retry task logs now default to dbt's normal text format. Users who relied on JSON output in retry logs can opt in via ``operator_args={"dbt_cmd_flags": ["--log-format", "json"]}``. See #2713. * ``ExecutionMode.WATCHER`` XCom-backup Variable key scheme: the per-model XCom backup Variable key now includes the full task-group path and sanitises disallowed characters (``+`` / ``:``) from ``run_id``. External monitoring or cleanup scripts that match the old key pattern will need updating. See #2629 and #2683. Bug Fixes * Sanitize disallowed characters from XCom backup variable key by @MichaelRBlack in #2629 * Prevent watcher producers from colliding on one XCom-backup key by @tatiana in #2683 * Retry watcher downstream models on upstream-failure recovery by @tatiana in #2684 * Fix ``ExecutionMode.WATCHER`` interaction with ``depends_on_past`` by @johnhoran in #2615 * Strip ``--log-format`` from producer flags on watcher consumer retry by @tatiana in #2713 * Fix duplicate ``deferrable`` kwarg in ``DbtRunAirflowAsyncBigqueryOperator`` by @pankajastro in #2616 * Fix dbt docs iframe ``src`` missing deployment path prefix by @pankajastro in #2640 * Defer ``TaskInstance`` import in cluster policy to fix Sentry init crash by @pankajastro in #2662 * Restore type hints broken by lazy imports in ``cosmos/__init__.py`` by @pankajastro in #2647 * Fix ``ExecutionMode.WATCHER`` non-dbt stdout being suppressed from logs by @pankajastro in #2654 * Fix test sensor retry behaviour in ``ExecutionMode.WATCHER`` by @pankajkoti in #2658 * Fix watcher fallback selector for versioned dbt models by @pankajkoti in #2659 * Break out of iframe from Airflow 2 dbt Docs 404 link by @pankajastro in #2685 Docs * Document source freshness aware execution for ``ExecutionMode.WATCHER`` by @pankajastro in #2617 * Add reference docs for ``DbtRunLocalOperator``, ``DbtTestLocalOperator``, ``DbtSnapshotLocalOperator`` and ``DbtBuildLocalOperator`` by @pankajastro in #2643 * Add watcher retry behaviour history documentation by @tatiana in #2600 * Add Apache Airflow® trademark on first prominent mention by @pankajkoti in #2624 * Sentence-case section headings by @pankajkoti in #2630 * Use ``-`` for bullet points by @pankajkoti in #2631 * Drop decorative separator lines by @pankajkoti in #2632 * Normalize heading underlines in ``docs/guides/`` and ``docs/index.rst`` by @pankajkoti in #2664 * Fix broken cross-directory doc links by @pankajastro in #2694 * Fix broken external links in hand-written docs by @pankajastro in #2696 * Document support for Airflow 3.2 in the compatibility policy by @pankajastro in #2652 * Refresh the dbt/Airflow conflicts table to match the compatibility policy by @pankajastro in #2653 * Document incremental model limitation for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajastro in #2642 Others * Import ``ParamValidationError`` from ``airflow.sdk`` to silence deprecation warning by @pankajastro in #2645 * Import ``DAG`` from ``airflow.sdk`` to silence deprecation warning by @pankajastro in #2644 * Enforce docs style guide via pre-commit hook by @pankajkoti and @tatiana in #2633 * Add Airflow 3.2 to the test matrix in ``CLAUDE.md`` by @pankajastro in #2646 * Document the lazy-logging standard in ``CLAUDE.md`` by @pankajastro in #2679 * Extract watcher XCom-key helpers and inline single-use bindings by @pankajastro in #2673 * Remove leftover ``scripts/airflow3`` directory by @pankajastro in #2661 * Fix ``altered_jaffle_shop`` seed-dep CTE references by @pankajastro in #2690 * Skip Airflow 3.0 integration test stuck on ``example_watcher_with_freshness`` by @pankajastro in #2692 * Fix typo "constrantis" → "constraints" in tests env comment by @pankajastro in #2669 ## Summary Drafts the **Cosmos 1.14.2** release. Latest alpha cut is **1.14.2a4** — refreshed from `1.14.2a3` after maintainers (@pankajastro, @pankajkoti) added 11 more PRs to the milestone: - @pankajastro: #2646, #2652, #2653, #2661, #2669, #2673, #2679, #2690, #2692 - @pankajkoti: #2658, #2659 All 11 picks applied cleanly on top of the existing branch — no additional manual conflict resolution needed. **Excluded:** #2618 ("Improve glossary") — modifies `docs/reference/glossary.rst`, which doesn't exist on `release-1.14` (added on main by #2461, never backported). Deferred to 1.15.0. 33 PRs cherry-picked total (22 in the initial a3 cut + 11 in this a4 refresh); two PRs (#2575, #2618) deliberately held back as 1.15.0 content. ## Milestone [Cosmos 1.14.2](https://github.com/astronomer/astronomer-cosmos/milestone/48) — 33 merged PRs across Bug Fixes, Docs, and Others. ## Inclusion provenance | Path | PRs | Notes | |---|---|---| | **Originally in milestone — a3 cut (12)** | #2629, #2616, #2640, #2662, #2654, #2683, #2684, #2615, #2694, #2696, #2645, #2644 | Assigned by maintainers before the initial release-draft run | | **Pulled in via closed-issue link — a3 cut (1)** | #2647 | Closes milestone issue #2634 ("Typehinting broken with lazy imports") but the PR itself was never assigned to the milestone — included via `closedByPullRequestsReferences` | | **Added during cherry-pick conflict resolution — a3 cut (9)** | #2631, #2624, #2630, #2632, #2664, #2633, #2617, #2600, #2643 | Docs PRs whose absence caused `release-1.14` ↔ `main` textual drift. #2631 caused #2696's conflict; the rest were transitive dependencies (especially #2664, on top of the bullet/heading/trademark sweeps). #2643 was added to unblock #2664's operator-docs conflict | | **Added to milestone after a3 — included in a4 (11)** | #2646, #2652, #2653, #2658, #2659, #2661, #2669, #2673, #2679, #2690, #2692 | Added by @pankajastro and @pankajkoti after the initial draft. All applied cleanly on top of the a3 cherry-picks | | **Deliberately excluded (2)** | #2575, #2618 | #2575: documents `DbtDocsS3KubernetesOperator` with `.. versionadded:: 1.15.0` (already in the `Cosmos 1.15.0` milestone). #2618: improves a glossary file that doesn't exist on `release-1.14` (the stub was added by #2461, not backported) | ### Manual conflict resolution Cherry-picks that needed manual fix-up. Reviewers should double-check the files listed below: | PR | File(s) | Resolution | |---|---|---| | **#2664** | `docs/guides/dbt_docs/generating-docs.rst` | **Substantive exclusion** — manually removed the entire `Upload to S3 from Kubernetes` section (lines ~46–77 of the incoming diff) that documents `DbtDocsS3KubernetesOperator` (1.15.0 feature, PR #2575). Kept HEAD (no S3-from-Kubernetes section). | | **#2664** | `docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst` | Took incoming side for "Example 1" / "Example 2" heading underlines (`++++` style — matches #2664's normalization across the rest of the file). Also took incoming for em-dash → colon ("Example 1 —" → "Example 1:"). | | **#2664** | `docs/guides/run_dbt/operators/operators.rst` | Auto-resolved once #2643 was cherry-picked first (added missing Run/Test/Snapshot/Build operator reference docs that #2664 expected to be present). | | **#2633** | `docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst` | Initially took `'''` (Example 1/2 underlines) per #2633's incoming side — this broke the file's heading hierarchy and prevented sphinx from registering the `_watcher-source-freshness:` label, causing the docs build to fail. **Fixed in a follow-up commit** by reverting to `+++` to match main and the rest of the file's level-3 sections. | | **#2617** | `docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst` | **Substantive trim** — #2617 documented both the 1.14.0 source-freshness execution path AND the 1.15.0 `freshness_callback` override (which ships with #2586, not in this release line). Removed the `literalinclude` of `dev/dags/watcher_with_freshness_check.py` (1.15.0 example DAG, missing on release-1.14) and the surrounding override section so the 1.14.2 docs cover only what the 1.14.x line supports. Surfaced as a `-W` (warnings-as-errors) docs build failure on the first CI run. | | **#2684** | `cosmos/operators/_watcher/state.py` | Took incoming side — added two new frozensets (`DBT_UPSTREAM_FAILURE_SKIP_EVENT_NAMES`, `DBT_SOURCE_FRESHNESS_STALE_STATUSES`) at lines 28–37. HEAD had nothing; incoming had the additions. | | **#2615** | `tests/airflow/test_graph.py` | Took incoming side — added two new tests at the end of the file: `test_add_watcher_producer_task_passes_freshness_callback_via_setup_operator_args` and `test_watcher_dependency_wiring`. HEAD had nothing; incoming had the additions. | All a4 cherry-picks (11) applied without manual intervention. ## Test plan > Long-term goal: automate. For now, please pick the slice relevant to your environment and report deviations as a comment on this PR. ### Watcher mode (Postgres) - [x] @pankajkoti `example_watcher` (`dev/dags/example_watcher.py`) — default watcher run; exercises **#2629** (XCom backup key sanitization triggers on the `+` in any default Airflow run_id) - [x] (@tatiana) **[NEW]** `example_watcher_xcom_collision` (`dev/failed_dags/example_watcher_xcom_collision.py`) — validates **#2683** - [x] @tatiana **[NEW]** `example_watcher_recovers_skipped_downstream` (`dev/failed_dags/example_watcher_recovers_skipped_downstream.py`) — validates **#2684** - [x] @tatiana `example_watcher` with `default_args={"depends_on_past": True}` and ≥2 consecutive runs — validates **#2615** (manual edit; no dedicated example DAG) - [x] @pankajkoti watcher DAG with at least one model that has tests + Airflow retries enabled on the test sensor — validates **#2658** (test sensor retry path) - [x] @pankajkoti watcher DAG referencing a versioned dbt model (e.g. `models/foo_v2.sql`) and triggering the fallback selector path — validates **#2659** ### BigQuery (async) - [x] @tatiana `simple_dag_async` (`dev/dags/simple_dag_async.py`) — validates **#2616** (duplicate `deferrable` kwarg fix) ### dbt docs plugin - [x] (@pankajkoti) `docs_dag` (`dev/dags/dbt_docs.py`) + open the Cosmos dbt docs URL in the Airflow UI under a non-root deployment path — validates **#2640** (iframe `src` deployment prefix) ### Cross-cutting scenarios (no dedicated DAG) - [x] @pankajastro **#2662** — boot Airflow with Cosmos cluster policy + Sentry init; verify no `TaskInstance`-import crash on startup - [x] @pankajastro **#2654** — run a watcher DAG that prints non-dbt stdout (e.g., Snowflake `externalbrowser` auth URL); confirm output reaches task logs - [x] (@pankajkoti) **#2647** — `mypy` / IDE inspection of `from cosmos import DbtDag, ProjectConfig, ProfileConfig, RenderConfig, ExecutionConfig` resolves attributes - [x] @pankajastro **#2645, #2644** — boot Airflow 3 with `airflow.sdk` available; no `ParamValidationError` / `DAG` deprecation warnings from Cosmos imports - [x] @pankajastro **#2673** — verify watcher XCom-key extraction left no behavioural drift (run the existing watcher integration suite end-to-end on Postgres + Airflow 2.10) ### Docs build (covers all docs PRs) - [x] (@pankajkoti) `sphinx-build -W -b html docs docs/_build` succeeds with no warnings (#2617, #2624, #2630, #2631, #2632, #2643, #2600, #2664, #2694, #2696, #2652, #2653) ### Tooling - [x] (@pankajkoti) `pre-commit run check-docs-style --all-files` passes (#2633) ## Reviewer checklist - [x] CHANGELOG section assignments reviewed - [x] Entry wording reviewed - [x] `cosmos/__init__.py` bumped to `1.14.2a4` - [x] Cosmetic docs PRs (#2624, #2630, #2631, #2632, #2633, #2664) confirmed acceptable in patch line - [x] **Manual conflict resolutions** (table above) reviewed file-by-file - [x] Test plan executed on at least Postgres + one warehouse - [x] Ready to mark non-draft --------- Co-authored-by: Michael Black <4128408+MichaelRBlack@users.noreply.github.com> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Co-authored-by: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com> Co-authored-by: John Horan <jhoran@zendesk.com> Co-authored-by: Copilot <copilot@github.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
|
🚀 Released in Cosmos 1.14.2 (PyPI). |
The dbt unique_id-to-resource_name parse (split(".", 2)[2]) was
open-coded in several places. PR #2659 added one more such site to fix
a versioned-model bug. Consolidating the parse into a single helper
with a docstring documents the unique_id format in one place and
removes the drift risk between call sites.
Add get_resource_name_from_unique_id in cosmos/dbt/resource.py with a
docstring citing the dbt manifest spec and the versioned-model variant.
Route DbtNode.resource_name through it, then replace the open-coded
splits in cosmos/operators/watcher.py and cosmos/operators/_watcher/
base.py.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The dbt unique_id-to-resource_name parse (split(".", 2)[2]) was
open-coded in several places. PR #2659 added one more such site to fix
a versioned-model bug. Consolidating the parse into a single helper
with a docstring documents the unique_id format in one place and
removes the drift risk between call sites.
Add get_resource_name_from_unique_id in cosmos/dbt/resource.py with a
docstring citing the dbt manifest spec and the versioned-model variant.
Route DbtNode.resource_name through it, then replace the open-coded
splits in cosmos/operators/watcher.py and cosmos/operators/_watcher/
base.py.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
BaseConsumerSensor._fallback_to_non_watcher_runusedmodel_unique_id.split(".")[-1]to build the dbt--selectvalue, which strips the version segment for versioned models (e.g.model.pkg.my_model.v1becamev1).split(".", 2)[2], matchingDbtNode.resource_nameincosmos/dbt/graph.py— the canonical parsing used throughout Cosmos for dbt unique_ids (per the dbt manifest spec,resource_typeandpackagecannot contain dots, so everything after the second dot is the full resource name including any version suffix).test_fallback_selector_preserves_version_suffix) covering a versionedunique_id.Surfaced by Copilot review on #2658.
🤖 Generated with Claude Code