Skip to content

Commit

Permalink
[ISSUE #7330] Add goaway and reconnection mechanism (#7331)
Browse files Browse the repository at this point in the history
* Add shutdown wait for NettyRemotingServer

* Add goaway and reconnection mechanism

* Add client version check

* Add enableTransparentRetry for NettyClientConfig

* Add enableReconnectForGoAway for NettyClientConfig

* fix unit test

* fix client version check
  • Loading branch information
drpmma authored Oct 10, 2023
1 parent 38d3d5d commit 4acb43e
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ public class NettyClientConfig {
private boolean disableCallbackExecutor = false;
private boolean disableNettyWorkerGroup = false;

private long maxReconnectIntervalTimeSeconds = 60;

private boolean enableReconnectForGoAway = true;

private boolean enableTransparentRetry = true;

public boolean isClientCloseSocketIfTimeout() {
return clientCloseSocketIfTimeout;
}
Expand Down Expand Up @@ -181,6 +187,30 @@ public void setDisableNettyWorkerGroup(boolean disableNettyWorkerGroup) {
this.disableNettyWorkerGroup = disableNettyWorkerGroup;
}

public long getMaxReconnectIntervalTimeSeconds() {
return maxReconnectIntervalTimeSeconds;
}

public void setMaxReconnectIntervalTimeSeconds(long maxReconnectIntervalTimeSeconds) {
this.maxReconnectIntervalTimeSeconds = maxReconnectIntervalTimeSeconds;
}

public boolean isEnableReconnectForGoAway() {
return enableReconnectForGoAway;
}

public void setEnableReconnectForGoAway(boolean enableReconnectForGoAway) {
this.enableReconnectForGoAway = enableReconnectForGoAway;
}

public boolean isEnableTransparentRetry() {
return enableTransparentRetry;
}

public void setEnableTransparentRetry(boolean enableTransparentRetry) {
this.enableTransparentRetry = enableTransparentRetry;
}

public String getSocksProxyConfig() {
return socksProxyConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.rocketmq.common.AbortProcessException;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.UtilAll;
Expand All @@ -60,6 +62,7 @@
import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;

import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_IS_LONG_POLLING;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_REQUEST_CODE;
Expand Down Expand Up @@ -120,6 +123,8 @@ public abstract class NettyRemotingAbstract {
*/
protected List<RPCHook> rpcHooks = new ArrayList<>();

protected AtomicBoolean isShuttingDown = new AtomicBoolean(false);

static {
NettyLogger.initNettyLogger();
}
Expand Down Expand Up @@ -264,6 +269,16 @@ public void processRequestCommand(final ChannelHandlerContext ctx, final Remotin

Runnable run = buildProcessRequestHandler(ctx, cmd, pair, opaque);

if (isShuttingDown.get()) {
if (cmd.getVersion() > MQVersion.Version.V5_1_4.ordinal()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.GO_AWAY,
"please go away");
response.setOpaque(opaque);
writeResponse(ctx.channel(), cmd, response);
return;
}
}

if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.google.common.base.Stopwatch;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
Expand Down Expand Up @@ -48,6 +49,7 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.cert.CertificateException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -57,6 +59,7 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
Expand All @@ -66,6 +69,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
Expand All @@ -82,6 +86,7 @@
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.proxy.SocksProxyConfig;

public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
Expand All @@ -97,6 +102,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
private final Map<String /* cidr */, SocksProxyConfig /* proxy */> proxyMap = new HashMap<>();
private final ConcurrentHashMap<String /* cidr */, Bootstrap> bootstrapMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<>();
private final ConcurrentMap<Channel, ChannelWrapper> channelWrapperTables = new ConcurrentHashMap<>();

private final HashedWheelTimer timer = new HashedWheelTimer(r -> new Thread(r, "ClientHouseKeepingService"));

Expand Down Expand Up @@ -356,9 +362,10 @@ public void shutdown() {
this.timer.stop();

for (String addr : this.channelTables.keySet()) {
this.closeChannel(addr, this.channelTables.get(addr).getChannel());
this.channelTables.get(addr).close();
}

this.channelWrapperTables.clear();
this.channelTables.clear();

this.eventLoopGroupWorker.shutdownGracefully();
Expand Down Expand Up @@ -416,7 +423,10 @@ public void closeChannel(final String addr, final Channel channel) {
}

if (removeItemFromTable) {
this.channelTables.remove(addrRemote);
ChannelWrapper channelWrapper = this.channelWrapperTables.remove(channel);
if (channelWrapper != null && channelWrapper.tryClose(channel)) {
this.channelTables.remove(addrRemote);
}
LOGGER.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
}

Expand Down Expand Up @@ -463,7 +473,10 @@ public void closeChannel(final Channel channel) {
}

if (removeItemFromTable) {
this.channelTables.remove(addrRemote);
ChannelWrapper channelWrapper = this.channelWrapperTables.remove(channel);
if (channelWrapper != null && channelWrapper.tryClose(channel)) {
this.channelTables.remove(addrRemote);
}
LOGGER.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
RemotingHelper.closeChannel(channel);
}
Expand Down Expand Up @@ -511,7 +524,7 @@ public void updateNameServerAddressList(List<String> addrs) {
if (addr.contains(namesrvAddr)) {
ChannelWrapper channelWrapper = this.channelTables.get(addr);
if (channelWrapper != null) {
closeChannel(channelWrapper.getChannel());
channelWrapper.close();
}
}
}
Expand Down Expand Up @@ -689,8 +702,9 @@ private Channel createChannel(final String addr) throws InterruptedException {
ChannelFuture channelFuture = fetchBootstrap(addr)
.connect(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
LOGGER.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
cw = new ChannelWrapper(channelFuture);
cw = new ChannelWrapper(addr, channelFuture);
this.channelTables.put(addr, cw);
this.channelWrapperTables.put(channelFuture.channel(), cw);
}
} catch (Exception e) {
LOGGER.error("createChannel: create channel exception", e);
Expand Down Expand Up @@ -758,6 +772,64 @@ public void invokeOneway(String addr, RemotingCommand request, long timeoutMilli
}
}

@Override
public CompletableFuture<RemotingCommand> invoke(String addr, RemotingCommand request,
long timeoutMillis) {
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
return invokeImpl(channel, request, timeoutMillis).whenComplete((v, t) -> {
if (t == null) {
updateChannelLastResponseTime(addr);
}
}).thenApply(ResponseFuture::getResponseCommand);
} else {
this.closeChannel(addr, channel);
future.completeExceptionally(new RemotingConnectException(addr));
}
} catch (Throwable t) {
future.completeExceptionally(t);
}
return future;
}

@Override
public CompletableFuture<ResponseFuture> invokeImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis) {
Stopwatch stopwatch = Stopwatch.createStarted();
return super.invokeImpl(channel, request, timeoutMillis).thenCompose(responseFuture -> {
RemotingCommand response = responseFuture.getResponseCommand();
if (response.getCode() == ResponseCode.GO_AWAY) {
if (nettyClientConfig.isEnableReconnectForGoAway()) {
ChannelWrapper channelWrapper = channelWrapperTables.computeIfPresent(channel, (channel0, channelWrapper0) -> {
try {
if (channelWrapper0.reconnect()) {
LOGGER.info("Receive go away from channel {}, recreate the channel", channel0);
channelWrapperTables.put(channelWrapper0.getChannel(), channelWrapper0);
}
} catch (Throwable t) {
LOGGER.error("Channel {} reconnect error", channelWrapper0, t);
}
return channelWrapper0;
});
if (channelWrapper != null) {
if (nettyClientConfig.isEnableTransparentRetry()) {
long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS);
stopwatch.stop();
RemotingCommand retryRequest = RemotingCommand.createRequestCommand(request.getCode(), request.readCustomHeader());
Channel retryChannel = channelWrapper.getChannel();
if (channel != retryChannel) {
return super.invokeImpl(retryChannel, retryRequest, timeoutMillis - duration);
}
}
}
}
}
return CompletableFuture.completedFuture(responseFuture);
});
}

@Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
ExecutorService executorThis = executor;
Expand Down Expand Up @@ -877,30 +949,41 @@ public void run() {
}
}

static class ChannelWrapper {
private final ChannelFuture channelFuture;
class ChannelWrapper {
private final ReentrantReadWriteLock lock;
private ChannelFuture channelFuture;
// only affected by sync or async request, oneway is not included.
private ChannelFuture channelToClose;
private long lastResponseTime;
private volatile long lastReconnectTimestamp = 0L;
private final String channelAddress;

public ChannelWrapper(ChannelFuture channelFuture) {
public ChannelWrapper(String address, ChannelFuture channelFuture) {
this.lock = new ReentrantReadWriteLock();
this.channelFuture = channelFuture;
this.lastResponseTime = System.currentTimeMillis();
this.channelAddress = address;
}

public boolean isOK() {
return this.channelFuture.channel() != null && this.channelFuture.channel().isActive();
return getChannel() != null && getChannel().isActive();
}

public boolean isWritable() {
return this.channelFuture.channel().isWritable();
return getChannel().isWritable();
}

private Channel getChannel() {
return this.channelFuture.channel();
return getChannelFuture().channel();
}

public ChannelFuture getChannelFuture() {
return channelFuture;
lock.readLock().lock();
try {
return this.channelFuture;
} finally {
lock.readLock().unlock();
}
}

public long getLastResponseTime() {
Expand All @@ -910,6 +993,52 @@ public long getLastResponseTime() {
public void updateLastResponseTime() {
this.lastResponseTime = System.currentTimeMillis();
}

public boolean reconnect() {
if (lock.writeLock().tryLock()) {
try {
if (lastReconnectTimestamp == 0L || System.currentTimeMillis() - lastReconnectTimestamp > Duration.ofSeconds(nettyClientConfig.getMaxReconnectIntervalTimeSeconds()).toMillis()) {
channelToClose = channelFuture;
String[] hostAndPort = getHostAndPort(channelAddress);
channelFuture = fetchBootstrap(channelAddress)
.connect(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
lastReconnectTimestamp = System.currentTimeMillis();
return true;
}
} finally {
lock.writeLock().unlock();
}
}
return false;
}

public boolean tryClose(Channel channel) {
try {
lock.readLock().lock();
if (channelFuture != null) {
if (channelFuture.channel().equals(channel)) {
return true;
}
}
} finally {
lock.readLock().unlock();
}
return false;
}

public void close() {
try {
lock.writeLock().lock();
if (channelFuture != null) {
closeChannel(channelFuture.channel());
}
if (channelToClose != null) {
closeChannel(channelToClose.channel());
}
} finally {
lock.writeLock().unlock();
}
}
}

class InvokeCallbackWrapper implements InvokeCallback {
Expand Down
Loading

0 comments on commit 4acb43e

Please sign in to comment.