-
Notifications
You must be signed in to change notification settings - Fork 64
The project now typechecks, improved IntelliSense #164
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
39ee2ba
27baacf
5e18888
f43a232
e17c03c
17c99ee
ff73172
a0686e3
31c981c
d3c64a5
50470fd
a52b753
aae83c9
d4a6904
1ed4496
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,6 @@ | ||
| import json | ||
| from datetime import datetime | ||
| from typing import List, Any, Awaitable, Optional, Dict | ||
| from typing import List, Any, Optional, Dict, Union | ||
| from time import time | ||
| from asyncio import sleep | ||
| from urllib.parse import urlparse, quote | ||
|
|
@@ -11,8 +11,9 @@ | |
| from .DurableOrchestrationStatus import DurableOrchestrationStatus | ||
| from .RpcManagementOptions import RpcManagementOptions | ||
| from .OrchestrationRuntimeStatus import OrchestrationRuntimeStatus | ||
| from ..models import DurableOrchestrationBindings | ||
| from ..models.DurableOrchestrationBindings import DurableOrchestrationBindings | ||
| from .utils.http_utils import get_async_request, post_async_request, delete_async_request | ||
| from .utils.type_aliases import SerializableToJSON | ||
| from azure.functions._durable_functions import _serialize_custom_object | ||
|
|
||
|
|
||
|
|
@@ -44,8 +45,8 @@ def __init__(self, context: str): | |
|
|
||
| async def start_new(self, | ||
| orchestration_function_name: str, | ||
| instance_id: str = None, | ||
| client_input: object = None) -> Awaitable[str]: | ||
| instance_id: Optional[str] = None, | ||
| client_input: Optional[SerializableToJSON] = None) -> str: | ||
| """Start a new instance of the specified orchestrator function. | ||
|
|
||
| If an orchestration instance with the specified ID already exists, the | ||
|
|
@@ -55,10 +56,10 @@ async def start_new(self, | |
| ---------- | ||
| orchestration_function_name : str | ||
| The name of the orchestrator function to start. | ||
| instance_id : str | ||
| instance_id : Optional[str] | ||
| The ID to use for the new orchestration instance. If no instance id is specified, | ||
| the Durable Functions extension will generate a random GUID (recommended). | ||
| client_input : object | ||
| client_input : Optional[Any] | ||
| JSON-serializable input value for the orchestrator function. | ||
|
|
||
| Returns | ||
|
|
@@ -69,22 +70,25 @@ async def start_new(self, | |
| request_url = self._get_start_new_url( | ||
| instance_id=instance_id, orchestration_function_name=orchestration_function_name) | ||
|
|
||
| response = await self._post_async_request(request_url, self._get_json_input(client_input)) | ||
| response: List[SerializableToJSON] = await self._post_async_request( | ||
|
||
| request_url, self._get_json_input(client_input)) | ||
|
|
||
| if response[0] <= 202 and response[1]: | ||
| status_code: int = response[0] | ||
| if status_code <= 202 and response[1]: | ||
| return response[1]["id"] | ||
| elif response[0] == 400: | ||
| elif status_code == 400: | ||
| # Orchestrator not found, report clean exception | ||
| exception_data = response[1] | ||
| exception_data: Dict[str, str] = response[1] | ||
| exception_message = exception_data["ExceptionMessage"] | ||
| raise Exception(exception_message) | ||
| else: | ||
| # Catch all: simply surfacing the durable-extension exception | ||
| # we surface the stack trace too, since this may be a more involed exception | ||
| exception_message = response[1] | ||
| raise Exception(exception_message) | ||
| ex_message: SerializableToJSON = response[1] | ||
| raise Exception(ex_message) | ||
|
|
||
| def create_check_status_response(self, request, instance_id): | ||
| def create_check_status_response( | ||
| self, request: func.HttpRequest, instance_id: str) -> func.HttpResponse: | ||
| """Create a HttpResponse that contains useful information for \ | ||
| checking the status of the specified instance. | ||
|
|
||
|
|
@@ -148,16 +152,16 @@ def get_client_response_links( | |
| payload = self._orchestration_bindings.management_urls.copy() | ||
|
|
||
| for key, _ in payload.items(): | ||
| request_is_not_none = not (request is None) | ||
| if request_is_not_none and request.url: | ||
| if not(request is None) and request.url: | ||
| payload[key] = self._replace_url_origin(request.url, payload[key]) | ||
| payload[key] = payload[key].replace( | ||
| self._orchestration_bindings.management_urls["id"], instance_id) | ||
|
|
||
| return payload | ||
|
|
||
| async def raise_event(self, instance_id, event_name, event_data=None, | ||
| task_hub_name=None, connection_name=None): | ||
| async def raise_event( | ||
| self, instance_id: str, event_name: str, event_data: SerializableToJSON = None, | ||
| task_hub_name: str = None, connection_name: str = None) -> None: | ||
| """Send an event notification message to a waiting orchestration instance. | ||
|
|
||
| In order to handle the event, the target orchestration instance must be | ||
|
|
@@ -169,7 +173,7 @@ async def raise_event(self, instance_id, event_name, event_data=None, | |
| The ID of the orchestration instance that will handle the event. | ||
| event_name : str | ||
| The name of the event. | ||
| event_data : any, optional | ||
| event_data : Any, optional | ||
| The JSON-serializable data associated with the event. | ||
| task_hub_name : str, optional | ||
| The TaskHubName of the orchestration that will handle the event. | ||
|
|
@@ -183,7 +187,7 @@ async def raise_event(self, instance_id, event_name, event_data=None, | |
| Exception | ||
| Raises an exception if the status code is 404 or 400 when raising the event. | ||
| """ | ||
| if not event_name: | ||
| if event_name == "": | ||
davidmrdavid marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| raise ValueError("event_name must be a valid string.") | ||
davidmrdavid marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| request_url = self._get_raise_event_url( | ||
|
|
@@ -203,9 +207,9 @@ async def raise_event(self, instance_id, event_name, event_data=None, | |
| if error_message: | ||
| raise Exception(error_message) | ||
|
|
||
| async def get_status(self, instance_id: str, show_history: bool = None, | ||
| show_history_output: bool = None, | ||
| show_input: bool = None) -> DurableOrchestrationStatus: | ||
| async def get_status(self, instance_id: str, show_history: bool = False, | ||
| show_history_output: bool = False, | ||
| show_input: bool = False) -> DurableOrchestrationStatus: | ||
| """Get the status of the specified orchestration instance. | ||
|
|
||
| Parameters | ||
|
|
@@ -268,7 +272,8 @@ async def get_status_all(self) -> List[DurableOrchestrationStatus]: | |
| if error_message: | ||
| raise Exception(error_message) | ||
| else: | ||
| return [DurableOrchestrationStatus.from_json(o) for o in response[1]] | ||
| statuses: List[SerializableToJSON] = response[1] | ||
| return [DurableOrchestrationStatus.from_json(o) for o in statuses] | ||
|
|
||
| async def get_status_by(self, created_time_from: datetime = None, | ||
| created_time_to: datetime = None, | ||
|
|
@@ -291,6 +296,7 @@ async def get_status_by(self, created_time_from: datetime = None, | |
| DurableOrchestrationStatus | ||
| The status of the requested orchestration instances | ||
| """ | ||
| # TODO: do we really want folks to us this without specifying all the args? | ||
| options = RpcManagementOptions(created_time_from=created_time_from, | ||
| created_time_to=created_time_to, | ||
| runtime_status=runtime_status) | ||
|
|
@@ -326,19 +332,20 @@ async def purge_instance_history(self, instance_id: str) -> PurgeHistoryResult: | |
| response = await self._delete_async_request(request_url) | ||
| return self._parse_purge_instance_history_response(response) | ||
|
|
||
| async def purge_instance_history_by(self, created_time_from: datetime = None, | ||
| created_time_to: datetime = None, | ||
| runtime_status: List[OrchestrationRuntimeStatus] = None) \ | ||
| async def purge_instance_history_by( | ||
| self, created_time_from: Optional[datetime] = None, | ||
| created_time_to: Optional[datetime] = None, | ||
| runtime_status: Optional[List[OrchestrationRuntimeStatus]] = None) \ | ||
| -> PurgeHistoryResult: | ||
| """Delete the history of all orchestration instances that match the specified conditions. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| created_time_from : datetime | ||
| created_time_from : Optional[datetime] | ||
| Delete orchestration history which were created after this Date. | ||
| created_time_to: datetime | ||
| created_time_to: Optional[datetime] | ||
| Delete orchestration history which were created before this Date. | ||
| runtime_status: List[OrchestrationRuntimeStatus] | ||
| runtime_status: Optional[List[OrchestrationRuntimeStatus]] | ||
| Delete orchestration instances which match any of the runtimeStatus values | ||
| in this list. | ||
|
|
||
|
|
@@ -347,14 +354,15 @@ async def purge_instance_history_by(self, created_time_from: datetime = None, | |
| PurgeHistoryResult | ||
| The results of the request to purge history | ||
| """ | ||
| # TODO: do we really want folks to us this without specifying all the args? | ||
| options = RpcManagementOptions(created_time_from=created_time_from, | ||
| created_time_to=created_time_to, | ||
| runtime_status=runtime_status) | ||
| request_url = options.to_url(self._orchestration_bindings.rpc_base_url) | ||
| response = await self._delete_async_request(request_url) | ||
| return self._parse_purge_instance_history_response(response) | ||
|
|
||
| async def terminate(self, instance_id: str, reason: str): | ||
| async def terminate(self, instance_id: str, reason: str) -> None: | ||
| """Terminate the specified orchestration instance. | ||
|
|
||
| Parameters | ||
|
|
@@ -364,6 +372,11 @@ async def terminate(self, instance_id: str, reason: str): | |
| reason: str | ||
| The reason for terminating the instance. | ||
|
|
||
| Raises | ||
| ------ | ||
| Exception: | ||
| When the terminate call failed with an unexpected status code | ||
|
|
||
| Returns | ||
| ------- | ||
| None | ||
|
|
@@ -446,7 +459,8 @@ async def wait_for_completion_or_create_check_status_response( | |
| return self.create_check_status_response(request, instance_id) | ||
|
|
||
| @staticmethod | ||
| def _create_http_response(status_code: int, body: Any) -> func.HttpResponse: | ||
| def _create_http_response( | ||
| status_code: int, body: Union[str, SerializableToJSON]) -> func.HttpResponse: | ||
| body_as_json = body if isinstance(body, str) else json.dumps(body) | ||
| response_args = { | ||
| "status_code": status_code, | ||
|
|
@@ -459,7 +473,7 @@ def _create_http_response(status_code: int, body: Any) -> func.HttpResponse: | |
| return func.HttpResponse(**response_args) | ||
|
|
||
| @staticmethod | ||
| def _get_json_input(client_input: object) -> str: | ||
| def _get_json_input(client_input: object) -> Optional[str]: | ||
| """Serialize the orchestrator input. | ||
|
|
||
| Parameters | ||
|
|
@@ -469,8 +483,10 @@ def _get_json_input(client_input: object) -> str: | |
|
|
||
| Returns | ||
| ------- | ||
| str | ||
| A string representing the JSON-serialization of `client_input` | ||
| Optional[str] | ||
| If `client_input` is not None, return a string representing | ||
| the JSON-serialization of `client_input`. Otherwise, returns | ||
| None | ||
|
|
||
| Exceptions | ||
| ---------- | ||
|
|
@@ -482,7 +498,7 @@ def _get_json_input(client_input: object) -> str: | |
| return None | ||
|
|
||
| @staticmethod | ||
| def _replace_url_origin(request_url, value_url): | ||
| def _replace_url_origin(request_url: str, value_url: str) -> str: | ||
| request_parsed_url = urlparse(request_url) | ||
| value_parsed_url = urlparse(value_url) | ||
| request_url_origin = '{url.scheme}://{url.netloc}/'.format(url=request_parsed_url) | ||
|
|
@@ -491,7 +507,8 @@ def _replace_url_origin(request_url, value_url): | |
| return value_url | ||
|
|
||
| @staticmethod | ||
| def _parse_purge_instance_history_response(response: [int, Any]): | ||
| def _parse_purge_instance_history_response( | ||
| response: List[Any]) -> PurgeHistoryResult: | ||
| switch_statement = { | ||
| 200: lambda: PurgeHistoryResult.from_json(response[1]), # instance completed | ||
| 404: lambda: PurgeHistoryResult(instancesDeleted=0), # instance not found | ||
|
|
@@ -506,17 +523,20 @@ def _parse_purge_instance_history_response(response: [int, Any]): | |
| else: | ||
| raise Exception(result) | ||
|
|
||
| def _get_start_new_url(self, instance_id, orchestration_function_name): | ||
| def _get_start_new_url( | ||
| self, instance_id: Optional[str], orchestration_function_name: str) -> str: | ||
| instance_path = f'/{instance_id}' if instance_id is not None else '' | ||
| request_url = f'{self._orchestration_bindings.rpc_base_url}orchestrators/' \ | ||
| f'{orchestration_function_name}{instance_path}' | ||
| return request_url | ||
|
|
||
| def _get_raise_event_url(self, instance_id, event_name, task_hub_name, connection_name): | ||
| def _get_raise_event_url( | ||
| self, instance_id: str, event_name: str, | ||
| task_hub_name: Optional[str], connection_name: Optional[str]) -> str: | ||
| request_url = f'{self._orchestration_bindings.rpc_base_url}' \ | ||
| f'instances/{instance_id}/raiseEvent/{event_name}' | ||
|
|
||
| query = [] | ||
| query: List[str] = [] | ||
| if task_hub_name: | ||
| query.append(f'taskHub={task_hub_name}') | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.