diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index 8061718b..dde2d018 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -1,24 +1,39 @@ +from collections import defaultdict +from azure.durable_functions.models.actions.SignalEntityAction import SignalEntityAction +from azure.durable_functions.models.actions.CallEntityAction import CallEntityAction +from azure.durable_functions.models.Task import TaskBase +from azure.durable_functions.models.actions.CallHttpAction import CallHttpAction +from azure.durable_functions.models.DurableHttpRequest import DurableHttpRequest +from azure.durable_functions.models.actions.CallSubOrchestratorWithRetryAction import \ + CallSubOrchestratorWithRetryAction +from azure.durable_functions.models.actions.CallActivityWithRetryAction import \ + CallActivityWithRetryAction +from azure.durable_functions.models.actions.ContinueAsNewAction import \ + ContinueAsNewAction +from azure.durable_functions.models.actions.WaitForExternalEventAction import \ + WaitForExternalEventAction +from azure.durable_functions.models.actions.CallSubOrchestratorAction import \ + CallSubOrchestratorAction +from azure.durable_functions.models.actions.CreateTimerAction import CreateTimerAction +from azure.durable_functions.models.Task import WhenAllTask, WhenAnyTask, AtomicTask, \ + RetryAbleTask +from azure.durable_functions.models.actions.CallActivityAction import CallActivityAction from azure.durable_functions.models.ReplaySchema import ReplaySchema import json import datetime import inspect -from typing import List, Any, Dict, Optional -from uuid import UUID, uuid5, NAMESPACE_URL +from typing import DefaultDict, List, Any, Dict, Optional, Tuple, Union +from uuid import UUID, uuid5, NAMESPACE_URL, NAMESPACE_OID from datetime import timezone from .RetryOptions import RetryOptions -from .TaskSet import TaskSet from .FunctionContext import FunctionContext from .history import HistoryEvent, HistoryEventType from .actions import Action -from ..models.Task import Task from ..models.TokenSource import TokenSource from .utils.entity_utils import EntityId -from ..tasks import call_activity_task, task_all, task_any, call_activity_with_retry_task, \ - wait_for_external_event_task, continue_as_new, new_uuid, call_http, create_timer_task, \ - call_sub_orchestrator_task, call_sub_orchestrator_with_retry_task, call_entity_task, \ - signal_entity_task from azure.functions._durable_functions import _deserialize_custom_object +from azure.durable_functions.constants import DATETIME_STRING_FORMAT class DurableOrchestrationContext: @@ -47,16 +62,21 @@ def __init__(self, self.decision_started_event.timestamp self._new_uuid_counter = 0 self._function_context: FunctionContext = FunctionContext(**kwargs) + self._sequence_number = 0 self._replay_schema = ReplaySchema(upperSchemaVersion) - self.actions: List[List[Action]] = [] - if self._replay_schema == ReplaySchema.V2: - self.actions.append([]) + + self._action_payload_v1: List[List[Action]] = [] + self._action_payload_v2: List[Action] = [] # make _input always a string # (consistent with Python Functions generic trigger/input bindings) if (isinstance(input, Dict)): input = json.dumps(input) + self._input: Any = input + self.open_tasks: DefaultDict[Union[int, str], Union[List[TaskBase], TaskBase]] + self.open_tasks = defaultdict(list) + self.deferred_tasks: Dict[Union[int, str], Tuple[HistoryEvent, bool, str]] = {} @classmethod def from_json(cls, json_string: str): @@ -73,11 +93,56 @@ def from_json(cls, json_string: str): New instance of the durable orchestration context class """ # We should consider parsing the `Input` field here as well, - # intead of doing so lazily when `get_input` is called. + # instead of doing so lazily when `get_input` is called. json_dict = json.loads(json_string) return cls(**json_dict) - def call_activity(self, name: str, input_: Optional[Any] = None) -> Task: + def _generate_task(self, action: Action, + retry_options: Optional[RetryOptions] = None, + id_: Optional[Union[int, str]] = None, + parent: Optional[TaskBase] = None) -> Union[AtomicTask, RetryAbleTask]: + """Generate an atomic or retryable Task based on an input. + + Parameters + ---------- + action : Action + The action backing the Task. + retry_options : Optional[RetryOptions] + RetryOptions for a with-retry task, by default None + + Returns + ------- + Union[AtomicTask, RetryAbleTask] + Either an atomic task or a retry-able task + """ + # Create an atomic task + task: Union[AtomicTask, RetryAbleTask] + action_payload: Union[Action, List[Action]] + + # TODO: find cleanear way to do this + if self._replay_schema is ReplaySchema.V1: + action_payload = [action] + else: + action_payload = action + task = AtomicTask(id_, action_payload) + task.parent = parent + + # if task is retryable, provide the retryable wrapper class + if not(retry_options is None): + task = RetryAbleTask(task, retry_options, self) + return task + + def _set_is_replaying(self, is_replaying: bool): + """Set the internal `is_replaying` flag. + + Parameters + ---------- + is_replaying : bool + New value of the `is_replaying` flag + """ + self._is_replaying = is_replaying + + def call_activity(self, name: str, input_: Optional[Any] = None) -> TaskBase: """Schedule an activity for execution. Parameters @@ -92,14 +157,13 @@ def call_activity(self, name: str, input_: Optional[Any] = None) -> Task: Task A Durable Task that completes when the called activity function completes or fails. """ - return call_activity_task( - state=self.histories, - name=name, - input_=input_) + action = CallActivityAction(name, input_) + task = self._generate_task(action) + return task def call_activity_with_retry(self, name: str, retry_options: RetryOptions, - input_: Optional[Any] = None) -> Task: + input_: Optional[Any] = None) -> TaskBase: """Schedule an activity for execution with retry options. Parameters @@ -117,15 +181,13 @@ def call_activity_with_retry(self, A Durable Task that completes when the called activity function completes or fails completely. """ - return call_activity_with_retry_task( - state=self.histories, - retry_options=retry_options, - name=name, - input_=input_) + action = CallActivityWithRetryAction(name, retry_options, input_) + task = self._generate_task(action, retry_options) + return task def call_http(self, method: str, uri: str, content: Optional[str] = None, headers: Optional[Dict[str, str]] = None, - token_source: TokenSource = None) -> Task: + token_source: TokenSource = None) -> TaskBase: """Schedule a durable HTTP call to the specified endpoint. Parameters @@ -146,13 +208,20 @@ def call_http(self, method: str, uri: str, content: Optional[str] = None, Task The durable HTTP request to schedule. """ - return call_http( - state=self.histories, method=method, uri=uri, content=content, headers=headers, - token_source=token_source) + json_content: Optional[str] = None + if content and content is not isinstance(content, str): + json_content = json.dumps(content) + else: + json_content = content + + request = DurableHttpRequest(method, uri, json_content, headers, token_source) + action = CallHttpAction(request) + task = self._generate_task(action) + return task def call_sub_orchestrator(self, name: str, input_: Optional[Any] = None, - instance_id: Optional[str] = None) -> Task: + instance_id: Optional[str] = None) -> TaskBase: """Schedule sub-orchestration function named `name` for execution. Parameters @@ -169,17 +238,14 @@ def call_sub_orchestrator(self, Task A Durable Task that completes when the called sub-orchestrator completes or fails. """ - return call_sub_orchestrator_task( - context=self, - state=self.histories, - name=name, - input_=input_, - instance_id=instance_id) + action = CallSubOrchestratorAction(name, input_, instance_id) + task = self._generate_task(action) + return task def call_sub_orchestrator_with_retry(self, name: str, retry_options: RetryOptions, input_: Optional[Any] = None, - instance_id: Optional[str] = None) -> Task: + instance_id: Optional[str] = None) -> TaskBase: """Schedule sub-orchestration function named `name` for execution, with retry-options. Parameters @@ -198,13 +264,9 @@ def call_sub_orchestrator_with_retry(self, Task A Durable Task that completes when the called sub-orchestrator completes or fails. """ - return call_sub_orchestrator_with_retry_task( - context=self, - state=self.histories, - retry_options=retry_options, - name=name, - input_=input_, - instance_id=instance_id) + action = CallSubOrchestratorWithRetryAction(name, retry_options, input_, instance_id) + task = self._generate_task(action, retry_options) + return task def get_input(self) -> Optional[Any]: """Get the orchestration input.""" @@ -224,9 +286,17 @@ def new_uuid(self) -> str: str New UUID that is safe for replay within an orchestration or operation. """ - return new_uuid(context=self) + URL_NAMESPACE: str = "9e952958-5e33-4daf-827f-2fa12937b875" + + uuid_name_value = \ + f"{self._instance_id}" \ + f"_{self.current_utc_datetime.strftime(DATETIME_STRING_FORMAT)}" \ + f"_{self._new_uuid_counter}" + self._new_uuid_counter += 1 + namespace_uuid = uuid5(NAMESPACE_OID, URL_NAMESPACE) + return str(uuid5(namespace_uuid, uuid_name_value)) - def task_all(self, activities: List[Task]) -> TaskSet: + def task_all(self, activities: List[TaskBase]) -> TaskBase: """Schedule the execution of all activities. Similar to Promise.all. When called with `yield` or `return`, returns an @@ -244,9 +314,9 @@ def task_all(self, activities: List[Task]) -> TaskSet: TaskSet The results of all activities. """ - return task_all(tasks=activities, replay_schema=self._replay_schema) + return WhenAllTask(activities, replay_schema=self._replay_schema) - def task_any(self, activities: List[Task]) -> TaskSet: + def task_any(self, activities: List[TaskBase]) -> TaskBase: """Schedule the execution of all activities. Similar to Promise.race. When called with `yield` or `return`, returns @@ -264,7 +334,7 @@ def task_any(self, activities: List[Task]) -> TaskSet: TaskSet The first [[Task]] instance to complete. """ - return task_any(tasks=activities, replay_schema=self._replay_schema) + return WhenAnyTask(activities, replay_schema=self._replay_schema) def set_custom_status(self, status: Any): """Set the customized orchestration status for your orchestrator function. @@ -385,7 +455,24 @@ def call_entity(self, entityId: EntityId, Task A Task of the entity call """ - return call_entity_task(self.histories, entityId, operationName, operationInput) + action = CallEntityAction(entityId, operationName, operationInput) + task = self._generate_task(action) + return task + + def _record_fire_and_forget_action(self, action: Action): + """Append a responseless-API action object to the actions array. + + Parameters + ---------- + action : Action + The action to append + """ + new_action: Union[List[Action], Action] + if self._replay_schema is ReplaySchema.V2: + new_action = action + else: + new_action = [action] + self._add_to_actions(new_action) def signal_entity(self, entityId: EntityId, operationName: str, operationInput: Optional[Any] = None): @@ -405,14 +492,17 @@ def signal_entity(self, entityId: EntityId, Task A Task of the entity signal """ - return signal_entity_task(self, self.histories, entityId, operationName, operationInput) + action = SignalEntityAction(entityId, operationName, operationInput) + task = self._generate_task(action) + self._record_fire_and_forget_action(action) + return task @property def will_continue_as_new(self) -> bool: """Return true if continue_as_new was called.""" return self._continue_as_new_flag - def create_timer(self, fire_at: datetime.datetime) -> Task: + def create_timer(self, fire_at: datetime.datetime) -> TaskBase: """Create a Durable Timer Task to implement a deadline at which to wake-up the orchestrator. Parameters @@ -422,12 +512,14 @@ def create_timer(self, fire_at: datetime.datetime) -> Task: Returns ------- - TimerTask + TaskBase A Durable Timer Task that schedules the timer to wake up the activity """ - return create_timer_task(state=self.histories, fire_at=fire_at) + action = CreateTimerAction(fire_at) + task = self._generate_task(action) + return task - def wait_for_external_event(self, name: str) -> Task: + def wait_for_external_event(self, name: str) -> TaskBase: """Wait asynchronously for an event to be raised with the name `name`. Parameters @@ -440,7 +532,9 @@ def wait_for_external_event(self, name: str) -> Task: Task Task to wait for the event """ - return wait_for_external_event_task(state=self.histories, name=name) + action = WaitForExternalEventAction(name) + task = self._generate_task(action, id_=name) + return task def continue_as_new(self, input_: Any): """Schedule the orchestrator to continue as new. @@ -450,7 +544,9 @@ def continue_as_new(self, input_: Any): input_ : Any The new starting input to the orchestrator. """ - return continue_as_new(context=self, input_=input_) + continue_as_new_action: Action = ContinueAsNewAction(input_) + self._record_fire_and_forget_action(continue_as_new_action) + self._continue_as_new_flag = True def new_guid(self) -> UUID: """Generate a replay-safe GUID. @@ -466,6 +562,41 @@ def new_guid(self) -> UUID: guid = uuid5(NAMESPACE_URL, guid_name) return guid + @property + def _actions(self) -> List[List[Action]]: + """Get the actions payload of this context, for replay in the extension. + + Returns + ------- + List[List[Action]] + The actions of this context + """ + if self._replay_schema is ReplaySchema.V1: + return self._action_payload_v1 + else: + return [self._action_payload_v2] + + def _add_to_actions(self, action_repr: Union[List[Action], Action]): + """Add a Task's actions payload to the context's actions array. + + Parameters + ---------- + action_repr : Union[List[Action], Action] + The tasks to add + """ + # Do not add further actions after `continue_as_new` has been + # called + if self.will_continue_as_new: + return + + if self._replay_schema is ReplaySchema.V1 and isinstance(action_repr, list): + self._action_payload_v1.append(action_repr) + elif self._replay_schema is ReplaySchema.V2 and isinstance(action_repr, Action): + self._action_payload_v2.append(action_repr) + else: + raise Exception(f"DF-internal exception: ActionRepr of signature {type(action_repr)}" + f"is not compatible on ReplaySchema {self._replay_schema.name}. ") + def _pretty_print_history(self) -> str: """Get a pretty-printed version of the orchestration's internal history.""" def history_to_string(event): @@ -477,3 +608,20 @@ def history_to_string(event): json_dict[key] = val return json.dumps(json_dict) return str(list(map(history_to_string, self._histories))) + + def _add_to_open_tasks(self, task: TaskBase): + + if isinstance(task, AtomicTask): + if task.id is None: + task.id = self._sequence_number + self._sequence_number += 1 + self.open_tasks[task.id] = task + elif task.id != -1: + self.open_tasks[task.id].append(task) + + if task.id in self.deferred_tasks: + task_update_action = self.deferred_tasks[task.id] + task_update_action() + else: + for child in task.children: + self._add_to_open_tasks(child) diff --git a/azure/durable_functions/models/Task.py b/azure/durable_functions/models/Task.py index 1c08583a..5749d798 100644 --- a/azure/durable_functions/models/Task.py +++ b/azure/durable_functions/models/Task.py @@ -1,76 +1,328 @@ -from datetime import datetime +from azure.durable_functions.models.actions.NoOpAction import NoOpAction +from azure.durable_functions.models.actions.CompoundAction import CompoundAction +from azure.durable_functions.models.RetryOptions import RetryOptions +from azure.durable_functions.models.ReplaySchema import ReplaySchema +from azure.durable_functions.models.actions.Action import Action +from azure.durable_functions.models.actions.WhenAnyAction import WhenAnyAction +from azure.durable_functions.models.actions.WhenAllAction import WhenAllAction -from .actions.Action import Action +import enum +from typing import Any, List, Optional, Set, Type, Union -class Task: - """Represents some pending action. +class TaskState(enum.Enum): + """The possible states that a Task can be in.""" - Similar to a native JavaScript promise in - that it acts as a placeholder for outstanding asynchronous work, but has - a synchronous implementation and is specific to Durable Functions. + RUNNING = 0 + SUCCEEDED = 1 + FAILED = 2 - Tasks are only returned to an orchestration function when a - [[DurableOrchestrationContext]] operation is not called with `yield`. They - are useful for parallelization and timeout operations in conjunction with - Task.all and Task.any. + +class TaskBase: + """The base class of all Tasks. + + Contains shared logic that drives all of its sub-classes. Should never be + instantiated on its own. """ - def __init__(self, is_completed, is_faulted, action, - result=None, timestamp=None, id_=None, exc=None, is_played=False): - self._is_completed: bool = is_completed - self._is_faulted: bool = is_faulted - self._action: Action = action - self._result = result - self._timestamp: datetime = timestamp - self._id = id_ - self._exception = exc - self._is_played = is_played - self._is_yielded: bool = False - - @property - def is_completed(self) -> bool: - """Get indicator whether the task has completed. - - Note that completion is not equivalent to success. + def __init__(self, id_: Union[int, str], actions: Union[List[Action], Action]): + """Initialize the TaskBase. + + Parameters + ---------- + id_ : int + An ID for the task + actions : List[Any] + The list of DF actions representing this Task. + Needed for reconstruction in the extension. """ - return self._is_completed + self.id: Union[int, str] = id_ + self.state = TaskState.RUNNING + self.parent: Optional[CompoundTask] = None + self._api_name: str + + api_action: Union[Action, Type[CompoundAction]] + if isinstance(actions, list): + if len(actions) == 1: + api_action = actions[0] + else: + api_action = CompoundAction + else: + api_action = actions - @property - def is_faulted(self) -> bool: - """Get indicator whether the task faulted in some way due to error.""" - return self._is_faulted + self._api_name = api_action.__class__.__name__ - @property - def action(self) -> Action: - """Get the scheduled action represented by the task. + self.result: Any = None + self.action_repr: Union[List[Action], Action] = actions + self.is_played = False - _Internal use only._ + def set_is_played(self, is_played: bool): + """Set the is_played flag for the Task. + + Needed for updating the orchestrator's is_replaying flag. + + Parameters + ---------- + is_played : bool + Whether the latest event for this Task has been played before. """ - return self._action + self.is_played = is_played + + def change_state(self, state: TaskState): + """Transition a running Task to a terminal state: success or failure. - @property - def result(self) -> object: - """Get the result of the task, if completed. Otherwise `None`.""" - return self._result + Parameters + ---------- + state : TaskState + The terminal state to assign to this Task - @property - def timestamp(self) -> datetime: - """Get the timestamp of the task.""" - return self._timestamp + Raises + ------ + Exception + When the input state is RUNNING + """ + if state is TaskState.RUNNING: + raise Exception("Cannot change Task to the RUNNING state.") + self.state = state + + def set_value(self, is_error: bool, value: Any): + """Set the value of this Task: either an exception of a result. - @property - def id(self): - """Get the ID number of the task. + Parameters + ---------- + is_error : bool + Whether the value represents an exception of a result. + value : Any + The value of this Task - _Internal use only._ + Raises + ------ + Exception + When the Task failed but its value was not an Exception """ - return self._id + new_state = self.state + if is_error: + if not isinstance(value, Exception): + if not (isinstance(value, TaskBase) and isinstance(value.result, Exception)): + err_message = f"Task ID {self.id} failed but it's value was not an Exception" + raise Exception(err_message) + new_state = TaskState.FAILED + else: + new_state = TaskState.SUCCEEDED + self.change_state(new_state) + self.result = value + self.propagate() + + def propagate(self): + """Notify parent Task of this Task's state change.""" + has_completed = not (self.state is TaskState.RUNNING) + has_parent = not (self.parent is None) + if has_completed and has_parent: + self.parent.handle_completion(self) + + +class CompoundTask(TaskBase): + """A Task of Tasks. + + Contains shared logic that drives all of its sub-classes. + Should never be instantiated on its own. + """ - @property - def exception(self): - """Get the error thrown when attempting to perform the task's action. + def __init__(self, tasks: List[TaskBase], compound_action_constructor=None): + """Instantiate CompoundTask attributes. - If the Task has not yet completed or has completed successfully, `None` + Parameters + ---------- + tasks : List[Task] + The children/sub-tasks of this Task + compound_action_constructor : Union[WhenAllAction, WhenAnyAction, None] + Either None or, a WhenAllAction or WhenAnyAction constructor. + It is None when using the V1 replay protocol, where no Compound Action + objects size and compound actions are represented as arrays of actions. + It is not None when using the V2 replay protocol. """ - return self._exception + super().__init__(-1, []) + child_actions = [] + for task in tasks: + task.parent = self + action_repr = task.action_repr + if isinstance(action_repr, list): + child_actions.extend(action_repr) + else: + child_actions.append(action_repr) + if compound_action_constructor is None: + self.action_repr = child_actions + else: # replay_schema is ReplaySchema.V2 + self.action_repr = compound_action_constructor(child_actions) + self._first_error: Optional[Exception] = None + self.pending_tasks: Set[TaskBase] = set(tasks) + self.completed_tasks: List[TaskBase] = [] + self.children = tasks + + def handle_completion(self, child: TaskBase): + """Manage sub-task completion events. + + Parameters + ---------- + child : TaskBase + The sub-task that completed + + Raises + ------ + Exception + When the calling sub-task was not registered + with this Task's pending sub-tasks. + """ + try: + self.pending_tasks.remove(child) + except KeyError: + raise Exception( + f"Parent Task {self.id} does not have pending sub-task with ID {child.id}." + f"This most likely means that Task {child.id} completed twice.") + + self.completed_tasks.append(child) + self.set_is_played(child.is_played) + self.try_set_value(child) + + def try_set_value(self, child: TaskBase): + """Transition a CompoundTask to a terminal state and set its value. + + Should be implemented by sub-classes. + + Parameters + ---------- + child : TaskBase + A sub-task that just completed + + Raises + ------ + NotImplementedError + This method needs to be implemented by each subclass. + """ + raise NotImplementedError + + +class AtomicTask(TaskBase): + """A Task with no subtasks.""" + + pass + + +class WhenAllTask(CompoundTask): + """A Task representing `when_all` scenarios.""" + + def __init__(self, task: List[TaskBase], replay_schema: ReplaySchema): + """Initialize a WhenAllTask. + + Parameters + ---------- + task : List[Task] + The list of child tasks + replay_schema : ReplaySchema + The ReplaySchema, which determines the inner action payload representation + """ + compound_action_constructor = None + if replay_schema is ReplaySchema.V2: + compound_action_constructor = WhenAllAction + super().__init__(task, compound_action_constructor) + + def try_set_value(self, child: TaskBase): + """Transition a WhenAll Task to a terminal state and set its value. + + Parameters + ---------- + child : TaskBase + A sub-task that just completed + """ + if child.state is TaskState.SUCCEEDED: + # A WhenAll Task only completes when it has no pending tasks + # i.e _when all_ of its children have completed + if len(self.pending_tasks) == 0: + results = list(map(lambda x: x.result, self.completed_tasks)) + self.set_value(is_error=False, value=results) + else: # child.state is TaskState.FAILED: + # a single error is sufficient to fail this task + if self._first_error is None: + self._first_error = child.result + self.set_value(is_error=True, value=self._first_error) + + +class WhenAnyTask(CompoundTask): + """A Task representing `when_any` scenarios.""" + + def __init__(self, task: List[TaskBase], replay_schema: ReplaySchema): + """Initialize a WhenAnyTask. + + Parameters + ---------- + task : List[Task] + The list of child tasks + replay_schema : ReplaySchema + The ReplaySchema, which determines the inner action payload representation + """ + compound_action_constructor = None + if replay_schema is ReplaySchema.V2: + compound_action_constructor = WhenAnyAction + super().__init__(task, compound_action_constructor) + + def try_set_value(self, child: TaskBase): + """Transition a WhenAny Task to a terminal state and set its value. + + Parameters + ---------- + child : TaskBase + A sub-task that just completed + """ + if self.state is TaskState.RUNNING: + self.set_value(is_error=False, value=child) + + +class RetryAbleTask(WhenAllTask): + """A Task representing `with_retry` scenarios. + + It inherits from WhenAllTask because retryable scenarios are Tasks + with equivalent to WhenAll Tasks with dynamically increasing lists + of children. At every failure, we add a Timer child and a Task child + to the list of pending tasks. + """ + + def __init__(self, child: TaskBase, retry_options: RetryOptions, context): + self.id_ = str(child.id) + "_retryable_proxy" + tasks = [child] + super().__init__(tasks, context._replay_schema) + + self.retry_options = retry_options + self.num_attempts = 1 + self.context = context + self.actions = child.action_repr + + def try_set_value(self, child: TaskBase): + """Transition a Retryable Task to a terminal state and set its value. + + Parameters + ---------- + child : TaskBase + A sub-task that just completed + """ + if child.state is TaskState.SUCCEEDED: + if len(self.pending_tasks) == 0: + # if all pending tasks have completed, + # and we have a successful child, then + # we can set the Task's event + self.set_value(is_error=False, value=child.result) + + else: # child.state is TaskState.FAILED: + if self.num_attempts >= self.retry_options.max_number_of_attempts: + # we have reached the maximum number of attempts, set error + self.set_value(is_error=True, value=child.result) + else: + # still have some retries left. + # increase size of pending tasks by adding a timer task + # and then re-scheduling the current task after that + timer_task = self.context._generate_task(action=NoOpAction(), parent=self) + self.pending_tasks.add(timer_task) + self.context._add_to_open_tasks(timer_task) + rescheduled_task = self.context._generate_task(action=NoOpAction(), parent=self) + self.pending_tasks.add(rescheduled_task) + self.context._add_to_open_tasks(rescheduled_task) + self.num_attempts += 1 diff --git a/azure/durable_functions/models/TaskOrchestrationExecutor.py b/azure/durable_functions/models/TaskOrchestrationExecutor.py new file mode 100644 index 00000000..4119d76d --- /dev/null +++ b/azure/durable_functions/models/TaskOrchestrationExecutor.py @@ -0,0 +1,304 @@ +from azure.durable_functions.models.Task import TaskBase, TaskState, AtomicTask +from azure.durable_functions.models.OrchestratorState import OrchestratorState +from azure.durable_functions.models.DurableOrchestrationContext import DurableOrchestrationContext +from typing import Any, List, Optional, Union +from azure.durable_functions.models.history.HistoryEventType import HistoryEventType +from azure.durable_functions.models.history.HistoryEvent import HistoryEvent +from types import GeneratorType +import warnings +from collections import namedtuple +import json +from ..models.entities.ResponseMessage import ResponseMessage +from azure.functions._durable_functions import _deserialize_custom_object + + +class TaskOrchestrationExecutor: + """Manages the execution and replay of user-defined orchestrations.""" + + def __init__(self): + """Initialize TaskOrchestrationExecutor.""" + # A mapping of event types to a tuple of + # (1) whether the event type represents a task success + # (2) the attribute in the corresponding event object that identifies the Task + # this mapping is used for processing events that transition a Task from its running state + # to a terminal one + SetTaskValuePayload = namedtuple("SetTaskValuePayload", ("is_success", "task_id_key")) + self.event_to_SetTaskValuePayload = dict([ + (HistoryEventType.TASK_COMPLETED, SetTaskValuePayload(True, "TaskScheduledId")), + (HistoryEventType.TIMER_FIRED, SetTaskValuePayload(True, "TimerId")), + (HistoryEventType.SUB_ORCHESTRATION_INSTANCE_COMPLETED, + SetTaskValuePayload(True, "TaskScheduledId")), + (HistoryEventType.EVENT_RAISED, SetTaskValuePayload(True, "Name")), + (HistoryEventType.TASK_FAILED, SetTaskValuePayload(False, "TaskScheduledId")), + (HistoryEventType.SUB_ORCHESTRATION_INSTANCE_FAILED, + SetTaskValuePayload(False, "TaskScheduledId")) + ]) + self.task_completion_events = set(self.event_to_SetTaskValuePayload.keys()) + self.initialize() + + def initialize(self): + """Initialize the TaskOrchestrationExecutor for a new orchestration invocation.""" + # The first task is just a placeholder to kickstart the generator. + # So it's value is `None`. + self.current_task: TaskBase = AtomicTask(-1, []) + self.current_task.set_value(is_error=False, value=None) + + self.output: Any = None + self.exception: Optional[Exception] = None + self.orchestrator_returned: bool = False + + def execute(self, context: DurableOrchestrationContext, + history: List[HistoryEvent], fn) -> str: + """Execute an orchestration using the orchestration history to evaluate Tasks and replay events. + + Parameters + ---------- + context : DurableOrchestrationContext + The user's orchestration context, to interact with the user code. + history : List[HistoryEvent] + The orchestration history, to evaluate tasks and replay events. + fn : function + The user's orchestration function. + + Returns + ------- + str + A JSON-formatted string of the user's orchestration state, payload for the extension. + """ + self.context = context + evaluated_user_code = fn(context) + + # If user code is a generator, then it uses `yield` statements (the DF API) + # and so we iterate through the DF history, generating tasks and populating + # them with values when the history provides them + if isinstance(evaluated_user_code, GeneratorType): + self.generator = evaluated_user_code + for event in history: + self.process_event(event) + if self.has_execution_completed: + break + + # Due to backwards compatibility reasons, it's possible + # for the `continue_as_new` API to be called without `yield` statements. + # Therefore, we explicitely check if `continue_as_new` was used before + # flatting the orchestration as returned/completed + elif not self.context.will_continue_as_new: + self.orchestrator_returned = True + self.output = evaluated_user_code + return self.get_orchestrator_state_str() + + def process_event(self, event: HistoryEvent): + """Evaluate a history event. + + This might result in updating some orchestration internal state deterministically, + to evaluating some Task, or have no side-effects. + + Parameters + ---------- + event : HistoryEvent + The history event to process + """ + event_type = event.event_type + if event_type == HistoryEventType.ORCHESTRATOR_STARTED: + # update orchestration's deterministic timestamp + timestamp = event.timestamp + if timestamp > self.context.current_utc_datetime: + self.context.current_utc_datetime = event.timestamp + elif event.event_type == HistoryEventType.CONTINUE_AS_NEW: + # re-initialize the orchestration state + self.initialize() + elif event_type == HistoryEventType.EXECUTION_STARTED: + # begin replaying user code + self.resume_user_code() + elif event_type == HistoryEventType.EVENT_SENT: + # we want to differentiate between a "proper" event sent, and a signal/call entity + key = getattr(event, "event_id") + if key in self.context.open_tasks.keys(): + task = self.context.open_tasks[key] + if task._api_name == "CallEntityAction": + # in the signal entity case, the Task is represented + # with a GUID, not with a sequential integer + self.context.open_tasks.pop(key) + event_id = json.loads(event.Input)["id"] + self.context.open_tasks[event_id] = task + + elif self.is_task_completion_event(event.event_type): + # transition a task to a success or failure state + (is_success, id_key) = self.event_to_SetTaskValuePayload[event_type] + self.set_task_value(event, is_success, id_key) + self.resume_user_code() + + def set_task_value(self, event: HistoryEvent, is_success: bool, id_key: str): + """Set a running task to either a success or failed state, and sets its value. + + Parameters + ---------- + event : HistoryEvent + The history event containing the value for the Task + is_success : bool + Whether the Task succeeded or failed (throws exception) + id_key : str + The attribute in the event object containing the ID of the Task to target + """ + + def parse_history_event(directive_result): + """Based on the type of event, parse the JSON.serializable portion of the event.""" + event_type = directive_result.event_type + if event_type is None: + raise ValueError("EventType is not found in task object") + + # We provide the ability to deserialize custom objects, because the output of this + # will be passed directly to the orchestrator as the output of some activity + if event_type == HistoryEventType.SUB_ORCHESTRATION_INSTANCE_COMPLETED: + return json.loads(directive_result.Result, object_hook=_deserialize_custom_object) + if event_type == HistoryEventType.TASK_COMPLETED: + return json.loads(directive_result.Result, object_hook=_deserialize_custom_object) + if event_type == HistoryEventType.EVENT_RAISED: + # TODO: Investigate why the payload is in "Input" instead of "Result" + response = json.loads(directive_result.Input, + object_hook=_deserialize_custom_object) + return response + return None + + # get target task + key = getattr(event, id_key) + try: + task: Union[TaskBase, List[TaskBase]] = self.context.open_tasks.pop(key) + if isinstance(task, list): + task_list = task + task = task_list.pop() + if len(task_list) > 0: + self.context.open_tasks[key] = task_list + except KeyError: + warning = f"Potential duplicate Task completion for TaskId: {key}" + warnings.warn(warning) + self.context.deferred_tasks[key] = lambda: self.set_task_value( + event, is_success, id_key) + return + + if is_success: + # retrieve result + new_value = parse_history_event(event) + if task._api_name == "CallEntityAction": + new_value = ResponseMessage.from_dict(new_value) + new_value = json.loads(new_value.result) + else: + # generate exception + new_value = Exception(f"{event.Reason} \n {event.Details}") + + # with a yielded task now evaluated, we can try to resume the user code + task.set_is_played(event._is_played) + task.set_value(is_error=not(is_success), value=new_value) + + def resume_user_code(self): + """Attempt to continue executing user code. + + We can only continue executing if the active/current task has resolved to a value. + """ + current_task = self.current_task + self.context._set_is_replaying(current_task.is_played) + if current_task.state is TaskState.RUNNING: + # if the current task hasn't been resolved, we can't + # continue executing the user code. + return + + new_task = None + try: + # resume orchestration with a resolved task's value + task_value = current_task.result + task_succeeded = current_task.state is TaskState.SUCCEEDED + new_task = self.generator.send( + task_value) if task_succeeded else self.generator.throw(task_value) + self.context._add_to_open_tasks(new_task) + except StopIteration as stop_exception: + # the orchestration returned, + # flag it as such and capture its output + self.orchestrator_returned = True + self.output = stop_exception.value + except Exception as exception: + # the orchestration threw an exception + self.exception = exception + + self.current_task = new_task + if not (new_task is None): + if not (new_task.state is TaskState.RUNNING): + # user yielded the same task multiple times, continue executing code + # until a new/not-previously-yielded task is encountered + self.resume_user_code() + else: + # new task is received. it needs to be resolved to a value + self.context._add_to_actions(self.current_task.action_repr) + + def get_orchestrator_state_str(self) -> str: + """Obtain a JSON-formatted string representing the orchestration's state. + + Returns + ------- + str + String represented orchestration's state, payload to the extension + + Raises + ------ + Exception + When the orchestration's state represents an error. The exception's + message contains in it the string representation of the orchestration's + state + """ + state = OrchestratorState( + is_done=self.orchestration_invocation_succeeded, + actions=self.context._actions, + output=self.output, + replay_schema=self.context._replay_schema, + error=None if self.exception is None else str(self.exception), + custom_status=self.context.custom_status + ) + + if self.exception is not None: + # Create formatted error, using out-of-proc error schema + error_label = "\n\n$OutOfProcData$:" + state_str = state.to_json_string() + formatted_error = f"{self.exception}{error_label}{state_str}" + + # Raise exception, re-set stack to original location + raise Exception(formatted_error) from self.exception + return state.to_json_string() + + def is_task_completion_event(self, event_type: HistoryEventType) -> bool: + """Determine if some event_type corresponds to a Task-resolution event. + + Parameters + ---------- + event_type : HistoryEventType + The event_type to analyze. + + Returns + ------- + bool + True if the event corresponds to a Task being resolved. False otherwise. + """ + return event_type in self.task_completion_events + + @property + def has_execution_completed(self) -> bool: + """Determine if the orchestration invocation is completed. + + An orchestration can complete either by returning, + continuing-as-new, or through an exception. + + Returns + ------- + bool + Whether the orchestration invocation is completed. + """ + return self.orchestration_invocation_succeeded or not(self.exception is None) + + @property + def orchestration_invocation_succeeded(self) -> bool: + """Whether the orchestration returned or continued-as-new. + + Returns + ------- + bool + Whether the orchestration returned or continued-as-new + """ + return self.orchestrator_returned or self.context.will_continue_as_new diff --git a/azure/durable_functions/models/TaskSet.py b/azure/durable_functions/models/TaskSet.py deleted file mode 100644 index 28400aa0..00000000 --- a/azure/durable_functions/models/TaskSet.py +++ /dev/null @@ -1,67 +0,0 @@ -from typing import List -from azure.durable_functions.models.actions.Action import Action -from datetime import datetime - - -class TaskSet: - """Represents a list of some pending action. - - Similar to a native JavaScript promise in - that it acts as a placeholder for outstanding asynchronous work, but has - a synchronous implementation and is specific to Durable Functions. - - Tasks are only returned to an orchestration function when a - [[DurableOrchestrationContext]] operation is not called with `yield`. They - are useful for parallelization and timeout operations in conjunction with - Task.all and Task.any. - """ - - def __init__(self, is_completed, actions, result, is_faulted=False, - timestamp=None, exception=None, is_played=False): - self._is_completed: bool = is_completed - self._actions: List[Action] = actions - self._result = result - self._is_faulted: bool = is_faulted - self._timestamp: datetime = timestamp - self._exception = exception - self._is_played = is_played - self._is_yielded: bool = False - - @property - def is_completed(self) -> bool: - """Get indicator whether the task has completed. - - Note that completion is not equivalent to success. - """ - return self._is_completed - - @property - def is_faulted(self) -> bool: - """Get indicator whether the task faulted in some way due to error.""" - return self._is_faulted - - @property - def actions(self) -> List[Action]: - """Get the scheduled action represented by the task. - - _Internal use only._ - """ - return self._actions - - @property - def result(self) -> object: - """Get the result of the task, if completed. Otherwise `None`.""" - return self._result - - @property - def timestamp(self) -> datetime: - """Get the timestamp of the task.""" - return self._timestamp - - @property - def exception(self): - """Get the error thrown when attempting to perform the task's action. - - If the Task has not yet completed or has completed successfully, `None` - """ - return self._exception diff --git a/azure/durable_functions/models/__init__.py b/azure/durable_functions/models/__init__.py index cc291aa2..a61511d2 100644 --- a/azure/durable_functions/models/__init__.py +++ b/azure/durable_functions/models/__init__.py @@ -6,8 +6,6 @@ from .OrchestrationRuntimeStatus import OrchestrationRuntimeStatus from .PurgeHistoryResult import PurgeHistoryResult from .RetryOptions import RetryOptions -from .Task import Task -from .TaskSet import TaskSet from .DurableHttpRequest import DurableHttpRequest from .TokenSource import ManagedIdentityTokenSource from .DurableEntityContext import DurableEntityContext @@ -22,7 +20,5 @@ 'OrchestratorState', 'OrchestrationRuntimeStatus', 'PurgeHistoryResult', - 'RetryOptions', - 'Task', - 'TaskSet' + 'RetryOptions' ] diff --git a/azure/durable_functions/models/actions/NoOpAction.py b/azure/durable_functions/models/actions/NoOpAction.py new file mode 100644 index 00000000..b59475e2 --- /dev/null +++ b/azure/durable_functions/models/actions/NoOpAction.py @@ -0,0 +1,20 @@ +from azure.durable_functions.models.actions.Action import Action +from typing import Any, Dict + + +class NoOpAction(Action): + """A no-op action, for anonymous tasks only.""" + + def action_type(self) -> int: + """Get the type of action this class represents.""" + raise Exception("Attempted to get action type of an anonymous Action") + + def to_json(self) -> Dict[str, Any]: + """Convert object into a json dictionary. + + Returns + ------- + Dict[str, Any] + The instance of the class converted into a json dictionary + """ + raise Exception("Attempted to convert an anonymous Action to JSON") diff --git a/azure/durable_functions/models/actions/__init__.py b/azure/durable_functions/models/actions/__init__.py index 5d84c09d..ea7e4f1a 100644 --- a/azure/durable_functions/models/actions/__init__.py +++ b/azure/durable_functions/models/actions/__init__.py @@ -7,6 +7,8 @@ from .WaitForExternalEventAction import WaitForExternalEventAction from .CallHttpAction import CallHttpAction from .CreateTimerAction import CreateTimerAction +from .WhenAllAction import WhenAllAction +from .WhenAnyAction import WhenAnyAction __all__ = [ 'Action', @@ -16,5 +18,7 @@ 'CallSubOrchestratorAction', 'CallHttpAction', 'WaitForExternalEventAction', - 'CreateTimerAction' + 'CreateTimerAction', + 'WhenAnyAction', + 'WhenAllAction' ] diff --git a/azure/durable_functions/models/utils/__init__.py b/azure/durable_functions/models/utils/__init__.py index d5e75062..3428656c 100644 --- a/azure/durable_functions/models/utils/__init__.py +++ b/azure/durable_functions/models/utils/__init__.py @@ -4,4 +4,4 @@ """ from pkgutil import extend_path import typing -__path__: typing.Iterable[str] = extend_path(__path__, __name__) +__path__: typing.Iterable[str] = extend_path(__path__, __name__) # type: ignore diff --git a/azure/durable_functions/orchestrator.py b/azure/durable_functions/orchestrator.py index 0f42f1a8..085f59d9 100644 --- a/azure/durable_functions/orchestrator.py +++ b/azure/durable_functions/orchestrator.py @@ -3,17 +3,10 @@ Responsible for orchestrating the execution of the user defined generator function. """ -from typing import Callable, Iterator, Any, Generator +from azure.durable_functions.models.TaskOrchestrationExecutor import TaskOrchestrationExecutor +from typing import Callable, Any, Generator -from azure.durable_functions.models.ReplaySchema import ReplaySchema - -from .models import ( - DurableOrchestrationContext, - Task, - TaskSet, - OrchestratorState) -from .models.history import HistoryEventType -from .tasks import should_suspend +from .models import DurableOrchestrationContext import azure.functions as func @@ -34,135 +27,24 @@ def __init__(self, :param activity_func: Generator function to orchestrate. """ self.fn: Callable[[DurableOrchestrationContext], Generator[Any, Any, Any]] = activity_func + self.task_orchestration_executor = TaskOrchestrationExecutor() - def handle(self, context: DurableOrchestrationContext): + def handle(self, context: DurableOrchestrationContext) -> str: """Handle the orchestration of the user defined generator function. - Called each time the durable extension executes an activity and needs - the client to handle the result. + Parameters + ---------- + context : DurableOrchestrationContext + The DF orchestration context - :param context: the context of what has been executed by - the durable extension. - :return: the resulting orchestration state, with instructions back to - the durable extension. + Returns + ------- + str + The JSON-formatted string representing the user's orchestration + state after this invocation """ self.durable_context = context - self.generator = None - suspended = False - - fn_output = self.fn(self.durable_context) - - # If `fn_output` is not an Iterator, then the orchestrator - # function does not make use of its context parameter. If so, - # `fn_output` is the return value instead of a generator - if not isinstance(fn_output, Iterator): - orchestration_state = OrchestratorState( - replay_schema=self.durable_context._replay_schema, - is_done=True, - output=fn_output, - actions=self.durable_context.actions, - custom_status=self.durable_context.custom_status) - - else: - self.generator = fn_output - try: - generation_state = self._generate_next(None) - - while not suspended: - self._add_to_actions(generation_state) - - if should_suspend(generation_state): - - # The `is_done` field should be False here unless - # `continue_as_new` was called. Therefore, - # `will_continue_as_new` essentially "tracks" - # whether or not the orchestration is done. - orchestration_state = OrchestratorState( - replay_schema=self.durable_context._replay_schema, - is_done=self.durable_context.will_continue_as_new, - output=None, - actions=self.durable_context.actions, - custom_status=self.durable_context.custom_status) - suspended = True - continue - - if (isinstance(generation_state, Task) - or isinstance(generation_state, TaskSet)) and ( - generation_state.is_faulted): - generation_state = self.generator.throw( - generation_state.exception) - continue - - self._update_timestamp() - self.durable_context._is_replaying = generation_state._is_played - generation_state = self._generate_next(generation_state) - - except StopIteration as sie: - orchestration_state = OrchestratorState( - replay_schema=self.durable_context._replay_schema, - is_done=True, - output=sie.value, - actions=self.durable_context.actions, - custom_status=self.durable_context.custom_status) - except Exception as e: - exception_str = str(e) - orchestration_state = OrchestratorState( - replay_schema=self.durable_context._replay_schema, - is_done=False, - output=None, # Should have no output, after generation range - actions=self.durable_context.actions, - error=exception_str, - custom_status=self.durable_context.custom_status) - - # Create formatted error, using out-of-proc error schema - error_label = "\n\n$OutOfProcData$:" - state_str = orchestration_state.to_json_string() - formatted_error = f"{exception_str}{error_label}{state_str}" - - # Raise exception, re-set stack to original location - raise Exception(formatted_error) from e - - # No output if continue_as_new was called - if self.durable_context.will_continue_as_new: - orchestration_state._output = None - - return orchestration_state.to_json_string() - - def _generate_next(self, partial_result): - if partial_result is not None: - gen_result = self.generator.send(partial_result.result) - else: - gen_result = self.generator.send(None) - - return gen_result - - def _add_to_actions(self, generation_state): - # Do not add new tasks to action if continue_as_new was called - if self.durable_context.will_continue_as_new: - return - if not generation_state._is_yielded: - if isinstance(generation_state, Task): - if self.durable_context._replay_schema == ReplaySchema.V1: - self.durable_context.actions.append([generation_state.action]) - else: - self.durable_context.actions[0].append(generation_state.action) - - elif isinstance(generation_state, TaskSet): - if self.durable_context._replay_schema == ReplaySchema.V1: - self.durable_context.actions.append(generation_state.actions) - else: - self.durable_context.actions[0].append(generation_state.actions) - generation_state._is_yielded = True - - def _update_timestamp(self): - last_timestamp = self.durable_context.decision_started_event.timestamp - decision_started_events = [e_ for e_ in self.durable_context.histories - if e_.event_type == HistoryEventType.ORCHESTRATOR_STARTED - and e_.timestamp > last_timestamp] - if len(decision_started_events) != 0: - self.durable_context.decision_started_event = decision_started_events[0] - self.durable_context.current_utc_datetime = \ - self.durable_context.decision_started_event.timestamp + return self.task_orchestration_executor.execute(context, context.histories, self.fn) @classmethod def create(cls, fn: Callable[[DurableOrchestrationContext], Generator[Any, Any, Any]]) \ diff --git a/azure/durable_functions/tasks/__init__.py b/azure/durable_functions/tasks/__init__.py deleted file mode 100644 index 9c7f6e9b..00000000 --- a/azure/durable_functions/tasks/__init__.py +++ /dev/null @@ -1,32 +0,0 @@ -"""Contains the definitions for the functions that enable scheduling of activities.""" -from .call_activity import call_activity_task -from .call_activity_with_retry import call_activity_with_retry_task -from .call_suborchestrator import call_sub_orchestrator_task -from .call_suborchestrator_with_retry import call_sub_orchestrator_with_retry_task -from .task_all import task_all -from .task_any import task_any -from .task_utilities import should_suspend -from .wait_for_external_event import wait_for_external_event_task -from .continue_as_new import continue_as_new -from .new_uuid import new_uuid -from .call_http import call_http -from .create_timer import create_timer_task -from .call_entity import call_entity_task -from .signal_entity import signal_entity_task - -__all__ = [ - 'call_activity_task', - 'call_activity_with_retry_task', - 'call_sub_orchestrator_task', - 'call_sub_orchestrator_with_retry_task', - 'call_entity_task', - 'signal_entity_task', - 'call_http', - 'continue_as_new', - 'new_uuid', - 'task_all', - 'task_any', - 'should_suspend', - 'wait_for_external_event_task', - 'create_timer_task' -] diff --git a/azure/durable_functions/tasks/call_activity.py b/azure/durable_functions/tasks/call_activity.py deleted file mode 100644 index c5b094b7..00000000 --- a/azure/durable_functions/tasks/call_activity.py +++ /dev/null @@ -1,61 +0,0 @@ -from typing import List, Any - -from ..models.Task import ( - Task) -from ..models.actions.CallActivityAction import CallActivityAction -from ..models.history import HistoryEvent -from .task_utilities import find_task_completed, find_task_failed, \ - find_task_scheduled, set_processed, parse_history_event - - -def call_activity_task( - state: List[HistoryEvent], - name: str, - input_: Any = None) -> Task: - """Determine the state of Scheduling an activity for execution. - - Parameters - ---------- - state: List[HistoryEvent] - The list of history events to search to determine the current state of the activity. - name: str - The name of the activity function to schedule. - input_: Any - The JSON-serializable input to pass to the activity function. - - Returns - ------- - Task - A Durable Task that completes when the called activity function completes or fails. - """ - new_action = CallActivityAction(name, input_) - - task_scheduled = find_task_scheduled(state, name) - task_completed = find_task_completed(state, task_scheduled) - task_failed = find_task_failed(state, task_scheduled) - set_processed([task_scheduled, task_completed, task_failed]) - - if task_completed is not None: - return Task( - is_completed=True, - is_faulted=False, - action=new_action, - is_played=task_completed._is_played, - result=parse_history_event(task_completed), - timestamp=task_completed.timestamp, - id_=task_completed.TaskScheduledId) - - if task_failed is not None: - return Task( - is_completed=True, - is_faulted=True, - action=new_action, - is_played=task_failed._is_played, - result=task_failed.Reason, - timestamp=task_failed.timestamp, - id_=task_failed.TaskScheduledId, - exc=Exception( - f"{task_failed.Reason} \n {task_failed.Details}") - ) - - return Task(is_completed=False, is_faulted=False, action=new_action) diff --git a/azure/durable_functions/tasks/call_activity_with_retry.py b/azure/durable_functions/tasks/call_activity_with_retry.py deleted file mode 100644 index 3a4b1273..00000000 --- a/azure/durable_functions/tasks/call_activity_with_retry.py +++ /dev/null @@ -1,46 +0,0 @@ -from typing import List, Any - -from .task_utilities import get_retried_task -from ..models.RetryOptions import RetryOptions -from ..models.Task import ( - Task) -from ..models.actions.CallActivityWithRetryAction import \ - CallActivityWithRetryAction -from ..models.history import HistoryEvent, HistoryEventType - - -def call_activity_with_retry_task( - state: List[HistoryEvent], - retry_options: RetryOptions, - name: str, - input_: Any = None) -> Task: - """Determine the state of scheduling an activity for execution with retry options. - - Parameters - ---------- - state: List[HistoryEvent] - The list of history events to search to determine the current state of the activity. - retry_options: RetryOptions - The retry options for the activity function. - name: str - The name of the activity function to call. - input_: Any - The JSON-serializable input to pass to the activity function. - - Returns - ------- - Task - A Durable Task that completes when the called activity function completes or fails - completely. - """ - new_action = CallActivityWithRetryAction( - function_name=name, retry_options=retry_options, input_=input_) - - return get_retried_task( - state=state, - max_number_of_attempts=retry_options.max_number_of_attempts, - scheduled_type=HistoryEventType.TASK_SCHEDULED, - completed_type=HistoryEventType.TASK_COMPLETED, - failed_type=HistoryEventType.TASK_FAILED, - action=new_action - ) diff --git a/azure/durable_functions/tasks/call_entity.py b/azure/durable_functions/tasks/call_entity.py deleted file mode 100644 index 5d920405..00000000 --- a/azure/durable_functions/tasks/call_entity.py +++ /dev/null @@ -1,83 +0,0 @@ -from typing import List, Any, Optional - -from ..models.Task import ( - Task) -from ..models.actions.CallEntityAction import CallEntityAction -from ..models.history import HistoryEvent, HistoryEventType -from .task_utilities import set_processed, parse_history_event, find_event -from ..models.utils.entity_utils import EntityId -from ..models.entities.RequestMessage import RequestMessage -from ..models.entities.ResponseMessage import ResponseMessage -import json - - -def call_entity_task( - state: List[HistoryEvent], - entity_id: EntityId, - operation_name: str = "", - input_: Optional[Any] = None): - """Determine the status of a call-entity task. - - It the task hasn't been scheduled, it returns a Task to schedule. If the task completed, - we return a completed Task, to process its result. - - Parameters - ---------- - state: List[HistoryEvent] - The list of history events to search over to determine the - current state of the callEntity Task. - entity_id: EntityId - An identifier for the entity to call. - operation_name: str - The name of the operation the entity needs to execute. - input_: Any - The JSON-serializable input to pass to the activity function. - - Returns - ------- - Task - A Durable Task that completes when the called entity completes or fails. - """ - new_action = CallEntityAction(entity_id, operation_name, input_) - scheduler_id = EntityId.get_scheduler_id(entity_id=entity_id) - - hist_type = HistoryEventType.EVENT_SENT - extra_constraints = { - "InstanceId": scheduler_id, - "Name": "op" - } - event_sent = find_event(state, hist_type, extra_constraints) - - event_raised = None - if event_sent: - event_input = None - if hasattr(event_sent, "Input") and event_sent.Input is not None: - event_input = RequestMessage.from_json(event_sent.Input) - hist_type = HistoryEventType.EVENT_RAISED - extra_constraints = { - "Name": event_input.id - } - event_raised = find_event(state, hist_type, extra_constraints) - # TODO: does it make sense to have an event_sent but no `Input` attribute ? - # If not, we should raise an exception here - - set_processed([event_sent, event_raised]) - if event_raised is not None: - response = parse_history_event(event_raised) - response = ResponseMessage.from_dict(response) - - # TODO: json.loads inside parse_history_event is not recursive - # investigate if response.result is used elsewhere, - # which probably requires another deserialization - result = json.loads(response.result) - - return Task( - is_completed=True, - is_faulted=False, - action=new_action, - result=result, - timestamp=event_raised.timestamp, - id_=event_raised.Name) # event_raised.TaskScheduledId - - # TODO: this may be missing exception handling, as is JS - return Task(is_completed=False, is_faulted=False, action=new_action) diff --git a/azure/durable_functions/tasks/call_http.py b/azure/durable_functions/tasks/call_http.py deleted file mode 100644 index c2c34b3f..00000000 --- a/azure/durable_functions/tasks/call_http.py +++ /dev/null @@ -1,78 +0,0 @@ -import json -from typing import Dict, List, Optional - -from .task_utilities import find_task_scheduled, find_task_completed, find_task_failed, \ - set_processed, parse_history_event -from ..constants import HTTP_ACTION_NAME -from ..models.DurableHttpRequest import DurableHttpRequest -from ..models.TokenSource import TokenSource -from ..models.actions import CallHttpAction -from ..models.history import HistoryEvent -from ..models.Task import ( - Task) - - -def call_http(state: List[HistoryEvent], method: str, uri: str, content: Optional[str] = None, - headers: Dict[str, str] = None, token_source: Optional[TokenSource] = None) -> Task: - """Get task used to schedule a durable HTTP call to the specified endpoint. - - Parameters - ---------- - state: List[HistoryEvent] - The list of events that have been processed to determine the state of the task to be - scheduled - method: str - The HTTP request method. - uri: str - The HTTP request uri. - content: str - The HTTP request content. - headers: Dict[str, str] - The HTTP request headers. - token_source: TokenSource - The source of OAuth token to add to the request. - - Returns - ------- - Task - The durable HTTP request to schedule. - """ - json_content: Optional[str] = None - if content and content is not isinstance(content, str): - json_content = json.dumps(content) - else: - json_content = content - - request = DurableHttpRequest(method, uri, json_content, headers, token_source) - - new_action = CallHttpAction(request) - - task_scheduled = find_task_scheduled(state, HTTP_ACTION_NAME) - task_completed = find_task_completed(state, task_scheduled) - task_failed = find_task_failed(state, task_scheduled) - set_processed([task_scheduled, task_completed, task_failed]) - - if task_completed is not None: - return Task( - is_completed=True, - is_faulted=False, - action=new_action, - is_played=task_completed._is_played, - result=parse_history_event(task_completed), - timestamp=task_completed.timestamp, - id_=task_completed.TaskScheduledId) - - if task_failed is not None: - return Task( - is_completed=True, - is_faulted=True, - action=new_action, - is_played=task_failed._is_played, - result=task_failed.Reason, - timestamp=task_failed.timestamp, - id_=task_failed.TaskScheduledId, - exc=Exception( - f"{task_failed.Reason} \n {task_failed.Details}") - ) - - return Task(is_completed=False, is_faulted=False, action=new_action) diff --git a/azure/durable_functions/tasks/call_suborchestrator.py b/azure/durable_functions/tasks/call_suborchestrator.py deleted file mode 100644 index 65a6c6d3..00000000 --- a/azure/durable_functions/tasks/call_suborchestrator.py +++ /dev/null @@ -1,69 +0,0 @@ -from typing import List, Any, Optional - -from ..models.Task import ( - Task) -from ..models.actions.CallSubOrchestratorAction import CallSubOrchestratorAction -from ..models.history import HistoryEvent -from .task_utilities import set_processed, parse_history_event, \ - find_sub_orchestration_created, find_sub_orchestration_completed, \ - find_sub_orchestration_failed - - -def call_sub_orchestrator_task( - context, - state: List[HistoryEvent], - name: str, - input_: Optional[Any] = None, - instance_id: Optional[str] = None) -> Task: - """Determine the state of Scheduling a sub-orchestrator for execution. - - Parameters - ---------- - context: 'DurableOrchestrationContext': - A reference to the orchestration context. - state: List[HistoryEvent] - The list of history events to search to determine the current state of the activity. - name: str - The name of the activity function to schedule. - input_: Optional[Any] - The JSON-serializable input to pass to the activity function. Defaults to None. - instance_id: str - The instance ID of the sub-orchestrator to call. Defaults to "". - - Returns - ------- - Task - A Durable Task that completes when the called sub-orchestrator completes or fails. - """ - new_action = CallSubOrchestratorAction(name, input_, instance_id) - - task_scheduled = find_sub_orchestration_created( - state, name, context=context, instance_id=instance_id) - task_completed = find_sub_orchestration_completed(state, task_scheduled) - task_failed = find_sub_orchestration_failed(state, task_scheduled) - set_processed([task_scheduled, task_completed, task_failed]) - - if task_completed is not None: - return Task( - is_completed=True, - is_faulted=False, - action=new_action, - is_played=task_completed._is_played, - result=parse_history_event(task_completed), - timestamp=task_completed.timestamp, - id_=task_completed.TaskScheduledId) - - if task_failed is not None: - return Task( - is_completed=True, - is_faulted=True, - action=new_action, - is_played=task_failed._is_played, - result=task_failed.Reason, - timestamp=task_failed.timestamp, - id_=task_failed.TaskScheduledId, - exc=Exception( - f"{task_failed.Reason} \n {task_failed.Details}") - ) - - return Task(is_completed=False, is_faulted=False, action=new_action) diff --git a/azure/durable_functions/tasks/call_suborchestrator_with_retry.py b/azure/durable_functions/tasks/call_suborchestrator_with_retry.py deleted file mode 100644 index e27dd354..00000000 --- a/azure/durable_functions/tasks/call_suborchestrator_with_retry.py +++ /dev/null @@ -1,48 +0,0 @@ -from typing import List, Any, Optional - -from ..models.Task import ( - Task) -from ..models.actions.CallSubOrchestratorWithRetryAction import CallSubOrchestratorWithRetryAction -from ..models.RetryOptions import RetryOptions -from ..models.history import HistoryEvent, HistoryEventType -from .task_utilities import get_retried_task - - -def call_sub_orchestrator_with_retry_task( - context, - state: List[HistoryEvent], - retry_options: RetryOptions, - name: str, - input_: Optional[Any] = None, - instance_id: Optional[str] = None) -> Task: - """Determine the state of Scheduling a sub-orchestrator for execution, with retry options. - - Parameters - ---------- - context: 'DurableOrchestrationContext': - A reference to the orchestration context. - state: List[HistoryEvent] - The list of history events to search to determine the current state of the activity. - retry_options: RetryOptions - The settings for retrying this sub-orchestrator in case of a failure. - name: str - The name of the activity function to schedule. - input_: Optional[Any] - The JSON-serializable input to pass to the activity function. Defaults to None. - instance_id: str - The instance ID of the sub-orchestrator to call. Defaults to "". - - Returns - ------- - Task - A Durable Task that completes when the called sub-orchestrator completes or fails. - """ - new_action = CallSubOrchestratorWithRetryAction(name, retry_options, input_, instance_id) - return get_retried_task( - state=state, - max_number_of_attempts=retry_options.max_number_of_attempts, - scheduled_type=HistoryEventType.SUB_ORCHESTRATION_INSTANCE_CREATED, - completed_type=HistoryEventType.SUB_ORCHESTRATION_INSTANCE_COMPLETED, - failed_type=HistoryEventType.SUB_ORCHESTRATION_INSTANCE_FAILED, - action=new_action - ) diff --git a/azure/durable_functions/tasks/continue_as_new.py b/azure/durable_functions/tasks/continue_as_new.py deleted file mode 100644 index dd07765f..00000000 --- a/azure/durable_functions/tasks/continue_as_new.py +++ /dev/null @@ -1,19 +0,0 @@ -from typing import Any - -from ..models.actions.ContinueAsNewAction import ContinueAsNewAction - - -def continue_as_new( - context, - input_: Any = None): - """Create a new continue as new action. - - Parameters - ---------- - input_: Any - The JSON-serializable input to pass to the activity function. - """ - new_action = ContinueAsNewAction(input_) - - context.actions.append([new_action]) - context._continue_as_new_flag = True diff --git a/azure/durable_functions/tasks/create_timer.py b/azure/durable_functions/tasks/create_timer.py deleted file mode 100644 index 4d67a4d5..00000000 --- a/azure/durable_functions/tasks/create_timer.py +++ /dev/null @@ -1,42 +0,0 @@ -from typing import List -from ..models.actions.CreateTimerAction import CreateTimerAction -from ..models.history import HistoryEvent -from .task_utilities import find_task_timer_created, find_task_retry_timer_fired, set_processed -import datetime -from .timer_task import TimerTask - - -def create_timer_task(state: List[HistoryEvent], - fire_at: datetime.datetime) -> TimerTask: - """Durable Timers are used in orchestrator function to implement delays. - - Parameters - ---------- - state : List[HistoryEvent] - The list of history events to search to determine the current state of the activity - fire_at : datetime - The time interval to fire the timer trigger - - Returns - ------- - TimerTask - A Durable Timer Task that schedules the timer to wake up the activity - """ - new_action = CreateTimerAction(fire_at) - - timer_created = find_task_timer_created(state, fire_at) - timer_fired = find_task_retry_timer_fired(state, timer_created) - - set_processed([timer_created, timer_fired]) - - if timer_fired: - return TimerTask( - is_completed=True, action=new_action, - timestamp=timer_fired.timestamp, - id_=timer_fired.event_id, - is_played=timer_fired.is_played) - else: - return TimerTask( - is_completed=False, action=new_action, - timestamp=None, - id_=None) diff --git a/azure/durable_functions/tasks/new_uuid.py b/azure/durable_functions/tasks/new_uuid.py deleted file mode 100644 index 364e4798..00000000 --- a/azure/durable_functions/tasks/new_uuid.py +++ /dev/null @@ -1,42 +0,0 @@ -from uuid import uuid5, NAMESPACE_OID - -from azure.durable_functions.constants import DATETIME_STRING_FORMAT -import typing - -if typing.TYPE_CHECKING: - from azure.durable_functions.models.DurableOrchestrationContext \ - import DurableOrchestrationContext - -URL_NAMESPACE: str = "9e952958-5e33-4daf-827f-2fa12937b875" - - -def _create_deterministic_uuid(namespace_value: str, name: str) -> str: - namespace_uuid = uuid5(NAMESPACE_OID, namespace_value) - return str(uuid5(namespace_uuid, name)) - - -def new_uuid(context: 'DurableOrchestrationContext') -> str: - """Create a new UUID that is safe for replay within an orchestration or operation. - - The default implementation of this method creates a name-based UUID - using the algorithm from RFC 4122 ยง4.3. The name input used to generate - this value is a combination of the orchestration instance ID and an - internally managed sequence number. - - Parameters - ---------- - context : DurableOrchestrationContext - Provides reference to the instance id, current_utc_datetime and a new_uuid_counter - attribute that is combined together to form that name that is used for the V5 UUID. - - Returns - ------- - str - New UUID that is safe for replay within an orchestration or operation. - """ - uuid_name_value = \ - f"{context._instance_id}" \ - f"_{context.current_utc_datetime.strftime(DATETIME_STRING_FORMAT)}" \ - f"_{context._new_uuid_counter}" - context._new_uuid_counter += 1 - return _create_deterministic_uuid(URL_NAMESPACE, uuid_name_value) diff --git a/azure/durable_functions/tasks/signal_entity.py b/azure/durable_functions/tasks/signal_entity.py deleted file mode 100644 index c7006495..00000000 --- a/azure/durable_functions/tasks/signal_entity.py +++ /dev/null @@ -1,45 +0,0 @@ -from typing import List, Any, Optional -from ..models.actions.SignalEntityAction import SignalEntityAction -from ..models.history import HistoryEvent, HistoryEventType -from .task_utilities import set_processed, find_event -from ..models.utils.entity_utils import EntityId - - -def signal_entity_task( - context, - state: List[HistoryEvent], - entity_id: EntityId, - operation_name: str = "", - input_: Optional[Any] = None): - """Signal a entity operation. - - It the action hasn't been scheduled, it appends the action. - If the action has been scheduled, no ops. - - Parameters - ---------- - state: List[HistoryEvent] - The list of history events to search over to determine the - current state of the callEntity Task. - entity_id: EntityId - An identifier for the entity to call. - operation_name: str - The name of the operation the entity needs to execute. - input_: Any - The JSON-serializable input to pass to the activity function. - """ - new_action = SignalEntityAction(entity_id, operation_name, input_) - scheduler_id = EntityId.get_scheduler_id(entity_id=entity_id) - - hist_type = HistoryEventType.EVENT_SENT - extra_constraints = { - "InstanceId": scheduler_id, - "Name": "op" - } - - event_sent = find_event(state, hist_type, extra_constraints) - set_processed([event_sent]) - context.actions.append([new_action]) - - if event_sent: - return diff --git a/azure/durable_functions/tasks/task_all.py b/azure/durable_functions/tasks/task_all.py deleted file mode 100644 index 13721a10..00000000 --- a/azure/durable_functions/tasks/task_all.py +++ /dev/null @@ -1,83 +0,0 @@ -from azure.durable_functions.models.actions.WhenAllAction import WhenAllAction -from azure.durable_functions.models.ReplaySchema import ReplaySchema -from datetime import datetime -from typing import List, Optional, Any - -from ..models.Task import Task -from ..models.TaskSet import TaskSet -from ..models.actions import Action - - -def task_all(tasks: List[Task], replay_schema: ReplaySchema): - """Determine the state of scheduling the activities for execution with retry options. - - Parameters - ---------- - tasks: List[Task] - The tasks to evaluate their current state. - - Returns - ------- - TaskSet - A Durable Task Set that reports the state of running all of the tasks within it. - """ - # Args for constructing the output TaskSet - is_played = True - is_faulted = False - is_completed = True - - actions: List[Action] = [] - results: List[Any] = [] - - exception: Optional[str] = None - end_time: Optional[datetime] = None - - for task in tasks: - # Add actions and results - if isinstance(task, TaskSet): - if replay_schema == ReplaySchema.V1: - actions.extend(task.actions) - else: - actions.append(task.actions) - else: - # We know it's an atomic Task - actions.append(task.action) - results.append(task.result) - - # Record first exception, if it exists - if task.is_faulted and not is_faulted: - is_faulted = True - exception = task.exception - - # If any task is not played, TaskSet is not played - if not task._is_played: - is_played = False - - # If any task is incomplete, TaskSet is incomplete - # If the task is complete, we can update the end_time - if not task.is_completed: - is_completed = False - elif end_time is None: - end_time = task.timestamp - else: - end_time = max([task.timestamp, end_time]) - - # Incomplete TaskSets do not have results or end-time - if not is_completed: - results = [] - end_time = None - - if replay_schema == ReplaySchema.V2: - actions = WhenAllAction(actions) - - # Construct TaskSet - taskset = TaskSet( - is_completed=is_completed, - actions=actions, - result=results, - is_faulted=is_faulted, - timestamp=end_time, - exception=exception, - is_played=is_played - ) - return taskset diff --git a/azure/durable_functions/tasks/task_any.py b/azure/durable_functions/tasks/task_any.py deleted file mode 100644 index f76e5572..00000000 --- a/azure/durable_functions/tasks/task_any.py +++ /dev/null @@ -1,51 +0,0 @@ -from azure.durable_functions.models.actions.WhenAnyAction import WhenAnyAction -from azure.durable_functions.models.ReplaySchema import ReplaySchema -from ..models.TaskSet import TaskSet - - -def task_any(tasks, replay_schema: ReplaySchema): - """Determine whether any of the given tasks is completed. - - Parameters - ---------- - tasks : Task - The tasks to evaluate their current state. - - Returns - ------- - TaskSet - Returns a completed Durable Task Set if any of the tasks is completed. - Returns a not completed Durable Task Set if none of the tasks are completed. - Returns a faulted Taskset if all tasks are faulted - """ - all_actions = [] - completed_tasks = [] - faulted_tasks = [] - error_message = [] - for task in tasks: - if isinstance(task, TaskSet): - if replay_schema == ReplaySchema.V1: - all_actions.extend(task.actions) - else: - all_actions.append(task.actions) - else: - all_actions.append(task.action) - - if task.is_faulted: - faulted_tasks.append(task) - error_message.append(task.exception) - elif task.is_completed: - completed_tasks.append(task) - - completed_tasks.sort(key=lambda t: t.timestamp) - - if replay_schema == ReplaySchema.V2: - all_actions = WhenAnyAction(all_actions) - - if len(faulted_tasks) == len(tasks): - return TaskSet(True, all_actions, None, is_faulted=True, exception=Exception( - f"All tasks have failed, errors messages in all tasks:{error_message}")) - elif len(completed_tasks) != 0: - return TaskSet(True, all_actions, completed_tasks[0], False, completed_tasks[0].timestamp) - else: - return TaskSet(False, all_actions, None) diff --git a/azure/durable_functions/tasks/task_utilities.py b/azure/durable_functions/tasks/task_utilities.py deleted file mode 100644 index b0dd0251..00000000 --- a/azure/durable_functions/tasks/task_utilities.py +++ /dev/null @@ -1,560 +0,0 @@ -import json -from ..models.history import HistoryEventType, HistoryEvent -from azure.functions._durable_functions import _deserialize_custom_object -from dateutil import parser -from typing import List, Optional, Dict, Any -from ..models.actions.Action import Action -from ..models.Task import Task - - -def should_suspend(partial_result) -> bool: - """Check the state of the result to determine if the orchestration should suspend.""" - return bool(partial_result is not None - and hasattr(partial_result, "is_completed") - and not partial_result.is_completed) - - -def parse_history_event(directive_result): - """Based on the type of event, parse the JSON.serializable portion of the event.""" - event_type = directive_result.event_type - if event_type is None: - raise ValueError("EventType is not found in task object") - - # We provide the ability to deserialize custom objects, because the output of this - # will be passed directly to the orchestrator as the output of some activity - if event_type == HistoryEventType.SUB_ORCHESTRATION_INSTANCE_COMPLETED: - return json.loads(directive_result.Result, object_hook=_deserialize_custom_object) - if event_type == HistoryEventType.TASK_COMPLETED: - return json.loads(directive_result.Result, object_hook=_deserialize_custom_object) - if event_type == HistoryEventType.EVENT_RAISED: - # TODO: Investigate why the payload is in "Input" instead of "Result" - return json.loads(directive_result.Input, object_hook=_deserialize_custom_object) - return None - - -def find_event(state: List[HistoryEvent], event_type: HistoryEventType, - extra_constraints: Dict[str, Any]) -> Optional[HistoryEvent]: - """Find event in the histories array as per some constraints. - - Parameters - ---------- - state: List[HistoryEvent] - The list of events so far in the orchestaration - event_type: HistoryEventType - The type of the event we're looking for - extra_constraints: Dict[str, Any] - A dictionary of key-value pairs where the key is a property of the - sought-after event, and value are its expected contents. - - Returns - ------- - Optional[HistoryEvent] - The event being searched-for, if found. Else, None. - """ - def satisfies_contraints(e: HistoryEvent) -> bool: - """Determine if an event matches our search criteria. - - Parameters - ---------- - e: HistoryEvent - An event from the state array - - Returns - ------- - bool - True if the event matches our constraints. Else, False. - """ - for attr, val in extra_constraints.items(): - if hasattr(e, attr) and getattr(e, attr) == val: - continue - else: - return False - return True - - tasks = [e for e in state - if e.event_type == event_type - and satisfies_contraints(e) and not e.is_processed] - - if len(tasks) == 0: - return None - - return tasks[0] - - -def find_event_raised(state, name): - """Find if the event with the given event name is raised. - - Parameters - ---------- - state : List[HistoryEvent] - List of histories to search from - name : str - Name of the event to search for - - Returns - ------- - HistoryEvent - The raised event with the given event name that has not yet been processed. - Returns None if no event with the given conditions was found. - - Raises - ------ - ValueError - Raises an error if no name was given when calling this function. - """ - if not name: - raise ValueError("Name cannot be empty") - - tasks = [e for e in state - if e.event_type == HistoryEventType.EVENT_RAISED - and e.Name == name and not e.is_processed] - - if len(tasks) == 0: - return None - - return tasks[0] - - -def find_task_scheduled(state, name): - """Locate the Scheduled Task. - - Within the state passed, search for an event that has hasn't been processed - and has the the name provided. - """ - if not name: - raise ValueError("Name cannot be empty") - - tasks = [e for e in state - if e.event_type == HistoryEventType.TASK_SCHEDULED - and e.Name == name and not e.is_processed] - - if len(tasks) == 0: - return None - - return tasks[0] - - -def find_task_completed(state, scheduled_task): - """Locate the Completed Task. - - Within the state passed, search for an event that has hasn't been processed, - is a completed task type, - and has the a scheduled id that equals the EventId of the provided scheduled task. - """ - if scheduled_task is None: - return None - - tasks = [e for e in state if e.event_type == HistoryEventType.TASK_COMPLETED - and e.TaskScheduledId == scheduled_task.event_id] - - if len(tasks) == 0: - return None - - return tasks[0] - - -def find_task_failed(state, scheduled_task): - """Locate the Failed Task. - - Within the state passed, search for an event that has hasn't been processed, - is a failed task type, - and has the a scheduled id that equals the EventId of the provided scheduled task. - """ - if scheduled_task is None: - return None - - tasks = [e for e in state if e.event_type == HistoryEventType.TASK_FAILED - and e.TaskScheduledId == scheduled_task.event_id] - - if len(tasks) == 0: - return None - - return tasks[0] - - -def find_task_timer_created(state, fire_at): - """Locate the Timer Created Task. - - Within the state passed, search for an event that has hasn't been processed, - is a timer created task type, - and has the an event id that is one higher then Scheduled Id of the provided - failed task provided. - """ - if fire_at is None: - return None - - # We remove the timezone metadata, - # to enable comparisons with timezone-naive datetime objects. This may be dangerous - fire_at = fire_at.replace(tzinfo=None) - tasks = [] - for e in state: - if e.event_type == HistoryEventType.TIMER_CREATED and hasattr(e, "FireAt"): - if parser.parse(e.FireAt).replace(tzinfo=None) == fire_at: - tasks.append(e) - - if len(tasks) == 0: - return None - - return tasks[0] - - -def find_task_retry_timer_created(state, failed_task): - """Locate the Timer Created Task. - - Within the state passed, search for an event that has hasn't been processed, - is a timer created task type, - and has the an event id that is one higher then Scheduled Id of the provided - failed task provided. - """ - if failed_task is None: - return None - - tasks = [e for e in state if e.event_type == HistoryEventType.TIMER_CREATED - and e.event_id == failed_task.TaskScheduledId + 1] - - if len(tasks) == 0: - return None - - return tasks[0] - - -def find_task_retry_timer_fired(state, retry_timer_created): - """Locate the Timer Fired Task. - - Within the state passed, search for an event that has hasn't been processed, - is a timer fired task type, - and has the an timer id that is equal to the EventId of the provided - timer created task provided. - """ - if retry_timer_created is None: - return None - - tasks = [e for e in state if e.event_type == HistoryEventType.TIMER_FIRED - and e.TimerId == retry_timer_created.event_id] - - if len(tasks) == 0: - return None - - return tasks[0] - - -def set_processed(tasks): - """Set the isProcessed attribute of all of the tasks to true. - - This provides the ability to not look at events that have already been processed within - searching the history of events. - """ - for task in tasks: - if task is not None: - task.is_processed = True - - -def find_sub_orchestration( - state: List[HistoryEvent], - event_type: HistoryEventType, - name: Optional[str] = None, - context=None, - instance_id: Optional[str] = None, - scheduled_task: Optional[HistoryEvent] = None) -> Optional[HistoryEvent]: - """Look-up matching sub-orchestrator event in the state array. - - Parameters - ---------- - state: List[HistoryEvent] - The history of Durable events - event_type: HistoryEventType - The type of Durable event to look for. - name: Optional[str]: - Name of the sub-orchestrator. - context: Optional['DurableOrchestrationContext'] - A reference to the orchestration context - instance_id: Optional[str], optional: - Instance ID of the sub-orchestrator. Defaults to None. - scheduled_task" Optional[HistoryEvent], optional: - The corresponding `scheduled` task for the searched-for event, - only available when looking for a completed or failed event. - Defaults to None. - - Returns - ------- - Optional[HistoryEvent]: - The matching event from the state array, if it exists. - """ - - def gen_err_message(counter: int, mid_message: str, found: str, expected: str) -> str: - beg = f"The sub-orchestration call (n = {counter}) should be executed with " - middle = mid_message.format(found, expected) - end = " Check your code for non-deterministic behavior." - err_message = beg + middle + end - return err_message - - event: Optional[HistoryEvent] = find_matching_event(state, event_type, scheduled_task) - - # Test for name and instance_id mistaches and, if so, error out. - # Also increase sub-orchestrator counter, for reporting. - if event_type == HistoryEventType.SUB_ORCHESTRATION_INSTANCE_CREATED and (event is not None): - - context._sub_orchestrator_counter += 1 - counter: int = context._sub_orchestrator_counter - - if name is None: - err = "Tried to lookup suborchestration in history but had not name to reference it." - raise ValueError(err) - - # TODO: The HistoryEvent does not necessarily have a name or an instance_id - # We should create sub-classes of these types like JS does, to ensure their - # precense. - - if event.Name is None: - raise ValueError("History Event for suborchestration found with no {Name} field") - event_name: str = event.Name - err_message: str = "" - if not(event.Name == name): - mid_message = "a function name of {} instead of the provided function name of {}." - err_message = gen_err_message(counter, mid_message, event_name, name) - raise ValueError(err_message) - if instance_id and not(event.InstanceId == instance_id): - mid_message = "an instance id of {} instead of the provided instance id of {}." - err_message = gen_err_message(counter, mid_message, event_name, name) - raise ValueError(err_message) - - return event - - -def find_sub_orchestration_created( - state: List[HistoryEvent], - name: str, - context=None, - instance_id: Optional[str] = None) -> Optional[HistoryEvent]: - """Look-up matching sub-orchestrator created event in the state array. - - Parameters - ---------- - state: List[HistoryEvent]: - The history of Durable events - name: str: - Name of the sub-orchestrator. - context: Optional['DurableOrchestrationContext']: - A reference to the orchestration context. - instance_id: Optional[str], optional: - Instance ID of the sub-orchestrator. Defaults to None. - - Raises - ------ - ValueError: When the provided sub-orchestration name or instance_id (if provided) do not - correspond to the matching event in the state list. - - Returns - ------- - Optional[HistoryEvent]: - The matching sub-orchestration creation event. Else, None. - """ - event_type = HistoryEventType.SUB_ORCHESTRATION_INSTANCE_CREATED - return find_sub_orchestration( - state=state, - event_type=event_type, - name=name, - instance_id=instance_id, - context=context) - - -def find_sub_orchestration_completed( - state: List[HistoryEvent], - scheduled_task: Optional[HistoryEvent]) -> Optional[HistoryEvent]: - """Look-up the sub-orchestration completed event. - - Parameters - ---------- - state: List[HistoryEvent]: - The history of Durable events - scheduled_task: Optional[HistoryEvent]: - The sub-orchestration creation event, if found. - - Returns - ------- - Optional[HistoryEvent]: - The matching sub-orchestration completed event, if found. Else, None. - """ - event_type = HistoryEventType.SUB_ORCHESTRATION_INSTANCE_COMPLETED - return find_sub_orchestration( - state=state, - event_type=event_type, - scheduled_task=scheduled_task) - - -def find_sub_orchestration_failed( - state: List[HistoryEvent], - scheduled_task: Optional[HistoryEvent]) -> Optional[HistoryEvent]: - """Look-up the sub-orchestration failure event. - - Parameters - ---------- - state: List[HistoryEvent]: - The history of Durable events - scheduled_task: Optional[HistoryEvent]: - The sub-orchestration creation event, if found. - - Returns - ------- - Optional[HistoryEvent]: - The matching sub-orchestration failure event, if found. Else, None. - """ - event_type = HistoryEventType.SUB_ORCHESTRATION_INSTANCE_FAILED - return find_sub_orchestration( - state=state, - event_type=event_type, - scheduled_task=scheduled_task) - - -def find_matching_event( - state: List[HistoryEvent], - event_type: HistoryEventType, - scheduled_task: Optional[HistoryEvent] = None) -> Optional[HistoryEvent]: - """Find matching event in the state array, if it exists. - - Parameters - ---------- - state: List[HistoryEvent]: - The list of Durable events - event_type: HistoryEventType: - The type of event being searched-for. - scheduled_task" Optional[HistoryEvent], optional: - The corresponding `scheduled` task for the searched-for event, - only available when looking for a completed or failed event. - Defaults to None. - - Returns - ------- - Optional[HistoryEvent]: - The matching event from the state array, if it exists. - """ - - def should_preserve(event: HistoryEvent) -> bool: - """Check if `event` matches the task being searched-for. - - Parameters - ---------- - event: HistoryEvent: - An event from the `state` array. - - Returns - ------- - bool: - True if `event` matches the task being search-for. - False otherwise. - """ - should_preserve = False - has_correct_type = event.event_type == event_type - if has_correct_type: - is_not_processed = not event.is_processed - extra_constraints = True - if not (scheduled_task is None): - extra_constraints = event.TaskScheduledId == scheduled_task.event_id - should_preserve = has_correct_type and is_not_processed and extra_constraints - return should_preserve - - event: Optional[HistoryEvent] = None - - # Preverse only the elements of the state array that correspond with the looked-up event - matches = list(filter(should_preserve, state)) - - if len(matches) >= 1: - # TODO: in many instances, `matches` will be greater than 1 in length. We take the - # first element because that corresponds to the first non-processed event, which - # we assume corresponds to the one we are looking for. This may be brittle but - # is true about other areas of the code as well such as with `call_activity`. - # We should try to refactor this logic at some point - event = matches[0] - return event - - -def get_retried_task( - state: List[HistoryEvent], max_number_of_attempts: int, scheduled_type: HistoryEventType, - completed_type: HistoryEventType, failed_type: HistoryEventType, action: Action) -> Task: - """Determine the state of scheduling some task for execution with retry options. - - Parameters - ---------- - state: List[HistoryEvent] - The list of history events - max_number_of_ints: int - The maximum number of retrying attempts - scheduled_type: HistoryEventType - The event type corresponding to scheduling the searched-for task - completed_type: HistoryEventType - The event type corresponding to a completion of the searched-for task - failed_type: HistoryEventType - The event type coresponding to the failure of the searched-for task - action: Action - The action corresponding to the searched-for task - - Returns - ------- - Task - A Task encompassing the state of the scheduled work item, that is, - either completed, failed, or incomplete. - """ - # tasks to look for in the state array - scheduled_task, completed_task = None, None - failed_task, scheduled_timer_task = None, None - attempt = 1 - - # Note each case below is exclusive, and the order matters - for event in state: - event_type = HistoryEventType(event.event_type) - - # Skip processed events - if event.is_processed: - continue - - # first we find the scheduled_task - elif scheduled_task is None: - if event_type is scheduled_type: - scheduled_task = event - - # if the task has a correponding completion, we process the events - # and return a completed task - elif event_type == completed_type and \ - event.TaskScheduledId == scheduled_task.event_id: - completed_task = event - set_processed([scheduled_task, completed_task]) - return Task( - is_completed=True, - is_faulted=False, - action=action, - result=parse_history_event(completed_task), - timestamp=completed_task.timestamp, - id_=completed_task.TaskScheduledId - ) - - # if its failed, we'll have to wait for an upcoming timer scheduled - elif failed_task is None: - if event_type is failed_type: - if event.TaskScheduledId == scheduled_task.event_id: - failed_task = event - - # if we have a timer scheduled, we'll have to find a timer fired - elif scheduled_timer_task is None: - if event_type is HistoryEventType.TIMER_CREATED: - scheduled_timer_task = event - - # if we have a timer fired, we check if we still have more attempts for retries. - # If so, we retry again and clear our found events so far. - # If not, we process the events and return a completed task - elif event_type is HistoryEventType.TIMER_FIRED: - if event.TimerId == scheduled_timer_task.event_id: - set_processed([scheduled_task, completed_task, failed_task, scheduled_timer_task]) - if attempt >= max_number_of_attempts: - return Task( - is_completed=True, - is_faulted=True, - action=action, - timestamp=failed_task.timestamp, - id_=failed_task.TaskScheduledId, - exc=Exception( - f"{failed_task.Reason} \n {failed_task.Details}") - ) - else: - scheduled_task, failed_task, scheduled_timer_task = None, None, None - attempt += 1 - return Task(is_completed=False, is_faulted=False, action=action) diff --git a/azure/durable_functions/tasks/timer_task.py b/azure/durable_functions/tasks/timer_task.py deleted file mode 100644 index 454e6a9c..00000000 --- a/azure/durable_functions/tasks/timer_task.py +++ /dev/null @@ -1,49 +0,0 @@ -from ..models.Task import Task -from ..models.actions.CreateTimerAction import CreateTimerAction - - -class TimerTask(Task): - """Represents a pending timer. - - All pending timers must be completed or canceled for an orchestration to complete. - - Example: Cancel a timer - ``` - timeout_task = context.df.create_timer(expiration_date) - if not timeout_task.is_completed(): - timeout_task.cancel() - ``` - """ - - def __init__(self, action: CreateTimerAction, is_completed, timestamp, id_, is_played=False): - self._action: CreateTimerAction = action - self._is_completed = is_completed - self._timestamp = timestamp - self._id = id_ - - super().__init__(self._is_completed, False, - self._action, None, self._timestamp, self._id, None) - self._is_played = is_played - - def is_cancelled(self) -> bool: - """Check of a timer is cancelled. - - Returns - ------- - bool - Returns whether a timer has been cancelled or not - """ - return self._action.is_cancelled - - def cancel(self): - """Cancel a timer. - - Raises - ------ - ValueError - Raises an error if the task is already completed and an attempt is made to cancel it - """ - if not self._is_completed: - self._action.is_cancelled = True - else: - raise ValueError("Cannot cancel a completed task.") diff --git a/azure/durable_functions/tasks/wait_for_external_event.py b/azure/durable_functions/tasks/wait_for_external_event.py deleted file mode 100644 index 64645232..00000000 --- a/azure/durable_functions/tasks/wait_for_external_event.py +++ /dev/null @@ -1,43 +0,0 @@ -from typing import List - -from ..models.Task import ( - Task) -from ..models.actions.WaitForExternalEventAction import WaitForExternalEventAction -from ..models.history import HistoryEvent -from .task_utilities import set_processed, parse_history_event, find_event_raised - - -def wait_for_external_event_task( - state: List[HistoryEvent], - name: str) -> Task: - """Determine the state of a task that is waiting for an event to occur. - - Parameters - ---------- - state : List[HistoryEvent] - The list of history events to search to determine the current - state of the task. - name : str - The event name of the event that the task is waiting for. - - Returns - ------- - Task - Returns a completed task if the expected event was raised. - Returns a not completed task if the expected event has not occurred yet. - """ - new_action = WaitForExternalEventAction(name) - event_raised = find_event_raised(state, name) - set_processed([event_raised]) - if event_raised: - return Task( - is_completed=True, - is_faulted=False, - action=new_action, - is_played=event_raised._is_played, - result=parse_history_event(event_raised), - timestamp=event_raised.timestamp, - id_=event_raised.event_id) - - else: - return Task(is_completed=False, is_faulted=False, action=new_action) diff --git a/noxfile.py b/noxfile.py index cae7ac72..29367157 100644 --- a/noxfile.py +++ b/noxfile.py @@ -18,4 +18,9 @@ def lint(session): def typecheck(session): session.install("-r", "requirements.txt") session.install("mypy") - session.run("mypy", "./azure/") \ No newline at end of file + session.run("mypy", "./azure/") + +@nox.session(python=["3.7", "3.8"]) +def autopep(session): + session.install("-r", "requirements.txt") + session.run("autopep8", "--in-place --aggressive --aggressive --recursive \"./azure/\"") \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 3763252d..20535994 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,5 @@ azure-functions>=1.2.0 nox==2019.11.9 furl==2.1.0 pytest-asyncio==0.10.0 +autopep8 +types-python-dateutil diff --git a/tests/orchestrator/test_create_timer.py b/tests/orchestrator/test_create_timer.py index bd6eb174..869b8c49 100644 --- a/tests/orchestrator/test_create_timer.py +++ b/tests/orchestrator/test_create_timer.py @@ -9,7 +9,7 @@ def base_expected_state(output=None, replay_schema: ReplaySchema = ReplaySchema.V1) -> OrchestratorState: - return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema.V1.value) + return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema.value) def add_timer_fired_events(context_builder: ContextBuilder, id_: int, timestamp: str): fire_at: str = context_builder.add_timer_created_event(id_, timestamp) diff --git a/tests/orchestrator/test_entity.py b/tests/orchestrator/test_entity.py index af0bccf4..872066cc 100644 --- a/tests/orchestrator/test_entity.py +++ b/tests/orchestrator/test_entity.py @@ -150,7 +150,7 @@ def add_call_entity_action_for_entity(state: OrchestratorState, id_: df.EntityId def base_expected_state(output=None, replay_schema: ReplaySchema = ReplaySchema.V1) -> OrchestratorState: - return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema.V1.value) + return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema.value) def add_call_entity_action(state: OrchestratorState, id_: df.EntityId, op: str, input_: Any): action = CallEntityAction(entity_id=id_, operation=op, input_=input_) @@ -161,8 +161,8 @@ def add_signal_entity_action(state: OrchestratorState, id_: df.EntityId, op: str state.actions.append([action]) def add_call_entity_completed_events( - context_builder: ContextBuilder, op: str, instance_id=str, input_=None): - context_builder.add_event_sent_event(instance_id) + context_builder: ContextBuilder, op: str, instance_id=str, input_=None, event_id=0): + context_builder.add_event_sent_event(instance_id, event_id) context_builder.add_orchestrator_completed_event() context_builder.add_orchestrator_started_event() context_builder.add_event_raised_event(name="0000", id_=0, input_=input_, is_entity=True) @@ -200,7 +200,7 @@ def test_signal_entity_sent(): def test_call_entity_raised(): entityId = df.EntityId("Counter", "myCounter") context_builder = ContextBuilder('test_simple_function') - add_call_entity_completed_events(context_builder, "add", df.EntityId.get_scheduler_id(entityId), 3) + add_call_entity_completed_events(context_builder, "add", df.EntityId.get_scheduler_id(entityId), 3, 0) result = get_orchestration_state_result( context_builder, generator_function_call_entity) diff --git a/tests/orchestrator/test_external_event.py b/tests/orchestrator/test_external_event.py new file mode 100644 index 00000000..263ef774 --- /dev/null +++ b/tests/orchestrator/test_external_event.py @@ -0,0 +1,54 @@ +from datetime import datetime +from tests.orchestrator.test_fan_out_fan_in import add_completed_event, add_failed_event, base_expected_state, add_multi_actions +from tests.orchestrator.orchestrator_test_utils import assert_orchestration_state_equals, get_orchestration_state_result +from tests.test_utils.ContextBuilder import ContextBuilder +from azure.durable_functions.models.actions.WaitForExternalEventAction import WaitForExternalEventAction + +def generator_function(context): + result = yield context.wait_for_external_event("A") + return result + +def generator_function_multiple(context): + result = yield context.wait_for_external_event("B") + result = yield context.wait_for_external_event("A") + return result + +def test_continue_when_no_payload(): + context_builder = ContextBuilder() + result = get_orchestration_state_result( + context_builder, generator_function) + + expected_state = base_expected_state() + expected_state.actions.append([WaitForExternalEventAction("A")]) + expected = expected_state.to_json() + assert_orchestration_state_equals(expected, result) + +def test_succeeds_on_payload(): + timestamp = datetime.now() + json_input = '{"test":"somecontent"}' + context_builder = ContextBuilder() + context_builder.add_event_raised_event("A", input_=json_input, timestamp=timestamp, id_=-1) + result = get_orchestration_state_result( + context_builder, generator_function) + + expected_state = base_expected_state({"test":"somecontent"}) + expected_state.actions.append([WaitForExternalEventAction("A")]) + expected_state._is_done = True + expected = expected_state.to_json() + assert_orchestration_state_equals(expected, result) + +def test_succeeds_on_out_of_order_payload(): + timestamp = datetime.now() + json_input = '{"test":"somecontent"}' + context_builder = ContextBuilder() + context_builder.add_event_raised_event("B", input_=json_input, timestamp=timestamp, id_=-1) + context_builder.add_event_raised_event("A", input_=json_input, timestamp=timestamp, id_=-1) + result = get_orchestration_state_result( + context_builder, generator_function_multiple) + + expected_state = base_expected_state({"test":"somecontent"}) + expected_state.actions.append([WaitForExternalEventAction("A")]) + expected_state.actions.append([WaitForExternalEventAction("B")]) + expected_state._is_done = True + expected = expected_state.to_json() + assert_orchestration_state_equals(expected, result) \ No newline at end of file diff --git a/tests/orchestrator/test_fan_out_fan_in.py b/tests/orchestrator/test_fan_out_fan_in.py index e0283be6..b0d889db 100644 --- a/tests/orchestrator/test_fan_out_fan_in.py +++ b/tests/orchestrator/test_fan_out_fan_in.py @@ -20,7 +20,7 @@ def generator_function(context): def base_expected_state(output=None, error=None, replay_schema: ReplaySchema = ReplaySchema.V1) -> OrchestratorState: - return OrchestratorState(is_done=False, actions=[], output=output, error=error, replay_schema=replay_schema.value) + return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema) def add_completed_event( diff --git a/tests/orchestrator/test_is_replaying_flag.py b/tests/orchestrator/test_is_replaying_flag.py index e638c6df..e3e1023d 100644 --- a/tests/orchestrator/test_is_replaying_flag.py +++ b/tests/orchestrator/test_is_replaying_flag.py @@ -18,6 +18,19 @@ def generator_function(context): deadline = deadline + timedelta(seconds=30) yield context.create_timer(deadline) +def generator_function_compound_task(context): + # Create a timezone aware datetime object, just like a normal + # call to `context.current_utc_datetime` would create + timestamp = "2020-07-23T21:56:54.936700Z" + deadline = datetime.strptime(timestamp, DATETIME_STRING_FORMAT) + deadline = deadline.replace(tzinfo=timezone.utc) + + tasks = [] + for _ in range(0, 3): + deadline = deadline + timedelta(seconds=30) + tasks.append(context.create_timer(deadline)) + yield context.task_any(tasks) + def base_expected_state(output=None, replay_schema: ReplaySchema = ReplaySchema.V1) -> OrchestratorState: return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema.value) @@ -71,3 +84,18 @@ def test_is_replaying_one_replayed_one_not(): context_builder, generator_function, "durable_context") assert result.is_replaying == False + +def test_is_replaying_propagates_in_compound_task(): + + timestamp = "2020-07-23T21:56:54.9367Z" + fire_at = datetime.strptime(timestamp, DATETIME_STRING_FORMAT) + timedelta(seconds=30) + fire_at_str = fire_at.strftime(DATETIME_STRING_FORMAT) + + context_builder = ContextBuilder("") + add_timer_fired_events(context_builder, 0, fire_at_str, is_played=True) + + result = get_orchestration_property( + context_builder, generator_function_compound_task, "durable_context") + + assert result.is_replaying == True + diff --git a/tests/orchestrator/test_retries.py b/tests/orchestrator/test_retries.py index c08ffdad..3f425985 100644 --- a/tests/orchestrator/test_retries.py +++ b/tests/orchestrator/test_retries.py @@ -123,12 +123,9 @@ def _schedule_events(context: ContextBuilder, id_counter: int) -> Tuple[ContextB Tuple[ContextBuilder, int, List[int]]: The updated context, the updated counter, a list of event IDs for each scheduled event """ - scheduled_ids: List[int] = [] - for id_ in range(num_activities): - scheduled_ids.append(id_) - context.add_task_scheduled_event(name='Hello', id_=id_) - id_counter += 1 - return context, id_counter, scheduled_ids + id_counter = id_counter + 1 + context.add_task_scheduled_event(name='Hello', id_=id_counter) + return context, id_counter def _fail_events(context: ContextBuilder, id_counter: int) -> Tuple[ContextBuilder, int]: """Add event failed to the context. @@ -146,10 +143,8 @@ def _fail_events(context: ContextBuilder, id_counter: int) -> Tuple[ContextBuild The updated context, the updated id_counter """ context.add_orchestrator_started_event() - for id_ in scheduled_ids: - context.add_task_failed_event( - id_=id_, reason=REASONS, details=DETAILS) - id_counter += 1 + context.add_task_failed_event( + id_=id_counter, reason=REASONS, details=DETAILS) return context, id_counter def _schedule_timers(context: ContextBuilder, id_counter: int) -> Tuple[ContextBuilder, int, List[datetime]]: @@ -167,10 +162,9 @@ def _schedule_timers(context: ContextBuilder, id_counter: int) -> Tuple[ContextB Tuple[ContextBuilder, int, List[datetime]]: The updated context, the updated counter, a list of timer deadlines """ + id_counter = id_counter + 1 deadlines: List[datetime] = [] - for _ in range(num_activities): - deadlines.append((id_counter, context.add_timer_created_event(id_counter))) - id_counter += 1 + deadlines.append((id_counter, context.add_timer_created_event(id_counter))) return context, id_counter, deadlines def _fire_timer(context: ContextBuilder, id_counter: int, deadlines: List[datetime]) -> Tuple[ContextBuilder, int]: @@ -192,10 +186,9 @@ def _fire_timer(context: ContextBuilder, id_counter: int, deadlines: List[dateti """ for id_, fire_at in deadlines: context.add_timer_fired_event(id_=id_, fire_at=fire_at) - id_counter += 1 return context, id_counter - def _complete_event(context: ContextBuilder, id_counter: int) -> Tuple[ContextBuilder, int]: + def _complete_event(context: ContextBuilder, id_counter: int, city:str) -> Tuple[ContextBuilder, int]: """Add event / task completions to the context. Parameters @@ -210,45 +203,44 @@ def _complete_event(context: ContextBuilder, id_counter: int) -> Tuple[ContextBu Tuple[ContextBuilder, int] The updated context, the updated id_counter """ - for id_, city in zip(scheduled_ids, CITIES): - result = f"\"{RESULT_PREFIX}{city}\"" - context.add_task_completed_event(id_=id_, result=result) - id_counter += 1 + result = f"\"{RESULT_PREFIX}{city}\"" + context.add_task_completed_event(id_=id_counter, result=result) return context, id_counter - id_counter = 0 - - # Schedule the events - context, id_counter, scheduled_ids = _schedule_events(context, id_counter) - context.add_orchestrator_completed_event() - - # Record failures, schedule timers - context, id_counter = _fail_events(context, id_counter) - context, id_counter, deadlines = _schedule_timers(context, id_counter) - context.add_orchestrator_completed_event() + id_counter = -1 - # Fire timers, re-schedule events - context.add_orchestrator_started_event() - context, id_counter = _fire_timer(context, id_counter, deadlines) - context, id_counter, scheduled_ids = _schedule_events(context, id_counter) - context.add_orchestrator_completed_event() - - context.add_orchestrator_started_event() + for city in CITIES: + # Schedule the events + context, id_counter = _schedule_events(context, id_counter) + context.add_orchestrator_completed_event() - # Either complete the event or, if we want all failed events, then - # fail the events, schedule timer, and fire time. - if will_fail: + # Record failures, schedule timers context, id_counter = _fail_events(context, id_counter) context, id_counter, deadlines = _schedule_timers(context, id_counter) context.add_orchestrator_completed_event() + # Fire timers, re-schedule events context.add_orchestrator_started_event() context, id_counter = _fire_timer(context, id_counter, deadlines) - else: - context, id_counter = _complete_event(context, id_counter) + context, id_counter = _schedule_events(context, id_counter) + context.add_orchestrator_completed_event() - context.add_orchestrator_completed_event() + context.add_orchestrator_started_event() + + # Either complete the event or, if we want all failed events, then + # fail the events, schedule timer, and fire time. + if will_fail: + context, id_counter = _fail_events(context, id_counter) + context, id_counter, deadlines = _schedule_timers(context, id_counter) + context.add_orchestrator_completed_event() + + context.add_orchestrator_started_event() + context, id_counter = _fire_timer(context, id_counter, deadlines) + else: + context, id_counter = _complete_event(context, id_counter, city) + + context.add_orchestrator_completed_event() return context def test_redundant_completion_doesnt_get_processed(): @@ -303,7 +295,7 @@ def test_retries_can_fail(): assert str.startswith(error_str, expected_error_str) def test_retries_with_serializable_input(): - """Tests that retried tasks work with serialized input classes.""" + # Tests that retried tasks work with serialized input classes. context = get_context_with_retries() result_1 = get_orchestration_state_result( @@ -314,4 +306,4 @@ def test_retries_with_serializable_input(): assert "output" in result_1 assert "output" in result_2 - assert result_1["output"] == result_2["output"] + assert result_1["output"] == result_2["output"] \ No newline at end of file diff --git a/tests/orchestrator/test_sequential_orchestrator.py b/tests/orchestrator/test_sequential_orchestrator.py index ac63259b..aeb60701 100644 --- a/tests/orchestrator/test_sequential_orchestrator.py +++ b/tests/orchestrator/test_sequential_orchestrator.py @@ -24,6 +24,15 @@ def generator_function(context): return outputs +def generator_function_no_yield(context): + outputs = [] + + task1 = context.call_activity("Hello", "Tokyo") + task2 = context.call_activity("Hello", "Seattle") + task3 = yield context.call_activity("Hello", "London") + + return task3 + def generator_function_duplicate_yield(context): task1 = context.call_activity("Hello", "Tokyo") yield task1 @@ -273,6 +282,21 @@ def test_tokyo_and_seattle_and_london_state(): assert_valid_schema(result) assert_orchestration_state_equals(expected, result) +def test_sequential_orchestration_no_yield(): + context_builder = ContextBuilder('test_simple_function') + add_hello_completed_events(context_builder, 0, "\"Hello London!\"") + + result = get_orchestration_state_result( + context_builder, generator_function_no_yield) + + expected_state = base_expected_state('Hello London!') + add_hello_action(expected_state, 'London') + expected_state._is_done = True + expected = expected_state.to_json() + + assert_valid_schema(result) + assert_orchestration_state_equals(expected, result) + def test_tokyo_and_seattle_and_london_with_serialization_state(): """Tests the sequential function pattern with custom object serialization. diff --git a/tests/orchestrator/test_sequential_orchestrator_custom_status.py b/tests/orchestrator/test_sequential_orchestrator_custom_status.py index fd56eebc..10503aa8 100644 --- a/tests/orchestrator/test_sequential_orchestrator_custom_status.py +++ b/tests/orchestrator/test_sequential_orchestrator_custom_status.py @@ -82,7 +82,7 @@ def test_custom_status_tokyo_seattle(): # Complete the two event so that it sets the custom status accordingly add_hello_completed_events(context_builder, 0, "\"Hello Tokyo!\"") - add_hello_completed_events(context_builder, 0, "\"Hello Seattle!\"") + add_hello_completed_events(context_builder, 1, "\"Hello Seattle!\"") result = get_orchestration_state_result( context_builder, generator_function) diff --git a/tests/orchestrator/test_sub_orchestrator_with_retry.py b/tests/orchestrator/test_sub_orchestrator_with_retry.py index 1247a342..ae8db018 100644 --- a/tests/orchestrator/test_sub_orchestrator_with_retry.py +++ b/tests/orchestrator/test_sub_orchestrator_with_retry.py @@ -81,9 +81,9 @@ def test_tokyo_and_seattle_and_london_state_partial_failure(): context_builder = ContextBuilder('test_simple_function') add_hello_suborch_completed_events(context_builder, 0, "\"Hello Tokyo!\"") add_hello_suborch_failed_events(context_builder, 1, failed_reason, failed_details) - add_retry_timer_events(context_builder, 3) - add_hello_suborch_completed_events(context_builder, 4, "\"Hello Seattle!\"") - add_hello_suborch_completed_events(context_builder, 5, "\"Hello London!\"") + add_retry_timer_events(context_builder, 2) + add_hello_suborch_completed_events(context_builder, 3, "\"Hello Seattle!\"") + add_hello_suborch_completed_events(context_builder, 4, "\"Hello London!\"") result = get_orchestration_state_result( context_builder, generator_function) diff --git a/tests/orchestrator/test_task_any.py b/tests/orchestrator/test_task_any.py new file mode 100644 index 00000000..57f49a28 --- /dev/null +++ b/tests/orchestrator/test_task_any.py @@ -0,0 +1,48 @@ +from tests.orchestrator.test_fan_out_fan_in import add_completed_event, add_failed_event, base_expected_state, add_multi_actions +from tests.orchestrator.orchestrator_test_utils import assert_orchestration_state_equals, get_orchestration_state_result +from tests.test_utils.ContextBuilder import ContextBuilder + +def generator_function(context): + task1 = context.call_activity("Hello", "0") + task2 = context.call_activity("Hello", "1") + task3 = context.call_activity("Hello", "2") + task4 = context.task_any([task1, task2, task3]) + first_completed_task = yield task4 + try: + result = yield first_completed_task + return result + except: + return "exception" + +def test_continues_on_zero_results(): + context_builder = ContextBuilder() + result = get_orchestration_state_result( + context_builder, generator_function) + expected_state = base_expected_state() + add_multi_actions(expected_state, function_name='Hello', volume=3) + expected = expected_state.to_json() + assert_orchestration_state_equals(expected, result) + +def test_continues_on_one_failure(): + context_builder = ContextBuilder() + add_failed_event(context_builder, 0, "Hello", reason="", details="") + result = get_orchestration_state_result( + context_builder, generator_function) + add_failed_event(context_builder, 0, "Hello", reason="", details="") + expected_state = base_expected_state("exception") + add_multi_actions(expected_state, function_name='Hello', volume=3) + expected_state._is_done = True + expected = expected_state.to_json() + assert_orchestration_state_equals(expected, result) + +def test_succeeds_on_one_result(): + context_builder = ContextBuilder() + add_completed_event(context_builder, 0, "Hello", result="1") + result = get_orchestration_state_result( + context_builder, generator_function) + add_completed_event(context_builder, 2, "Hello", "3") + expected_state = base_expected_state("1") + add_multi_actions(expected_state, function_name='Hello', volume=3) + expected_state._is_done = True + expected = expected_state.to_json() + assert_orchestration_state_equals(expected, result) \ No newline at end of file diff --git a/tests/tasks/test_new_uuid.py b/tests/tasks/test_new_uuid.py index a240c5c2..0803b904 100644 --- a/tests/tasks/test_new_uuid.py +++ b/tests/tasks/test_new_uuid.py @@ -1,35 +1,9 @@ from uuid import uuid1 -import datetime from typing import List, Any, Dict -from datetime import datetime - -from azure.durable_functions.tasks.new_uuid import URL_NAMESPACE, \ - _create_deterministic_uuid from azure.durable_functions.models.DurableOrchestrationContext import DurableOrchestrationContext from azure.durable_functions.constants import DATETIME_STRING_FORMAT -def test_create_deterministic_uuid(): - namespace = URL_NAMESPACE - instance_id = uuid1() - current_utc_datetime = datetime.now().strftime(DATETIME_STRING_FORMAT); - - name1 = f"{instance_id}_{current_utc_datetime}_0" - name2 = f"{instance_id}_{current_utc_datetime}_12" - - result1a = _create_deterministic_uuid(namespace, name1) - result1b = _create_deterministic_uuid(namespace, name1) - - result2a = _create_deterministic_uuid(namespace, name2) - result2b = _create_deterministic_uuid(namespace, name2) - - assert result1a == result1b - assert result2a == result2b - - assert result1a != result2a - assert result1b != result2b - - def history_list() -> List[Dict[Any, Any]]: history = [{'EventType': 12, 'EventId': -1, 'IsPlayed': False, 'Timestamp': '2019-12-08T23:18:41.3240927Z'}, { @@ -58,4 +32,4 @@ def test_new_uuid(): assert result1b == result2b assert result1a != result1b - assert result2a != result2b + assert result2a != result2b \ No newline at end of file diff --git a/tests/tasks/test_task_any.py b/tests/tasks/test_task_any.py deleted file mode 100644 index b55abd7c..00000000 --- a/tests/tasks/test_task_any.py +++ /dev/null @@ -1,79 +0,0 @@ -from azure.durable_functions.models.ReplaySchema import ReplaySchema -from datetime import datetime, date -import json -from azure.durable_functions.models import Task, TaskSet -from azure.durable_functions.tasks import task_any -from azure.durable_functions.tasks.wait_for_external_event import wait_for_external_event_task -from azure.durable_functions.models.actions.WaitForExternalEventAction import WaitForExternalEventAction -from azure.durable_functions.constants import DATETIME_STRING_FORMAT -from tests.test_utils.ContextBuilder import ContextBuilder -from .tasks_test_utils import assert_taskset_equal - - -from tests.orchestrator.orchestrator_test_utils \ - import assert_orchestration_state_equals, get_orchestration_state_result -from tests.test_utils.ContextBuilder import ContextBuilder -from azure.durable_functions.models.OrchestratorState import OrchestratorState -from tests.orchestrator.test_sequential_orchestrator import base_expected_state,\ - add_hello_action, add_hello_failed_events - -def test_has_completed_task(): - all_actions = [WaitForExternalEventAction("C"), WaitForExternalEventAction("A"), WaitForExternalEventAction("B")] - task1 = Task(is_completed=False, is_faulted=False, action=all_actions[0], timestamp=date(2000,1,1)) - task2 = Task(is_completed=True, is_faulted=False, action=all_actions[1],timestamp=date(2000,2,1)) - task3 = Task(is_completed=True, is_faulted=False, action=all_actions[2],timestamp=date(2000,1,1)) - - tasks = [task1, task2, task3] - returned_taskset = task_any(tasks, replay_schema=ReplaySchema.V1) - expected_taskset = TaskSet(is_completed=True, actions=all_actions, result=task3, timestamp=date(2000,1,1)) - - assert_taskset_equal(expected_taskset, returned_taskset) - -def test_has_no_completed_task(): - all_actions = [WaitForExternalEventAction("C"), WaitForExternalEventAction("A"), WaitForExternalEventAction("B")] - task1 = Task(is_completed=False, is_faulted=False, action=all_actions[0], timestamp=date(2000,1,1)) - task2 = Task(is_completed=False, is_faulted=False, action=all_actions[1],timestamp=date(2000,2,1)) - task3 = Task(is_completed=False, is_faulted=False, action=all_actions[2],timestamp=date(2000,1,1)) - - tasks = [task1, task2, task3] - returned_taskset = task_any(tasks, replay_schema=ReplaySchema.V1) - expected_taskset = TaskSet(is_completed=False, actions=all_actions, result=None) - - assert_taskset_equal(expected_taskset, returned_taskset) - -def test_all_faulted_task_should_fail(): - all_actions = [WaitForExternalEventAction("C"), WaitForExternalEventAction("A"), WaitForExternalEventAction("B")] - task1 = Task(is_completed=False, is_faulted=True, action=all_actions[0], timestamp=date(2000,1,1), exc=Exception("test failure")) - task2 = Task(is_completed=False, is_faulted=True, action=all_actions[1], timestamp=date(2000,2,1), exc=Exception("test failure")) - task3 = Task(is_completed=False, is_faulted=True, action=all_actions[2], timestamp=date(2000,1,1), exc=Exception("test failure")) - - tasks = [task1, task2, task3] - returned_taskset = task_any(tasks, replay_schema=ReplaySchema.V1) - error_messages = [Exception("test failure") for _ in range(3)] - expected_exception = Exception(f"All tasks have failed, errors messages in all tasks:{error_messages}") - expected_taskset = TaskSet(is_completed=True, actions=all_actions, result=None, is_faulted=True, exception=expected_exception) - assert_taskset_equal(expected_taskset, returned_taskset) - -def test_one_faulted_task_should_still_proceed(): - all_actions = [WaitForExternalEventAction("C"), WaitForExternalEventAction("A"), WaitForExternalEventAction("B")] - task1 = Task(is_completed=False, is_faulted=True, action=all_actions[0], timestamp=date(2000,1,1)) - task2 = Task(is_completed=False, is_faulted=False, action=all_actions[1],timestamp=date(2000,2,1)) - task3 = Task(is_completed=False, is_faulted=False, action=all_actions[2],timestamp=date(2000,1,1)) - - tasks = [task1, task2, task3] - returned_taskset = task_any(tasks, replay_schema=ReplaySchema.V1) - expected_taskset = TaskSet(is_completed=False, actions=all_actions, result=None) - - assert_taskset_equal(expected_taskset, returned_taskset) - -def test_taskset_and_tasks_as_args(): - all_actions = [WaitForExternalEventAction("C"), WaitForExternalEventAction("A"), WaitForExternalEventAction("B")] - task1 = Task(is_completed=False, is_faulted=True, action=all_actions[0], timestamp=date(2000,1,1)) - task2 = TaskSet(is_completed=True, is_faulted=False, actions=[all_actions[1], all_actions[2]], \ - result=[None, None], timestamp=date(2000,1,1)) - - tasks = [task1, task2] - returned_taskset = task_any(tasks, replay_schema=ReplaySchema.V1) - expected_taskset = TaskSet(is_completed=True, actions=all_actions, result=task2, timestamp=date(2000,1,1)) - - assert_taskset_equal(expected_taskset, returned_taskset) diff --git a/tests/tasks/test_wait_for_external_event_task.py b/tests/tasks/test_wait_for_external_event_task.py deleted file mode 100644 index d314c54d..00000000 --- a/tests/tasks/test_wait_for_external_event_task.py +++ /dev/null @@ -1,40 +0,0 @@ -import json -from datetime import datetime - -from dateutil.tz import tzutc - -from azure.durable_functions.models.Task import Task -from azure.durable_functions.models.actions.WaitForExternalEventAction import \ - WaitForExternalEventAction -from azure.durable_functions.tasks.wait_for_external_event import wait_for_external_event_task -from tests.test_utils.ContextBuilder import ContextBuilder -from .tasks_test_utils import assert_tasks_equal - - -def test_event_not_raised_return_incompleted_task(): - context_builder = ContextBuilder('test_simple_function') - expected_action = WaitForExternalEventAction("A") - - returned_task = wait_for_external_event_task(context_builder.history_events, "A") - expected_task = Task(is_completed=False, is_faulted=False, action=expected_action) - - assert_tasks_equal(expected_task, returned_task) - - -def test_event_raised_return_completed_task(): - timestamp = datetime.now() - json_input = '{"test":"somecontent"}' - expected_action = WaitForExternalEventAction("A") - context_builder = ContextBuilder('test_simple_function') - context_builder.add_event_raised_event(name="A", input_=json_input, timestamp=timestamp, id_=1) - - returned_task = wait_for_external_event_task(context_builder.history_events, "A") - expected_task = Task( - is_completed=True, - is_faulted=False, - action=expected_action, - result=json.loads(json_input), - timestamp=timestamp.replace(tzinfo=tzutc()), - id_=1) - - assert_tasks_equal(expected_task, returned_task) diff --git a/tests/test_utils/ContextBuilder.py b/tests/test_utils/ContextBuilder.py index 1ec218fe..7cc7273f 100644 --- a/tests/test_utils/ContextBuilder.py +++ b/tests/test_utils/ContextBuilder.py @@ -71,9 +71,10 @@ def add_sub_orchestrator_failed_event(self, id_, reason, details): event.TaskScheduledId = id_ self.history_events.append(event) - def add_event_sent_event(self, instance_id): + def add_event_sent_event(self, instance_id, event_id): event = self.get_base_event(HistoryEventType.EVENT_SENT) event.InstanceId = instance_id + event._event_id = event_id event.Name = "op" event.Input = json.dumps({ "id": "0000" }) # usually provided by the extension self.history_events.append(event)