Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ class DbtResourceType(aenum.Enum): # type: ignore
SEED = "seed"
TEST = "test"
SOURCE = "source"
EXPOSURE = "exposure"

@classmethod
def _missing_value_(cls, value): # type: ignore
Expand Down
39 changes: 37 additions & 2 deletions cosmos/dbt/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
# Tokens recognised in ``select`` / ``exclude`` statements. The ``*:`` / ``config.`` values are
# prefixes that identify which kind of selector a statement is.
TAG_SELECTOR = "tag:"
CONFIG_SELECTOR = "config."
SOURCE_SELECTOR = "source:"
# Selects nodes whose resource type is EXPOSURE and whose resource name follows the prefix.
EXPOSURE_SELECTOR = "exposure:"
RESOURCE_TYPE_SELECTOR = "resource_type:"
# NOTE(review): "+" and "@" look like the dbt graph operators (see the graph-operators link in the
# GraphSelector docstring) — confirm against the parsing code.
PLUS_SELECTOR = "+"
AT_SELECTOR = "@"
Expand All @@ -45,6 +46,7 @@ class GraphSelector:
+config.materialized:view
resource_type:resource_name
source:source_name
exposure:exposure_name

https://docs.getdbt.com/reference/node-selection/graph-operators
"""
Expand Down Expand Up @@ -160,7 +162,7 @@ def select_node_descendants(self, nodes: dict[str, DbtNode], root_id: str, selec
previous_generation = new_generation
depth -= 1

def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]:
def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]: # noqa: C901
"""
Given a dictionary with the original dbt project nodes, applies the current graph selector to
identify the subset of nodes that matches the selection criteria.
Expand Down Expand Up @@ -192,6 +194,18 @@ def filter_nodes(self, nodes: dict[str, DbtNode]) -> set[str]:
}
)

elif EXPOSURE_SELECTOR in self.node_name:
exposure_selection = self.node_name[len(EXPOSURE_SELECTOR) :]

# match node.resource_type == EXPOSURE, node.resource_name == exposure_selection
root_nodes.update(
{
node_id
for node_id, node in nodes.items()
if node.resource_type == DbtResourceType.EXPOSURE and node.resource_name == exposure_selection
}
)

elif CONFIG_SELECTOR in self.node_name:
config_selection_key, config_selection_value = self.node_name[len(CONFIG_SELECTOR) :].split(":")
if config_selection_key not in SUPPORTED_CONFIG:
Expand Down Expand Up @@ -289,6 +303,7 @@ def __init__(self, project_dir: Path | None, statement: str):
self.other: list[str] = []
self.graph_selectors: list[GraphSelector] = []
self.sources: list[str] = []
self.exposures: list[str] = []
self.resource_types: list[str] = []
self.load_from_statement(statement)

Expand All @@ -301,6 +316,7 @@ def is_empty(self) -> bool:
or self.graph_selectors
or self.other
or self.sources
or self.exposures
or self.resource_types
)

Expand Down Expand Up @@ -339,6 +355,8 @@ def _handle_no_precursors_or_descendants(self, item: str, node_name: str) -> Non
self._parse_config_selector(item)
elif node_name.startswith(SOURCE_SELECTOR):
self._parse_source_selector(item)
elif node_name.startswith(EXPOSURE_SELECTOR):
self._parse_exposure_selector(item)
elif node_name.startswith(RESOURCE_TYPE_SELECTOR):
self._parse_resource_type_selector(item)
else:
Expand Down Expand Up @@ -380,8 +398,13 @@ def _parse_source_selector(self, item: str) -> None:
source_name = item[index:].strip()
self.sources.append(source_name)

def _parse_exposure_selector(self, item: str) -> None:
    """
    Record the exposure name carried by an ``exposure:<name>`` statement.

    The first ``len(EXPOSURE_SELECTOR)`` characters of ``item`` are dropped without checking
    that they really are the prefix (the caller is expected to have done that), the
    remainder is stripped of surrounding whitespace and appended to ``self.exposures``.

    :param item: the raw selector statement, e.g. ``exposure:my_exposure``
    """
    prefix_length = len(EXPOSURE_SELECTOR)
    self.exposures.append(item[prefix_length:].strip())

def __repr__(self) -> str:
    """
    Return a debugging representation that lists every parsed selector category.

    The stale first ``return`` (which omitted ``exposures`` and made the updated line
    unreachable) is removed, so parsed exposure names now show up in the output.

    :returns: a ``SelectorConfig(...)`` string with paths, tags, config, sources, resource
        types, exposures, other and graph selectors, in that order
    """
    return (
        f"SelectorConfig(paths={self.paths}, tags={self.tags}, config={self.config}, "
        f"sources={self.sources}, resource={self.resource_types}, exposures={self.exposures}, "
        f"other={self.other}, graph_selectors={self.graph_selectors})"
    )


class NodeSelector:
Expand Down Expand Up @@ -477,6 +500,9 @@ def _should_include_node(self, node_id: str, node: DbtNode) -> bool:
if self.config.sources and not self._is_source_matching(node):
return False

if self.config.exposures and not self._is_exposure_matching(node):
return False

return True

def _is_resource_type_matching(self, node: DbtNode) -> bool:
Expand All @@ -493,6 +519,14 @@ def _is_source_matching(self, node: DbtNode) -> bool:
return False
return True

def _is_exposure_matching(self, node: DbtNode) -> bool:
    """
    Tell whether ``node`` is one of the exposures requested in the selector config.

    :param node: the dbt node being evaluated
    :returns: True only when the node's resource type is EXPOSURE and its resource name is
        listed in ``self.config.exposures``; False otherwise
    """
    # The resource type is checked first, so non-exposure nodes never reach the name lookup.
    return node.resource_type == DbtResourceType.EXPOSURE and node.resource_name in self.config.exposures

def _is_tags_subset(self, node: DbtNode) -> bool:
"""Checks if the node's tags are a subset of the config's tags."""
if not (set(self.config.tags) <= set(node.tags)):
Expand Down Expand Up @@ -620,6 +654,7 @@ def validate_filters(exclude: list[str], select: list[str]) -> None:
or filter_parameter.startswith(TAG_SELECTOR)
or filter_parameter.startswith(RESOURCE_TYPE_SELECTOR)
or filter_parameter.startswith(SOURCE_SELECTOR)
or filter_parameter.startswith(EXPOSURE_SELECTOR)
or PLUS_SELECTOR in filter_parameter
or any([filter_parameter.startswith(CONFIG_SELECTOR + config + ":") for config in SUPPORTED_CONFIG])
):
Expand Down
2 changes: 2 additions & 0 deletions docs/configuration/selecting-excluding.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ The ``select`` and ``exclude`` parameters are lists, with values like the follow
- ``source:my_source``: include/exclude nodes that have the source ``my_source`` and are of resource_type ``source``
- ``source:my_source+``: include/exclude nodes that have the source ``my_source`` and their children
- ``source:my_source.my_table``: include/exclude nodes that have the source ``my_source`` and the table ``my_table``
- ``exposure:my_exposure``: include/exclude the node named ``my_exposure`` whose resource_type is ``exposure``
- ``+exposure:my_exposure``: include/exclude the exposure ``my_exposure`` and its parents

.. note::

Expand Down
70 changes: 70 additions & 0 deletions tests/dbt/test_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,32 @@ def test_select_nodes_by_source_name():
assert selected == expected


def test_select_nodes_by_exposure_name():
    """
    Selecting with 'exposure:exposure_name' must return exactly one exposure node.

    The selector matches nodes whose resource_type is EXPOSURE and whose resource_name
    equals "exposure_name", so none of the sample nodes should be returned.
    """
    exposure = DbtNode(
        unique_id=f"{DbtResourceType.EXPOSURE.value}.{SAMPLE_PROJ_PATH.stem}.exposure_name",
        resource_type=DbtResourceType.EXPOSURE,
        depends_on=[],
        file_path=SAMPLE_PROJ_PATH / "exposures/my_exposure.yml",
        tags=[],
        config={},
    )

    # Copy first so the shared sample_nodes mapping is not mutated by this test.
    candidate_nodes = dict(sample_nodes)
    candidate_nodes.update({exposure.unique_id: exposure})

    result = select_nodes(
        project_dir=SAMPLE_PROJ_PATH,
        nodes=candidate_nodes,
        select=["exposure:exposure_name"],
    )

    assert result == {exposure.unique_id: exposure}


def test_exclude_nodes_by_resource_type_seed():
"""
Test excluding any seed node via 'resource_type:seed'.
Expand Down Expand Up @@ -891,3 +917,47 @@ def test_source_selector():
source_node_match.unique_id: source_node_match,
}
assert selected == expected, f"Expected only {source_node_match.unique_id} to match"


def test_exposure_selector():
    """
    Exercise the exposure selector end to end with one matching and one non-matching exposure.

    Checks that:
    1) the parsed exposure name appears in repr(SelectorConfig)
    2) select_nodes returns only the exposure whose name was requested, leaving out both the
       other exposure and every sample node
    """

    def build_exposure(name):
        # Both exposures differ only by name, which feeds the unique_id and the YAML path.
        return DbtNode(
            unique_id=f"{DbtResourceType.EXPOSURE.value}.{SAMPLE_PROJ_PATH.stem}.{name}",
            resource_type=DbtResourceType.EXPOSURE,
            depends_on=[],
            file_path=SAMPLE_PROJ_PATH / f"exposures/{name}.yml",
            tags=[],
            config={},
        )

    wanted = build_exposure("my_exposure")
    unwanted = build_exposure("another_exposure")

    # Copy first so the shared sample_nodes mapping is not mutated by this test.
    candidate_nodes = dict(sample_nodes)
    for exposure in (wanted, unwanted):
        candidate_nodes[exposure.unique_id] = exposure

    statements = ["exposure:my_exposure"]

    parsed = SelectorConfig(SAMPLE_PROJ_PATH, statements[0])
    assert "my_exposure" in repr(parsed), "Expected 'my_exposure' to appear in the config repr"

    result = select_nodes(project_dir=SAMPLE_PROJ_PATH, nodes=candidate_nodes, select=statements)

    assert result == {wanted.unique_id: wanted}, f"Expected only {wanted.unique_id} to match"