Skip to content

Commit

Permalink
close channel when receive go away twice (#8862)
Browse files Browse the repository at this point in the history
close channel when receive go away twice (#8862)
  • Loading branch information
qianye1001 authored Nov 14, 2024
1 parent 2bbc852 commit 66ba456
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@
import io.netty.channel.ChannelFutureListener;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.NetworkUtil;
Expand All @@ -36,15 +45,6 @@
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;

public class RemotingHelper {
public static final String DEFAULT_CHARSET = "UTF-8";
public static final String DEFAULT_CIDR_ALL = "0.0.0.0/0";
Expand Down Expand Up @@ -355,6 +355,18 @@ public void operationComplete(ChannelFuture future) throws Exception {
}
}

public static CompletableFuture<Void> convertChannelFutureToCompletableFuture(ChannelFuture channelFuture) {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
channelFuture.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
completableFuture.complete(null);
} else {
completableFuture.completeExceptionally(new RemotingConnectException(channelFuture.channel().remoteAddress().toString(), future.cause()));
}
});
return completableFuture;
}

public static String getRequestCodeDesc(int code) {
return REQUEST_CODE_MAP.getOrDefault(code, String.valueOf(code));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ public class NettyClientConfig {

private boolean enableReconnectForGoAway = true;

private boolean enableTransparentRetry = true;

public boolean isClientCloseSocketIfTimeout() {
return clientCloseSocketIfTimeout;
}
Expand Down Expand Up @@ -205,14 +203,6 @@ public void setEnableReconnectForGoAway(boolean enableReconnectForGoAway) {
this.enableReconnectForGoAway = enableReconnectForGoAway;
}

public boolean isEnableTransparentRetry() {
return enableTransparentRetry;
}

public void setEnableTransparentRetry(boolean enableTransparentRetry) {
this.enableTransparentRetry = enableTransparentRetry;
}

public String getSocksProxyConfig() {
return socksProxyConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public void processRequestCommand(final ChannelHandlerContext ctx, final Remotin
Runnable run = buildProcessRequestHandler(ctx, cmd, pair, opaque);

if (isShuttingDown.get()) {
if (cmd.getVersion() > MQVersion.Version.V5_1_4.ordinal()) {
if (cmd.getVersion() > MQVersion.Version.V5_3_1.ordinal()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.GO_AWAY,
"please go away");
response.setOpaque(opaque);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.FutureUtils;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
Expand All @@ -88,6 +89,8 @@
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.proxy.SocksProxyConfig;

import static org.apache.rocketmq.remoting.common.RemotingHelper.convertChannelFutureToCompletableFuture;

public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);

Expand Down Expand Up @@ -554,7 +557,7 @@ public RemotingCommand invokeSync(String addr, final RemotingCommand request, lo
updateChannelLastResponseTime(addr);
return response;
} catch (RemotingSendRequestException e) {
LOGGER.warn("invokeSync: send request exception, so close the channel[{}]", channelRemoteAddr);
LOGGER.warn("invokeSync: send request exception, so close the channel[addr={}, id={}]", channelRemoteAddr, channel.id());
this.closeChannel(addr, channel);
throw e;
} catch (RemotingTimeoutException e) {
Expand Down Expand Up @@ -832,45 +835,27 @@ public CompletableFuture<ResponseFuture> invokeImpl(final Channel channel, final
return channelWrapper0;
});
if (channelWrapper != null && !channelWrapper.isWrapperOf(channel)) {
if (nettyClientConfig.isEnableTransparentRetry()) {
RemotingCommand retryRequest = RemotingCommand.createRequestCommand(request.getCode(), request.readCustomHeader());
retryRequest.setBody(request.getBody());
retryRequest.setExtFields(request.getExtFields());
if (channelWrapper.isOK()) {
long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS);
stopwatch.stop();
Channel retryChannel = channelWrapper.getChannel();
if (retryChannel != null && channel != retryChannel) {
return super.invokeImpl(retryChannel, retryRequest, timeoutMillis - duration);
}
} else {
CompletableFuture<ResponseFuture> future = new CompletableFuture<>();
ChannelFuture channelFuture = channelWrapper.getChannelFuture();
channelFuture.addListener(f -> {
long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS);
stopwatch.stop();
if (f.isSuccess()) {
Channel retryChannel0 = channelFuture.channel();
if (retryChannel0 != null && channel != retryChannel0) {
super.invokeImpl(retryChannel0, retryRequest, timeoutMillis - duration).whenComplete((v, t) -> {
if (t != null) {
future.completeExceptionally(t);
} else {
future.complete(v);
}
});
}
} else {
future.completeExceptionally(new RemotingConnectException(channelWrapper.channelAddress));
RemotingCommand retryRequest = RemotingCommand.createRequestCommand(request.getCode(), request.readCustomHeader());
retryRequest.setBody(request.getBody());
retryRequest.setExtFields(request.getExtFields());
CompletableFuture<Void> future = convertChannelFutureToCompletableFuture(channelWrapper.getChannelFuture());
return future.thenCompose(v -> {
long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS);
stopwatch.stop();
return super.invokeImpl(channelWrapper.getChannel(), retryRequest, timeoutMillis - duration)
.thenCompose(r -> {
if (r.getResponseCommand().getCode() == ResponseCode.GO_AWAY) {
return FutureUtils.completeExceptionally(new RemotingSendRequestException(channelRemoteAddr,
new Throwable("Receive GO_AWAY twice in request from channelId=" + channel.id())));
}
return CompletableFuture.completedFuture(r);
});
return future;
}
}
});
} else {
LOGGER.warn("invokeImpl receive GO_AWAY, channelWrapper is null or channel is the same in wrapper, channelId={}", channel.id());
}
}
return FutureUtils.completeExceptionally(new RemotingSendRequestException(channelRemoteAddr, new Throwable("Receive GO_AWAY from channelId=" + channel.id())));
}
return CompletableFuture.completedFuture(responseFuture);
}).whenComplete((v, t) -> {
Expand Down

0 comments on commit 66ba456

Please sign in to comment.