From a0ac2daf634897b9fdad592238681b4c1a6c78ca Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 20 Feb 2026 15:32:29 +0530 Subject: [PATCH] fix(watcher): Respect deferrable=False from operator_args on consumer sensor DbtConsumerWatcherSensor did not pass deferrable (or producer_task_id) to super().__init__() after the refactor in PR #2245, so operator_args={"deferrable": False} was ignored and sensors always deferred. Pass both through so the base class receives them. related: #2245 --- cosmos/operators/watcher.py | 2 ++ tests/operators/test_watcher.py | 17 ++++++++++++++--- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 0fc78aa208..d211038c0f 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -241,6 +241,8 @@ def __init__( profile_config=profile_config, project_dir=project_dir, profiles_dir=profiles_dir, + producer_task_id=producer_task_id, + deferrable=deferrable, **kwargs, ) diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index 23a71e32fa..d07fc91f85 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -281,11 +281,11 @@ def test_dbt_producer_watcher_operator_skips_retry_attempt(caplog): ({"status": "success", "reason": "model_not_run"}, None), ( {"status": "failed", "reason": "model_failed"}, - "dbt model 'model.pkg.m' failed. Review the producer task 'dbt_producer_watcher' logs for details.", + "dbt model 'model.pkg.m' failed. Review the producer task 'dbt_producer_watcher_operator' logs for details.", ), ( {"status": "failed", "reason": "producer_failed"}, - "Watcher producer task 'dbt_producer_watcher' failed before reporting model results. Check its logs for the underlying error.", + "Watcher producer task 'dbt_producer_watcher_operator' failed before reporting model results. Check its logs for the underlying error.", ), ], ) @@ -810,7 +810,7 @@ def make_sensor(self, **kwargs): task_id="model.my_model", project_dir="/tmp/project", profile_config=None, - deferrable=True, + deferrable=kwargs.pop("deferrable", True), **kwargs, ) @@ -1205,6 +1205,17 @@ def test_sensor_not_deferred(self, mock_poke): sensor.execute(context=context) mock_poke.assert_called_once() + @patch("cosmos.operators.watcher.DbtConsumerWatcherSensor.poke") + def test_deferrable_false_via_constructor_does_not_defer(self, mock_poke): + """operator_args={'deferrable': False} is respected: sensor created with deferrable=False does not defer.""" + mock_poke.return_value = True + sensor = self.make_sensor(deferrable=False) + assert sensor.deferrable is False + context = {"run_id": "run_id", "task_instance": Mock()} + sensor.execute(context=context) + mock_poke.assert_called_once() + # No TaskDeferred raised: sensor ran synchronously and completed + @pytest.mark.parametrize( "mock_event", [