Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Separate execution payloads in the DB #3157

Closed
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
407 changes: 364 additions & 43 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion beacon_node/beacon_chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ strum = { version = "0.24.0", features = ["derive"] }
logging = { path = "../../common/logging" }
execution_layer = { path = "../execution_layer" }
sensitive_url = { path = "../../common/sensitive_url" }
superstruct = "0.4.1"
superstruct = "0.5.0"
hex = "0.4.2"

[[test]]
name = "beacon_chain_tests"
Expand Down
106 changes: 86 additions & 20 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,11 @@ use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, Instant};
use store::iter::{BlockRootsIterator, ParentRootBlockIterator, StateRootsIterator};
use store::{Error as DBError, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp};
use store::{
DatabaseBlock, Error as DBError, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp,
};
use task_executor::ShutdownReason;
use tree_hash::TreeHash;
use types::beacon_state::CloneConfig;
use types::*;

Expand Down Expand Up @@ -587,7 +590,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block_root: Hash256,
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>> + '_, Error> {
let block = self
.get_block(&block_root)?
.get_blinded_block(&block_root)?
.ok_or(Error::MissingBeaconBlock(block_root))?;
let state = self
.get_state(&block.state_root(), Some(block.slot()))?
Expand Down Expand Up @@ -752,11 +755,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
request_slot: Slot,
skips: WhenSlotSkipped,
) -> Result<Option<SignedBeaconBlock<T::EthSpec>>, Error> {
) -> Result<Option<SignedBlindedBeaconBlock<T::EthSpec>>, Error> {
let root = self.block_root_at_slot(request_slot, skips)?;

if let Some(block_root) = root {
Ok(self.store.get_block(&block_root)?)
Ok(self.store.get_blinded_block(&block_root)?)
} else {
Ok(None)
}
Expand Down Expand Up @@ -961,28 +964,84 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// ## Errors
///
/// May return a database error.
pub fn get_block_checking_early_attester_cache(
pub async fn get_block_checking_early_attester_cache(
&self,
block_root: &Hash256,
) -> Result<Option<SignedBeaconBlock<T::EthSpec>>, Error> {
let block_opt = self
.store
.get_block(block_root)?
.or_else(|| self.early_attester_cache.get_block(*block_root));

Ok(block_opt)
if let Some(block) = self.early_attester_cache.get_block(*block_root) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice optimization!

return Ok(Some(block));
}
self.get_block(block_root).await
}

/// Returns the block at the given root, if any.
///
/// ## Errors
///
/// May return a database error.
pub fn get_block(
pub async fn get_block(
&self,
block_root: &Hash256,
) -> Result<Option<SignedBeaconBlock<T::EthSpec>>, Error> {
Ok(self.store.get_block(block_root)?)
// Load block from database, returning immediately if we have the full block w payload
// stored.
let blinded_block = match self.store.try_get_full_block(block_root)? {
Some(DatabaseBlock::Full(block)) => return Ok(Some(block)),
Some(DatabaseBlock::Blinded(block)) => block,
None => return Ok(None),
};

// If we only have a blinded block, load the execution payload from the EL.
let block_message = blinded_block.message();
let execution_payload_header = &block_message
.execution_payload()
.map_err(|_| Error::BlockVariantLacksExecutionPayload(*block_root))?
.execution_payload_header;

let exec_block_hash = execution_payload_header.block_hash;

let execution_payload = self
.execution_layer
.as_ref()
.ok_or(Error::ExecutionLayerMissing)?
.get_payload_by_block_hash(exec_block_hash)
.await
.map_err(|e| Error::ExecutionLayerErrorPayloadReconstruction(exec_block_hash, e))?
.ok_or(Error::BlockHashMissingFromExecutionLayer(exec_block_hash))?;

// Verify payload integrity.
let header_from_payload = ExecutionPayloadHeader::from(&execution_payload);
if header_from_payload != *execution_payload_header {
for txn in &execution_payload.transactions {
debug!(
self.log,
"Reconstructed txn";
"bytes" => format!("0x{}", hex::encode(&**txn)),
);
}

return Err(Error::InconsistentPayloadReconstructed {
slot: blinded_block.slot(),
exec_block_hash,
canonical_payload_root: execution_payload_header.tree_hash_root(),
reconstructed_payload_root: header_from_payload.tree_hash_root(),
canonical_transactions_root: execution_payload_header.transactions_root,
reconstructed_transactions_root: header_from_payload.transactions_root,
});
}

// Add the payload to the block to form a full block.
blinded_block
.try_into_full_block(Some(execution_payload))
.ok_or(Error::AddPayloadLogicError)
.map(Some)
}

pub fn get_blinded_block(
&self,
block_root: &Hash256,
) -> Result<Option<SignedBlindedBeaconBlock<T::EthSpec>>, Error> {
Ok(self.store.get_blinded_block(block_root)?)
}

/// Returns the state at the given root, if any.
Expand Down Expand Up @@ -3373,7 +3432,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map::<Result<_, Error>, _>(Ok)
.unwrap_or_else(|| {
let beacon_block = self
.get_block(&beacon_block_root)?
.store
.get_full_block(&beacon_block_root)?
.ok_or(Error::MissingBeaconBlock(beacon_block_root))?;

let beacon_state_root = beacon_block.state_root();
Expand Down Expand Up @@ -4525,11 +4585,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// This could be a very expensive operation and should only be done in testing/analysis
/// activities.
pub fn chain_dump(&self) -> Result<Vec<BeaconSnapshot<T::EthSpec>>, Error> {
#[allow(clippy::type_complexity)]
pub fn chain_dump(
&self,
) -> Result<Vec<BeaconSnapshot<T::EthSpec, BlindedPayload<T::EthSpec>>>, Error> {
let mut dump = vec![];

let mut last_slot = BeaconSnapshot {
beacon_block: self.head()?.beacon_block,
beacon_block: self.head()?.beacon_block.into(),
beacon_block_root: self.head()?.beacon_block_root,
beacon_state: self.head()?.beacon_state,
};
Expand All @@ -4543,9 +4606,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
break; // Genesis has been reached.
}

let beacon_block = self.store.get_block(&beacon_block_root)?.ok_or_else(|| {
Error::DBInconsistent(format!("Missing block {}", beacon_block_root))
})?;
let beacon_block = self
.store
.get_blinded_block(&beacon_block_root)?
.ok_or_else(|| {
Error::DBInconsistent(format!("Missing block {}", beacon_block_root))
})?;
let beacon_state_root = beacon_block.state_root();
let beacon_state = self
.store
Expand Down Expand Up @@ -4630,7 +4696,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
visited.insert(block_hash);

if signed_beacon_block.slot() % T::EthSpec::slots_per_epoch() == 0 {
let block = self.get_block(&block_hash).unwrap().unwrap();
let block = self.get_blinded_block(&block_hash).unwrap().unwrap();
let state = self
.get_state(&block.state_root(), Some(block.slot()))
.unwrap()
Expand Down
9 changes: 5 additions & 4 deletions beacon_node/beacon_chain/src/beacon_fork_choice_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use std::sync::Arc;
use store::{Error as StoreError, HotColdDB, ItemStore};
use superstruct::superstruct;
use types::{
BeaconBlock, BeaconState, BeaconStateError, Checkpoint, Epoch, EthSpec, Hash256, Slot,
BeaconBlock, BeaconState, BeaconStateError, Checkpoint, Epoch, EthSpec, ExecPayload, Hash256,
Slot,
};

#[derive(Debug)]
Expand Down Expand Up @@ -254,9 +255,9 @@ where
self.time = slot
}

fn on_verified_block(
fn on_verified_block<Payload: ExecPayload<E>>(
&mut self,
_block: &BeaconBlock<E>,
_block: &BeaconBlock<E, Payload>,
block_root: Hash256,
state: &BeaconState<E>,
) -> Result<(), Self::Error> {
Expand Down Expand Up @@ -300,7 +301,7 @@ where
metrics::inc_counter(&metrics::BALANCES_CACHE_MISSES);
let justified_block = self
.store
.get_block(&self.justified_checkpoint.root)
.get_blinded_block(&self.justified_checkpoint.root)
.map_err(Error::FailedToReadBlock)?
.ok_or(Error::MissingBlock(self.justified_checkpoint.root))?
.deconstruct()
Expand Down
15 changes: 9 additions & 6 deletions beacon_node/beacon_chain/src/beacon_snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
use serde_derive::Serialize;
use types::{beacon_state::CloneConfig, BeaconState, EthSpec, Hash256, SignedBeaconBlock};
use types::{
beacon_state::CloneConfig, BeaconState, EthSpec, ExecPayload, FullPayload, Hash256,
SignedBeaconBlock,
};

/// Represents some block and its associated state. Generally, this will be used for tracking the
/// head, justified head and finalized head.
#[derive(Clone, Serialize, PartialEq, Debug)]
pub struct BeaconSnapshot<E: EthSpec> {
pub beacon_block: SignedBeaconBlock<E>,
pub struct BeaconSnapshot<E: EthSpec, Payload: ExecPayload<E> = FullPayload<E>> {
pub beacon_block: SignedBeaconBlock<E, Payload>,
pub beacon_block_root: Hash256,
pub beacon_state: BeaconState<E>,
}

impl<E: EthSpec> BeaconSnapshot<E> {
impl<E: EthSpec, Payload: ExecPayload<E>> BeaconSnapshot<E, Payload> {
/// Create a new checkpoint.
pub fn new(
beacon_block: SignedBeaconBlock<E>,
beacon_block: SignedBeaconBlock<E, Payload>,
beacon_block_root: Hash256,
beacon_state: BeaconState<E>,
) -> Self {
Expand All @@ -36,7 +39,7 @@ impl<E: EthSpec> BeaconSnapshot<E> {
/// Update all fields of the checkpoint.
pub fn update(
&mut self,
beacon_block: SignedBeaconBlock<E>,
beacon_block: SignedBeaconBlock<E, Payload>,
beacon_block_root: Hash256,
beacon_state: BeaconState<E>,
) {
Expand Down
6 changes: 3 additions & 3 deletions beacon_node/beacon_chain/src/block_reward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
use eth2::lighthouse::{AttestationRewards, BlockReward, BlockRewardMeta};
use operation_pool::{AttMaxCover, MaxCover};
use state_processing::per_block_processing::altair::sync_committee::compute_sync_aggregate_rewards;
use types::{BeaconBlockRef, BeaconState, EthSpec, Hash256, RelativeEpoch};
use types::{BeaconBlockRef, BeaconState, EthSpec, ExecPayload, Hash256, RelativeEpoch};

impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn compute_block_reward(
pub fn compute_block_reward<Payload: ExecPayload<T::EthSpec>>(
&self,
block: BeaconBlockRef<'_, T::EthSpec>,
block: BeaconBlockRef<'_, T::EthSpec, Payload>,
block_root: Hash256,
state: &BeaconState<T::EthSpec>,
) -> Result<BlockReward, BeaconChainError> {
Expand Down
10 changes: 5 additions & 5 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ use store::{Error as DBError, HotColdDB, HotStateSummary, KeyValueStore, StoreOp
use tree_hash::TreeHash;
use types::ExecPayload;
use types::{
BeaconBlockRef, BeaconState, BeaconStateError, ChainSpec, CloneConfig, Epoch, EthSpec,
ExecutionBlockHash, Hash256, InconsistentFork, PublicKey, PublicKeyBytes, RelativeEpoch,
SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
BeaconBlockRef, BeaconState, BeaconStateError, BlindedPayload, ChainSpec, CloneConfig, Epoch,
EthSpec, ExecutionBlockHash, Hash256, InconsistentFork, PublicKey, PublicKeyBytes,
RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
};

const POS_PANDA_BANNER: &str = r#"
Expand Down Expand Up @@ -567,7 +567,7 @@ pub struct FullyVerifiedBlock<'a, T: BeaconChainTypes> {
pub block: SignedBeaconBlock<T::EthSpec>,
pub block_root: Hash256,
pub state: BeaconState<T::EthSpec>,
pub parent_block: SignedBeaconBlock<T::EthSpec>,
pub parent_block: SignedBeaconBlock<T::EthSpec, BlindedPayload<T::EthSpec>>,
pub confirmation_db_batch: Vec<StoreOp<'a, T::EthSpec>>,
pub payload_verification_status: PayloadVerificationStatus,
}
Expand Down Expand Up @@ -1569,7 +1569,7 @@ fn load_parent<T: BeaconChainTypes>(
// indicate that we don't yet know the parent.
let root = block.parent_root();
let parent_block = chain
.get_block(&block.parent_root())
.get_blinded_block(&block.parent_root())
.map_err(BlockError::BeaconChainError)?
.ok_or_else(|| {
// Return a `MissingBeaconBlock` error instead of a `ParentUnknown` error since
Expand Down
8 changes: 4 additions & 4 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ where
.ok_or("Fork choice not found in store")?;

let genesis_block = store
.get_block(&chain.genesis_block_root)
.get_blinded_block(&chain.genesis_block_root)
.map_err(|e| descriptive_db_error("genesis block", &e))?
.ok_or("Genesis block not found in store")?;
let genesis_state = store
Expand Down Expand Up @@ -588,7 +588,7 @@ where
// Try to decode the head block according to the current fork, if that fails, try
// to backtrack to before the most recent fork.
let (head_block_root, head_block, head_reverted) =
match store.get_block(&initial_head_block_root) {
match store.get_full_block(&initial_head_block_root) {
Ok(Some(block)) => (initial_head_block_root, block, false),
Ok(None) => return Err("Head block not found in store".into()),
Err(StoreError::SszDecodeError(_)) => {
Expand Down Expand Up @@ -986,10 +986,10 @@ mod test {
assert_eq!(
chain
.store
.get_block(&Hash256::zero())
.get_blinded_block(&Hash256::zero())
.expect("should read db")
.expect("should find genesis block"),
block,
block.clone().into(),
"should store genesis block under zero hash alias"
);
assert_eq!(
Expand Down
12 changes: 12 additions & 0 deletions beacon_node/beacon_chain/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,18 @@ pub enum BeaconChainError {
},
AltairForkDisabled,
ExecutionLayerMissing,
BlockVariantLacksExecutionPayload(Hash256),
ExecutionLayerErrorPayloadReconstruction(ExecutionBlockHash, execution_layer::Error),
BlockHashMissingFromExecutionLayer(ExecutionBlockHash),
InconsistentPayloadReconstructed {
slot: Slot,
exec_block_hash: ExecutionBlockHash,
canonical_payload_root: Hash256,
reconstructed_payload_root: Hash256,
canonical_transactions_root: Hash256,
reconstructed_transactions_root: Hash256,
},
AddPayloadLogicError,
ExecutionForkChoiceUpdateFailed(execution_layer::Error),
PrepareProposerBlockingFailed(execution_layer::Error),
ExecutionForkChoiceUpdateInvalid {
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/execution_payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ pub async fn prepare_execution_payload<T: BeaconChainTypes, Payload: ExecPayload
} else {
chain
.store
.get_block(&finalized_root)
.get_blinded_block(&finalized_root)
.map_err(BlockProductionError::FailedToReadFinalizedBlock)?
.ok_or(BlockProductionError::MissingFinalizedBlock(finalized_root))?
.message()
Expand Down
12 changes: 9 additions & 3 deletions beacon_node/beacon_chain/src/fork_revert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub fn revert_to_fork_boundary<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>
);
let block_iter = ParentRootBlockIterator::fork_tolerant(&store, head_block_root);

process_results(block_iter, |mut iter| {
let (block_root, blinded_block) = process_results(block_iter, |mut iter| {
iter.find_map(|(block_root, block)| {
if block.slot() < fork_epoch.start_slot(E::slots_per_epoch()) {
Some((block_root, block))
Expand All @@ -69,7 +69,13 @@ pub fn revert_to_fork_boundary<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>
e, CORRUPT_DB_MESSAGE
)
})?
.ok_or_else(|| format!("No pre-fork blocks found. {}", CORRUPT_DB_MESSAGE))
.ok_or_else(|| format!("No pre-fork blocks found. {}", CORRUPT_DB_MESSAGE))?;

let block = store
.make_full_block(&block_root, blinded_block)
.map_err(|e| format!("Unable to add payload to new head block: {:?}", e))?;

Ok((block_root, block))
}

/// Reset fork choice to the finalized checkpoint of the supplied head state.
Expand Down Expand Up @@ -97,7 +103,7 @@ pub fn reset_fork_choice_to_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: It
let finalized_checkpoint = head_state.finalized_checkpoint();
let finalized_block_root = finalized_checkpoint.root;
let finalized_block = store
.get_block(&finalized_block_root)
.get_full_block(&finalized_block_root)
.map_err(|e| format!("Error loading finalized block: {:?}", e))?
.ok_or_else(|| {
format!(
Expand Down
Loading