Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ DHT
DigitalOcean
Drand
enums
Eth
Ethereum
F3
f3
Expand Down
1 change: 1 addition & 0 deletions docs/docs/users/reference/env_variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ process.
| `FOREST_DISABLE_BAD_BLOCK_CACHE` | 1 or true | empty | 1 | Whether or not to disable bad block cache |
| `FOREST_ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE` | positive integer | 268435456 | 536870912 | The default zstd frame cache max size in bytes |
| `FOREST_JWT_DISABLE_EXP_VALIDATION` | 1 or true | empty | 1 | Whether or not to disable JWT expiration validation |
| `FOREST_ETH_BLOCK_CACHE_SIZE` | positive integer | 500 | 1 | The size of Eth block cache |

### `FOREST_F3_SIDECAR_FFI_BUILD_OPT_OUT`

Expand Down
5 changes: 3 additions & 2 deletions src/beacon/drand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use async_trait::async_trait;
use backon::{ExponentialBuilder, Retryable};
use bls_signatures::Serialize as _;
use itertools::Itertools as _;
use nonzero_ext::nonzero;
use serde::{Deserialize as SerdeDeserialize, Serialize as SerdeSerialize};
use tracing::debug;
use url::Url;
Expand Down Expand Up @@ -243,7 +244,7 @@ impl DrandBeacon {
/// Construct a new `DrandBeacon`.
pub fn new(genesis_ts: u64, interval: u64, config: &DrandConfig<'_>) -> Self {
assert_ne!(genesis_ts, 0, "Genesis timestamp cannot be 0");
const CACHE_SIZE: usize = 1000;
const CACHE_SIZE: NonZeroUsize = nonzero!(1000usize);
Self {
servers: config.servers.clone(),
hash: config.chain_info.hash.to_string(),
Expand All @@ -256,7 +257,7 @@ impl DrandBeacon {
fil_gen_time: genesis_ts,
verified_beacons: SizeTrackingLruCache::new_with_metrics(
"verified_beacons".into(),
NonZeroUsize::new(CACHE_SIZE).expect("Infallible"),
CACHE_SIZE,
),
}
}
Expand Down
13 changes: 11 additions & 2 deletions src/chain/store/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ use fil_actors_shared::fvm_ipld_amt::Amtv0 as Amt;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::CborStore;
use itertools::Itertools;
use parking_lot::Mutex;
use nonzero_ext::nonzero;
use parking_lot::{Mutex, RwLock};
use serde::{Serialize, de::DeserializeOwned};
use std::{num::NonZeroUsize, sync::Arc};
use tokio::sync::broadcast::{self, Sender as Publisher};
Expand Down Expand Up @@ -64,6 +65,9 @@ pub struct ChainStore<DB> {
/// Heaviest tipset key provider
heaviest_tipset_key_provider: Arc<dyn HeaviestTipsetKeyProvider + Sync + Send>,

/// Heaviest tipset cache
heaviest_tipset_cache: Arc<RwLock<Option<Tipset>>>,

/// Used as a cache for tipset `lookbacks`.
chain_index: Arc<ChainIndex<Arc<DB>>>,

Expand Down Expand Up @@ -131,6 +135,7 @@ where
tipset_tracker: TipsetTracker::new(Arc::clone(&db), chain_config.clone()),
db,
heaviest_tipset_key_provider,
heaviest_tipset_cache: Default::default(),
genesis_block_header,
validated_blocks,
eth_mappings,
Expand All @@ -145,6 +150,7 @@ where
pub fn set_heaviest_tipset(&self, ts: Tipset) -> Result<(), Error> {
self.heaviest_tipset_key_provider
.set_heaviest_tipset_key(ts.key())?;
*self.heaviest_tipset_cache.write() = Some(ts.clone());
if self.publisher.send(HeadChange::Apply(ts)).is_err() {
debug!("did not publish head change, no active receivers");
}
Expand Down Expand Up @@ -219,6 +225,9 @@ where

/// Returns the currently tracked heaviest tipset.
pub fn heaviest_tipset(&self) -> Tipset {
if let Some(ts) = &*self.heaviest_tipset_cache.read() {
return ts.clone();
}
let tsk = self
.heaviest_tipset_key_provider
.heaviest_tipset_key()
Expand Down Expand Up @@ -600,7 +609,7 @@ impl MsgsInTipsetCache {
/// Reads the intended cache size for this process from the environment or uses the default.
fn read_cache_size() -> NonZeroUsize {
// Arbitrary number, can be adjusted
const DEFAULT: NonZeroUsize = NonZeroUsize::new(100).expect("infallible");
const DEFAULT: NonZeroUsize = nonzero!(100usize);
std::env::var("FOREST_MESSAGES_IN_TIPSET_CACHE_SIZE")
.ok()
.and_then(|s| s.parse().ok())
Expand Down
3 changes: 2 additions & 1 deletion src/cli/subcommands/chain_cmd/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::num::NonZeroUsize;

use anyhow::Context as _;
use itertools::Itertools;
use nonzero_ext::nonzero;
use num::{BigInt, Zero as _};

use crate::{
Expand All @@ -25,7 +26,7 @@ pub struct ChainListCommand {
#[arg(long)]
epoch: Option<u64>,
/// Number of tipsets
#[arg(long, default_value_t = NonZeroUsize::new(30).unwrap())]
#[arg(long, default_value_t = nonzero!(30usize))]
count: NonZeroUsize,
#[arg(long)]
/// View gas statistics for the chain
Expand Down
5 changes: 2 additions & 3 deletions src/libp2p/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use libp2p::{
swarm::{DialError, SwarmEvent},
tcp, yamux,
};
use nonzero_ext::nonzero;
use tokio_stream::wrappers::IntervalStream;
use tracing::{debug, error, info, trace, warn};

Expand Down Expand Up @@ -206,9 +207,7 @@ where
.with_behaviour(|_| behaviour)?
.with_swarm_config(|config| {
config
.with_notify_handler_buffer_size(
std::num::NonZeroUsize::new(20).expect("Not zero"),
)
.with_notify_handler_buffer_size(nonzero!(20usize))
.with_per_connection_event_buffer_size(64)
.with_idle_connection_timeout(Duration::from_secs(60 * 10))
})
Expand Down
149 changes: 92 additions & 57 deletions src/rpc/methods/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ use crate::shim::gas::GasOutputs;
use crate::shim::message::Message;
use crate::shim::trace::{CallReturn, ExecutionEvent};
use crate::shim::{clock::ChainEpoch, state_tree::StateTree};
use crate::utils::cache::SizeTrackingLruCache;
use crate::utils::db::BlockstoreExt as _;
use crate::utils::encoding::from_slice_with_fallback;
use crate::utils::get_size::{CidWrapper, big_int_heap_size_helper};
use crate::utils::misc::env::env_or_default;
use crate::utils::multihash::prelude::*;
use ahash::HashSet;
Expand All @@ -64,11 +66,14 @@ use enumflags2::BitFlags;
use filter::{ParsedFilter, ParsedFilterTipsets};
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::{CBOR, DAG_CBOR, IPLD_RAW, RawBytes};
use get_size2::GetSize;
use ipld_core::ipld::Ipld;
use itertools::Itertools;
use nonzero_ext::nonzero;
use num::{BigInt, Zero as _};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::num::NonZeroUsize;
use std::ops::RangeInclusive;
use std::str::FromStr;
use std::sync::{Arc, LazyLock};
Expand Down Expand Up @@ -132,6 +137,12 @@ pub struct EthBigInt(
);
lotus_json_with_self!(EthBigInt);

/// Heap-size accounting used by `SizeTrackingLruCache` metrics.
///
/// Delegates to the shared `big_int_heap_size_helper`, since the wrapped
/// big-integer value may own heap-allocated digit storage whose size is not
/// visible through `size_of::<EthBigInt>()` alone.
impl GetSize for EthBigInt {
    fn get_heap_size(&self) -> usize {
        big_int_heap_size_helper(&self.0)
    }
}

impl From<TokenAmount> for EthBigInt {
fn from(amount: TokenAmount) -> Self {
(&amount).into()
Expand All @@ -152,18 +163,28 @@ pub struct Nonce(
#[serde(with = "crate::lotus_json::hexify_bytes")]
pub ethereum_types::H64,
);

lotus_json_with_self!(Nonce);

/// Heap-size accounting used by `SizeTrackingLruCache` metrics.
///
/// `ethereum_types::H64` is a fixed-size 8-byte array stored inline, so this
/// type never allocates on the heap and its heap footprint is always zero.
impl GetSize for Nonce {
    fn get_heap_size(&self) -> usize {
        0
    }
}

#[derive(PartialEq, Debug, Deserialize, Serialize, Default, Clone, JsonSchema)]
pub struct Bloom(
#[schemars(with = "String")]
#[serde(with = "crate::lotus_json::hexify_bytes")]
pub ethereum_types::Bloom,
);

lotus_json_with_self!(Bloom);

/// Heap-size accounting used by `SizeTrackingLruCache` metrics.
///
/// `ethereum_types::Bloom` is a fixed-size 256-byte array stored inline, so
/// this type never allocates on the heap and its heap footprint is always zero.
impl GetSize for Bloom {
    fn get_heap_size(&self) -> usize {
        0
    }
}

impl Bloom {
pub fn accrue(&mut self, input: &[u8]) {
self.0.accrue(ethereum_types::BloomInput::Raw(input));
Expand All @@ -182,6 +203,7 @@ impl Bloom {
JsonSchema,
derive_more::From,
derive_more::Into,
GetSize,
)]
pub struct EthUint64(
#[schemars(with = "String")]
Expand Down Expand Up @@ -449,7 +471,7 @@ impl ExtBlockNumberOrHash {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, GetSize)]
#[serde(untagged)] // try a Vec<String>, then a Vec<Tx>
pub enum Transactions {
Hash(Vec<String>),
Expand Down Expand Up @@ -481,7 +503,7 @@ impl Default for Transactions {
}
}

#[derive(PartialEq, Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
#[derive(PartialEq, Debug, Clone, Default, Serialize, Deserialize, JsonSchema, GetSize)]
#[serde(rename_all = "camelCase")]
pub struct Block {
pub hash: EthHash,
Expand Down Expand Up @@ -526,7 +548,7 @@ impl Block {

lotus_json_with_self!(Block);

#[derive(PartialEq, Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
#[derive(PartialEq, Debug, Clone, Default, Serialize, Deserialize, JsonSchema, GetSize)]
#[serde(rename_all = "camelCase")]
pub struct ApiEthTx {
pub chain_id: EthUint64,
Expand Down Expand Up @@ -1392,64 +1414,77 @@ pub async fn block_from_filecoin_tipset<DB: Blockstore + Send + Sync + 'static>(
tipset: Tipset,
full_tx_info: bool,
) -> Result<Block> {
let parent_cid = tipset.parents().cid()?;

let block_number = EthUint64(tipset.epoch() as u64);

let tsk = tipset.key();
let block_cid = tsk.cid()?;
let block_hash: EthHash = block_cid.into();

let (state_root, msgs_and_receipts) = execute_tipset(&data, &tipset).await?;
static ETH_BLOCK_CACHE: LazyLock<SizeTrackingLruCache<CidWrapper, Block>> =
LazyLock::new(|| {
const DEFAULT_CACHE_SIZE: NonZeroUsize = nonzero!(500usize);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the expected memory footprint of a maxed-out eth block cache?

Copy link
Copy Markdown
Contributor Author

@hanabi1224 hanabi1224 Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the metrics, a rough estimation would be 265108 * 500 / 3 = 44184667 ~= 42MiB

# HELP cache_eth_block_14_size_bytes Size of LruCache eth_block_14 in bytes
# TYPE cache_eth_block_14_size_bytes gauge
# UNIT cache_eth_block_14_size_bytes bytes
cache_eth_block_14_size_bytes 265108
# HELP eth_block_14_len Length of LruCache eth_block_14
# TYPE eth_block_14_len gauge
eth_block_14_len 3
# HELP eth_block_14_cap Capacity of LruCache eth_block_14
# TYPE eth_block_14_cap gauge
eth_block_14_cap 500

let cache_size = std::env::var("FOREST_ETH_BLOCK_CACHE_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(DEFAULT_CACHE_SIZE);
SizeTrackingLruCache::new_with_metrics("eth_block".into(), cache_size)
});

let block_cid = tipset.key().cid()?;
let mut block = if let Some(b) = ETH_BLOCK_CACHE.get_cloned(&block_cid.into()) {
b
} else {
let parent_cid = tipset.parents().cid()?;
let block_number = EthUint64(tipset.epoch() as u64);
let block_hash: EthHash = block_cid.into();

let (state_root, msgs_and_receipts) = execute_tipset(&data, &tipset).await?;

let state_tree = StateTree::new_from_root(data.store_owned(), &state_root)?;

let mut full_transactions = vec![];
let mut gas_used = 0;
for (i, (msg, receipt)) in msgs_and_receipts.iter().enumerate() {
let ti = EthUint64(i as u64);
gas_used += receipt.gas_used();
let smsg = match msg {
ChainMessage::Signed(msg) => msg.clone(),
ChainMessage::Unsigned(msg) => {
let sig = Signature::new_bls(vec![]);
SignedMessage::new_unchecked(msg.clone(), sig)
}
};

let state_tree = StateTree::new_from_root(data.store_owned(), &state_root)?;
let mut tx = new_eth_tx_from_signed_message(
&smsg,
&state_tree,
data.chain_config().eth_chain_id,
)?;
tx.block_hash = block_hash.clone();
tx.block_number = block_number.clone();
tx.transaction_index = ti;
full_transactions.push(tx);
}

let mut full_transactions = vec![];
let mut hash_transactions = vec![];
let mut gas_used = 0;
for (i, (msg, receipt)) in msgs_and_receipts.iter().enumerate() {
let ti = EthUint64(i as u64);
gas_used += receipt.gas_used();
let smsg = match msg {
ChainMessage::Signed(msg) => msg.clone(),
ChainMessage::Unsigned(msg) => {
let sig = Signature::new_bls(vec![]);
SignedMessage::new_unchecked(msg.clone(), sig)
}
let b = Block {
hash: block_hash,
number: block_number,
parent_hash: parent_cid.into(),
timestamp: EthUint64(tipset.block_headers().first().timestamp),
base_fee_per_gas: tipset
.block_headers()
.first()
.parent_base_fee
.clone()
.into(),
gas_used: EthUint64(gas_used),
transactions: Transactions::Full(full_transactions),
..Block::new(!msgs_and_receipts.is_empty(), tipset.len())
};
ETH_BLOCK_CACHE.push(block_cid.into(), b.clone());
b
};

let mut tx =
new_eth_tx_from_signed_message(&smsg, &state_tree, data.chain_config().eth_chain_id)?;
tx.block_hash = block_hash.clone();
tx.block_number = block_number.clone();
tx.transaction_index = ti;

if full_tx_info {
full_transactions.push(tx);
} else {
hash_transactions.push(tx.hash.to_string());
}
if !full_tx_info && let Transactions::Full(transactions) = &block.transactions {
block.transactions =
Transactions::Hash(transactions.iter().map(|tx| tx.hash.to_string()).collect())
}

Ok(Block {
hash: block_hash,
number: block_number,
parent_hash: parent_cid.into(),
timestamp: EthUint64(tipset.block_headers().first().timestamp),
base_fee_per_gas: tipset
.block_headers()
.first()
.parent_base_fee
.clone()
.into(),
gas_used: EthUint64(gas_used),
transactions: if full_tx_info {
Transactions::Full(full_transactions)
} else {
Transactions::Hash(hash_transactions)
},
..Block::new(!msgs_and_receipts.is_empty(), tipset.len())
})
Ok(block)
}

pub enum EthGetBlockByHash {}
Expand Down
Loading
Loading