Remove forced retries=0 from watcher producer operators#2479
Conversation
Since v1.12.2 (da39a56), the producer task skips execution gracefully on retry (try_number > 1) instead of failing. Retrying is therefore safe and harmless, so there is no longer a reason to override the user-supplied retries value to 0. Users can now configure retries freely on DbtProducerWatcherOperator and DbtProducerWatcherKubernetesOperator. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
retries=0 from watcher producer operators
There was a problem hiding this comment.
Pull request overview
This PR removes Cosmos’s hard-coded override that forced retries=0 on WATCHER producer operators, based on the newer behavior where producer retries (try_number > 1) skip re-execution instead of failing. It also updates documentation and removes now-obsolete tests asserting retries were forced to zero.
Changes:
- Removed logic that overwrote user-supplied
retries/default_args["retries"]inDbtProducerWatcherOperatorandDbtProducerWatcherKubernetesOperator. - Updated WATCHER execution mode documentation to state producer retries are configurable and safe.
- Deleted unit tests that asserted retries were always forced to
0.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
cosmos/operators/watcher.py |
Removes producer init-time mutation of retries/default_args; producer now inherits user/DAG retry config. |
cosmos/operators/watcher_kubernetes.py |
Same removal for the Kubernetes WATCHER producer. |
docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst |
Updates guidance to say retries are no longer forced to 0, and provides an example with retries: 3. |
tests/operators/test_watcher.py |
Removes tests that asserted retries were forced to 0. |
tests/operators/test_watcher_kubernetes_unit.py |
Removes tests that asserted retries were forced to 0. |
Comments suppressed due to low confidence (2)
cosmos/operators/watcher_kubernetes.py:93
- With retries no longer forced to 0, this operator can inherit DAG-level
retriesand be automatically retried. Becauseexecute()skips and returns success fortry_number > 1, a first-attempt failure that didn’t emit model status XComs can be converted into a successful producer run and cause downstream watcher tasks to reportmodel_not_runas success. Consider guarding the skip-on-retry behavior behind a “prior attempt completed and published results” sentinel, or keep producer retries disabled by default to avoid masking failures.
def __init__(self, *args: Any, **kwargs: Any) -> None:
task_id = kwargs.pop("task_id", "dbt_producer_watcher_operator")
existing_callbacks = kwargs.get("callbacks")
if existing_callbacks is None:
normalized_callbacks: list[Any] = []
elif isinstance(existing_callbacks, (list, tuple)):
normalized_callbacks = list(existing_callbacks)
else:
normalized_callbacks = [existing_callbacks]
normalized_callbacks.append(WatcherKubernetesCallback)
kwargs["callbacks"] = normalized_callbacks
super().__init__(task_id=task_id, *args, **kwargs)
tests/operators/test_watcher_kubernetes_unit.py:90
- After removing the retries-forced-to-0 behavior, there’s no longer a unit test asserting that user-supplied
retriesis respected forDbtProducerWatcherKubernetesOperator. Add a small initialization test that setsretries(and/ordefault_args={'retries': ...}) and asserts the operator keeps that value, so this behavior doesn’t regress.
render_config = RenderConfig(load_method=LoadMode.DBT_MANIFEST, test_behavior=TestBehavior.NONE)
@patch("cosmos.operators.kubernetes.DbtBuildKubernetesOperator.execute")
def test_skips_retry_attempt(mock_execute, caplog):
"""
Test that the operator skips execution when a retry is attempted (try_number > 1).
"""
op = DbtProducerWatcherKubernetesOperator(
project_dir=".",
profile_config=None,
image="dbt-image:latest",
)
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
pankajkoti
left a comment
There was a problem hiding this comment.
LGTM. Couple of questions in-line on the documentation.
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #2479 +/- ##
==========================================
- Coverage 98.05% 98.04% -0.01%
==========================================
Files 103 103
Lines 7238 7229 -9
==========================================
- Hits 7097 7088 -9
Misses 141 141 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| self.log.info( | ||
| "Dbt WATCHER producer task does not support Airflow retries. " | ||
| "Detected attempt #%s; skipping execution to avoid running a second dbt build.", | ||
| try_number, | ||
| ) |
There was a problem hiding this comment.
The retry log message says the producer "does not support Airflow retries", but this PR’s intent/documentation is that retries are allowed and will be treated as a no-op (skip dbt build and succeed). Consider rewording this message to clarify that retries are safe/allowed but execution is intentionally skipped to prevent duplicate dbt builds.
| @@ -192,11 +192,6 @@ def test_dbt_producer_watcher_operator_priority_weight_override(): | |||
| assert op.priority_weight == 100 | |||
|
|
|||
|
|
|||
There was a problem hiding this comment.
The tests that asserted retries are forced to 0 were removed, but there isn’t a replacement assertion that user-supplied retry settings are now preserved. Add a unit test that initializes the producer with explicit retries (and/or default_args['retries']) and asserts the operator’s retries reflects the user-provided value (and that the input default_args dict is not mutated).
| def test_dbt_producer_watcher_operator_preserves_explicit_retries(): | |
| """User-supplied retries argument should be preserved on the operator.""" | |
| op = DbtProducerWatcherOperator(project_dir=".", profile_config=None, retries=5) | |
| assert op.retries == 5 | |
| def test_dbt_producer_watcher_operator_preserves_default_args_retries_and_does_not_mutate(): | |
| """ | |
| When retries is provided via default_args, the operator should respect it | |
| and not mutate the caller's default_args dict. | |
| """ | |
| default_args = {"retries": 7} | |
| original_default_args = default_args.copy() | |
| op = DbtProducerWatcherOperator(project_dir=".", profile_config=None, default_args=default_args) | |
| assert op.retries == 7 | |
| # Ensure the original dict passed in was not mutated | |
| assert default_args == original_default_args |
| @@ -77,31 +77,6 @@ | |||
| render_config = RenderConfig(load_method=LoadMode.DBT_MANIFEST, test_behavior=TestBehavior.NONE) | |||
|
|
|||
|
|
|||
There was a problem hiding this comment.
The unit tests that validated DbtProducerWatcherKubernetesOperator forced retries=0 were removed, but there isn’t a new test validating the updated behavior (that user-configured retries are retained). Add a unit test that passes a non-zero retries (and/or via default_args) and asserts op.retries matches the provided value.
| def test_producer_watcher_respects_configured_retries(): | |
| """ | |
| Ensure that DbtProducerWatcherKubernetesOperator preserves a non-zero user-configured | |
| retries value instead of forcing retries=0. | |
| """ | |
| op = DbtProducerWatcherKubernetesOperator( | |
| task_id="test_retries", | |
| project_dir=".", | |
| profile_config=None, | |
| image="dbt-image:latest", | |
| retries=3, | |
| ) | |
| assert op.retries == 3 |
1.14.0 (2026-04-07) --------------------- Breaking Changes * Drop support for Airflow versions earlier than **2.9** by @jedcunningham in #2288 * Fix inclusion of package models and selection/exclusion behavior by @pankajkoti in #2357 * ``ExecutionMode.WATCHER``: The per-node ``*_status`` XCom value is now a dict (``{"status": "<status>", "outlet_uris": [...]}``) instead of a plain string. Any custom code that reads these internal XCom keys directly will need to be updated by @pankajkoti in #2507 Features * Add cluster policy support for ``ExecutionMode.WATCHER`` sensor retries by @astro-anand in #2293 * Add debug mode to track memory utilization by @tatiana in #2327 * Add FQN selection support for ``LoadMode.DBT_MANIFEST`` by @pankajastro in #2375 * Introduce interceptors for Cosmos tasks by @tatiana in #2419 * Add config to allow disabling dag versioning by @pankajkoti in #2470 * Implement TaskGroups by models folder by @maximilianoarcieri and @tatiana in #1566, #2469, and #2420 * feat: implement DbtTestWatcherOperator by @michal-mrazek in #2447 * Add source freshness aware execution for ``ExecutionMode.WATCHER`` by @pankajastro and @tatiana in #2467 * Note: Like ``ExecutionMode.WATCHER``, this feature is experimental and its interface and implementation can change in the future. * Add Airflow 3.2 support by @pankajastro and @pankajkoti in #2472 Enhancements * Add watcher mode support for dbt test node states by @michal-mrazek in #2318 * Rename watcher-mode sensor retry queue and reuse it for producer tasks by @pankajastro in #2331 * Fix leaked semaphore warnings in Airflow 3 by resetting dbt adapters by @pankajkoti in #2335 * Improve dbt Fusion support and related tests by @tatiana in #2356 * Default Snowflake profile mappings to four threads by @tatiana in #2374 * Attempt to remove Pydantic as a dependency by @tatiana in #2377 * Log dbt-core and adapter versions in watcher consumer tasks by @pankajastro in #2412 * Log model errors in watcher consumer on dbt node failure by @pankajastro in #2431 * Reduce XCom read/write for tracking node state and errors in ConsumerWatcher task by @pankajastro in #2471 * Remove duplicate debug log in watcher subprocess path by @tatiana in #2494 * Simplify and unify WATCHER implementation regardless of InvocationMode by @tatiana in #2498 * Switch to lazy imports in cosmos/__init__.py by @pankajkoti in #2531 Bug Fixes * Handle invalid YAML errors with ``LoadMode.DBT_MANIFEST`` and ``RenderConfig.selector`` by @YourRoyalLinus in #2316 * Populate ``compiled_sql`` for ``InvocationMode.SUBPROCESS`` in ``ExecutionMode.WATCHER`` by @pankajkoti in #2319 * Fix select/exclude type mismatch by @tatiana in #2364 * Set ``emit_datasets=False`` for ``DbtTest*`` operators by @pankajastro in #2365 * Set correct queue priority for watcher producer tasks by @pankajastro in #2372 * Preserve ``extra_context`` for watcher consumer task instances by @pankajkoti in #2381 * Respect ``deferrable=False`` from ``operator_args`` on watcher consumer sensors by @pankajkoti in #2384 * Fix watcher queue precedence and add documentation by @pankajastro in #2391 * Do not set ``compiled_sql`` on ``ExecutionMode.WATCHER`` producers by @pankajkoti in #2440 * Remove const attribute for ``__cosmos_telemetry_metadata__`` dag param by @pankajkoti in #2466 * Remove timeout override from Cosmos watcher sensors by @tatiana and @claude in #2478 * Remove forced ``retries=0`` from watcher producer operators by @tatiana in #2479 * RFC: Add patch for newer versions of amazon provider when running dbt on EKS by @aoelvp94 in #2481 * Fix ``cosmos_debug_max_memory_mb`` XCom not pushed in Watcher sensor tasks by @tatiana in #2503 * Fix ``TestBehavior.NONE`` and ``TestBehavior.AFTER_ALL`` exclude ignored with selectors in ``ExecutionMode.WATCHER`` by @pankajkoti in #2511 * Move dataset emission for ``ExecutionMode.WATCHER`` from producer to consumer sensors by @pankajkoti in #2507 Docs * Document cluster policy configuration for ``ExecutionMode.WATCHER`` sensor tasks by @pankajastro in #2315 * Remove outdated docs for the dbt docs plugin with Airflow 3 by @pankajastro in #2353 * Make Watcher DBT Execution Queue heading clickable by @pankajastro in #2354 * Update ``ExecutionMode.WATCHER`` documentation regarding test node implementation by @jroachgolf84 in #2355 * Fix ``pre_dbt_fusion`` configuration rendering by @pankajastro in #2369 * Add documentation for including/excluding nodes based on FQN by @pankajastro in #2371 * Update watcher execution mode documentation by @tatiana in #2380 * Add documentation for ``DbtSeedLocalOperator`` by @jroachgolf84 in #2383 * Fix miscellaneous Sphinx warnings by @pankajastro in #2395 * Improve contributing documentation by @lzdanski in #2397 * Add **Get Started in 5 Minutes** guide by @lzdanski in #2398 * Add Sphinx redirects package for documentation redirects by @lzdanski in #2407 * Restructure **Getting Started** and **Guides** sections by @lzdanski in #2418 * Add open-source quickstart by @lzdanski in #2439 * Fix documentation redirects by @lzdanski in #2442 * Restructure and refactor reference documentation by @lzdanski in #2443 * Add execution modes decision documentation by @lzdanski in #2444 * Add **Core Concepts** page to Getting Started by @lzdanski in #2448 * Add guide: *How Cosmos Works* by @lzdanski in #2449 * Update **Getting Started** overview and index pages by @lzdanski in #2452 * Add guide: *How Cosmos Runs dbt* by @lzdanski in #2453 * Fix miscellaneous documentation links by @lzdanski in #2454 * Add Mermaid diagrams and execution mode diagrams by @lzdanski and @tatiana in #2459 * Add documentation for memory optimization options by @pankajastro in #2340 * Fix typo in watcher execution mode docs by @evanvolgas in #2485 * Fix minor documentation issues by @evanvolgas in #2489 * Add troubleshooting note for dbt debug logs in ExecutionMode.WATCHER by @tatiana in #2491 * docs: unify RST header styles across documentation by @jigangz in #2473 * docs: fix env var for rich logging by @vricciardulli in #2514 * docs: update dbt project path example for Airflow 3 Astro compatibility by @yeoreums in #2512 * Document missing Cosmos Airflow config settings in cosmos-conf.rst by @tatiana in #2515 * Split security-privacy policy doc and add dependency cooldown by @pankajkoti in #2519 * Add performance optimization and troubleshooting docs by @pankajkoti in #2521 * Update copyright year to 2026 by @tayloramurphy in #2527 * docs: Updating "Project Policies" to "Policies" in menu bar by @jroachgolf84 in #2526 Others * Fix tests after removing support for Airflow versions earlier than 2.9 by @tatiana in #2321 * Enable listener tests for Airflow 3.1 by @pankajastro in #2348 * Accept ``int`` or ``float`` for ``cosmos_debug_max_memory_mb`` in integration tests by @pankajkoti in #2352 * Update ``CODEOWNERS`` to prioritize ``oss-integrations`` by @tatiana in #2359 * Fix automatic reviewer assignment in GitHub by @tatiana and @phanikumv in #2360 * Improve PyPI tagging by @tatiana in #2363 * Add integration tests for dbt Fusion and ``ExecutionMode.WATCHER`` by @tatiana in #2373 * Fix Zizmor check by @tatiana in #2376 * Remove ``methodtools`` dependency by @tatiana in #2378 * Improve comments on #2389 by @evanvolgas in #2394 * Refactor ``load_from_dbt_manifest`` to reduce code complexity by @pankajkoti in #2399 * Refactor ``_handle_no_precursors_or_descendants`` to reduce complexity by @pankajkoti in #2400 * Improve issue templates by @tatiana in #2401 * Avoid running tests when only docs change by @tatiana in #2402 * Add ``no-reload`` target for serving docs locally by @pankajkoti in #2405 * Fix test hash checks on macOS by @tatiana in #2406 * Attempt deterministic dbt project copy in test fixtures by @pankajkoti in #2409 * Pin ``virtualenv <21`` due to hatch incompatibility in CI by @pankajkoti in #2410 * Revert virtualenv pin for hatch installation in CI by @pankajkoti in #2426 * Add version comments for commit SHA pinned GitHub Actions by @pankajkoti in #2436 * Fix ``hatch run docs:build`` issues by @tatiana in #2437 * Minor code improvements by @dnskr in #2446 * Pre-commit autoupdate by @pre-commit-ci in #2367, #2396, #2422, #2451, #2468, #2495, and #2516 * Add file to support Claude understanding the Cosmos repository by @tatiana in #2458 * Dependency updates by @dependabot in #2368, #2425, #2435, #2465, #2475, #2504, #2518, and #2528 * Isolate Scarf telemetry integration test into its own CI job by @pankajkoti and @claude in #2477 * ci: upgrade Airflow version to 3.1 in MyPy type-check job by @yeoreums in #2506 * Add commit message guidelines to CLAUDE.md by @pankajkoti in #2509 * Extend skipping tests in CI for more non-code file changes by @pankajkoti in #2510 * Add Dependabot pre-commit support with 7-day cooldown by @pankajkoti in #2517 * Enforce zero warnings policy for documentation by @dnskr in #2513 Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com> Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com> Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com>
Since v1.12.2 (da39a56), the producer task "skips" (does not fail) execution gracefully on retry (
try_number > 1) instead of failing. Retrying is therefore safe and harmless, so there is no longer a reason to override the user-supplied retries value to 0. Users can now configure retries freely onDbtProducerWatcherOperatorandDbtProducerWatcherKubernetesOperator.Closes: #2429
Co-Authored-By: Claude Sonnet 4.6 noreply@anthropic.com