From e20ffc5ddc1e67e883ecb513a092d49723a5c7ac Mon Sep 17 00:00:00 2001 From: Jun Song Date: Tue, 5 Aug 2025 23:56:55 +0900 Subject: [PATCH 01/21] feat: add skeleton codes for validator service --- Cargo.lock | 2 + crates/common/validator/lean/Cargo.toml | 4 ++ crates/common/validator/lean/src/service.rs | 56 ++++++++++++++++++++- 3 files changed, 60 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cfc940bdd..7b0d5dc6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5756,6 +5756,8 @@ dependencies = [ name = "ream-validator-lean" version = "0.1.0" dependencies = [ + "ream-network-spec", + "tokio", "tracing", ] diff --git a/crates/common/validator/lean/Cargo.toml b/crates/common/validator/lean/Cargo.toml index 54687680a..a5a3a3230 100644 --- a/crates/common/validator/lean/Cargo.toml +++ b/crates/common/validator/lean/Cargo.toml @@ -10,4 +10,8 @@ rust-version.workspace = true version.workspace = true [dependencies] +tokio.workspace = true tracing = { workspace = true, features = ["log"] } + +# ream dependencies +ream-network-spec.workspace = true diff --git a/crates/common/validator/lean/src/service.rs b/crates/common/validator/lean/src/service.rs index 587fe690a..4e093ee2d 100644 --- a/crates/common/validator/lean/src/service.rs +++ b/crates/common/validator/lean/src/service.rs @@ -1,7 +1,20 @@ -use std::{thread::sleep, time::Duration}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use ream_network_spec::networks::lean_network_spec; +use tokio::time::{Instant, MissedTickBehavior, interval_at}; use tracing::info; +/// 3SF-mini divides a slot into 4 intervals. +/// Reference: https://github.com/ethereum/research/blob/d225a6775a9b184b5c1fd6c830cc58a375d9535f/3sf-mini/p2p.py#L77-L98 +const INTERVALS_PER_SLOT: u64 = 4; + +/// ValidatorService is responsible for managing validator operations +/// such as proposing blocks and voting on them. +/// +/// Every first tick (t=0) it proposes a block if it's the validator's turn. +/// Every second tick (t=1/4) it votes on the proposed block. +/// +/// NOTE: Other ticks should be handled by the other services, such as the consensus service. pub struct ValidatorService {} impl ValidatorService { @@ -12,8 +25,47 @@ impl ValidatorService { pub async fn start(self) { info!("Validator Service started"); + // Get the Lean network specification. + let network_spec = lean_network_spec(); + let seconds_per_slot = network_spec.seconds_per_slot; + let genesis_time = network_spec.genesis_time; + + // Calculate the genesis instant from the genesis time (in seconds). + let genesis_instant = UNIX_EPOCH + Duration::from_secs(genesis_time); + + // Assume genesis time is "always" in the future, + // as we don't support syncing features yet. + let interval_start = Instant::now() + + genesis_instant + .duration_since(SystemTime::now()) + .expect("Genesis time is in the past"); + + let mut tick_count = 0u64; + let mut interval = interval_at( + interval_start, + Duration::from_secs(seconds_per_slot / INTERVALS_PER_SLOT), + ); + interval.set_missed_tick_behavior(MissedTickBehavior::Burst); + loop { - sleep(Duration::from_secs(10)); + tokio::select! { + _ = interval.tick() => { + match tick_count % 4 { + 0 => { + // First tick (t=0): Propose a block. + info!("Propose block if it's my turn."); + } + 1 => { + // Second tick (t=1/4): Vote. + info!("Vote."); + } + _ => { + // Other ticks (t=2/4, t=3/4): Do nothing. + } + } + tick_count += 1; + } + } } } } From f26095abac7caffb7aed4280bd86752aa6ed32ba Mon Sep 17 00:00:00 2001 From: Jun Song Date: Wed, 6 Aug 2025 00:44:05 +0900 Subject: [PATCH 02/21] chore: add LeanChainService --- Cargo.lock | 2 + bin/ream/Cargo.toml | 1 + bin/ream/src/main.rs | 8 +++ crates/common/chain/lean/src/lib.rs | 1 + crates/common/chain/lean/src/service.rs | 63 +++++++++++++++++++ .../consensus/misc/src/constants/lean.rs | 3 + crates/common/validator/lean/Cargo.toml | 1 + crates/common/validator/lean/src/service.rs | 7 +-- 8 files changed, 82 insertions(+), 4 deletions(-) create mode 100644 crates/common/chain/lean/src/service.rs diff --git a/Cargo.lock b/Cargo.lock index 7b0d5dc6e..dc9336439 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5161,6 +5161,7 @@ dependencies = [ "rand 0.8.5", "ream-account-manager", "ream-beacon-api-types", + "ream-chain-lean", "ream-checkpoint-sync", "ream-consensus-beacon", "ream-consensus-misc", @@ -5756,6 +5757,7 @@ dependencies = [ name = "ream-validator-lean" version = "0.1.0" dependencies = [ + "ream-consensus-misc", "ream-network-spec", "tokio", "tracing", diff --git a/bin/ream/Cargo.toml b/bin/ream/Cargo.toml index 3564d137c..692e42812 100644 --- a/bin/ream/Cargo.toml +++ b/bin/ream/Cargo.toml @@ -31,6 +31,7 @@ url.workspace = true # ream dependencies ream-account-manager.workspace = true ream-beacon-api-types.workspace = true +ream-chain-lean.workspace = true ream-checkpoint-sync.workspace = true ream-consensus-beacon.workspace = true ream-consensus-misc.workspace = true diff --git a/bin/ream/src/main.rs b/bin/ream/src/main.rs index a162256de..ae91f16a2 100644 --- a/bin/ream/src/main.rs +++ b/bin/ream/src/main.rs @@ -15,6 +15,7 @@ use ream::cli::{ voluntary_exit::VoluntaryExitConfig, }; use ream_beacon_api_types::id::{ID, ValidatorID}; +use ream_chain_lean::service::LeanChainService; use ream_checkpoint_sync::initialize_db_from_checkpoint; use ream_consensus_misc::{ constants::beacon::set_genesis_validator_root, misc::compute_epoch_at_slot, @@ -97,9 +98,13 @@ pub async fn run_lean_node(config: LeanNodeConfig, executor: ReamExecutor) { set_lean_network_spec(config.network.clone()); + let chain_service = LeanChainService::new().await; let network_service = LeanNetworkService::new().await; let validator_service = LeanValidatorService::new().await; + let chain_future = executor.spawn(async move { + chain_service.start().await; + }); let network_future = executor.spawn(async move { network_service.start().await; }); @@ -108,6 +113,9 @@ pub async fn run_lean_node(config: LeanNodeConfig, executor: ReamExecutor) { }); tokio::select! { + _ = chain_future => { + info!("Chain service has stopped unexpectedly"); + } _ = network_future => { info!("Network service has stopped unexpectedly"); } diff --git a/crates/common/chain/lean/src/lib.rs b/crates/common/chain/lean/src/lib.rs index c8f122945..7c5a5b6bf 100644 --- a/crates/common/chain/lean/src/lib.rs +++ b/crates/common/chain/lean/src/lib.rs @@ -1 +1,2 @@ +pub mod service; pub mod staker; diff --git a/crates/common/chain/lean/src/service.rs b/crates/common/chain/lean/src/service.rs new file mode 100644 index 000000000..fc94fa8f4 --- /dev/null +++ b/crates/common/chain/lean/src/service.rs @@ -0,0 +1,63 @@ +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use ream_consensus_misc::constants::lean::INTERVALS_PER_SLOT; +use ream_network_spec::networks::lean_network_spec; +use tokio::time::{Instant, MissedTickBehavior, interval_at}; +use tracing::info; + +pub struct LeanChainService {} + +impl LeanChainService { + pub async fn new() -> Self { + LeanChainService {} + } + + pub async fn start(self) { + info!("Lean Chain Service started"); + + // TODO: Duplicate clock logic from ValidatorService. May need to refactor later. + + // Get the Lean network specification. + let network_spec = lean_network_spec(); + let seconds_per_slot = network_spec.seconds_per_slot; + let genesis_time = network_spec.genesis_time; + + // Calculate the genesis instant from the genesis time (in seconds). + let genesis_instant = UNIX_EPOCH + Duration::from_secs(genesis_time); + + // Assume genesis time is "always" in the future, + // as we don't support syncing features yet. + let interval_start = Instant::now() + + genesis_instant + .duration_since(SystemTime::now()) + .expect("Genesis time is in the past"); + + let mut tick_count = 0u64; + let mut interval = interval_at( + interval_start, + Duration::from_secs(seconds_per_slot / INTERVALS_PER_SLOT), + ); + interval.set_missed_tick_behavior(MissedTickBehavior::Burst); + + loop { + tokio::select! { + _ = interval.tick() => { + match tick_count % 4 { + 2 => { + // Third tick (t=2/4): Compute the safe target. + info!("Compute safe target."); + } + 3 => { + // Fourth tick (t=3/4): Accept new votes. + info!("Accept new votes."); + } + _ => { + // Other ticks (t=0, t=1/4): Do nothing. + } + } + tick_count += 1; + } + } + } + } +} diff --git a/crates/common/consensus/misc/src/constants/lean.rs b/crates/common/consensus/misc/src/constants/lean.rs index d68aabaf2..4a8dcdad2 100644 --- a/crates/common/consensus/misc/src/constants/lean.rs +++ b/crates/common/consensus/misc/src/constants/lean.rs @@ -1,3 +1,6 @@ +/// 3SF-mini divides a slot into 4 intervals. +/// Reference: https://github.com/ethereum/research/blob/d225a6775a9b184b5c1fd6c830cc58a375d9535f/3sf-mini/p2p.py#L77-L98 +pub const INTERVALS_PER_SLOT: u64 = 4; pub const MAX_HISTORICAL_BLOCK_HASHES: u64 = 262144; pub const SLOT_DURATION: u64 = 12; pub const VALIDATOR_REGISTRY_LIMIT: u64 = 4096; diff --git a/crates/common/validator/lean/Cargo.toml b/crates/common/validator/lean/Cargo.toml index a5a3a3230..32783dada 100644 --- a/crates/common/validator/lean/Cargo.toml +++ b/crates/common/validator/lean/Cargo.toml @@ -14,4 +14,5 @@ tokio.workspace = true tracing = { workspace = true, features = ["log"] } # ream dependencies +ream-consensus-misc.workspace = true ream-network-spec.workspace = true diff --git a/crates/common/validator/lean/src/service.rs b/crates/common/validator/lean/src/service.rs index 4e093ee2d..ed80f9795 100644 --- a/crates/common/validator/lean/src/service.rs +++ b/crates/common/validator/lean/src/service.rs @@ -1,13 +1,10 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use ream_consensus_misc::constants::lean::INTERVALS_PER_SLOT; use ream_network_spec::networks::lean_network_spec; use tokio::time::{Instant, MissedTickBehavior, interval_at}; use tracing::info; -/// 3SF-mini divides a slot into 4 intervals. -/// Reference: https://github.com/ethereum/research/blob/d225a6775a9b184b5c1fd6c830cc58a375d9535f/3sf-mini/p2p.py#L77-L98 -const INTERVALS_PER_SLOT: u64 = 4; - /// ValidatorService is responsible for managing validator operations /// such as proposing blocks and voting on them. /// @@ -25,6 +22,8 @@ impl ValidatorService { pub async fn start(self) { info!("Validator Service started"); + // TODO: Duplicate clock logic from LeanChainService. May need to refactor later. + // Get the Lean network specification. let network_spec = lean_network_spec(); let seconds_per_slot = network_spec.seconds_per_slot; From 2a6f9519095602798f1557f5e08239b171c99991 Mon Sep 17 00:00:00 2001 From: Jun Song Date: Wed, 6 Aug 2025 01:30:32 +0900 Subject: [PATCH 03/21] chore: [WIP] remove ream-p2p dependency from ream-chain-lean to resolve dependency cycle issue --- Cargo.lock | 1 - crates/common/chain/lean/Cargo.toml | 1 - crates/common/chain/lean/src/staker.rs | 34 ++++++++------------------ 3 files changed, 10 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dc9336439..9cc8eeb86 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5263,7 +5263,6 @@ dependencies = [ "ream-consensus-lean", "ream-consensus-misc", "ream-network-spec", - "ream-p2p", "ream-pqc", "serde", "ssz_types", diff --git a/crates/common/chain/lean/Cargo.toml b/crates/common/chain/lean/Cargo.toml index 9bc6137bd..f071190f0 100644 --- a/crates/common/chain/lean/Cargo.toml +++ b/crates/common/chain/lean/Cargo.toml @@ -22,5 +22,4 @@ tree_hash.workspace = true ream-consensus-lean.workspace = true ream-consensus-misc.workspace = true ream-network-spec.workspace = true -ream-p2p.workspace = true ream-pqc.workspace = true diff --git a/crates/common/chain/lean/src/staker.rs b/crates/common/chain/lean/src/staker.rs index cf023baa4..59ae24e14 100644 --- a/crates/common/chain/lean/src/staker.rs +++ b/crates/common/chain/lean/src/staker.rs @@ -1,7 +1,4 @@ -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, -}; +use std::collections::HashMap; use alloy_primitives::B256; use ream_consensus_lean::{ @@ -12,7 +9,6 @@ use ream_consensus_lean::{ vote::{SignedVote, Vote}, }; use ream_consensus_misc::constants::lean::SLOT_DURATION; -use ream_p2p::network::lean::NetworkService; use ream_pqc::PQSignature; use ssz_types::VariableList; use tracing::info; @@ -21,7 +17,6 @@ use tree_hash::TreeHash; pub struct Staker { pub validator_id: u64, pub chain: HashMap, - pub network: Arc>, pub post_states: HashMap, pub known_votes: Vec, pub new_votes: Vec, @@ -33,19 +28,12 @@ pub struct Staker { } impl Staker { - pub fn new( - validator_id: u64, - network: Arc>, - genesis_block: Block, - genesis_state: LeanState, - ) -> Staker { + pub fn new(validator_id: u64, genesis_block: Block, genesis_state: LeanState) -> Staker { let genesis_hash = genesis_block.tree_hash_root(); Staker { // This node's validator ID validator_id, - // Hook to the p2p network - network, // Votes that we have received and taken into account known_votes: Vec::new(), // Votes that we have received but not yet taken into account @@ -116,11 +104,13 @@ impl Staker { pub fn tick(&mut self) -> anyhow::Result<()> { let current_slot = self.get_current_slot()?; let time_in_slot = { - let network = self - .network - .lock() - .map_err(|err| anyhow::anyhow!("Failed to acquire network lock: {err:?}"))?; - network.time % SLOT_DURATION + // let network = self + // .network + // .lock() + // .map_err(|err| anyhow::anyhow!("Failed to acquire network lock: {err:?}"))?; + // network.time % SLOT_DURATION + + 1 }; // t=0: propose a block @@ -149,11 +139,7 @@ impl Staker { } fn get_current_slot(&self) -> anyhow::Result { - let network = self - .network - .lock() - .map_err(|err| anyhow::anyhow!("Failed to acquire network lock: {err:?}"))?; - Ok(network.time / SLOT_DURATION + 2) + return Err(anyhow::anyhow!("get_current_slot not implemented")); } /// Called when it's the staker's turn to propose a block From cb8449a901a93d49227723040df6cd6a1d6d1dea Mon Sep 17 00:00:00 2001 From: Jun Song Date: Wed, 6 Aug 2025 01:34:17 +0900 Subject: [PATCH 04/21] feat: add LeanChain that holds necessary consensus information --- Cargo.lock | 2 + bin/ream/src/main.rs | 12 ++- crates/common/chain/lean/src/genesis.rs | 28 ++++++ crates/common/chain/lean/src/lean_chain.rs | 95 +++++++++++++++++++ crates/common/chain/lean/src/lib.rs | 2 + crates/common/chain/lean/src/service.rs | 20 +++- .../common/network_spec/src/networks/lean.rs | 1 + crates/common/validator/lean/Cargo.toml | 1 + crates/common/validator/lean/src/service.rs | 19 +++- crates/networking/p2p/Cargo.toml | 1 + crates/networking/p2p/src/network/lean.rs | 14 ++- 11 files changed, 179 insertions(+), 16 deletions(-) create mode 100644 crates/common/chain/lean/src/genesis.rs create mode 100644 crates/common/chain/lean/src/lean_chain.rs diff --git a/Cargo.lock b/Cargo.lock index 9cc8eeb86..8b76edcb6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5570,6 +5570,7 @@ dependencies = [ "libp2p-identity", "libp2p-mplex", "parking_lot", + "ream-chain-lean", "ream-consensus-beacon", "ream-consensus-misc", "ream-discv5", @@ -5756,6 +5757,7 @@ dependencies = [ name = "ream-validator-lean" version = "0.1.0" dependencies = [ + "ream-chain-lean", "ream-consensus-misc", "ream-network-spec", "tokio", diff --git a/bin/ream/src/main.rs b/bin/ream/src/main.rs index ae91f16a2..c29a87178 100644 --- a/bin/ream/src/main.rs +++ b/bin/ream/src/main.rs @@ -15,7 +15,7 @@ use ream::cli::{ voluntary_exit::VoluntaryExitConfig, }; use ream_beacon_api_types::id::{ID, ValidatorID}; -use ream_chain_lean::service::LeanChainService; +use ream_chain_lean::{genesis as lean_genesis, lean_chain::LeanChain, service::LeanChainService}; use ream_checkpoint_sync::initialize_db_from_checkpoint; use ream_consensus_misc::{ constants::beacon::set_genesis_validator_root, misc::compute_epoch_at_slot, @@ -38,6 +38,7 @@ use ream_validator_beacon::{ voluntary_exit::process_voluntary_exit, }; use ream_validator_lean::service::ValidatorService as LeanValidatorService; +use tokio::sync::RwLock; use tracing::{error, info}; use tracing_subscriber::EnvFilter; @@ -98,9 +99,12 @@ pub async fn run_lean_node(config: LeanNodeConfig, executor: ReamExecutor) { set_lean_network_spec(config.network.clone()); - let chain_service = LeanChainService::new().await; - let network_service = LeanNetworkService::new().await; - let validator_service = LeanValidatorService::new().await; + let (genesis_block, genesis_state) = lean_genesis::setup_genesis(); + let lean_chain = Arc::new(RwLock::new(LeanChain::new(genesis_block, genesis_state))); + + let chain_service = LeanChainService::new(lean_chain.clone()).await; + let network_service = LeanNetworkService::new(lean_chain.clone()).await; + let validator_service = LeanValidatorService::new(lean_chain.clone()).await; let chain_future = executor.spawn(async move { chain_service.start().await; diff --git a/crates/common/chain/lean/src/genesis.rs b/crates/common/chain/lean/src/genesis.rs new file mode 100644 index 000000000..9a4764cdb --- /dev/null +++ b/crates/common/chain/lean/src/genesis.rs @@ -0,0 +1,28 @@ +use alloy_primitives::B256; +use ream_consensus_lean::{block::Block, state::LeanState}; +use ream_network_spec::networks::lean_network_spec; +use ssz_types::VariableList; +use tree_hash::TreeHash; + +fn genesis_block(state_root: B256) -> Block { + Block { + slot: 1, + parent: B256::ZERO, + votes: VariableList::empty(), + state_root, + } +} + +fn genesis_state(num_validators: u64) -> LeanState { + LeanState::new(num_validators) +} + +/// Setup the genesis block and state for the Lean chain. +/// +/// Reference: https://github.com/ethereum/research/blob/d225a6775a9b184b5c1fd6c830cc58a375d9535f/3sf-mini/test_p2p.py#L119-L131 +pub fn setup_genesis() -> (Block, LeanState) { + let genesis_state = genesis_state(lean_network_spec().num_validators); + let genesis_block = genesis_block(genesis_state.tree_hash_root()); + + (genesis_block, genesis_state) +} diff --git a/crates/common/chain/lean/src/lean_chain.rs b/crates/common/chain/lean/src/lean_chain.rs new file mode 100644 index 000000000..22c56cf05 --- /dev/null +++ b/crates/common/chain/lean/src/lean_chain.rs @@ -0,0 +1,95 @@ +use std::collections::HashMap; + +use alloy_primitives::B256; +use ream_consensus_lean::{ + QueueItem, block::Block, get_fork_choice_head, get_latest_justified_hash, state::LeanState, + vote::Vote, +}; +use tree_hash::TreeHash; + +#[derive(Clone, Debug)] +pub struct LeanChain { + pub chain: HashMap, + pub post_states: HashMap, + pub known_votes: Vec, + pub new_votes: Vec, + pub dependencies: HashMap>, + pub genesis_hash: B256, + pub num_validators: u64, + pub safe_target: B256, + pub head: B256, +} + +impl LeanChain { + pub fn new(genesis_block: Block, genesis_state: LeanState) -> LeanChain { + let genesis_hash = genesis_block.tree_hash_root(); + + LeanChain { + // Votes that we have received and taken into account + known_votes: Vec::new(), + // Votes that we have received but not yet taken into account + new_votes: Vec::new(), + // Objects that we will process once we have processed their parents + dependencies: HashMap::new(), + // Initialize the chain with the genesis block + genesis_hash, + num_validators: genesis_state.config.num_validators, + // Block that it is safe to use to vote as the target + // Diverge from Python implementation: Use genesis hash instead of `None` + safe_target: genesis_hash, + // Head of the chain + head: genesis_hash, + // {block_hash: block} for all blocks that we know about + chain: HashMap::from([(genesis_hash, genesis_block)]), + // {block_hash: post_state} for all blocks that we know about + post_states: HashMap::from([(genesis_hash, genesis_state)]), + } + } + + pub fn latest_justified_hash(&self) -> Option { + get_latest_justified_hash(&self.post_states) + } + + pub fn latest_finalized_hash(&self) -> Option { + self.post_states + .get(&self.head) + .map(|state| state.latest_finalized_hash) + } + + /// Compute the latest block that the staker is allowed to choose as the target + fn compute_safe_target(&self) -> anyhow::Result { + let justified_hash = get_latest_justified_hash(&self.post_states) + .ok_or_else(|| anyhow::anyhow!("No justified hash found in post states"))?; + + get_fork_choice_head( + &self.chain, + &justified_hash, + &self.new_votes, + self.num_validators * 2 / 3, + ) + } + + /// Process new votes that the staker has received. Vote processing is done + /// at a particular time, because of safe target and view merge rule + fn accept_new_votes(&mut self) -> anyhow::Result<()> { + for new_vote in self.new_votes.drain(..) { + if !self.known_votes.contains(&new_vote) { + self.known_votes.push(new_vote); + } + } + + self.recompute_head()?; + Ok(()) + } + + /// Done upon processing new votes or a new block + fn recompute_head(&mut self) -> anyhow::Result<()> { + let justified_hash = get_latest_justified_hash(&self.post_states).ok_or_else(|| { + anyhow::anyhow!("Failed to get latest_justified_hash from post_states") + })?; + self.head = get_fork_choice_head(&self.chain, &justified_hash, &self.known_votes, 0)?; + Ok(()) + } + + // TODO: Add necessary methods for processs_block, vote, and receive. +} diff --git a/crates/common/chain/lean/src/lib.rs b/crates/common/chain/lean/src/lib.rs index 7c5a5b6bf..ad13fdd84 100644 --- a/crates/common/chain/lean/src/lib.rs +++ b/crates/common/chain/lean/src/lib.rs @@ -1,2 +1,4 @@ +pub mod genesis; +pub mod lean_chain; pub mod service; pub mod staker; diff --git a/crates/common/chain/lean/src/service.rs b/crates/common/chain/lean/src/service.rs index fc94fa8f4..a1c4cc094 100644 --- a/crates/common/chain/lean/src/service.rs +++ b/crates/common/chain/lean/src/service.rs @@ -1,15 +1,25 @@ -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::{ + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; use ream_consensus_misc::constants::lean::INTERVALS_PER_SLOT; use ream_network_spec::networks::lean_network_spec; -use tokio::time::{Instant, MissedTickBehavior, interval_at}; +use tokio::{ + sync::RwLock, + time::{Instant, MissedTickBehavior, interval_at}, +}; use tracing::info; -pub struct LeanChainService {} +use crate::lean_chain::LeanChain; + +pub struct LeanChainService { + lean_chain: Arc>, +} impl LeanChainService { - pub async fn new() -> Self { - LeanChainService {} + pub async fn new(lean_chain: Arc>) -> Self { + LeanChainService { lean_chain } } pub async fn start(self) { diff --git a/crates/common/network_spec/src/networks/lean.rs b/crates/common/network_spec/src/networks/lean.rs index 33d55f353..c8237da34 100644 --- a/crates/common/network_spec/src/networks/lean.rs +++ b/crates/common/network_spec/src/networks/lean.rs @@ -10,6 +10,7 @@ pub static LEAN_NETWORK_SPEC: OnceLock> = OnceLock::new(); pub struct LeanNetworkSpec { pub genesis_time: u64, pub seconds_per_slot: u64, + pub num_validators: u64, } /// MUST be called only once at the start of the application to initialize static diff --git a/crates/common/validator/lean/Cargo.toml b/crates/common/validator/lean/Cargo.toml index 32783dada..938256faa 100644 --- a/crates/common/validator/lean/Cargo.toml +++ b/crates/common/validator/lean/Cargo.toml @@ -14,5 +14,6 @@ tokio.workspace = true tracing = { workspace = true, features = ["log"] } # ream dependencies +ream-chain-lean.workspace = true ream-consensus-misc.workspace = true ream-network-spec.workspace = true diff --git a/crates/common/validator/lean/src/service.rs b/crates/common/validator/lean/src/service.rs index ed80f9795..5df3c9aab 100644 --- a/crates/common/validator/lean/src/service.rs +++ b/crates/common/validator/lean/src/service.rs @@ -1,8 +1,15 @@ -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::{ + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; +use ream_chain_lean::lean_chain::LeanChain; use ream_consensus_misc::constants::lean::INTERVALS_PER_SLOT; use ream_network_spec::networks::lean_network_spec; -use tokio::time::{Instant, MissedTickBehavior, interval_at}; +use tokio::{ + sync::RwLock, + time::{Instant, MissedTickBehavior, interval_at}, +}; use tracing::info; /// ValidatorService is responsible for managing validator operations @@ -12,11 +19,13 @@ use tracing::info; /// Every second tick (t=1/4) it votes on the proposed block. /// /// NOTE: Other ticks should be handled by the other services, such as the consensus service. -pub struct ValidatorService {} +pub struct ValidatorService { + lean_chain: Arc>, +} impl ValidatorService { - pub async fn new() -> Self { - ValidatorService {} + pub async fn new(lean_chain: Arc>) -> Self { + ValidatorService { lean_chain } } pub async fn start(self) { diff --git a/crates/networking/p2p/Cargo.toml b/crates/networking/p2p/Cargo.toml index c436c8f4a..5a4a45169 100644 --- a/crates/networking/p2p/Cargo.toml +++ b/crates/networking/p2p/Cargo.toml @@ -38,6 +38,7 @@ tree_hash.workspace = true unsigned-varint = { version = "0.8", features = ["codec"] } # ream dependencies +ream-chain-lean.workspace = true ream-consensus-beacon.workspace = true ream-consensus-misc.workspace = true ream-discv5.workspace = true diff --git a/crates/networking/p2p/src/network/lean.rs b/crates/networking/p2p/src/network/lean.rs index d48dd93c0..e928f0642 100644 --- a/crates/networking/p2p/src/network/lean.rs +++ b/crates/networking/p2p/src/network/lean.rs @@ -1,12 +1,22 @@ +use std::sync::Arc; + +use ream_chain_lean::lean_chain::LeanChain; +use tokio::sync::RwLock; use tracing::info; pub struct NetworkService { pub time: u64, + + lean_chain: Arc>, } impl NetworkService { - pub async fn new() -> Self { - NetworkService { time: 0 } + pub async fn new(lean_chain: Arc>) -> Self { + NetworkService { + time: 0, + + lean_chain, + } } pub async fn start(self) { From 98843f0d47bd97380b755ffe1198bb38d0942dce Mon Sep 17 00:00:00 2001 From: Jun Song Date: Wed, 6 Aug 2025 02:17:35 +0900 Subject: [PATCH 05/21] feat: implement get_current_slot using network_spec --- crates/common/chain/lean/src/lib.rs | 1 + crates/common/chain/lean/src/slot.rs | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+) create mode 100644 crates/common/chain/lean/src/slot.rs diff --git a/crates/common/chain/lean/src/lib.rs b/crates/common/chain/lean/src/lib.rs index ad13fdd84..b54436bdc 100644 --- a/crates/common/chain/lean/src/lib.rs +++ b/crates/common/chain/lean/src/lib.rs @@ -1,4 +1,5 @@ pub mod genesis; pub mod lean_chain; pub mod service; +pub mod slot; pub mod staker; diff --git a/crates/common/chain/lean/src/slot.rs b/crates/common/chain/lean/src/slot.rs new file mode 100644 index 000000000..c07dc056e --- /dev/null +++ b/crates/common/chain/lean/src/slot.rs @@ -0,0 +1,19 @@ +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use ream_network_spec::networks::lean_network_spec; + +/// NOTE: Vitalik's implementation of 3SF-mini adds 2 slots more due to the test setup, unlike our +/// implementation. +/// This is due to the fact that his test code starts at slot 1. +pub fn get_current_slot() -> u64 { + let network_spec = lean_network_spec(); + let seconds_per_slot = network_spec.seconds_per_slot; + let genesis_time = network_spec.genesis_time; + + let genesis_instant = UNIX_EPOCH + Duration::from_secs(genesis_time); + let elapsed = genesis_instant + .duration_since(SystemTime::now()) + .expect("Genesis time is in the past"); + + elapsed.as_secs() / seconds_per_slot +} From ac429b8ca7020f25efb948953628d1c76e606f1b Mon Sep 17 00:00:00 2001 From: Jun Song Date: Wed, 6 Aug 2025 02:18:02 +0900 Subject: [PATCH 06/21] feat: implement actual tick function in LeanChainService and ValidatorService --- Cargo.lock | 1 + crates/common/chain/lean/src/lean_chain.rs | 118 +++++++++++++++++++- crates/common/chain/lean/src/service.rs | 9 +- crates/common/validator/lean/Cargo.toml | 1 + crates/common/validator/lean/src/service.rs | 61 +++++++++- 5 files changed, 177 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8b76edcb6..874a3c27a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5758,6 +5758,7 @@ name = "ream-validator-lean" version = "0.1.0" dependencies = [ "ream-chain-lean", + "ream-consensus-lean", "ream-consensus-misc", "ream-network-spec", "tokio", diff --git a/crates/common/chain/lean/src/lean_chain.rs b/crates/common/chain/lean/src/lean_chain.rs index 22c56cf05..262783601 100644 --- a/crates/common/chain/lean/src/lean_chain.rs +++ b/crates/common/chain/lean/src/lean_chain.rs @@ -2,11 +2,14 @@ use std::collections::HashMap; use alloy_primitives::B256; use ream_consensus_lean::{ - QueueItem, block::Block, get_fork_choice_head, get_latest_justified_hash, state::LeanState, - vote::Vote, + QueueItem, block::Block, get_fork_choice_head, get_latest_justified_hash, is_justifiable_slot, + process_block, state::LeanState, vote::Vote, }; +use ssz_types::VariableList; use tree_hash::TreeHash; +use crate::slot::get_current_slot; + #[derive(Clone, Debug)] pub struct LeanChain { pub chain: HashMap, @@ -57,7 +60,7 @@ impl LeanChain { } /// Compute the latest block that the staker is allowed to choose as the target - fn compute_safe_target(&self) -> anyhow::Result { + pub fn compute_safe_target(&self) -> anyhow::Result { let justified_hash = get_latest_justified_hash(&self.post_states) .ok_or_else(|| anyhow::anyhow!("No justified hash found in post states"))?; @@ -71,7 +74,7 @@ impl LeanChain { /// Process new votes that the staker has received. Vote processing is done /// at a particular time, because of safe target and view merge rule - fn accept_new_votes(&mut self) -> anyhow::Result<()> { + pub fn accept_new_votes(&mut self) -> anyhow::Result<()> { for new_vote in self.new_votes.drain(..) { if !self.known_votes.contains(&new_vote) { self.known_votes.push(new_vote); @@ -91,5 +94,110 @@ impl LeanChain { Ok(()) } - // TODO: Add necessary methods for processs_block, vote, and receive. + pub fn build_block(&mut self) -> anyhow::Result { + let new_slot = get_current_slot(); + + let head_state = self + .post_states + .get(&self.head) + .ok_or_else(|| anyhow::anyhow!("Post state not found for head: {}", self.head))?; + let mut new_block = Block { + slot: new_slot, + parent: self.head, + votes: VariableList::empty(), + // Diverged from Python implementation: Using `B256::ZERO` instead of `None`) + state_root: B256::ZERO, + }; + let mut state: LeanState; + + // Keep attempt to add valid votes from the list of available votes + loop { + state = process_block(head_state, &new_block)?; + + let new_votes_to_add = self + .known_votes + .clone() + .into_iter() + .filter(|vote| vote.source == state.latest_justified_hash) + .filter(|vote| !new_block.votes.contains(vote)) + .collect::>(); + + if new_votes_to_add.is_empty() { + break; + } + + for vote in new_votes_to_add { + new_block + .votes + .push(vote) + .map_err(|err| anyhow::anyhow!("Failed to add vote to new_block: {err:?}"))?; + } + } + + new_block.state_root = state.tree_hash_root(); + + let digest = new_block.tree_hash_root(); + + self.chain.insert(digest, new_block.clone()); + self.post_states.insert(digest, state); + + Ok(new_block) + } + + pub fn build_vote(&self) -> anyhow::Result { + let state = self + .post_states + .get(&self.head) + .ok_or_else(|| anyhow::anyhow!("Post state not found for head: {}", self.head))?; + let mut target_block = self + .chain + .get(&self.head) + .ok_or_else(|| anyhow::anyhow!("Block not found in chain for head: {}", self.head))?; + + // If there is no very recent safe target, then vote for the k'th ancestor + // of the head + for _ in 0..3 { + let safe_target_block = self.chain.get(&self.safe_target).ok_or_else(|| { + anyhow::anyhow!("Block not found for safe target hash: {}", self.safe_target) + })?; + if target_block.slot > safe_target_block.slot { + target_block = self.chain.get(&target_block.parent).ok_or_else(|| { + anyhow::anyhow!( + "Block not found for target block's parent hash: {}", + target_block.parent + ) + })?; + } + } + + // If the latest finalized slot is very far back, then only some slots are + // valid to justify, make sure the target is one of those + while !is_justifiable_slot(&state.latest_finalized_slot, &target_block.slot) { + target_block = self.chain.get(&target_block.parent).ok_or_else(|| { + anyhow::anyhow!( + "Block not found for target block's parent hash: {}", + target_block.parent + ) + })?; + } + + let head_block = self + .chain + .get(&self.head) + .ok_or_else(|| anyhow::anyhow!("Block not found for head: {}", self.head))?; + + Ok(Vote { + // Replace with actual validator ID + validator_id: 0, + slot: get_current_slot(), + head: self.head, + head_slot: head_block.slot, + target: target_block.tree_hash_root(), + target_slot: target_block.slot, + source: state.latest_justified_hash, + source_slot: state.latest_justified_slot, + }) + } + + // TODO: Add necessary methods for receive. } diff --git a/crates/common/chain/lean/src/service.rs b/crates/common/chain/lean/src/service.rs index a1c4cc094..32fbcbe11 100644 --- a/crates/common/chain/lean/src/service.rs +++ b/crates/common/chain/lean/src/service.rs @@ -9,7 +9,7 @@ use tokio::{ sync::RwLock, time::{Instant, MissedTickBehavior, interval_at}, }; -use tracing::info; +use tracing::{debug, info}; use crate::lean_chain::LeanChain; @@ -55,11 +55,14 @@ impl LeanChainService { match tick_count % 4 { 2 => { // Third tick (t=2/4): Compute the safe target. - info!("Compute safe target."); + debug!("Compute safe target."); + let mut lean_chain = self.lean_chain.write().await; + lean_chain.safe_target = lean_chain.compute_safe_target().expect("Failed to compute safe target"); } 3 => { // Fourth tick (t=3/4): Accept new votes. - info!("Accept new votes."); + debug!("Accept new votes."); + self.lean_chain.write().await.accept_new_votes().expect("Failed to accept new votes"); } _ => { // Other ticks (t=0, t=1/4): Do nothing. diff --git a/crates/common/validator/lean/Cargo.toml b/crates/common/validator/lean/Cargo.toml index 938256faa..51fd51a43 100644 --- a/crates/common/validator/lean/Cargo.toml +++ b/crates/common/validator/lean/Cargo.toml @@ -15,5 +15,6 @@ tracing = { workspace = true, features = ["log"] } # ream dependencies ream-chain-lean.workspace = true +ream-consensus-lean.workspace = true ream-consensus-misc.workspace = true ream-network-spec.workspace = true diff --git a/crates/common/validator/lean/src/service.rs b/crates/common/validator/lean/src/service.rs index 5df3c9aab..44e78b0f3 100644 --- a/crates/common/validator/lean/src/service.rs +++ b/crates/common/validator/lean/src/service.rs @@ -3,14 +3,21 @@ use std::{ time::{Duration, SystemTime, UNIX_EPOCH}, }; -use ream_chain_lean::lean_chain::LeanChain; +use ream_chain_lean::{lean_chain::LeanChain, slot::get_current_slot}; use ream_consensus_misc::constants::lean::INTERVALS_PER_SLOT; use ream_network_spec::networks::lean_network_spec; use tokio::{ sync::RwLock, time::{Instant, MissedTickBehavior, interval_at}, }; -use tracing::info; +use tracing::{debug, info}; + +// TODO: We need to replace this after PQC integration. +// For now, we only need ID for keystore. +// Keystore MUST have +struct LeanKeystore { + id: u64, +} /// ValidatorService is responsible for managing validator operations /// such as proposing blocks and voting on them. @@ -21,11 +28,18 @@ use tracing::info; /// NOTE: Other ticks should be handled by the other services, such as the consensus service. pub struct ValidatorService { lean_chain: Arc>, + + keystores: Vec, } impl ValidatorService { pub async fn new(lean_chain: Arc>) -> Self { - ValidatorService { lean_chain } + ValidatorService { + lean_chain, + + // TODO: We need to load keystores from the config. + keystores: Vec::new(), + } } pub async fn start(self) { @@ -61,11 +75,38 @@ impl ValidatorService { match tick_count % 4 { 0 => { // First tick (t=0): Propose a block. - info!("Propose block if it's my turn."); + if let Some(keystore) = self.is_proposer() { + debug!("Propose block, validator ID: {}", keystore.id); + + let mut lean_chain = self.lean_chain.write().await; + lean_chain.accept_new_votes().expect("Failed to accept new votes"); + let new_block = lean_chain.build_block().expect("Failed to build block"); + + + debug!( + "Built block for validator {} at slot {}", + keystore.id, new_block.slot + ); + + // TODO 1: Sign the block with the keystore. + // TODO 2: Send the block to the network. + debug!("Not a proposer, skipping block proposal."); + } } 1 => { // Second tick (t=1/4): Vote. - info!("Vote."); + debug!("Vote."); + + // Build the vote from LeanChain, and modify its validator ID + let vote_template = self.lean_chain.read().await.build_vote().expect("Failed to build vote"); + let _votes = self.keystores.iter().map(|ks| { + let mut vote = vote_template.clone(); + vote.validator_id = ks.id; + vote + }).collect::>(); + + // TODO 1: Sign the votes with the keystore. + // TODO 2: Send the votes to the network. } _ => { // Other ticks (t=2/4, t=3/4): Do nothing. @@ -76,4 +117,14 @@ impl ValidatorService { } } } + + /// Determine if one of the keystores is the proposer for the current slot. + fn is_proposer(&self) -> Option<&LeanKeystore> { + let current_slot = get_current_slot(); + let proposer_index = current_slot % lean_network_spec().num_validators; + + self.keystores + .iter() + .find(|ks| ks.id == proposer_index as u64) + } } From 32b3fcdfd6976be8711ad59ed44873b9a7c01c84 Mon Sep 17 00:00:00 2001 From: Jun Song Date: Wed, 6 Aug 2025 02:25:52 +0900 Subject: [PATCH 07/21] chore: make clippy happy --- crates/common/chain/lean/src/staker.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/common/chain/lean/src/staker.rs b/crates/common/chain/lean/src/staker.rs index 59ae24e14..0b9cbbd92 100644 --- a/crates/common/chain/lean/src/staker.rs +++ b/crates/common/chain/lean/src/staker.rs @@ -139,7 +139,8 @@ impl Staker { } fn get_current_slot(&self) -> anyhow::Result { - return Err(anyhow::anyhow!("get_current_slot not implemented")); + // Temporary. + Ok(1) } /// Called when it's the staker's turn to propose a block From 8bcff55ecc26af4e12bd85bcba8f42dd1c447a21 Mon Sep 17 00:00:00 2001 From: Jun Song Date: Wed, 6 Aug 2025 20:59:20 +0900 Subject: [PATCH 08/21] chore: add some comments --- bin/ream/src/main.rs | 4 ++++ crates/common/validator/lean/src/service.rs | 8 ++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/bin/ream/src/main.rs b/bin/ream/src/main.rs index c29a87178..8345af472 100644 --- a/bin/ream/src/main.rs +++ b/bin/ream/src/main.rs @@ -99,13 +99,17 @@ pub async fn run_lean_node(config: LeanNodeConfig, executor: ReamExecutor) { set_lean_network_spec(config.network.clone()); + // Initialize the lean chain with genesis block and state. let (genesis_block, genesis_state) = lean_genesis::setup_genesis(); let lean_chain = Arc::new(RwLock::new(LeanChain::new(genesis_block, genesis_state))); + // Initialize the services that will run in the lean node. + // TODO: Add RPC service for lean node. let chain_service = LeanChainService::new(lean_chain.clone()).await; let network_service = LeanNetworkService::new(lean_chain.clone()).await; let validator_service = LeanValidatorService::new(lean_chain.clone()).await; + // Start the services concurrently. let chain_future = executor.spawn(async move { chain_service.start().await; }); diff --git a/crates/common/validator/lean/src/service.rs b/crates/common/validator/lean/src/service.rs index 44e78b0f3..6441e7c0e 100644 --- a/crates/common/validator/lean/src/service.rs +++ b/crates/common/validator/lean/src/service.rs @@ -14,7 +14,6 @@ use tracing::{debug, info}; // TODO: We need to replace this after PQC integration. // For now, we only need ID for keystore. -// Keystore MUST have struct LeanKeystore { id: u64, } @@ -78,10 +77,14 @@ impl ValidatorService { if let Some(keystore) = self.is_proposer() { debug!("Propose block, validator ID: {}", keystore.id); + // Acquire the write lock. `accept_new_votes` and `build_block` will modify the lean chain. let mut lean_chain = self.lean_chain.write().await; + + // Accept new votes and modify the lean chain. lean_chain.accept_new_votes().expect("Failed to accept new votes"); - let new_block = lean_chain.build_block().expect("Failed to build block"); + // Build a block from the lean chain. + let new_block = lean_chain.build_block().expect("Failed to build block"); debug!( "Built block for validator {} at slot {}", @@ -90,6 +93,7 @@ impl ValidatorService { // TODO 1: Sign the block with the keystore. // TODO 2: Send the block to the network. + } else { debug!("Not a proposer, skipping block proposal."); } } From 68882f1fb453eeb9c374168c52e87b86603d0288 Mon Sep 17 00:00:00 2001 From: Jun Song Date: Wed, 6 Aug 2025 21:19:11 +0900 Subject: [PATCH 09/21] chore: rollback debug logs --- crates/common/chain/lean/src/service.rs | 6 +++--- crates/common/validator/lean/src/service.rs | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/common/chain/lean/src/service.rs b/crates/common/chain/lean/src/service.rs index 32fbcbe11..0719b1c4e 100644 --- a/crates/common/chain/lean/src/service.rs +++ b/crates/common/chain/lean/src/service.rs @@ -9,7 +9,7 @@ use tokio::{ sync::RwLock, time::{Instant, MissedTickBehavior, interval_at}, }; -use tracing::{debug, info}; +use tracing::info; use crate::lean_chain::LeanChain; @@ -55,13 +55,13 @@ impl LeanChainService { match tick_count % 4 { 2 => { // Third tick (t=2/4): Compute the safe target. - debug!("Compute safe target."); + info!("Compute safe target."); let mut lean_chain = self.lean_chain.write().await; lean_chain.safe_target = lean_chain.compute_safe_target().expect("Failed to compute safe target"); } 3 => { // Fourth tick (t=3/4): Accept new votes. - debug!("Accept new votes."); + info!("Accept new votes."); self.lean_chain.write().await.accept_new_votes().expect("Failed to accept new votes"); } _ => { diff --git a/crates/common/validator/lean/src/service.rs b/crates/common/validator/lean/src/service.rs index 6441e7c0e..c25fca64c 100644 --- a/crates/common/validator/lean/src/service.rs +++ b/crates/common/validator/lean/src/service.rs @@ -10,7 +10,7 @@ use tokio::{ sync::RwLock, time::{Instant, MissedTickBehavior, interval_at}, }; -use tracing::{debug, info}; +use tracing::info; // TODO: We need to replace this after PQC integration. // For now, we only need ID for keystore. @@ -75,7 +75,7 @@ impl ValidatorService { 0 => { // First tick (t=0): Propose a block. if let Some(keystore) = self.is_proposer() { - debug!("Propose block, validator ID: {}", keystore.id); + info!("Propose block, validator ID: {}", keystore.id); // Acquire the write lock. `accept_new_votes` and `build_block` will modify the lean chain. let mut lean_chain = self.lean_chain.write().await; @@ -86,7 +86,7 @@ impl ValidatorService { // Build a block from the lean chain. let new_block = lean_chain.build_block().expect("Failed to build block"); - debug!( + info!( "Built block for validator {} at slot {}", keystore.id, new_block.slot ); @@ -94,12 +94,12 @@ impl ValidatorService { // TODO 1: Sign the block with the keystore. // TODO 2: Send the block to the network. } else { - debug!("Not a proposer, skipping block proposal."); + info!("Not a proposer, skipping block proposal."); } } 1 => { // Second tick (t=1/4): Vote. - debug!("Vote."); + info!("Vote."); // Build the vote from LeanChain, and modify its validator ID let vote_template = self.lean_chain.read().await.build_vote().expect("Failed to build vote"); From 0eed9444328adece73289422763b00eff1f3dadb Mon Sep 17 00:00:00 2001 From: Jun Song Date: Wed, 6 Aug 2025 21:19:48 +0900 Subject: [PATCH 10/21] fix: calculation of elapsed --- crates/common/chain/lean/src/slot.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/common/chain/lean/src/slot.rs b/crates/common/chain/lean/src/slot.rs index c07dc056e..161bb9f6e 100644 --- a/crates/common/chain/lean/src/slot.rs +++ b/crates/common/chain/lean/src/slot.rs @@ -11,9 +11,9 @@ pub fn get_current_slot() -> u64 { let genesis_time = network_spec.genesis_time; let genesis_instant = UNIX_EPOCH + Duration::from_secs(genesis_time); - let elapsed = genesis_instant - .duration_since(SystemTime::now()) - .expect("Genesis time is in the past"); + let elapsed = SystemTime::now() + .duration_since(genesis_instant) + .expect("Called before genesis time"); elapsed.as_secs() / seconds_per_slot } From 2ff6a0d53277bfb9a347d7cc1512971609c1fb26 Mon Sep 17 00:00:00 2001 From: Jun Song Date: Wed, 6 Aug 2025 21:20:36 +0900 Subject: [PATCH 11/21] feat: set genesis time if it is in the past & add num_validators for the sample_spec.yml --- bin/ream/assets/lean/sample_spec.yml | 1 + bin/ream/src/main.rs | 24 ++++++++++++++++++++++-- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/bin/ream/assets/lean/sample_spec.yml b/bin/ream/assets/lean/sample_spec.yml index a2928acba..36f64dc91 100644 --- a/bin/ream/assets/lean/sample_spec.yml +++ b/bin/ream/assets/lean/sample_spec.yml @@ -1,2 +1,3 @@ SECONDS_PER_SLOT: 12 GENESIS_TIME: 0 +NUM_VALIDATORS: 4 diff --git a/bin/ream/src/main.rs b/bin/ream/src/main.rs index 8345af472..005ba2937 100644 --- a/bin/ream/src/main.rs +++ b/bin/ream/src/main.rs @@ -1,5 +1,7 @@ use std::{ - env, process, + env, + ops::Deref, + process, sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}, }; @@ -97,7 +99,25 @@ fn main() { pub async fn run_lean_node(config: LeanNodeConfig, executor: ReamExecutor) { info!("starting up lean node..."); - set_lean_network_spec(config.network.clone()); + // Hack: It is bothersome to modify the spec every time we run the lean node. + // Set genesis time to a future time if it is in the past. + // FIXME: Add a script to generate the YAML config file. + let network = { + let current_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("System time is before UNIX epoch") + .as_secs(); + + if config.network.genesis_time < current_timestamp { + let mut network = config.network.deref().clone(); + network.genesis_time = current_timestamp + 3; // Set genesis time to 3 seconds in the future. + Arc::new(network) + } else { + config.network.clone() + } + }; + + set_lean_network_spec(network); // Initialize the lean chain with genesis block and state. let (genesis_block, genesis_state) = lean_genesis::setup_genesis(); From 1468e1896baa97559baf0dc96861f917a5b05c6b Mon Sep 17 00:00:00 2001 From: Jun Song Date: Wed, 6 Aug 2025 22:09:36 +0900 Subject: [PATCH 12/21] docs: adding comments for design decision and responsibility --- bin/ream/assets/lean/sample_spec.yml | 2 +- bin/ream/src/main.rs | 13 +++++- crates/common/chain/lean/src/lean_chain.rs | 6 ++- crates/common/chain/lean/src/service.rs | 5 +++ crates/common/validator/lean/src/service.rs | 44 ++++++++++++++------- crates/networking/p2p/src/network/lean.rs | 21 +++++----- 6 files changed, 61 insertions(+), 30 deletions(-) diff --git a/bin/ream/assets/lean/sample_spec.yml b/bin/ream/assets/lean/sample_spec.yml index 36f64dc91..c3ee9ffea 100644 --- a/bin/ream/assets/lean/sample_spec.yml +++ b/bin/ream/assets/lean/sample_spec.yml @@ -1,3 +1,3 @@ SECONDS_PER_SLOT: 12 GENESIS_TIME: 0 -NUM_VALIDATORS: 4 +NUM_VALIDATORS: 1 diff --git a/bin/ream/src/main.rs b/bin/ream/src/main.rs index 005ba2937..a5b801e27 100644 --- a/bin/ream/src/main.rs +++ b/bin/ream/src/main.rs @@ -96,6 +96,14 @@ fn main() { } /// Runs the lean node. +/// +/// A lean node runs several services with different responsibilities. +/// Refer to each service's documentation for more details. +/// +/// A lean node has one shared state, `LeanChain` (wrapped with synchronization primitives), which +/// is used by all services. +/// +/// Besides the shared state, each service holds the channels to communicate with each other. pub async fn run_lean_node(config: LeanNodeConfig, executor: ReamExecutor) { info!("starting up lean node..."); @@ -124,10 +132,11 @@ pub async fn run_lean_node(config: LeanNodeConfig, executor: ReamExecutor) { let lean_chain = Arc::new(RwLock::new(LeanChain::new(genesis_block, genesis_state))); // Initialize the services that will run in the lean node. - // TODO: Add RPC service for lean node. + // TODO 1: Load keystores from the config. + // TODO 2: Add RPC service for lean node. let chain_service = LeanChainService::new(lean_chain.clone()).await; let network_service = LeanNetworkService::new(lean_chain.clone()).await; - let validator_service = LeanValidatorService::new(lean_chain.clone()).await; + let validator_service = LeanValidatorService::new(lean_chain.clone(), Vec::new()).await; // Start the services concurrently. let chain_future = executor.spawn(async move { diff --git a/crates/common/chain/lean/src/lean_chain.rs b/crates/common/chain/lean/src/lean_chain.rs index 262783601..79b8c592b 100644 --- a/crates/common/chain/lean/src/lean_chain.rs +++ b/crates/common/chain/lean/src/lean_chain.rs @@ -10,6 +10,10 @@ use tree_hash::TreeHash; use crate::slot::get_current_slot; +/// [LeanChain] represents the state that the Lean node should maintain. +/// +/// Most of the fields are based on the Python implementation of [`Staker`](https://github.com/ethereum/research/blob/d225a6775a9b184b5c1fd6c830cc58a375d9535f/3sf-mini/p2p.py#L15-L42), +/// but doesn't include `validator_id` as a node should manage multiple validators. #[derive(Clone, Debug)] pub struct LeanChain { pub chain: HashMap, @@ -94,7 +98,7 @@ impl LeanChain { Ok(()) } - pub fn build_block(&mut self) -> anyhow::Result { + pub fn propose_block(&mut self) -> anyhow::Result { let new_slot = get_current_slot(); let head_state = self diff --git a/crates/common/chain/lean/src/service.rs b/crates/common/chain/lean/src/service.rs index 0719b1c4e..350b9c160 100644 --- a/crates/common/chain/lean/src/service.rs +++ b/crates/common/chain/lean/src/service.rs @@ -13,6 +13,11 @@ use tracing::info; use crate::lean_chain::LeanChain; +/// LeanChainService is responsible for updating the [LeanChain] state. `LeanChain` is updated when: +/// 1. Every third (t=2/4) and fourth (t=3/4) ticks. +/// 2. Receiving new blocks or votes from the network. +/// +/// NOTE: This service will be the core service to implement `receive()` function. pub struct LeanChainService { lean_chain: Arc>, } diff --git a/crates/common/validator/lean/src/service.rs b/crates/common/validator/lean/src/service.rs index c25fca64c..7c6bdc6ee 100644 --- a/crates/common/validator/lean/src/service.rs +++ b/crates/common/validator/lean/src/service.rs @@ -14,17 +14,18 @@ use tracing::info; // TODO: We need to replace this after PQC integration. // For now, we only need ID for keystore. -struct LeanKeystore { +pub struct LeanKeystore { id: u64, } /// ValidatorService is responsible for managing validator operations -/// such as proposing blocks and voting on them. +/// such as proposing blocks and voting on them. This service also holds the keystores +/// for its validators, which are used to sign. /// /// Every first tick (t=0) it proposes a block if it's the validator's turn. /// Every second tick (t=1/4) it votes on the proposed block. /// -/// NOTE: Other ticks should be handled by the other services, such as the consensus service. +/// NOTE: Other ticks should be handled by the other services, such as [LeanChainService]. pub struct ValidatorService { lean_chain: Arc>, @@ -32,12 +33,17 @@ pub struct ValidatorService { } impl ValidatorService { - pub async fn new(lean_chain: Arc>) -> Self { + pub async fn new(lean_chain: Arc>, keystores: Vec) -> Self { + // Hack: If no keystores are provided, create a default one. + let keystores = if keystores.is_empty() { + vec![LeanKeystore { id: 0 }] // Placeholder for keystores + } else { + keystores + }; + ValidatorService { lean_chain, - - // TODO: We need to load keystores from the config. - keystores: Vec::new(), + keystores, } } @@ -75,7 +81,7 @@ impl ValidatorService { 0 => { // First tick (t=0): Propose a block. if let Some(keystore) = self.is_proposer() { - info!("Propose block, validator ID: {}", keystore.id); + info!("Propose block by validator {}", keystore.id); // Acquire the write lock. `accept_new_votes` and `build_block` will modify the lean chain. let mut lean_chain = self.lean_chain.write().await; @@ -83,12 +89,16 @@ impl ValidatorService { // Accept new votes and modify the lean chain. lean_chain.accept_new_votes().expect("Failed to accept new votes"); - // Build a block from the lean chain. - let new_block = lean_chain.build_block().expect("Failed to build block"); + // Build a block and propose the block. + let new_block = lean_chain.propose_block().expect("Failed to build block"); info!( - "Built block for validator {} at slot {}", - keystore.id, new_block.slot + "Built block for validator {}. Block Info(slot: {}, parent: {}, len(votes): {}, state_root: {})", + keystore.id, + new_block.slot, + new_block.parent, + new_block.votes.len(), + new_block.state_root ); // TODO 1: Sign the block with the keystore. @@ -103,14 +113,18 @@ impl ValidatorService { // Build the vote from LeanChain, and modify its validator ID let vote_template = self.lean_chain.read().await.build_vote().expect("Failed to build vote"); + + info!("Built votes for validators: {vote_template:?}"); + let _votes = self.keystores.iter().map(|ks| { let mut vote = vote_template.clone(); vote.validator_id = ks.id; vote }).collect::>(); - // TODO 1: Sign the votes with the keystore. - // TODO 2: Send the votes to the network. + // TODO 1: Send these votes to `LeanChainService`. + // TODO 2: Sign the votes with the keystore. + // TODO 3: Send the votes to the network. } _ => { // Other ticks (t=2/4, t=3/4): Do nothing. @@ -125,7 +139,7 @@ impl ValidatorService { /// Determine if one of the keystores is the proposer for the current slot. fn is_proposer(&self) -> Option<&LeanKeystore> { let current_slot = get_current_slot(); - let proposer_index = current_slot % lean_network_spec().num_validators; + let proposer_index: u64 = current_slot % lean_network_spec().num_validators; self.keystores .iter() diff --git a/crates/networking/p2p/src/network/lean.rs b/crates/networking/p2p/src/network/lean.rs index e928f0642..8b2b2aa3f 100644 --- a/crates/networking/p2p/src/network/lean.rs +++ b/crates/networking/p2p/src/network/lean.rs @@ -4,26 +4,25 @@ use ream_chain_lean::lean_chain::LeanChain; use tokio::sync::RwLock; use tracing::info; +/// NetworkService is responsible for the following: +/// 1. Peer discovery and management. +/// 2. Gossiping blocks and votes. +/// +/// TBD: It will be best if we reuse the existing NetworkManagerService for the beacon node. pub struct NetworkService { - pub time: u64, - lean_chain: Arc>, } impl NetworkService { pub async fn new(lean_chain: Arc>) -> Self { - NetworkService { - time: 0, - - lean_chain, - } + NetworkService { lean_chain } } pub async fn start(self) { info!("NetworkService started"); - - loop { - std::thread::sleep(std::time::Duration::from_secs(10)); - } + info!( + "Current LeanChain head: {}", + self.lean_chain.read().await.head + ); } } From 8033f4d12da60090c23744b51fabe160ad516737 Mon Sep 17 00:00:00 2001 From: Jun Song Date: Wed, 6 Aug 2025 23:17:54 +0900 Subject: [PATCH 13/21] docs: update responsibility for network service (static peers) --- crates/networking/p2p/src/network/lean.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/networking/p2p/src/network/lean.rs b/crates/networking/p2p/src/network/lean.rs index 8b2b2aa3f..15fd4cfb9 100644 --- a/crates/networking/p2p/src/network/lean.rs +++ b/crates/networking/p2p/src/network/lean.rs @@ -5,7 +5,7 @@ use tokio::sync::RwLock; use tracing::info; /// NetworkService is responsible for the following: -/// 1. Peer discovery and management. +/// 1. Peer management. (We will connect with static peers for PQ devnet.) /// 2. Gossiping blocks and votes. /// /// TBD: It will be best if we reuse the existing NetworkManagerService for the beacon node. From 6ea128388e91f21a62d2da9cd9e259da22ac06ce Mon Sep 17 00:00:00 2001 From: Jun Song Date: Thu, 7 Aug 2025 10:03:01 +0900 Subject: [PATCH 14/21] fix: resolve discrepancy between 3sf-mini impl --- crates/common/chain/lean/src/genesis.rs | 11 ++++++++++- crates/common/chain/lean/src/slot.rs | 5 ++--- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/crates/common/chain/lean/src/genesis.rs b/crates/common/chain/lean/src/genesis.rs index 9a4764cdb..2c1e0e435 100644 --- a/crates/common/chain/lean/src/genesis.rs +++ b/crates/common/chain/lean/src/genesis.rs @@ -21,7 +21,16 @@ fn genesis_state(num_validators: u64) -> LeanState { /// /// Reference: https://github.com/ethereum/research/blob/d225a6775a9b184b5c1fd6c830cc58a375d9535f/3sf-mini/test_p2p.py#L119-L131 pub fn setup_genesis() -> (Block, LeanState) { - let genesis_state = genesis_state(lean_network_spec().num_validators); + let mut genesis_state = genesis_state(lean_network_spec().num_validators); + genesis_state + .historical_block_hashes + .push(B256::ZERO) + .expect("Failed to add genesis block hash"); + genesis_state + .justified_slots + .push(true) + .expect("Failed to add genesis justified slot"); + let genesis_block = genesis_block(genesis_state.tree_hash_root()); (genesis_block, genesis_state) diff --git a/crates/common/chain/lean/src/slot.rs b/crates/common/chain/lean/src/slot.rs index 161bb9f6e..4bc45abc5 100644 --- a/crates/common/chain/lean/src/slot.rs +++ b/crates/common/chain/lean/src/slot.rs @@ -2,8 +2,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use ream_network_spec::networks::lean_network_spec; -/// NOTE: Vitalik's implementation of 3SF-mini adds 2 slots more due to the test setup, unlike our -/// implementation. +/// NOTE: Vitalik's implementation of 3SF-mini adds 2 slots more due to the test setup. /// This is due to the fact that his test code starts at slot 1. pub fn get_current_slot() -> u64 { let network_spec = lean_network_spec(); @@ -15,5 +14,5 @@ pub fn get_current_slot() -> u64 { .duration_since(genesis_instant) .expect("Called before genesis time"); - elapsed.as_secs() / seconds_per_slot + (elapsed.as_secs() / seconds_per_slot) + 2 } From 32330ca106b19c909c5b01391b1a94e7d7fca98e Mon Sep 17 00:00:00 2001 From: Jun Song Date: Thu, 7 Aug 2025 10:04:00 +0900 Subject: [PATCH 15/21] fix: small nit on genesis handling in get_fork_choice_head --- crates/common/consensus/lean/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/common/consensus/lean/src/lib.rs b/crates/common/consensus/lean/src/lib.rs index e4602feba..a14a3d7a1 100644 --- a/crates/common/consensus/lean/src/lib.rs +++ b/crates/common/consensus/lean/src/lib.rs @@ -126,7 +126,7 @@ pub fn get_fork_choice_head( let mut root = *provided_root; // Start at genesis by default - if *root == B256::ZERO { + if root == B256::ZERO { root = blocks .iter() .min_by_key(|(_, block)| block.slot) From 15f9bfc5c366908e54f6b74da58c76580531a722 Mon Sep 17 00:00:00 2001 From: Jun Song Date: Thu, 7 Aug 2025 10:05:11 +0900 Subject: [PATCH 16/21] feat: send vote via channel to LeanChainService --- bin/ream/assets/lean/sample_spec.yml | 2 +- bin/ream/src/main.rs | 15 +++-- crates/common/chain/lean/src/lean_chain.rs | 5 +- crates/common/chain/lean/src/service.rs | 71 +++++++++++++++++++-- crates/common/consensus/lean/src/lib.rs | 8 ++- crates/common/validator/lean/src/service.rs | 38 ++++++++--- 6 files changed, 116 insertions(+), 23 deletions(-) diff --git a/bin/ream/assets/lean/sample_spec.yml b/bin/ream/assets/lean/sample_spec.yml index c3ee9ffea..36f64dc91 100644 --- a/bin/ream/assets/lean/sample_spec.yml +++ b/bin/ream/assets/lean/sample_spec.yml @@ -1,3 +1,3 @@ SECONDS_PER_SLOT: 12 GENESIS_TIME: 0 -NUM_VALIDATORS: 1 +NUM_VALIDATORS: 4 diff --git a/bin/ream/src/main.rs b/bin/ream/src/main.rs index a5b801e27..95133c494 100644 --- a/bin/ream/src/main.rs +++ b/bin/ream/src/main.rs @@ -17,7 +17,11 @@ use ream::cli::{ voluntary_exit::VoluntaryExitConfig, }; use ream_beacon_api_types::id::{ID, ValidatorID}; -use ream_chain_lean::{genesis as lean_genesis, lean_chain::LeanChain, service::LeanChainService}; +use ream_chain_lean::{ + genesis as lean_genesis, + lean_chain::LeanChain, + service::{LeanChainService, LeanChainServiceMessage}, +}; use ream_checkpoint_sync::initialize_db_from_checkpoint; use ream_consensus_misc::{ constants::beacon::set_genesis_validator_root, misc::compute_epoch_at_slot, @@ -40,7 +44,7 @@ use ream_validator_beacon::{ voluntary_exit::process_voluntary_exit, }; use ream_validator_lean::service::ValidatorService as LeanValidatorService; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, mpsc}; use tracing::{error, info}; use tracing_subscriber::EnvFilter; @@ -132,11 +136,14 @@ pub async fn run_lean_node(config: LeanNodeConfig, executor: ReamExecutor) { let lean_chain = Arc::new(RwLock::new(LeanChain::new(genesis_block, genesis_state))); // Initialize the services that will run in the lean node. + let (chain_sender, chain_receiver) = mpsc::unbounded_channel::(); + // TODO 1: Load keystores from the config. // TODO 2: Add RPC service for lean node. - let chain_service = LeanChainService::new(lean_chain.clone()).await; + let chain_service = LeanChainService::new(lean_chain.clone(), chain_receiver).await; let network_service = LeanNetworkService::new(lean_chain.clone()).await; - let validator_service = LeanValidatorService::new(lean_chain.clone(), Vec::new()).await; + let validator_service = + LeanValidatorService::new(lean_chain.clone(), Vec::new(), chain_sender).await; // Start the services concurrently. let chain_future = executor.spawn(async move { diff --git a/crates/common/chain/lean/src/lean_chain.rs b/crates/common/chain/lean/src/lean_chain.rs index 79b8c592b..a1647f80d 100644 --- a/crates/common/chain/lean/src/lean_chain.rs +++ b/crates/common/chain/lean/src/lean_chain.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use alloy_primitives::B256; use ream_consensus_lean::{ - QueueItem, block::Block, get_fork_choice_head, get_latest_justified_hash, is_justifiable_slot, + block::Block, get_fork_choice_head, get_latest_justified_hash, is_justifiable_slot, process_block, state::LeanState, vote::Vote, }; use ssz_types::VariableList; @@ -20,7 +20,6 @@ pub struct LeanChain { pub post_states: HashMap, pub known_votes: Vec, pub new_votes: Vec, - pub dependencies: HashMap>, pub genesis_hash: B256, pub num_validators: u64, pub safe_target: B256, @@ -36,8 +35,6 @@ impl LeanChain { known_votes: Vec::new(), // Votes that we have received but not yet taken into account new_votes: Vec::new(), - // Objects that we will process once we have processed their parents - dependencies: HashMap::new(), // Initialize the chain with the genesis block genesis_hash, num_validators: genesis_state.config.num_validators, diff --git a/crates/common/chain/lean/src/service.rs b/crates/common/chain/lean/src/service.rs index 350b9c160..717a128ab 100644 --- a/crates/common/chain/lean/src/service.rs +++ b/crates/common/chain/lean/src/service.rs @@ -1,18 +1,26 @@ use std::{ + collections::HashMap, sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}, }; +use alloy_primitives::B256; +use ream_consensus_lean::{QueueItem, VoteItem}; use ream_consensus_misc::constants::lean::INTERVALS_PER_SLOT; use ream_network_spec::networks::lean_network_spec; use tokio::{ - sync::RwLock, + sync::{RwLock, mpsc}, time::{Instant, MissedTickBehavior, interval_at}, }; use tracing::info; use crate::lean_chain::LeanChain; +#[derive(Debug, Clone)] +pub struct LeanChainServiceMessage { + pub item: QueueItem, +} + /// LeanChainService is responsible for updating the [LeanChain] state. `LeanChain` is updated when: /// 1. Every third (t=2/4) and fourth (t=3/4) ticks. /// 2. Receiving new blocks or votes from the network. @@ -20,14 +28,25 @@ use crate::lean_chain::LeanChain; /// NOTE: This service will be the core service to implement `receive()` function. pub struct LeanChainService { lean_chain: Arc>, + receiver: mpsc::UnboundedReceiver, + + // Objects that we will process once we have processed their parents + dependencies: HashMap>, } impl LeanChainService { - pub async fn new(lean_chain: Arc>) -> Self { - LeanChainService { lean_chain } + pub async fn new( + lean_chain: Arc>, + receiver: mpsc::UnboundedReceiver, + ) -> Self { + LeanChainService { + lean_chain, + receiver, + dependencies: HashMap::new(), + } } - pub async fn start(self) { + pub async fn start(mut self) { info!("Lean Chain Service started"); // TODO: Duplicate clock logic from ValidatorService. May need to refactor later. @@ -75,7 +94,51 @@ impl LeanChainService { } tick_count += 1; } + Some(message) = self.receiver.recv() => { + self.handle_message(message).await; + } + } + } + } + + async fn handle_message(&mut self, message: LeanChainServiceMessage) { + match message.item { + QueueItem::BlockItem(block) => { + info!("Received block: {:?}", block); } + QueueItem::VoteItem(vote_item) => { + info!("Received vote_item: {:?}", vote_item); + self.handle_vote(vote_item).await; + } + } + } + + async fn handle_vote(&mut self, vote_item: VoteItem) { + let vote = match vote_item { + VoteItem::Signed(vote) => { + // TODO: Validate the signature. + vote.data + } + VoteItem::Unsigned(vote) => vote, + }; + + let lean_chain = self.lean_chain.read().await; + let is_known_vote = lean_chain.known_votes.contains(&vote); + let is_new_vote = lean_chain.new_votes.contains(&vote); + + if is_known_vote || is_new_vote { + // Do nothing + } else if lean_chain.chain.contains_key(&vote.head) { + drop(lean_chain); + + // We should acquire another write lock + let mut lean_chain = self.lean_chain.write().await; + lean_chain.new_votes.push(vote); + } else { + self.dependencies + .entry(vote.head) + .or_default() + .push(QueueItem::VoteItem(VoteItem::Unsigned(vote))); } } } diff --git a/crates/common/consensus/lean/src/lib.rs b/crates/common/consensus/lean/src/lib.rs index a14a3d7a1..9762d0fe0 100644 --- a/crates/common/consensus/lean/src/lib.rs +++ b/crates/common/consensus/lean/src/lib.rs @@ -15,10 +15,16 @@ use crate::{ vote::{SignedVote, Vote}, }; +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +pub enum VoteItem { + Signed(SignedVote), + Unsigned(Vote), +} + #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub enum QueueItem { BlockItem(Block), - VoteItem(SignedVote), + VoteItem(VoteItem), } /// We allow justification of slots either <= 5 or a perfect square or oblong after diff --git a/crates/common/validator/lean/src/service.rs b/crates/common/validator/lean/src/service.rs index 7c6bdc6ee..2504a3350 100644 --- a/crates/common/validator/lean/src/service.rs +++ b/crates/common/validator/lean/src/service.rs @@ -3,11 +3,14 @@ use std::{ time::{Duration, SystemTime, UNIX_EPOCH}, }; -use ream_chain_lean::{lean_chain::LeanChain, slot::get_current_slot}; +use ream_chain_lean::{ + lean_chain::LeanChain, service::LeanChainServiceMessage, slot::get_current_slot, +}; +use ream_consensus_lean::{QueueItem, VoteItem}; use ream_consensus_misc::constants::lean::INTERVALS_PER_SLOT; use ream_network_spec::networks::lean_network_spec; use tokio::{ - sync::RwLock, + sync::{RwLock, mpsc}, time::{Instant, MissedTickBehavior, interval_at}, }; use tracing::info; @@ -28,15 +31,24 @@ pub struct LeanKeystore { /// NOTE: Other ticks should be handled by the other services, such as [LeanChainService]. pub struct ValidatorService { lean_chain: Arc>, - keystores: Vec, + chain_sender: mpsc::UnboundedSender, } impl ValidatorService { - pub async fn new(lean_chain: Arc>, keystores: Vec) -> Self { + pub async fn new( + lean_chain: Arc>, + keystores: Vec, + chain_sender: mpsc::UnboundedSender, + ) -> Self { // Hack: If no keystores are provided, create a default one. let keystores = if keystores.is_empty() { - vec![LeanKeystore { id: 0 }] // Placeholder for keystores + vec![ + LeanKeystore { id: 0 }, + LeanKeystore { id: 1 }, + LeanKeystore { id: 2 }, + LeanKeystore { id: 3 }, + ] // Placeholder for keystores } else { keystores }; @@ -44,6 +56,7 @@ impl ValidatorService { ValidatorService { lean_chain, keystores, + chain_sender, } } @@ -116,15 +129,22 @@ impl ValidatorService { info!("Built votes for validators: {vote_template:?}"); - let _votes = self.keystores.iter().map(|ks| { + let votes = self.keystores.iter().map(|ks| { let mut vote = vote_template.clone(); vote.validator_id = ks.id; vote }).collect::>(); - // TODO 1: Send these votes to `LeanChainService`. - // TODO 2: Sign the votes with the keystore. - // TODO 3: Send the votes to the network. + for vote in votes { + self.chain_sender + .send(LeanChainServiceMessage { + item: QueueItem::VoteItem(VoteItem::Unsigned(vote)), + }) + .expect("Failed to send vote to LeanChainService"); + } + + // TODO 1: Sign the votes with the keystore. + // TODO 2: Send the votes to the network. } _ => { // Other ticks (t=2/4, t=3/4): Do nothing. From 9f12a83f9abd2c9d934df225378bdd2e4b5be7c3 Mon Sep 17 00:00:00 2001 From: Jun Song Date: Thu, 7 Aug 2025 10:19:46 +0900 Subject: [PATCH 17/21] feat: reimpl block receive func --- crates/common/chain/lean/src/lean_chain.rs | 2 +- crates/common/chain/lean/src/service.rs | 57 +++++++++++++++++++++- 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/crates/common/chain/lean/src/lean_chain.rs b/crates/common/chain/lean/src/lean_chain.rs index a1647f80d..3e5cf306c 100644 --- a/crates/common/chain/lean/src/lean_chain.rs +++ b/crates/common/chain/lean/src/lean_chain.rs @@ -87,7 +87,7 @@ impl LeanChain { } /// Done upon processing new votes or a new block - fn recompute_head(&mut self) -> anyhow::Result<()> { + pub fn recompute_head(&mut self) -> anyhow::Result<()> { let justified_hash = get_latest_justified_hash(&self.post_states).ok_or_else(|| { anyhow::anyhow!("Failed to get latest_justified_hash from post_states") })?; diff --git a/crates/common/chain/lean/src/service.rs b/crates/common/chain/lean/src/service.rs index 717a128ab..ad1b97e1a 100644 --- a/crates/common/chain/lean/src/service.rs +++ b/crates/common/chain/lean/src/service.rs @@ -5,7 +5,7 @@ use std::{ }; use alloy_primitives::B256; -use ream_consensus_lean::{QueueItem, VoteItem}; +use ream_consensus_lean::{QueueItem, VoteItem, block::Block, process_block}; use ream_consensus_misc::constants::lean::INTERVALS_PER_SLOT; use ream_network_spec::networks::lean_network_spec; use tokio::{ @@ -13,6 +13,7 @@ use tokio::{ time::{Instant, MissedTickBehavior, interval_at}, }; use tracing::info; +use tree_hash::TreeHash; use crate::lean_chain::LeanChain; @@ -102,9 +103,14 @@ impl LeanChainService { } async fn handle_message(&mut self, message: LeanChainServiceMessage) { - match message.item { + self.handle_item(message.item).await; + } + + async fn handle_item(&mut self, item: QueueItem) { + match item { QueueItem::BlockItem(block) => { info!("Received block: {:?}", block); + let _ = self.handle_block(block).await; } QueueItem::VoteItem(vote_item) => { info!("Received vote_item: {:?}", vote_item); @@ -113,6 +119,53 @@ impl LeanChainService { } } + async fn handle_block(&mut self, block: Block) -> anyhow::Result<()> { + let block_hash = block.tree_hash_root(); + + let mut lean_chain = self.lean_chain.write().await; + + // If the block is already known, ignore it + if lean_chain.chain.contains_key(&block_hash) { + return Ok(()); + } + + match lean_chain.post_states.get(&block.parent) { + Some(parent_state) => { + let state = process_block(parent_state, &block)?; + + for vote in &block.votes { + if !lean_chain.known_votes.contains(vote) { + lean_chain.known_votes.push(vote.clone()); + } + } + + lean_chain.chain.insert(block_hash, block); + lean_chain.post_states.insert(block_hash, state); + + lean_chain.recompute_head()?; + + drop(lean_chain); + + // Once we have received a block, also process all of its dependencies + if let Some(queue_items) = self.dependencies.remove(&block_hash) { + for item in queue_items { + Box::pin(self.handle_item(item)).await; + } + } + } + None => { + // If we have not yet seen the block's parent, ignore for now, + // process later once we actually see the parent + self.dependencies + .entry(block.parent) + .or_default() + .push(QueueItem::BlockItem(block)); + } + } + + Ok(()) + } + async fn handle_vote(&mut self, vote_item: VoteItem) { let vote = match vote_item { VoteItem::Signed(vote) => { From 92a0fd2c8215d3977b9c7729962a0aacc264478f Mon Sep 17 00:00:00 2001 From: Jun Song Date: Thu, 7 Aug 2025 10:21:41 +0900 Subject: [PATCH 18/21] chore: import anyhow::anyhow --- crates/common/chain/lean/src/lean_chain.rs | 24 +++++++++++----------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/crates/common/chain/lean/src/lean_chain.rs b/crates/common/chain/lean/src/lean_chain.rs index 3e5cf306c..d28d25534 100644 --- a/crates/common/chain/lean/src/lean_chain.rs +++ b/crates/common/chain/lean/src/lean_chain.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use alloy_primitives::B256; +use anyhow::anyhow; use ream_consensus_lean::{ block::Block, get_fork_choice_head, get_latest_justified_hash, is_justifiable_slot, process_block, state::LeanState, vote::Vote, @@ -63,7 +64,7 @@ impl LeanChain { /// Compute the latest block that the staker is allowed to choose as the target pub fn compute_safe_target(&self) -> anyhow::Result { let justified_hash = get_latest_justified_hash(&self.post_states) - .ok_or_else(|| anyhow::anyhow!("No justified hash found in post states"))?; + .ok_or_else(|| anyhow!("No justified hash found in post states"))?; get_fork_choice_head( &self.chain, @@ -88,9 +89,8 @@ impl LeanChain { /// Done upon processing new votes or a new block pub fn recompute_head(&mut self) -> anyhow::Result<()> { - let justified_hash = get_latest_justified_hash(&self.post_states).ok_or_else(|| { - anyhow::anyhow!("Failed to get latest_justified_hash from post_states") - })?; + let justified_hash = get_latest_justified_hash(&self.post_states) + .ok_or_else(|| anyhow!("Failed to get latest_justified_hash from post_states"))?; self.head = get_fork_choice_head(&self.chain, &justified_hash, &self.known_votes, 0)?; Ok(()) } @@ -101,7 +101,7 @@ impl LeanChain { let head_state = self .post_states .get(&self.head) - .ok_or_else(|| anyhow::anyhow!("Post state not found for head: {}", self.head))?; + .ok_or_else(|| anyhow!("Post state not found for head: {}", self.head))?; let mut new_block = Block { slot: new_slot, parent: self.head, @@ -131,7 +131,7 @@ impl LeanChain { new_block .votes .push(vote) - .map_err(|err| anyhow::anyhow!("Failed to add vote to new_block: {err:?}"))?; + .map_err(|err| anyhow!("Failed to add vote to new_block: {err:?}"))?; } } @@ -149,21 +149,21 @@ impl LeanChain { let state = self .post_states .get(&self.head) - .ok_or_else(|| anyhow::anyhow!("Post state not found for head: {}", self.head))?; + .ok_or_else(|| anyhow!("Post state not found for head: {}", self.head))?; let mut target_block = self .chain .get(&self.head) - .ok_or_else(|| anyhow::anyhow!("Block not found in chain for head: {}", self.head))?; + .ok_or_else(|| anyhow!("Block not found in chain for head: {}", self.head))?; // If there is no very recent safe target, then vote for the k'th ancestor // of the head for _ in 0..3 { let safe_target_block = self.chain.get(&self.safe_target).ok_or_else(|| { - anyhow::anyhow!("Block not found for safe target hash: {}", self.safe_target) + anyhow!("Block not found for safe target hash: {}", self.safe_target) })?; if target_block.slot > safe_target_block.slot { target_block = self.chain.get(&target_block.parent).ok_or_else(|| { - anyhow::anyhow!( + anyhow!( "Block not found for target block's parent hash: {}", target_block.parent ) @@ -175,7 +175,7 @@ impl LeanChain { // valid to justify, make sure the target is one of those while !is_justifiable_slot(&state.latest_finalized_slot, &target_block.slot) { target_block = self.chain.get(&target_block.parent).ok_or_else(|| { - anyhow::anyhow!( + anyhow!( "Block not found for target block's parent hash: {}", target_block.parent ) @@ -185,7 +185,7 @@ impl LeanChain { let head_block = self .chain .get(&self.head) - .ok_or_else(|| anyhow::anyhow!("Block not found for head: {}", self.head))?; + .ok_or_else(|| anyhow!("Block not found for head: {}", self.head))?; Ok(Vote { // Replace with actual validator ID From 96af999e76f21f0de74f9d3fdcb8da65582a6093 Mon Sep 17 00:00:00 2001 From: Jun Song Date: Thu, 7 Aug 2025 10:32:35 +0900 Subject: [PATCH 19/21] chore: delete staker.rs --- crates/common/chain/lean/src/lib.rs | 1 - crates/common/chain/lean/src/staker.rs | 348 ------------------------- 2 files changed, 349 deletions(-) delete mode 100644 crates/common/chain/lean/src/staker.rs diff --git a/crates/common/chain/lean/src/lib.rs b/crates/common/chain/lean/src/lib.rs index b54436bdc..c8210f8fc 100644 --- a/crates/common/chain/lean/src/lib.rs +++ b/crates/common/chain/lean/src/lib.rs @@ -2,4 +2,3 @@ pub mod genesis; pub mod lean_chain; pub mod service; pub mod slot; -pub mod staker; diff --git a/crates/common/chain/lean/src/staker.rs b/crates/common/chain/lean/src/staker.rs deleted file mode 100644 index 23457eaf9..000000000 --- a/crates/common/chain/lean/src/staker.rs +++ /dev/null @@ -1,348 +0,0 @@ -use std::collections::HashMap; - -use alloy_primitives::B256; -use anyhow::anyhow; -use ream_consensus_lean::{ - QueueItem, - block::Block, - get_fork_choice_head, get_latest_justified_hash, is_justifiable_slot, process_block, - state::LeanState, - vote::{SignedVote, Vote}, -}; -use ream_consensus_misc::constants::lean::SLOT_DURATION; -use ream_pqc::PQSignature; -use ssz_types::VariableList; -use tracing::info; -use tree_hash::TreeHash; - -pub struct Staker { - pub validator_id: u64, - pub chain: HashMap, - pub post_states: HashMap, - pub known_votes: Vec, - pub new_votes: Vec, - pub dependencies: HashMap>, - pub genesis_hash: B256, - pub num_validators: u64, - pub safe_target: B256, - pub head: B256, -} - -impl Staker { - pub fn new(validator_id: u64, genesis_block: Block, genesis_state: LeanState) -> Staker { - let genesis_hash = genesis_block.tree_hash_root(); - - Staker { - // This node's validator ID - validator_id, - // Votes that we have received and taken into account - known_votes: Vec::new(), - // Votes that we have received but not yet taken into account - new_votes: Vec::new(), - // Objects that we will process once we have processed their parents - dependencies: HashMap::new(), - // Initialize the chain with the genesis block - genesis_hash, - num_validators: genesis_state.config.num_validators, - // Block that it is safe to use to vote as the target - // Diverge from Python implementation: Use genesis hash instead of `None` - safe_target: genesis_hash, - // Head of the chain - head: genesis_hash, - // {block_hash: block} for all blocks that we know about - chain: HashMap::from([(genesis_hash, genesis_block)]), - // {block_hash: post_state} for all blocks that we know about - post_states: HashMap::from([(genesis_hash, genesis_state)]), - } - } - - pub fn latest_justified_hash(&self) -> Option { - get_latest_justified_hash(&self.post_states) - } - - pub fn latest_finalized_hash(&self) -> Option { - self.post_states - .get(&self.head) - .map(|state| state.latest_finalized_hash) - } - - /// Compute the latest block that the staker is allowed to choose as the target - fn compute_safe_target(&self) -> anyhow::Result { - let justified_hash = get_latest_justified_hash(&self.post_states) - .ok_or_else(|| anyhow!("No justified hash found in post states"))?; - - get_fork_choice_head( - &self.chain, - &justified_hash, - &self.new_votes, - self.num_validators * 2 / 3, - ) - } - - /// Process new votes that the staker has received. Vote processing is done - /// at a particular time, because of safe target and view merge rule - fn accept_new_votes(&mut self) -> anyhow::Result<()> { - for new_vote in self.new_votes.drain(..) { - if !self.known_votes.contains(&new_vote) { - self.known_votes.push(new_vote); - } - } - - self.recompute_head()?; - Ok(()) - } - - /// Done upon processing new votes or a new block - fn recompute_head(&mut self) -> anyhow::Result<()> { - let justified_hash = get_latest_justified_hash(&self.post_states) - .ok_or_else(|| anyhow!("Failed to get latest_justified_hash from post_states"))?; - self.head = get_fork_choice_head(&self.chain, &justified_hash, &self.known_votes, 0)?; - Ok(()) - } - - /// Called every second - pub fn tick(&mut self) -> anyhow::Result<()> { - let current_slot = self.get_current_slot()?; - let time_in_slot = { - let network = self - .network - .lock() - .map_err(|err| anyhow!("Failed to acquire network lock: {err:?}"))?; - network.time % SLOT_DURATION - }; - - // t=0: propose a block - if time_in_slot == 0 { - if current_slot % self.num_validators == self.validator_id { - // View merge mechanism: a node accepts attestations that it received - // <= 1/4 before slot start, or attestations in the latest block - self.accept_new_votes()?; - self.propose_block()?; - } - // t=1/4: vote - } else if time_in_slot == SLOT_DURATION / 4 { - self.vote()?; - // t=2/4: compute the safe target (this must be done here to ensure - // that, assuming network latency assumptions are satisfied, anything that - // one honest node receives by this time, every honest node will receive by - // the general attestation deadline) - } else if time_in_slot == SLOT_DURATION * 2 / 4 { - self.safe_target = self.compute_safe_target()?; - // Deadline to accept attestations except for those included in a block - } else if time_in_slot == SLOT_DURATION * 3 / 4 { - self.accept_new_votes()?; - } - - Ok(()) - } - - fn get_current_slot(&self) -> anyhow::Result { - let network = self - .network - .lock() - .map_err(|err| anyhow!("Failed to acquire network lock: {err:?}"))?; - Ok(network.time / SLOT_DURATION + 2) - } - - /// Called when it's the staker's turn to propose a block - fn propose_block(&mut self) -> anyhow::Result<()> { - let new_slot = self.get_current_slot()?; - - let head_block = self - .chain - .get(&self.head) - .ok_or_else(|| anyhow!("Block not found in chain for head: {}", self.head))?; - - info!( - "proposing (Staker {}), head = {}", - self.validator_id, head_block.slot - ); - - let head_state = self - .post_states - .get(&self.head) - .ok_or_else(|| anyhow!("Post state not found for head: {}", self.head))?; - let mut new_block = Block { - slot: new_slot, - parent: self.head, - votes: VariableList::empty(), - // Diverged from Python implementation: Using `B256::ZERO` instead of `None`) - state_root: B256::ZERO, - }; - let mut state: LeanState; - - // Keep attempt to add valid votes from the list of available votes - loop { - state = process_block(head_state, &new_block)?; - - let new_votes_to_add = self - .known_votes - .clone() - .into_iter() - .filter(|vote| vote.source == state.latest_justified_hash) - .filter(|vote| !new_block.votes.contains(vote)) - .collect::>(); - - if new_votes_to_add.is_empty() { - break; - } - - for vote in new_votes_to_add { - new_block - .votes - .push(vote) - .map_err(|err| anyhow!("Failed to add vote to new_block: {err:?}"))?; - } - } - - new_block.state_root = state.tree_hash_root(); - let new_hash = new_block.tree_hash_root(); - - self.chain.insert(new_hash, new_block.clone()); - self.post_states.insert(new_hash, state); - - // TODO: submit to actual network - // self.get_network() - // .borrow_mut() - // .submit(QueueItem::BlockItem(new_block), self.validator_id); - - Ok(()) - } - - /// Called when it's the staker's turn to vote - fn vote(&mut self) -> anyhow::Result<()> { - let state = self - .post_states - .get(&self.head) - .ok_or_else(|| anyhow!("Post state not found for head: {}", self.head))?; - let mut target_block = self - .chain - .get(&self.head) - .ok_or_else(|| anyhow!("Block not found in chain for head: {}", self.head))?; - - // If there is no very recent safe target, then vote for the k'th ancestor - // of the head - for _ in 0..3 { - let safe_target_block = self.chain.get(&self.safe_target).ok_or_else(|| { - anyhow!("Block not found for safe target hash: {}", self.safe_target) - })?; - if target_block.slot > safe_target_block.slot { - target_block = self.chain.get(&target_block.parent).ok_or_else(|| { - anyhow!( - "Block not found for target block's parent hash: {}", - target_block.parent - ) - })?; - } - } - - // If the latest finalized slot is very far back, then only some slots are - // valid to justify, make sure the target is one of those - while !is_justifiable_slot(&state.latest_finalized_slot, &target_block.slot) { - target_block = self.chain.get(&target_block.parent).ok_or_else(|| { - anyhow!( - "Block not found for target block's parent hash: {}", - target_block.parent - ) - })?; - } - - let head_block = self - .chain - .get(&self.head) - .ok_or_else(|| anyhow!("Block not found for head: {}", self.head))?; - - let vote = Vote { - validator_id: self.validator_id, - slot: self.get_current_slot()?, - head: self.head, - head_slot: head_block.slot, - target: target_block.tree_hash_root(), - target_slot: target_block.slot, - source: state.latest_justified_hash, - source_slot: state.latest_justified_slot, - }; - - let signed_vote = SignedVote { - data: vote, - signature: PQSignature {}, - }; - - info!( - "voting (Staker {}), head = {}, t = {}, s = {}", - self.validator_id, &head_block.slot, &target_block.slot, &state.latest_justified_slot - ); - - self.receive(QueueItem::VoteItem(signed_vote))?; - - // TODO: submit to actual network - // self.get_network() - // .borrow_mut() - // .submit(QueueItem::VoteItem(vote), self.validator_id); - - Ok(()) - } - - /// Called by the p2p network - fn receive(&mut self, queue_item: QueueItem) -> anyhow::Result<()> { - match queue_item { - QueueItem::BlockItem(block) => { - let block_hash = block.tree_hash_root(); - - // If the block is already known, ignore it - if self.chain.contains_key(&block_hash) { - return Ok(()); - } - - match self.post_states.get(&block.parent) { - Some(parent_state) => { - let state = process_block(parent_state, &block)?; - - for vote in &block.votes { - if !self.known_votes.contains(vote) { - self.known_votes.push(vote.clone()); - } - } - - self.chain.insert(block_hash, block); - self.post_states.insert(block_hash, state); - - self.recompute_head()?; - - // Once we have received a block, also process all of its dependencies - if let Some(queue_items) = self.dependencies.remove(&block_hash) { - for item in queue_items { - self.receive(item)?; - } - } - } - None => { - // If we have not yet seen the block's parent, ignore for now, - // process later once we actually see the parent - self.dependencies - .entry(block.parent) - .or_default() - .push(QueueItem::BlockItem(block)); - } - } - } - QueueItem::VoteItem(vote) => { - let is_known_vote = self.known_votes.contains(&vote.data); - let is_new_vote = self.new_votes.contains(&vote.data); - - if is_known_vote || is_new_vote { - // Do nothing - } else if self.chain.contains_key(&vote.data.head) { - self.new_votes.push(vote.data); - } else { - self.dependencies - .entry(vote.data.head) - .or_default() - .push(QueueItem::VoteItem(vote)); - } - } - } - - Ok(()) - } -} From ce0bf38e29208f219e7fa493e021ac883af17558 Mon Sep 17 00:00:00 2001 From: Jun Song Date: Thu, 7 Aug 2025 15:13:44 +0900 Subject: [PATCH 20/21] chore: apply Kayden's review (no abbreviation & clear logs --- crates/common/chain/lean/src/service.rs | 27 +++++++++++++++------ crates/common/validator/lean/src/service.rs | 26 ++++++++++++-------- 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/crates/common/chain/lean/src/service.rs b/crates/common/chain/lean/src/service.rs index ad1b97e1a..1221131b3 100644 --- a/crates/common/chain/lean/src/service.rs +++ b/crates/common/chain/lean/src/service.rs @@ -15,7 +15,7 @@ use tokio::{ use tracing::info; use tree_hash::TreeHash; -use crate::lean_chain::LeanChain; +use crate::{lean_chain::LeanChain, slot::get_current_slot}; #[derive(Debug, Clone)] pub struct LeanChainServiceMessage { @@ -48,8 +48,6 @@ impl LeanChainService { } pub async fn start(mut self) { - info!("Lean Chain Service started"); - // TODO: Duplicate clock logic from ValidatorService. May need to refactor later. // Get the Lean network specification. @@ -57,6 +55,8 @@ impl LeanChainService { let seconds_per_slot = network_spec.seconds_per_slot; let genesis_time = network_spec.genesis_time; + info!("LeanChainService started with genesis_time={genesis_time}"); + // Calculate the genesis instant from the genesis time (in seconds). let genesis_instant = UNIX_EPOCH + Duration::from_secs(genesis_time); @@ -80,13 +80,15 @@ impl LeanChainService { match tick_count % 4 { 2 => { // Third tick (t=2/4): Compute the safe target. - info!("Compute safe target."); + let current_slot = get_current_slot(); + info!("Computing safe target at slot {current_slot} (tick {tick_count})"); let mut lean_chain = self.lean_chain.write().await; lean_chain.safe_target = lean_chain.compute_safe_target().expect("Failed to compute safe target"); } 3 => { // Fourth tick (t=3/4): Accept new votes. - info!("Accept new votes."); + let current_slot = get_current_slot(); + info!("Accepting new votes at slot {current_slot} (tick {tick_count})"); self.lean_chain.write().await.accept_new_votes().expect("Failed to accept new votes"); } _ => { @@ -109,11 +111,22 @@ impl LeanChainService { async fn handle_item(&mut self, item: QueueItem) { match item { QueueItem::BlockItem(block) => { - info!("Received block: {:?}", block); + let block_hash = block.tree_hash_root(); + info!( + "Received block at slot {} with hash {block_hash:?} from parent {:?}", + block.slot, block.parent + ); let _ = self.handle_block(block).await; } QueueItem::VoteItem(vote_item) => { - info!("Received vote_item: {:?}", vote_item); + let vote = match &vote_item { + VoteItem::Signed(signed) => &signed.data, + VoteItem::Unsigned(vote) => vote, + }; + info!( + "Received vote from validator {} for head {:?} at slot {}", + vote.validator_id, vote.head, vote.slot + ); self.handle_vote(vote_item).await; } } diff --git a/crates/common/validator/lean/src/service.rs b/crates/common/validator/lean/src/service.rs index 2504a3350..3e6681f68 100644 --- a/crates/common/validator/lean/src/service.rs +++ b/crates/common/validator/lean/src/service.rs @@ -61,8 +61,6 @@ impl ValidatorService { } pub async fn start(self) { - info!("Validator Service started"); - // TODO: Duplicate clock logic from LeanChainService. May need to refactor later. // Get the Lean network specification. @@ -70,6 +68,11 @@ impl ValidatorService { let seconds_per_slot = network_spec.seconds_per_slot; let genesis_time = network_spec.genesis_time; + info!( + "ValidatorService started with {} validator(s), genesis_time={genesis_time}", + self.keystores.len() + ); + // Calculate the genesis instant from the genesis time (in seconds). let genesis_instant = UNIX_EPOCH + Duration::from_secs(genesis_time); @@ -93,8 +96,9 @@ impl ValidatorService { match tick_count % 4 { 0 => { // First tick (t=0): Propose a block. + let current_slot = get_current_slot(); if let Some(keystore) = self.is_proposer() { - info!("Propose block by validator {}", keystore.id); + info!("Validator {} proposing block for slot {current_slot} (tick {tick_count})", keystore.id); // Acquire the write lock. `accept_new_votes` and `build_block` will modify the lean chain. let mut lean_chain = self.lean_chain.write().await; @@ -106,7 +110,7 @@ impl ValidatorService { let new_block = lean_chain.propose_block().expect("Failed to build block"); info!( - "Built block for validator {}. Block Info(slot: {}, parent: {}, len(votes): {}, state_root: {})", + "Validator {} built block: slot={}, parent={:?}, votes={}, state_root={:?}", keystore.id, new_block.slot, new_block.parent, @@ -117,21 +121,23 @@ impl ValidatorService { // TODO 1: Sign the block with the keystore. // TODO 2: Send the block to the network. } else { - info!("Not a proposer, skipping block proposal."); + let proposer_index = current_slot % lean_network_spec().num_validators; + info!("Not proposer for slot {current_slot} (proposer is validator {proposer_index}), skipping"); } } 1 => { // Second tick (t=1/4): Vote. - info!("Vote."); + let current_slot = get_current_slot(); + info!("Starting vote phase at slot {current_slot} (tick {tick_count}): {} validator(s) voting", self.keystores.len()); // Build the vote from LeanChain, and modify its validator ID let vote_template = self.lean_chain.read().await.build_vote().expect("Failed to build vote"); - info!("Built votes for validators: {vote_template:?}"); + info!("Built vote template for head {:?} at slot {} with target {:?}", vote_template.head, vote_template.slot, vote_template.target); - let votes = self.keystores.iter().map(|ks| { + let votes = self.keystores.iter().map(|keystore| { let mut vote = vote_template.clone(); - vote.validator_id = ks.id; + vote.validator_id = keystore.id; vote }).collect::>(); @@ -163,6 +169,6 @@ impl ValidatorService { self.keystores .iter() - .find(|ks| ks.id == proposer_index as u64) + .find(|keystore| keystore.id == proposer_index as u64) } } From fe88be5f980a1cbe479bca46dc90f6c289e59e4a Mon Sep 17 00:00:00 2001 From: Jun Song Date: Thu, 7 Aug 2025 17:44:57 +0900 Subject: [PATCH 21/21] chore: apply review from O (improve docs) --- crates/common/chain/lean/src/lean_chain.rs | 4 +++- crates/common/chain/lean/src/service.rs | 24 ++++++++++++++-------- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/crates/common/chain/lean/src/lean_chain.rs b/crates/common/chain/lean/src/lean_chain.rs index d28d25534..af241fae7 100644 --- a/crates/common/chain/lean/src/lean_chain.rs +++ b/crates/common/chain/lean/src/lean_chain.rs @@ -188,7 +188,9 @@ impl LeanChain { .ok_or_else(|| anyhow!("Block not found for head: {}", self.head))?; Ok(Vote { - // Replace with actual validator ID + // NOTE: This is a placeholder for `validator_id`. + // This field will eventually be set by the `ValidatorService` with the actual validator + // IDs. validator_id: 0, slot: get_current_slot(), head: self.head, diff --git a/crates/common/chain/lean/src/service.rs b/crates/common/chain/lean/src/service.rs index 1221131b3..964b4542b 100644 --- a/crates/common/chain/lean/src/service.rs +++ b/crates/common/chain/lean/src/service.rs @@ -119,14 +119,22 @@ impl LeanChainService { let _ = self.handle_block(block).await; } QueueItem::VoteItem(vote_item) => { - let vote = match &vote_item { - VoteItem::Signed(signed) => &signed.data, - VoteItem::Unsigned(vote) => vote, - }; - info!( - "Received vote from validator {} for head {:?} at slot {}", - vote.validator_id, vote.head, vote.slot - ); + match &vote_item { + VoteItem::Signed(signed_vote) => { + let vote = &signed_vote.data; + info!( + "Received signed vote from validator {} for head {:?} at slot {}", + vote.validator_id, vote.head, vote.slot + ); + } + VoteItem::Unsigned(vote) => { + info!( + "Received unsigned vote from validator {} for head {:?} at slot {}", + vote.validator_id, vote.head, vote.slot + ); + } + } + self.handle_vote(vote_item).await; } }