diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 53583390fa3..692e85ccce3 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -38,6 +38,7 @@ use crate::light_client_finality_update_verification::{ use crate::light_client_optimistic_update_verification::{ Error as LightClientOptimisticUpdateError, VerifiedLightClientOptimisticUpdate, }; +use crate::lightclient_proofs_cache::{LightclientServerCache, LightclientUpdatesToBroadcast}; use crate::migrate::BackgroundMigrator; use crate::naive_aggregation_pool::{ AggregatedAttestationMap, Error as NaiveAggregationError, NaiveAggregationPool, @@ -339,6 +340,8 @@ struct PartialBeaconBlock { bls_to_execution_changes: Vec, } +pub type LightclientProducerEvent = (Hash256, Slot, SyncAggregate); + pub type BeaconForkChoice = ForkChoice< BeaconForkChoiceStore< ::EthSpec, @@ -418,10 +421,6 @@ pub struct BeaconChain { /// Maintains a record of which validators we've seen BLS to execution changes for. pub(crate) observed_bls_to_execution_changes: Mutex>, - /// The most recently validated light client finality update received on gossip. - pub latest_seen_finality_update: Mutex>>, - /// The most recently validated light client optimistic update received on gossip. - pub latest_seen_optimistic_update: Mutex>>, /// Provides information from the Ethereum 1 (PoW) chain. pub eth1_chain: Option>, /// Interfaces with the execution client. @@ -464,6 +463,11 @@ pub struct BeaconChain { pub block_times_cache: Arc>, /// A cache used to track pre-finalization block roots for quick rejection. pub pre_finalization_block_cache: PreFinalizationBlockCache, + /// A cache used to produce lightclient server messages + pub lightclient_server_cache: LightclientServerCache, + /// Sender to signal the lightclient server to produce new updates + pub lightclient_server_tx: + Option>>, /// Sender given to tasks, so that if they encounter a state in which execution cannot /// continue they can request that everything shuts down. pub shutdown_sender: Sender, @@ -1295,6 +1299,19 @@ impl BeaconChain { self.state_at_slot(load_slot, StateSkipConfig::WithoutStateRoots) } + pub fn recompute_and_cache_lightclient_updates( + &self, + (parent_root, slot, sync_aggregate): LightclientProducerEvent, + ) -> Result, Error> { + self.lightclient_server_cache.recompute_and_cache_updates( + &self.log, + self.store.clone(), + &parent_root, + slot, + &sync_aggregate, + ) + } + /// Returns the current heads of the `BeaconChain`. For the canonical head, see `Self::head`. /// /// Returns `(block_root, block_slot)`. @@ -3433,6 +3450,18 @@ impl BeaconChain { }; let current_finalized_checkpoint = state.finalized_checkpoint(); + // compute state proofs for light client updates before inserting the state into the + // snapshot cache. + self.lightclient_server_cache + .cache_state_data( + &self.spec, block, block_root, + // mutable reference on the state is needed to compute merkle proofs + &mut state, + ) + .unwrap_or_else(|e| { + error!(self.log, "error caching lightclient data {:?}", e); + }); + self.snapshot_cache .try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) .ok_or(Error::SnapshotCacheLockTimeout) @@ -3803,6 +3832,26 @@ impl BeaconChain { })); } } + + // Do not trigger lightclient server update producer for old blocks, to extra work + // during sync. + if block_delay_total < self.slot_clock.slot_duration() * 32 { + if let Some(lightclient_server_tx) = self.lightclient_server_tx.clone() { + if let Ok(sync_aggregate) = block.body().sync_aggregate() { + if let Err(e) = lightclient_server_tx.try_send(( + block.parent_root(), + block.slot(), + sync_aggregate.clone(), + )) { + warn!( + self.log, + "Failed to send lightclient server event"; + "error" => ?e + ); + } + } + } + } } // For the current and next epoch of this state, ensure we have the shuffling from this diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index fbd255126ee..0ce11bddf53 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1,4 +1,6 @@ -use crate::beacon_chain::{CanonicalHead, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY}; +use crate::beacon_chain::{ + CanonicalHead, LightclientProducerEvent, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY, +}; use crate::beacon_proposer_cache::BeaconProposerCache; use crate::data_availability_checker::DataAvailabilityChecker; use crate::eth1_chain::{CachingEth1Backend, SszEth1}; @@ -6,6 +8,7 @@ use crate::eth1_finalization_cache::Eth1FinalizationCache; use crate::fork_choice_signal::ForkChoiceSignalTx; use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary}; use crate::head_tracker::HeadTracker; +use crate::lightclient_proofs_cache::LightclientServerCache; use crate::migrate::{BackgroundMigrator, MigratorConfig}; use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; @@ -87,6 +90,7 @@ pub struct BeaconChainBuilder { event_handler: Option>, slot_clock: Option, shutdown_sender: Option>, + lightclient_server_tx: Option>>, head_tracker: Option, validator_pubkey_cache: Option>, spec: ChainSpec, @@ -129,6 +133,7 @@ where event_handler: None, slot_clock: None, shutdown_sender: None, + lightclient_server_tx: None, head_tracker: None, validator_pubkey_cache: None, spec: TEthSpec::default_spec(), @@ -603,6 +608,15 @@ where self } + /// Sets a `Sender` to allow the beacon chain to trigger lightclient update production. + pub fn lightclient_server_tx( + mut self, + sender: tokio::sync::mpsc::Sender>, + ) -> Self { + self.lightclient_server_tx = Some(sender); + self + } + /// Creates a new, empty operation pool. fn empty_op_pool(mut self) -> Self { self.op_pool = Some(OperationPool::new()); @@ -883,8 +897,6 @@ where observed_proposer_slashings: <_>::default(), observed_attester_slashings: <_>::default(), observed_bls_to_execution_changes: <_>::default(), - latest_seen_finality_update: <_>::default(), - latest_seen_optimistic_update: <_>::default(), eth1_chain: self.eth1_chain, execution_layer: self.execution_layer, genesis_validators_root, @@ -912,6 +924,8 @@ where validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache), attester_cache: <_>::default(), early_attester_cache: <_>::default(), + lightclient_server_cache: LightclientServerCache::new(), + lightclient_server_tx: self.lightclient_server_tx, shutdown_sender: self .shutdown_sender .ok_or("Cannot build without a shutdown sender.")?, diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index e2d37078ac5..23056c84ca3 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -30,6 +30,7 @@ pub mod historical_blocks; pub mod kzg_utils; pub mod light_client_finality_update_verification; pub mod light_client_optimistic_update_verification; +mod lightclient_proofs_cache; pub mod merge_readiness; pub mod metrics; pub mod migrate; @@ -58,8 +59,8 @@ pub mod validator_pubkey_cache; pub use self::beacon_chain::{ AttestationProcessingOutcome, AvailabilityProcessingStatus, BeaconBlockResponse, BeaconBlockResponseType, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult, - ForkChoiceError, OverrideForkchoiceUpdate, ProduceBlockVerification, StateSkipConfig, - WhenSlotSkipped, INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON, + ForkChoiceError, LightclientProducerEvent, OverrideForkchoiceUpdate, ProduceBlockVerification, + StateSkipConfig, WhenSlotSkipped, INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON, INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON, }; pub use self::beacon_snapshot::BeaconSnapshot; diff --git a/beacon_node/beacon_chain/src/light_client_finality_update_verification.rs b/beacon_node/beacon_chain/src/light_client_finality_update_verification.rs index 791d63ccfe5..6beeb06ca3c 100644 --- a/beacon_node/beacon_chain/src/light_client_finality_update_verification.rs +++ b/beacon_node/beacon_chain/src/light_client_finality_update_verification.rs @@ -3,9 +3,7 @@ use derivative::Derivative; use slot_clock::SlotClock; use std::time::Duration; use strum::AsRefStr; -use types::{ - light_client_update::Error as LightClientUpdateError, LightClientFinalityUpdate, Slot, -}; +use types::{light_client_update::Error as LightClientUpdateError, LightClientFinalityUpdate}; /// Returned when a light client finality update was not successfully verified. It might not have been verified for /// two reasons: @@ -63,42 +61,28 @@ impl VerifiedLightClientFinalityUpdate { /// Returns `Ok(Self)` if the `light_client_finality_update` is valid to be (re)published on the gossip /// network. pub fn verify( - light_client_finality_update: LightClientFinalityUpdate, + rcv_finality_update: LightClientFinalityUpdate, chain: &BeaconChain, seen_timestamp: Duration, ) -> Result { - let gossiped_finality_slot = light_client_finality_update.finalized_header.beacon.slot; - let one_third_slot_duration = Duration::new(chain.spec.seconds_per_slot / 3, 0); - let signature_slot = light_client_finality_update.signature_slot; - let start_time = chain.slot_clock.start_of(signature_slot); - let mut latest_seen_finality_update = chain.latest_seen_finality_update.lock(); - - let head = chain.canonical_head.cached_head(); - let head_block = &head.snapshot.beacon_block; - let attested_block_root = head_block.message().parent_root(); - let attested_block = chain - .get_blinded_block(&attested_block_root)? - .ok_or(Error::FailedConstructingUpdate)?; - let mut attested_state = chain - .get_state(&attested_block.state_root(), Some(attested_block.slot()))? - .ok_or(Error::FailedConstructingUpdate)?; - - let finalized_block_root = attested_state.finalized_checkpoint().root; - let finalized_block = chain - .get_blinded_block(&finalized_block_root)? + let latest_finality_update = chain + .lightclient_server_cache + .get_latest_finality_update() .ok_or(Error::FailedConstructingUpdate)?; - let latest_seen_finality_update_slot = match latest_seen_finality_update.as_ref() { - Some(update) => update.finalized_header.beacon.slot, - None => Slot::new(0), - }; // verify that no other finality_update with a lower or equal // finalized_header.slot was already forwarded on the network - if gossiped_finality_slot <= latest_seen_finality_update_slot { + if rcv_finality_update.finalized_header.beacon.slot + <= latest_finality_update.finalized_header.beacon.slot + { return Err(Error::FinalityUpdateAlreadySeen); } // verify that enough time has passed for the block to have been propagated + let start_time = chain + .slot_clock + .start_of(rcv_finality_update.signature_slot); + let one_third_slot_duration = Duration::new(chain.spec.seconds_per_slot / 3, 0); match start_time { Some(time) => { if seen_timestamp + chain.spec.maximum_gossip_clock_disparity() @@ -110,24 +94,13 @@ impl VerifiedLightClientFinalityUpdate { None => return Err(Error::SigSlotStartIsNone), } - let head_state = &head.snapshot.beacon_state; - let finality_update = LightClientFinalityUpdate::new( - &chain.spec, - head_state, - head_block, - &mut attested_state, - &finalized_block, - )?; - // verify that the gossiped finality update is the same as the locally constructed one. - if finality_update != light_client_finality_update { + if latest_finality_update != rcv_finality_update { return Err(Error::InvalidLightClientFinalityUpdate); } - *latest_seen_finality_update = Some(light_client_finality_update.clone()); - Ok(Self { - light_client_finality_update, + light_client_finality_update: rcv_finality_update, seen_timestamp, }) } diff --git a/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs b/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs index 374cc9a7753..7d5e96d008b 100644 --- a/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs +++ b/beacon_node/beacon_chain/src/light_client_optimistic_update_verification.rs @@ -4,9 +4,7 @@ use eth2::types::Hash256; use slot_clock::SlotClock; use std::time::Duration; use strum::AsRefStr; -use types::{ - light_client_update::Error as LightClientUpdateError, LightClientOptimisticUpdate, Slot, -}; +use types::{light_client_update::Error as LightClientUpdateError, LightClientOptimisticUpdate}; /// Returned when a light client optimistic update was not successfully verified. It might not have been verified for /// two reasons: @@ -67,38 +65,15 @@ impl VerifiedLightClientOptimisticUpdate { /// Returns `Ok(Self)` if the `light_client_optimistic_update` is valid to be (re)published on the gossip /// network. pub fn verify( - light_client_optimistic_update: LightClientOptimisticUpdate, + rcv_optimistic_update: LightClientOptimisticUpdate, chain: &BeaconChain, seen_timestamp: Duration, ) -> Result { - let gossiped_optimistic_slot = light_client_optimistic_update.attested_header.beacon.slot; - let one_third_slot_duration = Duration::new(chain.spec.seconds_per_slot / 3, 0); - let signature_slot = light_client_optimistic_update.signature_slot; - let start_time = chain.slot_clock.start_of(signature_slot); - let mut latest_seen_optimistic_update = chain.latest_seen_optimistic_update.lock(); - - let head = chain.canonical_head.cached_head(); - let head_block = &head.snapshot.beacon_block; - let attested_block_root = head_block.message().parent_root(); - let attested_block = chain - .get_blinded_block(&attested_block_root)? - .ok_or(Error::FailedConstructingUpdate)?; - - let attested_state = chain - .get_state(&attested_block.state_root(), Some(attested_block.slot()))? - .ok_or(Error::FailedConstructingUpdate)?; - let latest_seen_optimistic_update_slot = match latest_seen_optimistic_update.as_ref() { - Some(update) => update.attested_header.beacon.slot, - None => Slot::new(0), - }; - - // verify that no other optimistic_update with a lower or equal - // optimistic_header.slot was already forwarded on the network - if gossiped_optimistic_slot <= latest_seen_optimistic_update_slot { - return Err(Error::OptimisticUpdateAlreadySeen); - } - // verify that enough time has passed for the block to have been propagated + let start_time = chain + .slot_clock + .start_of(rcv_optimistic_update.signature_slot); + let one_third_slot_duration = Duration::new(chain.spec.seconds_per_slot / 3, 0); match start_time { Some(time) => { if seen_timestamp + chain.spec.maximum_gossip_clock_disparity() @@ -110,30 +85,21 @@ impl VerifiedLightClientOptimisticUpdate { None => return Err(Error::SigSlotStartIsNone), } - // check if we can process the optimistic update immediately - // otherwise queue - let canonical_root = light_client_optimistic_update - .attested_header - .beacon - .canonical_root(); - - if canonical_root != head_block.message().parent_root() { - return Err(Error::UnknownBlockParentRoot(canonical_root)); - } - - let optimistic_update = - LightClientOptimisticUpdate::new(&chain.spec, head_block, &attested_state)?; + let latest_optimistic_update = chain + .lightclient_server_cache + .get_latest_optimistic_update() + .ok_or(Error::FailedConstructingUpdate)?; // verify that the gossiped optimistic update is the same as the locally constructed one. - if optimistic_update != light_client_optimistic_update { + if latest_optimistic_update != rcv_optimistic_update { return Err(Error::InvalidLightClientOptimisticUpdate); } - *latest_seen_optimistic_update = Some(light_client_optimistic_update.clone()); - + let parent_root = rcv_optimistic_update.attested_header.beacon.parent_root; Ok(Self { - light_client_optimistic_update, - parent_root: canonical_root, + light_client_optimistic_update: rcv_optimistic_update, + // TODO: why is the parent_root necessary here? + parent_root, seen_timestamp, }) } diff --git a/beacon_node/beacon_chain/src/lightclient_proofs_cache.rs b/beacon_node/beacon_chain/src/lightclient_proofs_cache.rs new file mode 100644 index 00000000000..8f62fb4908e --- /dev/null +++ b/beacon_node/beacon_chain/src/lightclient_proofs_cache.rs @@ -0,0 +1,256 @@ +use crate::errors::BeaconChainError; +use crate::{BeaconChainTypes, BeaconStore}; +use parking_lot::{Mutex, RwLock}; +use slog::{debug, Logger}; +use ssz_types::FixedVector; +use types::light_client_update::{FinalizedRootProofLen, FINALIZED_ROOT_INDEX}; +use types::{ + BeaconBlockRef, BeaconState, ChainSpec, EthSpec, ForkName, Hash256, LightClientFinalityUpdate, + LightClientHeader, LightClientOptimisticUpdate, Slot, SyncAggregate, +}; + +/// A prev block cache miss requires to re-generate the state of the post-parent block. Items in the +/// prev block cache are very small 32 * (6 + 1) = 224 bytes. 32 is an arbitrary number that +/// represents unlikely re-orgs, while keeping the cache very small. +const PREV_BLOCK_CACHE_SIZE: usize = 32; + +pub type LightclientUpdatesToBroadcast = ( + Option>, + Option>, +); + +/// This cache computes light client messages ahead of time, required to satisfy p2p and API +/// requests. These messages include proofs on historical states, so on-demand computation is +/// expensive. +/// +pub struct LightclientServerCache { + /// Tracks a single global latest finality update out of all imported blocks. + /// + /// TODO: Active discussion with @etan-status if this cache should be fork aware to return + /// latest canonical instead of global latest. + latest_finality_update: RwLock>>, + /// Tracks a single global latest optimistic update out of all imported blocks. + latest_optimistic_update: RwLock>>, + /// Caches state proofs by block root + prev_block_cache: Mutex>, +} + +impl LightclientServerCache { + pub fn new() -> Self { + Self { + latest_finality_update: None.into(), + latest_optimistic_update: None.into(), + prev_block_cache: lru::LruCache::new(PREV_BLOCK_CACHE_SIZE).into(), + } + } + + /// Compute and cache state proofs for latter production of light-client messages. Does not + /// trigger block replay. May result in multiple DB write ops. + /// TODO: Should return StoreOps to batch with rest of db operations? + pub fn cache_state_data( + &self, + spec: &ChainSpec, + block: BeaconBlockRef, + block_root: Hash256, + block_post_state: &mut BeaconState, + ) -> Result<(), BeaconChainError> { + // Only post-altair + if spec.fork_name_at_slot::(block.slot()) == ForkName::Base { + return Ok(()); + } + + // Persist in memory cache for a descendent block + + let cached_data = LightclientCachedData::from_state(block_post_state)?; + self.prev_block_cache.lock().put(block_root, cached_data); + + Ok(()) + } + + /// Given a block with a SyncAggregte computes better or more recent light client updates. The + /// results are cached either on disk or memory to be served via p2p and rest API + pub fn recompute_and_cache_updates( + &self, + log: &Logger, + store: BeaconStore, + block_parent_root: &Hash256, + block_slot: Slot, + sync_aggregate: &SyncAggregate, + ) -> Result, BeaconChainError> { + let signature_slot = block_slot; + let attested_block_root = block_parent_root; + + let attested_block = store.get_blinded_block(attested_block_root)?.ok_or( + BeaconChainError::DBInconsistent(format!( + "Block not available {:?}", + attested_block_root + )), + )?; + + let cached_parts = self.get_or_compute_prev_block_cache( + store.clone(), + attested_block_root, + &attested_block.state_root(), + attested_block.slot(), + )?; + + let attested_slot = attested_block.slot(); + let mut new_optimistic_update = None; + let mut new_finality_update = None; + + // Spec: Full nodes SHOULD provide the LightClientOptimisticUpdate with the highest + // attested_header.beacon.slot (if multiple, highest signature_slot) as selected by fork choice + let is_latest_optimistic = match &self.latest_optimistic_update.read().clone() { + Some(latest_optimistic_update) => { + is_latest_optimistic_update(latest_optimistic_update, attested_slot, signature_slot) + } + None => true, + }; + if is_latest_optimistic { + // can create an optimistic update, that is more recent + new_optimistic_update = Some(LightClientOptimisticUpdate { + attested_header: block_to_light_client_header(attested_block.message()), + sync_aggregate: sync_aggregate.clone(), + signature_slot, + }); + *self.latest_optimistic_update.write() = new_optimistic_update.clone(); + }; + + // Spec: Full nodes SHOULD provide the LightClientFinalityUpdate with the highest + // attested_header.beacon.slot (if multiple, highest signature_slot) as selected by fork choice + let is_latest_finality = match &self.latest_finality_update.read().clone() { + Some(latest_finality_update) => { + is_latest_finality_update(latest_finality_update, attested_slot, signature_slot) + } + None => true, + }; + if is_latest_finality & !cached_parts.finalized_block_root.is_zero() { + // Immediately after checkpoint sync the finalized block may not be available yet. + if let Some(finalized_block) = + store.get_blinded_block(&cached_parts.finalized_block_root)? + { + new_finality_update = Some(LightClientFinalityUpdate { + // TODO: may want to cache this result from latest_optimistic_update if producing a + // lightclient header becomes expensive + attested_header: block_to_light_client_header(attested_block.message()), + finalized_header: block_to_light_client_header(finalized_block.message()), + finality_branch: cached_parts.finality_branch.clone(), + sync_aggregate: sync_aggregate.clone(), + signature_slot, + }); + *self.latest_finality_update.write() = new_finality_update.clone(); + } else { + debug!( + log, + "Finalized block not available in store for lightclient server"; + "finalized_block_root" => format!("{}", cached_parts.finalized_block_root), + ); + } + } + + Ok((new_optimistic_update, new_finality_update)) + } + + /// Retrieves prev block cached data from cache. If not present re-computes by retrieving the + /// parent state, and inserts an entry to the cache. + /// + /// In separate function since FnOnce of get_or_insert can not be fallible. + fn get_or_compute_prev_block_cache( + &self, + store: BeaconStore, + block_root: &Hash256, + block_state_root: &Hash256, + block_slot: Slot, + ) -> Result { + // Attempt to get the value from the cache first. + if let Some(cached_parts) = self.prev_block_cache.lock().get(block_root) { + return Ok(cached_parts.clone()); + } + + // Compute the value, handling potential errors. + let mut state = store + .get_state(block_state_root, Some(block_slot))? + .ok_or_else(|| { + BeaconChainError::DBInconsistent(format!("Missing state {:?}", block_state_root)) + })?; + let new_value = LightclientCachedData::from_state(&mut state)?; + + // Insert value and return owned + self.prev_block_cache + .lock() + .put(*block_root, new_value.clone()); + Ok(new_value) + } + + pub fn get_latest_finality_update(&self) -> Option> { + self.latest_finality_update.read().clone() + } + + pub fn get_latest_optimistic_update(&self) -> Option> { + self.latest_optimistic_update.read().clone() + } +} + +impl Default for LightclientServerCache { + fn default() -> Self { + Self::new() + } +} + +type FinalityBranch = FixedVector; + +#[derive(Clone)] +struct LightclientCachedData { + finality_branch: FinalityBranch, + finalized_block_root: Hash256, +} + +impl LightclientCachedData { + fn from_state(state: &mut BeaconState) -> Result { + Ok(Self { + finality_branch: state.compute_merkle_proof(FINALIZED_ROOT_INDEX)?.into(), + finalized_block_root: state.finalized_checkpoint().root, + }) + } +} + +// Implements spec priorization rules: +// > Full nodes SHOULD provide the LightClientFinalityUpdate with the highest attested_header.beacon.slot (if multiple, highest signature_slot) +// +// ref: https://github.com/ethereum/consensus-specs/blob/113c58f9bf9c08867f6f5f633c4d98e0364d612a/specs/altair/light-client/full-node.md#create_light_client_finality_update +fn is_latest_finality_update( + prev: &LightClientFinalityUpdate, + attested_slot: Slot, + signature_slot: Slot, +) -> bool { + if attested_slot > prev.attested_header.beacon.slot { + true + } else { + attested_slot == prev.attested_header.beacon.slot && signature_slot > prev.signature_slot + } +} + +// Implements spec priorization rules: +// > Full nodes SHOULD provide the LightClientOptimisticUpdate with the highest attested_header.beacon.slot (if multiple, highest signature_slot) +// +// ref: https://github.com/ethereum/consensus-specs/blob/113c58f9bf9c08867f6f5f633c4d98e0364d612a/specs/altair/light-client/full-node.md#create_light_client_optimistic_update +fn is_latest_optimistic_update( + prev: &LightClientOptimisticUpdate, + attested_slot: Slot, + signature_slot: Slot, +) -> bool { + if attested_slot > prev.attested_header.beacon.slot { + true + } else { + attested_slot == prev.attested_header.beacon.slot && signature_slot > prev.signature_slot + } +} + +fn block_to_light_client_header( + block: BeaconBlockRef>, +) -> LightClientHeader { + // TODO: make fork aware + LightClientHeader { + beacon: block.block_header(), + } +} diff --git a/beacon_node/client/src/broadcast_lightclient_updates.rs b/beacon_node/client/src/broadcast_lightclient_updates.rs new file mode 100644 index 00000000000..1f4856346b7 --- /dev/null +++ b/beacon_node/client/src/broadcast_lightclient_updates.rs @@ -0,0 +1,99 @@ +use beacon_chain::{BeaconChain, BeaconChainTypes, LightclientProducerEvent}; +use eth2::types::EventKind; +use network::NetworkMessage; +use slog::{debug, error, Logger}; +use tokio::sync::mpsc::{Receiver, UnboundedSender}; + +// Each LightclientProducerEvent is ~200 bytes. With the lightclient server producing only recent +// updates it is okay to drop some events in case of overloading. In normal network conditions +// there's one event emitted per block at most every 12 seconds, while consuming the event should +// take a few miliseconds. 32 is a small enough arbitrary number. +pub(crate) const LIGHTCLIENT_SERVER_CHANNEL_CAPACITY: usize = 32; + +pub async fn compute_lightclient_updates( + chain: &BeaconChain, + mut lightclient_server_rv: Receiver>, + log: &Logger, + network_send: Option>>, +) { + // lightclient_server_rv is Some if lightclient flag is enabled + // + // Should only receive events for recent blocks, import_block filters by blocks close to clock. + // + // Intents to process SyncAggregates of all recent blocks sequentially, without skipping. + // Uses a bounded receiver, so may drop some SyncAggregates if very overloaded. This is okay + // since only the most recent updates have value. + while let Some((block_root, slot, sync_aggregate)) = lightclient_server_rv.recv().await { + let (optimistic_update_to_broadcast, finality_update_to_broadcast) = match chain + .recompute_and_cache_lightclient_updates((block_root, slot, sync_aggregate)) + { + Ok(value) => value, + Err(e) => { + error!(log, "error computing lightclient updates {:?}", e); + continue; + } + }; + + if let Some(event_handler) = chain.event_handler.as_ref() { + if let Some(optimistic_update) = optimistic_update_to_broadcast.clone() { + event_handler.register(EventKind::LightClientOptimisticUpdate(Box::new( + optimistic_update, + ))); + } + if let Some(finality_update) = finality_update_to_broadcast.clone() { + event_handler.register(EventKind::LightClientFinalityUpdate(Box::new( + finality_update, + ))); + } + } + + // network_send is Some when the network is enabled + if let Some(ref network_send) = network_send { + if let Some(optimistic_update) = optimistic_update_to_broadcast { + let signature_slot = optimistic_update.signature_slot; + let pubsub_message = lighthouse_network::PubsubMessage::LightClientOptimisticUpdate( + Box::new(optimistic_update), + ); + if let Err(e) = network_send.send(NetworkMessage::Publish { + messages: vec![pubsub_message], + }) { + debug!( + log, + "Failed to publish PubsubMessage::LightClientOptimisticUpdate message"; + "slot" => signature_slot, + "error" => ?e, + ); + } else { + debug!( + log, + "Published PubsubMessage::LightClientOptimisticUpdate"; + "slot" => signature_slot, + ); + } + } + + if let Some(finality_update) = finality_update_to_broadcast { + let signature_slot = finality_update.signature_slot; + let pubsub_message = lighthouse_network::PubsubMessage::LightClientFinalityUpdate( + Box::new(finality_update), + ); + if let Err(e) = network_send.send(NetworkMessage::Publish { + messages: vec![pubsub_message], + }) { + debug!( + log, + "Failed to publish PubsubMessage::LightClientFinalityUpdate message"; + "slot" => signature_slot, + "error" => ?e, + ); + } else { + debug!( + log, + "Published PubsubMessage::LightClientFinalityUpdate"; + "slot" => signature_slot, + ); + } + } + } + } +} diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index cedf347b9a8..50a8f3b73f0 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -1,4 +1,7 @@ use crate::address_change_broadcast::broadcast_address_changes_at_capella; +use crate::broadcast_lightclient_updates::{ + compute_lightclient_updates, LIGHTCLIENT_SERVER_CHANNEL_CAPACITY, +}; use crate::config::{ClientGenesis, Config as ClientConfig}; use crate::notifier::spawn_notifier; use crate::Client; @@ -6,6 +9,7 @@ use beacon_chain::data_availability_checker::start_availability_cache_maintenanc use beacon_chain::otb_verification_service::start_otb_verification_service; use beacon_chain::proposer_prep_service::start_proposer_prep_service; use beacon_chain::schema_change::migrate_schema; +use beacon_chain::LightclientProducerEvent; use beacon_chain::{ builder::{BeaconChainBuilder, Witness}, eth1_chain::{CachingEth1Backend, Eth1Chain}, @@ -35,6 +39,7 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; use timer::spawn_timer; +use tokio::sync::mpsc::Receiver; use tokio::sync::oneshot; use types::{ test_utils::generate_deterministic_keypairs, BeaconState, ChainSpec, EthSpec, @@ -76,6 +81,7 @@ pub struct ClientBuilder { slasher: Option>>, beacon_processor_config: Option, beacon_processor_channels: Option>, + lightclient_server_rv: Option>>, eth_spec_instance: T::EthSpec, } @@ -111,6 +117,7 @@ where eth_spec_instance, beacon_processor_config: None, beacon_processor_channels: None, + lightclient_server_rv: None, } } @@ -199,6 +206,16 @@ where builder }; + let builder = if config.network.enable_light_client_server { + let (tx, rv) = tokio::sync::mpsc::channel::>( + LIGHTCLIENT_SERVER_CHANNEL_CAPACITY, + ); + self.lightclient_server_rv = Some(rv); + builder.lightclient_server_tx(tx) + } else { + builder + }; + let chain_exists = builder.store_contains_beacon_chain().unwrap_or(false); // If the client is expect to resume but there's no beacon chain in the database, @@ -816,7 +833,7 @@ where } // Spawn a service to publish BLS to execution changes at the Capella fork. - if let Some(network_senders) = self.network_senders { + if let Some(network_senders) = self.network_senders.clone() { let inner_chain = beacon_chain.clone(); let broadcast_context = runtime_context.service_context("addr_bcast".to_string()); @@ -833,6 +850,28 @@ where "addr_broadcast", ); } + + // Spawn service to publish lightclient updates at some interval into the slot + if let Some(lightclient_server_rv) = self.lightclient_server_rv { + let inner_chain = beacon_chain.clone(); + let broadcast_context = + runtime_context.service_context("lcserv_bcast".to_string()); + let log = broadcast_context.log().clone(); + let network_senders = self.network_senders.clone(); + broadcast_context.executor.spawn( + async move { + compute_lightclient_updates( + &inner_chain, + lightclient_server_rv, + &log, + network_senders + .map(|network_senders| network_senders.network_send()), + ) + .await + }, + "lcserv_broadcast", + ); + } } start_proposer_prep_service(runtime_context.executor.clone(), beacon_chain.clone()); diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 399aa06511e..55f9b1dda63 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -1,6 +1,7 @@ extern crate slog; mod address_change_broadcast; +mod broadcast_lightclient_updates; pub mod config; mod metrics; mod notifier; diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 5b00a80bdf0..7a5a302389b 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -2471,9 +2471,8 @@ pub fn serve( accept_header: Option| { task_spawner.blocking_response_task(Priority::P1, move || { let update = chain - .latest_seen_optimistic_update - .lock() - .clone() + .lightclient_server_cache + .get_latest_optimistic_update() .ok_or_else(|| { warp_utils::reject::custom_not_found( "No LightClientOptimisticUpdate is available".to_string(), @@ -2518,9 +2517,8 @@ pub fn serve( accept_header: Option| { task_spawner.blocking_response_task(Priority::P1, move || { let update = chain - .latest_seen_finality_update - .lock() - .clone() + .lightclient_server_cache + .get_latest_finality_update() .ok_or_else(|| { warp_utils::reject::custom_not_found( "No LightClientFinalityUpdate is available".to_string(), diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index d5fa50ba219..24e57eb8424 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1677,7 +1677,10 @@ impl ApiTester { Err(e) => panic!("query failed incorrectly: {e:?}"), }; - let expected = self.chain.latest_seen_optimistic_update.lock().clone(); + let expected = self + .chain + .lightclient_server_cache + .get_latest_optimistic_update(); assert_eq!(result, expected); self @@ -1693,7 +1696,10 @@ impl ApiTester { Err(e) => panic!("query failed incorrectly: {e:?}"), }; - let expected = self.chain.latest_seen_finality_update.lock().clone(); + let expected = self + .chain + .lightclient_server_cache + .get_latest_finality_update(); assert_eq!(result, expected); self