Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/message_pool/msgpool/events.rs
Comment thread
akaladarshi marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

//! Event types published by the pending pool.

use crate::message::SignedMessage;

pub(in crate::message_pool) const MPOOL_UPDATE_CHANNEL_CAPACITY: usize = 256;

/// A change to the pending pool.
#[allow(dead_code)] // TODO: This will be used in https://github.com/ChainSafe/forest/pull/6941
#[derive(Clone, Debug)]
pub enum MpoolUpdate {
Add(SignedMessage),
Remove(SignedMessage),
}
47 changes: 23 additions & 24 deletions src/message_pool/msgpool/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

pub(in crate::message_pool) mod events;
pub(in crate::message_pool) mod metrics;
pub(in crate::message_pool) mod msg_pool;
pub(in crate::message_pool) mod msg_set;
pub(in crate::message_pool) mod pending_store;
pub(in crate::message_pool) mod provider;
pub mod selection;
#[cfg(test)]
pub mod test_provider;
pub(in crate::message_pool) mod utils;
// TODO: This will be used in https://github.com/ChainSafe/forest/pull/6941
#[allow(unused_imports)]
pub use events::MpoolUpdate;
Comment thread
akaladarshi marked this conversation as resolved.

use std::{borrow::BorrowMut, cmp::Ordering};

Expand All @@ -30,10 +36,8 @@ use utils::{get_base_fee_lower_bound, recover_sig};
use super::errors::Error;
use crate::message_pool::{
msg_chain::{Chains, create_message_chains},
msg_pool::{
MsgSet, StateNonceCacheKey, StrictnessPolicy, TrustPolicy, add_helper, remove,
resolve_to_key,
},
msg_pool::{StateNonceCacheKey, StrictnessPolicy, TrustPolicy, add_helper, resolve_to_key},
msgpool::pending_store::PendingStore,
provider::Provider,
};

Expand All @@ -49,7 +53,7 @@ const MIN_GAS: u64 = 1298450;
async fn republish_pending_messages<T>(
api: &T,
network_sender: &flume::Sender<NetworkMessage>,
pending: &SyncRwLock<HashMap<Address, MsgSet>>,
pending_store: &PendingStore,
cur_tipset: &SyncRwLock<Tipset>,
republished: &SyncRwLock<HashSet<Cid>>,
local_addrs: &SyncRwLock<Vec<Address>>,
Expand All @@ -72,15 +76,11 @@ where
}) else {
continue;
};
if let Some(mset) = pending.read().get(&resolved) {
if let Some(mset) = pending_store.snapshot_for(&resolved) {
if mset.msgs.is_empty() {
continue;
}
let mut pend: HashMap<u64, SignedMessage> = HashMap::with_capacity(mset.msgs.len());
for (nonce, m) in mset.msgs.clone().into_iter() {
pend.insert(nonce, m);
}
pending_map.insert(resolved, pend);
pending_map.insert(resolved, mset.msgs);
}
}

Expand Down Expand Up @@ -217,12 +217,12 @@ where
/// The state nonce cache is naturally invalidated when the tipset changes, since
/// it is keyed by [`TipsetKey`](crate::blocks::TipsetKey).
#[allow(clippy::too_many_arguments)]
pub async fn head_change<T>(
pub(in crate::message_pool) async fn head_change<T>(
api: &T,
bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
repub_trigger: flume::Sender<()>,
republished: &SyncRwLock<HashSet<Cid>>,
pending: &SyncRwLock<HashMap<Address, MsgSet>>,
pending_store: &PendingStore,
cur_tipset: &SyncRwLock<Tipset>,
key_cache: &IdToAddressCache,
state_nonce_cache: &SizeTrackingLruCache<StateNonceCacheKey, u64>,
Expand Down Expand Up @@ -267,7 +267,7 @@ where
let mpool_ctx = MpoolCtx {
api,
key_cache,
pending,
pending_store,
ts: &ts,
};
for b in ts.block_headers() {
Expand Down Expand Up @@ -301,7 +301,7 @@ where
let mpool_ctx = MpoolCtx {
api,
key_cache,
pending,
pending_store,
ts: &cur_ts,
};
for (_, hm) in rmsgs {
Expand All @@ -310,7 +310,7 @@ where
if let Err(e) = add_helper(
api,
bls_sig_cache,
pending,
pending_store,
key_cache,
&cur_ts,
msg,
Expand All @@ -328,13 +328,13 @@ where
pub(in crate::message_pool) struct MpoolCtx<'a, T> {
pub api: &'a T,
pub key_cache: &'a IdToAddressCache,
pub pending: &'a SyncRwLock<HashMap<Address, MsgSet>>,
pub pending_store: &'a PendingStore,
pub ts: &'a Tipset,
}

impl<T: Provider> MpoolCtx<'_, T> {
/// Removes a message from the selected messages map (`rmsgs`). If the
/// message is not found there, falls back to removing it from the pending set.
/// Remove a message from the selected messages map (`rmsgs`). If the
/// message is not there, fall back to removing it from the pending store.
pub(in crate::message_pool) fn remove_from_selected_msgs(
&self,
from: &Address,
Expand All @@ -348,7 +348,7 @@ impl<T: Provider> MpoolCtx<'_, T> {
&& let Ok(resolved) = resolve_to_key(self.api, self.key_cache, from, self.ts)
.inspect_err(|e| tracing::debug!(%from, "remove: failed to resolve address: {e:#}"))
{
remove(&resolved, self.pending, sequence, true)?;
let _ = self.pending_store.remove(&resolved, sequence, true);
}
Ok(())
}
Expand Down Expand Up @@ -688,14 +688,13 @@ pub mod tests {
let msg = create_fake_smsg(&mpool, &Address::new_id(1001), &id_addr, 0, 1000000, 1);
mpool.add(msg).unwrap();

// Pending map should be keyed by key_addr, not id_addr
let pending = mpool.pending.read();
// Pending map should be keyed by key_addr, not id_addr.
assert!(
pending.get(&key_addr).is_some(),
mpool.pending_store.snapshot_for(&key_addr).is_some(),
"pending should be keyed by resolved key address"
);
assert!(
pending.get(&id_addr).is_none(),
mpool.pending_store.snapshot_for(&id_addr).is_none(),
"pending should NOT have entry under raw ID address"
);
}
Expand Down
Loading
Loading