Merged
2 changes: 1 addition & 1 deletion cosmos/operators/_watcher/base.py
@@ -447,7 +447,7 @@ def _fallback_to_non_watcher_run(self, try_number: int, context: Context) -> boo
raw_flags = upstream_task.add_cmd_flags()
extra_flags = self._filter_flags(raw_flags)

- model_selector = self.model_unique_id.split(".")[-1]
+ model_selector = self.model_unique_id.split(".", 2)[2]
Collaborator

@pankajkoti, maybe instead of hard-coding index 2, we could retrieve the last index, similar to what we were doing:

Suggested change
- model_selector = self.model_unique_id.split(".", 2)[2]
+ model_selector = self.model_unique_id.split(".", 2)[-1]

Do we have any official docs on how dbt forms model_unique_id? It could be worth mentioning what the variations are (normally there are two dots, but in some cases there are more) and why it is safe to treat everything after the second dot as a single segment.

Contributor Author

On [2] vs [-1]: I'd lean toward keeping [2]. For a well-formed unique_id, they return the same value; the difference is the failure mode. [2] raises IndexError and surfaces malformed input loudly, while [-1] would silently fall back to the package or resource_type and mis-issue the dbt command. It also matches the canonical parse already used in DbtNode.resource_name (cosmos/dbt/graph.py) and in cosmos/operators/watcher.py at lines 340, 546, and 613.
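
A minimal sketch of that failure-mode difference, in plain Python with no Cosmos imports. The function names and the sample IDs are hypothetical, chosen only for illustration:

```python
def selector_strict(unique_id: str) -> str:
    # Index 2 must exist, so an ID with fewer than two dots raises IndexError.
    return unique_id.split(".", 2)[2]


def selector_lenient(unique_id: str) -> str:
    # The last element always exists, so a malformed ID still yields a value.
    return unique_id.split(".", 2)[-1]


well_formed = "model.jaffle_shop.stg_orders"  # illustrative ID
malformed = "model.jaffle_shop"               # resource name is missing

# Both variants agree on well-formed input.
assert selector_strict(well_formed) == selector_lenient(well_formed) == "stg_orders"

# On malformed input, the lenient variant silently returns the package name.
lenient_result = selector_lenient(malformed)  # "jaffle_shop"

# The strict variant fails loudly instead.
try:
    selector_strict(malformed)
    strict_raised = False
except IndexError:
    strict_raised = True
```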

On the docs ask: per the dbt manifest spec, node unique_ids are <resource_type>.<package>.<resource_name>. Both resource_type and package are constrained identifiers that cannot contain dots, so the first two dots are unambiguous separators and everything after the second dot is the full resource name. For versioned models, dbt appends a fourth segment: model.<package>.<resource_name>.<version> (see node_args.py).
split(".", 2)[2] returns the whole remainder (my_model.v1), whereas split(".")[-1] returns just v1, which is the bug this PR fixes, as mentioned in the PR description.
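
For illustration, here is the same comparison on the versioned ID used in the new test. This is a standalone sketch using only `str.split`; the variable names are assumptions for this example:

```python
unique_id = "model.jaffle_shop.stg_orders.v1"

# Old behaviour: split on every dot and keep the last piece, dropping the model name.
old_selector = unique_id.split(".")[-1]

# New behaviour: split on the first two dots only and keep the full remainder.
new_selector = unique_id.split(".", 2)[2]

# With maxsplit=2 the three documented segments unpack directly.
resource_type, package, resource_name = unique_id.split(".", 2)

print(old_selector)  # v1
print(new_selector)  # stg_orders.v1
```

For an unversioned ID such as `model.jaffle_shop.stg_orders`, both expressions return `stg_orders`, which is why the old parse only misbehaved for versioned models.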

I would like to skip adding an inline docstring at this site because the parse is open-coded in multiple places across the codebase that are linked above.

As we agreed, I am following up with a refactor PR that reuses a common method for these sites.

cmd_flags = extra_flags + ["--select", model_selector]

self.build_and_run_cmd(context, cmd_flags=cmd_flags) # type: ignore[attr-defined]
16 changes: 15 additions & 1 deletion tests/operators/test_watcher.py
@@ -1118,7 +1118,21 @@ def test_fallback_to_non_watcher_run(self):
         sensor.build_and_run_cmd.assert_called_once()
         args, kwargs = sensor.build_and_run_cmd.call_args
         assert "--select" in kwargs["cmd_flags"]
-        assert MODEL_UNIQUE_ID.split(".")[-1] in kwargs["cmd_flags"]
+        assert MODEL_UNIQUE_ID.split(".", 2)[2] in kwargs["cmd_flags"]
+
+    def test_fallback_selector_preserves_version_suffix(self):
+        """For versioned dbt models (e.g. ``model.pkg.my_model.v1``) the selector must keep the version suffix."""
+        sensor = self.make_sensor()
+        sensor.model_unique_id = "model.jaffle_shop.stg_orders.v1"
+        ti = MagicMock()
+        ti.task.dag.get_task.return_value.add_cmd_flags.return_value = []
+        context = self.make_context(ti)
+        sensor.build_and_run_cmd = MagicMock()
+
+        sensor._fallback_to_non_watcher_run(2, context)
+
+        kwargs = sensor.build_and_run_cmd.call_args.kwargs
+        assert kwargs["cmd_flags"] == ["--select", "stg_orders.v1"]
 
     def test_filter_flags(self):
         flags = ["--select", "model", "--exclude", "other", "--threads", "2"]