From 25a4798ee931832570cf442165c0bd4dc7c0edf6 Mon Sep 17 00:00:00 2001 From: lvchenguang Date: Thu, 27 Mar 2025 15:24:41 +0800 Subject: [PATCH 1/9] feat: add ping for sse server --- server/sse.go | 43 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/server/sse.go b/server/sse.go index a869ad1aa..e17aabdba 100644 --- a/server/sse.go +++ b/server/sse.go @@ -10,6 +10,7 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/google/uuid" "github.com/mark3labs/mcp-go/mcp" @@ -60,6 +61,9 @@ type SSEServer struct { sessions sync.Map srv *http.Server contextFunc SSEContextFunc + + keepAlive bool + keepAliveInterval time.Duration } // SSEOption defines a function type for configuring SSEServer @@ -120,6 +124,19 @@ func WithHTTPServer(srv *http.Server) SSEOption { } } +func WithKeepAliveInterval(keepAliveInterval time.Duration) SSEOption { + return func(s *SSEServer) { + s.keepAlive = true + s.keepAliveInterval = keepAliveInterval + } +} + +func WithKeepAlive(keepAlive bool) SSEOption { + return func(s *SSEServer) { + s.keepAlive = keepAlive + } +} + // WithContextFunc sets a function that will be called to customise the context // to the server using the incoming request. func WithSSEContextFunc(fn SSEContextFunc) SSEOption { @@ -131,9 +148,11 @@ func WithSSEContextFunc(fn SSEContextFunc) SSEOption { // NewSSEServer creates a new SSE server instance with the given MCP server and options. func NewSSEServer(server *MCPServer, opts ...SSEOption) *SSEServer { s := &SSEServer{ - server: server, - sseEndpoint: "/sse", - messageEndpoint: "/message", + server: server, + sseEndpoint: "/sse", + messageEndpoint: "/message", + keepAlive: false, + keepAliveInterval: 10 * time.Second, } // Apply all options @@ -244,6 +263,24 @@ func (s *SSEServer) handleSSE(w http.ResponseWriter, r *http.Request) { } }() + // Start keep alive : ping + if s.keepAlive { + go func() { + ticker := time.NewTicker(s.keepAliveInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + session.eventQueue <- fmt.Sprintf("event: ping\ndata: %s\n\n", time.Now().Format(time.RFC3339)) + case <-session.done: + return + case <-r.Context().Done(): + return + } + } + }() + } + messageEndpoint := fmt.Sprintf("%s?sessionId=%s", s.CompleteMessageEndpoint(), sessionID) // Send the initial endpoint event From 993d9f03aa6171923a20464f2155f4e15d2d512c Mon Sep 17 00:00:00 2001 From: lcgash Date: Thu, 27 Mar 2025 15:48:02 +0800 Subject: [PATCH 2/9] fix ping message --- server/sse.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/sse.go b/server/sse.go index e17aabdba..7fd25c894 100644 --- a/server/sse.go +++ b/server/sse.go @@ -271,7 +271,7 @@ func (s *SSEServer) handleSSE(w http.ResponseWriter, r *http.Request) { for { select { case <-ticker.C: - session.eventQueue <- fmt.Sprintf("event: ping\ndata: %s\n\n", time.Now().Format(time.RFC3339)) + session.eventQueue <- fmt.Sprintf(":ping - %s\n\n", time.Now().Format(time.RFC3339)) case <-session.done: return case <-r.Context().Done(): From 103410e579c88012ba2925552d020a9e3395a73c Mon Sep 17 00:00:00 2001 From: mr-chenguang <37072324+lcgash@users.noreply.github.com> Date: Tue, 8 Apr 2025 19:36:01 +0800 Subject: [PATCH 3/9] Update server/sse.go Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- server/sse.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/server/sse.go b/server/sse.go index e7810fb8b..83426436c 100644 --- a/server/sse.go +++ b/server/sse.go @@ -283,7 +283,12 @@ func (s *SSEServer) handleSSE(w http.ResponseWriter, r *http.Request) { for { select { case <-ticker.C: - session.eventQueue <- fmt.Sprintf(":ping - %s\n\n", time.Now().Format(time.RFC3339)) + select { + case session.eventQueue <- fmt.Sprintf(":ping - %s\n\n", time.Now().Format(time.RFC3339)): + // Ping sent successfully + default: + log.Printf("Keep-alive ping dropped: event queue is full") + } case <-session.done: return case <-r.Context().Done(): From 9a44d14ab6849de3b7bcf33c3a6975f11e0f433c Mon Sep 17 00:00:00 2001 From: mr-chenguang <37072324+lcgash@users.noreply.github.com> Date: Tue, 8 Apr 2025 19:58:03 +0800 Subject: [PATCH 4/9] Update server/sse.go Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- server/sse.go | 38 ++++++++++++++++---------------------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/server/sse.go b/server/sse.go index 83426436c..2395450a6 100644 --- a/server/sse.go +++ b/server/sse.go @@ -275,28 +275,22 @@ func (s *SSEServer) handleSSE(w http.ResponseWriter, r *http.Request) { }() - // Start keep alive : ping - if s.keepAlive { - go func() { - ticker := time.NewTicker(s.keepAliveInterval) - defer ticker.Stop() - for { - select { - case <-ticker.C: - select { - case session.eventQueue <- fmt.Sprintf(":ping - %s\n\n", time.Now().Format(time.RFC3339)): - // Ping sent successfully - default: - log.Printf("Keep-alive ping dropped: event queue is full") - } - case <-session.done: - return - case <-r.Context().Done(): - return - } - } - }() - } +import ( + "context" + "encoding/json" + "fmt" + "log" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/google/uuid" + "github.com/mark3labs/mcp-go/mcp" +) messageEndpoint := fmt.Sprintf("%s?sessionId=%s", s.CompleteMessageEndpoint(), sessionID) From 84a7f2a8056f2f4cb4c9068e9712f571b7a2575a Mon Sep 17 00:00:00 2001 From: mr-chenguang <37072324+lcgash@users.noreply.github.com> Date: Tue, 8 Apr 2025 20:01:05 +0800 Subject: [PATCH 5/9] Update sse.go --- server/sse.go | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/server/sse.go b/server/sse.go index 2395450a6..ea610693e 100644 --- a/server/sse.go +++ b/server/sse.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "log" "net/http" "net/http/httptest" "net/url" @@ -274,24 +275,6 @@ func (s *SSEServer) handleSSE(w http.ResponseWriter, r *http.Request) { } }() - -import ( - "context" - "encoding/json" - "fmt" - "log" - "net/http" - "net/http/httptest" - "net/url" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/google/uuid" - "github.com/mark3labs/mcp-go/mcp" -) - messageEndpoint := fmt.Sprintf("%s?sessionId=%s", s.CompleteMessageEndpoint(), sessionID) // Send the initial endpoint event From d5cab8e06f82ed501fba9ba42b6fb278752ed8f7 Mon Sep 17 00:00:00 2001 From: mr-chenguang <37072324+lcgash@users.noreply.github.com> Date: Tue, 8 Apr 2025 20:02:24 +0800 Subject: [PATCH 6/9] Update sse.go --- server/sse.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/sse.go b/server/sse.go index ea610693e..6feac1253 100644 --- a/server/sse.go +++ b/server/sse.go @@ -275,8 +275,6 @@ func (s *SSEServer) handleSSE(w http.ResponseWriter, r *http.Request) { } }() - messageEndpoint := fmt.Sprintf("%s?sessionId=%s", s.CompleteMessageEndpoint(), sessionID) - // Send the initial endpoint event fmt.Fprintf(w, "event: endpoint\ndata: %s\r\n\r\n", s.GetMessageEndpointForClient(sessionID)) flusher.Flush() From 47f2f47c6e674319614460507be7bbc95e548eaa Mon Sep 17 00:00:00 2001 From: mr-chenguang <37072324+lcgash@users.noreply.github.com> Date: Fri, 11 Apr 2025 10:40:48 +0800 Subject: [PATCH 7/9] Update sse.go --- server/sse.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server/sse.go b/server/sse.go index 6feac1253..8334cc5d2 100644 --- a/server/sse.go +++ b/server/sse.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "log" "net/http" "net/http/httptest" "net/url" From 0f5c0af6e18c7d25223f36e6bbe200622276b6a5 Mon Sep 17 00:00:00 2001 From: mr-chenguang <37072324+lcgash@users.noreply.github.com> Date: Fri, 11 Apr 2025 10:42:43 +0800 Subject: [PATCH 8/9] Update sse.go --- server/sse.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/server/sse.go b/server/sse.go index 8334cc5d2..4771b116c 100644 --- a/server/sse.go +++ b/server/sse.go @@ -274,6 +274,24 @@ func (s *SSEServer) handleSSE(w http.ResponseWriter, r *http.Request) { } }() + // Start keep alive : ping + if s.keepAlive { + go func() { + ticker := time.NewTicker(s.keepAliveInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + session.eventQueue <- fmt.Sprintf("event: ping\ndata: %s\n\n", time.Now().Format(time.RFC3339)) + case <-session.done: + return + case <-r.Context().Done(): + return + } + } + }() + } + // Send the initial endpoint event fmt.Fprintf(w, "event: endpoint\ndata: %s\r\n\r\n", s.GetMessageEndpointForClient(sessionID)) flusher.Flush() From 15204ed8e3d3c19b9ae436e861778a0f4ad8f8ae Mon Sep 17 00:00:00 2001 From: mr-chenguang <37072324+lcgash@users.noreply.github.com> Date: Fri, 11 Apr 2025 10:44:03 +0800 Subject: [PATCH 9/9] fix ping message --- server/sse.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/sse.go b/server/sse.go index 4771b116c..f69451c6d 100644 --- a/server/sse.go +++ b/server/sse.go @@ -282,7 +282,8 @@ func (s *SSEServer) handleSSE(w http.ResponseWriter, r *http.Request) { for { select { case <-ticker.C: - session.eventQueue <- fmt.Sprintf("event: ping\ndata: %s\n\n", time.Now().Format(time.RFC3339)) + //: ping - 2025-03-27 07:44:38.682659+00:00 + session.eventQueue <- fmt.Sprintf(":ping - %s\n\n", time.Now().Format(time.RFC3339)) case <-session.done: return case <-r.Context().Done(): @@ -292,6 +293,7 @@ func (s *SSEServer) handleSSE(w http.ResponseWriter, r *http.Request) { }() } + // Send the initial endpoint event fmt.Fprintf(w, "event: endpoint\ndata: %s\r\n\r\n", s.GetMessageEndpointForClient(sessionID)) flusher.Flush()