diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index bd0301990b..077822f514 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -106,13 +106,6 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: kwargs["should_store_compiled_sql"] = False kwargs.setdefault("priority_weight", PRODUCER_WATCHER_DEFAULT_PRIORITY_WEIGHT) kwargs.setdefault("weight_rule", WATCHER_TASK_WEIGHT_RULE) - # Consumer watcher retry logic handles model-level reruns using the LOCAL execution mode; rerunning the producer - # would repeat the full dbt build and duplicate watcher callbacks which may not be processed by the consumers if - # they have already processed output XCOMs from the first run of the producer, so we disable retries. - default_args = dict(kwargs.get("default_args", {}) or {}) - default_args["retries"] = 0 - kwargs["default_args"] = default_args - kwargs["retries"] = 0 kwargs["queue"] = watcher_dbt_execution_queue or kwargs.get("queue") or DEFAULT_QUEUE super().__init__(task_id=task_id, *args, **kwargs) @@ -193,11 +186,6 @@ def execute(self, context: Context, **kwargs: Any) -> Any: ) return None - self.log.info( - "Dbt WATCHER producer task forces Airflow retries to 0 so the dbt build only runs once; " - "downstream sensors own model-level retries." - ) - try: use_events = self.invocation_mode == InvocationMode.DBT_RUNNER and EventMsg is not None logger.debug("DbtProducerWatcherOperator: use_events=%s", use_events) diff --git a/cosmos/operators/watcher_kubernetes.py b/cosmos/operators/watcher_kubernetes.py index 78f892914b..76b4da4e18 100644 --- a/cosmos/operators/watcher_kubernetes.py +++ b/cosmos/operators/watcher_kubernetes.py @@ -81,12 +81,6 @@ class DbtProducerWatcherKubernetesOperator(DbtBuildKubernetesOperator): def __init__(self, *args: Any, **kwargs: Any) -> None: task_id = kwargs.pop("task_id", "dbt_producer_watcher_operator") - # Disable retries on producer task - default_args = dict(kwargs.get("default_args", {}) or {}) - default_args["retries"] = 0 - kwargs["default_args"] = default_args - kwargs["retries"] = 0 - existing_callbacks = kwargs.get("callbacks") if existing_callbacks is None: normalized_callbacks: list[Any] = [] diff --git a/docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst b/docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst index 355cef891a..243ce0702b 100644 --- a/docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst +++ b/docs/guides/run_dbt/airflow-worker/watcher-execution-mode.rst @@ -240,9 +240,7 @@ This behavior is designed to support TaskGroup-level retries, as reported in `#2 **Important considerations:** -- The producer task should still be configured with ``retries=0`` (which Cosmos enforces by default) to avoid unintended duplicate ``dbt build`` runs. - -- By default, Cosmos sets ``retries`` to ``0`` in``DbtProducerWatcherOperator``. Users can retry manually by clearing the status of the producer task and all its downstream tasks, keeping in mind that the producer task will not re-run the ``dbt build`` command and will succeed. +- Retries are no longer forced to ``0`` by Cosmos, since 1.14.0. Users may configure ``retries`` freely on the producer task. On any retry attempt (``try_number > 1``), the producer gracefully skips execution and returns success — it will not re-run the ``dbt build`` command. This means retrying the producer (or clearing an entire TaskGroup) is safe and will not cause duplicate dbt builds. During the retry of the sensor tasks, they will effectively run the correspondent dbt commands. The overall retry behavior will be further improved once `#1978 `_ is implemented. @@ -406,6 +404,8 @@ When using ``ExecutionMode.WATCHER``, you may want to configure specific propert - Reduced manual intervention - failed producer runs can recover without requiring operator restarts. - Better reliability - retry behavior can be tuned independently from sensor tasks. +Because the producer gracefully skips re-execution on retries (returning success without re-running the dbt build), it is safe to set ``retries`` to any value you wish. Currently, retries work for sensor tasks, which, after a first failed attempt, will effectively run the corresponding dbt command. + Example: Configure the producer task with custom retry settings. .. code-block:: python @@ -417,7 +417,7 @@ Example: Configure the producer task with custom retry settings. execution_config = ExecutionConfig( execution_mode=ExecutionMode.WATCHER, setup_operator_args={ - "retries": 0, + "retries": 3, "retry_delay": timedelta(minutes=5), }, ) diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index 6b89f0ff9b..261fe5c94d 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -192,11 +192,6 @@ def test_dbt_producer_watcher_operator_priority_weight_override(): assert op.priority_weight == 100 -def test_dbt_producer_watcher_operator_retries_forced_to_zero(): - op = DbtProducerWatcherOperator(project_dir=".", profile_config=None) - assert op.retries == 0 - - @pytest.mark.parametrize( "invocation_mode, expected_log_format", ( @@ -209,19 +204,6 @@ def test_dbt_producer_log_format_adjusts_with_invocation(invocation_mode, expect assert getattr(op, "log_format", None) == expected_log_format -def test_dbt_producer_watcher_operator_retries_ignores_user_input(): - user_default_args = {"retries": 5} - op = DbtProducerWatcherOperator( - project_dir=".", - profile_config=None, - default_args=user_default_args, - retries=3, - ) - - assert op.retries == 0 - assert user_default_args["retries"] == 5 - - def test_dbt_producer_watcher_operator_pushes_completion_status(): """Test that operator pushes 'completed' status to XCom in both success and failure cases.""" op = DbtProducerWatcherOperator(project_dir=".", profile_config=None) @@ -301,20 +283,6 @@ def test_dbt_consumer_watcher_sensor_execute_complete_model_not_run_logs_message assert any("ephemeral model or if the model sql file is empty" in message for message in caplog.messages) -def test_dbt_producer_watcher_operator_logs_retry_message(caplog): - op = DbtProducerWatcherOperator(project_dir=".", profile_config=None) - ti = _MockTI() - ti.try_number = 1 - context = {"ti": ti} - - with patch("cosmos.operators.local.DbtLocalBaseOperator.execute", return_value="ok") as mock_execute: - with caplog.at_level(logging.INFO): - op.execute(context=context) - - mock_execute.assert_called_once() - assert any("forces Airflow retries to 0" in message for message in caplog.messages) - - def test_dbt_producer_watcher_operator_skips_retry_attempt(caplog): op = DbtProducerWatcherOperator(project_dir=".", profile_config=None) ti = _MockTI() diff --git a/tests/operators/test_watcher_kubernetes_unit.py b/tests/operators/test_watcher_kubernetes_unit.py index edc06cbddb..5ba35845e5 100644 --- a/tests/operators/test_watcher_kubernetes_unit.py +++ b/tests/operators/test_watcher_kubernetes_unit.py @@ -77,31 +77,6 @@ render_config = RenderConfig(load_method=LoadMode.DBT_MANIFEST, test_behavior=TestBehavior.NONE) -def test_retries_set_to_zero_on_init(): - """ - Test that the operator sets retries to 0 during initialization. - """ - op = DbtProducerWatcherKubernetesOperator( - project_dir=".", - profile_config=None, - image="dbt-image:latest", - ) - assert op.retries == 0 - - -def test_retries_overridden_even_if_user_sets_them(): - """ - Test that even if a user explicitly sets retries, they are overridden to 0. - """ - op = DbtProducerWatcherKubernetesOperator( - project_dir=".", - profile_config=None, - image="dbt-image:latest", - retries=5, - ) - assert op.retries == 0 - - @patch("cosmos.operators.kubernetes.DbtBuildKubernetesOperator.execute") def test_skips_retry_attempt(mock_execute, caplog): """