diff --git a/broker/src/main/java/io/moquette/broker/PostOffice.java b/broker/src/main/java/io/moquette/broker/PostOffice.java index b3918a9e5..59f21bfd6 100644 --- a/broker/src/main/java/io/moquette/broker/PostOffice.java +++ b/broker/src/main/java/io/moquette/broker/PostOffice.java @@ -34,7 +34,6 @@ import io.netty.handler.codec.mqtt.MqttSubAckPayload; import io.netty.handler.codec.mqtt.MqttSubscribeMessage; import io.netty.handler.codec.mqtt.MqttSubscriptionOption; -import io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy; import io.netty.handler.codec.mqtt.MqttTopicSubscription; import io.netty.util.ReferenceCountUtil; import org.slf4j.Logger; @@ -371,28 +370,11 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S } }).collect(Collectors.toList()); - final Set subscriptionToSendRetained = new HashSet<>(); - for (Subscription subscription : newSubscriptions) { - boolean newlyAdded; - MqttSubscriptionOption subOptions = subscription.option(); - if (subscriptionIdOpt.isPresent()) { - newlyAdded = subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subOptions, - subscriptionIdOpt.get()); - } else { - newlyAdded = subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subOptions); - } - - switch (subOptions.retainHandling()) { - case SEND_AT_SUBSCRIBE: - subscriptionToSendRetained.add(subscription); - break; - case SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS: - if (newlyAdded) { - subscriptionToSendRetained.add(subscription); - } - break; - } - } + final Set subscriptionToSendRetained = newSubscriptions.stream() + .map(this::addSubscriptionReportingNewStatus) // mutating operation of SubscriptionDirectory + .filter(PostOffice::needToReceiveRetained) + .map(couple -> couple.v2) + .collect(Collectors.toSet()); for (SharedSubscriptionData sharedSubData : sharedSubscriptions) { if (subscriptionIdOpt.isPresent()) { @@ -417,6 +399,32 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S } } + private static boolean needToReceiveRetained(Utils.Couple addedAndSub) { + MqttSubscriptionOption subOptions = addedAndSub.v2.option(); + switch (subOptions.retainHandling()) { + case SEND_AT_SUBSCRIBE: + return true; + case SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS: + if (addedAndSub.v1) { + return true; + } + default: + return false; + } + } + + private Utils.Couple addSubscriptionReportingNewStatus(Subscription subscription) { + final boolean newlyAdded; + if (subscription.hasSubscriptionIdentifier()) { + SubscriptionIdentifier subscriptionId = subscription.getSubscriptionIdentifier(); + newlyAdded = subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), + subscription.option(), subscriptionId); + } else { + newlyAdded = subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subscription.option()); + } + return new Utils.Couple<>(newlyAdded, subscription); + } + private List updateWithMaximumSupportedQoS(List subscriptions) { return subscriptions.stream() .map(this::updateWithMaximumSupportedQoS) diff --git a/broker/src/main/java/io/moquette/broker/Utils.java b/broker/src/main/java/io/moquette/broker/Utils.java index 3d671c266..1f2ae95af 100644 --- a/broker/src/main/java/io/moquette/broker/Utils.java +++ b/broker/src/main/java/io/moquette/broker/Utils.java @@ -23,6 +23,7 @@ import io.netty.handler.codec.mqtt.MqttVersion; import java.util.Map; +import java.util.Objects; /** * Utility static methods, like Map get with default value, or elvis operator. @@ -55,4 +56,31 @@ public static MqttVersion versionFromConnect(MqttConnectMessage msg) { private Utils() { } + + public static final class Couple { + public final K v1; + public final L v2; + + public Couple(K v1, L v2) { + this.v1 = v1; + this.v2 = v2; + } + + public static Couple of(K v1, L v2) { + return new Couple<>(v1, v2); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Couple couple = (Couple) o; + return Objects.equals(v1, couple.v1) && Objects.equals(v2, couple.v2); + } + + @Override + public int hashCode() { + return Objects.hash(v1, v2); + } + } } diff --git a/broker/src/main/java/io/moquette/persistence/Couple.java b/broker/src/main/java/io/moquette/persistence/Couple.java deleted file mode 100644 index 918ca1047..000000000 --- a/broker/src/main/java/io/moquette/persistence/Couple.java +++ /dev/null @@ -1,30 +0,0 @@ -package io.moquette.persistence; - -import java.util.Objects; - -final class Couple { - final K v1; - final L v2; - - public Couple(K v1, L v2) { - this.v1 = v1; - this.v2 = v2; - } - - public static Couple of(K v1, L v2) { - return new Couple<>(v1, v2); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Couple couple = (Couple) o; - return Objects.equals(v1, couple.v1) && Objects.equals(v2, couple.v2); - } - - @Override - public int hashCode() { - return Objects.hash(v1, v2); - } -} diff --git a/broker/src/main/java/io/moquette/persistence/H2SubscriptionsRepository.java b/broker/src/main/java/io/moquette/persistence/H2SubscriptionsRepository.java index d94987bb3..424e9b918 100644 --- a/broker/src/main/java/io/moquette/persistence/H2SubscriptionsRepository.java +++ b/broker/src/main/java/io/moquette/persistence/H2SubscriptionsRepository.java @@ -1,6 +1,7 @@ package io.moquette.persistence; import io.moquette.broker.ISubscriptionsRepository; +import io.moquette.broker.Utils; import io.moquette.broker.subscriptions.ShareName; import io.moquette.broker.subscriptions.SharedSubscription; import io.moquette.broker.subscriptions.Subscription; @@ -33,7 +34,7 @@ public class H2SubscriptionsRepository implements ISubscriptionsRepository { private static final String SUBSCRIPTIONS_MAP = "subscriptions"; private static final String SHARED_SUBSCRIPTIONS_MAP = "shared_subscriptions"; private final MVStore mvStore; - private final MVMap.Builder, SubscriptionOptionAndId> submapBuilder; + private final MVMap.Builder, SubscriptionOptionAndId> submapBuilder; private MVMap subscriptions; // clientId -> shared subscription map name @@ -44,7 +45,7 @@ public class H2SubscriptionsRepository implements ISubscriptionsRepository { H2SubscriptionsRepository(MVStore mvStore) { this.mvStore = mvStore; - submapBuilder = new MVMap.Builder, SubscriptionOptionAndId>() + submapBuilder = new MVMap.Builder, SubscriptionOptionAndId>() .keyType(new CoupleValueType()) .valueType(new SubscriptionOptionAndIdValueType()); @@ -98,8 +99,8 @@ public void removeSharedSubscription(String clientId, ShareName share, Topic top LOG.info("Removing a non existing shared subscription for client: {}", clientId); return; } - MVMap, SubscriptionOptionAndId> subMap = mvStore.openMap(sharedSubsMapName, submapBuilder); - Couple sharedSubKey = Couple.of(share, topicFilter); + MVMap, SubscriptionOptionAndId> subMap = mvStore.openMap(sharedSubsMapName, submapBuilder); + Utils.Couple sharedSubKey = Utils.Couple.of(share, topicFilter); // remove from submap, null means the key didn't exist if (subMap.remove(sharedSubKey) == null) { @@ -139,8 +140,8 @@ private void storeNewSharedSubscription(String clientId, ShareName share, Topic H2SubscriptionsRepository::computeShareSubscriptionSubMap); // maps the couple (share name, topic) to requested qos - MVMap, SubscriptionOptionAndId> subMap = mvStore.openMap(sharedSubsMapName, submapBuilder); - subMap.put(Couple.of(share, topicFilter), value); + MVMap, SubscriptionOptionAndId> subMap = mvStore.openMap(sharedSubsMapName, submapBuilder); + subMap.put(Utils.Couple.of(share, topicFilter), value); } @Override @@ -158,8 +159,8 @@ public Collection listAllSharedSubscription() { String clientId = entry.getKey(); String sharedSubsMapName = entry.getValue(); - MVMap, SubscriptionOptionAndId> subMap = mvStore.openMap(sharedSubsMapName, submapBuilder); - for (Map.Entry, SubscriptionOptionAndId> subEntry : subMap.entrySet()) { + MVMap, SubscriptionOptionAndId> subMap = mvStore.openMap(sharedSubsMapName, submapBuilder); + for (Map.Entry, SubscriptionOptionAndId> subEntry : subMap.entrySet()) { final ShareName shareName = subEntry.getKey().v1; final Topic topicFilter = subEntry.getKey().v2; final MqttSubscriptionOption option = subEntry.getValue().option; @@ -183,39 +184,39 @@ private static String computeShareSubscriptionSubMap(String sessionId) { return SHARED_SUBSCRIPTIONS_MAP + "_" + sessionId; } - static final class CoupleValueType extends BasicDataType> { + static final class CoupleValueType extends BasicDataType> { - private final Comparator> coupleComparator = - Comparator., String>comparing(c -> c.v1.getShareName()) + private final Comparator> coupleComparator = + Comparator., String>comparing(c -> c.v1.getShareName()) .thenComparing(c -> c.v2.toString()); @Override - public int compare(Couple var1, Couple var2) { + public int compare(Utils.Couple var1, Utils.Couple var2) { return coupleComparator.compare(var1, var2); } @Override - public int getMemory(Couple couple) { + public int getMemory(Utils.Couple couple) { return StringDataType.INSTANCE.getMemory(couple.v1.getShareName()) + StringDataType.INSTANCE.getMemory(couple.v2.toString()); } @Override - public void write(WriteBuffer buff, Couple couple) { + public void write(WriteBuffer buff, Utils.Couple couple) { StringDataType.INSTANCE.write(buff, couple.v1.getShareName()); StringDataType.INSTANCE.write(buff, couple.v2.toString()); } @Override - public Couple read(ByteBuffer buffer) { + public Utils.Couple read(ByteBuffer buffer) { String shareName = StringDataType.INSTANCE.read(buffer); String topicFilter = StringDataType.INSTANCE.read(buffer); - return new Couple<>(new ShareName(shareName), Topic.asTopic(topicFilter)); + return new Utils.Couple<>(new ShareName(shareName), Topic.asTopic(topicFilter)); } @Override - public Couple[] createStorage(int i) { - return new Couple[i]; + public Utils.Couple[] createStorage(int i) { + return new Utils.Couple[i]; } } diff --git a/broker/src/main/java/io/moquette/persistence/MemorySubscriptionsRepository.java b/broker/src/main/java/io/moquette/persistence/MemorySubscriptionsRepository.java index 25d6c63bc..94ef36845 100644 --- a/broker/src/main/java/io/moquette/persistence/MemorySubscriptionsRepository.java +++ b/broker/src/main/java/io/moquette/persistence/MemorySubscriptionsRepository.java @@ -16,12 +16,12 @@ package io.moquette.persistence; import io.moquette.broker.ISubscriptionsRepository; +import io.moquette.broker.Utils; import io.moquette.broker.subscriptions.ShareName; import io.moquette.broker.subscriptions.SharedSubscription; import io.moquette.broker.subscriptions.Subscription; import io.moquette.broker.subscriptions.SubscriptionIdentifier; import io.moquette.broker.subscriptions.Topic; -import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttSubscriptionOption; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +39,7 @@ public class MemorySubscriptionsRepository implements ISubscriptionsRepository { private static final Logger LOG = LoggerFactory.getLogger(MemorySubscriptionsRepository.class); private final Set subscriptions = new ConcurrentSkipListSet<>(); - private final Map, SharedSubscription>> sharedSubscriptions = new HashMap<>(); + private final Map, SharedSubscription>> sharedSubscriptions = new HashMap<>(); @Override public Set listAllSubscriptions() { @@ -66,12 +66,12 @@ public void removeAllSharedSubscriptions(String clientId) { @Override public void removeSharedSubscription(String clientId, ShareName share, Topic topicFilter) { - Map, SharedSubscription> subsMap = sharedSubscriptions.get(clientId); + Map, SharedSubscription> subsMap = sharedSubscriptions.get(clientId); if (subsMap == null) { LOG.info("Removing a non existing shared subscription for client: {}", clientId); return; } - subsMap.remove(Couple.of(share, topicFilter)); + subsMap.remove(Utils.Couple.of(share, topicFilter)); if (subsMap.isEmpty()) { // clean up an empty sub map sharedSubscriptions.remove(clientId); @@ -85,8 +85,8 @@ public void addNewSharedSubscription(String clientId, ShareName share, Topic top } private void storeNewSharedSubscription(String clientId, ShareName share, Topic topicFilter, SharedSubscription sharedSub) { - Map, SharedSubscription> subsMap = sharedSubscriptions.computeIfAbsent(clientId, unused -> new HashMap<>()); - subsMap.put(Couple.of(share, topicFilter), sharedSub); + Map, SharedSubscription> subsMap = sharedSubscriptions.computeIfAbsent(clientId, unused -> new HashMap<>()); + subsMap.put(Utils.Couple.of(share, topicFilter), sharedSub); } @Override @@ -99,8 +99,8 @@ public void addNewSharedSubscription(String clientId, ShareName share, Topic top @Override public Collection listAllSharedSubscription() { final List result = new ArrayList<>(); - for (Map.Entry, SharedSubscription>> entry : sharedSubscriptions.entrySet()) { - for (Map.Entry, SharedSubscription> nestedEntry : entry.getValue().entrySet()) { + for (Map.Entry, SharedSubscription>> entry : sharedSubscriptions.entrySet()) { + for (Map.Entry, SharedSubscription> nestedEntry : entry.getValue().entrySet()) { result.add(nestedEntry.getValue()); } }