From 25ee78cd4eb647b05d1ae385382d004964de0389 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Mon, 1 Dec 2025 19:09:48 +0800 Subject: [PATCH 1/2] fix: avoid deep-cloning Tipset and FullTipset --- src/beacon/drand.rs | 5 - src/benchmark_private/tipset_validation.rs | 4 +- src/blocks/header.rs | 7 -- src/blocks/ticket.rs | 1 + src/blocks/tipset.rs | 91 +++++++----------- src/blocks/vrf_proof.rs | 6 +- src/chain/store/chain_store.rs | 20 ++-- src/chain/store/index.rs | 58 ++++++------ src/chain_sync/chain_follower.rs | 57 ++++++------ src/chain_sync/network_context.rs | 19 +--- src/chain_sync/tipset_syncer.rs | 4 +- src/cli/subcommands/chain_cmd.rs | 3 +- src/cli/subcommands/f3_cmd.rs | 4 +- src/cli/subcommands/info_cmd.rs | 24 +++-- src/daemon/db_util.rs | 2 +- src/daemon/mod.rs | 8 +- src/fil_cns/validation.rs | 8 +- src/interpreter/fvm2.rs | 4 +- src/interpreter/fvm3.rs | 4 +- src/interpreter/fvm4.rs | 4 +- src/interpreter/vm.rs | 2 +- src/libp2p/chain_exchange/message.rs | 10 +- src/message_pool/msgpool/mod.rs | 13 ++- src/message_pool/msgpool/msg_pool.rs | 14 ++- src/message_pool/msgpool/provider.rs | 8 +- src/message_pool/msgpool/selection.rs | 10 +- src/message_pool/msgpool/test_provider.rs | 21 ++--- src/rpc/methods/chain.rs | 44 ++++----- src/rpc/methods/eth.rs | 44 ++++----- src/rpc/methods/eth/filter/mod.rs | 6 +- src/rpc/methods/f3.rs | 4 +- src/rpc/methods/f3/types.rs | 6 -- src/rpc/methods/gas.rs | 4 +- src/rpc/methods/state.rs | 14 +-- src/rpc/methods/sync.rs | 7 +- src/rpc/mod.rs | 2 +- src/state_manager/chain_rand.rs | 16 ++-- src/state_manager/mod.rs | 92 +++++++++---------- src/state_manager/tests.rs | 22 ++--- src/tool/offline_server/server.rs | 4 +- .../subcommands/api_cmd/api_compare_tests.rs | 4 +- src/tool/subcommands/archive_cmd.rs | 18 ++-- src/tool/subcommands/benchmark_cmd.rs | 5 +- src/tool/subcommands/snapshot_cmd.rs | 4 +- src/tool/subcommands/state_compute_cmd.rs | 2 +- 45 files changed, 299 insertions(+), 410 deletions(-) diff --git a/src/beacon/drand.rs b/src/beacon/drand.rs index de8fbc93d049..26f8f561dcaf 100644 --- a/src/beacon/drand.rs +++ b/src/beacon/drand.rs @@ -63,11 +63,6 @@ pub struct DrandConfig<'a> { pub struct BeaconSchedule(pub Vec); impl BeaconSchedule { - /// Constructs a new, empty `BeaconSchedule` with the specified capacity. - pub fn with_capacity(capacity: usize) -> Self { - BeaconSchedule(Vec::with_capacity(capacity)) - } - /// Returns the beacon entries for a given epoch. /// When the beacon for the given epoch is on a new beacon, randomness /// entries are taken from the last two rounds. diff --git a/src/benchmark_private/tipset_validation.rs b/src/benchmark_private/tipset_validation.rs index 0fe64fdd0228..b6e5b4abbbca 100644 --- a/src/benchmark_private/tipset_validation.rs +++ b/src/benchmark_private/tipset_validation.rs @@ -45,7 +45,7 @@ async fn get_snapshot(chain: &NetworkChain, epoch: i64) -> anyhow::Result anyhow::Result<(Arc>, Arc)> { +) -> anyhow::Result<(Arc>, Tipset)> { let snap_car = AnyCar::try_from(snapshot)?; let ts = Arc::new(snap_car.heaviest_tipset()?); let db = Arc::new(ManyCar::new(MemoryDB::default()).with_read_only(snap_car)?); @@ -66,7 +66,7 @@ async fn prepare_validation( Ok((state_manager, ts)) } -async fn validate(state_manager: Arc>, ts: Arc) { +async fn validate(state_manager: Arc>, ts: Tipset) { state_manager .compute_tipset_state(ts, crate::state_manager::NO_CALLBACK, VMTrace::NotTraced) .await diff --git a/src/blocks/header.rs b/src/blocks/header.rs index 3ebcf24711ad..a7a87b2db623 100644 --- a/src/blocks/header.rs +++ b/src/blocks/header.rs @@ -189,13 +189,6 @@ impl RawBlockHeader { blk.signature = None; fvm_ipld_encoding::to_vec(&blk).expect("block serialization cannot fail") } - - /// If the block timestamp is within the allowable clock drift - pub fn is_within_clock_drift(&self) -> bool { - self.timestamp - <= (chrono::Utc::now().timestamp() as u64) - .saturating_add(crate::shim::clock::ALLOWABLE_CLOCK_DRIFT) - } } // The derive macro does not compile for some reason diff --git a/src/blocks/ticket.rs b/src/blocks/ticket.rs index 86339446b084..0c83e2b68152 100644 --- a/src/blocks/ticket.rs +++ b/src/blocks/ticket.rs @@ -28,6 +28,7 @@ pub struct Ticket { } impl Ticket { + #[cfg(test)] /// Ticket constructor pub fn new(vrfproof: VRFProof) -> Self { Self { vrfproof } diff --git a/src/blocks/tipset.rs b/src/blocks/tipset.rs index 490d5de22caa..edb568eddadd 100644 --- a/src/blocks/tipset.rs +++ b/src/blocks/tipset.rs @@ -25,7 +25,6 @@ use num::BigInt; use nunny::{Vec as NonEmpty, vec as nonempty}; use serde::{Deserialize, Serialize}; use thiserror::Error; -use tracing::info; /// A set of `CIDs` forming a unique key for a Tipset. /// Equal keys will have equivalent iteration order, but note that the `CIDs` @@ -151,9 +150,9 @@ impl IntoIterator for TipsetKey { pub struct Tipset { /// Sorted #[get_size(size_fn = nunny_vec_heap_size_helper)] - headers: NonEmpty, + headers: Arc>, // key is lazily initialized via `fn key()`. - key: OnceLock, + key: Arc>, } impl From for Tipset { @@ -171,8 +170,8 @@ impl From<&CachingBlockHeader> for Tipset { impl From for Tipset { fn from(value: CachingBlockHeader) -> Self { Self { - headers: nonempty![value], - key: OnceLock::new(), + headers: nonempty![value].into(), + key: OnceLock::new().into(), } } } @@ -180,8 +179,8 @@ impl From for Tipset { impl From> for Tipset { fn from(headers: NonEmpty) -> Self { Self { - headers, - key: OnceLock::new(), + headers: headers.into(), + key: OnceLock::new().into(), } } } @@ -202,14 +201,12 @@ impl quickcheck::Arbitrary for Tipset { } impl From for Tipset { - fn from(full_tipset: FullTipset) -> Self { - let key = full_tipset.key; - let headers = full_tipset - .blocks + fn from(FullTipset { key, blocks }: FullTipset) -> Self { + let headers = Arc::unwrap_or_clone(blocks) .into_iter_ne() .map(|block| block.header) - .collect_vec(); - + .collect_vec() + .into(); Tipset { headers, key } } } @@ -254,8 +251,8 @@ impl Tipset { verify_block_headers(&headers)?; Ok(Self { - headers, - key: OnceLock::new(), + headers: headers.into(), + key: OnceLock::new().into(), }) } @@ -309,9 +306,6 @@ impl Tipset { pub fn block_headers(&self) -> &NonEmpty { &self.headers } - pub fn into_block_headers(self) -> NonEmpty { - self.headers - } /// Returns the smallest ticket of all blocks in the tipset pub fn min_ticket(&self) -> Option<&Ticket> { self.min_ticket_block().ticket.as_ref() @@ -355,6 +349,7 @@ impl Tipset { } /// Returns true if self wins according to the Filecoin tie-break rule /// (FIP-0023) + #[cfg(test)] pub fn break_weight_tie(&self, other: &Tipset) -> bool { // blocks are already sorted by ticket let broken = self @@ -369,9 +364,9 @@ impl Tipset { ticket.vrfproof < other_ticket.vrfproof }); if broken { - info!("Weight tie broken in favour of {}", self.key()); + tracing::info!("Weight tie broken in favour of {}", self.key()); } else { - info!("Weight tie left unbroken, default to {}", other.key()); + tracing::info!("Weight tie left unbroken, default to {}", other.key()); } broken } @@ -396,21 +391,6 @@ impl Tipset { }) } - /// Returns an iterator of all tipsets - pub fn chain_arc( - self: Arc, - store: &impl Blockstore, - ) -> impl Iterator> + '_ { - let mut tipset = Some(self); - std::iter::from_fn(move || { - let child = tipset.take()?; - tipset = Tipset::load_required(store, child.parents()) - .ok() - .map(Arc::new); - Some(child) - }) - } - /// Fetch the genesis block header for a given tipset. pub fn genesis(&self, store: &impl Blockstore) -> anyhow::Result { // Scanning through millions of epochs to find the genesis is quite @@ -450,22 +430,15 @@ impl Tipset { } anyhow::bail!("Genesis block not found") } - - /// Check if `self` is the child of `other` - pub fn is_child_of(&self, other: &Self) -> bool { - // Note: the extra `&& self.epoch() > other.epoch()` check in lotus is dropped - // See - self.parents() == other.key() - } } /// `FullTipset` is an expanded version of a tipset that contains all the blocks /// and messages. #[derive(Debug, Clone, Eq)] pub struct FullTipset { - blocks: NonEmpty, + blocks: Arc>, // key is lazily initialized via `fn key()`. - key: OnceLock, + key: Arc>, } impl std::hash::Hash for FullTipset { @@ -478,8 +451,8 @@ impl std::hash::Hash for FullTipset { impl From for FullTipset { fn from(block: Block) -> Self { FullTipset { - blocks: nonempty![block], - key: OnceLock::new(), + blocks: nonempty![block].into(), + key: OnceLock::new().into(), } } } @@ -492,21 +465,23 @@ impl PartialEq for FullTipset { impl FullTipset { pub fn new(blocks: impl IntoIterator) -> Result { - let blocks = NonEmpty::new( - // sort blocks on creation to allow for more seamless conversions between - // FullTipset and Tipset - blocks - .into_iter() - .sorted_by_cached_key(|it| it.header.tipset_sort_key()) - .collect(), - ) - .map_err(|_| CreateTipsetError::Empty)?; + let blocks = Arc::new( + NonEmpty::new( + // sort blocks on creation to allow for more seamless conversions between + // FullTipset and Tipset + blocks + .into_iter() + .sorted_by_cached_key(|it| it.header.tipset_sort_key()) + .collect(), + ) + .map_err(|_| CreateTipsetError::Empty)?, + ); verify_block_headers(blocks.iter().map(|it| &it.header))?; Ok(Self { blocks, - key: OnceLock::new(), + key: Arc::new(OnceLock::new()), }) } /// Returns the first block of the tipset. @@ -519,7 +494,7 @@ impl FullTipset { } /// Returns all blocks in a full tipset. pub fn into_blocks(self) -> NonEmpty { - self.blocks + Arc::unwrap_or_clone(self.blocks) } /// Converts the full tipset into a [Tipset] which removes the messages /// attached. @@ -637,8 +612,8 @@ mod lotus_json { let Self(tipset) = self; TipsetLotusJsonInner { cids: tipset.key().clone(), - blocks: tipset.clone().into_block_headers(), height: tipset.epoch(), + blocks: tipset.block_headers().clone(), } .serialize(serializer) } diff --git a/src/blocks/vrf_proof.rs b/src/blocks/vrf_proof.rs index 70c30c2f1160..da5858648156 100644 --- a/src/blocks/vrf_proof.rs +++ b/src/blocks/vrf_proof.rs @@ -1,7 +1,7 @@ // Copyright 2019-2025 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use crate::utils::encoding::{blake2b_256, serde_byte_array}; +use crate::utils::encoding::serde_byte_array; use get_size2::GetSize; use serde::{Deserialize, Serialize}; @@ -12,6 +12,7 @@ pub struct VRFProof(#[serde(with = "serde_byte_array")] pub Vec); impl VRFProof { /// Creates a `VRFProof` from a raw vector. + #[cfg(test)] pub fn new(output: Vec) -> Self { Self(output) } @@ -22,8 +23,9 @@ impl VRFProof { } /// Compute the `BLAKE2b256` digest of the proof. + #[allow(dead_code)] pub fn digest(&self) -> [u8; 32] { - blake2b_256(&self.0) + crate::utils::encoding::blake2b_256(&self.0) } } diff --git a/src/chain/store/chain_store.rs b/src/chain/store/chain_store.rs index 7e597d8be59e..1c178d1809ce 100644 --- a/src/chain/store/chain_store.rs +++ b/src/chain/store/chain_store.rs @@ -48,7 +48,7 @@ pub type ChainEpochDelta = ChainEpoch; /// contained in message type. #[derive(Clone, Debug)] pub enum HeadChange { - Apply(Arc), + Apply(Tipset), } /// Stores chain data such as heaviest tipset and cached tipset info at each @@ -142,7 +142,7 @@ where } /// Sets heaviest tipset - pub fn set_heaviest_tipset(&self, ts: Arc) -> Result<(), Error> { + pub fn set_heaviest_tipset(&self, ts: Tipset) -> Result<(), Error> { self.heaviest_tipset_key_provider .set_heaviest_tipset_key(ts.key())?; if self.publisher.send(HeadChange::Apply(ts)).is_err() { @@ -163,7 +163,7 @@ where // Expand tipset to include other compatible blocks at the epoch. let expanded = self.expand_tipset(ts.min_ticket_block().clone())?; - self.update_heaviest(Arc::new(expanded))?; + self.update_heaviest(expanded)?; Ok(()) } @@ -218,7 +218,7 @@ where } /// Returns the currently tracked heaviest tipset. - pub fn heaviest_tipset(&self) -> Arc { + pub fn heaviest_tipset(&self) -> Tipset { let tsk = self .heaviest_tipset_key_provider .heaviest_tipset_key() @@ -261,7 +261,7 @@ where pub fn load_required_tipset_or_heaviest<'a>( &self, maybe_key: impl Into>, - ) -> Result, Error> { + ) -> Result { match maybe_key.into() { Some(key) => self.chain_index.load_required_tipset(key), None => Ok(self.heaviest_tipset()), @@ -270,11 +270,11 @@ where /// Determines if provided tipset is heavier than existing known heaviest /// tipset - fn update_heaviest(&self, ts: Arc) -> Result<(), Error> { + fn update_heaviest(&self, ts: Tipset) -> Result<(), Error> { // Calculate heaviest weight before matching to avoid deadlock with mutex let heaviest_weight = fil_cns::weight(self.blockstore(), &self.heaviest_tipset())?; - let new_weight = fil_cns::weight(self.blockstore(), ts.as_ref())?; + let new_weight = fil_cns::weight(self.blockstore(), &ts)?; let curr_weight = heaviest_weight; if new_weight > curr_weight { @@ -321,9 +321,9 @@ where pub fn get_lookback_tipset_for_round( chain_index: &Arc>>, chain_config: &Arc, - heaviest_tipset: &Arc, + heaviest_tipset: &Tipset, round: ChainEpoch, - ) -> Result<(Arc, Cid), Error> + ) -> Result<(Tipset, Cid), Error> where DB: Send + Sync + 'static, { @@ -350,7 +350,7 @@ where // (takes seconds to minutes). It's only acceptable here because this situation is // so rare (may happen in dev-networks, doesn't happen in calibnet or mainnet.) &crate::shim::machine::GLOBAL_MULTI_ENGINE, - Arc::clone(heaviest_tipset), + heaviest_tipset.clone(), crate::state_manager::NO_CALLBACK, VMTrace::NotTraced, ) diff --git a/src/chain/store/index.rs b/src/chain/store/index.rs index 3fd73602899a..a579c3434c3a 100644 --- a/src/chain/store/index.rs +++ b/src/chain/store/index.rs @@ -1,8 +1,8 @@ // Copyright 2019-2025 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +use std::num::NonZeroUsize; use std::sync::LazyLock; -use std::{num::NonZeroUsize, sync::Arc}; use crate::beacon::{BeaconEntry, IGNORE_DRAND_VAR}; use crate::blocks::{Tipset, TipsetKey}; @@ -18,7 +18,7 @@ use num::Integer; const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(131072_usize); -type TipsetCache = SizeTrackingLruCache>; +type TipsetCache = SizeTrackingLruCache; /// Keeps look-back tipsets in cache at a given interval `skip_length` and can /// be used to look-back at the chain to retrieve an old tipset. @@ -51,7 +51,7 @@ impl ChainIndex { /// Loads a tipset from memory given the tipset keys and cache. Semantically /// identical to [`Tipset::load`] but the result is cached. - pub fn load_tipset(&self, tsk: &TipsetKey) -> Result>, Error> { + pub fn load_tipset(&self, tsk: &TipsetKey) -> Result, Error> { let cache_enabled = !is_env_truthy("FOREST_TIPSET_CACHE_DISABLED"); if cache_enabled && let Some(ts) = self.ts_cache.get_cloned(tsk) { metrics::LRU_CACHE_HIT @@ -60,7 +60,7 @@ impl ChainIndex { return Ok(Some(ts)); } - let ts_opt = Tipset::load(&self.db, tsk)?.map(Arc::new); + let ts_opt = Tipset::load(&self.db, tsk)?; if cache_enabled && let Some(ts) = &ts_opt { self.ts_cache.push(tsk.clone(), ts.clone()); metrics::LRU_CACHE_MISS @@ -74,7 +74,7 @@ impl ChainIndex { /// Loads a tipset from memory given the tipset keys and cache. /// This calls fails if the tipset is missing or invalid. Semantically /// identical to [`Tipset::load_required`] but the result is cached. - pub fn load_required_tipset(&self, tsk: &TipsetKey) -> Result, Error> { + pub fn load_required_tipset(&self, tsk: &TipsetKey) -> Result { self.load_tipset(tsk)? .ok_or_else(|| Error::NotFound("Key for header".into())) } @@ -121,9 +121,9 @@ impl ChainIndex { pub fn tipset_by_height( &self, to: ChainEpoch, - mut from: Arc, + mut from: Tipset, resolve: ResolveNullTipset, - ) -> Result, Error> { + ) -> Result { use crate::shim::policy::policy_constants::CHAIN_FINALITY; static CACHE: LazyLock> = LazyLock::new(|| { @@ -153,7 +153,7 @@ impl ChainIndex { } if to == 0 { - return Ok(Arc::new(Tipset::from(from.genesis(&self.db)?))); + return Ok(Tipset::from(from.genesis(&self.db)?)); } if to > from.epoch() { return Err(Error::Other(format!( @@ -188,7 +188,7 @@ impl ChainIndex { /// Iterate from the given tipset to genesis. Missing tipsets cut the chain /// short. Semantically identical to [`Tipset::chain`] but the results are /// cached. - pub fn chain(&self, from: Arc) -> impl Iterator> + '_ { + pub fn chain(&self, from: Tipset) -> impl Iterator + '_ { let mut tipset = Some(from); std::iter::from_fn(move || { let child = tipset.take()?; @@ -198,8 +198,8 @@ impl ChainIndex { } /// Finds the latest beacon entry given a tipset up to 20 tipsets behind - pub fn latest_beacon_entry(&self, tipset: Arc) -> Result { - for ts in tipset.chain_arc(&self.db).take(20) { + pub fn latest_beacon_entry(&self, tipset: Tipset) -> Result { + for ts in tipset.chain(&self.db).take(20) { if let Some(entry) = ts.min_ticket_block().beacon_entries.last() { return Ok(entry.clone()); } @@ -222,11 +222,13 @@ impl ChainIndex { #[cfg(test)] mod tests { - use std::sync::atomic::{AtomicU64, Ordering}; + use std::sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }; use super::*; - use crate::blocks::CachingBlockHeader; - use crate::blocks::RawBlockHeader; + use crate::blocks::{CachingBlockHeader, RawBlockHeader}; use crate::db::MemoryDB; use crate::utils::db::CborStoreExt; @@ -268,18 +270,16 @@ mod tests { // epoch 2 is null. ResolveNullTipset decided whether to return epoch 1 or epoch 3 assert_eq!( index - .tipset_by_height(2, Arc::new(epoch4.clone()), ResolveNullTipset::TakeOlder) - .unwrap() - .as_ref(), - &epoch1 + .tipset_by_height(2, epoch4.clone(), ResolveNullTipset::TakeOlder) + .unwrap(), + epoch1 ); assert_eq!( index - .tipset_by_height(2, Arc::new(epoch4), ResolveNullTipset::TakeNewer) - .unwrap() - .as_ref(), - &epoch3 + .tipset_by_height(2, epoch4, ResolveNullTipset::TakeNewer) + .unwrap(), + epoch3 ); } @@ -306,18 +306,16 @@ mod tests { // The chain as forked, epoch 2 and 3 are ambiguous assert_eq!( index - .tipset_by_height(2, Arc::new(epoch3a), ResolveNullTipset::TakeOlder) - .unwrap() - .as_ref(), - &epoch2a + .tipset_by_height(2, epoch3a, ResolveNullTipset::TakeOlder) + .unwrap(), + epoch2a ); assert_eq!( index - .tipset_by_height(2, Arc::new(epoch3b), ResolveNullTipset::TakeOlder) - .unwrap() - .as_ref(), - &epoch2b + .tipset_by_height(2, epoch3b, ResolveNullTipset::TakeOlder) + .unwrap(), + epoch2b ); } } diff --git a/src/chain_sync/chain_follower.rs b/src/chain_sync/chain_follower.rs index af567d85e238..ad895da03830 100644 --- a/src/chain_sync/chain_follower.rs +++ b/src/chain_sync/chain_follower.rs @@ -38,7 +38,7 @@ use fvm_ipld_blockstore::Blockstore; use itertools::Itertools; use libp2p::PeerId; use parking_lot::{Mutex, RwLock}; -use std::{ops::Deref as _, sync::Arc, time::Instant}; +use std::{sync::Arc, time::Instant}; use tokio::{sync::Notify, task::JoinSet}; use tracing::{debug, error, info, trace, warn}; @@ -53,7 +53,7 @@ pub struct ChainFollower { pub network: SyncNetworkContext, /// Genesis tipset - genesis: Arc, + genesis: Tipset, /// Bad blocks cache, updates based on invalid state transitions. /// Will mark any invalid blocks and all children as bad in this bounded @@ -64,10 +64,10 @@ pub struct ChainFollower { net_handler: flume::Receiver, /// Tipset channel sender - pub tipset_sender: flume::Sender>, + pub tipset_sender: flume::Sender, /// Tipset channel receiver - tipset_receiver: flume::Receiver>, + tipset_receiver: flume::Receiver, /// When `stateless_mode` is true, forest connects to the P2P network but /// does not execute any state transitions. This drastically reduces the @@ -83,7 +83,7 @@ impl ChainFollower { pub fn new( state_manager: Arc>, network: SyncNetworkContext, - genesis: Arc, + genesis: Tipset, net_handler: flume::Receiver, stateless_mode: bool, mem_pool: Arc>>, @@ -131,11 +131,11 @@ pub async fn chain_follower( state_manager: Arc>, bad_block_cache: Option>, network_rx: flume::Receiver, - tipset_receiver: flume::Receiver>, + tipset_receiver: flume::Receiver, network: SyncNetworkContext, mem_pool: Arc>>, sync_status: SyncStatus, - genesis: Arc, + genesis: Tipset, stateless_mode: bool, ) -> anyhow::Result<()> { let state_changed = Arc::new(Notify::new()); @@ -202,7 +202,7 @@ pub async fn chain_follower( { state_machine .lock() - .update(SyncEvent::NewFullTipsets(vec![Arc::new(tipset)])); + .update(SyncEvent::NewFullTipsets(vec![tipset])); state_changed.notify_one(); } } @@ -499,17 +499,17 @@ pub fn load_full_tipset( } enum SyncEvent { - NewFullTipsets(Vec>), - BadTipset(Arc), + NewFullTipsets(Vec), + BadTipset(FullTipset), ValidatedTipset { - tipset: Arc, + tipset: FullTipset, is_proposed_head: bool, }, } impl std::fmt::Display for SyncEvent { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - fn tss_to_string(tss: &[Arc]) -> String { + fn tss_to_string(tss: &[FullTipset]) -> String { format!( "epoch: {}-{}", tss.first().map(|ts| ts.epoch()).unwrap_or_default(), @@ -539,7 +539,7 @@ struct SyncStateMachine { cs: Arc>, bad_block_cache: Option>, // Map from TipsetKey to FullTipset - tipsets: HashMap>, + tipsets: HashMap, stateless_mode: bool, } @@ -558,7 +558,7 @@ impl SyncStateMachine { } // Compute the list of chains from the tipsets map - fn chains(&self) -> Vec>> { + fn chains(&self) -> Vec> { let mut chains = Vec::new(); let mut remaining_tipsets = self.tipsets.clone(); @@ -614,7 +614,7 @@ impl SyncStateMachine { } } - fn add_full_tipset(&mut self, tipset: Arc) { + fn add_full_tipset(&mut self, tipset: FullTipset) { if let Err(why) = TipsetValidator(&tipset).validate( &self.cs, self.bad_block_cache.as_ref().map(AsRef::as_ref), @@ -672,14 +672,14 @@ impl SyncStateMachine { // Create and insert new merged tipset if let Ok(merged_tipset) = FullTipset::new(merged_blocks) { self.tipsets - .insert(merged_tipset.key().clone(), Arc::new(merged_tipset)); + .insert(merged_tipset.key().clone(), merged_tipset); } } // Mark blocks in tipset as bad. // Mark all descendants of tipsets as bad. // Remove all bad tipsets from the tipset map. - fn mark_bad_tipset(&mut self, tipset: Arc) { + fn mark_bad_tipset(&mut self, tipset: FullTipset) { let mut stack = vec![tipset]; while let Some(tipset) = stack.pop() { self.tipsets.remove(tipset.key()); @@ -704,20 +704,20 @@ impl SyncStateMachine { } } - fn mark_validated_tipset(&mut self, tipset: Arc, is_proposed_head: bool) { + fn mark_validated_tipset(&mut self, tipset: FullTipset, is_proposed_head: bool) { if !self.is_parent_validated(&tipset) { tracing::error!(epoch = %tipset.epoch(), tsk = %tipset.key(), "Tipset must be validated"); return; } self.tipsets.remove(tipset.key()); - let tipset = tipset.deref().clone().into_tipset(); + let tipset = tipset.into_tipset(); // cs.put_tipset requires state and doesn't work in stateless mode if self.stateless_mode { let epoch = tipset.epoch(); let terse_key = tipset.key().terse(); if self.cs.heaviest_tipset().weight() < tipset.weight() { - if let Err(e) = self.cs.set_heaviest_tipset(Arc::new(tipset)) { + if let Err(e) = self.cs.set_heaviest_tipset(tipset) { error!("Error setting heaviest tipset: {}", e); } else { info!("Heaviest tipset: {} ({})", epoch, terse_key); @@ -727,7 +727,7 @@ impl SyncStateMachine { if let Err(e) = self.cs.put_tipset(&tipset) { error!("Error putting tipset: {e}"); } - } else if let Err(e) = self.cs.set_heaviest_tipset(tipset.into()) { + } else if let Err(e) = self.cs.set_heaviest_tipset(tipset) { error!("Error setting heaviest tipset: {e}"); } } @@ -795,7 +795,7 @@ impl SyncStateMachine { #[derive(PartialEq, Eq, Hash, Clone, Debug)] enum SyncTask { ValidateTipset { - tipset: Arc, + tipset: FullTipset, is_proposed_head: bool, }, FetchTipset(TipsetKey, ChainEpoch), @@ -851,7 +851,7 @@ impl SyncTask { match validate_tipset( &state_manager, cs, - tipset.deref().clone(), + tipset.clone(), &genesis, bad_block_cache, ) @@ -869,9 +869,7 @@ impl SyncTask { } SyncTask::FetchTipset(key, epoch) => { match get_full_tipset_batch(network.clone(), cs.clone(), None, &key).await { - Ok(parents) => Some(SyncEvent::NewFullTipsets( - parents.into_iter().map(Arc::new).collect(), - )), + Ok(parents) => Some(SyncEvent::NewFullTipsets(parents)), Err(e) => { tracing::warn!(%key, %epoch, "failed to fetch tipset: {e}"); None @@ -933,8 +931,7 @@ mod tests { .unwrap(), ); - cs.set_heaviest_tipset(Arc::new(cs.genesis_tipset())) - .unwrap(); + cs.set_heaviest_tipset(cs.genesis_tipset()).unwrap(); (cs, c4u) } @@ -976,7 +973,7 @@ mod tests { secp_messages: vec![], }]) .unwrap(); - state_machine.update(SyncEvent::NewFullTipsets(vec![Arc::new(full_tipset)])); + state_machine.update(SyncEvent::NewFullTipsets(vec![full_tipset])); } // Record validation order by processing all validation tasks in each iteration @@ -1044,7 +1041,7 @@ mod tests { secp_messages: vec![], }]) .unwrap(); - state_machine.update(SyncEvent::NewFullTipsets(vec![Arc::new(full_tipset)])); + state_machine.update(SyncEvent::NewFullTipsets(vec![full_tipset])); } let chains = state_machine diff --git a/src/chain_sync/network_context.rs b/src/chain_sync/network_context.rs index bb4a5a90820c..148bcdb2431a 100644 --- a/src/chain_sync/network_context.rs +++ b/src/chain_sync/network_context.rs @@ -148,14 +148,10 @@ where peer_id: Option, tsk: &TipsetKey, count: NonZeroU64, - ) -> Result>, String> { - self.handle_chain_exchange_request( - peer_id, - tsk, - count, - HEADERS, - |tipsets: &Vec>| validate_network_tipsets(tipsets, tsk), - ) + ) -> Result, String> { + self.handle_chain_exchange_request(peer_id, tsk, count, HEADERS, |tipsets: &Vec| { + validate_network_tipsets(tipsets, tsk) + }) .await } @@ -467,7 +463,7 @@ where /// Validates network tipsets that are sorted by epoch in descending order with the below checks /// 1. The latest(first) tipset has the desired tipset key /// 2. The sorted tipsets are chained by their tipset keys -fn validate_network_tipsets(tipsets: &[Arc], start_tipset_key: &TipsetKey) -> bool { +fn validate_network_tipsets(tipsets: &[Tipset], start_tipset_key: &TipsetKey) -> bool { if let Some(start) = tipsets.first() { if start.key() != start_tipset_key { tracing::warn!(epoch=%start.epoch(), expected=%start_tipset_key, actual=%start.key(), "start tipset key mismatch"); @@ -601,11 +597,6 @@ mod tests { -> t3 @ [third] -> t4 @ [fourth] }; - let t0 = Arc::new(t0.clone()); - let t1 = Arc::new(t1.clone()); - let t2 = Arc::new(t2.clone()); - let t3 = Arc::new(t3.clone()); - let t4 = Arc::new(t4.clone()); assert!(validate_network_tipsets( &[t4.clone(), t3.clone(), t2.clone(), t1.clone(), t0.clone()], t4.key() diff --git a/src/chain_sync/tipset_syncer.rs b/src/chain_sync/tipset_syncer.rs index f77c5fa14762..9c65f3e83312 100644 --- a/src/chain_sync/tipset_syncer.rs +++ b/src/chain_sync/tipset_syncer.rs @@ -232,7 +232,7 @@ async fn validate_block( // Base fee check validations.spawn_blocking({ let smoke_height = state_manager.chain_config().epoch(Height::Smoke); - let base_tipset = Arc::clone(&base_tipset); + let base_tipset = base_tipset.clone(); let block_store = state_manager.blockstore_owned(); let block = Arc::clone(&block); move || { @@ -352,7 +352,7 @@ async fn validate_block( async fn check_block_messages( state_manager: Arc>, block: Arc, - base_tipset: Arc, + base_tipset: Tipset, ) -> Result<(), TipsetSyncerError> { let network_version = state_manager .chain_config() diff --git a/src/cli/subcommands/chain_cmd.rs b/src/cli/subcommands/chain_cmd.rs index c15bab97725c..d15e4a168a47 100644 --- a/src/cli/subcommands/chain_cmd.rs +++ b/src/cli/subcommands/chain_cmd.rs @@ -16,7 +16,6 @@ use anyhow::{bail, ensure}; use cid::Cid; use clap::Subcommand; use nunny::Vec as NonEmpty; -use std::sync::Arc; #[derive(Debug, Clone, clap::ValueEnum)] pub enum Format { @@ -143,7 +142,7 @@ impl ChainCommands { async fn tipset_by_epoch_or_offset( client: &rpc::Client, epoch_or_offset: i64, -) -> Result, jsonrpsee::core::ClientError> { +) -> Result { let current_head = ChainHead::call(client, ()).await?; let target_epoch = match epoch_or_offset.is_negative() { diff --git a/src/cli/subcommands/f3_cmd.rs b/src/cli/subcommands/f3_cmd.rs index a788202adc95..2f2b29e04843 100644 --- a/src/cli/subcommands/f3_cmd.rs +++ b/src/cli/subcommands/f3_cmd.rs @@ -6,7 +6,7 @@ mod tests; use std::{ borrow::Cow, - sync::{Arc, LazyLock}, + sync::LazyLock, time::{Duration, Instant}, }; @@ -151,7 +151,7 @@ impl F3Commands { async fn get_heads( client: &rpc::Client, - ) -> anyhow::Result<(Arc, FinalityCertificate)> { + ) -> anyhow::Result<(Tipset, FinalityCertificate)> { let cert_head = client.call(F3GetLatestCertificate::request(())?).await?; let chain_head = client.call(ChainHead::request(())?).await?; Ok((chain_head, cert_head)) diff --git a/src/cli/subcommands/info_cmd.rs b/src/cli/subcommands/info_cmd.rs index 73b733bb00d7..0d749f422b3a 100644 --- a/src/cli/subcommands/info_cmd.rs +++ b/src/cli/subcommands/info_cmd.rs @@ -188,19 +188,17 @@ mod tests { use crate::shim::{address::Address, econ::TokenAmount}; use chrono::DateTime; use quickcheck_macros::quickcheck; - use std::{str::FromStr, sync::Arc, time::Duration}; + use std::{str::FromStr, time::Duration}; use super::{NodeStatusInfo, SyncStatus}; - fn mock_tipset_at(seconds_since_unix_epoch: u64) -> Arc { - let mock_header = CachingBlockHeader::new(RawBlockHeader { + fn mock_tipset_at(seconds_since_unix_epoch: u64) -> Tipset { + CachingBlockHeader::new(RawBlockHeader { miner_address: Address::from_str("f2kmbjvz7vagl2z6pfrbjoggrkjofxspp7cqtw2zy").unwrap(), timestamp: seconds_since_unix_epoch, ..Default::default() - }); - let tipset = Tipset::from(&mock_header); - - Arc::new(tipset) + }) + .into() } fn mock_node_status() -> NodeStatusInfo { @@ -233,7 +231,7 @@ mod tests { fn test_sync_status_ok(duration: Duration) { let tipset = mock_tipset_at(duration.as_secs() + (EPOCH_DURATION_SECONDS as u64 * 3 / 2)); - let status = node_status(duration, tipset.as_ref()); + let status = node_status(duration, &tipset); assert_ne!(status.sync_status, SyncStatus::Slow); assert_ne!(status.sync_status, SyncStatus::Behind); @@ -243,7 +241,7 @@ mod tests { fn test_sync_status_behind(duration: Duration) { let duration = duration + Duration::from_secs(300); let tipset = mock_tipset_at(duration.as_secs().saturating_sub(200)); - let status = node_status(duration, tipset.as_ref()); + let status = node_status(duration, &tipset); assert!(status.health.is_finite()); assert_ne!(status.sync_status, SyncStatus::Ok); @@ -258,7 +256,7 @@ mod tests { .as_secs() .saturating_sub(EPOCH_DURATION_SECONDS as u64 * 4), ); - let status = node_status(duration, tipset.as_ref()); + let status = node_status(duration, &tipset); assert!(status.health.is_finite()); assert_ne!(status.sync_status, SyncStatus::Behind); assert_ne!(status.sync_status, SyncStatus::Ok); @@ -268,7 +266,7 @@ mod tests { fn block_sync_timestamp() { let duration = Duration::from_secs(60); let tipset = mock_tipset_at(duration.as_secs() - 10); - let status = node_status(duration, tipset.as_ref()); + let status = node_status(duration, &tipset); assert!( status @@ -292,7 +290,7 @@ mod tests { fn chain_status_test() { let duration = Duration::from_secs(100_000); let tipset = mock_tipset_at(duration.as_secs() - 59); - let status = node_status(duration, tipset.as_ref()); + let status = node_status(duration, &tipset); let expected_status_fmt = "[sync: Slow! (59s behind)] [basefee: 0 FIL] [epoch: 0]".to_string(); assert!( @@ -302,7 +300,7 @@ mod tests { ); let tipset = mock_tipset_at(duration.as_secs() - 30000); - let status = node_status(duration, tipset.as_ref()); + let status = node_status(duration, &tipset); let expected_status_fmt = "[sync: Behind! (8h 20m behind)] [basefee: 0 FIL] [epoch: 0]".to_string(); diff --git a/src/daemon/db_util.rs b/src/daemon/db_util.rs index 3fb7cf90fb23..b5933e512cc3 100644 --- a/src/daemon/db_util.rs +++ b/src/daemon/db_util.rs @@ -330,7 +330,7 @@ where let tsk = ts.key().clone(); let state_output = state_manager - .compute_tipset_state(Arc::new(ts.clone()), NO_CALLBACK, VMTrace::NotTraced) + .compute_tipset_state(ts.clone(), NO_CALLBACK, VMTrace::NotTraced) .await?; for events_root in state_output.events_roots.iter().flatten() { tracing::trace!("Indexing events root @{epoch}: {events_root}"); diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 40c6621b8f35..ff24667e38f1 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -154,9 +154,7 @@ async fn maybe_import_snapshot( let ts_epoch = ts.epoch(); // Explicitly set heaviest tipset here in case HEAD_KEY has already been set // in the current setting store - ctx.state_manager - .chain_store() - .set_heaviest_tipset(ts.into())?; + ctx.state_manager.chain_store().set_heaviest_tipset(ts)?; debug!( "Loaded car DB at {} and set current head to epoch {ts_epoch}", car_db_path.display(), @@ -321,9 +319,7 @@ fn create_chain_follower( let chain_follower = ChainFollower::new( ctx.state_manager.clone(), network, - Arc::new(Tipset::from( - ctx.state_manager.chain_store().genesis_block_header(), - )), + Tipset::from(ctx.state_manager.chain_store().genesis_block_header()), p2p_service.network_receiver(), opts.stateless, mpool, diff --git a/src/fil_cns/validation.rs b/src/fil_cns/validation.rs index 81fef8564466..e72b5927522f 100644 --- a/src/fil_cns/validation.rs +++ b/src/fil_cns/validation.rs @@ -61,12 +61,8 @@ pub(in crate::fil_cns) async fn validate_block { rand: Box, - heaviest_tipset: Arc, + heaviest_tipset: Tipset, epoch: ChainEpoch, root: Cid, chain_index: Arc>>, @@ -43,7 +43,7 @@ pub struct ForestExternsV2 { impl ForestExternsV2 { pub fn new( rand: impl Rand + 'static, - heaviest_tipset: Arc, + heaviest_tipset: Tipset, epoch: ChainEpoch, root: Cid, chain_index: Arc>>, diff --git a/src/interpreter/fvm3.rs b/src/interpreter/fvm3.rs index a8377a0f4650..840ba0dab527 100644 --- a/src/interpreter/fvm3.rs +++ b/src/interpreter/fvm3.rs @@ -36,7 +36,7 @@ use fvm3::{ pub struct ForestExterns { rand: Box, - heaviest_tipset: Arc, + heaviest_tipset: Tipset, epoch: ChainEpoch, root: Cid, chain_index: Arc>>, @@ -47,7 +47,7 @@ pub struct ForestExterns { impl ForestExterns { pub fn new( rand: impl Rand + 'static, - heaviest_tipset: Arc, + heaviest_tipset: Tipset, epoch: ChainEpoch, root: Cid, chain_index: Arc>>, diff --git a/src/interpreter/fvm4.rs b/src/interpreter/fvm4.rs index d6b0ddc1198b..a0c40faacb07 100644 --- a/src/interpreter/fvm4.rs +++ b/src/interpreter/fvm4.rs @@ -36,7 +36,7 @@ use fvm4::{ pub struct ForestExterns { rand: Box, - heaviest_tipset: Arc, + heaviest_tipset: Tipset, epoch: ChainEpoch, root: Cid, chain_index: Arc>>, @@ -47,7 +47,7 @@ pub struct ForestExterns { impl ForestExterns { pub fn new( rand: impl Rand + 'static, - heaviest_tipset: Arc, + heaviest_tipset: Tipset, epoch: ChainEpoch, root: Cid, chain_index: Arc>>, diff --git a/src/interpreter/vm.rs b/src/interpreter/vm.rs index a9037b855850..fb6fbe52ccd5 100644 --- a/src/interpreter/vm.rs +++ b/src/interpreter/vm.rs @@ -149,7 +149,7 @@ pub struct ExecutionContext { // This tipset identifies of the blockchain. It functions as a starting // point when searching for ancestors. It may be any tipset as long as its // epoch is at or higher than the epoch in `epoch`. - pub heaviest_tipset: Arc, + pub heaviest_tipset: Tipset, // State-tree generated by the parent tipset. pub state_tree_root: Cid, // Epoch of the messages to be executed. diff --git a/src/libp2p/chain_exchange/message.rs b/src/libp2p/chain_exchange/message.rs index bdd34f1a17c9..6619bdf6e163 100644 --- a/src/libp2p/chain_exchange/message.rs +++ b/src/libp2p/chain_exchange/message.rs @@ -1,7 +1,7 @@ // Copyright 2019-2025 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use std::{convert::TryFrom, sync::Arc}; +use std::convert::TryFrom; use crate::blocks::{BLOCK_MESSAGE_LIMIT, Block, CachingBlockHeader, FullTipset, Tipset}; use crate::message::SignedMessage; @@ -173,14 +173,6 @@ impl TryFrom for Tipset { } } -impl TryFrom for Arc { - type Error = String; - - fn try_from(tsb: TipsetBundle) -> Result { - Tipset::try_from(tsb).map(Arc::new) - } -} - impl TryFrom for CompactedMessages { type Error = String; diff --git a/src/message_pool/msgpool/mod.rs b/src/message_pool/msgpool/mod.rs index 243fd3e2d6a3..f7734279f5b7 100644 --- a/src/message_pool/msgpool/mod.rs +++ b/src/message_pool/msgpool/mod.rs @@ -57,7 +57,7 @@ async fn republish_pending_messages( api: &T, network_sender: &flume::Sender, pending: &SyncRwLock>, - cur_tipset: &SyncRwLock>, + cur_tipset: &SyncRwLock, republished: &SyncRwLock>, local_addrs: &SyncRwLock>, chain_config: &ChainConfig, @@ -85,7 +85,7 @@ where } } - let msgs = select_messages_for_block(api, chain_config, ts.as_ref(), pending_map)?; + let msgs = select_messages_for_block(api, chain_config, &ts, pending_map)?; let network_name = chain_config.network.genesis_name(); for m in msgs.iter() { @@ -216,7 +216,7 @@ pub async fn head_change( repub_trigger: Arc>, republished: &SyncRwLock>, pending: &SyncRwLock>, - cur_tipset: &SyncRwLock>, + cur_tipset: &SyncRwLock, revert: Vec, apply: Vec, ) -> Result<(), Error> @@ -266,7 +266,7 @@ where } } } - *cur_tipset.write() = Arc::new(ts); + *cur_tipset.write() = ts; } if repub { repub_trigger @@ -610,14 +610,13 @@ pub mod tests { let header = mock_block(1, 1); let tipset = Tipset::from(&header.clone()); - let ts = tipset.clone(); - mpool.api.set_heaviest_tipset(Arc::new(ts)); + mpool.api.set_heaviest_tipset(tipset.clone()); // sleep allows for async block to update mpool's cur_tipset tokio::time::sleep(Duration::new(2, 0)).await; let cur_ts = mpool.current_tipset(); - assert_eq!(cur_ts.as_ref(), &tipset); + assert_eq!(cur_ts, tipset); } #[tokio::test] diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index 1bee407b2d3e..be716505bb73 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -177,7 +177,7 @@ pub struct MessagePool { /// A map of pending messages where the key is the address pub pending: Arc>>, /// The current tipset (a set of blocks) - pub cur_tipset: Arc>>, + pub cur_tipset: Arc>, /// The underlying provider pub api: Arc, /// Sender half to send messages to other components @@ -203,7 +203,7 @@ where T: Provider, { /// Gets the current tipset - pub fn current_tipset(&self) -> Arc { + pub fn current_tipset(&self) -> Tipset { self.cur_tipset.read().clone() } @@ -369,7 +369,7 @@ where /// Return a tuple that contains a vector of all signed messages and the /// current tipset for self. - pub fn pending(&self) -> Result<(Vec, Arc), Error> { + pub fn pending(&self) -> Result<(Vec, Tipset), Error> { let mut out: Vec = Vec::new(); let pending = self.pending.read().clone(); @@ -521,11 +521,9 @@ where match subscriber.recv().await { Ok(ts) => { let (cur, rev, app) = match ts { - HeadChange::Apply(tipset) => ( - cur_tipset.clone(), - Vec::new(), - vec![tipset.as_ref().clone()], - ), + HeadChange::Apply(tipset) => { + (cur_tipset.clone(), Vec::new(), vec![tipset]) + } }; head_change( api.as_ref(), diff --git a/src/message_pool/msgpool/provider.rs b/src/message_pool/msgpool/provider.rs index 6a2d5bde01dd..c989cbaba664 100644 --- a/src/message_pool/msgpool/provider.rs +++ b/src/message_pool/msgpool/provider.rs @@ -33,7 +33,7 @@ pub trait Provider { /// Update `Mpool`'s `cur_tipset` whenever there is a change to the provider fn subscribe_head_changes(&self) -> Subscriber; /// Get the heaviest Tipset in the provider - fn get_heaviest_tipset(&self) -> Arc; + fn get_heaviest_tipset(&self) -> Tipset; /// Add a message to the `MpoolProvider`, return either Cid or Error /// depending on successful put fn put_message(&self, msg: &ChainMessage) -> Result; @@ -47,7 +47,7 @@ pub trait Provider { h: &CachingBlockHeader, ) -> Result<(Vec, Vec), Error>; /// Return a tipset given the tipset keys from the `ChainStore` - fn load_tipset(&self, tsk: &TipsetKey) -> Result, Error>; + fn load_tipset(&self, tsk: &TipsetKey) -> Result; /// Computes the base fee fn chain_compute_base_fee(&self, ts: &Tipset) -> Result; // Get max number of messages per actor in the pool @@ -85,7 +85,7 @@ where self.subscriber.subscribe() } - fn get_heaviest_tipset(&self) -> Arc { + fn get_heaviest_tipset(&self) -> Tipset { self.sm.chain_store().heaviest_tipset() } @@ -111,7 +111,7 @@ where crate::chain::block_messages(self.sm.blockstore(), h).map_err(|err| err.into()) } - fn load_tipset(&self, tsk: &TipsetKey) -> Result, Error> { + fn load_tipset(&self, tsk: &TipsetKey) -> Result { Ok(self .sm .chain_store() diff --git a/src/message_pool/msgpool/selection.rs b/src/message_pool/msgpool/selection.rs index 83ee3c2d7028..f0851e602311 100644 --- a/src/message_pool/msgpool/selection.rs +++ b/src/message_pool/msgpool/selection.rs @@ -6,7 +6,7 @@ //! `select_messages` API which selects an appropriate set of messages such that //! it optimizes miner reward and chain capacity. See for more details -use std::{borrow::BorrowMut, cmp::Ordering, sync::Arc}; +use std::{borrow::BorrowMut, cmp::Ordering}; use crate::blocks::{BLOCK_MESSAGE_LIMIT, Tipset}; use crate::message::{Message, SignedMessage}; @@ -822,17 +822,17 @@ pub(in crate::message_pool) fn run_head_change( where T: Provider, { - let mut left = Arc::new(from); - let mut right = Arc::new(to); + let mut left = from; + let mut right = to; let mut left_chain = Vec::new(); let mut right_chain = Vec::new(); while left != right { if left.epoch() > right.epoch() { - left_chain.push(left.as_ref().clone()); + left_chain.push(left.clone()); let par = api.load_tipset(left.parents())?; left = par; } else { - right_chain.push(right.as_ref().clone()); + right_chain.push(right.clone()); let par = api.load_tipset(right.parents())?; right = par; } diff --git a/src/message_pool/msgpool/test_provider.rs b/src/message_pool/msgpool/test_provider.rs index 36b8b3b40b19..ef97b0d2cd5d 100644 --- a/src/message_pool/msgpool/test_provider.rs +++ b/src/message_pool/msgpool/test_provider.rs @@ -3,14 +3,15 @@ //! Contains mock implementations for testing internal `MessagePool` APIs -use std::{convert::TryFrom, sync::Arc}; +use std::convert::TryFrom; -use crate::blocks::RawBlockHeader; -use crate::blocks::VRFProof; -use crate::blocks::{CachingBlockHeader, ElectionProof, Ticket, Tipset, TipsetKey}; +use crate::blocks::{ + CachingBlockHeader, ElectionProof, RawBlockHeader, Ticket, Tipset, TipsetKey, VRFProof, +}; use crate::chain::HeadChange; use crate::cid_collections::CidHashMap; use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage}; +use crate::message_pool::{Error, provider::Provider}; use crate::shim::{address::Address, econ::TokenAmount, message::Message, state_tree::ActorState}; use ahash::HashMap; use async_trait::async_trait; @@ -18,8 +19,6 @@ use cid::Cid; use num::BigInt; use parking_lot::Mutex; use tokio::sync::broadcast; - -use crate::message_pool::{Error, provider::Provider}; use tokio::sync::broadcast::{Receiver as Subscriber, Sender as Publisher}; /// Structure used for creating a provider when writing tests involving message @@ -81,7 +80,7 @@ impl TestApi { } /// Set the heaviest tipset for `TestApi` - pub fn set_heaviest_tipset(&self, ts: Arc) { + pub fn set_heaviest_tipset(&self, ts: Tipset) { self.publisher.send(HeadChange::Apply(ts)).unwrap(); } @@ -124,8 +123,8 @@ impl Provider for TestApi { self.publisher.subscribe() } - fn get_heaviest_tipset(&self) -> Arc { - Arc::new(Tipset::from(create_header(1))) + fn get_heaviest_tipset(&self) -> Tipset { + Tipset::from(create_header(1)) } fn put_message(&self, _msg: &ChainMessage) -> Result { @@ -187,11 +186,11 @@ impl Provider for TestApi { } } - fn load_tipset(&self, tsk: &TipsetKey) -> Result, Error> { + fn load_tipset(&self, tsk: &TipsetKey) -> Result { let inner = self.inner.lock(); for ts in &inner.tipsets { if tsk == ts.key() { - return Ok(ts.clone().into()); + return Ok(ts.clone()); } } Err(Error::InvalidToAddr) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index d24a8b1573de..773c0c6295cf 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -40,11 +40,7 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use sha2::Sha256; use std::fs::File; -use std::{ - collections::VecDeque, - path::PathBuf, - sync::{Arc, LazyLock}, -}; +use std::{collections::VecDeque, path::PathBuf, sync::LazyLock}; use tokio::sync::{ Mutex, broadcast::{self, Receiver as Subscriber}, @@ -141,7 +137,7 @@ impl RpcMethod<0> for ChainGetFinalizedTipset { ); type Params = (); - type Ok = Arc; + type Ok = Tipset; async fn handle( ctx: Ctx, @@ -174,7 +170,7 @@ impl RpcMethod<0> for ChainGetFinalizedTipset { async fn get_f3_finality_tipset( ctx: &Ctx, ec_finality_epoch: ChainEpoch, -) -> Result> { +) -> Result { let f3_finalized_cert = crate::rpc::f3::F3GetLatestCertificate::handle(ctx.clone(), ()) .await .map_err(|e| anyhow::anyhow!("Failed to get F3 certificate: {}", e))?; @@ -867,7 +863,7 @@ impl RpcMethod<2> for ChainGetTipSetByHeight { const DESCRIPTION: Option<&'static str> = Some("Returns the tipset at the specified height."); type Params = (ChainEpoch, ApiTipsetKey); - type Ok = Arc; + type Ok = Tipset; async fn handle( ctx: Ctx, @@ -896,7 +892,7 @@ impl RpcMethod<2> for ChainGetTipSetAfterHeight { ); type Params = (ChainEpoch, ApiTipsetKey); - type Ok = Arc; + type Ok = Tipset; async fn handle( ctx: Ctx, @@ -937,7 +933,7 @@ impl RpcMethod<0> for ChainHead { const DESCRIPTION: Option<&'static str> = Some("Returns the chain head (heaviest tipset)."); type Params = (); - type Ok = Arc; + type Ok = Tipset; async fn handle(ctx: Ctx, (): Self::Params) -> Result { let heaviest = ctx.chain_store().heaviest_tipset(); @@ -974,7 +970,7 @@ impl RpcMethod<1> for ChainGetTipSet { const DESCRIPTION: Option<&'static str> = Some("Returns the tipset with the specified CID."); type Params = (ApiTipsetKey,); - type Ok = Arc; + type Ok = Tipset; async fn handle( ctx: Ctx, @@ -996,7 +992,7 @@ impl RpcMethod<1> for ChainGetTipSetV2 { const DESCRIPTION: Option<&'static str> = Some("Returns the tipset with the specified CID."); type Params = (ApiTipsetKey,); - type Ok = Arc; + type Ok = Tipset; async fn handle(_: Ctx, _: Self::Params) -> Result { Err(ServerError::unsupported_method()) @@ -1245,13 +1241,13 @@ pub struct ApiHeadChange { pub change: String, #[serde(rename = "Val", with = "crate::lotus_json")] #[schemars(with = "LotusJson")] - pub tipset: Arc, + pub tipset: Tipset, } lotus_json_with_self!(ApiHeadChange); #[derive(PartialEq, Debug, Serialize, Deserialize, Clone, JsonSchema)] #[serde(tag = "Type", content = "Val", rename_all = "snake_case")] -pub enum PathChange> { +pub enum PathChange { Revert(T), Apply(T), } @@ -1287,23 +1283,21 @@ impl HasLotusJson for PathChange { "Height": 0 } }), - Self::Revert(Arc::new(Tipset::from(RawBlockHeader::default()))), + Self::Revert(RawBlockHeader::default().into()), )] } fn into_lotus_json(self) -> Self::LotusJson { match self { - PathChange::Revert(it) => { - PathChange::Revert(Arc::unwrap_or_clone(it).into_lotus_json()) - } - PathChange::Apply(it) => PathChange::Apply(Arc::unwrap_or_clone(it).into_lotus_json()), + PathChange::Revert(it) => PathChange::Revert(it.into_lotus_json()), + PathChange::Apply(it) => PathChange::Apply(it.into_lotus_json()), } } fn from_lotus_json(lotus_json: Self::LotusJson) -> Self { match lotus_json { - PathChange::Revert(it) => PathChange::Revert(Tipset::from_lotus_json(it).into()), - PathChange::Apply(it) => PathChange::Apply(Tipset::from_lotus_json(it).into()), + PathChange::Revert(it) => PathChange::Revert(Tipset::from_lotus_json(it)), + PathChange::Apply(it) => PathChange::Apply(Tipset::from_lotus_json(it)), } } } @@ -1336,8 +1330,6 @@ quickcheck::quickcheck! { #[cfg(test)] mod tests { use super::*; - use PathChange::{Apply, Revert}; - use crate::{ blocks::{Chain4U, RawBlockHeader, chain4u}, db::{ @@ -1346,6 +1338,8 @@ mod tests { }, networks::{self, ChainConfig}, }; + use PathChange::{Apply, Revert}; + use std::sync::Arc; #[test] fn revert_to_ancestor_linear() { @@ -1540,8 +1534,8 @@ mod tests { let expected = expected .into_iter() .map(|change| match change { - PathChange::Revert(it) => PathChange::Revert(Arc::new(it.make_tipset())), - PathChange::Apply(it) => PathChange::Apply(Arc::new(it.make_tipset())), + PathChange::Revert(it) => PathChange::Revert(it.make_tipset()), + PathChange::Apply(it) => PathChange::Apply(it.make_tipset()), }) .collect::>(); if expected != actual { diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index a59703f6bcac..ff2d5d556e32 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -871,9 +871,9 @@ fn get_tipset_from_hash( fn resolve_predefined_tipset( chain: &ChainStore, - head: Arc, + head: Tipset, predefined: Predefined, -) -> anyhow::Result> { +) -> anyhow::Result { match predefined { Predefined::Earliest => bail!("block param \"earliest\" is not supported"), Predefined::Pending => Ok(head), @@ -883,10 +883,10 @@ fn resolve_predefined_tipset( fn resolve_ext_predefined_tipset( chain: &ChainStore, - head: Arc, + head: Tipset, ext_predefined: ExtPredefined, resolve: ResolveNullTipset, -) -> anyhow::Result> { +) -> anyhow::Result { if let Ok(common) = Predefined::try_from(&ext_predefined) { resolve_predefined_tipset(chain, head, common) } else { @@ -912,10 +912,10 @@ fn resolve_ext_predefined_tipset( fn resolve_block_number_tipset( chain: &ChainStore, - head: Arc, + head: Tipset, block_number: EthInt64, resolve: ResolveNullTipset, -) -> anyhow::Result> { +) -> anyhow::Result { let height = ChainEpoch::from(block_number.0); if height > head.epoch() - 1 { bail!("requested a future epoch (beyond \"latest\")"); @@ -927,12 +927,12 @@ fn resolve_block_number_tipset( fn resolve_block_hash_tipset( chain: &ChainStore, - head: Arc, + head: Tipset, block_hash: &EthHash, require_canonical: bool, resolve: ResolveNullTipset, -) -> anyhow::Result> { - let ts = Arc::new(get_tipset_from_hash(chain, block_hash)?); +) -> anyhow::Result { + let ts = get_tipset_from_hash(chain, block_hash)?; // verify that the tipset is in the canonical chain if require_canonical { // walk up the current chain (our head) until we reach ts.epoch() @@ -951,7 +951,7 @@ fn tipset_by_block_number_or_hash( chain: &ChainStore, block_param: BlockNumberOrHash, resolve: ResolveNullTipset, -) -> anyhow::Result> { +) -> anyhow::Result { let head = chain.heaviest_tipset(); match block_param { BlockNumberOrHash::PredefinedBlock(predefined) => { @@ -975,7 +975,7 @@ fn tipset_by_ext_block_number_or_hash( chain: &ChainStore, block_param: ExtBlockNumberOrHash, resolve: ResolveNullTipset, -) -> anyhow::Result> { +) -> anyhow::Result { let head = chain.heaviest_tipset(); match block_param { ExtBlockNumberOrHash::PredefinedBlock(ext_predefined) => { @@ -997,7 +997,7 @@ fn tipset_by_ext_block_number_or_hash( async fn execute_tipset( data: &Ctx, - tipset: &Arc, + tipset: &Tipset, ) -> Result<(Cid, Vec<(ChainMessage, Receipt)>)> { let msgs = data.chain_store().messages_for_tipset(tipset)?; @@ -1267,7 +1267,7 @@ fn new_eth_tx( async fn new_eth_tx_receipt( ctx: &Ctx, - tipset: &Arc, + tipset: &Tipset, tx: &ApiEthTx, msg_receipt: &Receipt, ) -> anyhow::Result { @@ -1334,7 +1334,7 @@ async fn new_eth_tx_receipt( pub async fn eth_logs_for_block_and_transaction( ctx: &Ctx, - ts: &Arc, + ts: &Tipset, block_hash: &EthHash, tx_hash: &EthHash, ) -> anyhow::Result> { @@ -1348,7 +1348,7 @@ pub async fn eth_logs_for_block_and_transaction( ctx: &Ctx, - ts: &Arc, + ts: &Tipset, spec: Option, tx_hash: Option<&EthHash>, ) -> anyhow::Result> { @@ -1389,7 +1389,7 @@ fn get_signed_message(ctx: &Ctx, message_cid: Cid) -> Result pub async fn block_from_filecoin_tipset( data: Ctx, - tipset: Arc, + tipset: Tipset, full_tx_info: bool, ) -> Result { let parent_cid = tipset.parents().cid()?; @@ -1812,7 +1812,7 @@ impl RpcMethod<2> for EthEstimateGas { async fn apply_message( ctx: &Ctx, - tipset: Option>, + tipset: Option, msg: Message, ) -> Result where @@ -1896,7 +1896,7 @@ impl EthEstimateGas { data: &Ctx, msg: &Message, prior_messages: &[ChainMessage], - ts: Arc, + ts: Tipset, ) -> anyhow::Result where DB: Blockstore + Send + Sync + 'static, @@ -1908,7 +1908,7 @@ impl EthEstimateGas { data: &Ctx, mut msg: Message, prior_messages: &[ChainMessage], - ts: Arc, + ts: Tipset, limit: u64, ) -> anyhow::Result where @@ -1990,7 +1990,7 @@ impl RpcMethod<3> for EthFeeHistory { let mut rewards_array = vec![]; let mut gas_used_ratio_array = vec![]; for ts in tipset - .chain_arc(ctx.store()) + .chain(ctx.store()) .filter(|i| i.epoch() > 0) .take(block_count as _) { @@ -2123,7 +2123,7 @@ impl RpcMethod<2> for EthGetCode { }; let api_invoc_result = 'invoc: { - for ts in ts.chain_arc(ctx.store()) { + for ts in ts.chain(ctx.store()) { match ctx.state_manager.call(&message, Some(ts)) { Ok(res) => { break 'invoc res; @@ -2194,7 +2194,7 @@ impl RpcMethod<3> for EthGetStorageAt { ..Default::default() }; let api_invoc_result = 'invoc: { - for ts in ts.chain_arc(ctx.store()) { + for ts in ts.chain(ctx.store()) { match ctx.state_manager.call(&message, Some(ts)) { Ok(res) => { break 'invoc res; diff --git a/src/rpc/methods/eth/filter/mod.rs b/src/rpc/methods/eth/filter/mod.rs index 36b2e917cf98..26af0f091299 100644 --- a/src/rpc/methods/eth/filter/mod.rs +++ b/src/rpc/methods/eth/filter/mod.rs @@ -271,7 +271,7 @@ impl EthEventHandler { pub async fn collect_events( ctx: &Ctx, - tipset: &Arc, + tipset: &Tipset, spec: Option<&impl Matcher>, skip_event: SkipEvent, collected_events: &mut Vec, @@ -367,7 +367,7 @@ impl EthEventHandler { pub async fn collect_chain_events( ctx: &Ctx, - tipset: &Arc, + tipset: &Tipset, events_root: &Cid, ) -> anyhow::Result> { let state_events = ctx @@ -434,7 +434,7 @@ impl EthEventHandler { ResolveNullTipset::TakeOlder, )?; for tipset in max_tipset - .chain_arc(&ctx.store()) + .chain(&ctx.store()) .take_while(|ts| ts.epoch() >= *range.start()) { Self::collect_events(ctx, &tipset, Some(pf), skip_event, &mut collected_events) diff --git a/src/rpc/methods/f3.rs b/src/rpc/methods/f3.rs index 5f0e6702d8e5..9cd4c60c4906 100644 --- a/src/rpc/methods/f3.rs +++ b/src/rpc/methods/f3.rs @@ -159,7 +159,7 @@ pub enum GetPowerTable {} impl GetPowerTable { async fn compute( ctx: &Ctx, - ts: &Arc, + ts: &Tipset, ) -> anyhow::Result> { // The RAM overhead on mainnet is ~14MiB const BLOCKSTORE_CACHE_CAP: usize = 65536; @@ -581,7 +581,7 @@ impl RpcMethod<1> for Finalize { finalized_ts.epoch() ); if !head - .chain_arc(ctx.store()) + .chain(ctx.store()) .take_while(|ts| ts.epoch() >= finalized_ts.epoch()) .any(|ts| ts == finalized_ts) { diff --git a/src/rpc/methods/f3/types.rs b/src/rpc/methods/f3/types.rs index 3672c0b751a3..aba06e1cdb1f 100644 --- a/src/rpc/methods/f3/types.rs +++ b/src/rpc/methods/f3/types.rs @@ -112,12 +112,6 @@ impl From for F3TipSet { } } -impl From> for F3TipSet { - fn from(ts: Arc) -> Self { - Arc::unwrap_or_clone(ts).into() - } -} - #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, JsonSchema)] #[serde(rename_all = "PascalCase")] pub struct ECTipSet { diff --git a/src/rpc/methods/gas.rs b/src/rpc/methods/gas.rs index 9e2584be644b..f52e951735d6 100644 --- a/src/rpc/methods/gas.rs +++ b/src/rpc/methods/gas.rs @@ -1,8 +1,6 @@ // Copyright 2019-2025 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use std::sync::Arc; - use super::state::InvocResult; use crate::blocks::Tipset; use crate::chain::{BASE_FEE_MAX_CHANGE_DENOM, BLOCK_GAS_TARGET}; @@ -199,7 +197,7 @@ impl GasEstimateGasLimit { mut msg: Message, ApiTipsetKey(tsk): &ApiTipsetKey, trace_config: VMTrace, - ) -> anyhow::Result<(InvocResult, ApplyRet, Vec, Arc)> + ) -> anyhow::Result<(InvocResult, ApplyRet, Vec, Tipset)> where DB: Blockstore + Send + Sync + 'static, { diff --git a/src/rpc/methods/state.rs b/src/rpc/methods/state.rs index 26b3934fc2a2..31371f2b9ccd 100644 --- a/src/rpc/methods/state.rs +++ b/src/rpc/methods/state.rs @@ -225,9 +225,7 @@ impl RpcMethod<2> for StateLookupID { (address, ApiTipsetKey(tsk)): Self::Params, ) -> Result { let ts = ctx.chain_store().load_required_tipset_or_heaviest(&tsk)?; - Ok(ctx - .state_manager - .lookup_required_id(&address, ts.as_ref())?) + Ok(ctx.state_manager.lookup_required_id(&address, &ts)?) } } @@ -274,9 +272,7 @@ impl RpcMethod<2> for StateVerifierStatus { (address, ApiTipsetKey(tsk)): Self::Params, ) -> Result { let ts = ctx.chain_store().load_required_tipset_or_heaviest(&tsk)?; - let aid = ctx - .state_manager - .lookup_required_id(&address, ts.as_ref())?; + let aid = ctx.state_manager.lookup_required_id(&address, &ts)?; let verifreg_state: verifreg::State = ctx.state_manager.get_actor_state(&ts)?; Ok(verifreg_state.verifier_data_cap(ctx.store(), aid.into())?) } @@ -1475,7 +1471,7 @@ impl RpcMethod<2> for ForestStateCompute { let mut futures = FuturesOrdered::new(); for ts in to_ts - .chain_arc(ctx.store()) + .chain(ctx.store()) .take_while(|ts| ts.epoch() >= from_ts.epoch()) { let chain_store = ctx.chain_store().clone(); @@ -2539,11 +2535,11 @@ impl RpcMethod<3> for StateListMessages { } else if let Some(to) = from_to.to { // this is following lotus logic, it probably should be `if let` instead of `else if let` // see - if ctx.state_manager.lookup_id(&to, ts.as_ref())?.is_none() { + if ctx.state_manager.lookup_id(&to, &ts)?.is_none() { return Ok(vec![]); } } else if let Some(from) = from_to.from - && ctx.state_manager.lookup_id(&from, ts.as_ref())?.is_none() + && ctx.state_manager.lookup_id(&from, &ts)?.is_none() { return Ok(vec![]); } diff --git a/src/rpc/methods/sync.rs b/src/rpc/methods/sync.rs index 508b2d55ca35..fa2a13b66b31 100644 --- a/src/rpc/methods/sync.rs +++ b/src/rpc/methods/sync.rs @@ -11,7 +11,6 @@ use cid::Cid; use enumflags2::BitFlags; use fvm_ipld_blockstore::Blockstore; use fvm_ipld_encoding::to_vec; -use std::sync::Arc; pub use types::*; use crate::chain; @@ -140,7 +139,7 @@ impl RpcMethod<1> for SyncSubmitBlock { .context("failed to validate the tipset")?; ctx.tipset_send - .try_send(Arc::new(ts)) + .try_send(ts) .context("tipset queue is full")?; ctx.network_send().send(NetworkMessage::PubsubMessage { @@ -209,9 +208,7 @@ mod tests { let ts = Tipset::from(header); let db = cs_for_test.blockstore(); let tsk = ts.key(); - cs_for_test - .set_heaviest_tipset(Arc::new(ts.clone())) - .unwrap(); + cs_for_test.set_heaviest_tipset(ts.clone()).unwrap(); for i in tsk.to_cids() { let bz2 = bz.clone(); diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 15b80d5d712e..20944cea8177 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -460,7 +460,7 @@ pub struct RPCState { pub sync_status: crate::chain_sync::SyncStatus, pub eth_event_handler: Arc, pub sync_network_context: SyncNetworkContext, - pub tipset_send: flume::Sender>, + pub tipset_send: flume::Sender, pub start_time: chrono::DateTime, pub snapshot_progress_tracker: SnapshotProgressTracker, pub shutdown: mpsc::Sender<()>, diff --git a/src/state_manager/chain_rand.rs b/src/state_manager/chain_rand.rs index 3e80c7de58e0..862fdb4aa065 100644 --- a/src/state_manager/chain_rand.rs +++ b/src/state_manager/chain_rand.rs @@ -18,7 +18,7 @@ use fvm_ipld_blockstore::Blockstore; /// Allows for deriving the randomness from a particular tipset. pub struct ChainRand { chain_config: Arc, - tipset: Arc, + tipset: Tipset, chain_index: Arc>>, beacon: Arc, } @@ -27,7 +27,7 @@ impl Clone for ChainRand { fn clone(&self) -> Self { ChainRand { chain_config: self.chain_config.clone(), - tipset: Arc::clone(&self.tipset), + tipset: self.tipset.clone(), chain_index: self.chain_index.clone(), beacon: self.beacon.clone(), } @@ -40,7 +40,7 @@ where { pub fn new( chain_config: Arc, - tipset: Arc, + tipset: Tipset, chain_index: Arc>>, beacon: Arc, ) -> Self { @@ -59,7 +59,7 @@ where round: ChainEpoch, lookback: bool, ) -> anyhow::Result<[u8; 32]> { - let ts = Arc::clone(&self.tipset); + let ts = self.tipset.clone(); if round > ts.epoch() { bail!("cannot draw randomness from the future"); @@ -113,13 +113,13 @@ where round: ChainEpoch, lookback: bool, ) -> anyhow::Result<[u8; 32]> { - let rand_ts: Arc = self.get_beacon_randomness_tipset(round, lookback)?; + let rand_ts: Tipset = self.get_beacon_randomness_tipset(round, lookback)?; let be = self.chain_index.latest_beacon_entry(rand_ts)?; Ok(digest(be.signature())) } pub fn extract_beacon_entry_for_epoch(&self, epoch: ChainEpoch) -> anyhow::Result { - let mut rand_ts: Arc = self.get_beacon_randomness_tipset(epoch, false)?; + let mut rand_ts: Tipset = self.get_beacon_randomness_tipset(epoch, false)?; let (_, beacon) = self.beacon.beacon_for_epoch(epoch)?; let round = beacon.max_beacon_round_for_epoch(self.chain_config.network_version(epoch), epoch); @@ -146,8 +146,8 @@ where &self, round: ChainEpoch, lookback: bool, - ) -> anyhow::Result> { - let ts = Arc::clone(&self.tipset); + ) -> anyhow::Result { + let ts = self.tipset.clone(); if round > ts.epoch() { bail!("cannot draw randomness from the future"); diff --git a/src/state_manager/mod.rs b/src/state_manager/mod.rs index 863bfb599862..1fa90d31a6a6 100644 --- a/src/state_manager/mod.rs +++ b/src/state_manager/mod.rs @@ -205,7 +205,7 @@ where } /// Returns the currently tracked heaviest tipset. - pub fn heaviest_tipset(&self) -> Arc { + pub fn heaviest_tipset(&self) -> Tipset { self.chain_store().heaviest_tipset() } @@ -359,7 +359,7 @@ where self.cs.chain_config() } - pub fn chain_rand(&self, tipset: Arc) -> ChainRand { + pub fn chain_rand(&self, tipset: Tipset) -> ChainRand { ChainRand::new( self.chain_config().clone(), tipset, @@ -458,7 +458,7 @@ where /// Returns the pair of (state root, message receipt root). This will /// either be cached or will be calculated and fill the cache. Tipset /// state for a given tipset is guaranteed not to be computed twice. - pub async fn tipset_state(self: &Arc, tipset: &Arc) -> anyhow::Result { + pub async fn tipset_state(self: &Arc, tipset: &Tipset) -> anyhow::Result { let StateOutput { state_root, receipt_root, @@ -469,7 +469,7 @@ where pub async fn tipset_state_output( self: &Arc, - tipset: &Arc, + tipset: &Tipset, ) -> anyhow::Result { let key = tipset.key(); self.cache @@ -489,7 +489,7 @@ where trace!("Computing state for tipset at epoch {}", tipset.epoch()); let state_output = self - .compute_tipset_state(Arc::clone(tipset), NO_CALLBACK, VMTrace::NotTraced) + .compute_tipset_state(tipset.clone(), NO_CALLBACK, VMTrace::NotTraced) .await?; for events_root in state_output.events_roots.iter().flatten() { trace!("Indexing events root @{}: {}", tipset.epoch(), events_root); @@ -529,7 +529,7 @@ where #[instrument(skip(self))] pub async fn tipset_message_receipts( self: &Arc, - tipset: &Arc, + tipset: &Tipset, ) -> anyhow::Result> { let key = tipset.key(); let ts = tipset.clone(); @@ -553,7 +553,7 @@ where #[instrument(skip(self))] pub async fn tipset_state_events( self: &Arc, - tipset: &Arc, + tipset: &Tipset, events_root: Option<&Cid>, ) -> anyhow::Result { let key = tipset.key(); @@ -584,7 +584,7 @@ where &self, msg: &Message, rand: ChainRand, - tipset: &Arc, + tipset: &Tipset, ) -> Result { let mut msg = msg.clone(); @@ -605,7 +605,7 @@ where let genesis_info = GenesisInfo::from_chain_config(self.chain_config().clone()); let mut vm = VM::new( ExecutionContext { - heaviest_tipset: Arc::clone(tipset), + heaviest_tipset: tipset.clone(), state_tree_root: *state_cid, epoch: height, rand: Box::new(rand), @@ -657,19 +657,15 @@ where /// runs the given message and returns its result without any persisted /// changes. - pub fn call( - &self, - message: &Message, - tipset: Option>, - ) -> Result { + pub fn call(&self, message: &Message, tipset: Option) -> Result { let ts = tipset.unwrap_or_else(|| self.heaviest_tipset()); - let chain_rand = self.chain_rand(Arc::clone(&ts)); + let chain_rand = self.chain_rand(ts.clone()); self.call_raw(message, chain_rand, &ts) } pub async fn apply_on_state_with_gas( self: &Arc, - tipset: Option>, + tipset: Option, msg: Message, ) -> anyhow::Result { let ts = tipset.unwrap_or_else(|| self.heaviest_tipset()); @@ -713,7 +709,7 @@ where self: &Arc, message: &mut ChainMessage, prior_messages: &[ChainMessage], - tipset: Option>, + tipset: Option, trace_config: VMTrace, ) -> Result<(InvocResult, ApplyRet, Duration), Error> { let ts = tipset.unwrap_or_else(|| self.heaviest_tipset()); @@ -721,7 +717,7 @@ where .tipset_state(&ts) .await .map_err(|e| Error::Other(format!("Could not load tipset state: {e}")))?; - let chain_rand = self.chain_rand(Arc::clone(&ts)); + let chain_rand = self.chain_rand(ts.clone()); // Since we're simulating a future message, pretend we're applying it in the // "next" tipset @@ -732,7 +728,7 @@ where let (ret, duration) = stacker::grow(64 << 20, || -> ApplyResult { let mut vm = VM::new( ExecutionContext { - heaviest_tipset: Arc::clone(&ts), + heaviest_tipset: ts.clone(), state_tree_root: st, epoch, rand: Box::new(chain_rand), @@ -770,11 +766,7 @@ where /// Replays the given message and returns the result of executing the /// indicated message, assuming it was executed in the indicated tipset. - pub async fn replay( - self: &Arc, - ts: Arc, - mcid: Cid, - ) -> Result { + pub async fn replay(self: &Arc, ts: Tipset, mcid: Cid) -> Result { let this = Arc::clone(self); tokio::task::spawn_blocking(move || this.replay_blocking(ts, mcid)) .await @@ -784,7 +776,7 @@ where /// Blocking version of `replay` pub fn replay_blocking( self: &Arc, - ts: Arc, + ts: Tipset, mcid: Cid, ) -> Result { const REPLAY_HALT: &str = "replay_halt"; @@ -899,7 +891,7 @@ where #[instrument(skip_all)] pub async fn compute_tipset_state( self: &Arc, - tipset: Arc, + tipset: Tipset, callback: Option) -> anyhow::Result<()> + Send + 'static>, enable_tracing: VMTrace, ) -> Result { @@ -914,7 +906,7 @@ where #[tracing::instrument(skip_all)] pub fn compute_tipset_state_blocking( &self, - tipset: Arc, + tipset: Tipset, callback: Option) -> anyhow::Result<()>>, enable_tracing: VMTrace, ) -> Result { @@ -944,7 +936,7 @@ where self: &Arc, height: ChainEpoch, messages: Vec, - tipset: Arc, + tipset: Tipset, callback: Option) -> anyhow::Result<()> + Send + 'static>, enable_tracing: VMTrace, ) -> Result { @@ -961,7 +953,7 @@ where &self, height: ChainEpoch, messages: Vec, - tipset: Arc, + tipset: Tipset, callback: Option) -> anyhow::Result<()>>, enable_tracing: VMTrace, ) -> Result { @@ -1039,18 +1031,18 @@ where fn check_search( &self, - mut current: Arc, + mut current: Tipset, message: &ChainMessage, look_back_limit: Option, allow_replaced: Option, - ) -> Result, Receipt)>, Error> { + ) -> Result, Error> { let allow_replaced = allow_replaced.unwrap_or(true); let message_from_address = message.from(); let message_sequence = message.sequence(); let mut current_actor_state = self .get_required_actor(&message_from_address, *current.parent_state()) .map_err(Error::state)?; - let message_from_id = self.lookup_required_id(&message_from_address, current.as_ref())?; + let message_from_id = self.lookup_required_id(&message_from_address, ¤t)?; while current.epoch() > look_back_limit.unwrap_or_default() { let parent_tipset = self .chain_index() @@ -1070,7 +1062,7 @@ where && parent_actor_state.as_ref().unwrap().sequence <= message_sequence) { let receipt = self - .tipset_executed_message(current.as_ref(), message, allow_replaced)? + .tipset_executed_message(¤t, message, allow_replaced)? .context("Failed to get receipt with tipset_executed_message")?; return Ok(Some((current, receipt))); } @@ -1088,16 +1080,16 @@ where fn search_back_for_message( &self, - current: Arc, + current: Tipset, message: &ChainMessage, look_back_limit: Option, allow_replaced: Option, - ) -> Result, Receipt)>, Error> { + ) -> Result, Error> { self.check_search(current, message, look_back_limit, allow_replaced) } /// Returns a message receipt from a given tipset and message CID. - pub fn get_receipt(&self, tipset: Arc, msg: Cid) -> Result { + pub fn get_receipt(&self, tipset: Tipset, msg: Cid) -> Result { let m = crate::chain::get_chain_message(self.blockstore(), &msg) .map_err(|e| Error::Other(e.to_string()))?; let message_receipt = self.tipset_executed_message(&tipset, &m, true)?; @@ -1124,7 +1116,7 @@ where confidence: i64, look_back_limit: Option, allow_replaced: Option, - ) -> Result<(Option>, Option), Error> { + ) -> Result<(Option, Option), Error> { let mut subscriber = self.cs.publisher().subscribe(); let (sender, mut receiver) = oneshot::channel::<()>(); let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid) @@ -1136,7 +1128,7 @@ where return Ok((Some(current_tipset.clone()), Some(r))); } - let mut candidate_tipset: Option> = None; + let mut candidate_tipset: Option = None; let mut candidate_receipt: Option = None; let sm_cloned = Arc::clone(self); @@ -1243,11 +1235,11 @@ where pub async fn search_for_message( &self, - from: Option>, + from: Option, msg_cid: Cid, look_back_limit: Option, allow_replaced: Option, - ) -> Result, Receipt)>, Error> { + ) -> Result, Error> { let from = from.unwrap_or_else(|| self.heaviest_tipset()); let message = crate::chain::get_chain_message(self.blockstore(), &msg_cid) .map_err(|err| Error::Other(format!("failed to load message {err}")))?; @@ -1396,7 +1388,7 @@ where pub async fn resolve_to_key_addr( self: &Arc, addr: &Address, - ts: &Arc, + ts: &Tipset, ) -> Result { match addr.protocol() { Protocol::BLS | Protocol::Secp256k1 | Protocol::Delegated => return Ok(*addr), @@ -1426,7 +1418,7 @@ where pub async fn miner_get_base_info( self: &Arc, beacon_schedule: &BeaconSchedule, - tipset: Arc, + tipset: Tipset, addr: Address, epoch: ChainEpoch, ) -> anyhow::Result> { @@ -1571,7 +1563,7 @@ where pub fn validate_tipsets(&self, tipsets: T) -> anyhow::Result<()> where - T: Iterator> + Send, + T: Iterator + Send, { let genesis_timestamp = self.chain_store().genesis_block_header().timestamp; validate_tipsets( @@ -1658,7 +1650,7 @@ where pub async fn resolve_to_deterministic_address( self: &Arc, address: Address, - ts: &Arc, + ts: &Tipset, ) -> anyhow::Result
{ use crate::shim::address::Protocol::*; match address.protocol() { @@ -1712,7 +1704,7 @@ where self.chain_config().clone(), self.beacon_schedule().clone(), &self.engine, - Arc::new(tipset.clone()), + tipset.clone(), Some(callback), VMTrace::Traced, )?; @@ -1770,7 +1762,7 @@ pub fn validate_tipsets( ) -> anyhow::Result<()> where DB: Blockstore + Send + Sync + 'static, - T: Iterator> + Send, + T: Iterator + Send, { use rayon::iter::ParallelIterator as _; tipsets @@ -1895,7 +1887,7 @@ pub fn apply_block_messages( chain_config: Arc, beacon: Arc, engine: &MultiEngine, - tipset: Arc, + tipset: Tipset, mut callback: Option) -> anyhow::Result<()>>, enable_tracing: VMTrace, ) -> anyhow::Result @@ -1926,7 +1918,7 @@ where let rand = ChainRand::new( Arc::clone(&chain_config), - Arc::clone(&tipset), + tipset.clone(), Arc::clone(&chain_index), beacon, ); @@ -1937,7 +1929,7 @@ where genesis_info.get_vm_circulating_supply(epoch, chain_index.db(), &state_root)?; VM::new( ExecutionContext { - heaviest_tipset: Arc::clone(&tipset), + heaviest_tipset: tipset.clone(), state_tree_root: state_root, epoch, rand: Box::new(rand.clone()), @@ -2011,7 +2003,7 @@ where pub fn compute_state( _height: ChainEpoch, messages: Vec, - tipset: Arc, + tipset: Tipset, genesis_timestamp: u64, chain_index: Arc>>, chain_config: Arc, diff --git a/src/state_manager/tests.rs b/src/state_manager/tests.rs index f98680742fa3..be9b70a044bb 100644 --- a/src/state_manager/tests.rs +++ b/src/state_manager/tests.rs @@ -80,7 +80,7 @@ fn setup_chain_with_tipsets() -> TestChainSetup { .unwrap(); chain_store - .set_heaviest_tipset(Arc::new(chain_store.genesis_tipset())) + .set_heaviest_tipset(chain_store.genesis_tipset()) .unwrap(); TestChainSetup { @@ -127,9 +127,7 @@ fn test_try_lookup_state_from_next_tipset_success() { assert_eq!(child_block.epoch, 11); assert_eq!(child_block.state_root, state_root); - chain_store - .set_heaviest_tipset(Arc::new(child_ts.clone())) - .unwrap(); + chain_store.set_heaviest_tipset(child_ts.clone()).unwrap(); let state_manager = Arc::new(StateManager::new(chain_store).unwrap()); @@ -160,9 +158,7 @@ fn test_try_lookup_state_from_next_tipset_no_next_tipset() { assert_eq!(a.epoch, 10); - chain_store - .set_heaviest_tipset(Arc::new(a_ts.clone())) - .unwrap(); + chain_store.set_heaviest_tipset(a_ts.clone()).unwrap(); let state_manager = Arc::new(StateManager::new(chain_store).unwrap()); @@ -219,9 +215,7 @@ fn test_try_lookup_state_from_next_tipset_different_parent() { // a tipset key should be different from `a1` tipset key assert_ne!(a_ts.key(), a1_ts.key()); - chain_store - .set_heaviest_tipset(Arc::new(b_ts.clone())) - .unwrap(); + chain_store.set_heaviest_tipset(b_ts.clone()).unwrap(); let state_manager = Arc::new(StateManager::new(chain_store).unwrap()); @@ -262,9 +256,7 @@ fn test_try_lookup_state_from_next_tipset_missing_receipt_root() { assert_eq!(a.epoch, 10); assert_eq!(b.epoch, 11); - chain_store - .set_heaviest_tipset(Arc::new(b_ts.clone())) - .unwrap(); + chain_store.set_heaviest_tipset(b_ts.clone()).unwrap(); let state_manager = Arc::new(StateManager::new(chain_store).unwrap()); @@ -305,9 +297,7 @@ fn test_try_lookup_state_from_next_tipset_missing_state_root() { assert_eq!(a.epoch, 10); assert_eq!(b.epoch, 11); - chain_store - .set_heaviest_tipset(Arc::new(b_ts.clone())) - .unwrap(); + chain_store.set_heaviest_tipset(b_ts.clone()).unwrap(); let state_manager = Arc::new(StateManager::new(chain_store).unwrap()); diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs index 8418f88b2300..14f72b95f3a1 100644 --- a/src/tool/offline_server/server.rs +++ b/src/tool/offline_server/server.rs @@ -89,7 +89,7 @@ pub async fn start_offline_server( &db, ) .await?; - let head_ts = Arc::new(db.heaviest_tipset()?); + let head_ts = db.heaviest_tipset()?; let chain_store = Arc::new(ChainStore::new( db.clone(), db.clone(), @@ -134,7 +134,7 @@ pub async fn start_offline_server( if validate_until_epoch <= head_ts.epoch() { state_manager.validate_tipsets( head_ts - .chain_arc(&db) + .chain(&db) .take_while(|ts| ts.epoch() >= validate_until_epoch), )?; } diff --git a/src/tool/subcommands/api_cmd/api_compare_tests.rs b/src/tool/subcommands/api_cmd/api_compare_tests.rs index e808862b0e1b..44a46b485b6a 100644 --- a/src/tool/subcommands/api_cmd/api_compare_tests.rs +++ b/src/tool/subcommands/api_cmd/api_compare_tests.rs @@ -2179,7 +2179,7 @@ async fn revalidate_chain(db: Arc, n_ts_to_validate: usize) -> anyhow:: genesis_header.clone(), )?); let state_manager = Arc::new(StateManager::new(chain_store.clone())?); - let head_ts = Arc::new(db.heaviest_tipset()?); + let head_ts = db.heaviest_tipset()?; // Set proof parameter data dir and make sure the proofs are available. Otherwise, // validation might fail due to missing proof parameters. @@ -2187,7 +2187,7 @@ async fn revalidate_chain(db: Arc, n_ts_to_validate: usize) -> anyhow:: ensure_proof_params_downloaded().await?; state_manager.validate_tipsets( head_ts - .chain_arc(&db) + .chain(&db) .take(SAFE_EPOCH_DELAY as usize + n_ts_to_validate), )?; diff --git a/src/tool/subcommands/archive_cmd.rs b/src/tool/subcommands/archive_cmd.rs index 789e16b1f44d..85ad825507e7 100644 --- a/src/tool/subcommands/archive_cmd.rs +++ b/src/tool/subcommands/archive_cmd.rs @@ -263,7 +263,7 @@ impl ArchiveCommands { let heaviest_tipset = store.heaviest_tipset()?; do_export( &store.into(), - heaviest_tipset.into(), + heaviest_tipset, output_path, epoch, depth, @@ -512,7 +512,7 @@ fn build_output_path( #[allow(clippy::too_many_arguments)] pub async fn do_export( store: &Arc, - root: Arc, + root: Tipset, output_path: PathBuf, epoch_option: Option, depth: ChainEpochDelta, @@ -544,7 +544,7 @@ pub async fn do_export( .context("unable to get a tipset at given height")?; let seen = if let Some(diff) = diff { - let diff_ts: Arc = index + let diff_ts: Tipset = index .tipset_by_height(diff, ts.clone(), ResolveNullTipset::TakeOlder) .context("diff epoch must be smaller than target epoch")?; let diff_ts: &Tipset = &diff_ts; @@ -768,7 +768,7 @@ async fn show_tipset_diff( let store = Arc::new(ManyCar::try_from(snapshot_files)?); - let heaviest_tipset = Arc::new(store.heaviest_tipset()?); + let heaviest_tipset = store.heaviest_tipset()?; if heaviest_tipset.epoch() <= epoch { anyhow::bail!( "Highest epoch must be at least 1 greater than the target epoch. \ @@ -791,13 +791,13 @@ async fn show_tipset_diff( let beacon = Arc::new(chain_config.get_beacon_schedule(timestamp)); let tipset = chain_index.tipset_by_height( epoch, - Arc::clone(&heaviest_tipset), + heaviest_tipset.clone(), ResolveNullTipset::TakeOlder, )?; let child_tipset = chain_index.tipset_by_height( epoch + 1, - Arc::clone(&heaviest_tipset), + heaviest_tipset.clone(), ResolveNullTipset::TakeNewer, )?; @@ -990,7 +990,7 @@ async fn export_lite_snapshot( let force = false; do_export( &store, - root.into(), + root, output_path.clone(), Some(epoch), depth, @@ -1023,7 +1023,7 @@ async fn export_diff_snapshot( let force = false; do_export( &store, - root.into(), + root, output_path.clone(), Some(epoch), depth, @@ -1165,7 +1165,7 @@ mod tests { let heaviest_tipset = store.heaviest_tipset().unwrap(); do_export( &store.into(), - heaviest_tipset.into(), + heaviest_tipset, output_path.path().into(), Some(0), 1, diff --git a/src/tool/subcommands/benchmark_cmd.rs b/src/tool/subcommands/benchmark_cmd.rs index 27b9dd802b51..6f45a33f337f 100644 --- a/src/tool/subcommands/benchmark_cmd.rs +++ b/src/tool/subcommands/benchmark_cmd.rs @@ -18,7 +18,6 @@ use futures::{StreamExt, TryStreamExt}; use fvm_ipld_encoding::DAG_CBOR; use indicatif::{ProgressBar, ProgressStyle}; use itertools::Itertools; -use std::ops::Deref; use std::path::PathBuf; use std::sync::Arc; use tokio::{ @@ -203,7 +202,7 @@ async fn benchmark_exporting( let idx = ChainIndex::new(&store); let ts = idx.tipset_by_height( epoch.unwrap_or(heaviest.epoch()), - Arc::new(heaviest), + heaviest, ResolveNullTipset::TakeOlder, )?; // We don't do any sanity checking for 'depth'. The output is discarded so @@ -214,7 +213,7 @@ async fn benchmark_exporting( let blocks = stream_chain( Arc::clone(&store), - ts.deref().clone().chain_owned(Arc::clone(&store)), + ts.clone().chain_owned(Arc::clone(&store)), stateroot_lookup_limit, ); diff --git a/src/tool/subcommands/snapshot_cmd.rs b/src/tool/subcommands/snapshot_cmd.rs index f3c3dc886769..49b2c34d4efc 100644 --- a/src/tool/subcommands/snapshot_cmd.rs +++ b/src/tool/subcommands/snapshot_cmd.rs @@ -428,7 +428,7 @@ where // Prepare tipsets for validation let tipsets = chain_index - .chain(Arc::new(ts)) + .chain(ts) .take_while(|tipset| tipset.epoch() >= last_epoch) .inspect(|tipset| { pb.set_message(format!("epoch queue: {}", tipset.epoch() - last_epoch)); @@ -480,7 +480,7 @@ fn print_computed_state(snapshot: PathBuf, epoch: ChainEpoch, json: bool) -> any } let beacon = Arc::new(chain_config.get_beacon_schedule(timestamp)); let tipset = chain_index - .tipset_by_height(epoch, Arc::new(ts), ResolveNullTipset::TakeOlder) + .tipset_by_height(epoch, ts, ResolveNullTipset::TakeOlder) .with_context(|| format!("couldn't get a tipset at height {epoch}"))?; let mut message_calls = vec![]; diff --git a/src/tool/subcommands/state_compute_cmd.rs b/src/tool/subcommands/state_compute_cmd.rs index c3684ec9e470..16d2ea220cc8 100644 --- a/src/tool/subcommands/state_compute_cmd.rs +++ b/src/tool/subcommands/state_compute_cmd.rs @@ -124,7 +124,7 @@ impl ReplayComputeCommand { pub async fn run(self) -> anyhow::Result<()> { let Self { snapshot, chain, n } = self; let snap_car = AnyCar::try_from(&snapshot)?; - let ts = Arc::new(snap_car.heaviest_tipset()?); + let ts = snap_car.heaviest_tipset()?; let epoch = ts.epoch(); let db = Arc::new(ManyCar::new(MemoryDB::default()).with_read_only(snap_car)?); let chain_config = Arc::new(ChainConfig::from_chain(&chain)); From 4619211c4d26f6a428289ac8c9f3e2923ee09166 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Mon, 1 Dec 2025 19:25:10 +0800 Subject: [PATCH 2/2] fix build --- src/benchmark_private/tipset_validation.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/benchmark_private/tipset_validation.rs b/src/benchmark_private/tipset_validation.rs index b6e5b4abbbca..e1957393bad0 100644 --- a/src/benchmark_private/tipset_validation.rs +++ b/src/benchmark_private/tipset_validation.rs @@ -47,7 +47,7 @@ async fn prepare_validation( snapshot: &Path, ) -> anyhow::Result<(Arc>, Tipset)> { let snap_car = AnyCar::try_from(snapshot)?; - let ts = Arc::new(snap_car.heaviest_tipset()?); + let ts = snap_car.heaviest_tipset()?; let db = Arc::new(ManyCar::new(MemoryDB::default()).with_read_only(snap_car)?); let chain_config = Arc::new(ChainConfig::from_chain(chain)); let genesis_header =