Skip to content
62 changes: 57 additions & 5 deletions docs/getting_started/watcher-kubernetes-execution-mode.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ The following example shows how to configure a ``DbtDag`` with ``ExecutionMode.W
execution_mode=ExecutionMode.WATCHER_KUBERNETES,
dbt_project_path=K8S_PROJECT_DIR,
),
operator_args=operator_args,
operator_args={
"image": DBT_IMAGE,
"get_logs": True,
"log_events_on_failure": True,
},
)

**Key differences from** ``ExecutionMode.KUBERNETES``:
Expand Down Expand Up @@ -97,15 +101,63 @@ Please ensure you have a compatible version installed:

pip install "apache-airflow-providers-cncf-kubernetes>10.7.0"

We successfully tested against the most recent release of the provider (`10.12.2 <https://pypi.org/project/apache-airflow-providers-cncf-kubernetes/10.12.2/>`_).

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Producer watcher does not support deferrable mode
Support for KPO deferrable mode
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Similar to ``ExecutionMode.WATCHER``, the ``ExecutionMode.WATCHER_KUBERNETES`` producer task, ``DbtProducerWatcherKubernetesSensor``, runs using synchronous mode (``deferrable=False``).
The producer node created by the ``ExecutionMode.WATCHER_KUBERNETES`` producer task can be set to deferrable mode as long as:

- The correct version of Airflow Kubernetes is installed (``>=10.12.2``). This version fixed a bug (`PR <https://github.com/apache/airflow/pull/58684>`_) that prevented setting callbacks and parsing the logs when the Kubernetes Operator run using ``deferrable``. The experience should be further improved once `this other PR is merged <https://github.com/apache/airflow/pull/60778>`_.

.. code-block:: bash

pip install "apache-airflow-providers-cncf-kubernetes>=10.12.2"

- The arguments ``deferrable=True`` and ``is_delete_operator_pod=True`` are set:

.. code-block:: python

dag = DbtDag(
dag_id="jaffle_shop_watcher_kubernetes",
# ... other DAG parameters ...
execution_config=ExecutionConfig(
execution_mode=ExecutionMode.WATCHER_KUBERNETES,
dbt_project_path=K8S_PROJECT_DIR,
),
operator_args={
"deferrable": True,
"is_delete_operator_pod": True,
"image": DBT_IMAGE,
"get_logs": True,
"log_events_on_failure": True,
},
)

Conversely, the consumer tasks that subclass ``DbtConsumerWatcherKubernetesSensor`` run in deferrable mode by default when operating as a sensor. They can also operate in deferrable mode if they are running dbt themselves upon retry.

~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Mandatory ``operator_args``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This was a limitation in the Airflow kubernetes provider, which was fixed in `this PR <https://github.com/apache/airflow/pull/58684>`_, and we'll be updating Cosmos once it is released.
The ``operator_args`` must define ``get_logs`` and ``log_events_on_failure``:

Conversely, the consumer tasks, ``DbtConsumerWatcherKubernetesSensor``, run in deferrable mode by default when they operate as sensors.
.. code-block: python
Comment thread
tatiana marked this conversation as resolved.

dag = DbtDag(
dag_id="jaffle_shop_watcher_kubernetes",
# ... other DAG parameters ...
execution_config=ExecutionConfig(
execution_mode=ExecutionMode.WATCHER_KUBERNETES,
dbt_project_path=K8S_PROJECT_DIR,
),
operator_args={
# ... other KPO mandatory args ...
"get_logs": True,
"log_events_on_failure": True,
},
)


~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down