From f55decdfad8ace8cd7f3f03a58e012a3f1ecd4ff Mon Sep 17 00:00:00 2001 From: Duckelekuuk Date: Mon, 28 Aug 2023 01:02:40 +0200 Subject: [PATCH] Fixed jedis connecting and added more jedis tests --- .../core/transport/TransportHandler.java | 6 +- meteor-jedis/pom.xml | 6 + .../redis/RedisSubscriptionThread.java | 31 +++- .../transport/redis/RedisTransport.java | 8 + .../redis/RedisPacketListenerTest.java | 70 +++++++++ .../transport/redis/RedisTransportTest.java | 143 ++++++++++++++++-- pom.xml | 21 +++ 7 files changed, 260 insertions(+), 25 deletions(-) create mode 100644 meteor-jedis/src/test/java/com/meteormsg/transport/redis/RedisPacketListenerTest.java diff --git a/meteor-core/src/main/java/com/meteormsg/core/transport/TransportHandler.java b/meteor-core/src/main/java/com/meteormsg/core/transport/TransportHandler.java index 3af7315..e453c70 100644 --- a/meteor-core/src/main/java/com/meteormsg/core/transport/TransportHandler.java +++ b/meteor-core/src/main/java/com/meteormsg/core/transport/TransportHandler.java @@ -11,9 +11,13 @@ import java.io.Closeable; import java.io.IOException; +import java.sql.Time; import java.util.Collection; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; public class TransportHandler implements Closeable { @@ -41,7 +45,7 @@ public TransportHandler( this.executorPool = Executors.newFixedThreadPool(threadPoolSize, r -> new Thread(r, "meteor-executor-thread")); transport.subscribe(Direction.METHOD_PROXY, this::handleInvocationResponse); - transport.subscribe(Direction.IMPLEMENTATION, this::handleInvocationRequest); + transport.subscribe(Direction.IMPLEMENTATION, TransportHandler.this::handleInvocationRequest); } private boolean handleInvocationResponse(byte[] bytes) throws ClassNotFoundException { diff --git a/meteor-jedis/pom.xml b/meteor-jedis/pom.xml index b7ee6da..9cd7425 100644 --- a/meteor-jedis/pom.xml +++ b/meteor-jedis/pom.xml @@ -50,5 +50,11 @@ 1.0.10 test + + + org.mockito + mockito-junit-jupiter + test + \ No newline at end of file diff --git a/meteor-jedis/src/main/java/com/meteormsg/transport/redis/RedisSubscriptionThread.java b/meteor-jedis/src/main/java/com/meteormsg/transport/redis/RedisSubscriptionThread.java index 4ca4a79..2029246 100644 --- a/meteor-jedis/src/main/java/com/meteormsg/transport/redis/RedisSubscriptionThread.java +++ b/meteor-jedis/src/main/java/com/meteormsg/transport/redis/RedisSubscriptionThread.java @@ -21,8 +21,6 @@ public class RedisSubscriptionThread { private RedisPacketListener jedisPacketListener; - private Jedis currentConnection; - private final ExecutorService listenerThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "meteor-redis-listener-thread")); public RedisSubscriptionThread(SubscriptionHandler messageBroker, Logger logger, String channel, JedisPool jedisPool) { @@ -33,18 +31,13 @@ public RedisSubscriptionThread(SubscriptionHandler messageBroker, Logger logger, } public CompletableFuture start() { - CompletableFuture completableFuture = new CompletableFuture<>(); - jedisPacketListener = new RedisPacketListener(messageBroker, defaultChannel, logger); Runnable runnable = () -> { while (!Thread.currentThread().isInterrupted()) { try (Jedis connection = jedisPool.getResource()) { - this.currentConnection = connection; - connection.ping(); logger.info("Redis connected!"); - completableFuture.complete(true); //Start blocking connection.subscribe(jedisPacketListener, jedisPacketListener.getCustomSubscribedChannels().toArray(new String[]{})); @@ -67,7 +60,8 @@ public CompletableFuture start() { listenerThread.execute(runnable); - return completableFuture; + return isSubscribed(); + } public void subscribe(String channel, SubscriptionHandler onReceive) { @@ -81,4 +75,25 @@ public void stop() { jedisPacketListener.stop(); listenerThread.shutdownNow(); } + + private CompletableFuture isSubscribed() { + return CompletableFuture.supplyAsync(() -> { + final int maxAttempts = 5; + for (int i = 0; i < maxAttempts; i++) { + if (jedisPacketListener.isSubscribed()) { + return true; + } + + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread was interrupted while waiting for subscription", e); + } + } + + // If it fails to subscribe within 5 attempts (5 seconds), throw an exception + throw new IllegalStateException("Failed to subscribe within the given timeframe"); + }); + } } diff --git a/meteor-jedis/src/main/java/com/meteormsg/transport/redis/RedisTransport.java b/meteor-jedis/src/main/java/com/meteormsg/transport/redis/RedisTransport.java index c3ce93f..94bd9d3 100644 --- a/meteor-jedis/src/main/java/com/meteormsg/transport/redis/RedisTransport.java +++ b/meteor-jedis/src/main/java/com/meteormsg/transport/redis/RedisTransport.java @@ -36,6 +36,10 @@ public RedisTransport(String host, int port, String topic) { @Override public void send(Direction direction, byte[] bytes) { + if (jedisPool.isClosed()) { + throw new IllegalStateException("Jedis pool is closed"); + } + try (Jedis connection = jedisPool.getResource()) { connection.publish(getTopicName(direction), Base64.getEncoder().encodeToString(bytes)); } @@ -43,6 +47,10 @@ public void send(Direction direction, byte[] bytes) { @Override public void subscribe(Direction direction, SubscriptionHandler onReceive) { + if (jedisPool.isClosed()) { + throw new IllegalStateException("Jedis pool is closed"); + } + if (redisSubscriptionThread == null) { redisSubscriptionThread = new RedisSubscriptionThread(onReceive, logger, getTopicName(direction), jedisPool); redisSubscriptionThread.start().join(); diff --git a/meteor-jedis/src/test/java/com/meteormsg/transport/redis/RedisPacketListenerTest.java b/meteor-jedis/src/test/java/com/meteormsg/transport/redis/RedisPacketListenerTest.java new file mode 100644 index 0000000..000f788 --- /dev/null +++ b/meteor-jedis/src/test/java/com/meteormsg/transport/redis/RedisPacketListenerTest.java @@ -0,0 +1,70 @@ +package com.meteormsg.transport.redis; + +import com.meteormsg.base.interfaces.SubscriptionHandler; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Base64; +import java.util.Collection; +import java.util.logging.Logger; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +class RedisPacketListenerTest { + + @Mock + SubscriptionHandler subscriptionHandler; + + @Test + void onMessageWithValidChannel_ThenSuccess() throws Exception { + String topic = "test"; + String message = "message"; + byte[] expected = Base64.getDecoder().decode(message); + + RedisPacketListener redisPacketListener = new RedisPacketListener(subscriptionHandler, topic, Logger.getAnonymousLogger()); + + redisPacketListener.onMessage(topic, message); + verify(subscriptionHandler, times(1)).onPacket(expected); + verify(subscriptionHandler).onPacket(argThat(argument -> { + assertArrayEquals(expected,argument); + return true; + })); + } + + + @Test + void onMessageWithUnKnownChannel_ThenFail() throws Exception { + String topic = "test"; + String message = "message"; + + RedisPacketListener redisPacketListener = new RedisPacketListener(subscriptionHandler, topic, Logger.getAnonymousLogger()); + + assertThrows(NullPointerException.class, () -> redisPacketListener.onMessage("fake-test", message)); + } + + @Test + void subscribe() { + } + + @Test + void stop() { + } + + @Test + void getCustomSubscribedChannels_ThenSuccess() { + String topic = "test"; + String message = "message"; + + RedisPacketListener redisPacketListener = new RedisPacketListener(subscriptionHandler, topic, Logger.getAnonymousLogger()); + + Collection result = redisPacketListener.getCustomSubscribedChannels(); + + assertNotNull(result); + assertEquals(1, result.size()); + assertTrue(result.contains(topic)); + } +} \ No newline at end of file diff --git a/meteor-jedis/src/test/java/com/meteormsg/transport/redis/RedisTransportTest.java b/meteor-jedis/src/test/java/com/meteormsg/transport/redis/RedisTransportTest.java index 6157fd1..10f22e5 100644 --- a/meteor-jedis/src/test/java/com/meteormsg/transport/redis/RedisTransportTest.java +++ b/meteor-jedis/src/test/java/com/meteormsg/transport/redis/RedisTransportTest.java @@ -4,9 +4,9 @@ import com.github.fppt.jedismock.operations.server.MockExecutor; import com.github.fppt.jedismock.server.ServiceOptions; import com.meteormsg.base.enums.Direction; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.opentest4j.AssertionFailedError; +import redis.clients.jedis.JedisPool; import java.io.IOException; import java.util.ArrayList; @@ -40,10 +40,10 @@ void sendValidImplementation_thenSuccess() throws IOException { })) .start(); - RedisTransport test = new RedisTransport(server.getHost(), server.getBindPort(), topic); - test.send(Direction.IMPLEMENTATION, message.getBytes()); + RedisTransport transport = new RedisTransport(server.getHost(), server.getBindPort(), topic); + transport.send(Direction.IMPLEMENTATION, message.getBytes()); - test.close(); + transport.close(); server.stop(); if (!assertionErrors.isEmpty()) { @@ -74,10 +74,10 @@ void sendValidMethodProxy_thenSuccess() throws IOException { })) .start(); - RedisTransport test = new RedisTransport(server.getHost(), server.getBindPort(), topic); - test.send(Direction.METHOD_PROXY, message.getBytes()); + RedisTransport transport = new RedisTransport(server.getHost(), server.getBindPort(), topic); + transport.send(Direction.METHOD_PROXY, message.getBytes()); - test.close(); + transport.close(); server.stop(); if (!assertionErrors.isEmpty()) { @@ -86,8 +86,7 @@ void sendValidMethodProxy_thenSuccess() throws IOException { } @Test - @Disabled("not implemented yet") - void subscribe_thenSuccess() throws IOException, InterruptedException { + void subscribeImplementation_thenSuccess() throws IOException, InterruptedException { String topic = "test"; String channel = "test_implementation"; @@ -95,9 +94,37 @@ void subscribe_thenSuccess() throws IOException, InterruptedException { RedisServer server = RedisServer.newRedisServer() .setOptions(ServiceOptions.withInterceptor((state, command, params) -> { - System.out.println("command: " + command); - System.out.println("params: " + params); + try { + if ("subscribe".equals(command)) { + assertEquals(channel, params.get(0).toString(), "Channel name is not correct"); + } + } catch (AssertionFailedError e) { + assertionErrors.add(e); + } + return MockExecutor.proceed(state, command, params); + })) + .start(); + RedisTransport transport = new RedisTransport(server.getHost(), server.getBindPort(), topic); + transport.subscribe(Direction.IMPLEMENTATION, packet -> true); + + transport.close(); + server.stop(); + + if (!assertionErrors.isEmpty()) { + throw assertionErrors.get(0); + } + } + + @Test + void subscribeMethodProxy_thenSuccess() throws IOException { + String topic = "test"; + String channel = "test_method_proxy"; + + List assertionErrors = new ArrayList<>(); + + RedisServer server = RedisServer.newRedisServer() + .setOptions(ServiceOptions.withInterceptor((state, command, params) -> { try { if ("subscribe".equals(command)) { assertEquals(channel, params.get(0).toString(), "Channel name is not correct"); @@ -109,10 +136,10 @@ void subscribe_thenSuccess() throws IOException, InterruptedException { })) .start(); - RedisTransport test = new RedisTransport(server.getHost(), server.getBindPort(), topic); - test.subscribe(Direction.IMPLEMENTATION, packet -> true); + RedisTransport transport = new RedisTransport(server.getHost(), server.getBindPort(), topic); + transport.subscribe(Direction.METHOD_PROXY, packet -> true); - test.close(); + transport.close(); server.stop(); if (!assertionErrors.isEmpty()) { @@ -121,11 +148,95 @@ void subscribe_thenSuccess() throws IOException, InterruptedException { } @Test - void getTopicName() { + void getTopicNameWithImplementationDirection_ThenSuccess() throws IOException { + String topic = "test"; + String expected = "test_implementation"; + + RedisServer server = RedisServer.newRedisServer().start(); + RedisTransport transport = new RedisTransport(server.getHost(), server.getBindPort(), topic); + + String resultTopicName = transport.getTopicName(Direction.IMPLEMENTATION); + assertEquals(expected, resultTopicName, "Topic name is not correct"); + + + transport.close(); + server.stop(); } @Test - void close() { + void getTopicNameWithMethodProxy_ThenSuccess() throws IOException { + String topic = "test"; + String expected = "test_method_proxy"; + + RedisServer server = RedisServer.newRedisServer().start(); + RedisTransport transport = new RedisTransport(server.getHost(), server.getBindPort(), topic); + + + String resultTopicName = transport.getTopicName(Direction.METHOD_PROXY); + assertEquals(expected, resultTopicName, "Topic name is not correct"); + + + transport.close(); + server.stop(); + } + + + + @Test + void getTopicNameWithNull_ThenFail() throws IOException { + String topic = "test"; + + RedisServer server = RedisServer.newRedisServer().start(); + RedisTransport transport = new RedisTransport(server.getHost(), server.getBindPort(), topic); + + + assertThrowsExactly(NullPointerException.class, () -> { + transport.getTopicName(null); + }, "Method returned a topic name for a null direction"); + + + transport.close(); + server.stop(); + } + + @Test + void close_ThenSuccess() throws IOException { + String topic = "test"; + + RedisServer server = RedisServer.newRedisServer().start(); + + JedisPool jedisPool = new JedisPool(server.getHost(), server.getBindPort()); + RedisTransport transport = new RedisTransport(jedisPool, topic); + + transport.close(); + server.stop(); + + assertThrowsExactly(IllegalStateException.class, () -> { + transport.send(Direction.IMPLEMENTATION, "test".getBytes()); + + }, "Method did not throw an exception when trying to send a message after closing"); + assertThrowsExactly(IllegalStateException.class, () -> { + transport.subscribe(Direction.IMPLEMENTATION, (data) -> true); + }, "Method did not throw an exception when trying to send a message after closing"); + + assertTrue(jedisPool.isClosed()); + } + + + @Test + void closeWhenAlreadyClosed_ThenSuccess() throws IOException { + String topic = "test"; + + RedisServer server = RedisServer.newRedisServer().start(); + + JedisPool jedisPool = new JedisPool(server.getHost(), server.getBindPort()); + RedisTransport transport = new RedisTransport(jedisPool, topic); + + transport.close(); + server.stop(); + + assertTrue(jedisPool.isClosed()); + } } \ No newline at end of file diff --git a/pom.xml b/pom.xml index 7dc1af9..04d008a 100644 --- a/pom.xml +++ b/pom.xml @@ -24,6 +24,20 @@ UTF-8 + + + GNU General Public License v3.0 + https://www.gnu.org/licenses/gpl-3.0.html + repo + + + + + scm:git:git@github.com/MeteorMsg/Meteor.git + scm:git:ssh://github.com/MeteorMsg/Meteor.git + https://github.com/MeteorMsg/Meteor/tree/main + + @@ -32,6 +46,13 @@ 5.10.0 + + org.mockito + mockito-junit-jupiter + 4.8.1 + test + + redis.clients jedis