diff --git a/crates/networking/p2p/discv5/server.rs b/crates/networking/p2p/discv5/server.rs index 954b6ee44b4..6a5c9cc339a 100644 --- a/crates/networking/p2p/discv5/server.rs +++ b/crates/networking/p2p/discv5/server.rs @@ -3,8 +3,8 @@ use crate::{ codec::Discv5Codec, messages::{ DISTANCES_PER_FIND_NODE_MSG, FindNodeMessage, Handshake, Message, NodesMessage, - Ordinary, Packet, PacketCodecError, PacketHeader, PacketTrait as _, PingMessage, - PongMessage, WhoAreYou, + Ordinary, Packet, PacketCodecError, PacketTrait as _, PingMessage, PongMessage, + WhoAreYou, }, session::{build_challenge_data, create_id_signature, derive_session_keys}, }, diff --git a/crates/networking/p2p/network.rs b/crates/networking/p2p/network.rs index 53cf2c03a96..043d7653e0e 100644 --- a/crates/networking/p2p/network.rs +++ b/crates/networking/p2p/network.rs @@ -5,7 +5,7 @@ use crate::rlpx::l2::l2_connection::P2PBasedContext; pub struct P2PBasedContext; use crate::{ discovery_server::{DiscoveryServer, DiscoveryServerError}, - metrics::METRICS, + metrics::{CurrentStepValue, METRICS}, peer_table::{PeerData, PeerTable}, rlpx::{ connection::server::{PeerConnBroadcastSender, PeerConnection}, @@ -16,6 +16,7 @@ use crate::{ types::Node, }; use ethrex_blockchain::Blockchain; +use ethrex_common::H256; use ethrex_storage::Store; use secp256k1::SecretKey; use spawned_concurrency::tasks::GenServerHandle; @@ -171,122 +172,470 @@ pub async fn periodically_show_peer_stats(blockchain: Arc, mut peer_ periodically_show_peer_stats_after_sync(&mut peer_table).await; } +/// Tracks metric values at phase start and from the previous interval for rate calculations +#[derive(Default, Clone, Copy)] +struct PhaseCounters { + headers: u64, + accounts: u64, + accounts_inserted: u64, + storage: u64, + storage_inserted: u64, + healed_accounts: u64, + healed_storage: u64, + bytecodes: u64, +} + +impl PhaseCounters { + fn capture_current() -> Self { + Self { + headers: METRICS.downloaded_headers.get(), + accounts: METRICS.downloaded_account_tries.load(Ordering::Relaxed), + accounts_inserted: METRICS.account_tries_inserted.load(Ordering::Relaxed), + storage: METRICS.storage_leaves_downloaded.get(), + storage_inserted: METRICS.storage_leaves_inserted.get(), + healed_accounts: METRICS + .global_state_trie_leafs_healed + .load(Ordering::Relaxed), + healed_storage: METRICS + .global_storage_tries_leafs_healed + .load(Ordering::Relaxed), + bytecodes: METRICS.downloaded_bytecodes.load(Ordering::Relaxed), + } + } +} + pub async fn periodically_show_peer_stats_during_syncing( blockchain: Arc, peer_table: &mut PeerTable, ) { let start = std::time::Instant::now(); + let mut previous_step = CurrentStepValue::None; + let mut phase_start_time = std::time::Instant::now(); + let mut sync_started_logged = false; + + // Track metrics at phase start for phase summaries + let mut phase_start = PhaseCounters::default(); + // Track metrics from previous interval for rate calculations + let mut prev_interval = PhaseCounters::default(); + loop { - { - if blockchain.is_synced() { - return; + if blockchain.is_synced() { + // Log sync complete summary + let total_elapsed = format_duration(start.elapsed()); + let headers_downloaded = METRICS.downloaded_headers.get(); + let accounts_downloaded = METRICS.downloaded_account_tries.load(Ordering::Relaxed); + let storage_downloaded = METRICS.storage_leaves_downloaded.get(); + let bytecodes_downloaded = METRICS.downloaded_bytecodes.load(Ordering::Relaxed); + let healed_accounts = METRICS + .global_state_trie_leafs_healed + .load(Ordering::Relaxed); + let healed_storage = METRICS + .global_storage_tries_leafs_healed + .load(Ordering::Relaxed); + + info!(""); + info!( + "╭──────────────────────────────────────────────────────────────────────────────╮" + ); + info!( + "│ SNAP SYNC COMPLETE │" + ); + info!( + "├──────────────────────────────────────────────────────────────────────────────┤" + ); + info!("│ {:<76}│", format!("Total time: {}", total_elapsed)); + info!( + "├──────────────────────────────────────────────────────────────────────────────┤" + ); + info!( + "│ Data summary: │" + ); + let headers_accounts = format!( + " Headers: {:<14} │ Accounts: {}", + format_thousands(headers_downloaded), + format_thousands(accounts_downloaded) + ); + info!("│ {:<76}│", headers_accounts); + let storage_bytecodes = format!( + " Storage: {:<14} │ Bytecodes: {}", + format_thousands(storage_downloaded), + format_thousands(bytecodes_downloaded) + ); + info!("│ {:<76}│", storage_bytecodes); + let healed = format!( + " Healed: {} state paths + {} storage accounts", + format_thousands(healed_accounts), + format_thousands(healed_storage) + ); + info!("│ {:<76}│", healed); + info!( + "╰──────────────────────────────────────────────────────────────────────────────╯" + ); + return; + } + + let metrics_enabled = *METRICS.enabled.lock().await; + if !metrics_enabled { + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + } + + let current_step = METRICS.current_step.get(); + let peer_number = peer_table.peer_count().await.unwrap_or(0); + + // Log sync started banner when we have valid sync head data + if !sync_started_logged && current_step != CurrentStepValue::None { + let sync_head_block = METRICS.sync_head_block.load(Ordering::Relaxed); + let sync_head_hash = *METRICS.sync_head_hash.lock().await; + + // Only show banner when sync_head data is populated (not genesis/default) + if sync_head_block > 0 && sync_head_hash != H256::zero() { + let head_short = format!("{:x}", sync_head_hash); + let head_short = &head_short[..8.min(head_short.len())]; + + info!(""); + info!("╭─────────────────────────────────────────────────────────────╮"); + info!("│ {:<59} │", "SNAP SYNC STARTED"); + let target_content = format!( + "Target: {}... (block #{})", + head_short, + format_thousands(sync_head_block) + ); + info!("│ {:<59} │", target_content); + info!("│ {:<59} │", format!("Peers: {}", peer_number)); + info!("╰─────────────────────────────────────────────────────────────╯"); + sync_started_logged = true; } - let metrics_enabled = *METRICS.enabled.lock().await; - // Show the metrics only when these are enabled - if !metrics_enabled { - tokio::time::sleep(Duration::from_secs(1)).await; - continue; + } + + // Only show phase progress after the SNAP SYNC STARTED banner + if !sync_started_logged { + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + } + + // Detect phase transition + if current_step != previous_step && current_step != CurrentStepValue::None { + // Log completion of previous phase (if any) + if previous_step != CurrentStepValue::None { + let phase_elapsed = format_duration(phase_start_time.elapsed()); + log_phase_completion( + previous_step, + phase_elapsed, + &phase_metrics(previous_step, &phase_start).await, + ); } - // Common metrics - let elapsed = format_duration(start.elapsed()); - let peer_number = peer_table.peer_count().await.unwrap_or(0); - let current_step = METRICS.current_step.get(); - let current_header_hash = *METRICS.sync_head_hash.lock().await; + // Start new phase + phase_start_time = std::time::Instant::now(); + + // Capture metrics at phase start + phase_start = PhaseCounters::capture_current(); + prev_interval = phase_start; + + log_phase_separator(current_step); + previous_step = current_step; + } + + // Log phase-specific progress update + let phase_elapsed = phase_start_time.elapsed(); + let total_elapsed = format_duration(start.elapsed()); + + log_phase_progress( + current_step, + phase_elapsed, + &total_elapsed, + peer_number, + &prev_interval, + ) + .await; - // Headers metrics + // Update previous interval counters for next rate calculation + prev_interval = PhaseCounters::capture_current(); + + tokio::time::sleep(Duration::from_secs(10)).await; + } +} + +/// Returns (phase_number, phase_name) for the current step +fn phase_info(step: CurrentStepValue) -> (u8, &'static str) { + match step { + CurrentStepValue::DownloadingHeaders => (1, "BLOCK HEADERS"), + CurrentStepValue::RequestingAccountRanges => (2, "ACCOUNT RANGES"), + CurrentStepValue::InsertingAccountRanges | CurrentStepValue::InsertingAccountRangesNoDb => { + (3, "ACCOUNT INSERTION") + } + CurrentStepValue::RequestingStorageRanges => (4, "STORAGE RANGES"), + CurrentStepValue::InsertingStorageRanges => (5, "STORAGE INSERTION"), + CurrentStepValue::HealingState => (6, "STATE HEALING"), + CurrentStepValue::HealingStorage => (7, "STORAGE HEALING"), + CurrentStepValue::RequestingBytecodes => (8, "BYTECODES"), + CurrentStepValue::None => (0, "UNKNOWN"), + } +} + +fn log_phase_separator(step: CurrentStepValue) { + let (phase_num, phase_name) = phase_info(step); + let header = format!("── PHASE {}/8: {} ", phase_num, phase_name); + let header_width = header.chars().count(); + let padding_width = 80usize.saturating_sub(header_width); + let padding = "─".repeat(padding_width); + info!(""); + info!("{}{}", header, padding); +} + +fn log_phase_completion(step: CurrentStepValue, elapsed: String, summary: &str) { + let (_, phase_name) = phase_info(step); + info!("✓ {} complete: {} in {}", phase_name, summary, elapsed); +} + +async fn phase_metrics(step: CurrentStepValue, phase_start: &PhaseCounters) -> String { + match step { + CurrentStepValue::DownloadingHeaders => { + let downloaded = METRICS + .downloaded_headers + .get() + .saturating_sub(phase_start.headers); + format!("{} headers", format_thousands(downloaded)) + } + CurrentStepValue::RequestingAccountRanges => { + let downloaded = METRICS + .downloaded_account_tries + .load(Ordering::Relaxed) + .saturating_sub(phase_start.accounts); + format!("{} accounts", format_thousands(downloaded)) + } + CurrentStepValue::InsertingAccountRanges | CurrentStepValue::InsertingAccountRangesNoDb => { + let inserted = METRICS + .account_tries_inserted + .load(Ordering::Relaxed) + .saturating_sub(phase_start.accounts_inserted); + format!("{} accounts inserted", format_thousands(inserted)) + } + CurrentStepValue::RequestingStorageRanges => { + let downloaded = METRICS + .storage_leaves_downloaded + .get() + .saturating_sub(phase_start.storage); + format!("{} storage slots", format_thousands(downloaded)) + } + CurrentStepValue::InsertingStorageRanges => { + let inserted = METRICS + .storage_leaves_inserted + .get() + .saturating_sub(phase_start.storage_inserted); + format!("{} storage slots inserted", format_thousands(inserted)) + } + CurrentStepValue::HealingState => { + let healed = METRICS + .global_state_trie_leafs_healed + .load(Ordering::Relaxed) + .saturating_sub(phase_start.healed_accounts); + format!("{} state paths healed", format_thousands(healed)) + } + CurrentStepValue::HealingStorage => { + let healed = METRICS + .global_storage_tries_leafs_healed + .load(Ordering::Relaxed) + .saturating_sub(phase_start.healed_storage); + format!("{} storage accounts healed", format_thousands(healed)) + } + CurrentStepValue::RequestingBytecodes => { + let downloaded = METRICS + .downloaded_bytecodes + .load(Ordering::Relaxed) + .saturating_sub(phase_start.bytecodes); + format!("{} bytecodes", format_thousands(downloaded)) + } + CurrentStepValue::None => String::new(), + } +} + +/// Interval in seconds between progress updates +const PROGRESS_INTERVAL_SECS: u64 = 30; + +async fn log_phase_progress( + step: CurrentStepValue, + phase_elapsed: Duration, + total_elapsed: &str, + peer_count: usize, + prev_interval: &PhaseCounters, +) { + let phase_elapsed_str = format_duration(phase_elapsed); + + // Use consistent column widths: left column 40 chars, then │, then right column + let col1_width = 40; + + match step { + CurrentStepValue::DownloadingHeaders => { let headers_to_download = METRICS.sync_head_block.load(Ordering::Relaxed); - // We may download more than expected headers due to duplicates - // We just clamp it to the max to avoid showing the user confusing data let headers_downloaded = u64::min(METRICS.downloaded_headers.get(), headers_to_download); - let headers_percentage = if headers_to_download == 0 { + let interval_downloaded = headers_downloaded.saturating_sub(prev_interval.headers); + let percentage = if headers_to_download == 0 { 0.0 } else { (headers_downloaded as f64 / headers_to_download as f64) * 100.0 }; - let elapsed_secs = start.elapsed().as_secs(); - let headers_per_second = if elapsed_secs == 0 { - 0 + let rate = interval_downloaded / PROGRESS_INTERVAL_SECS; + + let progress = progress_bar(percentage, 40); + info!(" {} {:>5.1}%", progress, percentage); + info!(""); + let col1 = format!( + "Headers: {} / {}", + format_thousands(headers_downloaded), + format_thousands(headers_to_download) + ); + info!(" {: { + let accounts_downloaded = METRICS.downloaded_account_tries.load(Ordering::Relaxed); + let interval_downloaded = accounts_downloaded.saturating_sub(prev_interval.accounts); + let rate = interval_downloaded / PROGRESS_INTERVAL_SECS; + + info!(""); + let col1 = format!( + "Accounts fetched: {}", + format_thousands(accounts_downloaded) + ); + info!(" {: { + let accounts_to_insert = METRICS.downloaded_account_tries.load(Ordering::Relaxed); + let accounts_inserted = METRICS.account_tries_inserted.load(Ordering::Relaxed); + let interval_inserted = + accounts_inserted.saturating_sub(prev_interval.accounts_inserted); + let percentage = if accounts_to_insert == 0 { + 0.0 } else { - headers_downloaded / elapsed_secs + (accounts_inserted as f64 / accounts_to_insert as f64) * 100.0 }; + let rate = interval_inserted / PROGRESS_INTERVAL_SECS; - // Account leaves metrics - let account_leaves_downloaded = - METRICS.downloaded_account_tries.load(Ordering::Relaxed); - let account_leaves_inserted = METRICS.account_tries_inserted.load(Ordering::Relaxed); - let accounts_per_second = - if let Some(start_time) = *METRICS.account_tries_download_start_time.lock().await { - let elapsed_secs = start_time.elapsed().map(|d| d.as_secs()).unwrap_or(0); - if elapsed_secs == 0 { - 0 - } else { - account_leaves_downloaded / elapsed_secs - } - } else { - 0 - }; - - // Storage leaves metrics - let storage_leaves_downloaded = METRICS.storage_leaves_downloaded.get(); - let storage_leaves_inserted = METRICS.storage_leaves_inserted.get(); - let storage_per_second = - if let Some(start_time) = *METRICS.storage_tries_download_start_time.lock().await { - let elapsed_secs = start_time.elapsed().map(|d| d.as_secs()).unwrap_or(0); - if elapsed_secs == 0 { - 0 - } else { - storage_leaves_downloaded / elapsed_secs - } - } else { - 0 - }; - - // Healing metrics - let healed_accounts = METRICS + let progress = progress_bar(percentage, 40); + info!(" {} {:>5.1}%", progress, percentage); + info!(""); + let col1 = format!( + "Accounts: {} / {}", + format_thousands(accounts_inserted), + format_thousands(accounts_to_insert) + ); + info!(" {: { + let storage_downloaded = METRICS.storage_leaves_downloaded.get(); + let interval_downloaded = storage_downloaded.saturating_sub(prev_interval.storage); + let rate = interval_downloaded / PROGRESS_INTERVAL_SECS; + + info!(""); + let col1 = format!( + "Storage slots fetched: {}", + format_thousands(storage_downloaded) + ); + info!(" {: { + let storage_inserted = METRICS.storage_leaves_inserted.get(); + let interval_inserted = storage_inserted.saturating_sub(prev_interval.storage_inserted); + let rate = interval_inserted / PROGRESS_INTERVAL_SECS; + + info!(""); + let col1 = format!( + "Storage slots inserted: {}", + format_thousands(storage_inserted) + ); + info!(" {: { + let healed = METRICS .global_state_trie_leafs_healed .load(Ordering::Relaxed); - let healed_storages = METRICS + let interval_healed = healed.saturating_sub(prev_interval.healed_accounts); + let rate = interval_healed / PROGRESS_INTERVAL_SECS; + + info!(""); + let col1 = format!("State paths healed: {}", format_thousands(healed)); + info!(" {: { + let healed = METRICS .global_storage_tries_leafs_healed .load(Ordering::Relaxed); + let interval_healed = healed.saturating_sub(prev_interval.healed_storage); + let rate = interval_healed / PROGRESS_INTERVAL_SECS; - // Bytecode metrics + info!(""); + let col1 = format!("Storage accounts healed: {}", format_thousands(healed)); + info!(" {: { + let bytecodes_to_download = METRICS.bytecodes_to_download.load(Ordering::Relaxed); let bytecodes_downloaded = METRICS.downloaded_bytecodes.load(Ordering::Relaxed); - let bytecodes_per_second = - if let Some(start_time) = *METRICS.bytecode_download_start_time.lock().await { - let elapsed_secs = start_time.elapsed().map(|d| d.as_secs()).unwrap_or(0); - if elapsed_secs == 0 { - 0 - } else { - bytecodes_downloaded / elapsed_secs - } - } else { - 0 - }; - - // Truncate hash to first 6 hex chars - let head_short = format!("{:x}", current_header_hash); - let head_short = &head_short[..6.min(head_short.len())]; + let interval_downloaded = bytecodes_downloaded.saturating_sub(prev_interval.bytecodes); + let percentage = if bytecodes_to_download == 0 { + 0.0 + } else { + (bytecodes_downloaded as f64 / bytecodes_to_download as f64) * 100.0 + }; + let rate = interval_downloaded / PROGRESS_INTERVAL_SECS; - info!( - r#" -─────────────────────────────────────────────────────────────────────── - SNAP SYNC │ {elapsed} │ {peer_number} peers │ {current_step} │ {head_short} -─────────────────────────────────────────────────────────────────────── - 1. Headers Downloaded {headers_downloaded:>13} {headers_percentage:>5.1}% {headers_per_second} headers/s - 2. Accounts Downloaded {account_leaves_downloaded:>13} {accounts_per_second} accounts/s - 3. Accounts Inserted {account_leaves_inserted:>13} - 4. Storage Downloaded {storage_leaves_downloaded:>13} {storage_per_second} storage slots/s - 5. Storage Inserted {storage_leaves_inserted:>13} - 6. Healing: {healed_accounts} accounts - 7. Healing: {healed_storages} storages - 8. Bytecodes Downloaded {bytecodes_downloaded:>13} {bytecodes_per_second} bytecodes/s -───────────────────────────────────────────────────────────────────────"# + let progress = progress_bar(percentage, 40); + info!(" {} {:>5.1}%", progress, percentage); + info!(""); + let col1 = format!( + "Bytecodes: {} / {}", + format_thousands(bytecodes_downloaded), + format_thousands(bytecodes_to_download) ); + info!(" {: {} + } +} + +fn progress_bar(percentage: f64, width: usize) -> String { + let clamped_percentage = percentage.clamp(0.0, 100.0); + let filled = ((clamped_percentage / 100.0) * width as f64) as usize; + let filled = filled.min(width); + let empty = width.saturating_sub(filled); + format!("{}{}", "▓".repeat(filled), "░".repeat(empty)) +} + +fn format_thousands(n: u64) -> String { + let s = n.to_string(); + let mut result = String::new(); + for (i, c) in s.chars().rev().enumerate() { + if i > 0 && i % 3 == 0 { + result.push(','); } - tokio::time::sleep(Duration::from_secs(30)).await; + result.push(c); } + result.chars().rev().collect() } /// Shows the amount of connected peers, active peers, and peers suitable for snap sync on a set interval diff --git a/crates/networking/p2p/sync.rs b/crates/networking/p2p/sync.rs index 1e490f649bb..faec6ac9031 100644 --- a/crates/networking/p2p/sync.rs +++ b/crates/networking/p2p/sync.rs @@ -13,7 +13,7 @@ use crate::utils::{ get_account_storages_snapshots_dir, get_code_hashes_snapshots_dir, }; use crate::{ - metrics::METRICS, + metrics::{CurrentStepValue, METRICS}, peer_handler::{MAX_BLOCK_BODIES_TO_REQUEST, PeerHandler}, }; use ethrex_blockchain::{BatchBlockProcessingFailure, Blockchain, error::ChainError}; @@ -722,6 +722,9 @@ impl Syncer { info!("Finish downloading account ranges from peers"); *METRICS.account_tries_insert_start_time.lock().await = Some(SystemTime::now()); + METRICS + .current_step + .set(CurrentStepValue::InsertingAccountRanges); // We read the account leafs from the files in account_state_snapshots_dir, write it into // the trie to compute the nodes and stores the accounts with storages for later use