Skip to content

Commit dd1e1f7

Browse files
committed
feat: Use event bus for MCP status/tool updates
Removes the API client dependency from ConnectionManager. ConnectionManager now publishes ServerStatusChanged and ToolsUpdated events directly to the event bus instead of calling back into the Web API. BackendService subscribes to these events to handle DB updates. InitModule now publishes ServerAddedEvent for existing servers at startup to ensure ConnectionManager connects to them.
1 parent e68f428 commit dd1e1f7

File tree

4 files changed

+426
-259
lines changed

4 files changed

+426
-259
lines changed

internal/app/app.go

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,23 @@ var BackendModule = fx.Module("backend",
6868
fx.Provide(
6969
// Provide the backend service implementation, satisfying the backend.Service interface.
7070
func(db database.DBInterface, bus events.Bus, logger log.Logger) backend.Service {
71+
// Just return the service instance created by NewService.
7172
return backend.NewService(db, bus, logger)
7273
},
74+
/* // Remove previous attempts
75+
func(lc fx.Lifecycle, db database.DBInterface, bus events.Bus, logger log.Logger) backend.Service {
76+
// ... creation, assertion, hook registration ...
77+
return serviceInterface
78+
},
79+
*/
7380
),
81+
// Invoke the dedicated registration function from the backend package.
82+
fx.Invoke(backend.RegisterEventHandlers),
83+
/* // Remove previous attempts
84+
fx.Invoke(func(bus events.Bus, serviceImpl *backend.service, logger log.Logger) { // Request concrete type
85+
// ... subscription ...
86+
}),
87+
*/
7488
)
7589

7690
// MCPClientModule provides the components responsible for connecting *to* remote MCP servers.
@@ -91,6 +105,8 @@ var MCPClientModule = fx.Module("mcp_client",
91105
),
92106
// Register lifecycle hooks for starting/stopping the MCP ConnectionManager
93107
fx.Invoke(mcp.RegisterMCPServerHooks),
108+
// Register event subscribers for the ConnectionManager
109+
fx.Invoke(mcp.RegisterEventSubscribers),
94110
)
95111

96112
// WebModule provides the HTTP server, API handlers, and UI handlers (if any).
@@ -116,57 +132,61 @@ var WebModule = fx.Module("web",
116132
fx.Invoke(web.RegisterWebServerHooks),
117133
)
118134

119-
// InitModule performs initial setup tasks, like seeding the database with default servers.
120-
// It runs once during application startup if needed.
135+
// InitModule performs initial setup tasks, like seeding the database with default servers
136+
// and triggering connections for existing servers.
121137
var InitModule = fx.Module("init",
122-
fx.Invoke(func(bs backend.Service, logger log.Logger) {
138+
// Use fx.Invoke to run initialization logic after dependencies are ready.
139+
fx.Invoke(func(bs backend.Service, bus events.Bus, logger log.Logger) {
140+
logger.Info().Msg("Running initialization logic...")
123141
// Check if any servers already exist in the database.
124142
servers, err := bs.ListMCPServers()
125143
if err != nil {
126-
logger.Error().Err(err).Msg("Failed to check existing servers during initialization")
127-
// Decide if this should be fatal. For now, we continue, but defaults won't be added.
144+
logger.Error().Err(err).Msg("Failed to list existing servers during initialization")
145+
// Decide if this should be fatal. For now, we continue.
128146
return
129147
}
130148

131-
// Only add default servers if the database is empty.
149+
// Add default servers ONLY if the database is empty.
132150
if len(servers) == 0 {
133151
logger.Info().Msg("No MCP servers found in the database. Adding default servers...")
134-
135152
// Define the default servers to add.
136-
// TODO: Consider moving these defaults to configuration.
137153
defaultServers := []struct {
138154
name string
139155
url string
140156
description string
141157
}{
142158
{
143159
name: "Local Test Server",
144-
url: "http://0.0.0.0:8001/sse", // Assumes local test server runs on 8001
160+
url: "http://0.0.0.0:8001/sse",
145161
description: "Local MCP test server",
146162
},
147163
}
148164

149165
// Add each default server via the backend service.
150166
for _, server := range defaultServers {
151-
_, err := bs.AddMCPServer(server.name, server.url) // Description is not stored via AddMCPServer currently
167+
// AddMCPServer already publishes ServerAddedEvent, so we don't need to do it manually here.
168+
_, err := bs.AddMCPServer(server.name, server.url)
152169
if err != nil {
153-
// Log error but continue trying to add others.
154170
logger.Error().
155171
Err(err).
156172
Str("name", server.name).
157173
Str("url", server.url).
158-
// Str("description", server.description). // Not used by AddMCPServer
159174
Msg("Failed to add default server")
160-
continue // Skip logging success for this server
175+
continue
161176
}
162177
logger.Info().
163178
Str("name", server.name).
164179
Str("url", server.url).
165-
// Str("description", server.description). // Not stored
166-
Msg("Added default server")
180+
Msg("Added default server (will trigger ServerAddedEvent)")
167181
}
168182
} else {
169-
logger.Info().Int("count", len(servers)).Msg("Servers already exist, skipping default server initialization")
183+
// If servers already exist, publish ServerAddedEvent for each one
184+
// to trigger the ConnectionManager.
185+
logger.Info().Int("count", len(servers)).Msg("Existing servers found. Publishing ServerAddedEvent for each...")
186+
for _, server := range servers {
187+
logger.Debug().Int64("id", server.ID).Str("url", server.URL).Msg("Publishing ServerAddedEvent for existing server")
188+
bus.Publish(events.NewServerAddedEvent(server))
189+
}
170190
}
171191
}),
172192
)

internal/backend/service.go

Lines changed: 122 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,6 @@ func (s *service) ProcessFetchedTools(serverID int64, fetchedTools []models.Fetc
167167
}
168168

169169
s.logger.Info().Int("fetched", len(fetchedTools)).Int("added", addedCount).Int("updated", updatedCount).Int64("serverID", serverID).Msg("Processed fetched tools")
170-
s.bus.Publish(events.NewToolsUpdatedEvent(serverID, len(fetchedTools))) // Existing event
171170
return addedCount, updatedCount, nil
172171
}
173172

@@ -180,9 +179,9 @@ func (s *service) UpdateMCPServerStatus(id int64, state models.ConnectionState,
180179
return fmt.Errorf("failed to update server status: %w", err)
181180
}
182181

183-
// Publish the status change event
184-
s.bus.Publish(events.NewServerStatusChangedEvent(id, state, errStr))
185-
s.logger.Debug().Int64("id", id).Str("state", string(state)).Msg("Updated server status and published ServerStatusChangedEvent")
182+
// Publish the status change event - REMOVED - ConnectionManager publishes this now.
183+
// s.bus.Publish(events.NewServerStatusChangedEvent(id, state, errStr))
184+
s.logger.Debug().Int64("id", id).Str("state", string(state)).Msg("Updated server status in DB (event published by ConnectionManager)")
186185

187186
// Fetching the updated server is no longer strictly necessary just for publishing the event
188187
// updatedServer, getErr := s.db.GetServerByID(id)
@@ -229,3 +228,122 @@ func (s *service) UpdateMCPServer(id int64, name, url string) (*models.MCPServer
229228
// s.bus.Publish(events.NewServerUpdatedEvent(*updatedServer)) // TODO: Define and use a ServerUpdatedEvent
230229
return updatedServer, nil
231230
}
231+
232+
// --- Event Handlers ---
233+
234+
// HandleServerStatusChanged processes ServerStatusChangedEvent received from the event bus.
235+
func (s *service) HandleServerStatusChanged(event events.Event) {
236+
statusEvent, ok := event.(*events.ServerStatusChangedEvent)
237+
if !ok {
238+
s.logger.Error().Str("eventType", string(event.Type())).Msg("Received event of unexpected type in HandleServerStatusChanged")
239+
return
240+
}
241+
242+
// Need to find the server ID from the URL if ID is 0 (publisher might not know it)
243+
serverID := statusEvent.ServerID
244+
if serverID == 0 {
245+
server, err := s.db.GetServerByURL(statusEvent.ServerURL)
246+
if err != nil {
247+
s.logger.Error().Err(err).Str("url", statusEvent.ServerURL).Msg("Error finding server by URL for status update event")
248+
return
249+
}
250+
if server == nil {
251+
s.logger.Warn().Str("url", statusEvent.ServerURL).Msg("Received status update event for unknown server URL")
252+
return
253+
}
254+
serverID = server.ID
255+
s.logger.Debug().Int64("serverID", serverID).Str("url", statusEvent.ServerURL).Msg("Mapped server URL to ID for status update event")
256+
}
257+
258+
s.logger.Info().
259+
Int64("serverID", serverID).
260+
Str("url", statusEvent.ServerURL).
261+
Str("newState", string(statusEvent.NewState)).
262+
Msg("Handling ServerStatusChanged event")
263+
264+
// Call the existing DB update method
265+
err := s.db.UpdateServerStatus(serverID, statusEvent.NewState, statusEvent.LastError, time.Now())
266+
if err != nil {
267+
s.logger.Error().Err(err).Int64("serverID", serverID).Msg("Failed to update DB from ServerStatusChangedEvent")
268+
// Note: We don't re-publish the event here to avoid loops.
269+
}
270+
}
271+
272+
// HandleToolsUpdated processes ToolsUpdatedEvent received from the event bus.
273+
func (s *service) HandleToolsUpdated(event events.Event) {
274+
toolsEvent, ok := event.(*events.ToolsUpdatedEvent)
275+
if !ok {
276+
s.logger.Error().Str("eventType", string(event.Type())).Msg("Received event of unexpected type in HandleToolsUpdated")
277+
return
278+
}
279+
280+
// Need to find the server ID from the URL if ID is 0
281+
serverID := toolsEvent.ServerID
282+
if serverID == 0 {
283+
server, err := s.db.GetServerByURL(toolsEvent.ServerURL)
284+
if err != nil {
285+
s.logger.Error().Err(err).Str("url", toolsEvent.ServerURL).Msg("Error finding server by URL for tools update event")
286+
return
287+
}
288+
if server == nil {
289+
s.logger.Warn().Str("url", toolsEvent.ServerURL).Msg("Received tools update event for unknown server URL")
290+
return
291+
}
292+
serverID = server.ID
293+
s.logger.Debug().Int64("serverID", serverID).Str("url", toolsEvent.ServerURL).Msg("Mapped server URL to ID for tools update event")
294+
}
295+
296+
s.logger.Info().
297+
Int64("serverID", serverID).
298+
Str("url", toolsEvent.ServerURL).
299+
Int("toolCount", len(toolsEvent.Tools)).
300+
Msg("Handling ToolsUpdated event")
301+
302+
// Process each tool using the DB upsert method
303+
addedCount := 0
304+
updatedCount := 0
305+
hadError := false
306+
for _, tool := range toolsEvent.Tools {
307+
// Ensure the SourceServerID is set correctly, as the event might have had 0
308+
tool.SourceServerID = serverID
309+
310+
wasAdded, upsertErr := s.db.UpsertTool(tool)
311+
if upsertErr != nil {
312+
s.logger.Error().Err(upsertErr).Str("externalID", tool.ExternalID).Int64("serverID", serverID).Msg("Error upserting tool from ToolsUpdatedEvent")
313+
hadError = true
314+
continue // Continue processing other tools
315+
}
316+
if wasAdded {
317+
addedCount++
318+
} else {
319+
updatedCount++
320+
}
321+
}
322+
323+
if hadError {
324+
s.logger.Warn().Int64("serverID", serverID).Msg("Encountered errors while processing tools from ToolsUpdatedEvent")
325+
}
326+
327+
s.logger.Info().
328+
Int64("serverID", serverID).
329+
Int("added", addedCount).
330+
Int("updated", updatedCount).
331+
Msg("Finished processing ToolsUpdatedEvent")
332+
333+
// TODO: Potentially publish another event like 'ToolsProcessed' if other components
334+
// need to know the DB update is complete (e.g., to trigger MCP server tool refresh).
335+
}
336+
337+
// RegisterEventHandlers registers the necessary event handlers for the backend service.
338+
// It performs a type assertion to access the unexported handler methods.
339+
func RegisterEventHandlers(bus events.Bus, serviceInterface Service, logger log.Logger) {
340+
serviceImpl, ok := serviceInterface.(*service) // Type assertion to concrete *service
341+
if !ok {
342+
logger.Fatal().Msg("Backend service provided to RegisterEventHandlers is not of expected type *service")
343+
return // Or panic
344+
}
345+
346+
logger.Info().Msg("Registering backend service event handlers...")
347+
bus.Subscribe(events.ServerStatusChanged, serviceImpl.HandleServerStatusChanged)
348+
bus.Subscribe(events.ToolsUpdated, serviceImpl.HandleToolsUpdated)
349+
}

internal/events/events.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,17 +81,19 @@ func NewServerRemovedEvent(serverID int64, serverURL string) *ServerRemovedEvent
8181
type ToolsUpdatedEvent struct {
8282
baseEvent
8383
ServerID int64
84-
FetchedCount int // How many tools were in the fetched list (can be 0)
85-
// TODO: Optionally include added/updated counts if ProcessFetchedTools can return them
86-
// TODO: Optionally include list of changed tool IDs/names?
84+
ServerURL string
85+
FetchedCount int
86+
Tools []models.Tool
8787
}
8888

8989
// NewToolsUpdatedEvent creates a new event for when tools are updated for a server
90-
func NewToolsUpdatedEvent(serverID int64, fetchedCount int) *ToolsUpdatedEvent {
90+
func NewToolsUpdatedEvent(serverID int64, serverURL string, tools []models.Tool) *ToolsUpdatedEvent {
9191
return &ToolsUpdatedEvent{
9292
baseEvent: newBaseEvent(ToolsUpdated),
9393
ServerID: serverID,
94-
FetchedCount: fetchedCount,
94+
ServerURL: serverURL,
95+
FetchedCount: len(tools),
96+
Tools: tools,
9597
}
9698
}
9799

@@ -100,15 +102,17 @@ func NewToolsUpdatedEvent(serverID int64, fetchedCount int) *ToolsUpdatedEvent {
100102
type ServerStatusChangedEvent struct {
101103
baseEvent
102104
ServerID int64
103-
NewState models.ConnectionState // The new state (connected, disconnected, failed)
104-
LastError *string // Optional error message if state is 'failed'
105+
ServerURL string
106+
NewState models.ConnectionState
107+
LastError *string
105108
}
106109

107110
// NewServerStatusChangedEvent creates a new event for server status changes.
108-
func NewServerStatusChangedEvent(serverID int64, newState models.ConnectionState, lastError *string) *ServerStatusChangedEvent {
111+
func NewServerStatusChangedEvent(serverID int64, serverURL string, newState models.ConnectionState, lastError *string) *ServerStatusChangedEvent {
109112
return &ServerStatusChangedEvent{
110113
baseEvent: newBaseEvent(ServerStatusChanged),
111114
ServerID: serverID,
115+
ServerURL: serverURL,
112116
NewState: newState,
113117
LastError: lastError,
114118
}

0 commit comments

Comments
 (0)