From 39ee2ba8a090e1203e09c97cfd10666a5dd745c3 Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 16 Jul 2020 17:11:44 -0700 Subject: [PATCH 01/13] type-checked the project --- .../models/DurableHttpRequest.py | 27 +++---- .../models/DurableOrchestrationBindings.py | 8 +- .../models/DurableOrchestrationClient.py | 35 +++++---- .../models/DurableOrchestrationContext.py | 75 +++++++++---------- .../models/DurableOrchestrationStatus.py | 40 +++++----- .../models/OrchestratorState.py | 10 +-- .../models/PurgeHistoryResult.py | 11 +-- .../durable_functions/models/RetryOptions.py | 6 +- .../models/RpcManagementOptions.py | 17 ++++- azure/durable_functions/models/Task.py | 2 +- azure/durable_functions/models/TokenSource.py | 6 +- .../models/actions/CallActivityAction.py | 8 +- .../actions/CallActivityWithRetryAction.py | 8 +- .../models/actions/CallHttpAction.py | 4 +- .../models/actions/ContinueAsNewAction.py | 6 +- .../models/actions/CreateTimerAction.py | 11 +-- .../actions/WaitForExternalEventAction.py | 6 +- .../models/history/HistoryEvent.py | 4 +- .../models/utils/http_utils.py | 8 +- azure/durable_functions/orchestrator.py | 8 +- azure/durable_functions/tasks/call_http.py | 7 +- azure/durable_functions/tasks/create_timer.py | 2 +- azure/durable_functions/tasks/new_uuid.py | 5 +- azure/durable_functions/tasks/timer_task.py | 5 +- .../DurableTrigger/__init__.py | 3 +- samples/function_chaining/local.settings.json | 2 +- 26 files changed, 168 insertions(+), 156 deletions(-) diff --git a/azure/durable_functions/models/DurableHttpRequest.py b/azure/durable_functions/models/DurableHttpRequest.py index 9fffa601..eab80530 100644 --- a/azure/durable_functions/models/DurableHttpRequest.py +++ b/azure/durable_functions/models/DurableHttpRequest.py @@ -1,19 +1,20 @@ -from typing import Dict, Any +from typing import Dict, Any, Union, Optional -from azure.durable_functions.models import TokenSource +from azure.durable_functions.models.TokenSource import TokenSource from azure.durable_functions.models.utils.json_utils import add_attrib, add_json_attrib class DurableHttpRequest: """Data structure representing a durable HTTP request.""" - def __init__(self, method: str, uri: str, content: str = None, headers: Dict[str, str] = None, - token_source: TokenSource = None): + def __init__(self, method: str, uri: str, content: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + token_source: Optional[TokenSource] = None): self._method: str = method self._uri: str = uri - self._content: str = content - self._headers: Dict[str, str] = headers - self._token_source: TokenSource = token_source + self._content: Optional[str] = content + self._headers: Optional[Dict[str, str]] = headers + self._token_source: Optional[TokenSource] = token_source @property def method(self) -> str: @@ -26,29 +27,29 @@ def uri(self) -> str: return self._uri @property - def content(self) -> str: + def content(self) -> Optional[str]: """Get the HTTP request content.""" return self._content @property - def headers(self) -> Dict[str, str]: + def headers(self) -> Optional[Dict[str, str]]: """Get the HTTP request headers.""" return self._headers @property - def token_source(self) -> TokenSource: + def token_source(self) -> Optional[TokenSource]: """Get the source of OAuth token to add to the request.""" return self._token_source - def to_json(self) -> Dict[str, Any]: + def to_json(self) -> Dict[str, Union[str, int]]: """Convert object into a json dictionary. Returns ------- - Dict[str, Any] + Dict[str, Union[str, int]] The instance of the class converted into a json dictionary """ - json_dict = {} + json_dict: Dict[str, Union[str, int]] = {} add_attrib(json_dict, self, 'method') add_attrib(json_dict, self, 'uri') add_attrib(json_dict, self, 'content') diff --git a/azure/durable_functions/models/DurableOrchestrationBindings.py b/azure/durable_functions/models/DurableOrchestrationBindings.py index bb2ca7c1..00f613a5 100644 --- a/azure/durable_functions/models/DurableOrchestrationBindings.py +++ b/azure/durable_functions/models/DurableOrchestrationBindings.py @@ -1,5 +1,5 @@ import json -from typing import Dict +from typing import Dict, Optional from azure.durable_functions.models.FunctionContext import FunctionContext @@ -13,11 +13,11 @@ class DurableOrchestrationBindings: # parameter names are as defined by JSON schema and do not conform to PEP8 naming conventions def __init__(self, taskHubName: str, creationUrls: Dict[str, str], - managementUrls: Dict[str, str], rpcBaseUrl: str = None, **kwargs): + managementUrls: Dict[str, str], rpcBaseUrl: Optional[str] = None, **kwargs): self._task_hub_name: str = taskHubName self._creation_urls: Dict[str, str] = creationUrls self._management_urls: Dict[str, str] = managementUrls - self._rpc_base_url: str = rpcBaseUrl + self._rpc_base_url: Optional[str] = rpcBaseUrl self._client_data = FunctionContext(**kwargs) @property @@ -36,7 +36,7 @@ def management_urls(self) -> Dict[str, str]: return self._management_urls @property - def rpc_base_url(self) -> str: + def rpc_base_url(self) -> Optional[str]: """Get the base url communication between out of proc workers and the function host.""" return self._rpc_base_url diff --git a/azure/durable_functions/models/DurableOrchestrationClient.py b/azure/durable_functions/models/DurableOrchestrationClient.py index c96f370d..8614e4bd 100644 --- a/azure/durable_functions/models/DurableOrchestrationClient.py +++ b/azure/durable_functions/models/DurableOrchestrationClient.py @@ -1,6 +1,6 @@ import json from datetime import datetime -from typing import List, Any, Awaitable +from typing import List, Any, Awaitable, Optional, Union, Dict from time import time from asyncio import sleep from urllib.parse import urlparse, quote @@ -11,7 +11,7 @@ from .DurableOrchestrationStatus import DurableOrchestrationStatus from .RpcManagementOptions import RpcManagementOptions from .OrchestrationRuntimeStatus import OrchestrationRuntimeStatus -from ..models import DurableOrchestrationBindings +from ..models.DurableOrchestrationBindings import DurableOrchestrationBindings from .utils.http_utils import get_async_request, post_async_request, delete_async_request from azure.functions._durable_functions import _serialize_custom_object @@ -44,8 +44,8 @@ def __init__(self, context: str): async def start_new(self, orchestration_function_name: str, - instance_id: str = None, - client_input: object = None) -> Awaitable[str]: + instance_id: Optional[str] = None, + client_input: object = None) -> str: """Start a new instance of the specified orchestrator function. If an orchestration instance with the specified ID already exists, the @@ -69,20 +69,22 @@ async def start_new(self, request_url = self._get_start_new_url( instance_id=instance_id, orchestration_function_name=orchestration_function_name) - response = await self._post_async_request(request_url, self._get_json_input(client_input)) + response: List[Any] = await self._post_async_request( + request_url, self._get_json_input(client_input)) - if response[0] <= 202 and response[1]: + status_code: int = response[0] + if status_code <= 202 and response[1]: return response[1]["id"] - elif response[0] == 400: + elif status_code == 400: # Orchestrator not found, report clean exception - exception_data = response[1] + exception_data: Dict[str, str] = response[1] exception_message = exception_data["ExceptionMessage"] raise Exception(exception_message) else: # Catch all: simply surfacing the durable-extension exception # we surface the stack trace too, since this may be a more involed exception - exception_message = response[1] - raise Exception(exception_message) + ex_message: Any = response[1] + raise Exception(ex_message) def create_check_status_response(self, request, instance_id): """Create a HttpResponse that contains useful information for \ @@ -250,7 +252,8 @@ async def get_status_all(self) -> List[DurableOrchestrationStatus]: if error_message: raise Exception(error_message) else: - return [DurableOrchestrationStatus.from_json(o) for o in response[1]] + statuses: List[Any] = response[1] + return [DurableOrchestrationStatus.from_json(o) for o in statuses] async def get_status_by(self, created_time_from: datetime = None, created_time_to: datetime = None, @@ -441,7 +444,7 @@ def _create_http_response(status_code: int, body: Any) -> func.HttpResponse: return func.HttpResponse(**response_args) @staticmethod - def _get_json_input(client_input: object) -> str: + def _get_json_input(client_input: object) -> Optional[str]: """Serialize the orchestrator input. Parameters @@ -451,8 +454,10 @@ def _get_json_input(client_input: object) -> str: Returns ------- - str - A string representing the JSON-serialization of `client_input` + Optional[str] + If `client_input` is not None, return a string representing + the JSON-serialization of `client_input`. Otherwise, returns + None Exceptions ---------- @@ -473,7 +478,7 @@ def _replace_url_origin(request_url, value_url): return value_url @staticmethod - def _parse_purge_instance_history_response(response: [int, Any]): + def _parse_purge_instance_history_response(response: List[Any]): switch_statement = { 200: lambda: PurgeHistoryResult.from_json(response[1]), # instance completed 404: lambda: PurgeHistoryResult(instancesDeleted=0), # instance not found diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index c975ba88..995eb141 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -1,8 +1,9 @@ import json import datetime -from typing import List, Any, Dict +from typing import List, Any, Dict, Optional -from . import (RetryOptions, TaskSet) +from .RetryOptions import RetryOptions +from .TaskSet import TaskSet from .FunctionContext import FunctionContext from .history import HistoryEvent, HistoryEventType from .actions import Action @@ -30,27 +31,10 @@ def __init__(self, self._parent_instance_id: str = parentInstanceId self._custom_status: Any = None self._new_uuid_counter: int = 0 - self.call_activity = lambda n, i=None: call_activity_task( - state=self.histories, - name=n, - input_=i) - self.call_activity_with_retry = \ - lambda n, o, i=None: call_activity_with_retry_task( - state=self.histories, - retry_options=o, - name=n, - input_=i) - self.call_http = lambda method, uri, content=None, headers=None, token_source=None: \ - call_http( - state=self.histories, method=method, uri=uri, content=content, headers=headers, - token_source=token_source) self.wait_for_external_event = lambda n: wait_for_external_event_task( state=self.histories, name=n) - self.new_uuid = lambda: new_uuid(context=self) self.continue_as_new = lambda i: continue_as_new(input_=i) - self.task_any = lambda t: task_any(tasks=t) - self.task_all = lambda t: task_all(tasks=t) self.create_timer = lambda d: create_timer_task(state=self.histories, fire_at=d) self.decision_started_event: HistoryEvent = \ [e_ for e_ in self.histories @@ -65,7 +49,7 @@ def __init__(self, # (consistent with Python Functions generic trigger/input bindings) if (isinstance(input, Dict)): input = json.dumps(input) - self._input: str = input + self._input: Any = input @classmethod def from_json(cls, json_string: str): @@ -86,14 +70,14 @@ def from_json(cls, json_string: str): json_dict = json.loads(json_string) return cls(**json_dict) - def call_activity(self, name: str, input_=None) -> Task: + def call_activity(self, name: str, input_: Optional[Any] = None) -> Task: """Schedule an activity for execution. Parameters ---------- name: str The name of the activity function to call. - input_: + input_: Optional[Any] The JSON-serializable input to pass to the activity function. Returns @@ -101,11 +85,14 @@ def call_activity(self, name: str, input_=None) -> Task: Task A Durable Task that completes when the called activity function completes or fails. """ - raise NotImplementedError("This is a placeholder.") + return call_activity_task( + state=self.histories, + name=name, + input_=input_) def call_activity_with_retry(self, name: str, retry_options: RetryOptions, - input_=None) -> Task: + input_: Optional[Any] = None) -> Task: """Schedule an activity for execution with retry options. Parameters @@ -114,7 +101,7 @@ def call_activity_with_retry(self, The name of the activity function to call. retry_options: RetryOptions The retry options for the activity function. - input_: + input_: Optional[Any] The JSON-serializable input to pass to the activity function. Returns @@ -123,10 +110,16 @@ def call_activity_with_retry(self, A Durable Task that completes when the called activity function completes or fails completely. """ + return call_activity_with_retry_task( + state=self.histories, + retry_options=retry_options, + name=name, + input_=input_) raise NotImplementedError("This is a placeholder.") - def call_http(self, method: str, uri: str, content: str = None, - headers: Dict[str, str] = None, token_source: TokenSource = None) -> Task: + def call_http(self, method: str, uri: str, content: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + token_source: TokenSource = None) -> Task: """Schedule a durable HTTP call to the specified endpoint. Parameters @@ -135,9 +128,9 @@ def call_http(self, method: str, uri: str, content: str = None, The HTTP request method. uri: str The HTTP request uri. - content: str + content: Optional[str] The HTTP request content. - headers: Dict[str, str] + headers: Optional[Dict[str, str]] The HTTP request headers. token_source: TokenSource The source of OAuth token to add to the request. @@ -147,27 +140,29 @@ def call_http(self, method: str, uri: str, content: str = None, Task The durable HTTP request to schedule. """ - raise NotImplementedError("This is a placeholder.") + return call_http( + state=self.histories, method=method, uri=uri, content=content, headers=headers, + token_source=token_source) def call_sub_orchestrator(self, - name: str, input_=None, - instance_id: str = None) -> Task: + name: str, input_: Optional[Any] = None, + instance_id: Optional[str] = None) -> Task: """Schedule an orchestration function named `name` for execution. Parameters ---------- name: str The name of the orchestrator function to call. - input_: + input_: Optional[Any] The JSON-serializable input to pass to the orchestrator function. - instance_id: str + instance_id: Optional[str] A unique ID to use for the sub-orchestration instance. If `instanceId` is not specified, the extension will generate an id in the format `:<#>` """ raise NotImplementedError("This is a placeholder.") - def get_input(self) -> str: + def get_input(self) -> Optional[Any]: """Get the orchestration input.""" return None if self._input is None else json.loads(self._input, object_hook=_deserialize_custom_object) @@ -185,7 +180,7 @@ def new_uuid(self) -> str: str New UUID that is safe for replay within an orchestration or operation. """ - raise NotImplementedError("This is a placeholder.") + return new_uuid(context=self) def task_all(self, activities: List[Task]) -> TaskSet: """Schedule the execution of all activities. @@ -205,7 +200,7 @@ def task_all(self, activities: List[Task]) -> TaskSet: TaskSet The results of all activities. """ - raise NotImplementedError("This is a placeholder.") + return task_all(tasks=activities) def task_any(self, activities: List[Task]) -> TaskSet: """Schedule the execution of all activities. @@ -225,7 +220,7 @@ def task_any(self, activities: List[Task]) -> TaskSet: TaskSet The first [[Task]] instance to complete. """ - raise NotImplementedError("This is a placeholder.") + return task_any(tasks=activities) def set_custom_status(self, status: Any): """Set the customized orchestration status for your orchestrator function. @@ -299,7 +294,7 @@ def parent_instance_id(self) -> str: return self._parent_instance_id @property - def current_utc_datetime(self) -> datetime: + def current_utc_datetime(self) -> datetime.datetime: """Get the current date/time. This date/time value is derived from the orchestration history. It @@ -314,7 +309,7 @@ def current_utc_datetime(self) -> datetime: return self._current_utc_datetime @current_utc_datetime.setter - def current_utc_datetime(self, value: datetime): + def current_utc_datetime(self, value: datetime.datetime): self._current_utc_datetime = value @property diff --git a/azure/durable_functions/models/DurableOrchestrationStatus.py b/azure/durable_functions/models/DurableOrchestrationStatus.py index 38924157..b4990c9a 100644 --- a/azure/durable_functions/models/DurableOrchestrationStatus.py +++ b/azure/durable_functions/models/DurableOrchestrationStatus.py @@ -1,6 +1,6 @@ from datetime import datetime from dateutil.parser import parse as dt_parse -from typing import Any, List, Dict +from typing import Any, List, Dict, Optional, Union from .OrchestrationRuntimeStatus import OrchestrationRuntimeStatus from .utils.json_utils import add_attrib, add_datetime_attrib @@ -13,20 +13,22 @@ class DurableOrchestrationStatus: """ # parameter names are as defined by JSON schema and do not conform to PEP8 naming conventions - def __init__(self, name: str = None, instanceId: str = None, createdTime: str = None, - lastUpdatedTime: str = None, input: Any = None, output: Any = None, - runtimeStatus: str = None, customStatus: Any = None, history: List[Any] = None, + def __init__(self, name: Optional[str] = None, instanceId: Optional[str] = None, + createdTime: Optional[str] = None, lastUpdatedTime: Optional[str] = None, + input: Optional[Any] = None, output: Optional[Any] = None, + runtimeStatus: Optional[str] = None, customStatus: Optional[Any] = None, + history: Optional[List[Any]] = None, **kwargs): - self._name: str = name - self._instance_id: str = instanceId - self._created_time: datetime = dt_parse(createdTime) if createdTime is not None else None - self._last_updated_time: datetime = dt_parse(lastUpdatedTime) \ + self._name: Optional[str] = name + self._instance_id: Optional[str] = instanceId + self._created_time: Optional[datetime] = dt_parse(createdTime) if createdTime is not None else None + self._last_updated_time: Optional[datetime] = dt_parse(lastUpdatedTime) \ if lastUpdatedTime is not None else None self._input: Any = input self._output: Any = output - self._runtime_status: OrchestrationRuntimeStatus = runtimeStatus + self._runtime_status: Optional[str] = runtimeStatus self._custom_status: Any = customStatus - self._history: List[Any] = history + self._history: Optional[List[Any]] = history if kwargs is not None: for key, value in kwargs.items(): self.__setattr__(key, value) @@ -65,15 +67,15 @@ def from_json(cls, json_obj: Any): else: return cls(**json_obj) - def to_json(self) -> Dict[str, Any]: + def to_json(self) -> Dict[str, Union[int, str]]: """Convert object into a json dictionary. Returns ------- - Dict[str, Any] + Dict[str, Union[int, str]] The instance of the class converted into a json dictionary """ - json = {} + json: Dict[str, Union[int, str]] = {} add_attrib(json, self, 'name') add_attrib(json, self, 'instance_id', 'instanceId') add_datetime_attrib(json, self, 'created_time', 'createdTime') @@ -86,12 +88,12 @@ def to_json(self) -> Dict[str, Any]: return json @property - def name(self) -> str: + def name(self) -> Optional[str]: """Get the orchestrator function name.""" return self._name @property - def instance_id(self) -> str: + def instance_id(self) -> Optional[str]: """Get the unique ID of the instance. The instance ID is generated and fixed when the orchestrator @@ -102,7 +104,7 @@ def instance_id(self) -> str: return self._instance_id @property - def created_time(self) -> datetime: + def created_time(self) -> Optional[datetime]: """Get the time at which the orchestration instance was created. If the orchestration instance is in the [[Pending]] status, this @@ -112,7 +114,7 @@ def created_time(self) -> datetime: return self._created_time @property - def last_updated_time(self) -> datetime: + def last_updated_time(self) -> Optional[datetime]: """Get the time at which the orchestration instance last updated its execution history.""" return self._last_updated_time @@ -127,7 +129,7 @@ def output(self) -> Any: return self._output @property - def runtime_status(self) -> OrchestrationRuntimeStatus: + def runtime_status(self) -> Optional[str]: """Get the runtime status of the orchestration instance.""" return self._runtime_status @@ -140,7 +142,7 @@ def custom_status(self) -> Any: return self._custom_status @property - def history(self) -> List[Any]: + def history(self) -> Optional[List[Any]]: """Get the execution history of the orchestration instance. The history log can be large and is therefore `undefined` by diff --git a/azure/durable_functions/models/OrchestratorState.py b/azure/durable_functions/models/OrchestratorState.py index fa44655c..6ed6b505 100644 --- a/azure/durable_functions/models/OrchestratorState.py +++ b/azure/durable_functions/models/OrchestratorState.py @@ -1,5 +1,5 @@ import json -from typing import List, Any, Dict +from typing import List, Any, Dict, Optional, Union from .utils.json_utils import add_attrib from azure.durable_functions.models.actions.Action import Action @@ -21,7 +21,7 @@ def __init__(self, self._is_done: bool = is_done self._actions: List[List[Action]] = actions self._output: Any = output - self._error: str = error + self._error: Optional[str] = error self._custom_status: Any = custom_status @property @@ -54,7 +54,7 @@ def output(self): return self._output @property - def error(self) -> str: + def error(self) -> Optional[str]: """Get the error received when running the orchestration. Optional. @@ -66,7 +66,7 @@ def custom_status(self): """Get the JSON-serializable value used by DurableOrchestrationContext.SetCustomStatus.""" return self._custom_status - def to_json(self) -> Dict[str, Any]: + def to_json(self) -> Dict[str, Union[str, int]]: """Convert object into a json dictionary. Returns @@ -74,7 +74,7 @@ def to_json(self) -> Dict[str, Any]: Dict[str, Any] The instance of the class converted into a json dictionary """ - json_dict = {} + json_dict: Dict[str, Union[str, int]] = {} add_attrib(json_dict, self, '_is_done', 'isDone') self._add_actions(json_dict) if self._output: diff --git a/azure/durable_functions/models/PurgeHistoryResult.py b/azure/durable_functions/models/PurgeHistoryResult.py index 11e69416..4ccd48d0 100644 --- a/azure/durable_functions/models/PurgeHistoryResult.py +++ b/azure/durable_functions/models/PurgeHistoryResult.py @@ -1,4 +1,4 @@ -from typing import Any +from typing import Union, Dict, Any class PurgeHistoryResult: @@ -12,12 +12,12 @@ def __init__(self, instancesDeleted: int, **kwargs): self.__setattr__(key, value) @classmethod - def from_json(cls, json_obj: Any): + def from_json(cls, json_obj: Dict[Any, Any]): """Convert the value passed into a new instance of the class. Parameters ---------- - json_obj: any + json_obj: Dict[Any, Any] JSON object to be converted into an instance of the class Returns @@ -25,10 +25,7 @@ def from_json(cls, json_obj: Any): PurgeHistoryResult New instance of the durable orchestration status class """ - if isinstance(json_obj, str): - return cls(message=json_obj) - else: - return cls(**json_obj) + return cls(**json_obj) @property def instances_deleted(self): diff --git a/azure/durable_functions/models/RetryOptions.py b/azure/durable_functions/models/RetryOptions.py index 85d7f5a5..1770293c 100644 --- a/azure/durable_functions/models/RetryOptions.py +++ b/azure/durable_functions/models/RetryOptions.py @@ -1,4 +1,4 @@ -from typing import Any, Dict +from typing import Any, Dict, Union from .utils.json_utils import add_attrib @@ -46,7 +46,7 @@ def max_number_of_attempts(self) -> int: """ return self._max_number_of_attempts - def to_json(self) -> Dict[str, Any]: + def to_json(self) -> Dict[str, Union[str, int]]: """Convert object into a json dictionary. Returns @@ -54,7 +54,7 @@ def to_json(self) -> Dict[str, Any]: Dict[str, Any] The instance of the class converted into a json dictionary """ - json_dict = {} + json_dict: Dict[str, Union[str, int]] = {} add_attrib( json_dict, diff --git a/azure/durable_functions/models/RpcManagementOptions.py b/azure/durable_functions/models/RpcManagementOptions.py index a703a9d7..9fcc09e0 100644 --- a/azure/durable_functions/models/RpcManagementOptions.py +++ b/azure/durable_functions/models/RpcManagementOptions.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import Any, List +from typing import Any, List, Optional from azure.durable_functions.constants import DATETIME_STRING_FORMAT from azure.durable_functions.models.OrchestrationRuntimeStatus import OrchestrationRuntimeStatus @@ -29,27 +29,36 @@ def _add_arg(query: List[str], name: str, value: Any): query.append(f'{name}={value}') @staticmethod - def _add_date_arg(query: List[str], name: str, value: datetime): + def _add_date_arg(query: List[str], name: str, value: Optional[datetime]): if value: date_as_string = value.strftime(DATETIME_STRING_FORMAT) RpcManagementOptions._add_arg(query, name, date_as_string) - def to_url(self, base_url: str) -> str: + def to_url(self, base_url: Optional[str]) -> str: """Get the url based on the options selected. Parameters ---------- base_url: str The base url to prepend to the url path + + Raises + ------ + ValueError + When the `base_url` argument is None Returns ------- str The Url used to get orchestration status information """ + if base_url is None: + # TODO: In JS we manage a "legacy app frontend code path. Here too?" + raise ValueError("orchestration bindings has not RPC base url") + url = f"{base_url}instances/{self._instance_id if self._instance_id else ''}" - query = [] + query: List[str] = [] self._add_arg(query, 'taskHub', self._task_hub_name) self._add_arg(query, 'connectionName', self._connection_name) diff --git a/azure/durable_functions/models/Task.py b/azure/durable_functions/models/Task.py index 7fa71410..67ee0466 100644 --- a/azure/durable_functions/models/Task.py +++ b/azure/durable_functions/models/Task.py @@ -1,6 +1,6 @@ from datetime import datetime -from .actions import Action +from .actions.Action import Action class Task: diff --git a/azure/durable_functions/models/TokenSource.py b/azure/durable_functions/models/TokenSource.py index bf5a322a..375b84bd 100644 --- a/azure/durable_functions/models/TokenSource.py +++ b/azure/durable_functions/models/TokenSource.py @@ -1,5 +1,5 @@ from abc import ABC -from typing import Dict, Any +from typing import Dict, Any, Union from azure.durable_functions.models.utils.json_utils import add_attrib @@ -41,7 +41,7 @@ def resource(self) -> str: """ return self._resource - def to_json(self) -> Dict[str, Any]: + def to_json(self) -> Dict[str, Union[str, int]]: """Convert object into a json dictionary. Returns @@ -49,6 +49,6 @@ def to_json(self) -> Dict[str, Any]: Dict[str, Any] The instance of the class converted into a json dictionary """ - json_dict = {} + json_dict: Dict[str, Union[str, int]] = {} add_attrib(json_dict, self, 'resource') return json_dict diff --git a/azure/durable_functions/models/actions/CallActivityAction.py b/azure/durable_functions/models/actions/CallActivityAction.py index a3e8d58f..37361832 100644 --- a/azure/durable_functions/models/actions/CallActivityAction.py +++ b/azure/durable_functions/models/actions/CallActivityAction.py @@ -1,4 +1,4 @@ -from typing import Any, Dict +from typing import Any, Dict, Union from .Action import Action from .ActionType import ActionType @@ -26,15 +26,15 @@ def action_type(self) -> int: """Get the type of action this class represents.""" return ActionType.CALL_ACTIVITY - def to_json(self) -> Dict[str, Any]: + def to_json(self) -> Dict[str, Union[str, int]]: """Convert object into a json dictionary. Returns ------- - Dict[str, Any] + Dict[str, Union[str, int]] The instance of the class converted into a json dictionary """ - json_dict = {} + json_dict: Dict[str, Union[str, int]] = {} add_attrib(json_dict, self, 'action_type', 'actionType') add_attrib(json_dict, self, 'function_name', 'functionName') add_attrib(json_dict, self, 'input_', 'input') diff --git a/azure/durable_functions/models/actions/CallActivityWithRetryAction.py b/azure/durable_functions/models/actions/CallActivityWithRetryAction.py index 7f769065..72fdba46 100644 --- a/azure/durable_functions/models/actions/CallActivityWithRetryAction.py +++ b/azure/durable_functions/models/actions/CallActivityWithRetryAction.py @@ -1,4 +1,4 @@ -from typing import Any, Dict +from typing import Any, Dict, Union from .Action import Action from .ActionType import ActionType @@ -26,15 +26,15 @@ def action_type(self) -> int: """Get the type of action this class represents.""" return ActionType.CALL_ACTIVITY_WITH_RETRY - def to_json(self) -> Dict[str, Any]: + def to_json(self) -> Dict[str, Union[str, int]]: """Convert object into a json dictionary. Returns ------- - Dict[str, Any] + Dict[str, Union[str, int]] The instance of the class converted into a json dictionary """ - json_dict = {} + json_dict: Dict[str, Union[str, int]] = {} add_attrib(json_dict, self, 'action_type', 'actionType') add_attrib(json_dict, self, 'function_name', 'functionName') diff --git a/azure/durable_functions/models/actions/CallHttpAction.py b/azure/durable_functions/models/actions/CallHttpAction.py index 283f24df..2d592bf2 100644 --- a/azure/durable_functions/models/actions/CallHttpAction.py +++ b/azure/durable_functions/models/actions/CallHttpAction.py @@ -2,7 +2,7 @@ from .Action import Action from .ActionType import ActionType -from .. import DurableHttpRequest +from ..DurableHttpRequest import DurableHttpRequest from ..utils.json_utils import add_attrib, add_json_attrib @@ -29,7 +29,7 @@ def to_json(self) -> Dict[str, Any]: Dict[str, Any] The instance of the class converted into a json dictionary """ - json_dict = {} + json_dict: Dict[str, Any] = {} add_attrib(json_dict, self, 'action_type', 'actionType') add_json_attrib(json_dict, self, 'http_request', 'httpRequest') return json_dict diff --git a/azure/durable_functions/models/actions/ContinueAsNewAction.py b/azure/durable_functions/models/actions/ContinueAsNewAction.py index 52d00d7b..eed20d52 100644 --- a/azure/durable_functions/models/actions/ContinueAsNewAction.py +++ b/azure/durable_functions/models/actions/ContinueAsNewAction.py @@ -1,4 +1,4 @@ -from typing import Any, Dict +from typing import Any, Dict, Union from .Action import Action from .ActionType import ActionType @@ -20,7 +20,7 @@ def action_type(self) -> int: """Get the type of action this class represents.""" return ActionType.CONTINUE_AS_NEW - def to_json(self) -> Dict[str, Any]: + def to_json(self) -> Dict[str, Union[int, str]]: """Convert object into a json dictionary. Returns @@ -28,7 +28,7 @@ def to_json(self) -> Dict[str, Any]: Dict[str, Any] The instance of the class converted into a json dictionary """ - json_dict = {} + json_dict: Dict[str, Union[int, str]] = {} add_attrib(json_dict, self, 'action_type', 'actionType') add_attrib(json_dict, self, 'input_', 'input') return json_dict diff --git a/azure/durable_functions/models/actions/CreateTimerAction.py b/azure/durable_functions/models/actions/CreateTimerAction.py index f2c918eb..d50baf8e 100644 --- a/azure/durable_functions/models/actions/CreateTimerAction.py +++ b/azure/durable_functions/models/actions/CreateTimerAction.py @@ -1,11 +1,12 @@ -from typing import Any, Dict +from typing import Any, Dict, Union from .ActionType import ActionType +from .Action import Action from ..utils.json_utils import add_attrib, add_datetime_attrib import datetime -class CreateTimerAction: +class CreateTimerAction(Action): """Defines the structure of the Create Timer object. Returns @@ -18,9 +19,9 @@ class CreateTimerAction: if the event fired is not of valid datetime object """ - def __init__(self, fire_at: datetime, is_cancelled: bool = False): + def __init__(self, fire_at: datetime.datetime, is_cancelled: bool = False): self.action_type: ActionType = ActionType.CREATE_TIMER - self.fire_at: datetime = fire_at + self.fire_at: datetime.datetime = fire_at self.is_cancelled: bool = is_cancelled if not isinstance(self.fire_at, datetime.date): @@ -35,7 +36,7 @@ def to_json(self) -> Dict[str, Any]: Dict[str, Any] The instance of the class converted into a json dictionary """ - json_dict = {} + json_dict: Dict[str, Union[int, str]] = {} add_attrib(json_dict, self, 'action_type', 'actionType') add_datetime_attrib(json_dict, self, 'fire_at', 'fireAt') add_attrib(json_dict, self, 'is_cancelled', 'isCanceled') diff --git a/azure/durable_functions/models/actions/WaitForExternalEventAction.py b/azure/durable_functions/models/actions/WaitForExternalEventAction.py index 561c953d..70d9c290 100644 --- a/azure/durable_functions/models/actions/WaitForExternalEventAction.py +++ b/azure/durable_functions/models/actions/WaitForExternalEventAction.py @@ -1,4 +1,4 @@ -from typing import Any, Dict +from typing import Any, Dict, Union from .Action import Action from .ActionType import ActionType @@ -36,10 +36,10 @@ def to_json(self) -> Dict[str, Any]: Returns ------- - Dict[str, Any] + Dict[str, Union[str, int]] The instance of the class converted into a json dictionary """ - json_dict = {} + json_dict: Dict[str, Union[str, int]] = {} add_attrib(json_dict, self, 'action_type', 'actionType') add_attrib(json_dict, self, 'external_event_name', 'externalEventName') diff --git a/azure/durable_functions/models/history/HistoryEvent.py b/azure/durable_functions/models/history/HistoryEvent.py index c0897d19..84023944 100644 --- a/azure/durable_functions/models/history/HistoryEvent.py +++ b/azure/durable_functions/models/history/HistoryEvent.py @@ -12,7 +12,7 @@ def __init__(self, EventType: HistoryEventType, EventId: int, IsPlayed: bool, Ti self._event_type: HistoryEventType = EventType self._event_id: int = EventId self._is_played: bool = IsPlayed - self._timestamp: datetime = dt_parse(Timestamp) + self._timestamp: datetime.datetime = dt_parse(Timestamp) self._is_processed: bool = False if kwargs is not None: for key, value in kwargs.items(): @@ -74,7 +74,7 @@ def is_processed(self, value: bool): self._is_processed = value @property - def timestamp(self) -> datetime: + def timestamp(self) -> datetime.datetime: """Get the timestamp property. Returns diff --git a/azure/durable_functions/models/utils/http_utils.py b/azure/durable_functions/models/utils/http_utils.py index ef8cb8ed..62d28f21 100644 --- a/azure/durable_functions/models/utils/http_utils.py +++ b/azure/durable_functions/models/utils/http_utils.py @@ -1,9 +1,9 @@ -from typing import Any +from typing import Any, List, Union import aiohttp -async def post_async_request(url: str, data: Any = None) -> [int, Any]: +async def post_async_request(url: str, data: Any = None) -> List[Union[int, Any]]: """Post request with the data provided to the url provided. Parameters @@ -29,7 +29,7 @@ async def post_async_request(url: str, data: Any = None) -> [int, Any]: return [response.status, data] -async def get_async_request(url: str) -> [int, Any]: +async def get_async_request(url: str) -> List[Any]: """Get the data from the url provided. Parameters @@ -50,7 +50,7 @@ async def get_async_request(url: str) -> [int, Any]: return [response.status, data] -async def delete_async_request(url: str) -> [int, Any]: +async def delete_async_request(url: str) -> List[Union[int, Any]]: """Delete the data from the url provided. Parameters diff --git a/azure/durable_functions/orchestrator.py b/azure/durable_functions/orchestrator.py index 74a6c20a..8361eea7 100644 --- a/azure/durable_functions/orchestrator.py +++ b/azure/durable_functions/orchestrator.py @@ -3,7 +3,7 @@ Responsible for orchestrating the execution of the user defined generator function. """ -from typing import Callable, Iterator, Any +from typing import Callable, Iterator, Any, Generator from .models import ( DurableOrchestrationContext, @@ -24,14 +24,14 @@ class Orchestrator: """ def __init__(self, - activity_func: Callable[[DurableOrchestrationContext], Iterator[Any]]): + activity_func: Callable[[DurableOrchestrationContext], Generator[Any, Any, Any]]): """Create a new orchestrator for the user defined generator. Responsible for orchestrating the execution of the user defined generator function. :param activity_func: Generator function to orchestrate. """ - self.fn: Callable[[DurableOrchestrationContext], Iterator[Any]] = activity_func + self.fn: Callable[[DurableOrchestrationContext], Generator[Any, Any, Any]] = activity_func def handle(self, context: DurableOrchestrationContext): """Handle the orchestration of the user defined generator function. @@ -132,7 +132,7 @@ def _reset_timestamp(self): self.durable_context.decision_started_event.timestamp @classmethod - def create(cls, fn: Callable[[DurableOrchestrationContext], Iterator[Any]]) \ + def create(cls, fn: Callable[[DurableOrchestrationContext], Generator[Any, Any, Any]]) \ -> Callable[[Any], str]: """Create an instance of the orchestration class. diff --git a/azure/durable_functions/tasks/call_http.py b/azure/durable_functions/tasks/call_http.py index 0fb51b4d..85ec228f 100644 --- a/azure/durable_functions/tasks/call_http.py +++ b/azure/durable_functions/tasks/call_http.py @@ -1,5 +1,5 @@ import json -from typing import Dict, List +from typing import Dict, List, Optional from .task_utilities import find_task_scheduled, find_task_completed, find_task_failed, \ set_processed, parse_history_event @@ -12,8 +12,8 @@ Task) -def call_http(state: List[HistoryEvent], method: str, uri: str, content: str = None, - headers: Dict[str, str] = None, token_source: TokenSource = None) -> Task: +def call_http(state: List[HistoryEvent], method: str, uri: str, content: Optional[str] = None, + headers: Dict[str, str] = None, token_source: Optional[TokenSource] = None) -> Task: """Get task used to schedule a durable HTTP call to the specified endpoint. Parameters @@ -37,6 +37,7 @@ def call_http(state: List[HistoryEvent], method: str, uri: str, content: str = N Task The durable HTTP request to schedule. """ + json_content: Optional[str] = None if content and content is not isinstance(content, str): json_content = json.dumps(content) else: diff --git a/azure/durable_functions/tasks/create_timer.py b/azure/durable_functions/tasks/create_timer.py index e775790b..00da0b61 100644 --- a/azure/durable_functions/tasks/create_timer.py +++ b/azure/durable_functions/tasks/create_timer.py @@ -7,7 +7,7 @@ def create_timer_task(state: List[HistoryEvent], - fire_at: datetime) -> TimerTask: + fire_at: datetime.datetime) -> TimerTask: """Durable Timers are used in orchestrator function to implement delays. Parameters diff --git a/azure/durable_functions/tasks/new_uuid.py b/azure/durable_functions/tasks/new_uuid.py index 0807876b..43140824 100644 --- a/azure/durable_functions/tasks/new_uuid.py +++ b/azure/durable_functions/tasks/new_uuid.py @@ -1,6 +1,5 @@ from uuid import uuid5, NAMESPACE_OID -from azure.durable_functions.models import DurableOrchestrationContext from azure.durable_functions.constants import DATETIME_STRING_FORMAT URL_NAMESPACE: str = "9e952958-5e33-4daf-827f-2fa12937b875" @@ -11,7 +10,7 @@ def _create_deterministic_uuid(namespace_value: str, name: str) -> str: return str(uuid5(namespace_uuid, name)) -def new_uuid(context: DurableOrchestrationContext) -> str: +def new_uuid(context) -> str: """Create a new UUID that is safe for replay within an orchestration or operation. The default implementation of this method creates a name-based UUID @@ -31,7 +30,7 @@ def new_uuid(context: DurableOrchestrationContext) -> str: New UUID that is safe for replay within an orchestration or operation. """ uuid_name_value = \ - f"{context.instance_id}" \ + f"{context._instance_id}" \ f"_{context.current_utc_datetime.strftime(DATETIME_STRING_FORMAT)}" \ f"_{context._new_uuid_counter}" context._new_uuid_counter += 1 diff --git a/azure/durable_functions/tasks/timer_task.py b/azure/durable_functions/tasks/timer_task.py index 0eaca181..fdd719fe 100644 --- a/azure/durable_functions/tasks/timer_task.py +++ b/azure/durable_functions/tasks/timer_task.py @@ -1,4 +1,5 @@ from ..models.Task import Task +from ..models.actions.CreateTimerAction import CreateTimerAction class TimerTask(Task): @@ -14,8 +15,8 @@ class TimerTask(Task): ``` """ - def __init__(self, action, is_completed, timestamp, id_): - self._action = action + def __init__(self, action: CreateTimerAction, is_completed, timestamp, id_): + self._action: CreateTimerAction = action self._is_completed = is_completed self._timestamp = timestamp self._id = id_ diff --git a/samples/function_chaining/DurableTrigger/__init__.py b/samples/function_chaining/DurableTrigger/__init__.py index e4eccba6..d3ceaed7 100755 --- a/samples/function_chaining/DurableTrigger/__init__.py +++ b/samples/function_chaining/DurableTrigger/__init__.py @@ -27,4 +27,5 @@ async def main(req: func.HttpRequest, starter: str, message): client = DurableOrchestrationClient(starter) instance_id = await client.start_new(function_name) response = client.create_check_status_response(req, instance_id) - message.set(response) + var = client.create_http_management_payload(instance_id) + message.set(str(var)) diff --git a/samples/function_chaining/local.settings.json b/samples/function_chaining/local.settings.json index 853a5a84..a2ded917 100644 --- a/samples/function_chaining/local.settings.json +++ b/samples/function_chaining/local.settings.json @@ -1,7 +1,7 @@ { "IsEncrypted": false, "Values": { - "AzureWebJobsStorage": "", + "AzureWebJobsStorage": "UseDevelopmentStorage=true", "FUNCTIONS_WORKER_RUNTIME": "python" } } From 27baacf5a9ec9f99e5882b134c619cbeb96f3226 Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 16 Jul 2020 19:56:25 -0700 Subject: [PATCH 02/13] linting --- .../models/DurableHttpRequest.py | 2 +- .../models/DurableOrchestrationClient.py | 2 +- .../models/DurableOrchestrationContext.py | 15 +++++++-------- .../models/DurableOrchestrationStatus.py | 4 ++-- .../models/PurgeHistoryResult.py | 2 +- azure/durable_functions/models/RetryOptions.py | 2 +- .../models/RpcManagementOptions.py | 2 +- azure/durable_functions/models/TokenSource.py | 2 +- .../models/actions/CallActivityAction.py | 2 +- .../models/actions/CallActivityWithRetryAction.py | 2 +- .../models/actions/ContinueAsNewAction.py | 2 +- 11 files changed, 18 insertions(+), 19 deletions(-) diff --git a/azure/durable_functions/models/DurableHttpRequest.py b/azure/durable_functions/models/DurableHttpRequest.py index eab80530..c1c96c92 100644 --- a/azure/durable_functions/models/DurableHttpRequest.py +++ b/azure/durable_functions/models/DurableHttpRequest.py @@ -1,4 +1,4 @@ -from typing import Dict, Any, Union, Optional +from typing import Dict, Union, Optional from azure.durable_functions.models.TokenSource import TokenSource from azure.durable_functions.models.utils.json_utils import add_attrib, add_json_attrib diff --git a/azure/durable_functions/models/DurableOrchestrationClient.py b/azure/durable_functions/models/DurableOrchestrationClient.py index 8614e4bd..2a9b6640 100644 --- a/azure/durable_functions/models/DurableOrchestrationClient.py +++ b/azure/durable_functions/models/DurableOrchestrationClient.py @@ -1,6 +1,6 @@ import json from datetime import datetime -from typing import List, Any, Awaitable, Optional, Union, Dict +from typing import List, Any, Optional, Dict from time import time from asyncio import sleep from urllib.parse import urlparse, quote diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index 995eb141..ba80ac3f 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -111,14 +111,13 @@ def call_activity_with_retry(self, fails completely. """ return call_activity_with_retry_task( - state=self.histories, - retry_options=retry_options, - name=name, - input_=input_) - raise NotImplementedError("This is a placeholder.") + state=self.histories, + retry_options=retry_options, + name=name, + input_=input_) def call_http(self, method: str, uri: str, content: Optional[str] = None, - headers: Optional[Dict[str, str]] = None, + headers: Optional[Dict[str, str]] = None, token_source: TokenSource = None) -> Task: """Schedule a durable HTTP call to the specified endpoint. @@ -141,8 +140,8 @@ def call_http(self, method: str, uri: str, content: Optional[str] = None, The durable HTTP request to schedule. """ return call_http( - state=self.histories, method=method, uri=uri, content=content, headers=headers, - token_source=token_source) + state=self.histories, method=method, uri=uri, content=content, headers=headers, + token_source=token_source) def call_sub_orchestrator(self, name: str, input_: Optional[Any] = None, diff --git a/azure/durable_functions/models/DurableOrchestrationStatus.py b/azure/durable_functions/models/DurableOrchestrationStatus.py index b4990c9a..e60850f5 100644 --- a/azure/durable_functions/models/DurableOrchestrationStatus.py +++ b/azure/durable_functions/models/DurableOrchestrationStatus.py @@ -2,7 +2,6 @@ from dateutil.parser import parse as dt_parse from typing import Any, List, Dict, Optional, Union -from .OrchestrationRuntimeStatus import OrchestrationRuntimeStatus from .utils.json_utils import add_attrib, add_datetime_attrib @@ -21,7 +20,8 @@ def __init__(self, name: Optional[str] = None, instanceId: Optional[str] = None, **kwargs): self._name: Optional[str] = name self._instance_id: Optional[str] = instanceId - self._created_time: Optional[datetime] = dt_parse(createdTime) if createdTime is not None else None + self._created_time: Optional[datetime] = \ + dt_parse(createdTime) if createdTime is not None else None self._last_updated_time: Optional[datetime] = dt_parse(lastUpdatedTime) \ if lastUpdatedTime is not None else None self._input: Any = input diff --git a/azure/durable_functions/models/PurgeHistoryResult.py b/azure/durable_functions/models/PurgeHistoryResult.py index 4ccd48d0..b3d6587a 100644 --- a/azure/durable_functions/models/PurgeHistoryResult.py +++ b/azure/durable_functions/models/PurgeHistoryResult.py @@ -1,4 +1,4 @@ -from typing import Union, Dict, Any +from typing import Dict, Any class PurgeHistoryResult: diff --git a/azure/durable_functions/models/RetryOptions.py b/azure/durable_functions/models/RetryOptions.py index 1770293c..95734ed4 100644 --- a/azure/durable_functions/models/RetryOptions.py +++ b/azure/durable_functions/models/RetryOptions.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, Union +from typing import Dict, Union from .utils.json_utils import add_attrib diff --git a/azure/durable_functions/models/RpcManagementOptions.py b/azure/durable_functions/models/RpcManagementOptions.py index 9fcc09e0..36158315 100644 --- a/azure/durable_functions/models/RpcManagementOptions.py +++ b/azure/durable_functions/models/RpcManagementOptions.py @@ -41,7 +41,7 @@ def to_url(self, base_url: Optional[str]) -> str: ---------- base_url: str The base url to prepend to the url path - + Raises ------ ValueError diff --git a/azure/durable_functions/models/TokenSource.py b/azure/durable_functions/models/TokenSource.py index 375b84bd..d6ced05f 100644 --- a/azure/durable_functions/models/TokenSource.py +++ b/azure/durable_functions/models/TokenSource.py @@ -1,5 +1,5 @@ from abc import ABC -from typing import Dict, Any, Union +from typing import Dict, Union from azure.durable_functions.models.utils.json_utils import add_attrib diff --git a/azure/durable_functions/models/actions/CallActivityAction.py b/azure/durable_functions/models/actions/CallActivityAction.py index 37361832..2e5c4ade 100644 --- a/azure/durable_functions/models/actions/CallActivityAction.py +++ b/azure/durable_functions/models/actions/CallActivityAction.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, Union +from typing import Dict, Union from .Action import Action from .ActionType import ActionType diff --git a/azure/durable_functions/models/actions/CallActivityWithRetryAction.py b/azure/durable_functions/models/actions/CallActivityWithRetryAction.py index 72fdba46..7ec97580 100644 --- a/azure/durable_functions/models/actions/CallActivityWithRetryAction.py +++ b/azure/durable_functions/models/actions/CallActivityWithRetryAction.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, Union +from typing import Dict, Union from .Action import Action from .ActionType import ActionType diff --git a/azure/durable_functions/models/actions/ContinueAsNewAction.py b/azure/durable_functions/models/actions/ContinueAsNewAction.py index eed20d52..76e49f6e 100644 --- a/azure/durable_functions/models/actions/ContinueAsNewAction.py +++ b/azure/durable_functions/models/actions/ContinueAsNewAction.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, Union +from typing import Dict, Union from .Action import Action from .ActionType import ActionType From 5e188880c21d4401278a65fa41163f7aa37edea8 Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 16 Jul 2020 20:55:37 -0700 Subject: [PATCH 03/13] undoing sample changes --- samples/function_chaining/DurableTrigger/__init__.py | 3 +-- samples/function_chaining/local.settings.json | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/samples/function_chaining/DurableTrigger/__init__.py b/samples/function_chaining/DurableTrigger/__init__.py index d3ceaed7..e4eccba6 100755 --- a/samples/function_chaining/DurableTrigger/__init__.py +++ b/samples/function_chaining/DurableTrigger/__init__.py @@ -27,5 +27,4 @@ async def main(req: func.HttpRequest, starter: str, message): client = DurableOrchestrationClient(starter) instance_id = await client.start_new(function_name) response = client.create_check_status_response(req, instance_id) - var = client.create_http_management_payload(instance_id) - message.set(str(var)) + message.set(response) diff --git a/samples/function_chaining/local.settings.json b/samples/function_chaining/local.settings.json index a2ded917..853a5a84 100644 --- a/samples/function_chaining/local.settings.json +++ b/samples/function_chaining/local.settings.json @@ -1,7 +1,7 @@ { "IsEncrypted": false, "Values": { - "AzureWebJobsStorage": "UseDevelopmentStorage=true", + "AzureWebJobsStorage": "", "FUNCTIONS_WORKER_RUNTIME": "python" } } From f43a2329060da03c3eface1bd28ff8fbd397cd5f Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 16 Jul 2020 20:55:53 -0700 Subject: [PATCH 04/13] added mypy to build --- noxfile.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/noxfile.py b/noxfile.py index a6c9770b..25632006 100644 --- a/noxfile.py +++ b/noxfile.py @@ -1,6 +1,6 @@ import nox -@nox.session(python="3.7") +@nox.session(python=["3.7","3.8"]) def tests(session): # same as pip install -r -requirements.txt session.install("-r", "requirements.txt") @@ -8,8 +8,13 @@ def tests(session): session.run("pytest", "-v", "tests") -@nox.session(python="3.7") +@nox.session(python=["3.7", "3.8"]) def lint(session): session.install("flake8") session.install("flake8-docstrings") session.run("flake8", "./azure/") + +@nox.session(python=["3.7", "3.8"]) +def typecheck(session): + session.install("mypy") + session.run("mypy", "./azure/") \ No newline at end of file From e17c03c52c04db11c3ac0a3d4477eba4b7365962 Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 16 Jul 2020 21:44:01 -0700 Subject: [PATCH 05/13] added type alias for being SerializableToJSON --- .../models/DurableOrchestrationClient.py | 76 +++++++++++-------- .../models/utils/type_aliases.py | 4 + 2 files changed, 50 insertions(+), 30 deletions(-) create mode 100644 azure/durable_functions/models/utils/type_aliases.py diff --git a/azure/durable_functions/models/DurableOrchestrationClient.py b/azure/durable_functions/models/DurableOrchestrationClient.py index 2a9b6640..b3ab85e4 100644 --- a/azure/durable_functions/models/DurableOrchestrationClient.py +++ b/azure/durable_functions/models/DurableOrchestrationClient.py @@ -1,6 +1,6 @@ import json from datetime import datetime -from typing import List, Any, Optional, Dict +from typing import List, Any, Optional, Dict, Union from time import time from asyncio import sleep from urllib.parse import urlparse, quote @@ -13,6 +13,7 @@ from .OrchestrationRuntimeStatus import OrchestrationRuntimeStatus from ..models.DurableOrchestrationBindings import DurableOrchestrationBindings from .utils.http_utils import get_async_request, post_async_request, delete_async_request +from .utils.type_aliases import SerializableToJSON from azure.functions._durable_functions import _serialize_custom_object @@ -45,7 +46,7 @@ def __init__(self, context: str): async def start_new(self, orchestration_function_name: str, instance_id: Optional[str] = None, - client_input: object = None) -> str: + client_input: Optional[SerializableToJSON] = None) -> str: """Start a new instance of the specified orchestrator function. If an orchestration instance with the specified ID already exists, the @@ -55,10 +56,10 @@ async def start_new(self, ---------- orchestration_function_name : str The name of the orchestrator function to start. - instance_id : str + instance_id : Optional[str] The ID to use for the new orchestration instance. If no instance id is specified, the Durable Functions extension will generate a random GUID (recommended). - client_input : object + client_input : Optional[Any] JSON-serializable input value for the orchestrator function. Returns @@ -69,7 +70,7 @@ async def start_new(self, request_url = self._get_start_new_url( instance_id=instance_id, orchestration_function_name=orchestration_function_name) - response: List[Any] = await self._post_async_request( + response: List[SerializableToJSON] = await self._post_async_request( request_url, self._get_json_input(client_input)) status_code: int = response[0] @@ -83,10 +84,10 @@ async def start_new(self, else: # Catch all: simply surfacing the durable-extension exception # we surface the stack trace too, since this may be a more involed exception - ex_message: Any = response[1] + ex_message: SerializableToJSON = response[1] raise Exception(ex_message) - def create_check_status_response(self, request, instance_id): + def create_check_status_response(self, request: func.HttpRequest, instance_id: str) -> func.HttpResponse: """Create a HttpResponse that contains useful information for \ checking the status of the specified instance. @@ -115,7 +116,8 @@ def create_check_status_response(self, request, instance_id): } return func.HttpResponse(**response_args) - def get_client_response_links(self, request, instance_id): + def get_client_response_links(self, + request: func.HttpRequest, instance_id: str) -> Dict[str, str]: """Create a dictionary of orchestrator management urls. Parameters @@ -127,7 +129,7 @@ def get_client_response_links(self, request, instance_id): Returns ------- - dict + Dict[str, str] a dictionary object of orchestrator instance management urls """ payload = self._orchestration_bindings.management_urls.copy() @@ -140,8 +142,9 @@ def get_client_response_links(self, request, instance_id): return payload - async def raise_event(self, instance_id, event_name, event_data=None, - task_hub_name=None, connection_name=None): + async def raise_event(self, instance_id: str, + event_name: str, event_data: SerializableToJSON = None, + task_hub_name: str = None, connection_name: str = None) -> None: """Send an event notification message to a waiting orchestration instance. In order to handle the event, the target orchestration instance must be @@ -153,7 +156,7 @@ async def raise_event(self, instance_id, event_name, event_data=None, The ID of the orchestration instance that will handle the event. event_name : str The name of the event. - event_data : any, optional + event_data : Any, optional The JSON-serializable data associated with the event. task_hub_name : str, optional The TaskHubName of the orchestration that will handle the event. @@ -167,7 +170,7 @@ async def raise_event(self, instance_id, event_name, event_data=None, Exception Raises an exception if the status code is 404 or 400 when raising the event. """ - if not event_name: + if event_name == "": raise ValueError("event_name must be a valid string.") request_url = self._get_raise_event_url( @@ -187,9 +190,9 @@ async def raise_event(self, instance_id, event_name, event_data=None, if error_message: raise Exception(error_message) - async def get_status(self, instance_id: str, show_history: bool = None, - show_history_output: bool = None, - show_input: bool = None) -> DurableOrchestrationStatus: + async def get_status(self, instance_id: str, show_history: bool = False, + show_history_output: bool = False, + show_input: bool = False) -> DurableOrchestrationStatus: """Get the status of the specified orchestration instance. Parameters @@ -252,7 +255,7 @@ async def get_status_all(self) -> List[DurableOrchestrationStatus]: if error_message: raise Exception(error_message) else: - statuses: List[Any] = response[1] + statuses: List[SerializableToJSON] = response[1] return [DurableOrchestrationStatus.from_json(o) for o in statuses] async def get_status_by(self, created_time_from: datetime = None, @@ -276,6 +279,7 @@ async def get_status_by(self, created_time_from: datetime = None, DurableOrchestrationStatus The status of the requested orchestration instances """ + # TODO: do we really want folks to us this without specifying all the args? options = RpcManagementOptions(created_time_from=created_time_from, created_time_to=created_time_to, runtime_status=runtime_status) @@ -311,19 +315,20 @@ async def purge_instance_history(self, instance_id: str) -> PurgeHistoryResult: response = await self._delete_async_request(request_url) return self._parse_purge_instance_history_response(response) - async def purge_instance_history_by(self, created_time_from: datetime = None, - created_time_to: datetime = None, - runtime_status: List[OrchestrationRuntimeStatus] = None) \ + async def purge_instance_history_by( + self, created_time_from: Optional[datetime] = None, + created_time_to: Optional[datetime] = None, + runtime_status: Optional[List[OrchestrationRuntimeStatus]] = None) \ -> PurgeHistoryResult: """Delete the history of all orchestration instances that match the specified conditions. Parameters ---------- - created_time_from : datetime + created_time_from : Optional[datetime] Delete orchestration history which were created after this Date. - created_time_to: datetime + created_time_to: Optional[datetime] Delete orchestration history which were created before this Date. - runtime_status: List[OrchestrationRuntimeStatus] + runtime_status: Optional[List[OrchestrationRuntimeStatus]] Delete orchestration instances which match any of the runtimeStatus values in this list. @@ -332,6 +337,7 @@ async def purge_instance_history_by(self, created_time_from: datetime = None, PurgeHistoryResult The results of the request to purge history """ + # TODO: do we really want folks to us this without specifying all the args? options = RpcManagementOptions(created_time_from=created_time_from, created_time_to=created_time_to, runtime_status=runtime_status) @@ -339,7 +345,7 @@ async def purge_instance_history_by(self, created_time_from: datetime = None, response = await self._delete_async_request(request_url) return self._parse_purge_instance_history_response(response) - async def terminate(self, instance_id: str, reason: str): + async def terminate(self, instance_id: str, reason: str) -> None: """Terminate the specified orchestration instance. Parameters @@ -348,6 +354,11 @@ async def terminate(self, instance_id: str, reason: str): The ID of the orchestration instance to query. reason: str The reason for terminating the instance. + + Raises + ------ + Exception: + When the terminate call failed with an unexpected status code Returns ------- @@ -431,7 +442,8 @@ async def wait_for_completion_or_create_check_status_response( return self.create_check_status_response(request, instance_id) @staticmethod - def _create_http_response(status_code: int, body: Any) -> func.HttpResponse: + def _create_http_response( + status_code: int, body: Union[str, SerializableToJSON]) -> func.HttpResponse: body_as_json = body if isinstance(body, str) else json.dumps(body) response_args = { "status_code": status_code, @@ -469,7 +481,7 @@ def _get_json_input(client_input: object) -> Optional[str]: return None @staticmethod - def _replace_url_origin(request_url, value_url): + def _replace_url_origin(request_url: str, value_url: str) -> str: request_parsed_url = urlparse(request_url) value_parsed_url = urlparse(value_url) request_url_origin = '{url.scheme}://{url.netloc}/'.format(url=request_parsed_url) @@ -478,7 +490,8 @@ def _replace_url_origin(request_url, value_url): return value_url @staticmethod - def _parse_purge_instance_history_response(response: List[Any]): + def _parse_purge_instance_history_response( + response: List[Any]) -> PurgeHistoryResult: switch_statement = { 200: lambda: PurgeHistoryResult.from_json(response[1]), # instance completed 404: lambda: PurgeHistoryResult(instancesDeleted=0), # instance not found @@ -493,17 +506,20 @@ def _parse_purge_instance_history_response(response: List[Any]): else: raise Exception(result) - def _get_start_new_url(self, instance_id, orchestration_function_name): + def _get_start_new_url(self, + instance_id: Optional[str], orchestration_function_name: str) -> str: instance_path = f'/{instance_id}' if instance_id is not None else '' request_url = f'{self._orchestration_bindings.rpc_base_url}orchestrators/' \ f'{orchestration_function_name}{instance_path}' return request_url - def _get_raise_event_url(self, instance_id, event_name, task_hub_name, connection_name): + def _get_raise_event_url(self, + instance_id: str, event_name: str, + task_hub_name: Optional[str], connection_name: Optional[str]) -> str: request_url = f'{self._orchestration_bindings.rpc_base_url}' \ f'instances/{instance_id}/raiseEvent/{event_name}' - query = [] + query: List[str] = [] if task_hub_name: query.append(f'taskHub={task_hub_name}') diff --git a/azure/durable_functions/models/utils/type_aliases.py b/azure/durable_functions/models/utils/type_aliases.py new file mode 100644 index 00000000..031c2e59 --- /dev/null +++ b/azure/durable_functions/models/utils/type_aliases.py @@ -0,0 +1,4 @@ +from typing import Any + +# TODO: Use Python's NewType functionality to further constraint types +SerializableToJSON = Any From 17c99eeafeb3723f728125e65a0bea2a3588ca2d Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 16 Jul 2020 21:51:53 -0700 Subject: [PATCH 06/13] lint --- .../models/DurableOrchestrationClient.py | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationClient.py b/azure/durable_functions/models/DurableOrchestrationClient.py index b3ab85e4..15f7f1cf 100644 --- a/azure/durable_functions/models/DurableOrchestrationClient.py +++ b/azure/durable_functions/models/DurableOrchestrationClient.py @@ -87,7 +87,8 @@ async def start_new(self, ex_message: SerializableToJSON = response[1] raise Exception(ex_message) - def create_check_status_response(self, request: func.HttpRequest, instance_id: str) -> func.HttpResponse: + def create_check_status_response( + self, request: func.HttpRequest, instance_id: str) -> func.HttpResponse: """Create a HttpResponse that contains useful information for \ checking the status of the specified instance. @@ -116,8 +117,8 @@ def create_check_status_response(self, request: func.HttpRequest, instance_id: s } return func.HttpResponse(**response_args) - def get_client_response_links(self, - request: func.HttpRequest, instance_id: str) -> Dict[str, str]: + def get_client_response_links( + self, request: func.HttpRequest, instance_id: str) -> Dict[str, str]: """Create a dictionary of orchestrator management urls. Parameters @@ -142,8 +143,8 @@ def get_client_response_links(self, return payload - async def raise_event(self, instance_id: str, - event_name: str, event_data: SerializableToJSON = None, + async def raise_event( + self, instance_id: str, event_name: str, event_data: SerializableToJSON = None, task_hub_name: str = None, connection_name: str = None) -> None: """Send an event notification message to a waiting orchestration instance. @@ -354,7 +355,7 @@ async def terminate(self, instance_id: str, reason: str) -> None: The ID of the orchestration instance to query. reason: str The reason for terminating the instance. - + Raises ------ Exception: @@ -506,15 +507,15 @@ def _parse_purge_instance_history_response( else: raise Exception(result) - def _get_start_new_url(self, - instance_id: Optional[str], orchestration_function_name: str) -> str: + def _get_start_new_url( + self, instance_id: Optional[str], orchestration_function_name: str) -> str: instance_path = f'/{instance_id}' if instance_id is not None else '' request_url = f'{self._orchestration_bindings.rpc_base_url}orchestrators/' \ f'{orchestration_function_name}{instance_path}' return request_url - def _get_raise_event_url(self, - instance_id: str, event_name: str, + def _get_raise_event_url( + self, instance_id: str, event_name: str, task_hub_name: Optional[str], connection_name: Optional[str]) -> str: request_url = f'{self._orchestration_bindings.rpc_base_url}' \ f'instances/{instance_id}/raiseEvent/{event_name}' From a0686e39baa5e10ca9023093cbdb4ab54196027f Mon Sep 17 00:00:00 2001 From: David Justo Date: Tue, 28 Jul 2020 18:46:54 -0700 Subject: [PATCH 07/13] after merge, made everything typecheck --- .../models/DurableOrchestrationClient.py | 3 +- .../models/DurableOrchestrationContext.py | 98 ++++++++++++++----- .../actions/CallSubOrchestratorAction.py | 5 +- .../CallSubOrchestratorWithRetryAction.py | 4 +- .../models/actions/CreateTimerAction.py | 7 +- .../tasks/call_suborchestrator.py | 2 +- .../tasks/call_suborchestrator_with_retry.py | 2 +- .../durable_functions/tasks/task_utilities.py | 38 +++---- 8 files changed, 109 insertions(+), 50 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationClient.py b/azure/durable_functions/models/DurableOrchestrationClient.py index 2363e92c..de170b09 100644 --- a/azure/durable_functions/models/DurableOrchestrationClient.py +++ b/azure/durable_functions/models/DurableOrchestrationClient.py @@ -152,8 +152,7 @@ def get_client_response_links( payload = self._orchestration_bindings.management_urls.copy() for key, _ in payload.items(): - request_is_not_none = not (request is None) - if request_is_not_none and request.url: + if not(request is None) and request.url: payload[key] = self._replace_url_origin(request.url, payload[key]) payload[key] = payload[key].replace( self._orchestration_bindings.management_urls["id"], instance_id) diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index 7fd35915..e93aecdb 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -33,26 +33,8 @@ def __init__(self, self._custom_status: Any = None self._new_uuid_counter: int = 0 self._sub_orchestrator_counter: int = 0 - self.call_sub_orchestrator = \ - lambda n, i=None, _id=None: call_sub_orchestrator_task( - context=self, - state=self.histories, - name=n, - input_=i, - instance_id=_id) - self.call_sub_orchestrator_with_retry = \ - lambda n, o, i=None, _id=None: call_sub_orchestrator_with_retry_task( - context=self, - state=self.histories, - retry_options=o, - name=n, - input_=i, - instance_id=_id) - self.wait_for_external_event = lambda n: wait_for_external_event_task( - state=self.histories, - name=n) + # TODO: waiting on the `continue_as_new` intellisense until that's implemented self.continue_as_new = lambda i: continue_as_new(input_=i) - self.create_timer = lambda d: create_timer_task(state=self.histories, fire_at=d) self.decision_started_event: HistoryEvent = \ [e_ for e_ in self.histories if e_.event_type == HistoryEventType.ORCHESTRATOR_STARTED][0] @@ -163,7 +145,7 @@ def call_http(self, method: str, uri: str, content: Optional[str] = None, def call_sub_orchestrator(self, name: str, input_: Optional[Any] = None, instance_id: Optional[str] = None) -> Task: - """Schedule an orchestration function named `name` for execution. + """Schedule sub-orchestration function named `name` for execution. Parameters ---------- @@ -172,11 +154,49 @@ def call_sub_orchestrator(self, input_: Optional[Any] The JSON-serializable input to pass to the orchestrator function. instance_id: Optional[str] - A unique ID to use for the sub-orchestration instance. If `instanceId` is not - specified, the extension will generate an id in the format `:<#>` + A unique ID to use for the sub-orchestration instance. + + Returns + ------- + Task + A Durable Task that completes when the called sub-orchestrator completes or fails. """ - raise NotImplementedError("This is a placeholder.") + return call_sub_orchestrator_task( + context=self, + state=self.histories, + name=name, + input_=input_, + instance_id=instance_id) + + def call_sub_orchestrator_with_retry(self, + name: str, retry_options: RetryOptions, + input_: Optional[Any] = None, + instance_id: Optional[str] = None) -> Task: + """Schedule sub-orchestration function named `name` for execution, with retry-options. + + Parameters + ---------- + name: str + The name of the activity function to schedule. + retry_options: RetryOptions + The settings for retrying this sub-orchestrator in case of a failure. + input_: Optional[Any] + The JSON-serializable input to pass to the activity function. Defaults to None. + instance_id: str + The instance ID of the sub-orchestrator to call. + + Returns + ------- + Task + A Durable Task that completes when the called sub-orchestrator completes or fails. + """ + return call_sub_orchestrator_with_retry_task( + context=self, + state=self.histories, + retry_options=retry_options, + name=name, + input_=input_, + instance_id=instance_id) def get_input(self) -> Optional[Any]: """Get the orchestration input.""" @@ -338,3 +358,33 @@ def function_context(self) -> FunctionContext: Object containing function level attributes not used by durable orchestrator. """ return self._function_context + + def create_timer(self, fire_at: datetime.datetime) -> Task: + """Create a Durable Timer Task to implement a deadline at which to wake-up the orchestrator. + + Parameters + ---------- + fire_at : datetime.datetime + The time for the timer to trigger + + Returns + ------- + TimerTask + A Durable Timer Task that schedules the timer to wake up the activity + """ + return create_timer_task(state=self.histories, fire_at=fire_at) + + def wait_for_external_event(self, name: str) -> Task: + """Wait asynchronously for an event to be raised with the name `name`. + + Parameters + ---------- + name : str + The event name of the event that the task is waiting for. + + Returns + ------- + Task + Task to wait for the event + """ + return wait_for_external_event_task(state=self.histories, name=name) diff --git a/azure/durable_functions/models/actions/CallSubOrchestratorAction.py b/azure/durable_functions/models/actions/CallSubOrchestratorAction.py index 5a733716..16b4c853 100644 --- a/azure/durable_functions/models/actions/CallSubOrchestratorAction.py +++ b/azure/durable_functions/models/actions/CallSubOrchestratorAction.py @@ -10,10 +10,11 @@ class CallSubOrchestratorAction(Action): """Defines the structure of the Call SubOrchestrator object.""" - def __init__(self, function_name: str, _input: Optional[Any] = None, instance_id: str = ""): + def __init__(self, function_name: str, _input: Optional[Any] = None, + instance_id: Optional[str] = None): self.function_name: str = function_name self._input: str = dumps(_input, default=_serialize_custom_object) - self.instance_id: str = instance_id + self.instance_id: Optional[str] = instance_id if not self.function_name: raise ValueError("function_name cannot be empty") diff --git a/azure/durable_functions/models/actions/CallSubOrchestratorWithRetryAction.py b/azure/durable_functions/models/actions/CallSubOrchestratorWithRetryAction.py index dc77b454..07b0ba55 100644 --- a/azure/durable_functions/models/actions/CallSubOrchestratorWithRetryAction.py +++ b/azure/durable_functions/models/actions/CallSubOrchestratorWithRetryAction.py @@ -13,11 +13,11 @@ class CallSubOrchestratorWithRetryAction(Action): def __init__(self, function_name: str, retry_options: RetryOptions, _input: Optional[Any] = None, - instance_id: str = ""): + instance_id: Optional[str] = None): self.function_name: str = function_name self._input: str = dumps(_input, default=_serialize_custom_object) self.retry_options: RetryOptions = retry_options - self.instance_id: str = instance_id + self.instance_id: Optional[str] = instance_id if not self.function_name: raise ValueError("function_name cannot be empty") diff --git a/azure/durable_functions/models/actions/CreateTimerAction.py b/azure/durable_functions/models/actions/CreateTimerAction.py index d50baf8e..40251f7e 100644 --- a/azure/durable_functions/models/actions/CreateTimerAction.py +++ b/azure/durable_functions/models/actions/CreateTimerAction.py @@ -20,7 +20,7 @@ class CreateTimerAction(Action): """ def __init__(self, fire_at: datetime.datetime, is_cancelled: bool = False): - self.action_type: ActionType = ActionType.CREATE_TIMER + self._action_type: ActionType = ActionType.CREATE_TIMER self.fire_at: datetime.datetime = fire_at self.is_cancelled: bool = is_cancelled @@ -41,3 +41,8 @@ def to_json(self) -> Dict[str, Any]: add_datetime_attrib(json_dict, self, 'fire_at', 'fireAt') add_attrib(json_dict, self, 'is_cancelled', 'isCanceled') return json_dict + + @property + def action_type(self) -> int: + """Get the type of action this class represents.""" + return ActionType.CREATE_TIMER diff --git a/azure/durable_functions/tasks/call_suborchestrator.py b/azure/durable_functions/tasks/call_suborchestrator.py index f17ee28e..851ea25c 100644 --- a/azure/durable_functions/tasks/call_suborchestrator.py +++ b/azure/durable_functions/tasks/call_suborchestrator.py @@ -14,7 +14,7 @@ def call_sub_orchestrator_task( state: List[HistoryEvent], name: str, input_: Optional[Any] = None, - instance_id: str = "") -> Task: + instance_id: Optional[str] = None) -> Task: """Determine the state of Scheduling a sub-orchestrator for execution. Parameters diff --git a/azure/durable_functions/tasks/call_suborchestrator_with_retry.py b/azure/durable_functions/tasks/call_suborchestrator_with_retry.py index e1f2766b..3be2fa65 100644 --- a/azure/durable_functions/tasks/call_suborchestrator_with_retry.py +++ b/azure/durable_functions/tasks/call_suborchestrator_with_retry.py @@ -16,7 +16,7 @@ def call_sub_orchestrator_with_retry_task( retry_options: RetryOptions, name: str, input_: Optional[Any] = None, - instance_id: str = "") -> Task: + instance_id: Optional[str] = None) -> Task: """Determine the state of Scheduling a sub-orchestrator for execution, with retry options. Parameters diff --git a/azure/durable_functions/tasks/task_utilities.py b/azure/durable_functions/tasks/task_utilities.py index 167a31d5..99a8d4b5 100644 --- a/azure/durable_functions/tasks/task_utilities.py +++ b/azure/durable_functions/tasks/task_utilities.py @@ -199,7 +199,7 @@ def set_processed(tasks): def find_sub_orchestration( - state: List[HistoryEventType], + state: List[HistoryEvent], event_type: HistoryEventType, name: Optional[str] = None, context=None, @@ -209,11 +209,11 @@ def find_sub_orchestration( Parameters ---------- - state: List[HistoryEventType] + state: List[HistoryEvent] The history of Durable events event_type: HistoryEventType The type of Durable event to look for. - name: str: + name: Optional[str]: Name of the sub-orchestrator. context: Optional['DurableOrchestrationContext'] A reference to the orchestration context @@ -246,30 +246,35 @@ def gen_err_message(counter: int, mid_message: str, found: str, expected: str) - context._sub_orchestrator_counter += 1 counter: int = context._sub_orchestrator_counter + if name is None: + err = "Tried to lookup suborchestration in history but had not name to reference it." + raise ValueError(err) + # TODO: The HistoryEvent does not necessarily have an name or an instance_id # We should create sub-classes of these types like JS does + err_message: str = "" if not(event.Name == name): mid_message = "a function name of {} instead of the provided function name of {}." - err_message: str = gen_err_message(counter, mid_message, event.name, name) + err_message = gen_err_message(counter, mid_message, event.Name, name) raise ValueError(err_message) if instance_id and not(event.InstanceId == instance_id): mid_message = "an instance id of {} instead of the provided instance id of {}." - err_message: str = gen_err_message(counter, mid_message, event.name, name) + err_message = gen_err_message(counter, mid_message, event.Name, name) raise ValueError(err_message) return event def find_sub_orchestration_created( - state: List[HistoryEventType], + state: List[HistoryEvent], name: str, context=None, - instance_id: Optional[str] = None) -> Optional[HistoryEventType]: + instance_id: Optional[str] = None) -> Optional[HistoryEvent]: """Look-up matching sub-orchestrator created event in the state array. Parameters ---------- - state: List[HistoryEventType]: + state: List[HistoryEvent]: The history of Durable events name: str: Name of the sub-orchestrator. @@ -285,7 +290,7 @@ def find_sub_orchestration_created( Returns ------- - Optional[HistoryEventType]: + Optional[HistoryEvent]: The matching sub-orchestration creation event. Else, None. """ event_type = HistoryEventType.SUB_ORCHESTRATION_INSTANCE_CREATED @@ -298,20 +303,20 @@ def find_sub_orchestration_created( def find_sub_orchestration_completed( - state: List[HistoryEventType], - scheduled_task: Optional[HistoryEventType]) -> Optional[HistoryEventType]: + state: List[HistoryEvent], + scheduled_task: Optional[HistoryEvent]) -> Optional[HistoryEvent]: """Look-up the sub-orchestration completed event. Parameters ---------- - state: List[HistoryEventType]: + state: List[HistoryEvent]: The history of Durable events - scheduled_task: Optional[HistoryEventType]: + scheduled_task: Optional[HistoryEvent]: The sub-orchestration creation event, if found. Returns ------- - Optional[HistoryEventType]: + Optional[HistoryEvent]: The matching sub-orchestration completed event, if found. Else, None. """ event_type = HistoryEventType.SUB_ORCHESTRATION_INSTANCE_COMPLETED @@ -395,8 +400,7 @@ def should_preserve(event: HistoryEvent) -> bool: event: Optional[HistoryEvent] = None # Preverse only the elements of the state array that correspond with the looked-up event - matches = filter(should_preserve, state) - matches = list(matches) + matches = list(filter(should_preserve, state)) if len(matches) >= 1: # TODO: in many instances, `matches` will be greater than 1 in length. We take the @@ -404,5 +408,5 @@ def should_preserve(event: HistoryEvent) -> bool: # we assume corresponds to the one we are looking for. This may be brittle but # is true about other areas of the code as well such as with `call_activity`. # We should try to refactor this logic at some point - event: HistoryEventType = matches[0] + event = matches[0] return event From 31c981cdf1054d11b0983d33332ba442f395c318 Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 30 Jul 2020 16:20:54 -0700 Subject: [PATCH 08/13] PR feedback --- .../models/DurableOrchestrationBindings.py | 1 + .../durable_functions/models/DurableOrchestrationClient.py | 2 +- azure/durable_functions/models/RpcManagementOptions.py | 3 +-- azure/durable_functions/tasks/new_uuid.py | 6 +++++- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationBindings.py b/azure/durable_functions/models/DurableOrchestrationBindings.py index 00f613a5..9deb597d 100644 --- a/azure/durable_functions/models/DurableOrchestrationBindings.py +++ b/azure/durable_functions/models/DurableOrchestrationBindings.py @@ -17,6 +17,7 @@ def __init__(self, taskHubName: str, creationUrls: Dict[str, str], self._task_hub_name: str = taskHubName self._creation_urls: Dict[str, str] = creationUrls self._management_urls: Dict[str, str] = managementUrls + # TODO: we can remove this once we drop support for 1.x, this is always provided in 2.x self._rpc_base_url: Optional[str] = rpcBaseUrl self._client_data = FunctionContext(**kwargs) diff --git a/azure/durable_functions/models/DurableOrchestrationClient.py b/azure/durable_functions/models/DurableOrchestrationClient.py index de170b09..65c599ee 100644 --- a/azure/durable_functions/models/DurableOrchestrationClient.py +++ b/azure/durable_functions/models/DurableOrchestrationClient.py @@ -188,7 +188,7 @@ async def raise_event( Raises an exception if the status code is 404 or 400 when raising the event. """ if event_name == "": - raise ValueError("event_name must be a valid string.") + raise ValueError("event_name must be a non-empty string.") request_url = self._get_raise_event_url( instance_id, event_name, task_hub_name, connection_name) diff --git a/azure/durable_functions/models/RpcManagementOptions.py b/azure/durable_functions/models/RpcManagementOptions.py index 36158315..c142a3a4 100644 --- a/azure/durable_functions/models/RpcManagementOptions.py +++ b/azure/durable_functions/models/RpcManagementOptions.py @@ -53,8 +53,7 @@ def to_url(self, base_url: Optional[str]) -> str: The Url used to get orchestration status information """ if base_url is None: - # TODO: In JS we manage a "legacy app frontend code path. Here too?" - raise ValueError("orchestration bindings has not RPC base url") + raise ValueError("orchestration bindings has not RPC base url") url = f"{base_url}instances/{self._instance_id if self._instance_id else ''}" diff --git a/azure/durable_functions/tasks/new_uuid.py b/azure/durable_functions/tasks/new_uuid.py index 43140824..a5cab69d 100644 --- a/azure/durable_functions/tasks/new_uuid.py +++ b/azure/durable_functions/tasks/new_uuid.py @@ -1,6 +1,10 @@ from uuid import uuid5, NAMESPACE_OID from azure.durable_functions.constants import DATETIME_STRING_FORMAT +import typing + +if typing.TYPE_CHECKING: + from azure.durable_functions.models.DurableOrchestrationContext import DurableOrchestrationContext URL_NAMESPACE: str = "9e952958-5e33-4daf-827f-2fa12937b875" @@ -10,7 +14,7 @@ def _create_deterministic_uuid(namespace_value: str, name: str) -> str: return str(uuid5(namespace_uuid, name)) -def new_uuid(context) -> str: +def new_uuid(context: 'DurableOrchestrationContext') -> str: """Create a new UUID that is safe for replay within an orchestration or operation. The default implementation of this method creates a name-based UUID From d3c64a533fbe03dd8f17bdd2f010826539d2ab7c Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 30 Jul 2020 16:28:55 -0700 Subject: [PATCH 09/13] PR feedback 2 --- azure/durable_functions/models/DurableOrchestrationStatus.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/azure/durable_functions/models/DurableOrchestrationStatus.py b/azure/durable_functions/models/DurableOrchestrationStatus.py index e60850f5..2712069a 100644 --- a/azure/durable_functions/models/DurableOrchestrationStatus.py +++ b/azure/durable_functions/models/DurableOrchestrationStatus.py @@ -26,7 +26,7 @@ def __init__(self, name: Optional[str] = None, instanceId: Optional[str] = None, if lastUpdatedTime is not None else None self._input: Any = input self._output: Any = output - self._runtime_status: Optional[str] = runtimeStatus + self._runtime_status: Optional[str] = runtimeStatus # TODO: GH issue 178 self._custom_status: Any = customStatus self._history: Optional[List[Any]] = history if kwargs is not None: From 50470fd5dc62ec8f1ced5ff91fab9740c1ba99fa Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 30 Jul 2020 16:33:06 -0700 Subject: [PATCH 10/13] linting fix --- azure/durable_functions/models/DurableOrchestrationStatus.py | 2 +- azure/durable_functions/models/RpcManagementOptions.py | 2 +- azure/durable_functions/tasks/new_uuid.py | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationStatus.py b/azure/durable_functions/models/DurableOrchestrationStatus.py index 2712069a..3dd20a51 100644 --- a/azure/durable_functions/models/DurableOrchestrationStatus.py +++ b/azure/durable_functions/models/DurableOrchestrationStatus.py @@ -26,7 +26,7 @@ def __init__(self, name: Optional[str] = None, instanceId: Optional[str] = None, if lastUpdatedTime is not None else None self._input: Any = input self._output: Any = output - self._runtime_status: Optional[str] = runtimeStatus # TODO: GH issue 178 + self._runtime_status: Optional[str] = runtimeStatus # TODO: GH issue 178 self._custom_status: Any = customStatus self._history: Optional[List[Any]] = history if kwargs is not None: diff --git a/azure/durable_functions/models/RpcManagementOptions.py b/azure/durable_functions/models/RpcManagementOptions.py index c142a3a4..c16c508f 100644 --- a/azure/durable_functions/models/RpcManagementOptions.py +++ b/azure/durable_functions/models/RpcManagementOptions.py @@ -53,7 +53,7 @@ def to_url(self, base_url: Optional[str]) -> str: The Url used to get orchestration status information """ if base_url is None: - raise ValueError("orchestration bindings has not RPC base url") + raise ValueError("orchestration bindings has not RPC base url") url = f"{base_url}instances/{self._instance_id if self._instance_id else ''}" diff --git a/azure/durable_functions/tasks/new_uuid.py b/azure/durable_functions/tasks/new_uuid.py index a5cab69d..364e4798 100644 --- a/azure/durable_functions/tasks/new_uuid.py +++ b/azure/durable_functions/tasks/new_uuid.py @@ -4,7 +4,8 @@ import typing if typing.TYPE_CHECKING: - from azure.durable_functions.models.DurableOrchestrationContext import DurableOrchestrationContext + from azure.durable_functions.models.DurableOrchestrationContext \ + import DurableOrchestrationContext URL_NAMESPACE: str = "9e952958-5e33-4daf-827f-2fa12937b875" From a52b75388d4520c15219e81191e1efb373ad8a48 Mon Sep 17 00:00:00 2001 From: David Justo Date: Mon, 3 Aug 2020 10:44:01 -0700 Subject: [PATCH 11/13] removed SerializableToJSON 4 Any, may want to revisit the idea via typing Protocols later --- .../models/DurableOrchestrationClient.py | 13 ++++++------- .../durable_functions/models/utils/type_aliases.py | 4 ---- 2 files changed, 6 insertions(+), 11 deletions(-) delete mode 100644 azure/durable_functions/models/utils/type_aliases.py diff --git a/azure/durable_functions/models/DurableOrchestrationClient.py b/azure/durable_functions/models/DurableOrchestrationClient.py index 65c599ee..8bc22786 100644 --- a/azure/durable_functions/models/DurableOrchestrationClient.py +++ b/azure/durable_functions/models/DurableOrchestrationClient.py @@ -13,7 +13,6 @@ from .OrchestrationRuntimeStatus import OrchestrationRuntimeStatus from ..models.DurableOrchestrationBindings import DurableOrchestrationBindings from .utils.http_utils import get_async_request, post_async_request, delete_async_request -from .utils.type_aliases import SerializableToJSON from azure.functions._durable_functions import _serialize_custom_object @@ -46,7 +45,7 @@ def __init__(self, context: str): async def start_new(self, orchestration_function_name: str, instance_id: Optional[str] = None, - client_input: Optional[SerializableToJSON] = None) -> str: + client_input: Optional[Any] = None) -> str: """Start a new instance of the specified orchestrator function. If an orchestration instance with the specified ID already exists, the @@ -70,7 +69,7 @@ async def start_new(self, request_url = self._get_start_new_url( instance_id=instance_id, orchestration_function_name=orchestration_function_name) - response: List[SerializableToJSON] = await self._post_async_request( + response: List[Any] = await self._post_async_request( request_url, self._get_json_input(client_input)) status_code: int = response[0] @@ -84,7 +83,7 @@ async def start_new(self, else: # Catch all: simply surfacing the durable-extension exception # we surface the stack trace too, since this may be a more involed exception - ex_message: SerializableToJSON = response[1] + ex_message: Any = response[1] raise Exception(ex_message) def create_check_status_response( @@ -160,7 +159,7 @@ def get_client_response_links( return payload async def raise_event( - self, instance_id: str, event_name: str, event_data: SerializableToJSON = None, + self, instance_id: str, event_name: str, event_data: Any = None, task_hub_name: str = None, connection_name: str = None) -> None: """Send an event notification message to a waiting orchestration instance. @@ -272,7 +271,7 @@ async def get_status_all(self) -> List[DurableOrchestrationStatus]: if error_message: raise Exception(error_message) else: - statuses: List[SerializableToJSON] = response[1] + statuses: List[Any] = response[1] return [DurableOrchestrationStatus.from_json(o) for o in statuses] async def get_status_by(self, created_time_from: datetime = None, @@ -460,7 +459,7 @@ async def wait_for_completion_or_create_check_status_response( @staticmethod def _create_http_response( - status_code: int, body: Union[str, SerializableToJSON]) -> func.HttpResponse: + status_code: int, body: Union[str, Any]) -> func.HttpResponse: body_as_json = body if isinstance(body, str) else json.dumps(body) response_args = { "status_code": status_code, diff --git a/azure/durable_functions/models/utils/type_aliases.py b/azure/durable_functions/models/utils/type_aliases.py deleted file mode 100644 index 031c2e59..00000000 --- a/azure/durable_functions/models/utils/type_aliases.py +++ /dev/null @@ -1,4 +0,0 @@ -from typing import Any - -# TODO: Use Python's NewType functionality to further constraint types -SerializableToJSON = Any From aae83c98f52120a8495d31aa52fa25a694f1c1af Mon Sep 17 00:00:00 2001 From: David Justo Date: Wed, 5 Aug 2020 09:28:56 -0700 Subject: [PATCH 12/13] ContinueAsNew works, needed to mark the orchestrator as completed (#158) --- .../models/DurableOrchestrationContext.py | 8 +- azure/durable_functions/orchestrator.py | 99 +++++++++++-------- .../tasks/continue_as_new.py | 13 +-- tests/orchestrator/test_continue_as_new.py | 3 +- 4 files changed, 69 insertions(+), 54 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index b2656c0a..ec6b9197 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -32,6 +32,7 @@ def __init__(self, self._custom_status: Any = None self._new_uuid_counter: int = 0 self._sub_orchestrator_counter: int = 0 + self._continue_as_new_flag: bool = False self.call_activity = lambda n, i=None: call_activity_task( state=self.histories, name=n, @@ -65,7 +66,7 @@ def __init__(self, state=self.histories, name=n) self.new_uuid = lambda: new_uuid(context=self) - self.continue_as_new = lambda i: continue_as_new(input_=i) + self.continue_as_new = lambda i: continue_as_new(context=self, input_=i) self.task_any = lambda t: task_any(tasks=t) self.task_all = lambda t: task_all(tasks=t) self.create_timer = lambda d: create_timer_task(state=self.histories, fire_at=d) @@ -344,3 +345,8 @@ def function_context(self) -> FunctionContext: Object containing function level attributes not used by durable orchestrator. """ return self._function_context + + @property + def will_continue_as_new(self) -> bool: + """Return true if continue_as_new was called.""" + return self._continue_as_new_flag diff --git a/azure/durable_functions/orchestrator.py b/azure/durable_functions/orchestrator.py index 74a6c20a..00abe940 100644 --- a/azure/durable_functions/orchestrator.py +++ b/azure/durable_functions/orchestrator.py @@ -49,57 +49,66 @@ def handle(self, context: DurableOrchestrationContext): suspended = False fn_output = self.fn(self.durable_context) + # If `fn_output` is not an Iterator, then the orchestrator # function does not make use of its context parameter. If so, # `fn_output` is the return value instead of a generator - if isinstance(fn_output, Iterator): - self.generator = fn_output - - else: + if not isinstance(fn_output, Iterator): orchestration_state = OrchestratorState( is_done=True, output=fn_output, actions=self.durable_context.actions, custom_status=self.durable_context.custom_status) - return orchestration_state.to_json_string() - try: - generation_state = self._generate_next(None) - - while not suspended: - self._add_to_actions(generation_state) - - if should_suspend(generation_state): - orchestration_state = OrchestratorState( - is_done=False, - output=None, - actions=self.durable_context.actions, - custom_status=self.durable_context.custom_status) - suspended = True - continue - - if (isinstance(generation_state, Task) - or isinstance(generation_state, TaskSet)) and ( - generation_state.is_faulted): - generation_state = self.generator.throw( - generation_state.exception) - continue - - self._reset_timestamp() - generation_state = self._generate_next(generation_state) - - except StopIteration as sie: - orchestration_state = OrchestratorState( - is_done=True, - output=sie.value, - actions=self.durable_context.actions, - custom_status=self.durable_context.custom_status) - except Exception as e: - orchestration_state = OrchestratorState( - is_done=False, - output=None, # Should have no output, after generation range - actions=self.durable_context.actions, - error=str(e), - custom_status=self.durable_context.custom_status) + + else: + self.generator = fn_output + try: + generation_state = self._generate_next(None) + + while not suspended: + self._add_to_actions(generation_state) + + if should_suspend(generation_state): + + # The `is_done` field should be False here unless + # `continue_as_new` was called. Therefore, + # `will_continue_as_new` essentially "tracks" + # whether or not the orchestration is done. + orchestration_state = OrchestratorState( + is_done=self.durable_context.will_continue_as_new, + output=None, + actions=self.durable_context.actions, + custom_status=self.durable_context.custom_status) + suspended = True + continue + + if (isinstance(generation_state, Task) + or isinstance(generation_state, TaskSet)) and ( + generation_state.is_faulted): + generation_state = self.generator.throw( + generation_state.exception) + continue + + self._reset_timestamp() + generation_state = self._generate_next(generation_state) + + except StopIteration as sie: + orchestration_state = OrchestratorState( + is_done=True, + output=sie.value, + actions=self.durable_context.actions, + custom_status=self.durable_context.custom_status) + except Exception as e: + orchestration_state = OrchestratorState( + is_done=False, + output=None, # Should have no output, after generation range + actions=self.durable_context.actions, + error=str(e), + custom_status=self.durable_context.custom_status) + + # No output if continue_as_new was called + if self.durable_context.will_continue_as_new: + orchestration_state._output = None return orchestration_state.to_json_string() @@ -108,9 +117,13 @@ def _generate_next(self, partial_result): gen_result = self.generator.send(partial_result.result) else: gen_result = self.generator.send(None) + return gen_result def _add_to_actions(self, generation_state): + # Do not add new tasks to action if continue_as_new was called + if self.durable_context.will_continue_as_new: + return if (isinstance(generation_state, Task) and hasattr(generation_state, "action")): self.durable_context.actions.append([generation_state.action]) diff --git a/azure/durable_functions/tasks/continue_as_new.py b/azure/durable_functions/tasks/continue_as_new.py index 552a6c67..dd07765f 100644 --- a/azure/durable_functions/tasks/continue_as_new.py +++ b/azure/durable_functions/tasks/continue_as_new.py @@ -1,24 +1,19 @@ from typing import Any -from ..models.Task import ( - Task) from ..models.actions.ContinueAsNewAction import ContinueAsNewAction def continue_as_new( - input_: Any = None) -> Task: + context, + input_: Any = None): """Create a new continue as new action. Parameters ---------- input_: Any The JSON-serializable input to pass to the activity function. - - Returns - ------- - Task - A Durable Task that causes the orchestrator reset and start as a new orchestration. """ new_action = ContinueAsNewAction(input_) - return Task(is_completed=False, is_faulted=False, action=new_action) + context.actions.append([new_action]) + context._continue_as_new_flag = True diff --git a/tests/orchestrator/test_continue_as_new.py b/tests/orchestrator/test_continue_as_new.py index 8c8f1595..ec7956d9 100644 --- a/tests/orchestrator/test_continue_as_new.py +++ b/tests/orchestrator/test_continue_as_new.py @@ -10,7 +10,7 @@ def generator_function(context): yield context.call_activity("Hello", "Tokyo") - yield context.continue_as_new("Cause I can") + context.continue_as_new("Cause I can") def base_expected_state(output=None) -> OrchestratorState: @@ -25,6 +25,7 @@ def add_hello_action(state: OrchestratorState, input_: str): def add_continue_as_new_action(state: OrchestratorState, input_: str): action = ContinueAsNewAction(input_=input_) state.actions.append([action]) + state._is_done = True def add_hello_completed_events( From 1ed44961f4270a9b662308d6468b581db9f5c05d Mon Sep 17 00:00:00 2001 From: David Justo Date: Wed, 5 Aug 2020 10:20:42 -0700 Subject: [PATCH 13/13] linting fix --- azure/durable_functions/models/DurableOrchestrationBindings.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/azure/durable_functions/models/DurableOrchestrationBindings.py b/azure/durable_functions/models/DurableOrchestrationBindings.py index 98a40f96..8c8d8576 100644 --- a/azure/durable_functions/models/DurableOrchestrationBindings.py +++ b/azure/durable_functions/models/DurableOrchestrationBindings.py @@ -17,7 +17,8 @@ def __init__(self, taskHubName: str, creationUrls: Dict[str, str], self._task_hub_name: str = taskHubName self._creation_urls: Dict[str, str] = creationUrls self._management_urls: Dict[str, str] = managementUrls - # TODO: we can remove the Optional once we drop support for 1.x, this is always provided in 2.x + # TODO: we can remove the Optional once we drop support for 1.x, + # this is always provided in 2.x self._rpc_base_url: Optional[str] = rpcBaseUrl self._client_data = FunctionContext(**kwargs)