Skip to content

Commit

Permalink
Refactor trackers
Browse files Browse the repository at this point in the history
  • Loading branch information
Mindgamesnl committed Aug 24, 2023
1 parent 4eb5cfd commit 9af3744
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 88 deletions.
94 changes: 23 additions & 71 deletions rpcnis-core/src/main/java/com/rpcnis/core/Rpcnis.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,15 @@
import com.rpcnis.base.RpcTransport;
import com.rpcnis.base.defaults.GsonSerializer;
import com.rpcnis.core.executor.ImplementationWrapper;
import com.rpcnis.core.models.InvocationDescriptor;
import com.rpcnis.core.proxy.PendingInvocation;
import com.rpcnis.core.proxy.ProxyInvocHandler;
import com.rpcnis.core.proxy.RpcnisMock;
import com.rpcnis.core.trackers.IncomingInvocationTracker;
import com.rpcnis.core.trackers.OutgoingInvocationTracker;

import java.lang.reflect.Proxy;
import java.util.Collection;
import java.util.Locale;
import java.util.Timer;
import java.util.UUID;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;

public class Rpcnis {
Expand All @@ -27,34 +25,33 @@ public class Rpcnis {
// Timer for scheduling timeouts and retries
private final Timer timer = new Timer();

// Map of pending invocations, keyed by invocation id
private final ConcurrentHashMap<UUID, PendingInvocation<?>> pendingInvocations = new ConcurrentHashMap<>();

// Map invocation handlers by type; used for dispatching incoming invocations
// Handlers without a namespace (which is nullable) are also stored here
private final ConcurrentHashMap<Class<?>, Collection<ImplementationWrapper>> invocationHandlers = new ConcurrentHashMap<>();
private final OutgoingInvocationTracker outgoingInvocationTracker;
private final IncomingInvocationTracker incomingInvocationTracker;

/**
* @param options A preconfigured RpcOptions object.
* @param options A preconfigured RpcOptions object.
* @param serializer The serializer to use for serializing and deserializing objects.
* @param transport The transport to use for sending and receiving data.
* @param transport The transport to use for sending and receiving data.
*/
public Rpcnis(RpcTransport transport, RpcOptions options, RpcSerializer serializer) {
this.transport = transport;
this.options = options;
this.serializer = serializer;

outgoingInvocationTracker = new OutgoingInvocationTracker(options, timer);
incomingInvocationTracker = new IncomingInvocationTracker();
}

/**
* @param serializer The serializer to use for serializing and deserializing objects.
* @param transport The transport to use for sending and receiving data.
* @param transport The transport to use for sending and receiving data.
*/
public Rpcnis(RpcTransport transport, RpcSerializer serializer) {
this(transport, new RpcOptions(), serializer);
}

/**
* @param options A preconfigured RpcOptions object.
* @param options A preconfigured RpcOptions object.
* @param transport The transport to use for sending and receiving data.
*/
public Rpcnis(RpcTransport transport, RpcOptions options) {
Expand All @@ -63,6 +60,7 @@ public Rpcnis(RpcTransport transport, RpcOptions options) {

/**
* Use default GsonSerializer and options.
*
* @param transport The transport to use for sending and receiving data.
*/
public Rpcnis(RpcTransport transport) {
Expand All @@ -78,27 +76,29 @@ public RpcOptions getOptions() {

/**
* Register a procedure without a namespace.
*
* @param procedure The interface to register as a procedure.
* @param <T> The type of the interface.
* @param <T> The type of the interface.
* @return A proxy object that implements the given interface.
*/
public <T> T registerProcedure(Class<T> procedure) {
return registerProcedure(procedure,procedure.getSimpleName().toLowerCase(Locale.ROOT));
return registerProcedure(procedure, procedure.getSimpleName().toLowerCase(Locale.ROOT));
}

/**
* Register a procedure with a namespace. Invocations will only be mapped on implementations with the same namespace.
*
* @param procedure The interface to register as a procedure.
* @param name The name of the procedure.
* @param <T> The type of the interface.
* @param name The name of the procedure.
* @param <T> The type of the interface.
* @return A proxy object that implements the given interface.
*/
public <T> T registerProcedure(Class<T> procedure, String name) {
if (!procedure.isInterface()) {
throw new IllegalArgumentException("Procedure must be an interface");
}

return procedure.cast(Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{procedure, RpcnisMock.class}, new ProxyInvocHandler(this,name)));
return procedure.cast(Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{procedure, RpcnisMock.class}, new ProxyInvocHandler(outgoingInvocationTracker, name)));
}

/**
Expand All @@ -120,70 +120,22 @@ public static boolean isRpcnis(Object target) {
/**
* Received remote procedure calls will be dispatched to implementations registered with this method.
* The implementation will be registered under all interfaces implemented by the object, and under the given namespace.
*
* @param implementation The object to register as an implementation.
* @param namespace The namespace to register the implementation under.
* @param namespace The namespace to register the implementation under.
*/
public void registerImplementation(Object implementation, String namespace) {
// get the interfaces implemented by the implementation
Class<?>[] interfaces = implementation.getClass().getInterfaces();

// there must be at least one interface
if (interfaces.length == 0) {
throw new IllegalArgumentException("Implementation must implement at least one interface/procedure");
}

// register this interface as all of the implemented interfaces
ImplementationWrapper implementationWrapper = new ImplementationWrapper(implementation, namespace);

for (Class<?> anInterface : interfaces) {
invocationHandlers.computeIfAbsent(anInterface, k -> ConcurrentHashMap.newKeySet()).add(implementationWrapper);
}
incomingInvocationTracker.registerImplementation(implementation, namespace);
}

/**
* Received remote procedure calls will be dispatched to implementations registered with this method.
* The implementation will be registered under all interfaces implemented by the object, and must be called without a namespace.
*
* @param implementation The object to register as an implementation.
*/
public void registerImplementation(Object implementation) {
registerImplementation(implementation, null);
}

/* === INTERNAL METHODS === */
public <T> T invokeRemoteMethod(InvocationDescriptor invocationDescriptor) throws Throwable {
// create a pending invocation
PendingInvocation<T> pendingInvocation = new PendingInvocation<>(this, invocationDescriptor, () -> {
// remove the pending invocation from the map
pendingInvocations.remove(invocationDescriptor.getUniqueInvocationId());
});

// add the pending invocation to the map
pendingInvocations.put(invocationDescriptor.getUniqueInvocationId(), pendingInvocation);

// TODO: transmit

// wait for response or timeout
try {
return pendingInvocation.waitForResponse();
} catch (CompletionException e) {
throw e.getCause();
}
}

public void completeInvocation(InvocationDescriptor invocationDescriptor, Object value) {
// do we have a pending invocation for this invocation id?
PendingInvocation<?> pendingInvocation = pendingInvocations.get(invocationDescriptor.getUniqueInvocationId());
if (pendingInvocation == null) {
throw new IllegalStateException("No pending invocation found for invocation id " + invocationDescriptor.getUniqueInvocationId());
}

pendingInvocation.complete(value);

// remove the pending invocation from the map
pendingInvocations.remove(invocationDescriptor.getUniqueInvocationId());
}

public Timer getTimer() {
return timer;
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.rpcnis.core.proxy;

import com.rpcnis.base.RpcOptions;
import com.rpcnis.base.errors.InvocationTimedOutException;
import com.rpcnis.core.Rpcnis;
import com.rpcnis.core.models.InvocationDescriptor;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -26,14 +28,14 @@ public class PendingInvocation<T> extends TimerTask {
private final AtomicBoolean isComplete = new AtomicBoolean(false);
private final AtomicBoolean isTimedOut = new AtomicBoolean(false);

public PendingInvocation(Rpcnis rpcnis, InvocationDescriptor invocationDescriptor, Runnable timeoutCallback) {
public PendingInvocation(int timeoutSeconds, Timer timer, InvocationDescriptor invocationDescriptor, Runnable timeoutCallback) {
this.invocationDescriptor = invocationDescriptor;
this.timeoutCallback = timeoutCallback;
this.timeoutSeconds = rpcnis.getOptions().getTimeoutSeconds();
this.timeoutSeconds = timeoutSeconds;
completable = new CompletableFuture<>();

// schedule timeout, timeoutSeconds is in seconds, Timer.schedule() takes milliseconds
rpcnis.getTimer().schedule(this, TimeUnit.SECONDS.toMillis(timeoutSeconds));
timer.schedule(this, TimeUnit.SECONDS.toMillis(timeoutSeconds));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package com.rpcnis.core.proxy;

import com.rpcnis.core.Rpcnis;
import com.rpcnis.core.models.InvocationDescriptor;
import com.rpcnis.core.trackers.OutgoingInvocationTracker;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

public class ProxyInvocHandler implements InvocationHandler {

private final Rpcnis rpcnis;
private final OutgoingInvocationTracker localInvocationTracker;
private final String namespace;

public ProxyInvocHandler(Rpcnis rpcnis, String namespace) {
this.rpcnis = rpcnis;
public ProxyInvocHandler(OutgoingInvocationTracker outgoingInvocationTracker, String namespace) {
this.localInvocationTracker = outgoingInvocationTracker;
this.namespace = namespace;
}

Expand All @@ -26,7 +26,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
InvocationDescriptor invocationDescriptor = new InvocationDescriptor(namespace, method.getName(), args, argTypes, method.getReturnType());

// wait for response or timeout
return rpcnis.invokeRemoteMethod(invocationDescriptor);
return localInvocationTracker.invokeRemoteMethod(invocationDescriptor);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.rpcnis.core.trackers;

import com.rpcnis.core.executor.ImplementationWrapper;

import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;

public class IncomingInvocationTracker {

// Map invocation handlers by type; used for dispatching incoming invocations
// Handlers without a namespace (which is nullable) are also stored here
private final ConcurrentHashMap<Class<?>, Collection<ImplementationWrapper>> implementations = new ConcurrentHashMap<>();

public void registerImplementation(Object implementation, String namespace) {
// get the interfaces implemented by the implementation
Class<?>[] interfaces = implementation.getClass().getInterfaces();

// there must be at least one interface
if (interfaces.length == 0) {
throw new IllegalArgumentException("Implementation must implement at least one interface/procedure");
}

// register this interface as all the implemented interfaces
ImplementationWrapper implementationWrapper = new ImplementationWrapper(implementation, namespace);

for (Class<?> anInterface : interfaces) {
implementations.computeIfAbsent(anInterface, k -> ConcurrentHashMap.newKeySet()).add(implementationWrapper);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.rpcnis.core.trackers;

import com.rpcnis.base.RpcOptions;
import com.rpcnis.core.models.InvocationDescriptor;
import com.rpcnis.core.proxy.PendingInvocation;

import java.util.Timer;
import java.util.UUID;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;

public class OutgoingInvocationTracker {

private final Timer timer;
private final RpcOptions options;

// Map of pending invocations, keyed by invocation id
private final ConcurrentHashMap<UUID, PendingInvocation<?>> pendingInvocations = new ConcurrentHashMap<>();

public OutgoingInvocationTracker(RpcOptions options, Timer timer) {
this.options = options;
this.timer = timer;
}

public <T> T invokeRemoteMethod(InvocationDescriptor invocationDescriptor) throws Throwable {
// create a pending invocation
PendingInvocation<T> pendingInvocation = new PendingInvocation<>(options.getTimeoutSeconds(), this.timer, invocationDescriptor, () -> {
// remove the pending invocation from the map
pendingInvocations.remove(invocationDescriptor.getUniqueInvocationId());
});

// add the pending invocation to the map
pendingInvocations.put(invocationDescriptor.getUniqueInvocationId(), pendingInvocation);

// TODO: transmit

// wait for response or timeout
try {
return pendingInvocation.waitForResponse();
} catch (CompletionException e) {
throw e.getCause();
}
}

public void completeInvocation(InvocationDescriptor invocationDescriptor, Object value) {
// do we have a pending invocation for this invocation id?
PendingInvocation<?> pendingInvocation = pendingInvocations.get(invocationDescriptor.getUniqueInvocationId());
if (pendingInvocation == null) {
throw new IllegalStateException("No pending invocation found for invocation id " + invocationDescriptor.getUniqueInvocationId());
}

pendingInvocation.complete(value);

// remove the pending invocation from the map
pendingInvocations.remove(invocationDescriptor.getUniqueInvocationId());
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.rpcnis.core.utils;

import java.lang.reflect.Array;
import java.lang.reflect.Method;
import java.util.Map;

Expand Down Expand Up @@ -43,7 +44,7 @@ public static Object[] overflowArguments(Method method, Object[] allArguments) {
int length = allArguments.length - method.getParameterCount() + 1;
Class<?> componentType = PRIMITIVE_TO_BOXED.getOrDefault(lastParameterType.getComponentType(), lastParameterType.getComponentType());

Object[] array = (Object[]) java.lang.reflect.Array.newInstance(componentType, length);
Object[] array = (Object[]) Array.newInstance(componentType, length);

// fill the array with the remaining arguments
if (allArguments.length - (method.getParameterCount() - 1) >= 0)
Expand Down
Loading

0 comments on commit 9af3744

Please sign in to comment.