Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
6296d16
perf: add lru cache for load_executed_tipset to speed up hot queries
hanabi1224 Mar 18, 2026
5fdd7b1
resolve AI comments
hanabi1224 Mar 18, 2026
d0b182b
populate cache after state computation
hanabi1224 Mar 18, 2026
eec3715
Merge branch 'hm/cache-load_executed_tipset' of github.com:ChainSafe/…
hanabi1224 Mar 18, 2026
62eb46c
fix
hanabi1224 Mar 18, 2026
a1fb72e
Merge remote-tracking branch 'origin/main' into hm/cache-load_execute…
hanabi1224 Mar 18, 2026
a12b2b2
code comment
hanabi1224 Mar 18, 2026
0a6a7ff
Merge remote-tracking branch 'origin/main' into hm/cache-load_execute…
hanabi1224 Mar 19, 2026
4ec750c
fix comment
hanabi1224 Mar 19, 2026
17417c7
cleanup
hanabi1224 Mar 21, 2026
5ad8102
Merge remote-tracking branch 'origin/main' into hm/cache-load_execute…
hanabi1224 Mar 21, 2026
bc454b1
Merge branch 'main' into hm/cache-load_executed_tipset
hanabi1224 Mar 23, 2026
194ef9d
fix
hanabi1224 Mar 23, 2026
35ae695
Merge remote-tracking branch 'origin/main' into hm/cache-load_execute…
hanabi1224 Mar 23, 2026
d34d65e
lint
hanabi1224 Mar 23, 2026
1fe1825
fix
hanabi1224 Mar 23, 2026
842d1db
optimize
hanabi1224 Mar 23, 2026
d1aabf0
Merge branch 'main' into hm/cache-load_executed_tipset
hanabi1224 Mar 23, 2026
4004e0f
Merge remote-tracking branch 'origin/main' into hm/cache-load_execute…
hanabi1224 Mar 23, 2026
653b2f0
make ExecutedTipset cheap to clone
hanabi1224 Mar 23, 2026
f2b17de
Merge remote-tracking branch 'origin/main' into hm/cache-load_execute…
hanabi1224 Mar 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/chain/store/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl<DB: Blockstore> ChainIndex<DB> {
SizeTrackingLruCache::new_with_metrics(
"tipset_by_height".into(),
// 20480 * 900 = 18432000 which is sufficient for mainnet
20480.try_into().expect("infallible"),
nonzero!(20480_usize),
)
});

Expand Down
13 changes: 10 additions & 3 deletions src/interpreter/vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,11 @@ type ForestExecutorV4<DB> = DefaultExecutor_v4<ForestKernelV4<DB>>;

pub type ApplyResult = anyhow::Result<(ApplyRet, Duration)>;

pub type ApplyBlockResult =
anyhow::Result<(Vec<Receipt>, Vec<Vec<StampedEvent>>, Vec<Option<Cid>>), anyhow::Error>;
pub type ApplyBlockResult = anyhow::Result<(
Vec<Receipt>,
Vec<Option<Vec<StampedEvent>>>,
Vec<Option<Cid>>,
)>;

/// Comes from <https://github.com/filecoin-project/lotus/blob/v1.23.2/chain/vm/fvm.go#L473>
pub const IMPLICIT_MESSAGE_GAS_LIMIT: i64 = i64::MAX / 2;
Expand Down Expand Up @@ -387,7 +390,11 @@ where
receipts.push(msg_receipt.clone());

events_roots.push(ret.msg_receipt().events_root());
events.push(ret.events());
if ret.msg_receipt().events_root().is_some() {
events.push(Some(ret.events()));
} else {
events.push(None);
}

// Add processed Cid to set of processed messages
processed.insert(cid);
Expand Down
17 changes: 5 additions & 12 deletions src/rpc/methods/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,10 +500,8 @@ impl Block {
let ExecutedTipset {
state_root,
executed_messages,
} = ctx
.state_manager
.load_executed_tipset_without_events(&tipset)
.await?;
..
} = ctx.state_manager.load_executed_tipset(&tipset).await?;
let has_transactions = !executed_messages.is_empty();
let state_tree = ctx.state_manager.get_state_tree(&state_root)?;

Expand Down Expand Up @@ -1419,10 +1417,8 @@ async fn get_block_receipts<DB: Blockstore + Send + Sync + 'static>(
let ExecutedTipset {
state_root,
executed_messages,
} = ctx
.state_manager
.load_executed_tipset_without_events(&ts_ref)
.await?;
..
} = ctx.state_manager.load_executed_tipset(&ts_ref).await?;

// Load the state tree
let state_tree = ctx.state_manager.get_state_tree(&state_root)?;
Expand Down Expand Up @@ -1933,10 +1929,7 @@ async fn eth_fee_history<B: Blockstore + Send + Sync + 'static>(
let base_fee = &ts.block_headers().first().parent_base_fee;
let ExecutedTipset {
executed_messages, ..
} = ctx
.state_manager
.load_executed_tipset_without_events(&ts)
.await?;
} = ctx.state_manager.load_executed_tipset(&ts).await?;
let mut tx_gas_rewards = Vec::with_capacity(executed_messages.len());
for ExecutedMessage {
message, receipt, ..
Expand Down
128 changes: 83 additions & 45 deletions src/state_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ use crate::shim::{
use crate::state_manager::cache::TipsetStateCache;
use crate::state_manager::chain_rand::draw_randomness;
use crate::state_migration::run_state_migrations;
use crate::utils::get_size::GetSize;
use crate::utils::cache::SizeTrackingLruCache;
use crate::utils::get_size::{GetSize, vec_heap_size_helper};
use ahash::{HashMap, HashMapExt};
use anyhow::{Context as _, bail, ensure};
use bls_signatures::{PublicKey as BlsPublicKey, Serialize as _};
Expand All @@ -78,6 +79,7 @@ use rayon::prelude::ParallelBridge;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::ops::RangeInclusive;
use std::sync::LazyLock;
use std::time::Duration;
use std::{num::NonZeroUsize, sync::Arc};
use tokio::sync::{RwLock, broadcast::error::RecvError};
Expand All @@ -89,30 +91,55 @@ pub const EVENTS_AMT_BITWIDTH: u32 = 5;
/// Intermediary for retrieving state objects and updating actor states.
type CidPair = (Cid, Cid);

fn executed_tipset_cache() -> &'static SizeTrackingLruCache<TipsetKey, ExecutedTipset> {
// A tipset key should always map to a deterministic state output, so it's safe to cache the entire executed tipset with the same key.
static CACHE: LazyLock<SizeTrackingLruCache<TipsetKey, ExecutedTipset>> = LazyLock::new(|| {
Comment thread
hanabi1224 marked this conversation as resolved.
Outdated
// 100-200MiB on mainet with capacity 1024
SizeTrackingLruCache::new_with_metrics("executed_tipset".into(), nonzero!(1024usize))
});
&CACHE
}

/// Result of executing an individual chain message in a tipset.
///
/// Includes the executed message itself, the execution receipt, and
/// optional events emitted by the actor during execution.
#[derive(Debug, Clone)]
pub struct ExecutedMessage {
pub message: ChainMessage,
pub receipt: Receipt,
pub events: Option<Vec<StampedEvent>>,
}

impl GetSize for ExecutedMessage {
fn get_heap_size(&self) -> usize {
self.message.get_heap_size()
+ self.receipt.get_heap_size()
+ self
.events
.as_ref()
.map(vec_heap_size_helper)
.unwrap_or_default()
}
}

/// Aggregated execution result for a tipset.
///
/// `state_root` is the resulting state tree root after message execution
/// and `executed_messages` contains per-message execution details.
#[derive(Debug, Clone)]
pub struct ExecutedTipset {
pub state_root: Cid,
#[allow(dead_code)]
pub receipt_root: Cid,
pub executed_messages: Vec<ExecutedMessage>,
}

/// Options controlling how `load_executed_tipset` fetches extra execution data.
///
/// `include_events` toggles whether event logs are loaded from receipts.
pub struct LoadExecutedTipsetOptions {
pub include_events: bool,
impl GetSize for ExecutedTipset {
fn get_heap_size(&self) -> usize {
// state_root(Cid) has no heap allocation, so we only calculate the heap size of executed_messages
vec_heap_size_helper(&self.executed_messages)
}
}

#[derive(Debug, Default, Clone, GetSize)]
Expand Down Expand Up @@ -471,48 +498,29 @@ where
.await
}

/// Load an executed tipset, including message receipts and state root,
/// without loading event logs from receipts.
pub async fn load_executed_tipset_without_events(
self: &Arc<Self>,
ts: &Tipset,
) -> anyhow::Result<ExecutedTipset> {
let receipt_ts = self.chain_store().load_child_tipset(ts).ok();
self.load_executed_tipset_inner(
ts,
receipt_ts.as_ref(),
LoadExecutedTipsetOptions {
include_events: false,
},
)
.await
}

/// Load an executed tipset, including message receipts and state root,
/// with event logs loaded when available.
/// Load an executed tipset, including state root, message receipts and events with caching.
pub async fn load_executed_tipset(
self: &Arc<Self>,
ts: &Tipset,
) -> anyhow::Result<ExecutedTipset> {
let cache = executed_tipset_cache();
if let Some(cached) = cache.get_cloned(ts.key()) {
return Ok(cached);
}
let receipt_ts = self.chain_store().load_child_tipset(ts).ok();
self.load_executed_tipset_inner(
ts,
receipt_ts.as_ref(),
LoadExecutedTipsetOptions {
include_events: true,
},
)
.await
let result = self
.load_executed_tipset_inner(ts, receipt_ts.as_ref())
.await?;
cache.push(ts.key().clone(), result.clone());
Comment thread
hanabi1224 marked this conversation as resolved.
Outdated
Ok(result)
}

async fn load_executed_tipset_inner(
self: &Arc<Self>,
msg_ts: &Tipset,
// when `msg_ts` is the current head, `receipt_ts` is `None`
receipt_ts: Option<&Tipset>,
options: LoadExecutedTipsetOptions,
) -> anyhow::Result<ExecutedTipset> {
let LoadExecutedTipsetOptions { include_events } = options;
if let Some(receipt_ts) = receipt_ts {
anyhow::ensure!(
msg_ts.key() == receipt_ts.parents(),
Expand All @@ -521,19 +529,21 @@ where
}
let messages = self.chain_store().messages_for_tipset(msg_ts)?;
let mut recomputed = false;
let (state_root, receipts) = match receipt_ts.and_then(|ts| {
Receipt::get_receipts(self.cs.blockstore(), *ts.parent_message_receipts())
let (state_root, receipt_root, receipts) = match receipt_ts.and_then(|ts| {
let receipt_root = *ts.parent_message_receipts();
Receipt::get_receipts(self.cs.blockstore(), receipt_root)
.ok()
.map(|r| (*ts.parent_state(), r))
.map(|r| (*ts.parent_state(), receipt_root, r))
}) {
Some((state_root, receipts)) => (state_root, receipts),
Some((state_root, receipt_root, receipts)) => (state_root, receipt_root, receipts),
None => {
let state_output = self
.compute_tipset_state(msg_ts.clone(), NO_CALLBACK, VMTrace::NotTraced)
.await?;
recomputed = true;
(
state_output.state_root,
state_output.receipt_root,
Receipt::get_receipts(self.cs.blockstore(), state_output.receipt_root)?,
)
}
Expand All @@ -546,7 +556,7 @@ where
);
let mut executed_messages = Vec::with_capacity(messages.len());
for (message, receipt) in messages.into_iter().zip(receipts.into_iter()) {
let events = if include_events && let Some(events_root) = receipt.events_root() {
let events = if let Some(events_root) = receipt.events_root() {
Some(
match StampedEvent::get_events(self.cs.blockstore(), &events_root) {
Ok(events) => events,
Expand Down Expand Up @@ -574,6 +584,7 @@ where
}
Ok(ExecutedTipset {
state_root,
receipt_root,
executed_messages,
})
}
Expand Down Expand Up @@ -2090,29 +2101,56 @@ where
vm.apply_block_messages(&block_messages, epoch, callback)?;

// step 5: construct receipt root from receipts
let receipt_root = Amtv0::new_from_iter(chain_index.db(), receipts)?;
let receipt_root = Amtv0::new_from_iter(chain_index.db(), receipts.iter())?;

// step 6: store events AMTs in the blockstore
for (msg_events, events_root) in events.iter().zip(events_roots.iter()) {
if let Some(event_root) = events_root {
for (events, events_root) in events.iter().zip(events_roots.iter()) {
if let Some(events) = events {
let event_root =
events_root.context("events root should be present when events present")?;
// Store the events AMT - the root CID should match the one computed by FVM
let derived_event_root = Amt::new_from_iter_with_bit_width(
chain_index.db(),
EVENTS_AMT_BITWIDTH,
msg_events.iter(),
events.iter(),
)
.map_err(|e| Error::Other(format!("failed to store events AMT: {e}")))?;

// Verify the stored root matches the FVM-computed root
ensure!(
derived_event_root.eq(event_root),
derived_event_root == event_root,
"Events AMT root mismatch: derived={derived_event_root}, actual={event_root}."
);
}
}

let state_root = vm.flush()?;

// Update executed tipset cache
let messages: Vec<ChainMessage> = block_messages
.into_iter()
.flat_map(|bm| bm.messages)
.collect_vec();
anyhow::ensure!(
messages.len() == receipts.len() && messages.len() == events.len(),
"length of messages, receipts, and events should match",
);
let executed_tipset = ExecutedTipset {
state_root,
receipt_root,
executed_messages: messages
.into_iter()
.zip(receipts)
.zip(events)
.map(|((message, receipt), events)| ExecutedMessage {
message,
receipt,
events,
})
.collect(),
};
executed_tipset_cache().push(tipset.key().clone(), executed_tipset);

Ok(StateOutput {
state_root,
receipt_root,
Expand Down
4 changes: 4 additions & 0 deletions src/utils/get_size/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ macro_rules! impl_vec_alike_heap_size_helper {
};
}

pub fn vec_heap_size_helper<T: GetSize>(v: &Vec<T>) -> usize {
impl_vec_alike_heap_size_helper!(v, T)
}

pub fn vec_heap_size_with_fn_helper<T>(v: &Vec<T>, get_heap_size: impl Fn(&T) -> usize) -> usize {
impl_vec_alike_heap_size_with_fn_helper!(v, T, std::mem::size_of::<T>, get_heap_size)
}
Expand Down
Loading