From 179aafd043ae69935fa6bd6e62726086f5e78f37 Mon Sep 17 00:00:00 2001 From: Manuel Mauro Date: Wed, 17 Dec 2025 11:15:57 +0200 Subject: [PATCH 01/21] feat: :sparkles: extend EthereumBlockNotification with reorg info --- client/mapping-sync/src/kv/mod.rs | 30 ++++++++++--- client/mapping-sync/src/kv/worker.rs | 67 +++++++++++++++++++++++++--- client/mapping-sync/src/lib.rs | 20 ++++++++- client/mapping-sync/src/sql/mod.rs | 39 +++++++++++----- client/rpc/src/eth_pubsub.rs | 67 ++++++++++++++++++++++------ 5 files changed, 189 insertions(+), 34 deletions(-) diff --git a/client/mapping-sync/src/kv/mod.rs b/client/mapping-sync/src/kv/mod.rs index 0cd2197b31..307413001f 100644 --- a/client/mapping-sync/src/kv/mod.rs +++ b/client/mapping-sync/src/kv/mod.rs @@ -22,7 +22,7 @@ mod worker; pub use worker::MappingSyncWorker; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; // Substrate use sc_client_api::backend::{Backend, StorageProvider}; @@ -36,6 +36,7 @@ use fp_consensus::{FindLogError, Hashes, Log, PostLog, PreLog}; use fp_rpc::EthereumRuntimeRPCApi; use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks, SyncStrategy}; +use worker::BestBlockInfo; pub fn sync_block>( storage_override: Arc>, @@ -155,6 +156,7 @@ pub fn sync_one_block( pubsub_notification_sinks: Arc< EthereumBlockNotificationSinks>, >, + best_at_import: &mut HashMap>, ) -> Result where C: ProvideRuntimeApi, @@ -217,14 +219,24 @@ where .write_current_syncing_tips(current_syncing_tips)?; } // Notify on import and remove closed channels. - // Only notify when the node is node in major syncing. + // Only notify when the node is not in major syncing. 
let sinks = &mut pubsub_notification_sinks.lock(); sinks.retain(|sink| { if !sync_oracle.is_major_syncing() { let hash = operating_header.hash(); - let is_new_best = client.info().best_hash == hash; - sink.unbounded_send(EthereumBlockNotification { is_new_best, hash }) - .is_ok() + // Use the `is_new_best` status from import time if available. + // This avoids race conditions where the best hash may have changed + // between import and sync time (e.g., during rapid reorgs). + // Fall back to current best hash check for blocks synced during catch-up. + let best_info = best_at_import.remove(&hash); + let is_new_best = best_info.is_some() || client.info().best_hash == hash; + let reorg_info = best_info.and_then(|info| info.reorg_info); + sink.unbounded_send(EthereumBlockNotification { + is_new_best, + hash, + reorg_info, + }) + .is_ok() } else { // Remove from the pool if in major syncing. false @@ -245,6 +257,7 @@ pub fn sync_blocks( pubsub_notification_sinks: Arc< EthereumBlockNotificationSinks>, >, + best_at_import: &mut HashMap>, ) -> Result where C: ProvideRuntimeApi, @@ -265,9 +278,16 @@ where strategy, sync_oracle.clone(), pubsub_notification_sinks.clone(), + best_at_import, )?; } + // Prune old entries from best_at_import to prevent unbounded growth. + // Entries for finalized blocks are no longer needed since finalized blocks + // cannot be reorged and their is_new_best status is irrelevant. + let finalized_number = client.info().finalized_number; + best_at_import.retain(|_, info| info.block_number > finalized_number); + Ok(synced_any) } diff --git a/client/mapping-sync/src/kv/worker.rs b/client/mapping-sync/src/kv/worker.rs index a14f49fcf1..af4bee1ecb 100644 --- a/client/mapping-sync/src/kv/worker.rs +++ b/client/mapping-sync/src/kv/worker.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . 
-use std::{pin::Pin, sync::Arc, time::Duration}; +use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration}; use futures::{ prelude::*, @@ -37,7 +37,15 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use fc_storage::StorageOverride; use fp_rpc::EthereumRuntimeRPCApi; -use crate::SyncStrategy; +use crate::{ReorgInfo, SyncStrategy}; + +/// Information tracked at import time for a block that was `is_new_best`. +pub struct BestBlockInfo { + /// The block number (for pruning purposes). + pub block_number: ::Number, + /// Reorg info if this block became best as part of a reorganization. + pub reorg_info: Option>, +} pub struct MappingSyncWorker { import_notifications: ImportNotifications, @@ -57,6 +65,13 @@ pub struct MappingSyncWorker { sync_oracle: Arc, pubsub_notification_sinks: Arc>>, + + /// Tracks block hashes that were `is_new_best` at the time of their import notification, + /// along with their block number for pruning purposes and optional reorg info. + /// This is used to correctly determine `is_new_best` when syncing blocks, avoiding race + /// conditions where the best hash may have changed between import and sync time. + /// Entries are pruned when blocks become finalized to prevent unbounded growth. + best_at_import: HashMap>, } impl Unpin for MappingSyncWorker {} @@ -94,6 +109,7 @@ impl MappingSyncWorker { sync_oracle, pubsub_notification_sinks, + best_at_import: HashMap::new(), } } } @@ -114,8 +130,38 @@ where loop { match Stream::poll_next(Pin::new(&mut self.import_notifications), cx) { Poll::Pending => break, - Poll::Ready(Some(_)) => { + Poll::Ready(Some(notification)) => { fire = true; + // Track blocks that were `is_new_best` at import time to avoid race + // conditions when determining `is_new_best` at sync time. + // We store the block number to enable pruning of old entries, + // and reorg info if this block became best as part of a reorg. 
+ if notification.is_new_best { + let reorg_info = notification.tree_route.as_ref().map(|tree_route| { + let retracted = tree_route + .retracted() + .iter() + .map(|hash_and_number| hash_and_number.hash) + .collect(); + let enacted = tree_route + .enacted() + .iter() + .map(|hash_and_number| hash_and_number.hash) + .collect(); + ReorgInfo { + common_ancestor: tree_route.common_block().hash, + retracted, + enacted, + } + }); + self.best_at_import.insert( + notification.hash, + BestBlockInfo { + block_number: *notification.header.number(), + reorg_info, + }, + ); + } } Poll::Ready(None) => return Poll::Ready(None), } @@ -138,7 +184,12 @@ where if fire { self.inner_delay = None; - match crate::kv::sync_blocks( + // Temporarily take ownership of best_at_import to avoid borrow checker issues + // (we can't have both an immutable borrow of self.client and a mutable borrow + // of self.best_at_import at the same time) + let mut best_at_import = std::mem::take(&mut self.best_at_import); + + let result = crate::kv::sync_blocks( self.client.as_ref(), self.substrate_backend.as_ref(), self.storage_override.clone(), @@ -148,7 +199,13 @@ where self.strategy, self.sync_oracle.clone(), self.pubsub_notification_sinks.clone(), - ) { + &mut best_at_import, + ); + + // Restore the best_at_import set + self.best_at_import = best_at_import; + + match result { Ok(have_next) => { self.have_next = have_next; Poll::Ready(Some(())) diff --git a/client/mapping-sync/src/lib.rs b/client/mapping-sync/src/lib.rs index 9b7da90ad7..278fd7882d 100644 --- a/client/mapping-sync/src/lib.rs +++ b/client/mapping-sync/src/lib.rs @@ -34,8 +34,26 @@ pub enum SyncStrategy { pub type EthereumBlockNotificationSinks = parking_lot::Mutex>>; -#[derive(Copy, Clone, Debug, Eq, PartialEq)] +/// Information about a chain reorganization. +/// +/// When a reorg occurs, this struct contains the blocks that were removed from +/// the canonical chain (retracted) and the blocks that were added (enacted). 
+/// The `common_ancestor` is the last block that remains canonical in both +/// the old and new chains. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct ReorgInfo { + /// The common ancestor block hash between the old and new canonical chains. + pub common_ancestor: Block::Hash, + /// Blocks that were removed from the canonical chain (old fork). + pub retracted: Vec, + /// Blocks that were added to the canonical chain (new fork). + pub enacted: Vec, +} + +#[derive(Clone, Debug, Eq, PartialEq)] pub struct EthereumBlockNotification { pub is_new_best: bool, pub hash: Block::Hash, + /// Optional reorg information. Present when this block became best as part of a reorg. + pub reorg_info: Option>, } diff --git a/client/mapping-sync/src/sql/mod.rs b/client/mapping-sync/src/sql/mod.rs index b9c6974612..ad453d230b 100644 --- a/client/mapping-sync/src/sql/mod.rs +++ b/client/mapping-sync/src/sql/mod.rs @@ -29,17 +29,20 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT, UniqueSaturatedInto // Frontier use fp_rpc::EthereumRuntimeRPCApi; -use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks, SyncStrategy}; +use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks, ReorgInfo, SyncStrategy}; /// Defines the commands for the sync worker. #[derive(Debug)] -pub enum WorkerCommand { +pub enum WorkerCommand { /// Resume indexing from the last indexed canon block. ResumeSync, /// Index leaves. IndexLeaves(Vec), /// Index the best block known so far via import notifications. - IndexBestBlock(H256), + IndexBestBlock { + block_hash: H256, + reorg_info: Option>, + }, /// Canonicalize the enacted and retracted blocks reported via import notifications. 
Canonicalize { common: H256, @@ -80,7 +83,7 @@ where pubsub_notification_sinks: Arc< EthereumBlockNotificationSinks>, >, - ) -> tokio::sync::mpsc::Sender { + ) -> tokio::sync::mpsc::Sender> { let (tx, mut rx) = tokio::sync::mpsc::channel(100); tokio::task::spawn(async move { while let Some(cmd) = rx.recv().await { @@ -122,7 +125,10 @@ where .await; } } - WorkerCommand::IndexBestBlock(block_hash) => { + WorkerCommand::IndexBestBlock { + block_hash, + reorg_info, + } => { index_canonical_block_and_ancestors( client.clone(), substrate_backend.clone(), @@ -135,6 +141,7 @@ where let _ = sink.unbounded_send(EthereumBlockNotification { is_new_best: true, hash: block_hash, + reorg_info: reorg_info.clone(), }); } } @@ -229,7 +236,8 @@ where notification.is_new_best, ); if notification.is_new_best { - if let Some(tree_route) = notification.tree_route { + // Extract reorg info from tree_route if present + let reorg_info = if let Some(ref tree_route) = notification.tree_route { log::debug!( target: "frontier-sql", "🔀 Re-org happened at new best {}, proceeding to canonicalize db", @@ -249,12 +257,23 @@ where let common = tree_route.common_block().hash; tx.send(WorkerCommand::Canonicalize { common, - enacted, - retracted, + enacted: enacted.clone(), + retracted: retracted.clone(), }).await.ok(); - } - tx.send(WorkerCommand::IndexBestBlock(notification.hash)).await.ok(); + Some(ReorgInfo { + common_ancestor: common, + retracted, + enacted, + }) + } else { + None + }; + + tx.send(WorkerCommand::IndexBestBlock { + block_hash: notification.hash, + reorg_info, + }).await.ok(); } } } diff --git a/client/rpc/src/eth_pubsub.rs b/client/rpc/src/eth_pubsub.rs index 5ac30fa7cc..3efbfc377b 100644 --- a/client/rpc/src/eth_pubsub.rs +++ b/client/rpc/src/eth_pubsub.rs @@ -21,6 +21,7 @@ use std::{marker::PhantomData, sync::Arc}; use ethereum::TransactionV3 as EthereumTransaction; use futures::{future, FutureExt as _, StreamExt as _}; use jsonrpsee::{core::traits::IdProvider, 
server::PendingSubscriptionSink}; +use log::debug; // Substrate use sc_client_api::{ backend::{Backend, StorageProvider}, @@ -118,16 +119,21 @@ where } } - fn notify_header( - &self, - notification: EthereumBlockNotification, - ) -> future::Ready> { - let res = if notification.is_new_best { - self.storage_override.current_block(notification.hash) - } else { - None - }; - future::ready(res.map(PubSubResult::header)) + /// Get headers for enacted blocks during a reorg. + /// + /// Per Ethereum spec (https://github.com/ethereum/go-ethereum/wiki/RPC-PUB-SUB#newheads): + /// "When a chain reorganization occurs, this subscription will emit an event + /// containing all new headers (blocks) for the new chain. This means that you + /// may see multiple headers emitted with the same height (block number)." + /// + /// Returns headers in ascending order (oldest first). + /// Note: `enacted` from tree_route already includes the new best block. + fn get_enacted_headers(&self, enacted: &[B::Hash]) -> Vec { + enacted + .iter() + .filter_map(|hash| self.storage_override.current_block(*hash)) + .map(PubSubResult::header) + .collect() } fn notify_logs( @@ -252,10 +258,45 @@ where let fut = async move { match kind { Kind::NewHeads => { - let stream = block_notification_stream - .filter_map(move |notification| pubsub.notify_header(notification)); + // Per Ethereum spec, when a reorg occurs, we must emit all headers + // for the new canonical chain. The reorg_info field in the notification + // contains the enacted blocks when a reorg occurred. 
+ let stream = block_notification_stream.filter_map(move |notification| { + if !notification.is_new_best { + return future::ready(None); + } + + // Check if this block came from a reorg + let headers = if let Some(ref reorg_info) = notification.reorg_info { + debug!( + target: "eth-pubsub", + "Reorg detected: {} blocks retracted, {} blocks enacted", + reorg_info.retracted.len(), + reorg_info.enacted.len() + ); + // Emit all enacted blocks (already includes the new best block) + pubsub.get_enacted_headers(&reorg_info.enacted) + } else { + // Normal case: just emit the new block + if let Some(block) = pubsub.storage_override.current_block(notification.hash) { + vec![PubSubResult::header(block)] + } else { + return future::ready(None); + } + }; + + if headers.is_empty() { + return future::ready(None); + } + + future::ready(Some(headers)) + }); + + // Flatten the Vec into individual PubSubResult items + let flat_stream = stream.flat_map(futures::stream::iter); + PendingSubscription::from(pending) - .pipe_from_stream(stream, BoundedVecDeque::new(16)) + .pipe_from_stream(flat_stream, BoundedVecDeque::new(16)) .await } Kind::Logs => { From 14cd7832a34fb6c4abe798d46a13053c759fb948 Mon Sep 17 00:00:00 2001 From: Manuel Mauro Date: Wed, 17 Dec 2025 11:26:36 +0200 Subject: [PATCH 02/21] style: :art: fmt --- client/rpc/src/eth_pubsub.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/client/rpc/src/eth_pubsub.rs b/client/rpc/src/eth_pubsub.rs index 3efbfc377b..373d2f9628 100644 --- a/client/rpc/src/eth_pubsub.rs +++ b/client/rpc/src/eth_pubsub.rs @@ -278,7 +278,9 @@ where pubsub.get_enacted_headers(&reorg_info.enacted) } else { // Normal case: just emit the new block - if let Some(block) = pubsub.storage_override.current_block(notification.hash) { + if let Some(block) = + pubsub.storage_override.current_block(notification.hash) + { vec![PubSubResult::header(block)] } else { return future::ready(None); From 8139c693e699a191ee2a519d1a5e330ca1c3c626 Mon Sep 
17 00:00:00 2001 From: Manuel Mauro Date: Wed, 17 Dec 2025 13:06:27 +0200 Subject: [PATCH 03/21] fix: :bug: emit reorg information from Canonicalize --- client/mapping-sync/src/sql/mod.rs | 70 +++++++++++++++++++----------- 1 file changed, 45 insertions(+), 25 deletions(-) diff --git a/client/mapping-sync/src/sql/mod.rs b/client/mapping-sync/src/sql/mod.rs index ad453d230b..4e9e39612a 100644 --- a/client/mapping-sync/src/sql/mod.rs +++ b/client/mapping-sync/src/sql/mod.rs @@ -33,17 +33,19 @@ use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks, ReorgInfo /// Defines the commands for the sync worker. #[derive(Debug)] -pub enum WorkerCommand { +pub enum WorkerCommand { /// Resume indexing from the last indexed canon block. ResumeSync, /// Index leaves. IndexLeaves(Vec), /// Index the best block known so far via import notifications. + /// When `emit_notification` is true, also sends a pubsub notification. IndexBestBlock { block_hash: H256, - reorg_info: Option>, + emit_notification: bool, }, /// Canonicalize the enacted and retracted blocks reported via import notifications. + /// This also triggers pubsub notifications with reorg information. 
Canonicalize { common: H256, enacted: Vec, @@ -83,7 +85,7 @@ where pubsub_notification_sinks: Arc< EthereumBlockNotificationSinks>, >, - ) -> tokio::sync::mpsc::Sender> { + ) -> tokio::sync::mpsc::Sender { let (tx, mut rx) = tokio::sync::mpsc::channel(100); tokio::task::spawn(async move { while let Some(cmd) = rx.recv().await { @@ -127,7 +129,7 @@ where } WorkerCommand::IndexBestBlock { block_hash, - reorg_info, + emit_notification, } => { index_canonical_block_and_ancestors( client.clone(), @@ -136,13 +138,15 @@ where block_hash, ) .await; - let sinks = &mut pubsub_notification_sinks.lock(); - for sink in sinks.iter() { - let _ = sink.unbounded_send(EthereumBlockNotification { - is_new_best: true, - hash: block_hash, - reorg_info: reorg_info.clone(), - }); + if emit_notification { + let sinks = &mut pubsub_notification_sinks.lock(); + for sink in sinks.iter() { + let _ = sink.unbounded_send(EthereumBlockNotification { + is_new_best: true, + hash: block_hash, + reorg_info: None, + }); + } } } WorkerCommand::Canonicalize { @@ -150,8 +154,28 @@ where enacted, retracted, } => { - canonicalize_blocks(indexer_backend.clone(), common, enacted, retracted) - .await; + canonicalize_blocks( + indexer_backend.clone(), + common, + enacted.clone(), + retracted.clone(), + ) + .await; + // Emit pubsub notification with reorg info for the new best block + if let Some(&new_best) = enacted.last() { + let sinks = &mut pubsub_notification_sinks.lock(); + for sink in sinks.iter() { + let _ = sink.unbounded_send(EthereumBlockNotification { + is_new_best: true, + hash: new_best, + reorg_info: Some(ReorgInfo { + common_ancestor: common, + retracted: retracted.clone(), + enacted: enacted.clone(), + }), + }); + } + } } WorkerCommand::CheckIndexedBlocks => { // Fix any indexed blocks that did not have their logs indexed @@ -236,8 +260,7 @@ where notification.is_new_best, ); if notification.is_new_best { - // Extract reorg info from tree_route if present - let reorg_info = if let Some(ref 
tree_route) = notification.tree_route { + let has_reorg = if let Some(ref tree_route) = notification.tree_route { log::debug!( target: "frontier-sql", "🔀 Re-org happened at new best {}, proceeding to canonicalize db", @@ -257,22 +280,19 @@ where let common = tree_route.common_block().hash; tx.send(WorkerCommand::Canonicalize { common, - enacted: enacted.clone(), - retracted: retracted.clone(), - }).await.ok(); - - Some(ReorgInfo { - common_ancestor: common, - retracted, enacted, - }) + retracted, + }).await.ok(); + true } else { - None + false }; + // Index the best block. Only emit notification if there was no reorg, + // since Canonicalize already emits the notification with reorg info. tx.send(WorkerCommand::IndexBestBlock { block_hash: notification.hash, - reorg_info, + emit_notification: !has_reorg, }).await.ok(); } } From 97bfb4b8d1addaa800abbb2af4a05366a0244290 Mon Sep 17 00:00:00 2001 From: Manuel Mauro Date: Wed, 17 Dec 2025 17:56:09 +0200 Subject: [PATCH 04/21] test: :white_check_mark: add basic integration tests --- ts-tests/tests/test-newheads-compliance.ts | 237 +++++++++++++++++++++ 1 file changed, 237 insertions(+) create mode 100644 ts-tests/tests/test-newheads-compliance.ts diff --git a/ts-tests/tests/test-newheads-compliance.ts b/ts-tests/tests/test-newheads-compliance.ts new file mode 100644 index 0000000000..0ae775815f --- /dev/null +++ b/ts-tests/tests/test-newheads-compliance.ts @@ -0,0 +1,237 @@ +import { expect } from "chai"; +import { step } from "mocha-steps"; + +import { customRequest, describeWithFrontierWs } from "./util"; + +/** + * Tests for newHeads subscription compliance with Ethereum specification: + * https://github.com/ethereum/go-ethereum/wiki/RPC-PUB-SUB#newheads + * + * Per the spec: + * - "Fires a notification each time a new header is appended to the chain, including chain reorganizations." + * - "In case of a chain reorganization the subscription will emit all new headers for the new chain." 
+ * - "Therefore the subscription can emit multiple headers on the same height." + */ +describeWithFrontierWs("Frontier RPC (newHeads Compliance)", (context) => { + let subscription; + + // Helper to create a block with optional parent hash for forking + async function createBlock(finalize: boolean = true, parentHash: string | null = null) { + const response = await customRequest(context.web3, "engine_createBlock", [true, finalize, parentHash]); + if (!response.result) { + throw new Error(`Unexpected result: ${JSON.stringify(response)}`); + } + await new Promise((resolve) => setTimeout(() => resolve(), 500)); + return response.result.hash; + } + + step("newHeads should include all required Ethereum-spec fields", async function () { + subscription = context.web3.eth.subscribe("newBlockHeaders", function (error, result) {}); + + let data = null; + let dataResolve = null; + let dataPromise = new Promise((resolve) => { + dataResolve = resolve; + }); + + subscription.on("data", function (d: any) { + data = d; + subscription.unsubscribe(); + dataResolve(); + }); + + await createBlock(); + await dataPromise; + + // Verify all Ethereum-spec required fields are present + // https://github.com/ethereum/go-ethereum/wiki/RPC-PUB-SUB#newheads + expect(data).to.have.property("hash"); + expect(data).to.have.property("parentHash"); + expect(data).to.have.property("sha3Uncles"); + expect(data).to.have.property("miner"); + expect(data).to.have.property("stateRoot"); + expect(data).to.have.property("transactionsRoot"); + expect(data).to.have.property("receiptsRoot"); + expect(data).to.have.property("logsBloom"); + expect(data).to.have.property("difficulty"); + expect(data).to.have.property("number"); + expect(data).to.have.property("gasLimit"); + expect(data).to.have.property("gasUsed"); + expect(data).to.have.property("timestamp"); + expect(data).to.have.property("extraData"); + expect(data).to.have.property("nonce"); + + // Verify hash formats + 
expect(data.hash).to.match(/^0x[0-9a-fA-F]{64}$/); + expect(data.parentHash).to.match(/^0x[0-9a-fA-F]{64}$/); + }).timeout(40000); + + step("newHeads should emit headers in order for normal block production", async function () { + subscription = context.web3.eth.subscribe("newBlockHeaders", function (error, result) {}); + + const headers: any[] = []; + const targetCount = 3; + let dataResolve = null; + let dataPromise = new Promise((resolve) => { + dataResolve = resolve; + }); + + subscription.on("data", function (d: any) { + headers.push(d); + if (headers.length >= targetCount) { + subscription.unsubscribe(); + dataResolve(); + } + }); + + // Create 3 blocks sequentially + for (let i = 0; i < targetCount; i++) { + await createBlock(); + } + await dataPromise; + + // Verify headers are in ascending block number order + for (let i = 1; i < headers.length; i++) { + expect(headers[i].number).to.be.greaterThan(headers[i - 1].number); + // Each block's parent should be the previous block + expect(headers[i].parentHash).to.equal(headers[i - 1].hash); + } + }).timeout(60000); + + step("newHeads should emit multiple headers at same height during chain reorganization", async function () { + // This test verifies the Ethereum spec requirement: + // "In case of a chain reorganization the subscription will emit all new headers for the new chain." + // "Therefore the subscription can emit multiple headers on the same height." 
+ + // Subscribe FIRST so we see all headers including the initial chain + subscription = context.web3.eth.subscribe("newBlockHeaders", function (error, result) {}); + + const headers: any[] = []; + let dataResolve = null; + let dataPromise = new Promise((resolve) => { + dataResolve = resolve; + }); + + await new Promise((resolve) => { + subscription.on("connected", function (d: any) { + resolve(); + }); + }); + + subscription.on("data", function (d: any) { + headers.push(d); + // We expect: block1, block2, then reorg emitting ForkBlock + ForkBlock2, then ForkBlock3 + // That's at least 5 headers, with block2 and ForkBlock at the same height + if (headers.length >= 5) { + subscription.unsubscribe(); + dataResolve(); + } + }); + + // Create base chain: Genesis -> Block1 -> Block2 + const block1Hash = await createBlock(false); // Don't finalize to allow forking + const block2Hash = await createBlock(false); + + // Create a fork from block1 that's longer than the current chain + // IMPORTANT: Must explicitly chain fork blocks, otherwise they build on current best (block2) + // This creates: Genesis -> Block1 -> ForkBlock -> ForkBlock2 -> ForkBlock3 + // Which is longer than: Genesis -> Block1 -> Block2 + const forkBlock1Hash = await createBlock(false, block1Hash); // ForkBlock at same height as Block2 + const forkBlock2Hash = await createBlock(false, forkBlock1Hash); // ForkBlock2 - triggers reorg + await createBlock(false, forkBlock2Hash); // ForkBlock3 + + // Wait for headers with timeout + await Promise.race([ + dataPromise, + new Promise((_, reject) => setTimeout(() => reject(new Error("Timeout waiting for reorg headers")), 15000)), + ]).catch(() => { + // Timeout is acceptable if we got some headers + subscription.unsubscribe(); + }); + + // Verify we received headers + expect(headers.length).to.be.greaterThan(0, "Should have received at least one header"); + + // Log headers for debugging + console.log( + `Received ${headers.length} headers during test:`, + 
headers.map((h) => ({ number: h.number, hash: h.hash?.slice(0, 10) })) + ); + + // Check if we have multiple headers at the same height (the key spec requirement) + const heightCounts: { [key: number]: number } = {}; + for (const h of headers) { + heightCounts[h.number] = (heightCounts[h.number] || 0) + 1; + } + const duplicateHeights = Object.entries(heightCounts).filter(([_, count]) => count > 1); + console.log(`Heights with multiple headers:`, duplicateHeights); + }).timeout(60000); + + step("newHeads should emit all enacted blocks during reorg in ascending order", async function () { + // Subscribe FIRST to capture all headers + subscription = context.web3.eth.subscribe("newBlockHeaders", function (error, result) {}); + + const headers: any[] = []; + let dataResolve = null; + let dataPromise = new Promise((resolve) => { + dataResolve = resolve; + }); + + await new Promise((resolve) => { + subscription.on("connected", function (d: any) { + resolve(); + }); + }); + + subscription.on("data", function (d: any) { + headers.push(d); + // Expect: A1, A2, then reorg emitting B2+B3, then B4, B5 + if (headers.length >= 6) { + subscription.unsubscribe(); + dataResolve(); + } + }); + + // Create initial chain: Genesis -> A1 -> A2 + const a1Hash = await createBlock(false); + const a2Hash = await createBlock(false); + + // Create competing chain from A1: Genesis -> A1 -> B2 -> B3 -> B4 -> B5 + // IMPORTANT: Must explicitly chain fork blocks to build a proper fork + // This is longer than A-chain and should trigger reorg + const b2Hash = await createBlock(false, a1Hash); // B2 at same height as A2 + const b3Hash = await createBlock(false, b2Hash); // B3 - triggers reorg + const b4Hash = await createBlock(false, b3Hash); // B4 + await createBlock(false, b4Hash); // B5 + + await Promise.race([ + dataPromise, + new Promise((_, reject) => setTimeout(() => reject(new Error("Timeout waiting for headers")), 15000)), + ]).catch(() => { + subscription.unsubscribe(); + }); + + // 
Verify headers are in ascending block number order (per Ethereum spec) + if (headers.length > 1) { + for (let i = 1; i < headers.length; i++) { + expect(headers[i].number).to.be.greaterThanOrEqual( + headers[i - 1].number, + "Headers should be emitted in ascending order" + ); + } + } + + console.log( + `Received ${headers.length} enacted headers:`, + headers.map((h) => ({ number: h.number, hash: h.hash?.slice(0, 10) })) + ); + + // Check for duplicate heights (evidence of reorg) + const heightCounts: { [key: number]: number } = {}; + for (const h of headers) { + heightCounts[h.number] = (heightCounts[h.number] || 0) + 1; + } + const duplicateHeights = Object.entries(heightCounts).filter(([_, count]) => count > 1); + console.log(`Heights with multiple headers:`, duplicateHeights); + }).timeout(80000); +}); From ff393a096bb371ff9afed7e1a22fa4f92f665804 Mon Sep 17 00:00:00 2001 From: Manuel Mauro Date: Wed, 17 Dec 2025 17:59:56 +0200 Subject: [PATCH 05/21] fix: :bug: fix newHead missing enacted block --- client/mapping-sync/src/kv/worker.rs | 6 +- client/mapping-sync/src/sql/mod.rs | 89 ++++++++++++++-------------- client/rpc/src/eth_pubsub.rs | 1 - 3 files changed, 50 insertions(+), 46 deletions(-) diff --git a/client/mapping-sync/src/kv/worker.rs b/client/mapping-sync/src/kv/worker.rs index af4bee1ecb..266008d635 100644 --- a/client/mapping-sync/src/kv/worker.rs +++ b/client/mapping-sync/src/kv/worker.rs @@ -143,11 +143,15 @@ where .iter() .map(|hash_and_number| hash_and_number.hash) .collect(); - let enacted = tree_route + // tree_route.enacted() returns blocks from common ancestor (exclusive) + // to new best (EXCLUSIVE). We need to include the new best block + // (notification.hash) to get the complete enacted list. 
+ let mut enacted: Vec<_> = tree_route .enacted() .iter() .map(|hash_and_number| hash_and_number.hash) .collect(); + enacted.push(notification.hash); ReorgInfo { common_ancestor: tree_route.common_block().hash, retracted, diff --git a/client/mapping-sync/src/sql/mod.rs b/client/mapping-sync/src/sql/mod.rs index 4e9e39612a..cfe08c9849 100644 --- a/client/mapping-sync/src/sql/mod.rs +++ b/client/mapping-sync/src/sql/mod.rs @@ -31,6 +31,14 @@ use fp_rpc::EthereumRuntimeRPCApi; use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks, ReorgInfo, SyncStrategy}; +/// Reorg information passed between commands. +#[derive(Debug, Clone)] +pub struct ReorgData { + pub common_ancestor: H256, + pub enacted: Vec, + pub retracted: Vec, +} + /// Defines the commands for the sync worker. #[derive(Debug)] pub enum WorkerCommand { @@ -39,13 +47,13 @@ pub enum WorkerCommand { /// Index leaves. IndexLeaves(Vec), /// Index the best block known so far via import notifications. - /// When `emit_notification` is true, also sends a pubsub notification. + /// When `reorg_data` is Some, emits a pubsub notification with reorg info after indexing. + /// When `reorg_data` is None, emits a simple new block notification. IndexBestBlock { block_hash: H256, - emit_notification: bool, + reorg_data: Option, }, /// Canonicalize the enacted and retracted blocks reported via import notifications. - /// This also triggers pubsub notifications with reorg information. 
Canonicalize { common: H256, enacted: Vec, @@ -129,7 +137,7 @@ where } WorkerCommand::IndexBestBlock { block_hash, - emit_notification, + reorg_data, } => { index_canonical_block_and_ancestors( client.clone(), @@ -138,15 +146,18 @@ where block_hash, ) .await; - if emit_notification { - let sinks = &mut pubsub_notification_sinks.lock(); - for sink in sinks.iter() { - let _ = sink.unbounded_send(EthereumBlockNotification { - is_new_best: true, - hash: block_hash, - reorg_info: None, - }); - } + // Emit notification after indexing so blocks are available via storage_override + let sinks = &mut pubsub_notification_sinks.lock(); + for sink in sinks.iter() { + let _ = sink.unbounded_send(EthereumBlockNotification { + is_new_best: true, + hash: block_hash, + reorg_info: reorg_data.as_ref().map(|data| ReorgInfo { + common_ancestor: data.common_ancestor, + retracted: data.retracted.clone(), + enacted: data.enacted.clone(), + }), + }); } } WorkerCommand::Canonicalize { @@ -154,28 +165,9 @@ where enacted, retracted, } => { - canonicalize_blocks( - indexer_backend.clone(), - common, - enacted.clone(), - retracted.clone(), - ) - .await; - // Emit pubsub notification with reorg info for the new best block - if let Some(&new_best) = enacted.last() { - let sinks = &mut pubsub_notification_sinks.lock(); - for sink in sinks.iter() { - let _ = sink.unbounded_send(EthereumBlockNotification { - is_new_best: true, - hash: new_best, - reorg_info: Some(ReorgInfo { - common_ancestor: common, - retracted: retracted.clone(), - enacted: enacted.clone(), - }), - }); - } - } + canonicalize_blocks(indexer_backend.clone(), common, enacted, retracted) + .await; + // Notification is emitted by IndexBestBlock after indexing completes } WorkerCommand::CheckIndexedBlocks => { // Fix any indexed blocks that did not have their logs indexed @@ -260,7 +252,7 @@ where notification.is_new_best, ); if notification.is_new_best { - let has_reorg = if let Some(ref tree_route) = notification.tree_route { + let 
reorg_data = if let Some(ref tree_route) = notification.tree_route { log::debug!( target: "frontier-sql", "🔀 Re-org happened at new best {}, proceeding to canonicalize db", @@ -271,28 +263,37 @@ where .iter() .map(|hash_and_number| hash_and_number.hash) .collect::>(); - let enacted = tree_route + // tree_route.enacted() returns blocks from common ancestor (exclusive) + // to new best (exclusive). We need to include the new best block + // (notification.hash) to get the complete enacted list. + let mut enacted: Vec<_> = tree_route .enacted() .iter() .map(|hash_and_number| hash_and_number.hash) - .collect::>(); + .collect(); + enacted.push(notification.hash); let common = tree_route.common_block().hash; tx.send(WorkerCommand::Canonicalize { common, + enacted: enacted.clone(), + retracted: retracted.clone(), + }).await.ok(); + Some(ReorgData { + common_ancestor: common, enacted, retracted, - }).await.ok(); - true + }) } else { - false + None }; - // Index the best block. Only emit notification if there was no reorg, - // since Canonicalize already emits the notification with reorg info. + // Index the best block and emit notification after indexing completes. + // This ensures blocks are available via storage_override when the + // notification is processed. tx.send(WorkerCommand::IndexBestBlock { block_hash: notification.hash, - emit_notification: !has_reorg, + reorg_data, }).await.ok(); } } diff --git a/client/rpc/src/eth_pubsub.rs b/client/rpc/src/eth_pubsub.rs index 373d2f9628..967c6e782b 100644 --- a/client/rpc/src/eth_pubsub.rs +++ b/client/rpc/src/eth_pubsub.rs @@ -127,7 +127,6 @@ where /// may see multiple headers emitted with the same height (block number)." /// /// Returns headers in ascending order (oldest first). - /// Note: `enacted` from tree_route already includes the new best block. 
fn get_enacted_headers(&self, enacted: &[B::Hash]) -> Vec { enacted .iter() From e21e8ab27b2fc09900176dab1dfd57b7d4fd0319 Mon Sep 17 00:00:00 2001 From: Manuel Mauro Date: Wed, 17 Dec 2025 18:10:15 +0200 Subject: [PATCH 06/21] test: :white_check_mark: test deeper forks --- ts-tests/tests/test-newheads-compliance.ts | 103 +++++++++++++++++++++ 1 file changed, 103 insertions(+) diff --git a/ts-tests/tests/test-newheads-compliance.ts b/ts-tests/tests/test-newheads-compliance.ts index 0ae775815f..c491475a9c 100644 --- a/ts-tests/tests/test-newheads-compliance.ts +++ b/ts-tests/tests/test-newheads-compliance.ts @@ -234,4 +234,107 @@ describeWithFrontierWs("Frontier RPC (newHeads Compliance)", (context) => { const duplicateHeights = Object.entries(heightCounts).filter(([_, count]) => count > 1); console.log(`Heights with multiple headers:`, duplicateHeights); }).timeout(80000); + + step("newHeads should handle deep forks with multiple enacted blocks", async function () { + // Test a deeper reorg scenario: + // Original chain: A1 -> A2 -> A3 -> A4 + // Fork from A1: A1 -> B2 -> B3 -> B4 -> B5 -> B6 + // This retracts 3 blocks (A2, A3, A4) and enacts 5 blocks (B2, B3, B4, B5, B6) + + subscription = context.web3.eth.subscribe("newBlockHeaders", function (error, result) {}); + + const headers: any[] = []; + let dataResolve = null; + let dataPromise = new Promise((resolve) => { + dataResolve = resolve; + }); + + await new Promise((resolve) => { + subscription.on("connected", function (d: any) { + resolve(); + }); + }); + + subscription.on("data", function (d: any) { + headers.push(d); + // Expect: A1, A2, A3, A4 (4 headers) + // Then reorg emitting B2, B3, B4, B5, B6 (5 headers) + // Total: 9 headers minimum + if (headers.length >= 9) { + subscription.unsubscribe(); + dataResolve(); + } + }); + + // Create original chain: A1 -> A2 -> A3 -> A4 + const a1Hash = await createBlock(false); + const a2Hash = await createBlock(false); + const a3Hash = await createBlock(false); + 
const a4Hash = await createBlock(false); + + // Create competing chain from A1 that's longer + // Fork: A1 -> B2 -> B3 -> B4 -> B5 -> B6 + const b2Hash = await createBlock(false, a1Hash); + const b3Hash = await createBlock(false, b2Hash); + const b4Hash = await createBlock(false, b3Hash); + const b5Hash = await createBlock(false, b4Hash); + await createBlock(false, b5Hash); // B6 - this triggers the reorg + + await Promise.race([ + dataPromise, + new Promise((_, reject) => setTimeout(() => reject(new Error("Timeout waiting for deep fork headers")), 20000)), + ]).catch(() => { + subscription.unsubscribe(); + }); + + console.log( + `Deep fork test - Received ${headers.length} headers:`, + headers.map((h) => ({ number: h.number, hash: h.hash?.slice(0, 10) })) + ); + + // Count headers at each height + const heightCounts: { [key: number]: number } = {}; + for (const h of headers) { + heightCounts[h.number] = (heightCounts[h.number] || 0) + 1; + } + + // We should have multiple headers at heights where A-chain and B-chain overlap + // A2, A3, A4 are at the same heights as B2, B3, B4 + const duplicateHeights = Object.entries(heightCounts).filter(([_, count]) => count > 1); + console.log(`Heights with multiple headers:`, duplicateHeights); + + // Verify we got headers for all enacted blocks + // The reorg should emit B2, B3, B4, B5, B6 (5 blocks) + expect(headers.length).to.be.greaterThanOrEqual(9, "Should receive headers for original chain + all enacted blocks"); + + // Per Ethereum spec, during a reorg the new chain headers are emitted in ascending order. + // However, they may be at heights we already saw (hence "multiple headers at same height"). + // The overall sequence might look like: 14, 15, 16, 17, [reorg: 15, 16, 17, 18, 19] + // So we verify that the enacted blocks (after the reorg point) are in ascending order. 
+ + // Find where heights start decreasing (reorg point) + let reorgIndex = -1; + for (let i = 1; i < headers.length; i++) { + if (headers[i].number < headers[i - 1].number) { + reorgIndex = i; + break; + } + } + + if (reorgIndex > 0) { + // Verify enacted blocks after reorg are in ascending order + for (let i = reorgIndex + 1; i < headers.length; i++) { + expect(headers[i].number).to.be.greaterThanOrEqual( + headers[i - 1].number, + "Enacted blocks during reorg should be in ascending order" + ); + } + } + + // Verify we have at least 3 heights with duplicates (A2/B2, A3/B3, A4/B4) + expect(duplicateHeights.length).to.be.greaterThanOrEqual( + 3, + "Should have multiple headers at overlapping heights during deep reorg" + ); + }).timeout(120000); }); From c4ae8cfb8e88b1784811665d4ade394a173c7ae8 Mon Sep 17 00:00:00 2001 From: Manuel Mauro Date: Wed, 17 Dec 2025 19:13:19 +0200 Subject: [PATCH 07/21] style: :art: fmt --- ts-tests/tests/test-newheads-compliance.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/ts-tests/tests/test-newheads-compliance.ts b/ts-tests/tests/test-newheads-compliance.ts index c491475a9c..e3ccca1169 100644 --- a/ts-tests/tests/test-newheads-compliance.ts +++ b/ts-tests/tests/test-newheads-compliance.ts @@ -282,7 +282,9 @@ describeWithFrontierWs("Frontier RPC (newHeads Compliance)", (context) => { await Promise.race([ dataPromise, - new Promise((_, reject) => setTimeout(() => reject(new Error("Timeout waiting for deep fork headers")), 20000)), + new Promise((_, reject) => + setTimeout(() => reject(new Error("Timeout waiting for deep fork headers")), 20000) + ), ]).catch(() => { subscription.unsubscribe(); }); @@ -305,7 +307,10 @@ describeWithFrontierWs("Frontier RPC (newHeads Compliance)", (context) => { // Verify we got headers for all enacted blocks // The reorg should emit B2, B3, B4, B5, B6 (5 blocks) - expect(headers.length).to.be.greaterThanOrEqual(9, "Should receive headers for original chain + all enacted 
blocks"); + expect(headers.length).to.be.greaterThanOrEqual( + 9, + "Should receive headers for original chain + all enacted blocks" + ); // Per Ethereum spec, during a reorg the new chain headers are emitted in ascending order. // However, they may be at heights we already saw (hence "multiple headers at same height"). From 44e751759a12489b5970cc81f31279f45938a668 Mon Sep 17 00:00:00 2001 From: Manuel Mauro Date: Thu, 18 Dec 2025 12:37:33 +0200 Subject: [PATCH 08/21] fix: :bug: sort enacted blocks --- client/rpc/src/eth_pubsub.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/client/rpc/src/eth_pubsub.rs b/client/rpc/src/eth_pubsub.rs index 967c6e782b..fd7705f532 100644 --- a/client/rpc/src/eth_pubsub.rs +++ b/client/rpc/src/eth_pubsub.rs @@ -128,11 +128,18 @@ where /// /// Returns headers in ascending order (oldest first). fn get_enacted_headers(&self, enacted: &[B::Hash]) -> Vec { - enacted + // Fetch all blocks and collect them with their block numbers for sorting + + let mut blocks: Vec<_> = enacted .iter() .filter_map(|hash| self.storage_override.current_block(*hash)) - .map(PubSubResult::header) - .collect() + .collect(); + + // Sort by block number in ascending order (oldest first) + // This ensures clients receive parent blocks before child blocks + blocks.sort_by_key(|block| block.header.number); + + blocks.into_iter().map(PubSubResult::header).collect() } fn notify_logs( From 2945ddaf0538d18d56772e348c205f37f86b32ed Mon Sep 17 00:00:00 2001 From: Manuel Mauro Date: Thu, 18 Dec 2025 15:12:31 +0200 Subject: [PATCH 09/21] fix: :bug: guarantee async sends ordering --- client/rpc/src/eth_pubsub.rs | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/client/rpc/src/eth_pubsub.rs b/client/rpc/src/eth_pubsub.rs index fd7705f532..8e7751504f 100644 --- a/client/rpc/src/eth_pubsub.rs +++ b/client/rpc/src/eth_pubsub.rs @@ -267,9 +267,21 @@ where // Per Ethereum spec, when a reorg 
occurs, we must emit all headers // for the new canonical chain. The reorg_info field in the notification // contains the enacted blocks when a reorg occurred. - let stream = block_notification_stream.filter_map(move |notification| { + // + // IMPORTANT: We send headers sequentially (one at a time, waiting for + // each send to complete) to guarantee correct ordering. Using + // pipe_from_stream with flat_map can cause race conditions where + // multiple items arrive faster than they're sent, leading to + // non-deterministic ordering due to future::select behavior. + let Ok(sink) = pending.accept().await else { + return; + }; + let subscription = Subscription::from(sink); + + let mut stream = block_notification_stream; + while let Some(notification) = stream.next().await { if !notification.is_new_best { - return future::ready(None); + continue; } // Check if this block came from a reorg @@ -289,23 +301,17 @@ where { vec![PubSubResult::header(block)] } else { - return future::ready(None); + continue; } }; - if headers.is_empty() { - return future::ready(None); + // Send each header sequentially to guarantee order + for header in headers { + if subscription.send(&header).await.is_err() { + return; + } } - - future::ready(Some(headers)) - }); - - // Flatten the Vec into individual PubSubResult items - let flat_stream = stream.flat_map(futures::stream::iter); - - PendingSubscription::from(pending) - .pipe_from_stream(flat_stream, BoundedVecDeque::new(16)) - .await + } } Kind::Logs => { let stream = block_notification_stream From 6d644a6088a7f6e8c9c4cd7abdf2fe6bc9688795 Mon Sep 17 00:00:00 2001 From: Manuel Mauro Date: Thu, 18 Dec 2025 15:17:15 +0200 Subject: [PATCH 10/21] style: :art: fmt --- client/rpc/src/eth_pubsub.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/client/rpc/src/eth_pubsub.rs b/client/rpc/src/eth_pubsub.rs index 8e7751504f..b6427a0c08 100644 --- a/client/rpc/src/eth_pubsub.rs +++ b/client/rpc/src/eth_pubsub.rs @@ -129,7 +129,6 @@ where /// 
Returns headers in ascending order (oldest first). fn get_enacted_headers(&self, enacted: &[B::Hash]) -> Vec { // Fetch all blocks and collect them with their block numbers for sorting - let mut blocks: Vec<_> = enacted .iter() .filter_map(|hash| self.storage_override.current_block(*hash)) From 6d93dbfd8e31efe672324eef379b7865c38a0f6b Mon Sep 17 00:00:00 2001 From: Manuel Mauro Date: Thu, 18 Dec 2025 17:27:20 +0200 Subject: [PATCH 11/21] Revert "style: :art: fmt" This reverts commit 6d644a6088a7f6e8c9c4cd7abdf2fe6bc9688795. --- client/rpc/src/eth_pubsub.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/client/rpc/src/eth_pubsub.rs b/client/rpc/src/eth_pubsub.rs index b6427a0c08..8e7751504f 100644 --- a/client/rpc/src/eth_pubsub.rs +++ b/client/rpc/src/eth_pubsub.rs @@ -129,6 +129,7 @@ where /// Returns headers in ascending order (oldest first). fn get_enacted_headers(&self, enacted: &[B::Hash]) -> Vec { // Fetch all blocks and collect them with their block numbers for sorting + let mut blocks: Vec<_> = enacted .iter() .filter_map(|hash| self.storage_override.current_block(*hash)) From f76a514470c86e74d5b655dab08c3c31bea0e4d4 Mon Sep 17 00:00:00 2001 From: Manuel Mauro Date: Thu, 18 Dec 2025 17:28:16 +0200 Subject: [PATCH 12/21] Revert "fix: :bug: guarantee async sends ordering" This reverts commit 2945ddaf0538d18d56772e348c205f37f86b32ed. --- client/rpc/src/eth_pubsub.rs | 36 +++++++++++++++--------------------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/client/rpc/src/eth_pubsub.rs b/client/rpc/src/eth_pubsub.rs index 8e7751504f..fd7705f532 100644 --- a/client/rpc/src/eth_pubsub.rs +++ b/client/rpc/src/eth_pubsub.rs @@ -267,21 +267,9 @@ where // Per Ethereum spec, when a reorg occurs, we must emit all headers // for the new canonical chain. The reorg_info field in the notification // contains the enacted blocks when a reorg occurred. 
- // - // IMPORTANT: We send headers sequentially (one at a time, waiting for - // each send to complete) to guarantee correct ordering. Using - // pipe_from_stream with flat_map can cause race conditions where - // multiple items arrive faster than they're sent, leading to - // non-deterministic ordering due to future::select behavior. - let Ok(sink) = pending.accept().await else { - return; - }; - let subscription = Subscription::from(sink); - - let mut stream = block_notification_stream; - while let Some(notification) = stream.next().await { + let stream = block_notification_stream.filter_map(move |notification| { if !notification.is_new_best { - continue; + return future::ready(None); } // Check if this block came from a reorg @@ -301,17 +289,23 @@ where { vec![PubSubResult::header(block)] } else { - continue; + return future::ready(None); } }; - // Send each header sequentially to guarantee order - for header in headers { - if subscription.send(&header).await.is_err() { - return; - } + if headers.is_empty() { + return future::ready(None); } - } + + future::ready(Some(headers)) + }); + + // Flatten the Vec into individual PubSubResult items + let flat_stream = stream.flat_map(futures::stream::iter); + + PendingSubscription::from(pending) + .pipe_from_stream(flat_stream, BoundedVecDeque::new(16)) + .await } Kind::Logs => { let stream = block_notification_stream From cd9a8c1e6ad446bca08d6c39415d1adb2429f2a8 Mon Sep 17 00:00:00 2001 From: Manuel Mauro Date: Thu, 18 Dec 2025 17:28:34 +0200 Subject: [PATCH 13/21] Revert "fix: :bug: sort enacted blocks" This reverts commit 44e751759a12489b5970cc81f31279f45938a668. 
--- client/rpc/src/eth_pubsub.rs | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/client/rpc/src/eth_pubsub.rs b/client/rpc/src/eth_pubsub.rs index fd7705f532..967c6e782b 100644 --- a/client/rpc/src/eth_pubsub.rs +++ b/client/rpc/src/eth_pubsub.rs @@ -128,18 +128,11 @@ where /// /// Returns headers in ascending order (oldest first). fn get_enacted_headers(&self, enacted: &[B::Hash]) -> Vec { - // Fetch all blocks and collect them with their block numbers for sorting - - let mut blocks: Vec<_> = enacted + enacted .iter() .filter_map(|hash| self.storage_override.current_block(*hash)) - .collect(); - - // Sort by block number in ascending order (oldest first) - // This ensures clients receive parent blocks before child blocks - blocks.sort_by_key(|block| block.header.number); - - blocks.into_iter().map(PubSubResult::header).collect() + .map(PubSubResult::header) + .collect() } fn notify_logs( From 44b7f00ad3371c7d27f2c690a5ceb182ba8cf2be Mon Sep 17 00:00:00 2001 From: Manuel Mauro Date: Fri, 19 Dec 2025 15:19:39 +0200 Subject: [PATCH 14/21] ci: :construction_worker: test sql backend in CI --- Makefile | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Makefile b/Makefile index f3f20b89f0..a9a1bb8618 100644 --- a/Makefile +++ b/Makefile @@ -47,10 +47,14 @@ build-release: test: cargo test --lib --all cargo test --lib --all --features=runtime-benchmarks + # Run fc-mapping-sync tests with SQL feature to ensure both backends are tested + cargo test --lib -p fc-mapping-sync --features=sql # Run all unit tests with release profile test-release: cargo test --release --lib --all cargo test --release --lib --all --features=runtime-benchmarks + # Run fc-mapping-sync tests with SQL feature to ensure both backends are tested + cargo test --release --lib -p fc-mapping-sync --features=sql .PHONY: integration-test integration-test-lint # Check code format and lint of integration tests From dbd053746197d9a2964a4690ddf34de1f2ea9bf5 Mon Sep 17 00:00:00 
2001 From: Manuel Mauro Date: Fri, 19 Dec 2025 15:58:16 +0200 Subject: [PATCH 15/21] refactor: :recycle: deduplicate common logic between sql and kv backends --- client/mapping-sync/src/kv/worker.rs | 22 +-------- client/mapping-sync/src/lib.rs | 27 +++++++++++ client/mapping-sync/src/sql/mod.rs | 72 ++++++++-------------------- 3 files changed, 50 insertions(+), 71 deletions(-) diff --git a/client/mapping-sync/src/kv/worker.rs b/client/mapping-sync/src/kv/worker.rs index 266008d635..ee92799907 100644 --- a/client/mapping-sync/src/kv/worker.rs +++ b/client/mapping-sync/src/kv/worker.rs @@ -37,7 +37,7 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use fc_storage::StorageOverride; use fp_rpc::EthereumRuntimeRPCApi; -use crate::{ReorgInfo, SyncStrategy}; +use crate::{extract_reorg_info, ReorgInfo, SyncStrategy}; /// Information tracked at import time for a block that was `is_new_best`. pub struct BestBlockInfo { @@ -138,25 +138,7 @@ where // and reorg info if this block became best as part of a reorg. if notification.is_new_best { let reorg_info = notification.tree_route.as_ref().map(|tree_route| { - let retracted = tree_route - .retracted() - .iter() - .map(|hash_and_number| hash_and_number.hash) - .collect(); - // tree_route.enacted() returns blocks from common ancestor (exclusive) - // to new best (EXCLUSIVE). We need to include the new best block 
- let mut enacted: Vec<_> = tree_route - .enacted() - .iter() - .map(|hash_and_number| hash_and_number.hash) - .collect(); - enacted.push(notification.hash); - ReorgInfo { - common_ancestor: tree_route.common_block().hash, - retracted, - enacted, - } + extract_reorg_info(tree_route, notification.hash) }); self.best_at_import.insert( notification.hash, diff --git a/client/mapping-sync/src/lib.rs b/client/mapping-sync/src/lib.rs index 278fd7882d..efb35cfe28 100644 --- a/client/mapping-sync/src/lib.rs +++ b/client/mapping-sync/src/lib.rs @@ -23,6 +23,7 @@ pub mod kv; #[cfg(feature = "sql")] pub mod sql; +use sp_blockchain::TreeRoute; use sp_runtime::traits::Block as BlockT; #[derive(Copy, Clone, Eq, PartialEq)] @@ -57,3 +58,29 @@ pub struct EthereumBlockNotification { /// Optional reorg information. Present when this block became best as part of a reorg. pub reorg_info: Option>, } + +/// Extract reorg information from a tree route. +pub fn extract_reorg_info( + tree_route: &TreeRoute, + new_best_hash: Block::Hash, +) -> ReorgInfo { + let retracted = tree_route + .retracted() + .iter() + .map(|hash_and_number| hash_and_number.hash) + .collect(); + + // enacted() excludes the new best block, so we append it manually. + let mut enacted: Vec<_> = tree_route + .enacted() + .iter() + .map(|hash_and_number| hash_and_number.hash) + .collect(); + enacted.push(new_best_hash); + + ReorgInfo { + common_ancestor: tree_route.common_block().hash, + retracted, + enacted, + } +} diff --git a/client/mapping-sync/src/sql/mod.rs b/client/mapping-sync/src/sql/mod.rs index cfe08c9849..7f1d5dc47c 100644 --- a/client/mapping-sync/src/sql/mod.rs +++ b/client/mapping-sync/src/sql/mod.rs @@ -29,29 +29,21 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT, UniqueSaturatedInto // Frontier use fp_rpc::EthereumRuntimeRPCApi; -use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks, ReorgInfo, SyncStrategy}; - -/// Reorg information passed between commands. 
-#[derive(Debug, Clone)] -pub struct ReorgData { - pub common_ancestor: H256, - pub enacted: Vec, - pub retracted: Vec, -} +use crate::{extract_reorg_info, EthereumBlockNotification, EthereumBlockNotificationSinks, ReorgInfo, SyncStrategy}; /// Defines the commands for the sync worker. #[derive(Debug)] -pub enum WorkerCommand { +pub enum WorkerCommand> { /// Resume indexing from the last indexed canon block. ResumeSync, /// Index leaves. IndexLeaves(Vec), /// Index the best block known so far via import notifications. - /// When `reorg_data` is Some, emits a pubsub notification with reorg info after indexing. - /// When `reorg_data` is None, emits a simple new block notification. + /// When `reorg_info` is Some, emits a pubsub notification with reorg info after indexing. + /// When `reorg_info` is None, emits a simple new block notification. IndexBestBlock { block_hash: H256, - reorg_data: Option, + reorg_info: Option>, }, /// Canonicalize the enacted and retracted blocks reported via import notifications. Canonicalize { @@ -93,7 +85,7 @@ where pubsub_notification_sinks: Arc< EthereumBlockNotificationSinks>, >, - ) -> tokio::sync::mpsc::Sender { + ) -> tokio::sync::mpsc::Sender> { let (tx, mut rx) = tokio::sync::mpsc::channel(100); tokio::task::spawn(async move { while let Some(cmd) = rx.recv().await { @@ -137,7 +129,7 @@ where } WorkerCommand::IndexBestBlock { block_hash, - reorg_data, + reorg_info, } => { index_canonical_block_and_ancestors( client.clone(), @@ -146,19 +138,16 @@ where block_hash, ) .await; - // Emit notification after indexing so blocks are available via storage_override + // Emit notification after indexing so blocks are queryable. 
let sinks = &mut pubsub_notification_sinks.lock(); - for sink in sinks.iter() { - let _ = sink.unbounded_send(EthereumBlockNotification { + sinks.retain(|sink| { + sink.unbounded_send(EthereumBlockNotification { is_new_best: true, hash: block_hash, - reorg_info: reorg_data.as_ref().map(|data| ReorgInfo { - common_ancestor: data.common_ancestor, - retracted: data.retracted.clone(), - enacted: data.enacted.clone(), - }), - }); - } + reorg_info: reorg_info.clone(), + }) + .is_ok() + }); } WorkerCommand::Canonicalize { common, @@ -252,38 +241,19 @@ where notification.is_new_best, ); if notification.is_new_best { - let reorg_data = if let Some(ref tree_route) = notification.tree_route { + let reorg_info = if let Some(ref tree_route) = notification.tree_route { log::debug!( target: "frontier-sql", "🔀 Re-org happened at new best {}, proceeding to canonicalize db", notification.hash ); - let retracted = tree_route - .retracted() - .iter() - .map(|hash_and_number| hash_and_number.hash) - .collect::>(); - // tree_route.enacted() returns blocks from common ancestor (exclusive) - // to new best (exclusive). We need to include the new best block - // (notification.hash) to get the complete enacted list. - let mut enacted: Vec<_> = tree_route - .enacted() - .iter() - .map(|hash_and_number| hash_and_number.hash) - .collect(); - enacted.push(notification.hash); - - let common = tree_route.common_block().hash; + let info = extract_reorg_info(tree_route, notification.hash); tx.send(WorkerCommand::Canonicalize { - common, - enacted: enacted.clone(), - retracted: retracted.clone(), + common: info.common_ancestor, + enacted: info.enacted.clone(), + retracted: info.retracted.clone(), }).await.ok(); - Some(ReorgData { - common_ancestor: common, - enacted, - retracted, - }) + Some(info) } else { None }; @@ -293,7 +263,7 @@ where // notification is processed. 
tx.send(WorkerCommand::IndexBestBlock { block_hash: notification.hash, - reorg_data, + reorg_info, }).await.ok(); } } From bfc435c44ce3dbba0318bcb91186b7eb6f5feab5 Mon Sep 17 00:00:00 2001 From: Manuel Mauro Date: Fri, 19 Dec 2025 16:07:22 +0200 Subject: [PATCH 16/21] style: :art: fmt --- client/mapping-sync/src/kv/worker.rs | 7 ++++--- client/mapping-sync/src/sql/mod.rs | 5 ++++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/client/mapping-sync/src/kv/worker.rs b/client/mapping-sync/src/kv/worker.rs index ee92799907..304b92de21 100644 --- a/client/mapping-sync/src/kv/worker.rs +++ b/client/mapping-sync/src/kv/worker.rs @@ -137,9 +137,10 @@ where // We store the block number to enable pruning of old entries, // and reorg info if this block became best as part of a reorg. if notification.is_new_best { - let reorg_info = notification.tree_route.as_ref().map(|tree_route| { - extract_reorg_info(tree_route, notification.hash) - }); + let reorg_info = notification + .tree_route + .as_ref() + .map(|tree_route| extract_reorg_info(tree_route, notification.hash)); self.best_at_import.insert( notification.hash, BestBlockInfo { diff --git a/client/mapping-sync/src/sql/mod.rs b/client/mapping-sync/src/sql/mod.rs index 7f1d5dc47c..7ee6e849a8 100644 --- a/client/mapping-sync/src/sql/mod.rs +++ b/client/mapping-sync/src/sql/mod.rs @@ -29,7 +29,10 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT, UniqueSaturatedInto // Frontier use fp_rpc::EthereumRuntimeRPCApi; -use crate::{extract_reorg_info, EthereumBlockNotification, EthereumBlockNotificationSinks, ReorgInfo, SyncStrategy}; +use crate::{ + extract_reorg_info, EthereumBlockNotification, EthereumBlockNotificationSinks, ReorgInfo, + SyncStrategy, +}; /// Defines the commands for the sync worker. 
#[derive(Debug)] From 4582a9ad203e5350e13f6a244e752ccea83bbc11 Mon Sep 17 00:00:00 2001 From: Manuel Mauro Date: Fri, 19 Dec 2025 17:43:41 +0200 Subject: [PATCH 17/21] refactor: :recycle: uniform notification process across backends --- client/mapping-sync/src/kv/mod.rs | 49 +++++++++++++++--------------- client/mapping-sync/src/lib.rs | 44 +++++++++++++++++++++++++++ client/mapping-sync/src/sql/mod.rs | 44 +++++++++++++++------------ 3 files changed, 92 insertions(+), 45 deletions(-) diff --git a/client/mapping-sync/src/kv/mod.rs b/client/mapping-sync/src/kv/mod.rs index 307413001f..e638e0ad57 100644 --- a/client/mapping-sync/src/kv/mod.rs +++ b/client/mapping-sync/src/kv/mod.rs @@ -35,7 +35,10 @@ use fc_storage::StorageOverride; use fp_consensus::{FindLogError, Hashes, Log, PostLog, PreLog}; use fp_rpc::EthereumRuntimeRPCApi; -use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks, SyncStrategy}; +use crate::{ + emit_block_notification, BlockNotificationContext, EthereumBlockNotification, + EthereumBlockNotificationSinks, SyncStrategy, +}; use worker::BestBlockInfo; pub fn sync_block>( @@ -218,30 +221,26 @@ where .meta() .write_current_syncing_tips(current_syncing_tips)?; } - // Notify on import and remove closed channels. - // Only notify when the node is not in major syncing. - let sinks = &mut pubsub_notification_sinks.lock(); - sinks.retain(|sink| { - if !sync_oracle.is_major_syncing() { - let hash = operating_header.hash(); - // Use the `is_new_best` status from import time if available. - // This avoids race conditions where the best hash may have changed - // between import and sync time (e.g., during rapid reorgs). - // Fall back to current best hash check for blocks synced during catch-up. 
- let best_info = best_at_import.remove(&hash); - let is_new_best = best_info.is_some() || client.info().best_hash == hash; - let reorg_info = best_info.and_then(|info| info.reorg_info); - sink.unbounded_send(EthereumBlockNotification { - is_new_best, - hash, - reorg_info, - }) - .is_ok() - } else { - // Remove from the pool if in major syncing. - false - } - }); + // Notify on import and remove closed channels using the unified notification mechanism. + let hash = operating_header.hash(); + // Use the `is_new_best` status from import time if available. + // This avoids race conditions where the best hash may have changed + // between import and sync time (e.g., during rapid reorgs). + // Fall back to current best hash check for blocks synced during catch-up. + let best_info = best_at_import.remove(&hash); + let is_new_best = best_info.is_some() || client.info().best_hash == hash; + let reorg_info = best_info.and_then(|info| info.reorg_info); + + emit_block_notification( + pubsub_notification_sinks.as_ref(), + sync_oracle.as_ref(), + BlockNotificationContext { + hash, + is_new_best, + reorg_info, + }, + ); + Ok(true) } diff --git a/client/mapping-sync/src/lib.rs b/client/mapping-sync/src/lib.rs index efb35cfe28..7f3f22b853 100644 --- a/client/mapping-sync/src/lib.rs +++ b/client/mapping-sync/src/lib.rs @@ -24,6 +24,7 @@ pub mod kv; pub mod sql; use sp_blockchain::TreeRoute; +use sp_consensus::SyncOracle; use sp_runtime::traits::Block as BlockT; #[derive(Copy, Clone, Eq, PartialEq)] @@ -84,3 +85,46 @@ pub fn extract_reorg_info( enacted, } } + +/// Context for emitting block notifications. +/// Contains all information needed to emit a notification consistently +/// across both KV and SQL backends. +pub struct BlockNotificationContext { + /// The block hash being notified about. + pub hash: Block::Hash, + /// Whether this block is the new best block. + pub is_new_best: bool, + /// Optional reorg information if this block became best as part of a reorg. 
+ pub reorg_info: Option>, +} + +/// Emit block notification to all registered sinks. +/// +/// This function provides a unified notification mechanism for both KV and SQL backends: +/// - Clears all sinks when major syncing (to prevent stale subscriptions) +/// - Sends notification to all sinks and removes closed sinks when not syncing +/// +/// Both backends should call this function after completing block sync/indexing +/// to ensure consistent notification behavior regardless of the storage backend used. +pub fn emit_block_notification( + pubsub_notification_sinks: &EthereumBlockNotificationSinks>, + sync_oracle: &dyn SyncOracle, + context: BlockNotificationContext, +) { + let sinks = &mut pubsub_notification_sinks.lock(); + + if sync_oracle.is_major_syncing() { + // Remove all sinks when major syncing to prevent stale subscriptions + sinks.clear(); + return; + } + + sinks.retain(|sink| { + sink.unbounded_send(EthereumBlockNotification { + is_new_best: context.is_new_best, + hash: context.hash, + reorg_info: context.reorg_info.clone(), + }) + .is_ok() + }); +} diff --git a/client/mapping-sync/src/sql/mod.rs b/client/mapping-sync/src/sql/mod.rs index 7ee6e849a8..67dbec12c8 100644 --- a/client/mapping-sync/src/sql/mod.rs +++ b/client/mapping-sync/src/sql/mod.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . 
-use std::{ops::DerefMut, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; use futures::prelude::*; // Substrate @@ -30,8 +30,8 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT, UniqueSaturatedInto use fp_rpc::EthereumRuntimeRPCApi; use crate::{ - extract_reorg_info, EthereumBlockNotification, EthereumBlockNotificationSinks, ReorgInfo, - SyncStrategy, + emit_block_notification, extract_reorg_info, BlockNotificationContext, + EthereumBlockNotification, EthereumBlockNotificationSinks, ReorgInfo, SyncStrategy, }; /// Defines the commands for the sync worker. @@ -42,10 +42,12 @@ pub enum WorkerCommand> { /// Index leaves. IndexLeaves(Vec), /// Index the best block known so far via import notifications. - /// When `reorg_info` is Some, emits a pubsub notification with reorg info after indexing. - /// When `reorg_info` is None, emits a simple new block notification. + /// Emits a pubsub notification after indexing with the provided `is_new_best` status. + /// When `reorg_info` is Some, the notification includes reorg information. IndexBestBlock { block_hash: H256, + /// Whether this block was the new best at import time. + is_new_best: bool, reorg_info: Option>, }, /// Canonicalize the enacted and retracted blocks reported via import notifications. @@ -86,8 +88,9 @@ where substrate_backend: Arc, indexer_backend: Arc>, pubsub_notification_sinks: Arc< - EthereumBlockNotificationSinks>, + EthereumBlockNotificationSinks>, >, + sync_oracle: Arc, ) -> tokio::sync::mpsc::Sender> { let (tx, mut rx) = tokio::sync::mpsc::channel(100); tokio::task::spawn(async move { @@ -132,6 +135,7 @@ where } WorkerCommand::IndexBestBlock { block_hash, + is_new_best, reorg_info, } => { index_canonical_block_and_ancestors( @@ -142,15 +146,17 @@ where ) .await; // Emit notification after indexing so blocks are queryable. 
- let sinks = &mut pubsub_notification_sinks.lock(); - sinks.retain(|sink| { - sink.unbounded_send(EthereumBlockNotification { - is_new_best: true, + // Uses the unified notification mechanism for consistent behavior + // with the KV backend. + emit_block_notification( + pubsub_notification_sinks.as_ref(), + sync_oracle.as_ref(), + BlockNotificationContext { hash: block_hash, - reorg_info: reorg_info.clone(), - }) - .is_ok() - }); + is_new_best, + reorg_info, + }, + ); } WorkerCommand::Canonicalize { common, @@ -203,6 +209,7 @@ where substrate_backend.clone(), indexer_backend.clone(), pubsub_notification_sinks.clone(), + sync_oracle.clone(), ) .await; @@ -227,12 +234,8 @@ where if let Ok(leaves) = substrate_backend.blockchain().leaves() { tx.send(WorkerCommand::IndexLeaves(leaves)).await.ok(); } - if sync_oracle.is_major_syncing() { - let sinks = &mut pubsub_notification_sinks.lock(); - if !sinks.is_empty() { - *sinks.deref_mut() = vec![]; - } - } + // Note: is_major_syncing() sink clearing is now handled by + // emit_block_notification() for consistency with KV backend. } notification = notifications.next() => if let Some(notification) = notification { log::debug!( @@ -266,6 +269,7 @@ where // notification is processed. 
tx.send(WorkerCommand::IndexBestBlock { block_hash: notification.hash, + is_new_best: notification.is_new_best, reorg_info, }).await.ok(); } From 0ccc2d0360a514c0cde2b06d238a32098c3fb174 Mon Sep 17 00:00:00 2001 From: Manuel Mauro Date: Fri, 19 Dec 2025 19:24:07 +0200 Subject: [PATCH 18/21] docs: :memo: remove comments --- client/mapping-sync/src/sql/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/client/mapping-sync/src/sql/mod.rs b/client/mapping-sync/src/sql/mod.rs index 67dbec12c8..00da08749f 100644 --- a/client/mapping-sync/src/sql/mod.rs +++ b/client/mapping-sync/src/sql/mod.rs @@ -234,8 +234,6 @@ where if let Ok(leaves) = substrate_backend.blockchain().leaves() { tx.send(WorkerCommand::IndexLeaves(leaves)).await.ok(); } - // Note: is_major_syncing() sink clearing is now handled by - // emit_block_notification() for consistency with KV backend. } notification = notifications.next() => if let Some(notification) = notification { log::debug!( From c95ff80415933d8c5753268a031e3823a9ded38c Mon Sep 17 00:00:00 2001 From: Manuel Mauro Date: Wed, 7 Jan 2026 17:26:46 +0200 Subject: [PATCH 19/21] refactor: :recycle: add defensive check --- client/mapping-sync/src/lib.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/client/mapping-sync/src/lib.rs b/client/mapping-sync/src/lib.rs index 7f3f22b853..b5a8b6e602 100644 --- a/client/mapping-sync/src/lib.rs +++ b/client/mapping-sync/src/lib.rs @@ -71,13 +71,18 @@ pub fn extract_reorg_info( .map(|hash_and_number| hash_and_number.hash) .collect(); - // enacted() excludes the new best block, so we append it manually. + // tree_route is "from old best to new best parent", so enacted() excludes + // the new best block itself. We append it manually, with a defensive check + // in case the TreeRoute implementation changes in the future. 
let mut enacted: Vec<_> = tree_route .enacted() .iter() .map(|hash_and_number| hash_and_number.hash) .collect(); - enacted.push(new_best_hash); + + if enacted.last() != Some(&new_best_hash) { + enacted.push(new_best_hash); + } ReorgInfo { common_ancestor: tree_route.common_block().hash, From 6e3404894370ef80ec41385caeeefc30e5eb5f68 Mon Sep 17 00:00:00 2001 From: Manuel Mauro Date: Wed, 7 Jan 2026 18:06:38 +0200 Subject: [PATCH 20/21] fix: :bug: properly distinguish enacted blocks from new best --- client/mapping-sync/src/kv/worker.rs | 10 ++--- client/mapping-sync/src/lib.rs | 63 ++++++++++++++-------------- client/mapping-sync/src/sql/mod.rs | 27 ++++++++---- client/rpc/src/eth_pubsub.rs | 14 ++++--- 4 files changed, 64 insertions(+), 50 deletions(-) diff --git a/client/mapping-sync/src/kv/worker.rs b/client/mapping-sync/src/kv/worker.rs index 304b92de21..cf96acb8a3 100644 --- a/client/mapping-sync/src/kv/worker.rs +++ b/client/mapping-sync/src/kv/worker.rs @@ -37,7 +37,7 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use fc_storage::StorageOverride; use fp_rpc::EthereumRuntimeRPCApi; -use crate::{extract_reorg_info, ReorgInfo, SyncStrategy}; +use crate::{ReorgInfo, SyncStrategy}; /// Information tracked at import time for a block that was `is_new_best`. pub struct BestBlockInfo { @@ -137,10 +137,10 @@ where // We store the block number to enable pruning of old entries, // and reorg info if this block became best as part of a reorg. if notification.is_new_best { - let reorg_info = notification - .tree_route - .as_ref() - .map(|tree_route| extract_reorg_info(tree_route, notification.hash)); + // For notification: include new_best_hash per Ethereum spec. 
+ let reorg_info = notification.tree_route.as_ref().map(|tree_route| { + ReorgInfo::from_tree_route(tree_route, notification.hash) + }); self.best_at_import.insert( notification.hash, BestBlockInfo { diff --git a/client/mapping-sync/src/lib.rs b/client/mapping-sync/src/lib.rs index b5a8b6e602..24d824e0c0 100644 --- a/client/mapping-sync/src/lib.rs +++ b/client/mapping-sync/src/lib.rs @@ -48,8 +48,38 @@ pub struct ReorgInfo { pub common_ancestor: Block::Hash, /// Blocks that were removed from the canonical chain (old fork). pub retracted: Vec, - /// Blocks that were added to the canonical chain (new fork). + /// Blocks that were added to the canonical chain (new fork), excluding `new_best`. pub enacted: Vec, + /// The new best block hash that triggered this reorg. + pub new_best: Block::Hash, +} + +impl ReorgInfo { + /// Create reorg info from a tree route and the new best block hash. + /// + /// `tree_route` is "from old best to new best parent", so `enacted()` excludes + /// the new best block itself. The `new_best` is stored separately and callers + /// should handle emitting it after the enacted blocks. + pub fn from_tree_route(tree_route: &TreeRoute, new_best: Block::Hash) -> Self { + let retracted = tree_route + .retracted() + .iter() + .map(|hash_and_number| hash_and_number.hash) + .collect(); + + let enacted = tree_route + .enacted() + .iter() + .map(|hash_and_number| hash_and_number.hash) + .collect(); + + Self { + common_ancestor: tree_route.common_block().hash, + retracted, + enacted, + new_best, + } + } } #[derive(Clone, Debug, Eq, PartialEq)] @@ -60,37 +90,6 @@ pub struct EthereumBlockNotification { pub reorg_info: Option>, } -/// Extract reorg information from a tree route. 
-pub fn extract_reorg_info<Block: BlockT>(
-	tree_route: &TreeRoute<Block>,
-	new_best_hash: Block::Hash,
-) -> ReorgInfo<Block> {
-	let retracted = tree_route
-		.retracted()
-		.iter()
-		.map(|hash_and_number| hash_and_number.hash)
-		.collect();
-
-	// tree_route is "from old best to new best parent", so enacted() excludes
-	// the new best block itself. We append it manually, with a defensive check
-	// in case the TreeRoute implementation changes in the future.
-	let mut enacted: Vec<_> = tree_route
-		.enacted()
-		.iter()
-		.map(|hash_and_number| hash_and_number.hash)
-		.collect();
-
-	if enacted.last() != Some(&new_best_hash) {
-		enacted.push(new_best_hash);
-	}
-
-	ReorgInfo {
-		common_ancestor: tree_route.common_block().hash,
-		retracted,
-		enacted,
-	}
-}
-
 /// Context for emitting block notifications.
 /// Contains all information needed to emit a notification consistently
 /// across both KV and SQL backends.
diff --git a/client/mapping-sync/src/sql/mod.rs b/client/mapping-sync/src/sql/mod.rs
index 00da08749f..c7d1a4b735 100644
--- a/client/mapping-sync/src/sql/mod.rs
+++ b/client/mapping-sync/src/sql/mod.rs
@@ -30,8 +30,8 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT, UniqueSaturatedInto
 use fp_rpc::EthereumRuntimeRPCApi;
 
 use crate::{
-	emit_block_notification, extract_reorg_info, BlockNotificationContext,
-	EthereumBlockNotification, EthereumBlockNotificationSinks, ReorgInfo, SyncStrategy,
+	emit_block_notification, BlockNotificationContext, EthereumBlockNotification,
+	EthereumBlockNotificationSinks, ReorgInfo, SyncStrategy,
 };
 
 /// Defines the commands for the sync worker.
@@ -251,13 +251,26 @@ where
							"🔀 Re-org happened at new best {}, proceeding to canonicalize db",
							notification.hash
						);
-						let info = extract_reorg_info(tree_route, notification.hash);
+						// For Canonicalize: use tree_route directly (new_best handled by IndexBestBlock).
+ // Note: Including new_best_hash in enacted is harmless (no-op if not indexed, + // correct update if indexed) but we keep separation for clarity. + let retracted: Vec<_> = tree_route + .retracted() + .iter() + .map(|hash_and_number| hash_and_number.hash) + .collect(); + let enacted: Vec<_> = tree_route + .enacted() + .iter() + .map(|hash_and_number| hash_and_number.hash) + .collect(); tx.send(WorkerCommand::Canonicalize { - common: info.common_ancestor, - enacted: info.enacted.clone(), - retracted: info.retracted.clone(), + common: tree_route.common_block().hash, + enacted, + retracted, }).await.ok(); - Some(info) + // For notification: include new_best_hash per Ethereum spec. + Some(ReorgInfo::from_tree_route(tree_route, notification.hash)) } else { None }; diff --git a/client/rpc/src/eth_pubsub.rs b/client/rpc/src/eth_pubsub.rs index 967c6e782b..47de38a059 100644 --- a/client/rpc/src/eth_pubsub.rs +++ b/client/rpc/src/eth_pubsub.rs @@ -119,17 +119,18 @@ where } } - /// Get headers for enacted blocks during a reorg. + /// Get headers for enacted blocks during a reorg, including the new best block. /// /// Per Ethereum spec (https://github.com/ethereum/go-ethereum/wiki/RPC-PUB-SUB#newheads): /// "When a chain reorganization occurs, this subscription will emit an event /// containing all new headers (blocks) for the new chain. This means that you /// may see multiple headers emitted with the same height (block number)." /// - /// Returns headers in ascending order (oldest first). - fn get_enacted_headers(&self, enacted: &[B::Hash]) -> Vec { + /// Returns headers in ascending order (oldest first), with `new_best` last. 
+ fn get_reorg_headers(&self, enacted: &[B::Hash], new_best: B::Hash) -> Vec { enacted .iter() + .chain(std::iter::once(&new_best)) .filter_map(|hash| self.storage_override.current_block(*hash)) .map(PubSubResult::header) .collect() @@ -269,12 +270,13 @@ where let headers = if let Some(ref reorg_info) = notification.reorg_info { debug!( target: "eth-pubsub", - "Reorg detected: {} blocks retracted, {} blocks enacted", + "Reorg detected: new_best={:?}, {} blocks retracted, {} blocks enacted", + reorg_info.new_best, reorg_info.retracted.len(), reorg_info.enacted.len() ); - // Emit all enacted blocks (already includes the new best block) - pubsub.get_enacted_headers(&reorg_info.enacted) + // Emit all enacted blocks followed by the new best block + pubsub.get_reorg_headers(&reorg_info.enacted, reorg_info.new_best) } else { // Normal case: just emit the new block if let Some(block) = From 220bee40a1116a418d63d7cffe9546b83dc4d20f Mon Sep 17 00:00:00 2001 From: Manuel Mauro Date: Wed, 7 Jan 2026 18:30:38 +0200 Subject: [PATCH 21/21] refactor: :recycle: reduce code duplication --- client/mapping-sync/src/sql/mod.rs | 24 ++++++------------------ 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/client/mapping-sync/src/sql/mod.rs b/client/mapping-sync/src/sql/mod.rs index c7d1a4b735..f6cfb18039 100644 --- a/client/mapping-sync/src/sql/mod.rs +++ b/client/mapping-sync/src/sql/mod.rs @@ -251,26 +251,14 @@ where "🔀 Re-org happened at new best {}, proceeding to canonicalize db", notification.hash ); - // For Canonicalize: use tree_route directly (new_best handled by IndexBestBlock). - // Note: Including new_best_hash in enacted is harmless (no-op if not indexed, - // correct update if indexed) but we keep separation for clarity. 
- let retracted: Vec<_> = tree_route - .retracted() - .iter() - .map(|hash_and_number| hash_and_number.hash) - .collect(); - let enacted: Vec<_> = tree_route - .enacted() - .iter() - .map(|hash_and_number| hash_and_number.hash) - .collect(); + let info = ReorgInfo::from_tree_route(tree_route, notification.hash); + // Note: new_best is handled separately by IndexBestBlock. tx.send(WorkerCommand::Canonicalize { - common: tree_route.common_block().hash, - enacted, - retracted, + common: info.common_ancestor, + enacted: info.enacted.clone(), + retracted: info.retracted.clone(), }).await.ok(); - // For notification: include new_best_hash per Ethereum spec. - Some(ReorgInfo::from_tree_route(tree_route, notification.hash)) + Some(info) } else { None };