Skip to content

Commit

Permalink
Develop (#14)
Browse files Browse the repository at this point in the history
* Updated version in readme, added mvn central badge, fix typo

* Added multiply example

* Optimized all imports

* Log errors thrown by implementations (#8)

* Version bump

* Add JaCoCo badge

* Bump jedis due to org.json cve

* Update develop (#10)

* Release (#9)

* Updated version in readme, added mvn central badge, fix typo

* Added multiply example

* Optimized all imports

* Log errors thrown by implementations (#8)

* Version bump

---------

Co-authored-by: Duckelekuuk <[email protected]>
Co-authored-by: Duckelekuuk <[email protected]>

* Add JaCoCo badge

---------

Co-authored-by: Duckelekuuk <[email protected]>
Co-authored-by: Duckelekuuk <[email protected]>
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>

* Make redis ignore itself, if configured (#13)

---------

Co-authored-by: Duckelekuuk <[email protected]>
Co-authored-by: Duckelekuuk <[email protected]>
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
4 people authored Mar 4, 2024
1 parent 406aeba commit c475ee2
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package dev.pixelib.meteor.base.interfaces;

@FunctionalInterface
public interface SubscriptionHandler {

boolean onPacket(byte[] packet) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public void run() {
}

isTimedOut.set(true);

this.completable.completeExceptionally(
new InvocationTimedOutException(invocationDescriptor.getMethodName(), invocationDescriptor.getNamespace(), timeoutSeconds)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public void completeInvocation(InvocationResponse invocationResponse) {
// do we have a pending invocation for this invocation id?
PendingInvocation<?> pendingInvocation = pendingInvocations.get(invocationResponse.getInvocationId());
if (pendingInvocation == null) {
throw new IllegalStateException("No pending invocation found for invocation id " + invocationResponse.getInvocationId());
throw new IllegalStateException("No pending invocation found for invocation id " + invocationResponse.getInvocationId() + ". Data: " + invocationResponse.getResult());
//return;
}

pendingInvocation.complete(invocationResponse.getResult());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -18,10 +19,10 @@ public class RedisPacketListener extends JedisPubSub {
private final Logger logger;

private final ExecutorService jedisThreadPool = Executors.newCachedThreadPool();
private final Map<String, Set<SubscriptionHandler>> messageBrokers = new ConcurrentHashMap<>();
private final Map<String, Set<StringMessageBroker>> messageBrokers = new ConcurrentHashMap<>();
private final Collection<String> customSubscribedChannels = ConcurrentHashMap.newKeySet();

public RedisPacketListener(SubscriptionHandler messageBroker, String startChannel, Logger logger) {
public RedisPacketListener(StringMessageBroker messageBroker, String startChannel, Logger logger) {
this.logger = logger;
registerBroker(startChannel, messageBroker);
customSubscribedChannels.add(startChannel);
Expand All @@ -31,14 +32,14 @@ public RedisPacketListener(SubscriptionHandler messageBroker, String startChanne
public void onMessage(String channel, String message) {
messageBrokers.get(channel).forEach(subscriptionHandler -> {
try {
subscriptionHandler.onPacket(Base64.getDecoder().decode(message));
subscriptionHandler.onRedisMessage(message);
} catch (Exception exception) {
logger.log(Level.SEVERE, "Error while handling packet", exception);
}
});
}

public void subscribe(String channel, SubscriptionHandler onReceive) {
public void subscribe(String channel, StringMessageBroker onReceive) {
registerBroker(channel, onReceive);

if (customSubscribedChannels.add(channel)) {
Expand All @@ -55,7 +56,7 @@ public Collection<String> getCustomSubscribedChannels() {
return customSubscribedChannels;
}

private void registerBroker(String channel, SubscriptionHandler onReceive) {
private void registerBroker(String channel, StringMessageBroker onReceive) {
messageBrokers.computeIfAbsent(channel, key -> ConcurrentHashMap.newKeySet()).add(onReceive);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

public class RedisSubscriptionThread {

private final SubscriptionHandler messageBroker;
private final StringMessageBroker messageBroker;
private final Logger logger;
private final String defaultChannel;
private final JedisPool jedisPool;
Expand All @@ -28,7 +29,7 @@ public class RedisSubscriptionThread {
return thread;
});

public RedisSubscriptionThread(SubscriptionHandler messageBroker, Logger logger, String channel, JedisPool jedisPool) {
public RedisSubscriptionThread(StringMessageBroker messageBroker, Logger logger, String channel, JedisPool jedisPool) {
this.messageBroker = messageBroker;
this.logger = logger;
this.defaultChannel = channel;
Expand Down Expand Up @@ -69,18 +70,18 @@ public CompletableFuture<Boolean> start() {

}

public void subscribe(String channel, SubscriptionHandler onReceive) {
jedisPacketListener.subscribe(channel, onReceive);

}

public void stop() {
if (isStopping) return;
isStopping = true;
jedisPacketListener.stop();
listenerThread.shutdownNow();
}

public void subscribe(String channel, StringMessageBroker onReceive) {
jedisPacketListener.subscribe(channel, onReceive);

}

private CompletableFuture<Boolean> isSubscribed() {
return CompletableFuture.supplyAsync(() -> {
final int maxAttempts = 5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.io.IOException;
import java.util.Base64;
import java.util.Locale;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.logging.Logger;

public class RedisTransport implements RpcTransport {
Expand All @@ -18,6 +20,8 @@ public class RedisTransport implements RpcTransport {
private final JedisPool jedisPool;
private final String topic;
private RedisSubscriptionThread redisSubscriptionThread;
private final UUID transportId = UUID.randomUUID();
private boolean ignoreSelf = true;

public RedisTransport(JedisPool jedisPool, String topic) {
this.jedisPool = jedisPool;
Expand All @@ -34,14 +38,22 @@ public RedisTransport(String host, int port, String topic) {
this.topic = topic;
}

public RedisTransport withIgnoreSelf(boolean ignoreSelf) {
this.ignoreSelf = ignoreSelf;
return this;
}

@Override
public void send(Direction direction, byte[] bytes) {
if (jedisPool.isClosed()) {
throw new IllegalStateException("Jedis pool is closed");
}

try (Jedis connection = jedisPool.getResource()) {
connection.publish(getTopicName(direction), Base64.getEncoder().encodeToString(bytes));
connection.publish(
getTopicName(direction),
transportId + Base64.getEncoder().encodeToString(bytes)
);
}
}

Expand All @@ -51,11 +63,31 @@ public void subscribe(Direction direction, SubscriptionHandler onReceive) {
throw new IllegalStateException("Jedis pool is closed");
}

StringMessageBroker wrappedHandler = (message) -> {
byte[] bytes = message.getBytes();
// only split after UUID, which always has a length of 36
byte[] uuid = new byte[36];
System.arraycopy(bytes, 0, uuid, 0, 36);

if (ignoreSelf && transportId.toString().equals(new String(uuid))) {
return false;
}

byte[] data = new byte[bytes.length - 36];
System.arraycopy(bytes, 36, data, 0, data.length);

try {
return onReceive.onPacket(Base64.getDecoder().decode(data));
} catch (Exception e) {
throw new RuntimeException(e);
}
};

if (redisSubscriptionThread == null) {
redisSubscriptionThread = new RedisSubscriptionThread(onReceive, logger, getTopicName(direction), jedisPool);
redisSubscriptionThread = new RedisSubscriptionThread(wrappedHandler, logger, getTopicName(direction), jedisPool);
redisSubscriptionThread.start().join();
} else {
redisSubscriptionThread.subscribe(getTopicName(direction), onReceive);
redisSubscriptionThread.subscribe(getTopicName(direction), wrappedHandler);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package dev.pixelib.meteor.transport.redis;

@FunctionalInterface
public interface StringMessageBroker {

boolean onRedisMessage(String message) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,45 +21,42 @@
class RedisPacketListenerTest {

@Mock
SubscriptionHandler subscriptionHandler;
StringMessageBroker subscriptionHandler;

@Test
void onMessage_withValidChannel() throws Exception {
String topic = "test";
String message = "message";
byte[] expected = Base64.getDecoder().decode(message);
String expected = "message";

RedisPacketListener redisPacketListener = new RedisPacketListener(subscriptionHandler, topic, Logger.getAnonymousLogger());

redisPacketListener.onMessage(topic, message);
verify(subscriptionHandler, times(1)).onPacket(expected);
verify(subscriptionHandler).onPacket(argThat(argument -> {
assertArrayEquals(expected,argument);
redisPacketListener.onMessage(topic, expected);
verify(subscriptionHandler, times(1)).onRedisMessage(expected);
verify(subscriptionHandler).onRedisMessage(argThat(argument -> {
assertEquals(expected,argument);
return true;
}));
}

@Test
void onMessage_throwException() throws Exception {
String topic = "test";
String message = "message";
byte[] expected = Base64.getDecoder().decode(message);

String expected = "message";

SubscriptionHandler handler = new SubscriptionHandler() {
StringMessageBroker handler = new StringMessageBroker() {
@Override
public boolean onPacket(byte[] packet) throws Exception {
public boolean onRedisMessage(String message) throws Exception {
throw new NullPointerException();
}
};

SubscriptionHandler handlerSub = spy(handler);
StringMessageBroker handlerSub = spy(handler);
RedisPacketListener redisPacketListener = new RedisPacketListener(handlerSub, topic, Logger.getAnonymousLogger());

redisPacketListener.onMessage(topic, message);
verify(handlerSub, times(1)).onPacket(expected);
verify(handlerSub).onPacket(argThat(argument -> {
assertArrayEquals(expected,argument);
redisPacketListener.onMessage(topic, expected);
verify(handlerSub, times(1)).onRedisMessage(expected);
verify(handlerSub).onRedisMessage(argThat(argument -> {
assertEquals(expected,argument);
return true;
}));
}
Expand Down

0 comments on commit c475ee2

Please sign in to comment.