Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
from airflow.ti_deps.deps.not_previously_skipped_dep import NotPreviouslySkippedDep
from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
from airflow.triggers.base import BaseTrigger
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.utils import timezone
from airflow.utils.context import Context
from airflow.utils.decorators import fixup_decorator_warning_stack
Expand Down Expand Up @@ -1574,7 +1574,7 @@ def defer(
self,
*,
trigger: BaseTrigger,
method_name: str,
method_name: str = "execute_complete_default",
kwargs: dict[str, Any] | None = None,
timeout: timedelta | None = None,
):
Expand All @@ -1587,6 +1587,15 @@ def defer(
"""
raise TaskDeferred(trigger=trigger, method_name=method_name, kwargs=kwargs, timeout=timeout)

def execute_complete_default(self, context: Context, event: dict[str, Any]):
"""The default method for handling the event returned after the deferred operation completes."""
op_name = type(self).__name__
if event["status"] != TriggerEvent.STATUS_SUCCESS:
raise AirflowException(f"{op_name}'s deferred operation was not completed successfully: {event}")
else:
self.log.info("% completed successfully", op_name)
return event.get("value")

def unmap(self, resolve: None | dict[str, Any] | tuple[Context, Session]) -> BaseOperator:
"""Get the "normal" operator from the current operator.

Expand Down
11 changes: 11 additions & 0 deletions airflow/triggers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,20 @@ class TriggerEvent:
events.
"""

STATUS_SUCCESS = "success"

def __init__(self, payload: Any):
self.payload = payload

@classmethod
def success(cls, value: Any = None) -> TriggerEvent:
"""
Creates a TriggerEvent to be returned by a deferred operation that completed successfully

:param value: the value to be returned by the operator on completion
"""
return TriggerEvent({"status": cls.STATUS_SUCCESS, "value": value})

def __repr__(self) -> str:
return f"TriggerEvent<{self.payload!r}>"

Expand Down