-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Exception when creating a ManagedChannel after being shut down #10407
Comments
This is a warning message not an throwing exception, so if there are unexpected behaviour it might be caused from somewhere else. Are you using version < 1.36.0? For versions >= 1.36.0 grpc would throw exception and the channel will be crashed if you own load balancing policy does not respect the threading model grpc enforces. |
Yes, I have re customized LoadBalancer and NameResolver. Through NameResolver and registry implementation services, I found that I directly shut down the ManagedChannel and then re created a new ManagedChannel pair, which caused this exception to occur. Additionally, when sending the grpc request again, the request remained blocked How can I make the ManagedChannel I have recreated effective? Thank you very much! |
Does this still happen in the current version of gRPC? |
1.33.1 <dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>${grpc.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.re2j</groupId>
<artifactId>re2j</artifactId>
</exclusion>
</exclusions>
</dependency> |
From your previous comment I understand that you are using 1.33.1. Would you please try with 1.57.2 and see if the issue remains? |
// i want change loadbalance by the function
@PostMapping("/loadbalance/strategy")
public String grpcClientLoadBalance(String loadBalance, String grpcServerName) {
ManagedChannel grpcClient = ManagedChannelManager.getGrpcClient(grpcServerName);
try {
if (Objects.nonNull(grpcClient)) {
//removeBean("grpcDiscoveryClientResolverFactory");
//addBean("grpcDiscoveryClientResolverFactory", DiscoveryClientResolverProvider.class);
ManagedChannelManager.removeClient(grpcServerName);
ManagedChannelManager.initGrpcClient(grpcServerName, loadBalance);
}
return "修改成功";
} catch (Exception e) {
e.printStackTrace();
return "修改失败";
}
}
public class ManagedChannelManager {
// key为ServiceName
public static final Map<String, ManagedChannel> CLIENT_CACHE = Maps.newConcurrentMap();
static {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
for (Map.Entry<String, ManagedChannel> entry : CLIENT_CACHE.entrySet()) {
ManagedChannel grpcClient = entry.getValue();
grpcClient.shutdown();
}
CLIENT_CACHE.clear();
}));
}
private ManagedChannelManager() {
}
/**
* 创建channel.
*
* @param contextPath contextPath grpcServerName
* @param loadBalance {@link LoadBalancerStrategy#getStrategy()}
*/
public static void initGrpcClient(final String contextPath, String loadBalance) {
CLIENT_CACHE.computeIfAbsent(contextPath, s -> GrpcClientBuilder.buildClientChannel(contextPath, loadBalance));
}
/**
* 获取客户端channel.
*
* @param contextPath contextPath
* @return GrpcClient GrpcClient
*/
public static ManagedChannel getGrpcClient(final String contextPath) {
ManagedChannel managedChannel = CLIENT_CACHE.get(contextPath);
// 获取时如果channel连接为空时,将创建客户端channel
if (managedChannel == null){
CLIENT_CACHE.computeIfAbsent(contextPath, s -> GrpcClientBuilder.buildClientChannel(contextPath, LoadBalancerStrategy.ROUND_ROBIN.getStrategy()));
}
return CLIENT_CACHE.get(contextPath);
}
/**
* 删除客户端channel.
*
* @param contextPath contextPath
*/
public static void removeClient(final String contextPath) {
ManagedChannel grpcClient = CLIENT_CACHE.remove(contextPath);
// 关闭grpc连接ManagedChannel
if (Objects.nonNull(grpcClient)) {
grpcClient.shutdownNow();
}
}
}
@Component
public final class GrpcClientBuilder {
private static final String DISCOVERY_SCHEMA = "discovery:///";
static {
// 将自定义的负载策略注入到grpc的负载注册器中供后面grpc负载调用时查找对应的负载提供器
LoadBalancerRegistry.getDefaultRegistry().register(new RandomLoadBalancerProvider());
LoadBalancerRegistry.getDefaultRegistry().register(new RoundRobinLoadBalancerProvider());
// 通过NameResolverRegistry方式注册NameResolver
//NameResolverRegistry.getDefaultRegistry().register(new DiscoveryClientResolverProvider(client));
}
private GrpcClientBuilder() {
}
/**
* Build the client channel.
*
* @return ManagedChannel
*/
public static ManagedChannel buildClientChannel(String contextPath, String loadBalance) {
if (StringUtil.isNullOrEmpty(loadBalance)) {
loadBalance = LoadBalancerStrategy.RANDOM.getStrategy();
} else {
// 策略方式不是自定义中某一种
List<String> strategyNames = Arrays.stream(LoadBalancerStrategy.values()).map(item -> item.getStrategy()).collect(Collectors.toList());
// 全部转为小写方式比较策略名称
if (!strategyNames.contains(loadBalance.toLowerCase(Locale.ROOT))) {
loadBalance = LoadBalancerStrategy.RANDOM.getStrategy();
}
}
if (!contextPath.contains(DISCOVERY_SCHEMA)) {
contextPath = "discovery:///" + contextPath;
}
ManagedChannelBuilder<?> builder = ManagedChannelBuilder
// build channel to server with server's address "discovery:///serverName"
.forTarget(contextPath)
// 设置拦截器
.intercept(new ContextClientInterceptor())
// 设置默认的负载规则
.defaultLoadBalancingPolicy(loadBalance)
// 不会再去尝试升级http1
.usePlaintext()
// 消息传输大小限制
.maxInboundMessageSize(100 * 1024 * 1024)
// 关闭重试
.disableRetry();
ManagedChannel channel = builder.build();
channel.getState(true);
return channel;
}
} public abstract class AbstractLoadBalancer extends LoadBalancer {
private static final Logger LOG = LoggerFactory.getLogger(AbstractLoadBalancer.class);
private static final Status EMPTY_OK = Status.OK.withDescription("no subchannels ready");
// 提供LoadBalancer实现的基本要素,用于进行SubChannel的创建、更新辅助
private final Helper helper;
private final AtomicReference<String> serviceName = new AtomicReference<>();
// 缓存创建完成的SubChannel
public static final Map<EquivalentAddressGroup, Subchannel> subchannels = new ConcurrentHashMap<>();
// grpc连接状态
private ConnectivityState currentState;
private AbstractPicker currentPicker = new EmptyPicker(EMPTY_OK);
protected AbstractLoadBalancer(final Helper helper) {
this.helper = checkNotNull(helper, "helper");
}
// 设置当前对应负载的服务名,便于日志打印
private String getServiceName() {
return serviceName.get();
}
//
private void setAttribute(final Attributes attributes) {
this.serviceName.compareAndSet(null, attributes.get(GrpcAttributeUtils.APP_NAME));
}
// grpc直接使用时直接使用handler中的ip:port创建grpc链接并存储Channel,strippedAddressGroup-->ip:port
// 为服务列表中的每个服务建立一个subchannel
@Override
public void handleResolvedAddresses(final ResolvedAddresses resolvedAddresses) {
// 设置当前进行解析的服务名
setAttribute(resolvedAddresses.getAttributes());
// 从本地内存中获取
Set<EquivalentAddressGroup> currentAddrs = subchannels.keySet();
// key对象中只封装了getAddresses()方法获取的值
Map<EquivalentAddressGroup, EquivalentAddressGroup> latestAddrs = stripAttrs(resolvedAddresses.getAddresses());
// 获取将本地缓存中的多余数据
Set<EquivalentAddressGroup> removedAddrs = setsDifference(currentAddrs, latestAddrs.keySet());
// 处理nameResolver传递过来的数据
for (Map.Entry<EquivalentAddressGroup, EquivalentAddressGroup> latestEntry : latestAddrs.entrySet()) {
EquivalentAddressGroup strippedAddressGroup = latestEntry.getKey();
EquivalentAddressGroup originalAddressGroup = latestEntry.getValue();
Subchannel subchannel;
// 获取本地已创建好连接的SubChannel
Subchannel existingSubchannel = subchannels.get(strippedAddressGroup);
if (Objects.nonNull(existingSubchannel)) {
subchannel = existingSubchannel;
// 更新当前channel中关联的attribute属性
SubChannels.updateAttributes(existingSubchannel, originalAddressGroup.getAttributes());
} else {
// 重新创建
subchannel = SubChannels.createSubChannel(helper, strippedAddressGroup, originalAddressGroup.getAttributes());
//SubchannelStateListener是Subchannel 的状态监听器,当 Subchannel 状态发生变化时进行处理
subchannel.start(state -> processSubchannelState(subchannel, state));
subchannels.put(strippedAddressGroup, subchannel);
}
// 建立连接
subchannel.requestConnection();
}
List<Subchannel> removedSubchannels = new ArrayList<>();
for (EquivalentAddressGroup addressGroup : removedAddrs) {
removedSubchannels.add(subchannels.remove(addressGroup));
}
updateBalancingState();
// 关闭本地缓存中被移除的subChannel
for (Subchannel removedSubchannel : removedSubchannels) {
shutdownSubchannel(removedSubchannel);
}
}
private void processSubchannelState(final Subchannel subchannel, final ConnectivityStateInfo stateInfo) {
if (subchannels.get(stripAttrs(subchannel.getAddresses())) != subchannel) {
return;
}
if (stateInfo.getState() == IDLE) {
subchannel.requestConnection();
LOG.info("AbstractLoadBalancer.handleSubchannelState, current state:IDLE, subchannel.requestConnection().");
}
final ConnectivityStateInfo originStateInfo = SubChannels.getStateInfo(subchannel);
if (originStateInfo.getState().equals(TRANSIENT_FAILURE) && (stateInfo.getState().equals(CONNECTING) || stateInfo.getState().equals(IDLE))) {
return;
}
SubChannels.setStateInfo(subchannel, stateInfo);
updateBalancingState();
}
private Map<EquivalentAddressGroup, EquivalentAddressGroup> stripAttrs(final List<EquivalentAddressGroup> groupList) {
Map<EquivalentAddressGroup, EquivalentAddressGroup> addrs = new HashMap<>(groupList.size() * 2);
for (EquivalentAddressGroup group : groupList) {
addrs.put(stripAttrs(group), group);
}
return addrs;
}
private static EquivalentAddressGroup stripAttrs(final EquivalentAddressGroup eag) {
return new EquivalentAddressGroup(eag.getAddresses());
}
private <T> Set<T> setsDifference(final Set<T> a, final Set<T> b) {
Set<T> aCopy = new HashSet<>(a);
aCopy.removeAll(b);
return aCopy;
}
@Override
public void shutdown() {
for (Subchannel subchannel : subchannels.values()) {
shutdownSubchannel(subchannel);
}
}
private void shutdownSubchannel(final Subchannel subchannel) {
subchannel.shutdown();
SubChannels.setStateInfo(subchannel, ConnectivityStateInfo.forNonError(SHUTDOWN));
}
@Override
public void handleNameResolutionError(final Status error) {
updateBalancingState(TRANSIENT_FAILURE,
currentPicker instanceof AbstractReadyPicker ? currentPicker : new EmptyPicker(error));
}
/**
* Updates picker with the list of active subchannels (state == READY).
*/
private void updateBalancingState() {
final List<Subchannel> activeList = subchannels.values()
.stream()
.filter(r -> SubChannels.getStateInfo(r).getState() == READY)
.collect(Collectors.toList());
if (activeList.isEmpty()) {
// No READY subchannels
boolean isConnecting = false;
Status aggStatus = EMPTY_OK;
for (Subchannel subchannel : getSubchannels()) {
ConnectivityStateInfo stateInfo = SubChannels.getStateInfo(subchannel);
if (stateInfo.getState() == CONNECTING || stateInfo.getState() == IDLE) {
isConnecting = true;
}
if (aggStatus == EMPTY_OK || !aggStatus.isOk()) {
aggStatus = stateInfo.getStatus();
}
}
// 针对subChannel状态为CONNECTING或TRANSIENT_FAILURE不使用负载策略
updateBalancingState(isConnecting ? CONNECTING : TRANSIENT_FAILURE, new EmptyPicker(aggStatus));
} else {
updateBalancingState(READY, newPicker(new ArrayList<>(subchannels.values())));
}
}
private void updateBalancingState(final ConnectivityState state, final AbstractPicker picker) {
if (state == currentState && picker.isEquivalentTo(currentPicker)) {
return;
}
helper.updateBalancingState(state, picker);
currentState = state;
currentPicker = picker;
LOG.info("AbstractPicker update, serviceName:{}, all subchannels:{}, state:{}", serviceName, picker.getSubchannelsInfo(), state);
}
private Collection<Subchannel> getSubchannels() {
return subchannels.values();
}
/**
* Create new picker.
*
* @param list all subchannels
* @return ReadyPicker
*/
protected abstract AbstractReadyPicker newPicker(List<Subchannel> list);
} |
This is for a very old version of gRPC. It needs to be reproduced in a newer version before it is worth us looking more into it. If you reproduce it on a newer version add a comment, and we'll reopen. |
I want to redefine the load rules by redefining the defaultLoadBalancingPolicy property of ManagedChannel. Therefore, by defining the original ManagedChannel. shutdown() and then using ManagedChannelBuilder to define a new ManagedChannel to achieve dynamic load balancing policy changes
result: GRPC client request is blocking,so i can not get grpc server response
The text was updated successfully, but these errors were encountered: