diff --git a/Cargo.lock b/Cargo.lock index 986152df..a8eecd1d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14105,6 +14105,7 @@ dependencies = [ "jsonrpsee", "moka", "reth-node-api", + "reth-node-core", "reth-optimism-flashblocks", "reth-primitives-traits", "reth-rpc", diff --git a/bin/node/src/main.rs b/bin/node/src/main.rs index 06d1bffd..280a9e80 100644 --- a/bin/node/src/main.rs +++ b/bin/node/src/main.rs @@ -83,6 +83,7 @@ fn main() { // Clone xlayer_args early to avoid partial move issues let xlayer_args = args.xlayer_args.clone(); + let datadir = builder.config().datadir().clone(); let legacy_config = LegacyRpcRouterConfig { enabled: xlayer_args.legacy.legacy_rpc_url.is_some(), @@ -130,6 +131,7 @@ fn main() { flashblock_rx, args.xlayer_args.builder.flashblocks, args.rollup_args.flashblocks_url.is_some(), + datadir, )?; service.spawn(); info!(target: "reth::cli", "xlayer flashblocks service initialized"); diff --git a/crates/builder/src/args/op.rs b/crates/builder/src/args/op.rs index fe0b61c8..2465e5f0 100644 --- a/crates/builder/src/args/op.rs +++ b/crates/builder/src/args/op.rs @@ -151,6 +151,14 @@ pub struct FlashblocksArgs { default_value = "256" )] pub ws_subscriber_limit: Option, + + /// Enable replay from the persistence file on startup + #[arg( + long = "flashblocks.replay-from-persistence-file", + env = "FLASHBLOCKS_REPLAY_FROM_PERSISTENCE_FILE", + default_value = "false" + )] + pub replay_from_persistence_file: bool, } impl Default for FlashblocksArgs { diff --git a/crates/builder/src/payload/flashblocks/cache.rs b/crates/builder/src/payload/flashblocks/cache.rs index b44138a8..abc8b6bb 100644 --- a/crates/builder/src/payload/flashblocks/cache.rs +++ b/crates/builder/src/payload/flashblocks/cache.rs @@ -1,48 +1,136 @@ use parking_lot::Mutex; -use std::sync::Arc; +use serde::{Deserialize, Serialize}; +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; use alloy_consensus::transaction::Recovered; use alloy_eips::eip2718::WithEncoded; use alloy_primitives::B256; use op_alloy_rpc_types_engine::OpFlashblockPayload; + +use reth_node_core::dirs::{ChainPath, DataDirPath}; use reth_payload_builder::PayloadId; use reth_primitives_traits::SignedTransaction; -type FlashblockPayloadsSequence = Option<(PayloadId, Option, Vec)>; +/// Flashblocks sub-dir within the datadir. +const FLASHBLOCKS_DIR: &str = "flashblocks"; + +/// Flashblocks persistence filename for the current pending flashblocks sequence. +const PENDING_SEQUENCE_FILE: &str = "pending_sequence.json"; + +fn init_pending_sequence_path(datadir: ChainPath) -> Option { + let flashblocks_dir = datadir.data_dir().join(FLASHBLOCKS_DIR); + std::fs::create_dir_all(&flashblocks_dir) + .inspect_err(|e| { + // log target is flashblocks since datadir init can be for both sequencer and RPC + tracing::warn!( + target: "flashblocks", + "Failed to create flashblocks directory at {}: {e}", + flashblocks_dir.display() + ); + }) + .ok()?; + Some(flashblocks_dir.join(PENDING_SEQUENCE_FILE)) +} + +fn try_load_from_filepath(path: Option<&Path>) -> Option { + let path = path?; + if !path.exists() { + tracing::warn!(target: "payload_builder", "Failed to read flashblocks persistence file: does not exist"); + return None; + } + + let data = std::fs::read(path) + .inspect_err(|e| { + tracing::warn!(target: "payload_builder", "Failed to read flashblocks persistence file: {e}"); + }) + .ok()?; + + let sequence = serde_json::from_slice::(&data) + .inspect_err(|e| { + tracing::warn!(target: "payload_builder", "Failed to deserialize flashblocks persistence file: {e}"); + }) + .ok()?; + + tracing::info!( + target: "payload_builder", + payload_id = %sequence.payload_id, + parent_hash = ?sequence.parent_hash, + payloads = sequence.payloads.len(), + "Loaded pending flashblocks sequence from disk" + ); + + Some(sequence) +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FlashblockPayloadsSequence { + pub payload_id: PayloadId, + pub parent_hash: Option, + pub payloads: Vec, +} /// Cache for the current pending block's flashblock payloads sequence that is /// being built, based on the `payload_id`. #[derive(Debug, Clone, Default)] -pub(crate) struct FlashblockPayloadsCache { - inner: Arc>, +pub struct FlashblockPayloadsCache { + inner: Arc>>, + persist_path: Option, } impl FlashblockPayloadsCache { - pub(crate) fn new() -> Self { - Self::default() + pub fn new(datadir: Option>) -> Self { + let persist_path = datadir.and_then(init_pending_sequence_path); + + Self { + inner: Arc::new(Mutex::new(try_load_from_filepath(persist_path.as_deref()))), + persist_path, + } } - pub(crate) fn add_flashblock_payload(&self, payload: OpFlashblockPayload) -> eyre::Result<()> { + pub fn add_flashblock_payload(&self, payload: OpFlashblockPayload) -> eyre::Result<()> { let mut guard = self.inner.lock(); match guard.as_mut() { - Some((curr_payload_id, parent_hash, payloads)) - if *curr_payload_id == payload.payload_id => - { - if parent_hash.is_none() + Some(sequence) if sequence.payload_id == payload.payload_id => { + if sequence.parent_hash.is_none() && let Some(hash) = payload.parent_hash() { - *parent_hash = Some(hash); + sequence.parent_hash = Some(hash); } - payloads.push(payload); + sequence.payloads.push(payload); } _ => { // New payload_id - replace entire cache - *guard = Some((payload.payload_id, payload.parent_hash(), vec![payload])); + *guard = Some(FlashblockPayloadsSequence { + payload_id: payload.payload_id, + parent_hash: payload.parent_hash(), + payloads: vec![payload], + }); } } Ok(()) } + pub async fn persist(&self) -> eyre::Result<()> { + let Some(path) = self.persist_path.as_ref() else { return Ok(()) }; + let Some(sequence) = self.inner.lock().clone() else { return Ok(()) }; + + let data = serde_json::to_vec(&sequence)?; + + let file_name = path + .file_name() + .ok_or_else(|| eyre::eyre!("persist path has no file name"))? + .to_string_lossy(); + let tmp_path = path.with_file_name(format!(".{file_name}")); + + tokio::fs::write(&tmp_path, &data).await?; + tokio::fs::rename(&tmp_path, path).await?; + + Ok(()) + } + /// Get the flashblocks sequence transactions for a given `parent_hash`. Note that we do not /// yield sequencer transactions that were included in the payload attributes (index 0). /// @@ -54,14 +142,12 @@ impl FlashblockPayloadsCache { parent_hash: B256, ) -> Option>>> { let mut payloads = { - let mut guard = self.inner.lock(); - let (_, curr_parent_hash, _) = guard.as_ref()?; - if *curr_parent_hash != Some(parent_hash) { + let guard = self.inner.lock(); + let sequence = guard.as_ref()?; + if sequence.parent_hash != Some(parent_hash) { return None; } - // Take ownership and flush the cache - let (_, _, payloads) = guard.take()?; - payloads + sequence.payloads.clone() }; payloads.sort_by_key(|p| p.index); @@ -72,9 +158,10 @@ impl FlashblockPayloadsCache { |mut acc, (expected_index, payload)| { if payload.index != expected_index as u64 + 1 { tracing::warn!( + target: "payload_builder", expected = expected_index + 1, got = payload.index, - "flashblock payloads have missing or out-of-order indexes" + "flashblock payloads have missing or out-of-order indexes", ); return None; } @@ -83,4 +170,439 @@ impl FlashblockPayloadsCache { }, ) } + + #[cfg(test)] + fn with_persist_path(path: PathBuf) -> Self { + Self { inner: Arc::new(Mutex::new(None)), persist_path: Some(path) } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use op_alloy_rpc_types_engine::{ + OpFlashblockPayloadBase, OpFlashblockPayloadDelta, OpFlashblockPayloadMetadata, + }; + use reth_optimism_primitives::OpTransactionSigned; + use std::collections::BTreeMap; + + /// RAII guard for a temporary directory that cleans up on drop (success or failure). + struct TempDir(PathBuf); + + impl TempDir { + fn new(name: &str) -> Self { + let path = std::env::temp_dir() + .join(format!("xlayer_cache_test_{name}_{}", std::process::id())); + std::fs::create_dir_all(&path).expect("failed to create temp dir"); + Self(path) + } + + fn path(&self) -> &Path { + &self.0 + } + } + + impl Drop for TempDir { + fn drop(&mut self) { + let _ = std::fs::remove_dir_all(&self.0); + } + } + + /// Creates a test flashblock payload with configurable fields. + fn make_payload( + payload_id: [u8; 8], + index: u64, + parent_hash: Option, + block_number: u64, + ) -> OpFlashblockPayload { + OpFlashblockPayload { + payload_id: PayloadId::new(payload_id), + index, + base: parent_hash.map(|hash| OpFlashblockPayloadBase { + parent_hash: hash, + block_number, + ..Default::default() + }), + diff: OpFlashblockPayloadDelta::default(), + metadata: OpFlashblockPayloadMetadata { + block_number, + new_account_balances: BTreeMap::new(), + receipts: BTreeMap::new(), + }, + } + } + + // ======================================================================== + // Cache creation + // ======================================================================== + + #[test] + fn default_cache_is_empty() { + let cache = FlashblockPayloadsCache::default(); + assert!(cache.inner.lock().is_none()); + assert!(cache.persist_path.is_none()); + } + + #[test] + fn new_without_datadir_has_no_persist_path() { + let cache = FlashblockPayloadsCache::new(None); + assert!(cache.persist_path.is_none()); + } + + // ======================================================================== + // add_flashblock_payload + // ======================================================================== + + #[test] + fn add_single_payload_creates_sequence() { + let cache = FlashblockPayloadsCache::default(); + let parent = B256::random(); + let payload = make_payload([1u8; 8], 0, Some(parent), 100); + + cache.add_flashblock_payload(payload).unwrap(); + + let guard = cache.inner.lock(); + let seq = guard.as_ref().unwrap(); + assert_eq!(seq.payload_id, PayloadId::new([1u8; 8])); + assert_eq!(seq.parent_hash, Some(parent)); + assert_eq!(seq.payloads.len(), 1); + } + + #[test] + fn add_payloads_same_id_appends() { + let cache = FlashblockPayloadsCache::default(); + let parent = B256::random(); + let id = [1u8; 8]; + + // First payload (base, index 0) + cache.add_flashblock_payload(make_payload(id, 0, Some(parent), 100)).unwrap(); + // Second payload (incremental, index 1) + cache.add_flashblock_payload(make_payload(id, 1, None, 100)).unwrap(); + // Third payload (incremental, index 2) + cache.add_flashblock_payload(make_payload(id, 2, None, 100)).unwrap(); + + let guard = cache.inner.lock(); + let seq = guard.as_ref().unwrap(); + assert_eq!(seq.payload_id, PayloadId::new(id)); + assert_eq!(seq.payloads.len(), 3); + assert_eq!(seq.payloads[0].index, 0); + assert_eq!(seq.payloads[1].index, 1); + assert_eq!(seq.payloads[2].index, 2); + } + + #[test] + fn add_payload_new_id_replaces_sequence() { + let cache = FlashblockPayloadsCache::default(); + let parent_a = B256::random(); + let parent_b = B256::random(); + + // Add payloads for first block + cache.add_flashblock_payload(make_payload([1u8; 8], 0, Some(parent_a), 100)).unwrap(); + cache.add_flashblock_payload(make_payload([1u8; 8], 1, None, 100)).unwrap(); + + // New payload_id replaces the entire cache + cache.add_flashblock_payload(make_payload([2u8; 8], 0, Some(parent_b), 101)).unwrap(); + + let guard = cache.inner.lock(); + let seq = guard.as_ref().unwrap(); + assert_eq!(seq.payload_id, PayloadId::new([2u8; 8])); + assert_eq!(seq.parent_hash, Some(parent_b)); + assert_eq!(seq.payloads.len(), 1); + } + + #[test] + fn parent_hash_extracted_from_first_payload_with_base() { + let cache = FlashblockPayloadsCache::default(); + let parent = B256::random(); + let id = [1u8; 8]; + + // First payload without base (no parent_hash) + cache.add_flashblock_payload(make_payload(id, 1, None, 100)).unwrap(); + + { + let guard = cache.inner.lock(); + assert_eq!(guard.as_ref().unwrap().parent_hash, None); + } + + // Second payload with base containing parent_hash - should backfill + cache.add_flashblock_payload(make_payload(id, 0, Some(parent), 100)).unwrap(); + + let guard = cache.inner.lock(); + assert_eq!(guard.as_ref().unwrap().parent_hash, Some(parent)); + } + + #[test] + fn parent_hash_not_overwritten_once_set() { + let cache = FlashblockPayloadsCache::default(); + let parent_first = B256::random(); + let parent_second = B256::random(); + let id = [1u8; 8]; + + // First payload sets parent_hash + cache.add_flashblock_payload(make_payload(id, 0, Some(parent_first), 100)).unwrap(); + + // Second payload with different parent_hash in base - should NOT overwrite + cache.add_flashblock_payload(make_payload(id, 1, Some(parent_second), 100)).unwrap(); + + let guard = cache.inner.lock(); + assert_eq!(guard.as_ref().unwrap().parent_hash, Some(parent_first)); + } + + // ======================================================================== + // FlashblockPayloadsSequence serialization + // ======================================================================== + + #[test] + fn sequence_serde_roundtrip() { + let parent = B256::random(); + let sequence = FlashblockPayloadsSequence { + payload_id: PayloadId::new([42u8; 8]), + parent_hash: Some(parent), + payloads: vec![ + make_payload([42u8; 8], 0, Some(parent), 100), + make_payload([42u8; 8], 1, None, 100), + ], + }; + + let json = serde_json::to_vec(&sequence).unwrap(); + let deserialized: FlashblockPayloadsSequence = serde_json::from_slice(&json).unwrap(); + + assert_eq!(deserialized.payload_id, sequence.payload_id); + assert_eq!(deserialized.parent_hash, sequence.parent_hash); + assert_eq!(deserialized.payloads.len(), sequence.payloads.len()); + assert_eq!(deserialized.payloads[0].index, 0); + assert_eq!(deserialized.payloads[1].index, 1); + } + + #[test] + fn sequence_serde_with_none_parent_hash() { + let sequence = FlashblockPayloadsSequence { + payload_id: PayloadId::new([1u8; 8]), + parent_hash: None, + payloads: vec![make_payload([1u8; 8], 0, None, 50)], + }; + + let json = serde_json::to_vec(&sequence).unwrap(); + let deserialized: FlashblockPayloadsSequence = serde_json::from_slice(&json).unwrap(); + + assert_eq!(deserialized.parent_hash, None); + } + + #[tokio::test] + async fn persist_writes_file_and_load_restores() { + let tmp = TempDir::new("persist_roundtrip"); + let file_path = tmp.path().join(PENDING_SEQUENCE_FILE); + + let cache = FlashblockPayloadsCache::with_persist_path(file_path.clone()); + let parent = B256::random(); + let id = [10u8; 8]; + + cache.add_flashblock_payload(make_payload(id, 0, Some(parent), 200)).unwrap(); + cache.add_flashblock_payload(make_payload(id, 1, None, 200)).unwrap(); + cache.add_flashblock_payload(make_payload(id, 2, None, 200)).unwrap(); + + // Persist to disk + cache.persist().await.unwrap(); + + // Verify file exists + assert!(file_path.exists(), "persistence file should exist after persist()"); + + // Verify no temp file left behind + let tmp_path = file_path.with_file_name(format!(".{PENDING_SEQUENCE_FILE}")); + assert!(!tmp_path.exists(), "temp file should be cleaned up after atomic rename"); + + // Load from file and verify contents match + let loaded_seq = + try_load_from_filepath(Some(&file_path)).expect("should load persisted sequence"); + + assert_eq!(loaded_seq.payload_id, PayloadId::new(id)); + assert_eq!(loaded_seq.parent_hash, Some(parent)); + assert_eq!(loaded_seq.payloads.len(), 3); + assert_eq!(loaded_seq.payloads[0].index, 0); + assert_eq!(loaded_seq.payloads[1].index, 1); + assert_eq!(loaded_seq.payloads[2].index, 2); + } + + #[tokio::test] + async fn persist_no_path_is_noop() { + let cache = FlashblockPayloadsCache::default(); + cache.add_flashblock_payload(make_payload([1u8; 8], 0, Some(B256::ZERO), 1)).unwrap(); + + // Should succeed without writing anything (no persist_path) + cache.persist().await.unwrap(); + } + + #[tokio::test] + async fn persist_empty_cache_is_noop() { + let tmp = TempDir::new("persist_empty"); + let file_path = tmp.path().join(PENDING_SEQUENCE_FILE); + + let cache = FlashblockPayloadsCache::with_persist_path(file_path.clone()); + + // Cache is empty — persist should be a no-op + cache.persist().await.unwrap(); + assert!(!file_path.exists(), "no file should be written for empty cache"); + } + + #[tokio::test] + async fn persist_overwrites_previous_file() { + let tmp = TempDir::new("persist_overwrite"); + let file_path = tmp.path().join(PENDING_SEQUENCE_FILE); + + let cache = FlashblockPayloadsCache::with_persist_path(file_path.clone()); + + // First sequence + let parent_a = B256::random(); + cache.add_flashblock_payload(make_payload([1u8; 8], 0, Some(parent_a), 100)).unwrap(); + cache.persist().await.unwrap(); + + // Replace with second sequence + let parent_b = B256::random(); + cache.add_flashblock_payload(make_payload([2u8; 8], 0, Some(parent_b), 101)).unwrap(); + cache.persist().await.unwrap(); + + // Loaded data should reflect the second sequence + let loaded_seq = + try_load_from_filepath(Some(&file_path)).expect("should load persisted sequence"); + assert_eq!(loaded_seq.payload_id, PayloadId::new([2u8; 8])); + assert_eq!(loaded_seq.parent_hash, Some(parent_b)); + } + + #[test] + fn load_from_nonexistent_file_returns_none() { + let result = try_load_from_filepath(Some(Path::new("/nonexistent/path.json"))); + assert!(result.is_none()); + } + + #[test] + fn load_from_invalid_json_returns_none() { + let tmp = TempDir::new("load_invalid"); + let file_path = tmp.path().join("bad.json"); + std::fs::write(&file_path, b"not valid json").unwrap(); + + let result = try_load_from_filepath(Some(&file_path)); + assert!(result.is_none()); + } + + #[tokio::test] + async fn persist_file_contains_valid_json() { + let tmp = TempDir::new("persist_json"); + let file_path = tmp.path().join(PENDING_SEQUENCE_FILE); + + let cache = FlashblockPayloadsCache::with_persist_path(file_path.clone()); + let parent = B256::random(); + cache.add_flashblock_payload(make_payload([5u8; 8], 0, Some(parent), 42)).unwrap(); + cache.persist().await.unwrap(); + + // Read raw bytes and verify it's valid JSON that deserializes correctly + let data = std::fs::read(&file_path).unwrap(); + let seq: FlashblockPayloadsSequence = serde_json::from_slice(&data).unwrap(); + assert_eq!(seq.payload_id, PayloadId::new([5u8; 8])); + assert_eq!(seq.parent_hash, Some(parent)); + assert_eq!(seq.payloads[0].metadata.block_number, 42); + } + + #[test] + fn get_txs_empty_cache_returns_none() { + let cache = FlashblockPayloadsCache::default(); + let result = cache.get_flashblocks_sequence_txs::(B256::random()); + assert!(result.is_none()); + } + + #[test] + fn get_txs_wrong_parent_hash_returns_none() { + let cache = FlashblockPayloadsCache::default(); + let parent = B256::random(); + let wrong_parent = B256::random(); + + cache.add_flashblock_payload(make_payload([1u8; 8], 0, Some(parent), 100)).unwrap(); + + let result = cache.get_flashblocks_sequence_txs::(wrong_parent); + assert!(result.is_none()); + } + + #[test] + fn get_txs_only_base_payload_returns_empty_vec() { + let cache = FlashblockPayloadsCache::default(); + let parent = B256::random(); + + // Only index 0 (base) — no flashblock transactions to return + cache.add_flashblock_payload(make_payload([1u8; 8], 0, Some(parent), 100)).unwrap(); + + let result = cache.get_flashblocks_sequence_txs::(parent); + assert_eq!(result, Some(vec![])); + } + + #[test] + fn get_txs_with_none_parent_hash_returns_none() { + let cache = FlashblockPayloadsCache::default(); + + // Payload without base (parent_hash will be None) + cache.add_flashblock_payload(make_payload([1u8; 8], 1, None, 100)).unwrap(); + + // Any query should return None since cached parent_hash is None + let result = cache.get_flashblocks_sequence_txs::(B256::ZERO); + assert!(result.is_none()); + } + + #[test] + fn get_txs_non_sequential_indexes_returns_none() { + let cache = FlashblockPayloadsCache::default(); + let parent = B256::random(); + let id = [1u8; 8]; + + // index 0 (base) + cache.add_flashblock_payload(make_payload(id, 0, Some(parent), 100)).unwrap(); + // index 1 — sequential + cache.add_flashblock_payload(make_payload(id, 1, None, 100)).unwrap(); + // index 3 — gap (skipped index 2) + cache.add_flashblock_payload(make_payload(id, 3, None, 100)).unwrap(); + + let result = cache.get_flashblocks_sequence_txs::(parent); + assert!(result.is_none(), "gap in indexes should return None"); + } + + #[test] + fn concurrent_add_and_read() { + let cache = FlashblockPayloadsCache::default(); + let cache_clone = cache.clone(); + let parent = B256::random(); + let id = [1u8; 8]; + + // Spawn writer thread + let writer = std::thread::spawn(move || { + for i in 0..100u64 { + cache_clone.add_flashblock_payload(make_payload(id, i, Some(parent), 100)).unwrap(); + } + }); + + // Read concurrently from main thread + for _ in 0..50 { + let guard = cache.inner.lock(); + if let Some(seq) = guard.as_ref() { + assert_eq!(seq.payload_id, PayloadId::new(id)); + assert!(!seq.payloads.is_empty()); + } + drop(guard); + } + + writer.join().unwrap(); + + let guard = cache.inner.lock(); + let seq = guard.as_ref().unwrap(); + assert_eq!(seq.payloads.len(), 100); + } + + #[test] + fn clone_shares_underlying_data() { + let cache = FlashblockPayloadsCache::default(); + let clone = cache.clone(); + + cache.add_flashblock_payload(make_payload([1u8; 8], 0, Some(B256::ZERO), 1)).unwrap(); + + // Clone should see the same data + let guard = clone.inner.lock(); + assert!(guard.is_some()); + assert_eq!(guard.as_ref().unwrap().payloads.len(), 1); + } } diff --git a/crates/builder/src/payload/flashblocks/config.rs b/crates/builder/src/payload/flashblocks/config.rs index 932cb245..3a734045 100644 --- a/crates/builder/src/payload/flashblocks/config.rs +++ b/crates/builder/src/payload/flashblocks/config.rs @@ -60,6 +60,9 @@ pub struct FlashblocksConfig { /// Maximum number of concurrent WebSocket subscribers pub ws_subscriber_limit: Option, + + /// Whether to replay from the persistence file on startup + pub replay_from_persistence_file: bool, } impl Default for FlashblocksConfig { @@ -80,6 +83,7 @@ impl Default for FlashblocksConfig { p2p_send_full_payload: false, p2p_process_full_payload: false, ws_subscriber_limit: None, + replay_from_persistence_file: false, } } } @@ -118,6 +122,7 @@ impl TryFrom for FlashblocksConfig { p2p_send_full_payload: args.flashblocks.p2p.p2p_send_full_payload, p2p_process_full_payload: args.flashblocks.p2p.p2p_process_full_payload, ws_subscriber_limit: args.flashblocks.ws_subscriber_limit, + replay_from_persistence_file: args.flashblocks.replay_from_persistence_file, }) } } diff --git a/crates/builder/src/payload/flashblocks/mod.rs b/crates/builder/src/payload/flashblocks/mod.rs index 5e495b6c..70d6469a 100644 --- a/crates/builder/src/payload/flashblocks/mod.rs +++ b/crates/builder/src/payload/flashblocks/mod.rs @@ -15,6 +15,7 @@ mod service; mod timing; mod wspub; +pub use cache::FlashblockPayloadsCache; pub use wspub::WebSocketPublisher; /// Block building strategy that progressively builds chunks of a block and makes them available diff --git a/crates/builder/src/payload/flashblocks/service.rs b/crates/builder/src/payload/flashblocks/service.rs index 6d434323..1fef9b6a 100644 --- a/crates/builder/src/payload/flashblocks/service.rs +++ b/crates/builder/src/payload/flashblocks/service.rs @@ -104,7 +104,11 @@ impl FlashblocksServiceBuilder { // Channels for built full block payloads let (built_payload_tx, built_payload_rx) = tokio::sync::mpsc::channel(16); - let p2p_cache = FlashblockPayloadsCache::new(); + let p2p_cache = if self.0.specific.replay_from_persistence_file { + FlashblockPayloadsCache::new(Some(ctx.config().datadir())) + } else { + FlashblockPayloadsCache::new(None) + }; let ws_pub: Arc = WebSocketPublisher::new( self.0.specific.ws_addr, diff --git a/crates/builder/src/payload/mod.rs b/crates/builder/src/payload/mod.rs index 4de61d15..4c1c3aa0 100644 --- a/crates/builder/src/payload/mod.rs +++ b/crates/builder/src/payload/mod.rs @@ -20,7 +20,9 @@ pub use builder_tx::{ InvalidContractDataError, SimulationSuccessResult, }; pub use context::OpPayloadBuilderCtx; -pub use flashblocks::{FlashblocksBuilder, FlashblocksServiceBuilder, WebSocketPublisher}; +pub use flashblocks::{ + FlashblockPayloadsCache, FlashblocksBuilder, FlashblocksServiceBuilder, WebSocketPublisher, +}; /// Defines the interface for any block builder implementation API entry point. /// diff --git a/crates/flashblocks/Cargo.toml b/crates/flashblocks/Cargo.toml index bd21dbf6..1ddd0937 100644 --- a/crates/flashblocks/Cargo.toml +++ b/crates/flashblocks/Cargo.toml @@ -13,6 +13,7 @@ default = [] [dependencies] xlayer-builder.workspace = true reth-node-api.workspace = true +reth-node-core.workspace = true reth-primitives-traits.workspace = true reth-tracing.workspace = true reth-optimism-flashblocks.workspace = true diff --git a/crates/flashblocks/src/handler.rs b/crates/flashblocks/src/handler.rs index c8bd75b1..170a9ef9 100644 --- a/crates/flashblocks/src/handler.rs +++ b/crates/flashblocks/src/handler.rs @@ -1,11 +1,14 @@ -use reth_node_api::FullNodeComponents; -use reth_optimism_flashblocks::FlashBlockRx; -use std::net::SocketAddr; -use std::sync::Arc; +use std::{net::SocketAddr, sync::Arc, time::Duration}; use tracing::{debug, info, trace, warn}; + +use reth_node_api::FullNodeComponents; +use reth_node_core::dirs::{ChainPath, DataDirPath}; +use reth_optimism_flashblocks::{FlashBlock, FlashBlockRx}; + use xlayer_builder::{ - args::FlashblocksArgs, metrics::tokio::FlashblocksTaskMetrics, metrics::BuilderMetrics, - payload::WebSocketPublisher, + args::FlashblocksArgs, + metrics::{tokio::FlashblocksTaskMetrics, BuilderMetrics}, + payload::{FlashblockPayloadsCache, WebSocketPublisher}, }; pub struct FlashblocksService @@ -16,6 +19,7 @@ where flashblock_rx: FlashBlockRx, ws_pub: Arc, relay_flashblocks: bool, + datadir: ChainPath, } impl FlashblocksService @@ -27,6 +31,7 @@ where flashblock_rx: FlashBlockRx, args: FlashblocksArgs, relay_flashblocks: bool, + datadir: ChainPath, ) -> Result { let ws_addr = SocketAddr::new(args.flashblocks_addr.parse()?, args.flashblocks_port); @@ -44,7 +49,7 @@ where info!(target: "flashblocks", "WebSocket publisher initialized at {}", ws_addr); - Ok(Self { node, flashblock_rx, ws_pub, relay_flashblocks }) + Ok(Self { node, flashblock_rx, ws_pub, relay_flashblocks, datadir }) } pub fn spawn(mut self) { @@ -52,16 +57,25 @@ where let task_executor = self.node.task_executor().clone(); if self.relay_flashblocks { + let datadir = self.datadir.clone(); + let flashblock_rx = self.flashblock_rx.resubscribe(); task_executor.spawn_critical( - "xlayer-flashblocks-service", + "xlayer-flashblocks-persistence", Box::pin(async move { - self.run().await; + handle_persistence(flashblock_rx, datadir).await; + }), + ); + + task_executor.spawn_critical( + "xlayer-flashblocks-publish", + Box::pin(async move { + self.publish().await; }), ); } } - async fn run(&mut self) { + async fn publish(&mut self) { info!( target: "flashblocks", "Flashblocks websocket publisher started" @@ -88,7 +102,8 @@ where info!(target: "flashblocks", "Flashblocks service stopped"); } - async fn publish_flashblock(&self, flashblock: &Arc) { + /// Relays the incoming flashblock to the flashblock websocket subscribers. + async fn publish_flashblock(&self, flashblock: &Arc) { match self.ws_pub.publish(flashblock) { Ok(_) => { trace!( @@ -107,3 +122,47 @@ where } } } + +/// Handles the persistence of the pending flashblocks sequence to disk. +async fn handle_persistence(mut rx: FlashBlockRx, datadir: ChainPath) { + let cache = FlashblockPayloadsCache::new(Some(datadir)); + + // Set default flush interval to 5 seconds + let mut flush_interval = tokio::time::interval(Duration::from_secs(5)); + let mut dirty = false; + + loop { + tokio::select! { + result = rx.recv() => { + match result { + Ok(flashblock) => { + if let Err(e) = cache.add_flashblock_payload(flashblock.as_ref().clone()) { + warn!(target: "flashblocks", "Failed to cache flashblock payload: {e}"); + continue; + } + dirty = true; + } + Err(e) => { + warn!(target: "flashblocks", "Persistence handle receiver error: {e:?}"); + break; + } + } + } + _ = flush_interval.tick() => { + if dirty { + if let Err(e) = cache.persist().await { + warn!(target: "flashblocks", "Failed to persist pending sequence: {e}"); + } + dirty = false; + } + } + } + } + + // Flush again on shutdown + if dirty && let Err(e) = cache.persist().await { + warn!(target: "flashblocks", "Failed final persist of pending sequence: {e}"); + } + + info!(target: "flashblocks", "Flashblocks persistence handle stopped"); +}