Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion op-conductor/rpc/ws/flashblocks_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,12 @@ func (h *Handler) Start(ctx context.Context) error {
func (h *Handler) Stop() {
// Signal the hub to stop if it exists
if h.hub != nil {
close(h.hub.done)
select {
case <-h.hub.done:
// already closed
default:
close(h.hub.done)
}
Comment thread
zhwrd marked this conversation as resolved.
}

// Cancel the rollup boost context if it exists
Expand All @@ -142,6 +147,15 @@ func (h *Handler) Stop() {
}
h.log.Info("WebSocket server closed")
}

// Wait for hub shutdown to complete to avoid leaking goroutines
if h.hub != nil {
select {
case <-h.hub.stopped:
case <-time.After(5 * time.Second):
h.log.Warn("Timed out waiting for hub shutdown")
}
}
Comment thread
yashvardhan-kukreja marked this conversation as resolved.
}

// BroadcastMessage sends a message to all connected WebSocket clients
Expand Down Expand Up @@ -196,6 +210,11 @@ func (h *Handler) listenToRollupBoost(ctx context.Context) {
case <-ctx.Done():
return
default:
// If not leader, avoid pulling messages to reduce allocation pressure
if !h.isLeaderFn(ctx) {
time.Sleep(500 * time.Millisecond)
continue
}
Comment thread
yashvardhan-kukreja marked this conversation as resolved.
// Try to connect if not connected indefinitely
if h.rollupBoostConn == nil {
h.log.Info("reconnecting to rollup boost WebSocket", "url", h.cfg.RollupBoostWsURL)
Expand Down
42 changes: 38 additions & 4 deletions op-conductor/rpc/ws/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ type Hub struct {
// Registered clients
clients map[*Client]bool

// Protects access to clients map
mu sync.Mutex

// Register requests from the clients
register chan *Client

Expand All @@ -29,6 +32,9 @@ type Hub struct {
// Signal to stop the hub
done chan struct{}

// Signals that the hub has fully stopped
stopped chan struct{}

// Logger
log log.Logger

Expand All @@ -47,13 +53,17 @@ func newHub(m metrics.Metricer) *Hub {
unregister: make(chan *Client),
clients: make(map[*Client]bool),
done: make(chan struct{}),
stopped: make(chan struct{}),
log: log.New("component", "websocket-hub"),
metrics: m,
}
}

// registerClient adds a client to the hub and updates metrics
func (h *Hub) registerClient(client *Client) {
h.mu.Lock()
defer h.mu.Unlock()

h.clients[client] = true
clientCount := len(h.clients)
h.log.Info("Client registered with hub", "totalClients", clientCount)
Expand All @@ -66,6 +76,9 @@ func (h *Hub) registerClient(client *Client) {

// unregisterClient removes a client from the hub, closes it, and updates metrics
func (h *Hub) unregisterClient(client *Client) {
h.mu.Lock()
defer h.mu.Unlock()

if _, ok := h.clients[client]; ok {
delete(h.clients, client)
client.Close()
Expand All @@ -85,14 +98,22 @@ func (h *Hub) run() {
select {
case <-h.done:
// Close all remaining client connections
h.mu.Lock()
var remaining []*Client
for client := range h.clients {
remaining = append(remaining, client)
}
h.mu.Unlock()

for _, client := range remaining {
h.unregisterClient(client)
}
h.metrics.RecordWebSocketClientCount(0)

if h.callbacks.OnShutdown != nil {
h.callbacks.OnShutdown()
}
close(h.stopped)
return
case client := <-h.register:
h.registerClient(client)
Expand All @@ -101,19 +122,27 @@ func (h *Hub) run() {
case message := <-h.broadcast:
successCount := 0
dropCount := 0
var toClose []*Client

h.mu.Lock()
for client := range h.clients {
select {
case client.send <- message:
// Message sent successfully
successCount++
default:
// Channel is full, client is likely slow/dead
// The ping mechanism will detect and clean up dead clients
h.log.Debug("Failed to send message to client, channel full")
// Channel is full, client is likely slow/dead; mark for close
h.log.Warn("Client send channel full, dropping and closing client")
dropCount++
toClose = append(toClose, client)
}
}
h.mu.Unlock()

for _, client := range toClose {
h.unregisterClient(client)
}

if dropCount > 0 {
h.log.Warn("Failed to send message to all clients, dropped", "successCount", successCount, "dropCount", dropCount)
}
Expand Down Expand Up @@ -188,7 +217,12 @@ func (h *Handler) serveWs(w http.ResponseWriter, r *http.Request) {
func (h *Handler) readPump(client *Client) {
defer func() {
// Unregister the client when the read pump exits
h.hub.unregister <- client
select {
case h.hub.unregister <- client:
case <-h.hub.done:
// Hub already stopping; unregister directly to avoid blocking
h.hub.unregisterClient(client)
}
h.log.Info("WebSocket read pump exited, client unregistered")
}()

Expand Down