Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 58 additions & 5 deletions cosmos/dbt/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
SUPPORTED_CONFIG = ["materialized", "schema", "tags", CONFIG_META_PATH]
PATH_SELECTOR = "path:"
TAG_SELECTOR = "tag:"
FQN_SELECTOR = "fqn:"
CONFIG_SELECTOR = "config."
SOURCE_SELECTOR = "source:"
EXPOSURE_SELECTOR = "exposure:"
Expand All @@ -37,6 +38,26 @@
logger = get_logger(__name__)


def _node_fqn_str(node: DbtNode) -> str | None:
"""
Return the node's fully qualified name as a string (e.g. 'jaffle_shop.marts.customers').
FQN is project name, path segments, and file name without extension, joined by periods.
Returns None when node.fqn is not set.
See https://docs.getdbt.com/reference/node-selection/methods#fqn
"""
if node.fqn:
return ".".join(node.fqn)
return None


def _fqn_matches(node_fqn: str, fqn_selector_value: str) -> bool:
"""
Return True if the node's fully qualified name exactly equals the selector value.
FQN is a fully qualified name (e.g. project.folder.model_name); no wildcards or partial matching.
"""
return node_fqn == fqn_selector_value


def _check_nested_value_in_dict(dict_: dict[Any, Any], pattern: str) -> bool:
"""
Given a dictionary dict_, identify if the pattern defined in pattern happens on the dictionary.
Expand Down Expand Up @@ -84,6 +105,7 @@ class GraphSelector:
@model_g
+/path/to/model_g+
path:/path/to/model_h+
fqn:project.folder.model_name
+tag:nightly
+config.materialized:view
resource_type:resource_name
Expand Down Expand Up @@ -225,6 +247,13 @@ def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]: # noqa: C901
tag_selection = self.node_name[len(TAG_SELECTOR) :]
root_nodes.update({node_id for node_id, node in nodes.items() if tag_selection in node.tags})

elif FQN_SELECTOR in self.node_name:
fqn_pattern = self.node_name[len(FQN_SELECTOR) :].strip()
for node_id, node in nodes.items():
node_fqn = _node_fqn_str(node)
if node_fqn and _fqn_matches(node_fqn, fqn_pattern):
root_nodes.add(node_id)

elif SOURCE_SELECTOR in self.node_name:
source_selection = self.node_name[len(SOURCE_SELECTOR) :]

Expand Down Expand Up @@ -355,6 +384,7 @@ def __init__(self, project_dir: Path | None, statement: str):
self.project_dir = project_dir
self.paths: list[Path] = []
self.tags: list[str] = []
self.fqns: list[str] = []
self.config: dict[str, str] = {}
self.other: list[str] = []
self.graph_selectors: list[GraphSelector] = []
Expand All @@ -373,6 +403,7 @@ def is_empty(self) -> bool:
return not (
self.paths
or self.tags
or self.fqns
or self.config
or self.graph_selectors
or self.other
Expand Down Expand Up @@ -428,6 +459,8 @@ def _handle_no_precursors_or_descendants(self, item: str, node_name: str) -> Non
self._parse_resource_type_selector(item)
elif node_name.startswith(EXCLUDE_RESOURCE_TYPE_SELECTOR):
self._parse_exclude_resource_type_selector(item)
elif node_name.startswith(FQN_SELECTOR):
self._parse_fqn_selector(item)
elif self._is_bare_identifier(node_name):
self._parse_bare_identifier(node_name)
else:
Expand Down Expand Up @@ -464,6 +497,11 @@ def _parse_tag_selector(self, item: str) -> None:
index = len(TAG_SELECTOR)
self.tags.append(item[index:])

def _parse_fqn_selector(self, item: str) -> None:
"""Parse fqn: selector; appends the fully qualified name (value after 'fqn:') to self.fqns."""
index = len(FQN_SELECTOR)
self.fqns.append(item[index:].strip())

def _parse_path_selector(self, item: str) -> None:
index = len(PATH_SELECTOR)
if self.project_dir:
Expand Down Expand Up @@ -505,6 +543,7 @@ def __repr__(self) -> str:
"SelectorConfig("
+ f"paths={self.paths}, "
+ f"tags={self.tags}, "
+ f"fqns={self.fqns}, "
+ f"config={self.config}, "
+ f"sources={self.sources}, "
+ f"resource={self.resource_types}, "
Expand Down Expand Up @@ -624,18 +663,23 @@ def _should_include_node(self, node_id: str, node: DbtNode) -> bool: # noqa: C9
) or not self._should_include_based_on_non_meta_and_non_tag_config(node, config_copy):
return False

if self.config.paths and not self._is_path_matching(node):
if not self._passes_selector_filters(node):
return False

return True

def _passes_selector_filters(self, node: DbtNode) -> bool:
"""Return True if node matches all configured selector filters (path, fqn, resource_type, source, exposure)."""
if self.config.paths and not self._is_path_matching(node):
return False
if self.config.fqns and not self._is_fqn_matching(node):
return False
if self.config.resource_types and not self._is_resource_type_matching(node):
return False

if self.config.exclude_resource_types and self._is_exclude_resource_type_matching(node):
return False

if self.config.sources and not self._is_source_matching(node):
return False

if self.config.exposures and not self._is_exposure_matching(node):
return False

Expand All @@ -647,6 +691,13 @@ def _should_include_node(self, node_id: str, node: DbtNode) -> bool: # noqa: C9

return True

def _is_fqn_matching(self, node: DbtNode) -> bool:
"""Checks if the node's fully qualified name exactly matches any of the config's fqn selector values."""
node_fqn = _node_fqn_str(node)
if node_fqn is None:
return False
return any(_fqn_matches(node_fqn, fqn_value) for fqn_value in self.config.fqns)

def _is_resource_type_matching(self, node: DbtNode) -> bool:
"""Checks if the node's resource type is a subset of the config's resource type."""
if node.resource_type.value not in self.config.resource_types:
Expand Down Expand Up @@ -946,10 +997,11 @@ def _parse_selection_from_cosmos_spec(method: str, value: str) -> tuple[str | No
- method="tag", value="nightly" -> ("tag:nightly", None)
- method="path", value="models/" -> ("path:models/", None)
- method="fqn", value="*" -> ("", None)
- method="fqn", value="jaffle_shop.customers" -> ("fqn:jaffle_shop.customers", None)
- method="config.materialized", value="view" -> ("config.materialized:view", None)
"""
if method == "fqn":
return ("" if value == "*" else value, None)
return "" if value == "*" else f"{FQN_SELECTOR}{value}", None
Comment thread
pankajastro marked this conversation as resolved.
Comment thread
pankajastro marked this conversation as resolved.

method_mappings = {
TAG_SELECTOR[:-1]: TAG_SELECTOR,
Expand Down Expand Up @@ -1416,6 +1468,7 @@ def validate_filters(exclude: list[str], select: list[str]) -> None:
if (
filter_parameter.startswith(PATH_SELECTOR)
or filter_parameter.startswith(TAG_SELECTOR)
or filter_parameter.startswith(FQN_SELECTOR)
or filter_parameter.startswith(RESOURCE_TYPE_SELECTOR)
or filter_parameter.startswith(EXCLUDE_RESOURCE_TYPE_SELECTOR)
or filter_parameter.startswith(SOURCE_SELECTOR)
Expand Down
2 changes: 1 addition & 1 deletion tests/dbt/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -2287,7 +2287,7 @@ def test_save_yaml_selectors_cache(mock_variable_set, mock_datetime, tmp_dbt_pro
hash_dir, hash_selectors, hash_impl = version.split(",")

assert hash_selectors == "43303af03e84e3b51fbfcf598261fae4"
assert hash_impl == "c7ba8b331e80ae876d5c7c7c1cfdf93d"
assert hash_impl == "4c93048c66ca45356e1677511447c7ba"

if sys.platform == "darwin":
# We faced inconsistent hashing versions depending on the version of MacOS/Linux - the following line aims to address these.
Expand Down
71 changes: 69 additions & 2 deletions tests/dbt/test_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from cosmos.constants import DbtResourceType
from cosmos.dbt.graph import DbtNode
from cosmos.dbt.selector import NodeSelector, SelectorConfig, YamlSelectors, select_nodes
from cosmos.dbt.selector import NodeSelector, SelectorConfig, YamlSelectors, _node_fqn_str, select_nodes
from cosmos.exceptions import CosmosValueError

SAMPLE_PROJ_PATH = Path("/home/user/path/dbt-proj/")
Expand Down Expand Up @@ -139,6 +139,73 @@ def test_select_nodes_by_select_config():
assert selected == expected


def test_node_fqn_str_returns_none_when_missing():
node = DbtNode(
unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.no_fqn",
resource_type=DbtResourceType.MODEL,
depends_on=[],
file_path=SAMPLE_PROJ_PATH / "models/no_fqn.sql",
tags=[],
config={},
fqn=None,
)

assert _node_fqn_str(node) is None


def test_select_nodes_with_fqn_selector():
project_name = SAMPLE_PROJ_PATH.stem

# Node that should match the selector
node_matching = DbtNode(
unique_id=f"{DbtResourceType.MODEL.value}.{project_name}.my_model",
Comment thread
pankajastro marked this conversation as resolved.
resource_type=DbtResourceType.MODEL,
depends_on=[],
file_path=SAMPLE_PROJ_PATH / "models/my_model.sql",
tags=[],
config={},
fqn=[project_name, "models", "my_model"],
)

# Node that should not match
node_not_matching = DbtNode(
unique_id=f"{DbtResourceType.MODEL.value}.{project_name}.other_model",
resource_type=DbtResourceType.MODEL,
depends_on=[],
file_path=SAMPLE_PROJ_PATH / "models/other_model.sql",
tags=[],
config={},
fqn=[project_name, "models", "other_model"],
)

# Node without FQN, should be skipped
node_without_fqn = DbtNode(
unique_id=f"{DbtResourceType.MODEL.value}.{project_name}.no_fqn",
resource_type=DbtResourceType.MODEL,
depends_on=[],
file_path=SAMPLE_PROJ_PATH / "models/no_fqn.sql",
tags=[],
config={},
fqn=None,
)

nodes = {
node_matching.unique_id: node_matching,
node_not_matching.unique_id: node_not_matching,
node_without_fqn.unique_id: node_without_fqn,
}

# Non-empty select ensures config.is_empty == False
selected = select_nodes(
project_dir=SAMPLE_PROJ_PATH,
nodes=nodes,
select=[f"+fqn:{project_name}.models.my_model"], # triggers FQN_SELECTOR
)

# Only the matching node should be returned
assert list(selected.keys()) == [node_matching.unique_id]


def test_select_nodes_by_select_config_meta_nested_property():
selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["config.meta.frequency:daily"])
expected = {another_grandparent_node.unique_id: another_grandparent_node}
Expand Down Expand Up @@ -1305,7 +1372,7 @@ def test_exposure_selector():
(
"fqn_method",
{"name": "fqn_method", "definition": {"method": "fqn", "value": "customers"}},
{"select": ["customers"], "exclude": None},
{"select": ["fqn:customers"], "exclude": None},
),
(
"fqn_star_method",
Expand Down