diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 29696fadc0..3a14b9eae9 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -155,12 +155,12 @@ def execute(self, context: Context, **kwargs: Any) -> Any: try_number = getattr(task_instance, "try_number", 1) if try_number > 1: - retry_message = ( + self.log.info( "Dbt WATCHER producer task does not support Airflow retries. " - f"Detected attempt #{try_number}; failing fast to avoid running a second dbt build." + "Detected attempt #%s; skipping execution to avoid running a second dbt build.", + try_number, ) - self.log.error(retry_message) - raise AirflowException(retry_message) + return None self.log.info( "Dbt WATCHER producer task forces Airflow retries to 0 so the dbt build only runs once; " diff --git a/cosmos/operators/watcher_kubernetes.py b/cosmos/operators/watcher_kubernetes.py index 41c4df6248..67504590aa 100644 --- a/cosmos/operators/watcher_kubernetes.py +++ b/cosmos/operators/watcher_kubernetes.py @@ -104,12 +104,12 @@ def execute(self, context: Context, **kwargs: Any) -> Any: try_number = getattr(task_instance, "try_number", 1) if try_number > 1: - retry_message = ( + self.log.info( "DbtProducerWatcherKubernetesOperator does not support Airflow retries. " - f"Detected attempt #{try_number}; failing fast to avoid running a second dbt build." + "Detected attempt #%s; skipping execution to avoid running a second dbt build.", + try_number, ) - self.log.error(retry_message) - raise AirflowException(retry_message) + return None # This global variable is used to make the task context available to the K8s callback. # While the callback is set during the operator initialization, the context is only created during the operator's execution. diff --git a/docs/getting_started/watcher-execution-mode.rst b/docs/getting_started/watcher-execution-mode.rst index da966ce239..32af2eb399 100644 --- a/docs/getting_started/watcher-execution-mode.rst +++ b/docs/getting_started/watcher-execution-mode.rst @@ -219,15 +219,31 @@ How retries work ~~~~~~~~~~~~~~~~ When the ``dbt build`` command run by ``DbtProducerWatcherOperator`` fails, it will notify all the ``DbtConsumerWatcherSensor``. -Cosmos always sets the producer's Airflow task retries to ``0``; this ensures the failure surfaces immediately and avoids kicking off a second full ``dbt build`` run. The individual watcher tasks, that subclass ``DbtConsumerWatcherSensor``, can retry the dbt command by themselves using the same behaviour as ``ExecutionMode.LOCAL``. -This is also the reason why we set ``retries`` to ``0`` in the ``DbtProducerWatcherOperator`` task because rerunning the producer would repeat the full dbt build and duplicate -watcher callbacks which may not be processed by the consumers if they have already processed output XCOMs from the first run of the producer. If a branch of the DAG failed, users can clear the status of a failed consumer task, including its downstream tasks, via the Airflow UI - and each of them will run using the ``ExecutionMode.LOCAL``. -Currently, we do not support retrying the ``DbtProducerWatcherOperator`` task itself. +**Producer retry behavior** + +.. versionadded:: 1.12.2 + +When the ``DbtProducerWatcherOperator`` is triggered for a retry (try_number > 1), it will not re-run the dbt build command and will succeed. In previous versions of Cosmos, the producer task would fail during retries. +This behavior is designed to support TaskGroup-level retries, as reported in `#2282 `_. + +**Why this matters:** + +- In earlier versions, attempting to retry the producer task would raise an ``AirflowException``, causing the retry to fail immediately. +- Now, the producer gracefully skips execution on retries, logging an informational message explaining that the retry was skipped to avoid running a second ``dbt build``. +- This allows users to retry entire TaskGroups and/or DAGs without the producer task blocking the retry flow. + +**Important considerations:** + +- The producer task should still be configured with ``retries=0`` (which Cosmos enforces by default) to avoid unintended duplicate ``dbt build`` runs. + +- By default, Cosmos sets ``retries`` to ``0`` in``DbtProducerWatcherOperator``. Users can retry manually by clearing the status of the producer task and all its downstream tasks, keeping in mind that the producer task will not re-run the ``dbt build`` command and will succeed. + +The overall retry behavior will be further improved once `#1978 `_ is implemented. ------------------------------------------------------------------------------- diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index f631fa0022..282ec8126f 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -198,20 +198,20 @@ def test_dbt_producer_watcher_operator_logs_retry_message(caplog): assert any("forces Airflow retries to 0" in message for message in caplog.messages) -def test_dbt_producer_watcher_operator_blocks_retry_attempt(caplog): +def test_dbt_producer_watcher_operator_skips_retry_attempt(caplog): op = DbtProducerWatcherOperator(project_dir=".", profile_config=None) ti = _MockTI() ti.try_number = 2 context = {"ti": ti} with patch("cosmos.operators.local.DbtLocalBaseOperator.execute") as mock_execute: - with caplog.at_level(logging.ERROR): - with pytest.raises(AirflowException) as excinfo: - op.execute(context=context) + with caplog.at_level(logging.INFO): + result = op.execute(context=context) mock_execute.assert_not_called() - assert "does not support Airflow retries" in str(excinfo.value) + assert result is None assert any("does not support Airflow retries" in message for message in caplog.messages) + assert any("skipping execution" in message for message in caplog.messages) @pytest.mark.parametrize( diff --git a/tests/operators/test_watcher_kubernetes_unit.py b/tests/operators/test_watcher_kubernetes_unit.py index 4ef9207cd6..3b167c731f 100644 --- a/tests/operators/test_watcher_kubernetes_unit.py +++ b/tests/operators/test_watcher_kubernetes_unit.py @@ -103,9 +103,9 @@ def test_retries_overridden_even_if_user_sets_them(): @patch("cosmos.operators.kubernetes.DbtBuildKubernetesOperator.execute") -def test_blocks_retry_attempt(mock_execute, caplog): +def test_skips_retry_attempt(mock_execute, caplog): """ - Test that the operator raises an AirflowException when a retry is attempted (try_number > 1). + Test that the operator skips execution when a retry is attempted (try_number > 1). """ op = DbtProducerWatcherKubernetesOperator( project_dir=".", @@ -117,13 +117,13 @@ def test_blocks_retry_attempt(mock_execute, caplog): ti.try_number = 2 context = {"ti": ti} - with caplog.at_level(logging.ERROR): - with pytest.raises(AirflowException) as excinfo: - op.execute(context=context) + with caplog.at_level(logging.INFO): + result = op.execute(context=context) mock_execute.assert_not_called() - assert "does not support Airflow retries" in str(excinfo.value) + assert result is None assert any("does not support Airflow retries" in message for message in caplog.messages) + assert any("skipping execution" in message for message in caplog.messages) def test_raises_exception_when_task_instance_missing():