diff --git a/op-conductor/rpc/ws/flashblocks_handler.go b/op-conductor/rpc/ws/flashblocks_handler.go index b4b159c5eb2..068f60de61c 100644 --- a/op-conductor/rpc/ws/flashblocks_handler.go +++ b/op-conductor/rpc/ws/flashblocks_handler.go @@ -117,7 +117,12 @@ func (h *Handler) Start(ctx context.Context) error { func (h *Handler) Stop() { // Signal the hub to stop if it exists if h.hub != nil { - close(h.hub.done) + select { + case <-h.hub.done: + // already closed + default: + close(h.hub.done) + } } // Cancel the rollup boost context if it exists @@ -142,6 +147,15 @@ func (h *Handler) Stop() { } h.log.Info("WebSocket server closed") } + + // Wait for hub shutdown to complete to avoid leaking goroutines + if h.hub != nil { + select { + case <-h.hub.stopped: + case <-time.After(5 * time.Second): + h.log.Warn("Timed out waiting for hub shutdown") + } + } } // BroadcastMessage sends a message to all connected WebSocket clients @@ -196,6 +210,11 @@ func (h *Handler) listenToRollupBoost(ctx context.Context) { case <-ctx.Done(): return default: + // If not leader, avoid pulling messages to reduce allocation pressure + if !h.isLeaderFn(ctx) { + time.Sleep(500 * time.Millisecond) + continue + } // Try to connect if not connected indefinitely if h.rollupBoostConn == nil { h.log.Info("reconnecting to rollup boost WebSocket", "url", h.cfg.RollupBoostWsURL) diff --git a/op-conductor/rpc/ws/server.go b/op-conductor/rpc/ws/server.go index 159eb474b24..3fac0b8d297 100644 --- a/op-conductor/rpc/ws/server.go +++ b/op-conductor/rpc/ws/server.go @@ -17,6 +17,9 @@ type Hub struct { // Registered clients clients map[*Client]bool + // Protects access to clients map + mu sync.Mutex + // Register requests from the clients register chan *Client @@ -29,6 +32,9 @@ type Hub struct { // Signal to stop the hub done chan struct{} + // Signals that the hub has fully stopped + stopped chan struct{} + // Logger log log.Logger @@ -47,6 +53,7 @@ func newHub(m metrics.Metricer) *Hub { unregister: make(chan *Client), clients: make(map[*Client]bool), done: make(chan struct{}), + stopped: make(chan struct{}), log: log.New("component", "websocket-hub"), metrics: m, } @@ -54,6 +61,9 @@ func newHub(m metrics.Metricer) *Hub { // registerClient adds a client to the hub and updates metrics func (h *Hub) registerClient(client *Client) { + h.mu.Lock() + defer h.mu.Unlock() + h.clients[client] = true clientCount := len(h.clients) h.log.Info("Client registered with hub", "totalClients", clientCount) @@ -66,6 +76,9 @@ func (h *Hub) registerClient(client *Client) { // unregisterClient removes a client from the hub, closes it, and updates metrics func (h *Hub) unregisterClient(client *Client) { + h.mu.Lock() + defer h.mu.Unlock() + if _, ok := h.clients[client]; ok { delete(h.clients, client) client.Close() @@ -85,7 +98,14 @@ func (h *Hub) run() { select { case <-h.done: // Close all remaining client connections + h.mu.Lock() + var remaining []*Client for client := range h.clients { + remaining = append(remaining, client) + } + h.mu.Unlock() + + for _, client := range remaining { h.unregisterClient(client) } h.metrics.RecordWebSocketClientCount(0) @@ -93,6 +113,7 @@ func (h *Hub) run() { if h.callbacks.OnShutdown != nil { h.callbacks.OnShutdown() } + close(h.stopped) return case client := <-h.register: h.registerClient(client) @@ -101,19 +122,27 @@ func (h *Hub) run() { case message := <-h.broadcast: successCount := 0 dropCount := 0 + var toClose []*Client + h.mu.Lock() for client := range h.clients { select { case client.send <- message: // Message sent successfully successCount++ default: - // Channel is full, client is likely slow/dead - // The ping mechanism will detect and clean up dead clients - h.log.Debug("Failed to send message to client, channel full") + // Channel is full, client is likely slow/dead; mark for close + h.log.Warn("Client send channel full, dropping and closing client") dropCount++ + toClose = append(toClose, client) } } + h.mu.Unlock() + + for _, client := range toClose { + h.unregisterClient(client) + } + if dropCount > 0 { h.log.Warn("Failed to send message to all clients, dropped", "successCount", successCount, "dropCount", dropCount) } @@ -188,7 +217,12 @@ func (h *Handler) serveWs(w http.ResponseWriter, r *http.Request) { func (h *Handler) readPump(client *Client) { defer func() { // Unregister the client when the read pump exits - h.hub.unregister <- client + select { + case h.hub.unregister <- client: + case <-h.hub.done: + // Hub already stopping; unregister directly to avoid blocking + h.hub.unregisterClient(client) + } h.log.Info("WebSocket read pump exited, client unregistered") }()