Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restructure executor crate #15008

Merged
merged 3 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 4 additions & 5 deletions execution/executor-benchmark/src/native_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ use crate::{
metrics::TIMER,
};
use anyhow::Result;
use aptos_executor::{
block_executor::TransactionBlockExecutor, components::chunk_output::ChunkOutput,
};
use aptos_executor::block_executor::TransactionBlockExecutor;
use aptos_executor_types::execution_output::ExecutionOutput;
use aptos_storage_interface::cached_state_view::CachedStateView;
use aptos_types::{
account_address::AccountAddress,
Expand Down Expand Up @@ -351,7 +350,7 @@ impl TransactionBlockExecutor for NativeExecutor {
transactions: ExecutableTransactions,
state_view: CachedStateView,
_onchain_config: BlockExecutorConfigFromOnchain,
) -> Result<ChunkOutput> {
) -> Result<ExecutionOutput> {
let transactions = match transactions {
ExecutableTransactions::Unsharded(txns) => txns,
_ => todo!("sharded execution not yet supported"),
Expand Down Expand Up @@ -422,7 +421,7 @@ impl TransactionBlockExecutor for NativeExecutor {
})
.collect::<Result<Vec<_>>>()
})?;
Ok(ChunkOutput {
Ok(ExecutionOutput {
transactions: transactions.into_iter().map(|t| t.into_inner()).collect(),
transaction_outputs,
state_cache: state_view.into_state_cache(),
Expand Down
21 changes: 21 additions & 0 deletions execution/executor-types/src/execution_output.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright © Aptos Foundation
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

#![forbid(unsafe_code)]

use aptos_storage_interface::cached_state_view::StateCache;
use aptos_types::transaction::{block_epilogue::BlockEndInfo, Transaction, TransactionOutput};

pub struct ExecutionOutput {
/// Input transactions.
pub transactions: Vec<Transaction>,
/// Raw VM output.
pub transaction_outputs: Vec<TransactionOutput>,
/// Carries the frozen base state view, so all in-mem nodes involved won't drop before the
/// execution result is processed; as well as all the accounts touched during execution, together
/// with their proofs.
pub state_cache: StateCache,
/// Optional StateCheckpoint payload
pub block_end_info: Option<BlockEndInfo>,
}
1 change: 1 addition & 0 deletions execution/executor-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use std::{
};

mod error;
pub mod execution_output;
mod ledger_update_output;
pub mod parsed_transaction_output;
pub mod state_checkpoint_output;
Expand Down
8 changes: 0 additions & 8 deletions execution/executor/proptest-regressions/tests/mod.txt

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
mod test;

use crate::{
components::partial_state_compute_result::PartialStateComputeResult,
logging::{LogEntry, LogSchema},
types::partial_state_compute_result::PartialStateComputeResult,
};
use anyhow::{anyhow, ensure, Result};
use aptos_consensus_types::block::Block as ConsensusBlock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::components::{
block_tree::{epoch_genesis_block_id, BlockLookup, BlockTree},
partial_state_compute_result::PartialStateComputeResult,
use crate::{
block_executor::block_tree::{epoch_genesis_block_id, BlockLookup, BlockTree},
types::partial_state_compute_result::PartialStateComputeResult,
};
use aptos_crypto::{hash::PRE_GENESIS_BLOCK_ID, HashValue};
use aptos_infallible::Mutex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,22 @@
#![forbid(unsafe_code)]

use crate::{
components::{
block_tree::BlockTree, chunk_output::ChunkOutput, do_ledger_update::DoLedgerUpdate,
partial_state_compute_result::PartialStateComputeResult,
},
logging::{LogEntry, LogSchema},
metrics::{
COMMIT_BLOCKS, CONCURRENCY_GAUGE, EXECUTE_BLOCK, OTHER_TIMERS, SAVE_TRANSACTIONS,
TRANSACTIONS_SAVED, UPDATE_LEDGER, VM_EXECUTE_BLOCK,
},
types::partial_state_compute_result::PartialStateComputeResult,
workflow::{
do_get_execution_output::DoGetExecutionOutput, do_ledger_update::DoLedgerUpdate,
do_state_checkpoint::DoStateCheckpoint,
},
};
use anyhow::Result;
use aptos_crypto::HashValue;
use aptos_executor_types::{
state_checkpoint_output::StateCheckpointOutput, state_compute_result::StateComputeResult,
BlockExecutorTrait, ExecutorError, ExecutorResult,
execution_output::ExecutionOutput, state_checkpoint_output::StateCheckpointOutput,
state_compute_result::StateComputeResult, BlockExecutorTrait, ExecutorError, ExecutorResult,
};
use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER;
use aptos_infallible::RwLock;
Expand All @@ -38,24 +39,31 @@ use aptos_types::{
state_store::{state_value::StateValue, StateViewId},
};
use aptos_vm::AptosVM;
use block_tree::BlockTree;
use fail::fail_point;
use std::{marker::PhantomData, sync::Arc};

pub mod block_tree;

pub trait TransactionBlockExecutor: Send + Sync {
fn execute_transaction_block(
transactions: ExecutableTransactions,
state_view: CachedStateView,
onchain_config: BlockExecutorConfigFromOnchain,
) -> Result<ChunkOutput>;
) -> Result<ExecutionOutput>;
}

impl TransactionBlockExecutor for AptosVM {
fn execute_transaction_block(
transactions: ExecutableTransactions,
state_view: CachedStateView,
onchain_config: BlockExecutorConfigFromOnchain,
) -> Result<ChunkOutput> {
ChunkOutput::by_transaction_execution::<AptosVM>(transactions, state_view, onchain_config)
) -> Result<ExecutionOutput> {
DoGetExecutionOutput::by_transaction_execution::<AptosVM>(
transactions,
state_view,
onchain_config,
)
}
}

Expand Down Expand Up @@ -267,7 +275,17 @@ where
let _timer = OTHER_TIMERS.timer_with(&["state_checkpoint"]);

THREAD_MANAGER.get_exe_cpu_pool().install(|| {
chunk_output.into_state_checkpoint_output(parent_output.state(), block_id)
fail_point!("executor::block_state_checkpoint", |_| {
Err(anyhow::anyhow!("Injected error in block state checkpoint."))
});

DoStateCheckpoint::run(
chunk_output,
parent_output.state(),
Some(block_id),
None,
/*is_block=*/ true,
)
})?
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

#![forbid(unsafe_code)]

use crate::components::{
chunk_result_verifier::ChunkResultVerifier, executed_chunk::ExecutedChunk,
partial_state_compute_result::PartialStateComputeResult,
use crate::{
chunk_executor::chunk_result_verifier::ChunkResultVerifier,
types::{
executed_chunk::ExecutedChunk, partial_state_compute_result::PartialStateComputeResult,
},
};
use anyhow::{anyhow, ensure, Result};
use aptos_executor_types::state_checkpoint_output::StateCheckpointOutput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,15 @@
#![forbid(unsafe_code)]

use crate::{
components::{
apply_chunk_output::ApplyChunkOutput,
chunk_commit_queue::{ChunkCommitQueue, ChunkToUpdateLedger},
chunk_output::ChunkOutput,
chunk_result_verifier::{ChunkResultVerifier, ReplayChunkVerifier, StateSyncChunkVerifier},
do_ledger_update::DoLedgerUpdate,
executed_chunk::ExecutedChunk,
partial_state_compute_result::PartialStateComputeResult,
transaction_chunk::{ChunkToApply, ChunkToExecute, TransactionChunk},
},
logging::{LogEntry, LogSchema},
metrics::{APPLY_CHUNK, CHUNK_OTHER_TIMERS, COMMIT_CHUNK, CONCURRENCY_GAUGE, EXECUTE_CHUNK},
types::{
executed_chunk::ExecutedChunk, partial_state_compute_result::PartialStateComputeResult,
},
workflow::{
do_get_execution_output::DoGetExecutionOutput, do_ledger_update::DoLedgerUpdate,
do_state_checkpoint::DoStateCheckpoint,
},
};
use anyhow::{anyhow, ensure, Result};
use aptos_executor_types::{
Expand Down Expand Up @@ -44,6 +41,8 @@ use aptos_types::{
write_set::WriteSet,
};
use aptos_vm::VMExecutor;
use chunk_commit_queue::{ChunkCommitQueue, ChunkToUpdateLedger};
use chunk_result_verifier::{ChunkResultVerifier, ReplayChunkVerifier, StateSyncChunkVerifier};
use fail::fail_point;
use itertools::multizip;
use std::{
Expand All @@ -55,6 +54,11 @@ use std::{
},
time::Instant,
};
use transaction_chunk::{ChunkToApply, ChunkToExecute, TransactionChunk};

pub mod chunk_commit_queue;
pub mod chunk_result_verifier;
pub mod transaction_chunk;

pub struct ChunkExecutor<V> {
db: DbReaderWriter,
Expand Down Expand Up @@ -301,14 +305,13 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
let chunk_output = chunk.into_output::<V>(state_view)?;

// Calculate state snapshot
let (result_state, next_epoch_state, state_checkpoint_output) =
ApplyChunkOutput::calculate_state_checkpoint(
chunk_output,
&self.commit_queue.lock().latest_state(),
None, // append_state_checkpoint_to_block
Some(chunk_verifier.state_checkpoint_hashes()),
false, // is_block
)?;
let (result_state, next_epoch_state, state_checkpoint_output) = DoStateCheckpoint::run(
chunk_output,
&self.commit_queue.lock().latest_state(),
None, // append_state_checkpoint_to_block
Some(chunk_verifier.state_checkpoint_hashes()),
false, // is_block
)?;

// Enqueue for next stage.
self.commit_queue
Expand Down Expand Up @@ -597,7 +600,7 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
.collect::<Vec<SignatureVerifiedTransaction>>();

// State sync executor shouldn't have block gas limit.
let chunk_output = ChunkOutput::by_transaction_execution::<V>(
let chunk_output = DoGetExecutionOutput::by_transaction_execution::<V>(
txns.into(),
state_view,
BlockExecutorConfigFromOnchain::new_no_block_limit(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
components::chunk_output::ChunkOutput,
metrics::{CHUNK_OTHER_TIMERS, VM_EXECUTE_CHUNK},
workflow::do_get_execution_output::DoGetExecutionOutput,
};
use anyhow::Result;
use aptos_executor_types::execution_output::ExecutionOutput;
use aptos_experimental_runtimes::thread_manager::optimal_min_len;
use aptos_metrics_core::TimerHelper;
use aptos_storage_interface::cached_state_view::CachedStateView;
Expand Down Expand Up @@ -37,7 +38,7 @@ pub trait TransactionChunk {
self.len() == 0
}

fn into_output<V: VMExecutor>(self, state_view: CachedStateView) -> Result<ChunkOutput>;
fn into_output<V: VMExecutor>(self, state_view: CachedStateView) -> Result<ExecutionOutput>;
}

pub struct ChunkToExecute {
Expand All @@ -54,7 +55,7 @@ impl TransactionChunk for ChunkToExecute {
self.transactions.len()
}

fn into_output<V: VMExecutor>(self, state_view: CachedStateView) -> Result<ChunkOutput> {
fn into_output<V: VMExecutor>(self, state_view: CachedStateView) -> Result<ExecutionOutput> {
let ChunkToExecute {
transactions,
first_version: _,
Expand All @@ -76,7 +77,7 @@ impl TransactionChunk for ChunkToExecute {
};

let _timer = VM_EXECUTE_CHUNK.start_timer();
ChunkOutput::by_transaction_execution::<V>(
DoGetExecutionOutput::by_transaction_execution::<V>(
sig_verified_txns.into(),
state_view,
BlockExecutorConfigFromOnchain::new_no_block_limit(),
Expand All @@ -99,13 +100,13 @@ impl TransactionChunk for ChunkToApply {
self.transactions.len()
}

fn into_output<V: VMExecutor>(self, state_view: CachedStateView) -> Result<ChunkOutput> {
fn into_output<V: VMExecutor>(self, state_view: CachedStateView) -> Result<ExecutionOutput> {
let Self {
transactions,
transaction_outputs,
first_version: _,
} = self;

ChunkOutput::by_transaction_output(transactions, transaction_outputs, state_view)
DoGetExecutionOutput::by_transaction_output(transactions, transaction_outputs, state_view)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

#![forbid(unsafe_code)]

use crate::components::{chunk_output::ChunkOutput, executed_chunk::ExecutedChunk};
use crate::{
types::executed_chunk::ExecutedChunk,
workflow::{do_get_execution_output::DoGetExecutionOutput, ApplyExecutionOutput},
};
use anyhow::{anyhow, ensure, format_err, Result};
use aptos_crypto::HashValue;
use aptos_logger::prelude::*;
Expand Down Expand Up @@ -128,13 +131,14 @@ pub fn calculate_genesis<V: VMExecutor>(
get_state_epoch(&base_state_view)?
};

let (mut chunk, _, _) = ChunkOutput::by_transaction_execution::<V>(
let execution_output = DoGetExecutionOutput::by_transaction_execution::<V>(
vec![genesis_txn.clone().into()].into(),
base_state_view,
BlockExecutorConfigFromOnchain::new_no_block_limit(),
)?
.apply_to_ledger(&executed_trees, None)?;
let output = &chunk.output;
)?;

let output = ApplyExecutionOutput::run(execution_output, &executed_trees, None)?;

ensure!(
output.expect_ledger_update_output().num_txns() != 0,
"Genesis txn execution failed."
Expand Down Expand Up @@ -163,15 +167,14 @@ pub fn calculate_genesis<V: VMExecutor>(
"Genesis txn didn't output reconfig event."
);

let output = output.expect_complete_result();
let ledger_info_with_sigs = LedgerInfoWithSignatures::new(
LedgerInfo::new(
BlockInfo::new(
epoch,
GENESIS_ROUND,
genesis_block_id(),
output
.ledger_update_output
.expect_ledger_update_output()
.transaction_accumulator
.root_hash(),
genesis_version,
Expand All @@ -182,9 +185,12 @@ pub fn calculate_genesis<V: VMExecutor>(
),
AggregateSignature::empty(), /* signatures */
);
chunk.ledger_info_opt = Some(ledger_info_with_sigs);
let executed_chunk = ExecutedChunk {
output,
ledger_info_opt: Some(ledger_info_with_sigs),
};

let committer = GenesisCommitter::new(db.writer.clone(), chunk)?;
let committer = GenesisCommitter::new(db.writer.clone(), executed_chunk)?;
info!(
"Genesis calculated: ledger_info_with_sigs {:?}, waypoint {:?}",
&committer.output.ledger_info_opt, committer.waypoint,
Expand Down
Loading
Loading