Fail consumer sensors when producer task failure observed using Airflow context #2126
Codecov Report
✅ All modified and coverable lines are covered by tests.

@@ Coverage Diff @@
## main #2126 +/- ##
==========================================
- Coverage 97.82% 97.81% -0.01%
==========================================
Files 92 92
Lines 5972 5995 +23
==========================================
+ Hits 5842 5864 +22
- Misses 130 131 +1
Pull Request Overview
This PR enhances the consumer sensor to detect producer task failures by querying the Airflow context directly, rather than relying solely on XCom state. The implementation includes separate code paths for Airflow 2.x (database queries) and Airflow 3.x (RuntimeTaskInstance API).
Key Changes:
- Adds `_get_producer_task_status()` method with version-specific logic for retrieving producer task state
- Updates `poke()` method to use the new context-based status retrieval
- Adds comprehensive test coverage for both Airflow versions and error scenarios
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| cosmos/operators/watcher.py | Implements new _get_producer_task_status() method with Airflow 2/3 compatibility and updates poke() to use it |
| tests/operators/test_watcher.py | Adds test coverage for new producer status retrieval method across Airflow versions and error cases |
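The version-specific lookup summarised in this overview can be sketched roughly as follows. This is a minimal illustration and not the code from `cosmos/operators/watcher.py`: the two fetcher callables are hypothetical stand-ins for the Airflow 2 database query and the Airflow 3 `RuntimeTaskInstance` call, and returning `None` when a lookup raises is an assumption about how the error cases might be handled.

```python
from typing import Callable, Optional

# Hypothetical type alias: a zero-argument callable that returns a task state
# string (for example "running", "success" or "failed"), or None if unknown.
StateFetcher = Callable[[], Optional[str]]


def get_producer_task_status(
    airflow_major_version: int,
    fetch_from_database: StateFetcher,
    fetch_from_runtime: StateFetcher,
) -> Optional[str]:
    """Return the producer task state, or None when it cannot be determined."""
    # Airflow 2.x: look the state up in the metadata database.
    # Airflow 3.x: ask the task runtime instead.
    fetch = fetch_from_database if airflow_major_version < 3 else fetch_from_runtime
    try:
        return fetch()
    except Exception:
        # Assumption: a failed lookup is reported as "unknown" rather than
        # failing the sensor itself.
        return None


def broken_lookup() -> Optional[str]:
    """Hypothetical fetcher that simulates an unavailable backend."""
    raise RuntimeError("state lookup unavailable")


if __name__ == "__main__":
    print(get_producer_task_status(2, lambda: "failed", lambda: "running"))
    print(get_producer_task_status(3, lambda: "failed", broken_lookup))
```

Injecting the fetchers keeps the sketch runnable without an Airflow installation and mirrors how each version path could be tested separately.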
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
tatiana left a comment
@pankajkoti, it looks excellent, thank you very much. I didn't realise we could get another task state in Airflow 3 - this is very useful.
With this change, it seems tests stopped hitting `_get_producer_task_state`, based on the code coverage report (https://app.codecov.io/gh/astronomer/astronomer-cosmos/pull/2126/indirect-changes). I'm not sure why we stopped seeing code coverage comments in the PR - they used to be very useful.
If the function is no longer in use, it may be better to remove it.
With this PR, the Watcher deferrable trigger now mirrors the synchronous sensor (as implemented in PR #2126) by querying the producer task state through Airflow instead of the state XCom. The PR adds `_get_producer_task_status` to `WatcherTrigger` (database lookup in Airflow 2, `RuntimeTaskInstance` in Airflow 3) and wires the `run()` method to utilise it, so async sensors raise `producer_failed` events when the producer dies.

related: #2126
related: #2086

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
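A rough sketch of how an asynchronous trigger loop might surface a `producer_failed` event is shown below. It is illustrative only and does not reproduce `WatcherTrigger.run()`: the function names, the event dictionary shape and the order of the two checks (model status first, producer state second) are all assumptions, and plain callables stand in for the XCom and Airflow lookups.

```python
import asyncio
from typing import AsyncIterator, Callable, Optional


async def watch_model(
    get_model_status: Callable[[], Optional[str]],
    get_producer_state: Callable[[], Optional[str]],
    poke_interval: float = 0.01,
) -> AsyncIterator[dict]:
    """Yield one event when the model finishes or the producer task fails."""
    while True:
        model_status = get_model_status()
        if model_status is not None:
            # The producer already reported a result for this model.
            yield {"status": model_status}
            return
        if get_producer_state() == "failed":
            # No result yet and the producer is dead: stop waiting.
            yield {"status": "failed", "reason": "producer_failed"}
            return
        await asyncio.sleep(poke_interval)


async def collect_events(events: AsyncIterator[dict]) -> list:
    """Hypothetical helper that drains the generator for demonstration."""
    return [event async for event in events]


if __name__ == "__main__":
    events = asyncio.run(collect_events(watch_model(lambda: None, lambda: "failed")))
    print(events)
```

Checking the producer state on every iteration is what lets the waiting sensor stop as soon as the producer fails instead of polling until its timeout.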
This PR removes the obsolete `_store_producer_task_state` helper and its failure-callback wiring from `DbtProducerWatcherOperator`, since both synchronous and asynchronous watcher sensors now rely on `_get_producer_task_status` for producer failures (in `cosmos/operators/watcher.py` and `cosmos/_triggers/watcher.py`) as part of changes done in PRs #2144 and #2126. Also, the PR cleans up the associated unit tests that validated the old callback/XCom behaviour (`tests/operators/test_watcher.py`).

related: #2144
related: #2126
related: #2086

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
…ow context (#2126)

Ensure `DbtConsumerWatcherSensor` uses the Airflow context to fail fast when the watcher producer ends in a failure state.

related: #2086

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
(cherry picked from commit a59011e)
Bug fixes

* Force ``DbtProducerWatcherOperator`` retries to zero by @pankajkoti in #2114
* Fail ``DbtConsumerWatcherSensor`` tasks immediately when the ``DbtProducerWatcherOperator`` fails using Airflow context by @pankajkoti in #2126
* Fix forwarding ``DbtProducerWatcherOperator`` ``dbt build`` flags by @michal-mrazek in #2127

Documentation

* Expand ``ExecutionMode.KUBERNETES`` guidance by @tatiana in #2139
* Document dataset-event limitation when using ``ExecutionMode.AIRFLOW_ASYNC`` by @varaprasadregani in #2143

related: astronomer/oss-integrations-private#274

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Breaking changes

* Introduced in PR #2080. The following functions are expected to be used only internally by Cosmos, so we hope these won't impact end-users, but we are documenting the changes just in case:

  - ``generate_task_or_group`` receives ``render_config`` instead of its individual configurations, such as ``test_behavior``, ``source_rendering_behavior`` and ``enable_owner_inheritance``
  - ``create_task_metadata`` receives ``render_config`` instead of its individual configurations, such as ``test_behavior``, ``source_rendering_behavior`` and ``enable_owner_inheritance``
  - ``create_task_metadata`` now expects the ``node_converters`` argument

* Drop Python 3.9 support by @pankajastro in #2118
* Drop Airflow 2.4 support by @pankajastro in #2161
* Drop Airflow 2.5 support by @pankajastro in #2165

Features

* Support applying ``node_converter`` at a task level instead of task group level by @anyapriya in #1759
* Allow overriding ``DbtProducerWatcherOperator`` parameters via ``ExecutionConfig.setup_operator_args`` by @pankajastro in #2133
* Use deferrable sensors by default in ``ExecutionMode.WATCHER`` by @pankajastro in #2084
* Support real-time consumer updates when using ``ExecutionMode.WATCHER`` and ``InvocationMode.SUBPROCESS`` by @pankajastro in #2152
* Update telemetry to v3 format with query parameters by @pankajkoti in #2192
* Add initial set of telemetry task listener metrics for Cosmos operators by @pankajkoti in #2195

Enhancements

* Unify Airflow version handling into ``constants.py`` by @tatiana in #2089
* Refactor ``airflow/graph.py`` to simplify the code base by @tatiana in #2080
* Force watcher producer retries to zero by @pankajkoti in #2114
* Fail ``ExecutionMode.WATCHER`` consumer sensors immediately when the producer fails using Airflow context by @pankajkoti in #2126
* ``ExecutionMode.WATCHER``: fetch producer status asynchronously from the Airflow runtime so deferrable sensors fail immediately when the producer task fails by @pankajkoti in #2144
* Refactor ``ExecutionMode.WATCHER`` ``InvocationMode.SUBPROCESS`` log parser by @tatiana in #2183
* Replace map_index with is_mapped_task boolean in task telemetry metrics by @pankajkoti in #2210
* Collect cosmos profile metrics in task telemetry metrics by @pankajastro in #2198
* Remove unnecessary information from telemetry by @tatiana in #2211

Bug fixes

* Clarify ``ExecutionMode.WATCHER`` deferrable failure messaging by @pankajkoti in #2124
* Remove empty test tasks when all tests are detached by @anyapriya in #2010
* Fix forwarding ``DbtProducerWatcherOperator`` ``dbt build`` flags by @michal-mrazek in #2127
* Add databricks oauth mock profile by @fjmacagno in #2164
* Register listeners in Airflow 3 plugin implementation by @pankajastro in #2187
* Fix resolution of ``packages-install-path`` when it uses ``env_var`` by @tatiana in #2194
* Fix ``template_fields`` in ``DbtConsumerWatcherSensor`` to include ``DbtRunLocalOperator`` ``template_fields`` by @tiovader and @emanuel-luis in #2209
* Emit asset events in ExecutionMode.AIRFLOW_ASYNC mode by @pankajastro in #2184
* Remove dag_run_id from telemetry tests by @tatiana in #2213

Docs

* Document dataset-event limitation when using ``ExecutionMode.AIRFLOW_ASYNC`` by @varaprasadregani in #2143
* Expand ``ExecutionMode.KUBERNETES`` guidance by @tatiana in #2139
* Add docs for deferrable ``DbtConsumerWatcherSensor`` by @pankajastro in #2115
* Fix reStructuredText formatting by @dnskr in #2132
* Add docs for ``setup_operator_args`` param by @pankajastro in #2136
* Remove experimental flag for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajastro in #2153
* Clarify ``ExecutionMode.AIRFLOW_ASYNC`` dataset limits by @pankajkoti in #2167
* Update PRIVACY_NOTICE.rst by @tatiana in #2212

Others

* Drop Python 3.9 support by @pankajastro in #2118
* Drop Airflow 2.4 support by @pankajastro in #2161
* Drop Airflow 2.5 support by @pankajastro in #2165
* Improve example DAG ``jaffle_shop_kubernetes.py`` by @tatiana in #2140
* Enable tests for Python 3.13 by @pankajastro in #2154
* Add Python 3.12 to CI integration tests matrix by @pankajastro in #2168
* Retry flaky Telemetry success test to stabilise CI by @pankajkoti in #2138
* Drop unused producer state xcom handling in ``ExecutionMode.WATCHER`` by @pankajkoti in #2145
* Remove unused Python3.9 uses from Github action CI by @pankajastro in #2117
* Run pre-commit on ``ExecutionMode.WATCHER`` modules by @pankajkoti in #2150
* Refactor: Use shared airflow version constant by @pankajkoti in #2157
* Pin ``pydantic<2.0`` for Airflow 2.6 compatibility by @pankajastro in #2172
* Remove duplicate ``dbt-duckdb`` dependency by @pankajastro in #2170
* Add targeted ``type: ignore`` for untyped decorators to fix ``mypy`` errors by @pankajastro in #2174
* Replace Legacy typing Aliases with Built-in Types for Python 3.10+ by @pankajastro in #2175
* Refactor to reuse ``load_method_from_module`` from ``_utils/importer.py`` by @pankajastro in #2176
* Remove try except block for cache import and unused python_version variable by @pankajastro in #2186
* Unpin Airflow to satisfy GitHub Security tab requirements by @pankajastro in #2171
* Update Python version for ``pyupgrade`` in ``pre-commit`` config by @pankajastro in #2190
* Add cooldown config in ``dependabot`` config by @pankajastro in #2189
* Adjust pre-commit so Python 3.10 or higher can be used by @tatiana in #2196
* Remove empty variables emission from telemetry metrics by @pankajkoti in #2197
* Reformat documented comments for historical URL formats by @pankajkoti in #2199
* Bump ``actions/checkout`` from ``5.0.0`` to ``5.0.1`` by @dependabot in #2135
* Bump ``actions/checkout`` to ``6.0.0`` in GitHub workflows by @dependabot in #2147
* Bump ``zizmorcore/zizmor-action`` from ``0.2.0`` to ``0.3.0`` by @dependabot in #2156
* Bump ``actions/checkout`` from ``5.0.1`` to ``6.0.0`` by @dependabot in #2155
* Bump ``actions/checkout`` from ``6.0.0`` to ``6.0.1`` by @dependabot in #2178
* Bump ``codecov/codecov-action`` from ``5.5.1`` to ``5.5.2`` by @dependabot in #2208
* pre-commit autoupdate by @pre-commit-ci[bot] in #2134, #2162, #2173, #2191, #2202

closes: astronomer/oss-integrations-private#275
Ensure `DbtConsumerWatcherSensor` uses the Airflow context to fail fast when the watcher producer ends in a failure state.

related: #2086
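To make the fail-fast behaviour concrete, here is a minimal sketch of the decision a synchronous sensor's `poke()` could take once it knows both the model status and the producer task state. It is not the `DbtConsumerWatcherSensor` implementation: `ProducerFailedError`, the function signature and the status strings are hypothetical, and a real sensor would presumably raise an Airflow exception instead.

```python
from typing import Optional


class ProducerFailedError(RuntimeError):
    """Hypothetical stand-in for the exception a real sensor would raise."""


def poke(model_status: Optional[str], producer_state: Optional[str]) -> bool:
    """Return True when the model succeeded, False to keep waiting.

    Raises when the model failed, or when the producer task failed before
    reporting any status for this model.
    """
    if model_status == "success":
        return True
    if model_status is not None:
        raise RuntimeError(f"model finished with status {model_status!r}")
    if producer_state == "failed":
        # Fail fast: the producer will never report on this model.
        raise ProducerFailedError("producer task failed before reporting this model")
    return False  # nothing reported yet, poke again later


if __name__ == "__main__":
    print(poke(None, "running"))
    print(poke("success", "failed"))
```

Without the producer-state check, the last branch would keep returning `False` until the sensor timed out, which is the delay the PR description aims to avoid.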