diff --git a/cosmos/converter.py b/cosmos/converter.py index b3f804807d..0be9f526c0 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -145,8 +145,7 @@ def validate_initial_user_config( + "If using RenderConfig.dbt_project_path or ExecutionConfig.dbt_project_path, ProjectConfig.dbt_project_path should be None" ) - # Cosmos 2.0 will remove the ability to pass in operator_args with 'env' and 'vars' in place of ProjectConfig.env_vars and - # ProjectConfig.dbt_vars. + # Cosmos 2.0 will remove the ability to pass in operator_args with 'env' in place of ProjectConfig.env_vars. if "env" in operator_args: warn( "operator_args with 'env' is deprecated since Cosmos 1.3 and will be removed in Cosmos 2.0. Use ProjectConfig.env_vars instead.", @@ -254,7 +253,7 @@ def __init__( # do not affect other DAGs or TaskGroups that may reuse the same original configuration execution_config = copy.deepcopy(execution_config) if execution_config is not None else ExecutionConfig() render_config = copy.deepcopy(render_config) if render_config is not None else RenderConfig() - operator_args = copy.deepcopy(operator_args) if operator_args is not None else {} + operator_args = copy.copy(operator_args) if operator_args is not None else {} project_config.validate_project() validate_initial_user_config(execution_config, profile_config, project_config, render_config, operator_args) diff --git a/dev/dags/example_taskflow_operator_args.py b/dev/dags/example_taskflow_operator_args.py new file mode 100644 index 0000000000..22c48f975d --- /dev/null +++ b/dev/dags/example_taskflow_operator_args.py @@ -0,0 +1,48 @@ +import os +from datetime import datetime +from pathlib import Path + +from airflow import DAG +from airflow.decorators import task + +from cosmos import DbtTaskGroup, ProfileConfig, ProjectConfig +from cosmos.profiles import PostgresUserPasswordProfileMapping + +DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" +DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) + + +profile_config = ProfileConfig( + profile_name="default", + target_name="dev", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="example_conn", + profile_args={"schema": "public"}, + disable_event_tracking=True, + ), +) + + +@task(task_id="build_partial_dbt_env_vars_operator") +def build_partial_dbt_env(): + return {"ENV_VAR_NAME": "value", "ENV_VAR_NAME_2": False} + + +with DAG( + dag_id="example_taskflow_operator_args", + schedule="@daily", + start_date=datetime(2024, 1, 1), + catchup=False, +): + DbtTaskGroup( + group_id="transform_task_group", + project_config=ProjectConfig( + dbt_project_path=DBT_ROOT_PATH / "jaffle_shop", + manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json", + ), + profile_config=profile_config, + operator_args={ + "install_deps": True, + "vars": build_partial_dbt_env(), + }, + ) diff --git a/dev/dags/example_taskflow.py b/dev/dags/example_taskflow_project_config.py similarity index 96% rename from dev/dags/example_taskflow.py rename to dev/dags/example_taskflow_project_config.py index 84632f65cd..031faa15d7 100644 --- a/dev/dags/example_taskflow.py +++ b/dev/dags/example_taskflow_project_config.py @@ -29,7 +29,7 @@ def build_partial_dbt_env(): with DAG( - dag_id="example_taskflow", + dag_id="example_taskflow_project_config", schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False,