Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from __future__ import annotations

import copy
import inspect
from typing import Any, Callable

Expand Down Expand Up @@ -118,6 +119,10 @@ def __init__(
# If we are using the old interface, we should migrate it to the new interface
# This is safe to do now since we have validated which config interface we're using
if project_config.dbt_project_path:
# We copy the configuration so the change does not affect other DAGs or TaskGroups
# that may reuse the same original configuration
render_config = copy.deepcopy(render_config)
execution_config = copy.deepcopy(execution_config)
render_config.project_path = project_config.dbt_project_path
execution_config.project_path = project_config.dbt_project_path

Expand Down
26 changes: 22 additions & 4 deletions dev/dags/basic_cosmos_task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
An example DAG that uses Cosmos to render a dbt project as a TaskGroup.
"""
import os

from datetime import datetime
from pathlib import Path

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator

from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, RenderConfig, ExecutionConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
Expand All @@ -23,6 +24,8 @@
),
)

shared_execution_config = ExecutionConfig()


@dag(
schedule_interval="@daily",
Expand All @@ -35,19 +38,34 @@ def basic_cosmos_task_group() -> None:
"""
pre_dbt = EmptyOperator(task_id="pre_dbt")

jaffle_shop = DbtTaskGroup(
group_id="test_123",
customers = DbtTaskGroup(
group_id="customers",
project_config=ProjectConfig(
(DBT_ROOT_PATH / "jaffle_shop").as_posix(),
),
render_config=RenderConfig(select=["path:seeds/raw_customers.csv"]),
execution_config=shared_execution_config,
operator_args={"install_deps": True},
profile_config=profile_config,
default_args={"retries": 2},
)

orders = DbtTaskGroup(
group_id="orders",
project_config=ProjectConfig(
(DBT_ROOT_PATH / "jaffle_shop").as_posix(),
),
render_config=RenderConfig(select=["path:seeds/raw_orders.csv"]),
execution_config=shared_execution_config,
operator_args={"install_deps": True},
profile_config=profile_config,
default_args={"retries": 2},
)

post_dbt = EmptyOperator(task_id="post_dbt")

pre_dbt >> jaffle_shop >> post_dbt
pre_dbt >> customers >> post_dbt
pre_dbt >> orders >> post_dbt


basic_cosmos_task_group()