From 35bb20f719442e48295ff44800cf9d74994e6e51 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Sat, 16 May 2026 01:27:42 +0530 Subject: [PATCH 1/5] Extract get_resource_name_from_unique_id helper The dbt unique_id-to-resource_name parse (split(".", 2)[2]) was open-coded in several places. PR #2659 added one more such site to fix a versioned-model bug. Consolidating the parse into a single helper with a docstring documents the unique_id format in one place and removes the drift risk between call sites. Add get_resource_name_from_unique_id in cosmos/dbt/resource.py with a docstring citing the dbt manifest spec and the versioned-model variant. Route DbtNode.resource_name through it, then replace the open-coded splits in cosmos/operators/watcher.py and cosmos/operators/_watcher/ base.py. Co-Authored-By: Claude Opus 4.7 (1M context) --- cosmos/dbt/graph.py | 7 ++++--- cosmos/dbt/resource.py | 25 +++++++++++++++++++++++++ cosmos/operators/_watcher/base.py | 3 ++- cosmos/operators/watcher.py | 9 +++++---- tests/dbt/test_resource.py | 16 ++++++++++++++++ tests/operators/test_watcher.py | 3 ++- 6 files changed, 54 insertions(+), 9 deletions(-) create mode 100644 cosmos/dbt/resource.py create mode 100644 tests/dbt/test_resource.py diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index b33976528f..d9e207d1d1 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -64,6 +64,7 @@ get_partial_parse_path, has_non_empty_dependencies_file, ) +from cosmos.dbt.resource import get_resource_name_from_unique_id from cosmos.dbt.selector import YamlSelectors, select_nodes from cosmos.log import get_logger @@ -169,10 +170,10 @@ def profile_config_to_override(self) -> dict[str, Any]: def resource_name(self) -> str: """ Use this property to retrieve the resource name for command generation, for instance: ["dbt", "run", "--models", f"{resource_name}"]. - The unique_id format is defined as [..](https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details). - For a special case like a versioned model, the unique_id follows this pattern: [model...](https://github.com/dbt-labs/dbt-core/blob/main/core/dbt/contracts/graph/node_args.py#L26C3-L31) + Delegates to :func:`cosmos.dbt.resource.get_resource_name_from_unique_id`, which documents the dbt ``unique_id`` format + (including the versioned-model variant ``model...``). """ - return self.unique_id.split(".", 2)[2] + return get_resource_name_from_unique_id(self.unique_id) @property def name(self) -> str: diff --git a/cosmos/dbt/resource.py b/cosmos/dbt/resource.py new file mode 100644 index 0000000000..5c348c68ef --- /dev/null +++ b/cosmos/dbt/resource.py @@ -0,0 +1,25 @@ +from __future__ import annotations + + +def get_resource_name_from_unique_id(unique_id: str) -> str: + """ + Return the ``resource_name`` segment of a dbt node ``unique_id``. + + Per the `dbt manifest spec + `_, + a node ``unique_id`` is ``..``. + Both ``resource_type`` and ``package`` are constrained identifiers that + cannot contain dots, so the first two dots are unambiguous separators + and everything after the second dot is the full resource name. + + For versioned models, dbt appends a fourth segment: + ``model...`` (see + `node_args.py `_). + Splitting with ``maxsplit=2`` preserves that suffix: + ``model.pkg.my_model.v1`` -> ``my_model.v1``. + + :raises IndexError: if ``unique_id`` does not contain at least two + dots. Malformed inputs are surfaced loudly rather than silently + mis-parsed. + """ + return unique_id.split(".", 2)[2] diff --git a/cosmos/operators/_watcher/base.py b/cosmos/operators/_watcher/base.py index 8615b0c16a..23bc6efd05 100644 --- a/cosmos/operators/_watcher/base.py +++ b/cosmos/operators/_watcher/base.py @@ -18,6 +18,7 @@ PRODUCER_WATCHER_TASK_ID, WATCHER_TASK_WEIGHT_RULE, ) +from cosmos.dbt.resource import get_resource_name_from_unique_id from cosmos.listeners.dag_run_listener import EventStatus from cosmos.log import get_logger from cosmos.operators._watcher.aggregation import get_tests_status_xcom_key, push_test_result_or_aggregate @@ -512,7 +513,7 @@ def _fallback_to_non_watcher_run(self, try_number: int, context: Context) -> boo raw_flags = upstream_task.add_cmd_flags() extra_flags = self._filter_flags(raw_flags) - model_selector = self.model_unique_id.split(".", 2)[2] + model_selector = get_resource_name_from_unique_id(self.model_unique_id) cmd_flags = extra_flags + ["--select", model_selector] self.build_and_run_cmd(context, cmd_flags=cmd_flags) # type: ignore[attr-defined] diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index f93c118816..c21fffb109 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -24,6 +24,7 @@ ) from cosmos.dataset import get_dataset_namespace from cosmos.dbt.graph import DbtNode +from cosmos.dbt.resource import get_resource_name_from_unique_id from cosmos.log import get_logger from cosmos.operators._watcher import safe_xcom_push from cosmos.operators._watcher.base import ( @@ -379,12 +380,12 @@ def _apply_node_state_tokens(self, context: Context, node_state_pairs: list[tupl # "error") that a custom freshness_callback may return. Without exclusion, # dbt would run the model anyway and either overwrite the pre-set XCom status # or trigger a race condition with the consumer sensor. - # Use the same parsing as DbtNode.resource_name: unique_id.split(".", 2)[2] - # This preserves version suffixes (e.g. model.pkg.my_model.v1 -> my_model.v1) excluded_ids = [uid for uid, state in node_state_pairs if state not in DBT_SUCCESS_STATUSES] if not excluded_ids: return - model_names = sorted({uid.split(".", 2)[2] for uid in excluded_ids if len(uid.split(".", 2)) == 3}) + model_names = sorted( + {get_resource_name_from_unique_id(uid) for uid in excluded_ids if len(uid.split(".", 2)) == 3} + ) exclude_str = " ".join(model_names) if exclude_str: current_exclude = getattr(self, "exclude", None) @@ -591,7 +592,7 @@ def _fallback_to_non_watcher_run(self, try_number: int, context: Context) -> boo self.model_unique_id, self.project_dir, ) - resource_name = self.model_unique_id.split(".", 2)[2] + resource_name = get_resource_name_from_unique_id(self.model_unique_id) cmd_flags = ["--select", f"source:{resource_name}"] self.build_and_run_cmd(context, cmd_flags=cmd_flags) logger.info("dbt source freshness completed successfully on retry for source '%s'", self.model_unique_id) diff --git a/tests/dbt/test_resource.py b/tests/dbt/test_resource.py new file mode 100644 index 0000000000..41a348c52a --- /dev/null +++ b/tests/dbt/test_resource.py @@ -0,0 +1,16 @@ +import pytest + +from cosmos.dbt.resource import get_resource_name_from_unique_id + + +class TestGetResourceNameFromUniqueId: + def test_plain_model(self): + assert get_resource_name_from_unique_id("model.my_pkg.my_model") == "my_model" + + def test_versioned_model_preserves_version_suffix(self): + assert get_resource_name_from_unique_id("model.my_pkg.my_model.v1") == "my_model.v1" + + @pytest.mark.parametrize("malformed", ["", "foo", "foo.bar"]) + def test_malformed_unique_id_raises(self, malformed): + with pytest.raises(IndexError): + get_resource_name_from_unique_id(malformed) diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index 8572de2477..671ce4415f 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -26,6 +26,7 @@ ExecutionMode, SourceRenderingBehavior, ) +from cosmos.dbt.resource import get_resource_name_from_unique_id from cosmos.operators._watcher.base import store_compiled_sql_for_model from cosmos.operators._watcher.triggerer import WatcherEventReason, WatcherTrigger from cosmos.operators.watcher import ( @@ -1227,7 +1228,7 @@ def test_fallback_to_non_watcher_run(self): sensor.build_and_run_cmd.assert_called_once() args, kwargs = sensor.build_and_run_cmd.call_args assert "--select" in kwargs["cmd_flags"] - assert MODEL_UNIQUE_ID.split(".", 2)[2] in kwargs["cmd_flags"] + assert get_resource_name_from_unique_id(MODEL_UNIQUE_ID) in kwargs["cmd_flags"] def test_fallback_strips_producer_log_format_by_default(self): """Producer's ``--log-format json`` (internal, used for event-stream parsing) must not leak into From 8cb512c3f85e55e299b6424ef1d56828aa6b1cfc Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Sat, 16 May 2026 01:40:52 +0530 Subject: [PATCH 2/5] Route remaining unique_id splits through helper Three additional inline split(".", 2)[2] sites came in from a main merge: one in DbtTestWatcherOperator's fallback in cosmos/operators/watcher.py and two assertions in tests/operators/test_watcher.py. Route them through get_resource_name_from_unique_id for consistency with the helper introduced earlier in this PR. Co-Authored-By: Claude Opus 4.7 (1M context) --- cosmos/operators/watcher.py | 2 +- tests/operators/test_watcher.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index c21fffb109..9932f0dbb5 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -659,7 +659,7 @@ def _fallback_to_non_watcher_run(self, try_number: int, context: Context) -> boo try_number, ) - model_selector = self.model_unique_id.split(".", 2)[2] + model_selector = get_resource_name_from_unique_id(self.model_unique_id) cmd_flags = ["--select", model_selector] self.build_and_run_cmd(context, cmd_flags=cmd_flags) logger.info("dbt test completed successfully for model '%s'", self.model_unique_id) diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index 671ce4415f..81a06d5546 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -2590,7 +2590,7 @@ def test_fallback_runs_dbt_test_on_retry(self): mock_fallback_to_non_watcher_run.assert_called_once() sensor.build_and_run_cmd.assert_called_once() _, kwargs = sensor.build_and_run_cmd.call_args - assert kwargs["cmd_flags"] == ["--select", self.MODEL_UID.split(".", 2)[2]] + assert kwargs["cmd_flags"] == ["--select", get_resource_name_from_unique_id(self.MODEL_UID)] def test_fallback_via_poke_does_not_forward_producer_flags(self): """Driving through ``poke`` on retry, the fallback should issue ``dbt test`` with @@ -2612,7 +2612,7 @@ def test_fallback_via_poke_does_not_forward_producer_flags(self): mock_fallback_to_non_watcher_run.assert_called_once() _, kwargs = sensor.build_and_run_cmd.call_args - assert kwargs["cmd_flags"] == ["--select", self.MODEL_UID.split(".", 2)[2]] + assert kwargs["cmd_flags"] == ["--select", get_resource_name_from_unique_id(self.MODEL_UID)] assert "--full-refresh" not in kwargs["cmd_flags"] assert sensor.base_cmd == ["test"] From 974c6c65da7b133897626f921e4d5e810634936c Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Tue, 26 May 2026 21:49:52 +0530 Subject: [PATCH 3/5] Test source watcher fallback path on retry DbtSourceWatcherOperator._fallback_to_non_watcher_run had no test coverage, leaving the helper call there flagged by codecov/patch. Cover it directly, asserting cmd_flags is ["--select", "source:"]. Use a multi-segment resource name (raw.orders) to also pin the maxsplit=2 behaviour that preserves dots after the package segment. --- tests/operators/test_watcher.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index 81a06d5546..a50e125ff1 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -2494,6 +2494,31 @@ def test_dbt_source_watcher_operator_template_fields(): assert field in DbtSourceWatcherOperator.template_fields +def test_dbt_source_watcher_operator_fallback_runs_source_freshness(): + """On retry the source sensor should run ``dbt source freshness --select source:`` + locally for its specific source. + """ + from cosmos.operators.watcher import DbtSourceWatcherOperator + + source_uid = "source.jaffle_shop.raw.orders" + extra_context = {"dbt_node_config": {"unique_id": source_uid}} + sensor = DbtSourceWatcherOperator( + task_id="raw_orders.source", + project_dir="/tmp/project", + profile_config=None, + extra_context=extra_context, + ) + sensor.build_and_run_cmd = MagicMock() + context = MagicMock() + + result = sensor._fallback_to_non_watcher_run(2, context) + + assert result is True + sensor.build_and_run_cmd.assert_called_once() + _, kwargs = sensor.build_and_run_cmd.call_args + assert kwargs["cmd_flags"] == ["--select", "source:raw.orders"] + + class TestDbtTestWatcherOperator: """Tests for DbtTestWatcherOperator — the sensor that watches aggregated test results.""" From c3e359970c49c0deefa7dcaedeced5ce999a6cc3 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 5 Jun 2026 12:45:36 +0530 Subject: [PATCH 4/5] Move resource_name parsing into DbtNode static method Replace the standalone cosmos.dbt.resource module with a DbtNode.get_resource_name_from_unique_id static method and route all call sites through it. Collapse the redundant double split in the source-freshness exclude path into a single helper call, logging a warning on malformed unique_ids instead of silently dropping them. --- cosmos/dbt/graph.py | 29 ++++++++++++++++++++++++++--- cosmos/dbt/resource.py | 25 ------------------------- cosmos/operators/_watcher/base.py | 4 ++-- cosmos/operators/watcher.py | 15 +++++++++------ tests/dbt/test_graph.py | 13 +++++++++++++ tests/dbt/test_resource.py | 16 ---------------- tests/operators/test_watcher.py | 17 +++++++++++++---- 7 files changed, 63 insertions(+), 56 deletions(-) delete mode 100644 cosmos/dbt/resource.py delete mode 100644 tests/dbt/test_resource.py diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index d9e207d1d1..74db3bd1e6 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -64,7 +64,6 @@ get_partial_parse_path, has_non_empty_dependencies_file, ) -from cosmos.dbt.resource import get_resource_name_from_unique_id from cosmos.dbt.selector import YamlSelectors, select_nodes from cosmos.log import get_logger @@ -166,14 +165,38 @@ def profile_config_to_override(self) -> dict[str, Any]: ) return operator_kwargs + @staticmethod + def get_resource_name_from_unique_id(unique_id: str) -> str: + """ + Return the ``resource_name`` segment of a dbt node ``unique_id``. + + Per the `dbt manifest spec + `_, + a node ``unique_id`` is ``..``. + Both ``resource_type`` and ``package`` are constrained identifiers that + cannot contain dots, so the first two dots are unambiguous separators + and everything after the second dot is the full resource name. + + For versioned models, dbt appends a fourth segment: + ``model...`` (see + `node_args.py `_). + Splitting with ``maxsplit=2`` preserves that suffix: + ``model.pkg.my_model.v1`` -> ``my_model.v1``. + + :raises IndexError: if ``unique_id`` does not contain at least two + dots. Malformed inputs are surfaced loudly rather than silently + mis-parsed. + """ + return unique_id.split(".", 2)[2] + @property def resource_name(self) -> str: """ Use this property to retrieve the resource name for command generation, for instance: ["dbt", "run", "--models", f"{resource_name}"]. - Delegates to :func:`cosmos.dbt.resource.get_resource_name_from_unique_id`, which documents the dbt ``unique_id`` format + Delegates to :meth:`get_resource_name_from_unique_id`, which documents the dbt ``unique_id`` format (including the versioned-model variant ``model...``). """ - return get_resource_name_from_unique_id(self.unique_id) + return self.get_resource_name_from_unique_id(self.unique_id) @property def name(self) -> str: diff --git a/cosmos/dbt/resource.py b/cosmos/dbt/resource.py deleted file mode 100644 index 5c348c68ef..0000000000 --- a/cosmos/dbt/resource.py +++ /dev/null @@ -1,25 +0,0 @@ -from __future__ import annotations - - -def get_resource_name_from_unique_id(unique_id: str) -> str: - """ - Return the ``resource_name`` segment of a dbt node ``unique_id``. - - Per the `dbt manifest spec - `_, - a node ``unique_id`` is ``..``. - Both ``resource_type`` and ``package`` are constrained identifiers that - cannot contain dots, so the first two dots are unambiguous separators - and everything after the second dot is the full resource name. - - For versioned models, dbt appends a fourth segment: - ``model...`` (see - `node_args.py `_). - Splitting with ``maxsplit=2`` preserves that suffix: - ``model.pkg.my_model.v1`` -> ``my_model.v1``. - - :raises IndexError: if ``unique_id`` does not contain at least two - dots. Malformed inputs are surfaced loudly rather than silently - mis-parsed. - """ - return unique_id.split(".", 2)[2] diff --git a/cosmos/operators/_watcher/base.py b/cosmos/operators/_watcher/base.py index 23bc6efd05..10d7983077 100644 --- a/cosmos/operators/_watcher/base.py +++ b/cosmos/operators/_watcher/base.py @@ -18,7 +18,7 @@ PRODUCER_WATCHER_TASK_ID, WATCHER_TASK_WEIGHT_RULE, ) -from cosmos.dbt.resource import get_resource_name_from_unique_id +from cosmos.dbt.graph import DbtNode from cosmos.listeners.dag_run_listener import EventStatus from cosmos.log import get_logger from cosmos.operators._watcher.aggregation import get_tests_status_xcom_key, push_test_result_or_aggregate @@ -513,7 +513,7 @@ def _fallback_to_non_watcher_run(self, try_number: int, context: Context) -> boo raw_flags = upstream_task.add_cmd_flags() extra_flags = self._filter_flags(raw_flags) - model_selector = get_resource_name_from_unique_id(self.model_unique_id) + model_selector = DbtNode.get_resource_name_from_unique_id(self.model_unique_id) cmd_flags = extra_flags + ["--select", model_selector] self.build_and_run_cmd(context, cmd_flags=cmd_flags) # type: ignore[attr-defined] diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 9932f0dbb5..0570a48850 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -24,7 +24,6 @@ ) from cosmos.dataset import get_dataset_namespace from cosmos.dbt.graph import DbtNode -from cosmos.dbt.resource import get_resource_name_from_unique_id from cosmos.log import get_logger from cosmos.operators._watcher import safe_xcom_push from cosmos.operators._watcher.base import ( @@ -383,9 +382,13 @@ def _apply_node_state_tokens(self, context: Context, node_state_pairs: list[tupl excluded_ids = [uid for uid, state in node_state_pairs if state not in DBT_SUCCESS_STATUSES] if not excluded_ids: return - model_names = sorted( - {get_resource_name_from_unique_id(uid) for uid in excluded_ids if len(uid.split(".", 2)) == 3} - ) + resource_names = set() + for uid in excluded_ids: + try: + resource_names.add(DbtNode.get_resource_name_from_unique_id(uid)) + except IndexError: + logger.warning("Skipping malformed dbt unique_id while building source-freshness exclude list: %s", uid) + model_names = sorted(resource_names) exclude_str = " ".join(model_names) if exclude_str: current_exclude = getattr(self, "exclude", None) @@ -592,7 +595,7 @@ def _fallback_to_non_watcher_run(self, try_number: int, context: Context) -> boo self.model_unique_id, self.project_dir, ) - resource_name = get_resource_name_from_unique_id(self.model_unique_id) + resource_name = DbtNode.get_resource_name_from_unique_id(self.model_unique_id) cmd_flags = ["--select", f"source:{resource_name}"] self.build_and_run_cmd(context, cmd_flags=cmd_flags) logger.info("dbt source freshness completed successfully on retry for source '%s'", self.model_unique_id) @@ -659,7 +662,7 @@ def _fallback_to_non_watcher_run(self, try_number: int, context: Context) -> boo try_number, ) - model_selector = get_resource_name_from_unique_id(self.model_unique_id) + model_selector = DbtNode.get_resource_name_from_unique_id(self.model_unique_id) cmd_flags = ["--select", model_selector] self.build_and_run_cmd(context, cmd_flags=cmd_flags) logger.info("dbt test completed successfully for model '%s'", self.model_unique_id) diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 4e2c546c2a..8c9db049f8 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -138,6 +138,19 @@ def test_dbt_node_name_and_select(unique_id, expected_name, expected_select): assert node.resource_name == expected_select +class TestGetResourceNameFromUniqueId: + def test_plain_model(self): + assert DbtNode.get_resource_name_from_unique_id("model.my_pkg.my_model") == "my_model" + + def test_versioned_model_preserves_version_suffix(self): + assert DbtNode.get_resource_name_from_unique_id("model.my_pkg.my_model.v1") == "my_model.v1" + + @pytest.mark.parametrize("malformed", ["", "foo", "foo.bar"]) + def test_malformed_unique_id_raises(self, malformed): + with pytest.raises(IndexError): + DbtNode.get_resource_name_from_unique_id(malformed) + + def test_dbt_node_meta(): valid_node = DbtNode( unique_id="some-id", diff --git a/tests/dbt/test_resource.py b/tests/dbt/test_resource.py deleted file mode 100644 index 41a348c52a..0000000000 --- a/tests/dbt/test_resource.py +++ /dev/null @@ -1,16 +0,0 @@ -import pytest - -from cosmos.dbt.resource import get_resource_name_from_unique_id - - -class TestGetResourceNameFromUniqueId: - def test_plain_model(self): - assert get_resource_name_from_unique_id("model.my_pkg.my_model") == "my_model" - - def test_versioned_model_preserves_version_suffix(self): - assert get_resource_name_from_unique_id("model.my_pkg.my_model.v1") == "my_model.v1" - - @pytest.mark.parametrize("malformed", ["", "foo", "foo.bar"]) - def test_malformed_unique_id_raises(self, malformed): - with pytest.raises(IndexError): - get_resource_name_from_unique_id(malformed) diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index a50e125ff1..b2bcb7f58f 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -26,7 +26,7 @@ ExecutionMode, SourceRenderingBehavior, ) -from cosmos.dbt.resource import get_resource_name_from_unique_id +from cosmos.dbt.graph import DbtNode from cosmos.operators._watcher.base import store_compiled_sql_for_model from cosmos.operators._watcher.triggerer import WatcherEventReason, WatcherTrigger from cosmos.operators.watcher import ( @@ -1228,7 +1228,7 @@ def test_fallback_to_non_watcher_run(self): sensor.build_and_run_cmd.assert_called_once() args, kwargs = sensor.build_and_run_cmd.call_args assert "--select" in kwargs["cmd_flags"] - assert get_resource_name_from_unique_id(MODEL_UNIQUE_ID) in kwargs["cmd_flags"] + assert DbtNode.get_resource_name_from_unique_id(MODEL_UNIQUE_ID) in kwargs["cmd_flags"] def test_fallback_strips_producer_log_format_by_default(self): """Producer's ``--log-format json`` (internal, used for event-stream parsing) must not leak into @@ -2615,7 +2615,7 @@ def test_fallback_runs_dbt_test_on_retry(self): mock_fallback_to_non_watcher_run.assert_called_once() sensor.build_and_run_cmd.assert_called_once() _, kwargs = sensor.build_and_run_cmd.call_args - assert kwargs["cmd_flags"] == ["--select", get_resource_name_from_unique_id(self.MODEL_UID)] + assert kwargs["cmd_flags"] == ["--select", DbtNode.get_resource_name_from_unique_id(self.MODEL_UID)] def test_fallback_via_poke_does_not_forward_producer_flags(self): """Driving through ``poke`` on retry, the fallback should issue ``dbt test`` with @@ -2637,7 +2637,7 @@ def test_fallback_via_poke_does_not_forward_producer_flags(self): mock_fallback_to_non_watcher_run.assert_called_once() _, kwargs = sensor.build_and_run_cmd.call_args - assert kwargs["cmd_flags"] == ["--select", get_resource_name_from_unique_id(self.MODEL_UID)] + assert kwargs["cmd_flags"] == ["--select", DbtNode.get_resource_name_from_unique_id(self.MODEL_UID)] assert "--full-refresh" not in kwargs["cmd_flags"] assert sensor.base_cmd == ["test"] @@ -3009,6 +3009,15 @@ def test_apply_node_state_tokens_appends_to_existing_exclude(self): assert "existing_model" in producer.exclude assert "m1" in producer.exclude + def test_apply_node_state_tokens_skips_malformed_unique_id(self): + producer = self._make_producer() + producer.exclude = None + ti = MagicMock() + context = {"ti": ti} + producer._apply_node_state_tokens(context, [("malformed_uid", "skipped"), ("model.pkg.m1", "skipped")]) + # The malformed id is skipped while the valid one still makes it into exclude. + assert producer.exclude == "m1" + def test_apply_node_state_tokens_noop_when_empty(self): producer = self._make_producer() producer.exclude = None From 12f2fed2cdbc13b612f96d1e245290810bc69b17 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 5 Jun 2026 13:33:54 +0530 Subject: [PATCH 5/5] Reject malformed dbt unique_ids in resource name parsing Validate that the unique_id splits into exactly three non-empty segments before returning the resource name, raising ValueError on shapes like 'model..name', '..' or 'model.pkg.' instead of silently mis-parsing them. --- cosmos/dbt/graph.py | 18 ++++++++++++++---- cosmos/operators/watcher.py | 2 +- tests/dbt/test_graph.py | 7 +++++-- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 74db3bd1e6..eec94487f0 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -183,11 +183,21 @@ def get_resource_name_from_unique_id(unique_id: str) -> str: Splitting with ``maxsplit=2`` preserves that suffix: ``model.pkg.my_model.v1`` -> ``my_model.v1``. - :raises IndexError: if ``unique_id`` does not contain at least two - dots. Malformed inputs are surfaced loudly rather than silently - mis-parsed. + :raises ValueError: if ``unique_id`` does not have the expected + ``..`` shape, i.e. fewer + than two dots or any empty segment (e.g. ``model..name``, ``..``, + ``model.pkg.``). Malformed inputs are surfaced loudly rather than + silently mis-parsed. """ - return unique_id.split(".", 2)[2] + # ``maxsplit=2`` caps the result at 3 elements, so a well-formed + # unique_id always yields exactly 3 non-empty parts (the versioned/source + # suffixes stay attached to the third part). + parts = unique_id.split(".", 2) + if len(parts) != 3 or not all(parts): + raise ValueError( + f"Malformed dbt unique_id, expected '..': {unique_id!r}" + ) + return parts[2] @property def resource_name(self) -> str: diff --git a/cosmos/operators/watcher.py b/cosmos/operators/watcher.py index 0570a48850..719dcf0b1d 100644 --- a/cosmos/operators/watcher.py +++ b/cosmos/operators/watcher.py @@ -386,7 +386,7 @@ def _apply_node_state_tokens(self, context: Context, node_state_pairs: list[tupl for uid in excluded_ids: try: resource_names.add(DbtNode.get_resource_name_from_unique_id(uid)) - except IndexError: + except ValueError: logger.warning("Skipping malformed dbt unique_id while building source-freshness exclude list: %s", uid) model_names = sorted(resource_names) exclude_str = " ".join(model_names) diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 8c9db049f8..c254d66c73 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -145,9 +145,12 @@ def test_plain_model(self): def test_versioned_model_preserves_version_suffix(self): assert DbtNode.get_resource_name_from_unique_id("model.my_pkg.my_model.v1") == "my_model.v1" - @pytest.mark.parametrize("malformed", ["", "foo", "foo.bar"]) + @pytest.mark.parametrize( + "malformed", + ["", "foo", "foo.bar", "model..name", "..", "model.pkg.", ".pkg.name"], + ) def test_malformed_unique_id_raises(self, malformed): - with pytest.raises(IndexError): + with pytest.raises(ValueError): DbtNode.get_resource_name_from_unique_id(malformed)