Skip to content

Extract watcher XCom-key helpers and inline single-use bindings#2673

Merged
pankajastro merged 2 commits into
mainfrom
unify-watcher-xcom-keys
May 14, 2026
Merged

Extract watcher XCom-key helpers and inline single-use bindings#2673
pankajastro merged 2 commits into
mainfrom
unify-watcher-xcom-keys

Conversation

@pankajastro
Copy link
Copy Markdown
Contributor

The watcher operators sanitised dbt unique_ids into XCom keys with the same f"{uid.replace('.', '__')}_" pattern repeated 12 times across base.py and triggerer.py for three suffixes (_status, _dbt_event, _compiled_sql)

Add three slim helpers in cosmos/operators/_watcher/state.py: get_status_xcom_key, get_dbt_event_xcom_key, get_compiled_sql_xcom_key. Each is a single-expression function so the ".replace('.', '__')" sanitisation rule lives in one place.

Replace all 12 inline call sites and, while there, inline the single-use xcom_key/status_key/compiled_sql_key bindings that introduced a redundant variable just to immediately pass it as the next argument.

No behaviour change.

Copilot AI review requested due to automatic review settings May 13, 2026 16:08
@pankajastro pankajastro requested review from a team, corsettigyg, dwreeves and jbandoro as code owners May 13, 2026 16:08
@pankajastro pankajastro requested review from pankajkoti and tatiana May 13, 2026 16:08
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Refactors watcher consumer/sensor and trigger code to centralize dbt unique_id → XCom-key sanitization (dot-to-__) into dedicated helpers, removing repeated inline formatting and redundant single-use local variables.

Changes:

  • Added get_status_xcom_key, get_dbt_event_xcom_key, and get_compiled_sql_xcom_key helpers in cosmos/operators/_watcher/state.py.
  • Replaced inline f"{uid.replace('.', '__')}_<suffix>" constructions with the new helpers across watcher base and triggerer code paths.
  • Inlined several single-use *_key local variables at call sites for simpler control flow.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

File Description
cosmos/operators/_watcher/triggerer.py Uses new XCom-key helpers for status/event/compiled_sql pulls in the async trigger polling loop.
cosmos/operators/_watcher/state.py Introduces the new XCom-key helper functions to centralize the sanitization + suffix rules.
cosmos/operators/_watcher/base.py Uses new XCom-key helpers for XCom push/pull paths in the consumer sensor and log parsing.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread cosmos/operators/_watcher/state.py
Comment thread cosmos/operators/_watcher/state.py
@codecov
Copy link
Copy Markdown

codecov Bot commented May 13, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 98.03%. Comparing base (7b880c6) to head (ac177c3).
⚠️ Report is 1 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2673      +/-   ##
==========================================
+ Coverage   98.02%   98.03%   +0.01%     
==========================================
  Files         105      105              
  Lines        7846     7843       -3     
==========================================
- Hits         7691     7689       -2     
+ Misses        155      154       -1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Copy Markdown
Collaborator

@tatiana tatiana left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent refactoring and cleanup, thanks a lot, @pankajastro! I agree with Copilot that it would be great to have unittests for these new methods.

pankajastro added a commit that referenced this pull request May 14, 2026
Address Copilot review feedback on PR #2673: document the new
get_status_xcom_key, get_dbt_event_xcom_key, and
get_compiled_sql_xcom_key helpers to match the surrounding
module style, and add parametrized unit tests that pin the
dot-to-"__" sanitisation rule and each helper's suffix to guard
against accidental key-format drift.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
pankajastro and others added 2 commits May 14, 2026 14:45
The watcher operators sanitised dbt unique_ids into XCom keys with
the same f"{uid.replace('.', '__')}_<suffix>" pattern repeated 12
times across base.py and triggerer.py for three suffixes (_status,
_dbt_event, _compiled_sql) — parallel to the existing
get_tests_status_xcom_key in aggregation.py for _tests_status.

Add three slim helpers in cosmos/operators/_watcher/state.py:
get_status_xcom_key, get_dbt_event_xcom_key, get_compiled_sql_xcom_key.
Each is a single-expression function so the ".replace('.', '__')"
sanitisation rule lives in one place.

Replace all 12 inline call sites and, while there, inline the
single-use xcom_key/status_key/compiled_sql_key bindings that
introduced a redundant variable just to immediately pass it as the
next argument.

No behaviour change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address Copilot review feedback on PR #2673: document the new
get_status_xcom_key, get_dbt_event_xcom_key, and
get_compiled_sql_xcom_key helpers to match the surrounding
module style, and add parametrized unit tests that pin the
dot-to-"__" sanitisation rule and each helper's suffix to guard
against accidental key-format drift.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 14, 2026 09:15
@pankajastro pankajastro force-pushed the unify-watcher-xcom-keys branch from a245e0d to ac177c3 Compare May 14, 2026 09:15
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated no new comments.

@pankajastro pankajastro merged commit 4867faf into main May 14, 2026
129 checks passed
@pankajastro pankajastro deleted the unify-watcher-xcom-keys branch May 14, 2026 09:36
@pankajastro pankajastro self-assigned this May 20, 2026
@pankajastro pankajastro added this to the Cosmos 1.14.2 milestone May 20, 2026
tatiana pushed a commit that referenced this pull request May 20, 2026
The watcher operators sanitised dbt unique_ids into XCom keys with the
same f"{uid.replace('.', '__')}_<suffix>" pattern repeated 12 times
across base.py and triggerer.py for three suffixes (_status, _dbt_event,
_compiled_sql)

Add three slim helpers in cosmos/operators/_watcher/state.py:
get_status_xcom_key, get_dbt_event_xcom_key, get_compiled_sql_xcom_key.
Each is a single-expression function so the ".replace('.', '__')"
sanitisation rule lives in one place.

Replace all 12 inline call sites and, while there, inline the single-use
xcom_key/status_key/compiled_sql_key bindings that introduced a
redundant variable just to immediately pass it as the next argument.

No behaviour change.

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
tatiana added a commit that referenced this pull request May 20, 2026
Refreshes the 1.14.2 CHANGELOG draft to incorporate nine more PRs
that @pankajastro added to the Cosmos 1.14.2 milestone after the
initial release PR (#2708) was opened:

* Docs:  #2652, #2653
* Others: #2646, #2661, #2669, #2673, #2679, #2690, #2692

PR #2618 ("Improve glossary") was excluded because it modifies
``docs/reference/glossary.rst``, which does not exist on
``release-1.14`` -- the glossary stub was added by #2461, which is
not part of this release line. Picking #2618 here would require
back-porting #2461 as well; deferring the glossary improvements to
1.15.0 instead.

Each of the nine included PRs has been cherry-picked onto
``release-1.14.2a3`` in its original merge order and applied without
manual conflict resolution. CHANGELOG entries are appended to the
existing Docs and Others sections in the canonical style.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
tatiana added a commit that referenced this pull request May 20, 2026
@pankajkoti added #2658 and #2659 to the Cosmos 1.14.2 milestone after
the previous CHANGELOG refresh. Cherry-pick both onto the release
branch and add their entries under Bug Fixes:

* #2658 -- Fix test sensor retry behaviour in ``ExecutionMode.WATCHER``
* #2659 -- Fix watcher fallback selector for versioned dbt models

Both PRs auto-merged cleanly on top of the existing watcher fixes
(#2683 XCom-key sanitization, #2684 BOSS-401 rewrite, #2673 helper
extraction). 1495 watcher-suite tests pass locally after the picks.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
tatiana added a commit that referenced this pull request May 20, 2026
Cuts a new alpha after refreshing the 1.14.2 release branch with 11 additional
milestone PRs:

* From @pankajastro: #2646, #2652, #2653, #2661, #2669, #2673, #2679, #2690, #2692
* From @pankajkoti:  #2658, #2659

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@tatiana tatiana mentioned this pull request May 20, 2026
22 tasks
@tatiana
Copy link
Copy Markdown
Collaborator

tatiana commented May 21, 2026

🚀 Released in Cosmos 1.14.2 (PyPI).

tatiana added a commit that referenced this pull request May 21, 2026
## CHANGELOG entry

1.14.2 (2026-05-21)
-------------------

Behaviour Changes

These changes adjust observable behaviour of the
``ExecutionMode.WATCHER`` execution mode.
None of them breaks the public Cosmos API, but users relying on
undocumented internals
(graph wiring assertions, XCom backup Variable names, retry-on-recovery
semantics, or
retry log format) should review before upgrading.

* ``ExecutionMode.WATCHER`` + ``depends_on_past=True``: when the
producer task has
``depends_on_past=True`` (typically set via ``default_args``), the
producer-done gateway
task inside ``DbtTaskGroup`` is now wired downstream of every consumer
task, in addition
to the producer. This is required so that ``wait_for_downstream`` gating
behaves
correctly across DAG runs and the task group acts as a single unit that
must fully
succeed before the next run starts. Users with ``depends_on_past=False``
(the default)
  see no topology change. See #2615.
* ``ExecutionMode.WATCHER`` downstream retry on upstream recovery: dbt
models that were
skipped after an upstream-failure event are now retried in the same DAG
run when the
upstream task succeeds on retry. Previously these models remained
skipped for the run.
  See #2684.
* ``ExecutionMode.WATCHER`` consumer-retry log format: the consumer's
fallback ``dbt``
invocation no longer inherits the producer's internal ``--log-format
json`` flag, so
retry task logs now default to dbt's normal text format. Users who
relied on JSON output
in retry logs can opt in via ``operator_args={"dbt_cmd_flags":
["--log-format", "json"]}``.
  See #2713.
* ``ExecutionMode.WATCHER`` XCom-backup Variable key scheme: the
per-model XCom backup
Variable key now includes the full task-group path and sanitises
disallowed characters
(``+`` / ``:``) from ``run_id``. External monitoring or cleanup scripts
that match the
  old key pattern will need updating. See #2629 and #2683.

Bug Fixes

* Sanitize disallowed characters from XCom backup variable key by
@MichaelRBlack in #2629
* Prevent watcher producers from colliding on one XCom-backup key by
@tatiana in #2683
* Retry watcher downstream models on upstream-failure recovery by
@tatiana in #2684
* Fix ``ExecutionMode.WATCHER`` interaction with ``depends_on_past`` by
@johnhoran in #2615
* Strip ``--log-format`` from producer flags on watcher consumer retry
by @tatiana in #2713
* Fix duplicate ``deferrable`` kwarg in
``DbtRunAirflowAsyncBigqueryOperator`` by @pankajastro in #2616
* Fix dbt docs iframe ``src`` missing deployment path prefix by
@pankajastro in #2640
* Defer ``TaskInstance`` import in cluster policy to fix Sentry init
crash by @pankajastro in #2662
* Restore type hints broken by lazy imports in ``cosmos/__init__.py`` by
@pankajastro in #2647
* Fix ``ExecutionMode.WATCHER`` non-dbt stdout being suppressed from
logs by @pankajastro in #2654
* Fix test sensor retry behaviour in ``ExecutionMode.WATCHER`` by
@pankajkoti in #2658
* Fix watcher fallback selector for versioned dbt models by @pankajkoti
in #2659
* Break out of iframe from Airflow 2 dbt Docs 404 link by @pankajastro
in #2685

Docs

* Document source freshness aware execution for
``ExecutionMode.WATCHER`` by @pankajastro in #2617
* Add reference docs for ``DbtRunLocalOperator``,
``DbtTestLocalOperator``, ``DbtSnapshotLocalOperator`` and
``DbtBuildLocalOperator`` by @pankajastro in #2643
* Add watcher retry behaviour history documentation by @tatiana in #2600
* Add Apache Airflow® trademark on first prominent mention by
@pankajkoti in #2624
* Sentence-case section headings by @pankajkoti in #2630
* Use ``-`` for bullet points by @pankajkoti in #2631
* Drop decorative separator lines by @pankajkoti in #2632
* Normalize heading underlines in ``docs/guides/`` and
``docs/index.rst`` by @pankajkoti in #2664
* Fix broken cross-directory doc links by @pankajastro in #2694
* Fix broken external links in hand-written docs by @pankajastro in
#2696
* Document support for Airflow 3.2 in the compatibility policy by
@pankajastro in #2652
* Refresh the dbt/Airflow conflicts table to match the compatibility
policy by @pankajastro in #2653
* Document incremental model limitation for
``ExecutionMode.AIRFLOW_ASYNC`` by @pankajastro in #2642

Others

* Import ``ParamValidationError`` from ``airflow.sdk`` to silence
deprecation warning by @pankajastro in #2645
* Import ``DAG`` from ``airflow.sdk`` to silence deprecation warning by
@pankajastro in #2644
* Enforce docs style guide via pre-commit hook by @pankajkoti and
@tatiana in #2633
* Add Airflow 3.2 to the test matrix in ``CLAUDE.md`` by @pankajastro in
#2646
* Document the lazy-logging standard in ``CLAUDE.md`` by @pankajastro in
#2679
* Extract watcher XCom-key helpers and inline single-use bindings by
@pankajastro in #2673
* Remove leftover ``scripts/airflow3`` directory by @pankajastro in
#2661
* Fix ``altered_jaffle_shop`` seed-dep CTE references by @pankajastro in
#2690
* Skip Airflow 3.0 integration test stuck on
``example_watcher_with_freshness`` by @pankajastro in #2692
* Fix typo "constrantis" → "constraints" in tests env comment by
@pankajastro in #2669

## Summary

Drafts the **Cosmos 1.14.2** release. Latest alpha cut is **1.14.2a4** —
refreshed from `1.14.2a3` after maintainers (@pankajastro, @pankajkoti)
added 11 more PRs to the milestone:

- @pankajastro: #2646, #2652, #2653, #2661, #2669, #2673, #2679, #2690,
#2692
- @pankajkoti:  #2658, #2659

All 11 picks applied cleanly on top of the existing branch — no
additional manual conflict resolution needed.

**Excluded:** #2618 ("Improve glossary") — modifies
`docs/reference/glossary.rst`, which doesn't exist on `release-1.14`
(added on main by #2461, never backported). Deferred to 1.15.0.

33 PRs cherry-picked total (22 in the initial a3 cut + 11 in this a4
refresh); two PRs (#2575, #2618) deliberately held back as 1.15.0
content.

## Milestone

[Cosmos
1.14.2](https://github.com/astronomer/astronomer-cosmos/milestone/48) —
33 merged PRs across Bug Fixes, Docs, and Others.

## Inclusion provenance

| Path | PRs | Notes |
|---|---|---|
| **Originally in milestone — a3 cut (12)** | #2629, #2616, #2640,
#2662, #2654, #2683, #2684, #2615, #2694, #2696, #2645, #2644 | Assigned
by maintainers before the initial release-draft run |
| **Pulled in via closed-issue link — a3 cut (1)** | #2647 | Closes
milestone issue #2634 ("Typehinting broken with lazy imports") but the
PR itself was never assigned to the milestone — included via
`closedByPullRequestsReferences` |
| **Added during cherry-pick conflict resolution — a3 cut (9)** | #2631,
#2624, #2630, #2632, #2664, #2633, #2617, #2600, #2643 | Docs PRs whose
absence caused `release-1.14` ↔ `main` textual drift. #2631 caused
#2696's conflict; the rest were transitive dependencies (especially
#2664, on top of the bullet/heading/trademark sweeps). #2643 was added
to unblock #2664's operator-docs conflict |
| **Added to milestone after a3 — included in a4 (11)** | #2646, #2652,
#2653, #2658, #2659, #2661, #2669, #2673, #2679, #2690, #2692 | Added by
@pankajastro and @pankajkoti after the initial draft. All applied
cleanly on top of the a3 cherry-picks |
| **Deliberately excluded (2)** | #2575, #2618 | #2575: documents
`DbtDocsS3KubernetesOperator` with `.. versionadded:: 1.15.0` (already
in the `Cosmos 1.15.0` milestone). #2618: improves a glossary file that
doesn't exist on `release-1.14` (the stub was added by #2461, not
backported) |

### Manual conflict resolution

Cherry-picks that needed manual fix-up. Reviewers should double-check
the files listed below:

| PR | File(s) | Resolution |
|---|---|---|
| **#2664** | `docs/guides/dbt_docs/generating-docs.rst` | **Substantive
exclusion** — manually removed the entire `Upload to S3 from Kubernetes`
section (lines ~46–77 of the incoming diff) that documents
`DbtDocsS3KubernetesOperator` (1.15.0 feature, PR #2575). Kept HEAD (no
S3-from-Kubernetes section). |
| **#2664** |
`docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst` | Took
incoming side for "Example 1" / "Example 2" heading underlines (`++++`
style — matches #2664's normalization across the rest of the file). Also
took incoming for em-dash → colon ("Example 1 —" → "Example 1:"). |
| **#2664** | `docs/guides/run_dbt/operators/operators.rst` |
Auto-resolved once #2643 was cherry-picked first (added missing
Run/Test/Snapshot/Build operator reference docs that #2664 expected to
be present). |
| **#2633** |
`docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst` |
Initially took `'''` (Example 1/2 underlines) per #2633's incoming side
— this broke the file's heading hierarchy and prevented sphinx from
registering the `_watcher-source-freshness:` label, causing the docs
build to fail. **Fixed in a follow-up commit** by reverting to `+++` to
match main and the rest of the file's level-3 sections. |
| **#2617** |
`docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst` |
**Substantive trim** — #2617 documented both the 1.14.0 source-freshness
execution path AND the 1.15.0 `freshness_callback` override (which ships
with #2586, not in this release line). Removed the `literalinclude` of
`dev/dags/watcher_with_freshness_check.py` (1.15.0 example DAG, missing
on release-1.14) and the surrounding override section so the 1.14.2 docs
cover only what the 1.14.x line supports. Surfaced as a `-W`
(warnings-as-errors) docs build failure on the first CI run. |
| **#2684** | `cosmos/operators/_watcher/state.py` | Took incoming side
— added two new frozensets (`DBT_UPSTREAM_FAILURE_SKIP_EVENT_NAMES`,
`DBT_SOURCE_FRESHNESS_STALE_STATUSES`) at lines 28–37. HEAD had nothing;
incoming had the additions. |
| **#2615** | `tests/airflow/test_graph.py` | Took incoming side — added
two new tests at the end of the file:
`test_add_watcher_producer_task_passes_freshness_callback_via_setup_operator_args`
and `test_watcher_dependency_wiring`. HEAD had nothing; incoming had the
additions. |

All a4 cherry-picks (11) applied without manual intervention.

## Test plan

> Long-term goal: automate. For now, please pick the slice relevant to
your environment and report deviations as a comment on this PR.

### Watcher mode (Postgres)

- [x] @pankajkoti `example_watcher` (`dev/dags/example_watcher.py`) —
default watcher run; exercises **#2629** (XCom backup key sanitization
triggers on the `+` in any default Airflow run_id)
- [x] (@tatiana) **[NEW]** `example_watcher_xcom_collision`
(`dev/failed_dags/example_watcher_xcom_collision.py`) — validates
**#2683**
- [x] @tatiana **[NEW]** `example_watcher_recovers_skipped_downstream`
(`dev/failed_dags/example_watcher_recovers_skipped_downstream.py`) —
validates **#2684**
- [x] @tatiana `example_watcher` with `default_args={"depends_on_past":
True}` and ≥2 consecutive runs — validates **#2615** (manual edit; no
dedicated example DAG)
- [x] @pankajkoti watcher DAG with at least one model that has tests +
Airflow retries enabled on the test sensor — validates **#2658** (test
sensor retry path)
- [x] @pankajkoti watcher DAG referencing a versioned dbt model (e.g.
`models/foo_v2.sql`) and triggering the fallback selector path —
validates **#2659**

### BigQuery (async)

- [x] @tatiana `simple_dag_async` (`dev/dags/simple_dag_async.py`) —
validates **#2616** (duplicate `deferrable` kwarg fix)

### dbt docs plugin

- [x] (@pankajkoti) `docs_dag` (`dev/dags/dbt_docs.py`) + open the
Cosmos dbt docs URL in the Airflow UI under a non-root deployment path —
validates **#2640** (iframe `src` deployment prefix)

### Cross-cutting scenarios (no dedicated DAG)

- [x] @pankajastro **#2662** — boot Airflow with Cosmos cluster policy +
Sentry init; verify no `TaskInstance`-import crash on startup
- [x] @pankajastro **#2654** — run a watcher DAG that prints non-dbt
stdout (e.g., Snowflake `externalbrowser` auth URL); confirm output
reaches task logs
- [x] (@pankajkoti) **#2647** — `mypy` / IDE inspection of `from cosmos
import DbtDag, ProjectConfig, ProfileConfig, RenderConfig,
ExecutionConfig` resolves attributes
- [x] @pankajastro **#2645, #2644** — boot Airflow 3 with `airflow.sdk`
available; no `ParamValidationError` / `DAG` deprecation warnings from
Cosmos imports
- [x] @pankajastro **#2673** — verify watcher XCom-key extraction left
no behavioural drift (run the existing watcher integration suite
end-to-end on Postgres + Airflow 2.10)

### Docs build (covers all docs PRs)

- [x] (@pankajkoti) `sphinx-build -W -b html docs docs/_build` succeeds
with no warnings (#2617, #2624, #2630, #2631, #2632, #2643, #2600,
#2664, #2694, #2696, #2652, #2653)

### Tooling

- [x] (@pankajkoti) `pre-commit run check-docs-style --all-files` passes
(#2633)

## Reviewer checklist

- [x] CHANGELOG section assignments reviewed
- [x] Entry wording reviewed
- [x] `cosmos/__init__.py` bumped to `1.14.2a4`
- [x] Cosmetic docs PRs (#2624, #2630, #2631, #2632, #2633, #2664)
confirmed acceptable in patch line
- [x] **Manual conflict resolutions** (table above) reviewed
file-by-file
- [x] Test plan executed on at least Postgres + one warehouse
- [x] Ready to mark non-draft

---------

Co-authored-by: Michael Black <4128408+MichaelRBlack@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Pankaj Singh <98807258+pankajastro@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com>
Co-authored-by: John Horan <jhoran@zendesk.com>
Co-authored-by: Copilot <copilot@github.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants