diff --git a/crates/op-rbuilder/src/args/op.rs b/crates/op-rbuilder/src/args/op.rs index 8a67ed325..0d0168740 100644 --- a/crates/op-rbuilder/src/args/op.rs +++ b/crates/op-rbuilder/src/args/op.rs @@ -226,6 +226,14 @@ pub struct FlashblocksArgs { /// Flashblocks p2p configuration #[command(flatten)] pub p2p: FlashblocksP2pArgs, + + /// Maximum number of concurrent WebSocket subscribers + #[arg( + long = "flashblocks.ws-subscriber-limit", + env = "FLASHBLOCK_WS_SUBSCRIBER_LIMIT", + default_value = "256" + )] + pub ws_subscriber_limit: u16, } impl Default for FlashblocksArgs { diff --git a/crates/op-rbuilder/src/builders/flashblocks/config.rs b/crates/op-rbuilder/src/builders/flashblocks/config.rs index c9778695f..7cc9934de 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/config.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/config.rs @@ -79,6 +79,9 @@ pub struct FlashblocksConfig { /// Optional flag to process the full payload received by peers pub p2p_process_full_payload: bool, + + /// Maximum number of concurrent WebSocket subscribers + pub ws_subscriber_limit: u16, } impl Default for FlashblocksConfig { @@ -103,6 +106,7 @@ impl Default for FlashblocksConfig { p2p_max_peer_count: 50, p2p_send_full_payload: false, p2p_process_full_payload: false, + ws_subscriber_limit: 256, } } } @@ -154,6 +158,7 @@ impl TryFrom for FlashblocksConfig { p2p_max_peer_count: args.flashblocks.p2p.p2p_max_peer_count, p2p_send_full_payload: args.flashblocks.p2p.p2p_send_full_payload, p2p_process_full_payload: args.flashblocks.p2p.p2p_process_full_payload, + ws_subscriber_limit: args.flashblocks.ws_subscriber_limit, }) } } diff --git a/crates/op-rbuilder/src/builders/flashblocks/service.rs b/crates/op-rbuilder/src/builders/flashblocks/service.rs index 6cbd83e7d..8f941060c 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/service.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/service.rs @@ -118,6 +118,7 @@ impl FlashblocksServiceBuilder { self.0.specific.ws_addr, metrics.clone(), &task_metrics.websocket_publisher, + self.0.specific.ws_subscriber_limit, ) .wrap_err("failed to create ws publisher")? .into(); diff --git a/crates/op-rbuilder/src/builders/flashblocks/wspub.rs b/crates/op-rbuilder/src/builders/flashblocks/wspub.rs index 49cede5e5..86b4f9e3c 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/wspub.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/wspub.rs @@ -16,7 +16,10 @@ use tokio::{ }; use tokio_tungstenite::{ WebSocketStream, accept_async, - tungstenite::{Message, Utf8Bytes}, + tungstenite::{ + Message, Utf8Bytes, + protocol::frame::{CloseFrame, coding::CloseCode}, + }, }; use tracing::{debug, info, trace, warn}; @@ -31,6 +34,7 @@ pub struct WebSocketPublisher { subs: Arc, term: watch::Sender, pipe: broadcast::Sender, + subscriber_limit: u16, } impl WebSocketPublisher { @@ -38,6 +42,7 @@ impl WebSocketPublisher { addr: SocketAddr, metrics: Arc, task_monitor: &MonitoredTask, + subscriber_limit: u16, ) -> io::Result { let (pipe, _) = broadcast::channel(100); let (term, _) = watch::channel(false); @@ -53,6 +58,7 @@ impl WebSocketPublisher { term.subscribe(), Arc::clone(&sent), Arc::clone(&subs), + subscriber_limit, ))); Ok(Self { @@ -60,6 +66,7 @@ impl WebSocketPublisher { subs, term, pipe, + subscriber_limit, }) } @@ -101,6 +108,7 @@ async fn listener_loop( term: watch::Receiver, sent: Arc, subs: Arc, + subscriber_limit: u16, ) { listener .set_nonblocking(true) @@ -137,8 +145,16 @@ async fn listener_loop( let receiver_clone = receiver.resubscribe(); match accept_async(connection).await { - Ok(stream) => { + Ok(mut stream) => { tokio::spawn(async move { + if subs.load(Ordering::Relaxed) >= subscriber_limit as usize { + warn!(target: "payload_builder", "WebSocket connection for {peer_addr} rejected: subscriber limit reached"); + let _ = stream.close(Some(CloseFrame { + code: CloseCode::Again, + reason: "subscriber limit reached, please try again later".into(), + })).await; + return; + } subs.fetch_add(1, Ordering::Relaxed); debug!(target: "payload_builder", "WebSocket connection established with {}", peer_addr); @@ -233,10 +249,12 @@ impl Debug for WebSocketPublisher { fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { let subs = self.subs.load(Ordering::Relaxed); let sent = self.sent.load(Ordering::Relaxed); + let subscriber_limit = self.subscriber_limit; f.debug_struct("WebSocketPublisher") .field("subs", &subs) .field("payloads_sent", &sent) + .field("subscriber_limit", &subscriber_limit) .finish() } }