Simplify and unify WATCHER implementation regardless of InvocationMode#2498
Conversation
…ucer The producer operator no longer forces InvocationMode.SUBPROCESS. Instead, the invocation mode is auto-discovered at runtime (DBT_RUNNER preferred when dbt-core is in-process, SUBPROCESS otherwise), while an explicit `invocation_mode` passed by the caller takes full precedence. Both modes share the same `store_dbt_resource_status_from_log` parser: - SUBPROCESS: JSON log lines from stdout are parsed directly (unchanged). - DBT_RUNNER: EventMsg callbacks are serialised to JSON via `google.protobuf.json_format.MessageToJson` (a transitive dbt-core dep) and then forwarded to the same parser. The wiring is done by overriding `run_subprocess` and `run_dbt_runner` so that `project_dir` (the temp copy) is captured correctly in each path. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
InvocationMode
InvocationModeInvocationMode
InvocationModeInvocationMode
There was a problem hiding this comment.
Pull request overview
This PR unifies ExecutionMode.WATCHER behavior across InvocationMode.SUBPROCESS and InvocationMode.DBT_RUNNER by routing both execution paths through the same dbt JSON log parsing pipeline and standardizing per-node XCom keys.
Changes:
- Standardize per-node status reporting to
<unique_id>_statusfor both invocation modes viastore_dbt_resource_status_from_log. - Replace dbt Runner event-specific XCom compression strategy with an EventMsg→JSON→parser callback.
- Simplify consumer sensors + triggerer to always poll the canonical
*_status(and*_tests_status) keys; update/trim tests accordingly.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
cosmos/operators/watcher.py |
Producer now always uses --log-format json; adds dbt Runner callback that feeds the unified parser; removes event-compression path. |
cosmos/operators/_watcher/base.py |
Simplifies event processing to dict-only; standardizes _get_node_status() to *_status; adds _log_dbt_msg(); makes startup-events logging safer. |
cosmos/operators/_watcher/triggerer.py |
Removes use_event branching; trigger always reads *_status; removes compressed-event parsing helper. |
cosmos/operators/_watcher/__init__.py |
Stops exporting _parse_compressed_xcom after triggerer simplification. |
cosmos/operators/watcher_kubernetes.py |
Removes use_event() stub now that event-vs-xcom branching is gone. |
tests/operators/test_watcher.py |
Updates tests to new canonical XCom strategy; adds coverage for new runner callback wiring and startup-event logging; switches capsys→caplog. |
tests/operators/test_watcher_kubernetes_unit.py |
Removes use_event() test (method deleted). |
tests/operators/_watcher/test_watcher_base.py |
Removes abstract-method tests for deleted use_event()/event-based status methods. |
tests/operators/_watcher/test_triggerer.py |
Updates trigger serialization/tests for removed use_event and canonical *_status lookup. |
tests/hooks/test_subprocess.py |
Updates expectations for store_dbt_resource_status_from_log to no longer assert on missing context. |
tests/dbt/test_runner.py |
Removes no-longer-needed EventMsg patching. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #2498 +/- ##
==========================================
- Coverage 98.03% 98.01% -0.02%
==========================================
Files 103 103
Lines 7262 7164 -98
==========================================
- Hits 7119 7022 -97
+ Misses 143 142 -1 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
pankajkoti
left a comment
There was a problem hiding this comment.
Looks like a solid refactor to me! Thanks for adding all the code comments, making the review easier and the code maintainable.
I did a sanity test on both SUBPROCESS and dbt Runner approaches, and it's looking fine to me with respect to pushing XCOMs, creating asset events, populating compiled SQLs, consistent logs and dag run durations. Also, the thread count was set to 12 in my BigQuery profile while testing, so it's looking concurrency safe. Good net 400+ lines being removed :)
michal-mrazek
left a comment
There was a problem hiding this comment.
Looking good, I tested in local docker compose. Just added one small suggestion - it took me some time to figure out that the if context is None is just typing thing :)
Co-authored-by: Michal Mrázek <121952333+michal-mrazek@users.noreply.github.com>
There was a problem hiding this comment.
Copilot reviewed 11 out of 11 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Copilot reviewed 11 out of 11 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
When SourceRenderingBehavior is not NONE, the producer task now runs
`dbt source freshness` before `dbt build`. For every stale source it
pushes a synthetic `"skipped"` XCom entry (using the unified
`{uid}_status` key introduced in #2498) for all transitively dependent
models, and adds those models to the `--exclude` list of the subsequent
`dbt build` call. Consumer sensors raise `AirflowSkipException` instead
of waiting for results that will never arrive.
Key changes:
- `cosmos/operators/watcher.py`: add `_default_freshness_callback`,
`_push_skipped_xcom_for_model`, `_run_source_freshness`,
`_skipped_node_token`, `_apply_source_freshness` to
`DbtProducerWatcherOperator`; call `_apply_source_freshness` from
`execute()` when `_check_source_freshness=True`
- `cosmos/operators/_watcher/state.py`: add `DBT_SKIPPED_STATUSES` and
`is_dbt_node_status_skipped()`; include skipped in terminal statuses
- `cosmos/operators/_watcher/base.py`: raise `AirflowSkipException` in
`execute_complete` and `poke` when status is `"skipped"`
- `cosmos/operators/_watcher/triggerer.py`: yield skipped TriggerEvent
when node status is skipped
- `cosmos/operators/local.py`: add `_read_target_sources_json` helper
and early-return path in `run_command` for the freshness check
- `cosmos/airflow/graph.py`: pass `_check_source_freshness=True` to the
producer when `source_rendering_behavior` is not NONE
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1.14.0 (2026-04-07) --------------------- Breaking Changes * Drop support for Airflow versions earlier than **2.9** by @jedcunningham in #2288 * Fix inclusion of package models and selection/exclusion behavior by @pankajkoti in #2357 * ``ExecutionMode.WATCHER``: The per-node ``*_status`` XCom value is now a dict (``{"status": "<status>", "outlet_uris": [...]}``) instead of a plain string. Any custom code that reads these internal XCom keys directly will need to be updated by @pankajkoti in #2507 Features * Add cluster policy support for ``ExecutionMode.WATCHER`` sensor retries by @astro-anand in #2293 * Add debug mode to track memory utilization by @tatiana in #2327 * Add FQN selection support for ``LoadMode.DBT_MANIFEST`` by @pankajastro in #2375 * Introduce interceptors for Cosmos tasks by @tatiana in #2419 * Add config to allow disabling dag versioning by @pankajkoti in #2470 * Implement TaskGroups by models folder by @maximilianoarcieri and @tatiana in #1566, #2469, and #2420 * feat: implement DbtTestWatcherOperator by @michal-mrazek in #2447 * Add source freshness aware execution for ``ExecutionMode.WATCHER`` by @pankajastro and @tatiana in #2467 * Note: Like ``ExecutionMode.WATCHER``, this feature is experimental and its interface and implementation can change in the future. * Add Airflow 3.2 support by @pankajastro and @pankajkoti in #2472 Enhancements * Add watcher mode support for dbt test node states by @michal-mrazek in #2318 * Rename watcher-mode sensor retry queue and reuse it for producer tasks by @pankajastro in #2331 * Fix leaked semaphore warnings in Airflow 3 by resetting dbt adapters by @pankajkoti in #2335 * Improve dbt Fusion support and related tests by @tatiana in #2356 * Default Snowflake profile mappings to four threads by @tatiana in #2374 * Attempt to remove Pydantic as a dependency by @tatiana in #2377 * Log dbt-core and adapter versions in watcher consumer tasks by @pankajastro in #2412 * Log model errors in watcher consumer on dbt node failure by @pankajastro in #2431 * Reduce XCom read/write for tracking node state and errors in ConsumerWatcher task by @pankajastro in #2471 * Remove duplicate debug log in watcher subprocess path by @tatiana in #2494 * Simplify and unify WATCHER implementation regardless of InvocationMode by @tatiana in #2498 * Switch to lazy imports in cosmos/__init__.py by @pankajkoti in #2531 Bug Fixes * Handle invalid YAML errors with ``LoadMode.DBT_MANIFEST`` and ``RenderConfig.selector`` by @YourRoyalLinus in #2316 * Populate ``compiled_sql`` for ``InvocationMode.SUBPROCESS`` in ``ExecutionMode.WATCHER`` by @pankajkoti in #2319 * Fix select/exclude type mismatch by @tatiana in #2364 * Set ``emit_datasets=False`` for ``DbtTest*`` operators by @pankajastro in #2365 * Set correct queue priority for watcher producer tasks by @pankajastro in #2372 * Preserve ``extra_context`` for watcher consumer task instances by @pankajkoti in #2381 * Respect ``deferrable=False`` from ``operator_args`` on watcher consumer sensors by @pankajkoti in #2384 * Fix watcher queue precedence and add documentation by @pankajastro in #2391 * Do not set ``compiled_sql`` on ``ExecutionMode.WATCHER`` producers by @pankajkoti in #2440 * Remove const attribute for ``__cosmos_telemetry_metadata__`` dag param by @pankajkoti in #2466 * Remove timeout override from Cosmos watcher sensors by @tatiana and @claude in #2478 * Remove forced ``retries=0`` from watcher producer operators by @tatiana in #2479 * RFC: Add patch for newer versions of amazon provider when running dbt on EKS by @aoelvp94 in #2481 * Fix ``cosmos_debug_max_memory_mb`` XCom not pushed in Watcher sensor tasks by @tatiana in #2503 * Fix ``TestBehavior.NONE`` and ``TestBehavior.AFTER_ALL`` exclude ignored with selectors in ``ExecutionMode.WATCHER`` by @pankajkoti in #2511 * Move dataset emission for ``ExecutionMode.WATCHER`` from producer to consumer sensors by @pankajkoti in #2507 Docs * Document cluster policy configuration for ``ExecutionMode.WATCHER`` sensor tasks by @pankajastro in #2315 * Remove outdated docs for the dbt docs plugin with Airflow 3 by @pankajastro in #2353 * Make Watcher DBT Execution Queue heading clickable by @pankajastro in #2354 * Update ``ExecutionMode.WATCHER`` documentation regarding test node implementation by @jroachgolf84 in #2355 * Fix ``pre_dbt_fusion`` configuration rendering by @pankajastro in #2369 * Add documentation for including/excluding nodes based on FQN by @pankajastro in #2371 * Update watcher execution mode documentation by @tatiana in #2380 * Add documentation for ``DbtSeedLocalOperator`` by @jroachgolf84 in #2383 * Fix miscellaneous Sphinx warnings by @pankajastro in #2395 * Improve contributing documentation by @lzdanski in #2397 * Add **Get Started in 5 Minutes** guide by @lzdanski in #2398 * Add Sphinx redirects package for documentation redirects by @lzdanski in #2407 * Restructure **Getting Started** and **Guides** sections by @lzdanski in #2418 * Add open-source quickstart by @lzdanski in #2439 * Fix documentation redirects by @lzdanski in #2442 * Restructure and refactor reference documentation by @lzdanski in #2443 * Add execution modes decision documentation by @lzdanski in #2444 * Add **Core Concepts** page to Getting Started by @lzdanski in #2448 * Add guide: *How Cosmos Works* by @lzdanski in #2449 * Update **Getting Started** overview and index pages by @lzdanski in #2452 * Add guide: *How Cosmos Runs dbt* by @lzdanski in #2453 * Fix miscellaneous documentation links by @lzdanski in #2454 * Add Mermaid diagrams and execution mode diagrams by @lzdanski and @tatiana in #2459 * Add documentation for memory optimization options by @pankajastro in #2340 * Fix typo in watcher execution mode docs by @evanvolgas in #2485 * Fix minor documentation issues by @evanvolgas in #2489 * Add troubleshooting note for dbt debug logs in ExecutionMode.WATCHER by @tatiana in #2491 * docs: unify RST header styles across documentation by @jigangz in #2473 * docs: fix env var for rich logging by @vricciardulli in #2514 * docs: update dbt project path example for Airflow 3 Astro compatibility by @yeoreums in #2512 * Document missing Cosmos Airflow config settings in cosmos-conf.rst by @tatiana in #2515 * Split security-privacy policy doc and add dependency cooldown by @pankajkoti in #2519 * Add performance optimization and troubleshooting docs by @pankajkoti in #2521 * Update copyright year to 2026 by @tayloramurphy in #2527 * docs: Updating "Project Policies" to "Policies" in menu bar by @jroachgolf84 in #2526 Others * Fix tests after removing support for Airflow versions earlier than 2.9 by @tatiana in #2321 * Enable listener tests for Airflow 3.1 by @pankajastro in #2348 * Accept ``int`` or ``float`` for ``cosmos_debug_max_memory_mb`` in integration tests by @pankajkoti in #2352 * Update ``CODEOWNERS`` to prioritize ``oss-integrations`` by @tatiana in #2359 * Fix automatic reviewer assignment in GitHub by @tatiana and @phanikumv in #2360 * Improve PyPI tagging by @tatiana in #2363 * Add integration tests for dbt Fusion and ``ExecutionMode.WATCHER`` by @tatiana in #2373 * Fix Zizmor check by @tatiana in #2376 * Remove ``methodtools`` dependency by @tatiana in #2378 * Improve comments on #2389 by @evanvolgas in #2394 * Refactor ``load_from_dbt_manifest`` to reduce code complexity by @pankajkoti in #2399 * Refactor ``_handle_no_precursors_or_descendants`` to reduce complexity by @pankajkoti in #2400 * Improve issue templates by @tatiana in #2401 * Avoid running tests when only docs change by @tatiana in #2402 * Add ``no-reload`` target for serving docs locally by @pankajkoti in #2405 * Fix test hash checks on macOS by @tatiana in #2406 * Attempt deterministic dbt project copy in test fixtures by @pankajkoti in #2409 * Pin ``virtualenv <21`` due to hatch incompatibility in CI by @pankajkoti in #2410 * Revert virtualenv pin for hatch installation in CI by @pankajkoti in #2426 * Add version comments for commit SHA pinned GitHub Actions by @pankajkoti in #2436 * Fix ``hatch run docs:build`` issues by @tatiana in #2437 * Minor code improvements by @dnskr in #2446 * Pre-commit autoupdate by @pre-commit-ci in #2367, #2396, #2422, #2451, #2468, #2495, and #2516 * Add file to support Claude understanding the Cosmos repository by @tatiana in #2458 * Dependency updates by @dependabot in #2368, #2425, #2435, #2465, #2475, #2504, #2518, and #2528 * Isolate Scarf telemetry integration test into its own CI job by @pankajkoti and @claude in #2477 * ci: upgrade Airflow version to 3.1 in MyPy type-check job by @yeoreums in #2506 * Add commit message guidelines to CLAUDE.md by @pankajkoti in #2509 * Extend skipping tests in CI for more non-code file changes by @pankajkoti in #2510 * Add Dependabot pre-commit support with 7-day cooldown by @pankajkoti in #2517 * Enforce zero warnings policy for documentation by @dnskr in #2513 Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com> Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com> Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com>
…#2654) Forward non-dbt stdout to task log in WATCHER mode Since upgrading from `astronomer-cosmos==1.13.0` to `1.14.1`, non-dbt stdout output (specifically the Snowflake connector's externalbrowser authentication URL) no longer appears in task logs when using `ExecutionMode.WATCHER`. ## Steps to Reproduce 1. Configure a local Airflow environment with Snowflake `authenticator: externalbrowser` in `~/.dbt/profiles.yml` 2. Set the following environment variables (required for server-side externalbrowser auth in a containerised environment): ``` SF_AUTH_SOCKET_PORT=3910 SF_AUTH_SOCKET_ADDR=0.0.0.0 SNOWFLAKE_AUTH_FORCE_SERVER='true' PYTHONUNBUFFERED=1 ``` 3. Use `ExecutionMode.WATCHER` (with either `InvocationMode.DBT_RUNNER` or `InvocationMode.SUBPROCESS`) 4. Run a dbt DAG that connects to Snowflake 5. Observe that the Snowflake connector's "Going to open: <URL> to authenticate..." message does **not** appear in task logs 6. Downgrade to `astronomer-cosmos==1.13.0` — the URL appears correctly ## Expected Behaviour Non-dbt stdout/logging output (e.g. the Snowflake externalbrowser auth URL) should be forwarded to the Airflow task log, as it was in 1.13.0 after PR #2241. ## Actual Behaviour - **`InvocationMode.DBT_RUNNER`**: `contextlib.redirect_stdout(io.StringIO())` in `cosmos/operators/watcher.py` captures all stdout into a buffer that is never read. The Snowflake connector's `print("Going to open: ...")` goes into the void. - **`InvocationMode.SUBPROCESS`**: The JSON log line parser discards non-JSON lines from subprocess stdout. ## Summary of changes - Fixes #2649. Since 1.14.0, `ExecutionMode.WATCHER` swallowed any stdout written by libraries other than dbt itself — most visibly the Snowflake connector's externalbrowser auth prompt (`Going to open: <URL>`), which made interactive auth unusable. - Replaces `_NullWriter` (used for the `DBT_RUNNER` path) with `_StdoutFilter`, which drops valid-JSON lines (still handled by the `EventMsg` callback, so the #2558 double-logging fix is preserved) and forwards anything else to `logger.info`. - In the `SUBPROCESS` path, promotes the `JSONDecodeError` branch from `logger.debug` to `logger.info` via a small `_surface_non_json_stdout` helper, so non-JSON lines surface in the task log. ## Why Two paths in WATCHER mode were dropping non-dbt stdout: - `DBT_RUNNER`: `run_dbt_runner()` wrapped the dbt invocation in `contextlib.redirect_stdout(_NullWriter())` to suppress duplicate `--log-format json` output, but the sink also discarded everything written by other libraries loaded inside dbt's process. - `SUBPROCESS`: `store_dbt_resource_status_from_log()` demoted any line that failed `json.loads` to `logger.debug`, which is silenced by default in Airflow task logs. ## #2558 regression check The double-logging fix from #2498 is preserved: `_StdoutFilter._emit()` calls `json.loads`; if the line parses, it is silently dropped (same as `_NullWriter`). Only when parsing raises is the line forwarded to `logger.info`. Smoke tested with the same JSON shape from #2558 — no double-log output. ## Test plan - [x] Added `TestStdoutFilter` with cases for: JSON line dropped, non-JSON line forwarded at INFO, partial writes buffer until newline, `flush()` emits trailing partial line, mixed JSON+plain stream. - [x] Added `TestStoreDbtResourceStatusFromLogNonJson` with a Snowflake-prompt regression test and an empty-line case. - [x] Existing `TestStoreDbtStatusFromLog` suite still passes (no regressions in JSON parsing). - [x] Reviewer to validate against the Snowflake `authenticator: externalbrowser` reproduction in #2649. 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This PR unifies
ExecutionMode.WATCHERso it works almost identically with bothInvocationMode.SUBPROCESSandInvocationMode.DBT_RUNNER- removing the previously separate code paths that depended on the invocation mode.Problem
Before this PR,
ExecutionMode.WATCHERhad two completely different strategies depending on how dbt was invoked:InvocationMode.DBT_RUNNER: Subscribed to dbt'sEventMsgcallbacks, serialised eachNodeFinishedevent as zlib-compressed base64 JSON under XCom keynodefinished_<uid>, andpushed startup metadata under dbt_startup_events.
InvocationMode.SUBPROCESS: After dbt exited, read target/run_results.json and pushed the entire results blob once under XCom key run_results. Sensors could not react per-modeluntil the full build finished.
_dbt_runner_callbackswith structuredEventMsgobjects_process_log_line_callable+--log-format jsonparsingXComkey (status)nodefinished_{uid}(compressed+base64 JSON){uid}_status(plain string)_get_status_from_events()(decompresses JSON)get_xcom_val(…_status)_get_node_status()→ decompresses JSON_get_node_status()→ reads plain valueThis meant the consumer sensors needed to know the invocation mode at poke time (
use_event()method), and the triggerer needed ause_eventflag. The code was complex, duplicated, and the two modes delivered very different latency and observability.Solution
Both invocation modes now feed through the same
store_dbt_resource_status_from_logparser, which pushes per-model XCom keys<unique_id>_statusas eachNodeFinishedevent arrives:InvocatioMode.SUBPROCESS: Each JSON log line from stdout is parsed directly by the callable wired torun_subprocess. `InvocationMode.DBT_RUNNER: EachEventMsgfrom the dbt callback is serialised to JSON viagoogle.protobuf.json_format.MessageToJson(a transitive dbt-core dependency) and passedthrough the same parser. Stdout is redirected to suppress raw JSON duplication in Airflow logs.
Consumer sensors and the
WatcherTriggeralways poll the canonical{uid}_statuskey regardless of invocation mode.Key changes
cosmos/operators/watcher.py—DbtProducerWatcherOperatorrun_dbt_runner()override: registers aMessageToJson-based callback that feedsstore_dbt_resource_status_from_log. Callback is only registered when contextis not None (guards against dbt deps calls during
_install_dependencies, which would otherwise cause a phantom callback andGenericExceptionOnRunerrors).contextlib.redirect_stdout(io.StringIO())during the main build to suppress raw JSON from dbt's stdout (since events are handled via the callback).EventMsg-based_handle_node_finished/_serialize_event/zlib-compressedXCom strategy.InvocationMode-basedbranching fromexecute().cosmos/operators/_watcher/base.pyuse_event()abstract method and_get_status_from_events()/_get_status_from_run_results()— no longer needed._get_node_status()now always reads{uid}_statusXCom key (plus the_tests_statuskey forDbtTestWatcherOperator).poke()calls_log_startup_events()for all non-test sensors regardless of invocation mode._process_dbt_log_event()simplified: removed theEventMsgbranch (dict-only now)._log_dbt_msg()helper to improve log UX: shows human-readable dbt messages (e.g."Found 5 models, 3 seeds...") instead of raw JSON in Airflow task logs.Reduces cyclomatic complexity of
store_dbt_resource_status_from_logfrom 11 to 9 (fixes Ruff C901).AssertionError("")→GenericExceptionOnRunwhen dbt's phantom callback fires withouta context.
_log_startup_events()now checksisinstance(dbt_startup_events, list)before iterating.cosmos/operators/_watcher/triggerer.pyuse_eventparameter fromWatcherTrigger._parse_dbt_node_status_and_compiled_sql()simplified: always reads{uid}_status(nonodefinished_*key lookup).cosmos/operators/watcher_kubernetes.pyuse_event()stub.Tests
_handle_node_finished,_get_status_from_events,_get_status_from_run_results,use_event()).test_dbt_dag_with_watcherfromcapsystocaplogto avoid thread-safety issues with dbt's multi-threaded stdout writes.test_dbt_runner_caching_and_callbacksto remove the now-unnecessaryEventMsgpatch.Test plan
hatch run tests.py3.11-2.10-1.9:testExecutionMode.WATCHERruns end-to-end with bothSUBPROCESSandDBT_RUNNERinvocation modesairflow dags test example_watchercompletes without getting stuck or emitting GenericExceptionOnRun errorsairflow standaloneand validate the output.