
Fix: watcher consumer retry logging dbt with --log-format json #2713

Merged
tatiana merged 2 commits into main from fix/watcher-retry-log-format
May 21, 2026

Conversation

@tatiana
Collaborator

@tatiana tatiana commented May 20, 2026

When a watcher consumer task retries via _fallback_to_non_watcher_run, it copies the producer's add_cmd_flags() output into its own dbt invocation. The producer hardcodes log_format="json" so it can parse dbt's structured event stream — but that internal choice was leaking into the consumer's user-facing retry dbt command, producing raw JSON (one structured event per line) instead of dbt's default formatted text in task logs.

Example of what users currently see on a retry task like watcher_downstream_not_skipped.model_downstream_run:

[2026-05-20 21:45:11] INFO - {"data": {"log_version": 3, "version": "=1.11.8"}, "info": {"category": "", "code": "A001", ...}}
[2026-05-20 21:45:11] INFO - {"data": {"adapter_name": "***", ...}, "info": {"category": "", "code": "E034", ...}}
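Each of those lines is a self-contained JSON event, which is what makes the format convenient for the producer to parse and unpleasant for a person to read. The snippet below is a minimal sketch of that kind of line-by-line decoding, not Cosmos's actual parser: `parse_event` and `event_codes` are hypothetical names, and the sample JSON line is a trimmed version of the excerpt above that keeps only the fields visible there.

```python
import json

# Trimmed sample lines. The JSON one keeps only fields visible in the excerpt
# above; the text one is what dbt's default format looks like.
SAMPLE_LINES = [
    '{"data": {"log_version": 3, "version": "=1.11.8"}, "info": {"category": "", "code": "A001"}}',
    "INFO [MainThread]: Running with dbt=1.11.8",
]


def parse_event(line):
    """Return the decoded event dict, or None when the line is not a JSON event."""
    try:
        event = json.loads(line)
    except json.JSONDecodeError:
        return None
    # Guard against valid JSON that is not an event object (e.g. a bare number).
    if isinstance(event, dict) and "info" in event:
        return event
    return None


def event_codes(lines):
    """Collect the info.code value of every line that decodes as an event."""
    events = (parse_event(line) for line in lines)
    return [event["info"]["code"] for event in events if event is not None]


print(event_codes(SAMPLE_LINES))
```

Plain-text lines fall through as `None`, so a consumer of the stream can ignore anything that is not a structured event.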

Fix

Add --log-format to _filter_flags's skip list. The producer's hardcoded --log-format json is no longer propagated into the consumer's fallback dbt command, so retries default to dbt's normal text output.
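The filtering idea can be sketched as follows. This is an illustration under stated assumptions, not the code in cosmos/operators/_watcher/base.py: `filter_flags_sketch` and `FLAGS_WITH_VALUE_TO_SKIP` are hypothetical names, and the sketch assumes each skipped flag is followed by exactly one space-separated value (it does not handle `--log-format=json` or multi-value selectors).

```python
# Hypothetical stand-in for the skip list; the real one may differ.
FLAGS_WITH_VALUE_TO_SKIP = ("--select", "--exclude", "--log-format")


def filter_flags_sketch(flags):
    """Drop each skipped flag together with the single value that follows it."""
    kept = []
    skip_next = False
    for token in flags:
        if skip_next:
            # This token is the value of a skipped flag; drop it as well.
            skip_next = False
            continue
        if token in FLAGS_WITH_VALUE_TO_SKIP:
            skip_next = True
            continue
        kept.append(token)
    return kept


# Covers the edge case where --log-format is the first flag in the list.
print(filter_flags_sketch(["--log-format", "json", "--full-refresh"]))
```

With `--log-format` in the skip list, the producer's `json` value never reaches the retry command, so dbt falls back to its default text format.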

How users can still get JSON

Users who want JSON-formatted dbt output on retry can opt in via:

DbtDag(
    ...,
    operator_args={"dbt_cmd_flags": ["--log-format", "json"]},
)

dbt_cmd_flags is appended to the dbt command by build_cmd outside of _filter_flags's pipeline, so this path keeps working after the fix.
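The reason the opt-in survives is purely one of ordering: only the flags inherited from the producer pass through the filter, while user-supplied flags are appended afterwards. The sketch below illustrates that ordering. `strip_skipped` and `build_retry_cmd_sketch` are hypothetical helpers, the `dbt run` prefix and the `--project-dir` value are assumptions made for the example, and none of this mirrors the real `build_cmd` signature.

```python
SKIPPED = ("--select", "--exclude", "--log-format")  # assumed skip list


def strip_skipped(flags):
    """Remove skipped flags and the single value that follows each of them."""
    out, i = [], 0
    while i < len(flags):
        if flags[i] in SKIPPED:
            i += 2  # skip the flag and its value
        else:
            out.append(flags[i])
            i += 1
    return out


def build_retry_cmd_sketch(producer_flags, dbt_cmd_flags=None):
    """Filter inherited producer flags, then append user flags untouched."""
    return ["dbt", "run", *strip_skipped(producer_flags), *(dbt_cmd_flags or [])]


producer_flags = ["--log-format", "json", "--project-dir", "/tmp/proj"]

# Default: the producer's --log-format json is stripped.
print(build_retry_cmd_sketch(producer_flags))

# Opt-in: the user's own --log-format json is appended after filtering.
print(build_retry_cmd_sketch(producer_flags, ["--log-format", "json"]))
```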

Test plan

Automated

  • tests/operators/test_watcher.py updated:
    • test_filter_flags — extended to assert --log-format <value> is filtered alongside --select / --exclude
    • test_filter_flags_strips_log_format_when_first (new) — covers the edge case where --log-format is the first flag in the input list
    • test_fallback_strips_producer_log_format_by_default (new) — end-to-end verifies the consumer's retry command has no --log-format
    • test_fallback_preserves_user_dbt_cmd_flags_opt_in_to_json (new) — documents the opt-in path via operator_args["dbt_cmd_flags"]
  • All 12 affected tests pass on tests.py3.12-2.11-1.11.

Manual validation DAGs

Two example DAGs in dev/failed_dags/ exercise both sides of the change. Both intentionally fail once on model_retry and retry via the watcher fallback path — inspect the retry task's logs to verify behaviour. Do not run them concurrently (they share public._cosmos_fail_once_seq).

  • dev/failed_dags/example_watcher_downstream_not_skipped.py — default config (no dbt_cmd_flags). On the watcher_downstream_not_skipped.model_retry_run retry, log lines should be dbt's normal text format (e.g. INFO [MainThread]: Running with dbt=1.11.8). Before this fix, those lines came through as raw JSON; after the fix, they should read normally.
  • dev/failed_dags/example_watcher_downstream_not_skipped_json_logs.py (new in this PR) — adds operator_args={"dbt_cmd_flags": ["--log-format", "json"]}. On the watcher_downstream_not_skipped_json_logs.model_retry_run retry, log lines should be structured JSON objects ({"data": ..., "info": ...}). Confirms the user opt-in still works after the fix.

How this was discovered

Reported while validating example_watcher_downstream_not_skipped's model_downstream_run retry on the 1.14.2a3 release candidate.

Backport

Targeting main; needs a backport to release-1.14 so it ships in 1.14.2. Add Cosmos 1.14.2 milestone after merge.

When a watcher consumer task retries via _fallback_to_non_watcher_run,
it copies the producer's add_cmd_flags() output into its own dbt
invocation. The producer hardcodes log_format="json" so it can parse
dbt's structured event stream, but that internal choice was leaking
into the consumer's user-facing retry dbt command, producing raw JSON
in task logs (one structured event per line) instead of dbt's
default formatted text.

Add --log-format to the _filter_flags skip list so the producer's
internal log_format choice does not propagate. Users who want JSON
output on retry can still opt in via
operator_args={"dbt_cmd_flags": ["--log-format", "json"]} —
dbt_cmd_flags is appended by build_cmd outside of this flag pipeline.

Reported when validating example_watcher_downstream_not_skipped's
model_downstream_run retry on the 1.14.2a3 release candidate.
Copilot AI review requested due to automatic review settings May 20, 2026 21:31
@tatiana tatiana requested review from a team, corsettigyg, dwreeves and jbandoro as code owners May 20, 2026 21:31
@tatiana tatiana requested review from pankajastro and pankajkoti May 20, 2026 21:31
Contributor

Copilot AI left a comment

Pull request overview

This PR prevents the watcher producer’s internal --log-format json setting (needed for structured event parsing) from leaking into the watcher consumer’s fallback retry dbt invocation, so retry task logs default back to dbt’s normal human-readable text output.

Changes:

  • Extend the watcher consumer’s retry flag filtering to drop --log-format <value> alongside --select/--exclude.
  • Clarify docstrings around why --log-format is filtered and how users can still opt into JSON via operator_args["dbt_cmd_flags"].
  • Add/extend tests covering both the filtering behavior and the end-to-end fallback command construction.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.

| File | Description |
|---|---|
| cosmos/operators/_watcher/base.py | Adds --log-format to the _filter_flags skip list and updates retry/filter documentation. |
| tests/operators/test_watcher.py | Adds new regression tests to ensure producer --log-format json is stripped on fallback and user opt-in via dbt_cmd_flags remains intact. |

Mirrors example_watcher_downstream_not_skipped but adds
operator_args={"dbt_cmd_flags": ["--log-format", "json"]} so the
consumer's retry dbt command emits JSON-formatted logs. Reuses the
same watcher_downstream_not_skipped dbt project and the same fail-once
sequence, so the two DAGs must not run concurrently.
@tatiana tatiana mentioned this pull request May 20, 2026
22 tasks
@tatiana tatiana changed the title Strip --log-format from producer flags on watcher consumer retry Strip --log-format from producer flags on watcher consumer retry May 20, 2026
@tatiana tatiana changed the title Strip --log-format from producer flags on watcher consumer retry Fix: watcher consumer retry logging dbt with --log-format json May 20, 2026
@codecov

codecov Bot commented May 20, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 98.04%. Comparing base (f8087c5) to head (595e87a).

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #2713   +/-   ##
=======================================
  Coverage   98.04%   98.04%           
=======================================
  Files         105      105           
  Lines        7892     7892           
=======================================
  Hits         7738     7738           
  Misses        154      154           

Contributor

@pankajkoti pankajkoti left a comment

LGTM. I guess we need to associate the milestone 1.14.2 for this PR?

Comment thread cosmos/operators/_watcher/base.py
@tatiana tatiana added this to the Cosmos 1.14.2 milestone May 21, 2026
@tatiana tatiana merged commit c7951f1 into main May 21, 2026
240 of 241 checks passed
@tatiana tatiana deleted the fix/watcher-retry-log-format branch May 21, 2026 09:25
tatiana added a commit that referenced this pull request May 21, 2026
Strip --log-format from producer flags on watcher consumer retry —
prevents the producer's internal JSON log format (used for
structured event parsing) from leaking into the user-facing retry
dbt command. Users can still opt into JSON via
operator_args={'dbt_cmd_flags': ['--log-format', 'json']}.
tatiana added a commit that referenced this pull request May 21, 2026
Four watcher-mode changes in this release adjust observable
behaviour without breaking the public API. Flagging them under a
new 'Behaviour Changes' heading (intentionally not 'Breaking
Changes' since this is a patch release) so users relying on
undocumented internals can review before upgrading:

- #2615: producer-done gateway task is now downstream of every
  consumer in DbtTaskGroup when depends_on_past=True. Default
  (depends_on_past=False) topology unchanged.
- #2684: downstream models skipped after upstream-failure now
  retry when the upstream task recovers (previously remained
  skipped for the run).
- #2713: consumer retry no longer inherits the producer's
  --log-format json flag. Retry logs default to text format;
  JSON opt-in via dbt_cmd_flags.
- #2629/#2683: XCom backup Variable key scheme changed (now
  includes task-group path and sanitises run-id). Monitoring
  scripts matching the old pattern need updating.
@tatiana
Collaborator Author

tatiana commented May 21, 2026

🚀 Released in Cosmos 1.14.2 (PyPI).

tatiana added a commit that referenced this pull request May 21, 2026
## CHANGELOG entry

1.14.2 (2026-05-21)
-------------------

Behaviour Changes

These changes adjust observable behaviour of the
``ExecutionMode.WATCHER`` execution mode.
None of them breaks the public Cosmos API, but users relying on
undocumented internals
(graph wiring assertions, XCom backup Variable names, retry-on-recovery
semantics, or
retry log format) should review before upgrading.

* ``ExecutionMode.WATCHER`` + ``depends_on_past=True``: when the
producer task has
``depends_on_past=True`` (typically set via ``default_args``), the
producer-done gateway
task inside ``DbtTaskGroup`` is now wired downstream of every consumer
task, in addition
to the producer. This is required so that ``wait_for_downstream`` gating
behaves
correctly across DAG runs and the task group acts as a single unit that
must fully
succeed before the next run starts. Users with ``depends_on_past=False``
(the default)
  see no topology change. See #2615.
* ``ExecutionMode.WATCHER`` downstream retry on upstream recovery: dbt
models that were
skipped after an upstream-failure event are now retried in the same DAG
run when the
upstream task succeeds on retry. Previously these models remained
skipped for the run.
  See #2684.
* ``ExecutionMode.WATCHER`` consumer-retry log format: the consumer's
fallback ``dbt``
invocation no longer inherits the producer's internal ``--log-format
json`` flag, so
retry task logs now default to dbt's normal text format. Users who
relied on JSON output
in retry logs can opt in via ``operator_args={"dbt_cmd_flags":
["--log-format", "json"]}``.
  See #2713.
* ``ExecutionMode.WATCHER`` XCom-backup Variable key scheme: the
per-model XCom backup
Variable key now includes the full task-group path and sanitises
disallowed characters
(``+`` / ``:``) from ``run_id``. External monitoring or cleanup scripts
that match the
  old key pattern will need updating. See #2629 and #2683.

Bug Fixes

* Sanitize disallowed characters from XCom backup variable key by
@MichaelRBlack in #2629
* Prevent watcher producers from colliding on one XCom-backup key by
@tatiana in #2683
* Retry watcher downstream models on upstream-failure recovery by
@tatiana in #2684
* Fix ``ExecutionMode.WATCHER`` interaction with ``depends_on_past`` by
@johnhoran in #2615
* Strip ``--log-format`` from producer flags on watcher consumer retry
by @tatiana in #2713
* Fix duplicate ``deferrable`` kwarg in
``DbtRunAirflowAsyncBigqueryOperator`` by @pankajastro in #2616
* Fix dbt docs iframe ``src`` missing deployment path prefix by
@pankajastro in #2640
* Defer ``TaskInstance`` import in cluster policy to fix Sentry init
crash by @pankajastro in #2662
* Restore type hints broken by lazy imports in ``cosmos/__init__.py`` by
@pankajastro in #2647
* Fix ``ExecutionMode.WATCHER`` non-dbt stdout being suppressed from
logs by @pankajastro in #2654
* Fix test sensor retry behaviour in ``ExecutionMode.WATCHER`` by
@pankajkoti in #2658
* Fix watcher fallback selector for versioned dbt models by @pankajkoti
in #2659
* Break out of iframe from Airflow 2 dbt Docs 404 link by @pankajastro
in #2685

Docs

* Document source freshness aware execution for
``ExecutionMode.WATCHER`` by @pankajastro in #2617
* Add reference docs for ``DbtRunLocalOperator``,
``DbtTestLocalOperator``, ``DbtSnapshotLocalOperator`` and
``DbtBuildLocalOperator`` by @pankajastro in #2643
* Add watcher retry behaviour history documentation by @tatiana in #2600
* Add Apache Airflow® trademark on first prominent mention by
@pankajkoti in #2624
* Sentence-case section headings by @pankajkoti in #2630
* Use ``-`` for bullet points by @pankajkoti in #2631
* Drop decorative separator lines by @pankajkoti in #2632
* Normalize heading underlines in ``docs/guides/`` and
``docs/index.rst`` by @pankajkoti in #2664
* Fix broken cross-directory doc links by @pankajastro in #2694
* Fix broken external links in hand-written docs by @pankajastro in
#2696
* Document support for Airflow 3.2 in the compatibility policy by
@pankajastro in #2652
* Refresh the dbt/Airflow conflicts table to match the compatibility
policy by @pankajastro in #2653
* Document incremental model limitation for
``ExecutionMode.AIRFLOW_ASYNC`` by @pankajastro in #2642

Others

* Import ``ParamValidationError`` from ``airflow.sdk`` to silence
deprecation warning by @pankajastro in #2645
* Import ``DAG`` from ``airflow.sdk`` to silence deprecation warning by
@pankajastro in #2644
* Enforce docs style guide via pre-commit hook by @pankajkoti and
@tatiana in #2633
* Add Airflow 3.2 to the test matrix in ``CLAUDE.md`` by @pankajastro in
#2646
* Document the lazy-logging standard in ``CLAUDE.md`` by @pankajastro in
#2679
* Extract watcher XCom-key helpers and inline single-use bindings by
@pankajastro in #2673
* Remove leftover ``scripts/airflow3`` directory by @pankajastro in
#2661
* Fix ``altered_jaffle_shop`` seed-dep CTE references by @pankajastro in
#2690
* Skip Airflow 3.0 integration test stuck on
``example_watcher_with_freshness`` by @pankajastro in #2692
* Fix typo "constrantis" → "constraints" in tests env comment by
@pankajastro in #2669

## Summary

Drafts the **Cosmos 1.14.2** release. Latest alpha cut is **1.14.2a4** —
refreshed from `1.14.2a3` after maintainers (@pankajastro, @pankajkoti)
added 11 more PRs to the milestone:

- @pankajastro: #2646, #2652, #2653, #2661, #2669, #2673, #2679, #2690,
#2692
- @pankajkoti:  #2658, #2659

All 11 picks applied cleanly on top of the existing branch — no
additional manual conflict resolution needed.

**Excluded:** #2618 ("Improve glossary") — modifies
`docs/reference/glossary.rst`, which doesn't exist on `release-1.14`
(added on main by #2461, never backported). Deferred to 1.15.0.

33 PRs cherry-picked total (22 in the initial a3 cut + 11 in this a4
refresh); two PRs (#2575, #2618) deliberately held back as 1.15.0
content.

## Milestone

[Cosmos
1.14.2](https://github.com/astronomer/astronomer-cosmos/milestone/48) —
33 merged PRs across Bug Fixes, Docs, and Others.

## Inclusion provenance

| Path | PRs | Notes |
|---|---|---|
| **Originally in milestone — a3 cut (12)** | #2629, #2616, #2640,
#2662, #2654, #2683, #2684, #2615, #2694, #2696, #2645, #2644 | Assigned
by maintainers before the initial release-draft run |
| **Pulled in via closed-issue link — a3 cut (1)** | #2647 | Closes
milestone issue #2634 ("Typehinting broken with lazy imports") but the
PR itself was never assigned to the milestone — included via
`closedByPullRequestsReferences` |
| **Added during cherry-pick conflict resolution — a3 cut (9)** | #2631,
#2624, #2630, #2632, #2664, #2633, #2617, #2600, #2643 | Docs PRs whose
absence caused `release-1.14` ↔ `main` textual drift. #2631 caused
#2696's conflict; the rest were transitive dependencies (especially
#2664, on top of the bullet/heading/trademark sweeps). #2643 was added
to unblock #2664's operator-docs conflict |
| **Added to milestone after a3 — included in a4 (11)** | #2646, #2652,
#2653, #2658, #2659, #2661, #2669, #2673, #2679, #2690, #2692 | Added by
@pankajastro and @pankajkoti after the initial draft. All applied
cleanly on top of the a3 cherry-picks |
| **Deliberately excluded (2)** | #2575, #2618 | #2575: documents
`DbtDocsS3KubernetesOperator` with `.. versionadded:: 1.15.0` (already
in the `Cosmos 1.15.0` milestone). #2618: improves a glossary file that
doesn't exist on `release-1.14` (the stub was added by #2461, not
backported) |

### Manual conflict resolution

Cherry-picks that needed manual fix-up. Reviewers should double-check
the files listed below:

| PR | File(s) | Resolution |
|---|---|---|
| **#2664** | `docs/guides/dbt_docs/generating-docs.rst` | **Substantive
exclusion** — manually removed the entire `Upload to S3 from Kubernetes`
section (lines ~46–77 of the incoming diff) that documents
`DbtDocsS3KubernetesOperator` (1.15.0 feature, PR #2575). Kept HEAD (no
S3-from-Kubernetes section). |
| **#2664** |
`docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst` | Took
incoming side for "Example 1" / "Example 2" heading underlines (`++++`
style — matches #2664's normalization across the rest of the file). Also
took incoming for em-dash → colon ("Example 1 —" → "Example 1:"). |
| **#2664** | `docs/guides/run_dbt/operators/operators.rst` |
Auto-resolved once #2643 was cherry-picked first (added missing
Run/Test/Snapshot/Build operator reference docs that #2664 expected to
be present). |
| **#2633** |
`docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst` |
Initially took `'''` (Example 1/2 underlines) per #2633's incoming side
— this broke the file's heading hierarchy and prevented sphinx from
registering the `_watcher-source-freshness:` label, causing the docs
build to fail. **Fixed in a follow-up commit** by reverting to `+++` to
match main and the rest of the file's level-3 sections. |
| **#2617** |
`docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst` |
**Substantive trim** — #2617 documented both the 1.14.0 source-freshness
execution path AND the 1.15.0 `freshness_callback` override (which ships
with #2586, not in this release line). Removed the `literalinclude` of
`dev/dags/watcher_with_freshness_check.py` (1.15.0 example DAG, missing
on release-1.14) and the surrounding override section so the 1.14.2 docs
cover only what the 1.14.x line supports. Surfaced as a `-W`
(warnings-as-errors) docs build failure on the first CI run. |
| **#2684** | `cosmos/operators/_watcher/state.py` | Took incoming side
— added two new frozensets (`DBT_UPSTREAM_FAILURE_SKIP_EVENT_NAMES`,
`DBT_SOURCE_FRESHNESS_STALE_STATUSES`) at lines 28–37. HEAD had nothing;
incoming had the additions. |
| **#2615** | `tests/airflow/test_graph.py` | Took incoming side — added
two new tests at the end of the file:
`test_add_watcher_producer_task_passes_freshness_callback_via_setup_operator_args`
and `test_watcher_dependency_wiring`. HEAD had nothing; incoming had the
additions. |

All a4 cherry-picks (11) applied without manual intervention.

## Test plan

> Long-term goal: automate. For now, please pick the slice relevant to
your environment and report deviations as a comment on this PR.

### Watcher mode (Postgres)

- [x] @pankajkoti `example_watcher` (`dev/dags/example_watcher.py`) —
default watcher run; exercises **#2629** (XCom backup key sanitization
triggers on the `+` in any default Airflow run_id)
- [x] (@tatiana) **[NEW]** `example_watcher_xcom_collision`
(`dev/failed_dags/example_watcher_xcom_collision.py`) — validates
**#2683**
- [x] @tatiana **[NEW]** `example_watcher_recovers_skipped_downstream`
(`dev/failed_dags/example_watcher_recovers_skipped_downstream.py`) —
validates **#2684**
- [x] @tatiana `example_watcher` with `default_args={"depends_on_past":
True}` and ≥2 consecutive runs — validates **#2615** (manual edit; no
dedicated example DAG)
- [x] @pankajkoti watcher DAG with at least one model that has tests +
Airflow retries enabled on the test sensor — validates **#2658** (test
sensor retry path)
- [x] @pankajkoti watcher DAG referencing a versioned dbt model (e.g.
`models/foo_v2.sql`) and triggering the fallback selector path —
validates **#2659**

### BigQuery (async)

- [x] @tatiana `simple_dag_async` (`dev/dags/simple_dag_async.py`) —
validates **#2616** (duplicate `deferrable` kwarg fix)

### dbt docs plugin

- [x] (@pankajkoti) `docs_dag` (`dev/dags/dbt_docs.py`) + open the
Cosmos dbt docs URL in the Airflow UI under a non-root deployment path —
validates **#2640** (iframe `src` deployment prefix)

### Cross-cutting scenarios (no dedicated DAG)

- [x] @pankajastro **#2662** — boot Airflow with Cosmos cluster policy +
Sentry init; verify no `TaskInstance`-import crash on startup
- [x] @pankajastro **#2654** — run a watcher DAG that prints non-dbt
stdout (e.g., Snowflake `externalbrowser` auth URL); confirm output
reaches task logs
- [x] (@pankajkoti) **#2647** — `mypy` / IDE inspection of `from cosmos
import DbtDag, ProjectConfig, ProfileConfig, RenderConfig,
ExecutionConfig` resolves attributes
- [x] @pankajastro **#2645, #2644** — boot Airflow 3 with `airflow.sdk`
available; no `ParamValidationError` / `DAG` deprecation warnings from
Cosmos imports
- [x] @pankajastro **#2673** — verify watcher XCom-key extraction left
no behavioural drift (run the existing watcher integration suite
end-to-end on Postgres + Airflow 2.10)

### Docs build (covers all docs PRs)

- [x] (@pankajkoti) `sphinx-build -W -b html docs docs/_build` succeeds
with no warnings (#2617, #2624, #2630, #2631, #2632, #2643, #2600,
#2664, #2694, #2696, #2652, #2653)

### Tooling

- [x] (@pankajkoti) `pre-commit run check-docs-style --all-files` passes
(#2633)

## Reviewer checklist

- [x] CHANGELOG section assignments reviewed
- [x] Entry wording reviewed
- [x] `cosmos/__init__.py` bumped to `1.14.2a4`
- [x] Cosmetic docs PRs (#2624, #2630, #2631, #2632, #2633, #2664)
confirmed acceptable in patch line
- [x] **Manual conflict resolutions** (table above) reviewed
file-by-file
- [x] Test plan executed on at least Postgres + one warehouse
- [x] Ready to mark non-draft

---------

Co-authored-by: Michael Black <4128408+MichaelRBlack@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Pankaj Singh <98807258+pankajastro@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com>
Co-authored-by: John Horan <jhoran@zendesk.com>
Co-authored-by: Copilot <copilot@github.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
tatiana added a commit that referenced this pull request May 26, 2026
Follow-up to the #2713 review feedback
from @pankajkoti: extract the tuple of dbt flags the watcher consumer
must strip from a retry command into a module-level constant.

The constant lives in `cosmos/operators/_watcher/base.py`, one block
above `BaseConsumerSensor`. Its docstring carries the per-flag rationale
(previously buried in `_filter_flags`'s own docstring), so a future
contributor adding or removing a flag has one place to look.

🤖 Generated with Claude Code (https://claude.com/claude-code)