Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

connection event notification of grpc server #3611

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void test() throws Exception {
public void run() {
long start = System.currentTimeMillis();
Random random = new Random();
int times = 1;
int times = 100;
while (times > 0) {
try {
boolean success = configService.publishConfig(dataId + random.nextInt(20), group,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,12 @@
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.utils.VersionUtils;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientStreamTracer;
import io.grpc.Context;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.NameResolver;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.net.URI;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -153,8 +141,8 @@ public void sendBeat() {
switchServerAsync();
}

private GrpcMetadata buildMeta() {
GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP())
private GrpcMetadata buildMeta(String connectionIdInner) {
GrpcMetadata meta = GrpcMetadata.newBuilder().setClientIp(NetUtils.localIP())
.setVersion(VersionUtils.getFullClientVersion()).putAllLabels(labels).build();
return meta;
}
Expand All @@ -171,7 +159,7 @@ private boolean serverCheck(RequestGrpc.RequestFutureStub requestBlockingStub) {
return false;
}
ServerCheckRequest serverCheckRequest = new ServerCheckRequest();
GrpcRequest grpcRequest = GrpcUtils.convertToGrpcRequest(serverCheckRequest, buildMeta());
GrpcRequest grpcRequest = GrpcUtils.convertToGrpcRequest(serverCheckRequest, buildMeta(""));
ListenableFuture<GrpcResponse> responseFuture = requestBlockingStub.request(grpcRequest);
GrpcResponse response = responseFuture.get();
return response != null;
Expand All @@ -186,8 +174,7 @@ private boolean serverCheck(RequestGrpc.RequestFutureStub requestBlockingStub) {
* @param streamStub streamStub to bind.
*/
private void bindRequestStream(RequestStreamGrpc.RequestStreamStub streamStub) {

GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(buildMeta()).build();
GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(buildMeta("")).build();
LOGGER.info("GrpcClient send stream request grpc server,streamRequest:{}", streamRequest);
streamStub.requestStream(streamRequest, new StreamObserver<GrpcResponse>() {
@Override
Expand Down Expand Up @@ -250,11 +237,11 @@ public Connection connectToServer(ServerInfo serverInfo) {
serverInfo.getServerPort());
if (newChannelStubTemp != null) {

GrpcConnection grpcConn = new GrpcConnection(connectionId, serverInfo);
LOGGER.info("success to create a connection to a server.");
RequestStreamGrpc.RequestStreamStub requestStreamStubTemp = RequestStreamGrpc
.newStub(newChannelStubTemp.getChannel());
bindRequestStream(requestStreamStubTemp);
GrpcConnection grpcConn = new GrpcConnection("", serverInfo);

//switch current channel and stub
RequestGrpc.RequestFutureStub grpcFutureServiceStubTemp = RequestGrpc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public GrpcConnection(ConnectionMetaInfo metaInfo, StreamObserver streamObserver

@Override
public boolean heartBeatExpire() {
return true;
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.alibaba.nacos.core.remote.RequestHandler;
import com.alibaba.nacos.core.remote.RequestHandlerRegistry;
import com.alibaba.nacos.core.utils.Loggers;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
Expand All @@ -54,7 +55,6 @@ public class GrpcRequestHandlerReactor extends RequestGrpc.RequestImplBase {

@Override
public void request(GrpcRequest grpcRequest, StreamObserver<GrpcResponse> responseObserver) {

Loggers.RPC_DIGEST.debug(" gRpc Server receive request :" + grpcRequest);
String type = grpcRequest.getType();
if (RequestTypeConstants.SERVER_CHECK.equals(type)) {
Expand Down
101 changes: 82 additions & 19 deletions core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@

package com.alibaba.nacos.core.remote.grpc;

import com.alibaba.nacos.api.grpc.GrpcMetadata;
import com.alibaba.nacos.api.grpc.GrpcRequest;
import com.alibaba.nacos.api.grpc.GrpcResponse;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.core.remote.ConnectionManager;
import com.alibaba.nacos.core.remote.RequestHandlerRegistry;
import com.alibaba.nacos.core.remote.RpcServer;
import com.alibaba.nacos.core.utils.ApplicationUtils;
Expand All @@ -25,14 +29,18 @@
import io.grpc.Contexts;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerTransportFilter;
import io.grpc.internal.ServerStream;
import io.grpc.internal.ServerStreamHelper;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.stub.ServerCalls;
import io.grpc.util.MutableHandlerRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

Expand All @@ -58,6 +66,9 @@ public class GrpcServer extends RpcServer {
@Autowired
private GrpcRequestHandlerReactor requestHander;

@Autowired
private ConnectionManager connectionManager;

@Autowired
private RequestHandlerRegistry requestHandlerRegistry;

Expand All @@ -74,24 +85,86 @@ public ConnectionType getConnectionType() {
@Override
public void startServer() throws Exception {
init();
server = ServerBuilder.forPort(grpcServerPort).addService(streamRequestHander).addService(requestHander)
final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
//handlerRegistry.addService(streamRequestHander);
//handlerRegistry.addService(requestHander);

// server intercetpor to set connection id.
ServerInterceptor serverInterceptor = new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
final Context ctx = Context.current()
.withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID));
return Contexts.interceptCall(ctx, call, headers, next);
}
};

addServices(handlerRegistry, serverInterceptor);
server = ServerBuilder.forPort(grpcServerPort).fallbackHandlerRegistry(handlerRegistry)
.addTransportFilter(new ServerTransportFilter() {
@Override
public Attributes transportReady(Attributes transportAttrs) {
System.out.println("transportReady:" + transportAttrs);
Attributes test = transportAttrs.toBuilder().set(key, UUID.randomUUID().toString()).build();
Attributes test = transportAttrs.toBuilder()
.set(TRANS_KEY_CONN_ID, UUID.randomUUID().toString()).build();
return test;
}

@Override
public void transportTerminated(Attributes transportAttrs) {
System.out.println("transportTerminated:" + transportAttrs);
String connectionid = transportAttrs.get(TRANS_KEY_CONN_ID);
connectionManager.unregister(connectionid);
super.transportTerminated(transportAttrs);
}
}).intercept(new ConnetionIntereptor()).build();
}).build();
server.start();
}

private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor serverInterceptor) {

// unary call register.
final MethodDescriptor<GrpcRequest, GrpcResponse> unaryMethod = MethodDescriptor.<GrpcRequest, GrpcResponse>newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName(MethodDescriptor.generateFullMethodName("Request", "request"))
.setRequestMarshaller(ProtoUtils.marshaller(GrpcRequest.newBuilder().build()))
.setResponseMarshaller(ProtoUtils.marshaller(GrpcResponse.getDefaultInstance())).build();

final ServerCallHandler<GrpcRequest, GrpcResponse> handler = ServerCalls
.asyncUnaryCall((request, responseObserver) -> {
GrpcMetadata grpcMetadata = request.getMetadata().toBuilder()
.setConnectionId(CONTEXT_KEY_CONN_ID.get()).build();
GrpcRequest requestNew = request.toBuilder().setMetadata(grpcMetadata).build();
requestHander.request(requestNew, responseObserver);
});

final ServerServiceDefinition serviceDefOfUnary = ServerServiceDefinition.builder("Request")
.addMethod(unaryMethod, handler).build();
handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnary, serverInterceptor));

// server stream register.
final ServerCallHandler<GrpcRequest, GrpcResponse> streamHandler = ServerCalls
.asyncServerStreamingCall((request, responseObserver) -> {
GrpcMetadata grpcMetadata = request.getMetadata().toBuilder()
.setConnectionId(CONTEXT_KEY_CONN_ID.get()).build();
GrpcRequest requestNew = request.toBuilder().setMetadata(grpcMetadata).build();
streamRequestHander.requestStream(requestNew, responseObserver);
});

final MethodDescriptor<GrpcRequest, GrpcResponse> serverStreamMethod = MethodDescriptor.<GrpcRequest, GrpcResponse>newBuilder()
.setType(MethodDescriptor.MethodType.SERVER_STREAMING)
.setFullMethodName(MethodDescriptor.generateFullMethodName("RequestStream", "requestStream"))
.setRequestMarshaller(ProtoUtils.marshaller(GrpcRequest.newBuilder().build()))
.setResponseMarshaller(ProtoUtils.marshaller(GrpcResponse.getDefaultInstance())).build();

final ServerServiceDefinition serviceDefOfServerStream = ServerServiceDefinition.builder("RequestStream")
.addMethod(serverStreamMethod, streamHandler).build();

handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfServerStream, serverInterceptor));

}

@Override
public int rpcPortOffset() {
return PORT_OFFSET;
Expand All @@ -104,18 +177,8 @@ public void shundownServer() {
}
}

static final Attributes.Key key = Attributes.Key.create("conn_id");
static final Attributes.Key<String> TRANS_KEY_CONN_ID = Attributes.Key.create("conn_id");

static final Context.Key<String> CONTEXT_KEY_CONN_ID = Context.key("conn_id");

static class ConnetionIntereptor implements ServerInterceptor {

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
Context ctx = Context.current();
// System.out.println(build);
System.out.println(call.getAttributes().get(key).toString());
return Contexts.interceptCall(Context.current(), call, headers, next);

}
}
}