From dd1600db0e1b115172fc044cb01b92d01c71682a Mon Sep 17 00:00:00 2001 From: Andrea Selva Date: Mon, 1 Apr 2024 15:54:15 +0200 Subject: [PATCH] Update message expiry remaining time and drop queued messages if expired (#823) Updates the message expiry property, during forwarding of a publish, with the remaining seconds before expire. This is done after extract a message from the Session's queue and before composing the publish message to forward. When a message is straight forwarded without being enqueue, the message expiry property is equally forwarded. --- ChangeLog.txt | 1 + .../io/moquette/broker/MQTTConnection.java | 4 +- .../broker/MoquetteIdleTimeoutHandler.java | 2 +- .../main/java/io/moquette/broker/Session.java | 27 ++-- .../io/moquette/broker/SessionRegistry.java | 62 +++++++++ .../persistence/SegmentPersistentQueue.java | 10 +- .../SegmentedPersistentQueueSerDes.java | 28 +++- .../mqtt5/AbstractServerIntegrationTest.java | 5 + .../mqtt5/MessageExpirationTest.java | 121 ++++++++++++++++-- .../mqtt5/SubscriptionOptionsTest.java | 27 ++-- .../moquette/integration/mqtt5/TestUtils.java | 9 ++ .../SegmentPersistentQueueTest.java | 9 +- .../java/io/moquette/testclient/Client.java | 7 +- 13 files changed, 268 insertions(+), 44 deletions(-) diff --git a/ChangeLog.txt b/ChangeLog.txt index 6e46b563c..1d79f9294 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -3,6 +3,7 @@ Version 0.18-SNAPSHOT: [feature] message expiry interval: (issue #818) - Implements the management of message expiry for retained part. (#819) - Avoid to publish messages that has elapsed its expire property. (#822) + - Update the message expiry property remaining seconds when a publish is forwarded. (#823) [feature] subscription option handling: (issue #808) - Move from qos to subscription option implementing the persistence of SubscriptionOption to/from storage. (#810) - Exposed the maximum granted QoS by the server with the config setting 'max_server_granted_qos'. (#811) diff --git a/broker/src/main/java/io/moquette/broker/MQTTConnection.java b/broker/src/main/java/io/moquette/broker/MQTTConnection.java index 1f23a5b64..3036f5a6b 100644 --- a/broker/src/main/java/io/moquette/broker/MQTTConnection.java +++ b/broker/src/main/java/io/moquette/broker/MQTTConnection.java @@ -625,7 +625,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { return PostOffice.RouteResult.failed(clientId); } - final Instant expiry = extractExpiryFromPropery(msg); + final Instant expiry = extractExpiryFromProperty(msg); // retain else msg is cleaned by the NewNettyMQTTHandler and is not available // in execution by SessionEventLoop @@ -672,7 +672,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { } } - private Instant extractExpiryFromPropery(MqttPublishMessage msg) { + private Instant extractExpiryFromProperty(MqttPublishMessage msg) { MqttProperties.MqttProperty expiryProp = msg.variableHeader() .properties() .getProperty(MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value()); diff --git a/broker/src/main/java/io/moquette/broker/MoquetteIdleTimeoutHandler.java b/broker/src/main/java/io/moquette/broker/MoquetteIdleTimeoutHandler.java index 0677aff03..cff9ea676 100644 --- a/broker/src/main/java/io/moquette/broker/MoquetteIdleTimeoutHandler.java +++ b/broker/src/main/java/io/moquette/broker/MoquetteIdleTimeoutHandler.java @@ -36,7 +36,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc if (evt instanceof IdleStateEvent) { IdleState e = ((IdleStateEvent) evt).state(); if (e == IdleState.READER_IDLE) { - LOG.info("Firing channel inactive event. MqttClientId = {}.", NettyUtils.clientID(ctx.channel())); + LOG.warn("Close channel because it's inactive, passed keep alive. MqttClientId = {}.", NettyUtils.clientID(ctx.channel())); // fire a close that then fire channelInactive to trigger publish of Will ctx.close().addListener(CLOSE_ON_FAILURE); } diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index c9312798e..364722486 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -29,14 +29,7 @@ import java.net.InetSocketAddress; import java.time.Instant; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; @@ -269,9 +262,11 @@ private void sendPublishQos0(PublishedMessage publishRequest) { LOG.debug("Sending publish at QoS0 already expired, drop it"); return; } + + MqttProperties.MqttProperty[] mqttProperties = publishRequest.updatePublicationExpiryIfPresentOrAdd(); MqttPublishMessage publishMsg = MQTTConnection.createPublishMessage(publishRequest.getTopic().toString(), publishRequest.getPublishingQos(), publishRequest.getPayload(), 0, - publishRequest.retained, false, publishRequest.mqttProperties); + publishRequest.retained, false, mqttProperties); mqttConnection.sendPublish(publishMsg); } @@ -281,7 +276,7 @@ private void sendPublishQos1(PublishedMessage publishRequest) { return; } if (publishRequest.isExpired()) { - LOG.debug("Sending publish at QoS1 already expired, drop it"); + LOG.debug("Sending publish at QoS1 already expired, expected to happen before {}, drop it", publishRequest.messageExpiry); return; } @@ -307,6 +302,8 @@ private void sendPublishInFlightWindowOrQueueing(MQTTConnection localMqttConnect inflightSlots.decrementAndGet(); int packetId = localMqttConnectionRef.nextPacketId(); + LOG.debug("Adding into inflight for session {} at QoS {}", getClientID(), publishRequest.getPublishingQos()); + EnqueuedMessage old = inflightWindow.put(packetId, publishRequest); // If there already was something, release it. if (old != null) { @@ -316,9 +313,11 @@ private void sendPublishInFlightWindowOrQueueing(MQTTConnection localMqttConnect if (resendInflightOnTimeout) { inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS)); } + + MqttProperties.MqttProperty[] mqttProperties = publishRequest.updatePublicationExpiryIfPresentOrAdd(); MqttPublishMessage publishMsg = MQTTConnection.createPublishMessage( publishRequest.topic.toString(), publishRequest.getPublishingQos(), - publishRequest.payload, packetId, publishRequest.retained, false, publishRequest.mqttProperties); + publishRequest.payload, packetId, publishRequest.retained, false, mqttProperties); localMqttConnectionRef.sendPublish(publishMsg); drainQueueToConnection(); @@ -352,6 +351,7 @@ void pubAckReceived(int ackPacketId) { removed.release(); inflightSlots.incrementAndGet(); + LOG.debug("Received PUBACK {} for session {}", ackPacketId, getClientID()); drainQueueToConnection(); } @@ -440,12 +440,15 @@ private void drainQueueToConnection() { if (resendInflightOnTimeout) { inflightTimeouts.add(new InFlightPacket(sendPacketId, FLIGHT_BEFORE_RESEND_MS)); } + + MqttProperties.MqttProperty[] mqttProperties = msgPub.updatePublicationExpiryIfPresentOrAdd(); + MqttPublishMessage publishMsg = MQTTConnection.createNotRetainedPublishMessage( msgPub.topic.toString(), msgPub.publishingQos, msgPub.payload, sendPacketId, - msgPub.mqttProperties); + mqttProperties); mqttConnection.sendPublish(publishMsg); // we fetched msg from a map, but the release is cancelled out by the above retain diff --git a/broker/src/main/java/io/moquette/broker/SessionRegistry.java b/broker/src/main/java/io/moquette/broker/SessionRegistry.java index cc25ab2ed..6c26641f2 100644 --- a/broker/src/main/java/io/moquette/broker/SessionRegistry.java +++ b/broker/src/main/java/io/moquette/broker/SessionRegistry.java @@ -112,9 +112,71 @@ public MqttProperties.MqttProperty[] getMqttProperties() { public boolean isExpired() { return messageExpiry != Instant.MAX && Instant.now().isAfter(messageExpiry); } + + public MqttProperties.MqttProperty[] updatePublicationExpiryIfPresentOrAdd() { + if (messageExpiry == Instant.MAX) { + return mqttProperties; + } + + Duration duration = Duration.between(Instant.now(), messageExpiry); + // do some math rounding so that 2.9999 seconds remains 3 seconds + long remainingSeconds = Math.round(duration.toMillis() / 1_000.0); + final int indexOfExpiry = findPublicationExpiryProperty(mqttProperties); + MqttProperties.IntegerProperty updatedProperty = new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value(), (int) remainingSeconds); + + // update existing property + if (indexOfExpiry != -1) { + mqttProperties[indexOfExpiry] = updatedProperty; + return mqttProperties; + } + + // insert a new property + MqttProperties.MqttProperty[] newProperties = Arrays.copyOf(mqttProperties, mqttProperties.length + 1); + newProperties[newProperties.length - 1] = updatedProperty; + return newProperties; + } + + /** + * Linear search of PUBLICATION_EXPIRY_INTERVAL. + * @param properties the array of properties. + * @return the index of matched property or -1. + * */ + private static int findPublicationExpiryProperty(MqttProperties.MqttProperty[] properties) { + for (int i = 0; i < properties.length; i++) { + if (isPublicationExpiryProperty(properties[i])) { + return i; + } + } + return -1; + } + + private static boolean isPublicationExpiryProperty(MqttProperties.MqttProperty property) { + return property instanceof MqttProperties.IntegerProperty && + property.propertyId() == MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value(); + } + + public Instant getMessageExpiry() { + return messageExpiry; + } + + @Override + public String toString() { + return "PublishedMessage{" + + "topic=" + topic + + ", publishingQos=" + publishingQos + + ", payload=" + payload + + ", retained=" + retained + + ", messageExpiry=" + messageExpiry + + ", mqttProperties=" + Arrays.toString(mqttProperties) + + '}'; + } } public static final class PubRelMarker extends EnqueuedMessage { + @Override + public String toString() { + return "PubRelMarker{}"; + } } public enum CreationModeEnum { diff --git a/broker/src/main/java/io/moquette/persistence/SegmentPersistentQueue.java b/broker/src/main/java/io/moquette/persistence/SegmentPersistentQueue.java index f735fcfe1..f6cc2b0a9 100644 --- a/broker/src/main/java/io/moquette/persistence/SegmentPersistentQueue.java +++ b/broker/src/main/java/io/moquette/persistence/SegmentPersistentQueue.java @@ -4,12 +4,16 @@ import io.moquette.broker.SessionRegistry; import io.moquette.broker.unsafequeues.Queue; import io.moquette.broker.unsafequeues.QueueException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.util.Optional; public class SegmentPersistentQueue extends AbstractSessionMessageQueue { + private static final Logger LOG = LoggerFactory.getLogger(SegmentPersistentQueue.class); + private final Queue segmentedQueue; private final SegmentedPersistentQueueSerDes serdes = new SegmentedPersistentQueueSerDes(); @@ -19,6 +23,7 @@ public SegmentPersistentQueue(Queue segmentedQueue) { @Override public void enqueue(SessionRegistry.EnqueuedMessage message) { + LOG.debug("Adding message {}", message); checkEnqueuePreconditions(message); final ByteBuffer payload = serdes.toBytes(message); @@ -40,11 +45,14 @@ public SessionRegistry.EnqueuedMessage dequeue() { throw new RuntimeException(e); } if (!dequeue.isPresent()) { + LOG.debug("No data pulled out from the queue"); return null; } final ByteBuffer content = dequeue.get(); - return serdes.fromBytes(content); + SessionRegistry.EnqueuedMessage message = serdes.fromBytes(content); + LOG.debug("Retrieved message {}", message); + return message; } @Override diff --git a/broker/src/main/java/io/moquette/persistence/SegmentedPersistentQueueSerDes.java b/broker/src/main/java/io/moquette/persistence/SegmentedPersistentQueueSerDes.java index 8d2409343..f40df5015 100644 --- a/broker/src/main/java/io/moquette/persistence/SegmentedPersistentQueueSerDes.java +++ b/broker/src/main/java/io/moquette/persistence/SegmentedPersistentQueueSerDes.java @@ -35,6 +35,7 @@ private void write(SessionRegistry.EnqueuedMessage obj, ByteBuffer buff) { final String topic = casted.getTopic().toString(); writeTopic(buff, topic); + writeMessageExpiry(buff, casted.getMessageExpiry()); writePayload(buff, casted.getPayload()); if (EnqueuedMessageValueType.hasProperties(casted)) { buff.put((byte) 1); // there are properties @@ -49,6 +50,10 @@ private void write(SessionRegistry.EnqueuedMessage obj, ByteBuffer buff) { } } + private void writeMessageExpiry(ByteBuffer buff, Instant messageExpiry) { + writeString(buff, messageExpiry.toString()); + } + private void writePayload(ByteBuffer target, ByteBuf source) { final int payloadSize = source.readableBytes(); byte[] rawBytes = new byte[payloadSize]; @@ -112,6 +117,7 @@ private int getMemory(SessionRegistry.EnqueuedMessage obj) { return 1 + // message type 1 + // qos topicMemorySize(casted.getTopic()) + + messageExpirySize(casted.getMessageExpiry()) + payloadMemorySize(casted.getPayload()) + 1 + // flag to indicate if there are MQttProperties or not propertiesSize; @@ -127,6 +133,11 @@ private int topicMemorySize(Topic topic) { topic.toString().getBytes(StandardCharsets.UTF_8).length; } + private int messageExpirySize(Instant messageExpiry) { + return 4 + // size + messageExpiry.toString().getBytes(StandardCharsets.UTF_8).length; + } + private int propertiesMemorySize(MqttProperties.MqttProperty[] properties) { return 4 + // integer containing the number of properties Arrays.stream(properties).mapToInt(SegmentedPersistentQueueSerDes::propertyMemorySize).sum(); @@ -164,12 +175,13 @@ public SessionRegistry.EnqueuedMessage fromBytes(ByteBuffer buff) { } else if (messageType == MessageType.PUBLISHED_MESSAGE.ordinal()) { final MqttQoS qos = MqttQoS.valueOf(buff.get()); final String topicStr = readTopic(buff); + final Instant messageExpiry = readExpiry(buff); final ByteBuf payload = readPayload(buff); if (SerdesUtils.containsProperties(buff)) { MqttProperties.MqttProperty[] mqttProperties = readProperties(buff); - return new SessionRegistry.PublishedMessage(Topic.asTopic(topicStr), qos, payload, false, Instant.MAX, mqttProperties); + return new SessionRegistry.PublishedMessage(Topic.asTopic(topicStr), qos, payload, false, messageExpiry, mqttProperties); } else { - return new SessionRegistry.PublishedMessage(Topic.asTopic(topicStr), qos, payload, false, Instant.MAX); + return new SessionRegistry.PublishedMessage(Topic.asTopic(topicStr), qos, payload, false, messageExpiry); } } else { throw new IllegalArgumentException("Can't recognize record of type: " + messageType); @@ -185,12 +197,24 @@ private MqttProperties.MqttProperty readProperty(ByteBuffer buff) { } private String readTopic(ByteBuffer buff) { + return readString(buff); + } + + private static String readString(ByteBuffer buff) { final int stringLen = buff.getInt(); final byte[] rawString = new byte[stringLen]; buff.get(rawString); return new String(rawString, StandardCharsets.UTF_8); } + private Instant readExpiry(ByteBuffer buff) { + final String expiryText = readString(buff); + if (Instant.MAX.toString().equals(expiryText)) { + return Instant.MAX; + } + return Instant.parse(expiryText); + } + private ByteBuf readPayload(ByteBuffer buff) { return Unpooled.wrappedBuffer(readByteArray(buff)); } diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java index 2dbae9889..b0aa8af29 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java @@ -139,4 +139,9 @@ void connectLowLevel() { MqttConnAckMessage connAck = lowLevelClient.connectV5(); assertConnectionAccepted(connAck, "Connection must be accepted"); } + + void connectLowLevel(int keepAliveSecs) { + MqttConnAckMessage connAck = lowLevelClient.connectV5(keepAliveSecs); + assertConnectionAccepted(connAck, "Connection must be accepted"); + } } diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java index 1e6d70337..ed11e17e8 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/MessageExpirationTest.java @@ -20,8 +20,23 @@ import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; -import io.netty.handler.codec.mqtt.*; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishBuilder; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttProperties; +import io.netty.handler.codec.mqtt.MqttPubAckMessage; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttQoS; import org.awaitility.Awaitility; +import org.eclipse.paho.mqttv5.client.IMqttMessageListener; +import org.eclipse.paho.mqttv5.client.IMqttToken; +import org.eclipse.paho.mqttv5.client.MqttClient; +import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttSubscription; import org.junit.jupiter.api.Test; import java.nio.charset.StandardCharsets; @@ -100,9 +115,41 @@ public void givenPublishWithRetainedAndMessageExpiryWhenTimeIsNotExpiredAndSubsc assertTrue(messageExpiryProperty.value() < messageExpiryInterval, "Forwarded message expiry should be lowered"); } + @Test + public void givenPublishMessageWithExpiryWhenForwarderToSubscriberStillContainsTheMessageExpiryHeader() throws MqttException { + // Use Paho client to avoid default subscriptionIdentifier (1) set by default by HiveMQ client. + MqttClient client = new MqttClient("tcp://localhost:1883", "subscriber", new MemoryPersistence()); + client.connect(); + MqttSubscription subscription = new MqttSubscription("temperature/living", 1); + SubscriptionOptionsTest.PublishCollector publishCollector = new SubscriptionOptionsTest.PublishCollector(); + IMqttToken subscribeToken = client.subscribe(new MqttSubscription[]{subscription}, + new IMqttMessageListener[] {publishCollector}); + TestUtils.verifySubscribedSuccessfully(subscribeToken); + + // publish a message on same topic the client subscribed + Mqtt5BlockingClient publisher = createPublisherClient(); + long messageExpiryInterval = 3; + publisher.publishWith() + .topic("temperature/living") + .payload("18".getBytes(StandardCharsets.UTF_8)) + .qos(MqttQos.AT_LEAST_ONCE) // Broker retains only QoS1 and QoS2 + .messageExpiryInterval(messageExpiryInterval) // 3 seconds + .send(); + + // Verify the message is also reflected back to the sender + publishCollector.assertReceivedMessageIn(2, TimeUnit.SECONDS); + assertEquals("temperature/living", publishCollector.receivedTopic()); + assertEquals("18", publishCollector.receivedPayload(), "Payload published on topic should match"); + org.eclipse.paho.mqttv5.common.MqttMessage receivedMessage = publishCollector.receivedMessage(); + assertEquals(MqttQos.AT_LEAST_ONCE.getCode(), receivedMessage.getQos()); + assertEquals(messageExpiryInterval, receivedMessage.getProperties().getMessageExpiryInterval()); + } + @Test public void givenPublishedMessageWithExpiryWhenMessageRemainInBrokerForMoreThanTheExipiryIsNotPublished() throws InterruptedException { - connectLowLevel(); + int messageExpiryInterval = 2; // seconds + // avoid the keep alive period could disconnect + connectLowLevel(messageExpiryInterval * 2); // subscribe with an identifier MqttMessage received = lowLevelClient.subscribeWithIdentifier("temperature/living", @@ -112,7 +159,6 @@ public void givenPublishedMessageWithExpiryWhenMessageRemainInBrokerForMoreThanT //lowlevel client doesn't ACK any pub, so the in flight window fills up Mqtt5BlockingClient publisher = createPublisherClient(); int inflightWindowSize = 10; - int messageExpiryInterval = 2; // seconds // fill the in flight window so that messages starts to be enqueued fillInFlightWindow(inflightWindowSize, publisher, messageExpiryInterval); @@ -128,16 +174,65 @@ public void givenPublishedMessageWithExpiryWhenMessageRemainInBrokerForMoreThanT Thread.sleep(Duration.ofSeconds(messageExpiryInterval + 1).toMillis()); // now subscriber consumes messages, shouldn't receive any message in the form "Enqueued-" - consumesPublishesInflifhtWindow(inflightWindowSize); + consumesPublishesInflightWindow(inflightWindowSize); MqttMessage mqttMessage = lowLevelClient.receiveNextMessage(Duration.ofMillis(100)); assertNull(mqttMessage, "No other messages MUST be received after consuming the in flight window"); } - private void consumesPublishesInflifhtWindow(int inflightWindowSize) throws InterruptedException { + @Test + public void givenPublishWithMessageExpiryPropertyWhenItsForwardedToSubscriberThenExpiryValueHasToBeDeducedByTheTimeSpentInBroker() throws InterruptedException { + int messageExpiryInterval = 10; // seconds + // avoid the keep alive period could disconnect + connectLowLevel((int)(messageExpiryInterval * 1.5)); + + // subscribe with an identifier + MqttMessage received = lowLevelClient.subscribeWithIdentifier("temperature/living", + MqttQoS.AT_LEAST_ONCE, 123); + verifyOfType(received, MqttMessageType.SUBACK); + + //lowlevel client doesn't ACK any pub, so the in flight window fills up + Mqtt5BlockingClient publisher = createPublisherClient(); + int inflightWindowSize = 10; + // fill the in flight window so that messages starts to be enqueued + fillInFlightWindow(inflightWindowSize, publisher, Integer.MIN_VALUE); + + // send another message, which is enqueued and has an expiry of messageExpiryInterval seconds + publisher.publishWith() + .topic("temperature/living") + .payload(("Enqueued").getBytes(StandardCharsets.UTF_8)) + .qos(MqttQos.AT_LEAST_ONCE) // Broker enqueues only QoS1 and QoS2 + .messageExpiryInterval(messageExpiryInterval) + .send(); + + // let time flow so that the message in queue passes its expiry time + long sleepMillis = Duration.ofSeconds(messageExpiryInterval / 2).toMillis(); + Thread.sleep(sleepMillis); + + // now subscriber consumes messages, shouldn't receive any message in the form "Enqueued-" + consumesPublishesInflightWindow(inflightWindowSize); + MqttMessage mqttMessage = lowLevelClient.receiveNextMessage(Duration.ofMillis(1000)); + + assertNotNull(mqttMessage, "A publish out of the queue has to be received"); + assertEquals(MqttMessageType.PUBLISH, mqttMessage.fixedHeader().messageType(), "Expected a publish message"); + MqttPublishMessage publishMessage = (MqttPublishMessage) mqttMessage; + + // extract message expiry property + MqttProperties.MqttProperty expiryProp = publishMessage.variableHeader() + .properties() + .getProperty(MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value()); + assertNotNull(expiryProp, "Publication expiry property can't be null"); + Integer expirySeconds = ((MqttProperties.IntegerProperty) expiryProp).value(); + + assertTrue(expirySeconds < messageExpiryInterval, "Publish's expiry has to be updated"); + } + + private void consumesPublishesInflightWindow(int inflightWindowSize) throws InterruptedException { for (int i = 0; i < inflightWindowSize; i++) { - MqttMessage mqttMessage = lowLevelClient.receiveNextMessage(Duration.ofMillis(200)); - assertTrue(mqttMessage instanceof MqttPublishMessage); + MqttMessage mqttMessage = lowLevelClient.receiveNextMessage(Duration.ofMillis(20000)); + assertNotNull(mqttMessage, "A message MUST be received"); + + assertEquals(MqttMessageType.PUBLISH, mqttMessage.fixedHeader().messageType(), "Message received should MqttPublishMessage"); MqttPublishMessage publish = (MqttPublishMessage) mqttMessage; assertEquals(Integer.toString(i), publish.payload().toString(StandardCharsets.UTF_8)); int packetId = publish.variableHeader().packetId(); @@ -151,12 +246,16 @@ private void consumesPublishesInflifhtWindow(int inflightWindowSize) throws Inte private static void fillInFlightWindow(int inflightWindowSize, Mqtt5BlockingClient publisher, int messageExpiryInterval) { for (int i = 0; i < inflightWindowSize; i++) { - publisher.publishWith() + Mqtt5PublishBuilder.Send.Complete builder = publisher.publishWith() .topic("temperature/living") .payload(Integer.toString(i).getBytes(StandardCharsets.UTF_8)) - .qos(MqttQos.AT_LEAST_ONCE) // Broker enqueues only QoS1 and QoS2 - .messageExpiryInterval(messageExpiryInterval) - .send(); + .qos(MqttQos.AT_LEAST_ONCE); + if (messageExpiryInterval != Integer.MIN_VALUE) { + builder // Broker enqueues only QoS1 and QoS2 + .messageExpiryInterval(messageExpiryInterval); + } + + builder.send(); } } } diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/SubscriptionOptionsTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/SubscriptionOptionsTest.java index 63ea7ac44..cdca14864 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/SubscriptionOptionsTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/SubscriptionOptionsTest.java @@ -21,7 +21,6 @@ import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5RetainHandling; -import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode; import org.eclipse.paho.mqttv5.client.IMqttMessageListener; import org.eclipse.paho.mqttv5.client.IMqttToken; import org.eclipse.paho.mqttv5.client.MqttClient; @@ -64,6 +63,14 @@ public String receivedPayload() { return new String(receivedMessage.getPayload(), StandardCharsets.UTF_8); } + public String receivedTopic() { + return receivedTopic; + } + + public MqttMessage receivedMessage() { + return receivedMessage; + } + public void assertReceivedMessageIn(int time, TimeUnit unit) { try { assertTrue(latch.await(time, unit), "Publish is received"); @@ -95,7 +102,7 @@ public void givenSubscriptionWithNoLocalEnabledWhenTopicMatchPublishByItselfThen PublishCollector publishCollector = new PublishCollector(); IMqttToken subscribeToken = client.subscribe(new MqttSubscription[]{subscription}, new IMqttMessageListener[] {publishCollector}); - verifySubscribedSuccessfully(subscribeToken); + TestUtils.verifySubscribedSuccessfully(subscribeToken); // publish a message on same topic the client subscribed client.publish("/metering/temp", new MqttMessage("18".getBytes(StandardCharsets.UTF_8), 1, false, null)); @@ -113,7 +120,7 @@ public void givenSubscriptionWithNoLocalDisabledWhenTopicMatchPublishByItselfThe PublishCollector publishCollector = new PublishCollector(); IMqttToken subscribeToken = client.subscribe(new MqttSubscription[]{subscription}, new IMqttMessageListener[] {publishCollector}); - verifySubscribedSuccessfully(subscribeToken); + TestUtils.verifySubscribedSuccessfully(subscribeToken); // publish a message on same topic the client subscribed client.publish("/metering/temp", new MqttMessage("18".getBytes(StandardCharsets.UTF_8), 1, false, null)); @@ -125,12 +132,6 @@ public void givenSubscriptionWithNoLocalDisabledWhenTopicMatchPublishByItselfThe assertEquals(MqttQos.AT_LEAST_ONCE.getCode(), publishCollector.receivedMessage.getQos()); } - private static void verifySubscribedSuccessfully(IMqttToken subscribeToken) { - assertEquals(1, subscribeToken.getReasonCodes().length); - assertEquals(Mqtt5SubAckReasonCode.GRANTED_QOS_1.getCode(), subscribeToken.getReasonCodes()[0], - "Client is subscribed to the topic"); - } - @Test public void givenAnExistingRetainedMessageWhenClientSubscribeWithAnyRetainAsPublishedSubscriptionOptionThenPublishedMessageIsAlwaysFlaggedAsRetained() throws Exception { // publish a retained message, must be at qos => AT_LEAST_ONCE, @@ -160,7 +161,7 @@ private static void subscribeAndVerifyRetainedIsTrue(MqttClient subscriberWithRe PublishCollector publishCollector = new PublishCollector(); IMqttToken subscribeToken = subscriberWithRetain.subscribe(new MqttSubscription[]{subscription}, new IMqttMessageListener[] {publishCollector}); - verifySubscribedSuccessfully(subscribeToken); + TestUtils.verifySubscribedSuccessfully(subscribeToken); // Verify the message is also reflected back to the sender publishCollector.assertReceivedMessageIn(2, TimeUnit.SECONDS); @@ -313,7 +314,7 @@ private static void createSubscriberClient(PublishCollector publishCollector, St IMqttToken subscribeToken = subscriber.subscribe(new MqttSubscription[]{subscription}, new IMqttMessageListener[] {publishCollector}); - verifySubscribedSuccessfully(subscribeToken); + TestUtils.verifySubscribedSuccessfully(subscribeToken); } private static void createClientWithRetainPolicy(PublishCollector publishCollector, int retainPolicy) throws MqttException { @@ -324,7 +325,7 @@ private static void createClientWithRetainPolicy(PublishCollector publishCollect IMqttToken subscribeToken = subscriber.subscribe(new MqttSubscription[]{subscription}, new IMqttMessageListener[] {publishCollector}); - verifySubscribedSuccessfully(subscribeToken); + TestUtils.verifySubscribedSuccessfully(subscribeToken); } private static MqttClient createSubscriberClientWithRetainAsPublished(PublishCollector publishCollector, String topic) throws MqttException { @@ -344,7 +345,7 @@ private static MqttClient createSubscriberClient(PublishCollector publishCollect IMqttToken subscribeToken = subscriber.subscribe(new MqttSubscription[]{subscription}, new IMqttMessageListener[] {publishCollector}); - verifySubscribedSuccessfully(subscribeToken); + TestUtils.verifySubscribedSuccessfully(subscribeToken); return subscriber; } diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/TestUtils.java b/broker/src/test/java/io/moquette/integration/mqtt5/TestUtils.java index 6bf283c16..60a23bad3 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/TestUtils.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/TestUtils.java @@ -21,11 +21,14 @@ import com.hivemq.client.mqtt.MqttGlobalPublishFilter; import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode; +import org.eclipse.paho.mqttv5.client.IMqttToken; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; public class TestUtils { @@ -39,4 +42,10 @@ static void verifyPublishedMessage(Mqtt5BlockingClient client, Consumer