diff --git a/Makefile b/Makefile index f3f20b89f0..a9a1bb8618 100644 --- a/Makefile +++ b/Makefile @@ -47,10 +47,14 @@ build-release: test: cargo test --lib --all cargo test --lib --all --features=runtime-benchmarks + # Run fc-mapping-sync tests with SQL feature to ensure both backends are tested + cargo test --lib -p fc-mapping-sync --features=sql # Run all unit tests with release profile test-release: cargo test --release --lib --all cargo test --release --lib --all --features=runtime-benchmarks + # Run fc-mapping-sync tests with SQL feature to ensure both backends are tested + cargo test --release --lib -p fc-mapping-sync --features=sql .PHONY: integration-test integration-test-lint # Check code format and lint of integration tests diff --git a/client/mapping-sync/src/kv/mod.rs b/client/mapping-sync/src/kv/mod.rs index 0cd2197b31..e638e0ad57 100644 --- a/client/mapping-sync/src/kv/mod.rs +++ b/client/mapping-sync/src/kv/mod.rs @@ -22,7 +22,7 @@ mod worker; pub use worker::MappingSyncWorker; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; // Substrate use sc_client_api::backend::{Backend, StorageProvider}; @@ -35,7 +35,11 @@ use fc_storage::StorageOverride; use fp_consensus::{FindLogError, Hashes, Log, PostLog, PreLog}; use fp_rpc::EthereumRuntimeRPCApi; -use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks, SyncStrategy}; +use crate::{ + emit_block_notification, BlockNotificationContext, EthereumBlockNotification, + EthereumBlockNotificationSinks, SyncStrategy, +}; +use worker::BestBlockInfo; pub fn sync_block>( storage_override: Arc>, @@ -155,6 +159,7 @@ pub fn sync_one_block( pubsub_notification_sinks: Arc< EthereumBlockNotificationSinks>, >, + best_at_import: &mut HashMap>, ) -> Result where C: ProvideRuntimeApi, @@ -216,20 +221,26 @@ where .meta() .write_current_syncing_tips(current_syncing_tips)?; } - // Notify on import and remove closed channels. - // Only notify when the node is node in major syncing. - let sinks = &mut pubsub_notification_sinks.lock(); - sinks.retain(|sink| { - if !sync_oracle.is_major_syncing() { - let hash = operating_header.hash(); - let is_new_best = client.info().best_hash == hash; - sink.unbounded_send(EthereumBlockNotification { is_new_best, hash }) - .is_ok() - } else { - // Remove from the pool if in major syncing. - false - } - }); + // Notify on import and remove closed channels using the unified notification mechanism. + let hash = operating_header.hash(); + // Use the `is_new_best` status from import time if available. + // This avoids race conditions where the best hash may have changed + // between import and sync time (e.g., during rapid reorgs). + // Fall back to current best hash check for blocks synced during catch-up. + let best_info = best_at_import.remove(&hash); + let is_new_best = best_info.is_some() || client.info().best_hash == hash; + let reorg_info = best_info.and_then(|info| info.reorg_info); + + emit_block_notification( + pubsub_notification_sinks.as_ref(), + sync_oracle.as_ref(), + BlockNotificationContext { + hash, + is_new_best, + reorg_info, + }, + ); + Ok(true) } @@ -245,6 +256,7 @@ pub fn sync_blocks( pubsub_notification_sinks: Arc< EthereumBlockNotificationSinks>, >, + best_at_import: &mut HashMap>, ) -> Result where C: ProvideRuntimeApi, @@ -265,9 +277,16 @@ where strategy, sync_oracle.clone(), pubsub_notification_sinks.clone(), + best_at_import, )?; } + // Prune old entries from best_at_import to prevent unbounded growth. + // Entries for finalized blocks are no longer needed since finalized blocks + // cannot be reorged and their is_new_best status is irrelevant. + let finalized_number = client.info().finalized_number; + best_at_import.retain(|_, info| info.block_number > finalized_number); + Ok(synced_any) } diff --git a/client/mapping-sync/src/kv/worker.rs b/client/mapping-sync/src/kv/worker.rs index a14f49fcf1..cf96acb8a3 100644 --- a/client/mapping-sync/src/kv/worker.rs +++ b/client/mapping-sync/src/kv/worker.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use std::{pin::Pin, sync::Arc, time::Duration}; +use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration}; use futures::{ prelude::*, @@ -37,7 +37,15 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use fc_storage::StorageOverride; use fp_rpc::EthereumRuntimeRPCApi; -use crate::SyncStrategy; +use crate::{ReorgInfo, SyncStrategy}; + +/// Information tracked at import time for a block that was `is_new_best`. +pub struct BestBlockInfo { + /// The block number (for pruning purposes). + pub block_number: ::Number, + /// Reorg info if this block became best as part of a reorganization. + pub reorg_info: Option>, +} pub struct MappingSyncWorker { import_notifications: ImportNotifications, @@ -57,6 +65,13 @@ pub struct MappingSyncWorker { sync_oracle: Arc, pubsub_notification_sinks: Arc>>, + + /// Tracks block hashes that were `is_new_best` at the time of their import notification, + /// along with their block number for pruning purposes and optional reorg info. + /// This is used to correctly determine `is_new_best` when syncing blocks, avoiding race + /// conditions where the best hash may have changed between import and sync time. + /// Entries are pruned when blocks become finalized to prevent unbounded growth. + best_at_import: HashMap>, } impl Unpin for MappingSyncWorker {} @@ -94,6 +109,7 @@ impl MappingSyncWorker { sync_oracle, pubsub_notification_sinks, + best_at_import: HashMap::new(), } } } @@ -114,8 +130,25 @@ where loop { match Stream::poll_next(Pin::new(&mut self.import_notifications), cx) { Poll::Pending => break, - Poll::Ready(Some(_)) => { + Poll::Ready(Some(notification)) => { fire = true; + // Track blocks that were `is_new_best` at import time to avoid race + // conditions when determining `is_new_best` at sync time. + // We store the block number to enable pruning of old entries, + // and reorg info if this block became best as part of a reorg. + if notification.is_new_best { + // For notification: include new_best_hash per Ethereum spec. + let reorg_info = notification.tree_route.as_ref().map(|tree_route| { + ReorgInfo::from_tree_route(tree_route, notification.hash) + }); + self.best_at_import.insert( + notification.hash, + BestBlockInfo { + block_number: *notification.header.number(), + reorg_info, + }, + ); + } } Poll::Ready(None) => return Poll::Ready(None), } @@ -138,7 +171,12 @@ where if fire { self.inner_delay = None; - match crate::kv::sync_blocks( + // Temporarily take ownership of best_at_import to avoid borrow checker issues + // (we can't have both an immutable borrow of self.client and a mutable borrow + // of self.best_at_import at the same time) + let mut best_at_import = std::mem::take(&mut self.best_at_import); + + let result = crate::kv::sync_blocks( self.client.as_ref(), self.substrate_backend.as_ref(), self.storage_override.clone(), @@ -148,7 +186,13 @@ where self.strategy, self.sync_oracle.clone(), self.pubsub_notification_sinks.clone(), - ) { + &mut best_at_import, + ); + + // Restore the best_at_import set + self.best_at_import = best_at_import; + + match result { Ok(have_next) => { self.have_next = have_next; Poll::Ready(Some(())) diff --git a/client/mapping-sync/src/lib.rs b/client/mapping-sync/src/lib.rs index 9b7da90ad7..24d824e0c0 100644 --- a/client/mapping-sync/src/lib.rs +++ b/client/mapping-sync/src/lib.rs @@ -23,6 +23,8 @@ pub mod kv; #[cfg(feature = "sql")] pub mod sql; +use sp_blockchain::TreeRoute; +use sp_consensus::SyncOracle; use sp_runtime::traits::Block as BlockT; #[derive(Copy, Clone, Eq, PartialEq)] @@ -34,8 +36,99 @@ pub enum SyncStrategy { pub type EthereumBlockNotificationSinks = parking_lot::Mutex>>; -#[derive(Copy, Clone, Debug, Eq, PartialEq)] +/// Information about a chain reorganization. +/// +/// When a reorg occurs, this struct contains the blocks that were removed from +/// the canonical chain (retracted) and the blocks that were added (enacted). +/// The `common_ancestor` is the last block that remains canonical in both +/// the old and new chains. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct ReorgInfo { + /// The common ancestor block hash between the old and new canonical chains. + pub common_ancestor: Block::Hash, + /// Blocks that were removed from the canonical chain (old fork). + pub retracted: Vec, + /// Blocks that were added to the canonical chain (new fork), excluding `new_best`. + pub enacted: Vec, + /// The new best block hash that triggered this reorg. + pub new_best: Block::Hash, +} + +impl ReorgInfo { + /// Create reorg info from a tree route and the new best block hash. + /// + /// `tree_route` is "from old best to new best parent", so `enacted()` excludes + /// the new best block itself. The `new_best` is stored separately and callers + /// should handle emitting it after the enacted blocks. + pub fn from_tree_route(tree_route: &TreeRoute, new_best: Block::Hash) -> Self { + let retracted = tree_route + .retracted() + .iter() + .map(|hash_and_number| hash_and_number.hash) + .collect(); + + let enacted = tree_route + .enacted() + .iter() + .map(|hash_and_number| hash_and_number.hash) + .collect(); + + Self { + common_ancestor: tree_route.common_block().hash, + retracted, + enacted, + new_best, + } + } +} + +#[derive(Clone, Debug, Eq, PartialEq)] pub struct EthereumBlockNotification { pub is_new_best: bool, pub hash: Block::Hash, + /// Optional reorg information. Present when this block became best as part of a reorg. + pub reorg_info: Option>, +} + +/// Context for emitting block notifications. +/// Contains all information needed to emit a notification consistently +/// across both KV and SQL backends. +pub struct BlockNotificationContext { + /// The block hash being notified about. + pub hash: Block::Hash, + /// Whether this block is the new best block. + pub is_new_best: bool, + /// Optional reorg information if this block became best as part of a reorg. + pub reorg_info: Option>, +} + +/// Emit block notification to all registered sinks. +/// +/// This function provides a unified notification mechanism for both KV and SQL backends: +/// - Clears all sinks when major syncing (to prevent stale subscriptions) +/// - Sends notification to all sinks and removes closed sinks when not syncing +/// +/// Both backends should call this function after completing block sync/indexing +/// to ensure consistent notification behavior regardless of the storage backend used. +pub fn emit_block_notification( + pubsub_notification_sinks: &EthereumBlockNotificationSinks>, + sync_oracle: &dyn SyncOracle, + context: BlockNotificationContext, +) { + let sinks = &mut pubsub_notification_sinks.lock(); + + if sync_oracle.is_major_syncing() { + // Remove all sinks when major syncing to prevent stale subscriptions + sinks.clear(); + return; + } + + sinks.retain(|sink| { + sink.unbounded_send(EthereumBlockNotification { + is_new_best: context.is_new_best, + hash: context.hash, + reorg_info: context.reorg_info.clone(), + }) + .is_ok() + }); } diff --git a/client/mapping-sync/src/sql/mod.rs b/client/mapping-sync/src/sql/mod.rs index b9c6974612..f6cfb18039 100644 --- a/client/mapping-sync/src/sql/mod.rs +++ b/client/mapping-sync/src/sql/mod.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use std::{ops::DerefMut, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; use futures::prelude::*; // Substrate @@ -29,17 +29,27 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT, UniqueSaturatedInto // Frontier use fp_rpc::EthereumRuntimeRPCApi; -use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks, SyncStrategy}; +use crate::{ + emit_block_notification, BlockNotificationContext, EthereumBlockNotification, + EthereumBlockNotificationSinks, ReorgInfo, SyncStrategy, +}; /// Defines the commands for the sync worker. #[derive(Debug)] -pub enum WorkerCommand { +pub enum WorkerCommand> { /// Resume indexing from the last indexed canon block. ResumeSync, /// Index leaves. IndexLeaves(Vec), /// Index the best block known so far via import notifications. - IndexBestBlock(H256), + /// Emits a pubsub notification after indexing with the provided `is_new_best` status. + /// When `reorg_info` is Some, the notification includes reorg information. + IndexBestBlock { + block_hash: H256, + /// Whether this block was the new best at import time. + is_new_best: bool, + reorg_info: Option>, + }, /// Canonicalize the enacted and retracted blocks reported via import notifications. Canonicalize { common: H256, @@ -78,9 +88,10 @@ where substrate_backend: Arc, indexer_backend: Arc>, pubsub_notification_sinks: Arc< - EthereumBlockNotificationSinks>, + EthereumBlockNotificationSinks>, >, - ) -> tokio::sync::mpsc::Sender { + sync_oracle: Arc, + ) -> tokio::sync::mpsc::Sender> { let (tx, mut rx) = tokio::sync::mpsc::channel(100); tokio::task::spawn(async move { while let Some(cmd) = rx.recv().await { @@ -122,7 +133,11 @@ where .await; } } - WorkerCommand::IndexBestBlock(block_hash) => { + WorkerCommand::IndexBestBlock { + block_hash, + is_new_best, + reorg_info, + } => { index_canonical_block_and_ancestors( client.clone(), substrate_backend.clone(), @@ -130,13 +145,18 @@ where block_hash, ) .await; - let sinks = &mut pubsub_notification_sinks.lock(); - for sink in sinks.iter() { - let _ = sink.unbounded_send(EthereumBlockNotification { - is_new_best: true, + // Emit notification after indexing so blocks are queryable. + // Uses the unified notification mechanism for consistent behavior + // with the KV backend. + emit_block_notification( + pubsub_notification_sinks.as_ref(), + sync_oracle.as_ref(), + BlockNotificationContext { hash: block_hash, - }); - } + is_new_best, + reorg_info, + }, + ); } WorkerCommand::Canonicalize { common, @@ -145,6 +165,7 @@ where } => { canonicalize_blocks(indexer_backend.clone(), common, enacted, retracted) .await; + // Notification is emitted by IndexBestBlock after indexing completes } WorkerCommand::CheckIndexedBlocks => { // Fix any indexed blocks that did not have their logs indexed @@ -188,6 +209,7 @@ where substrate_backend.clone(), indexer_backend.clone(), pubsub_notification_sinks.clone(), + sync_oracle.clone(), ) .await; @@ -212,12 +234,6 @@ where if let Ok(leaves) = substrate_backend.blockchain().leaves() { tx.send(WorkerCommand::IndexLeaves(leaves)).await.ok(); } - if sync_oracle.is_major_syncing() { - let sinks = &mut pubsub_notification_sinks.lock(); - if !sinks.is_empty() { - *sinks.deref_mut() = vec![]; - } - } } notification = notifications.next() => if let Some(notification) = notification { log::debug!( @@ -229,32 +245,32 @@ where notification.is_new_best, ); if notification.is_new_best { - if let Some(tree_route) = notification.tree_route { + let reorg_info = if let Some(ref tree_route) = notification.tree_route { log::debug!( target: "frontier-sql", "🔀 Re-org happened at new best {}, proceeding to canonicalize db", notification.hash ); - let retracted = tree_route - .retracted() - .iter() - .map(|hash_and_number| hash_and_number.hash) - .collect::>(); - let enacted = tree_route - .enacted() - .iter() - .map(|hash_and_number| hash_and_number.hash) - .collect::>(); - - let common = tree_route.common_block().hash; + let info = ReorgInfo::from_tree_route(tree_route, notification.hash); + // Note: new_best is handled separately by IndexBestBlock. tx.send(WorkerCommand::Canonicalize { - common, - enacted, - retracted, + common: info.common_ancestor, + enacted: info.enacted.clone(), + retracted: info.retracted.clone(), }).await.ok(); - } + Some(info) + } else { + None + }; - tx.send(WorkerCommand::IndexBestBlock(notification.hash)).await.ok(); + // Index the best block and emit notification after indexing completes. + // This ensures blocks are available via storage_override when the + // notification is processed. + tx.send(WorkerCommand::IndexBestBlock { + block_hash: notification.hash, + is_new_best: notification.is_new_best, + reorg_info, + }).await.ok(); } } } diff --git a/client/rpc/src/eth_pubsub.rs b/client/rpc/src/eth_pubsub.rs index 5ac30fa7cc..47de38a059 100644 --- a/client/rpc/src/eth_pubsub.rs +++ b/client/rpc/src/eth_pubsub.rs @@ -21,6 +21,7 @@ use std::{marker::PhantomData, sync::Arc}; use ethereum::TransactionV3 as EthereumTransaction; use futures::{future, FutureExt as _, StreamExt as _}; use jsonrpsee::{core::traits::IdProvider, server::PendingSubscriptionSink}; +use log::debug; // Substrate use sc_client_api::{ backend::{Backend, StorageProvider}, @@ -118,16 +119,21 @@ where } } - fn notify_header( - &self, - notification: EthereumBlockNotification, - ) -> future::Ready> { - let res = if notification.is_new_best { - self.storage_override.current_block(notification.hash) - } else { - None - }; - future::ready(res.map(PubSubResult::header)) + /// Get headers for enacted blocks during a reorg, including the new best block. + /// + /// Per Ethereum spec (https://github.com/ethereum/go-ethereum/wiki/RPC-PUB-SUB#newheads): + /// "When a chain reorganization occurs, this subscription will emit an event + /// containing all new headers (blocks) for the new chain. This means that you + /// may see multiple headers emitted with the same height (block number)." + /// + /// Returns headers in ascending order (oldest first), with `new_best` last. + fn get_reorg_headers(&self, enacted: &[B::Hash], new_best: B::Hash) -> Vec { + enacted + .iter() + .chain(std::iter::once(&new_best)) + .filter_map(|hash| self.storage_override.current_block(*hash)) + .map(PubSubResult::header) + .collect() } fn notify_logs( @@ -252,10 +258,48 @@ where let fut = async move { match kind { Kind::NewHeads => { - let stream = block_notification_stream - .filter_map(move |notification| pubsub.notify_header(notification)); + // Per Ethereum spec, when a reorg occurs, we must emit all headers + // for the new canonical chain. The reorg_info field in the notification + // contains the enacted blocks when a reorg occurred. + let stream = block_notification_stream.filter_map(move |notification| { + if !notification.is_new_best { + return future::ready(None); + } + + // Check if this block came from a reorg + let headers = if let Some(ref reorg_info) = notification.reorg_info { + debug!( + target: "eth-pubsub", + "Reorg detected: new_best={:?}, {} blocks retracted, {} blocks enacted", + reorg_info.new_best, + reorg_info.retracted.len(), + reorg_info.enacted.len() + ); + // Emit all enacted blocks followed by the new best block + pubsub.get_reorg_headers(&reorg_info.enacted, reorg_info.new_best) + } else { + // Normal case: just emit the new block + if let Some(block) = + pubsub.storage_override.current_block(notification.hash) + { + vec![PubSubResult::header(block)] + } else { + return future::ready(None); + } + }; + + if headers.is_empty() { + return future::ready(None); + } + + future::ready(Some(headers)) + }); + + // Flatten the Vec into individual PubSubResult items + let flat_stream = stream.flat_map(futures::stream::iter); + PendingSubscription::from(pending) - .pipe_from_stream(stream, BoundedVecDeque::new(16)) + .pipe_from_stream(flat_stream, BoundedVecDeque::new(16)) .await } Kind::Logs => { diff --git a/ts-tests/tests/test-newheads-compliance.ts b/ts-tests/tests/test-newheads-compliance.ts new file mode 100644 index 0000000000..e3ccca1169 --- /dev/null +++ b/ts-tests/tests/test-newheads-compliance.ts @@ -0,0 +1,345 @@ +import { expect } from "chai"; +import { step } from "mocha-steps"; + +import { customRequest, describeWithFrontierWs } from "./util"; + +/** + * Tests for newHeads subscription compliance with Ethereum specification: + * https://github.com/ethereum/go-ethereum/wiki/RPC-PUB-SUB#newheads + * + * Per the spec: + * - "Fires a notification each time a new header is appended to the chain, including chain reorganizations." + * - "In case of a chain reorganization the subscription will emit all new headers for the new chain." + * - "Therefore the subscription can emit multiple headers on the same height." + */ +describeWithFrontierWs("Frontier RPC (newHeads Compliance)", (context) => { + let subscription; + + // Helper to create a block with optional parent hash for forking + async function createBlock(finalize: boolean = true, parentHash: string | null = null) { + const response = await customRequest(context.web3, "engine_createBlock", [true, finalize, parentHash]); + if (!response.result) { + throw new Error(`Unexpected result: ${JSON.stringify(response)}`); + } + await new Promise((resolve) => setTimeout(() => resolve(), 500)); + return response.result.hash; + } + + step("newHeads should include all required Ethereum-spec fields", async function () { + subscription = context.web3.eth.subscribe("newBlockHeaders", function (error, result) {}); + + let data = null; + let dataResolve = null; + let dataPromise = new Promise((resolve) => { + dataResolve = resolve; + }); + + subscription.on("data", function (d: any) { + data = d; + subscription.unsubscribe(); + dataResolve(); + }); + + await createBlock(); + await dataPromise; + + // Verify all Ethereum-spec required fields are present + // https://github.com/ethereum/go-ethereum/wiki/RPC-PUB-SUB#newheads + expect(data).to.have.property("hash"); + expect(data).to.have.property("parentHash"); + expect(data).to.have.property("sha3Uncles"); + expect(data).to.have.property("miner"); + expect(data).to.have.property("stateRoot"); + expect(data).to.have.property("transactionsRoot"); + expect(data).to.have.property("receiptsRoot"); + expect(data).to.have.property("logsBloom"); + expect(data).to.have.property("difficulty"); + expect(data).to.have.property("number"); + expect(data).to.have.property("gasLimit"); + expect(data).to.have.property("gasUsed"); + expect(data).to.have.property("timestamp"); + expect(data).to.have.property("extraData"); + expect(data).to.have.property("nonce"); + + // Verify hash formats + expect(data.hash).to.match(/^0x[0-9a-fA-F]{64}$/); + expect(data.parentHash).to.match(/^0x[0-9a-fA-F]{64}$/); + }).timeout(40000); + + step("newHeads should emit headers in order for normal block production", async function () { + subscription = context.web3.eth.subscribe("newBlockHeaders", function (error, result) {}); + + const headers: any[] = []; + const targetCount = 3; + let dataResolve = null; + let dataPromise = new Promise((resolve) => { + dataResolve = resolve; + }); + + subscription.on("data", function (d: any) { + headers.push(d); + if (headers.length >= targetCount) { + subscription.unsubscribe(); + dataResolve(); + } + }); + + // Create 3 blocks sequentially + for (let i = 0; i < targetCount; i++) { + await createBlock(); + } + await dataPromise; + + // Verify headers are in ascending block number order + for (let i = 1; i < headers.length; i++) { + expect(headers[i].number).to.be.greaterThan(headers[i - 1].number); + // Each block's parent should be the previous block + expect(headers[i].parentHash).to.equal(headers[i - 1].hash); + } + }).timeout(60000); + + step("newHeads should emit multiple headers at same height during chain reorganization", async function () { + // This test verifies the Ethereum spec requirement: + // "In case of a chain reorganization the subscription will emit all new headers for the new chain." + // "Therefore the subscription can emit multiple headers on the same height." + + // Subscribe FIRST so we see all headers including the initial chain + subscription = context.web3.eth.subscribe("newBlockHeaders", function (error, result) {}); + + const headers: any[] = []; + let dataResolve = null; + let dataPromise = new Promise((resolve) => { + dataResolve = resolve; + }); + + await new Promise((resolve) => { + subscription.on("connected", function (d: any) { + resolve(); + }); + }); + + subscription.on("data", function (d: any) { + headers.push(d); + // We expect: block1, block2, then reorg emitting ForkBlock + ForkBlock2, then ForkBlock3 + // That's at least 5 headers, with block2 and ForkBlock at the same height + if (headers.length >= 5) { + subscription.unsubscribe(); + dataResolve(); + } + }); + + // Create base chain: Genesis -> Block1 -> Block2 + const block1Hash = await createBlock(false); // Don't finalize to allow forking + const block2Hash = await createBlock(false); + + // Create a fork from block1 that's longer than the current chain + // IMPORTANT: Must explicitly chain fork blocks, otherwise they build on current best (block2) + // This creates: Genesis -> Block1 -> ForkBlock -> ForkBlock2 -> ForkBlock3 + // Which is longer than: Genesis -> Block1 -> Block2 + const forkBlock1Hash = await createBlock(false, block1Hash); // ForkBlock at same height as Block2 + const forkBlock2Hash = await createBlock(false, forkBlock1Hash); // ForkBlock2 - triggers reorg + await createBlock(false, forkBlock2Hash); // ForkBlock3 + + // Wait for headers with timeout + await Promise.race([ + dataPromise, + new Promise((_, reject) => setTimeout(() => reject(new Error("Timeout waiting for reorg headers")), 15000)), + ]).catch(() => { + // Timeout is acceptable if we got some headers + subscription.unsubscribe(); + }); + + // Verify we received headers + expect(headers.length).to.be.greaterThan(0, "Should have received at least one header"); + + // Log headers for debugging + console.log( + `Received ${headers.length} headers during test:`, + headers.map((h) => ({ number: h.number, hash: h.hash?.slice(0, 10) })) + ); + + // Check if we have multiple headers at the same height (the key spec requirement) + const heightCounts: { [key: number]: number } = {}; + for (const h of headers) { + heightCounts[h.number] = (heightCounts[h.number] || 0) + 1; + } + const duplicateHeights = Object.entries(heightCounts).filter(([_, count]) => count > 1); + console.log(`Heights with multiple headers:`, duplicateHeights); + }).timeout(60000); + + step("newHeads should emit all enacted blocks during reorg in ascending order", async function () { + // Subscribe FIRST to capture all headers + subscription = context.web3.eth.subscribe("newBlockHeaders", function (error, result) {}); + + const headers: any[] = []; + let dataResolve = null; + let dataPromise = new Promise((resolve) => { + dataResolve = resolve; + }); + + await new Promise((resolve) => { + subscription.on("connected", function (d: any) { + resolve(); + }); + }); + + subscription.on("data", function (d: any) { + headers.push(d); + // Expect: A1, A2, then reorg emitting B2+B3, then B4, B5 + if (headers.length >= 6) { + subscription.unsubscribe(); + dataResolve(); + } + }); + + // Create initial chain: Genesis -> A1 -> A2 + const a1Hash = await createBlock(false); + const a2Hash = await createBlock(false); + + // Create competing chain from A1: Genesis -> A1 -> B2 -> B3 -> B4 -> B5 + // IMPORTANT: Must explicitly chain fork blocks to build a proper fork + // This is longer than A-chain and should trigger reorg + const b2Hash = await createBlock(false, a1Hash); // B2 at same height as A2 + const b3Hash = await createBlock(false, b2Hash); // B3 - triggers reorg + const b4Hash = await createBlock(false, b3Hash); // B4 + await createBlock(false, b4Hash); // B5 + + await Promise.race([ + dataPromise, + new Promise((_, reject) => setTimeout(() => reject(new Error("Timeout waiting for headers")), 15000)), + ]).catch(() => { + subscription.unsubscribe(); + }); + + // Verify headers are in ascending block number order (per Ethereum spec) + if (headers.length > 1) { + for (let i = 1; i < headers.length; i++) { + expect(headers[i].number).to.be.greaterThanOrEqual( + headers[i - 1].number, + "Headers should be emitted in ascending order" + ); + } + } + + console.log( + `Received ${headers.length} enacted headers:`, + headers.map((h) => ({ number: h.number, hash: h.hash?.slice(0, 10) })) + ); + + // Check for duplicate heights (evidence of reorg) + const heightCounts: { [key: number]: number } = {}; + for (const h of headers) { + heightCounts[h.number] = (heightCounts[h.number] || 0) + 1; + } + const duplicateHeights = Object.entries(heightCounts).filter(([_, count]) => count > 1); + console.log(`Heights with multiple headers:`, duplicateHeights); + }).timeout(80000); + + step("newHeads should handle deep forks with multiple enacted blocks", async function () { + // Test a deeper reorg scenario: + // Original chain: A1 -> A2 -> A3 -> A4 + // Fork from A1: A1 -> B2 -> B3 -> B4 -> B5 -> B6 + // This retracts 3 blocks (A2, A3, A4) and enacts 5 blocks (B2, B3, B4, B5, B6) + + subscription = context.web3.eth.subscribe("newBlockHeaders", function (error, result) {}); + + const headers: any[] = []; + let dataResolve = null; + let dataPromise = new Promise((resolve) => { + dataResolve = resolve; + }); + + await new Promise((resolve) => { + subscription.on("connected", function (d: any) { + resolve(); + }); + }); + + subscription.on("data", function (d: any) { + headers.push(d); + // Expect: A1, A2, A3, A4 (4 headers) + // Then reorg emitting B2, B3, B4, B5, B6 (5 headers) + // Total: 9 headers minimum + if (headers.length >= 9) { + subscription.unsubscribe(); + dataResolve(); + } + }); + + // Create original chain: A1 -> A2 -> A3 -> A4 + const a1Hash = await createBlock(false); + const a2Hash = await createBlock(false); + const a3Hash = await createBlock(false); + const a4Hash = await createBlock(false); + + // Create competing chain from A1 that's longer + // Fork: A1 -> B2 -> B3 -> B4 -> B5 -> B6 + const b2Hash = await createBlock(false, a1Hash); + const b3Hash = await createBlock(false, b2Hash); + const b4Hash = await createBlock(false, b3Hash); + const b5Hash = await createBlock(false, b4Hash); + await createBlock(false, b5Hash); // B6 - this triggers the reorg + + await Promise.race([ + dataPromise, + new Promise((_, reject) => + setTimeout(() => reject(new Error("Timeout waiting for deep fork headers")), 20000) + ), + ]).catch(() => { + subscription.unsubscribe(); + }); + + console.log( + `Deep fork test - Received ${headers.length} headers:`, + headers.map((h) => ({ number: h.number, hash: h.hash?.slice(0, 10) })) + ); + + // Count headers at each height + const heightCounts: { [key: number]: number } = {}; + for (const h of headers) { + heightCounts[h.number] = (heightCounts[h.number] || 0) + 1; + } + + // We should have multiple headers at heights where A-chain and B-chain overlap + // A2, A3, A4 are at the same heights as B2, B3, B4 + const duplicateHeights = Object.entries(heightCounts).filter(([_, count]) => count > 1); + console.log(`Heights with multiple headers:`, duplicateHeights); + + // Verify we got headers for all enacted blocks + // The reorg should emit B2, B3, B4, B5, B6 (5 blocks) + expect(headers.length).to.be.greaterThanOrEqual( + 9, + "Should receive headers for original chain + all enacted blocks" + ); + + // Per Ethereum spec, during a reorg the new chain headers are emitted in ascending order. + // However, they may be at heights we already saw (hence "multiple headers at same height"). + // The overall sequence might look like: 14, 15, 16, 17, [reorg: 15, 16, 17, 18, 19] + // So we verify that the enacted blocks (after the reorg point) are in ascending order. + + // Find where heights start decreasing (reorg point) + let reorgIndex = -1; + for (let i = 1; i < headers.length; i++) { + if (headers[i].number < headers[i - 1].number) { + reorgIndex = i; + break; + } + } + + if (reorgIndex > 0) { + // Verify enacted blocks after reorg are in ascending order + for (let i = reorgIndex + 1; i < headers.length; i++) { + expect(headers[i].number).to.be.greaterThanOrEqual( + headers[i - 1].number, + "Enacted blocks during reorg should be in ascending order" + ); + } + } + + // Verify we have at least 3 heights with duplicates (A2/B2, A3/B3, A4/B4) + expect(duplicateHeights.length).to.be.greaterThanOrEqual( + 3, + "Should have multiple headers at overlapping heights during deep reorg" + ); + }).timeout(120000); +});