Skip to content

feat: implement DbtTestWatcherOperator #2447

Merged
tatiana merged 4 commits into
astronomer:mainfrom
michal-mrazek:feat/watcher-test-consumer-sensor
Mar 24, 2026
Merged

feat: implement DbtTestWatcherOperator #2447
tatiana merged 4 commits into
astronomer:mainfrom
michal-mrazek:feat/watcher-test-consumer-sensor

Conversation

@michal-mrazek
Copy link
Copy Markdown
Contributor

@michal-mrazek michal-mrazek commented Mar 8, 2026

Description

Implementation of the DbtTestWatcherOperator (formerly only EmptyOperator) with a DbtConsumerWatcherSensor subclass that watches aggregated dbt test results. All logic is handled in the BaseConsumerSensor that just checks for is_test_sensor property.

I also added WatcherEventReason enum for type-safe trigger event payloads and changed model->node.

Related Issue(s)

closes #2441

Breaking Change?

Checklist

  • I have made corresponding changes to the documentation (if required)
  • I have added tests that prove my fix is effective or that my feature works

Copilot AI review requested due to automatic review settings March 8, 2026 18:06
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Implements DbtTestWatcherOperator as a real watcher-mode sensor that monitors aggregated dbt test outcomes via XCom, and introduces a typed WatcherEventReason for trigger → sensor payloads while standardizing terminology from “model” to “node”.

Changes:

  • Implement DbtTestWatcherOperator as a DbtConsumerWatcherSensor subclass that watches the aggregated <model_uid>_tests_status XCom key.
  • Add WatcherEventReason enum and update WatcherTrigger/BaseConsumerSensor to use typed reason codes and support test sensors via is_test_sensor.
  • Update and extend tests to cover aggregated test status polling and the new reason codes.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
cosmos/operators/watcher.py Replaces the placeholder DbtTestWatcherOperator with a real watcher-mode sensor implementation.
cosmos/operators/_watcher/triggerer.py Adds WatcherEventReason and extends WatcherTrigger to handle aggregated test status via is_test_sensor.
cosmos/operators/_watcher/base.py Adds test-sensor plumbing (is_test_sensor, _resource_label, _get_node_status) and updates event handling/logging.
tests/operators/_watcher/test_triggerer.py Updates triggerer tests to assert enum-based reason payloads.
tests/operators/test_watcher.py Updates existing watcher tests for enum reasons and adds new tests for DbtTestWatcherOperator.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

You can also share your feedback on Copilot code review. Take the survey.

Comment thread cosmos/operators/watcher.py
Comment thread cosmos/operators/_watcher/base.py
@codecov
Copy link
Copy Markdown

codecov Bot commented Mar 9, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 98.02%. Comparing base (5a09ad5) to head (614ada3).
⚠️ Report is 8 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2447      +/-   ##
==========================================
- Coverage   98.04%   98.02%   -0.03%     
==========================================
  Files         103      103              
  Lines        7229     7254      +25     
==========================================
+ Hits         7088     7111      +23     
- Misses        141      143       +2     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@michal-mrazek
Copy link
Copy Markdown
Contributor Author

@tatiana I will rebase this ticket since we now have logging in sensors (which we are super grateful for!) and elt you know it is ready fore review :)

@michal-mrazek michal-mrazek force-pushed the feat/watcher-test-consumer-sensor branch from bce804e to 8996fcb Compare March 17, 2026 09:38
Copilot AI review requested due to automatic review settings March 17, 2026 09:39
@michal-mrazek michal-mrazek review requested due to automatic review settings March 17, 2026 09:39
@michal-mrazek michal-mrazek force-pushed the feat/watcher-test-consumer-sensor branch from 13d61ca to 554ae49 Compare March 17, 2026 09:43
Copilot AI review requested due to automatic review settings March 17, 2026 09:43
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

You can also share your feedback on Copilot code review. Take the survey.

Comment thread cosmos/operators/watcher.py
Comment thread cosmos/operators/_watcher/base.py
Comment thread cosmos/operators/watcher.py
Copilot AI review requested due to automatic review settings March 17, 2026 09:53
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

You can also share your feedback on Copilot code review. Take the survey.

Comment thread cosmos/operators/watcher.py
Comment thread cosmos/operators/watcher.py
Comment thread cosmos/operators/_watcher/base.py
@michal-mrazek michal-mrazek force-pushed the feat/watcher-test-consumer-sensor branch from f7ee3ae to 41e86ea Compare March 17, 2026 09:57
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.

Comments suppressed due to low confidence (3)

cosmos/operators/_watcher/base.py:332

  • For is_test_sensor retries, _fallback_to_non_watcher_run currently returns True, which will mark a retry attempt as successful even if the aggregated test status is still fail (or never produced). This can mask failing tests whenever Airflow retries are configured. Consider changing retry handling for test sensors to re-check the aggregated _tests_status XCom (and fail again if it's fail), or explicitly raise an exception indicating retries are unsupported instead of returning success.
    def _filter_flags(flags: list[str]) -> list[str]:
        """Filters out dbt flags that are incompatible with retry (e.g., --select, --exclude)."""
        filtered = []
        skip_next = False
        for token in flags:
            if skip_next:
                if token.startswith("--"):

cosmos/operators/_watcher/base.py:438

  • The log message for NODE_NOT_RUN is framed as "was skipped by the dbt command" and mentions ephemeral/empty model SQL. When is_test_sensor=True, this reason more likely means no aggregated test status was produced (e.g., no tests selected), so the message becomes misleading and grammatically awkward ("Tests for model ... was"). Consider branching the message for test sensors (e.g., "No test results were reported...") or using a resource-specific verb phrasing.

    def execute(self, context: Context, **kwargs: Any) -> None:
        if not self.deferrable:
            super().execute(context)
        elif not self.poke(context):
            self.defer(

cosmos/operators/_watcher/base.py:464

  • The failure messages format dbt {self._resource_label.lower()} ... failed / reporting results for {self._resource_label.lower()} ... leads to awkward output like "dbt tests for model '...' failed". Consider using a dedicated noun/label for error strings (e.g., "model" vs "tests") or a full sentence that avoids concatenating dbt with the label, so messages read naturally for both model and test sensors.

        if status == "success" and reason == WatcherEventReason.NODE_NOT_RUN:
            logger.info(
                "%s '%s' was skipped by the dbt command. This may happen if it is an ephemeral model or if the model sql file is empty.",
                self._resource_label,
                self.model_unique_id,
            )

        # Extract and store compiled_sql from the event if available

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread cosmos/operators/_watcher/base.py
Comment thread cosmos/operators/watcher.py
Comment thread cosmos/operators/_watcher/triggerer.py
Copilot AI review requested due to automatic review settings March 20, 2026 09:27
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread cosmos/operators/_watcher/base.py Outdated
Comment thread cosmos/operators/_watcher/base.py
Comment thread cosmos/operators/watcher.py
Comment thread tests/operators/test_watcher.py Outdated
@tatiana tatiana added this to the Cosmos 1.14.0 milestone Mar 20, 2026
Copy link
Copy Markdown
Collaborator

@tatiana tatiana left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@michal-mrazek Thank you very much for continuing this work stream. I would like us to merge these changes before we release Cosmos 1.14.0. I just gave some feedback in-line - please, let me know your thoughts.

Also, please, could you mention how you tested these changes?

  • InvocationMode.SUBPROCESS x InvocationMode.DBT_RUNNER
  • Local x K8s (based on the files changed, I assume only local)

For us to have feature parity in K8s & Local, should we update the ticket #2427 or create a new one?

Comment thread cosmos/operators/_watcher/base.py
@michal-mrazek
Copy link
Copy Markdown
Contributor Author

@michal-mrazek Thank you very much for continuing this work stream. I would like us to merge these changes before we release Cosmos 1.14.0. I just gave some feedback in-line - please, let me know your thoughts.

Also, please, could you mention how you tested these changes?

  • InvocationMode.SUBPROCESS x InvocationMode.DBT_RUNNER
  • Local x K8s (based on the files changed, I assume only local)

For us to have feature parity in K8s & Local, should we update the ticket #2427 or create a new one?

@tatiana I tested on both DBT_RUNNER and SUBPROCESS. Not K8 though, so I think the #2427 is still actual and we should work on it :)

Btw do you have any timeline for 1.14.0 release?

Copilot AI review requested due to automatic review settings March 20, 2026 14:24
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread cosmos/operators/_watcher/triggerer.py
Comment thread cosmos/operators/_watcher/base.py
Comment thread cosmos/operators/watcher.py
Comment thread tests/operators/test_watcher.py Outdated
@tatiana
Copy link
Copy Markdown
Collaborator

tatiana commented Mar 23, 2026

@michal-mrazek thanks a lot, I believe we're very close to merging this - please, could you fix the unit tests?

@michal-mrazek
Copy link
Copy Markdown
Contributor Author

Adjusted tests, hopefully it will pass now :) Lets trigger it, I can fix in a minute if needed :)

Copy link
Copy Markdown
Collaborator

@tatiana tatiana left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much, @michal-mrazek , it's an excellent addition to Cosmos watcher to run test after each properly 🎉

@tatiana tatiana merged commit 3f7b3fa into astronomer:main Mar 24, 2026
75 of 76 checks passed
@pankajastro pankajastro mentioned this pull request Apr 7, 2026
tatiana pushed a commit that referenced this pull request Apr 7, 2026
Following #2441
and #2447, this PR
implements test handling for `WATCHER_KUBERNETES` mode.

Closes #2427
pankajastro added a commit that referenced this pull request Apr 7, 2026
1.14.0 (2026-04-07)
---------------------

Breaking Changes

* Drop support for Airflow versions earlier than **2.9** by
@jedcunningham in #2288
* Fix inclusion of package models and selection/exclusion behavior by
@pankajkoti in #2357
* ``ExecutionMode.WATCHER``: The per-node ``*_status`` XCom value is now
a dict (``{"status": "<status>", "outlet_uris": [...]}``) instead of a
plain string. Any custom code that reads these internal XCom keys
directly will need to be updated by @pankajkoti in #2507

Features

* Add cluster policy support for ``ExecutionMode.WATCHER`` sensor
retries by @astro-anand in #2293
* Add debug mode to track memory utilization by @tatiana in #2327
* Add FQN selection support for ``LoadMode.DBT_MANIFEST`` by
@pankajastro in #2375
* Introduce interceptors for Cosmos tasks by @tatiana in #2419
* Add config to allow disabling dag versioning by @pankajkoti in #2470
* Implement TaskGroups by models folder by @maximilianoarcieri and
@tatiana in #1566, #2469, and #2420
* feat: implement DbtTestWatcherOperator by @michal-mrazek in #2447
* Add source freshness aware execution for ``ExecutionMode.WATCHER`` by
@pankajastro and @tatiana in #2467

* Note: Like ``ExecutionMode.WATCHER``, this feature is experimental and
its interface and implementation can change in the future.
* Add Airflow 3.2 support by @pankajastro and @pankajkoti in #2472

Enhancements

* Add watcher mode support for dbt test node states by @michal-mrazek in
#2318
* Rename watcher-mode sensor retry queue and reuse it for producer tasks
by @pankajastro in #2331
* Fix leaked semaphore warnings in Airflow 3 by resetting dbt adapters
by @pankajkoti in #2335
* Improve dbt Fusion support and related tests by @tatiana in #2356
* Default Snowflake profile mappings to four threads by @tatiana in
#2374
* Attempt to remove Pydantic as a dependency by @tatiana in #2377
* Log dbt-core and adapter versions in watcher consumer tasks by
@pankajastro in #2412
* Log model errors in watcher consumer on dbt node failure by
@pankajastro in #2431
* Reduce XCom read/write for tracking node state and errors in
ConsumerWatcher task by @pankajastro in #2471
* Remove duplicate debug log in watcher subprocess path by @tatiana in
#2494
* Simplify and unify WATCHER implementation regardless of InvocationMode
by @tatiana in #2498
* Switch to lazy imports in cosmos/__init__.py by @pankajkoti in #2531

Bug Fixes

* Handle invalid YAML errors with ``LoadMode.DBT_MANIFEST`` and
``RenderConfig.selector`` by @YourRoyalLinus in #2316
* Populate ``compiled_sql`` for ``InvocationMode.SUBPROCESS`` in
``ExecutionMode.WATCHER`` by @pankajkoti in #2319
* Fix select/exclude type mismatch by @tatiana in #2364
* Set ``emit_datasets=False`` for ``DbtTest*`` operators by @pankajastro
in #2365
* Set correct queue priority for watcher producer tasks by @pankajastro
in #2372
* Preserve ``extra_context`` for watcher consumer task instances by
@pankajkoti in #2381
* Respect ``deferrable=False`` from ``operator_args`` on watcher
consumer sensors by @pankajkoti in #2384
* Fix watcher queue precedence and add documentation by @pankajastro in
#2391
* Do not set ``compiled_sql`` on ``ExecutionMode.WATCHER`` producers by
@pankajkoti in #2440
* Remove const attribute for ``__cosmos_telemetry_metadata__`` dag param
by @pankajkoti in #2466
* Remove timeout override from Cosmos watcher sensors by @tatiana and
@claude in #2478
* Remove forced ``retries=0`` from watcher producer operators by
@tatiana in #2479
* RFC: Add patch for newer versions of amazon provider when running dbt
on EKS by @aoelvp94 in #2481
* Fix ``cosmos_debug_max_memory_mb`` XCom not pushed in Watcher sensor
tasks by @tatiana in #2503
* Fix ``TestBehavior.NONE`` and ``TestBehavior.AFTER_ALL`` exclude
ignored with selectors in ``ExecutionMode.WATCHER`` by @pankajkoti in
#2511
* Move dataset emission for ``ExecutionMode.WATCHER`` from producer to
consumer sensors by @pankajkoti in #2507

Docs

* Document cluster policy configuration for ``ExecutionMode.WATCHER``
sensor tasks by @pankajastro in #2315
* Remove outdated docs for the dbt docs plugin with Airflow 3 by
@pankajastro in #2353
* Make Watcher DBT Execution Queue heading clickable by @pankajastro in
#2354
* Update ``ExecutionMode.WATCHER`` documentation regarding test node
implementation by @jroachgolf84 in #2355
* Fix ``pre_dbt_fusion`` configuration rendering by @pankajastro in
#2369
* Add documentation for including/excluding nodes based on FQN by
@pankajastro in #2371
* Update watcher execution mode documentation by @tatiana in #2380
* Add documentation for ``DbtSeedLocalOperator`` by @jroachgolf84 in
#2383
* Fix miscellaneous Sphinx warnings by @pankajastro in #2395
* Improve contributing documentation by @lzdanski in #2397
* Add **Get Started in 5 Minutes** guide by @lzdanski in #2398
* Add Sphinx redirects package for documentation redirects by @lzdanski
in #2407
* Restructure **Getting Started** and **Guides** sections by @lzdanski
in #2418
* Add open-source quickstart by @lzdanski in #2439
* Fix documentation redirects by @lzdanski in #2442
* Restructure and refactor reference documentation by @lzdanski in #2443
* Add execution modes decision documentation by @lzdanski in #2444
* Add **Core Concepts** page to Getting Started by @lzdanski in #2448
* Add guide: *How Cosmos Works* by @lzdanski in #2449
* Update **Getting Started** overview and index pages by @lzdanski in
#2452
* Add guide: *How Cosmos Runs dbt* by @lzdanski in #2453
* Fix miscellaneous documentation links by @lzdanski in #2454
* Add Mermaid diagrams and execution mode diagrams by @lzdanski and
@tatiana in #2459
* Add documentation for memory optimization options by @pankajastro in
#2340
* Fix typo in watcher execution mode docs by @evanvolgas in #2485
* Fix minor documentation issues by @evanvolgas in #2489
* Add troubleshooting note for dbt debug logs in ExecutionMode.WATCHER
by @tatiana in #2491
* docs: unify RST header styles across documentation by @jigangz in
#2473
* docs: fix env var for rich logging by @vricciardulli in #2514
* docs: update dbt project path example for Airflow 3 Astro
compatibility by @yeoreums in #2512
* Document missing Cosmos Airflow config settings in cosmos-conf.rst by
@tatiana in #2515
* Split security-privacy policy doc and add dependency cooldown by
@pankajkoti in #2519
* Add performance optimization and troubleshooting docs by @pankajkoti
in #2521
* Update copyright year to 2026 by @tayloramurphy in #2527
* docs: Updating "Project Policies" to "Policies" in menu bar by
@jroachgolf84 in #2526

Others

* Fix tests after removing support for Airflow versions earlier than 2.9
by @tatiana in #2321
* Enable listener tests for Airflow 3.1 by @pankajastro in #2348
* Accept ``int`` or ``float`` for ``cosmos_debug_max_memory_mb`` in
integration tests by @pankajkoti in #2352
* Update ``CODEOWNERS`` to prioritize ``oss-integrations`` by @tatiana
in #2359
* Fix automatic reviewer assignment in GitHub by @tatiana and @phanikumv
in #2360
* Improve PyPI tagging by @tatiana in #2363
* Add integration tests for dbt Fusion and ``ExecutionMode.WATCHER`` by
@tatiana in #2373
* Fix Zizmor check by @tatiana in #2376
* Remove ``methodtools`` dependency by @tatiana in #2378
* Improve comments on #2389 by @evanvolgas in #2394
* Refactor ``load_from_dbt_manifest`` to reduce code complexity by
@pankajkoti in #2399
* Refactor ``_handle_no_precursors_or_descendants`` to reduce complexity
by @pankajkoti in #2400
* Improve issue templates by @tatiana in #2401
* Avoid running tests when only docs change by @tatiana in #2402
* Add ``no-reload`` target for serving docs locally by @pankajkoti in
#2405
* Fix test hash checks on macOS by @tatiana in #2406
* Attempt deterministic dbt project copy in test fixtures by @pankajkoti
in #2409
* Pin ``virtualenv <21`` due to hatch incompatibility in CI by
@pankajkoti in #2410
* Revert virtualenv pin for hatch installation in CI by @pankajkoti in
#2426
* Add version comments for commit SHA pinned GitHub Actions by
@pankajkoti in #2436
* Fix ``hatch run docs:build`` issues by @tatiana in #2437
* Minor code improvements by @dnskr in #2446
* Pre-commit autoupdate by @pre-commit-ci in #2367, #2396, #2422, #2451,
#2468, #2495, and #2516
* Add file to support Claude understanding the Cosmos repository by
@tatiana in #2458
* Dependency updates by @dependabot in #2368, #2425, #2435, #2465,
#2475, #2504, #2518, and #2528
* Isolate Scarf telemetry integration test into its own CI job by
@pankajkoti and @claude in #2477
* ci: upgrade Airflow version to 3.1 in MyPy type-check job by @yeoreums
in #2506
* Add commit message guidelines to CLAUDE.md by @pankajkoti in #2509
* Extend skipping tests in CI for more non-code file changes by
@pankajkoti in #2510
* Add Dependabot pre-commit support with 7-day cooldown by @pankajkoti
in #2517
* Enforce zero warnings policy for documentation by @dnskr in #2513

Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com>
Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com>
Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature] Implement DbtTestWatcherOperator

3 participants