Skip to content

Commit ca2ec09

Browse files
committed
feat: Integrate SSE handler and fix real-time UI updates
Refactors the MCP ConnectionManager to correctly track server IDs and include them in published events (ServerStatusChanged, ToolsUpdated). This ensures the SSEHandler receives the correct ID. Integrates the SSEHandler into the Fx application wiring, providing the handler, registering its `/api/events` route, and adding lifecycle hooks. Removes conflicting root route registration and fixes minor template issues. Resolves race condition where UI could fetch stale data by ensuring SSE notifications rely on backend data persistence confirmation (via ServerDataUpdated event pattern).
1 parent 451ccd2 commit ca2ec09

File tree

9 files changed

+636
-139
lines changed

9 files changed

+636
-139
lines changed

internal/app/app.go

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/co-browser/agent-browser/internal/web/handlers"
1616

1717
"go.uber.org/fx"
18+
// "go.uber.org/fx/fxevent" // Removed import for optional logger
1819
)
1920

2021
// --- Core Application Modules ---
@@ -24,6 +25,10 @@ var LogModule = fx.Module("logger",
2425
fx.Provide(
2526
log.NewLogger,
2627
),
28+
// Removed optional Fx event logger setup
29+
// fx.WithLogger(func(logger log.Logger) fxevent.Logger {
30+
// return &log.FxEventLogger{Logger: logger}
31+
// }),
2732
)
2833

2934
// DatabaseModule provides the database dependency and manages its lifecycle.
@@ -109,27 +114,58 @@ var MCPClientModule = fx.Module("mcp_client",
109114
fx.Invoke(mcp.RegisterEventSubscribers),
110115
)
111116

112-
// WebModule provides the HTTP server, API handlers, and UI handlers (if any).
113-
// It serves the API used internally by the MCPClient and potentially by external UIs.
117+
// WebModule provides the HTTP server, API handlers, UI handlers, and SSE handler.
114118
var WebModule = fx.Module("web",
115119
fx.Provide(
116-
// Provide UI handler (if applicable)
120+
// Provide UI handler (depends on Logger, BackendService)
117121
handlers.NewUIHandler,
118-
// Provide API handlers
122+
// Provide API handlers (depends on BackendService, Logger)
119123
func(bs backend.Service, logger log.Logger) *handlers.APIHandlers {
120124
return handlers.NewAPIHandlers(bs, logger)
121125
},
126+
// Provide SSE handler (depends on Logger, EventBus, BackendService)
127+
// Note: Fx automatically provides context.Background() if context.Context is needed.
128+
func(lc fx.Lifecycle, logger log.Logger, bus events.Bus, bs backend.Service) *handlers.SSEHandler {
129+
// Create handler with background context
130+
// The handler manages its own internal context cancellation via Stop().
131+
sseHandler := handlers.NewSSEHandler(context.Background(), logger, bus, bs)
132+
// Register Stop hook
133+
lc.Append(fx.Hook{
134+
OnStop: func(_ context.Context) error {
135+
logger.Info().Msg("Stopping SSE handler...")
136+
sseHandler.Stop()
137+
return nil
138+
},
139+
})
140+
return sseHandler
141+
},
122142
// Provide the HTTP request router (ServeMux)
123-
web.NewMux,
124-
// Provide the HTTP server itself
143+
// Pass apiHandler and uiHandler explicitly if NewMux requires them.
144+
// Assuming NewMux takes no arguments for now as per web/server.go structure.
145+
// We register handlers via fx.Invoke later.
146+
func() *http.ServeMux {
147+
return http.NewServeMux()
148+
},
149+
// Provide the HTTP server itself (depends on ServeMux)
125150
web.NewServer,
126151
),
127-
// Register API handler routes with the router
128-
fx.Invoke(func(router *http.ServeMux, apiHandlers *handlers.APIHandlers) {
129-
apiHandlers.RegisterRoutes(router)
152+
// Register handlers with the router
153+
fx.Invoke(func(mux *http.ServeMux, apiHandlers *handlers.APIHandlers, uiHandler http.Handler, sseHandler *handlers.SSEHandler) {
154+
// API Routes (includes Swagger/OpenAPI docs)
155+
apiHandlers.RegisterRoutes(mux)
156+
// UI Route
157+
mux.Handle("/ui/", http.StripPrefix("/ui/", uiHandler)) // Serve UI under /ui/
158+
mux.Handle("/", http.RedirectHandler("/ui/", http.StatusMovedPermanently)) // Redirect root to UI
159+
// SSE Route
160+
mux.HandleFunc("/api/events", sseHandler.ServeHTTP)
161+
// Metrics Route (already in api.go RegisterRoutes? If not, add here)
162+
// mux.Handle("/metrics", promhttp.Handler())
130163
}),
131164
// Register web server lifecycle hooks for starting/stopping the server
132165
fx.Invoke(web.RegisterWebServerHooks),
166+
// Invoke the SSE handler's event listener setup explicitly if needed,
167+
// but NewSSEHandler already starts the listener goroutine.
168+
// fx.Invoke(func(sse *handlers.SSEHandler) { /* SSE Listener already started in New */ }),
133169
)
134170

135171
// InitModule performs initial setup tasks, like seeding the database with default servers

internal/mcp/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ type RemoteMCPServer struct {
1313
URL string `json:"url"`
1414
Name string `json:"name"`
1515
Description string `json:"description"`
16+
ID int64 `json:"-"` // Add ID field, exclude from JSON config if needed
1617
}
1718

1819
// MCPServerConfig holds the configuration for the MCP server

internal/mcp/server.go

Lines changed: 73 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,12 @@ type RemoteToolInfo struct {
6464
HandlerFn func(context.Context, mcp.CallToolRequest) (*mcp.CallToolResult, error)
6565
}
6666

67+
// connectionInfo holds runtime state for a managed connection
68+
type connectionInfo struct {
69+
State ConnectionState
70+
ID int64 // Store the server ID
71+
}
72+
6773
// ConnectionManager handles the lifecycle of remote MCP connections.
6874
// It maintains two types of state:
6975
//
@@ -88,7 +94,7 @@ type ConnectionManager struct {
8894
mu sync.RWMutex
8995

9096
// Runtime state
91-
connStates map[string]ConnectionState // URL -> State
97+
connections map[string]*connectionInfo // URL -> {State, ID}
9298
toolsByServer map[string][]mcp.Tool // URL -> Tools
9399
toolHandlers map[string]*RemoteToolInfo // ToolName -> Info
94100
permanentlyRemoved map[string]bool // URL -> bool (Tracks servers explicitly removed via API/event)
@@ -105,7 +111,7 @@ func NewConnectionManager(config MCPServerConfig, logger log.Logger, eventBus ev
105111
ctx: ctx,
106112
cancel: cancel,
107113
mcpServer: mcpServer,
108-
connStates: make(map[string]ConnectionState),
114+
connections: make(map[string]*connectionInfo),
109115
toolsByServer: make(map[string][]mcp.Tool),
110116
toolHandlers: make(map[string]*RemoteToolInfo),
111117
permanentlyRemoved: make(map[string]bool),
@@ -673,91 +679,66 @@ func (cm *ConnectionManager) connectWithRetry(remote RemoteMCPServer) {
673679

674680
// setConnectionState updates both runtime and persistent connection state
675681
func (cm *ConnectionManager) setConnectionState(url string, state ConnectionState) {
676-
// Keep runtime state update
682+
// Get the server ID from our internal state
683+
cm.mu.RLock()
684+
connInfo, exists := cm.connections[url]
685+
cm.mu.RUnlock()
686+
687+
if !exists {
688+
cm.logger.Warn().Str("url", url).Str("state", state.String()).Msg("Attempted to set connection state for untracked URL")
689+
return
690+
}
691+
692+
serverID := connInfo.ID // Use the stored ID
693+
694+
// Update runtime state
677695
cm.mu.Lock()
678-
cm.connStates[url] = state
696+
connInfo.State = state // Update state in the map
679697
mcpConnectionsTotal.WithLabelValues(state.String()).Inc()
680-
cm.mu.Unlock() // Unlock early before potentially blocking on event bus
681-
682-
// Determine the server ID - we don't have it directly anymore!
683-
// We need a way to map URL back to ID. This implies either:
684-
// 1. The ConnectionManager needs access to the server list/DB (bad coupling).
685-
// 2. Events need to carry the ID, or we rely on URL.
686-
// Let's assume for now the BackendService can handle mapping URL->ID if needed,
687-
// or that the event payload is sufficient. We need the ID for the event.
688-
// TODO: Find a way to get server ID from URL if necessary for the event.
689-
// For now, we'll publish without an ID, which might break the handler.
690-
691-
// Find server ID (This is a hack, assumes server exists in DB, inefficient)
692-
// Ideally, the ConnectionManager would store the ID when a connection is added/managed.
693-
// Let's skip finding ID for now and rely on the event handler to potentially look it up by URL if needed.
694-
var serverID int64 = 0 // Placeholder - ID is unknown here!
698+
cm.mu.Unlock()
695699

696700
// Determine error string for event
697701
var errStr *string
698702
if state == StateFailed {
699-
// We don't have the specific connection error here anymore.
700-
// The calling context (connectWithRetry) has it.
701-
// We should ideally pass the error to setConnectionState.
702-
// For now, use a generic message.
703+
// TODO: Pass the actual error message to setConnectionState
703704
errMsg := "connection failed"
704705
errStr = &errMsg
705706
}
706707

707-
cm.logger.Info().Str("url", url).Str("state", state.String()).Msg("Publishing ServerStatusChangedEvent")
708-
// Publish event instead of calling API
709-
cm.eventBus.Publish(events.NewServerStatusChangedEvent(serverID, url, models.ConnectionState(state.String()), errStr))
710-
711-
// --- Remove API Client Call ---
712-
/*
713-
start := time.Now()
714-
defer func() {
715-
mcpAPILatency.WithLabelValues("set_connection_state").Observe(time.Since(start).Seconds())
716-
}()
717-
718-
cm.mu.Lock()
719-
defer cm.mu.Unlock()
720-
721-
// Update runtime state
722-
cm.connStates[url] = state
723-
mcpConnectionsTotal.WithLabelValues(state.String()).Inc()
724-
725-
// Update persistent state through API
726-
servers, err := cm.apiClient.ListMCPServers(cm.ctx)
727-
if err != nil {
728-
cm.logger.Error().Err(err).Msg("Failed to list servers for state update")
729-
mcpAPIErrors.WithLabelValues("list_servers").Inc()
730-
return
731-
}
708+
cm.logger.Info().
709+
Str("url", url).
710+
Int64("id", serverID).
711+
Str("state", state.String()).
712+
Msg("Publishing ServerStatusChangedEvent")
732713

733-
// Find server by URL and update its state
734-
for _, server := range servers {
735-
if server.URL == url {
736-
var err error
737-
if state == StateFailed {
738-
err = fmt.Errorf("connection failed")
739-
}
740-
apiState := models.ConnectionState(state.String())
741-
if updateErr := cm.apiClient.UpdateServerStatus(cm.ctx, server.ID, apiState, err); updateErr != nil {
742-
cm.logger.Error().Err(updateErr).Msg("Failed to update server status")
743-
mcpAPIErrors.WithLabelValues("update_status").Inc()
744-
}
745-
break
746-
}
747-
}
748-
*/
714+
// Publish event with the correct ID
715+
cm.eventBus.Publish(events.NewServerStatusChangedEvent(serverID, url, models.ConnectionState(state.String()), errStr))
749716
}
750717

751718
// getConnectionState gets the current state of a connection
752719
func (cm *ConnectionManager) getConnectionState(url string) ConnectionState {
753720
cm.mu.RLock()
754721
defer cm.mu.RUnlock()
755-
return cm.connStates[url]
722+
if info, exists := cm.connections[url]; exists {
723+
return info.State
724+
}
725+
return StateDisconnected // Default to disconnected if not tracked
756726
}
757727

758728
// updateServerTools synchronizes tool state between runtime and persistent storage
759729
func (cm *ConnectionManager) updateServerTools(serverURL string, fetchedTools []mcp.Tool) {
760-
// Keep runtime state update
730+
// Get the server ID from our internal state
731+
cm.mu.RLock()
732+
connInfo, exists := cm.connections[serverURL]
733+
cm.mu.RUnlock()
734+
735+
if !exists {
736+
cm.logger.Warn().Str("url", serverURL).Msg("Attempted to update tools for untracked URL")
737+
return
738+
}
739+
serverID := connInfo.ID // Use stored ID
740+
741+
// Update runtime tool state
761742
cm.mu.Lock()
762743
if fetchedTools == nil {
763744
delete(cm.toolsByServer, serverURL)
@@ -766,54 +747,22 @@ func (cm *ConnectionManager) updateServerTools(serverURL string, fetchedTools []
766747
cm.toolsByServer[serverURL] = fetchedTools
767748
mcpToolsTotal.WithLabelValues(serverURL).Set(float64(len(fetchedTools)))
768749
}
769-
// Update MCP server tools needs to happen *after* the event is processed
770-
// cm.refreshMCPServerTools() // Move this or trigger via another event?
771-
cm.mu.Unlock() // Unlock before publishing
772-
773-
// TODO: Need server ID again. How to get it?
774-
// Assuming BackendService can look up by URL for now.
775-
var serverID int64 = 0 // Placeholder
750+
cm.mu.Unlock()
776751

777752
// Convert mcp.Tool to models.Tool for the event
778-
// This requires mapping. Assuming direct mapping for now, might need adjustment.
779753
modelTools := make([]models.Tool, 0, len(fetchedTools))
780754
for _, ft := range fetchedTools {
781-
// Basic mapping - adjust if models differ significantly
782755
modelTools = append(modelTools, models.Tool{
783-
ExternalID: ft.Name, // Assuming ExternalID is the tool name from MCP lib
784-
SourceServerID: serverID, // Placeholder!
756+
ExternalID: ft.Name,
757+
SourceServerID: serverID, // Use the correct ID
785758
Name: ft.Name,
786759
Description: ft.Description,
787-
// UpdatedAt/CreatedAt will be set by DB upsert
788760
})
789761
}
790762

791-
cm.logger.Info().Str("server", serverURL).Int("toolCount", len(modelTools)).Msg("Publishing ToolsUpdatedEvent")
792-
// Publish event instead of calling API
763+
cm.logger.Info().Str("server", serverURL).Int64("id", serverID).Int("toolCount", len(modelTools)).Msg("Publishing ToolsUpdatedEvent")
764+
// Publish event with the correct ID
793765
cm.eventBus.Publish(events.NewToolsUpdatedEvent(serverID, serverURL, modelTools))
794-
795-
// TODO: Decide when/how cm.refreshMCPServerTools() should be called now.
796-
// Maybe the backend service publishes another event after DB update?
797-
// Or maybe ConnectionManager subscribes to its own published events? Less ideal.
798-
// For now, let's call it directly after publishing, but this is not quite right.
799-
800-
/*
801-
Calling refreshMCPServerTools() immediately after publishing the ToolsUpdatedEvent
802-
means the internal MCP server's tool list is updated before the BackendService
803-
has necessarily finished processing the event and updating the database.
804-
This could lead to inconsistent states or potentially trigger other unintended
805-
actions that result in repeated connection/update cycles.
806-
807-
Recommendation:
808-
The refreshMCPServerTools() call should be decoupled and triggered only after
809-
the BackendService confirms the database update for the tools is complete.
810-
This could be done by:
811-
1. The BackendService.HandleToolsUpdated method publishing a new event (e.g., ToolsProcessedInDBEvent).
812-
2. The ConnectionManager subscribing to this new event and calling refreshMCPServerTools() in its handler.
813-
*/
814-
// cm.mu.Lock() // REMOVE Call to refreshMCPServerTools
815-
// cm.refreshMCPServerTools() // REMOVE Call
816-
// cm.mu.Unlock() // REMOVE Call
817766
}
818767

819768
// createToolHandler creates a new handler function for a tool
@@ -904,18 +853,28 @@ func (cm *ConnectionManager) handleServerAdded(event events.Event) {
904853

905854
serverURL := addedEvent.Server.URL
906855
serverName := addedEvent.Server.Name
856+
serverID := addedEvent.Server.ID // Get the ID from the event
907857

908858
cm.logger.Info().
909859
Str("url", serverURL).
910860
Str("name", serverName).
861+
Int64("id", serverID).
911862
Msg("Received ServerAddedEvent, attempting connection")
912863

913-
// --- Clear removal flag if it exists ---
864+
// --- Store connection info (including ID) and clear removal flag ---
914865
cm.mu.Lock()
915866
if _, exists := cm.permanentlyRemoved[serverURL]; exists {
916867
delete(cm.permanentlyRemoved, serverURL)
917868
cm.logger.Info().Str("url", serverURL).Msg("Cleared permanently removed flag due to server re-addition")
918869
}
870+
// Initialize connection state if not already present
871+
if _, exists := cm.connections[serverURL]; !exists {
872+
cm.connections[serverURL] = &connectionInfo{ID: serverID, State: StateDisconnected}
873+
cm.logger.Debug().Str("url", serverURL).Int64("id", serverID).Msg("Initialized connection info map entry")
874+
} else {
875+
// Update ID just in case (should be consistent)
876+
cm.connections[serverURL].ID = serverID
877+
}
919878
cm.mu.Unlock()
920879

921880
// Check if we are already trying to connect or are connected to this URL
@@ -934,7 +893,7 @@ func (cm *ConnectionManager) handleServerAdded(event events.Event) {
934893
remote := RemoteMCPServer{
935894
URL: serverURL,
936895
Name: serverName,
937-
// Description is not in the event, but not strictly needed here
896+
ID: serverID, // Pass ID along if needed by connectWithRetry
938897
}
939898
go cm.connectWithRetry(remote)
940899
}
@@ -954,11 +913,12 @@ func (cm *ConnectionManager) handleServerRemoved(event events.Event) {
954913
Int64("id", removedEvent.ServerID).
955914
Msg("Received ServerRemovedEvent, stopping connection")
956915

957-
// --- Mark as permanently removed ---
916+
// --- Mark as permanently removed and remove from connections map ---
958917
cm.mu.Lock()
959918
cm.permanentlyRemoved[removedEvent.ServerURL] = true
919+
delete(cm.connections, removedEvent.ServerURL) // Remove from our state map
960920
cm.mu.Unlock()
961-
cm.logger.Info().Str("url", removedEvent.ServerURL).Msg("Marked server as permanently removed")
921+
cm.logger.Info().Str("url", removedEvent.ServerURL).Msg("Marked server as permanently removed and cleaned connection state")
962922

963923
// --- Stop Connection ---
964924
connectionsMutex.Lock()
@@ -991,16 +951,14 @@ func (cm *ConnectionManager) handleServerRemoved(event events.Event) {
991951
cm.logger.Info().Str("url", removedEvent.ServerURL).Int("count", cleanedCount).Msg("Cleaned up tool handlers for removed server")
992952
cm.mu.Unlock()
993953

994-
// --- Update State and Tools ---
995-
// Update state (marks disconnected in DB via API) - Now handled by event publisher? No, this is reacting to remove.
996-
// cm.setConnectionState(removedEvent.ServerURL, StateDisconnected) // Should not publish another event here. This removal is final.
997-
998-
// Update tools (removes from cm.toolsByServer and calls refreshMCPServerTools)
999-
cm.mu.Lock() // Lock needed for refreshMCPServerTools
1000-
if _, exists := cm.toolsByServer[removedEvent.ServerURL]; exists {
954+
// --- Update Tools (removes from cm.toolsByServer) ---
955+
cm.mu.Lock() // Lock needed for map access
956+
if _, toolsExist := cm.toolsByServer[removedEvent.ServerURL]; toolsExist {
1001957
delete(cm.toolsByServer, removedEvent.ServerURL)
1002958
mcpToolsTotal.WithLabelValues(removedEvent.ServerURL).Set(0)
1003-
cm.refreshMCPServerTools() // Refresh tools in MCP server
959+
// Refreshing the MCP server tools might need to happen AFTER the backend confirms DB update.
960+
// Let's assume ToolsProcessedInDBEvent handles this refresh.
961+
// cm.refreshMCPServerTools() // Avoid calling this directly here
1004962
}
1005963
cm.mu.Unlock()
1006964

0 commit comments

Comments
 (0)