Watcher depends on past#2615
Conversation
Co-authored-by: Copilot <copilot@github.com>
There was a problem hiding this comment.
Pull request overview
This PR addresses a race condition in ExecutionMode.WATCHER when depends_on_past=True by making the DbtTaskGroup behave like a single atomic unit across DAG runs (so a later run’s producer can’t start while a prior run’s watcher consumers are still finishing).
Changes:
- Wire task-group “leaf” tasks to be upstream of the watcher
*_donegate task whendepends_on_past=True. - Set
wait_for_downstream=Trueacross watcher tasks (when applicable) to enforce cross-run downstream completion before starting the next run. - Add a parametrized unit test to validate dependency wiring across
TestBehaviorvariants anddepends_on_pastsettings.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
cosmos/airflow/graph.py |
Adds watcher done task into tasks_map, applies wait_for_downstream behavior when depends_on_past=True, and rewires leaves to the watcher done gate. |
tests/airflow/test_graph.py |
Adds coverage validating watcher dependency wiring and wait_for_downstream behavior across test behaviors. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-authored-by: Copilot <copilot@github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-authored-by: Copilot <copilot@github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #2615 +/- ##
=======================================
Coverage 98.01% 98.01%
=======================================
Files 104 104
Lines 7796 7803 +7
=======================================
+ Hits 7641 7648 +7
Misses 155 155 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| child_2b = DbtNode( | ||
| unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.child2.v2_b", | ||
| resource_type=DbtResourceType.MODEL, | ||
| depends_on=[parent_node.unique_id], | ||
| path_base=SAMPLE_PROJ_PATH, | ||
| original_file_path=Path("gen3/models/child2_v2.sql"), | ||
| tags=["nightly"], | ||
| config={"materialized": "table", "meta": {"cosmos": {"operator_kwargs": {"pool": "custom_pool"}}}}, | ||
| has_test=True, | ||
| has_non_detached_test=True, | ||
| ) | ||
| child_2b_test = DbtNode( | ||
| unique_id=f"{DbtResourceType.TEST.value}.{SAMPLE_PROJ_PATH.stem}.child2.test_v2_b", | ||
| resource_type=DbtResourceType.TEST, | ||
| depends_on=[child_2b.unique_id], | ||
| path_base=Path("."), | ||
| original_file_path=Path("."), | ||
| ) | ||
|
|
||
| build_airflow_graph( | ||
| nodes={child_2b.unique_id: child_2b, child_2b_test.unique_id: child_2b_test, **sample_nodes}, | ||
| dag=dag, | ||
| execution_mode=ExecutionMode.WATCHER, | ||
| test_indirect_selection=TestIndirectSelection.EAGER, | ||
| task_args=task_args, | ||
| render_config=RenderConfig( | ||
| test_behavior=test_behavior, | ||
| ), | ||
| dbt_project_name="astro_shop", | ||
| task_group=TaskGroup("tg", dag=dag), | ||
| ) | ||
| if not depends_on_past: | ||
| assert dag.task_dict["tg.dbt_producer_watcher_done"].upstream_task_ids == {"tg.dbt_producer_watcher"} | ||
| assert all(task.wait_for_downstream is False for task in dag.tasks) | ||
| return | ||
|
|
||
| assert all(task.wait_for_downstream is True for task in dag.tasks if task.task_id != "tg.dbt_producer_watcher_done") | ||
| if test_behavior == TestBehavior.NONE: | ||
| assert dag.task_dict["tg.dbt_producer_watcher_done"].upstream_task_ids == { | ||
| "tg.child_run", | ||
| "tg.dbt_producer_watcher", | ||
| "tg.child2_v2_run", | ||
| "tg.child2_v2_b_run", | ||
| } | ||
| if test_behavior == TestBehavior.AFTER_EACH: | ||
| assert dag.task_dict["tg.dbt_producer_watcher_done"].upstream_task_ids == { | ||
| "tg.child_run", | ||
| "tg.dbt_producer_watcher", | ||
| "tg.child2_v2_run", | ||
| "tg.child2_v2_b.test", | ||
| } | ||
| if test_behavior == TestBehavior.AFTER_ALL: | ||
| assert dag.task_dict["tg.dbt_producer_watcher_done"].upstream_task_ids == { | ||
| "tg.dbt_producer_watcher", | ||
| "tg.astro_shop_test", | ||
| } |
| assert dag.task_dict["tg.dbt_producer_watcher_done"].upstream_task_ids == { | ||
| "tg.dbt_producer_watcher", | ||
| "tg.astro_shop_test", | ||
| } |
There was a problem hiding this comment.
The new test covers the dependency topology well. A small extra assertion that tg.dbt_producer_watcher_done.wait_for_downstream is False would make the intended gate behaviour explicit.
There was a problem hiding this comment.
@johnhoran Thanks a lot for working on this, and I'm sorry for the delay in reviewing. The approach makes sense to me, and I’m comfortable moving forward so we can release the fix.
A few non-blocking follow-ups that would be helpful, in a separate PR after this lands:
- As we agreed, this only addresses the race for
DbtTaskGroup, since thedbt_producer_watcher_donegate only exists whentask_group is None. Could we explicitly:
- Document/call out that
DbtDagis still not covered, and - Create a follow-up issue or PR to either raise an exception on
DbtDagor properly handle it there as well?
-
The watcher docs still describe
max_active_runs=1as the workaround for thedepends_on_past=Truerace. Once this lands, that section should be updated to explain the newDbtTaskGroupbehaviour and any remaining limitations forDbtDag. -
Adding the
AFTER_ALLtest task totasks_maplooks useful, but it is also a behavioural side-change:tasks_mapnow includes an additional entry. In the future, it would be great to split this up into a separate PR. If, for now, we could document this change of behaviour, it would be great. -
Minor inline comment (#2615 (comment))
I’m okay with the current test staying as wiring/topology coverage rather than a full cross-run scheduler integration test, since reproducing the actual concurrent-run race cleanly is much harder and probably not worth blocking this fix.
Four watcher-mode changes in this release adjust observable behaviour without breaking the public API. Flagging them under a new 'Behaviour Changes' heading (intentionally not 'Breaking Changes' since this is a patch release) so users relying on undocumented internals can review before upgrading: - #2615: producer-done gateway task is now downstream of every consumer in DbtTaskGroup when depends_on_past=True. Default (depends_on_past=False) topology unchanged. - #2684: downstream models skipped after upstream-failure now retry when the upstream task recovers (previously remained skipped for the run). - #2713: consumer retry no longer inherits the producer's --log-format json flag. Retry logs default to text format; JSON opt-in via dbt_cmd_flags. - #2629/#2683: XCom backup Variable key scheme changed (now includes task-group path and sanitises run-id). Monitoring scripts matching the old pattern need updating.
## CHANGELOG entry 1.14.2 (2026-05-21) ------------------- Behaviour Changes These changes adjust observable behaviour of the ``ExecutionMode.WATCHER`` execution mode. None of them breaks the public Cosmos API, but users relying on undocumented internals (graph wiring assertions, XCom backup Variable names, retry-on-recovery semantics, or retry log format) should review before upgrading. * ``ExecutionMode.WATCHER`` + ``depends_on_past=True``: when the producer task has ``depends_on_past=True`` (typically set via ``default_args``), the producer-done gateway task inside ``DbtTaskGroup`` is now wired downstream of every consumer task, in addition to the producer. This is required so that ``wait_for_downstream`` gating behaves correctly across DAG runs and the task group acts as a single unit that must fully succeed before the next run starts. Users with ``depends_on_past=False`` (the default) see no topology change. See #2615. * ``ExecutionMode.WATCHER`` downstream retry on upstream recovery: dbt models that were skipped after an upstream-failure event are now retried in the same DAG run when the upstream task succeeds on retry. Previously these models remained skipped for the run. See #2684. * ``ExecutionMode.WATCHER`` consumer-retry log format: the consumer's fallback ``dbt`` invocation no longer inherits the producer's internal ``--log-format json`` flag, so retry task logs now default to dbt's normal text format. Users who relied on JSON output in retry logs can opt in via ``operator_args={"dbt_cmd_flags": ["--log-format", "json"]}``. See #2713. * ``ExecutionMode.WATCHER`` XCom-backup Variable key scheme: the per-model XCom backup Variable key now includes the full task-group path and sanitises disallowed characters (``+`` / ``:``) from ``run_id``. External monitoring or cleanup scripts that match the old key pattern will need updating. See #2629 and #2683. Bug Fixes * Sanitize disallowed characters from XCom backup variable key by @MichaelRBlack in #2629 * Prevent watcher producers from colliding on one XCom-backup key by @tatiana in #2683 * Retry watcher downstream models on upstream-failure recovery by @tatiana in #2684 * Fix ``ExecutionMode.WATCHER`` interaction with ``depends_on_past`` by @johnhoran in #2615 * Strip ``--log-format`` from producer flags on watcher consumer retry by @tatiana in #2713 * Fix duplicate ``deferrable`` kwarg in ``DbtRunAirflowAsyncBigqueryOperator`` by @pankajastro in #2616 * Fix dbt docs iframe ``src`` missing deployment path prefix by @pankajastro in #2640 * Defer ``TaskInstance`` import in cluster policy to fix Sentry init crash by @pankajastro in #2662 * Restore type hints broken by lazy imports in ``cosmos/__init__.py`` by @pankajastro in #2647 * Fix ``ExecutionMode.WATCHER`` non-dbt stdout being suppressed from logs by @pankajastro in #2654 * Fix test sensor retry behaviour in ``ExecutionMode.WATCHER`` by @pankajkoti in #2658 * Fix watcher fallback selector for versioned dbt models by @pankajkoti in #2659 * Break out of iframe from Airflow 2 dbt Docs 404 link by @pankajastro in #2685 Docs * Document source freshness aware execution for ``ExecutionMode.WATCHER`` by @pankajastro in #2617 * Add reference docs for ``DbtRunLocalOperator``, ``DbtTestLocalOperator``, ``DbtSnapshotLocalOperator`` and ``DbtBuildLocalOperator`` by @pankajastro in #2643 * Add watcher retry behaviour history documentation by @tatiana in #2600 * Add Apache Airflow® trademark on first prominent mention by @pankajkoti in #2624 * Sentence-case section headings by @pankajkoti in #2630 * Use ``-`` for bullet points by @pankajkoti in #2631 * Drop decorative separator lines by @pankajkoti in #2632 * Normalize heading underlines in ``docs/guides/`` and ``docs/index.rst`` by @pankajkoti in #2664 * Fix broken cross-directory doc links by @pankajastro in #2694 * Fix broken external links in hand-written docs by @pankajastro in #2696 * Document support for Airflow 3.2 in the compatibility policy by @pankajastro in #2652 * Refresh the dbt/Airflow conflicts table to match the compatibility policy by @pankajastro in #2653 * Document incremental model limitation for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajastro in #2642 Others * Import ``ParamValidationError`` from ``airflow.sdk`` to silence deprecation warning by @pankajastro in #2645 * Import ``DAG`` from ``airflow.sdk`` to silence deprecation warning by @pankajastro in #2644 * Enforce docs style guide via pre-commit hook by @pankajkoti and @tatiana in #2633 * Add Airflow 3.2 to the test matrix in ``CLAUDE.md`` by @pankajastro in #2646 * Document the lazy-logging standard in ``CLAUDE.md`` by @pankajastro in #2679 * Extract watcher XCom-key helpers and inline single-use bindings by @pankajastro in #2673 * Remove leftover ``scripts/airflow3`` directory by @pankajastro in #2661 * Fix ``altered_jaffle_shop`` seed-dep CTE references by @pankajastro in #2690 * Skip Airflow 3.0 integration test stuck on ``example_watcher_with_freshness`` by @pankajastro in #2692 * Fix typo "constrantis" → "constraints" in tests env comment by @pankajastro in #2669 ## Summary Drafts the **Cosmos 1.14.2** release. Latest alpha cut is **1.14.2a4** — refreshed from `1.14.2a3` after maintainers (@pankajastro, @pankajkoti) added 11 more PRs to the milestone: - @pankajastro: #2646, #2652, #2653, #2661, #2669, #2673, #2679, #2690, #2692 - @pankajkoti: #2658, #2659 All 11 picks applied cleanly on top of the existing branch — no additional manual conflict resolution needed. **Excluded:** #2618 ("Improve glossary") — modifies `docs/reference/glossary.rst`, which doesn't exist on `release-1.14` (added on main by #2461, never backported). Deferred to 1.15.0. 33 PRs cherry-picked total (22 in the initial a3 cut + 11 in this a4 refresh); two PRs (#2575, #2618) deliberately held back as 1.15.0 content. ## Milestone [Cosmos 1.14.2](https://github.com/astronomer/astronomer-cosmos/milestone/48) — 33 merged PRs across Bug Fixes, Docs, and Others. ## Inclusion provenance | Path | PRs | Notes | |---|---|---| | **Originally in milestone — a3 cut (12)** | #2629, #2616, #2640, #2662, #2654, #2683, #2684, #2615, #2694, #2696, #2645, #2644 | Assigned by maintainers before the initial release-draft run | | **Pulled in via closed-issue link — a3 cut (1)** | #2647 | Closes milestone issue #2634 ("Typehinting broken with lazy imports") but the PR itself was never assigned to the milestone — included via `closedByPullRequestsReferences` | | **Added during cherry-pick conflict resolution — a3 cut (9)** | #2631, #2624, #2630, #2632, #2664, #2633, #2617, #2600, #2643 | Docs PRs whose absence caused `release-1.14` ↔ `main` textual drift. #2631 caused #2696's conflict; the rest were transitive dependencies (especially #2664, on top of the bullet/heading/trademark sweeps). #2643 was added to unblock #2664's operator-docs conflict | | **Added to milestone after a3 — included in a4 (11)** | #2646, #2652, #2653, #2658, #2659, #2661, #2669, #2673, #2679, #2690, #2692 | Added by @pankajastro and @pankajkoti after the initial draft. All applied cleanly on top of the a3 cherry-picks | | **Deliberately excluded (2)** | #2575, #2618 | #2575: documents `DbtDocsS3KubernetesOperator` with `.. versionadded:: 1.15.0` (already in the `Cosmos 1.15.0` milestone). #2618: improves a glossary file that doesn't exist on `release-1.14` (the stub was added by #2461, not backported) | ### Manual conflict resolution Cherry-picks that needed manual fix-up. Reviewers should double-check the files listed below: | PR | File(s) | Resolution | |---|---|---| | **#2664** | `docs/guides/dbt_docs/generating-docs.rst` | **Substantive exclusion** — manually removed the entire `Upload to S3 from Kubernetes` section (lines ~46–77 of the incoming diff) that documents `DbtDocsS3KubernetesOperator` (1.15.0 feature, PR #2575). Kept HEAD (no S3-from-Kubernetes section). | | **#2664** | `docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst` | Took incoming side for "Example 1" / "Example 2" heading underlines (`++++` style — matches #2664's normalization across the rest of the file). Also took incoming for em-dash → colon ("Example 1 —" → "Example 1:"). | | **#2664** | `docs/guides/run_dbt/operators/operators.rst` | Auto-resolved once #2643 was cherry-picked first (added missing Run/Test/Snapshot/Build operator reference docs that #2664 expected to be present). | | **#2633** | `docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst` | Initially took `'''` (Example 1/2 underlines) per #2633's incoming side — this broke the file's heading hierarchy and prevented sphinx from registering the `_watcher-source-freshness:` label, causing the docs build to fail. **Fixed in a follow-up commit** by reverting to `+++` to match main and the rest of the file's level-3 sections. | | **#2617** | `docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst` | **Substantive trim** — #2617 documented both the 1.14.0 source-freshness execution path AND the 1.15.0 `freshness_callback` override (which ships with #2586, not in this release line). Removed the `literalinclude` of `dev/dags/watcher_with_freshness_check.py` (1.15.0 example DAG, missing on release-1.14) and the surrounding override section so the 1.14.2 docs cover only what the 1.14.x line supports. Surfaced as a `-W` (warnings-as-errors) docs build failure on the first CI run. | | **#2684** | `cosmos/operators/_watcher/state.py` | Took incoming side — added two new frozensets (`DBT_UPSTREAM_FAILURE_SKIP_EVENT_NAMES`, `DBT_SOURCE_FRESHNESS_STALE_STATUSES`) at lines 28–37. HEAD had nothing; incoming had the additions. | | **#2615** | `tests/airflow/test_graph.py` | Took incoming side — added two new tests at the end of the file: `test_add_watcher_producer_task_passes_freshness_callback_via_setup_operator_args` and `test_watcher_dependency_wiring`. HEAD had nothing; incoming had the additions. | All a4 cherry-picks (11) applied without manual intervention. ## Test plan > Long-term goal: automate. For now, please pick the slice relevant to your environment and report deviations as a comment on this PR. ### Watcher mode (Postgres) - [x] @pankajkoti `example_watcher` (`dev/dags/example_watcher.py`) — default watcher run; exercises **#2629** (XCom backup key sanitization triggers on the `+` in any default Airflow run_id) - [x] (@tatiana) **[NEW]** `example_watcher_xcom_collision` (`dev/failed_dags/example_watcher_xcom_collision.py`) — validates **#2683** - [x] @tatiana **[NEW]** `example_watcher_recovers_skipped_downstream` (`dev/failed_dags/example_watcher_recovers_skipped_downstream.py`) — validates **#2684** - [x] @tatiana `example_watcher` with `default_args={"depends_on_past": True}` and ≥2 consecutive runs — validates **#2615** (manual edit; no dedicated example DAG) - [x] @pankajkoti watcher DAG with at least one model that has tests + Airflow retries enabled on the test sensor — validates **#2658** (test sensor retry path) - [x] @pankajkoti watcher DAG referencing a versioned dbt model (e.g. `models/foo_v2.sql`) and triggering the fallback selector path — validates **#2659** ### BigQuery (async) - [x] @tatiana `simple_dag_async` (`dev/dags/simple_dag_async.py`) — validates **#2616** (duplicate `deferrable` kwarg fix) ### dbt docs plugin - [x] (@pankajkoti) `docs_dag` (`dev/dags/dbt_docs.py`) + open the Cosmos dbt docs URL in the Airflow UI under a non-root deployment path — validates **#2640** (iframe `src` deployment prefix) ### Cross-cutting scenarios (no dedicated DAG) - [x] @pankajastro **#2662** — boot Airflow with Cosmos cluster policy + Sentry init; verify no `TaskInstance`-import crash on startup - [x] @pankajastro **#2654** — run a watcher DAG that prints non-dbt stdout (e.g., Snowflake `externalbrowser` auth URL); confirm output reaches task logs - [x] (@pankajkoti) **#2647** — `mypy` / IDE inspection of `from cosmos import DbtDag, ProjectConfig, ProfileConfig, RenderConfig, ExecutionConfig` resolves attributes - [x] @pankajastro **#2645, #2644** — boot Airflow 3 with `airflow.sdk` available; no `ParamValidationError` / `DAG` deprecation warnings from Cosmos imports - [x] @pankajastro **#2673** — verify watcher XCom-key extraction left no behavioural drift (run the existing watcher integration suite end-to-end on Postgres + Airflow 2.10) ### Docs build (covers all docs PRs) - [x] (@pankajkoti) `sphinx-build -W -b html docs docs/_build` succeeds with no warnings (#2617, #2624, #2630, #2631, #2632, #2643, #2600, #2664, #2694, #2696, #2652, #2653) ### Tooling - [x] (@pankajkoti) `pre-commit run check-docs-style --all-files` passes (#2633) ## Reviewer checklist - [x] CHANGELOG section assignments reviewed - [x] Entry wording reviewed - [x] `cosmos/__init__.py` bumped to `1.14.2a4` - [x] Cosmetic docs PRs (#2624, #2630, #2631, #2632, #2633, #2664) confirmed acceptable in patch line - [x] **Manual conflict resolutions** (table above) reviewed file-by-file - [x] Test plan executed on at least Postgres + one warehouse - [x] Ready to mark non-draft --------- Co-authored-by: Michael Black <4128408+MichaelRBlack@users.noreply.github.com> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Co-authored-by: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com> Co-authored-by: John Horan <jhoran@zendesk.com> Co-authored-by: Copilot <copilot@github.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
|
🚀 Released in Cosmos 1.14.2 (PyPI). |
Two consistency / accuracy fixes pointed out in review: 1. "Does the Airflow state match dbt's?" — 1.14.1 row no longer contradicts the new 1.14.2 false-green note. It was previously a bare "Yes", but #2684 (added in 1.14.2) closes a real false-green gap that existed in 1.14.1: upstream-failure-skipped downstream nodes were silently marked SKIPPED while the DAG completed success. The 1.14.1 row now reads "Mostly, except upstream- failure-skipped downstream nodes could produce a 'false green' -- fixed in 1.14.2 by #2684", with the explicit forward pointer. 2. "Avoid duplicate or concurrent runs" — 1.14.2 row corrected to describe the actual topology landed by #2615. Previously read "the producer-done gateway is wired downstream of every consumer task and the task-group leaves carry wait_for_downstream=True", which overstates both halves: only leaf consumer tasks (those without downstream tasks) get wired upstream of the gateway (see cosmos/airflow/graph.py:795-796), and wait_for_downstream=True is set on the producer plus all watcher consumer tasks, not just task group leaves (see graph.py:777, 793). Reworded to: "leaf consumer tasks are wired upstream of the producer-done gateway, and the producer plus watcher tasks carry wait_for_downstream=True." Verified: hatch run docs:sphinx-build -W -b html docs docs/_build/html succeeds. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Description
Adds a
wait_for_downstreamto all tasks in the DbtTaskGroup ifdepends_on_pastis set to true. Ifdepends_on_pastis set to true, then the aim is to consider the DbtTaskGroup as a single atomic step. Most of the time this is fine, but as outlined in #2596, if a run fails and is marked as skipped, then the producer task from a subsequent run can start, even if the watcher consumer tasks from the initial run haven't completed.This PR aims to fix that by moving the leaves of the task group so they are upstream of the watcher done task, and have

wait_for_downstreamset to true, in which case airflow will only start a task group if the previous run watcher done task has completed.Before:
After this change when using

TestBehavior.AFTER_ALLAfter this change when using

TestBehavior.AFTER_EACHAs an aside, I also put the test added for
TestBehavior.AFTER_ALLtask_map, mostly so I'm only modifying thewait_for_downstreamin a single method, but also I suspect there was a bug when using it withExecutionConfig.AIRFLOW_ASYNC.astronomer-cosmos/cosmos/airflow/graph.py
Lines 881 to 883 in c919104
I haven't used that mode, so I can't really test that.
Related Issue(s)
closes #2596
Breaking Change?
Checklist