Skip to content

Commit

Permalink
Refactoring: converted loop into stream processing (moquette-io#817)
Browse files Browse the repository at this point in the history
Converted a loop with various stages (subscription add and filtering to send retained) into fluent stream map and filter
  • Loading branch information
andsel authored Feb 12, 2024
1 parent eff06d8 commit ed39019
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 79 deletions.
54 changes: 31 additions & 23 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscriptionOption;
import io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
Expand Down Expand Up @@ -371,28 +370,11 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S
}
}).collect(Collectors.toList());

final Set<Subscription> subscriptionToSendRetained = new HashSet<>();
for (Subscription subscription : newSubscriptions) {
boolean newlyAdded;
MqttSubscriptionOption subOptions = subscription.option();
if (subscriptionIdOpt.isPresent()) {
newlyAdded = subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subOptions,
subscriptionIdOpt.get());
} else {
newlyAdded = subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subOptions);
}

switch (subOptions.retainHandling()) {
case SEND_AT_SUBSCRIBE:
subscriptionToSendRetained.add(subscription);
break;
case SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS:
if (newlyAdded) {
subscriptionToSendRetained.add(subscription);
}
break;
}
}
final Set<Subscription> subscriptionToSendRetained = newSubscriptions.stream()
.map(this::addSubscriptionReportingNewStatus) // mutating operation of SubscriptionDirectory
.filter(PostOffice::needToReceiveRetained)
.map(couple -> couple.v2)
.collect(Collectors.toSet());

for (SharedSubscriptionData sharedSubData : sharedSubscriptions) {
if (subscriptionIdOpt.isPresent()) {
Expand All @@ -417,6 +399,32 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S
}
}

private static boolean needToReceiveRetained(Utils.Couple<Boolean, Subscription> addedAndSub) {
MqttSubscriptionOption subOptions = addedAndSub.v2.option();
switch (subOptions.retainHandling()) {
case SEND_AT_SUBSCRIBE:
return true;
case SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS:
if (addedAndSub.v1) {
return true;
}
default:
return false;
}
}

private Utils.Couple<Boolean, Subscription> addSubscriptionReportingNewStatus(Subscription subscription) {
final boolean newlyAdded;
if (subscription.hasSubscriptionIdentifier()) {
SubscriptionIdentifier subscriptionId = subscription.getSubscriptionIdentifier();
newlyAdded = subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(),
subscription.option(), subscriptionId);
} else {
newlyAdded = subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subscription.option());
}
return new Utils.Couple<>(newlyAdded, subscription);
}

private List<MqttTopicSubscription> updateWithMaximumSupportedQoS(List<MqttTopicSubscription> subscriptions) {
return subscriptions.stream()
.map(this::updateWithMaximumSupportedQoS)
Expand Down
28 changes: 28 additions & 0 deletions broker/src/main/java/io/moquette/broker/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.netty.handler.codec.mqtt.MqttVersion;

import java.util.Map;
import java.util.Objects;

/**
* Utility static methods, like Map get with default value, or elvis operator.
Expand Down Expand Up @@ -55,4 +56,31 @@ public static MqttVersion versionFromConnect(MqttConnectMessage msg) {

private Utils() {
}

public static final class Couple<K, L> {
public final K v1;
public final L v2;

public Couple(K v1, L v2) {
this.v1 = v1;
this.v2 = v2;
}

public static <K, L> Couple<K, L> of(K v1, L v2) {
return new Couple<>(v1, v2);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Couple<?, ?> couple = (Couple<?, ?>) o;
return Objects.equals(v1, couple.v1) && Objects.equals(v2, couple.v2);
}

@Override
public int hashCode() {
return Objects.hash(v1, v2);
}
}
}
30 changes: 0 additions & 30 deletions broker/src/main/java/io/moquette/persistence/Couple.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.moquette.persistence;

import io.moquette.broker.ISubscriptionsRepository;
import io.moquette.broker.Utils;
import io.moquette.broker.subscriptions.ShareName;
import io.moquette.broker.subscriptions.SharedSubscription;
import io.moquette.broker.subscriptions.Subscription;
Expand Down Expand Up @@ -33,7 +34,7 @@ public class H2SubscriptionsRepository implements ISubscriptionsRepository {
private static final String SUBSCRIPTIONS_MAP = "subscriptions";
private static final String SHARED_SUBSCRIPTIONS_MAP = "shared_subscriptions";
private final MVStore mvStore;
private final MVMap.Builder<Couple<ShareName, Topic>, SubscriptionOptionAndId> submapBuilder;
private final MVMap.Builder<Utils.Couple<ShareName, Topic>, SubscriptionOptionAndId> submapBuilder;

private MVMap<String, Subscription> subscriptions;
// clientId -> shared subscription map name
Expand All @@ -44,7 +45,7 @@ public class H2SubscriptionsRepository implements ISubscriptionsRepository {
H2SubscriptionsRepository(MVStore mvStore) {
this.mvStore = mvStore;

submapBuilder = new MVMap.Builder<Couple<ShareName, Topic>, SubscriptionOptionAndId>()
submapBuilder = new MVMap.Builder<Utils.Couple<ShareName, Topic>, SubscriptionOptionAndId>()
.keyType(new CoupleValueType())
.valueType(new SubscriptionOptionAndIdValueType());

Expand Down Expand Up @@ -98,8 +99,8 @@ public void removeSharedSubscription(String clientId, ShareName share, Topic top
LOG.info("Removing a non existing shared subscription for client: {}", clientId);
return;
}
MVMap<Couple<ShareName, Topic>, SubscriptionOptionAndId> subMap = mvStore.openMap(sharedSubsMapName, submapBuilder);
Couple<ShareName, Topic> sharedSubKey = Couple.of(share, topicFilter);
MVMap<Utils.Couple<ShareName, Topic>, SubscriptionOptionAndId> subMap = mvStore.openMap(sharedSubsMapName, submapBuilder);
Utils.Couple<ShareName, Topic> sharedSubKey = Utils.Couple.of(share, topicFilter);

// remove from submap, null means the key didn't exist
if (subMap.remove(sharedSubKey) == null) {
Expand Down Expand Up @@ -139,8 +140,8 @@ private void storeNewSharedSubscription(String clientId, ShareName share, Topic
H2SubscriptionsRepository::computeShareSubscriptionSubMap);

// maps the couple (share name, topic) to requested qos
MVMap<Couple<ShareName, Topic>, SubscriptionOptionAndId> subMap = mvStore.openMap(sharedSubsMapName, submapBuilder);
subMap.put(Couple.of(share, topicFilter), value);
MVMap<Utils.Couple<ShareName, Topic>, SubscriptionOptionAndId> subMap = mvStore.openMap(sharedSubsMapName, submapBuilder);
subMap.put(Utils.Couple.of(share, topicFilter), value);
}

@Override
Expand All @@ -158,8 +159,8 @@ public Collection<SharedSubscription> listAllSharedSubscription() {
String clientId = entry.getKey();
String sharedSubsMapName = entry.getValue();

MVMap<Couple<ShareName, Topic>, SubscriptionOptionAndId> subMap = mvStore.openMap(sharedSubsMapName, submapBuilder);
for (Map.Entry<Couple<ShareName, Topic>, SubscriptionOptionAndId> subEntry : subMap.entrySet()) {
MVMap<Utils.Couple<ShareName, Topic>, SubscriptionOptionAndId> subMap = mvStore.openMap(sharedSubsMapName, submapBuilder);
for (Map.Entry<Utils.Couple<ShareName, Topic>, SubscriptionOptionAndId> subEntry : subMap.entrySet()) {
final ShareName shareName = subEntry.getKey().v1;
final Topic topicFilter = subEntry.getKey().v2;
final MqttSubscriptionOption option = subEntry.getValue().option;
Expand All @@ -183,39 +184,39 @@ private static String computeShareSubscriptionSubMap(String sessionId) {
return SHARED_SUBSCRIPTIONS_MAP + "_" + sessionId;
}

static final class CoupleValueType extends BasicDataType<Couple<ShareName, Topic>> {
static final class CoupleValueType extends BasicDataType<Utils.Couple<ShareName, Topic>> {

private final Comparator<Couple<ShareName, Topic>> coupleComparator =
Comparator.<Couple<ShareName, Topic>, String>comparing(c -> c.v1.getShareName())
private final Comparator<Utils.Couple<ShareName, Topic>> coupleComparator =
Comparator.<Utils.Couple<ShareName, Topic>, String>comparing(c -> c.v1.getShareName())
.thenComparing(c -> c.v2.toString());

@Override
public int compare(Couple<ShareName, Topic> var1, Couple<ShareName, Topic> var2) {
public int compare(Utils.Couple<ShareName, Topic> var1, Utils.Couple<ShareName, Topic> var2) {
return coupleComparator.compare(var1, var2);
}

@Override
public int getMemory(Couple<ShareName, Topic> couple) {
public int getMemory(Utils.Couple<ShareName, Topic> couple) {
return StringDataType.INSTANCE.getMemory(couple.v1.getShareName()) +
StringDataType.INSTANCE.getMemory(couple.v2.toString());
}

@Override
public void write(WriteBuffer buff, Couple<ShareName, Topic> couple) {
public void write(WriteBuffer buff, Utils.Couple<ShareName, Topic> couple) {
StringDataType.INSTANCE.write(buff, couple.v1.getShareName());
StringDataType.INSTANCE.write(buff, couple.v2.toString());
}

@Override
public Couple<ShareName, Topic> read(ByteBuffer buffer) {
public Utils.Couple<ShareName, Topic> read(ByteBuffer buffer) {
String shareName = StringDataType.INSTANCE.read(buffer);
String topicFilter = StringDataType.INSTANCE.read(buffer);
return new Couple<>(new ShareName(shareName), Topic.asTopic(topicFilter));
return new Utils.Couple<>(new ShareName(shareName), Topic.asTopic(topicFilter));
}

@Override
public Couple<ShareName, Topic>[] createStorage(int i) {
return new Couple[i];
public Utils.Couple<ShareName, Topic>[] createStorage(int i) {
return new Utils.Couple[i];
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
package io.moquette.persistence;

import io.moquette.broker.ISubscriptionsRepository;
import io.moquette.broker.Utils;
import io.moquette.broker.subscriptions.ShareName;
import io.moquette.broker.subscriptions.SharedSubscription;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.SubscriptionIdentifier;
import io.moquette.broker.subscriptions.Topic;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscriptionOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -39,7 +39,7 @@ public class MemorySubscriptionsRepository implements ISubscriptionsRepository {

private static final Logger LOG = LoggerFactory.getLogger(MemorySubscriptionsRepository.class);
private final Set<Subscription> subscriptions = new ConcurrentSkipListSet<>();
private final Map<String, Map<Couple<ShareName, Topic>, SharedSubscription>> sharedSubscriptions = new HashMap<>();
private final Map<String, Map<Utils.Couple<ShareName, Topic>, SharedSubscription>> sharedSubscriptions = new HashMap<>();

@Override
public Set<Subscription> listAllSubscriptions() {
Expand All @@ -66,12 +66,12 @@ public void removeAllSharedSubscriptions(String clientId) {

@Override
public void removeSharedSubscription(String clientId, ShareName share, Topic topicFilter) {
Map<Couple<ShareName, Topic>, SharedSubscription> subsMap = sharedSubscriptions.get(clientId);
Map<Utils.Couple<ShareName, Topic>, SharedSubscription> subsMap = sharedSubscriptions.get(clientId);
if (subsMap == null) {
LOG.info("Removing a non existing shared subscription for client: {}", clientId);
return;
}
subsMap.remove(Couple.of(share, topicFilter));
subsMap.remove(Utils.Couple.of(share, topicFilter));
if (subsMap.isEmpty()) {
// clean up an empty sub map
sharedSubscriptions.remove(clientId);
Expand All @@ -85,8 +85,8 @@ public void addNewSharedSubscription(String clientId, ShareName share, Topic top
}

private void storeNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, SharedSubscription sharedSub) {
Map<Couple<ShareName, Topic>, SharedSubscription> subsMap = sharedSubscriptions.computeIfAbsent(clientId, unused -> new HashMap<>());
subsMap.put(Couple.of(share, topicFilter), sharedSub);
Map<Utils.Couple<ShareName, Topic>, SharedSubscription> subsMap = sharedSubscriptions.computeIfAbsent(clientId, unused -> new HashMap<>());
subsMap.put(Utils.Couple.of(share, topicFilter), sharedSub);
}

@Override
Expand All @@ -99,8 +99,8 @@ public void addNewSharedSubscription(String clientId, ShareName share, Topic top
@Override
public Collection<SharedSubscription> listAllSharedSubscription() {
final List<SharedSubscription> result = new ArrayList<>();
for (Map.Entry<String, Map<Couple<ShareName, Topic>, SharedSubscription>> entry : sharedSubscriptions.entrySet()) {
for (Map.Entry<Couple<ShareName, Topic>, SharedSubscription> nestedEntry : entry.getValue().entrySet()) {
for (Map.Entry<String, Map<Utils.Couple<ShareName, Topic>, SharedSubscription>> entry : sharedSubscriptions.entrySet()) {
for (Map.Entry<Utils.Couple<ShareName, Topic>, SharedSubscription> nestedEntry : entry.getValue().entrySet()) {
result.add(nestedEntry.getValue());
}
}
Expand Down

0 comments on commit ed39019

Please sign in to comment.