Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/beacon/beacon_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use crate::utils::encoding::serde_byte_array;
use byteorder::{BigEndian, ByteOrder as _};
use digest::Digest as _;
use get_size2::GetSize;
use serde_tuple::{self, Deserialize_tuple, Serialize_tuple};

/// The result from getting an entry from `Drand`.
Expand All @@ -12,7 +13,17 @@ use serde_tuple::{self, Deserialize_tuple, Serialize_tuple};
/// This beacon entry is stored on chain in the block header.
#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
#[derive(
Clone, Debug, Default, Eq, PartialEq, Hash, Ord, PartialOrd, Serialize_tuple, Deserialize_tuple,
Clone,
Debug,
Default,
Eq,
PartialEq,
Hash,
Ord,
PartialOrd,
Serialize_tuple,
Deserialize_tuple,
GetSize,
)]
pub struct BeaconEntry {
round: u64,
Expand Down
22 changes: 11 additions & 11 deletions src/beacon/drand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ use super::{
};
use crate::shim::clock::ChainEpoch;
use crate::shim::version::NetworkVersion;
use crate::utils::cache::SizeTrackingLruCache;
use crate::utils::net::global_http_client;
use anyhow::Context as _;
use async_trait::async_trait;
use backon::{ExponentialBuilder, Retryable};
use bls_signatures::Serialize as _;
use itertools::Itertools as _;
use lru::LruCache;
use parking_lot::RwLock;
use serde::{Deserialize as SerdeDeserialize, Serialize as SerdeSerialize};
use tracing::debug;
use url::Url;
Expand Down Expand Up @@ -244,7 +243,7 @@ pub struct DrandBeacon {
fil_round_time: u64,

/// Keeps track of verified beacon entries.
verified_beacons: RwLock<LruCache<u64, BeaconEntry>>,
verified_beacons: SizeTrackingLruCache<u64, BeaconEntry>,
}

impl DrandBeacon {
Expand All @@ -262,14 +261,15 @@ impl DrandBeacon {
drand_gen_time: config.chain_info.genesis_time as u64,
fil_round_time: interval,
fil_gen_time: genesis_ts,
verified_beacons: RwLock::new(LruCache::new(
verified_beacons: SizeTrackingLruCache::new_with_default_metrics_registry(
"verified_beacons_cache".into(),
NonZeroUsize::new(CACHE_SIZE).expect("Infallible"),
)),
),
}
}

fn is_verified(&self, entry: &BeaconEntry) -> bool {
let cache = self.verified_beacons.read();
let cache = self.verified_beacons.cache().read();
cache.peek(&entry.round()) == Some(entry)
}
}
Expand Down Expand Up @@ -333,20 +333,20 @@ impl Beacon for DrandBeacon {
};

if is_valid && !validated.is_empty() {
let mut cache = self.verified_beacons.write();
if cache.cap().get() < validated.len() {
tracing::warn!(cap=%cache.cap().get(), validated_len=%validated.len(), "verified_beacons.cap() is too small");
let cap = self.verified_beacons.cap();
if cap < validated.len() {
tracing::warn!(%cap, validated_len=%validated.len(), "verified_beacons.cap() is too small");
}
for entry in validated {
cache.put(entry.round(), entry.clone());
self.verified_beacons.push(entry.round(), entry.clone());
}
}

Ok(is_valid)
}

async fn entry(&self, round: u64) -> anyhow::Result<BeaconEntry> {
let cached: Option<BeaconEntry> = self.verified_beacons.read().peek(&round).cloned();
let cached: Option<BeaconEntry> = self.verified_beacons.peek_cloned(&round);
match cached {
Some(cached_entry) => Ok(cached_entry),
None => {
Expand Down
9 changes: 5 additions & 4 deletions src/message_pool/msgpool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic};
use crate::message::{Message as MessageTrait, SignedMessage};
use crate::networks::ChainConfig;
use crate::shim::{address::Address, crypto::Signature};
use crate::utils::cache::SizeTrackingLruCache;
use crate::utils::get_size::CidWrapper;
use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
use cid::Cid;
use fvm_ipld_encoding::to_vec;
use lru::LruCache;
use parking_lot::{Mutex, RwLock as SyncRwLock};
use tracing::error;
use utils::{get_base_fee_lower_bound, recover_sig};
Expand Down Expand Up @@ -211,7 +212,7 @@ where
#[allow(clippy::too_many_arguments)]
pub async fn head_change<T>(
api: &T,
bls_sig_cache: &Mutex<LruCache<Cid, Signature>>,
bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
repub_trigger: Arc<flume::Sender<()>>,
republished: &SyncRwLock<HashSet<Cid>>,
pending: &SyncRwLock<HashMap<Address, MsgSet>>,
Expand All @@ -233,7 +234,7 @@ where
let (umsg, smsgs) = api.messages_for_block(block)?;
msgs.extend(smsgs);
for msg in umsg {
let smsg = recover_sig(&mut bls_sig_cache.lock(), msg)?;
let smsg = recover_sig(bls_sig_cache, msg)?;
msgs.push(smsg)
}
}
Expand Down Expand Up @@ -420,7 +421,7 @@ pub mod tests {
let sig = Signature::new_secp256k1(vec![]);
let signed = SignedMessage::new_unchecked(umsg, sig);
let cid = signed.cid();
pool.sig_val_cache.lock().put(cid, ());
pool.sig_val_cache.push(cid.into(), ());
signed
}

Expand Down
27 changes: 17 additions & 10 deletions src/message_pool/msgpool/msg_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ use crate::shim::{
gas::{Gas, price_list_by_network_version},
};
use crate::state_manager::is_valid_for_sending;
use crate::utils::cache::SizeTrackingLruCache;
use crate::utils::get_size::CidWrapper;
use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
use anyhow::Context as _;
use cid::Cid;
use futures::StreamExt;
use fvm_ipld_encoding::to_vec;
use itertools::Itertools;
use lru::LruCache;
use nonzero_ext::nonzero;
use parking_lot::{Mutex, RwLock as SyncRwLock};
use tokio::{sync::broadcast::error::RecvError, task::JoinSet, time::interval};
Expand Down Expand Up @@ -182,9 +183,9 @@ pub struct MessagePool<T> {
/// Sender half to send messages to other components
pub network_sender: flume::Sender<NetworkMessage>,
/// A cache for BLS signature keyed by Cid
pub bls_sig_cache: Arc<Mutex<LruCache<Cid, Signature>>>,
pub bls_sig_cache: Arc<SizeTrackingLruCache<CidWrapper, Signature>>,
/// A cache for BLS signature keyed by Cid
pub sig_val_cache: Arc<Mutex<LruCache<Cid, ()>>>,
pub sig_val_cache: Arc<SizeTrackingLruCache<CidWrapper, ()>>,
/// A set of republished messages identified by their Cid
pub republished: Arc<SyncRwLock<HashSet<Cid>>>,
/// Acts as a signal to republish messages from the republished set of
Expand Down Expand Up @@ -261,14 +262,14 @@ where
fn verify_msg_sig(&self, msg: &SignedMessage) -> Result<(), Error> {
let cid = msg.cid();

if let Some(()) = self.sig_val_cache.lock().get(&cid) {
if let Some(()) = self.sig_val_cache.get_cloned(&(cid).into()) {
return Ok(());
}

msg.verify(self.chain_config.eth_chain_id)
.map_err(|e| Error::Other(e.to_string()))?;

self.sig_val_cache.lock().put(cid, ());
self.sig_val_cache.push(cid.into(), ());

Ok(())
}
Expand Down Expand Up @@ -413,7 +414,7 @@ where

msg_vec.append(smsgs.as_mut());
for msg in umsg {
let smsg = recover_sig(&mut self.bls_sig_cache.lock(), msg)?;
let smsg = recover_sig(self.bls_sig_cache.as_ref(), msg)?;
msg_vec.push(smsg)
}
}
Expand Down Expand Up @@ -471,8 +472,14 @@ where
let local_addrs = Arc::new(SyncRwLock::new(Vec::new()));
let pending = Arc::new(SyncRwLock::new(HashMap::new()));
let tipset = Arc::new(Mutex::new(api.get_heaviest_tipset()));
let bls_sig_cache = Arc::new(Mutex::new(LruCache::new(BLS_SIG_CACHE_SIZE)));
let sig_val_cache = Arc::new(Mutex::new(LruCache::new(SIG_VAL_CACHE_SIZE)));
let bls_sig_cache = Arc::new(SizeTrackingLruCache::new_with_default_metrics_registry(
"bls_sig_cache".into(),
BLS_SIG_CACHE_SIZE,
));
let sig_val_cache = Arc::new(SizeTrackingLruCache::new_with_default_metrics_registry(
"sig_val_cache".into(),
SIG_VAL_CACHE_SIZE,
));
let local_msgs = Arc::new(SyncRwLock::new(HashSet::new()));
let republished = Arc::new(SyncRwLock::new(HashSet::new()));
let block_delay = chain_config.block_delay_secs;
Expand Down Expand Up @@ -583,7 +590,7 @@ where
/// hash-map.
pub(in crate::message_pool) fn add_helper<T>(
api: &T,
bls_sig_cache: &Mutex<LruCache<Cid, Signature>>,
bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
pending: &SyncRwLock<HashMap<Address, MsgSet>>,
msg: SignedMessage,
sequence: u64,
Expand All @@ -592,7 +599,7 @@ where
T: Provider,
{
if msg.signature().signature_type() == SignatureType::Bls {
bls_sig_cache.lock().put(msg.cid(), msg.signature().clone());
bls_sig_cache.push(msg.cid().into(), msg.signature().clone());
}

if msg.message().gas_limit > 100_000_000 {
Expand Down
10 changes: 5 additions & 5 deletions src/message_pool/msgpool/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
use crate::chain::MINIMUM_BASE_FEE;
use crate::message::{Message as MessageTrait, SignedMessage};
use crate::shim::{crypto::Signature, econ::TokenAmount, message::Message};
use cid::Cid;
use lru::LruCache;
use crate::utils::cache::SizeTrackingLruCache;
use crate::utils::get_size::CidWrapper;
use num_rational::BigRational;
use num_traits::ToPrimitive;

Expand Down Expand Up @@ -46,12 +46,12 @@ pub(in crate::message_pool) fn get_gas_perf(gas_reward: &TokenAmount, gas_limit:
/// Attempt to get a signed message that corresponds to an unsigned message in
/// `bls_sig_cache`.
pub(in crate::message_pool) fn recover_sig(
bls_sig_cache: &mut LruCache<Cid, Signature>,
bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
msg: Message,
) -> Result<SignedMessage, Error> {
let val = bls_sig_cache
.get(&msg.cid())
.get_cloned(&(msg.cid()).into())
.ok_or_else(|| Error::Other("Could not recover sig".to_owned()))?;
let smsg = SignedMessage::new_from_parts(msg, val.clone())?;
let smsg = SignedMessage::new_from_parts(msg, val)?;
Ok(smsg)
}
30 changes: 8 additions & 22 deletions src/rpc/methods/f3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ pub use self::types::{
};
use self::{types::*, util::*};
use super::wallet::WalletSign;
use crate::shim::actors::{
convert::{
from_policy_v13_to_v9, from_policy_v13_to_v10, from_policy_v13_to_v11,
from_policy_v13_to_v12, from_policy_v13_to_v14, from_policy_v13_to_v15,
from_policy_v13_to_v16,
},
miner, power,
};
use crate::{
blocks::Tipset,
chain::index::ResolveNullTipset,
Expand All @@ -33,24 +41,12 @@ use crate::{
},
utils::misc::env::is_env_set_and_truthy,
};
use crate::{
blocks::TipsetKey,
shim::actors::{
convert::{
from_policy_v13_to_v9, from_policy_v13_to_v10, from_policy_v13_to_v11,
from_policy_v13_to_v12, from_policy_v13_to_v14, from_policy_v13_to_v15,
from_policy_v13_to_v16,
},
miner, power,
},
};
use ahash::{HashMap, HashSet};
use anyhow::Context as _;
use enumflags2::BitFlags;
use fvm_ipld_blockstore::Blockstore;
use jsonrpsee::core::{client::ClientT as _, params::ArrayParams};
use libp2p::PeerId;
use lru::LruCache;
use num::Signed as _;
use parking_lot::RwLock;
use std::{
Expand Down Expand Up @@ -473,21 +469,11 @@ impl RpcMethod<1> for GetPowerTable {
ctx: Ctx<impl Blockstore + Send + Sync + 'static>,
(f3_tsk,): Self::Params,
) -> Result<Self::Ok, ServerError> {
static CACHE: LazyLock<tokio::sync::Mutex<LruCache<TipsetKey, Vec<F3PowerEntry>>>> =
LazyLock::new(|| {
tokio::sync::Mutex::new(LruCache::new(32.try_into().expect("Infallible")))
});
let tsk = f3_tsk.try_into()?;
let mut cache = CACHE.lock().await;
if let Some(v) = cache.get(&tsk) {
return Ok(v.clone());
}

let start = std::time::Instant::now();
let ts = ctx.chain_index().load_required_tipset(&tsk)?;
let power_entries = Self::compute(&ctx, &ts).await?;
tracing::debug!(epoch=%ts.epoch(), %tsk, "F3.GetPowerTable, took {}", humantime::format_duration(start.elapsed()));
cache.push(tsk, power_entries.clone());
Ok(power_entries)
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/shim/crypto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ use fvm_ipld_encoding::{
ser, strict_bytes,
};
pub use fvm_shared3::TICKET_RANDOMNESS_LOOKBACK;
use get_size2::GetSize;
use num::FromPrimitive;
use num_derive::FromPrimitive;
use schemars::JsonSchema;
use std::borrow::Cow;

/// A cryptographic signature, represented in bytes, of any key protocol.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[derive(Clone, Debug, PartialEq, Eq, Hash, GetSize)]
#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
pub struct Signature {
pub sig_type: SignatureType,
Expand Down Expand Up @@ -307,6 +308,7 @@ pub fn cid_to_replica_commitment_v1(c: &Cid) -> Result<Commitment, &'static str>
strum::Display,
strum::EnumString,
JsonSchema,
GetSize,
)]
#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
#[repr(u8)]
Expand Down
Loading
Loading