diff --git a/src/message_pool/msgpool/mod.rs b/src/message_pool/msgpool/mod.rs index 2dba45cf89d..63f64197139 100644 --- a/src/message_pool/msgpool/mod.rs +++ b/src/message_pool/msgpool/mod.rs @@ -16,6 +16,7 @@ use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic}; use crate::message::{MessageRead as _, SignedMessage}; use crate::networks::ChainConfig; use crate::shim::{address::Address, crypto::Signature}; +use crate::state_manager::IdToAddressCache; use crate::utils::ShallowClone as _; use crate::utils::cache::SizeTrackingLruCache; use crate::utils::get_size::CidWrapper; @@ -52,7 +53,7 @@ async fn republish_pending_messages( cur_tipset: &SyncRwLock, republished: &SyncRwLock>, local_addrs: &SyncRwLock>, - key_cache: &SizeTrackingLruCache, + key_cache: &IdToAddressCache, chain_config: &ChainConfig, ) -> Result<(), Error> where @@ -223,7 +224,7 @@ pub async fn head_change( republished: &SyncRwLock>, pending: &SyncRwLock>, cur_tipset: &SyncRwLock, - key_cache: &SizeTrackingLruCache, + key_cache: &IdToAddressCache, state_nonce_cache: &SizeTrackingLruCache, revert: Vec, apply: Vec, @@ -326,7 +327,7 @@ where pub(in crate::message_pool) struct MpoolCtx<'a, T> { pub api: &'a T, - pub key_cache: &'a SizeTrackingLruCache, + pub key_cache: &'a IdToAddressCache, pub pending: &'a SyncRwLock>, pub ts: &'a Tipset, } diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index 04003b37554..369e3905354 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -23,7 +23,9 @@ use crate::shim::{ econ::TokenAmount, gas::{Gas, price_list_by_network_version}, }; +use crate::state_manager::IdToAddressCache; use crate::state_manager::utils::is_valid_for_sending; +use crate::utils::ShallowClone as _; use crate::utils::cache::SizeTrackingLruCache; use crate::utils::get_size::CidWrapper; use ahash::{HashMap, HashMapExt, HashSet, HashSetExt}; @@ -280,13 +282,13 @@ pub struct MessagePool { /// Sender half to send messages to other components pub network_sender: flume::Sender, /// A cache for BLS signature keyed by Cid - pub bls_sig_cache: Arc>, + pub bls_sig_cache: SizeTrackingLruCache, /// A cache for BLS signature keyed by Cid - pub sig_val_cache: Arc>, - /// Cache for ID address to key address resolution. - pub key_cache: Arc>, + pub sig_val_cache: SizeTrackingLruCache, + /// Cache for ID address ID to key address resolution. + pub key_cache: IdToAddressCache, /// Cache for state nonce lookups keyed by (`TipsetKey`, `Address`). - pub state_nonce_cache: Arc>, + pub state_nonce_cache: SizeTrackingLruCache, /// A set of republished messages identified by their Cid pub republished: Arc>>, /// Acts as a signal to republish messages from the republished set of @@ -303,25 +305,27 @@ pub struct MessagePool { /// Non-ID addresses are returned unchanged. pub(in crate::message_pool) fn resolve_to_key( api: &T, - key_cache: &SizeTrackingLruCache, + key_cache: &IdToAddressCache, addr: &Address, cur_ts: &Tipset, ) -> Result { - if addr.protocol() != Protocol::ID { - return Ok(*addr); - } - if let Some(resolved) = key_cache.get_cloned(addr) { + let id = addr.id().ok(); + if let Some(id) = &id + && let Some(resolved) = key_cache.get_cloned(id) + { return Ok(resolved); } - let resolved = api.resolve_to_key(addr, cur_ts)?; - key_cache.push(*addr, resolved); + let resolved = api.resolve_to_deterministic_address_at_finality(addr, cur_ts)?; + if let Some(id) = id { + key_cache.push(id, resolved); + } Ok(resolved) } /// Get the state nonce for an address, accounting for messages already included in `cur_ts`. pub(in crate::message_pool) fn get_state_sequence( api: &T, - key_cache: &SizeTrackingLruCache, + key_cache: &IdToAddressCache, state_nonce_cache: &SizeTrackingLruCache, addr: &Address, cur_ts: &Tipset, @@ -371,7 +375,7 @@ where } pub fn resolve_to_key(&self, addr: &Address, cur_ts: &Tipset) -> Result { - resolve_to_key(self.api.as_ref(), self.key_cache.as_ref(), addr, cur_ts) + resolve_to_key(self.api.as_ref(), &self.key_cache, addr, cur_ts) } /// Add a signed message to the pool and its address. @@ -531,9 +535,9 @@ where let cur_ts = self.current_tipset(); add_helper( self.api.as_ref(), - self.bls_sig_cache.as_ref(), + &self.bls_sig_cache, self.pending.as_ref(), - self.key_cache.as_ref(), + &self.key_cache, &cur_ts, msg, self.get_state_sequence(&from, &cur_ts)?, @@ -570,8 +574,8 @@ where fn get_state_sequence(&self, addr: &Address, cur_ts: &Tipset) -> Result { get_state_sequence( self.api.as_ref(), - self.key_cache.as_ref(), - self.state_nonce_cache.as_ref(), + &self.key_cache, + &self.state_nonce_cache, addr, cur_ts, ) @@ -640,7 +644,7 @@ where msg_vec.append(smsgs.as_mut()); for msg in umsg { - let smsg = recover_sig(self.bls_sig_cache.as_ref(), msg)?; + let smsg = recover_sig(&self.bls_sig_cache, msg)?; msg_vec.push(smsg) } } @@ -690,13 +694,13 @@ where { head_change( self.api.as_ref(), - self.bls_sig_cache.as_ref(), + &self.bls_sig_cache, self.repub_trigger.clone(), self.republished.as_ref(), self.pending.as_ref(), self.cur_tipset.as_ref(), - self.key_cache.as_ref(), - self.state_nonce_cache.as_ref(), + &self.key_cache, + &self.state_nonce_cache, revert, apply, ) @@ -722,22 +726,13 @@ where let local_addrs = Arc::new(SyncRwLock::new(Vec::new())); let pending = Arc::new(SyncRwLock::new(HashMap::new())); let tipset = Arc::new(SyncRwLock::new(api.get_heaviest_tipset())); - let bls_sig_cache = Arc::new(SizeTrackingLruCache::new_with_metrics( - "bls_sig".into(), - BLS_SIG_CACHE_SIZE, - )); - let sig_val_cache = Arc::new(SizeTrackingLruCache::new_with_metrics( - "sig_val".into(), - SIG_VAL_CACHE_SIZE, - )); - let key_cache = Arc::new(SizeTrackingLruCache::new_with_metrics( - "mpool_key".into(), - KEY_CACHE_SIZE, - )); - let state_nonce_cache = Arc::new(SizeTrackingLruCache::new_with_metrics( - "state_nonce".into(), - STATE_NONCE_CACHE_SIZE, - )); + let bls_sig_cache = + SizeTrackingLruCache::new_with_metrics("bls_sig".into(), BLS_SIG_CACHE_SIZE); + let sig_val_cache = + SizeTrackingLruCache::new_with_metrics("sig_val".into(), SIG_VAL_CACHE_SIZE); + let key_cache = SizeTrackingLruCache::new_with_metrics("mpool_key".into(), KEY_CACHE_SIZE); + let state_nonce_cache = + SizeTrackingLruCache::new_with_metrics("state_nonce".into(), STATE_NONCE_CACHE_SIZE); let local_msgs = Arc::new(SyncRwLock::new(HashSet::new())); let republished = Arc::new(SyncRwLock::new(HashSet::new())); let block_delay = chain_config.block_delay_secs; @@ -765,11 +760,11 @@ where let mut head_changes_rx = mp.api.subscribe_head_changes(); let api = mp.api.clone(); - let bls_sig_cache = mp.bls_sig_cache.clone(); + let bls_sig_cache = mp.bls_sig_cache.shallow_clone(); let pending = mp.pending.clone(); let republished = mp.republished.clone(); - let key_cache = mp.key_cache.clone(); - let state_nonce_cache = mp.state_nonce_cache.clone(); + let key_cache = mp.key_cache.shallow_clone(); + let state_nonce_cache = mp.state_nonce_cache.shallow_clone(); let current_ts = mp.cur_tipset.clone(); let repub_trigger = mp.repub_trigger.clone(); @@ -781,13 +776,13 @@ where Ok(HeadChanges { reverts, applies }) => { if let Err(e) = head_change( api.as_ref(), - bls_sig_cache.as_ref(), + &bls_sig_cache, repub_trigger.clone(), republished.as_ref(), pending.as_ref(), ¤t_ts, - key_cache.as_ref(), - state_nonce_cache.as_ref(), + &key_cache, + &state_nonce_cache, reverts, applies, ) @@ -811,7 +806,7 @@ where let cur_tipset = mp.cur_tipset.clone(); let republished = mp.republished.clone(); let local_addrs = mp.local_addrs.clone(); - let key_cache = mp.key_cache.clone(); + let key_cache = mp.key_cache.shallow_clone(); let network_sender = Arc::new(mp.network_sender.clone()); let republish_interval = u64::from(10 * block_delay + chain_config.propagation_delay_secs); // Reacts to republishing requests @@ -830,7 +825,7 @@ where cur_tipset.as_ref(), republished.as_ref(), local_addrs.as_ref(), - key_cache.as_ref(), + &key_cache, &chain_config, ) .await @@ -854,7 +849,7 @@ pub(in crate::message_pool) fn add_helper( api: &T, bls_sig_cache: &SizeTrackingLruCache, pending: &SyncRwLock>, - key_cache: &SizeTrackingLruCache, + key_cache: &IdToAddressCache, cur_ts: &Tipset, msg: SignedMessage, sequence: u64, @@ -1572,11 +1567,16 @@ mod tests { .unwrap(); // f0300 exists in lookback state (root_a) → resolves successfully. - let result = Provider::resolve_to_key(&cs, &Address::new_id(300), &head).unwrap(); + let result = Provider::resolve_to_deterministic_address_at_finality( + &cs, + &Address::new_id(300), + &head, + ) + .unwrap(); assert_eq!(result, bls_a); // f0400 exists only in head state (root_b), not in lookback → fails. - Provider::resolve_to_key(&cs, &Address::new_id(400), &head) + Provider::resolve_to_deterministic_address_at_finality(&cs, &Address::new_id(400), &head) .expect_err("actor only in head state must not resolve via finality lookback"); } } diff --git a/src/message_pool/msgpool/provider.rs b/src/message_pool/msgpool/provider.rs index 4849bc404b5..2f76e96a8d1 100644 --- a/src/message_pool/msgpool/provider.rs +++ b/src/message_pool/msgpool/provider.rs @@ -48,8 +48,13 @@ pub trait Provider { fn load_tipset(&self, tsk: &TipsetKey) -> Result; /// Computes the base fee fn chain_compute_base_fee(&self, ts: &Tipset) -> Result; - /// Resolve an address to its key form using the tipset's parent state. - fn resolve_to_key(&self, addr: &Address, ts: &Tipset) -> Result; + /// Similar to [`crate::state_manager::StateManager::resolve_to_deterministic_address`] but fails if the ID address being resolved isn't reorg-stable yet. + /// It should not be used for consensus-critical subsystems. + fn resolve_to_deterministic_address_at_finality( + &self, + addr: &Address, + ts: &Tipset, + ) -> Result; /// Return all messages included in the given tipset. fn messages_for_tipset(&self, ts: &Tipset) -> Result>, Error>; // Get max number of messages per actor in the pool @@ -103,9 +108,11 @@ impl Provider for ChainStore { .map_err(|err| err.into()) } - /// Resolves an address to its deterministic key form using the state at - /// finality look-back, This ensures the resolved address is reorg-stable. - fn resolve_to_key(&self, addr: &Address, ts: &Tipset) -> Result { + fn resolve_to_deterministic_address_at_finality( + &self, + addr: &Address, + ts: &Tipset, + ) -> Result { match addr.protocol() { BLS | Secp256k1 | Delegated => Ok(*addr), Actor => Err(Error::Other( @@ -121,6 +128,7 @@ impl Provider for ChainStore { ) .map_err(|e| Error::Other(e.to_string()))? } else { + // Matches the logic at ts.clone() }; diff --git a/src/message_pool/msgpool/selection.rs b/src/message_pool/msgpool/selection.rs index b8f4598a24e..9fa9acc74d0 100644 --- a/src/message_pool/msgpool/selection.rs +++ b/src/message_pool/msgpool/selection.rs @@ -13,6 +13,7 @@ use crate::message::{MessageRead as _, SignedMessage}; use crate::message_pool::msg_chain::MsgChainNode; use crate::shim::crypto::SignatureType; use crate::shim::{address::Address, econ::TokenAmount}; +use crate::state_manager::IdToAddressCache; use ahash::{HashMap, HashMapExt}; use anyhow::{Context, bail, ensure}; use parking_lot::RwLock; @@ -665,9 +666,9 @@ where // Run head change to do reorg detection run_head_change( self.api.as_ref(), - self.bls_sig_cache.as_ref(), + &self.bls_sig_cache, &self.pending, - self.key_cache.as_ref(), + &self.key_cache, cur_ts.clone(), ts.clone(), &mut result, @@ -820,7 +821,7 @@ pub(in crate::message_pool) fn run_head_change( api: &T, bls_sig_cache: &SizeTrackingLruCache, pending: &RwLock>, - key_cache: &SizeTrackingLruCache, + key_cache: &IdToAddressCache, from: Tipset, to: Tipset, rmsgs: &mut HashMap>, diff --git a/src/message_pool/msgpool/test_provider.rs b/src/message_pool/msgpool/test_provider.rs index 2c7f11367a0..2e9b2276703 100644 --- a/src/message_pool/msgpool/test_provider.rs +++ b/src/message_pool/msgpool/test_provider.rs @@ -226,7 +226,11 @@ impl Provider for TestApi { Ok(TokenAmount::from_atto(100)) } - fn resolve_to_key(&self, addr: &Address, _ts: &Tipset) -> Result { + fn resolve_to_deterministic_address_at_finality( + &self, + addr: &Address, + _ts: &Tipset, + ) -> Result { Ok(self.inner.lock().resolve_addr(addr)) } diff --git a/src/shim/address.rs b/src/shim/address.rs index 503fb5abcc3..82cc7033007 100644 --- a/src/shim/address.rs +++ b/src/shim/address.rs @@ -19,6 +19,8 @@ use std::sync::{ atomic::{AtomicU8, Ordering}, }; +pub type AddressId = u64; + /// Zero address used to avoid allowing it to be used for verification. /// This is intentionally disallowed because it is an edge case with Filecoin's BLS /// signature verification. diff --git a/src/state_manager/mod.rs b/src/state_manager/mod.rs index 95c36fce19d..57d8181cd8b 100644 --- a/src/state_manager/mod.rs +++ b/src/state_manager/mod.rs @@ -32,6 +32,7 @@ use crate::shim::actors::init::{self, State}; use crate::shim::actors::miner::{MinerInfo, MinerPower, Partition}; use crate::shim::actors::verifreg::{Allocation, AllocationID, Claim}; use crate::shim::actors::*; +use crate::shim::address::AddressId; use crate::shim::crypto::{Signature, SignatureType}; use crate::shim::{ actors::{ @@ -87,6 +88,7 @@ use tracing::{error, info, instrument, warn}; const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize); const DEFAULT_ID_TO_DETERMINISTIC_ADDRESS_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize); pub const EVENTS_AMT_BITWIDTH: u32 = 5; +pub type IdToAddressCache = SizeTrackingLruCache; /// Result of executing an individual chain message in a tipset. /// @@ -192,7 +194,7 @@ pub struct StateManager { cs: Arc>, /// This is a cache which indexes tipsets to their calculated state output (state root, receipt root). cache: TipsetStateCache, - id_to_deterministic_address_cache: SizeTrackingLruCache, + id_to_deterministic_address_cache: IdToAddressCache, beacon: Arc, engine: Arc, } @@ -1788,6 +1790,8 @@ where state.verified_client_data_cap(self.blockstore(), id) } + /// Similar to [`StateTree::resolve_to_deterministic_addr`] but does not allow [`crate::shim::address::Protocol::Actor`] type of addresses. + /// Uses the [`Tipset`] `ts` to generate the VM state. pub async fn resolve_to_deterministic_address( self: &Arc, address: Address,