Skip to content

Commit

Permalink
alibaba#1409 Introduce MCP server
Browse files Browse the repository at this point in the history
  • Loading branch information
nkorange committed Oct 14, 2019
1 parent db8015f commit d33b5a0
Show file tree
Hide file tree
Showing 48 changed files with 32,332 additions and 0 deletions.
121 changes: 121 additions & 0 deletions istio/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nacos-all</artifactId>
<groupId>com.alibaba.nacos</groupId>
<version>1.1.3</version>
</parent>

<modelVersion>4.0.0</modelVersion>
<artifactId>nacos-istio</artifactId>
<packaging>jar</packaging>

<name>nacos-istio ${project.version}</name>
<url>http://maven.apache.org</url>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>nacos-api</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>nacos-client</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>nacos-config</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>nacos-naming</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>nacos-core</artifactId>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-common-protos</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.8.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>protoc-gen-grpc-java</artifactId>
<version>1.24.0</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.1.31.Final</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-source-plugin</artifactId>
<version>2.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.alibaba.nacos.istio.IstioApp</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/resources</directory>
<!--<excludes>-->
<!--<exclude>application.properties</exclude>-->
<!--</excludes>-->
</resource>
</resources>
</build>
</project>
19 changes: 19 additions & 0 deletions istio/src/main/java/com/alibaba/nacos/istio/IstioApp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.alibaba.nacos.istio;

import com.alibaba.nacos.istio.mcp.NacosMcpServer;

/**
* @author nkorange
* @since 1.1.4
*/
public class IstioApp {

public static void main(String[] args) throws Exception {

final NacosMcpServer server = new NacosMcpServer();

server.start();

server.waitForTerminated();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.alibaba.nacos.istio.mcp;


/**
* @author nkorange
* @since 1.1.4
*/
public class CollectionTypes {

public static final String SERVICE_ENTRY = "istio/networking/v1alpha3/serviceentries";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.alibaba.nacos.istio.mcp;

/**
* @author nkorange
* @since 1.1.4
*/
public class McpConnection {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.alibaba.nacos.istio.mcp;

import com.alibaba.nacos.istio.misc.Loggers;
import io.grpc.*;
import org.springframework.stereotype.Service;

import java.net.SocketAddress;


@Service
public class McpServerIntercepter implements ServerInterceptor {

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {

SocketAddress address = call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
String methodName = call.getMethodDescriptor().getFullMethodName();

Loggers.MAIN.info("address: {}, method: {}", address, methodName);

return next.startCall(call, headers);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.alibaba.nacos.istio.mcp;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.io.IOException;


/**
* @author nkorange
* @since 1.1.4
*/
@Service
public class NacosMcpServer {

private int port = 18848;
private Server server;

@Autowired
private McpServerIntercepter intercepter;

@Autowired
private NacosMcpService nacosMcpService;

@PostConstruct
public void start() throws IOException {

System.out.println("Starting Nacos MCP server...");

server = ServerBuilder.forPort(port)
.addService(ServerInterceptors.intercept(nacosMcpService, intercepter))
.build();
server.start();

Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run() {

System.out.println("Stopping Nacos MCP server...");
NacosMcpServer.this.stop();
System.out.println("Nacos MCP server stopped...");
}
});
}

public void stop() {
if (server != null) {
server.shutdown();
}
}

public void waitForTerminated() throws InterruptedException {
server.awaitTermination();
}
}
147 changes: 147 additions & 0 deletions istio/src/main/java/com/alibaba/nacos/istio/mcp/NacosMcpService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package com.alibaba.nacos.istio.mcp;


import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.istio.misc.Loggers;
import com.alibaba.nacos.istio.model.Port;
import com.alibaba.nacos.istio.model.mcp.*;
import com.alibaba.nacos.istio.model.naming.ServiceEntry;
import com.google.protobuf.Any;
import io.grpc.stub.StreamObserver;
import org.springframework.stereotype.Service;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* @author nkorange
* @since 1.1.4
*/
@Service
public class NacosMcpService extends ResourceSourceGrpc.ResourceSourceImplBase {

private Map<String, StreamObserver<Resources>> connnections = new ConcurrentHashMap<>();

public NacosMcpService() {
super();
Thread thread = new Thread(new Runnable() {
@Override
public void run() {

for (; ; ) {
try {
Thread.sleep(5000L);
if (connnections.isEmpty()) {
continue;
}
for (StreamObserver<Resources> observer : connnections.values()) {
observer.onNext(generateResources());
}

} catch (Exception e) {

}
}
}
});
thread.start();
}

public Resources generateResources() {

String serviceName = "java.mock.1";
int endpointCount = 10;

ServiceEntry.Builder serviceEntryBuilder = ServiceEntry.newBuilder()
.setResolution(ServiceEntry.Resolution.STATIC)
.setLocation(ServiceEntry.Location.MESH_INTERNAL)
.setHosts(0, serviceName + ".nacos")
.setPorts(0, Port.newBuilder().setNumber(8080).setName("http").setProtocol("HTTP").build());

for (int i = 0; i < endpointCount; i++) {
ServiceEntry.Endpoint endpoint =
ServiceEntry.Endpoint.newBuilder()
.setAddress("10.10.10." + i)
.setWeight(1)
.putPorts("http", 8080)
.putLabels("app", "nacos-istio")
.build();
serviceEntryBuilder.addEndpoints(endpoint);
}

ServiceEntry serviceEntry = serviceEntryBuilder.build();

Any any = Any.newBuilder()
.setValue(serviceEntry.toByteString())
.setTypeUrl(ServiceEntry.class.getCanonicalName())
.build();

Metadata metadata = Metadata.newBuilder().setName("nacos/" + serviceName).build();
metadata.getAnnotationsMap().put("virtual", "1");

Resource resource = Resource.newBuilder()
.setBody(any)
.setMetadata(metadata)
.build();

Resources resources = Resources.newBuilder()
.addResources(resource)
.setCollection(CollectionTypes.SERVICE_ENTRY)
.setNonce(String.valueOf(System.currentTimeMillis()))
.build();

Loggers.MAIN.info("generated resources: {}", JSON.toJSONString(resource));

return resources;
}

public void addConnection(String id, StreamObserver<Resources> observer) {
if (!connnections.containsKey(id)) {
connnections.put(id, observer);
}
}

@Override
public StreamObserver<RequestResources> establishResourceStream(StreamObserver<Resources> responseObserver) {

Loggers.MAIN.info("new connection incoming...");

return new StreamObserver<RequestResources>() {

@Override
public void onNext(RequestResources value) {

String nonce = value.getResponseNonce();
List<Integer> list = new LinkedList<>();

Loggers.MAIN.info("receiving request, sink: {}, type: {}", value.getSinkNode(), value.getCollection());

// Return empty resources for other types:
if (!CollectionTypes.SERVICE_ENTRY.equals(value.getCollection())) {

Resources resources = Resources.newBuilder()
.setCollection(value.getCollection())
.build();

responseObserver.onNext(resources);
return;
}

addConnection(value.getSinkNode().getId(), responseObserver);
}

@Override
public void onError(Throwable t) {
Loggers.MAIN.error("", t);
responseObserver.onError(t);
}

@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}
Loading

0 comments on commit d33b5a0

Please sign in to comment.