Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 94 additions & 77 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,48 @@ def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str,
return nodes


def _build_dbt_node_from_manifest_resource(
unique_id: str,
node_dict: dict[str, Any],
project_path: Path,
packages_subpath: str,
manifest_project_name: str | None,
) -> DbtNode | None:
"""
Build a DbtNode from a manifest resource entry, or None if the node should be skipped
(e.g. external nodes with no file path).
"""
original_file_path = node_dict.get("original_file_path")
if not original_file_path:
logger.debug(
"Skipping node `%s` because it has no file path (likely an external reference from dbt-loom or similar)",
unique_id,
)
return None

package_name = node_dict.get("package_name")
is_root_project_node = manifest_project_name is None or (package_name == manifest_project_name)
if package_name and not is_root_project_node:
resolved_path = project_path / packages_subpath / package_name / _normalize_path(original_file_path)
else:
resolved_path = project_path / _normalize_path(original_file_path)

resource_type = DbtResourceType(node_dict["resource_type"])
return DbtNode(
unique_id=unique_id,
package_name=package_name,
resource_type=resource_type,
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
file_path=resolved_path,
tags=node_dict.get("tags") or [],
config=node_dict.get("config") or {},
has_freshness=(
is_freshness_effective(node_dict.get("freshness")) if resource_type == DbtResourceType.SOURCE else False
),
fqn=node_dict.get("fqn"),
)


class DbtGraph:
"""
A dbt project graph (represented by `nodes` and `filtered_nodes`).
Expand Down Expand Up @@ -1112,10 +1154,56 @@ def load_parsed_selectors(self, selector_definitions: dict[str, Any]) -> YamlSel

return self.parse_yaml_selectors(selector_definitions)

# TODO: Refactor this method to remove the noqa: C901 in a separate PR.
def load_from_dbt_manifest(self) -> None: # noqa: C901
def _load_nodes_from_manifest_data(self, manifest: dict[str, Any], project_path: Path) -> dict[str, DbtNode]:
"""Build a nodes dict from manifest resources (nodes, sources, exposures)."""
resources = {**manifest.get("nodes", {}), **manifest.get("sources", {}), **manifest.get("exposures", {})}
packages_subpath = get_dbt_packages_subpath(project_path)
manifest_metadata = manifest.get("metadata")
manifest_project_name = manifest_metadata.get("project_name") if isinstance(manifest_metadata, dict) else None
nodes: dict[str, DbtNode] = {}
for unique_id, node_dict in resources.items():
node = _build_dbt_node_from_manifest_resource(
unique_id, node_dict, project_path, packages_subpath, manifest_project_name
)
if node is not None:
nodes[node.unique_id] = node
return nodes

def _apply_manifest_node_selection(self, nodes: dict[str, DbtNode], manifest: dict[str, Any]) -> None:
"""Set self.nodes and self.filtered_nodes using selector or render_config select/exclude."""
project_dir = self.execution_config.project_path
if self.render_config.selector:
selector_definitions = manifest.get("selectors", {})
if not selector_definitions:
if TYPE_CHECKING:
assert self.project.manifest_path is not None # pragma: no cover
Comment thread
pankajkoti marked this conversation as resolved.
raise CosmosLoadDbtException(f"Selectors not found in manifest file `{self.project.manifest_path}`")

yaml_selectors = self.load_parsed_selectors(selector_definitions)
selections = yaml_selectors.get_parsed(self.render_config.selector)
if not selections:
raise CosmosLoadDbtException(
f"Selector `{self.render_config.selector}` not found in parsed YAML selectors `{selector_definitions}`"
)
self.nodes = nodes
self.filtered_nodes = select_nodes(
project_dir=project_dir,
nodes=nodes,
select=selections["select"],
exclude=selections["exclude"],
)
else:
self.nodes = nodes
self.filtered_nodes = select_nodes(
project_dir=project_dir,
nodes=nodes,
select=self.render_config.select,
exclude=self.render_config.exclude,
)

def load_from_dbt_manifest(self) -> None:
"""
This approach accurately loads `dbt` projects using the `manifest.yml` file.
This approach accurately loads `dbt` projects using the `manifest.json` dbt manifest artifact.

However, since the Manifest does not represent filters, it relies on the Custom Cosmos implementation
to filter out the nodes relevant to the user (based on self.exclude and self.select).

Copilot AI Feb 24, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring still says filtering is based on self.exclude/self.select, but those attributes don’t exist on DbtGraph (selection is driven by self.render_config.select/exclude or self.render_config.selector). Update the wording to reflect the actual configuration fields to avoid misleading readers.

Suggested change
to filter out the nodes relevant to the user (based on self.exclude and self.select).
to filter out the nodes relevant to the user, based on the render configuration
(e.g. render_config.selector, render_config.select, and render_config.exclude).

Copilot uses AI. Check for mistakes.
Expand All @@ -1133,86 +1221,15 @@ def load_from_dbt_manifest(self) -> None: # noqa: C901
if not self.execution_config.project_path:
raise CosmosLoadDbtException("Unable to load manifest without ExecutionConfig.dbt_project_path")

nodes = {}

if TYPE_CHECKING:
assert self.project.manifest_path is not None # pragma: no cover

with self.project.manifest_path.open() as fp:
manifest = json.load(fp) or {}

resources = {**manifest.get("nodes", {}), **manifest.get("sources", {}), **manifest.get("exposures", {})}
project_path = self.execution_config.project_path
packages_subpath = get_dbt_packages_subpath(project_path)
manifest_metadata = manifest.get("metadata")
manifest_project_name = (
manifest_metadata.get("project_name") if isinstance(manifest_metadata, dict) else None
)
for unique_id, node_dict in resources.items():
# External nodes (e.g., from dbt-loom) may not have a file path - skip them
# Check for both None and empty string since dbt-loom may set either
original_file_path = node_dict.get("original_file_path")
if not original_file_path:
logger.debug(
"Skipping node `%s` because it has no file path (likely an external reference from dbt-loom or similar)",
unique_id,
)
continue

package_name = node_dict.get("package_name")
is_root_project_node = manifest_project_name is None or (package_name == manifest_project_name)
if package_name and not is_root_project_node:
resolved_path = project_path / packages_subpath / package_name / _normalize_path(original_file_path)
else:
resolved_path = project_path / _normalize_path(original_file_path)
resource_type = DbtResourceType(node_dict["resource_type"])
node = DbtNode(
unique_id=unique_id,
package_name=package_name,
resource_type=resource_type,
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
file_path=resolved_path,
tags=node_dict.get("tags") or [],
config=node_dict.get("config") or {},
has_freshness=(
is_freshness_effective(node_dict.get("freshness"))
if resource_type == DbtResourceType.SOURCE
else False
),
fqn=node_dict.get("fqn"),
)

nodes[node.unique_id] = node

if self.render_config.selector:
selector_definitions = manifest.get("selectors", {})

if not selector_definitions:
raise CosmosLoadDbtException(f"Selectors not found in manifest file `{self.project.manifest_path}`")

yaml_selectors = self.load_parsed_selectors(selector_definitions)
selections = yaml_selectors.get_parsed(self.render_config.selector)

if not selections:
raise CosmosLoadDbtException(
f"Selector `{self.render_config.selector}` not found in parsed YAML selectors `{selector_definitions}`"
)

self.nodes = nodes
self.filtered_nodes = select_nodes(
project_dir=self.execution_config.project_path,
nodes=nodes,
select=selections["select"],
exclude=selections["exclude"],
)
else:
self.nodes = nodes
self.filtered_nodes = select_nodes(
project_dir=self.execution_config.project_path,
nodes=nodes,
select=self.render_config.select,
exclude=self.render_config.exclude,
)
project_path = self.execution_config.project_path
nodes = self._load_nodes_from_manifest_data(manifest, project_path)
self._apply_manifest_node_selection(nodes, manifest)

def update_node_dependency(self) -> None:
"""
Expand Down