Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 27 additions & 16 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,14 @@
logger = get_logger(__name__)


def _normalize_path(path: str) -> str:
def _normalize_path(path: str | None) -> str:
"""
Converts a potentially Windows-style path string into a POSIX-friendly path; returns an empty string when the path is None.
"""
return Path(path.replace("\\", "/")).as_posix()
if path is None:
return ""
else:
return Path(path.replace("\\", "/")).as_posix()


class CosmosLoadDbtException(Exception):
Expand All @@ -88,7 +91,8 @@ class DbtNode:
unique_id: str
resource_type: DbtResourceType
depends_on: list[str]
file_path: Path
path_base: Path
original_file_path: Path
package_name: str | None = None
tags: list[str] = field(default_factory=lambda: [])
config: dict[str, Any] = field(default_factory=lambda: {})
Expand All @@ -98,6 +102,11 @@ class DbtNode:
downstream: list[str] = field(default_factory=lambda: [])
fqn: list[str] | None = None

@property
def file_path(self) -> Path:
"""Combined path to the node's file (path_base / original_file_path)."""
return self.path_base / self.original_file_path

Comment thread
tatiana marked this conversation as resolved.
Comment thread
tatiana marked this conversation as resolved.
@property
def meta(self) -> dict[str, Any]:
"""
Expand Down Expand Up @@ -171,14 +180,15 @@ def owner(self) -> str:
@property
def context_dict(self) -> dict[str, Any]:
"""
Returns a dictionary containing all the attributes of the DbtNode object,
ensuring that the output is JSON serializable so it can be stored in Airflow's db
Returns a JSON-serializable dictionary containing a curated subset of
DbtNode attributes, suitable for storing in Airflow's database.
"""
return {
"unique_id": self.unique_id,
"resource_type": self.resource_type.value, # convert enum to value
"depends_on": self.depends_on,
"file_path": str(self.file_path), # convert path to string
"original_file_path": str(self.original_file_path), # convert original path to string
"tags": self.tags,
"config": self.config,
"has_test": self.has_test,
Expand Down Expand Up @@ -306,15 +316,17 @@ def run_command(

def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str, DbtNode]:
"""Parses the output of `dbt ls` into a dictionary of `DbtNode` instances."""
if project_path is None:
raise CosmosLoadDbtException("project_path is required to parse dbt ls output")
nodes = {}
for line in ls_stdout.split("\n"):
try:
node_dict = json.loads(line.strip())
except json.decoder.JSONDecodeError:
logger.debug("Skipped dbt ls line: %s", line)
else:
base_path = (
project_path.parent / node_dict["package_name"] if node_dict.get("package_name") else project_path # type: ignore
base_path: Path = (
project_path.parent / node_dict["package_name"] if node_dict.get("package_name") else project_path
)
Comment thread
tatiana marked this conversation as resolved.
Comment thread
tatiana marked this conversation as resolved.

# dbt-core defines the node path via "original_file_path"; dbt Fusion identifies it via "path"
Expand All @@ -336,7 +348,8 @@ def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str,
package_name=node_dict.get("package_name"),
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
file_path=base_path / node_file_path, # type: ignore[arg-type]
path_base=base_path,
original_file_path=Path(_normalize_path(node_file_path)),
Comment thread
tatiana marked this conversation as resolved.
tags=node_dict.get("tags") or [],
config=node_dict.get("config") or {},
has_freshness=(
Expand Down Expand Up @@ -376,17 +389,18 @@ def _build_dbt_node_from_manifest_resource(
package_name = node_dict.get("package_name")
is_root_project_node = manifest_project_name is None or (package_name == manifest_project_name)
if package_name and not is_root_project_node:
resolved_path = project_path / packages_subpath / package_name / _normalize_path(original_file_path)
path_base = project_path / packages_subpath / package_name
else:
resolved_path = project_path / _normalize_path(original_file_path)
path_base = project_path

resource_type = DbtResourceType(node_dict["resource_type"])
return DbtNode(
unique_id=unique_id,
package_name=package_name,
resource_type=resource_type,
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
file_path=resolved_path,
path_base=path_base,
original_file_path=Path(_normalize_path(original_file_path)),
tags=node_dict.get("tags") or [],
config=node_dict.get("config") or {},
has_freshness=(
Expand Down Expand Up @@ -978,11 +992,8 @@ def load_via_custom_parser(self) -> None:
unique_id=f"{model.type.value}.{self.project.project_name}.{model_name}",
resource_type=DbtResourceType(model.type.value),
depends_on=list(model.config.upstream_models),
file_path=Path(
model.path.as_posix().replace(
self.render_config.project_path.as_posix(), self.execution_config.project_path.as_posix()
)
),
path_base=self.execution_config.project_path,
original_file_path=model.path.relative_to(self.render_config.project_path),
tags=tags or [],
config=config,
)
Expand Down
Loading
Loading