Skip to content

feat: Add watcher mode support for dbt test node states#2318

Merged
tatiana merged 5 commits into
astronomer:mainfrom
michal-mrazek:feat/watcher-test-operator-support
Mar 3, 2026
Merged

feat: Add watcher mode support for dbt test node states#2318
tatiana merged 5 commits into
astronomer:mainfrom
michal-mrazek:feat/watcher-test-operator-support

Conversation

@michal-mrazek
Copy link
Copy Markdown
Contributor

@michal-mrazek michal-mrazek commented Feb 3, 2026

Description

When using ExecutionMode.WATCHER, dbt test nodes now have their results aggregated per parent model and pushed as a single XCom value ("pass" or "fail"), enabling downstream consumer sensors to observe the test outcome for each model. This lays the groundwork for a future DbtTestConsumerWatcherSensor.

I also introduced some helpful function for handling node status. This can later be easily enhanced.

I removed using hardcoded string in TriggerEvent - this was prone for typos.

I think that documentation should be changed in following PR - where we change the behaviour of test tasks. For now, there is no change for end users.

Future work:

  • Implement the TestOperator to read the xcom
  • also implement for k8 execution mode

Related Issue(s)

Related to: #2311

Breaking Change?

No

Checklist

  • I have made corresponding changes to the documentation (if required)
  • I have added tests that prove my fix is effective or that my feature works

Copilot AI review requested due to automatic review settings February 3, 2026 16:09
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds watcher mode support for dbt test node states by introducing helper functions to handle different node status values (models use "success"/"failed", tests use "pass"/"fail") and updating the codebase to use these helpers and constants instead of hardcoded strings.

Changes:

  • Introduced helper functions (is_node_status_success, is_node_status_failed, is_node_status_terminal) to handle both model and test status values
  • Replaced hardcoded status strings with EventStatus constants in trigger events
  • Updated log messages to use generic "node" terminology instead of "model"

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
cosmos/operators/_watcher/state.py Adds helper functions and constants for checking node status states
cosmos/operators/_watcher/triggerer.py Updates to use new helper functions and EventStatus constants instead of hardcoded strings
cosmos/operators/_watcher/base.py Updates store_dbt_resource_status_from_log to use is_node_status_terminal helper
cosmos/operators/_watcher/init.py Exports new helper functions in all
tests/operators/_watcher/test_state.py Adds comprehensive unit tests for the new node status helper functions
tests/operators/test_watcher.py Adds tests verifying test node status ("pass"/"fail") is correctly stored in XCom
tests/operators/_watcher/test_triggerer.py Updates test assertion to use "node" instead of "model" terminology

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread cosmos/operators/_watcher/state.py
@netlify
Copy link
Copy Markdown

netlify Bot commented Feb 3, 2026

Deploy Preview for astronomer-cosmos ready!

Name Link
🔨 Latest commit d00e36a
🔍 Latest deploy log https://app.netlify.com/projects/astronomer-cosmos/deploys/6991c516128eb300087503dc
😎 Deploy Preview https://deploy-preview-2318--astronomer-cosmos.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

@codecov
Copy link
Copy Markdown

codecov Bot commented Feb 4, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 97.97%. Comparing base (4d86173) to head (0826b5b).
⚠️ Report is 4 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2318      +/-   ##
==========================================
+ Coverage   97.96%   97.97%   +0.01%     
==========================================
  Files         102      103       +1     
  Lines        7013     7079      +66     
==========================================
+ Hits         6870     6936      +66     
  Misses        143      143              

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Comment thread cosmos/operators/_watcher/state.py Outdated
Comment thread cosmos/operators/_watcher/base.py Outdated
Comment thread cosmos/operators/_watcher/state.py Outdated
Comment thread cosmos/operators/_watcher/triggerer.py
Copy link
Copy Markdown
Collaborator

@tatiana tatiana left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@michal-mrazek this is looking great, thank you for the improvement! Really excited that this could also fix a significant part of #2291.

I believe we're very close to merge this change. I left some feedback inline - it would be great if we could disambiguate where we are referencing ariflow task state x dbt model state, since we don't have 1:1 parity.

Lastly, could you confirm if this works both:

  • when using InvocationMode.DBT_RUNNER
  • when using InvocationMode.SUBPROCESS (I believe you tested with this use case, referencing log lines)

@michal-mrazek
Copy link
Copy Markdown
Contributor Author

michal-mrazek commented Feb 5, 2026

@tatiana, during testing subprocess/dbt_runner, I realized that there is one caveat.

There can be multiple tests (so dbt nodes) for one model. Moreover, the test node name is more complex. So to make the AFTER_EACH work lately we have to map airflow task to multiple dbt test nodes:

  1. Aggregate during the xcom push
  2. Aggregate during the xcom pull

I tend to incline to option 2. But since this is architecture decision, I would be curious about your opinion :)

This is not strictly related to this PR, but we have to keep this in mind. But yeah, works with both subprocess/dbt_runner :)

Screenshot 2026-02-05 at 10 49 34 PM

@michal-mrazek
Copy link
Copy Markdown
Contributor Author

@tatiana, once we merge this, I will start working on the test operator for after_each. I think it makes sense, we can reuse a lot of code, we will just have to pull for multiple xcoms :)

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated no new comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@michal-mrazek
Copy link
Copy Markdown
Contributor Author

@tatiana @pankajkoti Can you pls check if you have some time? :) I will then follow up with the next PR :)

Comment thread cosmos/operators/_watcher/base.py
Copilot AI review requested due to automatic review settings February 23, 2026 13:17
@michal-mrazek michal-mrazek force-pushed the feat/watcher-test-operator-support branch from 5a6324f to a2436e4 Compare February 23, 2026 13:17
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 5 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread cosmos/operators/_watcher/aggregation.py
Comment thread cosmos/operators/_watcher/state.py
Comment thread cosmos/operators/_watcher/aggregation.py
Comment thread cosmos/operators/_watcher/aggregation.py
Comment thread tests/operators/_watcher/test_aggregation.py
Copilot AI review requested due to automatic review settings February 23, 2026 13:44
@michal-mrazek
Copy link
Copy Markdown
Contributor Author

michal-mrazek commented Feb 23, 2026

Added the dbt test aggregation before pushing to xcom. This was a bit more complex than I expected :)
Screenshot 2026-02-23 at 2 08 30 PM

We need to pass the tests_per_model but also keep the state of existing tests (but be aware of multithreading, so lock was introduced). I will change the PR description.

Both subprocess and dbt_runner were tested - k8 should follow in another PR, I feel like this PR is already growing a lot.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 4 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread cosmos/operators/_watcher/aggregation.py
Comment thread cosmos/operators/_watcher/aggregation.py
Comment thread cosmos/airflow/graph.py
Comment thread cosmos/operators/_watcher/base.py
@tatiana
Copy link
Copy Markdown
Collaborator

tatiana commented Feb 24, 2026

Thanks a lot for advancing the implementation further, @michal-mrazek!

I just tried out commit 0826b5b, and I noticed that we're still generating one XCom per test - could you adjust this, please?
Screenshot 2026-02-24 at 13 45 48

I used the DAG that's in the repo example_watcher.py and airflow standalone to try it out.

@michal-mrazek
Copy link
Copy Markdown
Contributor Author

@tatiana That is strange 🤔

git log --oneline -5
0826b5bf (HEAD -> feat/watcher-test-operator-support, origin/feat/watcher-test-operator-support) tests
5f016214 comments
9f5332f9 handle the dbt test aggregation
bda49418 rename all dbt node variables for cleaner code
da0ac024 feat: Add watcher mode support for dbt test node states
Screenshot 2026-02-24 at 4 58 56 PM I just pulled the branch and run via docker compose you guys provide 🤔

@michal-mrazek
Copy link
Copy Markdown
Contributor Author

@tatiana I did not manage to match your results with this branch. I specifically desinged tests for this case https://github.com/astronomer/astronomer-cosmos/pull/2318/changes#diff-b6f62de0effcda8f032c7203047c74394bfd2ee93ab65392462714a9910dbdfeR687 Can you please pull the branch and try it from scratch? 🙏

@tatiana
Copy link
Copy Markdown
Collaborator

tatiana commented Mar 3, 2026

Thanks a lot, @michal-mrazek , I'm not sure about what happened. Thanks for the amazing work here. I created a new Airflow deployment locally, and I was able to confirm the expected behaviour 🎉

Screenshot 2026-03-03 at 13 11 06

Comment thread cosmos/airflow/graph.py
Comment thread cosmos/operators/_watcher/base.py
@tatiana tatiana merged commit e38da03 into astronomer:main Mar 3, 2026
134 of 135 checks passed
@pankajastro pankajastro mentioned this pull request Mar 16, 2026
pankajastro added a commit that referenced this pull request Apr 7, 2026
1.14.0 (2026-04-07)
---------------------

Breaking Changes

* Drop support for Airflow versions earlier than **2.9** by
@jedcunningham in #2288
* Fix inclusion of package models and selection/exclusion behavior by
@pankajkoti in #2357
* ``ExecutionMode.WATCHER``: The per-node ``*_status`` XCom value is now
a dict (``{"status": "<status>", "outlet_uris": [...]}``) instead of a
plain string. Any custom code that reads these internal XCom keys
directly will need to be updated by @pankajkoti in #2507

Features

* Add cluster policy support for ``ExecutionMode.WATCHER`` sensor
retries by @astro-anand in #2293
* Add debug mode to track memory utilization by @tatiana in #2327
* Add FQN selection support for ``LoadMode.DBT_MANIFEST`` by
@pankajastro in #2375
* Introduce interceptors for Cosmos tasks by @tatiana in #2419
* Add config to allow disabling dag versioning by @pankajkoti in #2470
* Implement TaskGroups by models folder by @maximilianoarcieri and
@tatiana in #1566, #2469, and #2420
* feat: implement DbtTestWatcherOperator by @michal-mrazek in #2447
* Add source freshness aware execution for ``ExecutionMode.WATCHER`` by
@pankajastro and @tatiana in #2467

* Note: Like ``ExecutionMode.WATCHER``, this feature is experimental and
its interface and implementation can change in the future.
* Add Airflow 3.2 support by @pankajastro and @pankajkoti in #2472

Enhancements

* Add watcher mode support for dbt test node states by @michal-mrazek in
#2318
* Rename watcher-mode sensor retry queue and reuse it for producer tasks
by @pankajastro in #2331
* Fix leaked semaphore warnings in Airflow 3 by resetting dbt adapters
by @pankajkoti in #2335
* Improve dbt Fusion support and related tests by @tatiana in #2356
* Default Snowflake profile mappings to four threads by @tatiana in
#2374
* Attempt to remove Pydantic as a dependency by @tatiana in #2377
* Log dbt-core and adapter versions in watcher consumer tasks by
@pankajastro in #2412
* Log model errors in watcher consumer on dbt node failure by
@pankajastro in #2431
* Reduce XCom read/write for tracking node state and errors in
ConsumerWatcher task by @pankajastro in #2471
* Remove duplicate debug log in watcher subprocess path by @tatiana in
#2494
* Simplify and unify WATCHER implementation regardless of InvocationMode
by @tatiana in #2498
* Switch to lazy imports in cosmos/__init__.py by @pankajkoti in #2531

Bug Fixes

* Handle invalid YAML errors with ``LoadMode.DBT_MANIFEST`` and
``RenderConfig.selector`` by @YourRoyalLinus in #2316
* Populate ``compiled_sql`` for ``InvocationMode.SUBPROCESS`` in
``ExecutionMode.WATCHER`` by @pankajkoti in #2319
* Fix select/exclude type mismatch by @tatiana in #2364
* Set ``emit_datasets=False`` for ``DbtTest*`` operators by @pankajastro
in #2365
* Set correct queue priority for watcher producer tasks by @pankajastro
in #2372
* Preserve ``extra_context`` for watcher consumer task instances by
@pankajkoti in #2381
* Respect ``deferrable=False`` from ``operator_args`` on watcher
consumer sensors by @pankajkoti in #2384
* Fix watcher queue precedence and add documentation by @pankajastro in
#2391
* Do not set ``compiled_sql`` on ``ExecutionMode.WATCHER`` producers by
@pankajkoti in #2440
* Remove const attribute for ``__cosmos_telemetry_metadata__`` dag param
by @pankajkoti in #2466
* Remove timeout override from Cosmos watcher sensors by @tatiana and
@claude in #2478
* Remove forced ``retries=0`` from watcher producer operators by
@tatiana in #2479
* RFC: Add patch for newer versions of amazon provider when running dbt
on EKS by @aoelvp94 in #2481
* Fix ``cosmos_debug_max_memory_mb`` XCom not pushed in Watcher sensor
tasks by @tatiana in #2503
* Fix ``TestBehavior.NONE`` and ``TestBehavior.AFTER_ALL`` exclude
ignored with selectors in ``ExecutionMode.WATCHER`` by @pankajkoti in
#2511
* Move dataset emission for ``ExecutionMode.WATCHER`` from producer to
consumer sensors by @pankajkoti in #2507

Docs

* Document cluster policy configuration for ``ExecutionMode.WATCHER``
sensor tasks by @pankajastro in #2315
* Remove outdated docs for the dbt docs plugin with Airflow 3 by
@pankajastro in #2353
* Make Watcher DBT Execution Queue heading clickable by @pankajastro in
#2354
* Update ``ExecutionMode.WATCHER`` documentation regarding test node
implementation by @jroachgolf84 in #2355
* Fix ``pre_dbt_fusion`` configuration rendering by @pankajastro in
#2369
* Add documentation for including/excluding nodes based on FQN by
@pankajastro in #2371
* Update watcher execution mode documentation by @tatiana in #2380
* Add documentation for ``DbtSeedLocalOperator`` by @jroachgolf84 in
#2383
* Fix miscellaneous Sphinx warnings by @pankajastro in #2395
* Improve contributing documentation by @lzdanski in #2397
* Add **Get Started in 5 Minutes** guide by @lzdanski in #2398
* Add Sphinx redirects package for documentation redirects by @lzdanski
in #2407
* Restructure **Getting Started** and **Guides** sections by @lzdanski
in #2418
* Add open-source quickstart by @lzdanski in #2439
* Fix documentation redirects by @lzdanski in #2442
* Restructure and refactor reference documentation by @lzdanski in #2443
* Add execution modes decision documentation by @lzdanski in #2444
* Add **Core Concepts** page to Getting Started by @lzdanski in #2448
* Add guide: *How Cosmos Works* by @lzdanski in #2449
* Update **Getting Started** overview and index pages by @lzdanski in
#2452
* Add guide: *How Cosmos Runs dbt* by @lzdanski in #2453
* Fix miscellaneous documentation links by @lzdanski in #2454
* Add Mermaid diagrams and execution mode diagrams by @lzdanski and
@tatiana in #2459
* Add documentation for memory optimization options by @pankajastro in
#2340
* Fix typo in watcher execution mode docs by @evanvolgas in #2485
* Fix minor documentation issues by @evanvolgas in #2489
* Add troubleshooting note for dbt debug logs in ExecutionMode.WATCHER
by @tatiana in #2491
* docs: unify RST header styles across documentation by @jigangz in
#2473
* docs: fix env var for rich logging by @vricciardulli in #2514
* docs: update dbt project path example for Airflow 3 Astro
compatibility by @yeoreums in #2512
* Document missing Cosmos Airflow config settings in cosmos-conf.rst by
@tatiana in #2515
* Split security-privacy policy doc and add dependency cooldown by
@pankajkoti in #2519
* Add performance optimization and troubleshooting docs by @pankajkoti
in #2521
* Update copyright year to 2026 by @tayloramurphy in #2527
* docs: Updating "Project Policies" to "Policies" in menu bar by
@jroachgolf84 in #2526

Others

* Fix tests after removing support for Airflow versions earlier than 2.9
by @tatiana in #2321
* Enable listener tests for Airflow 3.1 by @pankajastro in #2348
* Accept ``int`` or ``float`` for ``cosmos_debug_max_memory_mb`` in
integration tests by @pankajkoti in #2352
* Update ``CODEOWNERS`` to prioritize ``oss-integrations`` by @tatiana
in #2359
* Fix automatic reviewer assignment in GitHub by @tatiana and @phanikumv
in #2360
* Improve PyPI tagging by @tatiana in #2363
* Add integration tests for dbt Fusion and ``ExecutionMode.WATCHER`` by
@tatiana in #2373
* Fix Zizmor check by @tatiana in #2376
* Remove ``methodtools`` dependency by @tatiana in #2378
* Improve comments on #2389 by @evanvolgas in #2394
* Refactor ``load_from_dbt_manifest`` to reduce code complexity by
@pankajkoti in #2399
* Refactor ``_handle_no_precursors_or_descendants`` to reduce complexity
by @pankajkoti in #2400
* Improve issue templates by @tatiana in #2401
* Avoid running tests when only docs change by @tatiana in #2402
* Add ``no-reload`` target for serving docs locally by @pankajkoti in
#2405
* Fix test hash checks on macOS by @tatiana in #2406
* Attempt deterministic dbt project copy in test fixtures by @pankajkoti
in #2409
* Pin ``virtualenv <21`` due to hatch incompatibility in CI by
@pankajkoti in #2410
* Revert virtualenv pin for hatch installation in CI by @pankajkoti in
#2426
* Add version comments for commit SHA pinned GitHub Actions by
@pankajkoti in #2436
* Fix ``hatch run docs:build`` issues by @tatiana in #2437
* Minor code improvements by @dnskr in #2446
* Pre-commit autoupdate by @pre-commit-ci in #2367, #2396, #2422, #2451,
#2468, #2495, and #2516
* Add file to support Claude understanding the Cosmos repository by
@tatiana in #2458
* Dependency updates by @dependabot in #2368, #2425, #2435, #2465,
#2475, #2504, #2518, and #2528
* Isolate Scarf telemetry integration test into its own CI job by
@pankajkoti and @claude in #2477
* ci: upgrade Airflow version to 3.1 in MyPy type-check job by @yeoreums
in #2506
* Add commit message guidelines to CLAUDE.md by @pankajkoti in #2509
* Extend skipping tests in CI for more non-code file changes by
@pankajkoti in #2510
* Add Dependabot pre-commit support with 7-day cooldown by @pankajkoti
in #2517
* Enforce zero warnings policy for documentation by @dnskr in #2513

Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com>
Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com>
Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants