Skip to content

Commit

Permalink
Implement response-information property for request-response flow (mo…
Browse files Browse the repository at this point in the history
…quette-io#840)

- [feature] Updates the executeConnect to return the property response-information in CONNACT if the property request-response-information is set in CONNECT message. The response-information property contains the topic used by the responder to respond to the requester and consequently updates the read access map so that the requester can listen for new PUBLIUSH on that topic.
- [test] Extracted common test behavior in AbstractServerIntegrationWithoutClientFixture to share fixture startup of just broker without any client.
  • Loading branch information
andsel authored Jul 20, 2024
1 parent 38428f6 commit c713730
Show file tree
Hide file tree
Showing 8 changed files with 399 additions and 127 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.18-SNAPSHOT:
[feature] Implement response-information property for request-response flow. (#840)
[fix] Optimised page file opening for disk-based queues. (#837)
[feature] Manage payload format indicator property, when set verify payload format. (#826)
[refactoring] Refactory of PostOffice to pass publish message in hits entirety avoiding decomposition into single parameters. (#827)
Expand Down
31 changes: 29 additions & 2 deletions broker/src/main/java/io/moquette/broker/Authorizator.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

import static io.moquette.broker.Utils.messageId;
Expand All @@ -37,6 +39,11 @@ final class Authorizator {

private final IAuthorizatorPolicy policy;

// Contains the list of topic-client that has read access forced on reply topic.
private ConcurrentMap<Utils.Couple<Topic, String>, Boolean> responseTopicForcedReads = new ConcurrentHashMap<>();
// Contains the list of requesters' reply topics that need write access by all the other (responders).
private ConcurrentMap<Topic, Boolean> responseTopicForcedWrites = new ConcurrentHashMap<>();

Authorizator(IAuthorizatorPolicy policy) {
this.policy = policy;
}
Expand Down Expand Up @@ -113,10 +120,30 @@ private MqttQoS getQoSCheckingAlsoPermissionsOnTopic(String clientID, String use
* @return true if the user from client can publish data on topic.
*/
boolean canWrite(Topic topic, String user, String client) {
return policy.canWrite(topic, user, client);
boolean policyResult = policy.canWrite(topic, user, client);
if (!policyResult && responseTopicForcedWrites.containsKey(topic)) {
LOG.warn("Found write discord by policy and response information topic configured. The policy prohibit " +
"while the response topic should be accessible for all to write. topic: {}", topic);
return true;
}
return policyResult;
}

boolean canRead(Topic topic, String user, String client) {
return policy.canRead(topic, user, client);
boolean policyResult = policy.canRead(topic, user, client);
if (!policyResult && responseTopicForcedReads.containsKey(Utils.Couple.of(topic, client))) {
LOG.warn("Found read discord by policy and response information topic configured. The policy prohibit " +
"while the response topic should be accessible by read from client{}. topic: {}", client, topic);
return true;
}
return policyResult;
}

void forceReadAccess(Topic topic, String client) {
responseTopicForcedReads.putIfAbsent(Utils.Couple.of(topic, client), true);
}

public void forceWriteToAll(Topic topic) {
responseTopicForcedWrites.putIfAbsent(topic, true);
}
}
22 changes: 17 additions & 5 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,13 @@ private void executeConnect(MqttConnectMessage msg, String clientId, boolean ser
.sessionPresent(isSessionAlreadyPresent);
if (isProtocolVersion(msg, MqttVersion.MQTT_5)) {
// set properties for MQTT 5
final MqttProperties ackProperties = prepareConnAckProperties(serverGeneratedClientId, clientId);
ConnAckPropertiesBuilder connAckPropertiesBuilder = prepareConnAckPropertiesBuilder(serverGeneratedClientId, clientId);
if (isNeedResponseInformation(msg)) {
// the responder and requested access to the topic are already configured during session creation
// in SessionRegistry
connAckPropertiesBuilder.responseInformation("/reqresp/response/" + clientId);
}
final MqttProperties ackProperties = connAckPropertiesBuilder.build();
connAckBuilder.properties(ackProperties);
}
final MqttConnAckMessage ackMessage = connAckBuilder.build();
Expand Down Expand Up @@ -328,6 +334,16 @@ public void operationComplete(ChannelFuture future) throws Exception {
});
}

/**
* @return true iff message contains property REQUEST_RESPONSE_INFORMATION and is positive.
* */
static boolean isNeedResponseInformation(MqttConnectMessage msg) {
MqttProperties.IntegerProperty requestRespInfo = (MqttProperties.IntegerProperty) msg.variableHeader()
.properties()
.getProperty(MqttProperties.MqttPropertyType.REQUEST_RESPONSE_INFORMATION.value());
return requestRespInfo != null && requestRespInfo.value() >= 1;
}

/**
* @return the value of the Payload Format Indicator property from Will specification.
* */
Expand All @@ -352,10 +368,6 @@ private static boolean checkUTF8Validity(byte[] rawBytes) {
return true;
}

private MqttProperties prepareConnAckProperties(boolean serverGeneratedClientId, String clientId) {
return prepareConnAckPropertiesBuilder(serverGeneratedClientId, clientId).build();
}

private ConnAckPropertiesBuilder prepareConnAckPropertiesBuilder(boolean serverGeneratedClientId, String clientId) {
final ConnAckPropertiesBuilder builder = new ConnAckPropertiesBuilder();
// default maximumQos is 2, [MQTT-3.2.2-10]
Expand Down
6 changes: 6 additions & 0 deletions broker/src/main/java/io/moquette/broker/SessionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,12 @@ private Session createNewSession(MqttConnectMessage msg, String clientId) {
newSession = new Session(sessionData, clean, queue);
newSession.markConnecting();
sessionsRepository.saveSession(sessionData);
if (MQTTConnection.isNeedResponseInformation(msg)) {
// the responder client must have write access to this topic
// the requester client must have read access on this topic
authorizator.forceReadAccess(Topic.asTopic("/reqresp/response/" + clientId), clientId);
authorizator.forceWriteToAll(Topic.asTopic("/reqresp/response/" + clientId));
}
return newSession;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,150 +1,37 @@
package io.moquette.integration.mqtt5;

import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import io.moquette.broker.Server;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.MemoryConfig;
import io.moquette.integration.IntegrationUtils;
import io.moquette.testclient.Client;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static io.moquette.integration.mqtt5.ConnectTest.assertConnectionAccepted;
import static org.junit.jupiter.api.Assertions.*;

public abstract class AbstractServerIntegrationTest {
Server broker;
IConfig config;

@TempDir
Path tempFolder;
protected String dbPath;
public abstract class AbstractServerIntegrationTest extends AbstractServerIntegrationWithoutClientFixture {

Client lowLevelClient;

@NotNull
static Mqtt5BlockingClient createSubscriberClient(String clientId) {
final Mqtt5BlockingClient client = MqttClient.builder()
.useMqttVersion5()
.identifier(clientId)
.serverHost("localhost")
.serverPort(1883)
.buildBlocking();
assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, client.connect().getReasonCode(), clientId + " connected");
return client;
}

@NotNull
static Mqtt5BlockingClient createPublisherClient() {
return AbstractSubscriptionIntegrationTest.createClientWithStartFlagAndClientId(true, "publisher");
}

protected static void verifyNoPublish(Mqtt5BlockingClient subscriber, Consumer<Void> action, Duration timeout, String message) throws InterruptedException {
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
action.accept(null);
Optional<Mqtt5Publish> publishedMessage = publishes.receive(timeout.getSeconds(), TimeUnit.SECONDS);

// verify no published will in 10 seconds
assertFalse(publishedMessage.isPresent(), message);
}
}

protected static void verifyPublishedMessage(Mqtt5BlockingClient client, Consumer<Void> action, MqttQos expectedQos,
String expectedPayload, String errorMessage, int timeoutSeconds) throws Exception {
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = client.publishes(MqttGlobalPublishFilter.ALL)) {
action.accept(null);
Optional<Mqtt5Publish> publishMessage = publishes.receive(timeoutSeconds, TimeUnit.SECONDS);
if (!publishMessage.isPresent()) {
fail("Expected to receive a publish message");
return;
}
Mqtt5Publish msgPub = publishMessage.get();
final String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8);
assertEquals(expectedPayload, payload, errorMessage);
assertEquals(expectedQos, msgPub.getQos());
}
}

static void verifyOfType(MqttMessage received, MqttMessageType mqttMessageType) {
assertEquals(mqttMessageType, received.fixedHeader().messageType());
}

static void verifyPublishMessage(Mqtt5BlockingClient subscriber, Consumer<Mqtt5Publish> assertion) throws InterruptedException {
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
Optional<Mqtt5Publish> publishMessage = publishes.receive(1, TimeUnit.SECONDS);
if (!publishMessage.isPresent()) {
fail("Expected to receive a publish message");
return;
}
Mqtt5Publish msgPub = publishMessage.get();
assertion.accept(msgPub);
}
}

@NotNull
Mqtt5BlockingClient createSubscriberClient() {
String clientId = clientName();
return createSubscriberClient(clientId);
return createHiveBlockingClient(clientId);
}

public abstract String clientName();

protected void startServer(String dbPath) throws IOException {
broker = new Server();
final Properties configProps = IntegrationUtils.prepareTestProperties(dbPath);
config = new MemoryConfig(configProps);
broker.startServer(config);
}

@BeforeAll
public static void beforeTests() {
Awaitility.setDefaultTimeout(Durations.ONE_SECOND);
}

@BeforeEach
public void setUp() throws Exception {
dbPath = IntegrationUtils.tempH2Path(tempFolder);
startServer(dbPath);
super.setUp();

lowLevelClient = new Client("localhost").clientId(clientName());
}

@AfterEach
public void tearDown() throws Exception {
lowLevelClient.shutdownConnection();
stopServer();
}

protected void stopServer() {
broker.stopServer();
}

void restartServerWithSuspension(Duration timeout) throws InterruptedException, IOException {
stopServer();
Thread.sleep(timeout.toMillis());
startServer(dbPath);
super.tearDown();
}

void connectLowLevel() {
Expand Down
Loading

0 comments on commit c713730

Please sign in to comment.