Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/message_pool/msgpool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic};
use crate::message::{MessageRead as _, SignedMessage};
use crate::networks::ChainConfig;
use crate::shim::{address::Address, crypto::Signature};
use crate::state_manager::IdToAddressCache;
use crate::utils::ShallowClone as _;
use crate::utils::cache::SizeTrackingLruCache;
use crate::utils::get_size::CidWrapper;
Expand Down Expand Up @@ -52,7 +53,7 @@ async fn republish_pending_messages<T>(
cur_tipset: &SyncRwLock<Tipset>,
republished: &SyncRwLock<HashSet<Cid>>,
local_addrs: &SyncRwLock<Vec<Address>>,
key_cache: &SizeTrackingLruCache<Address, Address>,
key_cache: &IdToAddressCache,
chain_config: &ChainConfig,
) -> Result<(), Error>
where
Expand Down Expand Up @@ -223,7 +224,7 @@ pub async fn head_change<T>(
republished: &SyncRwLock<HashSet<Cid>>,
pending: &SyncRwLock<HashMap<Address, MsgSet>>,
cur_tipset: &SyncRwLock<Tipset>,
key_cache: &SizeTrackingLruCache<Address, Address>,
key_cache: &IdToAddressCache,
state_nonce_cache: &SizeTrackingLruCache<StateNonceCacheKey, u64>,
revert: Vec<Tipset>,
apply: Vec<Tipset>,
Expand Down Expand Up @@ -326,7 +327,7 @@ where

pub(in crate::message_pool) struct MpoolCtx<'a, T> {
pub api: &'a T,
pub key_cache: &'a SizeTrackingLruCache<Address, Address>,
pub key_cache: &'a IdToAddressCache,
pub pending: &'a SyncRwLock<HashMap<Address, MsgSet>>,
pub ts: &'a Tipset,
}
Expand Down
98 changes: 49 additions & 49 deletions src/message_pool/msgpool/msg_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use crate::shim::{
econ::TokenAmount,
gas::{Gas, price_list_by_network_version},
};
use crate::state_manager::IdToAddressCache;
use crate::state_manager::utils::is_valid_for_sending;
use crate::utils::ShallowClone as _;
use crate::utils::cache::SizeTrackingLruCache;
use crate::utils::get_size::CidWrapper;
use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
Expand Down Expand Up @@ -280,13 +282,13 @@ pub struct MessagePool<T> {
/// Sender half to send messages to other components
pub network_sender: flume::Sender<NetworkMessage>,
/// A cache for BLS signature keyed by Cid
pub bls_sig_cache: Arc<SizeTrackingLruCache<CidWrapper, Signature>>,
pub bls_sig_cache: SizeTrackingLruCache<CidWrapper, Signature>,
/// A cache for BLS signature keyed by Cid
pub sig_val_cache: Arc<SizeTrackingLruCache<CidWrapper, ()>>,
/// Cache for ID address to key address resolution.
pub key_cache: Arc<SizeTrackingLruCache<Address, Address>>,
pub sig_val_cache: SizeTrackingLruCache<CidWrapper, ()>,
/// Cache for ID address ID to key address resolution.
pub key_cache: IdToAddressCache,
/// Cache for state nonce lookups keyed by (`TipsetKey`, `Address`).
pub state_nonce_cache: Arc<SizeTrackingLruCache<StateNonceCacheKey, u64>>,
pub state_nonce_cache: SizeTrackingLruCache<StateNonceCacheKey, u64>,
/// A set of republished messages identified by their Cid
pub republished: Arc<SyncRwLock<HashSet<Cid>>>,
/// Acts as a signal to republish messages from the republished set of
Expand All @@ -303,25 +305,27 @@ pub struct MessagePool<T> {
/// Non-ID addresses are returned unchanged.
pub(in crate::message_pool) fn resolve_to_key<T: Provider>(
api: &T,
key_cache: &SizeTrackingLruCache<Address, Address>,
key_cache: &IdToAddressCache,
addr: &Address,
cur_ts: &Tipset,
) -> Result<Address, Error> {
if addr.protocol() != Protocol::ID {
Comment thread
hanabi1224 marked this conversation as resolved.
return Ok(*addr);
}
if let Some(resolved) = key_cache.get_cloned(addr) {
let id = addr.id().ok();
Comment thread
hanabi1224 marked this conversation as resolved.
if let Some(id) = &id
&& let Some(resolved) = key_cache.get_cloned(id)
{
return Ok(resolved);
}
let resolved = api.resolve_to_key(addr, cur_ts)?;
key_cache.push(*addr, resolved);
let resolved = api.resolve_to_deterministic_address_at_finality(addr, cur_ts)?;
if let Some(id) = id {
key_cache.push(id, resolved);
}
Ok(resolved)
}

/// Get the state nonce for an address, accounting for messages already included in `cur_ts`.
pub(in crate::message_pool) fn get_state_sequence<T: Provider>(
api: &T,
key_cache: &SizeTrackingLruCache<Address, Address>,
key_cache: &IdToAddressCache,
state_nonce_cache: &SizeTrackingLruCache<StateNonceCacheKey, u64>,
addr: &Address,
cur_ts: &Tipset,
Expand Down Expand Up @@ -371,7 +375,7 @@ where
}

pub fn resolve_to_key(&self, addr: &Address, cur_ts: &Tipset) -> Result<Address, Error> {
resolve_to_key(self.api.as_ref(), self.key_cache.as_ref(), addr, cur_ts)
resolve_to_key(self.api.as_ref(), &self.key_cache, addr, cur_ts)
}

/// Add a signed message to the pool and its address.
Expand Down Expand Up @@ -531,9 +535,9 @@ where
let cur_ts = self.current_tipset();
add_helper(
self.api.as_ref(),
self.bls_sig_cache.as_ref(),
&self.bls_sig_cache,
self.pending.as_ref(),
self.key_cache.as_ref(),
&self.key_cache,
&cur_ts,
msg,
self.get_state_sequence(&from, &cur_ts)?,
Expand Down Expand Up @@ -570,8 +574,8 @@ where
fn get_state_sequence(&self, addr: &Address, cur_ts: &Tipset) -> Result<u64, Error> {
get_state_sequence(
self.api.as_ref(),
self.key_cache.as_ref(),
self.state_nonce_cache.as_ref(),
&self.key_cache,
&self.state_nonce_cache,
addr,
cur_ts,
)
Expand Down Expand Up @@ -640,7 +644,7 @@ where

msg_vec.append(smsgs.as_mut());
for msg in umsg {
let smsg = recover_sig(self.bls_sig_cache.as_ref(), msg)?;
let smsg = recover_sig(&self.bls_sig_cache, msg)?;
msg_vec.push(smsg)
}
}
Expand Down Expand Up @@ -690,13 +694,13 @@ where
{
head_change(
self.api.as_ref(),
self.bls_sig_cache.as_ref(),
&self.bls_sig_cache,
self.repub_trigger.clone(),
self.republished.as_ref(),
self.pending.as_ref(),
self.cur_tipset.as_ref(),
self.key_cache.as_ref(),
self.state_nonce_cache.as_ref(),
&self.key_cache,
&self.state_nonce_cache,
revert,
apply,
)
Expand All @@ -722,22 +726,13 @@ where
let local_addrs = Arc::new(SyncRwLock::new(Vec::new()));
let pending = Arc::new(SyncRwLock::new(HashMap::new()));
let tipset = Arc::new(SyncRwLock::new(api.get_heaviest_tipset()));
let bls_sig_cache = Arc::new(SizeTrackingLruCache::new_with_metrics(
"bls_sig".into(),
BLS_SIG_CACHE_SIZE,
));
let sig_val_cache = Arc::new(SizeTrackingLruCache::new_with_metrics(
"sig_val".into(),
SIG_VAL_CACHE_SIZE,
));
let key_cache = Arc::new(SizeTrackingLruCache::new_with_metrics(
"mpool_key".into(),
KEY_CACHE_SIZE,
));
let state_nonce_cache = Arc::new(SizeTrackingLruCache::new_with_metrics(
"state_nonce".into(),
STATE_NONCE_CACHE_SIZE,
));
let bls_sig_cache =
SizeTrackingLruCache::new_with_metrics("bls_sig".into(), BLS_SIG_CACHE_SIZE);
let sig_val_cache =
SizeTrackingLruCache::new_with_metrics("sig_val".into(), SIG_VAL_CACHE_SIZE);
let key_cache = SizeTrackingLruCache::new_with_metrics("mpool_key".into(), KEY_CACHE_SIZE);
let state_nonce_cache =
SizeTrackingLruCache::new_with_metrics("state_nonce".into(), STATE_NONCE_CACHE_SIZE);
let local_msgs = Arc::new(SyncRwLock::new(HashSet::new()));
let republished = Arc::new(SyncRwLock::new(HashSet::new()));
let block_delay = chain_config.block_delay_secs;
Expand Down Expand Up @@ -765,11 +760,11 @@ where
let mut head_changes_rx = mp.api.subscribe_head_changes();

let api = mp.api.clone();
let bls_sig_cache = mp.bls_sig_cache.clone();
let bls_sig_cache = mp.bls_sig_cache.shallow_clone();
let pending = mp.pending.clone();
let republished = mp.republished.clone();
let key_cache = mp.key_cache.clone();
let state_nonce_cache = mp.state_nonce_cache.clone();
let key_cache = mp.key_cache.shallow_clone();
let state_nonce_cache = mp.state_nonce_cache.shallow_clone();

let current_ts = mp.cur_tipset.clone();
let repub_trigger = mp.repub_trigger.clone();
Expand All @@ -781,13 +776,13 @@ where
Ok(HeadChanges { reverts, applies }) => {
if let Err(e) = head_change(
api.as_ref(),
bls_sig_cache.as_ref(),
&bls_sig_cache,
repub_trigger.clone(),
republished.as_ref(),
pending.as_ref(),
&current_ts,
key_cache.as_ref(),
state_nonce_cache.as_ref(),
&key_cache,
&state_nonce_cache,
reverts,
applies,
)
Expand All @@ -811,7 +806,7 @@ where
let cur_tipset = mp.cur_tipset.clone();
let republished = mp.republished.clone();
let local_addrs = mp.local_addrs.clone();
let key_cache = mp.key_cache.clone();
let key_cache = mp.key_cache.shallow_clone();
let network_sender = Arc::new(mp.network_sender.clone());
let republish_interval = u64::from(10 * block_delay + chain_config.propagation_delay_secs);
// Reacts to republishing requests
Expand All @@ -830,7 +825,7 @@ where
cur_tipset.as_ref(),
republished.as_ref(),
local_addrs.as_ref(),
key_cache.as_ref(),
&key_cache,
&chain_config,
)
.await
Expand All @@ -854,7 +849,7 @@ pub(in crate::message_pool) fn add_helper<T>(
api: &T,
bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
pending: &SyncRwLock<HashMap<Address, MsgSet>>,
key_cache: &SizeTrackingLruCache<Address, Address>,
key_cache: &IdToAddressCache,
cur_ts: &Tipset,
msg: SignedMessage,
sequence: u64,
Expand Down Expand Up @@ -1572,11 +1567,16 @@ mod tests {
.unwrap();

// f0300 exists in lookback state (root_a) → resolves successfully.
let result = Provider::resolve_to_key(&cs, &Address::new_id(300), &head).unwrap();
let result = Provider::resolve_to_deterministic_address_at_finality(
&cs,
&Address::new_id(300),
&head,
)
.unwrap();
assert_eq!(result, bls_a);

// f0400 exists only in head state (root_b), not in lookback → fails.
Provider::resolve_to_key(&cs, &Address::new_id(400), &head)
Provider::resolve_to_deterministic_address_at_finality(&cs, &Address::new_id(400), &head)
.expect_err("actor only in head state must not resolve via finality lookback");
}
}
18 changes: 13 additions & 5 deletions src/message_pool/msgpool/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,13 @@ pub trait Provider {
fn load_tipset(&self, tsk: &TipsetKey) -> Result<Tipset, Error>;
/// Computes the base fee
fn chain_compute_base_fee(&self, ts: &Tipset) -> Result<TokenAmount, Error>;
/// Resolve an address to its key form using the tipset's parent state.
fn resolve_to_key(&self, addr: &Address, ts: &Tipset) -> Result<Address, Error>;
/// Similar to [`crate::state_manager::StateManager::resolve_to_deterministic_address`] but fails if the ID address being resolved isn't reorg-stable yet.
/// It should not be used for consensus-critical subsystems.
fn resolve_to_deterministic_address_at_finality(
&self,
addr: &Address,
ts: &Tipset,
) -> Result<Address, Error>;
/// Return all messages included in the given tipset.
fn messages_for_tipset(&self, ts: &Tipset) -> Result<Arc<Vec<ChainMessage>>, Error>;
// Get max number of messages per actor in the pool
Expand Down Expand Up @@ -103,9 +108,11 @@ impl<DB: Blockstore> Provider for ChainStore<DB> {
.map_err(|err| err.into())
}

/// Resolves an address to its deterministic key form using the state at
/// finality look-back, This ensures the resolved address is reorg-stable.
fn resolve_to_key(&self, addr: &Address, ts: &Tipset) -> Result<Address, Error> {
fn resolve_to_deterministic_address_at_finality(
&self,
addr: &Address,
ts: &Tipset,
) -> Result<Address, Error> {
match addr.protocol() {
BLS | Secp256k1 | Delegated => Ok(*addr),
Actor => Err(Error::Other(
Expand All @@ -121,6 +128,7 @@ impl<DB: Blockstore> Provider for ChainStore<DB> {
)
.map_err(|e| Error::Other(e.to_string()))?
} else {
// Matches the logic at <https://github.com/filecoin-project/lotus/blob/v1.35.1/chain/stmgr/stmgr.go#L361>
ts.clone()
};

Expand Down
7 changes: 4 additions & 3 deletions src/message_pool/msgpool/selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::message::{MessageRead as _, SignedMessage};
use crate::message_pool::msg_chain::MsgChainNode;
use crate::shim::crypto::SignatureType;
use crate::shim::{address::Address, econ::TokenAmount};
use crate::state_manager::IdToAddressCache;
use ahash::{HashMap, HashMapExt};
use anyhow::{Context, bail, ensure};
use parking_lot::RwLock;
Expand Down Expand Up @@ -665,9 +666,9 @@ where
// Run head change to do reorg detection
run_head_change(
self.api.as_ref(),
self.bls_sig_cache.as_ref(),
&self.bls_sig_cache,
&self.pending,
self.key_cache.as_ref(),
&self.key_cache,
cur_ts.clone(),
ts.clone(),
&mut result,
Expand Down Expand Up @@ -820,7 +821,7 @@ pub(in crate::message_pool) fn run_head_change<T>(
api: &T,
bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
pending: &RwLock<HashMap<Address, MsgSet>>,
key_cache: &SizeTrackingLruCache<Address, Address>,
key_cache: &IdToAddressCache,
from: Tipset,
to: Tipset,
rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
Expand Down
6 changes: 5 additions & 1 deletion src/message_pool/msgpool/test_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,11 @@ impl Provider for TestApi {
Ok(TokenAmount::from_atto(100))
}

fn resolve_to_key(&self, addr: &Address, _ts: &Tipset) -> Result<Address, Error> {
fn resolve_to_deterministic_address_at_finality(
&self,
addr: &Address,
_ts: &Tipset,
) -> Result<Address, Error> {
Ok(self.inner.lock().resolve_addr(addr))
}

Expand Down
2 changes: 2 additions & 0 deletions src/shim/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use std::sync::{
atomic::{AtomicU8, Ordering},
};

pub type AddressId = u64;

/// Zero address used to avoid allowing it to be used for verification.
/// This is intentionally disallowed because it is an edge case with Filecoin's BLS
/// signature verification.
Expand Down
6 changes: 5 additions & 1 deletion src/state_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::shim::actors::init::{self, State};
use crate::shim::actors::miner::{MinerInfo, MinerPower, Partition};
use crate::shim::actors::verifreg::{Allocation, AllocationID, Claim};
use crate::shim::actors::*;
use crate::shim::address::AddressId;
use crate::shim::crypto::{Signature, SignatureType};
use crate::shim::{
actors::{
Expand Down Expand Up @@ -87,6 +88,7 @@ use tracing::{error, info, instrument, warn};
const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize);
const DEFAULT_ID_TO_DETERMINISTIC_ADDRESS_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize);
pub const EVENTS_AMT_BITWIDTH: u32 = 5;
pub type IdToAddressCache = SizeTrackingLruCache<AddressId, Address>;

/// Result of executing an individual chain message in a tipset.
///
Expand Down Expand Up @@ -192,7 +194,7 @@ pub struct StateManager<DB> {
cs: Arc<ChainStore<DB>>,
/// This is a cache which indexes tipsets to their calculated state output (state root, receipt root).
cache: TipsetStateCache<ExecutedTipset>,
id_to_deterministic_address_cache: SizeTrackingLruCache<u64, Address>,
id_to_deterministic_address_cache: IdToAddressCache,
beacon: Arc<crate::beacon::BeaconSchedule>,
engine: Arc<MultiEngine>,
}
Expand Down Expand Up @@ -1788,6 +1790,8 @@ where
state.verified_client_data_cap(self.blockstore(), id)
}

/// Similar to [`StateTree::resolve_to_deterministic_addr`] but does not allow [`crate::shim::address::Protocol::Actor`] type of addresses.
/// Uses the [`Tipset`] `ts` to generate the VM state.
pub async fn resolve_to_deterministic_address(
self: &Arc<Self>,
address: Address,
Expand Down
Loading