Skip to content

Commit

Permalink
BlockOutput -> PartialStateComputeResult
Browse files Browse the repository at this point in the history
Shared between the chunk and block executors.
  • Loading branch information
msmouse committed Oct 17, 2024
1 parent 858418a commit d672fb9
Show file tree
Hide file tree
Showing 28 changed files with 334 additions and 376 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 0 additions & 22 deletions execution/executor-types/src/ledger_update_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@

#![forbid(unsafe_code)]

use crate::state_compute_result::StateComputeResult;
use anyhow::{ensure, Result};
use aptos_crypto::HashValue;
use aptos_drop_helper::DropHelper;
use aptos_storage_interface::cached_state_view::ShardedStateCache;
use aptos_types::{
contract_event::ContractEvent,
epoch_state::EpochState,
proof::accumulator::InMemoryTransactionAccumulator,
state_store::ShardedStateUpdates,
transaction::{
Expand Down Expand Up @@ -38,11 +36,6 @@ impl LedgerUpdateOutput {
Self::new_impl(Inner::new_dummy_with_input_txns(txns))
}

#[cfg(any(test, feature = "fuzzing"))]
pub fn new_dummy_with_txns_to_commit(txns: Vec<TransactionToCommit>) -> Self {
Self::new_impl(Inner::new_dummy_with_txns_to_commit(txns))
}

pub fn new_dummy_with_root_hash(root_hash: HashValue) -> Self {
Self::new_impl(Inner::new_dummy_with_root_hash(root_hash))
}
Expand Down Expand Up @@ -80,13 +73,6 @@ impl LedgerUpdateOutput {
inner: Arc::new(DropHelper::new(inner)),
}
}

pub fn as_state_compute_result(
&self,
next_epoch_state: Option<EpochState>,
) -> StateComputeResult {
StateComputeResult::new(self.clone(), next_epoch_state)
}
}

#[derive(Default, Debug)]
Expand Down Expand Up @@ -135,14 +121,6 @@ impl Inner {
}
}

#[cfg(any(test, feature = "fuzzing"))]
pub fn new_dummy_with_txns_to_commit(txns: Vec<TransactionToCommit>) -> Self {
Self {
to_commit: txns,
..Default::default()
}
}

pub fn new_dummy_with_root_hash(root_hash: HashValue) -> Self {
let transaction_accumulator = Arc::new(
InMemoryTransactionAccumulator::new_empty_with_root_hash(root_hash),
Expand Down
30 changes: 27 additions & 3 deletions execution/executor-types/src/state_compute_result.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::LedgerUpdateOutput;
use crate::{ChunkCommitNotification, LedgerUpdateOutput};
use aptos_crypto::{
hash::{TransactionAccumulatorHasher, ACCUMULATOR_PLACEHOLDER_HASH},
HashValue,
};
use aptos_storage_interface::state_delta::StateDelta;
use aptos_types::{
contract_event::ContractEvent,
epoch_state::EpochState,
Expand All @@ -24,24 +25,33 @@ use std::{cmp::max, sync::Arc};
/// which is going to simply pass the results between StateComputer and PayloadClient.
#[derive(Debug, Default, Clone)]
pub struct StateComputeResult {
ledger_update_output: LedgerUpdateOutput,
pub parent_state: Arc<StateDelta>,
pub result_state: Arc<StateDelta>,
pub ledger_update_output: LedgerUpdateOutput,
/// If set, this is the new epoch info that should be changed to if this is committed.
next_epoch_state: Option<EpochState>,
pub next_epoch_state: Option<EpochState>,
}

impl StateComputeResult {
pub fn new(
parent_state: Arc<StateDelta>,
result_state: Arc<StateDelta>,
ledger_update_output: LedgerUpdateOutput,
next_epoch_state: Option<EpochState>,
) -> Self {
Self {
parent_state,
result_state,
ledger_update_output,
next_epoch_state,
}
}

pub fn new_empty(transaction_accumulator: Arc<InMemoryTransactionAccumulator>) -> Self {
let result_state = Arc::new(StateDelta::new_empty());
Self {
parent_state: result_state.clone(),
result_state,
ledger_update_output: LedgerUpdateOutput::new_empty(transaction_accumulator),
next_epoch_state: None,
}
Expand All @@ -51,7 +61,10 @@ impl StateComputeResult {
/// this function is used in RandomComputeResultStateComputer to assert that the compute
/// function is really called.
pub fn new_dummy_with_root_hash(root_hash: HashValue) -> Self {
let result_state = Arc::new(StateDelta::new_empty());
Self {
parent_state: result_state.clone(),
result_state,
ledger_update_output: LedgerUpdateOutput::new_dummy_with_root_hash(root_hash),
next_epoch_state: None,
}
Expand All @@ -67,7 +80,10 @@ impl StateComputeResult {

#[cfg(any(test, feature = "fuzzing"))]
pub fn new_dummy_with_input_txns(txns: Vec<Transaction>) -> Self {
let result_state = Arc::new(StateDelta::new_empty());
Self {
parent_state: result_state.clone(),
result_state,
ledger_update_output: LedgerUpdateOutput::new_dummy_with_input_txns(txns),
next_epoch_state: None,
}
Expand Down Expand Up @@ -135,4 +151,12 @@ impl StateComputeResult {
pub fn is_reconfiguration_suffix(&self) -> bool {
self.has_reconfiguration() && self.compute_status_for_input_txns().is_empty()
}

pub fn make_chunk_commit_notification(&self) -> ChunkCommitNotification {
ChunkCommitNotification {
subscribable_events: self.ledger_update_output.subscribable_events.clone(),
committed_transactions: self.transactions_to_commit(),
reconfiguration_occurred: self.has_reconfiguration(),
}
}
}
28 changes: 11 additions & 17 deletions execution/executor/src/block_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@

use crate::{
components::{
apply_chunk_output::ApplyChunkOutput,
block_tree::{block_output::BlockOutput, BlockTree},
chunk_output::ChunkOutput,
apply_chunk_output::ApplyChunkOutput, block_tree::BlockTree, chunk_output::ChunkOutput,
partial_state_compute_result::PartialStateComputeResult,
},
logging::{LogEntry, LogSchema},
metrics::{
Expand Down Expand Up @@ -279,7 +278,7 @@ where
let _ = self.block_tree.add_block(
parent_block_id,
block_id,
BlockOutput::new(state, epoch_state),
PartialStateComputeResult::new(parent_output.result_state.clone(), state, epoch_state),
)?;
Ok(state_checkpoint_output)
}
Expand All @@ -306,15 +305,12 @@ where
// At this point of time two things must happen
// 1. The block tree must also have the current block id with or without the ledger update output.
// 2. We must have the ledger update output of the parent block.
let parent_output = parent_block.output.get_ledger_update();
let parent_output = parent_block.output.expect_ledger_update_output();
let parent_accumulator = parent_output.txn_accumulator();
let current_output = block_vec.pop().expect("Must exist").unwrap();
let block = block_vec.pop().expect("Must exist").unwrap();
parent_block.ensure_has_child(block_id)?;
if current_output.output.has_ledger_update() {
return Ok(current_output
.output
.get_ledger_update()
.as_state_compute_result(current_output.output.epoch_state().clone()));
if let Some(complete_result) = block.output.get_complete_result() {
return Ok(complete_result);
}

let output =
Expand All @@ -334,14 +330,12 @@ where
output
};

if !current_output.output.has_reconfiguration() {
if !block.output.has_reconfiguration() {
output.ensure_ends_with_state_checkpoint()?;
}

let state_compute_result =
output.as_state_compute_result(current_output.output.epoch_state().clone());
current_output.output.set_ledger_update(output);
Ok(state_compute_result)
block.output.set_ledger_update_output(output);
Ok(block.output.expect_complete_result())
}

fn pre_commit_block(
Expand All @@ -365,7 +359,7 @@ where
Err(anyhow::anyhow!("Injected error in pre_commit_block.").into())
});

let ledger_update = block.output.get_ledger_update();
let ledger_update = block.output.expect_ledger_update_output();
if !ledger_update.transactions_to_commit().is_empty() {
let _timer = SAVE_TRANSACTIONS.start_timer();
self.db.writer.pre_commit_ledger(
Expand Down
59 changes: 33 additions & 26 deletions execution/executor/src/chunk_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ use crate::{
chunk_output::ChunkOutput,
chunk_result_verifier::{ChunkResultVerifier, ReplayChunkVerifier, StateSyncChunkVerifier},
executed_chunk::ExecutedChunk,
partial_state_compute_result::PartialStateComputeResult,
transaction_chunk::{ChunkToApply, ChunkToExecute, TransactionChunk},
},
logging::{LogEntry, LogSchema},
metrics::{APPLY_CHUNK, CHUNK_OTHER_TIMERS, COMMIT_CHUNK, CONCURRENCY_GAUGE, EXECUTE_CHUNK},
};
use anyhow::{anyhow, ensure, Result};
use aptos_drop_helper::DEFAULT_DROPPER;
use aptos_executor_types::{
ChunkCommitNotification, ChunkExecutorTrait, ParsedTransactionOutput, TransactionReplayer,
VerifyExecutionMode,
Expand Down Expand Up @@ -246,38 +246,37 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {

fn commit_chunk_impl(&self) -> Result<ExecutedChunk> {
let _timer = CHUNK_OTHER_TIMERS.timer_with(&["commit_chunk_impl__total"]);
let (persisted_state, chunk) = {
let chunk = {
let _timer =
CHUNK_OTHER_TIMERS.timer_with(&["commit_chunk_impl__next_chunk_to_commit"]);
self.commit_queue.lock().next_chunk_to_commit()?
};

if chunk.ledger_info.is_some() || !chunk.transactions_to_commit().is_empty() {
let output = chunk.output.expect_complete_result();
let num_txns = output.transactions_to_commit_len();
if chunk.ledger_info_opt.is_some() || num_txns != 0 {
let _timer = CHUNK_OTHER_TIMERS.timer_with(&["commit_chunk_impl__save_txns"]);
fail_point!("executor::commit_chunk", |_| {
Err(anyhow::anyhow!("Injected error in commit_chunk"))
});
let output = chunk.output.expect_complete_result();
self.db.writer.save_transactions(
chunk.transactions_to_commit(),
persisted_state.next_version(),
persisted_state.base_version,
chunk.ledger_info.as_ref(),
&output.ledger_update_output.to_commit,
output.ledger_update_output.first_version(),
output.parent_state.base_version,
chunk.ledger_info_opt.as_ref(),
false, // sync_commit
&chunk.result_state,
chunk
&output.result_state,
output
.ledger_update_output
.state_updates_until_last_checkpoint
.as_ref(),
Some(&chunk.ledger_update_output.sharded_state_cache),
Some(&output.ledger_update_output.sharded_state_cache),
)?;
}

DEFAULT_DROPPER.schedule_drop(persisted_state);

let _timer = CHUNK_OTHER_TIMERS.timer_with(&["commit_chunk_impl__dequeue_and_return"]);
self.commit_queue
.lock()
.dequeue_committed(chunk.result_state.clone())?;
self.commit_queue.lock().dequeue_committed()?;

Ok(chunk)
}
Expand Down Expand Up @@ -322,9 +321,12 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
self.commit_queue
.lock()
.enqueue_for_ledger_update(ChunkToUpdateLedger {
result_state,
output: PartialStateComputeResult::new(
parent_state.clone(),
result_state,
next_epoch_state,
),
state_checkpoint_output,
next_epoch_state,
chunk_verifier,
})?;

Expand All @@ -347,9 +349,8 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
self.commit_queue.lock().next_chunk_to_update_ledger()?
};
let ChunkToUpdateLedger {
result_state,
output,
state_checkpoint_output,
next_epoch_state,
chunk_verifier,
} = chunk;

Expand All @@ -368,16 +369,18 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {

let ledger_info_opt = chunk_verifier.maybe_select_chunk_ending_ledger_info(
&ledger_update_output,
next_epoch_state.as_ref(),
output.next_epoch_state.as_ref(),
)?;
output.set_ledger_update_output(ledger_update_output);

let executed_chunk = ExecutedChunk {
result_state,
ledger_info: ledger_info_opt,
next_epoch_state,
ledger_update_output,
output,
ledger_info_opt,
};
let num_txns = executed_chunk.transactions_to_commit().len();
let num_txns = executed_chunk
.output
.expect_complete_result()
.transactions_to_commit_len();

let _timer = CHUNK_OTHER_TIMERS.timer_with(&["chunk_update_ledger__save"]);
self.commit_queue
Expand All @@ -400,7 +403,10 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
let commit_notification = {
let _timer =
CHUNK_OTHER_TIMERS.timer_with(&["commit_chunk__into_chunk_commit_notification"]);
executed_chunk.into_chunk_commit_notification()
executed_chunk
.output
.expect_complete_result()
.make_chunk_commit_notification()
};

Ok(commit_notification)
Expand Down Expand Up @@ -505,6 +511,7 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
);

Ok(chunk
.output
.result_state
.current_version
.expect("Version must exist after commit."))
Expand Down
Loading

0 comments on commit d672fb9

Please sign in to comment.