Force watcher producer retries to zero#2114
Conversation
6f0d808 to
fe6aa31
Compare
✅ Deploy Preview for sunny-pastelito-5ecb04 ready!
To edit notification comments on pull requests, go to your Netlify project configuration. |
✅ Deploy Preview for sunny-pastelito-5ecb04 canceled.
|
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2114 +/- ##
==========================================
- Coverage 97.81% 97.80% -0.02%
==========================================
Files 92 92
Lines 5948 5961 +13
==========================================
+ Hits 5818 5830 +12
- Misses 130 131 +1 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull Request Overview
This PR ensures that DbtProducerWatcherOperator never auto-retries by forcing its retries parameter to zero. The rationale is that consumer watcher sensors handle model-level retries individually using LOCAL execution mode, and re-running the producer would duplicate the full dbt build and create duplicate watcher callbacks that consumers may not process correctly if they've already handled XCOMs from the first run.
- Forces
retriesto 0 in the operator's__init__method, overriding any user-provided values - Documents the behavior in the WATCHER execution mode guide
- Adds test coverage for the forced retry behavior
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| cosmos/operators/watcher.py | Adds logic to force retries and default_args["retries"] to 0 with inline explanation |
| docs/getting_started/watcher-execution-mode.rst | Documents why producer retries are disabled and explains the consumer-level retry strategy |
| tests/operators/test_watcher.py | Adds two tests verifying that retries are forced to 0 regardless of user input |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
39b4f83 to
0bee20a
Compare
tatiana
left a comment
There was a problem hiding this comment.
Great work, @pankajkoti , thank you!
The PR makes the following changes: - hard-set `DbtProducerWatcherOperator` `retries` (and `default_args["retries"]`) to zero so WATCHER never auto-retries the full dbt build - Log at INFO that producer retries are disabled, and fail fast if Airflow attempts a retry (try number > 1), improving user visibility into the behavior. - explain the rationale inline (sensor retry path handles per-model reruns) - document behavior in the WATCHER guide and add tests covering the forced retry value closes: #2105 --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> (cherry picked from commit b58778b)
Bug fixes * Force ``DbtProducerWatcherOperator`` retries to zero by @pankajkoti in #2114 * Fail ``DbtConsumerWatcherSensor`` tasks immediately when the ``DbtProducerWatcherOperator`` fails using Airflow context by @pankajkoti in #2126 * Fix forwarding ``DbtProducerWatcherOperator`` ``dbt build`` flags by by @michal-mrazek in #2127 Documentation * Expand ``ExecutionMode.KUBERNETES`` guidance by @tatiana in #2139 * Document dataset-event limitation when using ``ExecutionMode.AIRFLOW_ASYNC`` by @varaprasadregani in #2143 related: astronomer/oss-integrations-private#274
Bug fixes * Force ``DbtProducerWatcherOperator`` retries to zero by @pankajkoti in #2114 * Fail ``DbtConsumerWatcherSensor`` tasks immediately when the ``DbtProducerWatcherOperator`` fails using Airflow context by @pankajkoti in #2126 * Fix forwarding ``DbtProducerWatcherOperator`` ``dbt build`` flags by @michal-mrazek in #2127 Documentation * Expand ``ExecutionMode.KUBERNETES`` guidance by @tatiana in #2139 * Document dataset-event limitation when using ``ExecutionMode.AIRFLOW_ASYNC`` by @varaprasadregani in #2143 related: astronomer/oss-integrations-private#274 --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Breaking changes * Introduced in the PR #2080. The following functions are expected to be used internally only to Cosmos, so we hope these won't impact end-users, but we are documenting the changes just in case: - ``generate_task_or_group`` receives ``render_config`` instead of its individual configurations, such as ``test_behavior``, ``source_rendering_behavior`` and ``enable_owner_inheritance`` - ``create_task_metadata`` receives ``render_config`` instead of its individual configurations, such as ``test_behavior``, ``source_rendering_behavior`` and ``enable_owner_inheritance`` - ``create_task_metadata`` now expects the ``node_converters`` argument * Drop Python 3.9 support by @pankajastro in #2118 * Drop Airflow 2.4 support by @pankajastro in #2161 * Drop Airflow 2.5 support by @pankajastro in #2165 Features * Support applying ``node_converter`` at a task level instead of task group level by @anyapriya in #1759 * Allow overriding ``DbtProducerWatcherOperator`` parameters via ``ExecutionConfig.setup_operator_args`` by @pankajastro in #2133 * Use deferrable sensors by default in ``ExecutionMode.WATCHER`` by @pankajastro in #2084 * Support real-time consumer updates when using ``ExecutionMode.WATCHER`` and ``InvocationMode.SUBPROCESS`` by @pankajastro in #2152 * Update telemetry to v3 format with query parameters by @pankajkoti in #2192 * Add initial set of telemetry task listener metrics for Cosmos operators by @pankajkoti in #2195 Enhancements * Unify Airflow version handling into ``constants.py`` by @tatiana in #2089 * Refactor ``airflow/graph.py`` to simplify the code base by @tatiana in #2080 * Force watcher producer retries to zero by @pankajkoti in #2114 * Fail ``ExecutionMode.WATCHER`` consumer sensors immediately when the producer fails using Airflow context by @pankajkoti in #2126 * ``ExecutonMode.WATCHER``: fetch producer status asynchronously from the Airflow runtime so deferrable sensors fail immediately when the producer task fails by @pankajkoti in #2144 * Refactor ``ExecutionMode.WATCHER`` ``InvocationMode.SUBPROCESS`` log parser by @tatiana in #2183 * Replace map_index with is_mapped_task boolean in task telemetry metrics by @pankajkoti in #2210 * Collect cosmos profile metrics in task telemetry metrics by @pankajastro in #2198 * Remove unnecessary information from telemetry by @tatiana in #2211 Bug fixes * Clarify ``ExecutionMode.WATCHER`` deferrable failure messaging by @pankajkoti in #2124 * Remove empty test tasks when all tests are detached by @anyapriya in #2010 * Fix forwarding ``DbtProducerWatcherOperator`` ``dbt build`` flags by @michal-mrazek in #2127 * Add databricks oauth mock profile by @fjmacagno in #2164 * Register listeners in Airflow 3 plugin implementation by @pankajastro in #2187 * Fix resolution of ``packages-install-path`` when it uses ``env_var`` by @tatiana in #2194 * Fix ``template_fields`` in ``DbtConsumerWatcherSensor`` to include ``DbtRunLocalOperator`` template_fields`` by @tiovader and @emanuel-luis in #2209 * Emit asset events in ExecutionMode.AIRFLOW_ASYNC mode by @pankajastro in #2184 * Remove dag_run_id from telemetry tests by @tatiana in #2213 Docs * Document dataset-event limitation when using ``ExecutionMode.AIRFLOW_ASYNC`` by @varaprasadregani in #2143 * Expand ``ExecutionMode.KUBERNETES`` guidance by @tatiana in #2139 * Add docs for deferrable ``DbtConsumerWatcherSensor`` by @pankajastro in #2115 * Fix reStructuredText formatting by @dnskr in #2132 * Add docs for ``setup_operator_args`` param by @pankajastro in #2136 * Remove experimental flag for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajastro in #2153 * Clarify ``ExecutionMode.AIRFLOW_ASYNC`` dataset limits by @pankajkoti in #2167 * Update PRIVACY_NOTICE.rst by @tatiana in #2212 Others * Drop Python 3.9 support by @pankajastro in #2118 * Drop Airflow 2.4 support by @pankajastro in #2161 * Drop Airflow 2.5 support by @pankajastro in #2165 * Improve example DAG ``jaffle_shop_kubernetes.py`` by @tatiana in #2140 * Enable tests for Python 3.13 by @pankajastro in #2154 * Add Python 3.12 to CI integration tests matrix by @pankajastro in #2168 * Retry flaky Telemetry success test to stabilise CI by @pankajkoti in #2138 * Drop unused producer state xcom handling in ``ExecutionMode.WATCHER`` by @pankajkoti in #2145 * Remove unused Python3.9 uses from Github action CI by @pankajastro in #2117 * Run pre-commit on ``ExecutionMode.WATCHER`` modules by @pankajkoti in #2150 * Refactor: Use shared airflow version constant by @pankajkoti in #2157 * Pin ``pydantic<2.0`` for Airflow 2.6 compatibility by @pankajastro in #2172 * Remove duplicate ``dbt-duckdb`` dependency by @pankajastro in #2170 * Add targeted ``type: ignore`` for untyped decorators to fix ``mypy`` errors by @pankajastro in #2174 * Replace Legacy typing Aliases with Built-in Types for Python 3.10+ by @pankajastro in #2175 * Refactor to reuse ``load_method_from_module`` from ``_utils/importer.py`` by @pankajastro in #2176 * Remove try except block for cache import and unused python_version variable by @pankajastro in #2186 * Unpin Airflow to satisfy GitHub Security tab requirements by @pankajastro in #2171 * Update Python version for ``pyupgrade`` in ``pre-commit`` config by @pankajastro in #2190 * Add cooldown config in ``dependabot`` config by @pankajastro in #2189 * Adjust pre-commit so Python 3.10 or higher can be used by @tatiana in #2196 * Remove empty variables emission from telemetry metrics by @pankajkoti in #2197 * Reformat documented comments for historical URL formats by @pankajkoti in #2199 * Bump ``actions/checkout`` from ``5.0.0`` to ``5.0.1`` by @dependabot in #2135 * Bump ``actions/checkout`` to ``6.0.0`` in GitHub workflows by @dependabot in #2147 * Bump ``zizmorcore/zizmor-action`` from ``0.2.0`` to ``0.3.0`` by @dependabot in #2156 * Bump ``actions/checkout`` from ``5.0.1`` to ``6.0.0`` by @dependabot in #2155 * Bump ``actions/checkout`` from ``6.0.0`` to ``6.0.1`` by @dependabot in #2178 * Bump ``codecov/codecov-action`` from ``5.5.1`` to ``5.5.2`` by @dependabot in #2208 * pre-commit autoupdate by @pre-commit-ci[bot] in #2134, #2162, #2173, #2191, #2202 closes: astronomer/oss-integrations-private#275
Aims to overcome retries at a TaskGroup level, is reported in: #2282 During retries, the watcher will not execute any operation, but will not fail. We are intentionally changing the behaviour previously implemented in #2114. The behaviour will be improved once the following ticket is implemented: #1978 Closes: #2282
The PR makes the following changes:
DbtProducerWatcherOperatorretries(anddefault_args["retries"]) to zero so WATCHER never auto-retries the full dbt buildcloses: #2105