diff --git a/Cargo.lock b/Cargo.lock index cfc940bdd..874a3c27a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5161,6 +5161,7 @@ dependencies = [ "rand 0.8.5", "ream-account-manager", "ream-beacon-api-types", + "ream-chain-lean", "ream-checkpoint-sync", "ream-consensus-beacon", "ream-consensus-misc", @@ -5262,7 +5263,6 @@ dependencies = [ "ream-consensus-lean", "ream-consensus-misc", "ream-network-spec", - "ream-p2p", "ream-pqc", "serde", "ssz_types", @@ -5570,6 +5570,7 @@ dependencies = [ "libp2p-identity", "libp2p-mplex", "parking_lot", + "ream-chain-lean", "ream-consensus-beacon", "ream-consensus-misc", "ream-discv5", @@ -5756,6 +5757,11 @@ dependencies = [ name = "ream-validator-lean" version = "0.1.0" dependencies = [ + "ream-chain-lean", + "ream-consensus-lean", + "ream-consensus-misc", + "ream-network-spec", + "tokio", "tracing", ] diff --git a/bin/ream/Cargo.toml b/bin/ream/Cargo.toml index 3564d137c..692e42812 100644 --- a/bin/ream/Cargo.toml +++ b/bin/ream/Cargo.toml @@ -31,6 +31,7 @@ url.workspace = true # ream dependencies ream-account-manager.workspace = true ream-beacon-api-types.workspace = true +ream-chain-lean.workspace = true ream-checkpoint-sync.workspace = true ream-consensus-beacon.workspace = true ream-consensus-misc.workspace = true diff --git a/bin/ream/assets/lean/sample_spec.yml b/bin/ream/assets/lean/sample_spec.yml index a2928acba..36f64dc91 100644 --- a/bin/ream/assets/lean/sample_spec.yml +++ b/bin/ream/assets/lean/sample_spec.yml @@ -1,2 +1,3 @@ SECONDS_PER_SLOT: 12 GENESIS_TIME: 0 +NUM_VALIDATORS: 4 diff --git a/bin/ream/src/main.rs b/bin/ream/src/main.rs index a162256de..95133c494 100644 --- a/bin/ream/src/main.rs +++ b/bin/ream/src/main.rs @@ -1,5 +1,7 @@ use std::{ - env, process, + env, + ops::Deref, + process, sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}, }; @@ -15,6 +17,11 @@ use ream::cli::{ voluntary_exit::VoluntaryExitConfig, }; use ream_beacon_api_types::id::{ID, ValidatorID}; +use ream_chain_lean::{ + genesis as lean_genesis, + lean_chain::LeanChain, + service::{LeanChainService, LeanChainServiceMessage}, +}; use ream_checkpoint_sync::initialize_db_from_checkpoint; use ream_consensus_misc::{ constants::beacon::set_genesis_validator_root, misc::compute_epoch_at_slot, @@ -37,6 +44,7 @@ use ream_validator_beacon::{ voluntary_exit::process_voluntary_exit, }; use ream_validator_lean::service::ValidatorService as LeanValidatorService; +use tokio::sync::{RwLock, mpsc}; use tracing::{error, info}; use tracing_subscriber::EnvFilter; @@ -92,14 +100,55 @@ fn main() { } /// Runs the lean node. +/// +/// A lean node runs several services with different responsibilities. +/// Refer to each service's documentation for more details. +/// +/// A lean node has one shared state, `LeanChain` (wrapped with synchronization primitives), which +/// is used by all services. +/// +/// Besides the shared state, each service holds the channels to communicate with each other. pub async fn run_lean_node(config: LeanNodeConfig, executor: ReamExecutor) { info!("starting up lean node..."); - set_lean_network_spec(config.network.clone()); + // Hack: It is bothersome to modify the spec every time we run the lean node. + // Set genesis time to a future time if it is in the past. + // FIXME: Add a script to generate the YAML config file. + let network = { + let current_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("System time is before UNIX epoch") + .as_secs(); + + if config.network.genesis_time < current_timestamp { + let mut network = config.network.deref().clone(); + network.genesis_time = current_timestamp + 3; // Set genesis time to 3 seconds in the future. + Arc::new(network) + } else { + config.network.clone() + } + }; - let network_service = LeanNetworkService::new().await; - let validator_service = LeanValidatorService::new().await; + set_lean_network_spec(network); + // Initialize the lean chain with genesis block and state. + let (genesis_block, genesis_state) = lean_genesis::setup_genesis(); + let lean_chain = Arc::new(RwLock::new(LeanChain::new(genesis_block, genesis_state))); + + // Initialize the services that will run in the lean node. + let (chain_sender, chain_receiver) = mpsc::unbounded_channel::(); + + // TODO 1: Load keystores from the config. + // TODO 2: Add RPC service for lean node. + let chain_service = LeanChainService::new(lean_chain.clone(), chain_receiver).await; + let network_service = LeanNetworkService::new(lean_chain.clone()).await; + let validator_service = + LeanValidatorService::new(lean_chain.clone(), Vec::new(), chain_sender).await; + + // Start the services concurrently. + let chain_future = executor.spawn(async move { + chain_service.start().await; + }); let network_future = executor.spawn(async move { network_service.start().await; }); @@ -108,6 +157,9 @@ pub async fn run_lean_node(config: LeanNodeConfig, executor: ReamExecutor) { }); tokio::select! { + _ = chain_future => { + info!("Chain service has stopped unexpectedly"); + } _ = network_future => { info!("Network service has stopped unexpectedly"); } diff --git a/crates/common/chain/lean/Cargo.toml b/crates/common/chain/lean/Cargo.toml index 9bc6137bd..f071190f0 100644 --- a/crates/common/chain/lean/Cargo.toml +++ b/crates/common/chain/lean/Cargo.toml @@ -22,5 +22,4 @@ tree_hash.workspace = true ream-consensus-lean.workspace = true ream-consensus-misc.workspace = true ream-network-spec.workspace = true -ream-p2p.workspace = true ream-pqc.workspace = true diff --git a/crates/common/chain/lean/src/genesis.rs b/crates/common/chain/lean/src/genesis.rs new file mode 100644 index 000000000..2c1e0e435 --- /dev/null +++ b/crates/common/chain/lean/src/genesis.rs @@ -0,0 +1,37 @@ +use alloy_primitives::B256; +use ream_consensus_lean::{block::Block, state::LeanState}; +use ream_network_spec::networks::lean_network_spec; +use ssz_types::VariableList; +use tree_hash::TreeHash; + +fn genesis_block(state_root: B256) -> Block { + Block { + slot: 1, + parent: B256::ZERO, + votes: VariableList::empty(), + state_root, + } +} + +fn genesis_state(num_validators: u64) -> LeanState { + LeanState::new(num_validators) +} + +/// Setup the genesis block and state for the Lean chain. +/// +/// Reference: https://github.com/ethereum/research/blob/d225a6775a9b184b5c1fd6c830cc58a375d9535f/3sf-mini/test_p2p.py#L119-L131 +pub fn setup_genesis() -> (Block, LeanState) { + let mut genesis_state = genesis_state(lean_network_spec().num_validators); + genesis_state + .historical_block_hashes + .push(B256::ZERO) + .expect("Failed to add genesis block hash"); + genesis_state + .justified_slots + .push(true) + .expect("Failed to add genesis justified slot"); + + let genesis_block = genesis_block(genesis_state.tree_hash_root()); + + (genesis_block, genesis_state) +} diff --git a/crates/common/chain/lean/src/lean_chain.rs b/crates/common/chain/lean/src/lean_chain.rs new file mode 100644 index 000000000..af241fae7 --- /dev/null +++ b/crates/common/chain/lean/src/lean_chain.rs @@ -0,0 +1,206 @@ +use std::collections::HashMap; + +use alloy_primitives::B256; +use anyhow::anyhow; +use ream_consensus_lean::{ + block::Block, get_fork_choice_head, get_latest_justified_hash, is_justifiable_slot, + process_block, state::LeanState, vote::Vote, +}; +use ssz_types::VariableList; +use tree_hash::TreeHash; + +use crate::slot::get_current_slot; + +/// [LeanChain] represents the state that the Lean node should maintain. +/// +/// Most of the fields are based on the Python implementation of [`Staker`](https://github.com/ethereum/research/blob/d225a6775a9b184b5c1fd6c830cc58a375d9535f/3sf-mini/p2p.py#L15-L42), +/// but doesn't include `validator_id` as a node should manage multiple validators. +#[derive(Clone, Debug)] +pub struct LeanChain { + pub chain: HashMap, + pub post_states: HashMap, + pub known_votes: Vec, + pub new_votes: Vec, + pub genesis_hash: B256, + pub num_validators: u64, + pub safe_target: B256, + pub head: B256, +} + +impl LeanChain { + pub fn new(genesis_block: Block, genesis_state: LeanState) -> LeanChain { + let genesis_hash = genesis_block.tree_hash_root(); + + LeanChain { + // Votes that we have received and taken into account + known_votes: Vec::new(), + // Votes that we have received but not yet taken into account + new_votes: Vec::new(), + // Initialize the chain with the genesis block + genesis_hash, + num_validators: genesis_state.config.num_validators, + // Block that it is safe to use to vote as the target + // Diverge from Python implementation: Use genesis hash instead of `None` + safe_target: genesis_hash, + // Head of the chain + head: genesis_hash, + // {block_hash: block} for all blocks that we know about + chain: HashMap::from([(genesis_hash, genesis_block)]), + // {block_hash: post_state} for all blocks that we know about + post_states: HashMap::from([(genesis_hash, genesis_state)]), + } + } + + pub fn latest_justified_hash(&self) -> Option { + get_latest_justified_hash(&self.post_states) + } + + pub fn latest_finalized_hash(&self) -> Option { + self.post_states + .get(&self.head) + .map(|state| state.latest_finalized_hash) + } + + /// Compute the latest block that the staker is allowed to choose as the target + pub fn compute_safe_target(&self) -> anyhow::Result { + let justified_hash = get_latest_justified_hash(&self.post_states) + .ok_or_else(|| anyhow!("No justified hash found in post states"))?; + + get_fork_choice_head( + &self.chain, + &justified_hash, + &self.new_votes, + self.num_validators * 2 / 3, + ) + } + + /// Process new votes that the staker has received. Vote processing is done + /// at a particular time, because of safe target and view merge rule + pub fn accept_new_votes(&mut self) -> anyhow::Result<()> { + for new_vote in self.new_votes.drain(..) { + if !self.known_votes.contains(&new_vote) { + self.known_votes.push(new_vote); + } + } + + self.recompute_head()?; + Ok(()) + } + + /// Done upon processing new votes or a new block + pub fn recompute_head(&mut self) -> anyhow::Result<()> { + let justified_hash = get_latest_justified_hash(&self.post_states) + .ok_or_else(|| anyhow!("Failed to get latest_justified_hash from post_states"))?; + self.head = get_fork_choice_head(&self.chain, &justified_hash, &self.known_votes, 0)?; + Ok(()) + } + + pub fn propose_block(&mut self) -> anyhow::Result { + let new_slot = get_current_slot(); + + let head_state = self + .post_states + .get(&self.head) + .ok_or_else(|| anyhow!("Post state not found for head: {}", self.head))?; + let mut new_block = Block { + slot: new_slot, + parent: self.head, + votes: VariableList::empty(), + // Diverged from Python implementation: Using `B256::ZERO` instead of `None`) + state_root: B256::ZERO, + }; + let mut state: LeanState; + + // Keep attempt to add valid votes from the list of available votes + loop { + state = process_block(head_state, &new_block)?; + + let new_votes_to_add = self + .known_votes + .clone() + .into_iter() + .filter(|vote| vote.source == state.latest_justified_hash) + .filter(|vote| !new_block.votes.contains(vote)) + .collect::>(); + + if new_votes_to_add.is_empty() { + break; + } + + for vote in new_votes_to_add { + new_block + .votes + .push(vote) + .map_err(|err| anyhow!("Failed to add vote to new_block: {err:?}"))?; + } + } + + new_block.state_root = state.tree_hash_root(); + + let digest = new_block.tree_hash_root(); + + self.chain.insert(digest, new_block.clone()); + self.post_states.insert(digest, state); + + Ok(new_block) + } + + pub fn build_vote(&self) -> anyhow::Result { + let state = self + .post_states + .get(&self.head) + .ok_or_else(|| anyhow!("Post state not found for head: {}", self.head))?; + let mut target_block = self + .chain + .get(&self.head) + .ok_or_else(|| anyhow!("Block not found in chain for head: {}", self.head))?; + + // If there is no very recent safe target, then vote for the k'th ancestor + // of the head + for _ in 0..3 { + let safe_target_block = self.chain.get(&self.safe_target).ok_or_else(|| { + anyhow!("Block not found for safe target hash: {}", self.safe_target) + })?; + if target_block.slot > safe_target_block.slot { + target_block = self.chain.get(&target_block.parent).ok_or_else(|| { + anyhow!( + "Block not found for target block's parent hash: {}", + target_block.parent + ) + })?; + } + } + + // If the latest finalized slot is very far back, then only some slots are + // valid to justify, make sure the target is one of those + while !is_justifiable_slot(&state.latest_finalized_slot, &target_block.slot) { + target_block = self.chain.get(&target_block.parent).ok_or_else(|| { + anyhow!( + "Block not found for target block's parent hash: {}", + target_block.parent + ) + })?; + } + + let head_block = self + .chain + .get(&self.head) + .ok_or_else(|| anyhow!("Block not found for head: {}", self.head))?; + + Ok(Vote { + // NOTE: This is a placeholder for `validator_id`. + // This field will eventually be set by the `ValidatorService` with the actual validator + // IDs. + validator_id: 0, + slot: get_current_slot(), + head: self.head, + head_slot: head_block.slot, + target: target_block.tree_hash_root(), + target_slot: target_block.slot, + source: state.latest_justified_hash, + source_slot: state.latest_justified_slot, + }) + } + + // TODO: Add necessary methods for receive. +} diff --git a/crates/common/chain/lean/src/lib.rs b/crates/common/chain/lean/src/lib.rs index c8f122945..c8210f8fc 100644 --- a/crates/common/chain/lean/src/lib.rs +++ b/crates/common/chain/lean/src/lib.rs @@ -1 +1,4 @@ -pub mod staker; +pub mod genesis; +pub mod lean_chain; +pub mod service; +pub mod slot; diff --git a/crates/common/chain/lean/src/service.rs b/crates/common/chain/lean/src/service.rs new file mode 100644 index 000000000..964b4542b --- /dev/null +++ b/crates/common/chain/lean/src/service.rs @@ -0,0 +1,218 @@ +use std::{ + collections::HashMap, + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + +use alloy_primitives::B256; +use ream_consensus_lean::{QueueItem, VoteItem, block::Block, process_block}; +use ream_consensus_misc::constants::lean::INTERVALS_PER_SLOT; +use ream_network_spec::networks::lean_network_spec; +use tokio::{ + sync::{RwLock, mpsc}, + time::{Instant, MissedTickBehavior, interval_at}, +}; +use tracing::info; +use tree_hash::TreeHash; + +use crate::{lean_chain::LeanChain, slot::get_current_slot}; + +#[derive(Debug, Clone)] +pub struct LeanChainServiceMessage { + pub item: QueueItem, +} + +/// LeanChainService is responsible for updating the [LeanChain] state. `LeanChain` is updated when: +/// 1. Every third (t=2/4) and fourth (t=3/4) ticks. +/// 2. Receiving new blocks or votes from the network. +/// +/// NOTE: This service will be the core service to implement `receive()` function. +pub struct LeanChainService { + lean_chain: Arc>, + receiver: mpsc::UnboundedReceiver, + + // Objects that we will process once we have processed their parents + dependencies: HashMap>, +} + +impl LeanChainService { + pub async fn new( + lean_chain: Arc>, + receiver: mpsc::UnboundedReceiver, + ) -> Self { + LeanChainService { + lean_chain, + receiver, + dependencies: HashMap::new(), + } + } + + pub async fn start(mut self) { + // TODO: Duplicate clock logic from ValidatorService. May need to refactor later. + + // Get the Lean network specification. + let network_spec = lean_network_spec(); + let seconds_per_slot = network_spec.seconds_per_slot; + let genesis_time = network_spec.genesis_time; + + info!("LeanChainService started with genesis_time={genesis_time}"); + + // Calculate the genesis instant from the genesis time (in seconds). + let genesis_instant = UNIX_EPOCH + Duration::from_secs(genesis_time); + + // Assume genesis time is "always" in the future, + // as we don't support syncing features yet. + let interval_start = Instant::now() + + genesis_instant + .duration_since(SystemTime::now()) + .expect("Genesis time is in the past"); + + let mut tick_count = 0u64; + let mut interval = interval_at( + interval_start, + Duration::from_secs(seconds_per_slot / INTERVALS_PER_SLOT), + ); + interval.set_missed_tick_behavior(MissedTickBehavior::Burst); + + loop { + tokio::select! { + _ = interval.tick() => { + match tick_count % 4 { + 2 => { + // Third tick (t=2/4): Compute the safe target. + let current_slot = get_current_slot(); + info!("Computing safe target at slot {current_slot} (tick {tick_count})"); + let mut lean_chain = self.lean_chain.write().await; + lean_chain.safe_target = lean_chain.compute_safe_target().expect("Failed to compute safe target"); + } + 3 => { + // Fourth tick (t=3/4): Accept new votes. + let current_slot = get_current_slot(); + info!("Accepting new votes at slot {current_slot} (tick {tick_count})"); + self.lean_chain.write().await.accept_new_votes().expect("Failed to accept new votes"); + } + _ => { + // Other ticks (t=0, t=1/4): Do nothing. + } + } + tick_count += 1; + } + Some(message) = self.receiver.recv() => { + self.handle_message(message).await; + } + } + } + } + + async fn handle_message(&mut self, message: LeanChainServiceMessage) { + self.handle_item(message.item).await; + } + + async fn handle_item(&mut self, item: QueueItem) { + match item { + QueueItem::BlockItem(block) => { + let block_hash = block.tree_hash_root(); + info!( + "Received block at slot {} with hash {block_hash:?} from parent {:?}", + block.slot, block.parent + ); + let _ = self.handle_block(block).await; + } + QueueItem::VoteItem(vote_item) => { + match &vote_item { + VoteItem::Signed(signed_vote) => { + let vote = &signed_vote.data; + info!( + "Received signed vote from validator {} for head {:?} at slot {}", + vote.validator_id, vote.head, vote.slot + ); + } + VoteItem::Unsigned(vote) => { + info!( + "Received unsigned vote from validator {} for head {:?} at slot {}", + vote.validator_id, vote.head, vote.slot + ); + } + } + + self.handle_vote(vote_item).await; + } + } + } + + async fn handle_block(&mut self, block: Block) -> anyhow::Result<()> { + let block_hash = block.tree_hash_root(); + + let mut lean_chain = self.lean_chain.write().await; + + // If the block is already known, ignore it + if lean_chain.chain.contains_key(&block_hash) { + return Ok(()); + } + + match lean_chain.post_states.get(&block.parent) { + Some(parent_state) => { + let state = process_block(parent_state, &block)?; + + for vote in &block.votes { + if !lean_chain.known_votes.contains(vote) { + lean_chain.known_votes.push(vote.clone()); + } + } + + lean_chain.chain.insert(block_hash, block); + lean_chain.post_states.insert(block_hash, state); + + lean_chain.recompute_head()?; + + drop(lean_chain); + + // Once we have received a block, also process all of its dependencies + if let Some(queue_items) = self.dependencies.remove(&block_hash) { + for item in queue_items { + Box::pin(self.handle_item(item)).await; + } + } + } + None => { + // If we have not yet seen the block's parent, ignore for now, + // process later once we actually see the parent + self.dependencies + .entry(block.parent) + .or_default() + .push(QueueItem::BlockItem(block)); + } + } + + Ok(()) + } + + async fn handle_vote(&mut self, vote_item: VoteItem) { + let vote = match vote_item { + VoteItem::Signed(vote) => { + // TODO: Validate the signature. + vote.data + } + VoteItem::Unsigned(vote) => vote, + }; + + let lean_chain = self.lean_chain.read().await; + let is_known_vote = lean_chain.known_votes.contains(&vote); + let is_new_vote = lean_chain.new_votes.contains(&vote); + + if is_known_vote || is_new_vote { + // Do nothing + } else if lean_chain.chain.contains_key(&vote.head) { + drop(lean_chain); + + // We should acquire another write lock + let mut lean_chain = self.lean_chain.write().await; + lean_chain.new_votes.push(vote); + } else { + self.dependencies + .entry(vote.head) + .or_default() + .push(QueueItem::VoteItem(VoteItem::Unsigned(vote))); + } + } +} diff --git a/crates/common/chain/lean/src/slot.rs b/crates/common/chain/lean/src/slot.rs new file mode 100644 index 000000000..4bc45abc5 --- /dev/null +++ b/crates/common/chain/lean/src/slot.rs @@ -0,0 +1,18 @@ +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use ream_network_spec::networks::lean_network_spec; + +/// NOTE: Vitalik's implementation of 3SF-mini adds 2 slots more due to the test setup. +/// This is due to the fact that his test code starts at slot 1. +pub fn get_current_slot() -> u64 { + let network_spec = lean_network_spec(); + let seconds_per_slot = network_spec.seconds_per_slot; + let genesis_time = network_spec.genesis_time; + + let genesis_instant = UNIX_EPOCH + Duration::from_secs(genesis_time); + let elapsed = SystemTime::now() + .duration_since(genesis_instant) + .expect("Called before genesis time"); + + (elapsed.as_secs() / seconds_per_slot) + 2 +} diff --git a/crates/common/chain/lean/src/staker.rs b/crates/common/chain/lean/src/staker.rs deleted file mode 100644 index a821b750f..000000000 --- a/crates/common/chain/lean/src/staker.rs +++ /dev/null @@ -1,360 +0,0 @@ -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, -}; - -use alloy_primitives::B256; -use anyhow::anyhow; -use ream_consensus_lean::{ - QueueItem, - block::Block, - get_fork_choice_head, get_latest_justified_hash, is_justifiable_slot, process_block, - state::LeanState, - vote::{SignedVote, Vote}, -}; -use ream_consensus_misc::constants::lean::SLOT_DURATION; -use ream_p2p::network::lean::NetworkService; -use ream_pqc::PQSignature; -use ssz_types::VariableList; -use tracing::info; -use tree_hash::TreeHash; - -pub struct Staker { - pub validator_id: u64, - pub chain: HashMap, - pub network: Arc>, - pub post_states: HashMap, - pub known_votes: Vec, - pub new_votes: Vec, - pub dependencies: HashMap>, - pub genesis_hash: B256, - pub num_validators: u64, - pub safe_target: B256, - pub head: B256, -} - -impl Staker { - pub fn new( - validator_id: u64, - network: Arc>, - genesis_block: Block, - genesis_state: LeanState, - ) -> Staker { - let genesis_hash = genesis_block.tree_hash_root(); - - Staker { - // This node's validator ID - validator_id, - // Hook to the p2p network - network, - // Votes that we have received and taken into account - known_votes: Vec::new(), - // Votes that we have received but not yet taken into account - new_votes: Vec::new(), - // Objects that we will process once we have processed their parents - dependencies: HashMap::new(), - // Initialize the chain with the genesis block - genesis_hash, - num_validators: genesis_state.config.num_validators, - // Block that it is safe to use to vote as the target - // Diverge from Python implementation: Use genesis hash instead of `None` - safe_target: genesis_hash, - // Head of the chain - head: genesis_hash, - // {block_hash: block} for all blocks that we know about - chain: HashMap::from([(genesis_hash, genesis_block)]), - // {block_hash: post_state} for all blocks that we know about - post_states: HashMap::from([(genesis_hash, genesis_state)]), - } - } - - pub fn latest_justified_hash(&self) -> Option { - get_latest_justified_hash(&self.post_states) - } - - pub fn latest_finalized_hash(&self) -> Option { - self.post_states - .get(&self.head) - .map(|state| state.latest_finalized_hash) - } - - /// Compute the latest block that the staker is allowed to choose as the target - fn compute_safe_target(&self) -> anyhow::Result { - let justified_hash = get_latest_justified_hash(&self.post_states) - .ok_or_else(|| anyhow!("No justified hash found in post states"))?; - - get_fork_choice_head( - &self.chain, - &justified_hash, - &self.new_votes, - self.num_validators * 2 / 3, - ) - } - - /// Process new votes that the staker has received. Vote processing is done - /// at a particular time, because of safe target and view merge rule - fn accept_new_votes(&mut self) -> anyhow::Result<()> { - for new_vote in self.new_votes.drain(..) { - if !self.known_votes.contains(&new_vote) { - self.known_votes.push(new_vote); - } - } - - self.recompute_head()?; - Ok(()) - } - - /// Done upon processing new votes or a new block - fn recompute_head(&mut self) -> anyhow::Result<()> { - let justified_hash = get_latest_justified_hash(&self.post_states) - .ok_or_else(|| anyhow!("Failed to get latest_justified_hash from post_states"))?; - self.head = get_fork_choice_head(&self.chain, &justified_hash, &self.known_votes, 0)?; - Ok(()) - } - - /// Called every second - pub fn tick(&mut self) -> anyhow::Result<()> { - let current_slot = self.get_current_slot()?; - let time_in_slot = { - let network = self - .network - .lock() - .map_err(|err| anyhow!("Failed to acquire network lock: {err:?}"))?; - network.time % SLOT_DURATION - }; - - // t=0: propose a block - if time_in_slot == 0 { - if current_slot % self.num_validators == self.validator_id { - // View merge mechanism: a node accepts attestations that it received - // <= 1/4 before slot start, or attestations in the latest block - self.accept_new_votes()?; - self.propose_block()?; - } - // t=1/4: vote - } else if time_in_slot == SLOT_DURATION / 4 { - self.vote()?; - // t=2/4: compute the safe target (this must be done here to ensure - // that, assuming network latency assumptions are satisfied, anything that - // one honest node receives by this time, every honest node will receive by - // the general attestation deadline) - } else if time_in_slot == SLOT_DURATION * 2 / 4 { - self.safe_target = self.compute_safe_target()?; - // Deadline to accept attestations except for those included in a block - } else if time_in_slot == SLOT_DURATION * 3 / 4 { - self.accept_new_votes()?; - } - - Ok(()) - } - - fn get_current_slot(&self) -> anyhow::Result { - let network = self - .network - .lock() - .map_err(|err| anyhow!("Failed to acquire network lock: {err:?}"))?; - Ok(network.time / SLOT_DURATION + 2) - } - - /// Called when it's the staker's turn to propose a block - fn propose_block(&mut self) -> anyhow::Result<()> { - let new_slot = self.get_current_slot()?; - - let head_block = self - .chain - .get(&self.head) - .ok_or_else(|| anyhow!("Block not found in chain for head: {}", self.head))?; - - info!( - "proposing (Staker {}), head = {}", - self.validator_id, head_block.slot - ); - - let head_state = self - .post_states - .get(&self.head) - .ok_or_else(|| anyhow!("Post state not found for head: {}", self.head))?; - let mut new_block = Block { - slot: new_slot, - parent: self.head, - votes: VariableList::empty(), - // Diverged from Python implementation: Using `B256::ZERO` instead of `None`) - state_root: B256::ZERO, - }; - let mut state: LeanState; - - // Keep attempt to add valid votes from the list of available votes - loop { - state = process_block(head_state, &new_block)?; - - let new_votes_to_add = self - .known_votes - .clone() - .into_iter() - .filter(|vote| vote.source == state.latest_justified_hash) - .filter(|vote| !new_block.votes.contains(vote)) - .collect::>(); - - if new_votes_to_add.is_empty() { - break; - } - - for vote in new_votes_to_add { - new_block - .votes - .push(vote) - .map_err(|err| anyhow!("Failed to add vote to new_block: {err:?}"))?; - } - } - - new_block.state_root = state.tree_hash_root(); - let new_hash = new_block.tree_hash_root(); - - self.chain.insert(new_hash, new_block.clone()); - self.post_states.insert(new_hash, state); - - // TODO: submit to actual network - // self.get_network() - // .borrow_mut() - // .submit(QueueItem::BlockItem(new_block), self.validator_id); - - Ok(()) - } - - /// Called when it's the staker's turn to vote - fn vote(&mut self) -> anyhow::Result<()> { - let state = self - .post_states - .get(&self.head) - .ok_or_else(|| anyhow!("Post state not found for head: {}", self.head))?; - let mut target_block = self - .chain - .get(&self.head) - .ok_or_else(|| anyhow!("Block not found in chain for head: {}", self.head))?; - - // If there is no very recent safe target, then vote for the k'th ancestor - // of the head - for _ in 0..3 { - let safe_target_block = self.chain.get(&self.safe_target).ok_or_else(|| { - anyhow!("Block not found for safe target hash: {}", self.safe_target) - })?; - if target_block.slot > safe_target_block.slot { - target_block = self.chain.get(&target_block.parent).ok_or_else(|| { - anyhow!( - "Block not found for target block's parent hash: {}", - target_block.parent - ) - })?; - } - } - - // If the latest finalized slot is very far back, then only some slots are - // valid to justify, make sure the target is one of those - while !is_justifiable_slot(&state.latest_finalized_slot, &target_block.slot) { - target_block = self.chain.get(&target_block.parent).ok_or_else(|| { - anyhow!( - "Block not found for target block's parent hash: {}", - target_block.parent - ) - })?; - } - - let head_block = self - .chain - .get(&self.head) - .ok_or_else(|| anyhow!("Block not found for head: {}", self.head))?; - - let vote = Vote { - validator_id: self.validator_id, - slot: self.get_current_slot()?, - head: self.head, - head_slot: head_block.slot, - target: target_block.tree_hash_root(), - target_slot: target_block.slot, - source: state.latest_justified_hash, - source_slot: state.latest_justified_slot, - }; - - let signed_vote = SignedVote { - data: vote, - signature: PQSignature {}, - }; - - info!( - "voting (Staker {}), head = {}, t = {}, s = {}", - self.validator_id, &head_block.slot, &target_block.slot, &state.latest_justified_slot - ); - - self.receive(QueueItem::VoteItem(signed_vote))?; - - // TODO: submit to actual network - // self.get_network() - // .borrow_mut() - // .submit(QueueItem::VoteItem(vote), self.validator_id); - - Ok(()) - } - - /// Called by the p2p network - fn receive(&mut self, queue_item: QueueItem) -> anyhow::Result<()> { - match queue_item { - QueueItem::BlockItem(block) => { - let block_hash = block.tree_hash_root(); - - // If the block is already known, ignore it - if self.chain.contains_key(&block_hash) { - return Ok(()); - } - - match self.post_states.get(&block.parent) { - Some(parent_state) => { - let state = process_block(parent_state, &block)?; - - for vote in &block.votes { - if !self.known_votes.contains(vote) { - self.known_votes.push(vote.clone()); - } - } - - self.chain.insert(block_hash, block); - self.post_states.insert(block_hash, state); - - self.recompute_head()?; - - // Once we have received a block, also process all of its dependencies - if let Some(queue_items) = self.dependencies.remove(&block_hash) { - for item in queue_items { - self.receive(item)?; - } - } - } - None => { - // If we have not yet seen the block's parent, ignore for now, - // process later once we actually see the parent - self.dependencies - .entry(block.parent) - .or_default() - .push(QueueItem::BlockItem(block)); - } - } - } - QueueItem::VoteItem(vote) => { - let is_known_vote = self.known_votes.contains(&vote.data); - let is_new_vote = self.new_votes.contains(&vote.data); - - if is_known_vote || is_new_vote { - // Do nothing - } else if self.chain.contains_key(&vote.data.head) { - self.new_votes.push(vote.data); - } else { - self.dependencies - .entry(vote.data.head) - .or_default() - .push(QueueItem::VoteItem(vote)); - } - } - } - - Ok(()) - } -} diff --git a/crates/common/consensus/lean/src/lib.rs b/crates/common/consensus/lean/src/lib.rs index e4602feba..9762d0fe0 100644 --- a/crates/common/consensus/lean/src/lib.rs +++ b/crates/common/consensus/lean/src/lib.rs @@ -15,10 +15,16 @@ use crate::{ vote::{SignedVote, Vote}, }; +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +pub enum VoteItem { + Signed(SignedVote), + Unsigned(Vote), +} + #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub enum QueueItem { BlockItem(Block), - VoteItem(SignedVote), + VoteItem(VoteItem), } /// We allow justification of slots either <= 5 or a perfect square or oblong after @@ -126,7 +132,7 @@ pub fn get_fork_choice_head( let mut root = *provided_root; // Start at genesis by default - if *root == B256::ZERO { + if root == B256::ZERO { root = blocks .iter() .min_by_key(|(_, block)| block.slot) diff --git a/crates/common/consensus/misc/src/constants/lean.rs b/crates/common/consensus/misc/src/constants/lean.rs index d68aabaf2..4a8dcdad2 100644 --- a/crates/common/consensus/misc/src/constants/lean.rs +++ b/crates/common/consensus/misc/src/constants/lean.rs @@ -1,3 +1,6 @@ +/// 3SF-mini divides a slot into 4 intervals. +/// Reference: https://github.com/ethereum/research/blob/d225a6775a9b184b5c1fd6c830cc58a375d9535f/3sf-mini/p2p.py#L77-L98 +pub const INTERVALS_PER_SLOT: u64 = 4; pub const MAX_HISTORICAL_BLOCK_HASHES: u64 = 262144; pub const SLOT_DURATION: u64 = 12; pub const VALIDATOR_REGISTRY_LIMIT: u64 = 4096; diff --git a/crates/common/network_spec/src/networks/lean.rs b/crates/common/network_spec/src/networks/lean.rs index 33d55f353..c8237da34 100644 --- a/crates/common/network_spec/src/networks/lean.rs +++ b/crates/common/network_spec/src/networks/lean.rs @@ -10,6 +10,7 @@ pub static LEAN_NETWORK_SPEC: OnceLock> = OnceLock::new(); pub struct LeanNetworkSpec { pub genesis_time: u64, pub seconds_per_slot: u64, + pub num_validators: u64, } /// MUST be called only once at the start of the application to initialize static diff --git a/crates/common/validator/lean/Cargo.toml b/crates/common/validator/lean/Cargo.toml index 54687680a..51fd51a43 100644 --- a/crates/common/validator/lean/Cargo.toml +++ b/crates/common/validator/lean/Cargo.toml @@ -10,4 +10,11 @@ rust-version.workspace = true version.workspace = true [dependencies] +tokio.workspace = true tracing = { workspace = true, features = ["log"] } + +# ream dependencies +ream-chain-lean.workspace = true +ream-consensus-lean.workspace = true +ream-consensus-misc.workspace = true +ream-network-spec.workspace = true diff --git a/crates/common/validator/lean/src/service.rs b/crates/common/validator/lean/src/service.rs index 587fe690a..3e6681f68 100644 --- a/crates/common/validator/lean/src/service.rs +++ b/crates/common/validator/lean/src/service.rs @@ -1,19 +1,174 @@ -use std::{thread::sleep, time::Duration}; +use std::{ + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; +use ream_chain_lean::{ + lean_chain::LeanChain, service::LeanChainServiceMessage, slot::get_current_slot, +}; +use ream_consensus_lean::{QueueItem, VoteItem}; +use ream_consensus_misc::constants::lean::INTERVALS_PER_SLOT; +use ream_network_spec::networks::lean_network_spec; +use tokio::{ + sync::{RwLock, mpsc}, + time::{Instant, MissedTickBehavior, interval_at}, +}; use tracing::info; -pub struct ValidatorService {} +// TODO: We need to replace this after PQC integration. +// For now, we only need ID for keystore. +pub struct LeanKeystore { + id: u64, +} + +/// ValidatorService is responsible for managing validator operations +/// such as proposing blocks and voting on them. This service also holds the keystores +/// for its validators, which are used to sign. +/// +/// Every first tick (t=0) it proposes a block if it's the validator's turn. +/// Every second tick (t=1/4) it votes on the proposed block. +/// +/// NOTE: Other ticks should be handled by the other services, such as [LeanChainService]. +pub struct ValidatorService { + lean_chain: Arc>, + keystores: Vec, + chain_sender: mpsc::UnboundedSender, +} impl ValidatorService { - pub async fn new() -> Self { - ValidatorService {} + pub async fn new( + lean_chain: Arc>, + keystores: Vec, + chain_sender: mpsc::UnboundedSender, + ) -> Self { + // Hack: If no keystores are provided, create a default one. + let keystores = if keystores.is_empty() { + vec![ + LeanKeystore { id: 0 }, + LeanKeystore { id: 1 }, + LeanKeystore { id: 2 }, + LeanKeystore { id: 3 }, + ] // Placeholder for keystores + } else { + keystores + }; + + ValidatorService { + lean_chain, + keystores, + chain_sender, + } } pub async fn start(self) { - info!("Validator Service started"); + // TODO: Duplicate clock logic from LeanChainService. May need to refactor later. + + // Get the Lean network specification. + let network_spec = lean_network_spec(); + let seconds_per_slot = network_spec.seconds_per_slot; + let genesis_time = network_spec.genesis_time; + + info!( + "ValidatorService started with {} validator(s), genesis_time={genesis_time}", + self.keystores.len() + ); + + // Calculate the genesis instant from the genesis time (in seconds). + let genesis_instant = UNIX_EPOCH + Duration::from_secs(genesis_time); + + // Assume genesis time is "always" in the future, + // as we don't support syncing features yet. + let interval_start = Instant::now() + + genesis_instant + .duration_since(SystemTime::now()) + .expect("Genesis time is in the past"); + + let mut tick_count = 0u64; + let mut interval = interval_at( + interval_start, + Duration::from_secs(seconds_per_slot / INTERVALS_PER_SLOT), + ); + interval.set_missed_tick_behavior(MissedTickBehavior::Burst); loop { - sleep(Duration::from_secs(10)); + tokio::select! { + _ = interval.tick() => { + match tick_count % 4 { + 0 => { + // First tick (t=0): Propose a block. + let current_slot = get_current_slot(); + if let Some(keystore) = self.is_proposer() { + info!("Validator {} proposing block for slot {current_slot} (tick {tick_count})", keystore.id); + + // Acquire the write lock. `accept_new_votes` and `build_block` will modify the lean chain. + let mut lean_chain = self.lean_chain.write().await; + + // Accept new votes and modify the lean chain. + lean_chain.accept_new_votes().expect("Failed to accept new votes"); + + // Build a block and propose the block. + let new_block = lean_chain.propose_block().expect("Failed to build block"); + + info!( + "Validator {} built block: slot={}, parent={:?}, votes={}, state_root={:?}", + keystore.id, + new_block.slot, + new_block.parent, + new_block.votes.len(), + new_block.state_root + ); + + // TODO 1: Sign the block with the keystore. + // TODO 2: Send the block to the network. + } else { + let proposer_index = current_slot % lean_network_spec().num_validators; + info!("Not proposer for slot {current_slot} (proposer is validator {proposer_index}), skipping"); + } + } + 1 => { + // Second tick (t=1/4): Vote. + let current_slot = get_current_slot(); + info!("Starting vote phase at slot {current_slot} (tick {tick_count}): {} validator(s) voting", self.keystores.len()); + + // Build the vote from LeanChain, and modify its validator ID + let vote_template = self.lean_chain.read().await.build_vote().expect("Failed to build vote"); + + info!("Built vote template for head {:?} at slot {} with target {:?}", vote_template.head, vote_template.slot, vote_template.target); + + let votes = self.keystores.iter().map(|keystore| { + let mut vote = vote_template.clone(); + vote.validator_id = keystore.id; + vote + }).collect::>(); + + for vote in votes { + self.chain_sender + .send(LeanChainServiceMessage { + item: QueueItem::VoteItem(VoteItem::Unsigned(vote)), + }) + .expect("Failed to send vote to LeanChainService"); + } + + // TODO 1: Sign the votes with the keystore. + // TODO 2: Send the votes to the network. + } + _ => { + // Other ticks (t=2/4, t=3/4): Do nothing. + } + } + tick_count += 1; + } + } } } + + /// Determine if one of the keystores is the proposer for the current slot. + fn is_proposer(&self) -> Option<&LeanKeystore> { + let current_slot = get_current_slot(); + let proposer_index: u64 = current_slot % lean_network_spec().num_validators; + + self.keystores + .iter() + .find(|keystore| keystore.id == proposer_index as u64) + } } diff --git a/crates/networking/p2p/Cargo.toml b/crates/networking/p2p/Cargo.toml index c436c8f4a..5a4a45169 100644 --- a/crates/networking/p2p/Cargo.toml +++ b/crates/networking/p2p/Cargo.toml @@ -38,6 +38,7 @@ tree_hash.workspace = true unsigned-varint = { version = "0.8", features = ["codec"] } # ream dependencies +ream-chain-lean.workspace = true ream-consensus-beacon.workspace = true ream-consensus-misc.workspace = true ream-discv5.workspace = true diff --git a/crates/networking/p2p/src/network/lean.rs b/crates/networking/p2p/src/network/lean.rs index d48dd93c0..15fd4cfb9 100644 --- a/crates/networking/p2p/src/network/lean.rs +++ b/crates/networking/p2p/src/network/lean.rs @@ -1,19 +1,28 @@ +use std::sync::Arc; + +use ream_chain_lean::lean_chain::LeanChain; +use tokio::sync::RwLock; use tracing::info; +/// NetworkService is responsible for the following: +/// 1. Peer management. (We will connect with static peers for PQ devnet.) +/// 2. Gossiping blocks and votes. +/// +/// TBD: It will be best if we reuse the existing NetworkManagerService for the beacon node. pub struct NetworkService { - pub time: u64, + lean_chain: Arc>, } impl NetworkService { - pub async fn new() -> Self { - NetworkService { time: 0 } + pub async fn new(lean_chain: Arc>) -> Self { + NetworkService { lean_chain } } pub async fn start(self) { info!("NetworkService started"); - - loop { - std::thread::sleep(std::time::Duration::from_secs(10)); - } + info!( + "Current LeanChain head: {}", + self.lean_chain.read().await.head + ); } }