Prevent watcher producers from colliding on one XCom-backup key#2683
Conversation
There was a problem hiding this comment.
Pull request overview
Fixes issue #2625 where multiple DbtTaskGroups using ExecutionMode.WATCHER in the same DAG collide on a single XCom backup Variable key. The root cause is _get_task_group_id reading the nonexistent task.task_group_id attribute (instead of task.task_group.group_id), causing the discriminating group segment to be silently dropped from the key.
Changes:
- Fix
_get_task_group_idincosmos/operators/_watcher/xcom.pyto accesstask.task_group.group_iddirectly (failing loudly on schema drift instead of silently). - Add a regression integration test asserting two
DbtTaskGroups yield distinct backup keys, and update/extend unit tests to use aspec'd mock so the bug can no longer be masked. - Add an example DAG
dev/dags/example_watcher_xcom_collision.pyreproducing the original issue.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| cosmos/operators/_watcher/xcom.py | Reads task group id via the correct task.task_group.group_id path with direct attribute access. |
| tests/operators/_watcher/test_state.py | Updates unit test to mock the correct attribute path; adds a test ensuring malformed tasks raise. |
| tests/operators/test_watcher.py | Adds integration test verifying two watcher producers compute distinct backup keys. |
| dev/dags/example_watcher_xcom_collision.py | New example DAG reproducing the collision scenario end-to-end. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #2683 +/- ##
==========================================
+ Coverage 95.74% 98.03% +2.29%
==========================================
Files 105 105
Lines 7864 7846 -18
==========================================
+ Hits 7529 7692 +163
+ Misses 335 154 -181 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
pankajastro
left a comment
There was a problem hiding this comment.
Really clean fix — the root-cause analysis (silent getattr masking a non-existent attribute) and the three-layer regression strategy (real-attribute fix, MagicMock(spec=…) hardening, cross-group integration test) are excellent. Thanks for the thorough writeup!
One tiny doc nit: the PR description references example_watcher_xcom_collision.py as new, but it was removed in a follow-up commit and isn't in the final diff. Mind dropping the reference from the body so future readers don't go looking for it?
One cosmetic suggestion inline — nothing blocking.
🤖 Generated with Claude Code on behalf of @pankajastro
pankajkoti
left a comment
There was a problem hiding this comment.
Have a couple of questions in-line, but the fix for correcting the non-existent task_group_id -> task.task_group.group_id is a must, I agree.
Address PR #2683 feedback from pankajastro and pankajkoti: - Restore example_watcher_xcom_collision.py under dev/failed_dags/, the conventional location for repro DAGs of known issues. The path resolution matches the sibling example_watcher_failing_* files. - Correct the inline comment on _get_task_group_id and drop the defensive ``task_group is not None`` branch. On a bound operator ``task.task_group`` is always a TaskGroup -- the implicit root group (whose ``group_id`` is None) for top-level tasks, or the enclosing user-defined group otherwise. The previous wording inverted this. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address PR #2683 feedback from pankajastro and pankajkoti: - Restore example_watcher_xcom_collision.py under dev/failed_dags/, the conventional location for repro DAGs of known issues. The path resolution matches the sibling example_watcher_failing_* files. - Correct the inline comment on _get_task_group_id and drop the defensive ``task_group is not None`` branch. On a bound operator ``task.task_group`` is always a TaskGroup -- the implicit root group (whose ``group_id`` is None) for top-level tasks, or the enclosing user-defined group otherwise. The previous wording inverted this. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
03cb302 to
ec60814
Compare
When a DAG contained multiple DbtTaskGroups using ExecutionMode.WATCHER,
every dbt_producer_watcher task in the same DAG run computed the same
Airflow Variable key for its XCom backup:
cosmos_xcom_backup__{dag_id}__{run_id}
The task-group segment was supposed to discriminate concurrent producers
but was always dropped because the helper read ``task.task_group_id`` --
an attribute that does not exist on Airflow operators in either Airflow
2 or Airflow 3. ``getattr(task, "task_group_id", None)`` silently
returned ``None``, the segment was omitted from the key, and every
producer in the DAG run wrote to the same Variable row.
The consequence depends on the Airflow version:
- Airflow 2 raises ``UniqueViolation`` on ``variable_key_uq`` for the
loser of every INSERT race -- a noisy ERROR on every model completion.
- Airflow 3 silently upserts (``ON CONFLICT DO UPDATE``), so the last
writer wins. If one producer fails and retries,
``_restore_xcom_from_variable`` may load the *other* group's XCom
snapshot, an undetected correctness bug.
Fix the helper to read ``task.task_group.group_id`` instead -- the
canonical path on both Airflow 2 and 3, and the same one used by
``_get_dbt_dag_task_group_identifier`` in ``cosmos/airflow/graph.py``.
Use direct attribute access on ``task.task_group``/``group_id`` (rather
than ``getattr`` with a ``None`` default) so a future rename or typo
fails loudly with ``AttributeError`` instead of degrading silently to
the colliding-key behaviour.
Three places kept this bug hidden:
1. The existing unit test used ``MagicMock(task_group_id="my_group")``;
``MagicMock`` fabricates any attribute access, so the test passed
against a mock that did not match the real operator shape. The test
is reworked to mock ``task.task_group.group_id`` and a new test uses
``MagicMock(spec=[...])`` to forbid attribute fabrication.
2. No integration test asserted that two DbtTaskGroups in one DAG
produce distinct backup keys. A new integration test in
``tests/operators/test_watcher.py`` does, exercising the real
``_init_xcom_backup`` against real producer operators.
3. ``Variable.set`` is an upsert in Airflow 3, so end-to-end DAG runs
silently masked the failure on AF3.
A reproduction DAG ``dev/dags/example_watcher_xcom_collision.py`` is
added to demonstrate the issue end-to-end with two parallel
DbtTaskGroups against ``jaffle_shop``.
Closes #2625
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address PR #2683 feedback from pankajastro and pankajkoti: - Restore example_watcher_xcom_collision.py under dev/failed_dags/, the conventional location for repro DAGs of known issues. The path resolution matches the sibling example_watcher_failing_* files. - Correct the inline comment on _get_task_group_id and drop the defensive ``task_group is not None`` branch. On a bound operator ``task.task_group`` is always a TaskGroup -- the implicit root group (whose ``group_id`` is None) for top-level tasks, or the enclosing user-defined group otherwise. The previous wording inverted this. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
ec60814 to
ffc3d43
Compare
mypy rejected ``_get_task_group_id`` returning ``Any`` from a function declared ``str | None`` (CI: 76715404131). The intermediate variable gets an explicit ``str | None`` annotation so mypy is satisfied while the runtime behaviour stays identical. Also update CLAUDE.md to require running ``hatch run tests.py3.10-3.1-1.9:type-check`` (or ``pre-commit run mypy --all-files``) before committing or opening a PR, so this class of failure is caught locally rather than wasting a CI round-trip. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@pankajkoti added #2658 and #2659 to the Cosmos 1.14.2 milestone after the previous CHANGELOG refresh. Cherry-pick both onto the release branch and add their entries under Bug Fixes: * #2658 -- Fix test sensor retry behaviour in ``ExecutionMode.WATCHER`` * #2659 -- Fix watcher fallback selector for versioned dbt models Both PRs auto-merged cleanly on top of the existing watcher fixes (#2683 XCom-key sanitization, #2684 BOSS-401 rewrite, #2673 helper extraction). 1495 watcher-suite tests pass locally after the picks. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Four watcher-mode changes in this release adjust observable behaviour without breaking the public API. Flagging them under a new 'Behaviour Changes' heading (intentionally not 'Breaking Changes' since this is a patch release) so users relying on undocumented internals can review before upgrading: - #2615: producer-done gateway task is now downstream of every consumer in DbtTaskGroup when depends_on_past=True. Default (depends_on_past=False) topology unchanged. - #2684: downstream models skipped after upstream-failure now retry when the upstream task recovers (previously remained skipped for the run). - #2713: consumer retry no longer inherits the producer's --log-format json flag. Retry logs default to text format; JSON opt-in via dbt_cmd_flags. - #2629/#2683: XCom backup Variable key scheme changed (now includes task-group path and sanitises run-id). Monitoring scripts matching the old pattern need updating.
|
🚀 Released in Cosmos 1.14.2 (PyPI). |
## CHANGELOG entry 1.14.2 (2026-05-21) ------------------- Behaviour Changes These changes adjust observable behaviour of the ``ExecutionMode.WATCHER`` execution mode. None of them breaks the public Cosmos API, but users relying on undocumented internals (graph wiring assertions, XCom backup Variable names, retry-on-recovery semantics, or retry log format) should review before upgrading. * ``ExecutionMode.WATCHER`` + ``depends_on_past=True``: when the producer task has ``depends_on_past=True`` (typically set via ``default_args``), the producer-done gateway task inside ``DbtTaskGroup`` is now wired downstream of every consumer task, in addition to the producer. This is required so that ``wait_for_downstream`` gating behaves correctly across DAG runs and the task group acts as a single unit that must fully succeed before the next run starts. Users with ``depends_on_past=False`` (the default) see no topology change. See #2615. * ``ExecutionMode.WATCHER`` downstream retry on upstream recovery: dbt models that were skipped after an upstream-failure event are now retried in the same DAG run when the upstream task succeeds on retry. Previously these models remained skipped for the run. See #2684. * ``ExecutionMode.WATCHER`` consumer-retry log format: the consumer's fallback ``dbt`` invocation no longer inherits the producer's internal ``--log-format json`` flag, so retry task logs now default to dbt's normal text format. Users who relied on JSON output in retry logs can opt in via ``operator_args={"dbt_cmd_flags": ["--log-format", "json"]}``. See #2713. * ``ExecutionMode.WATCHER`` XCom-backup Variable key scheme: the per-model XCom backup Variable key now includes the full task-group path and sanitises disallowed characters (``+`` / ``:``) from ``run_id``. External monitoring or cleanup scripts that match the old key pattern will need updating. See #2629 and #2683. Bug Fixes * Sanitize disallowed characters from XCom backup variable key by @MichaelRBlack in #2629 * Prevent watcher producers from colliding on one XCom-backup key by @tatiana in #2683 * Retry watcher downstream models on upstream-failure recovery by @tatiana in #2684 * Fix ``ExecutionMode.WATCHER`` interaction with ``depends_on_past`` by @johnhoran in #2615 * Strip ``--log-format`` from producer flags on watcher consumer retry by @tatiana in #2713 * Fix duplicate ``deferrable`` kwarg in ``DbtRunAirflowAsyncBigqueryOperator`` by @pankajastro in #2616 * Fix dbt docs iframe ``src`` missing deployment path prefix by @pankajastro in #2640 * Defer ``TaskInstance`` import in cluster policy to fix Sentry init crash by @pankajastro in #2662 * Restore type hints broken by lazy imports in ``cosmos/__init__.py`` by @pankajastro in #2647 * Fix ``ExecutionMode.WATCHER`` non-dbt stdout being suppressed from logs by @pankajastro in #2654 * Fix test sensor retry behaviour in ``ExecutionMode.WATCHER`` by @pankajkoti in #2658 * Fix watcher fallback selector for versioned dbt models by @pankajkoti in #2659 * Break out of iframe from Airflow 2 dbt Docs 404 link by @pankajastro in #2685 Docs * Document source freshness aware execution for ``ExecutionMode.WATCHER`` by @pankajastro in #2617 * Add reference docs for ``DbtRunLocalOperator``, ``DbtTestLocalOperator``, ``DbtSnapshotLocalOperator`` and ``DbtBuildLocalOperator`` by @pankajastro in #2643 * Add watcher retry behaviour history documentation by @tatiana in #2600 * Add Apache Airflow® trademark on first prominent mention by @pankajkoti in #2624 * Sentence-case section headings by @pankajkoti in #2630 * Use ``-`` for bullet points by @pankajkoti in #2631 * Drop decorative separator lines by @pankajkoti in #2632 * Normalize heading underlines in ``docs/guides/`` and ``docs/index.rst`` by @pankajkoti in #2664 * Fix broken cross-directory doc links by @pankajastro in #2694 * Fix broken external links in hand-written docs by @pankajastro in #2696 * Document support for Airflow 3.2 in the compatibility policy by @pankajastro in #2652 * Refresh the dbt/Airflow conflicts table to match the compatibility policy by @pankajastro in #2653 * Document incremental model limitation for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajastro in #2642 Others * Import ``ParamValidationError`` from ``airflow.sdk`` to silence deprecation warning by @pankajastro in #2645 * Import ``DAG`` from ``airflow.sdk`` to silence deprecation warning by @pankajastro in #2644 * Enforce docs style guide via pre-commit hook by @pankajkoti and @tatiana in #2633 * Add Airflow 3.2 to the test matrix in ``CLAUDE.md`` by @pankajastro in #2646 * Document the lazy-logging standard in ``CLAUDE.md`` by @pankajastro in #2679 * Extract watcher XCom-key helpers and inline single-use bindings by @pankajastro in #2673 * Remove leftover ``scripts/airflow3`` directory by @pankajastro in #2661 * Fix ``altered_jaffle_shop`` seed-dep CTE references by @pankajastro in #2690 * Skip Airflow 3.0 integration test stuck on ``example_watcher_with_freshness`` by @pankajastro in #2692 * Fix typo "constrantis" → "constraints" in tests env comment by @pankajastro in #2669 ## Summary Drafts the **Cosmos 1.14.2** release. Latest alpha cut is **1.14.2a4** — refreshed from `1.14.2a3` after maintainers (@pankajastro, @pankajkoti) added 11 more PRs to the milestone: - @pankajastro: #2646, #2652, #2653, #2661, #2669, #2673, #2679, #2690, #2692 - @pankajkoti: #2658, #2659 All 11 picks applied cleanly on top of the existing branch — no additional manual conflict resolution needed. **Excluded:** #2618 ("Improve glossary") — modifies `docs/reference/glossary.rst`, which doesn't exist on `release-1.14` (added on main by #2461, never backported). Deferred to 1.15.0. 33 PRs cherry-picked total (22 in the initial a3 cut + 11 in this a4 refresh); two PRs (#2575, #2618) deliberately held back as 1.15.0 content. ## Milestone [Cosmos 1.14.2](https://github.com/astronomer/astronomer-cosmos/milestone/48) — 33 merged PRs across Bug Fixes, Docs, and Others. ## Inclusion provenance | Path | PRs | Notes | |---|---|---| | **Originally in milestone — a3 cut (12)** | #2629, #2616, #2640, #2662, #2654, #2683, #2684, #2615, #2694, #2696, #2645, #2644 | Assigned by maintainers before the initial release-draft run | | **Pulled in via closed-issue link — a3 cut (1)** | #2647 | Closes milestone issue #2634 ("Typehinting broken with lazy imports") but the PR itself was never assigned to the milestone — included via `closedByPullRequestsReferences` | | **Added during cherry-pick conflict resolution — a3 cut (9)** | #2631, #2624, #2630, #2632, #2664, #2633, #2617, #2600, #2643 | Docs PRs whose absence caused `release-1.14` ↔ `main` textual drift. #2631 caused #2696's conflict; the rest were transitive dependencies (especially #2664, on top of the bullet/heading/trademark sweeps). #2643 was added to unblock #2664's operator-docs conflict | | **Added to milestone after a3 — included in a4 (11)** | #2646, #2652, #2653, #2658, #2659, #2661, #2669, #2673, #2679, #2690, #2692 | Added by @pankajastro and @pankajkoti after the initial draft. All applied cleanly on top of the a3 cherry-picks | | **Deliberately excluded (2)** | #2575, #2618 | #2575: documents `DbtDocsS3KubernetesOperator` with `.. versionadded:: 1.15.0` (already in the `Cosmos 1.15.0` milestone). #2618: improves a glossary file that doesn't exist on `release-1.14` (the stub was added by #2461, not backported) | ### Manual conflict resolution Cherry-picks that needed manual fix-up. Reviewers should double-check the files listed below: | PR | File(s) | Resolution | |---|---|---| | **#2664** | `docs/guides/dbt_docs/generating-docs.rst` | **Substantive exclusion** — manually removed the entire `Upload to S3 from Kubernetes` section (lines ~46–77 of the incoming diff) that documents `DbtDocsS3KubernetesOperator` (1.15.0 feature, PR #2575). Kept HEAD (no S3-from-Kubernetes section). | | **#2664** | `docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst` | Took incoming side for "Example 1" / "Example 2" heading underlines (`++++` style — matches #2664's normalization across the rest of the file). Also took incoming for em-dash → colon ("Example 1 —" → "Example 1:"). | | **#2664** | `docs/guides/run_dbt/operators/operators.rst` | Auto-resolved once #2643 was cherry-picked first (added missing Run/Test/Snapshot/Build operator reference docs that #2664 expected to be present). | | **#2633** | `docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst` | Initially took `'''` (Example 1/2 underlines) per #2633's incoming side — this broke the file's heading hierarchy and prevented sphinx from registering the `_watcher-source-freshness:` label, causing the docs build to fail. **Fixed in a follow-up commit** by reverting to `+++` to match main and the rest of the file's level-3 sections. | | **#2617** | `docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst` | **Substantive trim** — #2617 documented both the 1.14.0 source-freshness execution path AND the 1.15.0 `freshness_callback` override (which ships with #2586, not in this release line). Removed the `literalinclude` of `dev/dags/watcher_with_freshness_check.py` (1.15.0 example DAG, missing on release-1.14) and the surrounding override section so the 1.14.2 docs cover only what the 1.14.x line supports. Surfaced as a `-W` (warnings-as-errors) docs build failure on the first CI run. | | **#2684** | `cosmos/operators/_watcher/state.py` | Took incoming side — added two new frozensets (`DBT_UPSTREAM_FAILURE_SKIP_EVENT_NAMES`, `DBT_SOURCE_FRESHNESS_STALE_STATUSES`) at lines 28–37. HEAD had nothing; incoming had the additions. | | **#2615** | `tests/airflow/test_graph.py` | Took incoming side — added two new tests at the end of the file: `test_add_watcher_producer_task_passes_freshness_callback_via_setup_operator_args` and `test_watcher_dependency_wiring`. HEAD had nothing; incoming had the additions. | All a4 cherry-picks (11) applied without manual intervention. ## Test plan > Long-term goal: automate. For now, please pick the slice relevant to your environment and report deviations as a comment on this PR. ### Watcher mode (Postgres) - [x] @pankajkoti `example_watcher` (`dev/dags/example_watcher.py`) — default watcher run; exercises **#2629** (XCom backup key sanitization triggers on the `+` in any default Airflow run_id) - [x] (@tatiana) **[NEW]** `example_watcher_xcom_collision` (`dev/failed_dags/example_watcher_xcom_collision.py`) — validates **#2683** - [x] @tatiana **[NEW]** `example_watcher_recovers_skipped_downstream` (`dev/failed_dags/example_watcher_recovers_skipped_downstream.py`) — validates **#2684** - [x] @tatiana `example_watcher` with `default_args={"depends_on_past": True}` and ≥2 consecutive runs — validates **#2615** (manual edit; no dedicated example DAG) - [x] @pankajkoti watcher DAG with at least one model that has tests + Airflow retries enabled on the test sensor — validates **#2658** (test sensor retry path) - [x] @pankajkoti watcher DAG referencing a versioned dbt model (e.g. `models/foo_v2.sql`) and triggering the fallback selector path — validates **#2659** ### BigQuery (async) - [x] @tatiana `simple_dag_async` (`dev/dags/simple_dag_async.py`) — validates **#2616** (duplicate `deferrable` kwarg fix) ### dbt docs plugin - [x] (@pankajkoti) `docs_dag` (`dev/dags/dbt_docs.py`) + open the Cosmos dbt docs URL in the Airflow UI under a non-root deployment path — validates **#2640** (iframe `src` deployment prefix) ### Cross-cutting scenarios (no dedicated DAG) - [x] @pankajastro **#2662** — boot Airflow with Cosmos cluster policy + Sentry init; verify no `TaskInstance`-import crash on startup - [x] @pankajastro **#2654** — run a watcher DAG that prints non-dbt stdout (e.g., Snowflake `externalbrowser` auth URL); confirm output reaches task logs - [x] (@pankajkoti) **#2647** — `mypy` / IDE inspection of `from cosmos import DbtDag, ProjectConfig, ProfileConfig, RenderConfig, ExecutionConfig` resolves attributes - [x] @pankajastro **#2645, #2644** — boot Airflow 3 with `airflow.sdk` available; no `ParamValidationError` / `DAG` deprecation warnings from Cosmos imports - [x] @pankajastro **#2673** — verify watcher XCom-key extraction left no behavioural drift (run the existing watcher integration suite end-to-end on Postgres + Airflow 2.10) ### Docs build (covers all docs PRs) - [x] (@pankajkoti) `sphinx-build -W -b html docs docs/_build` succeeds with no warnings (#2617, #2624, #2630, #2631, #2632, #2643, #2600, #2664, #2694, #2696, #2652, #2653) ### Tooling - [x] (@pankajkoti) `pre-commit run check-docs-style --all-files` passes (#2633) ## Reviewer checklist - [x] CHANGELOG section assignments reviewed - [x] Entry wording reviewed - [x] `cosmos/__init__.py` bumped to `1.14.2a4` - [x] Cosmetic docs PRs (#2624, #2630, #2631, #2632, #2633, #2664) confirmed acceptable in patch line - [x] **Manual conflict resolutions** (table above) reviewed file-by-file - [x] Test plan executed on at least Postgres + one warehouse - [x] Ready to mark non-draft --------- Co-authored-by: Michael Black <4128408+MichaelRBlack@users.noreply.github.com> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Co-authored-by: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com> Co-authored-by: John Horan <jhoran@zendesk.com> Co-authored-by: Copilot <copilot@github.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Closes #2625.
Closes BOSS-83.
Closes BOSS-157.
The problem
When a DAG contained multiple
DbtTaskGroups usingExecutionMode.WATCHER, everydbt_producer_watcherin the same DAG run computed the same Airflow Variable key for its XCom backup:The task-group segment was supposed to discriminate concurrent producers, but the helper that read it (
_get_task_group_idincosmos/operators/_watcher/xcom.py) was readingtask.task_group_id— an attribute that does not exist on Airflow operators in either Airflow 2 or Airflow 3 (the canonical path istask.task_group.group_id). Because the read went throughgetattr(task, "task_group_id", None), it silently returnedNone; the segment was omitted from the key, and every producer in the DAG run wrote to the same Variable row.The consequence depends on the Airflow version:
UniqueViolationonvariable_key_uq— a noisyERRORon every model completion (this is the original symptom from the issue report).Variable.setis an upsert (ON CONFLICT DO UPDATE), so the conflict is silently absorbed and the last writer wins. If one producer fails and retries,_restore_xcom_from_variablemay load the other group's XCom snapshot — an undetected correctness bug.How to reproduce
dev/failed_dags/example_watcher_xcom_collision.pyreproduces the issue end-to-end against thejaffle_shopproject: twoDbtTaskGroups (group_aselecting+customers,group_bselecting+orders), no inter-group dependency, both running in parallel.Reproduction conditions (from the issue report):
parallelism=1)Trigger the DAG manually; on the first node-status XCom in whichever producer loses the INSERT race, you see — before this fix:
After this fix the two producers compute distinct keys (
…__group_a__…and…__group_b__…) and neither symptom occurs.The fix
_get_task_group_idnow readstask.task_group.group_id— the same path that_get_dbt_dag_task_group_identifierincosmos/airflow/graph.pyalready uses..task_group/.group_idare accessed directly (not viagetattrwith aNonedefault), so a future rename or typo at this site raisesAttributeErrorat first invocation instead of silently degrading to the colliding-key behaviour. The outergetattr(ti, "task", None)is kept because the runtime-TI shape varies legitimately betweenTaskInstance,RuntimeTaskInstance, and test stubs.How do we prevent this from happening again
Three independent reasons let the bug slip through, addressed in three independent ways:
The unit test masked the bug with
MagicMock. The previous test didti.task = MagicMock(task_group_id="my_group"). BecauseMagicMockfabricates any attribute access,getattr(mock, "task_group_id", None)returned"my_group"and the test passed — even though the same call against a real operator always returnedNone. The test (tests/operators/_watcher/test_state.py::TestInitXcomBackup) is reworked to mocktask.task_group.group_id, and a new testtest_rejects_task_without_task_group_attributeusesMagicMock(spec=[...])to forbid fabrication and asserts the helper raises loudly when fed a malformed task.No test asserted uniqueness across task groups. A new integration test
test_dbt_task_groups_with_watcher_produce_distinct_xcom_backup_keysintests/operators/test_watcher.pybuilds twoDbtTaskGroups in one DAG, drives_init_xcom_backupagainst each real producer operator (exercising the actual code path the bug lived in), and asserts the resulting backup keys differ — and containgroup_a/group_b. This test fails onmainwith the same colliding key the user sees in production, and passes after the helper fix.Direct attribute access replaces silent-fallback
getattron the contract attributes (task.task_group,.group_id). A future typo or rename at the failure site now surfaces asAttributeErrorat first invocation instead of degrading to a colliding key. This is the safer failure mode the originalgetattrchain lacked.🤖 Generated with Claude Code