Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 10 additions & 40 deletions azure/durable_functions/tasks/call_activity_with_retry.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
from typing import List, Any

from .task_utilities import find_task_scheduled, \
find_task_retry_timer_created, set_processed, parse_history_event, \
find_task_completed, find_task_failed, find_task_retry_timer_fired
from .task_utilities import get_retried_task
from ..models.RetryOptions import RetryOptions
from ..models.Task import (
Task)
from ..models.actions.CallActivityWithRetryAction import \
CallActivityWithRetryAction
from ..models.history import HistoryEvent
from ..models.history import HistoryEvent, HistoryEventType


def call_activity_with_retry_task(
Expand Down Expand Up @@ -37,40 +35,12 @@ def call_activity_with_retry_task(
"""
new_action = CallActivityWithRetryAction(
function_name=name, retry_options=retry_options, input_=input_)
for attempt in range(retry_options.max_number_of_attempts):
task_scheduled = find_task_scheduled(state, name)
task_completed = find_task_completed(state, task_scheduled)
task_failed = find_task_failed(state, task_scheduled)
task_retry_timer = find_task_retry_timer_created(state, task_failed)
task_retry_timer_fired = find_task_retry_timer_fired(
state, task_retry_timer)
set_processed([task_scheduled, task_completed,
task_failed, task_retry_timer, task_retry_timer_fired])

if not task_scheduled:
break

if task_completed:
return Task(
is_completed=True,
is_faulted=False,
action=new_action,
is_played=task_completed._is_played,
result=parse_history_event(task_completed),
timestamp=task_completed.timestamp,
id_=task_completed.TaskScheduledId)

if task_failed and task_retry_timer and attempt + 1 >= \
retry_options.max_number_of_attempts:
return Task(
is_completed=True,
is_faulted=True,
action=new_action,
is_played=task_failed._is_played,
timestamp=task_failed.timestamp,
id_=task_failed.TaskScheduledId,
exc=Exception(
f"{task_failed.Reason} \n {task_failed.Details}")
)

return Task(is_completed=False, is_faulted=False, action=new_action)
return get_retried_task(
state=state,
max_number_of_attempts=retry_options.max_number_of_attempts,
scheduled_type=HistoryEventType.TASK_SCHEDULED,
completed_type=HistoryEventType.TASK_COMPLETED,
failed_type=HistoryEventType.TASK_FAILED,
action=new_action
)
53 changes: 10 additions & 43 deletions azure/durable_functions/tasks/call_suborchestrator_with_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@
Task)
from ..models.actions.CallSubOrchestratorWithRetryAction import CallSubOrchestratorWithRetryAction
from ..models.RetryOptions import RetryOptions
from ..models.history import HistoryEvent
from .task_utilities import set_processed, parse_history_event, \
find_sub_orchestration_created, find_sub_orchestration_completed, \
find_sub_orchestration_failed, find_task_retry_timer_fired, find_task_retry_timer_created
from ..models.history import HistoryEvent, HistoryEventType
from .task_utilities import get_retried_task


def call_sub_orchestrator_with_retry_task(
Expand Down Expand Up @@ -40,42 +38,11 @@ def call_sub_orchestrator_with_retry_task(
A Durable Task that completes when the called sub-orchestrator completes or fails.
"""
new_action = CallSubOrchestratorWithRetryAction(name, retry_options, input_, instance_id)
for attempt in range(retry_options.max_number_of_attempts):
task_scheduled = find_sub_orchestration_created(
state, name, context=context, instance_id=instance_id)
task_completed = find_sub_orchestration_completed(state, task_scheduled)
task_failed = find_sub_orchestration_failed(state, task_scheduled)
task_retry_timer = find_task_retry_timer_created(state, task_failed)
task_retry_timer_fired = find_task_retry_timer_fired(
state, task_retry_timer)
set_processed([task_scheduled, task_completed,
task_failed, task_retry_timer, task_retry_timer_fired])

if not task_scheduled:
break

if task_completed is not None:
return Task(
is_completed=True,
is_faulted=False,
action=new_action,
is_played=task_completed._is_played,
result=parse_history_event(task_completed),
timestamp=task_completed.timestamp,
id_=task_completed.TaskScheduledId)

if task_failed and task_retry_timer and attempt + 1 >= \
retry_options.max_number_of_attempts:
return Task(
is_completed=True,
is_faulted=True,
action=new_action,
is_played=task_failed._is_played,
result=task_failed.Reason,
timestamp=task_failed.timestamp,
id_=task_failed.TaskScheduledId,
exc=Exception(
f"{task_failed.Reason} \n {task_failed.Details}")
)

return Task(is_completed=False, is_faulted=False, action=new_action)
return get_retried_task(
state=state,
max_number_of_attempts=retry_options.max_number_of_attempts,
scheduled_type=HistoryEventType.SUB_ORCHESTRATION_INSTANCE_CREATED,
completed_type=HistoryEventType.SUB_ORCHESTRATION_INSTANCE_COMPLETED,
failed_type=HistoryEventType.SUB_ORCHESTRATION_INSTANCE_FAILED,
action=new_action
)
94 changes: 94 additions & 0 deletions azure/durable_functions/tasks/task_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from azure.functions._durable_functions import _deserialize_custom_object
from datetime import datetime
from typing import List, Optional
from ..models.actions.Action import Action
from ..models.Task import Task


def should_suspend(partial_result) -> bool:
Expand Down Expand Up @@ -410,3 +412,95 @@ def should_preserve(event: HistoryEvent) -> bool:
# We should try to refactor this logic at some point
event = matches[0]
return event


def get_retried_task(
state: List[HistoryEvent], max_number_of_attempts: int, scheduled_type: HistoryEventType,
completed_type: HistoryEventType, failed_type: HistoryEventType, action: Action) -> Task:
"""Determine the state of scheduling some task for execution with retry options.

Parameters
----------
state: List[HistoryEvent]
The list of history events
max_number_of_ints: int
The maximum number of retrying attempts
scheduled_type: HistoryEventType
The event type corresponding to scheduling the searched-for task
completed_type: HistoryEventType
The event type corresponding to a completion of the searched-for task
failed_type: HistoryEventType
The event type coresponding to the failure of the searched-for task
action: Action
The action corresponding to the searched-for task

Returns
-------
Task
A Task encompassing the state of the scheduled work item, that is,
either completed, failed, or incomplete.
"""
# tasks to look for in the state array
scheduled_task, completed_task = None, None
failed_task, scheduled_timer_task = None, None
attempt = 1

# Note each case below is exclusive, and the order matters
for event in state:
event_type = HistoryEventType(event.event_type)

# Skip processed events
if event.is_processed:
continue

# first we find the scheduled_task
elif scheduled_task is None:
if event_type is scheduled_type:
scheduled_task = event

# if the task has a correponding completion, we process the events
# and return a completed task
elif event_type == completed_type and \
event.TaskScheduledId == scheduled_task.event_id:
completed_task = event
set_processed([scheduled_task, completed_task])
return Task(
is_completed=True,
is_faulted=False,
action=action,
result=parse_history_event(completed_task),
timestamp=completed_task.timestamp,
id_=completed_task.TaskScheduledId
)

# if its failed, we'll have to wait for an upcoming timer scheduled
elif failed_task is None:
if event_type is failed_type:
if event.TaskScheduledId == scheduled_task.event_id:
failed_task = event

# if we have a timer scheduled, we'll have to find a timer fired
elif scheduled_timer_task is None:
if event_type is HistoryEventType.TIMER_CREATED:
scheduled_timer_task = event

# if we have a timer fired, we check if we still have more attempts for retries.
# If so, we retry again and clear our found events so far.
# If not, we process the events and return a completed task
elif event_type is HistoryEventType.TIMER_FIRED:
if event.TimerId == scheduled_timer_task.event_id:
set_processed([scheduled_task, completed_task, failed_task, scheduled_timer_task])
if attempt >= max_number_of_attempts:
return Task(
is_completed=True,
is_faulted=True,
action=action,
timestamp=failed_task.timestamp,
id_=failed_task.TaskScheduledId,
exc=Exception(
f"{failed_task.Reason} \n {failed_task.Details}")
)
else:
scheduled_task, failed_task, scheduled_timer_task = None, None, None
attempt += 1
return Task(is_completed=False, is_faulted=False, action=action)
Loading