Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ def validate_initial_user_config(
+ "If using RenderConfig.dbt_project_path or ExecutionConfig.dbt_project_path, ProjectConfig.dbt_project_path should be None"
)

# Cosmos 2.0 will remove the ability to pass in operator_args with 'env' and 'vars' in place of ProjectConfig.env_vars and
# ProjectConfig.dbt_vars.
# Cosmos 2.0 will remove the ability to pass in operator_args with 'env' in place of ProjectConfig.env_vars.
Comment thread
tatiana marked this conversation as resolved.
if "env" in operator_args:
warn(
"operator_args with 'env' is deprecated since Cosmos 1.3 and will be removed in Cosmos 2.0. Use ProjectConfig.env_vars instead.",
Expand Down Expand Up @@ -254,7 +253,7 @@ def __init__(
# do not affect other DAGs or TaskGroups that may reuse the same original configuration
execution_config = copy.deepcopy(execution_config) if execution_config is not None else ExecutionConfig()
render_config = copy.deepcopy(render_config) if render_config is not None else RenderConfig()
operator_args = copy.deepcopy(operator_args) if operator_args is not None else {}
operator_args = copy.copy(operator_args) if operator_args is not None else {}

project_config.validate_project()
validate_initial_user_config(execution_config, profile_config, project_config, render_config, operator_args)
Expand Down
48 changes: 48 additions & 0 deletions dev/dags/example_taskflow_operator_args.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import os
from datetime import datetime
from pathlib import Path

from airflow import DAG
from airflow.decorators import task

from cosmos import DbtTaskGroup, ProfileConfig, ProjectConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))


profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="example_conn",
profile_args={"schema": "public"},
disable_event_tracking=True,
),
)


@task(task_id="build_partial_dbt_env_vars_operator")
def build_partial_dbt_env():
return {"ENV_VAR_NAME": "value", "ENV_VAR_NAME_2": False}


with DAG(
dag_id="example_taskflow_operator_args",
schedule="@daily",
start_date=datetime(2024, 1, 1),
catchup=False,
):
DbtTaskGroup(
group_id="transform_task_group",
project_config=ProjectConfig(
dbt_project_path=DBT_ROOT_PATH / "jaffle_shop",
manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json",
),
profile_config=profile_config,
operator_args={
"install_deps": True,
"vars": build_partial_dbt_env(),
},
)
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def build_partial_dbt_env():


with DAG(
dag_id="example_taskflow",
dag_id="example_taskflow_project_config",
schedule="@daily",
start_date=datetime(2024, 1, 1),
catchup=False,
Expand Down