Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cosmos/operators/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,12 @@ def execute(self, context: Context, **kwargs: Any) -> Any:
try_number = getattr(task_instance, "try_number", 1)

if try_number > 1:
retry_message = (
self.log.info(
"Dbt WATCHER producer task does not support Airflow retries. "
f"Detected attempt #{try_number}; failing fast to avoid running a second dbt build."
"Detected attempt #%s; skipping execution to avoid running a second dbt build.",
try_number,
)
self.log.error(retry_message)
raise AirflowException(retry_message)
return None

self.log.info(
"Dbt WATCHER producer task forces Airflow retries to 0 so the dbt build only runs once; "
Expand Down
8 changes: 4 additions & 4 deletions cosmos/operators/watcher_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ def execute(self, context: Context, **kwargs: Any) -> Any:
try_number = getattr(task_instance, "try_number", 1)

if try_number > 1:
retry_message = (
self.log.info(
"DbtProducerWatcherKubernetesOperator does not support Airflow retries. "
f"Detected attempt #{try_number}; failing fast to avoid running a second dbt build."
"Detected attempt #%s; skipping execution to avoid running a second dbt build.",
try_number,
)
self.log.error(retry_message)
raise AirflowException(retry_message)
return None

# This global variable is used to make the task context available to the K8s callback.
# While the callback is set during the operator initialization, the context is only created during the operator's execution.
Expand Down
24 changes: 20 additions & 4 deletions docs/getting_started/watcher-execution-mode.rst
Original file line number Diff line number Diff line change
Expand Up @@ -219,15 +219,31 @@ How retries work
~~~~~~~~~~~~~~~~

When the ``dbt build`` command run by ``DbtProducerWatcherOperator`` fails, it will notify all the ``DbtConsumerWatcherSensor``.
Cosmos always sets the producer's Airflow task retries to ``0``; this ensures the failure surfaces immediately and avoids kicking off a second full ``dbt build`` run.

The individual watcher tasks, that subclass ``DbtConsumerWatcherSensor``, can retry the dbt command by themselves using the same behaviour as ``ExecutionMode.LOCAL``.
This is also the reason why we set ``retries`` to ``0`` in the ``DbtProducerWatcherOperator`` task because rerunning the producer would repeat the full dbt build and duplicate
watcher callbacks which may not be processed by the consumers if they have already processed output XCOMs from the first run of the producer.
Comment thread
tatiana marked this conversation as resolved.

If a branch of the DAG failed, users can clear the status of a failed consumer task, including its downstream tasks, via the Airflow UI - and each of them will run using the ``ExecutionMode.LOCAL``.

Currently, we do not support retrying the ``DbtProducerWatcherOperator`` task itself.
**Producer retry behavior**

.. versionadded:: 1.12.2

When the ``DbtProducerWatcherOperator`` is triggered for a retry (try_number > 1), it will not re-run the dbt build command and will succeed. In previous versions of Cosmos, the producer task would fail during retries.
This behavior is designed to support TaskGroup-level retries, as reported in `#2282 <https://github.com/astronomer/astronomer-cosmos/issues/2282>`_.

**Why this matters:**

- In earlier versions, attempting to retry the producer task would raise an ``AirflowException``, causing the retry to fail immediately.
- Now, the producer gracefully skips execution on retries, logging an informational message explaining that the retry was skipped to avoid running a second ``dbt build``.
- This allows users to retry entire TaskGroups and/or DAGs without the producer task blocking the retry flow.

**Important considerations:**

- The producer task should still be configured with ``retries=0`` (which Cosmos enforces by default) to avoid unintended duplicate ``dbt build`` runs.

- By default, Cosmos sets ``retries`` to ``0`` in``DbtProducerWatcherOperator``. Users can retry manually by clearing the status of the producer task and all its downstream tasks, keeping in mind that the producer task will not re-run the ``dbt build`` command and will succeed.

The overall retry behavior will be further improved once `#1978 <https://github.com/astronomer/astronomer-cosmos/issues/1978>`_ is implemented.

-------------------------------------------------------------------------------

Expand Down
10 changes: 5 additions & 5 deletions tests/operators/test_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,20 +198,20 @@ def test_dbt_producer_watcher_operator_logs_retry_message(caplog):
assert any("forces Airflow retries to 0" in message for message in caplog.messages)


def test_dbt_producer_watcher_operator_blocks_retry_attempt(caplog):
def test_dbt_producer_watcher_operator_skips_retry_attempt(caplog):
op = DbtProducerWatcherOperator(project_dir=".", profile_config=None)
ti = _MockTI()
ti.try_number = 2
context = {"ti": ti}

with patch("cosmos.operators.local.DbtLocalBaseOperator.execute") as mock_execute:
with caplog.at_level(logging.ERROR):
with pytest.raises(AirflowException) as excinfo:
op.execute(context=context)
with caplog.at_level(logging.INFO):
result = op.execute(context=context)

mock_execute.assert_not_called()
assert "does not support Airflow retries" in str(excinfo.value)
assert result is None
assert any("does not support Airflow retries" in message for message in caplog.messages)
assert any("skipping execution" in message for message in caplog.messages)


@pytest.mark.parametrize(
Expand Down
12 changes: 6 additions & 6 deletions tests/operators/test_watcher_kubernetes_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ def test_retries_overridden_even_if_user_sets_them():


@patch("cosmos.operators.kubernetes.DbtBuildKubernetesOperator.execute")
def test_blocks_retry_attempt(mock_execute, caplog):
def test_skips_retry_attempt(mock_execute, caplog):
"""
Test that the operator raises an AirflowException when a retry is attempted (try_number > 1).
Test that the operator skips execution when a retry is attempted (try_number > 1).
"""
op = DbtProducerWatcherKubernetesOperator(
project_dir=".",
Expand All @@ -117,13 +117,13 @@ def test_blocks_retry_attempt(mock_execute, caplog):
ti.try_number = 2
context = {"ti": ti}

with caplog.at_level(logging.ERROR):
with pytest.raises(AirflowException) as excinfo:
op.execute(context=context)
with caplog.at_level(logging.INFO):
result = op.execute(context=context)

mock_execute.assert_not_called()
assert "does not support Airflow retries" in str(excinfo.value)
assert result is None
assert any("does not support Airflow retries" in message for message in caplog.messages)
assert any("skipping execution" in message for message in caplog.messages)


def test_raises_exception_when_task_instance_missing():
Expand Down