Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 27 additions & 11 deletions src/chain/store/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use super::{
index::{ChainIndex, ResolveNullTipset},
tipset_tracker::TipsetTracker,
};
use crate::db::{EthMappingsStore, EthMappingsStoreExt};
use crate::interpreter::{BlockMessages, VMTrace};
use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite};
use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage};
Expand All @@ -23,6 +22,10 @@ use crate::{
blocks::{CachingBlockHeader, Tipset, TipsetKey, TxMeta},
db::HeaviestTipsetKeyProvider,
};
use crate::{
db::{EthMappingsStore, EthMappingsStoreExt},
rpc::chain::PathChange,
};
use crate::{fil_cns, utils::cache::SizeTrackingLruCache};
use ahash::{HashMap, HashMapExt, HashSet};
use anyhow::Context as _;
Expand All @@ -47,10 +50,7 @@ pub type ChainEpochDelta = ChainEpoch;

/// `Enum` for `pubsub` channel that defines message type variant and data
/// contained in message type.
#[derive(Clone, Debug)]
pub enum HeadChange {
Apply(Tipset),
}
pub type HeadChange = PathChange<Tipset>;

/// Stores chain data such as heaviest tipset and cached tipset info at each
/// epoch. This structure is thread-safe, and all caches are wrapped in a mutex
Expand Down Expand Up @@ -142,14 +142,30 @@ where
}

/// Sets heaviest tipset
pub fn set_heaviest_tipset(&self, ts: Tipset) -> Result<(), Error> {
pub fn set_heaviest_tipset(&self, head: Tipset) -> Result<(), Error> {
self.heaviest_tipset_key_provider
.set_heaviest_tipset_key(ts.key())?;
*self.heaviest_tipset_cache.write() = Some(ts.clone());
ts.key().save(self.blockstore())?;
if self.publisher.send(HeadChange::Apply(ts)).is_err() {
debug!("did not publish head change, no active receivers");
.set_heaviest_tipset_key(head.key())?;
let old_head = (*self.heaviest_tipset_cache.write()).replace(head.clone());
head.key().save(self.blockstore())?;
if let Some(old_head) = old_head {
match crate::rpc::chain::chain_get_path(self, old_head.key(), head.key()) {
Ok(changes) => {
for change in changes {
if self.publisher.send(change).is_err() {
debug!("did not publish change, no active receivers");
}
}
}
Err(e) => {
warn!("failed to get chain path changes: {e}")
}
}
} else {
if self.publisher.send(HeadChange::Apply(head)).is_err() {
debug!("did not publish head change, no active receivers");
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

Ok(())
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

Expand Down
14 changes: 6 additions & 8 deletions src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,14 +505,12 @@ fn maybe_start_indexer_service(

// Continuously listen for head changes
loop {
let HeadChange::Apply(ts) = receiver.recv().await?;

tracing::debug!("Indexing tipset {}", ts.key());

let delegated_messages =
chain_store.headers_delegated_messages(ts.block_headers().iter())?;

chain_store.process_signed_messages(&delegated_messages)?;
if let HeadChange::Apply(ts) = receiver.recv().await? {
tracing::debug!("Indexing tipset {}", ts.key());
let delegated_messages =
chain_store.headers_delegated_messages(ts.block_headers().iter())?;
chain_store.process_signed_messages(&delegated_messages)?;
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}
});

Expand Down
17 changes: 8 additions & 9 deletions src/message_pool/msgpool/msg_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,28 +551,27 @@ where
let pending = mp.pending.clone();
let republished = mp.republished.clone();

let cur_tipset = mp.cur_tipset.clone();
let current_ts = mp.cur_tipset.clone();
let repub_trigger = mp.repub_trigger.clone();

// Reacts to new HeadChanges
services.spawn(async move {
loop {
match subscriber.recv().await {
Ok(ts) => {
let (cur, rev, app) = match ts {
HeadChange::Apply(tipset) => {
(cur_tipset.clone(), Vec::new(), vec![tipset])
}
Ok(change) => {
let (reverts, applies) = match change {
HeadChange::Apply(ts) => (vec![], vec![ts]),
HeadChange::Revert(ts) => (vec![ts], vec![]),
};
head_change(
api.as_ref(),
bls_sig_cache.as_ref(),
repub_trigger.clone(),
republished.as_ref(),
pending.as_ref(),
cur.as_ref(),
rev,
app,
&current_ts,
reverts,
applies,
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
)
.await
.context("Error changing head")?;
Expand Down
33 changes: 17 additions & 16 deletions src/rpc/methods/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,22 +88,20 @@ pub(crate) fn new_heads<DB: Blockstore + Send + Sync + 'static>(

let handle = tokio::spawn(async move {
while let Ok(v) = subscriber.recv().await {
let headers = match v {
HeadChange::Apply(ts) => {
// Convert the tipset to an Ethereum block with full transaction info
// Note: In Filecoin's Eth RPC, a tipset maps to a single Ethereum block
match EthBlock::from_filecoin_tipset(data.clone(), ts, TxInfo::Full).await {
Ok(block) => ApiHeaders(block),
Err(e) => {
tracing::error!("Failed to convert tipset to eth block: {}", e);
continue;
if let HeadChange::Apply(ts) = v {
// Convert the tipset to an Ethereum block with full transaction info
// Note: In Filecoin's Eth RPC, a tipset maps to a single Ethereum block
match EthBlock::from_filecoin_tipset(data.clone(), ts, TxInfo::Full).await {
Ok(block) => {
if let Err(e) = sender.send(ApiHeaders(block)) {
tracing::error!("Failed to send headers: {}", e);
break;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}
}
Err(e) => {
tracing::error!("Failed to convert tipset to eth block: {}", e);
}
}
};
if let Err(e) = sender.send(headers) {
tracing::error!("Failed to send headers: {}", e);
break;
}
}
});
Expand Down Expand Up @@ -149,6 +147,7 @@ pub(crate) fn logs<DB: Blockstore + Sync + Send + 'static>(
}
}
}
HeadChange::Revert(_) => {}
}
}
});
Expand Down Expand Up @@ -850,7 +849,7 @@ impl RpcMethod<2> for ChainGetPath {
(from, to): Self::Params,
_: &http::Extensions,
) -> Result<Self::Ok, ServerError> {
impl_chain_get_path(ctx.chain_store(), &from, &to).map_err(Into::into)
chain_get_path(ctx.chain_store(), &from, &to).map_err(Into::into)
}
}

Expand All @@ -871,7 +870,7 @@ impl RpcMethod<2> for ChainGetPath {
/// ```
///
/// Exposes errors from the [`Blockstore`], and returns an error if there is no common ancestor.
fn impl_chain_get_path(
pub fn chain_get_path(
chain_store: &ChainStore<impl Blockstore>,
from: &TipsetKey,
to: &TipsetKey,
Expand Down Expand Up @@ -903,6 +902,7 @@ fn impl_chain_get_path(
to_apply = next;
}
}

Ok(all_reverts
.into_iter()
.map(PathChange::Revert)
Expand Down Expand Up @@ -1332,6 +1332,7 @@ pub(crate) fn chain_notify<DB: Blockstore>(
while let Ok(v) = subscriber.recv().await {
let (change, tipset) = match v {
HeadChange::Apply(ts) => ("apply".into(), ts),
HeadChange::Revert(ts) => ("revert".into(), ts),
};

if sender.send(vec![ApiHeadChange { change, tipset }]).is_err() {
Expand Down Expand Up @@ -1791,7 +1792,7 @@ mod tests {
}

let actual =
impl_chain_get_path(store, from.make_tipset().key(), to.make_tipset().key()).unwrap();
chain_get_path(store, from.make_tipset().key(), to.make_tipset().key()).unwrap();
let expected = expected
.into_iter()
.map(|change| match change {
Expand Down
3 changes: 2 additions & 1 deletion src/state_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,7 @@ where
let mut subscriber_poll = tokio::task::spawn(async move {
loop {
match subscriber.recv().await {
Ok(subscriber) => match subscriber {
Ok(head_change) => match head_change {
HeadChange::Apply(tipset) => {
if candidate_tipset
.as_ref()
Expand All @@ -1237,6 +1237,7 @@ where
candidate_receipt = Some(receipt)
}
}
HeadChange::Revert(_) => {}
},
Err(RecvError::Lagged(i)) => {
warn!(
Expand Down
Loading