Skip to content

Commit

Permalink
Update message expiry remaining time and drop queued messages if expi…
Browse files Browse the repository at this point in the history
…red (#823)

Updates the message expiry property, during forwarding of a publish, with the remaining seconds before expire.
This is done after extract a message from the Session's queue and before composing the publish message to forward.
When a message is straight forwarded without being enqueue, the message expiry property is equally forwarded.
  • Loading branch information
andsel authored Apr 1, 2024
1 parent 495514a commit dd1600d
Show file tree
Hide file tree
Showing 13 changed files with 268 additions and 44 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ Version 0.18-SNAPSHOT:
[feature] message expiry interval: (issue #818)
- Implements the management of message expiry for retained part. (#819)
- Avoid to publish messages that has elapsed its expire property. (#822)
- Update the message expiry property remaining seconds when a publish is forwarded. (#823)
[feature] subscription option handling: (issue #808)
- Move from qos to subscription option implementing the persistence of SubscriptionOption to/from storage. (#810)
- Exposed the maximum granted QoS by the server with the config setting 'max_server_granted_qos'. (#811)
Expand Down
4 changes: 2 additions & 2 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
return PostOffice.RouteResult.failed(clientId);
}

final Instant expiry = extractExpiryFromPropery(msg);
final Instant expiry = extractExpiryFromProperty(msg);

// retain else msg is cleaned by the NewNettyMQTTHandler and is not available
// in execution by SessionEventLoop
Expand Down Expand Up @@ -672,7 +672,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
}
}

private Instant extractExpiryFromPropery(MqttPublishMessage msg) {
private Instant extractExpiryFromProperty(MqttPublishMessage msg) {
MqttProperties.MqttProperty expiryProp = msg.variableHeader()
.properties()
.getProperty(MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
if (evt instanceof IdleStateEvent) {
IdleState e = ((IdleStateEvent) evt).state();
if (e == IdleState.READER_IDLE) {
LOG.info("Firing channel inactive event. MqttClientId = {}.", NettyUtils.clientID(ctx.channel()));
LOG.warn("Close channel because it's inactive, passed keep alive. MqttClientId = {}.", NettyUtils.clientID(ctx.channel()));
// fire a close that then fire channelInactive to trigger publish of Will
ctx.close().addListener(CLOSE_ON_FAILURE);
}
Expand Down
27 changes: 15 additions & 12 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,7 @@

import java.net.InetSocketAddress;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -269,9 +262,11 @@ private void sendPublishQos0(PublishedMessage publishRequest) {
LOG.debug("Sending publish at QoS0 already expired, drop it");
return;
}

MqttProperties.MqttProperty[] mqttProperties = publishRequest.updatePublicationExpiryIfPresentOrAdd();
MqttPublishMessage publishMsg = MQTTConnection.createPublishMessage(publishRequest.getTopic().toString(),
publishRequest.getPublishingQos(), publishRequest.getPayload(), 0,
publishRequest.retained, false, publishRequest.mqttProperties);
publishRequest.retained, false, mqttProperties);
mqttConnection.sendPublish(publishMsg);
}

Expand All @@ -281,7 +276,7 @@ private void sendPublishQos1(PublishedMessage publishRequest) {
return;
}
if (publishRequest.isExpired()) {
LOG.debug("Sending publish at QoS1 already expired, drop it");
LOG.debug("Sending publish at QoS1 already expired, expected to happen before {}, drop it", publishRequest.messageExpiry);
return;
}

Expand All @@ -307,6 +302,8 @@ private void sendPublishInFlightWindowOrQueueing(MQTTConnection localMqttConnect
inflightSlots.decrementAndGet();
int packetId = localMqttConnectionRef.nextPacketId();

LOG.debug("Adding into inflight for session {} at QoS {}", getClientID(), publishRequest.getPublishingQos());

EnqueuedMessage old = inflightWindow.put(packetId, publishRequest);
// If there already was something, release it.
if (old != null) {
Expand All @@ -316,9 +313,11 @@ private void sendPublishInFlightWindowOrQueueing(MQTTConnection localMqttConnect
if (resendInflightOnTimeout) {
inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS));
}

MqttProperties.MqttProperty[] mqttProperties = publishRequest.updatePublicationExpiryIfPresentOrAdd();
MqttPublishMessage publishMsg = MQTTConnection.createPublishMessage(
publishRequest.topic.toString(), publishRequest.getPublishingQos(),
publishRequest.payload, packetId, publishRequest.retained, false, publishRequest.mqttProperties);
publishRequest.payload, packetId, publishRequest.retained, false, mqttProperties);
localMqttConnectionRef.sendPublish(publishMsg);

drainQueueToConnection();
Expand Down Expand Up @@ -352,6 +351,7 @@ void pubAckReceived(int ackPacketId) {
removed.release();

inflightSlots.incrementAndGet();
LOG.debug("Received PUBACK {} for session {}", ackPacketId, getClientID());
drainQueueToConnection();
}

Expand Down Expand Up @@ -440,12 +440,15 @@ private void drainQueueToConnection() {
if (resendInflightOnTimeout) {
inflightTimeouts.add(new InFlightPacket(sendPacketId, FLIGHT_BEFORE_RESEND_MS));
}

MqttProperties.MqttProperty[] mqttProperties = msgPub.updatePublicationExpiryIfPresentOrAdd();

MqttPublishMessage publishMsg = MQTTConnection.createNotRetainedPublishMessage(
msgPub.topic.toString(),
msgPub.publishingQos,
msgPub.payload,
sendPacketId,
msgPub.mqttProperties);
mqttProperties);
mqttConnection.sendPublish(publishMsg);

// we fetched msg from a map, but the release is cancelled out by the above retain
Expand Down
62 changes: 62 additions & 0 deletions broker/src/main/java/io/moquette/broker/SessionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,71 @@ public MqttProperties.MqttProperty[] getMqttProperties() {
public boolean isExpired() {
return messageExpiry != Instant.MAX && Instant.now().isAfter(messageExpiry);
}

public MqttProperties.MqttProperty[] updatePublicationExpiryIfPresentOrAdd() {
if (messageExpiry == Instant.MAX) {
return mqttProperties;
}

Duration duration = Duration.between(Instant.now(), messageExpiry);
// do some math rounding so that 2.9999 seconds remains 3 seconds
long remainingSeconds = Math.round(duration.toMillis() / 1_000.0);
final int indexOfExpiry = findPublicationExpiryProperty(mqttProperties);
MqttProperties.IntegerProperty updatedProperty = new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value(), (int) remainingSeconds);

// update existing property
if (indexOfExpiry != -1) {
mqttProperties[indexOfExpiry] = updatedProperty;
return mqttProperties;
}

// insert a new property
MqttProperties.MqttProperty[] newProperties = Arrays.copyOf(mqttProperties, mqttProperties.length + 1);
newProperties[newProperties.length - 1] = updatedProperty;
return newProperties;
}

/**
* Linear search of PUBLICATION_EXPIRY_INTERVAL.
* @param properties the array of properties.
* @return the index of matched property or -1.
* */
private static int findPublicationExpiryProperty(MqttProperties.MqttProperty[] properties) {
for (int i = 0; i < properties.length; i++) {
if (isPublicationExpiryProperty(properties[i])) {
return i;
}
}
return -1;
}

private static boolean isPublicationExpiryProperty(MqttProperties.MqttProperty property) {
return property instanceof MqttProperties.IntegerProperty &&
property.propertyId() == MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value();
}

public Instant getMessageExpiry() {
return messageExpiry;
}

@Override
public String toString() {
return "PublishedMessage{" +
"topic=" + topic +
", publishingQos=" + publishingQos +
", payload=" + payload +
", retained=" + retained +
", messageExpiry=" + messageExpiry +
", mqttProperties=" + Arrays.toString(mqttProperties) +
'}';
}
}

public static final class PubRelMarker extends EnqueuedMessage {
@Override
public String toString() {
return "PubRelMarker{}";
}
}

public enum CreationModeEnum {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@
import io.moquette.broker.SessionRegistry;
import io.moquette.broker.unsafequeues.Queue;
import io.moquette.broker.unsafequeues.QueueException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.Optional;

public class SegmentPersistentQueue extends AbstractSessionMessageQueue<SessionRegistry.EnqueuedMessage> {

private static final Logger LOG = LoggerFactory.getLogger(SegmentPersistentQueue.class);

private final Queue segmentedQueue;
private final SegmentedPersistentQueueSerDes serdes = new SegmentedPersistentQueueSerDes();

Expand All @@ -19,6 +23,7 @@ public SegmentPersistentQueue(Queue segmentedQueue) {

@Override
public void enqueue(SessionRegistry.EnqueuedMessage message) {
LOG.debug("Adding message {}", message);
checkEnqueuePreconditions(message);

final ByteBuffer payload = serdes.toBytes(message);
Expand All @@ -40,11 +45,14 @@ public SessionRegistry.EnqueuedMessage dequeue() {
throw new RuntimeException(e);
}
if (!dequeue.isPresent()) {
LOG.debug("No data pulled out from the queue");
return null;
}

final ByteBuffer content = dequeue.get();
return serdes.fromBytes(content);
SessionRegistry.EnqueuedMessage message = serdes.fromBytes(content);
LOG.debug("Retrieved message {}", message);
return message;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ private void write(SessionRegistry.EnqueuedMessage obj, ByteBuffer buff) {
final String topic = casted.getTopic().toString();

writeTopic(buff, topic);
writeMessageExpiry(buff, casted.getMessageExpiry());
writePayload(buff, casted.getPayload());
if (EnqueuedMessageValueType.hasProperties(casted)) {
buff.put((byte) 1); // there are properties
Expand All @@ -49,6 +50,10 @@ private void write(SessionRegistry.EnqueuedMessage obj, ByteBuffer buff) {
}
}

private void writeMessageExpiry(ByteBuffer buff, Instant messageExpiry) {
writeString(buff, messageExpiry.toString());
}

private void writePayload(ByteBuffer target, ByteBuf source) {
final int payloadSize = source.readableBytes();
byte[] rawBytes = new byte[payloadSize];
Expand Down Expand Up @@ -112,6 +117,7 @@ private int getMemory(SessionRegistry.EnqueuedMessage obj) {
return 1 + // message type
1 + // qos
topicMemorySize(casted.getTopic()) +
messageExpirySize(casted.getMessageExpiry()) +
payloadMemorySize(casted.getPayload()) +
1 + // flag to indicate if there are MQttProperties or not
propertiesSize;
Expand All @@ -127,6 +133,11 @@ private int topicMemorySize(Topic topic) {
topic.toString().getBytes(StandardCharsets.UTF_8).length;
}

private int messageExpirySize(Instant messageExpiry) {
return 4 + // size
messageExpiry.toString().getBytes(StandardCharsets.UTF_8).length;
}

private int propertiesMemorySize(MqttProperties.MqttProperty[] properties) {
return 4 + // integer containing the number of properties
Arrays.stream(properties).mapToInt(SegmentedPersistentQueueSerDes::propertyMemorySize).sum();
Expand Down Expand Up @@ -164,12 +175,13 @@ public SessionRegistry.EnqueuedMessage fromBytes(ByteBuffer buff) {
} else if (messageType == MessageType.PUBLISHED_MESSAGE.ordinal()) {
final MqttQoS qos = MqttQoS.valueOf(buff.get());
final String topicStr = readTopic(buff);
final Instant messageExpiry = readExpiry(buff);
final ByteBuf payload = readPayload(buff);
if (SerdesUtils.containsProperties(buff)) {
MqttProperties.MqttProperty[] mqttProperties = readProperties(buff);
return new SessionRegistry.PublishedMessage(Topic.asTopic(topicStr), qos, payload, false, Instant.MAX, mqttProperties);
return new SessionRegistry.PublishedMessage(Topic.asTopic(topicStr), qos, payload, false, messageExpiry, mqttProperties);
} else {
return new SessionRegistry.PublishedMessage(Topic.asTopic(topicStr), qos, payload, false, Instant.MAX);
return new SessionRegistry.PublishedMessage(Topic.asTopic(topicStr), qos, payload, false, messageExpiry);
}
} else {
throw new IllegalArgumentException("Can't recognize record of type: " + messageType);
Expand All @@ -185,12 +197,24 @@ private MqttProperties.MqttProperty readProperty(ByteBuffer buff) {
}

private String readTopic(ByteBuffer buff) {
return readString(buff);
}

private static String readString(ByteBuffer buff) {
final int stringLen = buff.getInt();
final byte[] rawString = new byte[stringLen];
buff.get(rawString);
return new String(rawString, StandardCharsets.UTF_8);
}

private Instant readExpiry(ByteBuffer buff) {
final String expiryText = readString(buff);
if (Instant.MAX.toString().equals(expiryText)) {
return Instant.MAX;
}
return Instant.parse(expiryText);
}

private ByteBuf readPayload(ByteBuffer buff) {
return Unpooled.wrappedBuffer(readByteArray(buff));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,9 @@ void connectLowLevel() {
MqttConnAckMessage connAck = lowLevelClient.connectV5();
assertConnectionAccepted(connAck, "Connection must be accepted");
}

void connectLowLevel(int keepAliveSecs) {
MqttConnAckMessage connAck = lowLevelClient.connectV5(keepAliveSecs);
assertConnectionAccepted(connAck, "Connection must be accepted");
}
}
Loading

0 comments on commit dd1600d

Please sign in to comment.