Skip to content

Commit

Permalink
Add rpc interface without persistence (#219)
Browse files Browse the repository at this point in the history
  • Loading branch information
duoertai authored Mar 9, 2024
1 parent b640e6c commit dc66ac7
Show file tree
Hide file tree
Showing 9 changed files with 358 additions and 80 deletions.
46 changes: 45 additions & 1 deletion src/main/java/io/iworkflow/core/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class Client {
final ClientOptions clientOptions;

/**
* return a full featured client. If you don't have the workflow Registry, you should use {@link UnregisteredClient} instead
* return a full-featured client. If you don't have the workflow Registry, you should use {@link UnregisteredClient} instead
*
* @param registry registry is required so that this client can perform some validation checks (workflow types, channel names)
* @param clientOptions is for configuring the client
Expand Down Expand Up @@ -704,6 +704,19 @@ public <I, O> O invokeRPC(RpcDefinitions.RpcFunc1<I, O> rpcStubMethod, I input)
return rpcStubMethod.execute(null, input, null, null);
}

/**
* invoking the RPC through RPC stub
*
* @param rpcStubMethod the RPC method from stub created by {@link #newRpcStub(Class, String, String)}
* @param input the input of the RPC method
* @param <I> the input type
* @param <O> the output type
* @return output
*/
public <I, O> O invokeRPC(RpcDefinitions.RpcFunc1NoPersistence<I, O> rpcStubMethod, I input) {
return rpcStubMethod.execute(null, input, null);
}

/**
* invoking the RPC through RPC stub
*
Expand All @@ -715,6 +728,17 @@ public <O> O invokeRPC(RpcDefinitions.RpcFunc0<O> rpcStubMethod) {
return rpcStubMethod.execute(null, null, null);
}

/**
* invoking the RPC through RPC stub
*
* @param rpcStubMethod the RPC method from stub created by {@link #newRpcStub(Class, String, String)}
* @param <O> the output type
* @return output
*/
public <O> O invokeRPC(RpcDefinitions.RpcFunc0NoPersistence<O> rpcStubMethod) {
return rpcStubMethod.execute(null, null);
}

/**
* invoking the RPC through RPC stub
*
Expand All @@ -726,6 +750,17 @@ public <I> void invokeRPC(RpcDefinitions.RpcProc1<I> rpcStubMethod, I input) {
rpcStubMethod.execute(null, input, null, null);
}

/**
* invoking the RPC through RPC stub
*
* @param rpcStubMethod the RPC method from stub created by {@link #newRpcStub(Class, String, String)}
* @param input the input of the RPC method
* @param <I> the input type
*/
public <I> void invokeRPC(RpcDefinitions.RpcProc1NoPersistence<I> rpcStubMethod, I input) {
rpcStubMethod.execute(null, input, null);
}

/**
* invoking the RPC through RPC stub
*
Expand All @@ -735,6 +770,15 @@ public void invokeRPC(RpcDefinitions.RpcProc0 rpcStubMethod) {
rpcStubMethod.execute(null, null, null);
}

/**
* invoking the RPC through RPC stub
*
* @param rpcStubMethod the RPC method from stub created by {@link #newRpcStub(Class, String, String)}
*/
public void invokeRPC(RpcDefinitions.RpcProc0NoPersistence rpcStubMethod) {
rpcStubMethod.execute(null, null);
}

/**
* Get specified search attributes (by attributeKeys) of a workflow
*
Expand Down
88 changes: 62 additions & 26 deletions src/main/java/io/iworkflow/core/RpcDefinitions.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package io.iworkflow.core;

import com.google.common.collect.ImmutableMap;
import io.iworkflow.core.communication.Communication;
import io.iworkflow.core.persistence.Persistence;

import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.Map;

public final class RpcDefinitions {
private RpcDefinitions() {
}

/**
* RPC with input and output
*
* RPC definition
* with: input, output, persistence, communication
* without: NA
* @param <I> input type
* @param <O> output type
*/
Expand All @@ -22,8 +25,21 @@ public interface RpcFunc1<I, O> extends Serializable {
}

/**
* RPC with output only
*
* RPC definition
* with: input, output, communication
* without: persistence
* @param <I> input type
* @param <O> output type
*/
@FunctionalInterface
public interface RpcFunc1NoPersistence<I, O> extends Serializable {
O execute(Context context, I input, Communication communication);
}

/**
* RPC definition
* with: output, persistence, communication
* without: input
* @param <O> output type
*/
@FunctionalInterface
Expand All @@ -32,8 +48,20 @@ public interface RpcFunc0<O> extends Serializable {
}

/**
* RPC with input only
*
* RPC definition
* with: output, communication
* without: input, persistence
* @param <O> output type
*/
@FunctionalInterface
public interface RpcFunc0NoPersistence<O> extends Serializable {
O execute(Context context, Communication communication);
}

/**
* RPC definition
* with: input, persistence, communication
* without: output
* @param <I> input type
*/
@FunctionalInterface
Expand All @@ -42,34 +70,42 @@ public interface RpcProc1<I> extends Serializable {
}

/**
* RPC without input or output
* RPC definition
* with: input, communication
* without: output, persistence
* @param <I> input type
*/
@FunctionalInterface
public interface RpcProc1NoPersistence<I> extends Serializable {
void execute(Context context, I input, Communication communication);
}

/**
* RPC definition
* with: persistence, communication
* without: input, output
*/
@FunctionalInterface
public interface RpcProc0 extends Serializable {
void execute(Context context, Persistence persistence, Communication communication);
}

public static final int PARAMETERS_WITH_INPUT = 4;
public static final int PARAMETERS_NO_INPUT = 3;
/**
* RPC definition
* with: communication
* without: input, output, persistence
*/
@FunctionalInterface
public interface RpcProc0NoPersistence extends Serializable {
void execute(Context context, Communication communication);
}

public static final int INDEX_OF_INPUT_PARAMETER = 1;
public static final String ERROR_MESSAGE = "An RPC method must be in the form of one of {@link RpcDefinitions}";

public static void validateRpcMethod(final Method method) {
final Class<?>[] paramTypes = method.getParameterTypes();
final Class<?> persistenceType, communicationType, contextType;
if (paramTypes.length == PARAMETERS_NO_INPUT) {
contextType = paramTypes[0];
persistenceType = paramTypes[1];
communicationType = paramTypes[2];
} else if (paramTypes.length == PARAMETERS_WITH_INPUT) {
contextType = paramTypes[0];
persistenceType = paramTypes[2];
communicationType = paramTypes[3];
} else {
throw new WorkflowDefinitionException("An RPC method must be in the form of one of {@link RpcDefinitions}");
}
if (!persistenceType.equals(Persistence.class) || !communicationType.equals(Communication.class)) {
throw new WorkflowDefinitionException("An RPC method must be in the form of one of {@link RpcDefinitions}");
RpcMethodMetadata methodMetadata = RpcMethodMatcher.match(method);
if (methodMetadata == null) {
throw new WorkflowDefinitionException(ERROR_MESSAGE);
}
}
}
}
63 changes: 43 additions & 20 deletions src/main/java/io/iworkflow/core/RpcInvocationHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.iworkflow.core.persistence.PersistenceOptions;
import io.iworkflow.gen.models.PersistenceLoadingPolicy;
import io.iworkflow.gen.models.PersistenceLoadingType;
import io.iworkflow.gen.models.SearchAttributeKeyAndType;
import net.bytebuddy.implementation.bind.annotation.AllArguments;
import net.bytebuddy.implementation.bind.annotation.Origin;
Expand All @@ -11,9 +12,7 @@
import java.util.Arrays;
import java.util.List;

import static io.iworkflow.core.RpcDefinitions.INDEX_OF_INPUT_PARAMETER;
import static io.iworkflow.core.RpcDefinitions.PARAMETERS_WITH_INPUT;
import static io.iworkflow.core.RpcDefinitions.validateRpcMethod;
import static io.iworkflow.core.RpcDefinitions.*;

public class RpcInvocationHandler {

Expand Down Expand Up @@ -41,30 +40,54 @@ public Object intercept(@AllArguments Object[] allArguments,
if (rpcAnno == null) {
throw new WorkflowDefinitionException("An RPC method must be annotated by RPC annotation");
}
validateRpcMethod(method);
Object input = null;
if (method.getParameterTypes().length == PARAMETERS_WITH_INPUT) {
input = allArguments[INDEX_OF_INPUT_PARAMETER];

RpcMethodMetadata metadata = RpcMethodMatcher.match(method);
if (metadata == null) {
throw new WorkflowDefinitionException("An RPC method must be annotated by RPC annotation");
}
Object input = metadata.hasInput() ? allArguments[metadata.getInputIndex()] : null;

final Class<?> outputType = method.getReturnType();

boolean useMemo = schemaOptions.getEnableCaching();
if (rpcAnno.bypassCachingForStrongConsistency()) {
useMemo = false;
}
final Object output = unregisteredClient.invokeRpc(outputType, input, workflowId, workflowRunId, method.getName(), rpcAnno.timeoutSeconds(),
new PersistenceLoadingPolicy()
.persistenceLoadingType(rpcAnno.dataAttributesLoadingType())
.partialLoadingKeys(Arrays.asList(rpcAnno.dataAttributesPartialLoadingKeys()))
.lockingKeys(Arrays.asList(rpcAnno.dataAttributesLockingKeys())),
new PersistenceLoadingPolicy()
.persistenceLoadingType(rpcAnno.searchAttributesLoadingType())
.lockingKeys(Arrays.asList(rpcAnno.searchAttributesLockingKeys()))
.partialLoadingKeys(Arrays.asList(rpcAnno.searchAttributesPartialLoadingKeys())),
useMemo,
searchAttributeKeyAndTypes
);
return output;

if (metadata.usesPersistence()) {
return unregisteredClient.invokeRpc(
outputType,
input,
workflowId,
workflowRunId,
method.getName(),
rpcAnno.timeoutSeconds(),
new PersistenceLoadingPolicy()
.persistenceLoadingType(rpcAnno.dataAttributesLoadingType())
.partialLoadingKeys(Arrays.asList(rpcAnno.dataAttributesPartialLoadingKeys()))
.lockingKeys(Arrays.asList(rpcAnno.dataAttributesLockingKeys())),
new PersistenceLoadingPolicy()
.persistenceLoadingType(rpcAnno.searchAttributesLoadingType())
.lockingKeys(Arrays.asList(rpcAnno.searchAttributesLockingKeys()))
.partialLoadingKeys(Arrays.asList(rpcAnno.searchAttributesPartialLoadingKeys())),
useMemo,
searchAttributeKeyAndTypes
);
} else {
return unregisteredClient.invokeRpc(
outputType,
input,
workflowId,
workflowRunId,
method.getName(),
rpcAnno.timeoutSeconds(),
new PersistenceLoadingPolicy()
.persistenceLoadingType(PersistenceLoadingType.NONE),
new PersistenceLoadingPolicy()
.persistenceLoadingType(PersistenceLoadingType.NONE),
useMemo,
null);
}

}
}
Loading

0 comments on commit dc66ac7

Please sign in to comment.