From 9af37447a294120587f8b7ee4cde320f12de6646 Mon Sep 17 00:00:00 2001 From: Mats Date: Thu, 24 Aug 2023 21:31:02 +0200 Subject: [PATCH] Refactor trackers --- .../src/main/java/com/rpcnis/core/Rpcnis.java | 94 +++++-------------- .../rpcnis/core/proxy/PendingInvocation.java | 8 +- .../rpcnis/core/proxy/ProxyInvocHandler.java | 10 +- .../trackers/IncomingInvocationTracker.java | 31 ++++++ .../trackers/OutgoingInvocationTracker.java | 58 ++++++++++++ .../core/utils/ArgumentTransformer.java | 3 +- .../invocations/PendingInvocationTest.java | 19 ++-- 7 files changed, 135 insertions(+), 88 deletions(-) create mode 100644 rpcnis-core/src/main/java/com/rpcnis/core/trackers/IncomingInvocationTracker.java create mode 100644 rpcnis-core/src/main/java/com/rpcnis/core/trackers/OutgoingInvocationTracker.java diff --git a/rpcnis-core/src/main/java/com/rpcnis/core/Rpcnis.java b/rpcnis-core/src/main/java/com/rpcnis/core/Rpcnis.java index b041474..48192a7 100644 --- a/rpcnis-core/src/main/java/com/rpcnis/core/Rpcnis.java +++ b/rpcnis-core/src/main/java/com/rpcnis/core/Rpcnis.java @@ -5,17 +5,15 @@ import com.rpcnis.base.RpcTransport; import com.rpcnis.base.defaults.GsonSerializer; import com.rpcnis.core.executor.ImplementationWrapper; -import com.rpcnis.core.models.InvocationDescriptor; -import com.rpcnis.core.proxy.PendingInvocation; import com.rpcnis.core.proxy.ProxyInvocHandler; import com.rpcnis.core.proxy.RpcnisMock; +import com.rpcnis.core.trackers.IncomingInvocationTracker; +import com.rpcnis.core.trackers.OutgoingInvocationTracker; import java.lang.reflect.Proxy; import java.util.Collection; import java.util.Locale; import java.util.Timer; -import java.util.UUID; -import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; public class Rpcnis { @@ -27,34 +25,33 @@ public class Rpcnis { // Timer for scheduling timeouts and retries private final Timer timer = new Timer(); - // Map of pending invocations, keyed by invocation id - private final ConcurrentHashMap> pendingInvocations = new ConcurrentHashMap<>(); - - // Map invocation handlers by type; used for dispatching incoming invocations - // Handlers without a namespace (which is nullable) are also stored here - private final ConcurrentHashMap, Collection> invocationHandlers = new ConcurrentHashMap<>(); + private final OutgoingInvocationTracker outgoingInvocationTracker; + private final IncomingInvocationTracker incomingInvocationTracker; /** - * @param options A preconfigured RpcOptions object. + * @param options A preconfigured RpcOptions object. * @param serializer The serializer to use for serializing and deserializing objects. - * @param transport The transport to use for sending and receiving data. + * @param transport The transport to use for sending and receiving data. */ public Rpcnis(RpcTransport transport, RpcOptions options, RpcSerializer serializer) { this.transport = transport; this.options = options; this.serializer = serializer; + + outgoingInvocationTracker = new OutgoingInvocationTracker(options, timer); + incomingInvocationTracker = new IncomingInvocationTracker(); } /** * @param serializer The serializer to use for serializing and deserializing objects. - * @param transport The transport to use for sending and receiving data. + * @param transport The transport to use for sending and receiving data. */ public Rpcnis(RpcTransport transport, RpcSerializer serializer) { this(transport, new RpcOptions(), serializer); } /** - * @param options A preconfigured RpcOptions object. + * @param options A preconfigured RpcOptions object. * @param transport The transport to use for sending and receiving data. */ public Rpcnis(RpcTransport transport, RpcOptions options) { @@ -63,6 +60,7 @@ public Rpcnis(RpcTransport transport, RpcOptions options) { /** * Use default GsonSerializer and options. + * * @param transport The transport to use for sending and receiving data. */ public Rpcnis(RpcTransport transport) { @@ -78,19 +76,21 @@ public RpcOptions getOptions() { /** * Register a procedure without a namespace. + * * @param procedure The interface to register as a procedure. - * @param The type of the interface. + * @param The type of the interface. * @return A proxy object that implements the given interface. */ public T registerProcedure(Class procedure) { - return registerProcedure(procedure,procedure.getSimpleName().toLowerCase(Locale.ROOT)); + return registerProcedure(procedure, procedure.getSimpleName().toLowerCase(Locale.ROOT)); } /** * Register a procedure with a namespace. Invocations will only be mapped on implementations with the same namespace. + * * @param procedure The interface to register as a procedure. - * @param name The name of the procedure. - * @param The type of the interface. + * @param name The name of the procedure. + * @param The type of the interface. * @return A proxy object that implements the given interface. */ public T registerProcedure(Class procedure, String name) { @@ -98,7 +98,7 @@ public T registerProcedure(Class procedure, String name) { throw new IllegalArgumentException("Procedure must be an interface"); } - return procedure.cast(Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{procedure, RpcnisMock.class}, new ProxyInvocHandler(this,name))); + return procedure.cast(Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{procedure, RpcnisMock.class}, new ProxyInvocHandler(outgoingInvocationTracker, name))); } /** @@ -120,70 +120,22 @@ public static boolean isRpcnis(Object target) { /** * Received remote procedure calls will be dispatched to implementations registered with this method. * The implementation will be registered under all interfaces implemented by the object, and under the given namespace. + * * @param implementation The object to register as an implementation. - * @param namespace The namespace to register the implementation under. + * @param namespace The namespace to register the implementation under. */ public void registerImplementation(Object implementation, String namespace) { - // get the interfaces implemented by the implementation - Class[] interfaces = implementation.getClass().getInterfaces(); - - // there must be at least one interface - if (interfaces.length == 0) { - throw new IllegalArgumentException("Implementation must implement at least one interface/procedure"); - } - - // register this interface as all of the implemented interfaces - ImplementationWrapper implementationWrapper = new ImplementationWrapper(implementation, namespace); - - for (Class anInterface : interfaces) { - invocationHandlers.computeIfAbsent(anInterface, k -> ConcurrentHashMap.newKeySet()).add(implementationWrapper); - } + incomingInvocationTracker.registerImplementation(implementation, namespace); } /** * Received remote procedure calls will be dispatched to implementations registered with this method. * The implementation will be registered under all interfaces implemented by the object, and must be called without a namespace. + * * @param implementation The object to register as an implementation. */ public void registerImplementation(Object implementation) { registerImplementation(implementation, null); } - /* === INTERNAL METHODS === */ - public T invokeRemoteMethod(InvocationDescriptor invocationDescriptor) throws Throwable { - // create a pending invocation - PendingInvocation pendingInvocation = new PendingInvocation<>(this, invocationDescriptor, () -> { - // remove the pending invocation from the map - pendingInvocations.remove(invocationDescriptor.getUniqueInvocationId()); - }); - - // add the pending invocation to the map - pendingInvocations.put(invocationDescriptor.getUniqueInvocationId(), pendingInvocation); - - // TODO: transmit - - // wait for response or timeout - try { - return pendingInvocation.waitForResponse(); - } catch (CompletionException e) { - throw e.getCause(); - } - } - - public void completeInvocation(InvocationDescriptor invocationDescriptor, Object value) { - // do we have a pending invocation for this invocation id? - PendingInvocation pendingInvocation = pendingInvocations.get(invocationDescriptor.getUniqueInvocationId()); - if (pendingInvocation == null) { - throw new IllegalStateException("No pending invocation found for invocation id " + invocationDescriptor.getUniqueInvocationId()); - } - - pendingInvocation.complete(value); - - // remove the pending invocation from the map - pendingInvocations.remove(invocationDescriptor.getUniqueInvocationId()); - } - - public Timer getTimer() { - return timer; - } } diff --git a/rpcnis-core/src/main/java/com/rpcnis/core/proxy/PendingInvocation.java b/rpcnis-core/src/main/java/com/rpcnis/core/proxy/PendingInvocation.java index 776d326..15152a6 100644 --- a/rpcnis-core/src/main/java/com/rpcnis/core/proxy/PendingInvocation.java +++ b/rpcnis-core/src/main/java/com/rpcnis/core/proxy/PendingInvocation.java @@ -1,9 +1,11 @@ package com.rpcnis.core.proxy; +import com.rpcnis.base.RpcOptions; import com.rpcnis.base.errors.InvocationTimedOutException; import com.rpcnis.core.Rpcnis; import com.rpcnis.core.models.InvocationDescriptor; +import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -26,14 +28,14 @@ public class PendingInvocation extends TimerTask { private final AtomicBoolean isComplete = new AtomicBoolean(false); private final AtomicBoolean isTimedOut = new AtomicBoolean(false); - public PendingInvocation(Rpcnis rpcnis, InvocationDescriptor invocationDescriptor, Runnable timeoutCallback) { + public PendingInvocation(int timeoutSeconds, Timer timer, InvocationDescriptor invocationDescriptor, Runnable timeoutCallback) { this.invocationDescriptor = invocationDescriptor; this.timeoutCallback = timeoutCallback; - this.timeoutSeconds = rpcnis.getOptions().getTimeoutSeconds(); + this.timeoutSeconds = timeoutSeconds; completable = new CompletableFuture<>(); // schedule timeout, timeoutSeconds is in seconds, Timer.schedule() takes milliseconds - rpcnis.getTimer().schedule(this, TimeUnit.SECONDS.toMillis(timeoutSeconds)); + timer.schedule(this, TimeUnit.SECONDS.toMillis(timeoutSeconds)); } /** diff --git a/rpcnis-core/src/main/java/com/rpcnis/core/proxy/ProxyInvocHandler.java b/rpcnis-core/src/main/java/com/rpcnis/core/proxy/ProxyInvocHandler.java index 03bacf4..94c09a8 100644 --- a/rpcnis-core/src/main/java/com/rpcnis/core/proxy/ProxyInvocHandler.java +++ b/rpcnis-core/src/main/java/com/rpcnis/core/proxy/ProxyInvocHandler.java @@ -1,18 +1,18 @@ package com.rpcnis.core.proxy; -import com.rpcnis.core.Rpcnis; import com.rpcnis.core.models.InvocationDescriptor; +import com.rpcnis.core.trackers.OutgoingInvocationTracker; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; public class ProxyInvocHandler implements InvocationHandler { - private final Rpcnis rpcnis; + private final OutgoingInvocationTracker localInvocationTracker; private final String namespace; - public ProxyInvocHandler(Rpcnis rpcnis, String namespace) { - this.rpcnis = rpcnis; + public ProxyInvocHandler(OutgoingInvocationTracker outgoingInvocationTracker, String namespace) { + this.localInvocationTracker = outgoingInvocationTracker; this.namespace = namespace; } @@ -26,7 +26,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl InvocationDescriptor invocationDescriptor = new InvocationDescriptor(namespace, method.getName(), args, argTypes, method.getReturnType()); // wait for response or timeout - return rpcnis.invokeRemoteMethod(invocationDescriptor); + return localInvocationTracker.invokeRemoteMethod(invocationDescriptor); } } diff --git a/rpcnis-core/src/main/java/com/rpcnis/core/trackers/IncomingInvocationTracker.java b/rpcnis-core/src/main/java/com/rpcnis/core/trackers/IncomingInvocationTracker.java new file mode 100644 index 0000000..6dfc6a9 --- /dev/null +++ b/rpcnis-core/src/main/java/com/rpcnis/core/trackers/IncomingInvocationTracker.java @@ -0,0 +1,31 @@ +package com.rpcnis.core.trackers; + +import com.rpcnis.core.executor.ImplementationWrapper; + +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; + +public class IncomingInvocationTracker { + + // Map invocation handlers by type; used for dispatching incoming invocations + // Handlers without a namespace (which is nullable) are also stored here + private final ConcurrentHashMap, Collection> implementations = new ConcurrentHashMap<>(); + + public void registerImplementation(Object implementation, String namespace) { + // get the interfaces implemented by the implementation + Class[] interfaces = implementation.getClass().getInterfaces(); + + // there must be at least one interface + if (interfaces.length == 0) { + throw new IllegalArgumentException("Implementation must implement at least one interface/procedure"); + } + + // register this interface as all the implemented interfaces + ImplementationWrapper implementationWrapper = new ImplementationWrapper(implementation, namespace); + + for (Class anInterface : interfaces) { + implementations.computeIfAbsent(anInterface, k -> ConcurrentHashMap.newKeySet()).add(implementationWrapper); + } + } + +} diff --git a/rpcnis-core/src/main/java/com/rpcnis/core/trackers/OutgoingInvocationTracker.java b/rpcnis-core/src/main/java/com/rpcnis/core/trackers/OutgoingInvocationTracker.java new file mode 100644 index 0000000..c4c3760 --- /dev/null +++ b/rpcnis-core/src/main/java/com/rpcnis/core/trackers/OutgoingInvocationTracker.java @@ -0,0 +1,58 @@ +package com.rpcnis.core.trackers; + +import com.rpcnis.base.RpcOptions; +import com.rpcnis.core.models.InvocationDescriptor; +import com.rpcnis.core.proxy.PendingInvocation; + +import java.util.Timer; +import java.util.UUID; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; + +public class OutgoingInvocationTracker { + + private final Timer timer; + private final RpcOptions options; + + // Map of pending invocations, keyed by invocation id + private final ConcurrentHashMap> pendingInvocations = new ConcurrentHashMap<>(); + + public OutgoingInvocationTracker(RpcOptions options, Timer timer) { + this.options = options; + this.timer = timer; + } + + public T invokeRemoteMethod(InvocationDescriptor invocationDescriptor) throws Throwable { + // create a pending invocation + PendingInvocation pendingInvocation = new PendingInvocation<>(options.getTimeoutSeconds(), this.timer, invocationDescriptor, () -> { + // remove the pending invocation from the map + pendingInvocations.remove(invocationDescriptor.getUniqueInvocationId()); + }); + + // add the pending invocation to the map + pendingInvocations.put(invocationDescriptor.getUniqueInvocationId(), pendingInvocation); + + // TODO: transmit + + // wait for response or timeout + try { + return pendingInvocation.waitForResponse(); + } catch (CompletionException e) { + throw e.getCause(); + } + } + + public void completeInvocation(InvocationDescriptor invocationDescriptor, Object value) { + // do we have a pending invocation for this invocation id? + PendingInvocation pendingInvocation = pendingInvocations.get(invocationDescriptor.getUniqueInvocationId()); + if (pendingInvocation == null) { + throw new IllegalStateException("No pending invocation found for invocation id " + invocationDescriptor.getUniqueInvocationId()); + } + + pendingInvocation.complete(value); + + // remove the pending invocation from the map + pendingInvocations.remove(invocationDescriptor.getUniqueInvocationId()); + } + +} diff --git a/rpcnis-core/src/main/java/com/rpcnis/core/utils/ArgumentTransformer.java b/rpcnis-core/src/main/java/com/rpcnis/core/utils/ArgumentTransformer.java index 13a803d..82342c0 100644 --- a/rpcnis-core/src/main/java/com/rpcnis/core/utils/ArgumentTransformer.java +++ b/rpcnis-core/src/main/java/com/rpcnis/core/utils/ArgumentTransformer.java @@ -1,5 +1,6 @@ package com.rpcnis.core.utils; +import java.lang.reflect.Array; import java.lang.reflect.Method; import java.util.Map; @@ -43,7 +44,7 @@ public static Object[] overflowArguments(Method method, Object[] allArguments) { int length = allArguments.length - method.getParameterCount() + 1; Class componentType = PRIMITIVE_TO_BOXED.getOrDefault(lastParameterType.getComponentType(), lastParameterType.getComponentType()); - Object[] array = (Object[]) java.lang.reflect.Array.newInstance(componentType, length); + Object[] array = (Object[]) Array.newInstance(componentType, length); // fill the array with the remaining arguments if (allArguments.length - (method.getParameterCount() - 1) >= 0) diff --git a/rpcnis-core/src/test/java/com/rpcnis/core/invocations/PendingInvocationTest.java b/rpcnis-core/src/test/java/com/rpcnis/core/invocations/PendingInvocationTest.java index eda706a..4168c9f 100644 --- a/rpcnis-core/src/test/java/com/rpcnis/core/invocations/PendingInvocationTest.java +++ b/rpcnis-core/src/test/java/com/rpcnis/core/invocations/PendingInvocationTest.java @@ -1,11 +1,14 @@ package com.rpcnis.core.invocations; +import com.rpcnis.base.RpcOptions; import com.rpcnis.base.defaults.LoopbackTransport; import com.rpcnis.base.errors.InvocationTimedOutException; import com.rpcnis.core.Rpcnis; import com.rpcnis.core.models.InvocationDescriptor; +import com.rpcnis.core.trackers.OutgoingInvocationTracker; import org.junit.jupiter.api.*; +import java.util.Timer; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -30,7 +33,7 @@ public static void tearDown() { @Test public void testPendingInvocation() throws Throwable { // base instance - Rpcnis rpcnis = new Rpcnis(new LoopbackTransport()); + OutgoingInvocationTracker outgoingInvocationTracker = new OutgoingInvocationTracker(new RpcOptions(), new Timer()); InvocationDescriptor invocationDescriptor = new InvocationDescriptor("namespace", "methodName", new Object[]{}, new Class[]{}, String.class); @@ -43,24 +46,24 @@ public void testPendingInvocation() throws Throwable { } catch (InterruptedException e) { e.printStackTrace(); } - rpcnis.completeInvocation(invocationDescriptor, testString); + outgoingInvocationTracker.completeInvocation(invocationDescriptor, testString); }); - String response = rpcnis.invokeRemoteMethod(invocationDescriptor); + String response = outgoingInvocationTracker.invokeRemoteMethod(invocationDescriptor); assert response.equals(testString); } @Test - @Timeout(3) // seconds + @Timeout(2) // seconds public void testTimeout() { - // base instance - Rpcnis rpcnis = new Rpcnis(new LoopbackTransport()); - rpcnis.getOptions().setTimeoutSeconds(2); // or else it will take 10 seconds to run + RpcOptions options = new RpcOptions(); + options.setTimeoutSeconds(1); + OutgoingInvocationTracker outgoingInvocationTracker = new OutgoingInvocationTracker(options, new Timer()); InvocationDescriptor invocationDescriptor = new InvocationDescriptor("namespace", "methodName", new Object[]{}, new Class[]{}, String.class); Assertions.assertThrowsExactly(InvocationTimedOutException.class, () -> { - rpcnis.invokeRemoteMethod(invocationDescriptor); + outgoingInvocationTracker.invokeRemoteMethod(invocationDescriptor); });