From a7587076941d5493611c39cf8fbc54c17d7b4e49 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 4 Dec 2025 20:38:52 +0800 Subject: [PATCH] feat: add eth_block_cache to improve performance of a few Eth RPC method --- docs/dictionary.txt | 1 + docs/docs/users/reference/env_variables.md | 1 + src/beacon/drand.rs | 5 +- src/chain/store/chain_store.rs | 13 +- src/cli/subcommands/chain_cmd/list.rs | 3 +- src/libp2p/service.rs | 5 +- src/rpc/methods/eth.rs | 149 +++++++++++------- src/rpc/methods/eth/types.rs | 15 +- src/shim/message.rs | 6 - src/state_migration/common/mod.rs | 5 +- src/state_migration/common/state_migration.rs | 14 +- src/tool/subcommands/state_compute_cmd.rs | 3 +- 12 files changed, 137 insertions(+), 83 deletions(-) diff --git a/docs/dictionary.txt b/docs/dictionary.txt index b8386f2902ea..6410f70e2363 100644 --- a/docs/dictionary.txt +++ b/docs/dictionary.txt @@ -28,6 +28,7 @@ DHT DigitalOcean Drand enums +Eth Ethereum F3 f3 diff --git a/docs/docs/users/reference/env_variables.md b/docs/docs/users/reference/env_variables.md index 3475862d30f8..4780572843f2 100644 --- a/docs/docs/users/reference/env_variables.md +++ b/docs/docs/users/reference/env_variables.md @@ -51,6 +51,7 @@ process. | `FOREST_DISABLE_BAD_BLOCK_CACHE` | 1 or true | empty | 1 | Whether or not to disable bad block cache | | `FOREST_ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE` | positive integer | 268435456 | 536870912 | The default zstd frame cache max size in bytes | | `FOREST_JWT_DISABLE_EXP_VALIDATION` | 1 or true | empty | 1 | Whether or not to disable JWT expiration validation | +| `FOREST_ETH_BLOCK_CACHE_SIZE` | positive integer | 500 | 1 | The size of Eth block cache | ### `FOREST_F3_SIDECAR_FFI_BUILD_OPT_OUT` diff --git a/src/beacon/drand.rs b/src/beacon/drand.rs index 26f8f561dcaf..e38a226bdaa9 100644 --- a/src/beacon/drand.rs +++ b/src/beacon/drand.rs @@ -19,6 +19,7 @@ use async_trait::async_trait; use backon::{ExponentialBuilder, Retryable}; use bls_signatures::Serialize as _; use itertools::Itertools as _; +use nonzero_ext::nonzero; use serde::{Deserialize as SerdeDeserialize, Serialize as SerdeSerialize}; use tracing::debug; use url::Url; @@ -243,7 +244,7 @@ impl DrandBeacon { /// Construct a new `DrandBeacon`. pub fn new(genesis_ts: u64, interval: u64, config: &DrandConfig<'_>) -> Self { assert_ne!(genesis_ts, 0, "Genesis timestamp cannot be 0"); - const CACHE_SIZE: usize = 1000; + const CACHE_SIZE: NonZeroUsize = nonzero!(1000usize); Self { servers: config.servers.clone(), hash: config.chain_info.hash.to_string(), @@ -256,7 +257,7 @@ impl DrandBeacon { fil_gen_time: genesis_ts, verified_beacons: SizeTrackingLruCache::new_with_metrics( "verified_beacons".into(), - NonZeroUsize::new(CACHE_SIZE).expect("Infallible"), + CACHE_SIZE, ), } } diff --git a/src/chain/store/chain_store.rs b/src/chain/store/chain_store.rs index 1c178d1809ce..8f9dd4d86da8 100644 --- a/src/chain/store/chain_store.rs +++ b/src/chain/store/chain_store.rs @@ -31,7 +31,8 @@ use fil_actors_shared::fvm_ipld_amt::Amtv0 as Amt; use fvm_ipld_blockstore::Blockstore; use fvm_ipld_encoding::CborStore; use itertools::Itertools; -use parking_lot::Mutex; +use nonzero_ext::nonzero; +use parking_lot::{Mutex, RwLock}; use serde::{Serialize, de::DeserializeOwned}; use std::{num::NonZeroUsize, sync::Arc}; use tokio::sync::broadcast::{self, Sender as Publisher}; @@ -64,6 +65,9 @@ pub struct ChainStore { /// Heaviest tipset key provider heaviest_tipset_key_provider: Arc, + /// Heaviest tipset cache + heaviest_tipset_cache: Arc>>, + /// Used as a cache for tipset `lookbacks`. chain_index: Arc>>, @@ -131,6 +135,7 @@ where tipset_tracker: TipsetTracker::new(Arc::clone(&db), chain_config.clone()), db, heaviest_tipset_key_provider, + heaviest_tipset_cache: Default::default(), genesis_block_header, validated_blocks, eth_mappings, @@ -145,6 +150,7 @@ where pub fn set_heaviest_tipset(&self, ts: Tipset) -> Result<(), Error> { self.heaviest_tipset_key_provider .set_heaviest_tipset_key(ts.key())?; + *self.heaviest_tipset_cache.write() = Some(ts.clone()); if self.publisher.send(HeadChange::Apply(ts)).is_err() { debug!("did not publish head change, no active receivers"); } @@ -219,6 +225,9 @@ where /// Returns the currently tracked heaviest tipset. pub fn heaviest_tipset(&self) -> Tipset { + if let Some(ts) = &*self.heaviest_tipset_cache.read() { + return ts.clone(); + } let tsk = self .heaviest_tipset_key_provider .heaviest_tipset_key() @@ -600,7 +609,7 @@ impl MsgsInTipsetCache { /// Reads the intended cache size for this process from the environment or uses the default. fn read_cache_size() -> NonZeroUsize { // Arbitrary number, can be adjusted - const DEFAULT: NonZeroUsize = NonZeroUsize::new(100).expect("infallible"); + const DEFAULT: NonZeroUsize = nonzero!(100usize); std::env::var("FOREST_MESSAGES_IN_TIPSET_CACHE_SIZE") .ok() .and_then(|s| s.parse().ok()) diff --git a/src/cli/subcommands/chain_cmd/list.rs b/src/cli/subcommands/chain_cmd/list.rs index 5401ba72b35c..bf9e35544a9e 100644 --- a/src/cli/subcommands/chain_cmd/list.rs +++ b/src/cli/subcommands/chain_cmd/list.rs @@ -5,6 +5,7 @@ use std::num::NonZeroUsize; use anyhow::Context as _; use itertools::Itertools; +use nonzero_ext::nonzero; use num::{BigInt, Zero as _}; use crate::{ @@ -25,7 +26,7 @@ pub struct ChainListCommand { #[arg(long)] epoch: Option, /// Number of tipsets - #[arg(long, default_value_t = NonZeroUsize::new(30).unwrap())] + #[arg(long, default_value_t = nonzero!(30usize))] count: NonZeroUsize, #[arg(long)] /// View gas statistics for the chain diff --git a/src/libp2p/service.rs b/src/libp2p/service.rs index b42d9ed36a2a..7a09f8a6709f 100644 --- a/src/libp2p/service.rs +++ b/src/libp2p/service.rs @@ -35,6 +35,7 @@ use libp2p::{ swarm::{DialError, SwarmEvent}, tcp, yamux, }; +use nonzero_ext::nonzero; use tokio_stream::wrappers::IntervalStream; use tracing::{debug, error, info, trace, warn}; @@ -206,9 +207,7 @@ where .with_behaviour(|_| behaviour)? .with_swarm_config(|config| { config - .with_notify_handler_buffer_size( - std::num::NonZeroUsize::new(20).expect("Not zero"), - ) + .with_notify_handler_buffer_size(nonzero!(20usize)) .with_per_connection_event_buffer_size(64) .with_idle_connection_timeout(Duration::from_secs(60 * 10)) }) diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index ff2d5d556e32..650426c05ac4 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -53,8 +53,10 @@ use crate::shim::gas::GasOutputs; use crate::shim::message::Message; use crate::shim::trace::{CallReturn, ExecutionEvent}; use crate::shim::{clock::ChainEpoch, state_tree::StateTree}; +use crate::utils::cache::SizeTrackingLruCache; use crate::utils::db::BlockstoreExt as _; use crate::utils::encoding::from_slice_with_fallback; +use crate::utils::get_size::{CidWrapper, big_int_heap_size_helper}; use crate::utils::misc::env::env_or_default; use crate::utils::multihash::prelude::*; use ahash::HashSet; @@ -64,11 +66,14 @@ use enumflags2::BitFlags; use filter::{ParsedFilter, ParsedFilterTipsets}; use fvm_ipld_blockstore::Blockstore; use fvm_ipld_encoding::{CBOR, DAG_CBOR, IPLD_RAW, RawBytes}; +use get_size2::GetSize; use ipld_core::ipld::Ipld; use itertools::Itertools; +use nonzero_ext::nonzero; use num::{BigInt, Zero as _}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use std::num::NonZeroUsize; use std::ops::RangeInclusive; use std::str::FromStr; use std::sync::{Arc, LazyLock}; @@ -132,6 +137,12 @@ pub struct EthBigInt( ); lotus_json_with_self!(EthBigInt); +impl GetSize for EthBigInt { + fn get_heap_size(&self) -> usize { + big_int_heap_size_helper(&self.0) + } +} + impl From for EthBigInt { fn from(amount: TokenAmount) -> Self { (&amount).into() @@ -152,18 +163,28 @@ pub struct Nonce( #[serde(with = "crate::lotus_json::hexify_bytes")] pub ethereum_types::H64, ); - lotus_json_with_self!(Nonce); +impl GetSize for Nonce { + fn get_heap_size(&self) -> usize { + 0 + } +} + #[derive(PartialEq, Debug, Deserialize, Serialize, Default, Clone, JsonSchema)] pub struct Bloom( #[schemars(with = "String")] #[serde(with = "crate::lotus_json::hexify_bytes")] pub ethereum_types::Bloom, ); - lotus_json_with_self!(Bloom); +impl GetSize for Bloom { + fn get_heap_size(&self) -> usize { + 0 + } +} + impl Bloom { pub fn accrue(&mut self, input: &[u8]) { self.0.accrue(ethereum_types::BloomInput::Raw(input)); @@ -182,6 +203,7 @@ impl Bloom { JsonSchema, derive_more::From, derive_more::Into, + GetSize, )] pub struct EthUint64( #[schemars(with = "String")] @@ -449,7 +471,7 @@ impl ExtBlockNumberOrHash { } } -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, GetSize)] #[serde(untagged)] // try a Vec, then a Vec pub enum Transactions { Hash(Vec), @@ -481,7 +503,7 @@ impl Default for Transactions { } } -#[derive(PartialEq, Debug, Clone, Default, Serialize, Deserialize, JsonSchema)] +#[derive(PartialEq, Debug, Clone, Default, Serialize, Deserialize, JsonSchema, GetSize)] #[serde(rename_all = "camelCase")] pub struct Block { pub hash: EthHash, @@ -526,7 +548,7 @@ impl Block { lotus_json_with_self!(Block); -#[derive(PartialEq, Debug, Clone, Default, Serialize, Deserialize, JsonSchema)] +#[derive(PartialEq, Debug, Clone, Default, Serialize, Deserialize, JsonSchema, GetSize)] #[serde(rename_all = "camelCase")] pub struct ApiEthTx { pub chain_id: EthUint64, @@ -1392,64 +1414,77 @@ pub async fn block_from_filecoin_tipset( tipset: Tipset, full_tx_info: bool, ) -> Result { - let parent_cid = tipset.parents().cid()?; - - let block_number = EthUint64(tipset.epoch() as u64); - - let tsk = tipset.key(); - let block_cid = tsk.cid()?; - let block_hash: EthHash = block_cid.into(); - - let (state_root, msgs_and_receipts) = execute_tipset(&data, &tipset).await?; + static ETH_BLOCK_CACHE: LazyLock> = + LazyLock::new(|| { + const DEFAULT_CACHE_SIZE: NonZeroUsize = nonzero!(500usize); + let cache_size = std::env::var("FOREST_ETH_BLOCK_CACHE_SIZE") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(DEFAULT_CACHE_SIZE); + SizeTrackingLruCache::new_with_metrics("eth_block".into(), cache_size) + }); + + let block_cid = tipset.key().cid()?; + let mut block = if let Some(b) = ETH_BLOCK_CACHE.get_cloned(&block_cid.into()) { + b + } else { + let parent_cid = tipset.parents().cid()?; + let block_number = EthUint64(tipset.epoch() as u64); + let block_hash: EthHash = block_cid.into(); + + let (state_root, msgs_and_receipts) = execute_tipset(&data, &tipset).await?; + + let state_tree = StateTree::new_from_root(data.store_owned(), &state_root)?; + + let mut full_transactions = vec![]; + let mut gas_used = 0; + for (i, (msg, receipt)) in msgs_and_receipts.iter().enumerate() { + let ti = EthUint64(i as u64); + gas_used += receipt.gas_used(); + let smsg = match msg { + ChainMessage::Signed(msg) => msg.clone(), + ChainMessage::Unsigned(msg) => { + let sig = Signature::new_bls(vec![]); + SignedMessage::new_unchecked(msg.clone(), sig) + } + }; - let state_tree = StateTree::new_from_root(data.store_owned(), &state_root)?; + let mut tx = new_eth_tx_from_signed_message( + &smsg, + &state_tree, + data.chain_config().eth_chain_id, + )?; + tx.block_hash = block_hash.clone(); + tx.block_number = block_number.clone(); + tx.transaction_index = ti; + full_transactions.push(tx); + } - let mut full_transactions = vec![]; - let mut hash_transactions = vec![]; - let mut gas_used = 0; - for (i, (msg, receipt)) in msgs_and_receipts.iter().enumerate() { - let ti = EthUint64(i as u64); - gas_used += receipt.gas_used(); - let smsg = match msg { - ChainMessage::Signed(msg) => msg.clone(), - ChainMessage::Unsigned(msg) => { - let sig = Signature::new_bls(vec![]); - SignedMessage::new_unchecked(msg.clone(), sig) - } + let b = Block { + hash: block_hash, + number: block_number, + parent_hash: parent_cid.into(), + timestamp: EthUint64(tipset.block_headers().first().timestamp), + base_fee_per_gas: tipset + .block_headers() + .first() + .parent_base_fee + .clone() + .into(), + gas_used: EthUint64(gas_used), + transactions: Transactions::Full(full_transactions), + ..Block::new(!msgs_and_receipts.is_empty(), tipset.len()) }; + ETH_BLOCK_CACHE.push(block_cid.into(), b.clone()); + b + }; - let mut tx = - new_eth_tx_from_signed_message(&smsg, &state_tree, data.chain_config().eth_chain_id)?; - tx.block_hash = block_hash.clone(); - tx.block_number = block_number.clone(); - tx.transaction_index = ti; - - if full_tx_info { - full_transactions.push(tx); - } else { - hash_transactions.push(tx.hash.to_string()); - } + if !full_tx_info && let Transactions::Full(transactions) = &block.transactions { + block.transactions = + Transactions::Hash(transactions.iter().map(|tx| tx.hash.to_string()).collect()) } - Ok(Block { - hash: block_hash, - number: block_number, - parent_hash: parent_cid.into(), - timestamp: EthUint64(tipset.block_headers().first().timestamp), - base_fee_per_gas: tipset - .block_headers() - .first() - .parent_base_fee - .clone() - .into(), - gas_used: EthUint64(gas_used), - transactions: if full_tx_info { - Transactions::Full(full_transactions) - } else { - Transactions::Hash(hash_transactions) - }, - ..Block::new(!msgs_and_receipts.is_empty(), tipset.len()) - }) + Ok(block) } pub enum EthGetBlockByHash {} diff --git a/src/rpc/methods/eth/types.rs b/src/rpc/methods/eth/types.rs index 58debb704027..4f918c15fc23 100644 --- a/src/rpc/methods/eth/types.rs +++ b/src/rpc/methods/eth/types.rs @@ -5,6 +5,7 @@ use super::*; use crate::blocks::CachingBlockHeader; use crate::rpc::eth::pubsub_trait::LogFilter; use anyhow::ensure; +use get_size2::GetSize; use ipld_core::serde::SerdeError; use jsonrpsee::core::traits::IdProvider; use jsonrpsee::types::SubscriptionId; @@ -28,6 +29,7 @@ pub const METHOD_GET_STORAGE_AT: u64 = 5; JsonSchema, derive_more::From, derive_more::Into, + GetSize, )] pub struct EthBytes( #[schemars(with = "String")] @@ -117,6 +119,12 @@ pub struct EthAddress( ); lotus_json_with_self!(EthAddress); +impl GetSize for EthAddress { + fn get_heap_size(&self) -> usize { + 0 + } +} + impl EthAddress { pub fn to_filecoin_address(&self) -> anyhow::Result { if self.is_masked_id() { @@ -393,9 +401,14 @@ impl TryFrom for Message { )] #[displaydoc("{0:#x}")] pub struct EthHash(#[schemars(with = "String")] pub ethereum_types::H256); - lotus_json_with_self!(EthHash); +impl GetSize for EthHash { + fn get_heap_size(&self) -> usize { + 0 + } +} + #[derive(Debug, Serialize, Deserialize, JsonSchema, PartialEq, Eq, Hash, Clone)] pub struct FilterID(EthHash); diff --git a/src/shim/message.rs b/src/shim/message.rs index 47e3f97518d4..018990ceffa8 100644 --- a/src/shim/message.rs +++ b/src/shim/message.rs @@ -41,12 +41,6 @@ fn raw_bytes_heap_size(b: &RawBytes) -> usize { b.bytes().get_heap_size() } -// impl GetSize for Message{ -// fn get_heap_size(&self) -> usize { - -// } -// } - impl From for Message { fn from(other: Message_v4) -> Self { Self { diff --git a/src/state_migration/common/mod.rs b/src/state_migration/common/mod.rs index bc4c2de21224..9629160dd07e 100644 --- a/src/state_migration/common/mod.rs +++ b/src/state_migration/common/mod.rs @@ -130,15 +130,14 @@ pub(in crate::state_migration) struct TypeMigrator; #[cfg(test)] mod tests { - use std::num::NonZeroUsize; - use super::MigrationCache; use crate::utils::cid::CidCborExt; use cid::Cid; + use nonzero_ext::nonzero; #[test] fn test_migration_cache() { - let cache = MigrationCache::new(NonZeroUsize::new(10).unwrap()); + let cache = MigrationCache::new(nonzero!(10usize)); let cid = Cid::from_cbor_blake2b256(&42).unwrap(); cache.push("Cthulhu".to_owned(), cid); assert_eq!(cache.get("Cthulhu"), Some(cid)); diff --git a/src/state_migration/common/state_migration.rs b/src/state_migration/common/state_migration.rs index d8fd4bcad21c..106d510dd3e5 100644 --- a/src/state_migration/common/state_migration.rs +++ b/src/state_migration/common/state_migration.rs @@ -1,21 +1,21 @@ // Copyright 2019-2025 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use std::num::NonZeroUsize; use std::sync::Arc; use std::sync::atomic::AtomicU64; +use super::{ + MigrationCache, Migrator, PostMigrationCheckArc, PostMigratorArc, + migration_job::{MigrationJob, MigrationJobOutput}, + verifier::MigrationVerifier, +}; use crate::cid_collections::CidHashMap; use crate::shim::{clock::ChainEpoch, state_tree::StateTree}; -use crate::state_migration::common::MigrationCache; use cid::Cid; use fvm_ipld_blockstore::Blockstore; +use nonzero_ext::nonzero; use parking_lot::Mutex; -use super::PostMigrationCheckArc; -use super::{Migrator, PostMigratorArc, verifier::MigrationVerifier}; -use crate::state_migration::common::migration_job::{MigrationJob, MigrationJobOutput}; - /// Handles several cases of migration: /// - nil migrations, essentially mapping one Actor to another, /// - migrations where state upgrade is required, @@ -79,7 +79,7 @@ impl StateMigration { verifier.verify_migration(store, &self.migrations, &actors_in)?; } - let cache = MigrationCache::new(NonZeroUsize::new(10_000).expect("infallible")); + let cache = MigrationCache::new(nonzero!(10_000usize)); let num_threads = std::env::var("FOREST_STATE_MIGRATION_THREADS") .ok() .and_then(|s| s.parse().ok()) diff --git a/src/tool/subcommands/state_compute_cmd.rs b/src/tool/subcommands/state_compute_cmd.rs index 16d2ea220cc8..17db9f1ce704 100644 --- a/src/tool/subcommands/state_compute_cmd.rs +++ b/src/tool/subcommands/state_compute_cmd.rs @@ -15,6 +15,7 @@ use crate::{ shim::clock::ChainEpoch, state_manager::{StateManager, StateOutput}, }; +use nonzero_ext::nonzero; use std::{num::NonZeroUsize, path::PathBuf, sync::Arc, time::Instant}; /// Interact with Filecoin chain state @@ -116,7 +117,7 @@ pub struct ReplayComputeCommand { #[arg(long, required = true)] chain: NetworkChain, /// Number of times to repeat the state computation - #[arg(short, long, default_value_t = NonZeroUsize::new(1).unwrap())] + #[arg(short, long, default_value_t = nonzero!(1usize))] n: NonZeroUsize, }