diff --git a/ChangeLog.txt b/ChangeLog.txt index 5764e651a..2128e61a3 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,4 +1,5 @@ Version 0.18-SNAPSHOT: + [feature] Implement response-information property for request-response flow. (#840) [fix] Optimised page file opening for disk-based queues. (#837) [feature] Manage payload format indicator property, when set verify payload format. (#826) [refactoring] Refactory of PostOffice to pass publish message in hits entirety avoiding decomposition into single parameters. (#827) diff --git a/broker/src/main/java/io/moquette/broker/Authorizator.java b/broker/src/main/java/io/moquette/broker/Authorizator.java index 7cc404aee..6f237b1ea 100644 --- a/broker/src/main/java/io/moquette/broker/Authorizator.java +++ b/broker/src/main/java/io/moquette/broker/Authorizator.java @@ -26,6 +26,8 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.function.Function; import static io.moquette.broker.Utils.messageId; @@ -37,6 +39,11 @@ final class Authorizator { private final IAuthorizatorPolicy policy; + // Contains the list of topic-client that has read access forced on reply topic. + private ConcurrentMap, Boolean> responseTopicForcedReads = new ConcurrentHashMap<>(); + // Contains the list of requesters' reply topics that need write access by all the other (responders). + private ConcurrentMap responseTopicForcedWrites = new ConcurrentHashMap<>(); + Authorizator(IAuthorizatorPolicy policy) { this.policy = policy; } @@ -113,10 +120,30 @@ private MqttQoS getQoSCheckingAlsoPermissionsOnTopic(String clientID, String use * @return true if the user from client can publish data on topic. */ boolean canWrite(Topic topic, String user, String client) { - return policy.canWrite(topic, user, client); + boolean policyResult = policy.canWrite(topic, user, client); + if (!policyResult && responseTopicForcedWrites.containsKey(topic)) { + LOG.warn("Found write discord by policy and response information topic configured. The policy prohibit " + + "while the response topic should be accessible for all to write. topic: {}", topic); + return true; + } + return policyResult; } boolean canRead(Topic topic, String user, String client) { - return policy.canRead(topic, user, client); + boolean policyResult = policy.canRead(topic, user, client); + if (!policyResult && responseTopicForcedReads.containsKey(Utils.Couple.of(topic, client))) { + LOG.warn("Found read discord by policy and response information topic configured. The policy prohibit " + + "while the response topic should be accessible by read from client{}. topic: {}", client, topic); + return true; + } + return policyResult; + } + + void forceReadAccess(Topic topic, String client) { + responseTopicForcedReads.putIfAbsent(Utils.Couple.of(topic, client), true); + } + + public void forceWriteToAll(Topic topic) { + responseTopicForcedWrites.putIfAbsent(topic, true); } } diff --git a/broker/src/main/java/io/moquette/broker/MQTTConnection.java b/broker/src/main/java/io/moquette/broker/MQTTConnection.java index 37453103f..df8cfd22f 100644 --- a/broker/src/main/java/io/moquette/broker/MQTTConnection.java +++ b/broker/src/main/java/io/moquette/broker/MQTTConnection.java @@ -281,7 +281,13 @@ private void executeConnect(MqttConnectMessage msg, String clientId, boolean ser .sessionPresent(isSessionAlreadyPresent); if (isProtocolVersion(msg, MqttVersion.MQTT_5)) { // set properties for MQTT 5 - final MqttProperties ackProperties = prepareConnAckProperties(serverGeneratedClientId, clientId); + ConnAckPropertiesBuilder connAckPropertiesBuilder = prepareConnAckPropertiesBuilder(serverGeneratedClientId, clientId); + if (isNeedResponseInformation(msg)) { + // the responder and requested access to the topic are already configured during session creation + // in SessionRegistry + connAckPropertiesBuilder.responseInformation("/reqresp/response/" + clientId); + } + final MqttProperties ackProperties = connAckPropertiesBuilder.build(); connAckBuilder.properties(ackProperties); } final MqttConnAckMessage ackMessage = connAckBuilder.build(); @@ -328,6 +334,16 @@ public void operationComplete(ChannelFuture future) throws Exception { }); } + /** + * @return true iff message contains property REQUEST_RESPONSE_INFORMATION and is positive. + * */ + static boolean isNeedResponseInformation(MqttConnectMessage msg) { + MqttProperties.IntegerProperty requestRespInfo = (MqttProperties.IntegerProperty) msg.variableHeader() + .properties() + .getProperty(MqttProperties.MqttPropertyType.REQUEST_RESPONSE_INFORMATION.value()); + return requestRespInfo != null && requestRespInfo.value() >= 1; + } + /** * @return the value of the Payload Format Indicator property from Will specification. * */ @@ -352,10 +368,6 @@ private static boolean checkUTF8Validity(byte[] rawBytes) { return true; } - private MqttProperties prepareConnAckProperties(boolean serverGeneratedClientId, String clientId) { - return prepareConnAckPropertiesBuilder(serverGeneratedClientId, clientId).build(); - } - private ConnAckPropertiesBuilder prepareConnAckPropertiesBuilder(boolean serverGeneratedClientId, String clientId) { final ConnAckPropertiesBuilder builder = new ConnAckPropertiesBuilder(); // default maximumQos is 2, [MQTT-3.2.2-10] diff --git a/broker/src/main/java/io/moquette/broker/SessionRegistry.java b/broker/src/main/java/io/moquette/broker/SessionRegistry.java index 6c26641f2..e6520971d 100644 --- a/broker/src/main/java/io/moquette/broker/SessionRegistry.java +++ b/broker/src/main/java/io/moquette/broker/SessionRegistry.java @@ -395,6 +395,12 @@ private Session createNewSession(MqttConnectMessage msg, String clientId) { newSession = new Session(sessionData, clean, queue); newSession.markConnecting(); sessionsRepository.saveSession(sessionData); + if (MQTTConnection.isNeedResponseInformation(msg)) { + // the responder client must have write access to this topic + // the requester client must have read access on this topic + authorizator.forceReadAccess(Topic.asTopic("/reqresp/response/" + clientId), clientId); + authorizator.forceWriteToAll(Topic.asTopic("/reqresp/response/" + clientId)); + } return newSession; } diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java index 848915d29..ffd53f275 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java @@ -1,132 +1,29 @@ package io.moquette.integration.mqtt5; -import com.hivemq.client.mqtt.MqttClient; -import com.hivemq.client.mqtt.MqttGlobalPublishFilter; -import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; -import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode; -import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; -import io.moquette.broker.Server; -import io.moquette.broker.config.IConfig; -import io.moquette.broker.config.MemoryConfig; -import io.moquette.integration.IntegrationUtils; import io.moquette.testclient.Client; import io.netty.handler.codec.mqtt.MqttConnAckMessage; -import io.netty.handler.codec.mqtt.MqttMessage; -import io.netty.handler.codec.mqtt.MqttMessageType; -import org.awaitility.Awaitility; -import org.awaitility.Durations; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.io.TempDir; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Path; -import java.time.Duration; -import java.util.Optional; -import java.util.Properties; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import static io.moquette.integration.mqtt5.ConnectTest.assertConnectionAccepted; -import static org.junit.jupiter.api.Assertions.*; - -public abstract class AbstractServerIntegrationTest { - Server broker; - IConfig config; - @TempDir - Path tempFolder; - protected String dbPath; +public abstract class AbstractServerIntegrationTest extends AbstractServerIntegrationWithoutClientFixture { Client lowLevelClient; - @NotNull - static Mqtt5BlockingClient createSubscriberClient(String clientId) { - final Mqtt5BlockingClient client = MqttClient.builder() - .useMqttVersion5() - .identifier(clientId) - .serverHost("localhost") - .serverPort(1883) - .buildBlocking(); - assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, client.connect().getReasonCode(), clientId + " connected"); - return client; - } - - @NotNull - static Mqtt5BlockingClient createPublisherClient() { - return AbstractSubscriptionIntegrationTest.createClientWithStartFlagAndClientId(true, "publisher"); - } - - protected static void verifyNoPublish(Mqtt5BlockingClient subscriber, Consumer action, Duration timeout, String message) throws InterruptedException { - try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { - action.accept(null); - Optional publishedMessage = publishes.receive(timeout.getSeconds(), TimeUnit.SECONDS); - - // verify no published will in 10 seconds - assertFalse(publishedMessage.isPresent(), message); - } - } - - protected static void verifyPublishedMessage(Mqtt5BlockingClient client, Consumer action, MqttQos expectedQos, - String expectedPayload, String errorMessage, int timeoutSeconds) throws Exception { - try (Mqtt5BlockingClient.Mqtt5Publishes publishes = client.publishes(MqttGlobalPublishFilter.ALL)) { - action.accept(null); - Optional publishMessage = publishes.receive(timeoutSeconds, TimeUnit.SECONDS); - if (!publishMessage.isPresent()) { - fail("Expected to receive a publish message"); - return; - } - Mqtt5Publish msgPub = publishMessage.get(); - final String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8); - assertEquals(expectedPayload, payload, errorMessage); - assertEquals(expectedQos, msgPub.getQos()); - } - } - - static void verifyOfType(MqttMessage received, MqttMessageType mqttMessageType) { - assertEquals(mqttMessageType, received.fixedHeader().messageType()); - } - - static void verifyPublishMessage(Mqtt5BlockingClient subscriber, Consumer assertion) throws InterruptedException { - try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { - Optional publishMessage = publishes.receive(1, TimeUnit.SECONDS); - if (!publishMessage.isPresent()) { - fail("Expected to receive a publish message"); - return; - } - Mqtt5Publish msgPub = publishMessage.get(); - assertion.accept(msgPub); - } - } - @NotNull Mqtt5BlockingClient createSubscriberClient() { String clientId = clientName(); - return createSubscriberClient(clientId); + return createHiveBlockingClient(clientId); } public abstract String clientName(); - protected void startServer(String dbPath) throws IOException { - broker = new Server(); - final Properties configProps = IntegrationUtils.prepareTestProperties(dbPath); - config = new MemoryConfig(configProps); - broker.startServer(config); - } - - @BeforeAll - public static void beforeTests() { - Awaitility.setDefaultTimeout(Durations.ONE_SECOND); - } - @BeforeEach public void setUp() throws Exception { - dbPath = IntegrationUtils.tempH2Path(tempFolder); - startServer(dbPath); + super.setUp(); lowLevelClient = new Client("localhost").clientId(clientName()); } @@ -134,17 +31,7 @@ public void setUp() throws Exception { @AfterEach public void tearDown() throws Exception { lowLevelClient.shutdownConnection(); - stopServer(); - } - - protected void stopServer() { - broker.stopServer(); - } - - void restartServerWithSuspension(Duration timeout) throws InterruptedException, IOException { - stopServer(); - Thread.sleep(timeout.toMillis()); - startServer(dbPath); + super.tearDown(); } void connectLowLevel() { diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationWithoutClientFixture.java b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationWithoutClientFixture.java new file mode 100644 index 000000000..47b3a8e60 --- /dev/null +++ b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationWithoutClientFixture.java @@ -0,0 +1,178 @@ +/* + * + * * Copyright (c) 2012-2024 The original author or authors + * * ------------------------------------------------------ + * * All rights reserved. This program and the accompanying materials + * * are made available under the terms of the Eclipse Public License v1.0 + * * and Apache License v2.0 which accompanies this distribution. + * * + * * The Eclipse Public License is available at + * * http://www.eclipse.org/legal/epl-v10.html + * * + * * The Apache License v2.0 is available at + * * http://www.opensource.org/licenses/apache2.0.php + * * + * * You may elect to redistribute this code under either of these licenses. + * + */ + +package io.moquette.integration.mqtt5; + +import com.hivemq.client.mqtt.MqttClient; +import com.hivemq.client.mqtt.MqttGlobalPublishFilter; +import com.hivemq.client.mqtt.datatypes.MqttQos; +import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; +import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect; +import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectBuilder; +import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck; +import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import io.moquette.broker.Server; +import io.moquette.broker.config.IConfig; +import io.moquette.broker.config.MemoryConfig; +import io.moquette.integration.IntegrationUtils; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageType; +import org.awaitility.Awaitility; +import org.awaitility.Durations; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.*; + +public class AbstractServerIntegrationWithoutClientFixture { + + @TempDir + Path tempFolder; + protected String dbPath; + Server broker; + IConfig config; + + @BeforeAll + public static void beforeTests() { + Awaitility.setDefaultTimeout(Durations.ONE_SECOND); + } + + @BeforeEach + public void setUp() throws Exception { + dbPath = IntegrationUtils.tempH2Path(tempFolder); + startServer(dbPath); + } + + @AfterEach + public void tearDown() throws Exception { + stopServer(); + } + + protected void startServer(String dbPath) throws IOException { + broker = new Server(); + final Properties configProps = IntegrationUtils.prepareTestProperties(dbPath); + config = new MemoryConfig(configProps); + broker.startServer(config); + } + + protected void stopServer() { + broker.stopServer(); + } + + void restartServerWithSuspension(Duration timeout) throws InterruptedException, IOException { + stopServer(); + Thread.sleep(timeout.toMillis()); + startServer(dbPath); + } + + @NotNull + static Mqtt5BlockingClient createHiveBlockingClient(String clientId) { + final Mqtt5BlockingClient client = MqttClient.builder() + .useMqttVersion5() + .identifier(clientId) + .serverHost("localhost") + .serverPort(1883) + .buildBlocking(); + assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, client.connect().getReasonCode(), clientId + " connected"); + return client; + } + + @NotNull + static Mqtt5BlockingClient createHiveBlockingClientWithResponseProtocol(String clientId) { + Mqtt5Connect connectRequest = Mqtt5Connect.builder() + .keepAlive(10) + .restrictions() + .requestResponseInformation(true) + .applyRestrictions() + .build(); + + final Mqtt5BlockingClient client = MqttClient.builder() + .useMqttVersion5() + .identifier(clientId) + .serverHost("localhost") + .serverPort(1883) + .buildBlocking(); + Mqtt5ConnAck connAck = client.connect(connectRequest); + assertEquals(Mqtt5ConnAckReasonCode.SUCCESS, connAck.getReasonCode(), clientId + " connected"); + assertTrue(connAck.getResponseInformation().isPresent(), "ConnACK must contain response topic assigned by the broker"); + String responseTopic = connAck.getResponseInformation().get().toString(); + assertEquals(responseTopic, "/reqresp/response/" + clientId, "Response topic pattern MUST we respected"); + return client; + } + + @NotNull + static Mqtt5BlockingClient createPublisherClient() { + return AbstractSubscriptionIntegrationTest.createClientWithStartFlagAndClientId(true, "publisher"); + } + + protected static void verifyNoPublish(Mqtt5BlockingClient subscriber, Consumer action, Duration timeout, String message) throws InterruptedException { + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { + action.accept(null); + Optional publishedMessage = publishes.receive(timeout.getSeconds(), TimeUnit.SECONDS); + + // verify no published will in 10 seconds + assertFalse(publishedMessage.isPresent(), message); + } + } + + protected static void verifyPublishedMessage(Mqtt5BlockingClient client, Consumer action, MqttQos expectedQos, + String expectedPayload, String errorMessage, int timeoutSeconds) throws Exception { + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = client.publishes(MqttGlobalPublishFilter.ALL)) { + action.accept(null); + Optional publishMessage = publishes.receive(timeoutSeconds, TimeUnit.SECONDS); + if (!publishMessage.isPresent()) { + fail("Expected to receive a publish message"); + return; + } + Mqtt5Publish msgPub = publishMessage.get(); + final String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8); + assertEquals(expectedPayload, payload, errorMessage); + assertEquals(expectedQos, msgPub.getQos()); + } + } + + static void verifyOfType(MqttMessage received, MqttMessageType mqttMessageType) { + assertEquals(mqttMessageType, received.fixedHeader().messageType()); + } + + static void verifyPublishMessage(Mqtt5BlockingClient subscriber, Consumer assertion) throws InterruptedException { + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { + Optional publishMessage = publishes.receive(1, TimeUnit.SECONDS); + if (!publishMessage.isPresent()) { + fail("Expected to receive a publish message"); + return; + } + Mqtt5Publish msgPub = publishMessage.get(); + assertion.accept(msgPub); + } + } +} diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java new file mode 100644 index 000000000..1eafab0ad --- /dev/null +++ b/broker/src/test/java/io/moquette/integration/mqtt5/RequestResponseTest.java @@ -0,0 +1,161 @@ +/* + * + * * Copyright (c) 2012-2024 The original author or authors + * * ------------------------------------------------------ + * * All rights reserved. This program and the accompanying materials + * * are made available under the terms of the Eclipse Public License v1.0 + * * and Apache License v2.0 which accompanies this distribution. + * * + * * The Eclipse Public License is available at + * * http://www.eclipse.org/legal/epl-v10.html + * * + * * The Apache License v2.0 is available at + * * http://www.opensource.org/licenses/apache2.0.php + * * + * * You may elect to redistribute this code under either of these licenses. + * + */ + +package io.moquette.integration.mqtt5; + +import com.hivemq.client.mqtt.datatypes.MqttQos; +import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; +import com.hivemq.client.mqtt.mqtt5.message.publish.puback.Mqtt5PubAckReasonCode; +import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscribe; +import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck; +import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RequestResponseTest extends AbstractServerIntegrationWithoutClientFixture { + + @Test + public void givenRequestResponseProtocolWhenRequestIsIssueThenTheResponderReply() throws InterruptedException { + final Mqtt5BlockingClient requester = createHiveBlockingClient("requester"); + final String responseTopic = "requester/door/open/result"; + subscribeToResponseTopic(requester, responseTopic); + + final Mqtt5BlockingClient responder = createHiveBlockingClient("responder"); + + responderRepliesToRequesterPublish(responder, requester, responseTopic); + + verifyPublishMessage(requester, msgPub -> { + assertTrue(msgPub.getPayload().isPresent(), "Response payload MUST be present"); + String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8); + assertEquals("OK", payload); + }); + } + + private static void responderRepliesToRequesterPublish(Mqtt5BlockingClient responder, Mqtt5BlockingClient requester, String responseTopic) { + Mqtt5Subscribe subscribeToRequest = Mqtt5Subscribe.builder() + .topicFilter("requester/door/open") + .qos(MqttQos.AT_LEAST_ONCE) + .build(); + responder.toAsync().subscribe(subscribeToRequest, + (Mqtt5Publish pub) -> { + assertTrue(pub.getResponseTopic().isPresent(), "Response topic MUST defined in request publish"); + Mqtt5PublishResult responseResult = responder.publishWith() + .topic(pub.getResponseTopic().get()) + .payload("OK".getBytes(StandardCharsets.UTF_8)) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + assertTrue(responseResult instanceof Mqtt5PublishResult.Mqtt5Qos1Result, "QoS1 Response must be present"); + Mqtt5PublishResult.Mqtt5Qos1Result qos1Result = (Mqtt5PublishResult.Mqtt5Qos1Result) responseResult; + assertEquals(Mqtt5PubAckReasonCode.SUCCESS, qos1Result.getPubAck().getReasonCode(), + "Open door response cannot be published "); + }); + + Mqtt5PublishResult.Mqtt5Qos1Result requestResult = (Mqtt5PublishResult.Mqtt5Qos1Result) requester.publishWith() + .topic("requester/door/open") + .responseTopic(responseTopic) + .payload("Please open the door".getBytes(StandardCharsets.UTF_8)) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + assertEquals(Mqtt5PubAckReasonCode.SUCCESS, requestResult.getPubAck().getReasonCode(), + "Open door request cannot be published "); + } + + private static void subscribeToResponseTopic(Mqtt5BlockingClient requester, String responseTopic) { + Mqtt5SubAck subAck = requester.subscribeWith() + .topicFilter(responseTopic) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + assertThat(subAck.getReasonCodes()).contains(Mqtt5SubAckReasonCode.GRANTED_QOS_1); + } + + @Test + public void givenRequestResponseProtocolWhenRequestIsIssueThenTheResponderReplyWithCorrelationData() throws InterruptedException { + final Mqtt5BlockingClient requester = createHiveBlockingClient("requester"); + final String responseTopic = "requester/door/open/result"; + subscribeToResponseTopic(requester, responseTopic); + + final Mqtt5BlockingClient responder = createHiveBlockingClient("responder"); + + Mqtt5Subscribe subscribeToRequest = Mqtt5Subscribe.builder() + .topicFilter("requester/door/open") + .qos(MqttQos.AT_LEAST_ONCE) + .build(); + responder.toAsync().subscribe(subscribeToRequest, + (Mqtt5Publish pub) -> { + assertTrue(pub.getResponseTopic().isPresent(), "Response topic MUST defined in request publish"); + assertTrue(pub.getCorrelationData().isPresent(), "Correlation data MUST defined in request publish"); + Mqtt5PublishResult responseResult = responder.publishWith() + .topic(pub.getResponseTopic().get()) + .correlationData(pub.getCorrelationData().get()) + .payload("OK".getBytes(StandardCharsets.UTF_8)) + .send(); + assertFalse(responseResult.getError().isPresent(), "Open door response cannot be published "); + }); + + Mqtt5PublishResult.Mqtt5Qos1Result requestResult = (Mqtt5PublishResult.Mqtt5Qos1Result) requester.publishWith() + .topic("requester/door/open") + .responseTopic(responseTopic) + .correlationData("req-open-door".getBytes(StandardCharsets.UTF_8)) + .payload("Please open the door".getBytes(StandardCharsets.UTF_8)) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + assertEquals(Mqtt5PubAckReasonCode.SUCCESS, requestResult.getPubAck().getReasonCode(), + "Open door request cannot be published "); + + verifyPublishMessage(requester, msgPub -> { + assertTrue(msgPub.getPayload().isPresent(), "Response payload MUST be present"); + String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8); + assertEquals("OK", payload); + assertTrue(msgPub.getCorrelationData().isPresent(), "Request correlation data MUST defined in response publish"); + final byte[] correlationData = asByteArray(msgPub.getCorrelationData().get()); + assertEquals("req-open-door", new String(correlationData, StandardCharsets.UTF_8)); + }); + } + + private byte[] asByteArray(ByteBuffer byteBuffer) { + byte[] arr = new byte[byteBuffer.remaining()]; + byteBuffer.get(arr); + return arr; + } + + @Test + public void givenRequestResponseProtocolAndClientIsConnectedWhenRequestIsIssueThenTheResponderReply() throws InterruptedException { + final Mqtt5BlockingClient requester = createHiveBlockingClientWithResponseProtocol("requester"); + final String responseTopic = "/reqresp/response/requester"; + subscribeToResponseTopic(requester, responseTopic); + + final Mqtt5BlockingClient responder = createHiveBlockingClient("responder"); + + responderRepliesToRequesterPublish(responder, requester, responseTopic); + + verifyPublishMessage(requester, msgPub -> { + assertTrue(msgPub.getPayload().isPresent(), "Response payload MUST be present"); + String payload = new String(msgPub.getPayloadAsBytes(), StandardCharsets.UTF_8); + assertEquals("OK", payload); + }); + } +} diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/SharedSubscriptionTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/SharedSubscriptionTest.java index 9308fbf94..e0997816c 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/SharedSubscriptionTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/SharedSubscriptionTest.java @@ -253,11 +253,11 @@ public void givenMultipleClientSubscribedToSharedSubscriptionWhenOneUnsubscribeT String fullSharedSubscriptionTopicFilter = "$share/collectors/metric/temperature/living"; // subscribe first client to shared subscription - final Mqtt5BlockingClient subscriber1 = createSubscriberClient("subscriber1"); + final Mqtt5BlockingClient subscriber1 = createHiveBlockingClient("subscriber1"); subscribe(subscriber1, fullSharedSubscriptionTopicFilter, MqttQos.AT_LEAST_ONCE); // subscribe second client to shared subscription - final Mqtt5BlockingClient subscriber2 = createSubscriberClient("subscriber2"); + final Mqtt5BlockingClient subscriber2 = createHiveBlockingClient("subscriber2"); subscribe(subscriber2, fullSharedSubscriptionTopicFilter, MqttQos.AT_LEAST_ONCE); // unsubscribe successfully the first subscriber @@ -286,7 +286,7 @@ public void givenASharedSubscriptionWhenLastSubscribedClientUnsubscribeThenTheSh String fullSharedSubscriptionTopicFilter = "$share/collectors/metric/temperature/living"; // subscribe client to shared subscription - final Mqtt5BlockingClient subscriber = createSubscriberClient("subscriber1"); + final Mqtt5BlockingClient subscriber = createHiveBlockingClient("subscriber1"); subscribe(subscriber, fullSharedSubscriptionTopicFilter, MqttQos.AT_LEAST_ONCE); // verify subscribed to the shared receives a message