From 6091a3b012ae81ab0cc79560f94de9adc0cd2dfb Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 9 Nov 2023 11:50:31 +0000 Subject: [PATCH] Fix reusing config across TaskGroups/DAGs If execution_config was reused, Cosmos 1.2.2 would raise: ``` astronomer-cosmos/dags/basic_cosmos_task_group.py Traceback (most recent call last): File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/venv-38/lib/python3.8/site-packages/airflow/models/dagbag.py", line 343, in parse loader.exec_module(new_module) File "", line 848, in exec_module File "", line 219, in _call_with_frames_removed File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/dags/basic_cosmos_task_group.py", line 74, in basic_cosmos_task_group() File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/venv-38/lib/python3.8/site-packages/airflow/models/dag.py", line 3817, in factory f(**f_kwargs) File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/dags/basic_cosmos_task_group.py", line 54, in basic_cosmos_task_group orders = DbtTaskGroup( File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/cosmos/airflow/task_group.py", line 26, in __init__ DbtToAirflowConverter.__init__(self, *args, **specific_kwargs(**kwargs)) File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/cosmos/converter.py", line 113, in __init__ raise CosmosValueError( cosmos.exceptions.CosmosValueError: ProjectConfig.dbt_project_path is mutually exclusive with RenderConfig.dbt_project_path and ExecutionConfig.dbt_project_path.If using RenderConfig.dbt_project_path or ExecutionConfig.dbt_project_path, ProjectConfig.dbt_project_path should be None ``` --- cosmos/converter.py | 5 +++++ dev/dags/basic_cosmos_task_group.py | 26 ++++++++++++++++++++++---- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/cosmos/converter.py b/cosmos/converter.py index dbc290271e..45d98a4cf1 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -3,6 +3,7 @@ from __future__ import annotations +import copy import inspect from typing import Any, 
Callable @@ -118,6 +119,10 @@ def __init__( # If we are using the old interface, we should migrate it to the new interface # This is safe to do now since we have validated which config interface we're using if project_config.dbt_project_path: + # We copy the configuration so the change does not affect other DAGs or TaskGroups + # that may reuse the same original configuration + render_config = copy.deepcopy(render_config) + execution_config = copy.deepcopy(execution_config) render_config.project_path = project_config.dbt_project_path execution_config.project_path = project_config.dbt_project_path diff --git a/dev/dags/basic_cosmos_task_group.py b/dev/dags/basic_cosmos_task_group.py index 50cb6ed09e..7319149531 100644 --- a/dev/dags/basic_cosmos_task_group.py +++ b/dev/dags/basic_cosmos_task_group.py @@ -2,13 +2,14 @@ An example DAG that uses Cosmos to render a dbt project as a TaskGroup. """ import os + from datetime import datetime from pathlib import Path from airflow.decorators import dag from airflow.operators.empty import EmptyOperator -from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig +from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, RenderConfig, ExecutionConfig from cosmos.profiles import PostgresUserPasswordProfileMapping DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" @@ -23,6 +24,8 @@ ), ) +shared_execution_config = ExecutionConfig() + @dag( schedule_interval="@daily", @@ -35,11 +38,25 @@ def basic_cosmos_task_group() -> None: """ pre_dbt = EmptyOperator(task_id="pre_dbt") - jaffle_shop = DbtTaskGroup( - group_id="test_123", + customers = DbtTaskGroup( + group_id="customers", + project_config=ProjectConfig( + (DBT_ROOT_PATH / "jaffle_shop").as_posix(), + ), + render_config=RenderConfig(select=["path:seeds/raw_customers.csv"]), + execution_config=shared_execution_config, + operator_args={"install_deps": True}, + profile_config=profile_config, + default_args={"retries": 2}, + ) + + orders = DbtTaskGroup( + group_id="orders", 
project_config=ProjectConfig( (DBT_ROOT_PATH / "jaffle_shop").as_posix(), ), + render_config=RenderConfig(select=["path:seeds/raw_orders.csv"]), + execution_config=shared_execution_config, operator_args={"install_deps": True}, profile_config=profile_config, default_args={"retries": 2}, @@ -47,7 +64,8 @@ def basic_cosmos_task_group() -> None: post_dbt = EmptyOperator(task_id="post_dbt") - pre_dbt >> jaffle_shop >> post_dbt + pre_dbt >> customers >> post_dbt + pre_dbt >> orders >> post_dbt basic_cosmos_task_group()