Skip to content

Commit 0b616ec

Browse files
authored
feat: Support MCP 2025-03-26 protocol with streaming tools (#3054)
- 实现MCP协议2025-03-26版本支持 - 添加流式工具(Streamable Tools)架构 - 支持双传输模式:流式传输(SSE)和无状态传输 - 实现30+个Arthas工具的MCP接口封装 - 添加实时进度通知和结果流式输出 - 支持watch、monitor、trace、dashboard、profiler等流式工具
1 parent 1522341 commit 0b616ec

File tree

85 files changed

+6485
-1529
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

85 files changed

+6485
-1529
lines changed

core/src/main/java/com/taobao/arthas/core/command/CommandExecutorImpl.java

Lines changed: 300 additions & 123 deletions
Large diffs are not rendered by default.

core/src/main/java/com/taobao/arthas/core/server/ArthasBootstrap.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,7 @@
8080

8181
import com.taobao.arthas.mcp.server.ArthasMcpBootstrap;
8282
import com.taobao.arthas.mcp.server.CommandExecutor;
83-
import com.taobao.arthas.mcp.server.protocol.server.handler.McpRequestHandler;
84-
import com.taobao.arthas.mcp.server.protocol.spec.McpServerTransportProvider;
83+
import com.taobao.arthas.mcp.server.protocol.server.handler.McpHttpRequestHandler;
8584
import io.netty.channel.ChannelFuture;
8685
import io.netty.channel.nio.NioEventLoopGroup;
8786
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -130,7 +129,7 @@ public class ArthasBootstrap {
130129

131130
private HttpApiHandler httpApiHandler;
132131

133-
private McpRequestHandler mcpRequestHandler;
132+
private McpHttpRequestHandler mcpRequestHandler;
134133

135134
private HttpSessionManager httpSessionManager;
136135
private SecurityAuthenticator securityAuthenticator;
@@ -683,7 +682,7 @@ public HttpApiHandler getHttpApiHandler() {
683682
return httpApiHandler;
684683
}
685684

686-
public McpRequestHandler getMcpRequestHandler() {
685+
public McpHttpRequestHandler getMcpRequestHandler() {
687686
return mcpRequestHandler;
688687
}
689688

core/src/main/java/com/taobao/arthas/core/shell/term/impl/http/HttpRequestHandler.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import com.taobao.arthas.common.IOUtils;
66
import com.taobao.arthas.core.server.ArthasBootstrap;
77
import com.taobao.arthas.core.shell.term.impl.http.api.HttpApiHandler;
8-
import com.taobao.arthas.mcp.server.protocol.server.handler.McpRequestHandler;
8+
import com.taobao.arthas.mcp.server.protocol.server.handler.McpHttpRequestHandler;
99
import io.netty.channel.ChannelFuture;
1010
import io.netty.channel.ChannelFutureListener;
1111
import io.netty.channel.ChannelHandlerContext;
@@ -38,7 +38,7 @@ public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequ
3838

3939
private HttpApiHandler httpApiHandler;
4040

41-
private McpRequestHandler mcpRequestHandler;
41+
private McpHttpRequestHandler mcpRequestHandler;
4242

4343
public HttpRequestHandler(String wsUri) {
4444
this(wsUri, ArthasBootstrap.getInstance().getOutputPath());
@@ -76,9 +76,8 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request)
7676
}
7777

7878
//handle mcp request
79-
String messageEndpoint = mcpRequestHandler.getMessageEndpoint();
80-
String sseEndpoint = mcpRequestHandler.getSseEndpoint();
81-
if (sseEndpoint.equals(path) || messageEndpoint.equals(path)) {
79+
String mcpEndpoint = mcpRequestHandler.getMcpEndpoint();
80+
if (mcpEndpoint.equals(path)) {
8281
mcpRequestHandler.handle(ctx, request);
8382
isMcpHandled = true;
8483
return;

labs/arthas-mcp-server/pom.xml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,11 @@
4141
<artifactId>jackson-databind</artifactId>
4242
<version>2.18.1</version>
4343
</dependency>
44-
45-
44+
<dependency>
45+
<groupId>com.fasterxml.jackson.datatype</groupId>
46+
<artifactId>jackson-datatype-jsr310</artifactId>
47+
<version>2.18.1</version>
48+
</dependency>
4649

4750
<!-- log dependencies -->
4851
<dependency>

labs/arthas-mcp-server/src/main/java/com/taobao/arthas/mcp/server/ArthasMcpBootstrap.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,18 @@ public CommandExecutor getCommandExecutor() {
2929
}
3030

3131
public ArthasMcpServer start() {
32+
logger.info("Initializing Arthas MCP Bootstrap...");
3233
try {
33-
// Create and start MCP server
34-
mcpServer = new ArthasMcpServer();
34+
logger.debug("Creating MCP server instance with command executor: {}",
35+
commandExecutor.getClass().getSimpleName());
36+
37+
// Create and start MCP server with CommandExecutor
38+
mcpServer = new ArthasMcpServer(commandExecutor);
39+
logger.debug("MCP server instance created successfully");
40+
3541
mcpServer.start();
3642
logger.info("Arthas MCP server initialized successfully");
43+
logger.info("Bootstrap ready - server is operational");
3744
return mcpServer;
3845
} catch (Exception e) {
3946
logger.error("Failed to initialize Arthas MCP server", e);
@@ -42,8 +49,14 @@ public ArthasMcpServer start() {
4249
}
4350

4451
public void shutdown() {
52+
logger.info("Initiating Arthas MCP Bootstrap shutdown...");
4553
if (mcpServer != null) {
54+
logger.debug("Stopping MCP server...");
4655
mcpServer.stop();
56+
logger.info("MCP server stopped");
57+
} else {
58+
logger.warn("MCP server was null during shutdown - may not have been properly initialized");
4759
}
60+
logger.info("Arthas MCP Bootstrap shutdown completed");
4861
}
4962
}

labs/arthas-mcp-server/src/main/java/com/taobao/arthas/mcp/server/ArthasMcpServer.java

Lines changed: 110 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,24 @@
44
import com.taobao.arthas.mcp.server.protocol.config.McpServerProperties;
55
import com.taobao.arthas.mcp.server.protocol.server.McpNettyServer;
66
import com.taobao.arthas.mcp.server.protocol.server.McpServer;
7-
import com.taobao.arthas.mcp.server.protocol.server.handler.McpRequestHandler;
8-
import com.taobao.arthas.mcp.server.protocol.server.transport.HttpNettyServerTransportProvider;
9-
import com.taobao.arthas.mcp.server.protocol.spec.McpSchema.*;
10-
import com.taobao.arthas.mcp.server.protocol.spec.McpServerTransportProvider;
7+
import com.taobao.arthas.mcp.server.protocol.server.McpStatelessNettyServer;
8+
import com.taobao.arthas.mcp.server.protocol.server.handler.McpHttpRequestHandler;
9+
import com.taobao.arthas.mcp.server.protocol.server.handler.McpStatelessHttpRequestHandler;
10+
import com.taobao.arthas.mcp.server.protocol.server.handler.McpStreamableHttpRequestHandler;
11+
import com.taobao.arthas.mcp.server.protocol.server.transport.NettyStatelessServerTransport;
12+
import com.taobao.arthas.mcp.server.protocol.server.transport.NettyStreamableServerTransportProvider;
13+
import com.taobao.arthas.mcp.server.protocol.spec.McpSchema.Implementation;
14+
import com.taobao.arthas.mcp.server.protocol.spec.McpSchema.ServerCapabilities;
15+
import com.taobao.arthas.mcp.server.protocol.spec.McpStreamableServerTransportProvider;
1116
import com.taobao.arthas.mcp.server.tool.DefaultToolCallbackProvider;
1217
import com.taobao.arthas.mcp.server.tool.ToolCallback;
1318
import com.taobao.arthas.mcp.server.tool.ToolCallbackProvider;
1419
import com.taobao.arthas.mcp.server.tool.util.McpToolUtils;
20+
import com.taobao.arthas.mcp.server.util.JsonParser;
1521
import org.slf4j.Logger;
1622
import org.slf4j.LoggerFactory;
1723

18-
import java.util.Arrays;
19-
import java.util.List;
20-
import java.util.Objects;
24+
import java.util.*;
2125
import java.util.stream.Collectors;
2226

2327
/**
@@ -26,66 +30,106 @@
2630
*/
2731
public class ArthasMcpServer {
2832
private static final Logger logger = LoggerFactory.getLogger(ArthasMcpServer.class);
29-
private McpNettyServer server;
33+
34+
private McpNettyServer streamableServer;
35+
private McpStatelessNettyServer statelessServer;
36+
3037
private final int port;
3138
private final String bindAddress;
32-
private McpRequestHandler mcpRequestHandler;
39+
40+
private final CommandExecutor commandExecutor;
41+
42+
private McpHttpRequestHandler unifiedMcpHandler;
43+
44+
private McpStreamableHttpRequestHandler streamableHandler;
45+
46+
private McpStatelessHttpRequestHandler statelessHandler;
3347

34-
public ArthasMcpServer() {
35-
this(8080, "localhost");
48+
public ArthasMcpServer(CommandExecutor commandExecutor) {
49+
this(8080, "localhost", commandExecutor);
3650
}
3751

38-
public ArthasMcpServer(int port, String bindAddress) {
52+
public ArthasMcpServer(int port, String bindAddress, CommandExecutor commandExecutor) {
3953
this.port = port;
4054
this.bindAddress = bindAddress;
55+
this.commandExecutor = commandExecutor;
4156
}
4257

43-
public McpRequestHandler getMcpRequestHandler() {
44-
return mcpRequestHandler;
58+
public McpHttpRequestHandler getMcpRequestHandler() {
59+
return unifiedMcpHandler;
4560
}
4661

4762
/**
4863
* Start MCP server
4964
*/
5065
public void start() {
66+
logger.info("Starting Arthas MCP server on {}:{}", bindAddress, port);
5167
try {
52-
// 1. Create server configuration
5368
McpServerProperties properties = new McpServerProperties.Builder()
5469
.name("arthas-mcp-server")
5570
.version("1.0.0")
5671
.bindAddress(bindAddress)
5772
.port(port)
58-
.messageEndpoint("/sse/message")
59-
.sseEndpoint("/sse")
73+
.mcpEndpoint("/mcp")
6074
.toolChangeNotification(true)
6175
.resourceChangeNotification(true)
6276
.promptChangeNotification(true)
63-
.objectMapper(new ObjectMapper())
77+
.objectMapper(JsonParser.getObjectMapper())
6478
.build();
65-
66-
// 2. Create transport provider
67-
McpServerTransportProvider transportProvider = createHttpTransportProvider(properties);
68-
mcpRequestHandler = transportProvider.getMcpRequestHandler();
69-
70-
// 3. Create server builder
71-
McpServer.NettySpecification serverBuilder = McpServer.netty(transportProvider)
72-
.serverInfo(new Implementation(properties.getName(), properties.getVersion()))
73-
.capabilities(buildServerCapabilities(properties))
74-
.instructions(properties.getInstructions())
75-
.requestTimeout(properties.getRequestTimeout())
76-
.objectMapper(properties.getObjectMapper() != null ? properties.getObjectMapper() : new ObjectMapper());
7779

7880
ToolCallbackProvider toolCallbackProvider = new DefaultToolCallbackProvider();
7981
ToolCallback[] callbacks = toolCallbackProvider.getToolCallbacks();
8082
List<ToolCallback> providerToolCallbacks = Arrays.stream(callbacks)
8183
.filter(Objects::nonNull)
8284
.collect(Collectors.toList());
8385

84-
serverBuilder.tools(
85-
McpToolUtils.toToolSpecifications(providerToolCallbacks, properties));
86-
server = serverBuilder.build();
86+
// Create transport for both streamable and stateless servers
87+
McpStreamableServerTransportProvider transportProvider = createStreamableHttpTransportProvider(properties);
88+
streamableHandler = transportProvider.getMcpRequestHandler();
89+
90+
NettyStatelessServerTransport statelessTransport = createStatelessHttpTransport(properties);
91+
statelessHandler = statelessTransport.getMcpRequestHandler();
92+
93+
unifiedMcpHandler = McpHttpRequestHandler.builder()
94+
.mcpEndpoint(properties.getMcpEndpoint())
95+
.objectMapper(properties.getObjectMapper())
96+
.tools(Arrays.asList(callbacks))
97+
.build();
98+
unifiedMcpHandler.setStreamableHandler(streamableHandler);
99+
unifiedMcpHandler.setStatelessHandler(statelessHandler);
100+
101+
// Set up unified MCP handler for both streamable and stateless servers
102+
McpServer.StreamableServerNettySpecification streamableServerNettySpecification = McpServer.netty(transportProvider)
103+
.serverInfo(new Implementation(properties.getName(), properties.getVersion()))
104+
.capabilities(buildServerCapabilities(properties))
105+
.instructions(properties.getInstructions())
106+
.requestTimeout(properties.getRequestTimeout())
107+
.commandExecutor(commandExecutor)
108+
.objectMapper(properties.getObjectMapper() != null ? properties.getObjectMapper() : JsonParser.getObjectMapper());
109+
110+
// Set up unified MCP handler for both streamable and stateless servers
111+
McpServer.StatelessServerNettySpecification statelessServerNettySpecification = McpServer.netty(statelessTransport)
112+
.serverInfo(new Implementation(properties.getName(), properties.getVersion()))
113+
.capabilities(buildServerCapabilities(properties))
114+
.instructions(properties.getInstructions())
115+
.requestTimeout(properties.getRequestTimeout())
116+
.commandExecutor(commandExecutor)
117+
.objectMapper(properties.getObjectMapper() != null ? properties.getObjectMapper() : JsonParser.getObjectMapper());
118+
119+
streamableServerNettySpecification.tools(
120+
McpToolUtils.toStreamableToolSpecifications(providerToolCallbacks));
121+
statelessServerNettySpecification.tools(
122+
McpToolUtils.toStatelessToolSpecifications(providerToolCallbacks));
123+
124+
streamableServer = streamableServerNettySpecification.build();
125+
statelessServer = statelessServerNettySpecification.build();
87126

88-
logger.info("Arthas MCP server started, listening on {}:{}", bindAddress, port);
127+
logger.info("Arthas MCP server started successfully");
128+
logger.info("- Listening on {}:{}", bindAddress, port);
129+
logger.info("- MCP Endpoint: {}", properties.getMcpEndpoint());
130+
logger.info("- Transport modes: Streamable + Stateless");
131+
logger.info("- Available tools: {}", providerToolCallbacks.size());
132+
logger.info("- Server ready to accept connections");
89133
} catch (Exception e) {
90134
logger.error("Failed to start Arthas MCP server", e);
91135
throw new RuntimeException("Failed to start Arthas MCP server", e);
@@ -95,47 +139,52 @@ public void start() {
95139
/**
96140
* Create HTTP transport provider
97141
*/
98-
private HttpNettyServerTransportProvider createHttpTransportProvider(McpServerProperties properties) {
99-
return HttpNettyServerTransportProvider.builder()
100-
.messageEndpoint(properties.getMessageEndpoint())
101-
.sseEndpoint(properties.getSseEndpoint())
142+
private NettyStreamableServerTransportProvider createStreamableHttpTransportProvider(McpServerProperties properties) {
143+
return NettyStreamableServerTransportProvider.builder()
144+
.mcpEndpoint(properties.getMcpEndpoint())
145+
.objectMapper(properties.getObjectMapper() != null ? properties.getObjectMapper() : new ObjectMapper())
146+
.build();
147+
}
148+
149+
private NettyStatelessServerTransport createStatelessHttpTransport(McpServerProperties properties) {
150+
return NettyStatelessServerTransport.builder()
151+
.mcpEndpoint(properties.getMcpEndpoint())
102152
.objectMapper(properties.getObjectMapper() != null ? properties.getObjectMapper() : new ObjectMapper())
103153
.build();
104154
}
105155

106-
/**
107-
* Build server capabilities configuration
108-
*/
109156
private ServerCapabilities buildServerCapabilities(McpServerProperties properties) {
110157
return ServerCapabilities.builder()
111158
.prompts(new ServerCapabilities.PromptCapabilities(properties.isPromptChangeNotification()))
112159
.resources(new ServerCapabilities.ResourceCapabilities(properties.isResourceSubscribe(), properties.isResourceChangeNotification()))
113160
.tools(new ServerCapabilities.ToolCapabilities(properties.isToolChangeNotification()))
114161
.build();
115162
}
116-
117-
/**
118-
* Stop MCP server
119-
*/
163+
120164
public void stop() {
121-
if (server != null) {
122-
try {
123-
server.closeGracefully().get();
124-
logger.info("Arthas MCP server stopped");
125-
} catch (Exception e) {
126-
logger.error("Failed to stop Arthas MCP server", e);
165+
logger.info("Stopping Arthas MCP server...");
166+
try {
167+
if (unifiedMcpHandler != null) {
168+
logger.debug("Shutting down unified MCP handler");
169+
unifiedMcpHandler.closeGracefully().get();
170+
logger.info("Unified MCP handler stopped successfully");
127171
}
128-
}
129-
}
130172

131-
public static void main(String[] args) {
132-
ArthasMcpServer arthasMcpServer = new ArthasMcpServer();
133-
arthasMcpServer.start();
134-
// Keep the server running
135-
try {
136-
Thread.sleep(Long.MAX_VALUE);
137-
} catch (InterruptedException e) {
138-
Thread.currentThread().interrupt();
173+
if (streamableServer != null) {
174+
logger.debug("Shutting down streamable server");
175+
streamableServer.closeGracefully().get();
176+
logger.info("Streamable server stopped successfully");
177+
}
178+
179+
if (statelessServer != null) {
180+
logger.debug("Shutting down stateless server");
181+
statelessServer.closeGracefully().get();
182+
logger.info("Stateless server stopped successfully");
183+
}
184+
185+
logger.info("Arthas MCP server stopped completely");
186+
} catch (Exception e) {
187+
logger.error("Failed to stop Arthas MCP server gracefully", e);
139188
}
140189
}
141190
}

labs/arthas-mcp-server/src/main/java/com/taobao/arthas/mcp/server/CommandExecutor.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,27 @@
22

33
import java.util.Map;
44

5-
5+
/**
6+
* 命令执行器接口
7+
*
8+
* @author Yeaury 2025/5/26
9+
*/
610
public interface CommandExecutor {
711

8-
Map<String, Object> execute(String commandLine, long timeout);
12+
default Map<String, Object> executeSync(String commandLine, long timeout) {
13+
return executeSync(commandLine, timeout, null);
14+
}
15+
16+
Map<String, Object> executeSync(String commandLine, long timeout, String sessionId);
17+
18+
Map<String, Object> executeAsync(String commandLine, String sessionId);
19+
20+
Map<String, Object> pullResults(String sessionId, String consumerId);
21+
22+
Map<String, Object> interruptJob(String sessionId);
23+
24+
Map<String, Object> createSession();
25+
26+
Map<String, Object> closeSession(String sessionId);
927
}
28+

0 commit comments

Comments
 (0)