Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions src/message_pool/msgpool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::utils::get_size::CidWrapper;
use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
use cid::Cid;
use fvm_ipld_encoding::to_vec;
use parking_lot::{Mutex, RwLock as SyncRwLock};
use parking_lot::RwLock as SyncRwLock;
use tracing::error;
use utils::{get_base_fee_lower_bound, recover_sig};

Expand Down Expand Up @@ -57,15 +57,15 @@ async fn republish_pending_messages<T>(
api: &T,
network_sender: &flume::Sender<NetworkMessage>,
pending: &SyncRwLock<HashMap<Address, MsgSet>>,
cur_tipset: &Mutex<Arc<Tipset>>,
cur_tipset: &SyncRwLock<Arc<Tipset>>,
republished: &SyncRwLock<HashSet<Cid>>,
local_addrs: &SyncRwLock<Vec<Address>>,
chain_config: &ChainConfig,
) -> Result<(), Error>
where
T: Provider,
{
let ts = cur_tipset.lock().clone();
let ts = cur_tipset.read().clone();
let mut pending_map: HashMap<Address, HashMap<u64, SignedMessage>> = HashMap::new();

republished.write().clear();
Expand Down Expand Up @@ -216,7 +216,7 @@ pub async fn head_change<T>(
repub_trigger: Arc<flume::Sender<()>>,
republished: &SyncRwLock<HashSet<Cid>>,
pending: &SyncRwLock<HashMap<Address, MsgSet>>,
cur_tipset: &Mutex<Arc<Tipset>>,
cur_tipset: &SyncRwLock<Arc<Tipset>>,
revert: Vec<Tipset>,
apply: Vec<Tipset>,
) -> Result<(), Error>
Expand All @@ -227,7 +227,7 @@ where
let mut rmsgs: HashMap<Address, HashMap<u64, SignedMessage>> = HashMap::new();
for ts in revert {
let pts = api.load_tipset(ts.parents())?;
*cur_tipset.lock() = pts;
*cur_tipset.write() = pts;

let mut msgs: Vec<SignedMessage> = Vec::new();
for block in ts.block_headers() {
Expand Down Expand Up @@ -266,7 +266,7 @@ where
}
}
}
*cur_tipset.lock() = Arc::new(ts);
*cur_tipset.write() = Arc::new(ts);
}
if repub {
repub_trigger
Expand All @@ -276,7 +276,7 @@ where
}
for (_, hm) in rmsgs {
for (_, msg) in hm {
let sequence = get_state_sequence(api, &msg.from(), &cur_tipset.lock().clone())?;
let sequence = get_state_sequence(api, &msg.from(), &cur_tipset.read().clone())?;
if let Err(e) = add_helper(api, bls_sig_cache, pending, msg, sequence) {
error!("Failed to read message from reorg to mpool: {}", e);
}
Expand Down Expand Up @@ -616,7 +616,7 @@ pub mod tests {
// sleep allows for async block to update mpool's cur_tipset
tokio::time::sleep(Duration::new(2, 0)).await;

let cur_ts = mpool.cur_tipset.lock().clone();
let cur_ts = mpool.current_tipset();
assert_eq!(cur_ts.as_ref(), &tipset);
}

Expand Down
25 changes: 14 additions & 11 deletions src/message_pool/msgpool/msg_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use futures::StreamExt;
use fvm_ipld_encoding::to_vec;
use itertools::Itertools;
use nonzero_ext::nonzero;
use parking_lot::{Mutex, RwLock as SyncRwLock};
use parking_lot::RwLock as SyncRwLock;
use tokio::{sync::broadcast::error::RecvError, task::JoinSet, time::interval};
use tracing::warn;

Expand Down Expand Up @@ -177,7 +177,7 @@ pub struct MessagePool<T> {
/// A map of pending messages where the key is the address
pub pending: Arc<SyncRwLock<HashMap<Address, MsgSet>>>,
/// The current tipset (a set of blocks)
pub cur_tipset: Arc<Mutex<Arc<Tipset>>>,
pub cur_tipset: Arc<SyncRwLock<Arc<Tipset>>>,
/// The underlying provider
pub api: Arc<T>,
/// Sender half to send messages to other components
Expand All @@ -202,6 +202,11 @@ impl<T> MessagePool<T>
where
T: Provider,
{
/// Gets the current tipset
pub fn current_tipset(&self) -> Arc<Tipset> {
self.cur_tipset.read().clone()
}

/// Add a signed message to the pool and its address.
fn add_local(&self, m: SignedMessage) -> Result<(), Error> {
self.local_addrs.write().push(m.from());
Expand All @@ -214,7 +219,7 @@ where
pub async fn push(&self, msg: SignedMessage) -> Result<Cid, Error> {
self.check_message(&msg)?;
let cid = msg.cid();
let cur_ts = self.cur_tipset.lock().clone();
let cur_ts = self.current_tipset();
let publish = self.add_tipset(msg.clone(), &cur_ts, true)?;
let msg_ser = to_vec(&msg)?;
let network_name = self.chain_config.network.genesis_name();
Expand Down Expand Up @@ -249,10 +254,8 @@ where
/// fits the parameters to be pushed to the `MessagePool`.
pub fn add(&self, msg: SignedMessage) -> Result<(), Error> {
self.check_message(&msg)?;

let tip = self.cur_tipset.lock().clone();

self.add_tipset(msg, &tip, false)?;
let ts = self.current_tipset();
self.add_tipset(msg, &ts, false)?;
Ok(())
}

Expand Down Expand Up @@ -320,7 +323,7 @@ where
/// the pending hash-map.
fn add_helper(&self, msg: SignedMessage) -> Result<(), Error> {
let from = msg.from();
let cur_ts = self.cur_tipset.lock().clone();
let cur_ts = self.current_tipset();
add_helper(
self.api.as_ref(),
self.bls_sig_cache.as_ref(),
Expand All @@ -333,7 +336,7 @@ where
/// Get the sequence for a given address, return Error if there is a failure
/// to retrieve the respective sequence.
pub fn get_sequence(&self, addr: &Address) -> Result<u64, Error> {
let cur_ts = self.cur_tipset.lock().clone();
let cur_ts = self.current_tipset();

let sequence = self.get_state_sequence(addr, &cur_ts)?;

Expand Down Expand Up @@ -378,7 +381,7 @@ where
)
}

let cur_ts = self.cur_tipset.lock().clone();
let cur_ts = self.current_tipset();

Ok((out, cur_ts))
}
Expand Down Expand Up @@ -471,7 +474,7 @@ where
{
let local_addrs = Arc::new(SyncRwLock::new(Vec::new()));
let pending = Arc::new(SyncRwLock::new(HashMap::new()));
let tipset = Arc::new(Mutex::new(api.get_heaviest_tipset()));
let tipset = Arc::new(SyncRwLock::new(api.get_heaviest_tipset()));
let bls_sig_cache = Arc::new(SizeTrackingLruCache::new_with_default_metrics_registry(
"bls_sig_cache".into(),
BLS_SIG_CACHE_SIZE,
Expand Down
2 changes: 1 addition & 1 deletion src/message_pool/msgpool/selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ where
/// for inclusion from the pool, given the ticket quality of a miner.
/// This method selects messages for including in a block.
pub fn select_messages(&self, ts: &Tipset, tq: f64) -> Result<Vec<SignedMessage>, Error> {
let cur_ts = self.cur_tipset.lock().clone();
let cur_ts = self.current_tipset();
// if the ticket quality is high enough that the first block has higher
// probability than any other block, then we don't bother with optimal
// selection because the first block will always have higher effective
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/methods/gas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ impl GasEstimateGasLimit {
.map(|s| s.into_iter().map(ChainMessage::Signed).collect::<Vec<_>>())
.unwrap_or_default();

let ts = data.mpool.cur_tipset.lock().clone();
let ts = data.mpool.current_tipset();
// Pretend that the message is signed. This has an influence on the gas
// cost. We obviously can't generate a valid signature. Instead, we just
// fill the signature with zeros. The validity is not checked.
Expand Down
Loading