From 1e6c8e74018a68b402b353ff3839b9f531f0c6d1 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 12 Nov 2025 14:19:14 +0530 Subject: [PATCH 01/13] Document Async ExecutionMode.WATCHER --- dev/dags/example_watcher.py | 5 +- dev/dags/example_watcher_deferrable.py | 93 ++++++++++++++++++++++++++ 2 files changed, 97 insertions(+), 1 deletion(-) create mode 100644 dev/dags/example_watcher_deferrable.py diff --git a/dev/dags/example_watcher.py b/dev/dags/example_watcher.py index 2ca25fabb5..9bcbe77748 100644 --- a/dev/dags/example_watcher.py +++ b/dev/dags/example_watcher.py @@ -46,7 +46,10 @@ project_config=ProjectConfig(DBT_PROJECT_PATH), profile_config=profile_config, render_config=RenderConfig(exclude=["raw_payments"]), - operator_args=operator_args, + operator_args={ + **operator_args, + "deferrable": False + }, # normal dag parameters schedule="@daily", start_date=datetime(2023, 1, 1), diff --git a/dev/dags/example_watcher_deferrable.py b/dev/dags/example_watcher_deferrable.py new file mode 100644 index 0000000000..26506d7c5d --- /dev/null +++ b/dev/dags/example_watcher_deferrable.py @@ -0,0 +1,93 @@ +""" +An example DAG that uses Cosmos to render a dbt project into an Airflow DAG. 
+""" + +import os +from datetime import datetime, timedelta +from pathlib import Path + +from cosmos import DbtDag, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig +from cosmos.constants import ExecutionMode +from cosmos.profiles import PostgresUserPasswordProfileMapping + +DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" +DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) +DBT_PROJECT_NAME = os.getenv("DBT_PROJECT_NAME", "jaffle_shop") +DBT_PROJECT_PATH = DBT_ROOT_PATH / DBT_PROJECT_NAME + + +profile_config = ProfileConfig( + profile_name="default", + target_name="dev", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="example_conn", + profile_args={"schema": "public"}, + disable_event_tracking=True, + ), +) + + +operator_args = { + "install_deps": True, # install any necessary dependencies before running any dbt command + "execution_timeout": timedelta(seconds=120), +} + +# Currently airflow dags test ignores priority_weight and weight_rule, for this reason, we're setting the following in the CI only: +if os.getenv("CI"): + operator_args["trigger_rule"] = "all_success" + + +from cosmos.constants import InvocationMode + +# [START example_watcher_deferrable] +example_watcher = DbtDag( + # dbt/cosmos-specific parameters + execution_config=ExecutionConfig(execution_mode=ExecutionMode.WATCHER, invocation_mode=InvocationMode.DBT_RUNNER), + project_config=ProjectConfig(DBT_PROJECT_PATH), + profile_config=profile_config, + render_config=RenderConfig(exclude=["raw_payments"]), + operator_args=operator_args, + # normal dag parameters + schedule="@daily", + start_date=datetime(2023, 1, 1), + catchup=False, + dag_id="example_watcher", + default_args={"retries": 0}, +) +# [END example_watcher_deferrable] + + +# This is not being executed in the CI, but it works in Airflow locally via standalone and in Astro CLI +""" +from airflow.models import DAG + +try: + from airflow.providers.standard.operators.empty import 
EmptyOperator +except ImportError: + from airflow.operators.empty import EmptyOperator + +from cosmos import DbtTaskGroup + +# [START example_watcher_taskgroup] +with DAG( + dag_id="example_watcher_taskgroup", + schedule="@daily", + start_date=datetime(2023, 1, 1), + catchup=False, +): + pre_dbt = EmptyOperator(task_id="pre_dbt") + + first_dbt_task_group = DbtTaskGroup( + group_id="first_dbt_task_group", + execution_config=ExecutionConfig( + execution_mode=ExecutionMode.WATCHER, + ), + render_config=RenderConfig(select=["*customers*"], exclude=["path:seeds"]), + project_config=ProjectConfig(DBT_PROJECT_PATH), + profile_config=profile_config, + operator_args=operator_args, + ) + + pre_dbt >> first_dbt_task_group +# [END example_watcher_taskgroup] +""" From f02f885b1463c702484326932ca7b24dfe2c2ac2 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 12 Nov 2025 14:20:49 +0530 Subject: [PATCH 02/13] Document Async ExecutionMode.WATCHER --- dev/dags/example_watcher_deferrable.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/dags/example_watcher_deferrable.py b/dev/dags/example_watcher_deferrable.py index 26506d7c5d..45e2bc4658 100644 --- a/dev/dags/example_watcher_deferrable.py +++ b/dev/dags/example_watcher_deferrable.py @@ -40,7 +40,7 @@ from cosmos.constants import InvocationMode # [START example_watcher_deferrable] -example_watcher = DbtDag( +example_watcher_deferrable = DbtDag( # dbt/cosmos-specific parameters execution_config=ExecutionConfig(execution_mode=ExecutionMode.WATCHER, invocation_mode=InvocationMode.DBT_RUNNER), project_config=ProjectConfig(DBT_PROJECT_PATH), From 8918fb831196fca5147ddc9f8c4132bcbde0d0b1 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 12 Nov 2025 08:52:21 +0000 Subject: [PATCH 03/13] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 
Content-Transfer-Encoding: 8bit --- dev/dags/example_watcher.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/dev/dags/example_watcher.py b/dev/dags/example_watcher.py index 9bcbe77748..bf16a5ab9e 100644 --- a/dev/dags/example_watcher.py +++ b/dev/dags/example_watcher.py @@ -46,10 +46,7 @@ project_config=ProjectConfig(DBT_PROJECT_PATH), profile_config=profile_config, render_config=RenderConfig(exclude=["raw_payments"]), - operator_args={ - **operator_args, - "deferrable": False - }, + operator_args={**operator_args, "deferrable": False}, # normal dag parameters schedule="@daily", start_date=datetime(2023, 1, 1), From 406794a7f4280f38acc8b32b795568ccc2896599 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 12 Nov 2025 14:32:31 +0530 Subject: [PATCH 04/13] cleanup --- dev/dags/example_watcher_deferrable.py | 38 +------------------------- 1 file changed, 1 insertion(+), 37 deletions(-) diff --git a/dev/dags/example_watcher_deferrable.py b/dev/dags/example_watcher_deferrable.py index 45e2bc4658..7565305119 100644 --- a/dev/dags/example_watcher_deferrable.py +++ b/dev/dags/example_watcher_deferrable.py @@ -51,43 +51,7 @@ schedule="@daily", start_date=datetime(2023, 1, 1), catchup=False, - dag_id="example_watcher", + dag_id="example_watcher_deferrable", default_args={"retries": 0}, ) # [END example_watcher_deferrable] - - -# This is not being executed in the CI, but it works in Airflow locally via standalone and in Astro CLI -""" -from airflow.models import DAG - -try: - from airflow.providers.standard.operators.empty import EmptyOperator -except ImportError: - from airflow.operators.empty import EmptyOperator - -from cosmos import DbtTaskGroup - -# [START example_watcher_taskgroup] -with DAG( - dag_id="example_watcher_taskgroup", - schedule="@daily", - start_date=datetime(2023, 1, 1), - catchup=False, -): - pre_dbt = EmptyOperator(task_id="pre_dbt") - - first_dbt_task_group = DbtTaskGroup( - group_id="first_dbt_task_group", - 
execution_config=ExecutionConfig( - execution_mode=ExecutionMode.WATCHER, - ), - render_config=RenderConfig(select=["*customers*"], exclude=["path:seeds"]), - project_config=ProjectConfig(DBT_PROJECT_PATH), - profile_config=profile_config, - operator_args=operator_args, - ) - - pre_dbt >> first_dbt_task_group -# [END example_watcher_taskgroup] -""" From c181f8085778c082dbb81711f97224d20a5cf6cf Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 12 Nov 2025 17:11:56 +0530 Subject: [PATCH 05/13] Keep sync and async dag in same file --- dev/dags/example_watcher.py | 18 +++++++- dev/dags/example_watcher_deferrable.py | 57 -------------------------- 2 files changed, 17 insertions(+), 58 deletions(-) delete mode 100644 dev/dags/example_watcher_deferrable.py diff --git a/dev/dags/example_watcher.py b/dev/dags/example_watcher.py index bf16a5ab9e..968ca3a814 100644 --- a/dev/dags/example_watcher.py +++ b/dev/dags/example_watcher.py @@ -46,7 +46,7 @@ project_config=ProjectConfig(DBT_PROJECT_PATH), profile_config=profile_config, render_config=RenderConfig(exclude=["raw_payments"]), - operator_args={**operator_args, "deferrable": False}, + operator_args=operator_args, # normal dag parameters schedule="@daily", start_date=datetime(2023, 1, 1), @@ -91,3 +91,19 @@ pre_dbt >> first_dbt_task_group # [END example_watcher_taskgroup] """ + +# [START example_watcher_synchronous] +example_watcher_synchronous = DbtDag( + # dbt/cosmos-specific parameters + execution_config=ExecutionConfig(execution_mode=ExecutionMode.WATCHER, invocation_mode=InvocationMode.DBT_RUNNER), + project_config=ProjectConfig(DBT_PROJECT_PATH), + profile_config=profile_config, + operator_args={**operator_args, "deferrable": False}, + # normal dag parameters + schedule="@daily", + start_date=datetime(2023, 1, 1), + catchup=False, + dag_id="example_watcher_synchronous", + default_args={"retries": 0}, +) +# [END example_watcher_synchronous] diff --git a/dev/dags/example_watcher_deferrable.py 
b/dev/dags/example_watcher_deferrable.py deleted file mode 100644 index 7565305119..0000000000 --- a/dev/dags/example_watcher_deferrable.py +++ /dev/null @@ -1,57 +0,0 @@ -""" -An example DAG that uses Cosmos to render a dbt project into an Airflow DAG. -""" - -import os -from datetime import datetime, timedelta -from pathlib import Path - -from cosmos import DbtDag, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig -from cosmos.constants import ExecutionMode -from cosmos.profiles import PostgresUserPasswordProfileMapping - -DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" -DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) -DBT_PROJECT_NAME = os.getenv("DBT_PROJECT_NAME", "jaffle_shop") -DBT_PROJECT_PATH = DBT_ROOT_PATH / DBT_PROJECT_NAME - - -profile_config = ProfileConfig( - profile_name="default", - target_name="dev", - profile_mapping=PostgresUserPasswordProfileMapping( - conn_id="example_conn", - profile_args={"schema": "public"}, - disable_event_tracking=True, - ), -) - - -operator_args = { - "install_deps": True, # install any necessary dependencies before running any dbt command - "execution_timeout": timedelta(seconds=120), -} - -# Currently airflow dags test ignores priority_weight and weight_rule, for this reason, we're setting the following in the CI only: -if os.getenv("CI"): - operator_args["trigger_rule"] = "all_success" - - -from cosmos.constants import InvocationMode - -# [START example_watcher_deferrable] -example_watcher_deferrable = DbtDag( - # dbt/cosmos-specific parameters - execution_config=ExecutionConfig(execution_mode=ExecutionMode.WATCHER, invocation_mode=InvocationMode.DBT_RUNNER), - project_config=ProjectConfig(DBT_PROJECT_PATH), - profile_config=profile_config, - render_config=RenderConfig(exclude=["raw_payments"]), - operator_args=operator_args, - # normal dag parameters - schedule="@daily", - start_date=datetime(2023, 1, 1), - catchup=False, - dag_id="example_watcher_deferrable", - 
default_args={"retries": 0}, -) -# [END example_watcher_deferrable] From b788546062aadbfd3ff97ffb569b946c353bd8aa Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 12 Nov 2025 17:13:30 +0530 Subject: [PATCH 06/13] Use default invocation mode --- dev/dags/example_watcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/dags/example_watcher.py b/dev/dags/example_watcher.py index 968ca3a814..4c6118304d 100644 --- a/dev/dags/example_watcher.py +++ b/dev/dags/example_watcher.py @@ -95,7 +95,7 @@ # [START example_watcher_synchronous] example_watcher_synchronous = DbtDag( # dbt/cosmos-specific parameters - execution_config=ExecutionConfig(execution_mode=ExecutionMode.WATCHER, invocation_mode=InvocationMode.DBT_RUNNER), + execution_config=ExecutionConfig(execution_mode=ExecutionMode.WATCHER), project_config=ProjectConfig(DBT_PROJECT_PATH), profile_config=profile_config, operator_args={**operator_args, "deferrable": False}, From c9b2042f650c7b73a22589780bf6f8cfcdc63f8d Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 12 Nov 2025 17:14:20 +0530 Subject: [PATCH 07/13] Fix some airflow param --- dev/dags/example_watcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/dags/example_watcher.py b/dev/dags/example_watcher.py index 4c6118304d..8cbefffb59 100644 --- a/dev/dags/example_watcher.py +++ b/dev/dags/example_watcher.py @@ -101,7 +101,7 @@ operator_args={**operator_args, "deferrable": False}, # normal dag parameters schedule="@daily", - start_date=datetime(2023, 1, 1), + start_date=datetime(2025, 1, 1), catchup=False, dag_id="example_watcher_synchronous", default_args={"retries": 0}, From ab30e0c93e5c0a12a217763d0868b8fc14d50447 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 12 Nov 2025 17:14:40 +0530 Subject: [PATCH 08/13] Fix some airflow param --- dev/dags/example_watcher.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dev/dags/example_watcher.py b/dev/dags/example_watcher.py index 8cbefffb59..9de2055287 
100644 --- a/dev/dags/example_watcher.py +++ b/dev/dags/example_watcher.py @@ -104,6 +104,5 @@ start_date=datetime(2025, 1, 1), catchup=False, dag_id="example_watcher_synchronous", - default_args={"retries": 0}, ) # [END example_watcher_synchronous] From d30cd09c7203d29961185c33d570b5fb09e52423 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 12 Nov 2025 17:44:40 +0530 Subject: [PATCH 09/13] Add docs for async execution --- .../watcher-execution-mode.rst | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/docs/getting_started/watcher-execution-mode.rst b/docs/getting_started/watcher-execution-mode.rst index f937d0a020..a7cd20232f 100644 --- a/docs/getting_started/watcher-execution-mode.rst +++ b/docs/getting_started/watcher-execution-mode.rst @@ -334,10 +334,31 @@ Synchronous sensor execution In Cosmos 1.11.0, the ``DbtConsumerWatcherSensor`` operator is implemented as a synchronous XCom sensor, which continuously occupies the worker slot - even if they're just sleeping and checking periodically. +.. TODO: Remove the following asynchronous mention when approaching the 1.12.0 release. + An improvement is to change this behaviour and implement an asynchronous sensor execution, so that the worker slot is released until the condition, validated by the Airflow triggerer, is met. The ticket to implement this behaviour is `#2059 <https://github.com/astronomer/astronomer-cosmos/issues/2059>`_. +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Asynchronous sensor execution +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Starting with Cosmos 1.12.0, the ``DbtConsumerWatcherSensor`` supports +`deferrable (asynchronous) execution <https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html>`_. Deferrable execution frees up the Airflow worker slot, while task status monitoring is handled by the Airflow triggerer component, +which increases overall task throughput. By default, the sensor now runs in deferrable mode. + +**Limitations:** +- Deferrable execution is currently supported only for dbt model (i.e., run) commands. +- Deferrable execution applies only to the first task attempt (try number 0).
For subsequent retries, the sensor falls back to synchronous execution. + +To disable asynchronous execution, set the ``deferrable`` flag to ``False`` in the ``operator_args``. + +.. literalinclude:: ../../dev/dags/example_watcher.py + :language: python + :start-after: [START example_watcher_synchronous] + :end-before: [END example_watcher_synchronous] + ~~~~~~~~~~~~~~~~~~~~~~~~~~~ Airflow Datasets and Assets ~~~~~~~~~~~~~~~~~~~~~~~~~~~ From 03c668afef646f557f9244723fa470a0148bb4ea Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 12 Nov 2025 17:52:18 +0530 Subject: [PATCH 10/13] Fix rendering --- docs/getting_started/watcher-execution-mode.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/getting_started/watcher-execution-mode.rst b/docs/getting_started/watcher-execution-mode.rst index a7cd20232f..2c198b5dec 100644 --- a/docs/getting_started/watcher-execution-mode.rst +++ b/docs/getting_started/watcher-execution-mode.rst @@ -349,6 +349,7 @@ Starting with Cosmos 1.12.0, the ``DbtConsumerWatcherSensor`` supports which increases overall task throughput. By default, the sensor now runs in deferrable mode. **Limitations:** + - Deferrable execution is currently supported only for dbt model (i.e., run) commands. - Deferrable execution applies only to the first task attempt (try number 0). For subsequent retries, the sensor falls back to synchronous execution. From 9466054124997c3fb3b9afb9b2d9d43c5a6a717c Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 12 Nov 2025 18:20:59 +0530 Subject: [PATCH 11/13] Fix try_no --- docs/getting_started/watcher-execution-mode.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/getting_started/watcher-execution-mode.rst b/docs/getting_started/watcher-execution-mode.rst index 2c198b5dec..786999a1ec 100644 --- a/docs/getting_started/watcher-execution-mode.rst +++ b/docs/getting_started/watcher-execution-mode.rst @@ -351,7 +351,7 @@ which increases overall task throughput.
By default, the sensor now runs in defe **Limitations:** - Deferrable execution is currently supported only for dbt model (i.e., run) commands. -- Deferrable execution applies only to the first task attempt (try number 0). For subsequent retries, the sensor falls back to synchronous execution. +- Deferrable execution applies only to the first task attempt (try number 1). For subsequent retries, the sensor falls back to synchronous execution. To disable asynchronous execution, set the ``deferrable`` flag to ``False`` in the ``operator_args``. From 03eb0a49f877abced93ff3678ae935b65c64af5d Mon Sep 17 00:00:00 2001 From: pankajastro Date: Wed, 12 Nov 2025 20:31:02 +0530 Subject: [PATCH 12/13] Address feedback --- docs/getting_started/watcher-execution-mode.rst | 6 ------ 1 file changed, 6 deletions(-) diff --git a/docs/getting_started/watcher-execution-mode.rst b/docs/getting_started/watcher-execution-mode.rst index 786999a1ec..76fb6c2d79 100644 --- a/docs/getting_started/watcher-execution-mode.rst +++ b/docs/getting_started/watcher-execution-mode.rst @@ -334,12 +334,6 @@ Synchronous sensor execution In Cosmos 1.11.0, the ``DbtConsumerWatcherSensor`` operator is implemented as a synchronous XCom sensor, which continuously occupies the worker slot - even if they're just sleeping and checking periodically. -.. TODO: Remove the following asynchronous mention when approaching the 1.12.0 release. - -An improvement is to change this behaviour and implement an asynchronous sensor execution, so that the worker slot is released until the condition, validated by the Airflow triggerer, is met. - -The ticket to implement this behaviour is `#2059 <https://github.com/astronomer/astronomer-cosmos/issues/2059>`_.
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Asynchronous sensor execution ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ From f943fd96584714684e44c38a96c1a5276f67fad3 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Thu, 13 Nov 2025 22:31:53 +0530 Subject: [PATCH 13/13] Address feedback --- docs/getting_started/watcher-execution-mode.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/getting_started/watcher-execution-mode.rst b/docs/getting_started/watcher-execution-mode.rst index 76fb6c2d79..018c70cdcf 100644 --- a/docs/getting_started/watcher-execution-mode.rst +++ b/docs/getting_started/watcher-execution-mode.rst @@ -344,7 +344,7 @@ which increases overall task throughput. By default, the sensor now runs in defe **Limitations:** -- Deferrable execution is currently supported only for dbt model (i.e., run) commands. +- Deferrable execution is currently supported only for dbt models, seeds and snapshots. - Deferrable execution applies only to the first task attempt (try number 1). For subsequent retries, the sensor falls back to synchronous execution. To disable asynchronous execution, set the ``deferrable`` flag to ``False`` in the ``operator_args``.