Skip to content

Commit b87c388

Browse files
Add waitForStateExecutionCompletion API (#207)
1 parent d2ae843 commit b87c388

File tree

9 files changed

+45
-12
lines changed

9 files changed

+45
-12
lines changed

iwf-idl

script/docker-compose-init.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ for run in {1..120}; do
3333
sleep 0.1
3434
tctl search-attribute create -name CustomStringField -type text -y
3535

36-
if checkExists "IwfWorkflowType" ] && checkExists "IwfGlobalWorkflowVersion" && checkExists "IwfExecutingStateIds" && checkExists "CustomKeywordField" && checkExists "CustomIntField" && checkExists "CustomBoolField" && checkExists "CustomDoubleField" && checkExists "CustomDatetimeField" && checkExists "CustomStringField" ] ; then
36+
if checkExists "IwfWorkflowType" && checkExists "IwfGlobalWorkflowVersion" && checkExists "IwfExecutingStateIds" && checkExists "CustomKeywordField" && checkExists "CustomIntField" && checkExists "CustomBoolField" && checkExists "CustomDoubleField" && checkExists "CustomDatetimeField" && checkExists "CustomStringField"; then
3737
echo "All search attributes are registered"
3838
break
3939
fi

src/main/java/io/iworkflow/core/Client.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.iworkflow.core;
22

3-
import feign.FeignException;
43
import io.iworkflow.core.persistence.PersistenceOptions;
54
import io.iworkflow.gen.models.KeyValue;
65
import io.iworkflow.gen.models.SearchAttribute;
@@ -128,6 +127,7 @@ public String startWorkflow(
128127
unregisterWorkflowOptions.cronSchedule(options.getCronSchedule());
129128
unregisterWorkflowOptions.workflowRetryPolicy(options.getWorkflowRetryPolicy());
130129
unregisterWorkflowOptions.workflowConfigOverride(options.getWorkflowConfigOverride());
130+
unregisterWorkflowOptions.waitForCompletionStateExecutionIds(options.getWaitForCompletionStateExecutionIds());
131131

132132
final Map<String, SearchAttributeValueType> saTypes = registry.getSearchAttributeKeyToTypeMap(wfType);
133133
final List<SearchAttribute> convertedSAs = convertToSearchAttributeList(saTypes, options.getInitialSearchAttribute());
@@ -910,4 +910,11 @@ public void skipTimer(
910910
final int timerCommandIndex) {
911911
unregisteredClient.skipTimer(workflowId, workflowRunId, workflowStateId, stateExecutionNumber, timerCommandIndex);
912912
}
913+
914+
public <T> T waitForStateExecutionCompletion(
915+
final Class<T> valueClass,
916+
final String workflowId,
917+
final String stateExecutionId) {
918+
return unregisteredClient.waitForStateExecutionCompletion(valueClass, workflowId, stateExecutionId);
919+
}
913920
}

src/main/java/io/iworkflow/core/UnregisteredClient.java

+24
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import io.iworkflow.gen.models.WorkflowStartResponse;
3232
import io.iworkflow.gen.models.WorkflowStatus;
3333
import io.iworkflow.gen.models.WorkflowStopRequest;
34+
import io.iworkflow.gen.models.WorkflowWaitForStateCompletionRequest;
35+
import io.iworkflow.gen.models.WorkflowWaitForStateCompletionResponse;
3436

3537
import java.util.List;
3638
import java.util.stream.Collectors;
@@ -152,6 +154,8 @@ public String startWorkflow(
152154
}
153155

154156
request.workflowStartOptions(startOptions);
157+
158+
request.waitForCompletionStateExecutionIds(options.getWaitForCompletionStateExecutionIds());
155159
}
156160

157161
try {
@@ -341,6 +345,26 @@ private List<StateCompletionOutput> getWorkflowResults(
341345
return results;
342346
}
343347

348+
public <T> T waitForStateExecutionCompletion(
349+
final Class<T> valueClass,
350+
final String workflowId,
351+
final String stateExecutionId) {
352+
final WorkflowWaitForStateCompletionRequest request = new WorkflowWaitForStateCompletionRequest()
353+
.stateExecutionId(stateExecutionId)
354+
.workflowId(workflowId);
355+
final WorkflowWaitForStateCompletionResponse response;
356+
try {
357+
response = defaultApi.apiV1WorkflowWaitForStateCompletionPost(request);
358+
} catch (final FeignException.FeignClientException exp) {
359+
throw IwfHttpException.fromFeignException(clientOptions.getObjectEncoder(), exp);
360+
}
361+
362+
if (response.getStateCompletionOutput() == null) {
363+
return null;
364+
}
365+
return clientOptions.getObjectEncoder().decode(response.getStateCompletionOutput().getCompletedStateOutput(), valueClass);
366+
}
367+
344368
private void throwUncompletedException(final WorkflowGetResponse workflowGetResponse) {
345369
throw new WorkflowUncompletedException(
346370
workflowGetResponse.getWorkflowRunId(),

src/main/java/io/iworkflow/core/UnregisteredWorkflowOptions.java

+2
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,6 @@ public abstract class UnregisteredWorkflowOptions {
2525
public abstract Optional<WorkflowConfig> getWorkflowConfigOverride();
2626

2727
public abstract Optional<Boolean> getUsingMemoForDataAttributes();
28+
29+
public abstract List<String> getWaitForCompletionStateExecutionIds();
2830
}

src/main/java/io/iworkflow/core/WorkflowOptions.java

+3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.iworkflow.gen.models.WorkflowRetryPolicy;
66
import org.immutables.value.Value;
77

8+
import java.util.List;
89
import java.util.Map;
910
import java.util.Optional;
1011

@@ -19,4 +20,6 @@ public abstract class WorkflowOptions {
1920
public abstract Map<String, Object> getInitialSearchAttribute();
2021

2122
public abstract Optional<WorkflowConfig> getWorkflowConfigOverride();
23+
24+
public abstract List<String> getWaitForCompletionStateExecutionIds();
2225
}

src/test/java/io/iworkflow/integ/RpcTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public void testRPCLocking() throws InterruptedException, ExecutionException {
6060

6161
ExecutorService executor = Executors.newFixedThreadPool(10);
6262
final ArrayList<Future<String>> futures = new ArrayList<>();
63-
int total = 1000;
63+
int total = 100;
6464
for (int i = 0; i < total; i++) {
6565

6666
final Future<String> future = executor.submit(() -> {
@@ -107,7 +107,7 @@ public void testRPCWorkflowFunc1() throws InterruptedException {
107107
final String runId = client.startWorkflow(
108108
RpcWorkflow.class, wfId, 10, 999);
109109

110-
final RpcWorkflow rpcStub = client.newRpcStub(RpcWorkflow.class, wfId, "");
110+
final RpcWorkflow rpcStub = client.newRpcStub(RpcWorkflow.class, wfId, "" );
111111

112112
client.invokeRPC(rpcStub::testRpcSetDataAttribute, "test-value");
113113
String value = client.invokeRPC(rpcStub::testRpcGetDataAttribute);

src/test/java/io/iworkflow/integ/TimerTest.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.iworkflow.core.Client;
44
import io.iworkflow.core.ClientOptions;
5+
import io.iworkflow.core.ImmutableWorkflowOptions;
56
import io.iworkflow.integ.timer.BasicTimerWorkflow;
67
import io.iworkflow.spring.TestSingletonWorkerService;
78
import io.iworkflow.spring.controller.WorkflowRegistry;
@@ -26,8 +27,10 @@ public void testBasicTimerWorkflow() throws InterruptedException {
2627
final Integer input = 5;
2728

2829
client.startWorkflow(
29-
BasicTimerWorkflow.class, wfId, 10, input);
30+
BasicTimerWorkflow.class, wfId, 10, input,
31+
ImmutableWorkflowOptions.builder().addWaitForCompletionStateExecutionIds("BasicTimerWorkflowState1-1").build());
3032

33+
client.waitForStateExecutionCompletion(Void.class, wfId, "BasicTimerWorkflowState1-1");
3134
client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
3235
final long elapsed = System.currentTimeMillis() - startTs;
3336
Assertions.assertTrue(elapsed >= 4000 && elapsed <= 7000, String.format("actual duration: %d", elapsed));

src/test/java/io/iworkflow/integ/timer/BasicTimerWorkflowState1.java

-6
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,8 @@
1212
import java.time.Duration;
1313

1414
public class BasicTimerWorkflowState1 implements WorkflowState<Integer> {
15-
public static final String STATE_ID = "timer-s1";
1615
public static final String COMMAND_ID = "test-timer-id";
1716

18-
@Override
19-
public String getStateId() {
20-
return STATE_ID;
21-
}
22-
2317
@Override
2418
public Class<Integer> getInputType() {
2519
return Integer.class;

0 commit comments

Comments
 (0)