diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 6392ef9697647..b855d0e69c7ff 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -81,7 +81,7 @@ from airflow.ti_deps.deps.not_previously_skipped_dep import NotPreviouslySkippedDep from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep -from airflow.triggers.base import BaseTrigger +from airflow.triggers.base import BaseTrigger, TriggerEvent from airflow.utils import timezone from airflow.utils.context import Context from airflow.utils.decorators import fixup_decorator_warning_stack @@ -1574,7 +1574,7 @@ def defer( self, *, trigger: BaseTrigger, - method_name: str, + method_name: str = "execute_complete_default", kwargs: dict[str, Any] | None = None, timeout: timedelta | None = None, ): @@ -1587,6 +1587,15 @@ def defer( """ raise TaskDeferred(trigger=trigger, method_name=method_name, kwargs=kwargs, timeout=timeout) + def execute_complete_default(self, context: Context, event: dict[str, Any]): + """The default method for handling the event returned after the deferred operation completes.""" + op_name = type(self).__name__ + if event["status"] != TriggerEvent.STATUS_SUCCESS: + raise AirflowException(f"{op_name}'s deferred operation was not completed successfully: {event}") + else: + self.log.info("% completed successfully", op_name) + return event.get("value") + def unmap(self, resolve: None | dict[str, Any] | tuple[Context, Session]) -> BaseOperator: """Get the "normal" operator from the current operator. diff --git a/airflow/triggers/base.py b/airflow/triggers/base.py index 314d97b0ee919..8ef81506c2366 100644 --- a/airflow/triggers/base.py +++ b/airflow/triggers/base.py @@ -109,9 +109,20 @@ class TriggerEvent: events. """ + STATUS_SUCCESS = "success" + def __init__(self, payload: Any): self.payload = payload + @classmethod + def success(cls, value: Any = None) -> TriggerEvent: + """ + Creates a TriggerEvent to be returned by a deferred operation that completed successfully + + :param value: the value to be returned by the operator on completion + """ + return TriggerEvent({"status": cls.STATUS_SUCCESS, "value": value}) + def __repr__(self) -> str: return f"TriggerEvent<{self.payload!r}>"