Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions crates/op-rbuilder/src/args/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,14 @@ pub struct FlashblocksArgs {
/// Flashblocks p2p configuration
#[command(flatten)]
pub p2p: FlashblocksP2pArgs,

/// Maximum number of concurrent WebSocket subscribers
#[arg(
long = "flashblocks.ws-subscriber-limit",
env = "FLASHBLOCK_WS_SUBSCRIBER_LIMIT",
default_value = "256"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could the default be no limit? this way it would be a backwards compatible change

Copy link
Contributor Author

@sieniven sieniven Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, the default was added here because it helps protect the node against perhaps an attack vector (like ddos)? Will it be better if we tune the default value up instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@avalonche hey! eba797b resolves the issue by switching the sub limit to an option instead, and removes the default limit size.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sieniven op-rbuilder flashblocks websocket should be accesses only by rollup-boosts

)]
pub ws_subscriber_limit: Option<u16>,
}

impl Default for FlashblocksArgs {
Expand Down
5 changes: 5 additions & 0 deletions crates/op-rbuilder/src/builders/flashblocks/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ pub struct FlashblocksConfig {

/// Maximum number of peers for the p2p node
pub p2p_max_peer_count: u32,

/// Maximum number of concurrent WebSocket subscribers
pub ws_subscriber_limit: Option<u16>,
}

impl Default for FlashblocksConfig {
Expand All @@ -87,6 +90,7 @@ impl Default for FlashblocksConfig {
p2p_private_key_file: None,
p2p_known_peers: None,
p2p_max_peer_count: 50,
ws_subscriber_limit: None,
}
}
}
Expand Down Expand Up @@ -128,6 +132,7 @@ impl TryFrom<OpRbuilderArgs> for FlashblocksConfig {
p2p_private_key_file: args.flashblocks.p2p.p2p_private_key_file,
p2p_known_peers: args.flashblocks.p2p.p2p_known_peers,
p2p_max_peer_count: args.flashblocks.p2p.p2p_max_peer_count,
ws_subscriber_limit: args.flashblocks.ws_subscriber_limit,
})
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/op-rbuilder/src/builders/flashblocks/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ impl FlashblocksServiceBuilder {
self.0.specific.ws_addr,
metrics.clone(),
&task_metrics.websocket_publisher,
self.0.specific.ws_subscriber_limit,
)
.wrap_err("failed to create ws publisher")?
.into();
Expand Down
22 changes: 20 additions & 2 deletions crates/op-rbuilder/src/builders/flashblocks/wspub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use tokio::{
};
use tokio_tungstenite::{
WebSocketStream, accept_async,
tungstenite::{Message, Utf8Bytes},
tungstenite::{
Message, Utf8Bytes,
protocol::frame::{CloseFrame, coding::CloseCode},
},
};
use tracing::{debug, warn};

Expand All @@ -31,13 +34,15 @@ pub(super) struct WebSocketPublisher {
subs: Arc<AtomicUsize>,
term: watch::Sender<bool>,
pipe: broadcast::Sender<Utf8Bytes>,
subscriber_limit: Option<u16>,
}

impl WebSocketPublisher {
pub(super) fn new(
addr: SocketAddr,
metrics: Arc<OpRBuilderMetrics>,
task_monitor: &MonitoredTask,
subscriber_limit: Option<u16>,
) -> io::Result<Self> {
let (pipe, _) = broadcast::channel(100);
let (term, _) = watch::channel(false);
Expand All @@ -53,13 +58,15 @@ impl WebSocketPublisher {
term.subscribe(),
Arc::clone(&sent),
Arc::clone(&subs),
subscriber_limit,
)));

Ok(Self {
sent,
subs,
term,
pipe,
subscriber_limit,
})
}

Expand Down Expand Up @@ -101,6 +108,7 @@ async fn listener_loop(
term: watch::Receiver<bool>,
sent: Arc<AtomicUsize>,
subs: Arc<AtomicUsize>,
subscriber_limit: Option<u16>,
) {
listener
.set_nonblocking(true)
Expand Down Expand Up @@ -137,8 +145,16 @@ async fn listener_loop(
let receiver_clone = receiver.resubscribe();

match accept_async(connection).await {
Ok(stream) => {
Ok(mut stream) => {
tokio::spawn(async move {
if let Some(limit) = subscriber_limit && subs.load(Ordering::Relaxed) >= limit as usize {
warn!(target: "payload_builder", "WebSocket connection for {peer_addr} rejected: subscriber limit reached");
let _ = stream.close(Some(CloseFrame {
code: CloseCode::Again,
reason: "subscriber limit reached, please try again later".into(),
})).await;
return;
}
subs.fetch_add(1, Ordering::Relaxed);
tracing::debug!("WebSocket connection established with {}", peer_addr);

Expand Down Expand Up @@ -233,10 +249,12 @@ impl Debug for WebSocketPublisher {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
let subs = self.subs.load(Ordering::Relaxed);
let sent = self.sent.load(Ordering::Relaxed);
let subscriber_limit = self.subscriber_limit;

f.debug_struct("WebSocketPublisher")
.field("subs", &subs)
.field("payloads_sent", &sent)
.field("subscriber_limit", &subscriber_limit)
.finish()
}
}
Loading