From d7348ba814d9e198d3cd6d3b39d6b7bf346cf320 Mon Sep 17 00:00:00 2001 From: Tyler Seruga Date: Tue, 1 Aug 2023 15:11:34 -0500 Subject: [PATCH 01/13] Add dbt graph selector syntax for upstream dependencies (prefix +) Part of the spec described in: https://docs.getdbt.com/reference/node-selection/graph-operators#the-plus-operator Co-authored-by: Tatiana Al-Chueyr --- cosmos/dbt/selector.py | 51 ++++++++++++++-- tests/dbt/test_selector.py | 115 ++++++++++++++++++++++++++++++------- 2 files changed, 138 insertions(+), 28 deletions(-) diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index c7316dc75e..398d40624d 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -1,7 +1,7 @@ from __future__ import annotations -from pathlib import Path import copy - +from collections import defaultdict +from pathlib import Path from typing import TYPE_CHECKING, Any from cosmos.constants import DbtResourceType @@ -16,7 +16,7 @@ PATH_SELECTOR = "path:" TAG_SELECTOR = "tag:" CONFIG_SELECTOR = "config." 
- +MODEL_UPSTREAM_SELECTOR = "+" logger = get_logger(__name__) @@ -42,6 +42,7 @@ def __init__(self, project_dir: Path | None, statement: str): self.paths: list[Path] = [] self.tags: list[str] = [] self.config: dict[str, str] = {} + self.model_upstream: str = "" self.other: list[str] = [] self.load_from_statement(statement) @@ -76,6 +77,9 @@ def load_from_statement(self, statement: str) -> None: key, value = item[index:].split(":") if key in SUPPORTED_CONFIG: self.config[key] = value + elif item.startswith(MODEL_UPSTREAM_SELECTOR): + index = len(MODEL_UPSTREAM_SELECTOR) + self.model_upstream = item[index:] else: self.other.append(item) logger.warning("Unsupported select statement: %s", item) @@ -175,6 +179,34 @@ def _is_path_matching(self, node: DbtNode) -> bool: return self._should_include_node(node.depends_on[0], model_node) return False + def select_nodes_by_precursor(self) -> set[str]: + """ + Return a list of node ids which match the configuration defined in the config. + + Return all nodes that are parents (or parents from parents) of the root defined in the configuration. 
+ + References: + https://docs.getdbt.com/reference/node-selection/syntax + https://docs.getdbt.com/reference/node-selection/yaml-selectors + """ + root_id, root = None, None + for node_id, node in self.nodes.items(): + if node.name == self.config.model_upstream: + root_id, root = node_id, node + + if not root: + logger.warn("Model in selector not found in DBT graph") + return set() + + selected_nodes = set() + precursors = set([root_id]) + while precursors: + precursor_id = precursors.pop() + selected_nodes.add(precursor_id) + precursors = precursors.union(set(self.nodes[precursor_id].depends_on)) + + return selected_nodes + def retrieve_by_label(statement_list: list[str], label: str) -> set[str]: """ @@ -217,7 +249,10 @@ def select_nodes( filters = [["select", select], ["exclude", exclude]] for filter_type, filter in filters: for filter_parameter in filter: - if filter_parameter.startswith(PATH_SELECTOR) or filter_parameter.startswith(TAG_SELECTOR): + if (filter_parameter.startswith(PATH_SELECTOR) or + filter_parameter.startswith(TAG_SELECTOR) or + filter_parameter.startswith(MODEL_UPSTREAM_SELECTOR) + ): continue elif any([filter_parameter.startswith(CONFIG_SELECTOR + config + ":") for config in SUPPORTED_CONFIG]): continue @@ -229,8 +264,12 @@ def select_nodes( for statement in select: config = SelectorConfig(project_dir, statement) node_selector = NodeSelector(nodes, config) - select_ids = node_selector.select_nodes_ids_by_intersection() - subset_ids = subset_ids.union(set(select_ids)) + if config.model_upstream: + select_ids = node_selector.select_nodes_by_precursor() + subset_ids = subset_ids.union(set(select_ids)) + else: + select_ids = node_selector.select_nodes_ids_by_intersection() + subset_ids = subset_ids.union(set(select_ids)) if select: nodes = {id_: nodes[id_] for id_ in subset_ids} diff --git a/tests/dbt/test_selector.py b/tests/dbt/test_selector.py index f7ece63910..6da699d05d 100644 --- a/tests/dbt/test_selector.py +++ 
b/tests/dbt/test_selector.py @@ -46,47 +46,69 @@ def test_is_empty_config(selector_config, paths, tags, config, other, expected): tags=["has_child"], config={"materialized": "view", "tags": ["has_child"]}, ) + +another_grandparent_node = DbtNode( + unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.another_grandparent_node", + resource_type=DbtResourceType.MODEL, + depends_on=[], + file_path=SAMPLE_PROJ_PATH / "gen1/models/another_grandparent_node.sql", + tags=[], + config={}, +) + parent_node = DbtNode( unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.parent", resource_type=DbtResourceType.MODEL, - depends_on=["grandparent"], + depends_on=[grandparent_node.unique_id, another_grandparent_node.unique_id], file_path=SAMPLE_PROJ_PATH / "gen2/models/parent.sql", tags=["has_child", "is_child"], config={"materialized": "view", "tags": ["has_child", "is_child"]}, ) + child_node = DbtNode( unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.child", resource_type=DbtResourceType.MODEL, - depends_on=["parent"], + depends_on=[parent_node.unique_id], file_path=SAMPLE_PROJ_PATH / "gen3/models/child.sql", tags=["nightly", "is_child"], config={"materialized": "table", "tags": ["nightly", "is_child"]}, ) -grandchild_1_test_node = DbtNode( - unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.grandchild_1", +sibling1_node = DbtNode( + unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.sibling1", resource_type=DbtResourceType.MODEL, - depends_on=["parent"], - file_path=SAMPLE_PROJ_PATH / "gen3/models/grandchild_1.sql", + depends_on=[parent_node.unique_id], + file_path=SAMPLE_PROJ_PATH / "gen3/models/sibling1.sql", tags=["nightly", "deprecated", "test"], config={"materialized": "table", "tags": ["nightly", "deprecated", "test"]}, ) -grandchild_2_test_node = DbtNode( - unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.grandchild_2", +sibling2_node = DbtNode( + 
unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.sibling2", resource_type=DbtResourceType.MODEL, - depends_on=["parent"], - file_path=SAMPLE_PROJ_PATH / "gen3/models/grandchild_2.sql", + depends_on=[parent_node.unique_id], + file_path=SAMPLE_PROJ_PATH / "gen3/models/sibling2.sql", tags=["nightly", "deprecated", "test2"], config={"materialized": "table", "tags": ["nightly", "deprecated", "test2"]}, ) +orphaned_node = DbtNode( + unique_id=f"{DbtResourceType.MODEL.value}.{SAMPLE_PROJ_PATH.stem}.orphaned", + resource_type=DbtResourceType.MODEL, + depends_on=[], + file_path=SAMPLE_PROJ_PATH / "gen3/models/orphaned.sql", + tags=[], + config={}, +) + sample_nodes = { grandparent_node.unique_id: grandparent_node, + another_grandparent_node.unique_id: another_grandparent_node, parent_node.unique_id: parent_node, child_node.unique_id: child_node, - grandchild_1_test_node.unique_id: grandchild_1_test_node, - grandchild_2_test_node.unique_id: grandchild_2_test_node, + sibling1_node.unique_id: sibling1_node, + sibling2_node.unique_id: sibling2_node, + orphaned_node.unique_id: orphaned_node, } @@ -100,8 +122,8 @@ def test_select_nodes_by_select_config(): selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["config.materialized:table"]) expected = { child_node.unique_id: child_node, - grandchild_1_test_node.unique_id: grandchild_1_test_node, - grandchild_2_test_node.unique_id: grandchild_2_test_node, + sibling1_node.unique_id: sibling1_node, + sibling2_node.unique_id: sibling2_node, } assert selected == expected @@ -136,8 +158,8 @@ def test_select_nodes_by_select_union_config_test_tags(): expected = { grandparent_node.unique_id: grandparent_node, parent_node.unique_id: parent_node, - grandchild_1_test_node.unique_id: grandchild_1_test_node, - grandchild_2_test_node.unique_id: grandchild_2_test_node, + sibling1_node.unique_id: sibling1_node, + sibling2_node.unique_id: sibling2_node, } assert selected == expected @@ -176,8 +198,8 @@ def 
test_select_nodes_by_select_union(): grandparent_node.unique_id: grandparent_node, parent_node.unique_id: parent_node, child_node.unique_id: child_node, - grandchild_1_test_node.unique_id: grandchild_1_test_node, - grandchild_2_test_node.unique_id: grandchild_2_test_node, + sibling1_node.unique_id: sibling1_node, + sibling2_node.unique_id: sibling2_node, } assert selected == expected @@ -191,8 +213,10 @@ def test_select_nodes_by_exclude_tag(): selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, exclude=["tag:has_child"]) expected = { child_node.unique_id: child_node, - grandchild_1_test_node.unique_id: grandchild_1_test_node, - grandchild_2_test_node.unique_id: grandchild_2_test_node, + sibling1_node.unique_id: sibling1_node, + sibling2_node.unique_id: sibling2_node, + another_grandparent_node.unique_id: another_grandparent_node, + orphaned_node.unique_id: orphaned_node, } assert selected == expected @@ -217,18 +241,43 @@ def test_select_nodes_by_exclude_union_config_test_tags(): ) expected = { grandparent_node.unique_id: grandparent_node, + another_grandparent_node.unique_id: another_grandparent_node, parent_node.unique_id: parent_node, child_node.unique_id: child_node, + orphaned_node.unique_id: orphaned_node, } assert selected == expected +def test_select_nodes_by_child_and_precursors(): + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["+child"]) + expected = [ + another_grandparent_node.unique_id, + child_node.unique_id, + grandparent_node.unique_id, + parent_node.unique_id, + ] + assert sorted(selected.keys()) == expected + + def test_select_nodes_by_path_dir(): selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["path:gen3/models"]) expected = { child_node.unique_id: child_node, - grandchild_1_test_node.unique_id: grandchild_1_test_node, - grandchild_2_test_node.unique_id: grandchild_2_test_node, + sibling1_node.unique_id: sibling1_node, + sibling2_node.unique_id: 
sibling2_node, + orphaned_node.unique_id: orphaned_node, + } + assert selected == expected + + +def test_select_nodes_by_child_and_precursors_exclude_tags(): + selected = select_nodes( + project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["+child"], exclude=["tag:has_child"] + ) + expected = { + another_grandparent_node.unique_id: another_grandparent_node, + child_node.unique_id: child_node, } assert selected == expected @@ -239,3 +288,25 @@ def test_select_nodes_by_path_file(): parent_node.unique_id: parent_node, } assert selected == expected + + +def test_select_node_by_child_and_precursors_partial_tree(): + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["+parent"]) + expected = { + grandparent_node.unique_id: grandparent_node, + another_grandparent_node.unique_id: another_grandparent_node, + parent_node.unique_id: parent_node, + } + assert selected == expected + + +def test_select_node_by_dfs_leaf(): + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["+orphaned"]) + expected = {orphaned_node.unique_id: orphaned_node} + assert selected == expected + + +def test_select_node_by_dfs_no_node(): + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["+modelDoesntExist"]) + expected = {} + assert selected == expected From 65fd4e16754969f1960b3a4bc641a74888179cfe Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 29 Nov 2023 13:31:52 +0000 Subject: [PATCH 02/13] Refactor graph selector implementation --- cosmos/dbt/selector.py | 100 +++++++++++++++++++++++++++---------- tests/dbt/test_selector.py | 39 +++++++-------- 2 files changed, 91 insertions(+), 48 deletions(-) diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index 398d40624d..d71e0a081b 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -1,6 +1,7 @@ from __future__ import annotations import copy -from collections import defaultdict +import re +from dataclasses import 
dataclass from pathlib import Path from typing import TYPE_CHECKING, Any @@ -16,11 +17,43 @@ PATH_SELECTOR = "path:" TAG_SELECTOR = "tag:" CONFIG_SELECTOR = "config." -MODEL_UPSTREAM_SELECTOR = "+" +PLUS_SELECTOR = "+" +GRAPH_SELECTOR_REGEX = r"^([0-9]*\+)?([^\+]+)(\+[0-9]*)?$|" logger = get_logger(__name__) +@dataclass +class GraphSelector: + node_name: str + precursors: str | None + descendants: str | None + + @property + def precursors_depth(self) -> int: + if not self.precursors: + return 0 + if self.precursors == "+": + return -1 + else: + return int(self.precursors[:-1]) + + @property + def descendants_depth(self) -> int: + if not self.descendants: + return 0 + if self.descendants == "+": + return -1 + else: + return int(self.descendants[1:]) + + @staticmethod + def parse(text: str) -> GraphSelector | None: + precursors, node_name, descendants = re.search(GRAPH_SELECTOR_REGEX, text).groups() + if node_name: + return GraphSelector(node_name, precursors, descendants) + + class SelectorConfig: """ Represents a select/exclude statement. 
@@ -42,8 +75,8 @@ def __init__(self, project_dir: Path | None, statement: str): self.paths: list[Path] = [] self.tags: list[str] = [] self.config: dict[str, str] = {} - self.model_upstream: str = "" self.other: list[str] = [] + self.graph_selectors: list[GraphSelector] = [] self.load_from_statement(statement) @property @@ -62,6 +95,7 @@ def load_from_statement(self, statement: str) -> None: https://docs.getdbt.com/reference/node-selection/yaml-selectors """ items = statement.split(",") + for item in items: if item.startswith(PATH_SELECTOR): index = len(PATH_SELECTOR) @@ -77,12 +111,13 @@ def load_from_statement(self, statement: str) -> None: key, value = item[index:].split(":") if key in SUPPORTED_CONFIG: self.config[key] = value - elif item.startswith(MODEL_UPSTREAM_SELECTOR): - index = len(MODEL_UPSTREAM_SELECTOR) - self.model_upstream = item[index:] else: - self.other.append(item) - logger.warning("Unsupported select statement: %s", item) + graph_selector = GraphSelector.parse(item) + if graph_selector is not None: + self.graph_selectors.append(graph_selector) + else: + self.other.append(item) + logger.warning("Unsupported select statement: %s", item) def __repr__(self) -> str: return f"SelectorConfig(paths={self.paths}, tags={self.tags}, config={self.config}, other={self.other})" @@ -179,7 +214,7 @@ def _is_path_matching(self, node: DbtNode) -> bool: return self._should_include_node(node.depends_on[0], model_node) return False - def select_nodes_by_precursor(self) -> set[str]: + def select_node_precursors(self) -> set[str]: """ Return a list of node ids which match the configuration defined in the config. 
@@ -189,21 +224,28 @@ def select_nodes_by_precursor(self) -> set[str]: https://docs.getdbt.com/reference/node-selection/syntax https://docs.getdbt.com/reference/node-selection/yaml-selectors """ - root_id, root = None, None + selected_nodes = set() + + # Index nodes by name + node_by_name = {} for node_id, node in self.nodes.items(): - if node.name == self.config.model_upstream: - root_id, root = node_id, node + node_by_name[node.name] = node_id - if not root: - logger.warn("Model in selector not found in DBT graph") - return set() + for graph_selector in self.config.graph_selectors: + if graph_selector.node_name in node_by_name: + root_id = node_by_name[graph_selector.node_name] + else: + logger.warn("Model in selector not found in DBT graph") + break - selected_nodes = set() - precursors = set([root_id]) - while precursors: - precursor_id = precursors.pop() - selected_nodes.add(precursor_id) - precursors = precursors.union(set(self.nodes[precursor_id].depends_on)) + selected_nodes.add(root_id) + + if graph_selector.precursors: + precursors = {root_id} + while precursors: + precursor_id = precursors.pop() + selected_nodes.add(precursor_id) + precursors = precursors.union(set(self.nodes[precursor_id].depends_on)) return selected_nodes @@ -249,23 +291,27 @@ def select_nodes( filters = [["select", select], ["exclude", exclude]] for filter_type, filter in filters: for filter_parameter in filter: - if (filter_parameter.startswith(PATH_SELECTOR) or - filter_parameter.startswith(TAG_SELECTOR) or - filter_parameter.startswith(MODEL_UPSTREAM_SELECTOR) + if ( + filter_parameter.startswith(PATH_SELECTOR) + or filter_parameter.startswith(TAG_SELECTOR) + or PLUS_SELECTOR in filter_parameter ): continue elif any([filter_parameter.startswith(CONFIG_SELECTOR + config + ":") for config in SUPPORTED_CONFIG]): continue - else: + elif ":" in filter_parameter: raise CosmosValueError(f"Invalid {filter_type} filter: {filter_parameter}") + else: + logger.warn(f"Best effort in processing 
filter {filter}") subset_ids: set[str] = set() for statement in select: config = SelectorConfig(project_dir, statement) node_selector = NodeSelector(nodes, config) - if config.model_upstream: - select_ids = node_selector.select_nodes_by_precursor() + # TODO: Fix this + if config.graph_selectors: + select_ids = node_selector.select_node_precursors() subset_ids = subset_ids.union(set(select_ids)) else: select_ids = node_selector.select_nodes_ids_by_intersection() diff --git a/tests/dbt/test_selector.py b/tests/dbt/test_selector.py index 6da699d05d..f70527da51 100644 --- a/tests/dbt/test_selector.py +++ b/tests/dbt/test_selector.py @@ -275,38 +275,35 @@ def test_select_nodes_by_child_and_precursors_exclude_tags(): selected = select_nodes( project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["+child"], exclude=["tag:has_child"] ) - expected = { - another_grandparent_node.unique_id: another_grandparent_node, - child_node.unique_id: child_node, - } - assert selected == expected + expected = [another_grandparent_node.unique_id, child_node.unique_id] + assert sorted(selected.keys()) == expected def test_select_nodes_by_path_file(): selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["path:gen2/models/parent.sql"]) - expected = { - parent_node.unique_id: parent_node, - } - assert selected == expected + expected = [parent_node.unique_id] + assert list(selected.keys()) == expected def test_select_node_by_child_and_precursors_partial_tree(): selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["+parent"]) - expected = { - grandparent_node.unique_id: grandparent_node, - another_grandparent_node.unique_id: another_grandparent_node, - parent_node.unique_id: parent_node, - } - assert selected == expected + expected = [another_grandparent_node.unique_id, grandparent_node.unique_id, parent_node.unique_id] + assert sorted(selected.keys()) == expected -def test_select_node_by_dfs_leaf(): +def 
test_select_node_by_precursors_with_orphaned_node(): selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["+orphaned"]) - expected = {orphaned_node.unique_id: orphaned_node} - assert selected == expected + expected = [orphaned_node.unique_id] + assert list(selected.keys()) == expected + + +def test_select_node_by_exact_node_name(): + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["child"]) + expected = [child_node.unique_id] + assert list(selected.keys()) == expected -def test_select_node_by_dfs_no_node(): +def test_select_node_by_child_and_precursors_no_node(): selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["+modelDoesntExist"]) - expected = {} - assert selected == expected + expected = [] + assert list(selected.keys()) == expected From 922fd69fa25a58d1492e81c264ab69f622fb6154 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 29 Nov 2023 14:23:04 +0000 Subject: [PATCH 03/13] Support depth in graph percursors --- cosmos/dbt/selector.py | 25 +++++++++++++++++-------- tests/dbt/test_selector.py | 26 ++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 8 deletions(-) diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index d71e0a081b..d7baaa0157 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -49,9 +49,11 @@ def descendants_depth(self) -> int: @staticmethod def parse(text: str) -> GraphSelector | None: - precursors, node_name, descendants = re.search(GRAPH_SELECTOR_REGEX, text).groups() - if node_name: + regex_match = re.search(GRAPH_SELECTOR_REGEX, text) + if regex_match: + precursors, node_name, descendants = regex_match.groups() return GraphSelector(node_name, precursors, descendants) + return None class SelectorConfig: @@ -235,17 +237,24 @@ def select_node_precursors(self) -> set[str]: if graph_selector.node_name in node_by_name: root_id = node_by_name[graph_selector.node_name] else: - logger.warn("Model in selector 
not found in DBT graph") + logger.warn(f"Selector {graph_selector.node_name} not found.") break selected_nodes.add(root_id) if graph_selector.precursors: - precursors = {root_id} - while precursors: - precursor_id = precursors.pop() - selected_nodes.add(precursor_id) - precursors = precursors.union(set(self.nodes[precursor_id].depends_on)) + depth = graph_selector.precursors_depth + previous_generation = {root_id} + processed_nodes = set() + while depth and previous_generation: + new_generation: set[str] = set() + for node_id in previous_generation: + if node_id not in processed_nodes: + new_generation = new_generation.union(set(self.nodes[node_id].depends_on)) + processed_nodes.add(node_id) + selected_nodes = selected_nodes.union(new_generation) + previous_generation = new_generation + depth -= 1 return selected_nodes diff --git a/tests/dbt/test_selector.py b/tests/dbt/test_selector.py index f70527da51..48fcf31896 100644 --- a/tests/dbt/test_selector.py +++ b/tests/dbt/test_selector.py @@ -297,6 +297,26 @@ def test_select_node_by_precursors_with_orphaned_node(): assert list(selected.keys()) == expected +def test_select_nodes_by_child_and_first_degree_precursors(): + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["1+child"]) + expected = [ + child_node.unique_id, + parent_node.unique_id, + ] + assert sorted(selected.keys()) == expected + + +def test_select_nodes_by_child_and_second_degree_precursors(): + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["2+child"]) + expected = [ + another_grandparent_node.unique_id, + child_node.unique_id, + grandparent_node.unique_id, + parent_node.unique_id, + ] + assert sorted(selected.keys()) == expected + + def test_select_node_by_exact_node_name(): selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["child"]) expected = [child_node.unique_id] @@ -307,3 +327,9 @@ def test_select_node_by_child_and_precursors_no_node(): selected = 
select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["+modelDoesntExist"]) expected = [] assert list(selected.keys()) == expected + + +# TODO: precursors depth +# TODO: successors logic +# TODO: intersection of graph selector +# TODO: union of graph selector From febdcb3a6daf96df55fdfd72df251b276e67757e Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 29 Nov 2023 14:25:58 +0000 Subject: [PATCH 04/13] Reduce code complexity --- cosmos/dbt/selector.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index d7baaa0157..f4ce89cd83 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -304,14 +304,11 @@ def select_nodes( filter_parameter.startswith(PATH_SELECTOR) or filter_parameter.startswith(TAG_SELECTOR) or PLUS_SELECTOR in filter_parameter + or any([filter_parameter.startswith(CONFIG_SELECTOR + config + ":") for config in SUPPORTED_CONFIG]) ): continue - elif any([filter_parameter.startswith(CONFIG_SELECTOR + config + ":") for config in SUPPORTED_CONFIG]): - continue elif ":" in filter_parameter: raise CosmosValueError(f"Invalid {filter_type} filter: {filter_parameter}") - else: - logger.warn(f"Best effort in processing filter {filter}") subset_ids: set[str] = set() From 97d7247d307a0622e0393f35be20fe66c28dccf8 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 30 Nov 2023 11:25:32 +0000 Subject: [PATCH 05/13] Support node descendants with depth --- cosmos/dbt/selector.py | 82 ++++++++++++++++++++++++++++---------- tests/dbt/test_selector.py | 59 ++++++++++++++++++--------- 2 files changed, 102 insertions(+), 39 deletions(-) diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index f4ce89cd83..087e45888f 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -1,6 +1,7 @@ from __future__ import annotations import copy import re +from collections import defaultdict from dataclasses import dataclass from pathlib import Path from 
typing import TYPE_CHECKING, Any @@ -55,6 +56,58 @@ def parse(text: str) -> GraphSelector | None: return GraphSelector(node_name, precursors, descendants) return None + def select_node_precursors(self, nodes: dict[str, DbtNode], root_id: str, selected_nodes: set[DbtNode]): + """ + Parse original nodes and add the precursor nodes related to this config to the selected_nodes set. + + :param nodes: Original dbt nodes list + :param root_id: Unique identifier of self.node_name + :param selected_nodes: Set where precursor nodes will be added to. + """ + if self.precursors: + depth = self.precursors_depth + previous_generation = {root_id} + processed_nodes = set() + while depth and previous_generation: + new_generation: set[str] = set() + for node_id in previous_generation: + if node_id not in processed_nodes: + new_generation.update(set(nodes[node_id].depends_on)) + processed_nodes.add(node_id) + selected_nodes.update(new_generation) + previous_generation = new_generation + depth -= 1 + + def select_node_descendants( + self, nodes: dict[str, DbtNode], root_id: str, selected_nodes: set[DbtNode] + ) -> set[DbtNode]: + """ + Parse original nodes and add the descendant nodes related to this config to the selected_nodes set. + + :param nodes: Original dbt nodes list + :param root_id: Unique identifier of self.node_name + :param selected_nodes: Set where precursor nodes will be added to. 
+ """ + if self.descendants: + children_by_node = defaultdict(set) + for node_id, node in nodes.items(): + for parent_id in node.depends_on: + children_by_node[parent_id].add(node_id) + + depth = self.descendants_depth + previous_generation = {root_id} + processed_nodes = set() + while depth and previous_generation: + new_generation: set[str] = set() + for node_id in previous_generation: + if node_id not in processed_nodes: + new_generation.update(set(children_by_node[node_id])) + processed_nodes.add(node_id) + selected_nodes.update(new_generation) + previous_generation = new_generation + depth -= 1 + return selected_nodes + class SelectorConfig: """ @@ -216,7 +269,7 @@ def _is_path_matching(self, node: DbtNode) -> bool: return self._should_include_node(node.depends_on[0], model_node) return False - def select_node_precursors(self) -> set[str]: + def select_by_graph_operator(self) -> set[str]: """ Return a list of node ids which match the configuration defined in the config. @@ -233,6 +286,7 @@ def select_node_precursors(self) -> set[str]: for node_id, node in self.nodes.items(): node_by_name[node.name] = node_id + # TODO: refactor for graph_selector in self.config.graph_selectors: if graph_selector.node_name in node_by_name: root_id = node_by_name[graph_selector.node_name] @@ -241,20 +295,8 @@ def select_node_precursors(self) -> set[str]: break selected_nodes.add(root_id) - - if graph_selector.precursors: - depth = graph_selector.precursors_depth - previous_generation = {root_id} - processed_nodes = set() - while depth and previous_generation: - new_generation: set[str] = set() - for node_id in previous_generation: - if node_id not in processed_nodes: - new_generation = new_generation.union(set(self.nodes[node_id].depends_on)) - processed_nodes.add(node_id) - selected_nodes = selected_nodes.union(new_generation) - previous_generation = new_generation - depth -= 1 + graph_selector.select_node_precursors(self.nodes, root_id, selected_nodes) + 
graph_selector.select_node_descendants(self.nodes, root_id, selected_nodes) return selected_nodes @@ -272,7 +314,7 @@ def retrieve_by_label(statement_list: list[str], label: str) -> set[str]: for statement in statement_list: config = SelectorConfig(Path(), statement) item_values = getattr(config, label) - label_values = label_values.union(item_values) + label_values.update(item_values) return label_values @@ -317,11 +359,11 @@ def select_nodes( node_selector = NodeSelector(nodes, config) # TODO: Fix this if config.graph_selectors: - select_ids = node_selector.select_node_precursors() - subset_ids = subset_ids.union(set(select_ids)) + select_ids = node_selector.select_by_graph_operator() + subset_ids.update(set(select_ids)) else: select_ids = node_selector.select_nodes_ids_by_intersection() - subset_ids = subset_ids.union(set(select_ids)) + subset_ids.update(set(select_ids)) if select: nodes = {id_: nodes[id_] for id_ in subset_ids} @@ -332,7 +374,7 @@ def select_nodes( for statement in exclude: config = SelectorConfig(project_dir, statement) node_selector = NodeSelector(nodes, config) - exclude_ids = exclude_ids.union(set(node_selector.select_nodes_ids_by_intersection())) + exclude_ids.update(set(node_selector.select_nodes_ids_by_intersection())) subset_ids = set(nodes_ids) - set(exclude_ids) return {id_: nodes[id_] for id_ in subset_ids} diff --git a/tests/dbt/test_selector.py b/tests/dbt/test_selector.py index 48fcf31896..c802b2c383 100644 --- a/tests/dbt/test_selector.py +++ b/tests/dbt/test_selector.py @@ -249,17 +249,6 @@ def test_select_nodes_by_exclude_union_config_test_tags(): assert selected == expected -def test_select_nodes_by_child_and_precursors(): - selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["+child"]) - expected = [ - another_grandparent_node.unique_id, - child_node.unique_id, - grandparent_node.unique_id, - parent_node.unique_id, - ] - assert sorted(selected.keys()) == expected - - def 
test_select_nodes_by_path_dir(): selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["path:gen3/models"]) expected = { @@ -271,6 +260,23 @@ def test_select_nodes_by_path_dir(): assert selected == expected +def test_select_nodes_by_path_file(): + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["path:gen2/models/parent.sql"]) + expected = [parent_node.unique_id] + assert list(selected.keys()) == expected + + +def test_select_nodes_by_child_and_precursors(): + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["+child"]) + expected = [ + another_grandparent_node.unique_id, + child_node.unique_id, + grandparent_node.unique_id, + parent_node.unique_id, + ] + assert sorted(selected.keys()) == expected + + def test_select_nodes_by_child_and_precursors_exclude_tags(): selected = select_nodes( project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["+child"], exclude=["tag:has_child"] @@ -279,12 +285,6 @@ def test_select_nodes_by_child_and_precursors_exclude_tags(): assert sorted(selected.keys()) == expected -def test_select_nodes_by_path_file(): - selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["path:gen2/models/parent.sql"]) - expected = [parent_node.unique_id] - assert list(selected.keys()) == expected - - def test_select_node_by_child_and_precursors_partial_tree(): selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["+parent"]) expected = [another_grandparent_node.unique_id, grandparent_node.unique_id, parent_node.unique_id] @@ -329,7 +329,28 @@ def test_select_node_by_child_and_precursors_no_node(): assert list(selected.keys()) == expected -# TODO: precursors depth -# TODO: successors logic +def test_select_node_by_descendants(): + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["grandparent+"]) + expected = [ + "model.dbt-proj.child", + "model.dbt-proj.grandparent", + 
"model.dbt-proj.parent", + "model.dbt-proj.sibling1", + "model.dbt-proj.sibling2", + ] + assert sorted(selected.keys()) == expected + + +def test_select_node_by_descendants_depth_first_degree(): + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["grandparent+1"]) + expected = [ + "model.dbt-proj.grandparent", + "model.dbt-proj.parent", + ] + assert sorted(selected.keys()) == expected + + # TODO: intersection of graph selector # TODO: union of graph selector +# TODO: intersection of select - one of which with graph selector +# TODO: union of select - one of which with graph selector From c8c6d22a27568a0b6b8c397b9280140530cfe0cf Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 30 Nov 2023 11:49:58 +0000 Subject: [PATCH 06/13] Refactor --- cosmos/dbt/selector.py | 46 ++++++++++++++++++++++++------------------ 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index 087e45888f..ee73443ccd 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -56,7 +56,7 @@ def parse(text: str) -> GraphSelector | None: return GraphSelector(node_name, precursors, descendants) return None - def select_node_precursors(self, nodes: dict[str, DbtNode], root_id: str, selected_nodes: set[DbtNode]): + def select_node_precursors(self, nodes: dict[str, DbtNode], root_id: str, selected_nodes: set[str]) -> None: """ Parse original nodes and add the precursor nodes related to this config to the selected_nodes set. 
@@ -78,9 +78,7 @@ def select_node_precursors(self, nodes: dict[str, DbtNode], root_id: str, select previous_generation = new_generation depth -= 1 - def select_node_descendants( - self, nodes: dict[str, DbtNode], root_id: str, selected_nodes: set[DbtNode] - ) -> set[DbtNode]: + def select_node_descendants(self, nodes: dict[str, DbtNode], root_id: str, selected_nodes: set[str]) -> None: """ Parse original nodes and add the descendant nodes related to this config to the selected_nodes set. @@ -90,6 +88,9 @@ def select_node_descendants( """ if self.descendants: children_by_node = defaultdict(set) + # Index nodes by parent id + # We could optimize by doing this only once for the dbt project and giving it + # as a parameter to the GraphSelector for node_id, node in nodes.items(): for parent_id in node.depends_on: children_by_node[parent_id].add(node_id) @@ -101,11 +102,30 @@ def select_node_descendants( new_generation: set[str] = set() for node_id in previous_generation: if node_id not in processed_nodes: - new_generation.update(set(children_by_node[node_id])) + new_generation.update(children_by_node[node_id]) processed_nodes.add(node_id) selected_nodes.update(new_generation) previous_generation = new_generation depth -= 1 + + def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]: + selected_nodes: set[str] = set() + + # Index nodes by name, we can improve performance by doing this once + # for multiple GraphSelectors + node_by_name = {} + for node_id, node in nodes.items(): + node_by_name[node.name] = node_id + + if self.node_name in node_by_name: + root_id = node_by_name[self.node_name] + else: + logger.warn(f"Selector {self.node_name} not found.") + return selected_nodes + + selected_nodes.add(root_id) + self.select_node_precursors(nodes, root_id, selected_nodes) + self.select_node_descendants(nodes, root_id, selected_nodes) return selected_nodes @@ -281,22 +301,8 @@ def select_by_graph_operator(self) -> set[str]: """ selected_nodes = set() - # Index nodes by 
name - node_by_name = {} - for node_id, node in self.nodes.items(): - node_by_name[node.name] = node_id - - # TODO: refactor for graph_selector in self.config.graph_selectors: - if graph_selector.node_name in node_by_name: - root_id = node_by_name[graph_selector.node_name] - else: - logger.warn(f"Selector {graph_selector.node_name} not found.") - break - - selected_nodes.add(root_id) - graph_selector.select_node_precursors(self.nodes, root_id, selected_nodes) - graph_selector.select_node_descendants(self.nodes, root_id, selected_nodes) + selected_nodes.update(graph_selector.filter_nodes(self.nodes)) return selected_nodes From 12186cb5f4aebce388929b76d7cac93dfdcb5c09 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 30 Nov 2023 11:58:21 +0000 Subject: [PATCH 07/13] Add support for graph selection union --- tests/dbt/test_selector.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/tests/dbt/test_selector.py b/tests/dbt/test_selector.py index c802b2c383..aac97fecf7 100644 --- a/tests/dbt/test_selector.py +++ b/tests/dbt/test_selector.py @@ -350,7 +350,18 @@ def test_select_node_by_descendants_depth_first_degree(): assert sorted(selected.keys()) == expected +def test_select_node_by_descendants_union(): + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["grandparent+1", "parent+1"]) + expected = [ + "model.dbt-proj.child", + "model.dbt-proj.grandparent", + "model.dbt-proj.parent", + "model.dbt-proj.sibling1", + "model.dbt-proj.sibling2", + ] + assert sorted(selected.keys()) == expected + + # TODO: intersection of graph selector -# TODO: union of graph selector # TODO: intersection of select - one of which with graph selector # TODO: union of select - one of which with graph selector From 7a0b2b2feee20c18faed50b1b9f6f6fba742e57a Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 30 Nov 2023 12:11:28 +0000 Subject: [PATCH 08/13] Support intersection between graph selectors --- 
cosmos/dbt/selector.py | 9 ++++----- tests/dbt/test_selector.py | 11 ++++++++++- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index ee73443ccd..1e2f5386fd 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -299,12 +299,11 @@ def select_by_graph_operator(self) -> set[str]: https://docs.getdbt.com/reference/node-selection/syntax https://docs.getdbt.com/reference/node-selection/yaml-selectors """ - selected_nodes = set() + selected_nodes_by_selector: list[set[str]] = [] for graph_selector in self.config.graph_selectors: - selected_nodes.update(graph_selector.filter_nodes(self.nodes)) - - return selected_nodes + selected_nodes_by_selector.append(graph_selector.filter_nodes(self.nodes)) + return set.intersection(*selected_nodes_by_selector) def retrieve_by_label(statement_list: list[str], label: str) -> set[str]: @@ -363,7 +362,7 @@ def select_nodes( for statement in select: config = SelectorConfig(project_dir, statement) node_selector = NodeSelector(nodes, config) - # TODO: Fix this + if config.graph_selectors: select_ids = node_selector.select_by_graph_operator() subset_ids.update(set(select_ids)) diff --git a/tests/dbt/test_selector.py b/tests/dbt/test_selector.py index aac97fecf7..d71eb958fc 100644 --- a/tests/dbt/test_selector.py +++ b/tests/dbt/test_selector.py @@ -362,6 +362,15 @@ def test_select_node_by_descendants_union(): assert sorted(selected.keys()) == expected -# TODO: intersection of graph selector +def test_select_node_by_descendants_intersection(): + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["grandparent+1,parent+1"]) + expected = [ + "model.dbt-proj.parent", + ] + assert sorted(selected.keys()) == expected + + # TODO: intersection of select - one of which with graph selector # TODO: union of select - one of which with graph selector +# TODO: exclude graph +# TODO: exclude graph with include non-graph From 
ce3bd7c889921ad15ce02cf1d99219dd74aefae2 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 30 Nov 2023 12:33:27 +0000 Subject: [PATCH 09/13] Support intersection of tag and graph selector --- cosmos/dbt/selector.py | 40 +++++++++++++++++++++----------------- tests/dbt/test_selector.py | 8 ++++++++ 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index 1e2f5386fd..905ae27474 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -156,7 +156,7 @@ def __init__(self, project_dir: Path | None, statement: str): @property def is_empty(self) -> bool: - return not (self.paths or self.tags or self.config or self.other) + return not (self.paths or self.tags or self.config or self.graph_selectors or self.other) def load_from_statement(self, statement: str) -> None: """ @@ -187,15 +187,16 @@ def load_from_statement(self, statement: str) -> None: if key in SUPPORTED_CONFIG: self.config[key] = value else: - graph_selector = GraphSelector.parse(item) - if graph_selector is not None: - self.graph_selectors.append(graph_selector) - else: - self.other.append(item) - logger.warning("Unsupported select statement: %s", item) + if item: + graph_selector = GraphSelector.parse(item) + if graph_selector is not None: + self.graph_selectors.append(graph_selector) + else: + self.other.append(item) + logger.warning("Unsupported select statement: %s", item) def __repr__(self) -> str: - return f"SelectorConfig(paths={self.paths}, tags={self.tags}, config={self.config}, other={self.other})" + return f"SelectorConfig(paths={self.paths}, tags={self.tags}, config={self.config}, other={self.other}, graph_selectors={self.graph_selectors})" class NodeSelector: @@ -209,7 +210,9 @@ class NodeSelector: def __init__(self, nodes: dict[str, DbtNode], config: SelectorConfig) -> None: self.nodes = nodes self.config = config + self.selected_nodes: set[str] = set() + @property def select_nodes_ids_by_intersection(self) -> 
set[str]: """ Return a list of node ids which matches the configuration defined in config. @@ -221,14 +224,19 @@ def select_nodes_ids_by_intersection(self) -> set[str]: if self.config.is_empty: return set(self.nodes.keys()) - self.selected_nodes: set[str] = set() + selected_nodes: set[str] = set() self.visited_nodes: set[str] = set() for node_id, node in self.nodes.items(): if self._should_include_node(node_id, node): - self.selected_nodes.add(node_id) + selected_nodes.add(node_id) + + if self.config.graph_selectors: + nodes_by_graph_selector = self.select_by_graph_operator() + selected_nodes = selected_nodes.intersection(nodes_by_graph_selector) - return self.selected_nodes + self.selected_nodes = selected_nodes + return selected_nodes def _should_include_node(self, node_id: str, node: DbtNode) -> bool: "Checks if a single node should be included. Only runs once per node with caching." @@ -363,12 +371,8 @@ def select_nodes( config = SelectorConfig(project_dir, statement) node_selector = NodeSelector(nodes, config) - if config.graph_selectors: - select_ids = node_selector.select_by_graph_operator() - subset_ids.update(set(select_ids)) - else: - select_ids = node_selector.select_nodes_ids_by_intersection() - subset_ids.update(set(select_ids)) + select_ids = node_selector.select_nodes_ids_by_intersection + subset_ids.update(set(select_ids)) if select: nodes = {id_: nodes[id_] for id_ in subset_ids} @@ -379,7 +383,7 @@ def select_nodes( for statement in exclude: config = SelectorConfig(project_dir, statement) node_selector = NodeSelector(nodes, config) - exclude_ids.update(set(node_selector.select_nodes_ids_by_intersection())) + exclude_ids.update(set(node_selector.select_nodes_ids_by_intersection)) subset_ids = set(nodes_ids) - set(exclude_ids) return {id_: nodes[id_] for id_ in subset_ids} diff --git a/tests/dbt/test_selector.py b/tests/dbt/test_selector.py index d71eb958fc..355bba4813 100644 --- a/tests/dbt/test_selector.py +++ b/tests/dbt/test_selector.py @@ 
-370,6 +370,14 @@ def test_select_node_by_descendants_intersection(): assert sorted(selected.keys()) == expected +def test_select_node_by_descendants_intersection_with_tag(): + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["parent+1,tag:has_child"]) + expected = [ + "model.dbt-proj.parent", + ] + assert sorted(selected.keys()) == expected + + # TODO: intersection of select - one of which with graph selector # TODO: union of select - one of which with graph selector # TODO: exclude graph From 870935f250edf1286ebccd9a39e905cc07cff0df Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 30 Nov 2023 12:42:16 +0000 Subject: [PATCH 10/13] Support unions and intersections between graph-selectors and non-graph selectors --- tests/dbt/test_selector.py | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/tests/dbt/test_selector.py b/tests/dbt/test_selector.py index 355bba4813..1cf9871248 100644 --- a/tests/dbt/test_selector.py +++ b/tests/dbt/test_selector.py @@ -378,7 +378,31 @@ def test_select_node_by_descendants_intersection_with_tag(): assert sorted(selected.keys()) == expected -# TODO: intersection of select - one of which with graph selector -# TODO: union of select - one of which with graph selector -# TODO: exclude graph -# TODO: exclude graph with include non-graph +def test_select_node_by_descendants_and_tag_union(): + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, select=["child", "tag:has_child"]) + expected = [ + "model.dbt-proj.child", + "model.dbt-proj.grandparent", + "model.dbt-proj.parent", + ] + assert sorted(selected.keys()) == expected + + +def test_exclude_by_graph_selector(): + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, exclude=["+parent"]) + expected = [ + "model.dbt-proj.child", + "model.dbt-proj.orphaned", + "model.dbt-proj.sibling1", + "model.dbt-proj.sibling2", + ] + assert sorted(selected.keys()) == 
expected + + +def test_exclude_by_union_graph_selector_and_tag(): + selected = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=sample_nodes, exclude=["+parent", "tag:deprecated"]) + expected = [ + "model.dbt-proj.child", + "model.dbt-proj.orphaned", + ] + assert sorted(selected.keys()) == expected From 83706b17d8184b0470ed74b1f1b9a4118ada0d06 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 30 Nov 2023 13:07:02 +0000 Subject: [PATCH 11/13] Improve docstrings --- cosmos/dbt/selector.py | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index 905ae27474..809a991dcf 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -26,12 +26,31 @@ @dataclass class GraphSelector: + """ + Implements dbt graph operator selectors: + model_a + +model_b + model_c+ + +model_d+ + 2+model_e + model_f+3 + + https://docs.getdbt.com/reference/node-selection/graph-operators + """ + node_name: str precursors: str | None descendants: str | None @property def precursors_depth(self) -> int: + """ + Calculates the depth/degrees/generations of precursors (parents). + Return: + -1: if it should return all the generations of precursors + 0: if it shouldn't return any precursors + >0: upperbound number of parent generations + """ if not self.precursors: return 0 if self.precursors == "+": @@ -41,6 +60,13 @@ def precursors_depth(self) -> int: @property def descendants_depth(self) -> int: + """ + Calculates the depth/degrees/generations of descendants (children). 
+ Return: + -1: if it should return all the generations of children + 0: if it shouldn't return any children + >0: upperbound of children generations + """ if not self.descendants: return 0 if self.descendants == "+": @@ -50,6 +76,10 @@ def descendants_depth(self) -> int: @staticmethod def parse(text: str) -> GraphSelector | None: + """ + Parse a string and identify if there are graph selectors, including the desired node name, descendants and + precursors. Return a GraphSelector instance if the pattern matches. + """ regex_match = re.search(GRAPH_SELECTOR_REGEX, text) if regex_match: precursors, node_name, descendants = regex_match.groups() @@ -109,6 +139,13 @@ def select_node_descendants(self, nodes: dict[str, DbtNode], root_id: str, selec depth -= 1 def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]: + """ + Given a dictionary with the original dbt project nodes, applies the current graph selector to + identify the subset of nodes that matches the selection criteria. + + :param nodes: dbt project nodes + :return: set of node ids that matches current graph selector + """ selected_nodes: set[str] = set() # Index nodes by name, we can improve performance by doing this once From c23944dbd8d02d8e143d5738fd665a19040ea2ab Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 30 Nov 2023 13:18:09 +0000 Subject: [PATCH 12/13] Update select/exclude docs --- docs/configuration/selecting-excluding.rst | 35 +++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/docs/configuration/selecting-excluding.rst b/docs/configuration/selecting-excluding.rst index fadea1485f..dfa4a96c59 100644 --- a/docs/configuration/selecting-excluding.rst +++ b/docs/configuration/selecting-excluding.rst @@ -10,7 +10,9 @@ The ``select`` and ``exclude`` parameters are lists, with values like the follow - ``tag:my_tag``: include/exclude models with the tag ``my_tag`` - ``config.materialized:table``: include/exclude models with the config ``materialized: 
table`` - ``path:analytics/tables``: include/exclude models in the ``analytics/tables`` directory - +- ``+node_name+1`` (graph operators): include/exclude the node with name ``node_name``, all its parents, and its first generation of children (`dbt graph selector docs <https://docs.getdbt.com/reference/node-selection/graph-operators>`_) +- ``tag:my_tag,+node_name`` (intersection): include/exclude ``node_name`` and its parents if they have the tag ``my_tag`` (`dbt set operator docs <https://docs.getdbt.com/reference/node-selection/set-operators>`_) +- ``['tag:first_tag', 'tag:second_tag']`` (union): include/exclude nodes that have either ``tag:first_tag`` or ``tag:second_tag`` .. note:: @@ -51,3 +53,34 @@ Examples: select=["path:analytics/tables"], ) ) + + +.. code-block:: python + + from cosmos import DbtDag, RenderConfig + + jaffle_shop = DbtDag( + render_config=RenderConfig( + select=["tag:include_tag1", "tag:include_tag2"], # union + ) + ) + +.. code-block:: python + + from cosmos import DbtDag, RenderConfig + + jaffle_shop = DbtDag( + render_config=RenderConfig( + select=["tag:include_tag1,tag:include_tag2"], # intersection + ) + ) + +.. code-block:: python + + from cosmos import DbtDag, RenderConfig + + jaffle_shop = DbtDag( + render_config=RenderConfig( + exclude=["node_name+"], # node_name and its children + ) + ) From 8ad0648478a9934304a5616e0728c72a8b958d68 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 5 Dec 2023 11:19:00 +0000 Subject: [PATCH 13/13] Update cosmos/dbt/selector.py Co-authored-by: Harel Shein --- cosmos/dbt/selector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index 809a991dcf..c7eb893075 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -114,7 +114,7 @@ def select_node_descendants(self, nodes: dict[str, DbtNode], root_id: str, selec :param nodes: Original dbt nodes list :param root_id: Unique identifier of self.node_name - :param selected_nodes: Set where precursor nodes will be added to. + :param selected_nodes: Set where descendant nodes will be added to. 
""" if self.descendants: children_by_node = defaultdict(set)