From 2885b7c2b73daef7cf5d79edd6ed79341261a310 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Mon, 11 May 2026 19:32:59 +0530 Subject: [PATCH] Fix watcher fallback selector for versioned dbt models `BaseConsumerSensor._fallback_to_non_watcher_run` derived the dbt selector with `model_unique_id.split(".")[-1]`, which strips the version segment for versioned models (e.g. `model.pkg.my_model.v1` became `v1`). Use `split(".", 2)[2]` to match `DbtNode.resource_name` in cosmos/dbt/graph.py, the canonical parsing used throughout Cosmos for dbt unique_ids, so the full resource name including any version suffix is selected on fallback reruns. Co-Authored-By: Claude Opus 4.7 (1M context) --- cosmos/operators/_watcher/base.py | 2 +- tests/operators/test_watcher.py | 16 +++++++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/cosmos/operators/_watcher/base.py b/cosmos/operators/_watcher/base.py index 1fb0d245e4..db543480b5 100644 --- a/cosmos/operators/_watcher/base.py +++ b/cosmos/operators/_watcher/base.py @@ -447,7 +447,7 @@ def _fallback_to_non_watcher_run(self, try_number: int, context: Context) -> boo raw_flags = upstream_task.add_cmd_flags() extra_flags = self._filter_flags(raw_flags) - model_selector = self.model_unique_id.split(".")[-1] + model_selector = self.model_unique_id.split(".", 2)[2] cmd_flags = extra_flags + ["--select", model_selector] self.build_and_run_cmd(context, cmd_flags=cmd_flags) # type: ignore[attr-defined] diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index 8d9cbd9c06..87f74375f7 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -1118,7 +1118,21 @@ def test_fallback_to_non_watcher_run(self): sensor.build_and_run_cmd.assert_called_once() args, kwargs = sensor.build_and_run_cmd.call_args assert "--select" in kwargs["cmd_flags"] - assert MODEL_UNIQUE_ID.split(".")[-1] in kwargs["cmd_flags"] + assert MODEL_UNIQUE_ID.split(".", 2)[2] in kwargs["cmd_flags"] + + def test_fallback_selector_preserves_version_suffix(self): + """For versioned dbt models (e.g. ``model.pkg.my_model.v1``) the selector must keep the version suffix.""" + sensor = self.make_sensor() + sensor.model_unique_id = "model.jaffle_shop.stg_orders.v1" + ti = MagicMock() + ti.task.dag.get_task.return_value.add_cmd_flags.return_value = [] + context = self.make_context(ti) + sensor.build_and_run_cmd = MagicMock() + + sensor._fallback_to_non_watcher_run(2, context) + + kwargs = sensor.build_and_run_cmd.call_args.kwargs + assert kwargs["cmd_flags"] == ["--select", "stg_orders.v1"] def test_filter_flags(self): flags = ["--select", "model", "--exclude", "other", "--threads", "2"]