Retry watcher downstream models on upstream-failure recovery #2684
Allow `hatch run tests.<env>:test-integration -k <name>` (and other
pytest flags) to filter or otherwise customize the integration run
without editing scripts/test/integration.sh.
`hatch run` does not forward extra args to a script unless the script
definition contains the `{args}` placeholder. The script itself must
also accept the forwarded args via `"$@"`. Without both pieces in
place, attempts to filter with `-k` or `PYTEST_ADDOPTS` are silently
ignored -- the env-var approach in particular fails because
integration.sh already passes its own `-k` exclusion list and pytest
only honors the last `-k`.
This change adds `{args}` to the pyproject script definition, `"$@"`
to the pytest invocation in integration.sh, and documents the new
filtering workflow in the contributor guide. CI behavior is unchanged
because CI invokes the script without extra args.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
When a watcher producer's first dbt build attempt fails, dbt marks the failed model and every transitive downstream node with ``node_status="skipped"`` and emits ``SkippingDetails`` events (``LogSkipBecauseError`` when the upstream is ephemeral). The producer log parser pushed those "skipped" statuses straight to XCom, so each downstream consumer sensor raised ``AirflowSkipException`` and ended in SKIPPED. Airflow does not retry SKIPPED tasks, so even after the failing upstream model recovered via its own consumer-retry fallback, the downstream model was never re-run. The DAG completed "green" with the downstream tables silently un-materialized. See Linear BOSS-401 for the customer reproduction. This rewrites the affected status from "skipped" to "failed" so the downstream consumer fails on attempt 1, Airflow retries it, and the existing ``_fallback_to_non_watcher_run`` path runs the model locally once the upstream has recovered. ``SkippingDetails`` / ``LogSkipBecauseError`` are the right discriminator because they are fired only from dbt's ``on_skip()``, which is reached only when ``do_skip(cause=...)`` was called -- i.e. exclusively on upstream-node failure. Empty, ephemeral, and selector-excluded skips do not fire these events. dbt also emits a later ``NodeFinished`` with ``node_status="skipped"`` for the same node from the runner's ``finally`` block, which would otherwise overwrite the rewritten XCom; tracking affected ``unique_id``\\ s in a per-execution set lets us rewrite both events consistently. The fix is in the shared log parser, so it applies uniformly to both ``InvocationMode.DBT_RUNNER`` (protobuf events serialised via ``MessageToJson``) and ``InvocationMode.SUBPROCESS`` (``--log-format json`` stdout lines). The producer operator owns the accumulator set (``DbtProducerWatcherOperator._upstream_failure_skipped_ids``), bound to the parser via ``functools.partial`` and cleared at the start of each ``execute()``. 
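The accumulator ownership described above (an operator-owned set handed to the parser through ``functools.partial`` and cleared at the start of each ``execute()``) might look roughly like the sketch below. It is a minimal illustration, not the Cosmos code: ``parse_log_line``, ``ProducerSketch`` and the ``skip:`` line format are hypothetical stand-ins; only ``_upstream_failure_skipped_ids`` and ``_make_parse_callable`` are names taken from this PR.

```python
import functools
from typing import Callable, Optional


def parse_log_line(line: str, upstream_failure_skipped_ids: Optional[set] = None) -> None:
    """Hypothetical parser: remember ids that appear on 'skip:<unique_id>' lines."""
    prefix = "skip:"
    if upstream_failure_skipped_ids is not None and line.startswith(prefix):
        upstream_failure_skipped_ids.add(line[len(prefix):])


class ProducerSketch:
    """Hypothetical stand-in for the producer operator."""

    def __init__(self) -> None:
        self._upstream_failure_skipped_ids: set = set()

    def _make_parse_callable(self) -> Callable[[str], None]:
        # Bind the operator-owned set so the parser signature stays (line) -> None.
        return functools.partial(
            parse_log_line,
            upstream_failure_skipped_ids=self._upstream_failure_skipped_ids,
        )

    def execute(self, lines: list) -> set:
        # Clear in place: a retry must not inherit ids from the previous attempt.
        self._upstream_failure_skipped_ids.clear()
        parse = self._make_parse_callable()
        for line in lines:
            parse(line)
        return set(self._upstream_failure_skipped_ids)


op = ProducerSketch()
first_attempt = op.execute(["skip:model.demo.model_downstream", "ok:model.demo.model_a"])
second_attempt = op.execute(["ok:model.demo.model_downstream"])
```

Clearing the set in place, rather than rebinding the attribute, keeps any previously created partial pointing at the same object, which is one reason a sketch like this prefers ``clear()``.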
A new dbt project ``dev/dags/dbt/watcher_upstream_failure_recovery`` (three models: ``model_a`` succeeds, ``model_flaky`` fails-once via a Postgres sequence, ``model_downstream`` depends on ``model_flaky``) and an integration test ``test_dbt_task_group_watcher_retry_recovers_skipped_downstream`` exercise the end-to-end recovery. The test asserts that ``model_downstream`` lands in ``success`` (was ``skipped`` before the fix) and that its ``try_number > 1`` (proves the consumer fallback path actually fired). The fail-once sequence drop is extracted into a ``reset_fail_once_sequence`` pytest fixture so both this test and the existing ``test_dbt_task_group_watcher_gateway_prevents_downstream_skip`` reuse it. A standalone example DAG ``example_watcher_recovers_skipped_downstream`` is added under ``dev/failed_dags/`` for visual reproduction in Airflow standalone -- failing once on attempt 1, then recovering both the flaky upstream and the (previously) skipped downstream on retry. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pull request overview
This PR fixes BOSS-401: in watcher mode, when a producer's first dbt build attempt fails, dbt marks downstream nodes as "skipped". Cosmos pushed that status straight to XCom, so consumer sensors raised AirflowSkipException and Airflow never retried the SKIPPED downstream tasks — even after the failing upstream recovered via the existing consumer-retry fallback. The DAG ended in a "false green" with un-materialized tables. The fix tracks unique_ids that dbt skipped specifically due to upstream-node failure (identified by SkippingDetails/LogSkipBecauseError events, which fire only via do_skip(cause=...)) and rewrites their later "skipped" terminal events to "failed" so Airflow retries them and the existing fallback path runs them locally.
Changes:
- Add `_rewrite_upstream_failure_skip_status` to the watcher log parser and thread a new per-execution `_upstream_failure_skipped_ids` accumulator set through `DbtProducerWatcherOperator` and `store_dbt_resource_status_from_log`.
- Add a new dbt project (`watcher_upstream_failure_recovery/`), a standalone repro DAG, and an integration test asserting `model_downstream_run` ends `success` with `try_number > 1`; refactor the existing fail-once-sequence reset into a shared pytest fixture.
- Cherry-picked from #2682: forward extra args to `test-integration` (`pyproject.toml`, `scripts/test/integration.sh`, contributor docs).
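The accumulate-then-rewrite idea summarized above can be sketched as follows. This is a simplified illustration under stated assumptions, not the helper added in this PR: the function name, its signature, and the flat `(event name, unique_id, status)` tuples are hypothetical, and real dbt log events are nested JSON. Only the two event names and the `"skipped"` to `"failed"` rewrite come from the PR text.

```python
from typing import Optional

# Event names quoted from the PR description; the rest of this sketch is illustrative.
UPSTREAM_FAILURE_SKIP_EVENT_NAMES = frozenset({"SkippingDetails", "LogSkipBecauseError"})


def rewrite_upstream_failure_skip_status(
    event_name: str,
    unique_id: str,
    status: str,
    skipped_ids: Optional[set],
) -> str:
    """Return the status to publish, turning upstream-failure skips into "failed"."""
    if skipped_ids is None:
        # No accumulator supplied: leave the status untouched.
        return status
    if event_name in UPSTREAM_FAILURE_SKIP_EVENT_NAMES:
        # Remember the node so its later NodeFinished event is rewritten too.
        skipped_ids.add(unique_id)
    if status == "skipped" and unique_id in skipped_ids:
        return "failed"
    return status


seen: set = set()
events = [
    ("SkippingDetails", "model.demo.model_downstream", "skipped"),
    ("NodeFinished", "model.demo.model_downstream", "skipped"),
    ("NodeFinished", "model.demo.unrelated", "skipped"),
]
published = {}
for name, uid, status in events:
    # The dict stands in for the per-node status pushed to XCom.
    published[uid] = rewrite_upstream_failure_skip_status(name, uid, status, seen)
```

In this sketch a node that was skipped for any other reason never enters the set, so its `"skipped"` status passes through unchanged.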
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| cosmos/operators/_watcher/base.py | New helper _rewrite_upstream_failure_skip_status and rewrite hook inside store_dbt_resource_status_from_log for upstream-failure skips. |
| cosmos/operators/_watcher/state.py | New DBT_UPSTREAM_FAILURE_SKIP_EVENT_NAMES set and is_dbt_upstream_failure_skip_event predicate. |
| cosmos/operators/watcher.py | Adds and clears the _upstream_failure_skipped_ids set on the producer operator and forwards it to the parser via _make_parse_callable. |
| tests/operators/test_watcher.py | New reset_fail_once_sequence fixture (refactored from inline psycopg2 setup) and new integration test test_dbt_task_group_watcher_retry_recovers_skipped_downstream. |
| dev/dags/dbt/watcher_upstream_failure_recovery/* | New dbt project with model_a, model_flaky (fail-once pre_hook on a project-specific sequence), and model_downstream. |
| dev/failed_dags/example_watcher_recovers_skipped_downstream.py | Standalone repro DAG for manual verification, mirroring the existing example_watcher_downstream_not_skipped.py pattern. |
| pyproject.toml, scripts/test/integration.sh, docs/policy/contributing.rst | Forward extra args from hatch run … test-integration to pytest; document the workflow. |
Codecov Report
✅ All modified and coverable lines are covered by tests.

Additional details and impacted files

@@            Coverage Diff             @@
##             main    #2684      +/-   ##
==========================================
+ Coverage   98.04%   98.08%   +0.04%
==========================================
  Files         105      105
  Lines        7864     7882      +18
==========================================
+ Hits         7710     7731      +21
+ Misses        154      151       -3
pankajkoti left a comment
I was able to reproduce the issue and validate that the fix solves it with a DAG run using the Astro CLI (though I had only one downstream task for the flaky model and didn't test further nested downstream tasks). Nice find on narrowing the event types for skip statuses tied to upstream failures.
I have some clarifying questions and a suggestion inline.
"""Track upstream-failure-skipped nodes and rewrite their status from "skipped" to "failed".

dbt emits two events for a node that it skipped because of an upstream
failure: ``SkippingDetails``/``LogSkipBecauseError`` during ``on_skip()``,
Are these public dbt contracts? If internal, would it be possible to do a quick sanity check that these are consistent across the dbt versions we support? I guess for future releases we might be able to catch if these change when we test against newly released versions, but it would be nice to check for earlier versions once.
Thanks @tatiana. I see the PR touching on the --log-format key. What I meant here was that the docstring talks about upstream failures: SkippingDetails / LogSkipBecauseError and whether they are the same for the dbt versions we support.
Replaces every reference to the private Linear identifier ``BOSS-401`` with the public GitHub issue ``#2698`` across cosmos source, the watcher upstream-failure-recovery dbt project, the standalone repro DAG, and the integration test. Resolves Pankaj's review comment about avoiding private issue links in OSS source.

Adds three unit-test cases in ``TestStoreDbtStatusFromLog`` exercising the new ``_rewrite_upstream_failure_skip_status`` helper without the integration env, so the BOSS-401 invariant is regression-caught on every CI run rather than only on the integration matrix (Copilot's review comment).

Coverage:

- A ``SkippingDetails`` event followed by a ``NodeFinished`` (and the reverse arrival order) for the same unique_id results in the per-node status XCom being pushed as ``"failed"`` rather than ``"skipped"``, parametrized for both ``SkippingDetails`` and ``LogSkipBecauseError``.
- A plain ``NodeFinished`` ``"skipped"`` event for an unrelated unique_id is unchanged.
- A ``SkippingDetails`` event with no accumulator argument is unchanged (verifies the helper's None-handling).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
dbt skips every transitively-downstream node when an upstream fails, emitting a ``SkippingDetails`` event for each one. The fix in ``store_dbt_resource_status_from_log`` rewrites the per-node status keyed on ``unique_id``, so it handles arbitrary chain depth -- but the existing integration test only had one downstream level (``model_flaky -> model_downstream``) and therefore did not exercise that property. Add ``model_downstream_2`` to the dbt project, depending on ``model_downstream``, and parametrize the integration test over both downstream task ids so the regression is caught for every node along the chain. Pankaj asked for explicit nested-downstream coverage in the #2684 review. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
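To illustrate why a rewrite keyed on ``unique_id`` covers any chain depth: every transitive descendant of the failed node is skipped, and each one is rewritten independently. The sketch below is only an illustration of that property. The ``children`` mapping and the ``transitive_downstream`` helper are hypothetical and do not reflect how dbt or Cosmos compute the skipped set; the model names mirror the test project described above.

```python
from collections import deque


def transitive_downstream(children: dict, failed: str) -> set:
    """Breadth-first walk collecting every node reachable from the failed one."""
    seen = set()
    queue = deque([failed])
    while queue:
        node = queue.popleft()
        for child in children.get(node, ()):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen


# Hypothetical adjacency list: parent -> direct children.
children = {
    "model_a": [],
    "model_flaky": ["model_downstream"],
    "model_downstream": ["model_downstream_2"],
}

skipped = transitive_downstream(children, "model_flaky")
# Each skipped id is rewritten on its own, so depth does not matter.
rewritten = {unique_id: "failed" for unique_id in skipped}
```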
While documenting the minimum-dbt-version constraint for watcher mode (``ExecutionMode.WATCHER`` requires dbt-core 1.8+), the watcher regression test from #2684 was rerun against AF 3.2 with each supported dbt version and the resulting environment captured via ``pip freeze``.

The four snapshots in ``requirements/`` (1.8.9, 1.9.10, 1.10.21, plus the existing 1.11.7) are reference-only: ``pre-install-airflow.sh`` still pins ``requirements-airflow-3.2-dbt-1.11.txt`` regardless of the matrix ``dbt`` value, so creating an env like ``tests.py3.10-3.2-1.8`` will still install dbt 1.11. The new files document a known-good dependency set that reviewers and CI can use to reproduce the manual validation; wiring them into the matrix would be a separate change.

Each snapshot was produced after overriding dbt-core in a freshly created hatch env:

::

    hatch run tests.py3.10-3.2-1.8:pip install --force-reinstall \
        'dbt-core~=1.8.0' 'dbt-postgres~=1.8.0' 'dbt-duckdb~=1.8.0'
    hatch run tests.py3.10-3.2-1.8:pip freeze \
        > requirements/requirements-airflow-3.2-dbt-1.8.txt

then verifying the watcher regression test passes:

::

    AIRFLOW_HOME=$(pwd)/dev \
    hatch run tests.py3.10-3.2-1.8:test-integration \
        --ignore=tests/test_example_dags.py \
        --ignore=tests/test_example_dags_no_connections.py \
        -k test_dbt_task_group_watcher_retry_recovers_skipped_downstream

dbt 1.5 / 1.6 / 1.7 are not included: the watcher's ``dbt build --log-format json …`` command shape is rejected by their CLI parsers, so no valid requirements set produces a passing test on those versions (see the docs note in this same change).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The K8s watcher path called ``store_dbt_resource_status_from_log`` from its static progress callback without an ``upstream_failure_skipped_ids`` accumulator, so the upstream-failure ``"skipped"`` → ``"failed"`` rewrite introduced for the local watcher in #2684 silently did nothing on ``ExecutionMode.WATCHER_KUBERNETES`` — the same false-green symptom from issue #2698 could still surface there on retry. Mirror the per-operator accumulator from ``DbtProducerWatcherOperator``: hold a ``set[str]`` on ``DbtProducerWatcherKubernetesOperator``, clear it in ``execute``, and expose it to the static ``WatcherKubernetesCallback.progress_callback`` via the same module-level-global pattern already used for the task context. The callback now forwards the accumulator to the shared log parser, so the fix applies uniformly to both watcher backends. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
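The module-level-global pattern mentioned above can be sketched as follows. A static callback has no operator instance to read from, so the operator publishes its set in a module-level slot before the callback runs. All names here (``parse_line``, ``CallbackSketch``, ``KubernetesProducerSketch``, the ``skip:`` line format) are hypothetical stand-ins rather than the Cosmos implementation.

```python
from typing import Optional

# Module-level slot read by the static callback; set by the operator in execute().
_upstream_failure_skipped_ids: Optional[set] = None


def parse_line(line: str, upstream_failure_skipped_ids: Optional[set] = None) -> None:
    """Hypothetical parser: remember ids that appear on 'skip:<unique_id>' lines."""
    prefix = "skip:"
    if upstream_failure_skipped_ids is not None and line.startswith(prefix):
        upstream_failure_skipped_ids.add(line[len(prefix):])


class CallbackSketch:
    @staticmethod
    def progress_callback(line: str) -> None:
        # No `self` available here, so the accumulator comes from the module global.
        parse_line(line, upstream_failure_skipped_ids=_upstream_failure_skipped_ids)


class KubernetesProducerSketch:
    def __init__(self) -> None:
        self._upstream_failure_skipped_ids: set = set()

    def execute(self, lines: list) -> set:
        global _upstream_failure_skipped_ids
        # Start every attempt from an empty set, then expose it to the callback.
        self._upstream_failure_skipped_ids.clear()
        _upstream_failure_skipped_ids = self._upstream_failure_skipped_ids
        for line in lines:
            CallbackSketch.progress_callback(line)
        return set(self._upstream_failure_skipped_ids)


k8s_op = KubernetesProducerSketch()
k8s_first = k8s_op.execute(["skip:model.demo.model_downstream"])
k8s_retry = k8s_op.execute(["ok:model.demo.model_downstream"])
```

One trade-off of this pattern is that the slot is shared by everything in the same process, so the sketch assumes a single producer executes at a time per process.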
@pankajkoti added #2658 and #2659 to the Cosmos 1.14.2 milestone after the previous CHANGELOG refresh. Cherry-pick both onto the release branch and add their entries under Bug Fixes:

* #2658 -- Fix test sensor retry behaviour in ``ExecutionMode.WATCHER``
* #2659 -- Fix watcher fallback selector for versioned dbt models

Both PRs auto-merged cleanly on top of the existing watcher fixes (#2683 XCom-key sanitization, #2684 BOSS-401 rewrite, #2673 helper extraction). 1495 watcher-suite tests pass locally after the picks.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Four watcher-mode changes in this release adjust observable behaviour without breaking the public API. Flagging them under a new 'Behaviour Changes' heading (intentionally not 'Breaking Changes' since this is a patch release) so users relying on undocumented internals can review before upgrading:

- #2615: producer-done gateway task is now downstream of every consumer in DbtTaskGroup when depends_on_past=True. Default (depends_on_past=False) topology unchanged.
- #2684: downstream models skipped after upstream-failure now retry when the upstream task recovers (previously remained skipped for the run).
- #2713: consumer retry no longer inherits the producer's --log-format json flag. Retry logs default to text format; JSON opt-in via dbt_cmd_flags.
- #2629/#2683: XCom backup Variable key scheme changed (now includes task-group path and sanitises run-id). Monitoring scripts matching the old pattern need updating.
🚀 Released in Cosmos 1.14.2 (PyPI).
## CHANGELOG entry

1.14.2 (2026-05-21)
-------------------

Behaviour Changes

These changes adjust observable behaviour of the ``ExecutionMode.WATCHER`` execution mode. None of them breaks the public Cosmos API, but users relying on undocumented internals (graph wiring assertions, XCom backup Variable names, retry-on-recovery semantics, or retry log format) should review before upgrading.

* ``ExecutionMode.WATCHER`` + ``depends_on_past=True``: when the producer task has ``depends_on_past=True`` (typically set via ``default_args``), the producer-done gateway task inside ``DbtTaskGroup`` is now wired downstream of every consumer task, in addition to the producer. This is required so that ``wait_for_downstream`` gating behaves correctly across DAG runs and the task group acts as a single unit that must fully succeed before the next run starts. Users with ``depends_on_past=False`` (the default) see no topology change. See #2615.
* ``ExecutionMode.WATCHER`` downstream retry on upstream recovery: dbt models that were skipped after an upstream-failure event are now retried in the same DAG run when the upstream task succeeds on retry. Previously these models remained skipped for the run. See #2684.
* ``ExecutionMode.WATCHER`` consumer-retry log format: the consumer's fallback ``dbt`` invocation no longer inherits the producer's internal ``--log-format json`` flag, so retry task logs now default to dbt's normal text format. Users who relied on JSON output in retry logs can opt in via ``operator_args={"dbt_cmd_flags": ["--log-format", "json"]}``. See #2713.
* ``ExecutionMode.WATCHER`` XCom-backup Variable key scheme: the per-model XCom backup Variable key now includes the full task-group path and sanitises disallowed characters (``+`` / ``:``) from ``run_id``. External monitoring or cleanup scripts that match the old key pattern will need updating. See #2629 and #2683.
Bug Fixes

* Sanitize disallowed characters from XCom backup variable key by @MichaelRBlack in #2629
* Prevent watcher producers from colliding on one XCom-backup key by @tatiana in #2683
* Retry watcher downstream models on upstream-failure recovery by @tatiana in #2684
* Fix ``ExecutionMode.WATCHER`` interaction with ``depends_on_past`` by @johnhoran in #2615
* Strip ``--log-format`` from producer flags on watcher consumer retry by @tatiana in #2713
* Fix duplicate ``deferrable`` kwarg in ``DbtRunAirflowAsyncBigqueryOperator`` by @pankajastro in #2616
* Fix dbt docs iframe ``src`` missing deployment path prefix by @pankajastro in #2640
* Defer ``TaskInstance`` import in cluster policy to fix Sentry init crash by @pankajastro in #2662
* Restore type hints broken by lazy imports in ``cosmos/__init__.py`` by @pankajastro in #2647
* Fix ``ExecutionMode.WATCHER`` non-dbt stdout being suppressed from logs by @pankajastro in #2654
* Fix test sensor retry behaviour in ``ExecutionMode.WATCHER`` by @pankajkoti in #2658
* Fix watcher fallback selector for versioned dbt models by @pankajkoti in #2659
* Break out of iframe from Airflow 2 dbt Docs 404 link by @pankajastro in #2685

Docs

* Document source freshness aware execution for ``ExecutionMode.WATCHER`` by @pankajastro in #2617
* Add reference docs for ``DbtRunLocalOperator``, ``DbtTestLocalOperator``, ``DbtSnapshotLocalOperator`` and ``DbtBuildLocalOperator`` by @pankajastro in #2643
* Add watcher retry behaviour history documentation by @tatiana in #2600
* Add Apache Airflow® trademark on first prominent mention by @pankajkoti in #2624
* Sentence-case section headings by @pankajkoti in #2630
* Use ``-`` for bullet points by @pankajkoti in #2631
* Drop decorative separator lines by @pankajkoti in #2632
* Normalize heading underlines in ``docs/guides/`` and ``docs/index.rst`` by @pankajkoti in #2664
* Fix broken cross-directory doc links by @pankajastro in #2694
* Fix broken external links in hand-written docs by @pankajastro in #2696
* Document support for Airflow 3.2 in the compatibility policy by @pankajastro in #2652
* Refresh the dbt/Airflow conflicts table to match the compatibility policy by @pankajastro in #2653
* Document incremental model limitation for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajastro in #2642

Others

* Import ``ParamValidationError`` from ``airflow.sdk`` to silence deprecation warning by @pankajastro in #2645
* Import ``DAG`` from ``airflow.sdk`` to silence deprecation warning by @pankajastro in #2644
* Enforce docs style guide via pre-commit hook by @pankajkoti and @tatiana in #2633
* Add Airflow 3.2 to the test matrix in ``CLAUDE.md`` by @pankajastro in #2646
* Document the lazy-logging standard in ``CLAUDE.md`` by @pankajastro in #2679
* Extract watcher XCom-key helpers and inline single-use bindings by @pankajastro in #2673
* Remove leftover ``scripts/airflow3`` directory by @pankajastro in #2661
* Fix ``altered_jaffle_shop`` seed-dep CTE references by @pankajastro in #2690
* Skip Airflow 3.0 integration test stuck on ``example_watcher_with_freshness`` by @pankajastro in #2692
* Fix typo "constrantis" → "constraints" in tests env comment by @pankajastro in #2669

## Summary

Drafts the **Cosmos 1.14.2** release. Latest alpha cut is **1.14.2a4**, refreshed from `1.14.2a3` after maintainers (@pankajastro, @pankajkoti) added 11 more PRs to the milestone:

- @pankajastro: #2646, #2652, #2653, #2661, #2669, #2673, #2679, #2690, #2692
- @pankajkoti: #2658, #2659

All 11 picks applied cleanly on top of the existing branch; no additional manual conflict resolution needed.

**Excluded:** #2618 ("Improve glossary") modifies `docs/reference/glossary.rst`, which doesn't exist on `release-1.14` (added on main by #2461, never backported). Deferred to 1.15.0.

33 PRs cherry-picked total (22 in the initial a3 cut + 11 in this a4 refresh); two PRs (#2575, #2618) deliberately held back as 1.15.0 content.
## Milestone

[Cosmos 1.14.2](https://github.com/astronomer/astronomer-cosmos/milestone/48): 33 merged PRs across Bug Fixes, Docs, and Others.

## Inclusion provenance

| Path | PRs | Notes |
|---|---|---|
| **Originally in milestone, a3 cut (12)** | #2629, #2616, #2640, #2662, #2654, #2683, #2684, #2615, #2694, #2696, #2645, #2644 | Assigned by maintainers before the initial release-draft run |
| **Pulled in via closed-issue link, a3 cut (1)** | #2647 | Closes milestone issue #2634 ("Typehinting broken with lazy imports") but the PR itself was never assigned to the milestone; included via `closedByPullRequestsReferences` |
| **Added during cherry-pick conflict resolution, a3 cut (9)** | #2631, #2624, #2630, #2632, #2664, #2633, #2617, #2600, #2643 | Docs PRs whose absence caused `release-1.14` ↔ `main` textual drift. #2631 caused #2696's conflict; the rest were transitive dependencies (especially #2664, on top of the bullet/heading/trademark sweeps). #2643 was added to unblock #2664's operator-docs conflict |
| **Added to milestone after a3, included in a4 (11)** | #2646, #2652, #2653, #2658, #2659, #2661, #2669, #2673, #2679, #2690, #2692 | Added by @pankajastro and @pankajkoti after the initial draft. All applied cleanly on top of the a3 cherry-picks |
| **Deliberately excluded (2)** | #2575, #2618 | #2575: documents `DbtDocsS3KubernetesOperator` with `.. versionadded:: 1.15.0` (already in the `Cosmos 1.15.0` milestone). #2618: improves a glossary file that doesn't exist on `release-1.14` (the stub was added by #2461, not backported) |

### Manual conflict resolution

Cherry-picks that needed manual fix-up. Reviewers should double-check the files listed below:

| PR | File(s) | Resolution |
|---|---|---|
| **#2664** | `docs/guides/dbt_docs/generating-docs.rst` | **Substantive exclusion**: manually removed the entire `Upload to S3 from Kubernetes` section (lines ~46–77 of the incoming diff) that documents `DbtDocsS3KubernetesOperator` (1.15.0 feature, PR #2575). Kept HEAD (no S3-from-Kubernetes section). |
| **#2664** | `docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst` | Took incoming side for "Example 1" / "Example 2" heading underlines (`++++` style, matches #2664's normalization across the rest of the file). Also took incoming for em-dash → colon ("Example 1 —" → "Example 1:"). |
| **#2664** | `docs/guides/run_dbt/operators/operators.rst` | Auto-resolved once #2643 was cherry-picked first (added missing Run/Test/Snapshot/Build operator reference docs that #2664 expected to be present). |
| **#2633** | `docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst` | Initially took `'''` (Example 1/2 underlines) per #2633's incoming side; this broke the file's heading hierarchy and prevented sphinx from registering the `_watcher-source-freshness:` label, causing the docs build to fail. **Fixed in a follow-up commit** by reverting to `+++` to match main and the rest of the file's level-3 sections. |
| **#2617** | `docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst` | **Substantive trim**: #2617 documented both the 1.14.0 source-freshness execution path AND the 1.15.0 `freshness_callback` override (which ships with #2586, not in this release line). Removed the `literalinclude` of `dev/dags/watcher_with_freshness_check.py` (1.15.0 example DAG, missing on release-1.14) and the surrounding override section so the 1.14.2 docs cover only what the 1.14.x line supports. Surfaced as a `-W` (warnings-as-errors) docs build failure on the first CI run. |
| **#2684** | `cosmos/operators/_watcher/state.py` | Took incoming side: added two new frozensets (`DBT_UPSTREAM_FAILURE_SKIP_EVENT_NAMES`, `DBT_SOURCE_FRESHNESS_STALE_STATUSES`) at lines 28–37. HEAD had nothing; incoming had the additions. |
| **#2615** | `tests/airflow/test_graph.py` | Took incoming side: added two new tests at the end of the file: `test_add_watcher_producer_task_passes_freshness_callback_via_setup_operator_args` and `test_watcher_dependency_wiring`. HEAD had nothing; incoming had the additions. |

All a4 cherry-picks (11) applied without manual intervention.

## Test plan

> Long-term goal: automate. For now, please pick the slice relevant to your environment and report deviations as a comment on this PR.

### Watcher mode (Postgres)

- [x] @pankajkoti `example_watcher` (`dev/dags/example_watcher.py`): default watcher run; exercises **#2629** (XCom backup key sanitization triggers on the `+` in any default Airflow run_id)
- [x] (@tatiana) **[NEW]** `example_watcher_xcom_collision` (`dev/failed_dags/example_watcher_xcom_collision.py`): validates **#2683**
- [x] @tatiana **[NEW]** `example_watcher_recovers_skipped_downstream` (`dev/failed_dags/example_watcher_recovers_skipped_downstream.py`): validates **#2684**
- [x] @tatiana `example_watcher` with `default_args={"depends_on_past": True}` and ≥2 consecutive runs: validates **#2615** (manual edit; no dedicated example DAG)
- [x] @pankajkoti watcher DAG with at least one model that has tests + Airflow retries enabled on the test sensor: validates **#2658** (test sensor retry path)
- [x] @pankajkoti watcher DAG referencing a versioned dbt model (e.g. `models/foo_v2.sql`) and triggering the fallback selector path: validates **#2659**

### BigQuery (async)

- [x] @tatiana `simple_dag_async` (`dev/dags/simple_dag_async.py`): validates **#2616** (duplicate `deferrable` kwarg fix)

### dbt docs plugin

- [x] (@pankajkoti) `docs_dag` (`dev/dags/dbt_docs.py`) + open the Cosmos dbt docs URL in the Airflow UI under a non-root deployment path: validates **#2640** (iframe `src` deployment prefix)

### Cross-cutting scenarios (no dedicated DAG)

- [x] @pankajastro **#2662**: boot Airflow with Cosmos cluster policy + Sentry init; verify no `TaskInstance`-import crash on startup
- [x] @pankajastro **#2654**: run a watcher DAG that prints non-dbt stdout (e.g., Snowflake `externalbrowser` auth URL); confirm output reaches task logs
- [x] (@pankajkoti) **#2647**: `mypy` / IDE inspection of `from cosmos import DbtDag, ProjectConfig, ProfileConfig, RenderConfig, ExecutionConfig` resolves attributes
- [x] @pankajastro **#2645, #2644**: boot Airflow 3 with `airflow.sdk` available; no `ParamValidationError` / `DAG` deprecation warnings from Cosmos imports
- [x] @pankajastro **#2673**: verify watcher XCom-key extraction left no behavioural drift (run the existing watcher integration suite end-to-end on Postgres + Airflow 2.10)

### Docs build (covers all docs PRs)

- [x] (@pankajkoti) `sphinx-build -W -b html docs docs/_build` succeeds with no warnings (#2617, #2624, #2630, #2631, #2632, #2643, #2600, #2664, #2694, #2696, #2652, #2653)

### Tooling

- [x] (@pankajkoti) `pre-commit run check-docs-style --all-files` passes (#2633)

## Reviewer checklist

- [x] CHANGELOG section assignments reviewed
- [x] Entry wording reviewed
- [x] `cosmos/__init__.py` bumped to `1.14.2a4`
- [x] Cosmetic docs PRs (#2624, #2630, #2631, #2632, #2633, #2664) confirmed acceptable in patch line
- [x] **Manual conflict resolutions** (table above) reviewed file-by-file
- [x] Test plan executed on at least Postgres + one warehouse
- [x] Ready to mark non-draft
---------

Co-authored-by: Michael Black <4128408+MichaelRBlack@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Pankaj Singh <98807258+pankajastro@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com>
Co-authored-by: John Horan <jhoran@zendesk.com>
Co-authored-by: Copilot <copilot@github.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Two consistency / accuracy fixes pointed out in review:

1. "Does the Airflow state match dbt's?": the 1.14.1 row no longer contradicts the new 1.14.2 false-green note. It was previously a bare "Yes", but #2684 (added in 1.14.2) closes a real false-green gap that existed in 1.14.1: upstream-failure-skipped downstream nodes were silently marked SKIPPED while the DAG completed success. The 1.14.1 row now reads "Mostly, except upstream-failure-skipped downstream nodes could produce a 'false green' -- fixed in 1.14.2 by #2684", with the explicit forward pointer.

2. "Avoid duplicate or concurrent runs": the 1.14.2 row is corrected to describe the actual topology landed by #2615. Previously read "the producer-done gateway is wired downstream of every consumer task and the task-group leaves carry wait_for_downstream=True", which overstates both halves: only leaf consumer tasks (those without downstream tasks) get wired upstream of the gateway (see cosmos/airflow/graph.py:795-796), and wait_for_downstream=True is set on the producer plus all watcher consumer tasks, not just task group leaves (see graph.py:777, 793). Reworded to: "leaf consumer tasks are wired upstream of the producer-done gateway, and the producer plus watcher tasks carry wait_for_downstream=True."

Verified: hatch run docs:sphinx-build -W -b html docs docs/_build/html succeeds.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Closes #2698
Closes BOSS-401 (Customer issue).
The problem
When a watcher producer's first `dbt build` attempt fails, dbt marks the failing model and every transitive downstream node with `node_status="skipped"`. The producer log parser pushed those `"skipped"` statuses straight to XCom, so each downstream consumer sensor raised `AirflowSkipException` and ended in SKIPPED. Airflow does not retry SKIPPED tasks, so even after the failing upstream recovered via its own consumer-retry fallback, the downstream model was never re-run.

The DAG completed `success` (Airflow treats SKIPPED as non-failure) with the downstream tables silently un-materialized. Luis Carbonell (Prodege) reported the "false green" symptom on `astronomer-cosmos==1.14.1` and `Airflow 3.2.1+astro.2`.

Reproduction setup:

- `model_a` → succeeds.
- `model_flaky` → fails on first attempt via a Postgres sequence pre-hook, succeeds on subsequent attempts.
- `model_downstream` → depends on `model_flaky`.

Without the fix, on retry:

- `model_flaky` recovers via the consumer's `_fallback_to_non_watcher_run` (existing retry path for FAILED consumers).
- `model_downstream` stays SKIPPED: Airflow never retries it.
- The DAG run completes as `success`.

The fix

`store_dbt_resource_status_from_log` in `cosmos/operators/_watcher/base.py` now tracks unique_ids that dbt emits `SkippingDetails` (non-ephemeral upstream) or `LogSkipBecauseError` (ephemeral upstream) events for. Later `"skipped"` terminal events for those unique_ids are rewritten to `"failed"` so the downstream consumer fails on attempt 1, Airflow retries it, and the existing `_fallback_to_non_watcher_run` path runs the model locally once its upstream has recovered.

Why these two events are the right discriminator (traced through dbt source on `dbt-core==1.11`):

- `SkippingDetails` and `LogSkipBecauseError` are fired only from `BaseRunner.on_skip()` (`dbt/task/base.py:420-471`).
- `on_skip()` is reached only when `self.skip = True`.
- `self.skip = True` is set only inside `do_skip(cause=...)` (`dbt/task/base.py:474-476`), verified by `grep -rn '\.skip = True'` returning exactly one site.
- `do_skip(cause=cause)` is called only from `runnable.py:358-360` when `runner.node.unique_id in self._skipped_children`, i.e. exclusively on upstream-node failure.

Empty, ephemeral, and selector-excluded skips don't go through `do_skip()` and don't fire these events, so they keep their existing `"skipped"` handling.

dbt also emits a later `NodeFinished` with `node_status="skipped"` for the same node from the runner's `finally` block (`dbt/task/runnable.py:266`), which would otherwise overwrite the rewritten XCom back to `"skipped"`. The per-execution accumulator set ensures both events are rewritten consistently regardless of arrival order.

The fix lives in the shared log parser, so it applies uniformly to both `InvocationMode.DBT_RUNNER` (protobuf events serialised via `MessageToJson`) and `InvocationMode.SUBPROCESS` (`--log-format json` stdout lines).

Behaviour after this PR

How we prevent this from regressing

- `test_dbt_task_group_watcher_retry_recovers_skipped_downstream` in `tests/operators/test_watcher.py` runs the new DAG via `dag.test()` and asserts:
  - `model_downstream_run.state == "success"` (was `skipped` before the fix).
  - `model_downstream_run.try_number > 1`, which guards against a future refactor that happens to leave the task in `success` for the wrong reason.
- `dev/dags/dbt/watcher_upstream_failure_recovery/`: kept separate from `watcher_downstream_not_skipped` so the two test scenarios don't conflate. Uses a distinct sequence name `_cosmos_recovery_fail_once_seq` so parallel CI runs can't share state.
- `dev/failed_dags/example_watcher_recovers_skipped_downstream.py` for visual verification in Airflow standalone.
- `reset_fail_once_sequence` pytest fixture; both the new test and the existing `test_dbt_task_group_watcher_gateway_prevents_downstream_skip` now use it.

Test plan

- (`tests.py3.10-3.2-1.11:test-integration`, 109.65s call duration).
- `test_dbt_task_group_watcher_gateway_prevents_downstream_skip` still passes after the fixture refactor.
- `example_watcher_recovers_skipped_downstream` in Airflow standalone: confirmed `model_downstream_run` lands in `success` (was `skipped`).

🤖 Generated with Claude Code