diff --git a/docs/getting_started/async-execution-mode.rst b/docs/getting_started/async-execution-mode.rst index ce38d69119..a7b8d3040e 100644 --- a/docs/getting_started/async-execution-mode.rst +++ b/docs/getting_started/async-execution-mode.rst @@ -5,20 +5,38 @@ Airflow Async Execution Mode ============================ -The Airflow async execution mode in Cosmos is designed to improve pipeline performance. This execution mode could be preferred when you’ve long running resources and you want to run them asynchronously by leveraging Airflow’s `deferrable operators `__. In this mode, additional operators—``SetupAsyncOperator`` and ``TeardownAsyncOperator``—are added to your workflow. +This execution mode can reduce the runtime by 35% in comparison to Cosmos LOCAL execution mode, but is currently only available for BigQuery. While this mode was introduced in Cosmos 1.9, we strongly encourage users to use Cosmos 1.11, which has significant performance improvements. + +It can be particularly useful for long-running transformations, since it leverages Airflow's `deferrable operators `__. + +In this mode, there is a ``SetupAsyncOperator`` that will pre-generate the SQL files for the dbt project and upload them to Airflow XCom or a remote location. A remote location will only be used if users set ``AIRFLOW__COSMOS__REMOTE_TARGET_PATH`` and ``AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID``. This operator is run before the remaining pipeline. +All the pipeline dbt model transformations will be run using ``DbtRunAirflowAsyncOperator`` which, instead of running the ``dbt run`` command for each model. They will download the SQL files from the Airflow XCom or remote location and execute them directly leveraging the Airflow ``BigQueryInsertJobOperator``. + +Users can leverage other existing ``BigQueryInsertJobOperator`` features, such as the UI controls to link to the job in the BigQuery UI. -- **SetupAsyncOperator:** This task runs a mocked ``dbt run`` command on your dbt project, which outputs compiled SQL files to the project’s target director. If ``upload_sql_xcom`` is enabled (default behaviour), the compiled SQL files will be uploaded to Airflow XCom. Otherwise, they will be uploaded to the remote location specified by the ``remote_target_path`` configuration. -- **TeardownAsyncOperator:** This task deletes the resources created by ``SetupAsyncOperator`` from the remote location defined by the ``remote_target_path`` configuration. Advantages of Airflow Async Mode ++++++++++++++++++++++++++++++++ - **Improved Task Throughput:** Async tasks free up Airflow workers by leveraging the Airflow Trigger framework. While long-running SQL transformations are executing in the data warehouse, the worker is released and can handle other tasks, increasing overall task throughput. -- **Faster Task Execution:** With Cosmos ``SetupAsyncOperator``, the SQL transformations are precompiled and uploaded to xcom (default behaviour) or a remote location. Instead of invoking a full dbt run during each dbt model task, the SQL files are downloaded from this remote path and executed directly. This eliminates unnecessary overhead from running the full dbt command, resulting in faster and more efficient task execution. We have observed a dbt project with 129 models takes ~500 seconds with remote SQL file upload/download, but only ~2 seconds using xcom. - **Better Resource Utilization:** By minimizing idle time on Airflow workers, async tasks allow more efficient use of compute resources. Workers aren't blocked waiting for external systems and can be reused for other work while waiting on async operations. +- **Faster Task Execution:** With Cosmos ``SetupAsyncOperator``, the SQL transformations are precompiled and uploaded to XCom (default behaviour) or a remote location. Instead of invoking a full dbt run during each dbt model task, the SQL files are downloaded from this XCom or remote path and executed directly. This eliminates unnecessary overhead from running the full dbt command, resulting in faster and more efficient task execution. + +We have `observed `_ the following performance improvements by running a dbt project with 129 models: + ++----------------------------------------------+--------------------------+ +| How the dbt pipeline was executed | Execution Time (seconds) | ++==============================================+==========================+ +| ``dbt run`` with dbt Core 1.10 | 13 | ++----------------------------------------------+--------------------------+ +| Cosmos 1.11 with ExecutionMode.LOCAL | 11 | ++----------------------------------------------+--------------------------+ +| Cosmos 1.11 with ExecutionMode.AIRFLOW_ASYNC | 7 | ++----------------------------------------------+--------------------------+ + Getting Started with Airflow Async Mode -++++++++++++++++++++++++++++++++++ ++++++++++++++++++++++++++++++++++++++++ This guide walks you through setting up an Astro CLI project and running a Cosmos-based DAG with a deferrable operator, enabling asynchronous task execution in Apache Airflow. @@ -62,11 +80,6 @@ Edit your Dockerfile to ensure all necessary requirements are included. FROM astrocrpublic.azurecr.io/runtime:3.0-2 - # These environment variables configure Cosmos to upload and download - # compiled SQL files from the specified GCS bucket. - # The path is set to 'cosmos_remote_target_demo', and access is handled via the 'gcp_conn' Airflow connection. - ENV AIRFLOW__COSMOS__REMOTE_TARGET_PATH=gs://cosmos_remote_target_demo - ENV AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID=gcp_conn 3. Add astronomer-cosmos Dependency +++++++++++++++++++++++++++++++++++ @@ -132,6 +145,7 @@ In your ``requirements.txt``, add: "location": "US", "install_deps": True, "full_refresh": True, + "virtualenv_dir": "dbt_venv", }, ) @@ -196,14 +210,43 @@ Create an Airflow connection with following configurations The ``run`` tasks will run asynchronously via the deferrable operator, freeing up worker slots while waiting on I/O or long-running tasks. -.. note:: - 1. The deferrable operator is currently supported for dbt models only when using BigQuery. Adding support for other adapters is on the roadmap. +Control of where to upload the SQL files +++++++++++++++++++++++++++++++++++++++++ + +For optimal performance we encourage to keep Cosmos standard behaviour (introduced in 1.11), which is to upload the SQL files to XCom, instead of a remote object location. + +For the benchmakr example described in a previous section, there was an overhead of ~500 seconds with remote SQL file upload/download, but only ~2 seconds using XCom, which can outweigh the performance improvements introduced by using deferrable operators. + +However, if you want to upload the SQL files to a remote object location instead of XCom, you can set the following environment variables: + +.. code-block:: bash + + AIRFLOW__COSMOS__REMOTE_TARGET_PATH=gs://cosmos_remote_target_demo + AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID=gcp_conn + + +Limitations ++++++++++++ + + +1. **Airflow 2.8 or higher required**: This mode relies on Airflow's `Object Storage `__ feature, introduced in Airflow 2.8, to store and retrieve compiled SQLs. + +2. **Limited to dbt models**: Only dbt resource type models are run asynchronously using Airflow deferrable operators. Other resource types are executed synchronously, similar to the local execution mode. + +3. **BigQuery support only**: This mode only supports BigQuery as the target database. If a different target is specified, Cosmos will throw an error indicating the target database is unsupported in this mode. Adding support for other adapters is on the roadmap. + +4. **ProfileMapping parameter required**: You need to specify the ``ProfileMapping`` parameter in the ``ProfileConfig`` for your DAG. Refer to the example DAG below for details on setting this parameter. + +5. **Location parameter required**: You must specify the location of the BigQuery dataset in the ``operator_args`` of the ``DbtDag`` or ``DbtTaskGroup``. The example DAG below provides guidance on this. + +6. **async_py_requirements parameter required**: If you're using the default approach of having a setup task, you must specify the necessary dbt adapter Python requirements based on your profile type for the async execution mode in the ``ExecutionConfig`` of your ``DbtDag`` or ``DbtTaskGroup``. The example DAG below provides guidance on this. + +7. **Creation of new isolated virtual environment for each task run**: By default, the ``SetupAsyncOperator`` creates and executes within a new isolated virtual environment for each task run, which can cause performance issues. To reuse an existing virtual environment, use the ``virtualenv_dir`` parameter within the ``operator_args`` of the ``DbtDag``. We have observed that for ``dbt-bigquery``, the ``SetupAsyncOperator`` executes approximately 30% faster when reusing an existing virtual environment, particularly for transformations that take around 10 minutes to complete. - 2. By default, the ``SetupAsyncOperator`` creates and executes within a new isolated virtual environment for each task run, which can cause performance issues. To reuse an existing virtual environment, use the ``virtualenv_dir`` parameter within the ``operator_args`` of the ``DbtDag``. We have observed that for ``dbt-bigquery``, the ``SetupAsyncOperator`` executes approximately 30% faster when reusing an existing virtual environment, particularly for transformations that take around 10 minutes to complete. +8. **Performance degradation when uploading to remote object location**: Even though it is possible to upload the SQL files to a remote object location by setting environment variables, it is slow. We observed that this introduces a significant overhead in the execution time (500s for 129 models). - Example: +9. **TeardownAsyncOperator limitation**: When using a remote object location, in addition to the ``SetupAsyncOperator``, a ``TeardownAsyncOperator`` is also added to the DAG. This task will delete the SQL files from the remote location by the end of the DAG Run. This is can lead to a limitation from a retry perspective, as described in the issue `#2066 `_. This can be avoided by setting the ``enable_teardown_async_task`` configuration to ``False``, as described in the :ref:`enable_teardown_async_task` section. - .. code-block:: python - DbtDag(..., operator_args={"virtualenv_dir": "dbt_venv"}) +For a comparison between different Cosmos execution modes, please, check the :ref:`execution-modes-comparison` section. diff --git a/docs/getting_started/execution-modes.rst b/docs/getting_started/execution-modes.rst index c8c2eac6f2..1bd6b5bf26 100644 --- a/docs/getting_started/execution-modes.rst +++ b/docs/getting_started/execution-modes.rst @@ -18,6 +18,7 @@ Cosmos can run ``dbt`` commands using five different approaches, called ``execut The choice of the ``execution mode`` can vary based on each user's needs and concerns. For more details, check each execution mode described below. +.. _execution-modes-comparison: .. list-table:: Execution Modes Comparison :widths: 25 25 25 25 @@ -60,11 +61,11 @@ The choice of the ``execution mode`` can vary based on each user's needs and con - High - No * - Airflow Async + - Very Fast - Medium - - None - Yes - * - Local - - Very fast + * - Watcher + - Very Fast - None - Yes @@ -288,11 +289,14 @@ Please refer to the step-by-step guide for using AWS ECS as the execution mode. }, ) +.. _airflow-async-execution-mode: Airflow Async ------------- .. versionadded:: 1.9.0 +Although this execution mode was introduced in Cosmos 1.9, we strongly encourage users to use Cosmos 1.11, which has significant performance improvements. +In comparison to the ``local``, the ``airflow_async`` execution mode can reduce the execution time of a dbt project by up to 36%. The ``airflow_async`` execution mode is a way to run the dbt resources from your dbt project using Apache Airflow's `Deferrable operators `__. @@ -300,37 +304,6 @@ This execution mode could be preferred when you've long running resources and yo leveraging Airflow's deferrable operators. With that, you would be able to potentially observe higher throughput of tasks as more dbt nodes will be run in parallel since they won't be blocking Airflow's worker slots. -In this mode, Cosmos adds a new operator, ``SetupAsyncOperator``, as a root task in the DbtDag or DbtTaskGroup. The task runs -the mocked ``dbt run`` command on your dbt project which then outputs compiled SQLs in the project's target directory. -As part of the same task run, these compiled SQLs are then stored remotely to a remote path set using the -:ref:`remote_target_path` configuration. The remote path is then used by the subsequent tasks in the DAG to -fetch (from the remote path) and run the compiled SQLs asynchronously using e.g. the ``SetupAsyncOperator``. -You may observe that the compile task takes a bit longer to run due to the latency of storing the compiled SQLs -remotely (e.g. for the classic ``jaffle_shop`` dbt project, upon compiling it produces about 31 files measuring about 124KB in total, but on a local -machine it took approximately 25 seconds for the task to compile & upload the compiled SQLs to the remote path)., -however, it is still a win as it is one-time overhead and the subsequent tasks run asynchronously utilising the Airflow's -deferrable operators and supplying to them those compiled SQLs. With this setup task, model tasks no longer require dbt -to be available or installed, eliminating the need to install dbt adapters in the same environment as the Airflow -installation. However, the virtual environment created during execution of the ``SetupAsyncOperator`` must install -the necessary dbt adapter for the setup task to function correctly. This can be achieved by specifying the required -dbt adapter in the ``async_py_requirements`` parameter within the ``ExecutionConfig`` of your ``DbtDag`` or ``DbtTaskGroup``. - -Note that currently, the ``airflow_async`` execution mode has the following limitations: - -1. **Airflow 2.8 or higher required**: This mode relies on Airflow's `Object Storage `__ feature, introduced in Airflow 2.8, to store and retrieve compiled SQLs. -2. **Limited to dbt models**: Only dbt resource type models are run asynchronously using Airflow deferrable operators. Other resource types are executed synchronously, similar to the local execution mode. -3. **BigQuery support only**: This mode only supports BigQuery as the target database. If a different target is specified, Cosmos will throw an error indicating the target database is unsupported in this mode. -4. **ProfileMapping parameter required**: You need to specify the ``ProfileMapping`` parameter in the ``ProfileConfig`` for your DAG. Refer to the example DAG below for details on setting this parameter. -5. **location parameter required**: You must specify the location of the BigQuery dataset in the ``operator_args`` of the ``DbtDag`` or ``DbtTaskGroup``. The example DAG below provides guidance on this. -6. **async_py_requirements parameter required**: If you're using the default approach of having a setup task, you must specify the necessary dbt adapter Python requirements based on your profile type for the async execution mode in the ``ExecutionConfig`` of your ``DbtDag`` or ``DbtTaskGroup``. The example DAG below provides guidance on this. - -To start leveraging async execution mode that is currently supported for the BigQuery profile type targets you need to install Cosmos with the below additional dependencies: - -.. code:: bash - - astronomer-cosmos[dbt-bigquery, google] - - Example DAG: .. literalinclude:: ../../dev/dags/simple_dag_async.py @@ -338,32 +311,7 @@ Example DAG: :start-after: [START airflow_async_execution_mode_example] :end-before: [END airflow_async_execution_mode_example] -**Known Issue:** - -The ``dag test`` command failed with the following error, likely because the trigger does not fully initialize during the ``dag test``, leading to an uninitialized task instance. -This causes the BigQuery trigger to attempt accessing parameters of the Task Instance that are not properly initialized. - -.. code:: bash - - [2024-10-01T18:19:09.726+0530] {base_events.py:1738} ERROR - unhandled exception during asyncio.run() shutdown - task: ()> exception=AttributeError("'NoneType' object has no attribute 'dag_id'")> - Traceback (most recent call last): - File "/Users/pankaj/Documents/astro_code/astronomer-cosmos/devenv/lib/python3.9/site-packages/airflow/providers/google/cloud/triggers/bigquery.py", line 138, in run - yield TriggerEvent( - asyncio.exceptions.CancelledError - - During handling of the above exception, another exception occurred: - - Traceback (most recent call last): - File "/Users/pankaj/Documents/astro_code/astronomer-cosmos/devenv/lib/python3.9/site-packages/airflow/providers/google/cloud/triggers/bigquery.py", line 157, in run - if self.job_id and self.cancel_on_kill and self.safe_to_cancel(): - File "/Users/pankaj/Documents/astro_code/astronomer-cosmos/devenv/lib/python3.9/site-packages/airflow/providers/google/cloud/triggers/bigquery.py", line 126, in safe_to_cancel - task_instance = self.get_task_instance() # type: ignore[call-arg] - File "/Users/pankaj/Documents/astro_code/astronomer-cosmos/devenv/lib/python3.9/site-packages/airflow/utils/session.py", line 97, in wrapper - return func(*args, session=session, **kwargs) - File "/Users/pankaj/Documents/astro_code/astronomer-cosmos/devenv/lib/python3.9/site-packages/airflow/providers/google/cloud/triggers/bigquery.py", line 102, in get_task_instance - TaskInstance.dag_id == self.task_instance.dag_id, - AttributeError: 'NoneType' object has no attribute 'dag_id' +For a full step-by-step guide and limitations, check the :ref:`async-execution-mode` page. Watcher Execution Mode (Experimental)