Refactor operator DbtConsumerWatcherSensor for reusability #2245
Pull request overview
This PR refactors the watcher operator implementation by extracting common functionality into a new base module (cosmos/operators/_watcher/base.py), preparing for the upcoming Kubernetes watcher implementation. The refactoring moves shared sensor logic, utility functions, and constants to reusable components while maintaining existing functionality.
Key changes:
- Created `BaseConsumerSensor` class to encapsulate common sensor behavior previously in `DbtConsumerWatcherSensor`
- Moved utility functions (`_store_dbt_resource_status_from_log`) and constants to shared locations
- Updated import paths throughout test files to reflect the new module structure
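The extract-a-base-class pattern described in the key changes can be sketched roughly as follows. This is a simplified illustration, not the Cosmos implementation: only the class names `BaseConsumerSensor` and `DbtConsumerWatcherSensor` come from this PR, while the `fetch_status` hook, the constructor arguments, and the in-memory `statuses` dictionary are hypothetical stand-ins, and the Airflow machinery the real classes build on is left out entirely.

```python
from __future__ import annotations

from abc import ABC, abstractmethod


class BaseConsumerSensor(ABC):
    """Illustrative stand-in holding the polling logic every consumer shares."""

    def __init__(self, model_unique_id: str) -> None:
        self.model_unique_id = model_unique_id

    @abstractmethod
    def fetch_status(self) -> str | None:
        """Hypothetical hook: return the node status, or None if not known yet."""

    def poke(self) -> bool:
        # Shared behavior: keep sensing until a status shows up,
        # then succeed or fail depending on what the producer reported.
        status = self.fetch_status()
        if status is None:
            return False
        if status != "success":
            raise RuntimeError(
                f"{self.model_unique_id} finished with status {status!r}"
            )
        return True


class DbtConsumerWatcherSensor(BaseConsumerSensor):
    """Illustrative subclass: only the status lookup is specific to it."""

    def __init__(self, model_unique_id: str, statuses: dict[str, str]) -> None:
        super().__init__(model_unique_id)
        # Assumption: a plain dict stands in for wherever statuses are stored.
        self._statuses = statuses

    def fetch_status(self) -> str | None:
        return self._statuses.get(self.model_unique_id)
```

With this shape, a second sensor (for example one backed by Kubernetes pods) would only need to supply its own `fetch_status`, which is the kind of reuse the refactoring is preparing for.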
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `cosmos/constants.py` | Added new watcher-related constants for priority weights and weight rules |
| `cosmos/operators/watcher.py` | Refactored to use new `BaseConsumerSensor` parent class and removed duplicated code |
| `cosmos/operators/_watcher/base.py` | New base module containing `BaseConsumerSensor` class and shared utility functions |
| `tests/operators/test_watcher.py` | Updated test patches to reference new module paths and renamed test methods |
| `tests/hooks/test_subprocess.py` | Updated test patches to reference new module location |
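The test changes in the table follow from how `unittest.mock` works: a patch has to target the module whose namespace the calling code actually consults. The sketch below illustrates that with two in-memory modules. The module names `demo_base` and `demo_watcher` and the functions `store_status` and `process` are hypothetical stand-ins, not Cosmos code.

```python
import types
from unittest.mock import patch

# Hypothetical stand-in for the new base module: the helper now lives here.
base = types.ModuleType("demo_base")
exec(
    "def store_status(line):\n"
    "    return 'real:' + line\n"
    "\n"
    "def process(line):\n"
    "    # Resolved through demo_base's globals at call time.\n"
    "    return store_status(line)\n",
    base.__dict__,
)

# Hypothetical stand-in for the old module: it only re-exports the names.
watcher = types.ModuleType("demo_watcher")
watcher.store_status = base.store_status
watcher.process = base.process

# Patching the old location replaces an alias that process() never consults.
with patch.object(watcher, "store_status", return_value="mocked"):
    old_target_result = watcher.process("log line")

# Patching the new location replaces the name process() actually looks up.
with patch.object(base, "store_status", return_value="mocked"):
    new_target_result = watcher.process("log line")
```

Here `old_target_result` still comes from the real helper, while `new_target_result` comes from the mock, which is why patches pointing at the old module path stop having any effect once a function moves.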
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #2245 +/- ##
==========================================
- Coverage 97.99% 97.97% -0.03%
==========================================
Files 96 97 +1
Lines 6245 6273 +28
==========================================
+ Hits 6120 6146 +26
- Misses 125 127 +2
pankajkoti left a comment:
Looks like a solid refactor to me!
It would be nice to test the DAGs once with astro CLI / airflow standalone, if not done already.
Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com>
Features

* Support cross-referencing models across dbt projects using dbt-loom by @pankajkoti in #2271
* Support use of YAML selectors when using ``LoadMode.DBT_MANIFEST`` by @YourRoyalLinus in #2261
* Introduce ``ExecutionMode.WATCHER_KUBERNETES`` to use the watcher with ``KubernetesPodOperator`` by @tatiana in #2207
* Add support for StarRocks profile mapping by @kurkim0661 in #2256
* Allow pushing URIs as XComs for Cosmos tasks by @corsettigyg in #2275
* Support defining custom callbacks alongside the ``WATCHER_KUBERNETES`` callback by @johnhoran in #2307

Enhancements

* Refactor: remove duplicate ``_construct_dest_file_path`` by @jx2lee in #2077
* Leverage Airflow ``::group::`` to group logs associated with DAG parsing by @tatiana in #2235
* Refactor ``DbtConsumerWatcherSensor`` for reusability by @tatiana in #2245
* Restore plain text output when using ``ExecutionMode.WATCHER`` by @tiovader in #2241

Bug Fixes

* Fix running empty models or ephemeral nodes in ``ExecutionMode.WATCHER`` by @tatiana in #2279
* Improve watcher producer task priority in scheduling and the UI by @tatiana in #2237
* Fix typos and formatting issues in documentation by @pankajkoti in #2259
* Allow watcher producer retries without erroring by @tatiana in #2283
* Fix ``TestBehavior.AFTER_ALL`` is missing project_name information when loading project using manifest file by @tuantran0910 in #2242
* Fix duplicate log lines in watcher subprocess execution and format timestamps by @pankajkoti in #2301

Docs

* Add Watcher Kubernetes documentation by @tatiana in #2303
* Document newly added telemetry metrics in the privacy notice by @pankajkoti in #2249
* Add compatibility policy document by @pankajastro in #2251
* Improve watcher documentation related to dbt threads by @tatiana in #2273
* Fix link in watcher execution mode documentation by @jedcunningham in #2277
* Update Apache Airflow minimum compatibility policy by @tatiana in #2285
* Clarify Cosmos runtime support until "End of Basic Support" by @jedcunningham in #2286
* Update watcher docs by @tatiana in #2298
* Update watcher kubernetes documentation by @tatiana in #2306

Others

* Add Airflow 3 DAG versioning tests for Cosmos by @michal-mrazek in #2177
* Add dbt Core 1.11 to the test matrix by @tatiana in #2230
* Add integration tests using InvocationMode.SUBPROCESS and validate output by @tatiana in #2287
* Fix main branch failing tests by @tatiana in #2296
* Update pre-commit hooks to the latest versions by @jedcunningham in #2289
* Pre-commit autoupdates by @pre-commit in #2222, #2264, #2274 and #2290
* Dependabot updates by @dependabot in #2218, #2219, #2220, #2280 and #2284
* Add Scarf metrics to understand Cosmos feature usage patterns
  - Add telemetry tracking for dbt docs plugin usage by @pankajkoti in #2240
  - Add DAG run telemetry metrics for load mode, invocation, and render_config parameters by @pankajkoti in #2223
  - Collect profile metrics for DAG runs by @pankajastro in #2228
  - Compress telemetry metadata to reduce serialized DAG size by @pankajkoti in #2252
  - Skip storing telemetry metadata when emission is disabled by @pankajkoti in #2278
  - Hide telemetry metadata parameters from the Airflow trigger UI by @pankajkoti in #2247

closes: astronomer/oss-integrations-private#317

---------

Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com>
This PR extracts basic watcher operator features that will be reused by the `ExecutionMode.WATCHER_KUBERNETES` implementation in #2207, so that the last PR related to introducing `ExecutionMode.WATCHER_KUBERNETES` becomes smaller and easier to review.

This PR does not add any new features or functionality. The main changes are:
- Move the logic shared by `ExecutionMode.WATCHER` and `ExecutionMode.WATCHER_KUBERNETES` from `cosmos.operators.watcher.DbtConsumerWatcherSensor` to `cosmos.operators._watcher.base.BaseConsumerSensor`
- Move `cosmos.operators.watcher._store_dbt_resource_status_from_log` to `cosmos.operators._watcher.base._store_dbt_resource_status_from_log`

How the changes were validated

In addition to the automated tests we run in the CI, I manually ran three different DAGs that use `ExecutionMode.WATCHER`:
- `example_watcher` that is in our repo in `dev/dags/example_watcher.py`
- `example_watcher_synchronous` that is in our repo in `dev/dags/example_watcher.py`
- `example_watcher` to validate subprocess, where I changed `execution_config` to:

Screenshots with the successful runs:
DAG overview:

Using `InvocationMode.DBT_RUNNER`:

Using `InvocationMode.SUBPROCESS`:

Consumers continue sensing:

Runs with synchronous sensors:
