diff --git a/meteor-common/src/main/java/dev/pixelib/meteor/base/interfaces/SubscriptionHandler.java b/meteor-common/src/main/java/dev/pixelib/meteor/base/interfaces/SubscriptionHandler.java index d6fe900..5240d3b 100644 --- a/meteor-common/src/main/java/dev/pixelib/meteor/base/interfaces/SubscriptionHandler.java +++ b/meteor-common/src/main/java/dev/pixelib/meteor/base/interfaces/SubscriptionHandler.java @@ -1,5 +1,6 @@ package dev.pixelib.meteor.base.interfaces; +@FunctionalInterface public interface SubscriptionHandler { boolean onPacket(byte[] packet) throws Exception; diff --git a/meteor-core/src/main/java/dev/pixelib/meteor/core/proxy/PendingInvocation.java b/meteor-core/src/main/java/dev/pixelib/meteor/core/proxy/PendingInvocation.java index 63a7e7e..2c64ec9 100644 --- a/meteor-core/src/main/java/dev/pixelib/meteor/core/proxy/PendingInvocation.java +++ b/meteor-core/src/main/java/dev/pixelib/meteor/core/proxy/PendingInvocation.java @@ -86,6 +86,7 @@ public void run() { } isTimedOut.set(true); + this.completable.completeExceptionally( new InvocationTimedOutException(invocationDescriptor.getMethodName(), invocationDescriptor.getNamespace(), timeoutSeconds) ); diff --git a/meteor-core/src/main/java/dev/pixelib/meteor/core/trackers/OutgoingInvocationTracker.java b/meteor-core/src/main/java/dev/pixelib/meteor/core/trackers/OutgoingInvocationTracker.java index 8239969..9f95c11 100644 --- a/meteor-core/src/main/java/dev/pixelib/meteor/core/trackers/OutgoingInvocationTracker.java +++ b/meteor-core/src/main/java/dev/pixelib/meteor/core/trackers/OutgoingInvocationTracker.java @@ -54,7 +54,8 @@ public void completeInvocation(InvocationResponse invocationResponse) { // do we have a pending invocation for this invocation id? PendingInvocation pendingInvocation = pendingInvocations.get(invocationResponse.getInvocationId()); if (pendingInvocation == null) { - throw new IllegalStateException("No pending invocation found for invocation id " + invocationResponse.getInvocationId()); + throw new IllegalStateException("No pending invocation found for invocation id " + invocationResponse.getInvocationId() + ". Data: " + invocationResponse.getResult()); + //return; } pendingInvocation.complete(invocationResponse.getResult()); diff --git a/meteor-jedis/src/main/java/dev/pixelib/meteor/transport/redis/RedisPacketListener.java b/meteor-jedis/src/main/java/dev/pixelib/meteor/transport/redis/RedisPacketListener.java index b5fa3d2..e45e437 100644 --- a/meteor-jedis/src/main/java/dev/pixelib/meteor/transport/redis/RedisPacketListener.java +++ b/meteor-jedis/src/main/java/dev/pixelib/meteor/transport/redis/RedisPacketListener.java @@ -10,6 +10,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; @@ -18,10 +19,10 @@ public class RedisPacketListener extends JedisPubSub { private final Logger logger; private final ExecutorService jedisThreadPool = Executors.newCachedThreadPool(); - private final Map> messageBrokers = new ConcurrentHashMap<>(); + private final Map> messageBrokers = new ConcurrentHashMap<>(); private final Collection customSubscribedChannels = ConcurrentHashMap.newKeySet(); - public RedisPacketListener(SubscriptionHandler messageBroker, String startChannel, Logger logger) { + public RedisPacketListener(StringMessageBroker messageBroker, String startChannel, Logger logger) { this.logger = logger; registerBroker(startChannel, messageBroker); customSubscribedChannels.add(startChannel); @@ -31,14 +32,14 @@ public RedisPacketListener(SubscriptionHandler messageBroker, String startChanne public void onMessage(String channel, String message) { messageBrokers.get(channel).forEach(subscriptionHandler -> { try { - subscriptionHandler.onPacket(Base64.getDecoder().decode(message)); + subscriptionHandler.onRedisMessage(message); } catch (Exception exception) { logger.log(Level.SEVERE, "Error while handling packet", exception); } }); } - public void subscribe(String channel, SubscriptionHandler onReceive) { + public void subscribe(String channel, StringMessageBroker onReceive) { registerBroker(channel, onReceive); if (customSubscribedChannels.add(channel)) { @@ -55,7 +56,7 @@ public Collection getCustomSubscribedChannels() { return customSubscribedChannels; } - private void registerBroker(String channel, SubscriptionHandler onReceive) { + private void registerBroker(String channel, StringMessageBroker onReceive) { messageBrokers.computeIfAbsent(channel, key -> ConcurrentHashMap.newKeySet()).add(onReceive); } } diff --git a/meteor-jedis/src/main/java/dev/pixelib/meteor/transport/redis/RedisSubscriptionThread.java b/meteor-jedis/src/main/java/dev/pixelib/meteor/transport/redis/RedisSubscriptionThread.java index c473324..4859cfb 100644 --- a/meteor-jedis/src/main/java/dev/pixelib/meteor/transport/redis/RedisSubscriptionThread.java +++ b/meteor-jedis/src/main/java/dev/pixelib/meteor/transport/redis/RedisSubscriptionThread.java @@ -9,12 +9,13 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; public class RedisSubscriptionThread { - private final SubscriptionHandler messageBroker; + private final StringMessageBroker messageBroker; private final Logger logger; private final String defaultChannel; private final JedisPool jedisPool; @@ -28,7 +29,7 @@ public class RedisSubscriptionThread { return thread; }); - public RedisSubscriptionThread(SubscriptionHandler messageBroker, Logger logger, String channel, JedisPool jedisPool) { + public RedisSubscriptionThread(StringMessageBroker messageBroker, Logger logger, String channel, JedisPool jedisPool) { this.messageBroker = messageBroker; this.logger = logger; this.defaultChannel = channel; @@ -69,11 +70,6 @@ public CompletableFuture start() { } - public void subscribe(String channel, SubscriptionHandler onReceive) { - jedisPacketListener.subscribe(channel, onReceive); - - } - public void stop() { if (isStopping) return; isStopping = true; @@ -81,6 +77,11 @@ public void stop() { listenerThread.shutdownNow(); } + public void subscribe(String channel, StringMessageBroker onReceive) { + jedisPacketListener.subscribe(channel, onReceive); + + } + private CompletableFuture isSubscribed() { return CompletableFuture.supplyAsync(() -> { final int maxAttempts = 5; diff --git a/meteor-jedis/src/main/java/dev/pixelib/meteor/transport/redis/RedisTransport.java b/meteor-jedis/src/main/java/dev/pixelib/meteor/transport/redis/RedisTransport.java index fba6d5e..2a28289 100644 --- a/meteor-jedis/src/main/java/dev/pixelib/meteor/transport/redis/RedisTransport.java +++ b/meteor-jedis/src/main/java/dev/pixelib/meteor/transport/redis/RedisTransport.java @@ -9,6 +9,8 @@ import java.io.IOException; import java.util.Base64; import java.util.Locale; +import java.util.UUID; +import java.util.function.Consumer; import java.util.logging.Logger; public class RedisTransport implements RpcTransport { @@ -18,6 +20,8 @@ public class RedisTransport implements RpcTransport { private final JedisPool jedisPool; private final String topic; private RedisSubscriptionThread redisSubscriptionThread; + private final UUID transportId = UUID.randomUUID(); + private boolean ignoreSelf = true; public RedisTransport(JedisPool jedisPool, String topic) { this.jedisPool = jedisPool; @@ -34,6 +38,11 @@ public RedisTransport(String host, int port, String topic) { this.topic = topic; } + public RedisTransport withIgnoreSelf(boolean ignoreSelf) { + this.ignoreSelf = ignoreSelf; + return this; + } + @Override public void send(Direction direction, byte[] bytes) { if (jedisPool.isClosed()) { @@ -41,7 +50,10 @@ public void send(Direction direction, byte[] bytes) { } try (Jedis connection = jedisPool.getResource()) { - connection.publish(getTopicName(direction), Base64.getEncoder().encodeToString(bytes)); + connection.publish( + getTopicName(direction), + transportId + Base64.getEncoder().encodeToString(bytes) + ); } } @@ -51,11 +63,31 @@ public void subscribe(Direction direction, SubscriptionHandler onReceive) { throw new IllegalStateException("Jedis pool is closed"); } + StringMessageBroker wrappedHandler = (message) -> { + byte[] bytes = message.getBytes(); + // only split after UUID, which always has a length of 36 + byte[] uuid = new byte[36]; + System.arraycopy(bytes, 0, uuid, 0, 36); + + if (ignoreSelf && transportId.toString().equals(new String(uuid))) { + return false; + } + + byte[] data = new byte[bytes.length - 36]; + System.arraycopy(bytes, 36, data, 0, data.length); + + try { + return onReceive.onPacket(Base64.getDecoder().decode(data)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + if (redisSubscriptionThread == null) { - redisSubscriptionThread = new RedisSubscriptionThread(onReceive, logger, getTopicName(direction), jedisPool); + redisSubscriptionThread = new RedisSubscriptionThread(wrappedHandler, logger, getTopicName(direction), jedisPool); redisSubscriptionThread.start().join(); } else { - redisSubscriptionThread.subscribe(getTopicName(direction), onReceive); + redisSubscriptionThread.subscribe(getTopicName(direction), wrappedHandler); } } diff --git a/meteor-jedis/src/main/java/dev/pixelib/meteor/transport/redis/StringMessageBroker.java b/meteor-jedis/src/main/java/dev/pixelib/meteor/transport/redis/StringMessageBroker.java new file mode 100644 index 0000000..1e8922c --- /dev/null +++ b/meteor-jedis/src/main/java/dev/pixelib/meteor/transport/redis/StringMessageBroker.java @@ -0,0 +1,8 @@ +package dev.pixelib.meteor.transport.redis; + +@FunctionalInterface +public interface StringMessageBroker { + + boolean onRedisMessage(String message) throws Exception; + +} diff --git a/meteor-jedis/src/test/java/dev/pixelib/meteor/transport/redis/RedisPacketListenerTest.java b/meteor-jedis/src/test/java/dev/pixelib/meteor/transport/redis/RedisPacketListenerTest.java index 0543cbe..0adb592 100644 --- a/meteor-jedis/src/test/java/dev/pixelib/meteor/transport/redis/RedisPacketListenerTest.java +++ b/meteor-jedis/src/test/java/dev/pixelib/meteor/transport/redis/RedisPacketListenerTest.java @@ -21,20 +21,19 @@ class RedisPacketListenerTest { @Mock - SubscriptionHandler subscriptionHandler; + StringMessageBroker subscriptionHandler; @Test void onMessage_withValidChannel() throws Exception { String topic = "test"; - String message = "message"; - byte[] expected = Base64.getDecoder().decode(message); + String expected = "message"; RedisPacketListener redisPacketListener = new RedisPacketListener(subscriptionHandler, topic, Logger.getAnonymousLogger()); - redisPacketListener.onMessage(topic, message); - verify(subscriptionHandler, times(1)).onPacket(expected); - verify(subscriptionHandler).onPacket(argThat(argument -> { - assertArrayEquals(expected,argument); + redisPacketListener.onMessage(topic, expected); + verify(subscriptionHandler, times(1)).onRedisMessage(expected); + verify(subscriptionHandler).onRedisMessage(argThat(argument -> { + assertEquals(expected,argument); return true; })); } @@ -42,24 +41,22 @@ void onMessage_withValidChannel() throws Exception { @Test void onMessage_throwException() throws Exception { String topic = "test"; - String message = "message"; - byte[] expected = Base64.getDecoder().decode(message); - + String expected = "message"; - SubscriptionHandler handler = new SubscriptionHandler() { + StringMessageBroker handler = new StringMessageBroker() { @Override - public boolean onPacket(byte[] packet) throws Exception { + public boolean onRedisMessage(String message) throws Exception { throw new NullPointerException(); } }; - SubscriptionHandler handlerSub = spy(handler); + StringMessageBroker handlerSub = spy(handler); RedisPacketListener redisPacketListener = new RedisPacketListener(handlerSub, topic, Logger.getAnonymousLogger()); - redisPacketListener.onMessage(topic, message); - verify(handlerSub, times(1)).onPacket(expected); - verify(handlerSub).onPacket(argThat(argument -> { - assertArrayEquals(expected,argument); + redisPacketListener.onMessage(topic, expected); + verify(handlerSub, times(1)).onRedisMessage(expected); + verify(handlerSub).onRedisMessage(argThat(argument -> { + assertEquals(expected,argument); return true; })); }