Skip to content

Commit

Permalink
Avoid decomposition of Publish fields into method params (moquette-io…
Browse files Browse the repository at this point in the history
…#827)

Updated all PostOffice methods that accepted QoS, topic, retained flag and payload, components of a publish message to move around just the publish message without decomposing it.
  • Loading branch information
andsel authored Apr 20, 2024
1 parent 9b9f286 commit ac2a9be
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 64 deletions.
3 changes: 2 additions & 1 deletion ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
Version 0.18-SNAPSHOT:
[feature] Add Netty native trsansport support on MacOS. Bundle all the native transport module by default (#806)
[refactoring] Refactory of PostOffice to pass publish message in hits entirety avoiding decomposition into single parameters. (#827)
[feature] Add Netty native transport support on MacOS. Bundle all the native transport module by default (#806)
[feature] message expiry interval: (issue #818)
- Implements the management of message expiry for retained part. (#819)
- Avoid to publish messages that has elapsed its expire property. (#822)
Expand Down
4 changes: 2 additions & 2 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -637,15 +637,15 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
if (!isBoundToSession()) {
return null;
}
postOffice.receivedPublishQos0(topic, username, clientId, msg, expiry);
postOffice.receivedPublishQos0(username, clientId, msg, expiry);
return null;
}).ifFailed(msg::release);
case AT_LEAST_ONCE:
return postOffice.routeCommand(clientId, "PUB QoS1", () -> {
checkMatchSessionLoop(clientId);
if (!isBoundToSession())
return null;
postOffice.receivedPublishQos1(this, topic, username, messageID, msg, expiry);
postOffice.receivedPublishQos1(this, username, messageID, msg, expiry);
return null;
}).ifFailed(msg::release);
case EXACTLY_ONCE: {
Expand Down
102 changes: 62 additions & 40 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType;
Expand Down Expand Up @@ -61,7 +62,6 @@

import static io.moquette.broker.Utils.messageId;
import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE;
Expand Down Expand Up @@ -312,8 +312,14 @@ private void trackWillSpecificationForFutureFire(Session bindedSession, ISession

private void publishWill(ISessionsRepository.Will will) {
final Instant messageExpiryInstant = willMessageExpiry(will);
publish2Subscribers(WILL_PUBLISHER, Unpooled.copiedBuffer(will.payload), new Topic(will.topic),
will.qos, will.retained, messageExpiryInstant);
MqttPublishMessage willPublishMessage = MqttMessageBuilders.publish()
.topicName(will.topic)
.retained(will.retained)
.qos(will.qos)
.payload(Unpooled.copiedBuffer(will.payload))
.build();

publish2Subscribers(WILL_PUBLISHER, messageExpiryInstant, willPublishMessage);
}

private static Instant willMessageExpiry(ISessionsRepository.Will will) {
Expand Down Expand Up @@ -526,7 +532,7 @@ private void publishRetainedMessagesForSubscriptions(String clientID, Collection
LOG.info("No retained messages matching topic filter {}", topicFilter);
continue;
}
MqttProperties.MqttProperty[] properties = prepareSubscriptionProperties(subscription);
MqttProperties.MqttProperty[] properties = prepareSubscriptionProperties(subscription, Collections.emptyList());
for (RetainedMessage retainedMsg : retainedMsgs) {
final MqttQoS retainedQos = retainedMsg.qosLevel();
MqttQoS qos = lowerQosToTheSubscriptionDesired(subscription, retainedQos);
Expand Down Expand Up @@ -612,14 +618,15 @@ public void unsubscribe(List<String> topics, MQTTConnection mqttConnection, int
mqttConnection.sendUnsubAckMessage(topics, clientID, messageId);
}

CompletableFuture<Void> receivedPublishQos0(Topic topic, String username, String clientID, MqttPublishMessage msg,
CompletableFuture<Void> receivedPublishQos0(String username, String clientID, MqttPublishMessage msg,
Instant messageExpiry) {
final Topic topic = new Topic(msg.variableHeader().topicName());
if (!authorizator.canWrite(topic, username, clientID)) {
LOG.error("client is not authorized to publish on topic: {}", topic);
ReferenceCountUtil.release(msg);
return CompletableFuture.completedFuture(null);
}
final RoutingResults publishResult = publish2Subscribers(clientID, msg.payload(), topic, AT_MOST_ONCE, msg.fixedHeader().isRetain(), Instant.MAX);
final RoutingResults publishResult = publish2Subscribers(clientID, messageExpiry, msg);
if (publishResult.isAllFailed()) {
LOG.info("No one publish was successfully enqueued to session loops");
ReferenceCountUtil.release(msg);
Expand All @@ -637,9 +644,10 @@ CompletableFuture<Void> receivedPublishQos0(Topic topic, String username, String
});
}

RoutingResults receivedPublishQos1(MQTTConnection connection, Topic topic, String username, int messageID,
RoutingResults receivedPublishQos1(MQTTConnection connection, String username, int messageID,
MqttPublishMessage msg, Instant messageExpiry) {
// verify if topic can be written
final Topic topic = new Topic(msg.variableHeader().topicName());
topic.getTokens();
if (!topic.isValid()) {
LOG.warn("Invalid topic format, force close the connection");
Expand All @@ -654,14 +662,12 @@ RoutingResults receivedPublishQos1(MQTTConnection connection, Topic topic, Strin
return RoutingResults.preroutingError();
}

ByteBuf payload = msg.payload();
boolean retainPublish = msg.fixedHeader().isRetain();
final RoutingResults routes;
if (msg.fixedHeader().isDup()) {
final Set<String> failedClients = failedPublishes.listFailed(clientId, messageID);
routes = publish2Subscribers(clientId, payload, topic, AT_LEAST_ONCE, failedClients, retainPublish, messageExpiry);
routes = publish2Subscribers(clientId, failedClients, messageExpiry, msg);
} else {
routes = publish2Subscribers(clientId, payload, topic, AT_LEAST_ONCE, retainPublish, messageExpiry);
routes = publish2Subscribers(clientId, messageExpiry, msg);
}
if (LOG.isTraceEnabled()) {
LOG.trace("subscriber routes: {}", routes);
Expand All @@ -684,7 +690,7 @@ RoutingResults receivedPublishQos1(MQTTConnection connection, Topic topic, Strin
}

private void manageRetain(Topic topic, MqttPublishMessage msg) {
if (msg.fixedHeader().isRetain()) {
if (isRetained(msg)) {
if (!msg.payload().isReadable()) {
retainedRepository.cleanRetained(topic);
// clean also the tracker
Expand Down Expand Up @@ -715,9 +721,10 @@ private static int getIntProperty(MqttProperties props, MqttPropertyType prop) {
return mqttProperty.value();
}

private RoutingResults publish2Subscribers(String publisherClientId, ByteBuf payload, Topic topic,
MqttQoS publishingQos, boolean isPublishRetained, Instant messageExpiry) {
return publish2Subscribers(publisherClientId, payload, topic, publishingQos, NO_FILTER, isPublishRetained, messageExpiry);
private RoutingResults publish2Subscribers(String publisherClientId,
Instant messageExpiry,
MqttPublishMessage msg) {
return publish2Subscribers(publisherClientId, NO_FILTER, messageExpiry, msg);
}

private class BatchingPublishesCollector {
Expand Down Expand Up @@ -783,8 +790,12 @@ public int countBatches() {
}
}

private RoutingResults publish2Subscribers(String publisherClientId, ByteBuf payload, Topic topic, MqttQoS publishingQos,
Set<String> filterTargetClients, boolean retainPublish, Instant messageExpiry) {
private RoutingResults publish2Subscribers(String publisherClientId,
Set<String> filterTargetClients, Instant messageExpiry,
MqttPublishMessage msg) {
final boolean retainPublish = msg.fixedHeader().isRetain();
final Topic topic = new Topic(msg.variableHeader().topicName());
final MqttQoS publishingQos = msg.fixedHeader().qosLevel();
List<Subscription> topicMatchingSubscriptions = subscriptions.matchQosSharpening(topic);
if (topicMatchingSubscriptions.isEmpty()) {
// no matching subscriptions, clean exit
Expand Down Expand Up @@ -815,11 +826,11 @@ private RoutingResults publish2Subscribers(String publisherClientId, ByteBuf pay
return new RoutingResults(Collections.emptyList(), Collections.emptyList(), CompletableFuture.completedFuture(null));
}

payload.retain(subscriptionCount);
msg.retain(subscriptionCount);

List<RouteResult> publishResults = collector.routeBatchedPublishes((batch) -> {
publishToSession(payload, topic, batch, publishingQos, retainPublish, messageExpiry);
payload.release();
publishToSession(topic, batch, publishingQos, retainPublish, messageExpiry, msg);
msg.release();
});

final CompletableFuture[] publishFutures = publishResults.stream()
Expand All @@ -833,36 +844,38 @@ private RoutingResults publish2Subscribers(String publisherClientId, ByteBuf pay
Collection<String> subscibersIds = collector.subscriberIdsByEventLoop(rr.clientId);
if (rr.status == RouteResult.Status.FAIL) {
failedRoutings.addAll(subscibersIds);
payload.release();
msg.release();
} else {
successedRoutings.addAll(subscibersIds);
}
}
return new RoutingResults(successedRoutings, failedRoutings, publishes);
}

private void publishToSession(ByteBuf payload, Topic topic, Collection<Subscription> subscriptions,
MqttQoS publishingQos, boolean retainPublish, Instant messageExpiry) {
ByteBuf duplicatedPayload = payload.duplicate();
private void publishToSession(Topic topic, Collection<Subscription> subscriptions,
MqttQoS publishingQos, boolean retainPublish, Instant messageExpiry, MqttPublishMessage msg) {
ByteBuf duplicatedPayload = msg.payload().duplicate();
for (Subscription sub : subscriptions) {
MqttQoS qos = lowerQosToTheSubscriptionDesired(sub, publishingQos);
boolean retained = false;
if (sub.option().isRetainAsPublished()) {
retained = retainPublish;
}
publishToSession(duplicatedPayload, topic, sub, qos, retained, messageExpiry);
publishToSession(duplicatedPayload, topic, sub, qos, retained, messageExpiry, msg);
}
}

private void publishToSession(ByteBuf payload, Topic topic, Subscription sub, MqttQoS qos, boolean retained,
Instant messageExpiry) {
Instant messageExpiry, MqttPublishMessage msg) {
Session targetSession = this.sessionRegistry.retrieve(sub.getClientId());

boolean isSessionPresent = targetSession != null;
if (isSessionPresent) {
LOG.debug("Sending PUBLISH message to active subscriber CId: {}, topicFilter: {}, qos: {}",
sub.getClientId(), sub.getTopicFilter(), qos);
final MqttProperties.MqttProperty[] properties = prepareSubscriptionProperties(sub);

Collection<? extends MqttProperties.MqttProperty> existingProperties = msg.variableHeader().properties().listAll();
final MqttProperties.MqttProperty[] properties = prepareSubscriptionProperties(sub, existingProperties);
final SessionRegistry.PublishedMessage publishedMessage =
new SessionRegistry.PublishedMessage(topic, qos, payload, retained, messageExpiry, properties);
targetSession.sendPublishOnSessionAtQos(publishedMessage);
Expand All @@ -874,15 +887,23 @@ private void publishToSession(ByteBuf payload, Topic topic, Subscription sub, Mq
}
}

private MqttProperties.MqttProperty[] prepareSubscriptionProperties(Subscription sub) {
MqttProperties.MqttProperty[] properties;
private MqttProperties.MqttProperty[] prepareSubscriptionProperties(Subscription sub,
Collection<? extends MqttProperties.MqttProperty> existingProperties) {

// copy all properties except SubscriptionId
Collection<MqttProperties.MqttProperty> properties = new ArrayList<>(existingProperties.size() + 1);
for (MqttProperties.MqttProperty property : existingProperties) {
// skip SUBSCRIPTION_IDENTIFIER because could be added by the subscription
if (property.propertyId() != MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value()) {
properties.add(property);
}
}
if (sub.hasSubscriptionIdentifier()) {
MqttProperties.IntegerProperty subscriptionId = createSubscriptionIdProperty(sub);
properties = new MqttProperties.MqttProperty[] { subscriptionId };
} else {
properties = new MqttProperties.MqttProperty[0];
properties.add(subscriptionId);
}
return properties;

return properties.toArray(new MqttProperties.MqttProperty[0]);
}

private MqttProperties.IntegerProperty createSubscriptionIdProperty(Subscription sub) {
Expand All @@ -899,7 +920,6 @@ RoutingResults receivedPublishQos2(MQTTConnection connection, MqttPublishMessage
Instant messageExpiry) {
LOG.trace("Processing PUB QoS2 message on connection: {}", connection);
final Topic topic = new Topic(msg.variableHeader().topicName());
final ByteBuf payload = msg.payload();

final String clientId = connection.getClientId();
if (!authorizator.canWrite(topic, username, clientId)) {
Expand All @@ -910,13 +930,12 @@ RoutingResults receivedPublishQos2(MQTTConnection connection, MqttPublishMessage
}

final int messageID = msg.variableHeader().packetId();
boolean retainPublish = msg.fixedHeader().isRetain();
final RoutingResults publishRoutings;
if (msg.fixedHeader().isDup()) {
final Set<String> failedClients = failedPublishes.listFailed(clientId, messageID);
publishRoutings = publish2Subscribers(clientId, payload, topic, EXACTLY_ONCE, failedClients, retainPublish, messageExpiry);
publishRoutings = publish2Subscribers(clientId, failedClients, messageExpiry, msg);
} else {
publishRoutings = publish2Subscribers(clientId, payload, topic, EXACTLY_ONCE, retainPublish, messageExpiry);
publishRoutings = publish2Subscribers(clientId, messageExpiry, msg);
}
if (publishRoutings.isAllSuccess()) {
// QoS2 PUB message was enqueued successfully to every event loop
Expand Down Expand Up @@ -958,11 +977,10 @@ public RoutingResults internalPublish(MqttPublishMessage msg) {
final ByteBuf payload = msg.payload();
LOG.info("Sending internal PUBLISH message Topic={}, qos={}", topic, qos);

boolean retainPublish = msg.fixedHeader().isRetain();
final RoutingResults publishResult = publish2Subscribers(INTERNAL_PUBLISHER, payload, topic, qos, retainPublish, Instant.MAX);
final RoutingResults publishResult = publish2Subscribers(INTERNAL_PUBLISHER, Instant.MAX, msg);
LOG.trace("after routed publishes: {}", publishResult);

if (!retainPublish) {
if (!isRetained(msg)) {
return publishResult;
}
if (qos == AT_MOST_ONCE || payload.readableBytes() == 0) {
Expand All @@ -974,6 +992,10 @@ public RoutingResults internalPublish(MqttPublishMessage msg) {
return publishResult;
}

private static boolean isRetained(MqttPublishMessage msg) {
return msg.fixedHeader().isRetain();
}

/**
* notify MqttConnectMessage after connection established (already pass login).
* @param msg
Expand Down
Loading

0 comments on commit ac2a9be

Please sign in to comment.