Skip to content

Commit 4206c41

Browse files
authored
GH-3955: Mqtt adapter unsubscribe when cleanStart
Fixes #3955 `Mqttv5PahoMessageDrivenChannelAdapter` unsubscribes from all topics, even if `cleanStart/cleanSession` is set to `false`, thus not receiving offline messages after restart. * unsubscribe `Mqttv5PahoMessageDrivenChannelAdapter` only when `cleanStart` * add tests **Cherry-pick to `5.5.x`**
1 parent 532f323 commit 4206c41

File tree

2 files changed

+104
-1
lines changed

2 files changed

+104
-1
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
* @author Mikhail Polivakha
7373
* @author Lucas Bowler
7474
* @author Artem Vozhdayenko
75+
* @author Matthias Thoma
7576
*
7677
* @since 5.5.5
7778
*
@@ -217,7 +218,9 @@ protected void doStop() {
217218
String[] topics = getTopic();
218219
try {
219220
if (this.mqttClient != null && this.mqttClient.isConnected()) {
220-
this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
221+
if (this.connectionOptions.isCleanStart()) {
222+
this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
223+
}
221224

222225
if (getClientManager() == null) {
223226
this.mqttClient.disconnect().waitForCompletion(getCompletionTimeout());
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2002-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.mqtt;
18+
19+
20+
import org.eclipse.paho.mqttv5.client.IMqttAsyncClient;
21+
import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
22+
import org.eclipse.paho.mqttv5.client.IMqttToken;
23+
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
24+
import org.eclipse.paho.mqttv5.common.MqttException;
25+
import org.eclipse.paho.mqttv5.common.MqttSubscription;
26+
import org.junit.jupiter.api.Test;
27+
28+
import org.springframework.beans.factory.BeanFactory;
29+
import org.springframework.context.ApplicationEventPublisher;
30+
import org.springframework.integration.channel.NullChannel;
31+
import org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter;
32+
import org.springframework.test.util.ReflectionTestUtils;
33+
34+
import static org.mockito.ArgumentMatchers.any;
35+
import static org.mockito.BDDMockito.given;
36+
import static org.mockito.Mockito.mock;
37+
import static org.mockito.Mockito.never;
38+
import static org.mockito.Mockito.verify;
39+
40+
/**
41+
* @author Gary Russell
42+
* @author Artem Bilan
43+
* @author Artem Vozhdayenko
44+
* @author Matthias Thoma
45+
*
46+
* @since 5.5.16
47+
*
48+
*/
49+
public class Mqttv5AdapterTests {
50+
51+
@Test
52+
public void testStop() throws Exception {
53+
final IMqttAsyncClient client = mock(IMqttAsyncClient.class);
54+
Mqttv5PahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, true);
55+
56+
adapter.start();
57+
adapter.connectComplete(false, null);
58+
adapter.stop();
59+
60+
verify(client).connect(any(MqttConnectionOptions.class));
61+
verify(client).subscribe(any(MqttSubscription[].class), any(), any(), any(IMqttMessageListener[].class), any());
62+
verify(client).unsubscribe(any(String[].class));
63+
}
64+
65+
@Test
66+
public void testStopNotClean() throws Exception {
67+
final IMqttAsyncClient client = mock(IMqttAsyncClient.class);
68+
Mqttv5PahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, false);
69+
70+
adapter.start();
71+
adapter.connectComplete(false, null);
72+
adapter.stop();
73+
74+
verify(client).connect(any(MqttConnectionOptions.class));
75+
verify(client).subscribe(any(MqttSubscription[].class), any(), any(), any(IMqttMessageListener[].class), any());
76+
verify(client, never()).unsubscribe(any(String[].class));
77+
}
78+
79+
private static Mqttv5PahoMessageDrivenChannelAdapter buildAdapterIn(final IMqttAsyncClient client, boolean cleanStart) throws MqttException {
80+
81+
MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
82+
connectionOptions.setServerURIs(new String[] {"tcp://localhost:1883"});
83+
connectionOptions.setCleanStart(cleanStart);
84+
85+
given(client.isConnected()).willReturn(true);
86+
IMqttToken token = mock(IMqttToken.class);
87+
given(client.disconnect()).willReturn(token);
88+
given(client.connect(any(MqttConnectionOptions.class))).willReturn(token);
89+
given(client.subscribe(any(MqttSubscription[].class), any(), any(), any(IMqttMessageListener[].class), any())).willReturn(token);
90+
given(client.unsubscribe(any(String[].class))).willReturn(token);
91+
Mqttv5PahoMessageDrivenChannelAdapter adapter = new Mqttv5PahoMessageDrivenChannelAdapter(connectionOptions, "client", "foo");
92+
ReflectionTestUtils.setField(adapter, "mqttClient", client);
93+
adapter.setBeanFactory(mock(BeanFactory.class));
94+
adapter.setApplicationEventPublisher(mock(ApplicationEventPublisher.class));
95+
adapter.setOutputChannel(new NullChannel());
96+
adapter.afterPropertiesSet();
97+
return adapter;
98+
}
99+
100+
}

0 commit comments

Comments
 (0)