Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions crates/op-rbuilder/src/args/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,14 @@ pub struct FlashblocksArgs {
/// Flashblocks p2p configuration
#[command(flatten)]
pub p2p: FlashblocksP2pArgs,

/// Maximum number of concurrent WebSocket subscribers
#[arg(
long = "flashblocks.ws-subscriber-limit",
env = "FLASHBLOCK_WS_SUBSCRIBER_LIMIT",
default_value = "256"
)]
pub ws_subscriber_limit: u16,
}

impl Default for FlashblocksArgs {
Expand Down
5 changes: 5 additions & 0 deletions crates/op-rbuilder/src/builders/flashblocks/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ pub struct FlashblocksConfig {

/// Optional flag to process the full payload received by peers
pub p2p_process_full_payload: bool,

/// Maximum number of concurrent WebSocket subscribers
pub ws_subscriber_limit: u16,
}

impl Default for FlashblocksConfig {
Expand All @@ -103,6 +106,7 @@ impl Default for FlashblocksConfig {
p2p_max_peer_count: 50,
p2p_send_full_payload: false,
p2p_process_full_payload: false,
ws_subscriber_limit: 256,
}
}
}
Expand Down Expand Up @@ -154,6 +158,7 @@ impl TryFrom<OpRbuilderArgs> for FlashblocksConfig {
p2p_max_peer_count: args.flashblocks.p2p.p2p_max_peer_count,
p2p_send_full_payload: args.flashblocks.p2p.p2p_send_full_payload,
p2p_process_full_payload: args.flashblocks.p2p.p2p_process_full_payload,
ws_subscriber_limit: args.flashblocks.ws_subscriber_limit,
})
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/op-rbuilder/src/builders/flashblocks/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ impl FlashblocksServiceBuilder {
self.0.specific.ws_addr,
metrics.clone(),
&task_metrics.websocket_publisher,
self.0.specific.ws_subscriber_limit,
)
.wrap_err("failed to create ws publisher")?
.into();
Expand Down
22 changes: 20 additions & 2 deletions crates/op-rbuilder/src/builders/flashblocks/wspub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use tokio::{
};
use tokio_tungstenite::{
WebSocketStream, accept_async,
tungstenite::{Message, Utf8Bytes},
tungstenite::{
Message, Utf8Bytes,
protocol::frame::{CloseFrame, coding::CloseCode},
},
};
use tracing::{debug, info, trace, warn};

Expand All @@ -31,13 +34,15 @@ pub struct WebSocketPublisher {
subs: Arc<AtomicUsize>,
term: watch::Sender<bool>,
pipe: broadcast::Sender<Utf8Bytes>,
subscriber_limit: u16,
}

impl WebSocketPublisher {
pub fn new(
addr: SocketAddr,
metrics: Arc<OpRBuilderMetrics>,
task_monitor: &MonitoredTask,
subscriber_limit: u16,
) -> io::Result<Self> {
let (pipe, _) = broadcast::channel(100);
let (term, _) = watch::channel(false);
Expand All @@ -53,13 +58,15 @@ impl WebSocketPublisher {
term.subscribe(),
Arc::clone(&sent),
Arc::clone(&subs),
subscriber_limit,
)));

Ok(Self {
sent,
subs,
term,
pipe,
subscriber_limit,
})
}

Expand Down Expand Up @@ -101,6 +108,7 @@ async fn listener_loop(
term: watch::Receiver<bool>,
sent: Arc<AtomicUsize>,
subs: Arc<AtomicUsize>,
subscriber_limit: u16,
) {
listener
.set_nonblocking(true)
Expand Down Expand Up @@ -137,8 +145,16 @@ async fn listener_loop(
let receiver_clone = receiver.resubscribe();

match accept_async(connection).await {
Ok(stream) => {
Ok(mut stream) => {
tokio::spawn(async move {
if subs.load(Ordering::Relaxed) >= subscriber_limit as usize {
warn!(target: "payload_builder", "WebSocket connection for {peer_addr} rejected: subscriber limit reached");
let _ = stream.close(Some(CloseFrame {
code: CloseCode::Again,
reason: "subscriber limit reached, please try again later".into(),
})).await;
return;
}
subs.fetch_add(1, Ordering::Relaxed);
debug!(target: "payload_builder", "WebSocket connection established with {}", peer_addr);

Expand Down Expand Up @@ -233,10 +249,12 @@ impl Debug for WebSocketPublisher {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
let subs = self.subs.load(Ordering::Relaxed);
let sent = self.sent.load(Ordering::Relaxed);
let subscriber_limit = self.subscriber_limit;

f.debug_struct("WebSocketPublisher")
.field("subs", &subs)
.field("payloads_sent", &sent)
.field("subscriber_limit", &subscriber_limit)
.finish()
}
}