Skip to content

Commit eff06d8

Browse files
authored
Implement subscription retain policy handling (moquette-io#816)
Modify SubscriptionDirectory's add methods to return the indication of the fact that the subscription was freshly created. Updates forwarding of retained messages during subscription processing to apply the three retained policies.
1 parent 0b92f0f commit eff06d8

File tree

7 files changed

+124
-19
lines changed

7 files changed

+124
-19
lines changed

ChangeLog.txt

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ Version 0.18-SNAPSHOT:
44
- Exposed the maximum granted QoS by the server with the config setting 'max_server_granted_qos'. (#811)
55
- Implements handling of noLocal subscription option on MQTT5 connections. (#814)
66
- Implements subscription options retain as published feature. (#815)
7+
- Implements subscription options retains handling policies. (#816)
78
[feature] subscription identifiers: (issue #801)
89
- Implements the validation of subscription identifier properties in SUBSCRIBE. (#803)
910
- Store and retrieve the subscription identifier into the subscriptions directory. (#804)

broker/src/main/java/io/moquette/broker/PostOffice.java

+20-4
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
3535
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
3636
import io.netty.handler.codec.mqtt.MqttSubscriptionOption;
37+
import io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy;
3738
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
3839
import io.netty.util.ReferenceCountUtil;
3940
import org.slf4j.Logger;
@@ -370,12 +371,26 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S
370371
}
371372
}).collect(Collectors.toList());
372373

374+
final Set<Subscription> subscriptionToSendRetained = new HashSet<>();
373375
for (Subscription subscription : newSubscriptions) {
376+
boolean newlyAdded;
377+
MqttSubscriptionOption subOptions = subscription.option();
374378
if (subscriptionIdOpt.isPresent()) {
375-
subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subscription.option(),
379+
newlyAdded = subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subOptions,
376380
subscriptionIdOpt.get());
377381
} else {
378-
subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subscription.option());
382+
newlyAdded = subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subOptions);
383+
}
384+
385+
switch (subOptions.retainHandling()) {
386+
case SEND_AT_SUBSCRIBE:
387+
subscriptionToSendRetained.add(subscription);
388+
break;
389+
case SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS:
390+
if (newlyAdded) {
391+
subscriptionToSendRetained.add(subscription);
392+
}
393+
break;
379394
}
380395
}
381396

@@ -394,7 +409,8 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S
394409
// send ack message
395410
mqttConnection.sendSubAckMessage(messageID, ackMessage);
396411

397-
publishRetainedMessagesForSubscriptions(clientID, newSubscriptions);
412+
// shared subscriptions doesn't receive retained messages
413+
publishRetainedMessagesForSubscriptions(clientID, subscriptionToSendRetained);
398414

399415
for (Subscription subscription : newSubscriptions) {
400416
interceptor.notifyTopicSubscribed(subscription, username);
@@ -445,7 +461,7 @@ private static Optional<SubscriptionIdentifier> verifyAndExtractMessageIdentifie
445461
}
446462
}
447463

448-
private void publishRetainedMessagesForSubscriptions(String clientID, List<Subscription> newSubscriptions) {
464+
private void publishRetainedMessagesForSubscriptions(String clientID, Collection<Subscription> newSubscriptions) {
449465
Session targetSession = this.sessionRegistry.retrieve(clientID);
450466
for (Subscription subscription : newSubscriptions) {
451467
final String topicFilter = subscription.getTopicFilter().toString();

broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,8 @@ interface IVisitor<T> {
171171
private static final INode NO_PARENT = null;
172172

173173
private enum Action {
174-
OK, REPEAT
174+
OK, REPEAT,
175+
OK_NEW // used to indicate that the action was successful and the subscription created a new branch
175176
}
176177

177178
INode root;
@@ -270,11 +271,15 @@ private List<Subscription> recursiveMatch(Topic topicName, INode inode, int dept
270271
return subscriptions;
271272
}
272273

273-
public void addToTree(SubscriptionRequest request) {
274+
/**
275+
* @return true if the subscription didn't exist.
276+
* */
277+
public boolean addToTree(SubscriptionRequest request) {
274278
Action res;
275279
do {
276280
res = insert(request.getTopicFilter(), this.root, request);
277281
} while (res == Action.REPEAT);
282+
return res == Action.OK_NEW;
278283
}
279284

280285
private Action insert(Topic topic, final INode inode, SubscriptionRequest request) {
@@ -315,7 +320,7 @@ private Action createNodeAndInsertSubscription(Topic topic, INode inode, CNode c
315320
}
316321
updatedCnode.add(newInode);
317322

318-
return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
323+
return inode.compareAndSet(cnode, updatedCnode) ? Action.OK_NEW : Action.REPEAT;
319324
}
320325

321326
private INode createPathRec(Topic topic, SubscriptionRequest request) {

broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -107,20 +107,21 @@ private static List<Subscription> selectSubscriptionsWithHigherQoSForEachSession
107107
}
108108

109109
@Override
110-
public void add(String clientId, Topic filter, MqttSubscriptionOption option) {
110+
public boolean add(String clientId, Topic filter, MqttSubscriptionOption option) {
111111
SubscriptionRequest subRequest = SubscriptionRequest.buildNonShared(clientId, filter, option);
112-
addNonSharedSubscriptionRequest(subRequest);
112+
return addNonSharedSubscriptionRequest(subRequest);
113113
}
114114

115115
@Override
116-
public void add(String clientId, Topic filter, MqttSubscriptionOption option, SubscriptionIdentifier subscriptionId) {
116+
public boolean add(String clientId, Topic filter, MqttSubscriptionOption option, SubscriptionIdentifier subscriptionId) {
117117
SubscriptionRequest subRequest = SubscriptionRequest.buildNonShared(clientId, filter, option, subscriptionId);
118-
addNonSharedSubscriptionRequest(subRequest);
118+
return addNonSharedSubscriptionRequest(subRequest);
119119
}
120120

121-
private void addNonSharedSubscriptionRequest(SubscriptionRequest subRequest) {
122-
ctrie.addToTree(subRequest);
121+
private boolean addNonSharedSubscriptionRequest(SubscriptionRequest subRequest) {
122+
boolean notExistingSubscription = ctrie.addToTree(subRequest);
123123
subscriptionsRepository.addNewSubscription(subRequest.subscription());
124+
return notExistingSubscription;
124125
}
125126

126127
@Override

broker/src/main/java/io/moquette/broker/subscriptions/ISubscriptionsDirectory.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,9 @@
1616
package io.moquette.broker.subscriptions;
1717

1818
import io.moquette.broker.ISubscriptionsRepository;
19-
import io.netty.handler.codec.mqtt.MqttQoS;
2019
import io.netty.handler.codec.mqtt.MqttSubscriptionOption;
2120

2221
import java.util.List;
23-
import java.util.Set;
2422

2523
/**
2624
* Contains all topic filters that are used to match against topic names.
@@ -33,9 +31,9 @@ public interface ISubscriptionsDirectory {
3331

3432
List<Subscription> matchQosSharpening(Topic topic);
3533

36-
void add(String clientId, Topic filter, MqttSubscriptionOption option);
34+
boolean add(String clientId, Topic filter, MqttSubscriptionOption option);
3735

38-
void add(String clientId, Topic filter, MqttSubscriptionOption option, SubscriptionIdentifier subscriptionId);
36+
boolean add(String clientId, Topic filter, MqttSubscriptionOption option, SubscriptionIdentifier subscriptionId);
3937

4038
void addShared(String clientId, ShareName name, Topic topicFilter, MqttSubscriptionOption option);
4139

broker/src/test/java/io/moquette/broker/subscriptions/CTrieTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,11 @@ public void testLookup() {
9090
@Test
9191
public void testAddNewSubscriptionOnExistingNode() {
9292
final SubscriptionRequest existingSubscription = clientSubOnTopic("TempSensor1", "/temp");
93-
sut.addToTree(existingSubscription);
93+
assertTrue(sut.addToTree(existingSubscription), "First created subscription on topic filter MUST return true");
9494

9595
//Exercise
9696
final SubscriptionRequest newSubscription = clientSubOnTopic("TempSensor2", "/temp");
97-
sut.addToTree(newSubscription);
97+
assertFalse(sut.addToTree(newSubscription), "Not new created subscription on topic filter MUST return false");
9898

9999
//Verify
100100
final Optional<CNode> matchedNode = sut.lookup(asTopic("/temp"));

broker/src/test/java/io/moquette/integration/mqtt5/SubscriptionOptionsTest.java

+84
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.hivemq.client.mqtt.datatypes.MqttQos;
2222
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
23+
import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5RetainHandling;
2324
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode;
2425
import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
2526
import org.eclipse.paho.mqttv5.client.IMqttToken;
@@ -243,6 +244,89 @@ public void givenSubscriptionWithRetainAsPublishedUnsetThenRetainedFlagIsUnsetOn
243244
assertFalse(publishCollector.receivedMessage.isRetained());
244245
}
245246

247+
@Test
248+
public void givenFirstSubscriptionWithRetainPolicyToSendAtSubscribeIfNotYetExistsAndARetainedMessagedExistsThenPublishIsReceived() throws Exception {
249+
Mqtt5BlockingClient publisher = createPublisherClient();
250+
//publish a retained message
251+
publisher.publishWith()
252+
.topic("metric/temperature/living")
253+
.payload("18".getBytes(StandardCharsets.UTF_8))
254+
.retain(true)
255+
.qos(MqttQos.AT_LEAST_ONCE)
256+
.send();
257+
258+
// receive retained only if new subscription
259+
PublishCollector publishCollector = new PublishCollector();
260+
createClientWithRetainPolicy(publishCollector, Mqtt5RetainHandling.SEND_IF_SUBSCRIPTION_DOES_NOT_EXIST.getCode());
261+
262+
// verify retain flag is respected
263+
publishCollector.assertReceivedMessageIn(1, TimeUnit.SECONDS);
264+
verifyTopicPayloadAndQoSAsExpected(publishCollector);
265+
}
266+
267+
@Test
268+
public void givenNonFirstSubscriptionWithRetainPolicyToSendAtSubscribeIfAlreadyExistsAndARetainedMessagedExistsThenPublishIsNotReceived() throws Exception {
269+
Mqtt5BlockingClient publisher = createPublisherClient();
270+
//publish a retained message
271+
publisher.publishWith()
272+
.topic("metric/temperature/living")
273+
.payload("18".getBytes(StandardCharsets.UTF_8))
274+
.retain(true)
275+
.qos(MqttQos.AT_LEAST_ONCE)
276+
.send();
277+
278+
// create first subscriber and subscribe to the topic
279+
final PublishCollector unusedCollector = new PublishCollector();
280+
createSubscriberClient(unusedCollector, "firstSubscriber");
281+
282+
// create second subscriber to same topic with RetainPolicy to SendAtSubscribeIfAlreadyExists
283+
PublishCollector publishCollector = new PublishCollector();
284+
createClientWithRetainPolicy(publishCollector, Mqtt5RetainHandling.SEND_IF_SUBSCRIPTION_DOES_NOT_EXIST.getCode());
285+
286+
// verify no retained message is received
287+
publishCollector.assertNotReceivedMessageIn(2, TimeUnit.SECONDS);
288+
}
289+
290+
@Test
291+
public void givenSubscriptionWithRetainPolicyToDoNotSendAndARetainedMessagedExistsThenPublishIsNotReceived() throws Exception {
292+
Mqtt5BlockingClient publisher = createPublisherClient();
293+
//publish a retained message
294+
publisher.publishWith()
295+
.topic("metric/temperature/living")
296+
.payload("18".getBytes(StandardCharsets.UTF_8))
297+
.retain(true)
298+
.qos(MqttQos.AT_LEAST_ONCE)
299+
.send();
300+
301+
// subscriber subscribe to same topic matching the retained but with DO_NOT_SEND policy
302+
PublishCollector publishCollector = new PublishCollector();
303+
createClientWithRetainPolicy(publishCollector, Mqtt5RetainHandling.DO_NOT_SEND.getCode());
304+
305+
// verify no retained message is received
306+
publishCollector.assertNotReceivedMessageIn(1, TimeUnit.SECONDS);
307+
}
308+
309+
private static void createSubscriberClient(PublishCollector publishCollector, String clientId) throws MqttException {
310+
MqttClient subscriber = new MqttClient("tcp://localhost:1883", clientId, new MemoryPersistence());
311+
subscriber.connect();
312+
MqttSubscription subscription = new MqttSubscription("metric/temperature/living", MqttQos.AT_LEAST_ONCE.getCode());
313+
314+
IMqttToken subscribeToken = subscriber.subscribe(new MqttSubscription[]{subscription},
315+
new IMqttMessageListener[] {publishCollector});
316+
verifySubscribedSuccessfully(subscribeToken);
317+
}
318+
319+
private static void createClientWithRetainPolicy(PublishCollector publishCollector, int retainPolicy) throws MqttException {
320+
MqttClient subscriber = new MqttClient("tcp://localhost:1883", "subscriber", new MemoryPersistence());
321+
subscriber.connect();
322+
MqttSubscription subscription = new MqttSubscription("metric/temperature/living", MqttQos.AT_LEAST_ONCE.getCode());
323+
subscription.setRetainHandling(retainPolicy);
324+
325+
IMqttToken subscribeToken = subscriber.subscribe(new MqttSubscription[]{subscription},
326+
new IMqttMessageListener[] {publishCollector});
327+
verifySubscribedSuccessfully(subscribeToken);
328+
}
329+
246330
private static MqttClient createSubscriberClientWithRetainAsPublished(PublishCollector publishCollector, String topic) throws MqttException {
247331
return createSubscriberClient(publishCollector, topic, true);
248332
}

0 commit comments

Comments
 (0)