Merged
41 changes: 26 additions & 15 deletions docs/apache-airflow/authoring-and-scheduling/deferring.rst
@@ -56,38 +56,44 @@ Writing a deferrable operator takes a bit more work. There are some main points
* You can defer multiple times, and you can defer before/after your Operator does significant work, or only defer if certain conditions are met (e.g. a system does not have an immediate answer). Deferral is entirely under your control.
* Any Operator can defer; no special marking on its class is needed, and it's not limited to Sensors.
* In order for any changes to a Trigger to be reflected, the *triggerer* needs to be restarted whenever the Trigger is modified.
-* If you want add an operator or sensor that supports both deferrable and non-deferrable modes. It's suggested to add ``deferable: bool = conf.getboolean("operators", "default_deferrable", fallback=False)`` to the ``__init__`` method of the operator and use it to decide whether to run the operator in deferrable mode. You'll be able to configure the default value of ``deferrable`` of all the operators and sensors that supports switch between deferrable and non-deferrable mode through ``default_deferrable`` in the ``operator`` section. Here's an example of a sensor that supports both modes.::
+* If you want to add an operator or sensor that supports both deferrable and non-deferrable modes, it's suggested to add ``deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False)`` to the ``__init__`` method of the operator and use it to decide whether to run the operator in deferrable mode. You'll be able to configure the default value of ``deferrable`` of all the operators and sensors that support switching between deferrable and non-deferrable mode through ``default_deferrable`` in the ``operator`` section. Here's an example of a sensor that supports both modes.

+.. code-block:: python
+
     import time
     from datetime import timedelta
+    from typing import Any

     from airflow.configuration import conf
     from airflow.sensors.base import BaseSensorOperator
     from airflow.triggers.temporal import TimeDeltaTrigger
+    from airflow.utils.context import Context


     class WaitOneHourSensor(BaseSensorOperator):
         def __init__(
-            self,
-            deferable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
-            **kwargs
-        ):
+            self, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), **kwargs
+        ) -> None:
             super().__init__(**kwargs)
             self.deferrable = deferable

-        def execute(self, context):
-            if deferrable:
+        def execute(self, context: Context) -> None:
+            if self.deferrable:
                 self.defer(
                     trigger=TimeDeltaTrigger(timedelta(hours=1)),
-                    method_name="execute_complete"
+                    method_name="execute_complete",
                 )
             else:
                 time.sleep(3600)

-        def execute_complete(self, context, event=None):
+        def execute_complete(
+            self,
+            context: Context,
+            event: dict[str, Any] | None = None,
+        ) -> None:
             # We have no more work to do here. Mark as complete.
             return
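
The dual-mode pattern in the bullet above can be sketched without an Airflow installation. The following is only an illustration under stated assumptions: the standard-library ``configparser`` stands in for Airflow's ``conf`` object, ``CONFIG_TEXT``, ``default_deferrable()`` and ``DualModeSketch`` are hypothetical names, and the default is resolved per instance here (the documented snippet evaluates it once, when the class is defined). The section name ``operators`` matches the ``conf.getboolean`` call in the snippet.

```python
from __future__ import annotations

import configparser

# Assumption: a stand-in for the relevant part of an Airflow config file.
CONFIG_TEXT = """
[operators]
default_deferrable = True
"""


def default_deferrable(config_text: str) -> bool:
    """Read [operators] default_deferrable, falling back to False if unset."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    return parser.getboolean("operators", "default_deferrable", fallback=False)


class DualModeSketch:
    """Hypothetical stand-in for an operator that supports both modes."""

    def __init__(self, deferrable: bool | None = None, config_text: str = "") -> None:
        # An explicit argument wins; otherwise use the configured default.
        if deferrable is None:
            deferrable = default_deferrable(config_text)
        self.deferrable = deferrable

    def execute(self) -> str:
        # A real operator would call self.defer(...) or block here.
        return "defer" if self.deferrable else "block"
```

The point of the design is that an explicit ``deferrable=...`` argument on one task overrides the configured default, while tasks that do not pass it follow the configuration.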


Triggering Deferral
~~~~~~~~~~~~~~~~~~~

@@ -106,19 +112,23 @@ If your Operator returns from either its first ``execute()`` method when it's ne

You are free to set ``method_name`` to ``execute`` if you want your Operator to have one entrypoint, but it, too, will have to accept ``event`` as an optional keyword argument.

-Here's a basic example of how a sensor might trigger deferral::
+Here's a basic example of how a sensor might trigger deferral

+.. code-block:: python
+
     from datetime import timedelta
+    from typing import Any

     from airflow.sensors.base import BaseSensorOperator
     from airflow.triggers.temporal import TimeDeltaTrigger
+    from airflow.utils.context import Context


     class WaitOneHourSensor(BaseSensorOperator):
-        def execute(self, context):
+        def execute(self, context: Context) -> None:
             self.defer(trigger=TimeDeltaTrigger(timedelta(hours=1)), method_name="execute_complete")

-        def execute_complete(self, context, event=None):
+        def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
             # We have no more work to do here. Mark as complete.
             return
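
The single-entrypoint option mentioned earlier (setting ``method_name`` to ``execute``) can be illustrated with a plain-Python stand-in. This is a rough sketch, not Airflow's implementation: ``SingleEntrypointSketch`` and ``resume()`` are hypothetical, and ``resume()`` only approximates the idea that the named method is called again with the trigger's event passed as the ``event`` keyword argument.

```python
from __future__ import annotations

from typing import Any


class SingleEntrypointSketch:
    """Hypothetical operator whose ``execute`` handles both the first run and the resume."""

    def execute(
        self, context: dict[str, Any], event: dict[str, Any] | None = None
    ) -> tuple[str, dict[str, Any] | None]:
        if event is None:
            # First run: a real operator would call
            # self.defer(trigger=..., method_name="execute") here.
            return ("deferred", None)
        # Resumed run: the trigger's event payload is now available.
        return ("resumed", event)


def resume(
    operator: Any, method_name: str, context: dict[str, Any], event: dict[str, Any]
) -> Any:
    """Assumed simplification: look up the named method and pass ``event`` as a keyword."""
    return getattr(operator, method_name)(context, event=event)
```

Because the same method serves both calls, ``event`` has to default to ``None``; otherwise the first, event-less call would fail with a missing-argument error.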

@@ -151,8 +161,9 @@ There's also some design constraints to be aware of:
Currently Triggers are only used up to their first event, as they are only used for resuming deferred tasks (which happens on the first event fired). However, we plan to allow DAGs to be launched from triggers in future, which is where multi-event triggers will be more useful.
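
The "first event only" behaviour can be sketched with a plain ``asyncio`` async generator. This is a minimal illustration under assumptions: ``ticking_events()`` and ``first_event()`` are hypothetical helpers, not part of Airflow, and they only mimic how a consumer might take the first value from a multi-event source and then shut it down.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncGenerator


async def ticking_events(count: int, delay: float = 0.01) -> AsyncGenerator[int, None]:
    """Hypothetical multi-event source: yields 0, 1, ... with a short pause before each."""
    for index in range(count):
        await asyncio.sleep(delay)
        yield index


async def first_event(events: AsyncGenerator[int, None]) -> int | None:
    """Return the first event, then close the generator so later events never fire."""
    try:
        async for event in events:
            return event
        return None  # The source finished without yielding anything.
    finally:
        await events.aclose()
```

Calling ``asyncio.run(first_event(ticking_events(3)))`` returns the first value and discards the rest, which is why a generator that would yield several events still resumes a deferred task only once in this sketch.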


-Here's the structure of a basic Trigger::
+Here's the structure of a basic Trigger

+.. code-block:: python
+
     import asyncio

@@ -161,7 +172,6 @@ Here's the structure of a basic Trigger::


     class DateTimeTrigger(BaseTrigger):
-
         def __init__(self, moment):
             super().__init__()
             self.moment = moment
@@ -174,6 +184,7 @@ Here's the structure of a basic Trigger::
                 await asyncio.sleep(1)
             yield TriggerEvent(self.moment)


This is a very simplified version of Airflow's ``DateTimeTrigger``, and you can see several things here:

* ``__init__`` and ``serialize`` are written as a pair; the Trigger is instantiated once when it is submitted by the Operator as part of its deferral request, then serialized and re-instantiated on any *triggerer* process that runs the trigger.