From 2b8d722193a5fe12d354be7d44252972799b61e2 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Sat, 15 Nov 2025 02:46:29 +0530 Subject: [PATCH 01/12] Add setup_operator_args in ExecutionConfig --- cosmos/airflow/graph.py | 6 ++++-- cosmos/config.py | 2 ++ cosmos/converter.py | 5 +++-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 32359d5c93..8b2b9652db 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -19,7 +19,7 @@ from airflow.utils.task_group import TaskGroup from cosmos import settings -from cosmos.config import RenderConfig +from cosmos.config import ExecutionConfig, RenderConfig from cosmos.constants import ( DBT_SETUP_ASYNC_TASK_ID, DBT_TEARDOWN_ASYNC_TASK_ID, @@ -803,6 +803,7 @@ def build_airflow_graph( # noqa: C901 TODO: https://github.com/astronomer/astro task_group: TaskGroup | None = None, on_warning_callback: Callable[..., Any] | None = None, # argument specific to the DBT test command async_py_requirements: list[str] | None = None, + execution_config: ExecutionConfig | None = None, ) -> dict[str, Union[TaskGroup, BaseOperator]]: """ Instantiate dbt `nodes` as Airflow tasks within the given `task_group` (optional) or `dag` (mandatory). @@ -912,9 +913,10 @@ def build_airflow_graph( # noqa: C901 TODO: https://github.com/astronomer/astro create_airflow_task_dependencies(nodes, tasks_map) if execution_mode == ExecutionMode.WATCHER: + setup_operator_args = getattr(execution_config, "setup_operator_args", None) or {} _add_producer_watcher_and_dependencies( dag=dag, - task_args=task_args, + task_args={**task_args, **setup_operator_args}, tasks_map=tasks_map, task_group=task_group, render_config=render_config, diff --git a/cosmos/config.py b/cosmos/config.py index c34c4c7bd3..fa025656ab 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -422,6 +422,7 @@ class ExecutionConfig: should be used for execution when execution mode is set to `ExecutionMode.VIRTUALENV` :param async_py_requirements: A list of Python packages to install when `ExecutionMode.AIRFLOW_ASYNC`(Experimental) is used. This parameter is required only if both `enable_setup_async_task` and `enable_teardown_async_task` are set to `True`. Example: `["dbt-postgres==1.5.0"]` + param setup_operator_args: A dictionary of producer operator parameters. These will override the values supplied in operator_args. """ execution_mode: ExecutionMode = ExecutionMode.LOCAL @@ -434,6 +435,7 @@ class ExecutionConfig: project_path: Path | None = field(init=False) async_py_requirements: list[str] | None = None + setup_operator_args: dict[str, Any] | None = None def __post_init__(self, dbt_project_path: str | Path | None) -> None: if self.invocation_mode and self.execution_mode not in ( diff --git a/cosmos/converter.py b/cosmos/converter.py index 803ad46f7b..4cb15eae8d 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -329,13 +329,14 @@ def __init__( nodes=self.dbt_graph.filtered_nodes, dag=dag or (task_group and task_group.dag), task_group=task_group, - execution_mode=execution_config.execution_mode, + execution_mode=execution_config.execution_mode, # TODO: Deprecate it task_args=task_args, test_indirect_selection=execution_config.test_indirect_selection, dbt_project_name=render_config.project_name, on_warning_callback=on_warning_callback, render_config=render_config, - async_py_requirements=execution_config.async_py_requirements, + async_py_requirements=execution_config.async_py_requirements, # TODO: Deprecate it + execution_config=execution_config, ) current_time = time.perf_counter() From 3f7e97b57e7fa6f4c80393c9b4cdc08a90782ba5 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Mon, 17 Nov 2025 13:16:23 +0530 Subject: [PATCH 02/12] cleanup --- cosmos/config.py | 2 +- cosmos/converter.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cosmos/config.py b/cosmos/config.py index fa025656ab..f8433e4f14 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -422,7 +422,7 @@ class ExecutionConfig: should be used for execution when execution mode is set to `ExecutionMode.VIRTUALENV` :param async_py_requirements: A list of Python packages to install when `ExecutionMode.AIRFLOW_ASYNC`(Experimental) is used. This parameter is required only if both `enable_setup_async_task` and `enable_teardown_async_task` are set to `True`. Example: `["dbt-postgres==1.5.0"]` - param setup_operator_args: A dictionary of producer operator parameters. These will override the values supplied in operator_args. + param setup_operator_args: A dictionary of producer operator parameters. These will override the values supplied in operator_args for producer operator. """ execution_mode: ExecutionMode = ExecutionMode.LOCAL diff --git a/cosmos/converter.py b/cosmos/converter.py index 4cb15eae8d..b56c91c668 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -329,13 +329,13 @@ def __init__( nodes=self.dbt_graph.filtered_nodes, dag=dag or (task_group and task_group.dag), task_group=task_group, - execution_mode=execution_config.execution_mode, # TODO: Deprecate it + execution_mode=execution_config.execution_mode, task_args=task_args, test_indirect_selection=execution_config.test_indirect_selection, dbt_project_name=render_config.project_name, on_warning_callback=on_warning_callback, render_config=render_config, - async_py_requirements=execution_config.async_py_requirements, # TODO: Deprecate it + async_py_requirements=execution_config.async_py_requirements, execution_config=execution_config, ) From e0871addfb2e07ce760de54d8c8722ca7edfeaee Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 18 Nov 2025 01:42:10 +0530 Subject: [PATCH 03/12] Add tests --- tests/operators/test_watcher.py | 40 +++++++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index bb25ed3792..8757f478c6 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -15,7 +15,7 @@ from airflow.utils.state import DagRunState from packaging.version import Version -from cosmos import DbtDag, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig +from cosmos import DbtDag, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig, TestBehavior from cosmos._triggers.watcher import WatcherTrigger from cosmos.config import InvocationMode from cosmos.constants import ExecutionMode @@ -28,7 +28,7 @@ DbtSeedWatcherOperator, DbtTestWatcherOperator, ) -from cosmos.profiles import PostgresUserPasswordProfileMapping +from cosmos.profiles import PostgresUserPasswordProfileMapping, get_automatic_profile_mapping from tests.utils import AIRFLOW_VERSION, new_test_dag DBT_PROJECT_PATH = Path(__file__).parent.parent.parent / "dev/dags/dbt/jaffle_shop" @@ -974,3 +974,39 @@ def test_dbt_task_group_with_watcher_has_correct_dbt_cmd(): # Verify the command was built correctly assert full_cmd[1] == "build" # dbt build command assert "--full-refresh" in full_cmd + + +def test_sensor_args_import(mock_bigquery_conn): + profile_mapping = get_automatic_profile_mapping(mock_bigquery_conn.conn_id, {}) + _profile_config = ProfileConfig( + profile_name="airflow_db", + target_name="bq", + profile_mapping=profile_mapping, + ) + dbt_project_path = Path(__file__).parent.parent.parent / "dev/dags/dbt" + + dag = DbtDag( + project_config=ProjectConfig(dbt_project_path=dbt_project_path / "jaffle_shop"), + profile_config=_profile_config, + operator_args={ + "install_deps": True, + "full_refresh": True, + "deferrable": False, + "execution_timeout": timedelta(seconds=1), + }, + render_config=RenderConfig(test_behavior=TestBehavior.NONE), + execution_config=ExecutionConfig( + execution_mode=ExecutionMode.WATCHER, setup_operator_args={"execution_timeout": timedelta(seconds=2)} + ), + schedule="@daily", + start_date=datetime(2025, 1, 1), + catchup=False, + dag_id="test_sensor_args_import", + ) + + for task in dag.tasks_map.values(): + print(task) + if isinstance(task, DbtProducerWatcherOperator): + assert task.execution_timeout == timedelta(seconds=2) + else: + assert task.execution_timeout == timedelta(seconds=1) From fca7bd010b293bec2c1bbf4815cd5a845b138e25 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 18 Nov 2025 01:43:06 +0530 Subject: [PATCH 04/12] Add tests --- tests/operators/test_watcher.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index 8757f478c6..84c92518ee 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -1005,7 +1005,6 @@ def test_sensor_args_import(mock_bigquery_conn): ) for task in dag.tasks_map.values(): - print(task) if isinstance(task, DbtProducerWatcherOperator): assert task.execution_timeout == timedelta(seconds=2) else: From a464d3f3c60bf01d899c4aadfde994c1a7d744bd Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 18 Nov 2025 01:55:11 +0530 Subject: [PATCH 05/12] Fix dependency --- pyproject.toml | 1 + tests/operators/test_watcher.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 5a78067f6e..db7b7a1763 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -162,6 +162,7 @@ dependencies = [ "Werkzeug<3.0.0", "methodtools", "pytest-asyncio", + "dbt-bigquery", ] pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} {matrix:python}"] diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index 84c92518ee..04006797b7 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -976,7 +976,7 @@ def test_dbt_task_group_with_watcher_has_correct_dbt_cmd(): assert "--full-refresh" in full_cmd -def test_sensor_args_import(mock_bigquery_conn): +def test_sensor_and_producer_different_param_values(mock_bigquery_conn): profile_mapping = get_automatic_profile_mapping(mock_bigquery_conn.conn_id, {}) _profile_config = ProfileConfig( profile_name="airflow_db", From 04fc5443bbd965e7588446123c3fd95b76abfc33 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 18 Nov 2025 02:07:34 +0530 Subject: [PATCH 06/12] Fix dependency --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index db7b7a1763..5f6d42aa68 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -162,7 +162,7 @@ dependencies = [ "Werkzeug<3.0.0", "methodtools", "pytest-asyncio", - "dbt-bigquery", + "dbt-bigquery<1.11", ] pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} {matrix:python}"] From 65b304ccff3096bd95acb0e098d6a63c7c7cfab1 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 18 Nov 2025 02:11:08 +0530 Subject: [PATCH 07/12] Fix dependency --- pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 5f6d42aa68..b714462a85 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -162,7 +162,8 @@ dependencies = [ "Werkzeug<3.0.0", "methodtools", "pytest-asyncio", - "dbt-bigquery<1.11", + "dbt-bigquery", + "click<8.2.0", ] pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} {matrix:python}"] From 71e5f4a59b19fb0f8ffef347334c7291d73e6f95 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 18 Nov 2025 02:14:07 +0530 Subject: [PATCH 08/12] Fix dependency --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index b714462a85..39b7f7f1b9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -163,7 +163,7 @@ dependencies = [ "methodtools", "pytest-asyncio", "dbt-bigquery", - "click<8.2.0", + "click=<8.1.3", ] pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} {matrix:python}"] From b3b741bd21c69428ca0dc1897c3e035a3749371a Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 18 Nov 2025 02:17:28 +0530 Subject: [PATCH 09/12] Fix dependency --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 39b7f7f1b9..7642a48afd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -163,7 +163,7 @@ dependencies = [ "methodtools", "pytest-asyncio", "dbt-bigquery", - "click=<8.1.3", + "click<=8.1.3", ] pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} {matrix:python}"] From 68ff9d94fd64503376c595c0a5718b9bc3cce2d1 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 18 Nov 2025 02:26:42 +0530 Subject: [PATCH 10/12] Fix dependency --- pyproject.toml | 1 - scripts/test/unit-cov.sh | 1 + scripts/test/unit.sh | 1 + 3 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 7642a48afd..db7b7a1763 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -163,7 +163,6 @@ dependencies = [ "methodtools", "pytest-asyncio", "dbt-bigquery", - "click<=8.1.3", ] pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} {matrix:python}"] diff --git a/scripts/test/unit-cov.sh b/scripts/test/unit-cov.sh index 50cd268c40..3dd8576936 100644 --- a/scripts/test/unit-cov.sh +++ b/scripts/test/unit-cov.sh @@ -1,3 +1,4 @@ +pip install "click<=8.1.3" pytest \ -vv \ --cov=cosmos \ diff --git a/scripts/test/unit.sh b/scripts/test/unit.sh index 373c109a73..fd66bb142f 100644 --- a/scripts/test/unit.sh +++ b/scripts/test/unit.sh @@ -1,3 +1,4 @@ +pip install "click<=8.1.3" pytest \ -vv \ -m "not (integration or perf or dbtfusion)" \ From ac0b1e2f09a1543b0312f7b8cd05c6772cd43bcb Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 18 Nov 2025 02:35:37 +0530 Subject: [PATCH 11/12] Fix dependency --- pyproject.toml | 1 + scripts/test/unit-cov.sh | 1 - scripts/test/unit.sh | 1 - 3 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index db7b7a1763..6c0417864c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -163,6 +163,7 @@ dependencies = [ "methodtools", "pytest-asyncio", "dbt-bigquery", + "click<8.1.0" ] pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} {matrix:python}"] diff --git a/scripts/test/unit-cov.sh b/scripts/test/unit-cov.sh index 3dd8576936..50cd268c40 100644 --- a/scripts/test/unit-cov.sh +++ b/scripts/test/unit-cov.sh @@ -1,4 +1,3 @@ -pip install "click<=8.1.3" pytest \ -vv \ --cov=cosmos \ diff --git a/scripts/test/unit.sh b/scripts/test/unit.sh index fd66bb142f..373c109a73 100644 --- a/scripts/test/unit.sh +++ b/scripts/test/unit.sh @@ -1,4 +1,3 @@ -pip install "click<=8.1.3" pytest \ -vv \ -m "not (integration or perf or dbtfusion)" \ From 0790cff0bdd8b20888237eb570ec955922e3f642 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 18 Nov 2025 13:31:15 +0530 Subject: [PATCH 12/12] Fix dependency --- pyproject.toml | 2 -- tests/operators/test_watcher.py | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 6c0417864c..5a78067f6e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -162,8 +162,6 @@ dependencies = [ "Werkzeug<3.0.0", "methodtools", "pytest-asyncio", - "dbt-bigquery", - "click<8.1.0" ] pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} {matrix:python}"] diff --git a/tests/operators/test_watcher.py b/tests/operators/test_watcher.py index 04006797b7..1cf1ba0c2e 100644 --- a/tests/operators/test_watcher.py +++ b/tests/operators/test_watcher.py @@ -976,6 +976,7 @@ def test_dbt_task_group_with_watcher_has_correct_dbt_cmd(): assert "--full-refresh" in full_cmd +@pytest.mark.integration def test_sensor_and_producer_different_param_values(mock_bigquery_conn): profile_mapping = get_automatic_profile_mapping(mock_bigquery_conn.conn_id, {}) _profile_config = ProfileConfig(