Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@
package io.dapr.examples.workflows;

import io.dapr.workflows.client.DaprWorkflowClient;
import io.dapr.workflows.client.WorkflowState;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* For setup instructions, see the README.
Expand All @@ -24,29 +27,53 @@ public class DemoWorkflowClient {

/**
* The main method.
*
* @param args Input arguments (unused).
* @throws InterruptedException If program has been interrupted.
*/
public static void main(String[] args) throws InterruptedException {
DaprWorkflowClient client = new DaprWorkflowClient();

try (client) {
System.out.println("*****");
String instanceId = client.scheduleNewWorkflow(DemoWorkflow.class);
String separatorStr = "*******";
System.out.println(separatorStr);
String instanceId = client.scheduleNewWorkflow(DemoWorkflow.class, "input data");
System.out.printf("Started new workflow instance with random ID: %s%n", instanceId);

System.out.println("Sleep and allow this workflow instance to timeout...");
TimeUnit.SECONDS.sleep(10);
System.out.println(separatorStr);
System.out.println("**GetInstanceMetadata:Running Workflow**");
WorkflowState workflowMetadata = client.getInstanceState(instanceId, true);
System.out.printf("Result: %s%n", workflowMetadata);

System.out.println("*****");
System.out.println(separatorStr);
System.out.println("**WaitForInstanceStart**");
try {
WorkflowState waitForInstanceStartResult =
client.waitForInstanceStart(instanceId, Duration.ofSeconds(60), true);
System.out.printf("Result: %s%n", waitForInstanceStartResult);
} catch (TimeoutException ex) {
System.out.printf("waitForInstanceStart has an exception:%s%n", ex);
}

System.out.println(separatorStr);
System.out.println("**WaitForInstanceCompletion**");
try {
WorkflowState waitForInstanceCompletionResult =
client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(60), true);
System.out.printf("Result: %s%n", waitForInstanceCompletionResult);
} catch (TimeoutException ex) {
System.out.printf("waitForInstanceCompletion has an exception:%s%n", ex);
}

System.out.println(separatorStr);
String instanceToTerminateId = "terminateMe";
client.scheduleNewWorkflow(DemoWorkflow.class, null, instanceToTerminateId);
System.out.printf("Started new workflow instance with specified ID: %s%n", instanceToTerminateId);

TimeUnit.SECONDS.sleep(5);
System.out.println("Terminate this workflow instance manually before the timeout is reached");
client.terminateWorkflow(instanceToTerminateId, null);
System.out.println("*****");
System.out.println(separatorStr);
}

System.out.println("Exiting DemoWorkflowClient.");
Expand Down
6 changes: 6 additions & 0 deletions sdk-workflows/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>4.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,21 @@

import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.DurableTaskGrpcClientBuilder;
import com.microsoft.durabletask.OrchestrationMetadata;
import io.dapr.config.Properties;
import io.dapr.utils.Version;
import io.dapr.workflows.runtime.Workflow;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

import javax.annotation.Nullable;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Defines client operations for managing Dapr Workflow instances.
*/
public class DaprWorkflowClient implements AutoCloseable {

private final DurableTaskClient innerClient;
Expand All @@ -50,7 +56,6 @@ private DaprWorkflowClient(ManagedChannel grpcChannel) {
*
* @param innerClient DurableTaskGrpcClient with GRPC Channel set up.
* @param grpcChannel ManagedChannel for instance variable setting.
*
*/
private DaprWorkflowClient(DurableTaskClient innerClient, ManagedChannel grpcChannel) {
this.innerClient = innerClient;
Expand Down Expand Up @@ -134,8 +139,78 @@ public void terminateWorkflow(String workflowInstanceId, @Nullable Object output
}

/**
* Closes the inner DurableTask client and shutdown the GRPC channel.
* Fetches workflow instance metadata from the configured durable store.
*
* @param instanceId the unique ID of the workflow instance to fetch
* @param getInputsAndOutputs <code>true</code> to fetch the workflow instance's
inputs, outputs, and custom status, or <code>false</code> to omit them
* @return a metadata record that describes the workflow instance and its
execution status, or a default instance if no such instance is found.
*/
@Nullable
public WorkflowState getInstanceState(String instanceId, boolean getInputsAndOutputs) {
OrchestrationMetadata metadata = this.innerClient.getInstanceMetadata(instanceId, getInputsAndOutputs);
if (metadata == null) {
return null;
}
return new WorkflowState(metadata);
}

/**
* Waits for an workflow to start running and returns an
* {@link WorkflowState} object that contains metadata about the started
* instance and optionally its input, output, and custom status payloads.
*
* <p>A "started" workflow instance is any instance not in the Pending state.
*
* <p>If an workflow instance is already running when this method is called,
* the method will return immediately.
*
* @param instanceId the unique ID of the workflow instance to wait for
* @param timeout the amount of time to wait for the workflow instance to start
* @param getInputsAndOutputs true to fetch the workflow instance's
* inputs, outputs, and custom status, or false to omit them
* @throws TimeoutException when the workflow instance is not started within the specified amount of time
* @return the workflow instance metadata or null if no such instance is found
*/
@Nullable
public WorkflowState waitForInstanceStart(String instanceId, Duration timeout, boolean getInputsAndOutputs)
throws TimeoutException {

OrchestrationMetadata metadata = this.innerClient.waitForInstanceStart(instanceId, timeout, getInputsAndOutputs);
return metadata == null ? null : new WorkflowState(metadata);
}

/**
* Waits for an workflow to complete and returns an {@link WorkflowState} object that contains
* metadata about the completed instance.
*
* <p>A "completed" workflow instance is any instance in one of the terminal states. For example, the
* Completed, Failed, or Terminated states.
*
* <p>Workflows are long-running and could take hours, days, or months before completing.
* Workflows can also be eternal, in which case they'll never complete unless terminated.
* In such cases, this call may block indefinitely, so care must be taken to ensure appropriate timeouts are used.
* If an workflow instance is already complete when this method is called, the method will return immediately.
*
* @param instanceId the unique ID of the workflow instance to wait for
* @param timeout the amount of time to wait for the workflow instance to complete
* @param getInputsAndOutputs true to fetch the workflow instance's inputs, outputs, and custom
* status, or false to omit them
* @throws TimeoutException when the workflow instance is not completed within the specified amount of time
* @return the workflow instance metadata or null if no such instance is found
*/
@Nullable
public WorkflowState waitForInstanceCompletion(String instanceId, Duration timeout,
boolean getInputsAndOutputs) throws TimeoutException {

OrchestrationMetadata metadata =
this.innerClient.waitForInstanceCompletion(instanceId, timeout, getInputsAndOutputs);
return metadata == null ? null : new WorkflowState(metadata);
}

/**
* Closes the inner DurableTask client and shutdown the GRPC channel.
*/
public void close() throws InterruptedException {
if (this.innerClient != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.workflows.client;

import com.microsoft.durabletask.FailureDetails;

/**
* Represents a workflow failure details.
*/
public class WorkflowFailureDetails {

FailureDetails workflowFailureDetails;

/**
* Class constructor.
* @param failureDetails failure Details
*/
public WorkflowFailureDetails(FailureDetails failureDetails) {
this.workflowFailureDetails = failureDetails;
}

/**
* Gets the error type, which is the namespace-qualified exception type name.
*
* @return the error type, which is the namespace-qualified exception type name
*/
public String getErrorType() {
return workflowFailureDetails.getErrorType();
}

/**
* Gets the error message.
*
* @return the error message
*/
public String getErrorMessage() {
return workflowFailureDetails.getErrorMessage();
}

/**
* Gets the stack trace.
*
* @return the stack trace
*/
public String getStackTrace() {
return workflowFailureDetails.getStackTrace();
}

@Override
public String toString() {
return workflowFailureDetails.toString();
}
}
Loading