Skip to content

Commit

Permalink
[ISSUE #669] Support RocketMQ as storage of mqtt broker (#687)
Browse files Browse the repository at this point in the history
* Config refactor, add some rocketmq config

* Add rmq publish producer

* Fix NPE start with memory mode, remove useless dependency

* Non-standard mqtt request return directly

* Add rmq subscribe consumer and related pull task, complete mqtt SUBSCRIBE and UNSUBSCRIBE protcol

* Fix subTopic not remove when disconnect

* Fix connection management, fix consume delay to make pull task to queue level

* Fix ut not pass when root topic removed from subsription

* Fix integration test not pass when not use rmq subscribe consumer

Co-authored-by: zhangxu16 <[email protected]>
  • Loading branch information
maixiaohai and zhangxu16 committed Jul 21, 2021
1 parent ce5c543 commit 3aa8ee5
Show file tree
Hide file tree
Showing 24 changed files with 940 additions and 96 deletions.
22 changes: 16 additions & 6 deletions rocketmq-iot-bridge/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ limitations under the License.
<name>rocketmq-iot-bridge ${project.version}</name>

<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-tools</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
Expand Down Expand Up @@ -53,12 +58,6 @@ limitations under the License.
<version>1.9.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand All @@ -74,7 +73,10 @@ limitations under the License.
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>

<rocketmq.version>4.8.0</rocketmq.version>
</properties>

<build>
<plugins>
<plugin>
Expand Down Expand Up @@ -209,6 +211,14 @@ limitations under the License.
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import org.apache.rocketmq.iot.common.configuration.MQTTBridgeConfiguration;
import org.apache.rocketmq.iot.common.config.MqttBridgeConfig;
import org.apache.rocketmq.iot.common.data.Message;
import org.apache.rocketmq.iot.connection.client.ClientManager;
import org.apache.rocketmq.iot.protocol.mqtt.handler.MessageDispatcher;
Expand All @@ -38,36 +38,61 @@
import org.apache.rocketmq.iot.protocol.mqtt.handler.downstream.impl.MqttDisconnectMessageHandler;
import org.apache.rocketmq.iot.protocol.mqtt.handler.downstream.impl.MqttMessageForwarder;
import org.apache.rocketmq.iot.protocol.mqtt.handler.downstream.impl.MqttPingreqMessageHandler;
import org.apache.rocketmq.iot.protocol.mqtt.handler.downstream.impl.MqttPublishMessageHandler;
import org.apache.rocketmq.iot.protocol.mqtt.handler.downstream.impl.MqttSubscribeMessageHandler;
import org.apache.rocketmq.iot.protocol.mqtt.handler.downstream.impl.MqttUnsubscribeMessagHandler;
import org.apache.rocketmq.iot.storage.message.MessageStore;
import org.apache.rocketmq.iot.storage.rocketmq.PublishProducer;
import org.apache.rocketmq.iot.storage.rocketmq.RocketMQPublishProducer;
import org.apache.rocketmq.iot.storage.rocketmq.RocketMQSubscribeConsumer;
import org.apache.rocketmq.iot.storage.rocketmq.SubscribeConsumer;
import org.apache.rocketmq.iot.storage.subscription.SubscriptionStore;
import org.apache.rocketmq.iot.storage.subscription.impl.InMemorySubscriptionStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MQTTBridge {
private Logger logger = LoggerFactory.getLogger(MQTTBridge.class);

private MqttBridgeConfig bridgeConfig;

private ServerBootstrap serverBootstrap;
private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workerGroup;

private MessageDispatcher messageDispatcher;
private SubscriptionStore subscriptionStore;
private ClientManager clientManager;
private MqttConnectionHandler connectionHandler;
private Logger logger = LoggerFactory.getLogger(MQTTBridge.class);
private MessageStore messageStore;
private PublishProducer publishProducer;
private SubscribeConsumer subscribeConsumer;

public MQTTBridge() {
init();
}

private void init() {
bossGroup = new NioEventLoopGroup(MQTTBridgeConfiguration.threadNumOfBossGroup());
workerGroup = new NioEventLoopGroup(MQTTBridgeConfiguration.threadNumOfWorkerGroup());
this.bridgeConfig = new MqttBridgeConfig();

subscriptionStore = new InMemorySubscriptionStore();
if (bridgeConfig.isEnableRocketMQStore()) {
this.publishProducer = new RocketMQPublishProducer(bridgeConfig);
this.subscribeConsumer = new RocketMQSubscribeConsumer(bridgeConfig, subscriptionStore);
}

clientManager = new ClientManagerImpl();
messageDispatcher = new MessageDispatcher(clientManager);
connectionHandler = new MqttConnectionHandler(clientManager, subscriptionStore, subscribeConsumer);
registerMessageHandlers();

bossGroup = new NioEventLoopGroup(bridgeConfig.getBossGroupThreadNum());
workerGroup = new NioEventLoopGroup(bridgeConfig.getWorkerGroupThreadNum());
serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.localAddress(MQTTBridgeConfiguration.port())
.localAddress(bridgeConfig.getBrokerPort())
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, MQTTBridgeConfiguration.socketBacklog())
.option(ChannelOption.SO_BACKLOG, bridgeConfig.getSocketBacklogSize())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
Expand All @@ -78,29 +103,35 @@ private void init() {
pipeline.addLast("connection-manager", connectionHandler);
}
});
subscriptionStore = new InMemorySubscriptionStore();
clientManager = new ClientManagerImpl();
messageDispatcher = new MessageDispatcher(clientManager);
connectionHandler = new MqttConnectionHandler(clientManager, subscriptionStore);
registerMessageHandlers();

}

private void registerMessageHandlers() {
messageDispatcher.registerHandler(Message.Type.MQTT_CONNECT, new MqttConnectMessageHandler(clientManager));
messageDispatcher.registerHandler(Message.Type.MQTT_DISCONNECT, new MqttDisconnectMessageHandler(clientManager));
messageDispatcher.registerHandler(Message.Type.MQTT_PUBLISH, new MqttMessageForwarder(subscriptionStore));
if (bridgeConfig.isEnableRocketMQStore()) {
messageDispatcher.registerHandler(Message.Type.MQTT_PUBLISH, new MqttPublishMessageHandler(messageStore, publishProducer));
// TODO: mqtt cluster inner forwarder, need management of offset and client
} else {
messageDispatcher.registerHandler(Message.Type.MQTT_PUBLISH, new MqttMessageForwarder(subscriptionStore));
}
// TODO qos 1/2 PUBLISH
// TODO qos 1: PUBACK
// TODO qos 2: PUBREC
// TODO qos 2: PUBREL
// TODO qos 2: PUBCOMP
messageDispatcher.registerHandler(Message.Type.MQTT_PINGREQ, new MqttPingreqMessageHandler());
messageDispatcher.registerHandler(Message.Type.MQTT_SUBSCRIBE, new MqttSubscribeMessageHandler(subscriptionStore));
messageDispatcher.registerHandler(Message.Type.MQTT_UNSUBSCRIBE, new MqttUnsubscribeMessagHandler(subscriptionStore));
messageDispatcher.registerHandler(Message.Type.MQTT_SUBSCRIBE, new MqttSubscribeMessageHandler(subscriptionStore, subscribeConsumer));
messageDispatcher.registerHandler(Message.Type.MQTT_UNSUBSCRIBE, new MqttUnsubscribeMessagHandler(subscriptionStore, subscribeConsumer));
}

public void start() {
logger.info("start the MQTTServer with config " + bridgeConfig);
try {
if (bridgeConfig.isEnableRocketMQStore()) {
publishProducer.start();
subscribeConsumer.start();
}
ChannelFuture channelFuture = serverBootstrap.bind().sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
Expand All @@ -109,12 +140,15 @@ public void start() {
logger.info("shutdown the MQTTServer");
shutdown();
}

}

public void shutdown() {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
if (bridgeConfig.isEnableRocketMQStore()) {
publishProducer.shutdown();
subscribeConsumer.shutdown();
}
}

public static void main(String [] args) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.iot.common.config;

import java.util.Properties;

import static org.apache.rocketmq.iot.common.configuration.MQTTBridgeConfiguration.*;

public class MqttBridgeConfig {
private Properties properties;

private String brokerHost;
private int brokerPort;
private int bossGroupThreadNum;
private int workerGroupThreadNum;
private int socketBacklogSize;

private boolean enableRocketMQStore;
private String rmqNamesrvAddr;
private String rmqProductGroup;
private String rmqConsumerGroup;
private int rmqConsumerPullNums;
private String rmqAccessKey;
private String rmqSecretKey;

public MqttBridgeConfig() {
initConfig();
}

public MqttBridgeConfig(Properties properties) {
this.properties = properties;
}

public void initConfig() {
this.brokerHost = System.getProperty(MQTT_BROKER_HOST, MQTT_BROKER_HOST_DEFAULT);
this.brokerPort = Integer.parseInt(System.getProperty(MQTT_BROKER_PORT, MQTT_BROKER_PORT_DEFAULT));

this.bossGroupThreadNum = Integer.parseInt(System.getProperty(MQTT_SERVER_BOSS_GROUP_THREAD_NUM,
MQTT_SERVER_BOSS_GROUP_THREAD_NUM_DEFAULT));
this.workerGroupThreadNum = Integer.parseInt(System.getProperty(MQTT_SERVER_WORKER_GROUP_THREAD_NUM,
MQTT_SERVER_WORKER_GROUP_THREAD_NUM_DEFAULT));
this.socketBacklogSize = Integer.parseInt(System.getProperty(MQTT_SERVER_SOCKET_BACKLOG_SIZE,
MQTT_SERVER_SOCKET_BACKLOG_SIZE_DEFAULT));

this.enableRocketMQStore = Boolean.parseBoolean(System.getProperty(MQTT_ROCKETMQ_STORE_ENABLED, MQTT_ROCKETMQ_STORE_ENABLED_DEFAULT));
if (enableRocketMQStore) {
this.rmqNamesrvAddr = System.getProperty(MQTT_ROCKETMQ_NAMESRVADDR, MQTT_ROCKETMQ_NAMESRVADDR_DEFAULT);
this.rmqProductGroup = System.getProperty(MQTT_ROCKETMQ_PRODUCER_GROUP, MQTT_ROCKETMQ_PRODUCER_GROUP_DEFAULT);
this.rmqConsumerGroup = System.getProperty(MQTT_ROCKETMQ_CONSUMER_GROUP, MQTT_ROCKETMQ_CONSUMER_GROUP_DEFAULT);
this.rmqConsumerPullNums = Integer.parseInt(System.getProperty(MQTT_ROKECTMQ_CONSUMER_PULL_NUMS,
MQTT_ROKECTMQ_CONSUMER_PULL_NUMS_DEFAULT));

this.rmqAccessKey = System.getProperty(MQTT_ROCKETMQ_ACCESSKEY, MQTT_ROCKETMQ_ACCESSKEY_DEFAULT);
this.rmqSecretKey = System.getProperty(MQTT_ROCKETMQ_SECRETKEY, MQTT_ROCKETMQ_SECRETKEY_DEFAULT);
}

}

public String getBrokerHost() {
return brokerHost;
}

public int getBrokerPort() {
return brokerPort;
}

public int getBossGroupThreadNum() {
return bossGroupThreadNum;
}

public int getWorkerGroupThreadNum() {
return workerGroupThreadNum;
}

public int getSocketBacklogSize() {
return socketBacklogSize;
}

public boolean isEnableRocketMQStore() {
return enableRocketMQStore;
}

public String getRmqAccessKey() {
return rmqAccessKey;
}

public String getRmqSecretKey() {
return rmqSecretKey;
}

public String getRmqNamesrvAddr() {
return rmqNamesrvAddr;
}

public String getRmqProductGroup() {
return rmqProductGroup;
}

public String getRmqConsumerGroup() {
return rmqConsumerGroup;
}

public int getRmqConsumerPullNums() {
return rmqConsumerPullNums;
}

@Override public String toString() {
return "MqttBridgeConfig{" +
"brokerHost='" + brokerHost + '\'' +
", brokerPort=" + brokerPort +
", bossGroupThreadNum=" + bossGroupThreadNum +
", workerGroupThreadNum=" + workerGroupThreadNum +
", socketBacklogSize=" + socketBacklogSize +
", enableRocketMQStore=" + enableRocketMQStore +
", rmqNamesrvAddr='" + rmqNamesrvAddr + '\'' +
", rmqProductGroup='" + rmqProductGroup + '\'' +
", rmqConsumerGroup='" + rmqConsumerGroup + '\'' +
", rmqConsumerPullNums='" + rmqConsumerPullNums + '\'' +
", rmqAccessKey='" + rmqAccessKey + '\'' +
", rmqSecretKey='" + rmqSecretKey + '\'' +
'}';
}
}
Loading

0 comments on commit 3aa8ee5

Please sign in to comment.