Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ repos:
rev: v6.0.0
hooks:
- id: check-added-large-files
exclude: ^dev/dags/dbt/cross_project/downstream/target/manifest.json$
- id: check-merge-conflict
- id: check-toml
- id: check-yaml
Expand Down
30 changes: 26 additions & 4 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 +315,26 @@ def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str,
project_path.parent / node_dict["package_name"] if node_dict.get("package_name") else project_path # type: ignore
)

# dbt-core defined the node path via "original_file_path", dbt fusion identifies it via "path"
# External nodes (e.g., from dbt-loom) may not have a file path - skip them
# dbt-loom injects upstream models with resource_type="model" and empty file path
# Check for both None and empty string since dbt-loom may set either
node_file_path = node_dict.get("original_file_path") or node_dict.get("path")
Comment thread
tatiana marked this conversation as resolved.
resource_type = node_dict.get("resource_type")
if not node_file_path and resource_type == "model" and node_dict.get("unique_id"):
logger.debug(
"Skipping model `%s` because it has no file path (likely an external reference from dbt-loom or similar)",
node_dict.get("unique_id"),
)
continue

try:
node = DbtNode(
unique_id=node_dict["unique_id"],
package_name=node_dict.get("package_name"),
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
# dbt-core defined the node path via "original_file_path", dbt fusion identifies it via "path"
file_path=base_path / (node_dict["original_file_path"] or node_dict.get("path")),
file_path=base_path / node_file_path, # type: ignore[arg-type]
tags=node_dict.get("tags") or [],
config=node_dict.get("config") or {},
has_freshness=(
Expand All @@ -331,7 +343,7 @@ def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str,
else False
),
)
except KeyError:
except (KeyError, TypeError):
logger.info("Could not parse following the dbt ls line even though it was a valid JSON `%s`", line)
else:
nodes[node.unique_id] = node
Expand Down Expand Up @@ -941,12 +953,22 @@ def load_from_dbt_manifest(self) -> None:

resources = {**manifest.get("nodes", {}), **manifest.get("sources", {}), **manifest.get("exposures", {})}
for unique_id, node_dict in resources.items():
# External nodes (e.g., from dbt-loom) may not have a file path - skip them
# Check for both None and empty string since dbt-loom may set either
original_file_path = node_dict.get("original_file_path")
if not original_file_path:
logger.debug(
"Skipping node `%s` because it has no file path (likely an external reference from dbt-loom or similar)",
unique_id,
)
continue

node = DbtNode(
unique_id=unique_id,
package_name=node_dict.get("package_name"),
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
file_path=self.execution_config.project_path / _normalize_path(node_dict["original_file_path"]),
file_path=self.execution_config.project_path / _normalize_path(original_file_path),
tags=node_dict.get("tags") or [],
config=node_dict.get("config") or {},
has_freshness=(
Expand Down
133 changes: 133 additions & 0 deletions dev/dags/cross_project_dbt_ls_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
"""
Example DAG for cross project reference demonstration - using 'dbt ls' Load Mode for both upstream and downstream dbt Projects.

This example demonstrates how Cosmos works with dbt-loom for cross-project references.

Architecture:
upstream → downstream
├── stg_customers ├── fct_revenue
├── stg_orders ├── fct_customer_revenue
├── stg_order_items ├── dim_payment_methods
├── stg_products └── rpt_revenue_summary
├── int_orders_enriched
└── int_customer_orders

The downstream project uses dbt-loom to reference upstream models via:
ref('upstream', 'stg_customers')

Key Points:
1. Upstream project must generate manifest.json first (via dbt parse/compile/ls)
2. Downstream project must be able to query upstream tables (same DB, cross-DB, etc.)
3. Cosmos correctly handles dbt-loom's external node references (skips them)

Database Setup (this example):
- Upstream models: 'platform' schema
- Downstream models: 'finance' schema
"""

import os
from datetime import datetime
from pathlib import Path

from airflow import DAG

from cosmos import DbtTaskGroup, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping

# Root directory holding the example dbt projects. Defaults to the `dbt` folder next to
# this DAG file and can be overridden with the DBT_ROOT_PATH environment variable.
DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

# Airflow connection ID for PostgreSQL (used by both the upstream and downstream profiles)
POSTGRES_CONN_ID = "example_conn"

# Project paths: the two dbt projects linked through dbt-loom
DBT_UPSTREAM_PROJECT_PATH = DBT_ROOT_PATH / "cross_project" / "upstream"
DBT_DOWNSTREAM_PROJECT_PATH = DBT_ROOT_PATH / "cross_project" / "downstream"


# [START cross_project_dbt_ls_dag]
# =============================================================================
# Combined DAG with Task Groups - Upstream runs first, then Downstream
# =============================================================================

with DAG(
    dag_id="cross_project_dbt_ls_dag",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
    default_args={"retries": 0},
    tags=["dbt-loom", "dbt ls"],
    doc_md=__doc__,
) as dag:

    # ---- Upstream project ("platform" schema) ---------------------------------
    # Foundational staging/intermediate models, published as public models so
    # that the downstream project can reference them through dbt-loom.
    upstream_profile_config = ProfileConfig(
        profile_name="upstream",
        target_name="dev",
        profile_mapping=PostgresUserPasswordProfileMapping(
            conn_id=POSTGRES_CONN_ID,
            profile_args={"schema": "platform", "threads": 4},
        ),
    )
    upstream_project = ProjectConfig(dbt_project_path=DBT_UPSTREAM_PROJECT_PATH)
    upstream_task_group = DbtTaskGroup(
        group_id="upstream",
        project_config=upstream_project,
        profile_config=upstream_profile_config,
        render_config=RenderConfig(dbt_deps=True),
        operator_args={"install_deps": True},
    )

    # ---- Downstream project ("finance" schema) --------------------------------
    # References the upstream public models via dbt-loom. While parsing, Cosmos
    # skips external nodes (those without a file path), so tasks are created
    # only for the models owned by this project.
    downstream_profile_config = ProfileConfig(
        profile_name="downstream",
        target_name="dev",
        profile_mapping=PostgresUserPasswordProfileMapping(
            conn_id=POSTGRES_CONN_ID,
            profile_args={"schema": "finance", "threads": 4},
        ),
    )

    # Optional: when dbt-loom locates the upstream manifest through an
    # environment variable, define the variables once ...
    #
    #   dbt_loom_env_vars = {
    #       "PLATFORM_MANIFEST_PATH": str(DBT_UPSTREAM_PROJECT_PATH / "target" / "manifest.json"),
    #   }
    #
    # ... then pass `env_vars=dbt_loom_env_vars` to RenderConfig (DAG parsing)
    # and `"env": dbt_loom_env_vars` in operator_args (task execution).
    downstream_project = ProjectConfig(dbt_project_path=DBT_DOWNSTREAM_PROJECT_PATH)
    downstream_task_group = DbtTaskGroup(
        group_id="downstream",
        project_config=downstream_project,
        profile_config=downstream_profile_config,
        render_config=RenderConfig(dbt_deps=True),
        operator_args={"install_deps": True},
    )

    # The upstream models must be built before the downstream project runs.
    upstream_task_group >> downstream_task_group
# [END cross_project_dbt_ls_dag]
157 changes: 157 additions & 0 deletions dev/dags/cross_project_manifest_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
"""
Example DAG for cross project reference demonstration - Using Manifest Load Mode for both upstream and downstream dbt Projects

This example demonstrates how Cosmos works with dbt-loom for cross-project references
using LoadMode.DBT_MANIFEST for faster DAG parsing (no dbt ls execution required).

Architecture:
upstream → downstream
├── stg_customers ├── fct_revenue
├── stg_orders ├── fct_customer_revenue
├── stg_order_items ├── dim_payment_methods
├── stg_products └── rpt_revenue_summary
├── int_orders_enriched
└── int_customer_orders

Prerequisites:
1. Generate manifest.json for both projects BEFORE deploying:
cd upstream && dbt compile
cd downstream && dbt compile

Or use CI/CD to generate and store manifests in S3/GCS.

2. For remote manifests (S3/GCS/Azure), ensure the connection is configured.

Key Benefits of DBT_MANIFEST mode:
- No dbt installation required on scheduler
- Fastest parsing method
"""

import os
from datetime import datetime
from pathlib import Path

from airflow import DAG

from cosmos import DbtTaskGroup, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import LoadMode
from cosmos.profiles import PostgresUserPasswordProfileMapping

# Root directory holding the example dbt projects. Defaults to the `dbt` folder next to
# this DAG file and can be overridden with the DBT_ROOT_PATH environment variable.
DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

# Airflow connection ID for PostgreSQL (used by both the upstream and downstream profiles)
POSTGRES_CONN_ID = "example_conn"

# Project paths: the two dbt projects linked through dbt-loom
DBT_UPSTREAM_PROJECT_PATH = DBT_ROOT_PATH / "cross_project" / "upstream"
DBT_DOWNSTREAM_PROJECT_PATH = DBT_ROOT_PATH / "cross_project" / "downstream"

# Manifest paths (local): each project's manifest.json under its `target` directory.
# These files must exist before the DAG is parsed (see the module docstring).
UPSTREAM_MANIFEST_PATH = DBT_UPSTREAM_PROJECT_PATH / "target" / "manifest.json"
DOWNSTREAM_MANIFEST_PATH = DBT_DOWNSTREAM_PROJECT_PATH / "target" / "manifest.json"

# =============================================================================
# Alternative: Remote Manifest Paths (S3/GCS/Azure) - Uncomment to use
# =============================================================================
# UPSTREAM_MANIFEST_PATH = "s3://your-bucket/dbt-manifests/upstream/manifest.json"
# DOWNSTREAM_MANIFEST_PATH = "s3://your-bucket/dbt-manifests/downstream/manifest.json"
# MANIFEST_CONN_ID = "aws_default" # or "google_cloud_default" for GCS

# [START cross_project_manifest_dag]
# =============================================================================
# Combined DAG with Task Groups - Using DBT_MANIFEST Load Mode
# =============================================================================

# Location of the dbt executable used by both task groups. The default is the path this
# example previously hard-coded; set the DBT_EXECUTABLE_PATH environment variable to point
# at a different installation.
DBT_EXECUTABLE_PATH = os.getenv("DBT_EXECUTABLE_PATH", "/usr/local/bin/dbt")

with DAG(
    dag_id="cross_project_manifest_dag",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
    default_args={"retries": 0},
    tags=["dbt-loom", "manifest"],
    doc_md=__doc__,
) as dag:

    # -------------------------------------------------------------------------
    # Upstream Task Group - Core Data Platform (upstream)
    # -------------------------------------------------------------------------
    # Models are written to the "platform" schema.

    upstream_profile_config = ProfileConfig(
        profile_name="upstream",
        target_name="dev",
        profile_mapping=PostgresUserPasswordProfileMapping(
            conn_id=POSTGRES_CONN_ID,
            profile_args={"schema": "platform", "threads": 4},
        ),
    )

    upstream_task_group = DbtTaskGroup(
        group_id="upstream",
        project_config=ProjectConfig(
            # The DAG is rendered from this pre-generated manifest (no `dbt ls` run)
            manifest_path=str(UPSTREAM_MANIFEST_PATH),
            project_name="upstream",
            # For remote manifests (S3/GCS/Azure), add:
            # manifest_conn_id=MANIFEST_CONN_ID,
        ),
        profile_config=upstream_profile_config,
        execution_config=ExecutionConfig(
            dbt_project_path=DBT_UPSTREAM_PROJECT_PATH,
            dbt_executable_path=DBT_EXECUTABLE_PATH,
        ),
        render_config=RenderConfig(
            # Use manifest-based parsing (no dbt ls required)
            load_method=LoadMode.DBT_MANIFEST,
            # Note: dbt_deps is not needed for manifest mode parsing
            # but you may still want install_deps=True for task execution
        ),
        operator_args={
            "install_deps": True,
        },
    )

    # -------------------------------------------------------------------------
    # Downstream Task Group - Finance Domain Models
    # -------------------------------------------------------------------------
    # Models are written to the "finance" schema.

    downstream_profile_config = ProfileConfig(
        profile_name="downstream",
        target_name="dev",
        profile_mapping=PostgresUserPasswordProfileMapping(
            conn_id=POSTGRES_CONN_ID,
            # Same thread count as the upstream profile, so both projects run
            # with an identical, explicit concurrency setting.
            profile_args={"schema": "finance", "threads": 4},
        ),
    )

    # Optional: environment variables for dbt-loom to find the upstream manifest
    # dbt_loom_env_vars = {
    #     "PLATFORM_MANIFEST_PATH": str(DBT_UPSTREAM_PROJECT_PATH / "target" / "manifest.json"),
    # }

    downstream_task_group = DbtTaskGroup(
        group_id="downstream_finance",
        project_config=ProjectConfig(
            # The DAG is rendered from this pre-generated manifest (no `dbt ls` run)
            manifest_path=str(DOWNSTREAM_MANIFEST_PATH),
            project_name="downstream",
            # For remote manifests (S3/GCS/Azure), add:
            # manifest_conn_id=MANIFEST_CONN_ID,
            # If dbt-loom locates the upstream manifest via an environment variable, add:
            # env_vars=dbt_loom_env_vars,
        ),
        profile_config=downstream_profile_config,
        execution_config=ExecutionConfig(
            dbt_project_path=DBT_DOWNSTREAM_PROJECT_PATH,
            dbt_executable_path=DBT_EXECUTABLE_PATH,
        ),
        render_config=RenderConfig(
            # Use manifest-based parsing (no dbt ls required)
            load_method=LoadMode.DBT_MANIFEST,
        ),
        operator_args={
            "install_deps": True,
        },
    )

    # Chain: Upstream runs first, then Downstream
    upstream_task_group >> downstream_task_group
# [END cross_project_manifest_dag]
14 changes: 14 additions & 0 deletions dev/dags/dbt/cross_project/downstream/dbt_loom.config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
manifests:
- name: upstream
type: file
config:
# Use environment variable for flexibility, with fallback to relative path
# path: '{{ env_var("PLATFORM_MANIFEST_PATH", "../upstream/target/manifest.json") }}'
# In production, set PLATFORM_MANIFEST_PATH to absolute path

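# NOTE: the absolute path below is specific to the GitHub Actions runner used by the integration tests CI; when running anywhere else, replace it with the location of the upstream manifest.json (e.g. the relative path shown further down)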
path: /home/runner/work/astronomer-cosmos/astronomer-cosmos/dev/dags/dbt/cross_project/upstream/target/manifest.json
Comment thread
pankajkoti marked this conversation as resolved.
# path: ../upstream/target/manifest.json


enable_telemetry: false
Loading