From 013c81f4a773f47cb2c030651cfa22ce03744231 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 23 Jan 2026 12:55:16 +0000 Subject: [PATCH 1/5] Allow watcher producer retries without erroring Aims to overcome retries at a TaskGroup level, is reported in: https://github.com/astronomer/astronomer-cosmos/issues/2282 The behaviour will be improved once the following ticket is implemented: https://github.com/astronomer/astronomer-cosmos/issues/1978 --- cosmos/operators/watcher.py | 8 ++++---- cosmos/operators/watcher_kubernetes.py | 8 ++++---- tests/operators/test_watcher.py | 10 +++++----- tests/operators/test_watcher_kubernetes_unit.py | 12 ++++++------ 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 29696fadc0..3a14b9eae9 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -155,12 +155,12 @@ def execute(self, context: Context, **kwargs: Any) -> Any: try_number = getattr(task_instance, "try_number", 1) if try_number > 1: - retry_message = ( + self.log.info( "Dbt WATCHER producer task does not support Airflow retries. " - f"Detected attempt #{try_number}; failing fast to avoid running a second dbt build." + "Detected attempt #%s; skipping execution to avoid running a second dbt build.", + try_number, ) - self.log.error(retry_message) - raise AirflowException(retry_message) + return None self.log.info( "Dbt WATCHER producer task forces Airflow retries to 0 so the dbt build only runs once; " diff --git a/cosmos/operators/watcher_kubernetes.py b/cosmos/operators/watcher_kubernetes.py index 41c4df6248..67504590aa 100644 --- a/cosmos/operators/watcher_kubernetes.py +++ b/cosmos/operators/watcher_kubernetes.py @@ -104,12 +104,12 @@ def execute(self, context: Context, **kwargs: Any) -> Any: try_number = getattr(task_instance, "try_number", 1) if try_number > 1: - retry_message = ( + self.log.info( "DbtProducerWatcherKubernetesOperator does not support Airflow retries. " - f"Detected attempt #{try_number}; failing fast to avoid running a second dbt build." + "Detected attempt #%s; skipping execution to avoid running a second dbt build.", + try_number, ) - self.log.error(retry_message) - raise AirflowException(retry_message) + return None # This global variable is used to make the task context available to the K8s callback. # While the callback is set during the operator initialization, the context is only created during the operator's execution. diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index f631fa0022..282ec8126f 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -198,20 +198,20 @@ def test_dbt_producer_watcher_operator_logs_retry_message(caplog): assert any("forces Airflow retries to 0" in message for message in caplog.messages) -def test_dbt_producer_watcher_operator_blocks_retry_attempt(caplog): +def test_dbt_producer_watcher_operator_skips_retry_attempt(caplog): op = DbtProducerWatcherOperator(project_dir=".", profile_config=None) ti = _MockTI() ti.try_number = 2 context = {"ti": ti} with patch("cosmos.operators.local.DbtLocalBaseOperator.execute") as mock_execute: - with caplog.at_level(logging.ERROR): - with pytest.raises(AirflowException) as excinfo: - op.execute(context=context) + with caplog.at_level(logging.INFO): + result = op.execute(context=context) mock_execute.assert_not_called() - assert "does not support Airflow retries" in str(excinfo.value) + assert result is None assert any("does not support Airflow retries" in message for message in caplog.messages) + assert any("skipping execution" in message for message in caplog.messages) @pytest.mark.parametrize( diff --git a/tests/operators/test_watcher_kubernetes_unit.py b/tests/operators/test_watcher_kubernetes_unit.py index 4ef9207cd6..3b167c731f 100644 --- a/tests/operators/test_watcher_kubernetes_unit.py +++ b/tests/operators/test_watcher_kubernetes_unit.py @@ -103,9 +103,9 @@ def test_retries_overridden_even_if_user_sets_them(): @patch("cosmos.operators.kubernetes.DbtBuildKubernetesOperator.execute") -def test_blocks_retry_attempt(mock_execute, caplog): +def test_skips_retry_attempt(mock_execute, caplog): """ - Test that the operator raises an AirflowException when a retry is attempted (try_number > 1). + Test that the operator skips execution when a retry is attempted (try_number > 1). """ op = DbtProducerWatcherKubernetesOperator( project_dir=".", @@ -117,13 +117,13 @@ def test_blocks_retry_attempt(mock_execute, caplog): ti.try_number = 2 context = {"ti": ti} - with caplog.at_level(logging.ERROR): - with pytest.raises(AirflowException) as excinfo: - op.execute(context=context) + with caplog.at_level(logging.INFO): + result = op.execute(context=context) mock_execute.assert_not_called() - assert "does not support Airflow retries" in str(excinfo.value) + assert result is None assert any("does not support Airflow retries" in message for message in caplog.messages) + assert any("skipping execution" in message for message in caplog.messages) def test_raises_exception_when_task_instance_missing(): From 5db165eb3ddab64c35caf9f4dad52557f5918d40 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 23 Jan 2026 13:11:57 +0000 Subject: [PATCH 2/5] Add docs --- .../watcher-execution-mode.rst | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/docs/getting_started/watcher-execution-mode.rst b/docs/getting_started/watcher-execution-mode.rst index da966ce239..3b009a161d 100644 --- a/docs/getting_started/watcher-execution-mode.rst +++ b/docs/getting_started/watcher-execution-mode.rst @@ -219,15 +219,29 @@ How retries work ~~~~~~~~~~~~~~~~ When the ``dbt build`` command run by ``DbtProducerWatcherOperator`` fails, it will notify all the ``DbtConsumerWatcherSensor``. -Cosmos always sets the producer's Airflow task retries to ``0``; this ensures the failure surfaces immediately and avoids kicking off a second full ``dbt build`` run. The individual watcher tasks, that subclass ``DbtConsumerWatcherSensor``, can retry the dbt command by themselves using the same behaviour as ``ExecutionMode.LOCAL``. -This is also the reason why we set ``retries`` to ``0`` in the ``DbtProducerWatcherOperator`` task because rerunning the producer would repeat the full dbt build and duplicate -watcher callbacks which may not be processed by the consumers if they have already processed output XCOMs from the first run of the producer. If a branch of the DAG failed, users can clear the status of a failed consumer task, including its downstream tasks, via the Airflow UI - and each of them will run using the ``ExecutionMode.LOCAL``. -Currently, we do not support retrying the ``DbtProducerWatcherOperator`` task itself. +**Producer retry behavior** + +.. versionadded:: 1.12.2 + +When the ``DbtProducerWatcherOperator`` is triggered for a retry (try_number > 1), it will not re-run the dbt build command and will succeed. In previous versions of Cosmos, the producer task would fail during retries. +This behavior is designed to support TaskGroup-level retries, as reported in `#2282 `_. + +**Why this matters:** + +- In earlier versions, attempting to retry the producer task would raise an ``AirflowException``, causing the retry to fail immediately. +- Now, the producer gracefully skips execution on retries, logging an informational message explaining that the retry was skipped to avoid running a second ``dbt build``. +- This allows users to retry entire TaskGroups without the producer task blocking the retry flow. + +**Important considerations:** + +- The producer task should still be configured with ``retries=0`` (which Cosmos enforces by default) to avoid unintended duplicate ``dbt build`` runs. + +This behavior will be further improved once `#1978 `_ is implemented. ------------------------------------------------------------------------------- From 0459080ff7d7aa1fba394ce91fb3f52c361cbc0a Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 23 Jan 2026 14:02:30 +0000 Subject: [PATCH 3/5] Update docs/getting_started/watcher-execution-mode.rst Co-authored-by: Pankaj Koti --- docs/getting_started/watcher-execution-mode.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/getting_started/watcher-execution-mode.rst b/docs/getting_started/watcher-execution-mode.rst index 3b009a161d..1577a4a7a6 100644 --- a/docs/getting_started/watcher-execution-mode.rst +++ b/docs/getting_started/watcher-execution-mode.rst @@ -235,7 +235,7 @@ This behavior is designed to support TaskGroup-level retries, as reported in `#2 - In earlier versions, attempting to retry the producer task would raise an ``AirflowException``, causing the retry to fail immediately. - Now, the producer gracefully skips execution on retries, logging an informational message explaining that the retry was skipped to avoid running a second ``dbt build``. -- This allows users to retry entire TaskGroups without the producer task blocking the retry flow. +- This allows users to retry entire TaskGroups and/or DAGs without the producer task blocking the retry flow. **Important considerations:** From d0b1708f0af64d257121c42fcdb226fde80b54cc Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 23 Jan 2026 14:08:01 +0000 Subject: [PATCH 4/5] Improve docs based on feedback --- docs/getting_started/watcher-execution-mode.rst | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/getting_started/watcher-execution-mode.rst b/docs/getting_started/watcher-execution-mode.rst index 1577a4a7a6..1b37191b7e 100644 --- a/docs/getting_started/watcher-execution-mode.rst +++ b/docs/getting_started/watcher-execution-mode.rst @@ -241,7 +241,9 @@ This behavior is designed to support TaskGroup-level retries, as reported in `#2 - The producer task should still be configured with ``retries=0`` (which Cosmos enforces by default) to avoid unintended duplicate ``dbt build`` runs. -This behavior will be further improved once `#1978 `_ is implemented. +- By default, Cosmos sets ``retries`` to ``0`` in``DbtProducerWatcherOperator``. Users can override this by declaring the ``retries`` parameter in ``setup_operator_args``. + +The overall retry behavior will be further improved once `#1978 `_ is implemented. ------------------------------------------------------------------------------- From c7b6de85dc2253c7402427f469d2e2ddd55c067b Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 23 Jan 2026 14:23:33 +0000 Subject: [PATCH 5/5] Improve docs based on feedback --- docs/getting_started/watcher-execution-mode.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/getting_started/watcher-execution-mode.rst b/docs/getting_started/watcher-execution-mode.rst index 1b37191b7e..32af2eb399 100644 --- a/docs/getting_started/watcher-execution-mode.rst +++ b/docs/getting_started/watcher-execution-mode.rst @@ -241,7 +241,7 @@ This behavior is designed to support TaskGroup-level retries, as reported in `#2 - The producer task should still be configured with ``retries=0`` (which Cosmos enforces by default) to avoid unintended duplicate ``dbt build`` runs. -- By default, Cosmos sets ``retries`` to ``0`` in``DbtProducerWatcherOperator``. Users can override this by declaring the ``retries`` parameter in ``setup_operator_args``. +- By default, Cosmos sets ``retries`` to ``0`` in``DbtProducerWatcherOperator``. Users can retry manually by clearing the status of the producer task and all its downstream tasks, keeping in mind that the producer task will not re-run the ``dbt build`` command and will succeed. The overall retry behavior will be further improved once `#1978 `_ is implemented.