diff --git a/Cargo.lock b/Cargo.lock index 63e9f4022a8c6..0a54c0826e3c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1553,6 +1553,7 @@ dependencies = [ "aptos-types", "bcs 0.1.4", "criterion", + "derive_more", "itertools 0.13.0", "once_cell", "serde", diff --git a/consensus/consensus-types/src/pipelined_block.rs b/consensus/consensus-types/src/pipelined_block.rs index 67a972757fe8a..72713f530d778 100644 --- a/consensus/consensus-types/src/pipelined_block.rs +++ b/consensus/consensus-types/src/pipelined_block.rs @@ -44,6 +44,7 @@ pub struct PipelinedBlock { /// The state_compute_result is calculated for all the pending blocks prior to insertion to /// the tree. The execution results are not persisted: they're recalculated again for the /// pending blocks upon restart. + #[derivative(PartialEq = "ignore")] state_compute_result: StateComputeResult, randomness: OnceCell, pipeline_insertion_time: OnceCell, diff --git a/consensus/src/block_storage/block_store.rs b/consensus/src/block_storage/block_store.rs index f56dfa80c23eb..cbc4c85a139b5 100644 --- a/consensus/src/block_storage/block_store.rs +++ b/consensus/src/block_storage/block_store.rs @@ -32,7 +32,9 @@ use aptos_crypto::{hash::ACCUMULATOR_PLACEHOLDER_HASH, HashValue}; use aptos_executor_types::StateComputeResult; use aptos_infallible::{Mutex, RwLock}; use aptos_logger::prelude::*; -use aptos_types::ledger_info::LedgerInfoWithSignatures; +use aptos_types::{ + ledger_info::LedgerInfoWithSignatures, proof::accumulator::InMemoryTransactionAccumulator, +}; use futures::executor::block_on; #[cfg(test)] use std::collections::VecDeque; @@ -175,18 +177,14 @@ impl BlockStore { root_metadata.accu_hash, ); - let result = StateComputeResult::new( - root_metadata.accu_hash, - root_metadata.frozen_root_hashes, - root_metadata.num_leaves, /* num_leaves */ - vec![], /* parent_root_hashes */ - 0, /* parent_num_leaves */ - None, /* epoch_state */ - vec![], /* compute_status */ - vec![], /* txn_infos */ - vec![], /* reconfig_events */ - None, // block end info - ); + let result = StateComputeResult::new_empty(Arc::new( + InMemoryTransactionAccumulator::new( + root_metadata.frozen_root_hashes, + root_metadata.num_leaves, + ) + .expect("Failed to recover accumulator."), + )); + assert_eq!(result.root_hash(), root_metadata.accu_hash); let pipelined_root_block = PipelinedBlock::new( *root_block, diff --git a/consensus/src/block_storage/block_tree.rs b/consensus/src/block_storage/block_tree.rs index 5d1df54149cbf..4d13c742747d4 100644 --- a/consensus/src/block_storage/block_tree.rs +++ b/consensus/src/block_storage/block_tree.rs @@ -19,7 +19,7 @@ use aptos_types::{ block_info::{BlockInfo, Round}, ledger_info::LedgerInfoWithSignatures, }; -use mirai_annotations::{checked_verify_eq, precondition}; +use mirai_annotations::precondition; use std::{ collections::{vec_deque::VecDeque, BTreeMap, HashMap, HashSet}, sync::Arc, @@ -249,7 +249,6 @@ impl BlockTree { existing_block, block_id, block); - checked_verify_eq!(existing_block.compute_result(), block.compute_result()); Ok(existing_block) } else { match self.get_linkable_block_mut(&block.parent_id()) { diff --git a/consensus/src/execution_pipeline.rs b/consensus/src/execution_pipeline.rs index 8743ae9fbd06f..c95d6c8e1f0f3 100644 --- a/consensus/src/execution_pipeline.rs +++ b/consensus/src/execution_pipeline.rs @@ -35,11 +35,8 @@ use std::{ }; use tokio::sync::{mpsc, oneshot}; -pub type PreCommitHook = Box< - dyn 'static - + FnOnce(&[SignedTransaction], &StateComputeResult) -> BoxFuture<'static, ()> - + Send, ->; +pub type PreCommitHook = + Box BoxFuture<'static, ()> + Send>; #[allow(clippy::unwrap_used)] pub static SIG_VERIFY_POOL: Lazy> = Lazy::new(|| { @@ -287,7 +284,7 @@ impl ExecutionPipeline { } .await; let pipeline_res = res.map(|(output, execution_duration)| { - let pre_commit_hook_fut = pre_commit_hook(&input_txns, &output); + let pre_commit_hook_fut = pre_commit_hook(&output); let pre_commit_fut: BoxFuture<'static, ExecutorResult<()>> = if output.epoch_state().is_some() || !enable_pre_commit { // hack: it causes issue if pre-commit is finished at an epoch ending, and diff --git a/consensus/src/pipeline/buffer_manager.rs b/consensus/src/pipeline/buffer_manager.rs index 4b7d2e9712ddf..1b9136efb971a 100644 --- a/consensus/src/pipeline/buffer_manager.rs +++ b/consensus/src/pipeline/buffer_manager.rs @@ -31,7 +31,7 @@ use aptos_consensus_types::{ pipeline::commit_vote::CommitVote, pipelined_block::PipelinedBlock, }; -use aptos_crypto::{bls12381, HashValue}; +use aptos_crypto::HashValue; use aptos_executor_types::ExecutorResult; use aptos_logger::prelude::*; use aptos_network::protocols::{rpc::error::RpcError, wire::handshake::v1::ProtocolId}; @@ -703,7 +703,7 @@ impl BufferManager { CommitMessage::Vote(CommitVote::new_with_signature( commit_vote.author(), commit_vote.ledger_info().clone(), - bls12381::Signature::dummy_signature(), + aptos_crypto::bls12381::Signature::dummy_signature(), )) }); CommitMessage::Vote(commit_vote) diff --git a/consensus/src/pipeline/tests/test_utils.rs b/consensus/src/pipeline/tests/test_utils.rs index 95a6cfd85f0aa..e099abaceb183 100644 --- a/consensus/src/pipeline/tests/test_utils.rs +++ b/consensus/src/pipeline/tests/test_utils.rs @@ -96,18 +96,7 @@ pub fn prepare_executed_blocks_with_ledger_info( proposals.push(proposal); } - let compute_result = StateComputeResult::new( - executed_hash, - vec![], // dummy subtree - 0, - vec![], - 0, - None, - vec![], - vec![], - vec![], - None, // block end info - ); + let compute_result = StateComputeResult::new_dummy_with_root_hash(executed_hash); let li = LedgerInfo::new( proposals.last().unwrap().block().gen_block_info( diff --git a/consensus/src/state_computer.rs b/consensus/src/state_computer.rs index 70d99839e465e..1fe6a69d92fbd 100644 --- a/consensus/src/state_computer.rs +++ b/consensus/src/state_computer.rs @@ -30,8 +30,7 @@ use aptos_logger::prelude::*; use aptos_metrics_core::IntGauge; use aptos_types::{ account_address::AccountAddress, block_executor::config::BlockExecutorConfigFromOnchain, - block_metadata_ext::BlockMetadataExt, epoch_state::EpochState, - ledger_info::LedgerInfoWithSignatures, randomness::Randomness, transaction::SignedTransaction, + epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures, randomness::Randomness, }; use fail::fail_point; use futures::{future::BoxFuture, SinkExt, StreamExt}; @@ -132,44 +131,36 @@ impl ExecutionProxy { fn pre_commit_hook( &self, block: &Block, - metadata: BlockMetadataExt, payload_manager: Arc, ) -> PreCommitHook { let mut pre_commit_notifier = self.pre_commit_notifier.clone(); let state_sync_notifier = self.state_sync_notifier.clone(); let payload = block.payload().cloned(); let timestamp = block.timestamp_usecs(); - let validator_txns = block.validator_txns().cloned().unwrap_or_default(); - let block_id = block.id(); - Box::new( - move |user_txns: &[SignedTransaction], state_compute_result: &StateComputeResult| { - let input_txns = Block::combine_to_input_transactions( - validator_txns, - user_txns.to_vec(), - metadata, - ); - let txns = state_compute_result.transactions_to_commit(input_txns, block_id); - let subscribable_events = state_compute_result.subscribable_events().to_vec(); - Box::pin(async move { - pre_commit_notifier - .send(Box::pin(async move { - if let Err(e) = monitor!( - "notify_state_sync", - state_sync_notifier - .notify_new_commit(txns, subscribable_events) - .await - ) { - error!(error = ?e, "Failed to notify state synchronizer"); - } - - let payload_vec = payload.into_iter().collect(); - payload_manager.notify_commit(timestamp, payload_vec); - })) - .await - .expect("Failed to send pre-commit notification"); - }) - }, - ) + Box::new(move |state_compute_result: &StateComputeResult| { + let state_compute_result = state_compute_result.clone(); + Box::pin(async move { + pre_commit_notifier + .send(Box::pin(async move { + let txns = state_compute_result.transactions_to_commit(); + let subscribable_events = + state_compute_result.subscribable_events().to_vec(); + if let Err(e) = monitor!( + "notify_state_sync", + state_sync_notifier + .notify_new_commit(txns, subscribable_events) + .await + ) { + error!(error = ?e, "Failed to notify state synchronizer"); + } + + let payload_vec = payload.into_iter().collect(); + payload_manager.notify_commit(timestamp, payload_vec); + })) + .await + .expect("Failed to send pre-commit notification"); + }) + }) } } @@ -230,7 +221,7 @@ impl StateComputer for ExecutionProxy { parent_block_id, transaction_generator, block_executor_onchain_config, - self.pre_commit_hook(block, metadata, payload_manager), + self.pre_commit_hook(block, payload_manager), lifetime_guard, ) .await; diff --git a/consensus/src/state_computer_tests.rs b/consensus/src/state_computer_tests.rs index be7bc9fb5340a..bbcb5167f1b27 100644 --- a/consensus/src/state_computer_tests.rs +++ b/consensus/src/state_computer_tests.rs @@ -21,7 +21,7 @@ use aptos_types::{ contract_event::ContractEvent, epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures, - transaction::{ExecutionStatus, SignedTransaction, Transaction, TransactionStatus}, + transaction::{SignedTransaction, Transaction, TransactionStatus}, validator_txn::ValidatorTransaction, }; use std::{ @@ -129,18 +129,19 @@ impl BlockExecutorTrait for DummyBlockExecutor { _parent_block_id: HashValue, _state_checkpoint_output: StateCheckpointOutput, ) -> ExecutorResult { - let num_txns = self + let txns = self .blocks_received .lock() .last() .unwrap() .transactions - .num_transactions(); + .clone() + .into_txns() + .into_iter() + .map(|t| t.into_inner()) + .collect(); - Ok(StateComputeResult::new_dummy_with_compute_status(vec![ - TransactionStatus::Keep(ExecutionStatus::Success); - num_txns - ])) + Ok(StateComputeResult::new_dummy_with_input_txns(txns)) } fn pre_commit_block( diff --git a/execution/executor-types/Cargo.toml b/execution/executor-types/Cargo.toml index a3df75cd01ccf..5c1517f7f5973 100644 --- a/execution/executor-types/Cargo.toml +++ b/execution/executor-types/Cargo.toml @@ -21,6 +21,7 @@ aptos-storage-interface = { workspace = true } aptos-types = { workspace = true } bcs = { workspace = true } criterion = { workspace = true } +derive_more = { workspace = true } itertools = { workspace = true } once_cell = { workspace = true } serde = { workspace = true } diff --git a/execution/executor-types/src/ledger_update_output.rs b/execution/executor-types/src/ledger_update_output.rs index beb86a5dd2959..0ccaa3dadebc8 100644 --- a/execution/executor-types/src/ledger_update_output.rs +++ b/execution/executor-types/src/ledger_update_output.rs @@ -17,11 +17,79 @@ use aptos_types::{ Version, }, }; +use derive_more::Deref; use itertools::zip_eq; use std::sync::Arc; -#[derive(Default, Debug)] +#[derive(Clone, Debug, Default, Deref)] pub struct LedgerUpdateOutput { + #[deref] + inner: Arc, +} + +impl LedgerUpdateOutput { + pub fn new_empty(transaction_accumulator: Arc) -> Self { + Self::new_impl(Inner::new_empty(transaction_accumulator)) + } + + #[cfg(any(test, feature = "fuzzing"))] + pub fn new_dummy_with_input_txns(txns: Vec) -> Self { + Self::new_impl(Inner::new_dummy_with_input_txns(txns)) + } + + #[cfg(any(test, feature = "fuzzing"))] + pub fn new_dummy_with_txns_to_commit(txns: Vec) -> Self { + Self::new_impl(Inner::new_dummy_with_txns_to_commit(txns)) + } + + pub fn new_dummy_with_root_hash(root_hash: HashValue) -> Self { + Self::new_impl(Inner::new_dummy_with_root_hash(root_hash)) + } + + pub fn reconfig_suffix(&self) -> Self { + Self::new_impl(Inner::new_empty(self.transaction_accumulator.clone())) + } + + pub fn new( + statuses_for_input_txns: Vec, + to_commit: Vec, + subscribable_events: Vec, + transaction_info_hashes: Vec, + state_updates_until_last_checkpoint: Option, + sharded_state_cache: ShardedStateCache, + transaction_accumulator: Arc, + parent_accumulator: Arc, + block_end_info: Option, + ) -> Self { + Self::new_impl(Inner { + statuses_for_input_txns, + to_commit, + subscribable_events, + transaction_info_hashes, + state_updates_until_last_checkpoint, + sharded_state_cache, + transaction_accumulator, + parent_accumulator, + block_end_info, + }) + } + + fn new_impl(inner: Inner) -> Self { + Self { + inner: Arc::new(inner), + } + } + + pub fn as_state_compute_result( + &self, + next_epoch_state: Option, + ) -> StateComputeResult { + StateComputeResult::new(self.clone(), next_epoch_state) + } +} + +#[derive(Default, Debug)] +pub struct Inner { pub statuses_for_input_txns: Vec, pub to_commit: Vec, pub subscribable_events: Vec, @@ -31,20 +99,56 @@ pub struct LedgerUpdateOutput { /// The in-memory Merkle Accumulator representing a blockchain state consistent with the /// `state_tree`. pub transaction_accumulator: Arc, + pub parent_accumulator: Arc, pub block_end_info: Option, } -impl LedgerUpdateOutput { +impl Inner { pub fn new_empty(transaction_accumulator: Arc) -> Self { Self { + parent_accumulator: transaction_accumulator.clone(), transaction_accumulator, ..Default::default() } } - pub fn reconfig_suffix(&self) -> Self { + #[cfg(any(test, feature = "fuzzing"))] + pub fn new_dummy_with_input_txns(txns: Vec) -> Self { + let num_txns = txns.len(); + let to_commit = txns + .into_iter() + .chain(std::iter::once( + aptos_types::transaction::Transaction::StateCheckpoint(HashValue::zero()), + )) + .map(TransactionToCommit::dummy_with_transaction) + .collect(); Self { - transaction_accumulator: Arc::clone(&self.transaction_accumulator), + to_commit, + statuses_for_input_txns: vec![ + TransactionStatus::Keep( + aptos_types::transaction::ExecutionStatus::Success + ); + num_txns + ], + ..Default::default() + } + } + + #[cfg(any(test, feature = "fuzzing"))] + pub fn new_dummy_with_txns_to_commit(txns: Vec) -> Self { + Self { + to_commit: txns, + ..Default::default() + } + } + + pub fn new_dummy_with_root_hash(root_hash: HashValue) -> Self { + let transaction_accumulator = Arc::new( + InMemoryTransactionAccumulator::new_empty_with_root_hash(root_hash), + ); + Self { + parent_accumulator: transaction_accumulator.clone(), + transaction_accumulator, ..Default::default() } } @@ -98,27 +202,6 @@ impl LedgerUpdateOutput { Ok(()) } - pub fn as_state_compute_result( - &self, - parent_accumulator: &Arc, - next_epoch_state: Option, - ) -> StateComputeResult { - let txn_accu = self.txn_accumulator(); - - StateComputeResult::new( - txn_accu.root_hash(), - txn_accu.frozen_subtree_roots().clone(), - txn_accu.num_leaves(), - parent_accumulator.frozen_subtree_roots().clone(), - parent_accumulator.num_leaves(), - next_epoch_state, - self.statuses_for_input_txns.clone(), - self.transaction_info_hashes.clone(), - self.subscribable_events.clone(), - self.block_end_info.clone(), - ) - } - pub fn next_version(&self) -> Version { self.transaction_accumulator.num_leaves() as Version } diff --git a/execution/executor-types/src/lib.rs b/execution/executor-types/src/lib.rs index 8ae1701243ca2..3af3e304f6a90 100644 --- a/execution/executor-types/src/lib.rs +++ b/execution/executor-types/src/lib.rs @@ -18,19 +18,20 @@ use aptos_types::{ epoch_state::EpochState, jwks::OBSERVED_JWK_UPDATED_MOVE_TYPE_TAG, ledger_info::LedgerInfoWithSignatures, - proof::{AccumulatorExtensionProof, SparseMerkleProofExt}, + proof::{ + accumulator::InMemoryTransactionAccumulator, AccumulatorExtensionProof, + SparseMerkleProofExt, + }, state_store::{state_key::StateKey, state_value::StateValue}, transaction::{ - block_epilogue::{BlockEndInfo, BlockEpiloguePayload}, - ExecutionStatus, Transaction, TransactionInfo, TransactionListWithProof, - TransactionOutputListWithProof, TransactionStatus, Version, + Transaction, TransactionInfo, TransactionListWithProof, TransactionOutputListWithProof, + TransactionStatus, Version, }, write_set::WriteSet, }; pub use error::{ExecutorError, ExecutorResult}; pub use ledger_update_output::LedgerUpdateOutput; pub use parsed_transaction_output::ParsedTransactionOutput; -use serde::{Deserialize, Serialize}; use std::{ cmp::max, collections::{BTreeSet, HashMap}, @@ -287,102 +288,38 @@ pub struct ChunkCommitNotification { /// of success / failure of the transactions. /// Note that the specific details of compute_status are opaque to StateMachineReplication, /// which is going to simply pass the results between StateComputer and PayloadClient. -#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone)] pub struct StateComputeResult { - /// transaction accumulator root hash is identified as `state_id` in Consensus. - root_hash: HashValue, - /// Represents the roots of all the full subtrees from left to right in this accumulator - /// after the execution. For details, please see [`InMemoryAccumulator`](aptos_types::proof::accumulator::InMemoryAccumulator). - frozen_subtree_roots: Vec, - - /// The frozen subtrees roots of the parent block, - parent_frozen_subtree_roots: Vec, - - /// The number of leaves of the transaction accumulator after executing a proposed block. - /// This state must be persisted to ensure that on restart that the version is calculated correctly. - num_leaves: u64, - - /// The number of leaves after executing the parent block, - parent_num_leaves: u64, - - /// If set, this is the new epoch info that should be changed to if this block is committed. - epoch_state: Option, - /// The compute status (success/failure) of the given payload. The specific details are opaque - /// for StateMachineReplication, which is merely passing it between StateComputer and - /// PayloadClient. - /// - /// Here, only input transactions statuses are kept, and in their order. - /// Input includes BlockMetadata, but doesn't include StateCheckpoint/BlockEpilogue - compute_status_for_input_txns: Vec, - - /// The transaction info hashes of all success txns. - transaction_info_hashes: Vec, - - subscribable_events: Vec, - - block_end_info: Option, + ledger_update_output: LedgerUpdateOutput, + /// If set, this is the new epoch info that should be changed to if this is committed. + next_epoch_state: Option, } impl StateComputeResult { pub fn new( - root_hash: HashValue, - frozen_subtree_roots: Vec, - num_leaves: u64, - parent_frozen_subtree_roots: Vec, - parent_num_leaves: u64, - epoch_state: Option, - compute_status_for_input_txns: Vec, - transaction_info_hashes: Vec, - subscribable_events: Vec, - block_end_info: Option, + ledger_update_output: LedgerUpdateOutput, + next_epoch_state: Option, ) -> Self { Self { - root_hash, - frozen_subtree_roots, - num_leaves, - parent_frozen_subtree_roots, - parent_num_leaves, - epoch_state, - compute_status_for_input_txns, - transaction_info_hashes, - subscribable_events, - block_end_info, + ledger_update_output, + next_epoch_state, } } - /// generate a new dummy state compute result with a given root hash. - /// this function is used in RandomComputeResultStateComputer to assert that the compute - /// function is really called. - pub fn new_dummy_with_root_hash(root_hash: HashValue) -> Self { + pub fn new_empty(transaction_accumulator: Arc) -> Self { Self { - root_hash, - frozen_subtree_roots: vec![], - num_leaves: 0, - parent_frozen_subtree_roots: vec![], - parent_num_leaves: 0, - epoch_state: None, - compute_status_for_input_txns: vec![], - transaction_info_hashes: vec![], - subscribable_events: vec![], - block_end_info: None, + ledger_update_output: LedgerUpdateOutput::new_empty(transaction_accumulator), + next_epoch_state: None, } } - pub fn new_dummy_with_num_txns(num_txns: usize) -> Self { + /// generate a new dummy state compute result with a given root hash. + /// this function is used in RandomComputeResultStateComputer to assert that the compute + /// function is really called. + pub fn new_dummy_with_root_hash(root_hash: HashValue) -> Self { Self { - root_hash: HashValue::zero(), - frozen_subtree_roots: vec![], - num_leaves: 0, - parent_frozen_subtree_roots: vec![], - parent_num_leaves: 0, - epoch_state: None, - compute_status_for_input_txns: vec![ - TransactionStatus::Keep(ExecutionStatus::Success); - num_txns - ], - transaction_info_hashes: vec![], - subscribable_events: vec![], - block_end_info: None, + ledger_update_output: LedgerUpdateOutput::new_dummy_with_root_hash(root_hash), + next_epoch_state: None, } } @@ -395,129 +332,70 @@ impl StateComputeResult { } #[cfg(any(test, feature = "fuzzing"))] - pub fn new_dummy_with_compute_status(compute_status: Vec) -> Self { - let mut ret = Self::new_dummy(); - ret.compute_status_for_input_txns = compute_status; - ret + pub fn new_dummy_with_input_txns(txns: Vec) -> Self { + Self { + ledger_update_output: LedgerUpdateOutput::new_dummy_with_input_txns(txns), + next_epoch_state: None, + } } pub fn version(&self) -> Version { - max(self.num_leaves, 1) + max(self.ledger_update_output.next_version(), 1) .checked_sub(1) .expect("Integer overflow occurred") } pub fn root_hash(&self) -> HashValue { - self.root_hash + self.ledger_update_output.transaction_accumulator.root_hash } pub fn compute_status_for_input_txns(&self) -> &Vec { - &self.compute_status_for_input_txns + &self.ledger_update_output.statuses_for_input_txns } pub fn transactions_to_commit_len(&self) -> usize { - self.compute_status_for_input_txns() - .iter() - .filter(|status| matches!(status, TransactionStatus::Keep(_))) - .count() - // StateCheckpoint/BlockEpilogue is added if there is no reconfiguration - + (if self.has_reconfiguration() { 0 } else { 1 }) + self.ledger_update_output.to_commit.len() } /// On top of input transactions (which contain BlockMetadata and Validator txns), /// filter out those that should be committed, and add StateCheckpoint/BlockEpilogue if needed. - pub fn transactions_to_commit( - &self, - input_txns: Vec, - block_id: HashValue, - ) -> Vec { - if self.is_reconfiguration_suffix() { - return vec![]; - } - - assert_eq!( - input_txns.len(), - self.compute_status_for_input_txns().len(), - "{:?} != {:?}", - input_txns.iter().map(|t| t.type_name()).collect::>(), - self.compute_status_for_input_txns() - ); - let output = itertools::zip_eq(input_txns, self.compute_status_for_input_txns()) - .filter_map(|(txn, status)| { - assert!( - !txn.is_non_reconfig_block_ending(), - "{:?}: {:?}", - txn, - status - ); - match status { - TransactionStatus::Keep(_) => Some(txn), - _ => None, - } - }) - .chain( - (!self.has_reconfiguration()).then_some(self.block_end_info.clone().map_or( - Transaction::StateCheckpoint(block_id), - |block_end_info| { - Transaction::BlockEpilogue(BlockEpiloguePayload::V0 { - block_id, - block_end_info, - }) - }, - )), - ) - .collect::>(); - - assert!( - self.has_reconfiguration() - || output - .last() - .map_or(false, Transaction::is_non_reconfig_block_ending), - "{:?}", - output.last() - ); - - output + pub fn transactions_to_commit(&self) -> Vec { + self.ledger_update_output + .to_commit + .iter() + .map(|t| t.transaction.clone()) + .collect() } pub fn epoch_state(&self) -> &Option { - &self.epoch_state + &self.next_epoch_state } pub fn extension_proof(&self) -> AccumulatorExtensionProof { - AccumulatorExtensionProof::::new( - self.parent_frozen_subtree_roots.clone(), - self.parent_num_leaves(), - self.transaction_info_hashes().clone(), + AccumulatorExtensionProof::new( + self.ledger_update_output + .transaction_accumulator + .frozen_subtree_roots + .clone(), + self.ledger_update_output.transaction_accumulator.num_leaves, + self.transaction_info_hashes().to_vec(), ) } pub fn transaction_info_hashes(&self) -> &Vec { - &self.transaction_info_hashes + &self.ledger_update_output.transaction_info_hashes } pub fn num_leaves(&self) -> u64 { - self.num_leaves - } - - pub fn frozen_subtree_roots(&self) -> &Vec { - &self.frozen_subtree_roots - } - - pub fn parent_num_leaves(&self) -> u64 { - self.parent_num_leaves - } - - pub fn parent_frozen_subtree_roots(&self) -> &Vec { - &self.parent_frozen_subtree_roots + self.ledger_update_output.next_version() } pub fn has_reconfiguration(&self) -> bool { - self.epoch_state.is_some() + self.next_epoch_state.is_some() } pub fn subscribable_events(&self) -> &[ContractEvent] { - &self.subscribable_events + &self.ledger_update_output.subscribable_events } pub fn is_reconfiguration_suffix(&self) -> bool { diff --git a/execution/executor/src/block_executor.rs b/execution/executor/src/block_executor.rs index ae8ff5ee0035b..b9ae93b5ce62c 100644 --- a/execution/executor/src/block_executor.rs +++ b/execution/executor/src/block_executor.rs @@ -314,10 +314,7 @@ where return Ok(current_output .output .get_ledger_update() - .as_state_compute_result( - parent_accumulator, - current_output.output.epoch_state().clone(), - )); + .as_state_compute_result(current_output.output.epoch_state().clone())); } let output = @@ -341,10 +338,8 @@ where output.ensure_ends_with_state_checkpoint()?; } - let state_compute_result = output.as_state_compute_result( - parent_accumulator, - current_output.output.epoch_state().clone(), - ); + let state_compute_result = + output.as_state_compute_result(current_output.output.epoch_state().clone()); current_output.output.set_ledger_update(output); Ok(state_compute_result) } diff --git a/execution/executor/src/components/apply_chunk_output.rs b/execution/executor/src/components/apply_chunk_output.rs index cf018e8ea9044..ba358ec2b9caa 100644 --- a/execution/executor/src/components/apply_chunk_output.rs +++ b/execution/executor/src/components/apply_chunk_output.rs @@ -149,16 +149,17 @@ impl ApplyChunkOutput { let transaction_accumulator = Arc::new(base_txn_accumulator.append(&transaction_info_hashes)); Ok(( - LedgerUpdateOutput { + LedgerUpdateOutput::new( statuses_for_input_txns, - to_commit: txns_to_commit, + txns_to_commit, subscribable_events, transaction_info_hashes, - state_updates_until_last_checkpoint: state_updates_before_last_checkpoint, + state_updates_before_last_checkpoint, sharded_state_cache, transaction_accumulator, + base_txn_accumulator, block_end_info, - }, + ), to_discard.into_txns(), to_retry.into_txns(), )) diff --git a/execution/executor/src/components/executed_chunk.rs b/execution/executor/src/components/executed_chunk.rs index 28e8e68fcf87e..57009d3ec33c9 100644 --- a/execution/executor/src/components/executed_chunk.rs +++ b/execution/executor/src/components/executed_chunk.rs @@ -4,15 +4,10 @@ #![forbid(unsafe_code)] -use aptos_drop_helper::DEFAULT_DROPPER; use aptos_executor_types::{ should_forward_to_subscription_service, ChunkCommitNotification, LedgerUpdateOutput, }; use aptos_storage_interface::{state_delta::StateDelta, ExecutedTrees}; -#[cfg(test)] -use aptos_types::account_config::NewEpochEvent; -#[cfg(test)] -use aptos_types::contract_event::ContractEvent; use aptos_types::{ epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures, transaction::TransactionToCommit, @@ -50,24 +45,20 @@ impl ExecutedChunk { Vec::with_capacity(self.ledger_update_output.to_commit.len()); let mut subscribable_events = Vec::with_capacity(self.ledger_update_output.to_commit.len() * 2); - let mut to_drop = Vec::with_capacity(self.ledger_update_output.to_commit.len()); - for txn_to_commit in self.ledger_update_output.to_commit { + for txn_to_commit in &self.ledger_update_output.to_commit { let TransactionToCommit { transaction, events, - state_updates, - write_set, .. } = txn_to_commit; - committed_transactions.push(transaction); + committed_transactions.push(transaction.clone()); subscribable_events.extend( events - .into_iter() - .filter(should_forward_to_subscription_service), + .iter() + .filter(|evt| should_forward_to_subscription_service(evt)) + .cloned(), ); - to_drop.push((state_updates, write_set)); } - DEFAULT_DROPPER.schedule_drop(to_drop); ChunkCommitNotification { committed_transactions, @@ -89,6 +80,7 @@ impl ExecutedChunk { #[test] fn into_chunk_commit_notification_should_apply_event_filters() { + use aptos_types::{account_config::NewEpochEvent, contract_event::ContractEvent}; let event_1 = ContractEvent::new_v2_with_type_tag_str( "0x2345::random_module::RandomEvent", b"random_data_x".to_vec(), @@ -101,14 +93,11 @@ fn into_chunk_commit_notification_should_apply_event_filters() { ); let event_4 = ContractEvent::from((1, NewEpochEvent::dummy())); - let ledger_update_output = LedgerUpdateOutput { - to_commit: vec![ - TransactionToCommit::dummy_with_events(vec![event_1.clone()]), - TransactionToCommit::dummy_with_events(vec![event_2.clone(), event_3.clone()]), - TransactionToCommit::dummy_with_events(vec![event_4.clone()]), - ], - ..Default::default() - }; + let ledger_update_output = LedgerUpdateOutput::new_dummy_with_txns_to_commit(vec![ + TransactionToCommit::dummy_with_events(vec![event_1.clone()]), + TransactionToCommit::dummy_with_events(vec![event_2.clone(), event_3.clone()]), + TransactionToCommit::dummy_with_events(vec![event_4.clone()]), + ]); let chunk = ExecutedChunk { ledger_update_output, diff --git a/execution/executor/src/tests/mod.rs b/execution/executor/src/tests/mod.rs index c50d2480a1b84..e91e689e4b9a2 100644 --- a/execution/executor/src/tests/mod.rs +++ b/execution/executor/src/tests/mod.rs @@ -14,8 +14,7 @@ use crate::{ use aptos_crypto::{ed25519::Ed25519PrivateKey, HashValue, PrivateKey, SigningKey, Uniform}; use aptos_db::AptosDB; use aptos_executor_types::{ - BlockExecutorTrait, ChunkExecutorTrait, LedgerUpdateOutput, TransactionReplayer, - VerifyExecutionMode, + BlockExecutorTrait, ChunkExecutorTrait, TransactionReplayer, VerifyExecutionMode, }; use aptos_storage_interface::{ async_proof_fetcher::AsyncProofFetcher, DbReaderWriter, ExecutedTrees, Result, @@ -311,7 +310,7 @@ fn test_executor_execute_same_block_multiple_times() { .collect(); let mut responses = vec![]; - for _i in 0..100 { + for _i in 0..10 { let output = executor .execute_block( (block_id, block(txns.clone())).into(), @@ -321,8 +320,14 @@ fn test_executor_execute_same_block_multiple_times() { .unwrap(); responses.push(output); } - responses.dedup(); - assert_eq!(responses.len(), 1); + assert_eq!( + responses + .iter() + .map(|output| output.root_hash()) + .dedup() + .count(), + 1, + ); } fn create_blocks_and_chunks( @@ -494,27 +499,20 @@ fn apply_transaction_by_writeset( next_epoch_state: _, ledger_update_output, } = executed; - let LedgerUpdateOutput { - statuses_for_input_txns: _, - to_commit, - subscribable_events: _, - transaction_info_hashes: _, - state_updates_until_last_checkpoint: state_updates_before_last_checkpoint, - sharded_state_cache, - transaction_accumulator: _, - block_end_info: _, - } = ledger_update_output; db.writer .save_transactions( - &to_commit, + &ledger_update_output.to_commit, ledger_view.txn_accumulator().num_leaves(), ledger_view.state().base_version, ledger_info.as_ref(), true, /* sync_commit */ result_state, - state_updates_before_last_checkpoint, - Some(&sharded_state_cache), + // TODO(aldenhu): avoid clone + ledger_update_output + .state_updates_until_last_checkpoint + .clone(), + Some(&ledger_update_output.sharded_state_cache), ) .unwrap(); } @@ -714,26 +712,19 @@ fn run_transactions_naive( next_epoch_state: _, ledger_update_output, } = executed; - let LedgerUpdateOutput { - statuses_for_input_txns: _, - to_commit, - subscribable_events: _, - transaction_info_hashes: _, - state_updates_until_last_checkpoint: state_updates_before_last_checkpoint, - sharded_state_cache, - transaction_accumulator: _, - block_end_info: _, - } = ledger_update_output; db.writer .save_transactions( - &to_commit, + &ledger_update_output.to_commit, ledger_view.txn_accumulator().num_leaves(), ledger_view.state().base_version, ledger_info.as_ref(), true, /* sync_commit */ result_state, - state_updates_before_last_checkpoint, - Some(&sharded_state_cache), + // TODO(aldenhu): avoid clone + ledger_update_output + .state_updates_until_last_checkpoint + .clone(), + Some(&ledger_update_output.sharded_state_cache), ) .unwrap(); ledger_view = next_ledger_view; diff --git a/types/src/proof/accumulator/mod.rs b/types/src/proof/accumulator/mod.rs index 5858db4438959..89bb18ba2269e 100644 --- a/types/src/proof/accumulator/mod.rs +++ b/types/src/proof/accumulator/mod.rs @@ -85,10 +85,14 @@ where } pub fn new_empty() -> Self { + Self::new_empty_with_root_hash(*ACCUMULATOR_PLACEHOLDER_HASH) + } + + pub fn new_empty_with_root_hash(root_hash: HashValue) -> Self { Self { frozen_subtree_roots: Vec::new(), num_leaves: 0, - root_hash: *ACCUMULATOR_PLACEHOLDER_HASH, + root_hash, phantom: PhantomData, } } diff --git a/types/src/transaction/mod.rs b/types/src/transaction/mod.rs index f3e8a96dfc5c4..a76817793e90c 100644 --- a/types/src/transaction/mod.rs +++ b/types/src/transaction/mod.rs @@ -1567,6 +1567,15 @@ impl TransactionToCommit { } } + #[cfg(any(test, feature = "fuzzing"))] + pub fn dummy_with_transaction(transaction: Transaction) -> Self { + Self { + transaction, + transaction_info: TransactionInfo::dummy(), + ..Self::dummy() + } + } + #[cfg(any(test, feature = "fuzzing"))] pub fn dummy_with_transaction_auxiliary_data( transaction_auxiliary_data: TransactionAuxiliaryData,