diff --git a/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java b/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java index f51aba578b9..c76dc322c63 100644 --- a/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java @@ -154,7 +154,7 @@ public void test() throws Exception { public void run() { long start = System.currentTimeMillis(); Random random = new Random(); - int times = 1; + int times = 100; while (times > 0) { try { boolean success = configService.publishConfig(dataId + random.nextInt(20), group, diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java index 42156903463..925de10df00 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java @@ -35,24 +35,12 @@ import com.alibaba.nacos.common.remote.client.RpcClient; import com.alibaba.nacos.common.utils.VersionUtils; import com.google.common.util.concurrent.ListenableFuture; -import io.grpc.Attributes; -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ClientCall; -import io.grpc.ClientInterceptor; -import io.grpc.ClientStreamTracer; -import io.grpc.Context; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; -import io.grpc.NameResolver; import io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; -import java.net.URI; import java.util.concurrent.TimeUnit; /** @@ -153,8 +141,8 @@ public void sendBeat() { switchServerAsync(); } - private GrpcMetadata buildMeta() { - GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP()) + private GrpcMetadata buildMeta(String connectionIdInner) { + GrpcMetadata meta = GrpcMetadata.newBuilder().setClientIp(NetUtils.localIP()) .setVersion(VersionUtils.getFullClientVersion()).putAllLabels(labels).build(); return meta; } @@ -171,7 +159,7 @@ private boolean serverCheck(RequestGrpc.RequestFutureStub requestBlockingStub) { return false; } ServerCheckRequest serverCheckRequest = new ServerCheckRequest(); - GrpcRequest grpcRequest = GrpcUtils.convertToGrpcRequest(serverCheckRequest, buildMeta()); + GrpcRequest grpcRequest = GrpcUtils.convertToGrpcRequest(serverCheckRequest, buildMeta("")); ListenableFuture responseFuture = requestBlockingStub.request(grpcRequest); GrpcResponse response = responseFuture.get(); return response != null; @@ -186,8 +174,7 @@ private boolean serverCheck(RequestGrpc.RequestFutureStub requestBlockingStub) { * @param streamStub streamStub to bind. */ private void bindRequestStream(RequestStreamGrpc.RequestStreamStub streamStub) { - - GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(buildMeta()).build(); + GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(buildMeta("")).build(); LOGGER.info("GrpcClient send stream request grpc server,streamRequest:{}", streamRequest); streamStub.requestStream(streamRequest, new StreamObserver() { @Override @@ -250,11 +237,11 @@ public Connection connectToServer(ServerInfo serverInfo) { serverInfo.getServerPort()); if (newChannelStubTemp != null) { - GrpcConnection grpcConn = new GrpcConnection(connectionId, serverInfo); LOGGER.info("success to create a connection to a server."); RequestStreamGrpc.RequestStreamStub requestStreamStubTemp = RequestStreamGrpc .newStub(newChannelStubTemp.getChannel()); bindRequestStream(requestStreamStubTemp); + GrpcConnection grpcConn = new GrpcConnection("", serverInfo); //switch current channel and stub RequestGrpc.RequestFutureStub grpcFutureServiceStubTemp = RequestGrpc diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcConnection.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcConnection.java index 3ac3b9eddd6..e60359d2ed9 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcConnection.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcConnection.java @@ -45,7 +45,7 @@ public GrpcConnection(ConnectionMetaInfo metaInfo, StreamObserver streamObserver @Override public boolean heartBeatExpire() { - return true; + return false; } @Override diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcRequestHandlerReactor.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcRequestHandlerReactor.java index df6ff0a3f80..f18b70001f8 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcRequestHandlerReactor.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcRequestHandlerReactor.java @@ -33,6 +33,7 @@ import com.alibaba.nacos.core.remote.RequestHandler; import com.alibaba.nacos.core.remote.RequestHandlerRegistry; import com.alibaba.nacos.core.utils.Loggers; +import io.grpc.stub.ServerCalls; import io.grpc.stub.StreamObserver; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -54,7 +55,6 @@ public class GrpcRequestHandlerReactor extends RequestGrpc.RequestImplBase { @Override public void request(GrpcRequest grpcRequest, StreamObserver responseObserver) { - Loggers.RPC_DIGEST.debug(" gRpc Server receive request :" + grpcRequest); String type = grpcRequest.getType(); if (RequestTypeConstants.SERVER_CHECK.equals(type)) { diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcServer.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcServer.java index b47ca40bd7a..5b255775d8b 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcServer.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcServer.java @@ -16,7 +16,11 @@ package com.alibaba.nacos.core.remote.grpc; +import com.alibaba.nacos.api.grpc.GrpcMetadata; +import com.alibaba.nacos.api.grpc.GrpcRequest; +import com.alibaba.nacos.api.grpc.GrpcResponse; import com.alibaba.nacos.common.remote.ConnectionType; +import com.alibaba.nacos.core.remote.ConnectionManager; import com.alibaba.nacos.core.remote.RequestHandlerRegistry; import com.alibaba.nacos.core.remote.RpcServer; import com.alibaba.nacos.core.utils.ApplicationUtils; @@ -25,14 +29,18 @@ import io.grpc.Contexts; import io.grpc.Grpc; import io.grpc.Metadata; +import io.grpc.MethodDescriptor; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; +import io.grpc.ServerInterceptors; +import io.grpc.ServerServiceDefinition; import io.grpc.ServerTransportFilter; -import io.grpc.internal.ServerStream; -import io.grpc.internal.ServerStreamHelper; +import io.grpc.protobuf.ProtoUtils; +import io.grpc.stub.ServerCalls; +import io.grpc.util.MutableHandlerRegistry; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -58,6 +66,9 @@ public class GrpcServer extends RpcServer { @Autowired private GrpcRequestHandlerReactor requestHander; + @Autowired + private ConnectionManager connectionManager; + @Autowired private RequestHandlerRegistry requestHandlerRegistry; @@ -74,24 +85,86 @@ public ConnectionType getConnectionType() { @Override public void startServer() throws Exception { init(); - server = ServerBuilder.forPort(grpcServerPort).addService(streamRequestHander).addService(requestHander) + final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry(); + //handlerRegistry.addService(streamRequestHander); + //handlerRegistry.addService(requestHander); + + // server intercetpor to set connection id. + ServerInterceptor serverInterceptor = new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, + ServerCallHandler next) { + final Context ctx = Context.current() + .withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID)); + return Contexts.interceptCall(ctx, call, headers, next); + } + }; + + addServices(handlerRegistry, serverInterceptor); + server = ServerBuilder.forPort(grpcServerPort).fallbackHandlerRegistry(handlerRegistry) .addTransportFilter(new ServerTransportFilter() { @Override public Attributes transportReady(Attributes transportAttrs) { System.out.println("transportReady:" + transportAttrs); - Attributes test = transportAttrs.toBuilder().set(key, UUID.randomUUID().toString()).build(); + Attributes test = transportAttrs.toBuilder() + .set(TRANS_KEY_CONN_ID, UUID.randomUUID().toString()).build(); return test; } - + @Override public void transportTerminated(Attributes transportAttrs) { System.out.println("transportTerminated:" + transportAttrs); + String connectionid = transportAttrs.get(TRANS_KEY_CONN_ID); + connectionManager.unregister(connectionid); super.transportTerminated(transportAttrs); } - }).intercept(new ConnetionIntereptor()).build(); + }).build(); server.start(); } + private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor serverInterceptor) { + + // unary call register. + final MethodDescriptor unaryMethod = MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.UNARY) + .setFullMethodName(MethodDescriptor.generateFullMethodName("Request", "request")) + .setRequestMarshaller(ProtoUtils.marshaller(GrpcRequest.newBuilder().build())) + .setResponseMarshaller(ProtoUtils.marshaller(GrpcResponse.getDefaultInstance())).build(); + + final ServerCallHandler handler = ServerCalls + .asyncUnaryCall((request, responseObserver) -> { + GrpcMetadata grpcMetadata = request.getMetadata().toBuilder() + .setConnectionId(CONTEXT_KEY_CONN_ID.get()).build(); + GrpcRequest requestNew = request.toBuilder().setMetadata(grpcMetadata).build(); + requestHander.request(requestNew, responseObserver); + }); + + final ServerServiceDefinition serviceDefOfUnary = ServerServiceDefinition.builder("Request") + .addMethod(unaryMethod, handler).build(); + handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnary, serverInterceptor)); + + // server stream register. + final ServerCallHandler streamHandler = ServerCalls + .asyncServerStreamingCall((request, responseObserver) -> { + GrpcMetadata grpcMetadata = request.getMetadata().toBuilder() + .setConnectionId(CONTEXT_KEY_CONN_ID.get()).build(); + GrpcRequest requestNew = request.toBuilder().setMetadata(grpcMetadata).build(); + streamRequestHander.requestStream(requestNew, responseObserver); + }); + + final MethodDescriptor serverStreamMethod = MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.SERVER_STREAMING) + .setFullMethodName(MethodDescriptor.generateFullMethodName("RequestStream", "requestStream")) + .setRequestMarshaller(ProtoUtils.marshaller(GrpcRequest.newBuilder().build())) + .setResponseMarshaller(ProtoUtils.marshaller(GrpcResponse.getDefaultInstance())).build(); + + final ServerServiceDefinition serviceDefOfServerStream = ServerServiceDefinition.builder("RequestStream") + .addMethod(serverStreamMethod, streamHandler).build(); + + handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfServerStream, serverInterceptor)); + + } + @Override public int rpcPortOffset() { return PORT_OFFSET; @@ -104,18 +177,8 @@ public void shundownServer() { } } - static final Attributes.Key key = Attributes.Key.create("conn_id"); + static final Attributes.Key TRANS_KEY_CONN_ID = Attributes.Key.create("conn_id"); + + static final Context.Key CONTEXT_KEY_CONN_ID = Context.key("conn_id"); - static class ConnetionIntereptor implements ServerInterceptor { - - @Override - public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, - ServerCallHandler next) { - Context ctx = Context.current(); - // System.out.println(build); - System.out.println(call.getAttributes().get(key).toString()); - return Contexts.interceptCall(Context.current(), call, headers, next); - - } - } }