Skip to content

Commit

Permalink
alibaba#1409 gRPC server tuned OK.
Browse files Browse the repository at this point in the history
  • Loading branch information
nkorange committed Oct 15, 2019
1 parent d33b5a0 commit 584d73b
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 63 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.alibaba.nacos.istio.mcp;

import com.alibaba.nacos.istio.misc.Loggers;
import com.alibaba.nacos.istio.model.mcp.ResourceSourceGrpc;
import io.grpc.*;
import org.springframework.stereotype.Service;

Expand All @@ -10,6 +11,8 @@
@Service
public class McpServerIntercepter implements ServerInterceptor {

private static final String INTERCEPTE_METHOD_NAME = "EstablishResourceStream";

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers,
Expand All @@ -18,7 +21,11 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, Re
SocketAddress address = call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
String methodName = call.getMethodDescriptor().getFullMethodName();

Loggers.MAIN.info("address: {}, method: {}", address, methodName);
Loggers.MAIN.info("remote address: {}, method: {}", address, methodName);

if ((ResourceSourceGrpc.SERVICE_NAME + "/" + INTERCEPTE_METHOD_NAME).equals(methodName)) {

}

return next.startCall(call, headers);
}
Expand Down
143 changes: 81 additions & 62 deletions istio/src/main/java/com/alibaba/nacos/istio/mcp/NacosMcpService.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package com.alibaba.nacos.istio.mcp;


import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.istio.misc.Loggers;
import com.alibaba.nacos.istio.model.Port;
import com.alibaba.nacos.istio.model.mcp.*;
import com.alibaba.nacos.istio.model.naming.ServiceEntry;
import com.google.protobuf.Any;
import io.grpc.stub.StreamObserver;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author nkorange
Expand All @@ -22,120 +22,139 @@
@Service
public class NacosMcpService extends ResourceSourceGrpc.ResourceSourceImplBase {

private Map<String, StreamObserver<Resources>> connnections = new ConcurrentHashMap<>();
private AtomicInteger connectIdGenerator = new AtomicInteger(0);

private Map<Integer, StreamObserver<Resources>> connnections = new ConcurrentHashMap<>();

public NacosMcpService() {
super();

Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {

while (true) {
TimeUnit.SECONDS.sleep(10L);

for (; ; ) {
try {
Thread.sleep(5000L);
if (connnections.isEmpty()) {
continue;
}
for (StreamObserver<Resources> observer : connnections.values()) {
observer.onNext(generateResources());
}
}

} catch (Exception e) {
} catch (Exception e) {

}
}
}
});

thread.start();
}

public Resources generateResources() {

String serviceName = "java.mock.1";
int endpointCount = 10;

ServiceEntry.Builder serviceEntryBuilder = ServiceEntry.newBuilder()
.setResolution(ServiceEntry.Resolution.STATIC)
.setLocation(ServiceEntry.Location.MESH_INTERNAL)
.setHosts(0, serviceName + ".nacos")
.setPorts(0, Port.newBuilder().setNumber(8080).setName("http").setProtocol("HTTP").build());

for (int i = 0; i < endpointCount; i++) {
ServiceEntry.Endpoint endpoint =
ServiceEntry.Endpoint.newBuilder()
.setAddress("10.10.10." + i)
.setWeight(1)
.putPorts("http", 8080)
.putLabels("app", "nacos-istio")
.build();
serviceEntryBuilder.addEndpoints(endpoint);
}
try {

ServiceEntry serviceEntry = serviceEntryBuilder.build();
String serviceName = "java.mock.1";
int endpointCount = 10;

Any any = Any.newBuilder()
.setValue(serviceEntry.toByteString())
.setTypeUrl(ServiceEntry.class.getCanonicalName())
.build();
ServiceEntry.Builder serviceEntryBuilder = ServiceEntry.newBuilder()
.setResolution(ServiceEntry.Resolution.STATIC)
.setLocation(ServiceEntry.Location.MESH_INTERNAL)
.addHosts(serviceName + ".nacos")
.addPorts(Port.newBuilder().setNumber(8080).setName("http").setProtocol("HTTP").build());

Metadata metadata = Metadata.newBuilder().setName("nacos/" + serviceName).build();
metadata.getAnnotationsMap().put("virtual", "1");
for (int i = 0; i < endpointCount; i++) {
ServiceEntry.Endpoint endpoint =
ServiceEntry.Endpoint.newBuilder()
.setAddress("10.10.10." + i)
.setWeight(1)
.putPorts("http", 8080)
.putLabels("app", "nacos-istio")
.build();
serviceEntryBuilder.addEndpoints(endpoint);
}

Resource resource = Resource.newBuilder()
.setBody(any)
.setMetadata(metadata)
.build();
ServiceEntry serviceEntry = serviceEntryBuilder.build();

Resources resources = Resources.newBuilder()
.addResources(resource)
.setCollection(CollectionTypes.SERVICE_ENTRY)
.setNonce(String.valueOf(System.currentTimeMillis()))
.build();
Any any = Any.newBuilder()
.setValue(serviceEntry.toByteString())
.setTypeUrl("type.googleapis.com/istio.networking.v1alpha3.ServiceEntry")
.build();

Loggers.MAIN.info("generated resources: {}", JSON.toJSONString(resource));
Metadata metadata = Metadata.newBuilder()
.setName("nacos/" + serviceName)
.putAnnotations("virtual", "1")
.build();

return resources;
}
Resource resource = Resource.newBuilder()
.setBody(any)
.setMetadata(metadata)
.build();

Resources resources = Resources.newBuilder()
.addResources(resource)
.setCollection(CollectionTypes.SERVICE_ENTRY)
.setNonce(String.valueOf(System.currentTimeMillis()))
.build();

Loggers.MAIN.info("generated resources: {}", resources);

return resources;
} catch (Exception e) {

public void addConnection(String id, StreamObserver<Resources> observer) {
if (!connnections.containsKey(id)) {
connnections.put(id, observer);
Loggers.MAIN.error("", e);
return null;
}
}

@Override
public StreamObserver<RequestResources> establishResourceStream(StreamObserver<Resources> responseObserver) {

Loggers.MAIN.info("new connection incoming...");
int id = connectIdGenerator.incrementAndGet();
connnections.put(id, responseObserver);

return new StreamObserver<RequestResources>() {

private int connectionId = id;

@Override
public void onNext(RequestResources value) {

String nonce = value.getResponseNonce();
List<Integer> list = new LinkedList<>();

Loggers.MAIN.info("receiving request, sink: {}, type: {}", value.getSinkNode(), value.getCollection());

// Return empty resources for other types:
if (!CollectionTypes.SERVICE_ENTRY.equals(value.getCollection())) {
if (value.getErrorDetail() != null && value.getErrorDetail().getCode() != 0) {

Loggers.MAIN.error("NACK error code: {}, message: {}", value.getErrorDetail().getCode()
, value.getErrorDetail().getMessage());
return;
}

if (StringUtils.isNotBlank(value.getResponseNonce())) {
// This is a response:
Loggers.MAIN.info("ACK nonce: {}, type: {}", value.getResponseNonce(), value.getCollection());
return;
}

if (!CollectionTypes.SERVICE_ENTRY.equals(value.getCollection())) {
// Return empty resources for other types:
Resources resources = Resources.newBuilder()
.setCollection(value.getCollection())
.setNonce(String.valueOf(System.currentTimeMillis()))
.build();

responseObserver.onNext(resources);
return;
} else {
responseObserver.onNext(generateResources());
}

addConnection(value.getSinkNode().getId(), responseObserver);
}

@Override
public void onError(Throwable t) {
Loggers.MAIN.error("", t);
responseObserver.onError(t);
Loggers.MAIN.error("stream error.", t);
connnections.remove(connectionId);
}

@Override
Expand Down

0 comments on commit 584d73b

Please sign in to comment.