Skip to content

Commit

Permalink
Improve java doc for dubbo-remoting-netty4 (#4180)
Browse files Browse the repository at this point in the history
* add some comments for dubbo-remoting-netty4
* update from upstream/master
* adjust format
  • Loading branch information
wavesZh authored and htynkn committed Jun 1, 2019
1 parent 492f760 commit 8479906
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ public static boolean isProvider(URL url) {
PROVIDERS_CATEGORY.equals(url.getParameter(CATEGORY_KEY, PROVIDERS_CATEGORY));
}


/**
* Check if the given value matches the given pattern. The pattern supports wildcard "*".
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ public interface Constants {
String IDLE_TIMEOUT_KEY = "idle.timeout";

int DEFAULT_IDLE_TIMEOUT = 600 * 1000;

/**
* max size of channel. default value is zero that means unlimited.
*/
String ACCEPTS_KEY = "accepts";

int DEFAULT_ACCEPTS = 0;
int DEFAULT_ACCEPTS = 0;

String CONNECT_QUEUE_CAPACITY = "connect.queue.capacity";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
public class UrlUtils {
public static int getIdleTimeout(URL url) {
int heartBeat = getHeartbeat(url);
// idleTimeout should be at least more than twice heartBeat because possible retries of client.
int idleTimeout = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartBeat * 3);
if (idleTimeout < heartBeat * 2) {
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,45 @@
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;

/**
* NettyChannel.
* NettyChannel maintains the cache of channel.
*/
final class NettyChannel extends AbstractChannel {

private static final Logger logger = LoggerFactory.getLogger(NettyChannel.class);

/**
* the cache for netty channel and dubbo channel
*/
private static final ConcurrentMap<Channel, NettyChannel> CHANNEL_MAP = new ConcurrentHashMap<Channel, NettyChannel>();

/**
* netty channel
*/
private final Channel channel;

private final Map<String, Object> attributes = new ConcurrentHashMap<String, Object>();

/**
* The constructor of NettyChannel.
* It is private so NettyChannel usually create by {@link NettyChannel#getOrAddChannel(Channel, URL, ChannelHandler)}
*
* @param channel netty channel
* @param url
* @param handler dubbo handler that contain netty handler
*/
private NettyChannel(Channel channel, URL url, ChannelHandler handler) {
super(url, handler);
if (channel == null) {
throw new IllegalArgumentException("netty channel == null;");
}
this.channel = channel;
}

/**
* Get dubbo channel by netty channel through channel cache.
* Put netty channel into it if dubbo channel don't exist in the cache.
*
* @param ch netty channel
* @param url
* @param handler dubbo handler that contain netty's handler
* @return
*/
static NettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler handler) {
if (ch == null) {
return null;
Expand All @@ -71,7 +90,11 @@ static NettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler handler)
}
return ret;
}

/**
* Remove the inactive channel.
*
* @param ch netty channel
*/
static void removeChannelIfDisconnected(Channel ch) {
if (ch != null && !ch.isActive()) {
CHANNEL_MAP.remove(ch);
Expand All @@ -93,15 +116,24 @@ public boolean isConnected() {
return !isClosed() && channel.isActive();
}

/**
* Send message by netty and whether to wait the completion of the send.
*
* @param message message that need send.
* @param sent whether to ack async-sent
* @throws RemotingException throw RemotingException if wait until timeout or any exception thrown by method body that surrounded by try-catch.
*/
@Override
public void send(Object message, boolean sent) throws RemotingException {
// whether the channel is closed
super.send(message, sent);

boolean success = true;
int timeout = 0;
try {
ChannelFuture future = channel.writeAndFlush(message);
if (sent) {
// wait timeout ms
timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
success = future.await(timeout);
}
Expand All @@ -112,7 +144,6 @@ public void send(Object message, boolean sent) throws RemotingException {
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}

if (!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
Expand Down Expand Up @@ -158,7 +189,8 @@ public Object getAttribute(String key) {

@Override
public void setAttribute(String key, Object value) {
if (value == null) { // The null value unallowed in the ConcurrentHashMap.
// The null value is unallowed in the ConcurrentHashMap.
if (value == null) {
attributes.remove(key);
} else {
attributes.put(key, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@
public class NettyClient extends AbstractClient {

private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);

/**
* netty client bootstrap
*/
private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true));

private static final String SOCKS_PROXY_HOST = "socksProxyHost";
Expand All @@ -61,12 +63,28 @@ public class NettyClient extends AbstractClient {

private Bootstrap bootstrap;

private volatile Channel channel; // volatile, please copy reference to use

/**
* current channel. Each successful invocation of {@link NettyClient#doConnect()} will
* replace this with new channel and close old channel.
* <b>volatile, please copy reference to use.</b>
*/
private volatile Channel channel;

/**
* The constructor of NettyClient.
* It wil init and start netty.
*/
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
super(url, wrapChannelHandler(url, handler));
}

/**
* Init bootstrap
*
* @throws Throwable
*/
@Override
protected void doOpen() throws Throwable {
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
Expand Down Expand Up @@ -116,7 +134,8 @@ protected void doConnect() throws Throwable {
Channel newChannel = future.channel();
try {
// Close old channel
Channel oldChannel = NettyClient.this.channel; // copy reference
// copy reference
Channel oldChannel = NettyClient.this.channel;
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
Expand Down Expand Up @@ -152,6 +171,7 @@ protected void doConnect() throws Throwable {
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
}
} finally {
// just add new valid channel to NettyChannel's cache
if (!isConnected()) {
//future.cancel(true);
}
Expand All @@ -169,8 +189,9 @@ protected void doDisConnect() throws Throwable {

@Override
protected void doClose() throws Throwable {
//can't shutdown nioEventLoopGroup
//nioEventLoopGroup.shutdownGracefully();
// can't shutdown nioEventLoopGroup because the method will be invoked when closing one channel but not a client,
// but when and how to close the nioEventLoopGroup ?
// nioEventLoopGroup.shutdownGracefully();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// send heartbeat when read idle.
if (evt instanceof IdleStateEvent) {
try {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,25 +51,39 @@
import static org.apache.dubbo.common.constants.CommonConstants.IO_THREADS_KEY;

/**
* NettyServer
* NettyServer.
*/
public class NettyServer extends AbstractServer implements Server {

private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

private Map<String, Channel> channels; // <ip:port, channel>

/**
* the cache for alive worker channel.
* <ip:port, dubbo channel>
*/
private Map<String, Channel> channels;
/**
* netty server bootstrap.
*/
private ServerBootstrap bootstrap;

private io.netty.channel.Channel channel;
/**
* the boss channel that receive connections and dispatch these to worker channel.
*/
private io.netty.channel.Channel channel;

private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

/**
* Init and start netty server
*
* @throws Throwable
*/
@Override
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,16 @@
import java.util.concurrent.ConcurrentHashMap;

/**
* NettyServerHandler
* NettyServerHandler.
*/
@io.netty.channel.ChannelHandler.Sharable
public class NettyServerHandler extends ChannelDuplexHandler {
private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);

private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel>
/**
* the cache for alive worker channel.
* <ip:port, dubbo channel>
*/
private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>();

private final URL url;

Expand Down Expand Up @@ -108,6 +111,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// server will close channel when server don't receive any heartbeat from client util timeout.
if (evt instanceof IdleStateEvent) {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import org.apache.dubbo.remoting.Server;
import org.apache.dubbo.remoting.Transporter;

/**
* Default extension of {@link Transporter} using netty4.x.
*/
public class NettyTransporter implements Transporter {

public static final String NAME = "netty";
Expand Down

0 comments on commit 8479906

Please sign in to comment.