Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions cosmos/operators/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,6 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
kwargs["should_store_compiled_sql"] = False
kwargs.setdefault("priority_weight", PRODUCER_WATCHER_DEFAULT_PRIORITY_WEIGHT)
kwargs.setdefault("weight_rule", WATCHER_TASK_WEIGHT_RULE)
# Consumer watcher retry logic handles model-level reruns using the LOCAL execution mode; rerunning the producer
# would repeat the full dbt build and duplicate watcher callbacks which may not be processed by the consumers if
# they have already processed output XCOMs from the first run of the producer, so we disable retries.
default_args = dict(kwargs.get("default_args", {}) or {})
default_args["retries"] = 0
kwargs["default_args"] = default_args
kwargs["retries"] = 0
kwargs["queue"] = watcher_dbt_execution_queue or kwargs.get("queue") or DEFAULT_QUEUE
super().__init__(task_id=task_id, *args, **kwargs)
Comment thread
tatiana marked this conversation as resolved.

Expand Down Expand Up @@ -193,11 +186,6 @@ def execute(self, context: Context, **kwargs: Any) -> Any:
)
return None

self.log.info(
"Dbt WATCHER producer task forces Airflow retries to 0 so the dbt build only runs once; "
"downstream sensors own model-level retries."
)

try:
use_events = self.invocation_mode == InvocationMode.DBT_RUNNER and EventMsg is not None
logger.debug("DbtProducerWatcherOperator: use_events=%s", use_events)
Expand Down
6 changes: 0 additions & 6 deletions cosmos/operators/watcher_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,6 @@ class DbtProducerWatcherKubernetesOperator(DbtBuildKubernetesOperator):
def __init__(self, *args: Any, **kwargs: Any) -> None:
task_id = kwargs.pop("task_id", "dbt_producer_watcher_operator")

# Disable retries on producer task
default_args = dict(kwargs.get("default_args", {}) or {})
default_args["retries"] = 0
kwargs["default_args"] = default_args
kwargs["retries"] = 0

existing_callbacks = kwargs.get("callbacks")
if existing_callbacks is None:
normalized_callbacks: list[Any] = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,7 @@ This behavior is designed to support TaskGroup-level retries, as reported in `#2

**Important considerations:**

- The producer task should still be configured with ``retries=0`` (which Cosmos enforces by default) to avoid unintended duplicate ``dbt build`` runs.

- By default, Cosmos sets ``retries`` to ``0`` in``DbtProducerWatcherOperator``. Users can retry manually by clearing the status of the producer task and all its downstream tasks, keeping in mind that the producer task will not re-run the ``dbt build`` command and will succeed.
- Retries are no longer forced to ``0`` by Cosmos, since 1.14.0. Users may configure ``retries`` freely on the producer task. On any retry attempt (``try_number > 1``), the producer gracefully skips execution and returns success — it will not re-run the ``dbt build`` command. This means retrying the producer (or clearing an entire TaskGroup) is safe and will not cause duplicate dbt builds. During the retry of the sensor tasks, they will effectively run the correspondent dbt commands.

The overall retry behavior will be further improved once `#1978 <https://github.com/astronomer/astronomer-cosmos/issues/1978>`_ is implemented.

Expand Down Expand Up @@ -406,6 +404,8 @@ When using ``ExecutionMode.WATCHER``, you may want to configure specific propert
- Reduced manual intervention - failed producer runs can recover without requiring operator restarts.
- Better reliability - retry behavior can be tuned independently from sensor tasks.

Because the producer gracefully skips re-execution on retries (returning success without re-running the dbt build), it is safe to set ``retries`` to any value you wish. Currently, retries work for sensor tasks, which, after a first failed attempt, will effectively run the corresponding dbt command.

Example: Configure the producer task with custom retry settings.

.. code-block:: python
Expand All @@ -417,7 +417,7 @@ Example: Configure the producer task with custom retry settings.
execution_config = ExecutionConfig(
execution_mode=ExecutionMode.WATCHER,
setup_operator_args={
"retries": 0,
"retries": 3,
"retry_delay": timedelta(minutes=5),
},
)
Expand Down
32 changes: 0 additions & 32 deletions tests/operators/test_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,6 @@ def test_dbt_producer_watcher_operator_priority_weight_override():
assert op.priority_weight == 100


Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests that asserted retries are forced to 0 were removed, but there isn’t a replacement assertion that user-supplied retry settings are now preserved. Add a unit test that initializes the producer with explicit retries (and/or default_args['retries']) and asserts the operator’s retries reflects the user-provided value (and that the input default_args dict is not mutated).

Suggested change
def test_dbt_producer_watcher_operator_preserves_explicit_retries():
"""User-supplied retries argument should be preserved on the operator."""
op = DbtProducerWatcherOperator(project_dir=".", profile_config=None, retries=5)
assert op.retries == 5
def test_dbt_producer_watcher_operator_preserves_default_args_retries_and_does_not_mutate():
"""
When retries is provided via default_args, the operator should respect it
and not mutate the caller's default_args dict.
"""
default_args = {"retries": 7}
original_default_args = default_args.copy()
op = DbtProducerWatcherOperator(project_dir=".", profile_config=None, default_args=default_args)
assert op.retries == 7
# Ensure the original dict passed in was not mutated
assert default_args == original_default_args

Copilot uses AI. Check for mistakes.
def test_dbt_producer_watcher_operator_retries_forced_to_zero():
op = DbtProducerWatcherOperator(project_dir=".", profile_config=None)
assert op.retries == 0


@pytest.mark.parametrize(
"invocation_mode, expected_log_format",
(
Expand All @@ -209,19 +204,6 @@ def test_dbt_producer_log_format_adjusts_with_invocation(invocation_mode, expect
assert getattr(op, "log_format", None) == expected_log_format


def test_dbt_producer_watcher_operator_retries_ignores_user_input():
user_default_args = {"retries": 5}
op = DbtProducerWatcherOperator(
project_dir=".",
profile_config=None,
default_args=user_default_args,
retries=3,
)

assert op.retries == 0
assert user_default_args["retries"] == 5


def test_dbt_producer_watcher_operator_pushes_completion_status():
"""Test that operator pushes 'completed' status to XCom in both success and failure cases."""
op = DbtProducerWatcherOperator(project_dir=".", profile_config=None)
Expand Down Expand Up @@ -301,20 +283,6 @@ def test_dbt_consumer_watcher_sensor_execute_complete_model_not_run_logs_message
assert any("ephemeral model or if the model sql file is empty" in message for message in caplog.messages)


def test_dbt_producer_watcher_operator_logs_retry_message(caplog):
op = DbtProducerWatcherOperator(project_dir=".", profile_config=None)
ti = _MockTI()
ti.try_number = 1
context = {"ti": ti}

with patch("cosmos.operators.local.DbtLocalBaseOperator.execute", return_value="ok") as mock_execute:
with caplog.at_level(logging.INFO):
op.execute(context=context)

mock_execute.assert_called_once()
assert any("forces Airflow retries to 0" in message for message in caplog.messages)


def test_dbt_producer_watcher_operator_skips_retry_attempt(caplog):
op = DbtProducerWatcherOperator(project_dir=".", profile_config=None)
ti = _MockTI()
Expand Down
25 changes: 0 additions & 25 deletions tests/operators/test_watcher_kubernetes_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,31 +77,6 @@
render_config = RenderConfig(load_method=LoadMode.DBT_MANIFEST, test_behavior=TestBehavior.NONE)


Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The unit tests that validated DbtProducerWatcherKubernetesOperator forced retries=0 were removed, but there isn’t a new test validating the updated behavior (that user-configured retries are retained). Add a unit test that passes a non-zero retries (and/or via default_args) and asserts op.retries matches the provided value.

Suggested change
def test_producer_watcher_respects_configured_retries():
"""
Ensure that DbtProducerWatcherKubernetesOperator preserves a non-zero user-configured
retries value instead of forcing retries=0.
"""
op = DbtProducerWatcherKubernetesOperator(
task_id="test_retries",
project_dir=".",
profile_config=None,
image="dbt-image:latest",
retries=3,
)
assert op.retries == 3

Copilot uses AI. Check for mistakes.
def test_retries_set_to_zero_on_init():
"""
Test that the operator sets retries to 0 during initialization.
"""
op = DbtProducerWatcherKubernetesOperator(
project_dir=".",
profile_config=None,
image="dbt-image:latest",
)
assert op.retries == 0


def test_retries_overridden_even_if_user_sets_them():
"""
Test that even if a user explicitly sets retries, they are overridden to 0.
"""
op = DbtProducerWatcherKubernetesOperator(
project_dir=".",
profile_config=None,
image="dbt-image:latest",
retries=5,
)
assert op.retries == 0


@patch("cosmos.operators.kubernetes.DbtBuildKubernetesOperator.execute")
def test_skips_retry_attempt(mock_execute, caplog):
"""
Expand Down
Loading