Skip to content

Commit

Permalink
Message expiry for queued (#822)
Browse files Browse the repository at this point in the history
Avoid to publish messages that has elapsed its expire property.
- Reworked some Session methods arguments list to be wrapped inside the PublishedMessage
- Extracts the message expiry property from publish and move around down to the forwarding logic to matching subscriptions.
- Update the publish to subscription logic to drop messages that has elapsed their expiry.
- Adds integration test to prove the feature.
  • Loading branch information
andsel authored Mar 24, 2024
1 parent 6ffb106 commit 495514a
Show file tree
Hide file tree
Showing 16 changed files with 202 additions and 93 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ Version 0.18-SNAPSHOT:
[feature] Add Netty native trsansport support on MacOS. Bundle all the native transport module by default (#806)
[feature] message expiry interval: (issue #818)
- Implements the management of message expiry for retained part. (#819)
- Avoid to publish messages that has elapsed its expire property. (#822)
[feature] subscription option handling: (issue #808)
- Move from qos to subscription option implementing the persistence of SubscriptionOption to/from storage. (#810)
- Exposed the maximum granted QoS by the server with the config setting 'max_server_granted_qos'. (#811)
Expand Down
24 changes: 20 additions & 4 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.nio.charset.StandardCharsets;
import java.security.cert.Certificate;
import java.security.cert.CertificateEncodingException;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -624,6 +625,8 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
return PostOffice.RouteResult.failed(clientId);
}

final Instant expiry = extractExpiryFromPropery(msg);

// retain else msg is cleaned by the NewNettyMQTTHandler and is not available
// in execution by SessionEventLoop
msg.retain();
Expand All @@ -634,15 +637,15 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
if (!isBoundToSession()) {
return null;
}
postOffice.receivedPublishQos0(topic, username, clientId, msg);
postOffice.receivedPublishQos0(topic, username, clientId, msg, expiry);
return null;
}).ifFailed(msg::release);
case AT_LEAST_ONCE:
return postOffice.routeCommand(clientId, "PUB QoS1", () -> {
checkMatchSessionLoop(clientId);
if (!isBoundToSession())
return null;
postOffice.receivedPublishQos1(this, topic, username, messageID, msg);
postOffice.receivedPublishQos1(this, topic, username, messageID, msg, expiry);
return null;
}).ifFailed(msg::release);
case EXACTLY_ONCE: {
Expand All @@ -655,11 +658,11 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
});
if (!firstStepResult.isSuccess()) {
msg.release();
LOG.trace("Failed to enqueue PUB QoS2 to session loop for {}", clientId);
LOG.trace("Failed to enqueue PUB QoS2 to session loop for {}", clientId);
return firstStepResult;
}
firstStepResult.completableFuture().thenRun(() ->
postOffice.receivedPublishQos2(this, msg, username).completableFuture()
postOffice.receivedPublishQos2(this, msg, username, expiry).completableFuture()
);
return firstStepResult;
}
Expand All @@ -669,6 +672,19 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
}
}

private Instant extractExpiryFromPropery(MqttPublishMessage msg) {
MqttProperties.MqttProperty expiryProp = msg.variableHeader()
.properties()
.getProperty(MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value());
if (expiryProp == null) {
// publish message doesn't contain the expiry property, leave it as it is.
return Instant.MAX;
}
Integer expirySeconds = ((MqttProperties.IntegerProperty) expiryProp).value();

return Instant.now().plusSeconds(expirySeconds);
}

void sendPubRec(int messageID) {
LOG.trace("sendPubRec invoked, messageID: {}", messageID);
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, AT_MOST_ONCE,
Expand Down
41 changes: 23 additions & 18 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ private void trackWillSpecificationForFutureFire(Session bindedSession, ISession
}

private void publishWill(ISessionsRepository.Will will) {
publish2Subscribers(WILL_PUBLISHER, Unpooled.copiedBuffer(will.payload), new Topic(will.topic), will.qos, will.retained);
publish2Subscribers(WILL_PUBLISHER, Unpooled.copiedBuffer(will.payload), new Topic(will.topic), will.qos, will.retained, Instant.MAX);
}

/**
Expand Down Expand Up @@ -602,13 +602,14 @@ public void unsubscribe(List<String> topics, MQTTConnection mqttConnection, int
mqttConnection.sendUnsubAckMessage(topics, clientID, messageId);
}

CompletableFuture<Void> receivedPublishQos0(Topic topic, String username, String clientID, MqttPublishMessage msg) {
CompletableFuture<Void> receivedPublishQos0(Topic topic, String username, String clientID, MqttPublishMessage msg,
Instant messageExpiry) {
if (!authorizator.canWrite(topic, username, clientID)) {
LOG.error("client is not authorized to publish on topic: {}", topic);
ReferenceCountUtil.release(msg);
return CompletableFuture.completedFuture(null);
}
final RoutingResults publishResult = publish2Subscribers(clientID, msg.payload(), topic, AT_MOST_ONCE, msg.fixedHeader().isRetain());
final RoutingResults publishResult = publish2Subscribers(clientID, msg.payload(), topic, AT_MOST_ONCE, msg.fixedHeader().isRetain(), Instant.MAX);
if (publishResult.isAllFailed()) {
LOG.info("No one publish was successfully enqueued to session loops");
ReferenceCountUtil.release(msg);
Expand All @@ -627,7 +628,7 @@ CompletableFuture<Void> receivedPublishQos0(Topic topic, String username, String
}

RoutingResults receivedPublishQos1(MQTTConnection connection, Topic topic, String username, int messageID,
MqttPublishMessage msg) {
MqttPublishMessage msg, Instant messageExpiry) {
// verify if topic can be written
topic.getTokens();
if (!topic.isValid()) {
Expand All @@ -648,9 +649,9 @@ RoutingResults receivedPublishQos1(MQTTConnection connection, Topic topic, Strin
final RoutingResults routes;
if (msg.fixedHeader().isDup()) {
final Set<String> failedClients = failedPublishes.listFailed(clientId, messageID);
routes = publish2Subscribers(clientId, payload, topic, AT_LEAST_ONCE, failedClients, retainPublish);
routes = publish2Subscribers(clientId, payload, topic, AT_LEAST_ONCE, failedClients, retainPublish, messageExpiry);
} else {
routes = publish2Subscribers(clientId, payload, topic, AT_LEAST_ONCE, retainPublish);
routes = publish2Subscribers(clientId, payload, topic, AT_LEAST_ONCE, retainPublish, messageExpiry);
}
if (LOG.isTraceEnabled()) {
LOG.trace("subscriber routes: {}", routes);
Expand Down Expand Up @@ -705,8 +706,8 @@ private static int getIntProperty(MqttProperties props, MqttPropertyType prop) {
}

private RoutingResults publish2Subscribers(String publisherClientId, ByteBuf payload, Topic topic,
MqttQoS publishingQos, boolean isPublishRetained) {
return publish2Subscribers(publisherClientId, payload, topic, publishingQos, NO_FILTER, isPublishRetained);
MqttQoS publishingQos, boolean isPublishRetained, Instant messageExpiry) {
return publish2Subscribers(publisherClientId, payload, topic, publishingQos, NO_FILTER, isPublishRetained, messageExpiry);
}

private class BatchingPublishesCollector {
Expand Down Expand Up @@ -773,7 +774,7 @@ public int countBatches() {
}

private RoutingResults publish2Subscribers(String publisherClientId, ByteBuf payload, Topic topic, MqttQoS publishingQos,
Set<String> filterTargetClients, boolean retainPublish) {
Set<String> filterTargetClients, boolean retainPublish, Instant messageExpiry) {
List<Subscription> topicMatchingSubscriptions = subscriptions.matchQosSharpening(topic);
if (topicMatchingSubscriptions.isEmpty()) {
// no matching subscriptions, clean exit
Expand Down Expand Up @@ -807,7 +808,7 @@ private RoutingResults publish2Subscribers(String publisherClientId, ByteBuf pay
payload.retain(subscriptionCount);

List<RouteResult> publishResults = collector.routeBatchedPublishes((batch) -> {
publishToSession(payload, topic, batch, publishingQos, retainPublish);
publishToSession(payload, topic, batch, publishingQos, retainPublish, messageExpiry);
payload.release();
});

Expand All @@ -831,27 +832,30 @@ private RoutingResults publish2Subscribers(String publisherClientId, ByteBuf pay
}

private void publishToSession(ByteBuf payload, Topic topic, Collection<Subscription> subscriptions,
MqttQoS publishingQos, boolean retainPublish) {
MqttQoS publishingQos, boolean retainPublish, Instant messageExpiry) {
ByteBuf duplicatedPayload = payload.duplicate();
for (Subscription sub : subscriptions) {
MqttQoS qos = lowerQosToTheSubscriptionDesired(sub, publishingQos);
boolean retained = false;
if (sub.option().isRetainAsPublished()) {
retained = retainPublish;
}
publishToSession(duplicatedPayload, topic, sub, qos, retained);
publishToSession(duplicatedPayload, topic, sub, qos, retained, messageExpiry);
}
}

private void publishToSession(ByteBuf payload, Topic topic, Subscription sub, MqttQoS qos, boolean retained) {
private void publishToSession(ByteBuf payload, Topic topic, Subscription sub, MqttQoS qos, boolean retained,
Instant messageExpiry) {
Session targetSession = this.sessionRegistry.retrieve(sub.getClientId());

boolean isSessionPresent = targetSession != null;
if (isSessionPresent) {
LOG.debug("Sending PUBLISH message to active subscriber CId: {}, topicFilter: {}, qos: {}",
sub.getClientId(), sub.getTopicFilter(), qos);
final MqttProperties.MqttProperty[] properties = prepareSubscriptionProperties(sub);
targetSession.sendPublishOnSessionAtQos(topic, qos, payload, retained, properties);
final SessionRegistry.PublishedMessage publishedMessage =
new SessionRegistry.PublishedMessage(topic, qos, payload, retained, messageExpiry, properties);
targetSession.sendPublishOnSessionAtQos(publishedMessage);
} else {
// If we are, the subscriber disconnected after the subscriptions tree selected that session as a
// destination.
Expand Down Expand Up @@ -881,7 +885,8 @@ private MqttProperties.IntegerProperty createSubscriptionIdProperty(Subscription
* subscribers.
* @return
*/
RoutingResults receivedPublishQos2(MQTTConnection connection, MqttPublishMessage msg, String username) {
RoutingResults receivedPublishQos2(MQTTConnection connection, MqttPublishMessage msg, String username,
Instant messageExpiry) {
LOG.trace("Processing PUB QoS2 message on connection: {}", connection);
final Topic topic = new Topic(msg.variableHeader().topicName());
final ByteBuf payload = msg.payload();
Expand All @@ -899,9 +904,9 @@ RoutingResults receivedPublishQos2(MQTTConnection connection, MqttPublishMessage
final RoutingResults publishRoutings;
if (msg.fixedHeader().isDup()) {
final Set<String> failedClients = failedPublishes.listFailed(clientId, messageID);
publishRoutings = publish2Subscribers(clientId, payload, topic, EXACTLY_ONCE, failedClients, retainPublish);
publishRoutings = publish2Subscribers(clientId, payload, topic, EXACTLY_ONCE, failedClients, retainPublish, messageExpiry);
} else {
publishRoutings = publish2Subscribers(clientId, payload, topic, EXACTLY_ONCE, retainPublish);
publishRoutings = publish2Subscribers(clientId, payload, topic, EXACTLY_ONCE, retainPublish, messageExpiry);
}
if (publishRoutings.isAllSuccess()) {
// QoS2 PUB message was enqueued successfully to every event loop
Expand Down Expand Up @@ -944,7 +949,7 @@ public RoutingResults internalPublish(MqttPublishMessage msg) {
LOG.info("Sending internal PUBLISH message Topic={}, qos={}", topic, qos);

boolean retainPublish = msg.fixedHeader().isRetain();
final RoutingResults publishResult = publish2Subscribers(INTERNAL_PUBLISHER, payload, topic, qos, retainPublish);
final RoutingResults publishResult = publish2Subscribers(INTERNAL_PUBLISHER, payload, topic, qos, retainPublish, Instant.MAX);
LOG.trace("after routed publishes: {}", publishResult);

if (!retainPublish) {
Expand Down
Loading

0 comments on commit 495514a

Please sign in to comment.