diff --git a/src/chain/store/chain_store.rs b/src/chain/store/chain_store.rs index aac538e37dec..4869130b3627 100644 --- a/src/chain/store/chain_store.rs +++ b/src/chain/store/chain_store.rs @@ -163,23 +163,25 @@ where .set_heaviest_tipset_key(head.key())?; let old_head = std::mem::replace(&mut *self.heaviest_tipset_cache.write(), head.clone()); - let changes = match crate::rpc::chain::chain_get_path(self, old_head.key(), head.key()) { - Ok(changes) => changes, - Err(e) => { - // Do not warn when the old head is genesis - if old_head.epoch() > 0 { - error!("failed to get chain path changes: {e}"); - } - // Fallback to single apply - PathChanges { - applies: vec![head], - reverts: vec![], + if crate::utils::broadcast::has_subscribers(&self.head_changes_tx) { + let changes = match crate::rpc::chain::chain_get_path(self, old_head.key(), head.key()) + { + Ok(changes) => changes, + Err(e) => { + // Do not warn when the old head is genesis + if old_head.epoch() > 0 { + error!("failed to get chain path changes: {e}"); + } + // Fallback to single apply + PathChanges { + applies: vec![head], + reverts: vec![], + } } + }; + if self.head_changes_tx.send(changes).is_err() { + debug!("did not publish changes, no active receivers"); } - }; - - if self.head_changes_tx.send(changes).is_err() { - debug!("did not publish changes, no active receivers"); } Ok(()) diff --git a/src/db/parity_db.rs b/src/db/parity_db.rs index 1d80e78482a4..7823c8c798dc 100644 --- a/src/db/parity_db.rs +++ b/src/db/parity_db.rs @@ -6,10 +6,9 @@ use crate::blocks::TipsetKey; use crate::db::{DBStatistics, parity_db_config::ParityDbConfig}; use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite}; use crate::rpc::eth::types::EthHash; -use crate::utils::multihash::prelude::*; +use crate::utils::{broadcast::has_subscribers, multihash::prelude::*}; use anyhow::{Context as _, anyhow}; use cid::Cid; -use futures::FutureExt; use fvm_ipld_blockstore::Blockstore; use fvm_ipld_encoding::DAG_CBOR; use parity_db::{CompressionType, Db, Operation, Options}; @@ -220,10 +219,6 @@ impl EthMappingsStore for ParityDb { } } -fn has_subscribers(tx: &tokio::sync::broadcast::Sender) -> bool { - tx.closed().now_or_never().is_none() -} - impl Blockstore for ParityDb { fn get(&self, k: &Cid) -> anyhow::Result>> { let column = Self::choose_column(k); diff --git a/src/utils/broadcast/mod.rs b/src/utils/broadcast/mod.rs new file mode 100644 index 000000000000..ee34a6de496a --- /dev/null +++ b/src/utils/broadcast/mod.rs @@ -0,0 +1,12 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +#[cfg(test)] +mod tests; + +use futures::FutureExt as _; + +/// Returns `true` if there are any active subscribers to the given broadcast channel. +pub fn has_subscribers(tx: &tokio::sync::broadcast::Sender) -> bool { + tx.closed().now_or_never().is_none() +} diff --git a/src/utils/broadcast/tests.rs b/src/utils/broadcast/tests.rs new file mode 100644 index 000000000000..27e9af6f65f7 --- /dev/null +++ b/src/utils/broadcast/tests.rs @@ -0,0 +1,25 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use super::*; + +#[tokio::test] +async fn test_has_subscribers() { + let (tx, mut rx1) = tokio::sync::broadcast::channel::(16); + let mut rx2 = tx.subscribe(); + tx.send(10).unwrap(); + assert_eq!(rx1.recv().await.unwrap(), 10); + drop(rx1); + assert!(has_subscribers(&tx)); + + assert_eq!(rx2.recv().await.unwrap(), 10); + drop(rx2); + assert!(!has_subscribers(&tx)); + + let mut rx3 = tx.subscribe(); + tx.send(10).unwrap(); + assert_eq!(rx3.recv().await.unwrap(), 10); + assert!(has_subscribers(&tx)); + drop(rx3); + assert!(!has_subscribers(&tx)); +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index d724536e4c20..f6efed16ffb1 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,6 +1,7 @@ // Copyright 2019-2026 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +pub mod broadcast; pub mod cache; pub mod cid; pub mod db;