From 25abeecd5597cafb8308d04822e6822d2ed6aff5 Mon Sep 17 00:00:00 2001 From: 0xsimulacra <21297015+0xsimulacra@users.noreply.github.com> Date: Wed, 1 Oct 2025 03:04:39 +0200 Subject: [PATCH 1/2] added flashblocks RTT logging as debug --- crates/flashblocks-rpc/src/subscription.rs | 44 ++++++++++++++++++++-- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/crates/flashblocks-rpc/src/subscription.rs b/crates/flashblocks-rpc/src/subscription.rs index 750c9b0..177f350 100644 --- a/crates/flashblocks-rpc/src/subscription.rs +++ b/crates/flashblocks-rpc/src/subscription.rs @@ -1,3 +1,4 @@ +use std::time::{SystemTime, UNIX_EPOCH}; use std::{io::Read, sync::Arc, time::Duration}; use alloy_primitives::map::foldhash::HashMap; @@ -12,7 +13,7 @@ use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; use tokio::time::interval; use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; -use tracing::{error, info, trace, warn}; +use tracing::{debug, error, info, trace, warn}; use url::Url; use crate::metrics::Metrics; @@ -122,7 +123,18 @@ where ?data, "Received pong from upstream" ); - awaiting_pong_resp = false + awaiting_pong_resp = false; + if let Some(rtt_ms) = rtt_from_pong(data.as_ref()) { + debug!( + message= "Received pong from upstream flashblocks", + rtt_ms = rtt_ms, + ); + }else { + debug!( + message = "Received UNEXPECTED pong from upstream flashblocks", + data=?data + ); + } } Err(e) => { metrics.upstream_errors.increment(1); @@ -152,7 +164,7 @@ where "Sending ping to upstream" ); - if let Err(error) = write.send(Message::Ping(Default::default())).await { + if let Err(error) = write.send(ping_with_timestamp()).await { warn!( target: "flashblocks_rpc::subscription", ?backoff, @@ -242,3 +254,29 @@ fn try_parse_message(bytes: &[u8]) -> eyre::Result { let text = String::from_utf8(decompressed)?; Ok(text) } + +fn ping_with_timestamp() -> Message { + let now_us: u64 = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_micros() as u64) + .unwrap_or(0); + + Message::Ping(now_us.to_be_bytes().to_vec().into()) +} + +fn rtt_from_pong(data: &[u8]) -> Option { + if data.len() == 8 { + let mut arr = [0u8; 8]; + arr.copy_from_slice(&data[..8]); + let sent_us = u64::from_be_bytes(arr); + + let now_us: u64 = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_micros() as u64) + .unwrap_or(0); + + Some(now_us.saturating_sub(sent_us) as f64 / 1000.0) + } else { + None + } +} From b60dd80bdf187b20e919d297eadc771ab6941442 Mon Sep 17 00:00:00 2001 From: 0xsimulacra <21297015+0xsimulacra@users.noreply.github.com> Date: Wed, 1 Oct 2025 20:42:02 +0200 Subject: [PATCH 2/2] moved to using duraction to follow what repo uses --- crates/flashblocks-rpc/src/subscription.rs | 30 ++++++++++------------ 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/crates/flashblocks-rpc/src/subscription.rs b/crates/flashblocks-rpc/src/subscription.rs index 177f350..b29ea8e 100644 --- a/crates/flashblocks-rpc/src/subscription.rs +++ b/crates/flashblocks-rpc/src/subscription.rs @@ -124,10 +124,10 @@ where "Received pong from upstream" ); awaiting_pong_resp = false; - if let Some(rtt_ms) = rtt_from_pong(data.as_ref()) { + if let Some(rtt) = rtt_from_pong(data.as_ref()) { debug!( message= "Received pong from upstream flashblocks", - rtt_ms = rtt_ms, + rtt = ?rtt, ); }else { debug!( @@ -258,25 +258,21 @@ fn try_parse_message(bytes: &[u8]) -> eyre::Result { fn ping_with_timestamp() -> Message { let now_us: u64 = SystemTime::now() .duration_since(UNIX_EPOCH) - .map(|d| d.as_micros() as u64) - .unwrap_or(0); + .unwrap_or(Duration::ZERO) + .as_micros() as u64; Message::Ping(now_us.to_be_bytes().to_vec().into()) } -fn rtt_from_pong(data: &[u8]) -> Option { - if data.len() == 8 { - let mut arr = [0u8; 8]; - arr.copy_from_slice(&data[..8]); - let sent_us = u64::from_be_bytes(arr); +fn rtt_from_pong(data: &[u8]) -> Option { + let arr: [u8; 8] = data.try_into().ok()?; + let sent_us = u64::from_be_bytes(arr); - let now_us: u64 = SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|d| d.as_micros() as u64) - .unwrap_or(0); + let now_us: u64 = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::ZERO) + .as_micros() as u64; - Some(now_us.saturating_sub(sent_us) as f64 / 1000.0) - } else { - None - } + let delta_us = now_us.saturating_sub(sent_us); + Some(Duration::from_micros(delta_us)) }