From 9f1c10736d744327eff2eef164f1fa3ad3ee0504 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Wed, 21 Jan 2026 12:02:25 +0800 Subject: [PATCH 1/9] feat: add subscriber limit for websocket connections --- crates/op-rbuilder/src/args/op.rs | 7 +++++++ .../src/builders/flashblocks/config.rs | 5 +++++ .../src/builders/flashblocks/service.rs | 1 + .../src/builders/flashblocks/wspub.rs | 16 ++++++++++++++++ 4 files changed, 29 insertions(+) diff --git a/crates/op-rbuilder/src/args/op.rs b/crates/op-rbuilder/src/args/op.rs index 8a67ed325..f46d13d7c 100644 --- a/crates/op-rbuilder/src/args/op.rs +++ b/crates/op-rbuilder/src/args/op.rs @@ -226,6 +226,13 @@ pub struct FlashblocksArgs { /// Flashblocks p2p configuration #[command(flatten)] pub p2p: FlashblocksP2pArgs, + + /// Optional flag to limit the number of subscribers to the websocket server + #[arg( + long = "flashblocks.ws_subscriber_limit", + env = "FLASHBLOCK_WS_SUBSCRIBER_LIMIT" + )] + pub ws_subscriber_limit: Option, } impl Default for FlashblocksArgs { diff --git a/crates/op-rbuilder/src/builders/flashblocks/config.rs b/crates/op-rbuilder/src/builders/flashblocks/config.rs index c9778695f..009639c33 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/config.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/config.rs @@ -79,6 +79,9 @@ pub struct FlashblocksConfig { /// Optional flag to process the full payload received by peers pub p2p_process_full_payload: bool, + + /// Optional maximum number of concurrent WebSocket subscribers + pub ws_subscriber_limit: Option, } impl Default for FlashblocksConfig { @@ -103,6 +106,7 @@ impl Default for FlashblocksConfig { p2p_max_peer_count: 50, p2p_send_full_payload: false, p2p_process_full_payload: false, + ws_subscriber_limit: None, } } } @@ -154,6 +158,7 @@ impl TryFrom for FlashblocksConfig { p2p_max_peer_count: args.flashblocks.p2p.p2p_max_peer_count, p2p_send_full_payload: args.flashblocks.p2p.p2p_send_full_payload, p2p_process_full_payload: args.flashblocks.p2p.p2p_process_full_payload, + ws_subscriber_limit: args.flashblocks.ws_subscriber_limit, }) } } diff --git a/crates/op-rbuilder/src/builders/flashblocks/service.rs b/crates/op-rbuilder/src/builders/flashblocks/service.rs index 6cbd83e7d..8f941060c 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/service.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/service.rs @@ -118,6 +118,7 @@ impl FlashblocksServiceBuilder { self.0.specific.ws_addr, metrics.clone(), &task_metrics.websocket_publisher, + self.0.specific.ws_subscriber_limit, ) .wrap_err("failed to create ws publisher")? .into(); diff --git a/crates/op-rbuilder/src/builders/flashblocks/wspub.rs b/crates/op-rbuilder/src/builders/flashblocks/wspub.rs index 49cede5e5..20579d48e 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/wspub.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/wspub.rs @@ -31,6 +31,7 @@ pub struct WebSocketPublisher { subs: Arc, term: watch::Sender, pipe: broadcast::Sender, + subscriber_limit: Option, } impl WebSocketPublisher { @@ -38,6 +39,7 @@ impl WebSocketPublisher { addr: SocketAddr, metrics: Arc, task_monitor: &MonitoredTask, + subscriber_limit: Option, ) -> io::Result { let (pipe, _) = broadcast::channel(100); let (term, _) = watch::channel(false); @@ -53,6 +55,7 @@ impl WebSocketPublisher { term.subscribe(), Arc::clone(&sent), Arc::clone(&subs), + subscriber_limit, ))); Ok(Self { @@ -60,6 +63,7 @@ impl WebSocketPublisher { subs, term, pipe, + subscriber_limit, }) } @@ -101,6 +105,7 @@ async fn listener_loop( term: watch::Receiver, sent: Arc, subs: Arc, + subscriber_limit: Option, ) { listener .set_nonblocking(true) @@ -136,6 +141,14 @@ async fn listener_loop( let term = term.clone(); let receiver_clone = receiver.resubscribe(); + let current = subs.fetch_add(1, Ordering::Relaxed); + if let Some(limit) = subscriber_limit && current >= limit as usize { + warn!("WebSocket connection rejected: subscriber limit reached"); + subs.fetch_sub(1, Ordering::Relaxed); + continue; + + } + match accept_async(connection).await { Ok(stream) => { tokio::spawn(async move { @@ -150,6 +163,7 @@ async fn listener_loop( }); } Err(e) => { + subs.fetch_sub(1, Ordering::Relaxed); warn!(target: "payload_builder", "Failed to accept WebSocket connection from {peer_addr}: {e}"); } } @@ -233,10 +247,12 @@ impl Debug for WebSocketPublisher { fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { let subs = self.subs.load(Ordering::Relaxed); let sent = self.sent.load(Ordering::Relaxed); + let subscriber_limit = self.subscriber_limit; f.debug_struct("WebSocketPublisher") .field("subs", &subs) .field("payloads_sent", &sent) + .field("subscriber_limit", &subscriber_limit) .finish() } } From 5b9991d26e2f1dd7abbb2bfc9f9793c4d31269a4 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Wed, 21 Jan 2026 12:14:29 +0800 Subject: [PATCH 2/9] fix: change args --- crates/op-rbuilder/src/args/op.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/op-rbuilder/src/args/op.rs b/crates/op-rbuilder/src/args/op.rs index f46d13d7c..9a30ff4a3 100644 --- a/crates/op-rbuilder/src/args/op.rs +++ b/crates/op-rbuilder/src/args/op.rs @@ -227,9 +227,9 @@ pub struct FlashblocksArgs { #[command(flatten)] pub p2p: FlashblocksP2pArgs, - /// Optional flag to limit the number of subscribers to the websocket server + /// Optional value to limit the number of concurrent WebSocket subscribers #[arg( - long = "flashblocks.ws_subscriber_limit", + long = "flashblocks.ws-subscriber-limit", env = "FLASHBLOCK_WS_SUBSCRIBER_LIMIT" )] pub ws_subscriber_limit: Option, From 2babad3f18aad4e9e45564f3b0d215b8ca64774b Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Wed, 21 Jan 2026 14:56:06 +0800 Subject: [PATCH 3/9] fix: add return error message --- .../src/builders/flashblocks/wspub.rs | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/wspub.rs b/crates/op-rbuilder/src/builders/flashblocks/wspub.rs index 20579d48e..428f80893 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/wspub.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/wspub.rs @@ -1,3 +1,4 @@ +use crate::metrics::OpRBuilderMetrics; use core::{ fmt::{Debug, Formatter}, net::SocketAddr, @@ -14,6 +15,7 @@ use tokio::{ watch, }, }; +use tokio_tungstenite::tungstenite::protocol::{self, frame::coding::CloseCode}; use tokio_tungstenite::{ WebSocketStream, accept_async, tungstenite::{Message, Utf8Bytes}, @@ -141,19 +143,20 @@ async fn listener_loop( let term = term.clone(); let receiver_clone = receiver.resubscribe(); - let current = subs.fetch_add(1, Ordering::Relaxed); - if let Some(limit) = subscriber_limit && current >= limit as usize { - warn!("WebSocket connection rejected: subscriber limit reached"); - subs.fetch_sub(1, Ordering::Relaxed); - continue; - - } - match accept_async(connection).await { - Ok(stream) => { + Ok(mut stream) => { tokio::spawn(async move { - subs.fetch_add(1, Ordering::Relaxed); - debug!(target: "payload_builder", "WebSocket connection established with {}", peer_addr); + let current = subs.fetch_add(1, Ordering::Relaxed); + if let Some(limit) = subscriber_limit && current >= limit as usize { + warn!("WebSocket connection rejected: subscriber limit reached"); + let _ = stream.close(Some(protocol::CloseFrame { + code: CloseCode::Again, + reason: "subscriber limit reached".into(), + })).await; + subs.fetch_sub(1, Ordering::Relaxed); + return; + } + tracing::debug!("WebSocket connection established with {}", peer_addr); // Handle the WebSocket connection in a dedicated task broadcast_loop(stream, metrics, term, receiver_clone, sent).await; @@ -163,7 +166,6 @@ async fn listener_loop( }); } Err(e) => { - subs.fetch_sub(1, Ordering::Relaxed); warn!(target: "payload_builder", "Failed to accept WebSocket connection from {peer_addr}: {e}"); } } From 3c6815ab01163deac587176a6aca498afa1512ba Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Wed, 21 Jan 2026 15:41:39 +0800 Subject: [PATCH 4/9] fix: change log level & make lint --- crates/op-rbuilder/src/builders/flashblocks/wspub.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/wspub.rs b/crates/op-rbuilder/src/builders/flashblocks/wspub.rs index 428f80893..b67149cac 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/wspub.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/wspub.rs @@ -15,10 +15,12 @@ use tokio::{ watch, }, }; -use tokio_tungstenite::tungstenite::protocol::{self, frame::coding::CloseCode}; use tokio_tungstenite::{ WebSocketStream, accept_async, - tungstenite::{Message, Utf8Bytes}, + tungstenite::{ + Message, Utf8Bytes, + protocol::{self, frame::coding::CloseCode}, + }, }; use tracing::{debug, info, trace, warn}; @@ -148,10 +150,10 @@ async fn listener_loop( tokio::spawn(async move { let current = subs.fetch_add(1, Ordering::Relaxed); if let Some(limit) = subscriber_limit && current >= limit as usize { - warn!("WebSocket connection rejected: subscriber limit reached"); + trace!("WebSocket connection rejected: subscriber limit reached"); let _ = stream.close(Some(protocol::CloseFrame { code: CloseCode::Again, - reason: "subscriber limit reached".into(), + reason: "subscriber limit reached, please try again later".into(), })).await; subs.fetch_sub(1, Ordering::Relaxed); return; From c6497d37724b4fb8e28307aecc95627cf55669fd Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Thu, 22 Jan 2026 10:54:38 +0800 Subject: [PATCH 5/9] chore: update imports --- crates/op-rbuilder/src/builders/flashblocks/wspub.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/wspub.rs b/crates/op-rbuilder/src/builders/flashblocks/wspub.rs index b67149cac..3f589cc5f 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/wspub.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/wspub.rs @@ -19,7 +19,7 @@ use tokio_tungstenite::{ WebSocketStream, accept_async, tungstenite::{ Message, Utf8Bytes, - protocol::{self, frame::coding::CloseCode}, + protocol::frame::{CloseFrame, coding::CloseCode}, }, }; use tracing::{debug, info, trace, warn}; @@ -151,7 +151,7 @@ async fn listener_loop( let current = subs.fetch_add(1, Ordering::Relaxed); if let Some(limit) = subscriber_limit && current >= limit as usize { trace!("WebSocket connection rejected: subscriber limit reached"); - let _ = stream.close(Some(protocol::CloseFrame { + let _ = stream.close(Some(CloseFrame { code: CloseCode::Again, reason: "subscriber limit reached, please try again later".into(), })).await; From 3081483b1d0b765fd538eceab6f6929edae61972 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Thu, 22 Jan 2026 13:35:03 +0800 Subject: [PATCH 6/9] chore: increment after check, change log level --- crates/op-rbuilder/src/builders/flashblocks/wspub.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/wspub.rs b/crates/op-rbuilder/src/builders/flashblocks/wspub.rs index 3f589cc5f..535611434 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/wspub.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/wspub.rs @@ -1,4 +1,3 @@ -use crate::metrics::OpRBuilderMetrics; use core::{ fmt::{Debug, Formatter}, net::SocketAddr, @@ -148,17 +147,17 @@ async fn listener_loop( match accept_async(connection).await { Ok(mut stream) => { tokio::spawn(async move { - let current = subs.fetch_add(1, Ordering::Relaxed); + let current = subs.load(Ordering::Relaxed); if let Some(limit) = subscriber_limit && current >= limit as usize { - trace!("WebSocket connection rejected: subscriber limit reached"); + warn!(target: "payload_builder", "WebSocket connection for {peer_addr} rejected: subscriber limit reached"); let _ = stream.close(Some(CloseFrame { code: CloseCode::Again, reason: "subscriber limit reached, please try again later".into(), })).await; - subs.fetch_sub(1, Ordering::Relaxed); return; } - tracing::debug!("WebSocket connection established with {}", peer_addr); + subs.fetch_add(1, Ordering::Relaxed); + debug!(target: "payload_builder", "WebSocket connection established with {}", peer_addr); // Handle the WebSocket connection in a dedicated task broadcast_loop(stream, metrics, term, receiver_clone, sent).await; From fae84c433b1944fc1aed28dc02ca88be945ed615 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Thu, 22 Jan 2026 14:09:39 +0800 Subject: [PATCH 7/9] fix: change subscriber limit to u16 from Option --- crates/op-rbuilder/src/args/op.rs | 5 +++-- crates/op-rbuilder/src/builders/flashblocks/config.rs | 4 ++-- crates/op-rbuilder/src/builders/flashblocks/wspub.rs | 8 ++++---- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/crates/op-rbuilder/src/args/op.rs b/crates/op-rbuilder/src/args/op.rs index 9a30ff4a3..30c5db732 100644 --- a/crates/op-rbuilder/src/args/op.rs +++ b/crates/op-rbuilder/src/args/op.rs @@ -230,9 +230,10 @@ pub struct FlashblocksArgs { /// Optional value to limit the number of concurrent WebSocket subscribers #[arg( long = "flashblocks.ws-subscriber-limit", - env = "FLASHBLOCK_WS_SUBSCRIBER_LIMIT" + env = "FLASHBLOCK_WS_SUBSCRIBER_LIMIT", + default_value = "256" )] - pub ws_subscriber_limit: Option, + pub ws_subscriber_limit: u16, } impl Default for FlashblocksArgs { diff --git a/crates/op-rbuilder/src/builders/flashblocks/config.rs b/crates/op-rbuilder/src/builders/flashblocks/config.rs index 009639c33..313f60b1c 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/config.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/config.rs @@ -81,7 +81,7 @@ pub struct FlashblocksConfig { pub p2p_process_full_payload: bool, /// Optional maximum number of concurrent WebSocket subscribers - pub ws_subscriber_limit: Option, + pub ws_subscriber_limit: u16, } impl Default for FlashblocksConfig { @@ -106,7 +106,7 @@ impl Default for FlashblocksConfig { p2p_max_peer_count: 50, p2p_send_full_payload: false, p2p_process_full_payload: false, - ws_subscriber_limit: None, + ws_subscriber_limit: 256, } } } diff --git a/crates/op-rbuilder/src/builders/flashblocks/wspub.rs b/crates/op-rbuilder/src/builders/flashblocks/wspub.rs index 535611434..6bb046f2d 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/wspub.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/wspub.rs @@ -34,7 +34,7 @@ pub struct WebSocketPublisher { subs: Arc, term: watch::Sender, pipe: broadcast::Sender, - subscriber_limit: Option, + subscriber_limit: u16, } impl WebSocketPublisher { @@ -42,7 +42,7 @@ impl WebSocketPublisher { addr: SocketAddr, metrics: Arc, task_monitor: &MonitoredTask, - subscriber_limit: Option, + subscriber_limit: u16, ) -> io::Result { let (pipe, _) = broadcast::channel(100); let (term, _) = watch::channel(false); @@ -108,7 +108,7 @@ async fn listener_loop( term: watch::Receiver, sent: Arc, subs: Arc, - subscriber_limit: Option, + subscriber_limit: u16, ) { listener .set_nonblocking(true) @@ -148,7 +148,7 @@ async fn listener_loop( Ok(mut stream) => { tokio::spawn(async move { let current = subs.load(Ordering::Relaxed); - if let Some(limit) = subscriber_limit && current >= limit as usize { + if current >= subscriber_limit as usize { warn!(target: "payload_builder", "WebSocket connection for {peer_addr} rejected: subscriber limit reached"); let _ = stream.close(Some(CloseFrame { code: CloseCode::Again, From ab6e80eef3cf22ad801ee4cab25992d0d033fcd3 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Thu, 22 Jan 2026 14:14:11 +0800 Subject: [PATCH 8/9] chore: inline code --- crates/op-rbuilder/src/builders/flashblocks/wspub.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/wspub.rs b/crates/op-rbuilder/src/builders/flashblocks/wspub.rs index 6bb046f2d..86b4f9e3c 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/wspub.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/wspub.rs @@ -147,8 +147,7 @@ async fn listener_loop( match accept_async(connection).await { Ok(mut stream) => { tokio::spawn(async move { - let current = subs.load(Ordering::Relaxed); - if current >= subscriber_limit as usize { + if subs.load(Ordering::Relaxed) >= subscriber_limit as usize { warn!(target: "payload_builder", "WebSocket connection for {peer_addr} rejected: subscriber limit reached"); let _ = stream.close(Some(CloseFrame { code: CloseCode::Again, From cc7a5785c57ad6e4c086be72e035006beb59af70 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Thu, 22 Jan 2026 14:26:18 +0800 Subject: [PATCH 9/9] chore: update comments --- crates/op-rbuilder/src/args/op.rs | 2 +- crates/op-rbuilder/src/builders/flashblocks/config.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/op-rbuilder/src/args/op.rs b/crates/op-rbuilder/src/args/op.rs index 30c5db732..0d0168740 100644 --- a/crates/op-rbuilder/src/args/op.rs +++ b/crates/op-rbuilder/src/args/op.rs @@ -227,7 +227,7 @@ pub struct FlashblocksArgs { #[command(flatten)] pub p2p: FlashblocksP2pArgs, - /// Optional value to limit the number of concurrent WebSocket subscribers + /// Maximum number of concurrent WebSocket subscribers #[arg( long = "flashblocks.ws-subscriber-limit", env = "FLASHBLOCK_WS_SUBSCRIBER_LIMIT", diff --git a/crates/op-rbuilder/src/builders/flashblocks/config.rs b/crates/op-rbuilder/src/builders/flashblocks/config.rs index 313f60b1c..7cc9934de 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/config.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/config.rs @@ -80,7 +80,7 @@ pub struct FlashblocksConfig { /// Optional flag to process the full payload received by peers pub p2p_process_full_payload: bool, - /// Optional maximum number of concurrent WebSocket subscribers + /// Maximum number of concurrent WebSocket subscribers pub ws_subscriber_limit: u16, }