Skip to content

Commit

Permalink
Add Netty native transport support on MacOS. (moquette-io#806)
Browse files Browse the repository at this point in the history
  • Loading branch information
jingyu authored Mar 22, 2024
1 parent 23c7b39 commit 6ffb106
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 13 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.18-SNAPSHOT:
[feature] Add Netty native trsansport support on MacOS. Bundle all the native transport module by default (#806)
[feature] message expiry interval: (issue #818)
- Implements the management of message expiry for retained part. (#819)
[feature] subscription option handling: (issue #808)
Expand Down
60 changes: 53 additions & 7 deletions broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,66 @@
<version>${netty.version}</version>
</dependency>


<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>${netty.version}</version>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>${netty.version}</version>
<classifier>linux-aarch_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<version>${netty.version}</version>
<classifier>osx-aarch_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<version>${netty.version}</version>
<classifier>osx-x86_64</classifier>
</dependency>


<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>${netty.tcnative.version}</version>
<classifier>linux-x86_64</classifier>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>${netty.tcnative.version}</version>
<classifier>linux-aarch_64</classifier>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>${netty.tcnative.version}</version>
<classifier>osx-x86_64</classifier>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>${netty.tcnative.version}</version>
<classifier>osx-aarch_64</classifier>
<scope>runtime</scope>
</dependency>


<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2-mvstore</artifactId>
Expand Down Expand Up @@ -169,13 +222,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>${netty.tcnative.version}</version>
<classifier>linux-x86_64</classifier>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.h2database</groupId>
Expand Down
2 changes: 1 addition & 1 deletion broker/src/main/java/io/moquette/BrokerConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public final class BrokerConstants {
public static final String NETTY_TCP_NODELAY_PROPERTY_NAME = "netty.tcp_nodelay";
public static final String NETTY_SO_KEEPALIVE_PROPERTY_NAME = "netty.so_keepalive";
public static final String NETTY_CHANNEL_TIMEOUT_SECONDS_PROPERTY_NAME = "netty.channel_timeout.seconds";
public static final String NETTY_EPOLL_PROPERTY_NAME = "netty.epoll";
public static final String NETTY_NATIVE_PROPERTY_NAME = "netty.native";
@Deprecated
public static final String NETTY_MAX_BYTES_PROPERTY_NAME = IConfig.NETTY_MAX_BYTES_PROPERTY_NAME;
@Deprecated
Expand Down
25 changes: 23 additions & 2 deletions broker/src/main/java/io/moquette/broker/NewNettyAcceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
Expand Down Expand Up @@ -120,6 +124,9 @@ public void operationComplete(ChannelFuture future) throws Exception {

private static final Logger LOG = LoggerFactory.getLogger(NewNettyAcceptor.class);

private static final String EPOLL_TRANSPORT = "io.netty.channel.epoll.Epoll";
private static final String KQUEUE_TRANSPORT = "io.netty.channel.kqueue.KQueue";

private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private final Map<String, Integer> ports = new HashMap<>();
Expand Down Expand Up @@ -148,12 +155,17 @@ public void initialize(NewNettyMQTTHandler mqttHandler, IConfig props, ISslConte
maxBytesInMessage = props.intProp(BrokerConstants.NETTY_MAX_BYTES_PROPERTY_NAME,
BrokerConstants.DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE);

boolean epoll = props.boolProp(BrokerConstants.NETTY_EPOLL_PROPERTY_NAME, false);
if (epoll) {
boolean nativeTransport = props.boolProp(BrokerConstants.NETTY_NATIVE_PROPERTY_NAME, false);
if (nativeTransport && classAvaliable(EPOLL_TRANSPORT) && Epoll.isAvailable()) {
LOG.info("Netty is using Epoll");
bossGroup = new EpollEventLoopGroup();
workerGroup = new EpollEventLoopGroup();
channelClass = EpollServerSocketChannel.class;
} else if (nativeTransport && classAvaliable(KQUEUE_TRANSPORT) && KQueue.isAvailable()) {
LOG.info("Netty is using KQueue");
bossGroup = new KQueueEventLoopGroup();
workerGroup = new KQueueEventLoopGroup();
channelClass = KQueueServerSocketChannel.class;
} else {
LOG.info("Netty is using NIO");
bossGroup = new NioEventLoopGroup();
Expand Down Expand Up @@ -191,6 +203,15 @@ public void initialize(NewNettyMQTTHandler mqttHandler, IConfig props, ISslConte
}
}

private boolean classAvaliable(String clazz) {
try {
Class.forName(clazz, false, getClass().getClassLoader());
return true;
} catch (Exception e) {
return false;
}
}

private boolean securityPortsConfigured(IConfig props) {
String sslTcpPortProp = props.getProperty(BrokerConstants.SSL_PORT_PROPERTY_NAME);
String wssPortProp = props.getProperty(BrokerConstants.WSS_PORT_PROPERTY_NAME);
Expand Down
6 changes: 3 additions & 3 deletions distribution/src/main/resources/moquette.conf
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,14 @@ password_file config/password_file.conf
#*********************************************************************
# Netty Configuration
#
# netty.epoll: Linux systems can use epoll instead of nio. To get a performance
# gain and reduced GC.
# netty.native: Linux systems can use epoll, while MacOS can use kqueue
# instead of nio. To get a performance gain and reduced GC.
# http://netty.io/wiki/native-transports.html for more information
# netty.mqtt.message_size : by default the max size of message is set at 8092 bytes
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html#_Toc442180836
# Fore more information about payload size specs.
#*********************************************************************
# netty.epoll true
# netty.native true
# netty.mqtt.message_size 8092

#*********************************************************************
Expand Down

0 comments on commit 6ffb106

Please sign in to comment.