diff --git a/Cargo.toml b/Cargo.toml index ce686a6dfb37..6515960b4583 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ libp2p = { version = "0.56", default-features = false } libp2p-swarm-test = { version = "0.6", default-features = false, features = ["tokio"] } multihash-codetable = { version = "0.1", features = ["blake2b", "blake2s", "blake3", "sha2", "sha3", "strobe"] } rust2go = "0.4" -serde = { version = "1", default-features = false, features = ["derive"] } +serde = { version = "1", default-features = false, features = ["derive", "rc"] } serde_yaml = "0.9" tokio = "1" diff --git a/src/chain/store/chain_store.rs b/src/chain/store/chain_store.rs index 4869130b3627..1aa5f057d45a 100644 --- a/src/chain/store/chain_store.rs +++ b/src/chain/store/chain_store.rs @@ -7,14 +7,11 @@ use super::{ tipset_tracker::TipsetTracker, }; use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite}; -use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage}; +use crate::message::{ChainMessage, SignedMessage}; use crate::networks::{ChainConfig, Height}; use crate::rpc::eth::{eth_tx_from_signed_eth_message, types::EthHash}; use crate::shim::clock::ChainEpoch; -use crate::shim::{ - address::Address, econ::TokenAmount, executor::Receipt, message::Message, - state_tree::StateTree, version::NetworkVersion, -}; +use crate::shim::{executor::Receipt, message::Message, version::NetworkVersion}; use crate::state_manager::StateOutput; use crate::utils::db::{BlockstoreExt, CborStoreExt}; use crate::{ @@ -30,7 +27,7 @@ use crate::{ interpreter::{BlockMessages, VMTrace}, rpc::chain::PathChanges, }; -use ahash::{HashMap, HashMapExt, HashSet}; +use ahash::{HashMap, HashSet}; use anyhow::Context as _; use cid::Cid; use fil_actors_shared::fvm_ipld_amt::Amtv0 as Amt; @@ -89,6 +86,9 @@ pub struct ChainStore { /// Needed by the Ethereum mapping. chain_config: Arc, + + /// Cache for messages in tipsets, keyed by tipset key. + messages_in_tipset_cache: MessagesInTipsetCache, } impl BitswapStoreRead for ChainStore @@ -151,11 +151,17 @@ where validated_blocks, eth_mappings, chain_config, + messages_in_tipset_cache: Default::default(), }; Ok(cs) } + /// Cache for messages in tipsets, keyed by tipset key. + pub fn messages_in_tipset_cache(&self) -> &MessagesInTipsetCache { + &self.messages_in_tipset_cache + } + /// Sets heaviest tipset pub fn set_heaviest_tipset(&self, head: Tipset) -> Result<(), Error> { head.key().save(self.blockstore())?; @@ -338,9 +344,13 @@ where /// Retrieves ordered valid messages from a `Tipset`. This will only include /// messages that will be passed through the VM. - pub fn messages_for_tipset(&self, ts: &Tipset) -> Result, Error> { - let bmsgs = BlockMessages::for_tipset(&self.db, ts)?; - Ok(bmsgs.into_iter().flat_map(|bm| bm.messages).collect()) + pub fn messages_for_tipset(&self, ts: &Tipset) -> Result>, Error> { + Ok(self + .messages_in_tipset_cache() + .get_or_insert_with(ts.key(), || { + let bmsgs = BlockMessages::for_tipset(&self.db, ts)?; + Ok(bmsgs.into_iter().flat_map(|bm| bm.messages).collect_vec()) + })?) } /// Gets look-back tipset (and state-root of that tipset) for block @@ -596,43 +606,47 @@ where /// on performance measurements, is resource-intensive and can be a bottleneck for certain /// use-cases. This cache is intended to be used with a complementary function; /// [`messages_for_tipset_with_cache`]. -pub struct MsgsInTipsetCache { - cache: SizeTrackingLruCache>, +pub struct MessagesInTipsetCache { + cache: SizeTrackingLruCache>>, } -impl MsgsInTipsetCache { +impl MessagesInTipsetCache { pub fn new(capacity: NonZeroUsize) -> Self { Self { cache: SizeTrackingLruCache::new_with_metrics("msg_in_tipset".into(), capacity), } } - pub fn get(&self, key: &TipsetKey) -> Option> { + pub fn get(&self, key: &TipsetKey) -> Option>> { self.cache.get_cloned(key) } - pub fn get_or_insert_with(&self, key: &TipsetKey, f: F) -> anyhow::Result> + pub fn get_or_insert_with( + &self, + key: &TipsetKey, + f: F, + ) -> anyhow::Result>> where F: FnOnce() -> anyhow::Result>, { - if self.cache.contains(key) { - Ok(self.get(key).expect("cache entry disappeared!")) + if let Some(cached) = self.get(key) { + Ok(cached) } else { - let v = f()?; - self.insert(key.clone(), v.clone()); - Ok(v) + Ok(self.insert(key.clone(), f()?)) } } - pub fn insert(&self, key: TipsetKey, mut value: Vec) { + pub fn insert(&self, key: TipsetKey, mut value: Vec) -> Arc> { value.shrink_to_fit(); - self.cache.push(key, value); + let value = Arc::new(value); + self.cache.push(key, value.clone()); + value } /// Reads the intended cache size for this process from the environment or uses the default. fn read_cache_size() -> NonZeroUsize { // Arbitrary number, can be adjusted - const DEFAULT: NonZeroUsize = nonzero!(100usize); + const DEFAULT: NonZeroUsize = nonzero!(1024usize); std::env::var("FOREST_MESSAGES_IN_TIPSET_CACHE_SIZE") .ok() .and_then(|s| s.parse().ok()) @@ -640,89 +654,12 @@ impl MsgsInTipsetCache { } } -impl Default for MsgsInTipsetCache { +impl Default for MessagesInTipsetCache { fn default() -> Self { Self::new(Self::read_cache_size()) } } -/// Same as [`messages_for_tipset`] but uses a cache to store messages for each tipset. -pub fn messages_for_tipset_with_cache( - db: &Arc, - ts: &Tipset, - cache: &MsgsInTipsetCache, -) -> Result, Error> -where - DB: Blockstore, -{ - let key = ts.key(); - cache - .get_or_insert_with(key, || { - messages_for_tipset(db, ts).context("failed to get messages for tipset") - }) - .map_err(Into::into) -} - -/// Given a tipset this function will return all unique messages in that tipset. -/// Note: This function is resource-intensive and can be a bottleneck for certain use-cases. -/// Consider using [`messages_for_tipset_with_cache`] for better performance. -pub fn messages_for_tipset(db: &Arc, ts: &Tipset) -> Result, Error> -where - DB: Blockstore, -{ - let mut applied: HashMap = HashMap::new(); - let mut balances: HashMap = HashMap::new(); - let state = StateTree::new_from_tipset(Arc::clone(db), ts)?; - - // message to get all messages for block_header into a single iterator - let mut get_message_for_block_header = - |b: &CachingBlockHeader| -> Result, Error> { - let (unsigned, signed) = block_messages(db, b)?; - let mut messages = Vec::with_capacity(unsigned.len() + signed.len()); - let unsigned_box = unsigned.into_iter().map(ChainMessage::Unsigned); - let signed_box = signed.into_iter().map(ChainMessage::Signed); - - for message in unsigned_box.chain(signed_box) { - let from_address = &message.from(); - if !applied.contains_key(from_address) { - let actor_state = state - .get_actor(from_address)? - .ok_or_else(|| Error::Other("Actor state not found".to_string()))?; - applied.insert(*from_address, actor_state.sequence); - balances.insert(*from_address, actor_state.balance.clone().into()); - } - if let Some(seq) = applied.get_mut(from_address) { - if *seq != message.sequence() { - continue; - } - *seq += 1; - } else { - continue; - } - if let Some(bal) = balances.get_mut(from_address) { - if *bal < message.required_funds() { - continue; - } - *bal -= message.required_funds(); - } else { - continue; - } - - messages.push(message) - } - - Ok(messages) - }; - - ts.block_headers() - .iter() - .try_fold(Vec::new(), |mut message_vec, b| { - let mut messages = get_message_for_block_header(b)?; - message_vec.append(&mut messages); - Ok(message_vec) - }) -} - /// Returns messages from key-value store based on a slice of [`Cid`]s. pub fn messages_from_cids(db: &DB, keys: &[Cid]) -> Result, Error> where @@ -803,16 +740,16 @@ mod tests { #[test] fn test_messages_in_tipset_cache() { - let cache = MsgsInTipsetCache::new(2.try_into().unwrap()); + let cache = MessagesInTipsetCache::new(nonzero!(2_usize)); let key1 = TipsetKey::from(nunny::vec![Cid::new_v1( DAG_CBOR, MultihashCode::Blake2b256.digest(&[1]) )]); assert!(cache.get(&key1).is_none()); - let msgs = vec![ChainMessage::Unsigned(Message::default())]; + let msgs = vec![Message::default().into()]; cache.insert(key1.clone(), msgs.clone()); - assert_eq!(msgs, cache.get(&key1).unwrap()); + assert_eq!(&msgs, &*cache.get(&key1).unwrap()); let inserter_executed: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); @@ -821,7 +758,10 @@ mod tests { Ok(msgs.clone()) }; - assert_eq!(msgs, cache.get_or_insert_with(&key1, key_inserter).unwrap()); + assert_eq!( + &msgs, + &*cache.get_or_insert_with(&key1, key_inserter).unwrap() + ); assert!(!inserter_executed.load(std::sync::atomic::Ordering::Relaxed)); let key2 = TipsetKey::from(nunny::vec![Cid::new_v1( @@ -830,7 +770,10 @@ mod tests { )]); assert!(cache.get(&key2).is_none()); - assert_eq!(msgs, cache.get_or_insert_with(&key2, key_inserter).unwrap()); + assert_eq!( + &msgs, + &*cache.get_or_insert_with(&key2, key_inserter).unwrap() + ); assert!(inserter_executed.load(std::sync::atomic::Ordering::Relaxed)); } } diff --git a/src/chain_sync/tipset_syncer.rs b/src/chain_sync/tipset_syncer.rs index 7cad8b543ba4..1c6993eeda90 100644 --- a/src/chain_sync/tipset_syncer.rs +++ b/src/chain_sync/tipset_syncer.rs @@ -23,7 +23,7 @@ use crate::{ }; use crate::{ eth::is_valid_eth_tx_for_sending, - message::{Message as MessageTrait, valid_for_block_inclusion}, + message::{MessageRead as _, valid_for_block_inclusion}, }; use ahash::HashMap; use cid::Cid; diff --git a/src/cli/subcommands/mpool_cmd.rs b/src/cli/subcommands/mpool_cmd.rs index 996e165d1f3e..c83a814c2cbd 100644 --- a/src/cli/subcommands/mpool_cmd.rs +++ b/src/cli/subcommands/mpool_cmd.rs @@ -3,7 +3,7 @@ use crate::blocks::Tipset; use crate::lotus_json::{HasLotusJson as _, NotNullVec}; -use crate::message::SignedMessage; +use crate::message::{MessageRead as _, SignedMessage}; use crate::rpc::{self, prelude::*, types::ApiTipsetKey}; use crate::shim::address::StrictAddress; use crate::shim::message::Message; @@ -52,8 +52,6 @@ fn filter_messages( to: Option<&StrictAddress>, from: Option<&StrictAddress>, ) -> anyhow::Result> { - use crate::message::Message; - let filtered = messages .into_iter() .filter(|msg| { @@ -285,7 +283,6 @@ impl MpoolCommands { mod tests { use super::*; use crate::key_management::{KeyStore, KeyStoreConfig, Wallet}; - use crate::message::{Message, SignedMessage}; use crate::message_pool::tests::create_smsg; use crate::shim::crypto::SignatureType; use itertools::Itertools as _; diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 91c0a974010e..a83a86cfc7a0 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -398,7 +398,6 @@ fn maybe_start_rpc_service( let tipset_send = chain_follower.tipset_sender.clone(); let keystore = ctx.keystore.clone(); let snapshot_progress_tracker = ctx.snapshot_progress_tracker.clone(); - let msgs_in_tipset = Arc::new(crate::chain::MsgsInTipsetCache::default()); async move { let rpc_listener = tokio::net::TcpListener::bind(rpc_address) .await @@ -412,7 +411,6 @@ fn maybe_start_rpc_service( keystore, mpool, bad_blocks, - msgs_in_tipset, sync_status, eth_event_handler, sync_network_context, diff --git a/src/eth/transaction.rs b/src/eth/transaction.rs index 495d73fa8ecd..9d989d4f49fc 100644 --- a/src/eth/transaction.rs +++ b/src/eth/transaction.rs @@ -17,7 +17,7 @@ use rlp::Rlp; use spire_enum::prelude::delegated_enum; use crate::{ - message::{Message as _, SignedMessage}, + message::{MessageRead as _, SignedMessage}, rpc::eth::types::EthAddress, shim::{address::Address, crypto::SignatureType, message::Message, version::NetworkVersion}, }; diff --git a/src/interpreter/vm.rs b/src/interpreter/vm.rs index 6985c695577a..9dc3e26939bb 100644 --- a/src/interpreter/vm.rs +++ b/src/interpreter/vm.rs @@ -12,7 +12,7 @@ use crate::interpreter::{ fvm4::ForestExterns as ForestExternsV4, }; use crate::message::ChainMessage; -use crate::message::Message as MessageTrait; +use crate::message::MessageRead as _; use crate::networks::{ChainConfig, NetworkChain}; use crate::shim::actors::{AwardBlockRewardParams, cron, reward}; use crate::shim::{ @@ -115,14 +115,8 @@ impl BlockMessages { let (usm, sm) = block_messages(db, b)?; let mut messages = Vec::with_capacity(usm.len() + sm.len()); - messages.extend( - usm.into_iter() - .filter_map(|m| select_msg(ChainMessage::Unsigned(m))), - ); - messages.extend( - sm.into_iter() - .filter_map(|m| select_msg(ChainMessage::Signed(m))), - ); + messages.extend(usm.into_iter().filter_map(|m| select_msg(m.into()))); + messages.extend(sm.into_iter().filter_map(|m| select_msg(m.into()))); Ok(BlockMessages { miner: b.miner_address, @@ -336,7 +330,7 @@ where if let Some(mut callback) = callback { callback(MessageCallbackCtx { cid: cron_msg.cid(), - message: &ChainMessage::Unsigned(cron_msg), + message: &cron_msg.into(), apply_ret: &ret, at: CalledAt::Cron, duration, @@ -421,7 +415,7 @@ where if let Some(callback) = &mut callback { callback(MessageCallbackCtx { cid: rew_msg.cid(), - message: &ChainMessage::Unsigned(rew_msg), + message: &rew_msg.into(), apply_ret: &ret, at: CalledAt::Reward, duration, diff --git a/src/message/chain_message.rs b/src/message/chain_message.rs index e7b81e271726..27dfb7a77170 100644 --- a/src/message/chain_message.rs +++ b/src/message/chain_message.rs @@ -1,21 +1,36 @@ // Copyright 2019-2026 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use super::Message as MessageTrait; +use super::{Message as MessageTrait, MessageRead}; use crate::message::signed_message::SignedMessage; use crate::shim::message::MethodNum; use crate::shim::{address::Address, econ::TokenAmount, message::Message}; use fvm_ipld_encoding::RawBytes; use get_size2::GetSize; use serde::{Deserialize, Serialize}; +use spire_enum::prelude::delegated_enum; +use std::sync::Arc; /// `Enum` to encapsulate signed and unsigned messages. Useful when working with /// both types +#[delegated_enum] #[derive(Clone, Debug, Serialize, Deserialize, Hash, Eq, PartialEq, GetSize, derive_more::From)] #[serde(untagged)] pub enum ChainMessage { - Unsigned(Message), - Signed(SignedMessage), + Unsigned(Arc), + Signed(Arc), +} + +impl From for ChainMessage { + fn from(msg: Message) -> Self { + Arc::new(msg).into() + } +} + +impl From for ChainMessage { + fn from(msg: SignedMessage) -> Self { + Arc::new(msg).into() + } } impl ChainMessage { @@ -27,10 +42,7 @@ impl ChainMessage { } pub fn cid(&self) -> cid::Cid { - match self { - ChainMessage::Unsigned(msg) => msg.cid(), - ChainMessage::Signed(msg) => msg.cid(), - } + delegate_chain_message!(self.cid()) } /// Tests if a message is equivalent to another replacing message. @@ -41,93 +53,62 @@ impl ChainMessage { pub fn equal_call(&self, other: &Self) -> bool { self.message().equal_call(other.message()) } -} -impl MessageTrait for ChainMessage { - fn from(&self) -> Address { + pub fn set_sequence(&mut self, new_sequence: u64) { match self { - Self::Signed(t) => t.from(), - Self::Unsigned(t) => t.from, + Self::Unsigned(m) => Arc::make_mut(m).set_sequence(new_sequence), + Self::Signed(sm) => Arc::make_mut(sm).set_sequence(new_sequence), } } +} + +impl MessageRead for ChainMessage { + fn from(&self) -> Address { + delegate_chain_message!(self.from()) + } fn to(&self) -> Address { - match self { - Self::Signed(t) => t.to(), - Self::Unsigned(t) => t.to, - } + delegate_chain_message!(self.to()) } fn sequence(&self) -> u64 { - match self { - Self::Signed(t) => t.sequence(), - Self::Unsigned(t) => t.sequence, - } + delegate_chain_message!(self.sequence()) } fn value(&self) -> TokenAmount { - match self { - Self::Signed(t) => t.value(), - Self::Unsigned(t) => t.value.clone(), - } + delegate_chain_message!(self.value()) } fn method_num(&self) -> MethodNum { - match self { - Self::Signed(t) => t.method_num(), - Self::Unsigned(t) => t.method_num, - } + delegate_chain_message!(self.method_num()) } fn params(&self) -> &RawBytes { - match self { - Self::Signed(t) => t.params(), - Self::Unsigned(t) => t.params(), - } + delegate_chain_message!(self.params()) } fn gas_limit(&self) -> u64 { - match self { - Self::Signed(t) => t.gas_limit(), - Self::Unsigned(t) => t.gas_limit(), - } - } - fn set_gas_limit(&mut self, token_amount: u64) { - match self { - Self::Signed(t) => t.set_gas_limit(token_amount), - Self::Unsigned(t) => t.set_gas_limit(token_amount), - } - } - fn set_sequence(&mut self, new_sequence: u64) { - match self { - Self::Signed(t) => t.set_sequence(new_sequence), - Self::Unsigned(t) => t.set_sequence(new_sequence), - } + delegate_chain_message!(self.gas_limit()) } fn required_funds(&self) -> TokenAmount { - match self { - Self::Signed(t) => t.required_funds(), - Self::Unsigned(t) => &t.gas_fee_cap * t.gas_limit + &t.value, - } + delegate_chain_message!(self.required_funds()) } fn gas_fee_cap(&self) -> TokenAmount { - match self { - Self::Signed(t) => t.gas_fee_cap(), - Self::Unsigned(t) => t.gas_fee_cap.clone(), - } + delegate_chain_message!(self.gas_fee_cap()) } fn gas_premium(&self) -> TokenAmount { - match self { - Self::Signed(t) => t.gas_premium(), - Self::Unsigned(t) => t.gas_premium.clone(), - } + delegate_chain_message!(self.gas_premium()) + } +} + +impl MessageTrait for ChainMessage { + fn set_gas_limit(&mut self, amount: u64) { + delegate_chain_message!(self => |i| Arc::make_mut(i).set_gas_limit(amount)) + } + + fn set_sequence(&mut self, sequence: u64) { + delegate_chain_message!(self => |i| Arc::make_mut(i).set_sequence(sequence)) } fn set_gas_fee_cap(&mut self, cap: TokenAmount) { - match self { - Self::Signed(t) => t.set_gas_fee_cap(cap), - Self::Unsigned(t) => t.set_gas_fee_cap(cap), - } + delegate_chain_message!(self => |i| Arc::make_mut(i).set_gas_fee_cap(cap)) } fn set_gas_premium(&mut self, prem: TokenAmount) { - match self { - Self::Signed(t) => t.set_gas_premium(prem), - Self::Unsigned(t) => t.set_gas_premium(prem), - } + delegate_chain_message!(self => |i| Arc::make_mut(i).set_gas_premium(prem)) } } diff --git a/src/message/mod.rs b/src/message/mod.rs index bb9ad579b163..682878d2f6f3 100644 --- a/src/message/mod.rs +++ b/src/message/mod.rs @@ -12,9 +12,10 @@ use fvm_ipld_encoding::RawBytes; use num::Zero; pub use signed_message::SignedMessage; -/// Message interface to interact with Signed and unsigned messages in a generic +/// Message interface to make read-only interactions with Signed and unsigned messages in a generic /// context. -pub trait Message { +#[auto_impl::auto_impl(&, Arc)] +pub trait MessageRead { /// Returns the from address of the message. fn from(&self) -> Address; /// Returns the destination address of the message. @@ -27,10 +28,6 @@ pub trait Message { fn method_num(&self) -> MethodNum; /// Returns the encoded parameters for the method call. fn params(&self) -> &RawBytes; - /// sets the gas limit for the message. - fn set_gas_limit(&mut self, amount: u64); - /// sets a new sequence to the message. - fn set_sequence(&mut self, sequence: u64); /// Returns the gas limit for the message. fn gas_limit(&self) -> u64; /// Returns the required funds for the message. @@ -39,10 +36,6 @@ pub trait Message { fn gas_fee_cap(&self) -> TokenAmount; /// gets gas premium for the message. fn gas_premium(&self) -> TokenAmount; - /// sets the gas fee cap. - fn set_gas_fee_cap(&mut self, cap: TokenAmount); - /// sets the gas premium. - fn set_gas_premium(&mut self, prem: TokenAmount); /// This method returns the effective gas premium claimable by the miner /// given the supplied base fee. This method is not used anywhere except the `Eth` API. /// @@ -56,7 +49,20 @@ pub trait Message { } } -impl Message for ShimMessage { +/// Message interface to interact with Signed and unsigned messages in a generic +/// context. +pub trait Message: MessageRead { + /// sets the gas limit for the message. + fn set_gas_limit(&mut self, amount: u64); + /// sets a new sequence to the message. + fn set_sequence(&mut self, sequence: u64); + /// sets the gas fee cap. + fn set_gas_fee_cap(&mut self, cap: TokenAmount); + /// sets the gas premium. + fn set_gas_premium(&mut self, prem: TokenAmount); +} + +impl MessageRead for ShimMessage { fn from(&self) -> Address { self.from } @@ -78,12 +84,6 @@ impl Message for ShimMessage { fn gas_limit(&self) -> u64 { self.gas_limit } - fn set_gas_limit(&mut self, token_amount: u64) { - self.gas_limit = token_amount; - } - fn set_sequence(&mut self, new_sequence: u64) { - self.sequence = new_sequence; - } fn required_funds(&self) -> TokenAmount { &self.gas_fee_cap * self.gas_limit } @@ -93,11 +93,18 @@ impl Message for ShimMessage { fn gas_premium(&self) -> TokenAmount { self.gas_premium.clone() } +} +impl Message for ShimMessage { + fn set_gas_limit(&mut self, token_amount: u64) { + self.gas_limit = token_amount; + } + fn set_sequence(&mut self, new_sequence: u64) { + self.sequence = new_sequence; + } fn set_gas_fee_cap(&mut self, cap: TokenAmount) { self.gas_fee_cap = cap; } - fn set_gas_premium(&mut self, prem: TokenAmount) { self.gas_premium = prem; } diff --git a/src/message/signed_message.rs b/src/message/signed_message.rs index fa2731468c0c..a5864379565d 100644 --- a/src/message/signed_message.rs +++ b/src/message/signed_message.rs @@ -1,7 +1,7 @@ // Copyright 2019-2026 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use super::Message as MessageTrait; +use super::{Message as MessageTrait, MessageRead}; use crate::eth::EthChainId; use crate::shim::message::MethodNum; use crate::shim::{ @@ -108,7 +108,7 @@ impl SignedMessage { } } -impl MessageTrait for SignedMessage { +impl MessageRead for SignedMessage { fn from(&self) -> Address { self.message.from() } @@ -130,12 +130,6 @@ impl MessageTrait for SignedMessage { fn gas_limit(&self) -> u64 { self.message.gas_limit() } - fn set_gas_limit(&mut self, token_amount: u64) { - self.message.set_gas_limit(token_amount); - } - fn set_sequence(&mut self, new_sequence: u64) { - self.message.set_sequence(new_sequence); - } fn required_funds(&self) -> TokenAmount { self.message.required_funds() } @@ -145,11 +139,18 @@ impl MessageTrait for SignedMessage { fn gas_premium(&self) -> TokenAmount { self.message.gas_premium() } +} +impl MessageTrait for SignedMessage { + fn set_gas_limit(&mut self, token_amount: u64) { + self.message.set_gas_limit(token_amount); + } + fn set_sequence(&mut self, new_sequence: u64) { + self.message.set_sequence(new_sequence); + } fn set_gas_fee_cap(&mut self, cap: TokenAmount) { self.message.set_gas_fee_cap(cap) } - fn set_gas_premium(&mut self, prem: TokenAmount) { self.message.set_gas_premium(prem) } diff --git a/src/message_pool/msg_chain.rs b/src/message_pool/msg_chain.rs index b41a046524da..c2410db1de20 100644 --- a/src/message_pool/msg_chain.rs +++ b/src/message_pool/msg_chain.rs @@ -7,7 +7,7 @@ use std::{ ops::{Index, IndexMut}, }; -use crate::message::{Message, SignedMessage}; +use crate::message::{MessageRead as _, SignedMessage}; use crate::networks::ChainConfig; use crate::shim::{ address::Address, diff --git a/src/message_pool/msgpool/mod.rs b/src/message_pool/msgpool/mod.rs index 6fe48064d8bb..63ed23c8c85f 100644 --- a/src/message_pool/msgpool/mod.rs +++ b/src/message_pool/msgpool/mod.rs @@ -13,7 +13,7 @@ use std::{borrow::BorrowMut, cmp::Ordering}; use crate::blocks::Tipset; use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic}; -use crate::message::{Message as MessageTrait, SignedMessage}; +use crate::message::{MessageRead as _, SignedMessage}; use crate::networks::ChainConfig; use crate::shim::{address::Address, crypto::Signature}; use crate::utils::cache::SizeTrackingLruCache; diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index 8a68edc85ab3..be4eea8ebf1d 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -14,7 +14,7 @@ use crate::chain::{HeadChanges, MINIMUM_BASE_FEE}; use crate::db::SettingsStore; use crate::eth::is_valid_eth_tx_for_sending; use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic}; -use crate::message::{ChainMessage, Message, SignedMessage, valid_for_block_inclusion}; +use crate::message::{ChainMessage, MessageRead as _, SignedMessage, valid_for_block_inclusion}; use crate::networks::{ChainConfig, NEWEST_NETWORK_VERSION}; use crate::rpc::eth::types::EthAddress; use crate::shim::{ @@ -640,8 +640,8 @@ where bls_sig_cache.push(msg.cid().into(), msg.signature().clone()); } - api.put_message(&ChainMessage::Signed(msg.clone()))?; - api.put_message(&ChainMessage::Unsigned(msg.message().clone()))?; + api.put_message(&ChainMessage::Signed(msg.clone().into()))?; + api.put_message(&ChainMessage::Unsigned(msg.message().clone().into()))?; let mut pending = pending.write(); let from = msg.from(); diff --git a/src/message_pool/msgpool/selection.rs b/src/message_pool/msgpool/selection.rs index b73d6b60f1be..3b66947aedd8 100644 --- a/src/message_pool/msgpool/selection.rs +++ b/src/message_pool/msgpool/selection.rs @@ -9,7 +9,7 @@ use std::{borrow::BorrowMut, cmp::Ordering}; use crate::blocks::{BLOCK_MESSAGE_LIMIT, Tipset}; -use crate::message::{Message, SignedMessage}; +use crate::message::{MessageRead as _, SignedMessage}; use crate::message_pool::msg_chain::MsgChainNode; use crate::shim::crypto::SignatureType; use crate::shim::{address::Address, econ::TokenAmount}; @@ -871,14 +871,9 @@ where mod test_selection { use std::sync::Arc; + use super::*; use crate::db::MemoryDB; use crate::key_management::{KeyStore, KeyStoreConfig, Wallet}; - use crate::message::Message; - use crate::shim::crypto::SignatureType; - use crate::shim::econ::BLOCK_GAS_LIMIT; - use tokio::task::JoinSet; - - use super::*; use crate::message_pool::{ head_change, msgpool::{ @@ -886,6 +881,9 @@ mod test_selection { tests::{create_fake_smsg, create_smsg}, }, }; + use crate::shim::crypto::SignatureType; + use crate::shim::econ::BLOCK_GAS_LIMIT; + use tokio::task::JoinSet; const TEST_GAS_LIMIT: i64 = 6955002; diff --git a/src/message_pool/msgpool/test_provider.rs b/src/message_pool/msgpool/test_provider.rs index 5dd08dff2a1f..db46af2a42ab 100644 --- a/src/message_pool/msgpool/test_provider.rs +++ b/src/message_pool/msgpool/test_provider.rs @@ -10,7 +10,7 @@ use crate::blocks::{ }; use crate::chain::HeadChanges; use crate::cid_collections::CidHashMap; -use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage}; +use crate::message::{ChainMessage, MessageRead as _, SignedMessage}; use crate::message_pool::{Error, provider::Provider}; use crate::shim::{address::Address, econ::TokenAmount, message::Message, state_tree::ActorState}; use ahash::HashMap; diff --git a/src/message_pool/msgpool/utils.rs b/src/message_pool/msgpool/utils.rs index ac7c70f7d0e1..b4dbb4243f95 100644 --- a/src/message_pool/msgpool/utils.rs +++ b/src/message_pool/msgpool/utils.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0, MIT use crate::chain::MINIMUM_BASE_FEE; -use crate::message::{Message as MessageTrait, SignedMessage}; +use crate::message::{MessageRead as _, SignedMessage}; use crate::shim::{crypto::Signature, econ::TokenAmount, message::Message}; use crate::utils::cache::SizeTrackingLruCache; use crate::utils::get_size::CidWrapper; diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 35b9dfa1eac9..e49ab6f85a1a 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -45,6 +45,7 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use sha2::Sha256; use std::fs::File; +use std::sync::Arc; use std::{collections::VecDeque, path::PathBuf, sync::LazyLock}; use tokio::sync::{ Mutex, @@ -239,8 +240,8 @@ impl RpcMethod<1> for ChainGetMessage { .get_cbor(&message_cid)? .with_context(|| format!("can't find message with cid {message_cid}"))?; let message = match chain_message { - ChainMessage::Signed(m) => m.into_message(), - ChainMessage::Unsigned(m) => m, + ChainMessage::Signed(m) => Arc::unwrap_or_clone(m).into_message(), + ChainMessage::Unsigned(m) => Arc::unwrap_or_clone(m), }; let cid = message.cid(); diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 801486b68ffd..b631d726ff6a 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -27,7 +27,7 @@ use crate::eth::{ }; use crate::interpreter::VMTrace; use crate::lotus_json::{HasLotusJson, lotus_json_with_self}; -use crate::message::{ChainMessage, Message as _, SignedMessage}; +use crate::message::{ChainMessage, Message as _, MessageRead as _, SignedMessage}; use crate::rpc::{ ApiPaths, Ctx, EthEventHandler, LOOKBACK_NO_LIMIT, Permission, RpcMethod, RpcMethodExt as _, error::ServerError, @@ -518,19 +518,24 @@ impl Block { { let ti = EthUint64(i as u64); gas_used += receipt.gas_used(); - let smsg = match message { - ChainMessage::Signed(msg) => msg.clone(), + let mut tx = match message { + ChainMessage::Signed(smsg) => new_eth_tx_from_signed_message( + &smsg, + &state_tree, + ctx.chain_config().eth_chain_id, + )?, ChainMessage::Unsigned(msg) => { - let sig = Signature::new_bls(vec![]); - SignedMessage::new_unchecked(msg.clone(), sig) + let tx = eth_tx_from_native_message( + &msg, + &state_tree, + ctx.chain_config().eth_chain_id, + )?; + ApiEthTx { + hash: msg.cid().into(), + ..tx + } } }; - - let mut tx = new_eth_tx_from_signed_message( - &smsg, - &state_tree, - ctx.chain_config().eth_chain_id, - )?; tx.block_hash = block_hash; tx.block_number = block_number; tx.transaction_index = ti; diff --git a/src/rpc/methods/gas.rs b/src/rpc/methods/gas.rs index 9c0a64c0bbe8..e70f7ae95f3c 100644 --- a/src/rpc/methods/gas.rs +++ b/src/rpc/methods/gas.rs @@ -5,7 +5,7 @@ use super::state::InvocResult; use crate::blocks::Tipset; use crate::chain::{BASE_FEE_MAX_CHANGE_DENOM, BLOCK_GAS_TARGET}; use crate::interpreter::VMTrace; -use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage}; +use crate::message::{ChainMessage, Message as _, MessageRead as _, SignedMessage}; use crate::rpc::chain::FlattenedApiMessage; use crate::rpc::{ApiPaths, Ctx, Permission, RpcMethod, error::ServerError, types::*}; use crate::shim::executor::ApplyRet; @@ -106,7 +106,7 @@ struct GasMeta { } pub async fn estimate_gas_premium( - data: &Ctx, + ctx: &Ctx, mut nblocksincl: u64, ApiTipsetKey(ts_key): &ApiTipsetKey, ) -> Result { @@ -117,18 +117,16 @@ pub async fn estimate_gas_premium( let mut prices: Vec = Vec::new(); let mut blocks = 0; - let mut ts = data - .chain_store() - .load_required_tipset_or_heaviest(ts_key)?; + let mut ts = ctx.chain_store().load_required_tipset_or_heaviest(ts_key)?; for _ in 0..(nblocksincl * 2) { if ts.epoch() == 0 { break; } - let pts = data.chain_index().load_required_tipset(ts.parents())?; - blocks += pts.block_headers().len(); - let msgs = - crate::chain::messages_for_tipset_with_cache(data.store(), &pts, &data.msgs_in_tipset)?; + let parent_ts = ctx.chain_index().load_required_tipset(ts.parents())?; + blocks += parent_ts.block_headers().len(); + + let msgs = ctx.chain_store().messages_for_tipset(&parent_ts)?; prices.append( &mut msgs @@ -139,7 +137,7 @@ pub async fn estimate_gas_premium( }) .collect(), ); - ts = pts; + ts = parent_ts; } let mut premium = compute_gas_premium(prices, blocks as u64); @@ -237,7 +235,7 @@ impl GasEstimateGasLimit { let pending = data.mpool.pending_for(&from_a); let prior_messages: Vec = pending - .map(|s| s.into_iter().map(ChainMessage::Signed).collect_vec()) + .map(|s| s.into_iter().map(Into::into).collect_vec()) .unwrap_or_default(); let ts = data.mpool.current_tipset(); @@ -245,17 +243,18 @@ impl GasEstimateGasLimit { // cost. We obviously can't generate a valid signature. Instead, we just // fill the signature with zeros. The validity is not checked. let mut chain_msg = match from_a.protocol() { - Protocol::Secp256k1 => ChainMessage::Signed(SignedMessage::new_unchecked( - msg, - Signature::new_secp256k1(vec![0; SECP_SIG_LEN]), - )), - Protocol::Delegated => ChainMessage::Signed(SignedMessage::new_unchecked( + Protocol::Secp256k1 => { + SignedMessage::new_unchecked(msg, Signature::new_secp256k1(vec![0; SECP_SIG_LEN])) + .into() + } + Protocol::Delegated => SignedMessage::new_unchecked( msg, // In Lotus, delegated signatures have the same length as SECP256k1. // This may or may not change in the future. Signature::new(SignatureType::Delegated, vec![0; SECP_SIG_LEN]), - )), - _ => ChainMessage::Unsigned(msg), + ) + .into(), + _ => msg.into(), }; let (invoc_res, apply_ret, _, _) = data diff --git a/src/rpc/methods/state.rs b/src/rpc/methods/state.rs index 8f0f191d8b3b..2c05838fa774 100644 --- a/src/rpc/methods/state.rs +++ b/src/rpc/methods/state.rs @@ -2656,7 +2656,7 @@ impl RpcMethod<3> for StateListMessages { while cur_ts.epoch() >= max_height { let msgs = ctx.chain_store().messages_for_tipset(&cur_ts)?; - for msg in msgs { + for msg in msgs.iter() { if from_to.matches(msg.message()) { out.push(msg.cid()); } diff --git a/src/rpc/methods/state/types.rs b/src/rpc/methods/state/types.rs index 65a9f20efb60..4edd6a6e3575 100644 --- a/src/rpc/methods/state/types.rs +++ b/src/rpc/methods/state/types.rs @@ -3,7 +3,7 @@ use crate::blocks::TipsetKey; use crate::lotus_json::{LotusJson, lotus_json_with_self}; -use crate::message::Message as _; +use crate::message::MessageRead as _; use crate::shim::executor::ApplyRet; use crate::shim::{ address::Address, diff --git a/src/rpc/methods/sync.rs b/src/rpc/methods/sync.rs index 070614ff78a4..62384d37cbd7 100644 --- a/src/rpc/methods/sync.rs +++ b/src/rpc/methods/sync.rs @@ -236,7 +236,6 @@ mod tests { keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory).unwrap())), mpool: Arc::new(pool), bad_blocks: Some(Default::default()), - msgs_in_tipset: Default::default(), sync_status: Arc::new(RwLock::new(SyncStatusReport::default())), eth_event_handler: Arc::new(EthEventHandler::new()), sync_network_context, diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index f7cc4bef31be..49c6a5480bdd 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -473,7 +473,6 @@ pub struct RPCState { pub state_manager: Arc>, pub mpool: Arc>>>, pub bad_blocks: Option>, - pub msgs_in_tipset: Arc, pub sync_status: crate::chain_sync::SyncStatus, pub eth_event_handler: Arc, pub sync_network_context: SyncNetworkContext, diff --git a/src/shim/crypto.rs b/src/shim/crypto.rs index bae85bfc1aad..b505c129065c 100644 --- a/src/shim/crypto.rs +++ b/src/shim/crypto.rs @@ -5,7 +5,7 @@ pub use super::fvm_shared_latest::{ }; use super::version::NetworkVersion; use crate::eth::{EthChainId, EthTx}; -use crate::message::{Message, SignedMessage}; +use crate::message::{MessageRead as _, SignedMessage}; use anyhow::{Context, ensure}; use bls_signatures::{PublicKey as BlsPublicKey, Signature as BlsSignature}; use cid::Cid; diff --git a/src/state_manager/mod.rs b/src/state_manager/mod.rs index a60b9aaa23d4..bc5603e4e333 100644 --- a/src/state_manager/mod.rs +++ b/src/state_manager/mod.rs @@ -24,7 +24,7 @@ use crate::interpreter::{ }; use crate::interpreter::{MessageCallbackCtx, VMTrace}; use crate::lotus_json::{LotusJson, lotus_json_with_self}; -use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage}; +use crate::message::{ChainMessage, Message as _, MessageRead as _, SignedMessage}; use crate::networks::ChainConfig; use crate::rpc::state::{ApiInvocResult, InvocResult, MessageGasCost}; use crate::rpc::types::{MiningBaseInfo, SectorOnChainInfo}; @@ -545,7 +545,7 @@ where receipts.len() ); let mut executed_messages = Vec::with_capacity(messages.len()); - for (message, receipt) in messages.into_iter().zip(receipts.into_iter()) { + for (message, receipt) in messages.iter().cloned().zip(receipts.into_iter()) { let events = if include_events && let Some(events_root) = receipt.events_root() { Some( match StampedEvent::get_events(self.cs.blockstore(), &events_root) { @@ -691,17 +691,19 @@ where // cost. We obviously can't generate a valid signature. Instead, we just // fill the signature with zeros. The validity is not checked. let mut chain_msg = match from_a.protocol() { - Protocol::Secp256k1 => ChainMessage::Signed(SignedMessage::new_unchecked( + Protocol::Secp256k1 => SignedMessage::new_unchecked( msg.clone(), Signature::new_secp256k1(vec![0; SECP_SIG_LEN]), - )), - Protocol::Delegated => ChainMessage::Signed(SignedMessage::new_unchecked( + ) + .into(), + Protocol::Delegated => SignedMessage::new_unchecked( msg.clone(), // In Lotus, delegated signatures have the same length as SECP256k1. // This may or may not change in the future. Signature::new(SignatureType::Delegated, vec![0; SECP_SIG_LEN]), - )), - _ => ChainMessage::Unsigned(msg.clone()), + ) + .into(), + _ => msg.clone().into(), }; let (_invoc_res, apply_ret, duration, state_root) = self diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs index 01b273eaa55c..97e0281c1327 100644 --- a/src/tool/offline_server/server.rs +++ b/src/tool/offline_server/server.rs @@ -121,7 +121,6 @@ where keystore: Arc::new(RwLock::new(keystore)), mpool: Arc::new(message_pool), bad_blocks: Default::default(), - msgs_in_tipset: Default::default(), sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), eth_event_handler: Arc::new(EthEventHandler::from_config(&events_config)), sync_network_context, diff --git a/src/tool/subcommands/api_cmd/api_compare_tests.rs b/src/tool/subcommands/api_cmd/api_compare_tests.rs index 950ff4d524cf..fe2fa2a792bd 100644 --- a/src/tool/subcommands/api_cmd/api_compare_tests.rs +++ b/src/tool/subcommands/api_cmd/api_compare_tests.rs @@ -7,7 +7,7 @@ use crate::chain::ChainStore; use crate::db::car::ManyCar; use crate::eth::EthChainId as EthChainIdType; use crate::lotus_json::HasLotusJson; -use crate::message::{Message as _, SignedMessage}; +use crate::message::{MessageRead as _, SignedMessage}; use crate::rpc::auth::AuthNewParams; use crate::rpc::beacon::BeaconGetEntry; use crate::rpc::eth::{ diff --git a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs index e20f7e4f866f..fde2b5b63536 100644 --- a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs @@ -148,7 +148,6 @@ async fn ctx( keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory)?)), mpool: Arc::new(message_pool), bad_blocks: Default::default(), - msgs_in_tipset: Default::default(), sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), eth_event_handler: Arc::new(EthEventHandler::new()), sync_network_context, diff --git a/src/tool/subcommands/api_cmd/test_snapshot.rs b/src/tool/subcommands/api_cmd/test_snapshot.rs index 463f95e220e0..8550ecfb67a7 100644 --- a/src/tool/subcommands/api_cmd/test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/test_snapshot.rs @@ -166,7 +166,6 @@ async fn ctx( keystore: Arc::new(RwLock::new(KeyStore::new(KeyStoreConfig::Memory)?)), mpool: Arc::new(message_pool), bad_blocks: Default::default(), - msgs_in_tipset: Default::default(), sync_status: Arc::new(RwLock::new(SyncStatusReport::init())), eth_event_handler: Arc::new(EthEventHandler::new()), sync_network_context, diff --git a/src/tool/subcommands/snapshot_cmd.rs b/src/tool/subcommands/snapshot_cmd.rs index ae9ab0a4eccf..a162e25cfc00 100644 --- a/src/tool/subcommands/snapshot_cmd.rs +++ b/src/tool/subcommands/snapshot_cmd.rs @@ -529,7 +529,7 @@ mod structured { use crate::state_manager::utils::structured; use crate::{ interpreter::CalledAt, - message::{ChainMessage, Message as _}, + message::{ChainMessage, MessageRead as _}, shim::executor::ApplyRet, }; use std::time::Duration; diff --git a/src/utils/cache/lru.rs b/src/utils/cache/lru.rs index b6fb24fe367c..9e265cc88e19 100644 --- a/src/utils/cache/lru.rs +++ b/src/utils/cache/lru.rs @@ -104,14 +104,6 @@ where self.cache.write().insert(k, v) } - pub fn contains(&self, k: &Q) -> bool - where - K: Borrow, - Q: Hash + Eq + ?Sized, - { - self.cache.read().contains_key(k) - } - pub fn get_cloned(&self, k: &Q) -> Option where K: Borrow,