Skip to content

Commit

Permalink
Fixed jedis connecting and added more jedis tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Duckelekuuk committed Aug 27, 2023
1 parent 7c65bad commit f55decd
Show file tree
Hide file tree
Showing 7 changed files with 260 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@

import java.io.Closeable;
import java.io.IOException;
import java.sql.Time;
import java.util.Collection;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TransportHandler implements Closeable {

Expand Down Expand Up @@ -41,7 +45,7 @@ public TransportHandler(
this.executorPool = Executors.newFixedThreadPool(threadPoolSize, r -> new Thread(r, "meteor-executor-thread"));

transport.subscribe(Direction.METHOD_PROXY, this::handleInvocationResponse);
transport.subscribe(Direction.IMPLEMENTATION, this::handleInvocationRequest);
transport.subscribe(Direction.IMPLEMENTATION, TransportHandler.this::handleInvocationRequest);
}

private boolean handleInvocationResponse(byte[] bytes) throws ClassNotFoundException {
Expand Down
6 changes: 6 additions & 0 deletions meteor-jedis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,11 @@
<version>1.0.10</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ public class RedisSubscriptionThread {

private RedisPacketListener jedisPacketListener;

private Jedis currentConnection;

private final ExecutorService listenerThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "meteor-redis-listener-thread"));

public RedisSubscriptionThread(SubscriptionHandler messageBroker, Logger logger, String channel, JedisPool jedisPool) {
Expand All @@ -33,18 +31,13 @@ public RedisSubscriptionThread(SubscriptionHandler messageBroker, Logger logger,
}

public CompletableFuture<Boolean> start() {
CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();

jedisPacketListener = new RedisPacketListener(messageBroker, defaultChannel, logger);

Runnable runnable = () -> {
while (!Thread.currentThread().isInterrupted()) {
try (Jedis connection = jedisPool.getResource()) {
this.currentConnection = connection;

connection.ping();
logger.info("Redis connected!");
completableFuture.complete(true);

//Start blocking
connection.subscribe(jedisPacketListener, jedisPacketListener.getCustomSubscribedChannels().toArray(new String[]{}));
Expand All @@ -67,7 +60,8 @@ public CompletableFuture<Boolean> start() {

listenerThread.execute(runnable);

return completableFuture;
return isSubscribed();

}

public void subscribe(String channel, SubscriptionHandler onReceive) {
Expand All @@ -81,4 +75,25 @@ public void stop() {
jedisPacketListener.stop();
listenerThread.shutdownNow();
}

private CompletableFuture<Boolean> isSubscribed() {
return CompletableFuture.supplyAsync(() -> {
final int maxAttempts = 5;
for (int i = 0; i < maxAttempts; i++) {
if (jedisPacketListener.isSubscribed()) {
return true;
}

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Thread was interrupted while waiting for subscription", e);
}
}

// If it fails to subscribe within 5 attempts (5 seconds), throw an exception
throw new IllegalStateException("Failed to subscribe within the given timeframe");
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,21 @@ public RedisTransport(String host, int port, String topic) {

@Override
public void send(Direction direction, byte[] bytes) {
if (jedisPool.isClosed()) {
throw new IllegalStateException("Jedis pool is closed");
}

try (Jedis connection = jedisPool.getResource()) {
connection.publish(getTopicName(direction), Base64.getEncoder().encodeToString(bytes));
}
}

@Override
public void subscribe(Direction direction, SubscriptionHandler onReceive) {
if (jedisPool.isClosed()) {
throw new IllegalStateException("Jedis pool is closed");
}

if (redisSubscriptionThread == null) {
redisSubscriptionThread = new RedisSubscriptionThread(onReceive, logger, getTopicName(direction), jedisPool);
redisSubscriptionThread.start().join();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.meteormsg.transport.redis;

import com.meteormsg.base.interfaces.SubscriptionHandler;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.Base64;
import java.util.Collection;
import java.util.logging.Logger;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;

@ExtendWith(MockitoExtension.class)
class RedisPacketListenerTest {

@Mock
SubscriptionHandler subscriptionHandler;

@Test
void onMessageWithValidChannel_ThenSuccess() throws Exception {
String topic = "test";
String message = "message";
byte[] expected = Base64.getDecoder().decode(message);

RedisPacketListener redisPacketListener = new RedisPacketListener(subscriptionHandler, topic, Logger.getAnonymousLogger());

redisPacketListener.onMessage(topic, message);
verify(subscriptionHandler, times(1)).onPacket(expected);
verify(subscriptionHandler).onPacket(argThat(argument -> {
assertArrayEquals(expected,argument);
return true;
}));
}


@Test
void onMessageWithUnKnownChannel_ThenFail() throws Exception {
String topic = "test";
String message = "message";

RedisPacketListener redisPacketListener = new RedisPacketListener(subscriptionHandler, topic, Logger.getAnonymousLogger());

assertThrows(NullPointerException.class, () -> redisPacketListener.onMessage("fake-test", message));
}

@Test
void subscribe() {
}

@Test
void stop() {
}

@Test
void getCustomSubscribedChannels_ThenSuccess() {
String topic = "test";
String message = "message";

RedisPacketListener redisPacketListener = new RedisPacketListener(subscriptionHandler, topic, Logger.getAnonymousLogger());

Collection<String> result = redisPacketListener.getCustomSubscribedChannels();

assertNotNull(result);
assertEquals(1, result.size());
assertTrue(result.contains(topic));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
import com.github.fppt.jedismock.operations.server.MockExecutor;
import com.github.fppt.jedismock.server.ServiceOptions;
import com.meteormsg.base.enums.Direction;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.opentest4j.AssertionFailedError;
import redis.clients.jedis.JedisPool;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -40,10 +40,10 @@ void sendValidImplementation_thenSuccess() throws IOException {
}))
.start();

RedisTransport test = new RedisTransport(server.getHost(), server.getBindPort(), topic);
test.send(Direction.IMPLEMENTATION, message.getBytes());
RedisTransport transport = new RedisTransport(server.getHost(), server.getBindPort(), topic);
transport.send(Direction.IMPLEMENTATION, message.getBytes());

test.close();
transport.close();
server.stop();

if (!assertionErrors.isEmpty()) {
Expand Down Expand Up @@ -74,10 +74,10 @@ void sendValidMethodProxy_thenSuccess() throws IOException {
}))
.start();

RedisTransport test = new RedisTransport(server.getHost(), server.getBindPort(), topic);
test.send(Direction.METHOD_PROXY, message.getBytes());
RedisTransport transport = new RedisTransport(server.getHost(), server.getBindPort(), topic);
transport.send(Direction.METHOD_PROXY, message.getBytes());

test.close();
transport.close();
server.stop();

if (!assertionErrors.isEmpty()) {
Expand All @@ -86,18 +86,45 @@ void sendValidMethodProxy_thenSuccess() throws IOException {
}

@Test
@Disabled("not implemented yet")
void subscribe_thenSuccess() throws IOException, InterruptedException {
void subscribeImplementation_thenSuccess() throws IOException, InterruptedException {
String topic = "test";
String channel = "test_implementation";

List<AssertionFailedError> assertionErrors = new ArrayList<>();

RedisServer server = RedisServer.newRedisServer()
.setOptions(ServiceOptions.withInterceptor((state, command, params) -> {
System.out.println("command: " + command);
System.out.println("params: " + params);
try {
if ("subscribe".equals(command)) {
assertEquals(channel, params.get(0).toString(), "Channel name is not correct");
}
} catch (AssertionFailedError e) {
assertionErrors.add(e);
}
return MockExecutor.proceed(state, command, params);
}))
.start();

RedisTransport transport = new RedisTransport(server.getHost(), server.getBindPort(), topic);
transport.subscribe(Direction.IMPLEMENTATION, packet -> true);

transport.close();
server.stop();

if (!assertionErrors.isEmpty()) {
throw assertionErrors.get(0);
}
}

@Test
void subscribeMethodProxy_thenSuccess() throws IOException {
String topic = "test";
String channel = "test_method_proxy";

List<AssertionFailedError> assertionErrors = new ArrayList<>();

RedisServer server = RedisServer.newRedisServer()
.setOptions(ServiceOptions.withInterceptor((state, command, params) -> {
try {
if ("subscribe".equals(command)) {
assertEquals(channel, params.get(0).toString(), "Channel name is not correct");
Expand All @@ -109,10 +136,10 @@ void subscribe_thenSuccess() throws IOException, InterruptedException {
}))
.start();

RedisTransport test = new RedisTransport(server.getHost(), server.getBindPort(), topic);
test.subscribe(Direction.IMPLEMENTATION, packet -> true);
RedisTransport transport = new RedisTransport(server.getHost(), server.getBindPort(), topic);
transport.subscribe(Direction.METHOD_PROXY, packet -> true);

test.close();
transport.close();
server.stop();

if (!assertionErrors.isEmpty()) {
Expand All @@ -121,11 +148,95 @@ void subscribe_thenSuccess() throws IOException, InterruptedException {
}

@Test
void getTopicName() {
void getTopicNameWithImplementationDirection_ThenSuccess() throws IOException {
String topic = "test";
String expected = "test_implementation";

RedisServer server = RedisServer.newRedisServer().start();
RedisTransport transport = new RedisTransport(server.getHost(), server.getBindPort(), topic);


String resultTopicName = transport.getTopicName(Direction.IMPLEMENTATION);
assertEquals(expected, resultTopicName, "Topic name is not correct");


transport.close();
server.stop();
}

@Test
void close() {
void getTopicNameWithMethodProxy_ThenSuccess() throws IOException {
String topic = "test";
String expected = "test_method_proxy";

RedisServer server = RedisServer.newRedisServer().start();
RedisTransport transport = new RedisTransport(server.getHost(), server.getBindPort(), topic);


String resultTopicName = transport.getTopicName(Direction.METHOD_PROXY);
assertEquals(expected, resultTopicName, "Topic name is not correct");


transport.close();
server.stop();
}



@Test
void getTopicNameWithNull_ThenFail() throws IOException {
String topic = "test";

RedisServer server = RedisServer.newRedisServer().start();
RedisTransport transport = new RedisTransport(server.getHost(), server.getBindPort(), topic);


assertThrowsExactly(NullPointerException.class, () -> {
transport.getTopicName(null);
}, "Method returned a topic name for a null direction");


transport.close();
server.stop();
}

@Test
void close_ThenSuccess() throws IOException {
String topic = "test";

RedisServer server = RedisServer.newRedisServer().start();

JedisPool jedisPool = new JedisPool(server.getHost(), server.getBindPort());
RedisTransport transport = new RedisTransport(jedisPool, topic);

transport.close();
server.stop();

assertThrowsExactly(IllegalStateException.class, () -> {
transport.send(Direction.IMPLEMENTATION, "test".getBytes());

}, "Method did not throw an exception when trying to send a message after closing");
assertThrowsExactly(IllegalStateException.class, () -> {
transport.subscribe(Direction.IMPLEMENTATION, (data) -> true);
}, "Method did not throw an exception when trying to send a message after closing");

assertTrue(jedisPool.isClosed());
}


@Test
void closeWhenAlreadyClosed_ThenSuccess() throws IOException {
String topic = "test";

RedisServer server = RedisServer.newRedisServer().start();

JedisPool jedisPool = new JedisPool(server.getHost(), server.getBindPort());
RedisTransport transport = new RedisTransport(jedisPool, topic);

transport.close();
server.stop();

assertTrue(jedisPool.isClosed());

}
}
Loading

0 comments on commit f55decd

Please sign in to comment.