Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk-workflows/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
<dependency>
<groupId>io.dapr</groupId>
<artifactId>durabletask-client</artifactId>
<version>1.5.5</version>
<version>1.5.6</version>
</dependency>
<!--
manually declare durabletask-client's jackson dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,27 @@
public class WorkflowTaskOptions {

private final WorkflowTaskRetryPolicy retryPolicy;
private final WorkflowTaskRetryHandler retryHandler;

public WorkflowTaskOptions(WorkflowTaskRetryPolicy retryPolicy) {
public WorkflowTaskOptions(WorkflowTaskRetryPolicy retryPolicy, WorkflowTaskRetryHandler retryHandler) {
this.retryPolicy = retryPolicy;
this.retryHandler = retryHandler;
}

public WorkflowTaskOptions(WorkflowTaskRetryPolicy retryPolicy) {
this(retryPolicy, null);
}

public WorkflowTaskOptions(WorkflowTaskRetryHandler retryHandler) {
this(null, retryHandler);
}

public WorkflowTaskRetryPolicy getRetryPolicy() {
return retryPolicy;
}

public WorkflowTaskRetryHandler getRetryHandler() {
return retryHandler;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.workflows;

import io.dapr.workflows.client.WorkflowFailureDetails;
import io.dapr.workflows.runtime.DefaultWorkflowContext;

import java.time.Duration;

public class WorkflowTaskRetryContext {

private final DefaultWorkflowContext workflowContext;
private final int lastAttemptNumber;
private final WorkflowFailureDetails lastFailure;
private final Duration totalRetryTime;

/**
* Constructor for WorkflowTaskRetryContext.
*
* @param workflowContext The workflow context
* @param lastAttemptNumber The number of the previous attempt
* @param lastFailure The failure details from the most recent failure
* @param totalRetryTime The amount of time spent retrying
*/
public WorkflowTaskRetryContext(
DefaultWorkflowContext workflowContext,
int lastAttemptNumber,
WorkflowFailureDetails lastFailure,
Duration totalRetryTime) {
this.workflowContext = workflowContext;
this.lastAttemptNumber = lastAttemptNumber;
this.lastFailure = lastFailure;
this.totalRetryTime = totalRetryTime;
}

/**
* Gets the context of the current workflow.
*
* <p>The workflow context can be used in retry handlers to schedule timers (via the
* {@link DefaultWorkflowContext#createTimer} methods) for implementing delays between retries. It can also be
* used to implement time-based retry logic by using the {@link DefaultWorkflowContext#getCurrentInstant} method.
*
* @return the context of the parent workflow
*/
public DefaultWorkflowContext getWorkflowContext() {
return this.workflowContext;
}

/**
* Gets the details of the previous task failure, including the exception type, message, and callstack.
*
* @return the details of the previous task failure
*/
public WorkflowFailureDetails getLastFailure() {
return this.lastFailure;
}

/**
* Gets the previous retry attempt number. This number starts at 1 and increments each time the retry handler
* is invoked for a particular task failure.
*
* @return the previous retry attempt number
*/
public int getLastAttemptNumber() {
return this.lastAttemptNumber;
}

/**
* Gets the total amount of time spent in a retry loop for the current task.
*
* @return the total amount of time spent in a retry loop for the current task
*/
public Duration getTotalRetryTime() {
return this.totalRetryTime;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.workflows;

public interface WorkflowTaskRetryHandler {

/**
* Invokes retry handler logic. Return value indicates whether to continue retrying.
*
* @param retryContext The context of the retry
* @return {@code true} to continue retrying or {@code false} to stop retrying.
*/
boolean handle(WorkflowTaskRetryContext retryContext);

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,14 @@ public interface WorkflowFailureDetails {
*/
String getStackTrace();

/**
* Checks whether the failure was caused by the provided exception class.
*
* @param exceptionClass the exception class to check
* @return {@code true} if the failure was caused by the provided exception class
*/
default boolean isCausedBy(Class<? extends Exception> exceptionClass) {
throw new UnsupportedOperationException("This method is not implemented");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@
package io.dapr.workflows.runtime;

import io.dapr.durabletask.CompositeTaskFailedException;
import io.dapr.durabletask.RetryHandler;
import io.dapr.durabletask.RetryPolicy;
import io.dapr.durabletask.Task;
import io.dapr.durabletask.TaskCanceledException;
import io.dapr.durabletask.TaskOptions;
import io.dapr.durabletask.TaskOrchestrationContext;
import io.dapr.workflows.WorkflowContext;
import io.dapr.workflows.WorkflowTaskOptions;
import io.dapr.workflows.WorkflowTaskRetryContext;
import io.dapr.workflows.WorkflowTaskRetryHandler;
import io.dapr.workflows.WorkflowTaskRetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -228,22 +231,50 @@ public UUID newUuid() {
return this.innerContext.newUUID();
}

private static TaskOptions toTaskOptions(WorkflowTaskOptions options) {
private TaskOptions toTaskOptions(WorkflowTaskOptions options) {
if (options == null) {
return null;
}

WorkflowTaskRetryPolicy workflowTaskRetryPolicy = options.getRetryPolicy();
RetryPolicy retryPolicy = new RetryPolicy(
workflowTaskRetryPolicy.getMaxNumberOfAttempts(),
workflowTaskRetryPolicy.getFirstRetryInterval()
);
RetryPolicy retryPolicy = null;
RetryHandler retryHandler = toRetryHandler(options.getRetryHandler());

retryPolicy.setBackoffCoefficient(workflowTaskRetryPolicy.getBackoffCoefficient());
if (workflowTaskRetryPolicy.getRetryTimeout() != null) {
retryPolicy.setRetryTimeout(workflowTaskRetryPolicy.getRetryTimeout());
if (options.getRetryPolicy() != null) {
WorkflowTaskRetryPolicy workflowTaskRetryPolicy = options.getRetryPolicy();
retryPolicy = new RetryPolicy(
workflowTaskRetryPolicy.getMaxNumberOfAttempts(),
workflowTaskRetryPolicy.getFirstRetryInterval()
);

retryPolicy.setBackoffCoefficient(workflowTaskRetryPolicy.getBackoffCoefficient());
if (workflowTaskRetryPolicy.getRetryTimeout() != null) {
retryPolicy.setRetryTimeout(workflowTaskRetryPolicy.getRetryTimeout());
}
}

return new TaskOptions(retryPolicy, retryHandler);
}

/**
* Converts a {@link WorkflowTaskRetryHandler} to a {@link RetryHandler}.
*
* @param workflowTaskRetryHandler The {@link WorkflowTaskRetryHandler} being converted
* @return A {@link RetryHandler}
*/
private RetryHandler toRetryHandler(WorkflowTaskRetryHandler workflowTaskRetryHandler) {
if (workflowTaskRetryHandler == null) {
return null;
}

return new TaskOptions(retryPolicy);
return retryContext -> {
WorkflowTaskRetryContext workflowRetryContext = new WorkflowTaskRetryContext(
this,
retryContext.getLastAttemptNumber(),
new DefaultWorkflowFailureDetails(retryContext.getLastFailure()),
retryContext.getTotalRetryTime()
);

return workflowTaskRetryHandler.handle(workflowRetryContext);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ public String getStackTrace() {
return workflowFailureDetails.getStackTrace();
}

/**
* Checks whether the failure was caused by the provided exception class.
*
* @param exceptionClass the exception class to check
* @return {@code true} if the failure was caused by the provided exception class
*/
@Override
public boolean isCausedBy(Class<? extends Exception> exceptionClass) {
return workflowFailureDetails.isCausedBy(exceptionClass);
}

@Override
public String toString() {
return workflowFailureDetails.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
package io.dapr.workflows;

import io.dapr.durabletask.CompositeTaskFailedException;
import io.dapr.durabletask.FailureDetails;
import io.dapr.durabletask.RetryContext;
import io.dapr.durabletask.RetryHandler;
import io.dapr.durabletask.Task;
import io.dapr.durabletask.TaskCanceledException;
import io.dapr.durabletask.TaskOptions;
Expand All @@ -35,10 +38,11 @@
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -278,7 +282,7 @@ public void callChildWorkflowWithName() {
}

@Test
public void callChildWorkflowWithOptions() {
public void callChildWorkflowWithRetryPolicy() {
String expectedName = "TestActivity";
String expectedInput = "TestInput";
String expectedInstanceId = "TestInstanceId";
Expand All @@ -305,6 +309,90 @@ public void callChildWorkflowWithOptions() {
assertEquals(retryPolicy.getMaxNumberOfAttempts(), taskOptions.getRetryPolicy().getMaxNumberOfAttempts());
assertEquals(retryPolicy.getFirstRetryInterval(), taskOptions.getRetryPolicy().getFirstRetryInterval());
assertEquals(Duration.ZERO, taskOptions.getRetryPolicy().getRetryTimeout());
assertNull(taskOptions.getRetryHandler());
}

@Test
public void callChildWorkflowWithRetryHandler() {
String expectedName = "TestActivity";
String expectedInput = "TestInput";
String expectedInstanceId = "TestInstanceId";

WorkflowTaskRetryHandler retryHandler = spy(new WorkflowTaskRetryHandler() {
@Override
public boolean handle(WorkflowTaskRetryContext retryContext) {
return true;
}
});

WorkflowTaskOptions executionOptions = new WorkflowTaskOptions(retryHandler);
ArgumentCaptor<TaskOptions> captor = ArgumentCaptor.forClass(TaskOptions.class);

context.callChildWorkflow(expectedName, expectedInput, expectedInstanceId, executionOptions, String.class);

verify(mockInnerContext, times(1))
.callSubOrchestrator(
eq(expectedName),
eq(expectedInput),
eq(expectedInstanceId),
captor.capture(),
eq(String.class)
);

TaskOptions taskOptions = captor.getValue();

RetryHandler durableRetryHandler = taskOptions.getRetryHandler();
RetryContext retryContext = mock(RetryContext.class, invocationOnMock -> null);

durableRetryHandler.handle(retryContext);

verify(retryHandler, times(1)).handle(any());
assertNull(taskOptions.getRetryPolicy());
}

@Test
public void callChildWorkflowWithRetryPolicyAndHandler() {
String expectedName = "TestActivity";
String expectedInput = "TestInput";
String expectedInstanceId = "TestInstanceId";

WorkflowTaskRetryPolicy retryPolicy = WorkflowTaskRetryPolicy.newBuilder()
.setMaxNumberOfAttempts(1)
.setFirstRetryInterval(Duration.ofSeconds(10))
.build();

WorkflowTaskRetryHandler retryHandler = spy(new WorkflowTaskRetryHandler() {
@Override
public boolean handle(WorkflowTaskRetryContext retryContext) {
return true;
}
});

WorkflowTaskOptions executionOptions = new WorkflowTaskOptions(retryPolicy, retryHandler);
ArgumentCaptor<TaskOptions> captor = ArgumentCaptor.forClass(TaskOptions.class);

context.callChildWorkflow(expectedName, expectedInput, expectedInstanceId, executionOptions, String.class);

verify(mockInnerContext, times(1))
.callSubOrchestrator(
eq(expectedName),
eq(expectedInput),
eq(expectedInstanceId),
captor.capture(),
eq(String.class)
);

TaskOptions taskOptions = captor.getValue();

RetryHandler durableRetryHandler = taskOptions.getRetryHandler();
RetryContext retryContext = mock(RetryContext.class, invocationOnMock -> null);

durableRetryHandler.handle(retryContext);

verify(retryHandler, times(1)).handle(any());
assertEquals(retryPolicy.getMaxNumberOfAttempts(), taskOptions.getRetryPolicy().getMaxNumberOfAttempts());
assertEquals(retryPolicy.getFirstRetryInterval(), taskOptions.getRetryPolicy().getFirstRetryInterval());
assertEquals(Duration.ZERO, taskOptions.getRetryPolicy().getRetryTimeout());
}

@Test
Expand Down