Merged
24 commits
36629ac
docs: Improve AIRFLOW_ASYNC docs
tatiana Nov 10, 2025
3fe4919
Fix rst issues
tatiana Nov 10, 2025
99e7b4d
fix formatting
tatiana Nov 10, 2025
1f78e72
Fix ExecutionMode table
tatiana Nov 10, 2025
b17b1e4
Fix ExecutionMode table
tatiana Nov 10, 2025
a80601b
Merge branch 'main' into improve-async-docs
tatiana Nov 10, 2025
6e66811
Update docs/getting_started/async-execution-mode.rst
tatiana Nov 10, 2025
3f6c8de
Merge branch 'main' into improve-async-docs
tatiana Nov 10, 2025
b6e0867
Apply suggestions from code review
tatiana Nov 10, 2025
84d7f85
Apply suggestions from code review
tatiana Nov 10, 2025
8dec64d
Apply suggestions from code review
tatiana Nov 10, 2025
b8c8621
Apply suggestions from code review
tatiana Nov 10, 2025
d480bf0
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Nov 10, 2025
0c83ee4
Apply suggestions from code review
tatiana Nov 10, 2025
c07aecf
Address code review feedback
tatiana Nov 10, 2025
3a3e2d9
Apply suggestions from code review
tatiana Nov 10, 2025
6c2984e
Apply suggestions from code review
tatiana Nov 10, 2025
370baad
Merge branch 'main' into improve-async-docs
tatiana Nov 10, 2025
53dfd74
Apply suggestion from @tatiana
tatiana Nov 11, 2025
0fbab26
Merge branch 'main' into improve-async-docs
tatiana Nov 11, 2025
87c4177
Apply suggestion from @tatiana
tatiana Nov 11, 2025
e6e0185
Apply suggestion from @tatiana
tatiana Nov 11, 2025
792393f
Apply suggestion from @tatiana
tatiana Nov 11, 2025
669861e
Apply suggestions from code review
tatiana Nov 11, 2025
75 changes: 59 additions & 16 deletions docs/getting_started/async-execution-mode.rst
@@ -5,20 +5,38 @@
Airflow Async Execution Mode
============================

The Airflow async execution mode in Cosmos is designed to improve pipeline performance. This execution mode could be preferred when you have long-running resources and want to run them asynchronously by leveraging Airflow’s `deferrable operators <https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html>`__. In this mode, additional operators (``SetupAsyncOperator`` and ``TeardownAsyncOperator``) are added to your workflow.
This execution mode can reduce the runtime by roughly 35% compared to the Cosmos ``LOCAL`` execution mode, but it is currently only available for BigQuery. Although this mode was introduced in Cosmos 1.9, we strongly encourage users to use Cosmos 1.11, which has significant performance improvements.

It can be particularly useful for long-running transformations, since it leverages Airflow's `deferrable operators <https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html>`__.

In this mode, a ``SetupAsyncOperator`` pre-generates the SQL files for the dbt project and uploads them to Airflow XCom or a remote location. A remote location is only used if users set ``AIRFLOW__COSMOS__REMOTE_TARGET_PATH`` and ``AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID``. This operator runs before the rest of the pipeline.
All dbt model transformations in the pipeline are run using ``DbtRunAirflowAsyncOperator`` which, instead of running the ``dbt run`` command for each model, downloads the SQL files from Airflow XCom or the remote location and executes them directly, leveraging the Airflow ``BigQueryInsertJobOperator``.

Users can leverage other existing ``BigQueryInsertJobOperator`` features, such as the UI controls to link to the job in the BigQuery UI.

- **SetupAsyncOperator:** This task runs a mocked ``dbt run`` command on your dbt project, which outputs compiled SQL files to the project’s target directory. If ``upload_sql_xcom`` is enabled (default behaviour), the compiled SQL files will be uploaded to Airflow XCom. Otherwise, they will be uploaded to the remote location specified by the ``remote_target_path`` configuration.
- **TeardownAsyncOperator:** This task deletes the resources created by ``SetupAsyncOperator`` from the remote location defined by the ``remote_target_path`` configuration.

Advantages of Airflow Async Mode
++++++++++++++++++++++++++++++++

- **Improved Task Throughput:** Async tasks free up Airflow workers by leveraging the Airflow Trigger framework. While long-running SQL transformations are executing in the data warehouse, the worker is released and can handle other tasks, increasing overall task throughput.
- **Faster Task Execution:** With Cosmos ``SetupAsyncOperator``, the SQL transformations are precompiled and uploaded to xcom (default behaviour) or a remote location. Instead of invoking a full dbt run during each dbt model task, the SQL files are downloaded from this remote path and executed directly. This eliminates unnecessary overhead from running the full dbt command, resulting in faster and more efficient task execution. We have observed a dbt project with 129 models takes ~500 seconds with remote SQL file upload/download, but only ~2 seconds using xcom.
- **Better Resource Utilization:** By minimizing idle time on Airflow workers, async tasks allow more efficient use of compute resources. Workers aren't blocked waiting for external systems and can be reused for other work while waiting on async operations.
- **Faster Task Execution:** With Cosmos ``SetupAsyncOperator``, the SQL transformations are precompiled and uploaded to XCom (default behaviour) or a remote location. Instead of invoking a full dbt run during each dbt model task, the SQL files are downloaded from this XCom or remote path and executed directly. This eliminates unnecessary overhead from running the full dbt command, resulting in faster and more efficient task execution.
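
The throughput argument above can be illustrated with a toy simulation. This is only a sketch: it does not use Airflow or Cosmos, and the names ``run_task`` and ``elapsed`` are hypothetical. A semaphore stands in for worker slots, and ``asyncio.sleep`` stands in for a SQL job running in the data warehouse. It also assumes that submitting the job is instantaneous and ignores the short period in which a real deferred task resumes on a worker.

```python
import asyncio
import time


async def run_task(slots: asyncio.Semaphore, wait_s: float, deferrable: bool) -> None:
    """Simulate one task that waits `wait_s` seconds on an external system."""
    if deferrable:
        # Hold a worker slot only while "submitting" the job (assumed instantaneous),
        # then release it and wait off-worker, as a trigger would.
        async with slots:
            pass
        await asyncio.sleep(wait_s)
    else:
        # A blocking task keeps its worker slot for the whole wait.
        async with slots:
            await asyncio.sleep(wait_s)


async def elapsed(n_tasks: int, n_slots: int, wait_s: float, deferrable: bool) -> float:
    """Return the wall-clock time needed to finish all simulated tasks."""
    slots = asyncio.Semaphore(n_slots)
    start = time.perf_counter()
    await asyncio.gather(*(run_task(slots, wait_s, deferrable) for _ in range(n_tasks)))
    return time.perf_counter() - start


# Four tasks competing for a single worker slot.
blocking = asyncio.run(elapsed(n_tasks=4, n_slots=1, wait_s=0.05, deferrable=False))
deferred = asyncio.run(elapsed(n_tasks=4, n_slots=1, wait_s=0.05, deferrable=True))
print(f"blocking: {blocking:.2f}s, deferred: {deferred:.2f}s")
```

With one slot, the blocking tasks run one after another, while the deferred tasks wait concurrently, so the deferred run finishes sooner.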

We have `observed <https://github.com/astronomer/astronomer-cosmos/pull/1934>`_ the following performance improvements by running a dbt project with 129 models:

+----------------------------------------------+--------------------------+
| How the dbt pipeline was executed            | Execution Time (seconds) |
+==============================================+==========================+
| ``dbt run`` with dbt Core 1.10               | 13                       |
+----------------------------------------------+--------------------------+
| Cosmos 1.11 with ExecutionMode.LOCAL         | 11                       |
+----------------------------------------------+--------------------------+
| Cosmos 1.11 with ExecutionMode.AIRFLOW_ASYNC | 7                        |
+----------------------------------------------+--------------------------+
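
As a quick check, the relative reductions implied by the table above can be computed directly. The helper name ``relative_reduction`` is hypothetical, and the numbers are copied from the table.

```python
# Execution times in seconds, copied from the benchmark table above.
benchmark_seconds = {
    "dbt run (dbt Core 1.10)": 13,
    "ExecutionMode.LOCAL": 11,
    "ExecutionMode.AIRFLOW_ASYNC": 7,
}


def relative_reduction(baseline: float, candidate: float) -> float:
    """Return the fractional runtime reduction of `candidate` versus `baseline`."""
    return (baseline - candidate) / baseline


async_vs_local = relative_reduction(
    benchmark_seconds["ExecutionMode.LOCAL"],
    benchmark_seconds["ExecutionMode.AIRFLOW_ASYNC"],
)
async_vs_dbt = relative_reduction(
    benchmark_seconds["dbt run (dbt Core 1.10)"],
    benchmark_seconds["ExecutionMode.AIRFLOW_ASYNC"],
)

print(f"AIRFLOW_ASYNC vs LOCAL: {async_vs_local:.1%}")  # 36.4%
print(f"AIRFLOW_ASYNC vs dbt run: {async_vs_dbt:.1%}")  # 46.2%
```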


Getting Started with Airflow Async Mode
++++++++++++++++++++++++++++++++++
+++++++++++++++++++++++++++++++++++++++

This guide walks you through setting up an Astro CLI project and running a Cosmos-based DAG with a deferrable operator, enabling asynchronous task execution in Apache Airflow.

@@ -62,11 +80,6 @@ Edit your Dockerfile to ensure all necessary requirements are included.

FROM astrocrpublic.azurecr.io/runtime:3.0-2

# These environment variables configure Cosmos to upload and download
# compiled SQL files from the specified GCS bucket.
# The path is set to 'cosmos_remote_target_demo', and access is handled via the 'gcp_conn' Airflow connection.
ENV AIRFLOW__COSMOS__REMOTE_TARGET_PATH=gs://cosmos_remote_target_demo
ENV AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID=gcp_conn

3. Add astronomer-cosmos Dependency
+++++++++++++++++++++++++++++++++++
@@ -132,6 +145,7 @@ In your ``requirements.txt``, add:
"location": "US",
"install_deps": True,
"full_refresh": True,
"virtualenv_dir": "dbt_venv",
},
)

@@ -196,14 +210,43 @@ Create an Airflow connection with following configurations

The ``run`` tasks will run asynchronously via the deferrable operator, freeing up worker slots while waiting on I/O or long-running tasks.

.. note::

1. The deferrable operator is currently supported for dbt models only when using BigQuery. Adding support for other adapters is on the roadmap.
Control of where to upload the SQL files
++++++++++++++++++++++++++++++++++++++++

For optimal performance, we encourage keeping the Cosmos default behaviour (introduced in 1.11), which is to upload the SQL files to XCom instead of a remote object location.

For the benchmark example described in a previous section, remote SQL file upload/download added an overhead of ~500 seconds, compared with only ~2 seconds using XCom. This overhead can outweigh the performance improvements introduced by using deferrable operators.

However, if you want to upload the SQL files to a remote object location instead of XCom, you can set the following environment variables:

.. code-block:: bash

AIRFLOW__COSMOS__REMOTE_TARGET_PATH=gs://cosmos_remote_target_demo
AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID=gcp_conn
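
The rule stated on this page (a remote location is only used when both variables are set, otherwise XCom is used) can be sketched as follows. The function ``describe_sql_destination`` is a hypothetical illustration, not part of Cosmos, and the actual implementation may take other settings into account, such as ``upload_sql_xcom``.

```python
from typing import Mapping, Optional, Tuple

REMOTE_PATH_VAR = "AIRFLOW__COSMOS__REMOTE_TARGET_PATH"
REMOTE_CONN_VAR = "AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID"


def describe_sql_destination(env: Mapping[str, str]) -> Tuple[str, Optional[str]]:
    """Hypothetical helper: return where the compiled SQL would be stored.

    Encodes only the rule described in this page: the remote location is
    used when BOTH environment variables are set; otherwise XCom is used.
    """
    path = env.get(REMOTE_PATH_VAR)
    conn_id = env.get(REMOTE_CONN_VAR)
    if path and conn_id:
        return ("remote", path)
    return ("xcom", None)


# Default behaviour: neither variable is set, so XCom is used.
print(describe_sql_destination({}))

# Both variables set, as in the snippet above: the remote path is used.
print(
    describe_sql_destination(
        {
            REMOTE_PATH_VAR: "gs://cosmos_remote_target_demo",
            REMOTE_CONN_VAR: "gcp_conn",
        }
    )
)
```

In a real deployment you could pass ``os.environ`` to a helper like this to confirm which destination your configuration selects.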


Limitations
+++++++++++


1. **Airflow 2.8 or higher required**: This mode relies on Airflow's `Object Storage <https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/objectstorage.html>`__ feature, introduced in Airflow 2.8, to store and retrieve compiled SQLs.

2. **Limited to dbt models**: Only dbt resource type models are run asynchronously using Airflow deferrable operators. Other resource types are executed synchronously, similar to the local execution mode.

3. **BigQuery support only**: This mode only supports BigQuery as the target database. If a different target is specified, Cosmos will throw an error indicating the target database is unsupported in this mode. Adding support for other adapters is on the roadmap.

4. **ProfileMapping parameter required**: You need to specify the ``ProfileMapping`` parameter in the ``ProfileConfig`` for your DAG. Refer to the example DAG below for details on setting this parameter.

5. **Location parameter required**: You must specify the location of the BigQuery dataset in the ``operator_args`` of the ``DbtDag`` or ``DbtTaskGroup``. The example DAG below provides guidance on this.

6. **async_py_requirements parameter required**: If you're using the default approach of having a setup task, you must specify the necessary dbt adapter Python requirements based on your profile type for the async execution mode in the ``ExecutionConfig`` of your ``DbtDag`` or ``DbtTaskGroup``. The example DAG below provides guidance on this.

7. **Creation of new isolated virtual environment for each task run**: By default, the ``SetupAsyncOperator`` creates and executes within a new isolated virtual environment for each task run, which can cause performance issues. To reuse an existing virtual environment, use the ``virtualenv_dir`` parameter within the ``operator_args`` of the ``DbtDag``. We have observed that for ``dbt-bigquery``, the ``SetupAsyncOperator`` executes approximately 30% faster when reusing an existing virtual environment, particularly for transformations that take around 10 minutes to complete.

2. By default, the ``SetupAsyncOperator`` creates and executes within a new isolated virtual environment for each task run, which can cause performance issues. To reuse an existing virtual environment, use the ``virtualenv_dir`` parameter within the ``operator_args`` of the ``DbtDag``. We have observed that for ``dbt-bigquery``, the ``SetupAsyncOperator`` executes approximately 30% faster when reusing an existing virtual environment, particularly for transformations that take around 10 minutes to complete.
8. **Performance degradation when uploading to remote object location**: Even though it is possible to upload the SQL files to a remote object location by setting environment variables, it is slow. We observed that this introduces a significant overhead in the execution time (500s for 129 models).

Example:
9. **TeardownAsyncOperator limitation**: When using a remote object location, in addition to the ``SetupAsyncOperator``, a ``TeardownAsyncOperator`` is also added to the DAG. This task deletes the SQL files from the remote location at the end of the DAG run. This can lead to a limitation from a retry perspective, as described in issue `#2066 <https://github.com/astronomer/astronomer-cosmos/issues/2066>`_. This can be avoided by setting the ``enable_teardown_async_task`` configuration to ``False``, as described in the :ref:`enable_teardown_async_task` section.

.. code-block:: python

DbtDag(..., operator_args={"virtualenv_dir": "dbt_venv"})
For a comparison between the different Cosmos execution modes, please check the :ref:`execution-modes-comparison` section.
68 changes: 8 additions & 60 deletions docs/getting_started/execution-modes.rst
@@ -18,6 +18,7 @@ Cosmos can run ``dbt`` commands using five different approaches, called ``execut

The choice of the ``execution mode`` can vary based on each user's needs and concerns. For more details, check each execution mode described below.

.. _execution-modes-comparison:

.. list-table:: Execution Modes Comparison
:widths: 25 25 25 25
@@ -60,11 +61,11 @@ The choice of the ``execution mode`` can vary based on each user's needs and con
- High
- No
* - Airflow Async
- Very Fast
- Medium
- None
- Yes
* - Local
- Very fast
* - Watcher
- Very Fast
- None
- Yes

@@ -288,82 +289,29 @@ Please refer to the step-by-step guide for using AWS ECS as the execution mode.
},
)

.. _airflow-async-execution-mode:
Airflow Async
-------------

.. versionadded:: 1.9.0

Although this execution mode was introduced in Cosmos 1.9, we strongly encourage users to use Cosmos 1.11, which has significant performance improvements.
Compared to the ``local`` execution mode, the ``airflow_async`` execution mode can reduce the execution time of a dbt project by up to 36%.

The ``airflow_async`` execution mode is a way to run the dbt resources from your dbt project using Apache Airflow's
`Deferrable operators <https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html>`__.
This execution mode could be preferred when you have long-running resources and you want to run them asynchronously by
leveraging Airflow's deferrable operators. With that, you would be able to potentially observe higher throughput of tasks
as more dbt nodes will be run in parallel since they won't be blocking Airflow's worker slots.

In this mode, Cosmos adds a new operator, ``SetupAsyncOperator``, as a root task in the DbtDag or DbtTaskGroup. The task runs
the mocked ``dbt run`` command on your dbt project which then outputs compiled SQLs in the project's target directory.
As part of the same task run, these compiled SQLs are then stored remotely to a remote path set using the
:ref:`remote_target_path` configuration. The remote path is then used by the subsequent tasks in the DAG to
fetch (from the remote path) and run the compiled SQLs asynchronously using e.g. the ``DbtRunAirflowAsyncOperator``.
You may observe that the compile task takes a bit longer to run due to the latency of storing the compiled SQLs
remotely (e.g. for the classic ``jaffle_shop`` dbt project, upon compiling it produces about 31 files measuring about 124KB in total, but on a local
machine it took approximately 25 seconds for the task to compile & upload the compiled SQLs to the remote path);
however, it is still a win as it is a one-time overhead and the subsequent tasks run asynchronously utilising Airflow's
deferrable operators and supplying to them those compiled SQLs. With this setup task, model tasks no longer require dbt
to be available or installed, eliminating the need to install dbt adapters in the same environment as the Airflow
installation. However, the virtual environment created during execution of the ``SetupAsyncOperator`` must install
the necessary dbt adapter for the setup task to function correctly. This can be achieved by specifying the required
dbt adapter in the ``async_py_requirements`` parameter within the ``ExecutionConfig`` of your ``DbtDag`` or ``DbtTaskGroup``.

Note that currently, the ``airflow_async`` execution mode has the following limitations:

1. **Airflow 2.8 or higher required**: This mode relies on Airflow's `Object Storage <https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/objectstorage.html>`__ feature, introduced in Airflow 2.8, to store and retrieve compiled SQLs.
2. **Limited to dbt models**: Only dbt resource type models are run asynchronously using Airflow deferrable operators. Other resource types are executed synchronously, similar to the local execution mode.
3. **BigQuery support only**: This mode only supports BigQuery as the target database. If a different target is specified, Cosmos will throw an error indicating the target database is unsupported in this mode.
4. **ProfileMapping parameter required**: You need to specify the ``ProfileMapping`` parameter in the ``ProfileConfig`` for your DAG. Refer to the example DAG below for details on setting this parameter.
5. **location parameter required**: You must specify the location of the BigQuery dataset in the ``operator_args`` of the ``DbtDag`` or ``DbtTaskGroup``. The example DAG below provides guidance on this.
6. **async_py_requirements parameter required**: If you're using the default approach of having a setup task, you must specify the necessary dbt adapter Python requirements based on your profile type for the async execution mode in the ``ExecutionConfig`` of your ``DbtDag`` or ``DbtTaskGroup``. The example DAG below provides guidance on this.

To start leveraging async execution mode that is currently supported for the BigQuery profile type targets you need to install Cosmos with the below additional dependencies:

.. code:: bash

astronomer-cosmos[dbt-bigquery, google]


Example DAG:

.. literalinclude:: ../../dev/dags/simple_dag_async.py
:language: python
:start-after: [START airflow_async_execution_mode_example]
:end-before: [END airflow_async_execution_mode_example]

**Known Issue:**

The ``dag test`` command failed with the following error, likely because the trigger does not fully initialize during the ``dag test``, leading to an uninitialized task instance.
This causes the BigQuery trigger to attempt accessing parameters of the Task Instance that are not properly initialized.

.. code:: bash

[2024-10-01T18:19:09.726+0530] {base_events.py:1738} ERROR - unhandled exception during asyncio.run() shutdown
task: <Task finished name='Task-46' coro=<<async_generator_athrow without __name__>()> exception=AttributeError("'NoneType' object has no attribute 'dag_id'")>
Traceback (most recent call last):
File "/Users/pankaj/Documents/astro_code/astronomer-cosmos/devenv/lib/python3.9/site-packages/airflow/providers/google/cloud/triggers/bigquery.py", line 138, in run
yield TriggerEvent(
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/Users/pankaj/Documents/astro_code/astronomer-cosmos/devenv/lib/python3.9/site-packages/airflow/providers/google/cloud/triggers/bigquery.py", line 157, in run
if self.job_id and self.cancel_on_kill and self.safe_to_cancel():
File "/Users/pankaj/Documents/astro_code/astronomer-cosmos/devenv/lib/python3.9/site-packages/airflow/providers/google/cloud/triggers/bigquery.py", line 126, in safe_to_cancel
task_instance = self.get_task_instance() # type: ignore[call-arg]
File "/Users/pankaj/Documents/astro_code/astronomer-cosmos/devenv/lib/python3.9/site-packages/airflow/utils/session.py", line 97, in wrapper
return func(*args, session=session, **kwargs)
File "/Users/pankaj/Documents/astro_code/astronomer-cosmos/devenv/lib/python3.9/site-packages/airflow/providers/google/cloud/triggers/bigquery.py", line 102, in get_task_instance
TaskInstance.dag_id == self.task_instance.dag_id,
AttributeError: 'NoneType' object has no attribute 'dag_id'
For a full step-by-step guide and limitations, check the :ref:`async-execution-mode` page.


Watcher Execution Mode (Experimental)