Add deferrable support for Watcher Sensor#2084
Conversation
cc3b1a8 to
592abce
Compare
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2084 +/- ##
==========================================
+ Coverage 97.80% 97.81% +0.01%
==========================================
Files 91 92 +1
Lines 5871 5948 +77
==========================================
+ Hits 5742 5818 +76
- Misses 129 130 +1 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull Request Overview
This PR adds deferrable (async) execution support to the DbtConsumerWatcherSensor by introducing a new WatcherTrigger class. When the sensor's initial poke returns False, it defers execution to the trigger which asynchronously polls XCom for model status updates.
- Implemented
WatcherTriggerclass for async polling of dbt model status - Added
executeandexecute_completemethods toDbtConsumerWatcherSensorfor deferrable behavior - Refactored XCom decompression logic into a reusable
_parse_compressed_xcomutility function - Added comprehensive test coverage for the trigger and sensor deferral behavior
Reviewed Changes
Copilot reviewed 5 out of 7 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| cosmos/_triggers/watcher.py | New trigger class for async polling of dbt model status from XCom |
| cosmos/_triggers/init.py | Empty init file for the _triggers module |
| cosmos/operators/watcher.py | Added execute/execute_complete methods and refactored to use shared decompression utility |
| tests/_triggers/watcher.py | Comprehensive test coverage for WatcherTrigger async behavior |
| tests/_triggers/init.py | Empty init file for test triggers module |
| tests/operators/test_watcher.py | Tests for sensor deferral and execute_complete method |
| pyproject.toml | Added pytest-asyncio dependency for async test support |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
pankajkoti
left a comment
There was a problem hiding this comment.
It's looking great to me. Have a couple of comments inline.
Also, looks like a few tests are failing in the CI.
There was a problem hiding this comment.
@pankajastro, this is really exciting, thanks for finding a solution for it!
Do you see any downsides or risks with always using the deferrable implementation? What happens if the end-user does not have a triggerer in their Airflow deployment?
Should we also support people opting out of using the deferrable mode?
Finally, are there any performance improvements when switching to deferrable, when running ExecutionMode.WATCHER with our benchmark project?
I’m a bit concerned about the number of database accesses from the trigger.
I’ve thought about it, and I think we should expose a parameter at the sensor level to conditionally defer. However, it would be better to do this once #1973 is solved.
I have not done any performance testing. |
tatiana
left a comment
There was a problem hiding this comment.
Looks great, thank you very much, @pankajastro !
Breaking changes * Introduced in the PR #2080. The following functions are expected to be used internally only to Cosmos, so we hope these won't impact end-users, but we are documenting the changes just in case: - ``generate_task_or_group`` receives ``render_config`` instead of its individual configurations, such as ``test_behavior``, ``source_rendering_behavior`` and ``enable_owner_inheritance`` - ``create_task_metadata`` receives ``render_config`` instead of its individual configurations, such as ``test_behavior``, ``source_rendering_behavior`` and ``enable_owner_inheritance`` - ``create_task_metadata`` now expects the ``node_converters`` argument * Drop Python 3.9 support by @pankajastro in #2118 * Drop Airflow 2.4 support by @pankajastro in #2161 * Drop Airflow 2.5 support by @pankajastro in #2165 Features * Support applying ``node_converter`` at a task level instead of task group level by @anyapriya in #1759 * Allow overriding ``DbtProducerWatcherOperator`` parameters via ``ExecutionConfig.setup_operator_args`` by @pankajastro in #2133 * Use deferrable sensors by default in ``ExecutionMode.WATCHER`` by @pankajastro in #2084 * Support real-time consumer updates when using ``ExecutionMode.WATCHER`` and ``InvocationMode.SUBPROCESS`` by @pankajastro in #2152 * Update telemetry to v3 format with query parameters by @pankajkoti in #2192 * Add initial set of telemetry task listener metrics for Cosmos operators by @pankajkoti in #2195 Enhancements * Unify Airflow version handling into ``constants.py`` by @tatiana in #2089 * Refactor ``airflow/graph.py`` to simplify the code base by @tatiana in #2080 * Force watcher producer retries to zero by @pankajkoti in #2114 * Fail ``ExecutionMode.WATCHER`` consumer sensors immediately when the producer fails using Airflow context by @pankajkoti in #2126 * ``ExecutonMode.WATCHER``: fetch producer status asynchronously from the Airflow runtime so deferrable sensors fail immediately when the producer task fails by @pankajkoti in #2144 * Refactor ``ExecutionMode.WATCHER`` ``InvocationMode.SUBPROCESS`` log parser by @tatiana in #2183 * Replace map_index with is_mapped_task boolean in task telemetry metrics by @pankajkoti in #2210 * Collect cosmos profile metrics in task telemetry metrics by @pankajastro in #2198 * Remove unnecessary information from telemetry by @tatiana in #2211 Bug fixes * Clarify ``ExecutionMode.WATCHER`` deferrable failure messaging by @pankajkoti in #2124 * Remove empty test tasks when all tests are detached by @anyapriya in #2010 * Fix forwarding ``DbtProducerWatcherOperator`` ``dbt build`` flags by @michal-mrazek in #2127 * Add databricks oauth mock profile by @fjmacagno in #2164 * Register listeners in Airflow 3 plugin implementation by @pankajastro in #2187 * Fix resolution of ``packages-install-path`` when it uses ``env_var`` by @tatiana in #2194 * Fix ``template_fields`` in ``DbtConsumerWatcherSensor`` to include ``DbtRunLocalOperator`` template_fields`` by @tiovader and @emanuel-luis in #2209 * Emit asset events in ExecutionMode.AIRFLOW_ASYNC mode by @pankajastro in #2184 * Remove dag_run_id from telemetry tests by @tatiana in #2213 Docs * Document dataset-event limitation when using ``ExecutionMode.AIRFLOW_ASYNC`` by @varaprasadregani in #2143 * Expand ``ExecutionMode.KUBERNETES`` guidance by @tatiana in #2139 * Add docs for deferrable ``DbtConsumerWatcherSensor`` by @pankajastro in #2115 * Fix reStructuredText formatting by @dnskr in #2132 * Add docs for ``setup_operator_args`` param by @pankajastro in #2136 * Remove experimental flag for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajastro in #2153 * Clarify ``ExecutionMode.AIRFLOW_ASYNC`` dataset limits by @pankajkoti in #2167 * Update PRIVACY_NOTICE.rst by @tatiana in #2212 Others * Drop Python 3.9 support by @pankajastro in #2118 * Drop Airflow 2.4 support by @pankajastro in #2161 * Drop Airflow 2.5 support by @pankajastro in #2165 * Improve example DAG ``jaffle_shop_kubernetes.py`` by @tatiana in #2140 * Enable tests for Python 3.13 by @pankajastro in #2154 * Add Python 3.12 to CI integration tests matrix by @pankajastro in #2168 * Retry flaky Telemetry success test to stabilise CI by @pankajkoti in #2138 * Drop unused producer state xcom handling in ``ExecutionMode.WATCHER`` by @pankajkoti in #2145 * Remove unused Python3.9 uses from Github action CI by @pankajastro in #2117 * Run pre-commit on ``ExecutionMode.WATCHER`` modules by @pankajkoti in #2150 * Refactor: Use shared airflow version constant by @pankajkoti in #2157 * Pin ``pydantic<2.0`` for Airflow 2.6 compatibility by @pankajastro in #2172 * Remove duplicate ``dbt-duckdb`` dependency by @pankajastro in #2170 * Add targeted ``type: ignore`` for untyped decorators to fix ``mypy`` errors by @pankajastro in #2174 * Replace Legacy typing Aliases with Built-in Types for Python 3.10+ by @pankajastro in #2175 * Refactor to reuse ``load_method_from_module`` from ``_utils/importer.py`` by @pankajastro in #2176 * Remove try except block for cache import and unused python_version variable by @pankajastro in #2186 * Unpin Airflow to satisfy GitHub Security tab requirements by @pankajastro in #2171 * Update Python version for ``pyupgrade`` in ``pre-commit`` config by @pankajastro in #2190 * Add cooldown config in ``dependabot`` config by @pankajastro in #2189 * Adjust pre-commit so Python 3.10 or higher can be used by @tatiana in #2196 * Remove empty variables emission from telemetry metrics by @pankajkoti in #2197 * Reformat documented comments for historical URL formats by @pankajkoti in #2199 * Bump ``actions/checkout`` from ``5.0.0`` to ``5.0.1`` by @dependabot in #2135 * Bump ``actions/checkout`` to ``6.0.0`` in GitHub workflows by @dependabot in #2147 * Bump ``zizmorcore/zizmor-action`` from ``0.2.0`` to ``0.3.0`` by @dependabot in #2156 * Bump ``actions/checkout`` from ``5.0.1`` to ``6.0.0`` by @dependabot in #2155 * Bump ``actions/checkout`` from ``6.0.0`` to ``6.0.1`` by @dependabot in #2178 * Bump ``codecov/codecov-action`` from ``5.5.1`` to ``5.5.2`` by @dependabot in #2208 * pre-commit autoupdate by @pre-commit-ci[bot] in #2134, #2162, #2173, #2191, #2202 closes: astronomer/oss-integrations-private#275
Since #2084, the `ExecutionMode.WATCHER` logic started spreading throughout the Cosmos code base. When implementing Airflow providers, it is common practice to split Airflow code into operators and triggerers folders. However, in the specific case of Cosmos, I believe this approach makes it harder to read and maintain the project and that it is better to centralise the execution mode code. Historically, Cosmos execution modes were defined in the operators folder because the code for each execution mode was operator-only. With the introduction of the async and watcher execution modes, the execution mode implementation became more complex, requiring a custom triggerer for the watcher. As a result, the watcher execution mode code began spreading throughout the Cosmos codebase, complicating code distribution more than needed. This PR brings the watcher triggerer logic closer to the operator, so that most of the watcher-specific logic lives near the execution mode implementation. In Cosmos 2.0, we can review how we name the Cosmos execution mode folder, potentially renaming this folder from operators to execution_modes.
This PR adds async/deferrable support to the Watcher sensor
Benefits
Limitation
Future
closes: #2059