From 449c93946620eefd141a9486c41f08c8262cd750 Mon Sep 17 00:00:00 2001 From: Hannah Hunter Date: Thu, 14 Sep 2023 12:44:45 -0400 Subject: [PATCH 1/2] add Java to quickstart Signed-off-by: Hannah Hunter --- .../quickstarts/workflow-quickstart.md | 454 +++++++++++------- 1 file changed, 280 insertions(+), 174 deletions(-) diff --git a/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md b/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md index d139561cb04..51dd2539525 100644 --- a/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md +++ b/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md @@ -21,7 +21,245 @@ In this guide, you'll: -{{< tabs ".NET" "Python" >}} +{{< tabs "Python" ".NET" "Java" >}} + + +{{% codetab %}} + +The `order-processor` console app starts and manages the `order_processing_workflow`, which simulates purchasing items from a store. The workflow consists of five unique workflow activities, or tasks: + +- `notify_activity`: Utilizes a logger to print out messages throughout the workflow. These messages notify you when: + - You have insufficient inventory + - Your payment couldn't be processed, etc. +- `process_payment_activity`: Processes and authorizes the payment. +- `verify_inventory_activity`: Checks the state store to ensure there is enough inventory present for purchase. +- `update_inventory_activity`: Removes the requested items from the state store and updates the store with the new remaining inventory value. +- `request_approval_activity`: Seeks approval from the manager if payment is greater than 50,000 USD. + +### Step 1: Pre-requisites + +For this example, you will need: + +- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started). +- [Python 3.7+ installed](https://www.python.org/downloads/). + +- [Docker Desktop](https://www.docker.com/products/docker-desktop) + + +### Step 2: Set up the environment + +Clone the [sample provided in the Quickstarts repo](https://github.com/dapr/quickstarts/tree/master/workflows). + +```bash +git clone https://github.com/dapr/quickstarts.git +``` + +In a new terminal window, navigate to the `order-processor` directory: + +```bash +cd workflows/python/sdk/order-processor +``` + +Install the Dapr Python SDK package: + +```bash +pip3 install -r requirements.txt +``` + +### Step 3: Run the order processor app + +In the terminal, start the order processor app alongside a Dapr sidecar: + +```bash +dapr run --app-id order-processor --resources-path ../../../components/ -- python3 app.py +``` + +> **Note:** Since Python3.exe is not defined in Windows, you may need to use `python app.py` instead of `python3 app.py`. + +This starts the `order-processor` app with unique workflow ID and runs the workflow activities. + +Expected output: + +```bash +== APP == Starting order workflow, purchasing 10 of cars +== APP == 2023-06-06 09:35:52.945 durabletask-worker INFO: Successfully connected to 127.0.0.1:65406. Waiting for work items... +== APP == INFO:NotifyActivity:Received order f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars at $150000 ! +== APP == INFO:VerifyInventoryActivity:Verifying inventory for order f4e1926e-3721-478d-be8a-f5bebd1995da of 10 cars +== APP == INFO:VerifyInventoryActivity:There are 100 Cars available for purchase +== APP == INFO:RequestApprovalActivity:Requesting approval for payment of 165000 USD for 10 cars +== APP == 2023-06-06 09:36:05.969 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da Event raised: manager_approval +== APP == INFO:NotifyActivity:Payment for order f4e1926e-3721-478d-be8a-f5bebd1995da has been approved! +== APP == INFO:ProcessPaymentActivity:Processing payment: f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars at 150000 USD +== APP == INFO:ProcessPaymentActivity:Payment for request ID f4e1926e-3721-478d-be8a-f5bebd1995da processed successfully +== APP == INFO:UpdateInventoryActivity:Checking inventory for order f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars +== APP == INFO:UpdateInventoryActivity:There are now 90 cars left in stock +== APP == INFO:NotifyActivity:Order f4e1926e-3721-478d-be8a-f5bebd1995da has completed! +== APP == 2023-06-06 09:36:06.106 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da: Orchestration completed with status: COMPLETED +== APP == Workflow completed! Result: Completed +== APP == Purchase of item is Completed +``` + +### (Optional) Step 4: View in Zipkin + +If you have Zipkin configured for Dapr locally on your machine, you can view the workflow trace spans in the Zipkin web UI (typically at `http://localhost:9411/zipkin/`). + + + +### What happened? + +When you ran `dapr run --app-id order-processor --resources-path ../../../components/ -- python3 app.py`: + +1. A unique order ID for the workflow is generated (in the above example, `f4e1926e-3721-478d-be8a-f5bebd1995da`) and the workflow is scheduled. +1. The `NotifyActivity` workflow activity sends a notification saying an order for 10 cars has been received. +1. The `ReserveInventoryActivity` workflow activity checks the inventory data, determines if you can supply the ordered item, and responds with the number of cars in stock. +1. Your workflow starts and notifies you of its status. +1. The `ProcessPaymentActivity` workflow activity begins processing payment for order `f4e1926e-3721-478d-be8a-f5bebd1995da` and confirms if successful. +1. The `UpdateInventoryActivity` workflow activity updates the inventory with the current available cars after the order has been processed. +1. The `NotifyActivity` workflow activity sends a notification saying that order `f4e1926e-3721-478d-be8a-f5bebd1995da` has completed. +1. The workflow terminates as completed. + +#### `order-processor/app.py` + +In the application's program file: +- The unique workflow order ID is generated +- The workflow is scheduled +- The workflow status is retrieved +- The workflow and the workflow activities it invokes are registered + +```python +class WorkflowConsoleApp: + def main(self): + # Register workflow and activities + workflowRuntime = WorkflowRuntime(settings.DAPR_RUNTIME_HOST, settings.DAPR_GRPC_PORT) + workflowRuntime.register_workflow(order_processing_workflow) + workflowRuntime.register_activity(notify_activity) + workflowRuntime.register_activity(requst_approval_activity) + workflowRuntime.register_activity(verify_inventory_activity) + workflowRuntime.register_activity(process_payment_activity) + workflowRuntime.register_activity(update_inventory_activity) + workflowRuntime.start() + + print("==========Begin the purchase of item:==========", flush=True) + item_name = default_item_name + order_quantity = 10 + + total_cost = int(order_quantity) * baseInventory[item_name].per_item_cost + order = OrderPayload(item_name=item_name, quantity=int(order_quantity), total_cost=total_cost) + + # Start Workflow + print(f'Starting order workflow, purchasing {order_quantity} of {item_name}', flush=True) + start_resp = daprClient.start_workflow(workflow_component=workflow_component, + workflow_name=workflow_name, + input=order) + _id = start_resp.instance_id + + def prompt_for_approval(daprClient: DaprClient): + daprClient.raise_workflow_event(instance_id=_id, workflow_component=workflow_component, + event_name="manager_approval", event_data={'approval': True}) + + approval_seeked = False + start_time = datetime.now() + while True: + time_delta = datetime.now() - start_time + state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component) + if not state: + print("Workflow not found!") # not expected + elif state.runtime_status == "Completed" or\ + state.runtime_status == "Failed" or\ + state.runtime_status == "Terminated": + print(f'Workflow completed! Result: {state.runtime_status}', flush=True) + break + if time_delta.total_seconds() >= 10: + state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component) + if total_cost > 50000 and ( + state.runtime_status != "Completed" or + state.runtime_status != "Failed" or + state.runtime_status != "Terminated" + ) and not approval_seeked: + approval_seeked = True + threading.Thread(target=prompt_for_approval(daprClient), daemon=True).start() + + print("Purchase of item is ", state.runtime_status, flush=True) + + def restock_inventory(self, daprClient: DaprClient, baseInventory): + for key, item in baseInventory.items(): + print(f'item: {item}') + item_str = f'{{"name": "{item.item_name}", "quantity": {item.quantity},\ + "per_item_cost": {item.per_item_cost}}}' + daprClient.save_state("statestore-actors", key, item_str) + +if __name__ == '__main__': + app = WorkflowConsoleApp() + app.main() +``` + +#### `order-processor/workflow.py` + +In `workflow.py`, the workflow is defined as a class with all of its associated tasks (determined by workflow activities). + +```python + def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: OrderPayload): + """Defines the order processing workflow. + When the order is received, the inventory is checked to see if there is enough inventory to + fulfill the order. If there is enough inventory, the payment is processed and the inventory is + updated. If there is not enough inventory, the order is rejected. + If the total order is greater than $50,000, the order is sent to a manager for approval. + """ + order_id = ctx.instance_id + order_payload=json.loads(order_payload_str) + yield ctx.call_activity(notify_activity, + input=Notification(message=('Received order ' +order_id+ ' for ' + +f'{order_payload["quantity"]}' +' ' +f'{order_payload["item_name"]}' + +' at $'+f'{order_payload["total_cost"]}' +' !'))) + result = yield ctx.call_activity(verify_inventory_activity, + input=InventoryRequest(request_id=order_id, + item_name=order_payload["item_name"], + quantity=order_payload["quantity"])) + if not result.success: + yield ctx.call_activity(notify_activity, + input=Notification(message='Insufficient inventory for ' + +f'{order_payload["item_name"]}'+'!')) + return OrderResult(processed=False) + + if order_payload["total_cost"] > 50000: + yield ctx.call_activity(requst_approval_activity, input=order_payload) + approval_task = ctx.wait_for_external_event("manager_approval") + timeout_event = ctx.create_timer(timedelta(seconds=200)) + winner = yield when_any([approval_task, timeout_event]) + if winner == timeout_event: + yield ctx.call_activity(notify_activity, + input=Notification(message='Payment for order '+order_id + +' has been cancelled due to timeout!')) + return OrderResult(processed=False) + approval_result = yield approval_task + if approval_result["approval"]: + yield ctx.call_activity(notify_activity, input=Notification( + message=f'Payment for order {order_id} has been approved!')) + else: + yield ctx.call_activity(notify_activity, input=Notification( + message=f'Payment for order {order_id} has been rejected!')) + return OrderResult(processed=False) + + yield ctx.call_activity(process_payment_activity, input=PaymentRequest( + request_id=order_id, item_being_purchased=order_payload["item_name"], + amount=order_payload["total_cost"], quantity=order_payload["quantity"])) + + try: + yield ctx.call_activity(update_inventory_activity, + input=PaymentRequest(request_id=order_id, + item_being_purchased=order_payload["item_name"], + amount=order_payload["total_cost"], + quantity=order_payload["quantity"])) + except Exception: + yield ctx.call_activity(notify_activity, + input=Notification(message=f'Order {order_id} Failed!')) + return OrderResult(processed=False) + + yield ctx.call_activity(notify_activity, input=Notification( + message=f'Order {order_id} has completed!')) + return OrderResult(processed=True) +``` +{{% /codetab %}} {{% codetab %}} @@ -254,27 +492,36 @@ The `Activities` directory holds the four workflow activities used by the workfl - `ProcessPaymentActivity.cs` - `UpdateInventoryActivity.cs` +## Watch the demo + +Watch [this video to walk through the Dapr Workflow .NET demo](https://youtu.be/BxiKpEmchgQ?t=2564): + + + {{% /codetab %}} - + {{% codetab %}} -The `order-processor` console app starts and manages the `order_processing_workflow`, which simulates purchasing items from a store. The workflow consists of five unique workflow activities, or tasks: +The `order-processor` console app starts and manages the lifecycle of an order processing workflow that stores and retrieves data in a state store. The workflow consists of four workflow activities, or tasks: +- `NotifyActivity`: Utilizes a logger to print out messages throughout the workflow +- `RequestApprovalActivity`: Requests approval for processing payment +- `ReserveInventoryActivity`: Checks the state store to ensure that there is enough inventory for the purchase +- `ProcessPaymentActivity`: Processes and authorizes the payment +- `UpdateInventoryActivity`: Removes the requested items from the state store and updates the store with the new remaining inventory value -- `notify_activity`: Utilizes a logger to print out messages throughout the workflow. These messages notify you when: - - You have insufficient inventory - - Your payment couldn't be processed, etc. -- `process_payment_activity`: Processes and authorizes the payment. -- `verify_inventory_activity`: Checks the state store to ensure there is enough inventory present for purchase. -- `update_inventory_activity`: Removes the requested items from the state store and updates the store with the new remaining inventory value. -- `request_approval_activity`: Seeks approval from the manager if payment is greater than 50,000 USD. ### Step 1: Pre-requisites For this example, you will need: -- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started). -- [Python 3.7+ installed](https://www.python.org/downloads/). +- [Dapr and Dapr CLI installed and initialized]({{< ref install-dapr-cli.md >}}). + * Run `dapr init`. +- Java JDK 11 (or greater): + - [Microsoft JDK 11](https://docs.microsoft.com/en-us/java/openjdk/download#openjdk-11) + - [Oracle JDK 11](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11) + - [OpenJDK 11](https://jdk.java.net/11/) +- [Apache Maven](https://maven.apache.org/install.html) version 3.x. - [Docker Desktop](https://www.docker.com/products/docker-desktop) @@ -287,16 +534,16 @@ Clone the [sample provided in the Quickstarts repo](https://github.com/dapr/quic git clone https://github.com/dapr/quickstarts.git ``` -In a new terminal window, navigate to the `order-processor` directory: +Navigate into the root and install the dependencies: ```bash -cd workflows/python/sdk/order-processor +mvn install ``` -Install the Dapr Python SDK package: +Navigate to the `order-processor` directory: ```bash -pip3 install -r requirements.txt +cd workflows/java/sdk/order-processor ``` ### Step 3: Run the order processor app @@ -304,54 +551,37 @@ pip3 install -r requirements.txt In the terminal, start the order processor app alongside a Dapr sidecar: ```bash -dapr run --app-id order-processor --resources-path ../../../components/ -- python3 app.py -``` -> **Note:** Since Python3.exe is not defined in Windows, you may need to use `python app.py` instead of `python3 app.py`. +``` This starts the `order-processor` app with unique workflow ID and runs the workflow activities. Expected output: -```bash -== APP == Starting order workflow, purchasing 10 of cars -== APP == 2023-06-06 09:35:52.945 durabletask-worker INFO: Successfully connected to 127.0.0.1:65406. Waiting for work items... -== APP == INFO:NotifyActivity:Received order f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars at $150000 ! -== APP == INFO:VerifyInventoryActivity:Verifying inventory for order f4e1926e-3721-478d-be8a-f5bebd1995da of 10 cars -== APP == INFO:VerifyInventoryActivity:There are 100 Cars available for purchase -== APP == INFO:RequestApprovalActivity:Requesting approval for payment of 165000 USD for 10 cars -== APP == 2023-06-06 09:36:05.969 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da Event raised: manager_approval -== APP == INFO:NotifyActivity:Payment for order f4e1926e-3721-478d-be8a-f5bebd1995da has been approved! -== APP == INFO:ProcessPaymentActivity:Processing payment: f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars at 150000 USD -== APP == INFO:ProcessPaymentActivity:Payment for request ID f4e1926e-3721-478d-be8a-f5bebd1995da processed successfully -== APP == INFO:UpdateInventoryActivity:Checking inventory for order f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars -== APP == INFO:UpdateInventoryActivity:There are now 90 cars left in stock -== APP == INFO:NotifyActivity:Order f4e1926e-3721-478d-be8a-f5bebd1995da has completed! -== APP == 2023-06-06 09:36:06.106 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da: Orchestration completed with status: COMPLETED -== APP == Workflow completed! Result: Completed -== APP == Purchase of item is Completed +``` + ``` ### (Optional) Step 4: View in Zipkin If you have Zipkin configured for Dapr locally on your machine, you can view the workflow trace spans in the Zipkin web UI (typically at `http://localhost:9411/zipkin/`). - + ### What happened? -When you ran `dapr run --app-id order-processor --resources-path ../../../components/ -- python3 app.py`: +When you ran `todo`: -1. A unique order ID for the workflow is generated (in the above example, `f4e1926e-3721-478d-be8a-f5bebd1995da`) and the workflow is scheduled. +1. A unique order ID for the workflow is generated (in the above example, `6d2abcc9`) and the workflow is scheduled. 1. The `NotifyActivity` workflow activity sends a notification saying an order for 10 cars has been received. 1. The `ReserveInventoryActivity` workflow activity checks the inventory data, determines if you can supply the ordered item, and responds with the number of cars in stock. 1. Your workflow starts and notifies you of its status. -1. The `ProcessPaymentActivity` workflow activity begins processing payment for order `f4e1926e-3721-478d-be8a-f5bebd1995da` and confirms if successful. +1. The `ProcessPaymentActivity` workflow activity begins processing payment for order `6d2abcc9` and confirms if successful. 1. The `UpdateInventoryActivity` workflow activity updates the inventory with the current available cars after the order has been processed. -1. The `NotifyActivity` workflow activity sends a notification saying that order `f4e1926e-3721-478d-be8a-f5bebd1995da` has completed. +1. The `NotifyActivity` workflow activity sends a notification saying that order `6d2abcc9` has completed. 1. The workflow terminates as completed. -#### `order-processor/app.py` +#### `order-processor/main file` In the application's program file: - The unique workflow order ID is generated @@ -359,151 +589,27 @@ In the application's program file: - The workflow status is retrieved - The workflow and the workflow activities it invokes are registered -```python -class WorkflowConsoleApp: - def main(self): - # Register workflow and activities - workflowRuntime = WorkflowRuntime(settings.DAPR_RUNTIME_HOST, settings.DAPR_GRPC_PORT) - workflowRuntime.register_workflow(order_processing_workflow) - workflowRuntime.register_activity(notify_activity) - workflowRuntime.register_activity(requst_approval_activity) - workflowRuntime.register_activity(verify_inventory_activity) - workflowRuntime.register_activity(process_payment_activity) - workflowRuntime.register_activity(update_inventory_activity) - workflowRuntime.start() +```java - print("==========Begin the purchase of item:==========", flush=True) - item_name = default_item_name - order_quantity = 10 - - total_cost = int(order_quantity) * baseInventory[item_name].per_item_cost - order = OrderPayload(item_name=item_name, quantity=int(order_quantity), total_cost=total_cost) - - # Start Workflow - print(f'Starting order workflow, purchasing {order_quantity} of {item_name}', flush=True) - start_resp = daprClient.start_workflow(workflow_component=workflow_component, - workflow_name=workflow_name, - input=order) - _id = start_resp.instance_id +``` - def prompt_for_approval(daprClient: DaprClient): - daprClient.raise_workflow_event(instance_id=_id, workflow_component=workflow_component, - event_name="manager_approval", event_data={'approval': True}) +#### `name of order processor` - approval_seeked = False - start_time = datetime.now() - while True: - time_delta = datetime.now() - start_time - state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component) - if not state: - print("Workflow not found!") # not expected - elif state.runtime_status == "Completed" or\ - state.runtime_status == "Failed" or\ - state.runtime_status == "Terminated": - print(f'Workflow completed! Result: {state.runtime_status}', flush=True) - break - if time_delta.total_seconds() >= 10: - state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component) - if total_cost > 50000 and ( - state.runtime_status != "Completed" or - state.runtime_status != "Failed" or - state.runtime_status != "Terminated" - ) and not approval_seeked: - approval_seeked = True - threading.Thread(target=prompt_for_approval(daprClient), daemon=True).start() - - print("Purchase of item is ", state.runtime_status, flush=True) +In `OrderProcessingWorkflow.cs`, the workflow is defined as a class with all of its associated tasks (determined by workflow activities). - def restock_inventory(self, daprClient: DaprClient, baseInventory): - for key, item in baseInventory.items(): - print(f'item: {item}') - item_str = f'{{"name": "{item.item_name}", "quantity": {item.quantity},\ - "per_item_cost": {item.per_item_cost}}}' - daprClient.save_state("statestore-actors", key, item_str) +```java -if __name__ == '__main__': - app = WorkflowConsoleApp() - app.main() ``` -#### `order-processor/workflow.py` - -In `workflow.py`, the workflow is defined as a class with all of its associated tasks (determined by workflow activities). - -```python - def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: OrderPayload): - """Defines the order processing workflow. - When the order is received, the inventory is checked to see if there is enough inventory to - fulfill the order. If there is enough inventory, the payment is processed and the inventory is - updated. If there is not enough inventory, the order is rejected. - If the total order is greater than $50,000, the order is sent to a manager for approval. - """ - order_id = ctx.instance_id - order_payload=json.loads(order_payload_str) - yield ctx.call_activity(notify_activity, - input=Notification(message=('Received order ' +order_id+ ' for ' - +f'{order_payload["quantity"]}' +' ' +f'{order_payload["item_name"]}' - +' at $'+f'{order_payload["total_cost"]}' +' !'))) - result = yield ctx.call_activity(verify_inventory_activity, - input=InventoryRequest(request_id=order_id, - item_name=order_payload["item_name"], - quantity=order_payload["quantity"])) - if not result.success: - yield ctx.call_activity(notify_activity, - input=Notification(message='Insufficient inventory for ' - +f'{order_payload["item_name"]}'+'!')) - return OrderResult(processed=False) - - if order_payload["total_cost"] > 50000: - yield ctx.call_activity(requst_approval_activity, input=order_payload) - approval_task = ctx.wait_for_external_event("manager_approval") - timeout_event = ctx.create_timer(timedelta(seconds=200)) - winner = yield when_any([approval_task, timeout_event]) - if winner == timeout_event: - yield ctx.call_activity(notify_activity, - input=Notification(message='Payment for order '+order_id - +' has been cancelled due to timeout!')) - return OrderResult(processed=False) - approval_result = yield approval_task - if approval_result["approval"]: - yield ctx.call_activity(notify_activity, input=Notification( - message=f'Payment for order {order_id} has been approved!')) - else: - yield ctx.call_activity(notify_activity, input=Notification( - message=f'Payment for order {order_id} has been rejected!')) - return OrderResult(processed=False) - - yield ctx.call_activity(process_payment_activity, input=PaymentRequest( - request_id=order_id, item_being_purchased=order_payload["item_name"], - amount=order_payload["total_cost"], quantity=order_payload["quantity"])) +#### `activities` directory - try: - yield ctx.call_activity(update_inventory_activity, - input=PaymentRequest(request_id=order_id, - item_being_purchased=order_payload["item_name"], - amount=order_payload["total_cost"], - quantity=order_payload["quantity"])) - except Exception: - yield ctx.call_activity(notify_activity, - input=Notification(message=f'Order {order_id} Failed!')) - return OrderResult(processed=False) +The `Activities` directory holds the four workflow activities used by the workflow, defined in the following files: +- - yield ctx.call_activity(notify_activity, input=Notification( - message=f'Order {order_id} has completed!')) - return OrderResult(processed=True) -``` {{% /codetab %}} - {{< /tabs >}} -## Watch the demo - -Watch [this video to walk through the Dapr Workflow .NET demo](https://youtu.be/BxiKpEmchgQ?t=2564): - - - - ## Tell us what you think! We're continuously working to improve our Quickstart examples and value your feedback. Did you find this Quickstart helpful? Do you have suggestions for improvement? From 71f9759cf465a1e9acea74011b52c583245345f3 Mon Sep 17 00:00:00 2001 From: Hannah Hunter Date: Wed, 20 Sep 2023 16:20:35 -0400 Subject: [PATCH 2/2] fill in java quickstart Signed-off-by: Hannah Hunter --- .../quickstarts/workflow-quickstart.md | 272 ++++++++++++++++-- 1 file changed, 253 insertions(+), 19 deletions(-) diff --git a/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md b/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md index 51dd2539525..bec440e1d81 100644 --- a/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md +++ b/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md @@ -515,8 +515,7 @@ The `order-processor` console app starts and manages the lifecycle of an order p For this example, you will need: -- [Dapr and Dapr CLI installed and initialized]({{< ref install-dapr-cli.md >}}). - * Run `dapr init`. +- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started). - Java JDK 11 (or greater): - [Microsoft JDK 11](https://docs.microsoft.com/en-us/java/openjdk/download#openjdk-11) - [Oracle JDK 11](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11) @@ -534,16 +533,16 @@ Clone the [sample provided in the Quickstarts repo](https://github.com/dapr/quic git clone https://github.com/dapr/quickstarts.git ``` -Navigate into the root and install the dependencies: +Navigate to the `order-processor` directory: ```bash -mvn install +cd workflows/java/sdk/order-processor ``` -Navigate to the `order-processor` directory: +Install the dependencies: ```bash -cd workflows/java/sdk/order-processor +mvn clean install ``` ### Step 3: Run the order processor app @@ -551,7 +550,7 @@ cd workflows/java/sdk/order-processor In the terminal, start the order processor app alongside a Dapr sidecar: ```bash - +dapr run --app-id WorkflowConsoleApp --resources-path ../../../components/ --dapr-grpc-port 50001 -- java -jar target/OrderProcessingService-0.0.1-SNAPSHOT.jar io.dapr.quickstarts.workflows.WorkflowConsoleApp ``` This starts the `order-processor` app with unique workflow ID and runs the workflow activities. @@ -559,29 +558,64 @@ This starts the `order-processor` app with unique workflow ID and runs the workf Expected output: ``` +== APP == *** Welcome to the Dapr Workflow console app sample! +== APP == *** Using this app, you can place orders that start workflows. +== APP == Start workflow runtime +== APP == Sep 20, 2023 3:23:05 PM com.microsoft.durabletask.DurableTaskGrpcWorker startAndBlock +== APP == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:50001. + +== APP == ==========Begin the purchase of item:========== +== APP == Starting order workflow, purchasing 10 of cars +== APP == scheduled new workflow instance of OrderProcessingWorkflow with instance ID: edceba90-9c45-4be8-ad40-60d16e060797 +== APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.quickstarts.workflows.OrderProcessingWorkflow +== APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Instance ID(order ID): edceba90-9c45-4be8-ad40-60d16e060797 +== APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Current Orchestration Time: 2023-09-20T19:23:09.755Z +== APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Received Order: OrderPayload [itemName=cars, totalCost=150000, quantity=10] +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.NotifyActivity - Received Order: OrderPayload [itemName=cars, totalCost=150000, quantity=10] +== APP == workflow instance edceba90-9c45-4be8-ad40-60d16e060797 started +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ReserveInventoryActivity - Reserving inventory for order 'edceba90-9c45-4be8-ad40-60d16e060797' of 10 cars +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ReserveInventoryActivity - There are 100 cars available for purchase +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ReserveInventoryActivity - Reserved inventory for order 'edceba90-9c45-4be8-ad40-60d16e060797' of 10 cars +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.RequestApprovalActivity - Requesting approval for order: OrderPayload [itemName=cars, totalCost=150000, quantity=10] +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.RequestApprovalActivity - Approved requesting approval for order: OrderPayload [itemName=cars, totalCost=150000, quantity=10] +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ProcessPaymentActivity - Processing payment: edceba90-9c45-4be8-ad40-60d16e060797 for 10 cars at $150000 +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ProcessPaymentActivity - Payment for request ID 'edceba90-9c45-4be8-ad40-60d16e060797' processed successfully +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.UpdateInventoryActivity - Updating inventory for order 'edceba90-9c45-4be8-ad40-60d16e060797' of 10 cars +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.UpdateInventoryActivity - Updated inventory for order 'edceba90-9c45-4be8-ad40-60d16e060797': there are now 90 cars left in stock +== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.NotifyActivity - Order completed! : edceba90-9c45-4be8-ad40-60d16e060797 + +== APP == workflow instance edceba90-9c45-4be8-ad40-60d16e060797 completed, out is: {"processed":true} ``` ### (Optional) Step 4: View in Zipkin -If you have Zipkin configured for Dapr locally on your machine, you can view the workflow trace spans in the Zipkin web UI (typically at `http://localhost:9411/zipkin/`). +If you have Zipkin configured for Dapr locally on your machine, you can - +Running `dapr init` launches the [openzipkin/zipkin](https://hub.docker.com/r/openzipkin/zipkin/) Docker container. If the container has stopped running, launch the Zipkin Docker container with the following command: + +``` +docker run -d -p 9411:9411 openzipkin/zipkin +``` + +View the workflow trace spans in the Zipkin web UI (typically at `http://localhost:9411/zipkin/`). + + ### What happened? -When you ran `todo`: +When you ran `dapr run`: -1. A unique order ID for the workflow is generated (in the above example, `6d2abcc9`) and the workflow is scheduled. +1. A unique order ID for the workflow is generated (in the above example, `edceba90-9c45-4be8-ad40-60d16e060797`) and the workflow is scheduled. 1. The `NotifyActivity` workflow activity sends a notification saying an order for 10 cars has been received. 1. The `ReserveInventoryActivity` workflow activity checks the inventory data, determines if you can supply the ordered item, and responds with the number of cars in stock. -1. Your workflow starts and notifies you of its status. -1. The `ProcessPaymentActivity` workflow activity begins processing payment for order `6d2abcc9` and confirms if successful. +1. Once approved, your workflow starts and notifies you of its status. +1. The `ProcessPaymentActivity` workflow activity begins processing payment for order `edceba90-9c45-4be8-ad40-60d16e060797` and confirms if successful. 1. The `UpdateInventoryActivity` workflow activity updates the inventory with the current available cars after the order has been processed. -1. The `NotifyActivity` workflow activity sends a notification saying that order `6d2abcc9` has completed. +1. The `NotifyActivity` workflow activity sends a notification saying that order `edceba90-9c45-4be8-ad40-60d16e060797` has completed. 1. The workflow terminates as completed. -#### `order-processor/main file` +#### `order-processor/WorkflowConsoleApp.java` In the application's program file: - The unique workflow order ID is generated @@ -590,21 +624,221 @@ In the application's program file: - The workflow and the workflow activities it invokes are registered ```java +package io.dapr.quickstarts.workflows; +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.workflows.client.DaprWorkflowClient; + +public class WorkflowConsoleApp { + + private static final String STATE_STORE_NAME = "statestore-actors"; + + // ... + public static void main(String[] args) throws Exception { + System.out.println("*** Welcome to the Dapr Workflow console app sample!"); + System.out.println("*** Using this app, you can place orders that start workflows."); + // Wait for the sidecar to become available + Thread.sleep(5 * 1000); + + // Register the OrderProcessingWorkflow and its activities with the builder. + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(OrderProcessingWorkflow.class); + builder.registerActivity(NotifyActivity.class); + builder.registerActivity(ProcessPaymentActivity.class); + builder.registerActivity(RequestApprovalActivity.class); + builder.registerActivity(ReserveInventoryActivity.class); + builder.registerActivity(UpdateInventoryActivity.class); + + // Build the workflow runtime + try (WorkflowRuntime runtime = builder.build()) { + System.out.println("Start workflow runtime"); + runtime.start(false); + } + + InventoryItem inventory = prepareInventoryAndOrder(); + + DaprWorkflowClient workflowClient = new DaprWorkflowClient(); + try (workflowClient) { + executeWorkflow(workflowClient, inventory); + } + } + + // Start the workflow runtime, pulling and executing tasks + private static void executeWorkflow(DaprWorkflowClient workflowClient, InventoryItem inventory) { + System.out.println("==========Begin the purchase of item:=========="); + String itemName = inventory.getName(); + int orderQuantity = inventory.getQuantity(); + int totalcost = orderQuantity * inventory.getPerItemCost(); + OrderPayload order = new OrderPayload(); + order.setItemName(itemName); + order.setQuantity(orderQuantity); + order.setTotalCost(totalcost); + System.out.println("Starting order workflow, purchasing " + orderQuantity + " of " + itemName); + + String instanceId = workflowClient.scheduleNewWorkflow(OrderProcessingWorkflow.class, order); + System.out.printf("scheduled new workflow instance of OrderProcessingWorkflow with instance ID: %s%n", + instanceId); + + // Check workflow instance start status + try { + workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(10), false); + System.out.printf("workflow instance %s started%n", instanceId); + } catch (TimeoutException e) { + System.out.printf("workflow instance %s did not start within 10 seconds%n", instanceId); + return; + } + + // Check workflow instance complete status + try { + WorkflowInstanceStatus workflowStatus = workflowClient.waitForInstanceCompletion(instanceId, + Duration.ofSeconds(30), + true); + if (workflowStatus != null) { + System.out.printf("workflow instance %s completed, out is: %s %n", instanceId, + workflowStatus.getSerializedOutput()); + } else { + System.out.printf("workflow instance %s not found%n", instanceId); + } + } catch (TimeoutException e) { + System.out.printf("workflow instance %s did not complete within 30 seconds%n", instanceId); + } + + } + + private static InventoryItem prepareInventoryAndOrder() { + // prepare 100 cars in inventory + InventoryItem inventory = new InventoryItem(); + inventory.setName("cars"); + inventory.setPerItemCost(15000); + inventory.setQuantity(100); + DaprClient daprClient = new DaprClientBuilder().build(); + restockInventory(daprClient, inventory); + + // prepare order for 10 cars + InventoryItem order = new InventoryItem(); + order.setName("cars"); + order.setPerItemCost(15000); + order.setQuantity(10); + return order; + } + + private static void restockInventory(DaprClient daprClient, InventoryItem inventory) { + String key = inventory.getName(); + daprClient.saveState(STATE_STORE_NAME, key, inventory).block(); + } +} ``` -#### `name of order processor` +#### `OrderProcessingWorkflow.java` -In `OrderProcessingWorkflow.cs`, the workflow is defined as a class with all of its associated tasks (determined by workflow activities). +In `OrderProcessingWorkflow.java`, the workflow is defined as a class with all of its associated tasks (determined by workflow activities). ```java - +package io.dapr.quickstarts.workflows; +import io.dapr.workflows.Workflow; + +public class OrderProcessingWorkflow extends Workflow { + + @Override + public WorkflowStub create() { + return ctx -> { + Logger logger = ctx.getLogger(); + String orderId = ctx.getInstanceId(); + logger.info("Starting Workflow: " + ctx.getName()); + logger.info("Instance ID(order ID): " + orderId); + logger.info("Current Orchestration Time: " + ctx.getCurrentInstant()); + + OrderPayload order = ctx.getInput(OrderPayload.class); + logger.info("Received Order: " + order.toString()); + OrderResult orderResult = new OrderResult(); + orderResult.setProcessed(false); + + // Notify the user that an order has come through + Notification notification = new Notification(); + notification.setMessage("Received Order: " + order.toString()); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + + // Determine if there is enough of the item available for purchase by checking + // the inventory + InventoryRequest inventoryRequest = new InventoryRequest(); + inventoryRequest.setRequestId(orderId); + inventoryRequest.setItemName(order.getItemName()); + inventoryRequest.setQuantity(order.getQuantity()); + InventoryResult inventoryResult = ctx.callActivity(ReserveInventoryActivity.class.getName(), + inventoryRequest, InventoryResult.class).await(); + + // If there is insufficient inventory, fail and let the user know + if (!inventoryResult.isSuccess()) { + notification.setMessage("Insufficient inventory for order : " + order.getItemName()); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + ctx.complete(orderResult); + return; + } + + // Require orders over a certain threshold to be approved + if (order.getTotalCost() > 5000) { + ApprovalResult approvalResult = ctx.callActivity(RequestApprovalActivity.class.getName(), + order, ApprovalResult.class).await(); + if (approvalResult != ApprovalResult.Approved) { + notification.setMessage("Order " + order.getItemName() + " was not approved."); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + ctx.complete(orderResult); + return; + } + } + + // There is enough inventory available so the user can purchase the item(s). + // Process their payment + PaymentRequest paymentRequest = new PaymentRequest(); + paymentRequest.setRequestId(orderId); + paymentRequest.setItemBeingPurchased(order.getItemName()); + paymentRequest.setQuantity(order.getQuantity()); + paymentRequest.setAmount(order.getTotalCost()); + boolean isOK = ctx.callActivity(ProcessPaymentActivity.class.getName(), + paymentRequest, boolean.class).await(); + if (!isOK) { + notification.setMessage("Payment failed for order : " + orderId); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + ctx.complete(orderResult); + return; + } + + inventoryResult = ctx.callActivity(UpdateInventoryActivity.class.getName(), + inventoryRequest, InventoryResult.class).await(); + if (!inventoryResult.isSuccess()) { + // If there is an error updating the inventory, refund the user + // paymentRequest.setAmount(-1 * paymentRequest.getAmount()); + // ctx.callActivity(ProcessPaymentActivity.class.getName(), + // paymentRequest).await(); + + // Let users know their payment processing failed + notification.setMessage("Order failed to update inventory! : " + orderId); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + ctx.complete(orderResult); + return; + } + + // Let user know their order was processed + notification.setMessage("Order completed! : " + orderId); + ctx.callActivity(NotifyActivity.class.getName(), notification).await(); + + // Complete the workflow with order result is processed + orderResult.setProcessed(true); + ctx.complete(orderResult); + }; + } + +} ``` #### `activities` directory The `Activities` directory holds the four workflow activities used by the workflow, defined in the following files: -- +- [`NotifyActivity.java`](https://github.com/dapr/quickstarts/tree/master/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/NotifyActivity.java) +- [`RequestApprovalActivity`](https://github.com/dapr/quickstarts/tree/master/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/RequestApprovalActivity.java) +- [`ReserveInventoryActivity`](https://github.com/dapr/quickstarts/tree/master/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/ReserveInventoryActivity.java) +- [`ProcessPaymentActivity`](https://github.com/dapr/quickstarts/tree/master/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/ProcessPaymentActivity.java) +- [`UpdateInventoryActivity`](https://github.com/dapr/quickstarts/tree/master/workflows/java/sdk/order-processor/src/main/java/io/dapr/quickstarts/workflows/activities/UpdateInventoryActivity.java) {{% /codetab %}}