Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
179aafd
feat: :sparkles: extend EthereumBlockNotification with reorg info
manuelmauro Dec 17, 2025
14cd783
style: :art: fmt
manuelmauro Dec 17, 2025
8139c69
fix: :bug: emit reorg information from Canonicalize
manuelmauro Dec 17, 2025
7548b4f
Merge remote-tracking branch 'upstream' into manuel/make-new-heads-co…
manuelmauro Dec 17, 2025
97bfb4b
test: :white_check_mark: add basic integration tests
manuelmauro Dec 17, 2025
ff393a0
fix: :bug: fix newHead missing enacted block
manuelmauro Dec 17, 2025
e21e8ab
test: :white_check_mark: test deeper forks
manuelmauro Dec 17, 2025
c4ae8cf
style: :art: fmt
manuelmauro Dec 17, 2025
44e7517
fix: :bug: sort enacted blocks
manuelmauro Dec 18, 2025
2945dda
fix: :bug: guarantee async sends ordering
manuelmauro Dec 18, 2025
6d644a6
style: :art: fmt
manuelmauro Dec 18, 2025
6d93dbf
Revert "style: :art: fmt"
manuelmauro Dec 18, 2025
f76a514
Revert "fix: :bug: guarantee async sends ordering"
manuelmauro Dec 18, 2025
cd9a8c1
Revert "fix: :bug: sort enacted blocks"
manuelmauro Dec 18, 2025
44b7f00
ci: :construction_worker: test sql backend in CI
manuelmauro Dec 19, 2025
dbd0537
refactor: :recycle: dedupliacte common logic between sql and kv backends
manuelmauro Dec 19, 2025
da3e4f0
Merge remote-tracking branch 'upstream/master' into manuel/make-new-h…
manuelmauro Dec 19, 2025
bfc435c
style: :art: fmt
manuelmauro Dec 19, 2025
4582a9a
refactor: :recycle: uniform notification process across backends
manuelmauro Dec 19, 2025
0ccc2d0
docs: :memo: remove comments
manuelmauro Dec 19, 2025
c95ff80
refactor: :recycle: add defensive check
manuelmauro Jan 7, 2026
6e34048
fix: :bug: properly distinguish enacted blocks from new best
manuelmauro Jan 7, 2026
220bee4
refactor: :recycle: reduce code duplication
manuelmauro Jan 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,14 @@ build-release:
test:
cargo test --lib --all
cargo test --lib --all --features=runtime-benchmarks
# Run fc-mapping-sync tests with SQL feature to ensure both backends are tested
cargo test --lib -p fc-mapping-sync --features=sql
# Run all unit tests with release profile
test-release:
cargo test --release --lib --all
cargo test --release --lib --all --features=runtime-benchmarks
# Run fc-mapping-sync tests with SQL feature to ensure both backends are tested
cargo test --release --lib -p fc-mapping-sync --features=sql

.PHONY: integration-test integration-test-lint
# Check code format and lint of integration tests
Expand Down
51 changes: 35 additions & 16 deletions client/mapping-sync/src/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ mod worker;

pub use worker::MappingSyncWorker;

use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};

// Substrate
use sc_client_api::backend::{Backend, StorageProvider};
Expand All @@ -35,7 +35,11 @@ use fc_storage::StorageOverride;
use fp_consensus::{FindLogError, Hashes, Log, PostLog, PreLog};
use fp_rpc::EthereumRuntimeRPCApi;

use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks, SyncStrategy};
use crate::{
emit_block_notification, BlockNotificationContext, EthereumBlockNotification,
EthereumBlockNotificationSinks, SyncStrategy,
};
use worker::BestBlockInfo;

pub fn sync_block<Block: BlockT, C: HeaderBackend<Block>>(
storage_override: Arc<dyn StorageOverride<Block>>,
Expand Down Expand Up @@ -155,6 +159,7 @@ pub fn sync_one_block<Block: BlockT, C, BE>(
pubsub_notification_sinks: Arc<
EthereumBlockNotificationSinks<EthereumBlockNotification<Block>>,
>,
best_at_import: &mut HashMap<Block::Hash, BestBlockInfo<Block>>,
) -> Result<bool, String>
where
C: ProvideRuntimeApi<Block>,
Expand Down Expand Up @@ -216,20 +221,26 @@ where
.meta()
.write_current_syncing_tips(current_syncing_tips)?;
}
// Notify on import and remove closed channels.
// Only notify when the node is node in major syncing.
let sinks = &mut pubsub_notification_sinks.lock();
sinks.retain(|sink| {
if !sync_oracle.is_major_syncing() {
let hash = operating_header.hash();
let is_new_best = client.info().best_hash == hash;
sink.unbounded_send(EthereumBlockNotification { is_new_best, hash })
.is_ok()
} else {
// Remove from the pool if in major syncing.
false
}
});
// Notify on import and remove closed channels using the unified notification mechanism.
let hash = operating_header.hash();
// Use the `is_new_best` status from import time if available.
// This avoids race conditions where the best hash may have changed
// between import and sync time (e.g., during rapid reorgs).
// Fall back to current best hash check for blocks synced during catch-up.
let best_info = best_at_import.remove(&hash);
let is_new_best = best_info.is_some() || client.info().best_hash == hash;
let reorg_info = best_info.and_then(|info| info.reorg_info);

emit_block_notification(
pubsub_notification_sinks.as_ref(),
sync_oracle.as_ref(),
BlockNotificationContext {
hash,
is_new_best,
reorg_info,
},
);

Ok(true)
}

Expand All @@ -245,6 +256,7 @@ pub fn sync_blocks<Block: BlockT, C, BE>(
pubsub_notification_sinks: Arc<
EthereumBlockNotificationSinks<EthereumBlockNotification<Block>>,
>,
best_at_import: &mut HashMap<Block::Hash, BestBlockInfo<Block>>,
) -> Result<bool, String>
where
C: ProvideRuntimeApi<Block>,
Expand All @@ -265,9 +277,16 @@ where
strategy,
sync_oracle.clone(),
pubsub_notification_sinks.clone(),
best_at_import,
)?;
}

// Prune old entries from best_at_import to prevent unbounded growth.
// Entries for finalized blocks are no longer needed since finalized blocks
// cannot be reorged and their is_new_best status is irrelevant.
let finalized_number = client.info().finalized_number;
best_at_import.retain(|_, info| info.block_number > finalized_number);

Ok(synced_any)
}

Expand Down
54 changes: 49 additions & 5 deletions client/mapping-sync/src/kv/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use std::{pin::Pin, sync::Arc, time::Duration};
use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration};

use futures::{
prelude::*,
Expand All @@ -37,7 +37,15 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use fc_storage::StorageOverride;
use fp_rpc::EthereumRuntimeRPCApi;

use crate::SyncStrategy;
use crate::{ReorgInfo, SyncStrategy};

/// Information tracked at import time for a block that was `is_new_best`.
pub struct BestBlockInfo<Block: BlockT> {
/// The block number (for pruning purposes).
pub block_number: <Block::Header as HeaderT>::Number,
/// Reorg info if this block became best as part of a reorganization.
pub reorg_info: Option<ReorgInfo<Block>>,
}

pub struct MappingSyncWorker<Block: BlockT, C, BE> {
import_notifications: ImportNotifications<Block>,
Expand All @@ -57,6 +65,13 @@ pub struct MappingSyncWorker<Block: BlockT, C, BE> {
sync_oracle: Arc<dyn SyncOracle + Send + Sync + 'static>,
pubsub_notification_sinks:
Arc<crate::EthereumBlockNotificationSinks<crate::EthereumBlockNotification<Block>>>,

/// Tracks block hashes that were `is_new_best` at the time of their import notification,
/// along with their block number for pruning purposes and optional reorg info.
/// This is used to correctly determine `is_new_best` when syncing blocks, avoiding race
/// conditions where the best hash may have changed between import and sync time.
/// Entries are pruned when blocks become finalized to prevent unbounded growth.
best_at_import: HashMap<Block::Hash, BestBlockInfo<Block>>,
}

impl<Block: BlockT, C, BE> Unpin for MappingSyncWorker<Block, C, BE> {}
Expand Down Expand Up @@ -94,6 +109,7 @@ impl<Block: BlockT, C, BE> MappingSyncWorker<Block, C, BE> {

sync_oracle,
pubsub_notification_sinks,
best_at_import: HashMap::new(),
}
}
}
Expand All @@ -114,8 +130,25 @@ where
loop {
match Stream::poll_next(Pin::new(&mut self.import_notifications), cx) {
Poll::Pending => break,
Poll::Ready(Some(_)) => {
Poll::Ready(Some(notification)) => {
fire = true;
// Track blocks that were `is_new_best` at import time to avoid race
// conditions when determining `is_new_best` at sync time.
// We store the block number to enable pruning of old entries,
// and reorg info if this block became best as part of a reorg.
if notification.is_new_best {
// For notification: include new_best_hash per Ethereum spec.
let reorg_info = notification.tree_route.as_ref().map(|tree_route| {
ReorgInfo::from_tree_route(tree_route, notification.hash)
});
self.best_at_import.insert(
notification.hash,
BestBlockInfo {
block_number: *notification.header.number(),
reorg_info,
},
);
}
}
Poll::Ready(None) => return Poll::Ready(None),
}
Expand All @@ -138,7 +171,12 @@ where
if fire {
self.inner_delay = None;

match crate::kv::sync_blocks(
// Temporarily take ownership of best_at_import to avoid borrow checker issues
// (we can't have both an immutable borrow of self.client and a mutable borrow
// of self.best_at_import at the same time)
let mut best_at_import = std::mem::take(&mut self.best_at_import);

let result = crate::kv::sync_blocks(
self.client.as_ref(),
self.substrate_backend.as_ref(),
self.storage_override.clone(),
Expand All @@ -148,7 +186,13 @@ where
self.strategy,
self.sync_oracle.clone(),
self.pubsub_notification_sinks.clone(),
) {
&mut best_at_import,
);

// Restore the best_at_import set
self.best_at_import = best_at_import;

match result {
Ok(have_next) => {
self.have_next = have_next;
Poll::Ready(Some(()))
Expand Down
95 changes: 94 additions & 1 deletion client/mapping-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub mod kv;
#[cfg(feature = "sql")]
pub mod sql;

use sp_blockchain::TreeRoute;
use sp_consensus::SyncOracle;
use sp_runtime::traits::Block as BlockT;

#[derive(Copy, Clone, Eq, PartialEq)]
Expand All @@ -34,8 +36,99 @@ pub enum SyncStrategy {
pub type EthereumBlockNotificationSinks<T> =
parking_lot::Mutex<Vec<sc_utils::mpsc::TracingUnboundedSender<T>>>;

#[derive(Copy, Clone, Debug, Eq, PartialEq)]
/// Information about a chain reorganization.
///
/// When a reorg occurs, this struct contains the blocks that were removed from
/// the canonical chain (retracted) and the blocks that were added (enacted).
/// The `common_ancestor` is the last block that remains canonical in both
/// the old and new chains.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ReorgInfo<Block: BlockT> {
/// The common ancestor block hash between the old and new canonical chains.
pub common_ancestor: Block::Hash,
/// Blocks that were removed from the canonical chain (old fork).
pub retracted: Vec<Block::Hash>,
/// Blocks that were added to the canonical chain (new fork), excluding `new_best`.
pub enacted: Vec<Block::Hash>,
/// The new best block hash that triggered this reorg.
pub new_best: Block::Hash,
}

impl<Block: BlockT> ReorgInfo<Block> {
/// Create reorg info from a tree route and the new best block hash.
///
/// `tree_route` is "from old best to new best parent", so `enacted()` excludes
/// the new best block itself. The `new_best` is stored separately and callers
/// should handle emitting it after the enacted blocks.
pub fn from_tree_route(tree_route: &TreeRoute<Block>, new_best: Block::Hash) -> Self {
let retracted = tree_route
.retracted()
.iter()
.map(|hash_and_number| hash_and_number.hash)
.collect();

let enacted = tree_route
.enacted()
.iter()
.map(|hash_and_number| hash_and_number.hash)
.collect();

Self {
common_ancestor: tree_route.common_block().hash,
retracted,
enacted,
new_best,
}
}
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct EthereumBlockNotification<Block: BlockT> {
pub is_new_best: bool,
pub hash: Block::Hash,
/// Optional reorg information. Present when this block became best as part of a reorg.
pub reorg_info: Option<ReorgInfo<Block>>,
}

/// Context for emitting block notifications.
/// Contains all information needed to emit a notification consistently
/// across both KV and SQL backends.
pub struct BlockNotificationContext<Block: BlockT> {
/// The block hash being notified about.
pub hash: Block::Hash,
/// Whether this block is the new best block.
pub is_new_best: bool,
/// Optional reorg information if this block became best as part of a reorg.
pub reorg_info: Option<ReorgInfo<Block>>,
}

/// Emit block notification to all registered sinks.
///
/// This function provides a unified notification mechanism for both KV and SQL backends:
/// - Clears all sinks when major syncing (to prevent stale subscriptions)
/// - Sends notification to all sinks and removes closed sinks when not syncing
///
/// Both backends should call this function after completing block sync/indexing
/// to ensure consistent notification behavior regardless of the storage backend used.
pub fn emit_block_notification<Block: BlockT>(
pubsub_notification_sinks: &EthereumBlockNotificationSinks<EthereumBlockNotification<Block>>,
sync_oracle: &dyn SyncOracle,
context: BlockNotificationContext<Block>,
) {
let sinks = &mut pubsub_notification_sinks.lock();

if sync_oracle.is_major_syncing() {
// Remove all sinks when major syncing to prevent stale subscriptions
sinks.clear();
return;
}

sinks.retain(|sink| {
sink.unbounded_send(EthereumBlockNotification {
is_new_best: context.is_new_best,
hash: context.hash,
reorg_info: context.reorg_info.clone(),
})
.is_ok()
});
}
Loading
Loading