diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index 9488f185f1..e325213d9b 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -24,6 +24,7 @@ SUPPORTED_CONFIG = ["materialized", "schema", "tags", CONFIG_META_PATH] PATH_SELECTOR = "path:" TAG_SELECTOR = "tag:" +FQN_SELECTOR = "fqn:" CONFIG_SELECTOR = "config." SOURCE_SELECTOR = "source:" EXPOSURE_SELECTOR = "exposure:" @@ -37,6 +38,26 @@ logger = get_logger(__name__) +def _node_fqn_str(node: DbtNode) -> str | None: + """ + Return the node's fully qualified name as a string (e.g. 'jaffle_shop.marts.customers'). + FQN is project name, path segments, and file name without extension, joined by periods. + Returns None when node.fqn is not set. + See https://docs.getdbt.com/reference/node-selection/methods#fqn + """ + if node.fqn: + return ".".join(node.fqn) + return None + + +def _fqn_matches(node_fqn: str, fqn_selector_value: str) -> bool: + """ + Return True if the node's fully qualified name exactly equals the selector value. + FQN is a fully qualified name (e.g. project.folder.model_name); no wildcards or partial matching. + """ + return node_fqn == fqn_selector_value + + def _check_nested_value_in_dict(dict_: dict[Any, Any], pattern: str) -> bool: """ Given a dictionary dict_, identify if the pattern defined in pattern happens on the dictionary. @@ -84,6 +105,7 @@ class GraphSelector: @model_g +/path/to/model_g+ path:/path/to/model_h+ + fqn:project.folder.model_name +tag:nightly +config.materialized:view resource_type:resource_name @@ -225,6 +247,13 @@ def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]: # noqa: C901 tag_selection = self.node_name[len(TAG_SELECTOR) :] root_nodes.update({node_id for node_id, node in nodes.items() if tag_selection in node.tags}) + elif FQN_SELECTOR in self.node_name: + fqn_pattern = self.node_name[len(FQN_SELECTOR) :].strip() + for node_id, node in nodes.items(): + node_fqn = _node_fqn_str(node) + if node_fqn and _fqn_matches(node_fqn, fqn_pattern): + root_nodes.add(node_id) + elif SOURCE_SELECTOR in self.node_name: source_selection = self.node_name[len(SOURCE_SELECTOR) :] @@ -355,6 +384,7 @@ def __init__(self, project_dir: Path | None, statement: str): self.project_dir = project_dir self.paths: list[Path] = [] self.tags: list[str] = [] + self.fqns: list[str] = [] self.config: dict[str, str] = {} self.other: list[str] = [] self.graph_selectors: list[GraphSelector] = [] @@ -373,6 +403,7 @@ def is_empty(self) -> bool: return not ( self.paths or self.tags + or self.fqns or self.config or self.graph_selectors or self.other @@ -428,6 +459,8 @@ def _handle_no_precursors_or_descendants(self, item: str, node_name: str) -> Non self._parse_resource_type_selector(item) elif node_name.startswith(EXCLUDE_RESOURCE_TYPE_SELECTOR): self._parse_exclude_resource_type_selector(item) + elif node_name.startswith(FQN_SELECTOR): + self._parse_fqn_selector(item) elif self._is_bare_identifier(node_name): self._parse_bare_identifier(node_name) else: @@ -464,6 +497,11 @@ def _parse_tag_selector(self, item: str) -> None: index = len(TAG_SELECTOR) self.tags.append(item[index:]) + def _parse_fqn_selector(self, item: str) -> None: + """Parse fqn: selector; appends the fully qualified name (value after 'fqn:') to self.fqns.""" + index = len(FQN_SELECTOR) + self.fqns.append(item[index:].strip()) + def _parse_path_selector(self, item: str) -> None: index = len(PATH_SELECTOR) if self.project_dir: @@ -505,6 +543,7 @@ def __repr__(self) -> str: "SelectorConfig(" + f"paths={self.paths}, " + f"tags={self.tags}, " + + f"fqns={self.fqns}, " + f"config={self.config}, " + f"sources={self.sources}, " + f"resource={self.resource_types}, " @@ -624,18 +663,23 @@ def _should_include_node(self, node_id: str, node: DbtNode) -> bool: # noqa: C9 ) or not self._should_include_based_on_non_meta_and_non_tag_config(node, config_copy): return False - if self.config.paths and not self._is_path_matching(node): + if not self._passes_selector_filters(node): return False + return True + + def _passes_selector_filters(self, node: DbtNode) -> bool: + """Return True if node matches all configured selector filters (path, fqn, resource_type, source, exposure).""" + if self.config.paths and not self._is_path_matching(node): + return False + if self.config.fqns and not self._is_fqn_matching(node): + return False if self.config.resource_types and not self._is_resource_type_matching(node): return False - if self.config.exclude_resource_types and self._is_exclude_resource_type_matching(node): return False - if self.config.sources and not self._is_source_matching(node): return False - if self.config.exposures and not self._is_exposure_matching(node): return False @@ -647,6 +691,13 @@ def _should_include_node(self, node_id: str, node: DbtNode) -> bool: # noqa: C9 return True + def _is_fqn_matching(self, node: DbtNode) -> bool: + """Checks if the node's fully qualified name exactly matches any of the config's fqn selector values.""" + node_fqn = _node_fqn_str(node) + if node_fqn is None: + return False + return any(_fqn_matches(node_fqn, fqn_value) for fqn_value in self.config.fqns) + def _is_resource_type_matching(self, node: DbtNode) -> bool: """Checks if the node's resource type is a subset of the config's resource type.""" if node.resource_type.value not in self.config.resource_types: @@ -946,10 +997,11 @@ def _parse_selection_from_cosmos_spec(method: str, value: str) -> tuple[str | No - method="tag", value="nightly" -> ("tag:nightly", None) - method="path", value="models/" -> ("path:models/", None) - method="fqn", value="*" -> ("", None) + - method="fqn", value="jaffle_shop.customers" -> ("fqn:jaffle_shop.customers", None) - method="config.materialized", value="view" -> ("config.materialized:view", None) """ if method == "fqn": - return ("" if value == "*" else value, None) + return "" if value == "*" else f"{FQN_SELECTOR}{value}", None method_mappings = { TAG_SELECTOR[:-1]: TAG_SELECTOR, @@ -1416,6 +1468,7 @@ def validate_filters(exclude: list[str], select: list[str]) -> None: if ( filter_parameter.startswith(PATH_SELECTOR) or filter_parameter.startswith(TAG_SELECTOR) + or filter_parameter.startswith(FQN_SELECTOR) or filter_parameter.startswith(RESOURCE_TYPE_SELECTOR) or filter_parameter.startswith(EXCLUDE_RESOURCE_TYPE_SELECTOR) or filter_parameter.startswith(SOURCE_SELECTOR) diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 82c1ca0070..8f2aa9e43a 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -2287,7 +2287,7 @@ def test_save_yaml_selectors_cache(mock_variable_set, mock_datetime, tmp_dbt_pro hash_dir, hash_selectors, hash_impl = version.split(",") assert hash_selectors == "43303af03e84e3b51fbfcf598261fae4" - assert hash_impl == "c7ba8b331e80ae876d5c7c7c1cfdf93d" + assert hash_impl == "4c93048c66ca45356e1677511447c7ba" if sys.platform == "darwin": # We faced inconsistent hashing versions depending on the version of MacOS/Linux - the following line aims to address these. diff --git a/tests/dbt/test_selector.py b/tests/dbt/test_selector.py index 9c5f5b208a..44e3babe5a 100644 --- a/tests/dbt/test_selector.py +++ b/tests/dbt/test_selector.py @@ -4,7 +4,7 @@ from cosmos.constants import DbtResourceType from cosmos.dbt.graph import DbtNode -from cosmos.dbt.selector import NodeSelector, SelectorConfig, YamlSelectors, select_nodes +from cosmos.dbt.selector import NodeSelector, SelectorConfig, YamlSelectors, _node_fqn_str, select_nodes from cosmos.exceptions import CosmosValueError SAMPLE_PROJ_PATH = Path("/home/user/path/dbt-proj/") @@ -139,6 +139,73 @@ def test_select_nodes_by_select_config(): assert selected == expected +def test_node_fqn_str_returns_none_when_missing(): + node = DbtNode( + unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.no_fqn", + resource_type=DbtResourceType.MODEL, + depends_on=[], + file_path=SAMPLE_PROJ_PATH / "models/no_fqn.sql", + tags=[], + config={}, + fqn=None, + ) + + assert _node_fqn_str(node) is None + + +def test_select_nodes_with_fqn_selector(): + project_name = SAMPLE_PROJ_PATH.stem + + # Node that should match the selector + node_matching = DbtNode( + unique_id=f"{DbtResourceType.MODEL.value}.{project_name}.my_model", + resource_type=DbtResourceType.MODEL, + depends_on=[], + file_path=SAMPLE_PROJ_PATH / "models/my_model.sql", + tags=[], + config={}, + fqn=[project_name, "models", "my_model"], + ) + + # Node that should not match + node_not_matching = DbtNode( + unique_id=f"{DbtResourceType.MODEL.value}.{project_name}.other_model", + resource_type=DbtResourceType.MODEL, + depends_on=[], + file_path=SAMPLE_PROJ_PATH / "models/other_model.sql", + tags=[], + config={}, + fqn=[project_name, "models", "other_model"], + ) + + # Node without FQN, should be skipped + node_without_fqn = DbtNode( + unique_id=f"{DbtResourceType.MODEL.value}.{project_name}.no_fqn", + resource_type=DbtResourceType.MODEL, + depends_on=[], + file_path=SAMPLE_PROJ_PATH / "models/no_fqn.sql", + tags=[], + config={}, + fqn=None, + ) + + nodes = { + node_matching.unique_id: node_matching, + node_not_matching.unique_id: node_not_matching, + node_without_fqn.unique_id: node_without_fqn, + } + + # Non-empty select ensures config.is_empty == False + selected = select_nodes( + project_dir=SAMPLE_PROJ_PATH, + nodes=nodes, + select=[f"+fqn:{project_name}.models.my_model"], # triggers FQN_SELECTOR + ) + + # Only the matching node should be returned + assert list(selected.keys()) == [node_matching.unique_id] + + def test_select_nodes_by_select_config_meta_nested_property(): selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["config.meta.frequency:daily"]) expected = {another_grandparent_node.unique_id: another_grandparent_node} @@ -1305,7 +1372,7 @@ def test_exposure_selector(): ( "fqn_method", {"name": "fqn_method", "definition": {"method": "fqn", "value": "customers"}}, - {"select": ["customers"], "exclude": None}, + {"select": ["fqn:customers"], "exclude": None}, ), ( "fqn_star_method",