-
Notifications
You must be signed in to change notification settings - Fork 2.4k
chore(trie): add e2e for reorg #18293
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
40162ae
609b8af
dd2a129
0082cdc
d6b89e4
01bc941
1137015
99ff0f2
f75f1a8
55b506f
e279d43
26486e7
6d5d113
8757799
3fedd6d
73f397f
eb908f3
8634a2a
e37b0d3
ff18f7c
ab3300d
7a7f0f1
6fcf463
f04fe88
5cfb1f5
ae7680c
079d786
e250978
2fec5f6
f1b2871
43ee266
e0abdd9
eb6b484
946206c
2c1eaec
eade8f0
99f5427
a5cf73e
6b4d5b2
ed0ae11
82a8dfe
f85422d
0f730e1
bc925b2
310e60a
cd2f9b2
db5bb11
905c5dc
b145018
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,7 @@ | ||
| use crate::{network::NetworkTestContext, payload::PayloadTestContext, rpc::RpcTestContext}; | ||
| use crate::{ | ||
| network::NetworkTestContext, payload::PayloadTestContext, rpc::RpcTestContext, | ||
| testsuite::TrieUpdateEvent, | ||
| }; | ||
| use alloy_consensus::{transaction::TxHashRef, BlockHeader}; | ||
| use alloy_eips::BlockId; | ||
| use alloy_primitives::{BlockHash, BlockNumber, Bytes, Sealable, B256}; | ||
|
|
@@ -17,13 +20,14 @@ use reth_node_builder::{rpc::RethRpcAddOns, FullNode, NodeTypes}; | |
|
|
||
| use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes}; | ||
| use reth_provider::{ | ||
| BlockReader, BlockReaderIdExt, CanonStateNotificationStream, CanonStateSubscriptions, | ||
| HeaderProvider, StageCheckpointReader, | ||
| BlockReader, BlockReaderIdExt, CanonStateNotification, CanonStateNotificationStream, | ||
| CanonStateSubscriptions, HeaderProvider, StageCheckpointReader, | ||
| }; | ||
| use reth_rpc_builder::auth::AuthServerHandle; | ||
| use reth_rpc_eth_api::helpers::{EthApiSpec, EthTransactions, TraceExt}; | ||
| use reth_stages_types::StageId; | ||
| use std::pin::Pin; | ||
| use tokio::sync::mpsc; | ||
| use tokio_stream::StreamExt; | ||
| use url::Url; | ||
|
|
||
|
|
@@ -304,6 +308,101 @@ where | |
| self.inner.auth_server_handle().clone() | ||
| } | ||
|
|
||
| /// Start a background task that forwards trie updates from canonical state notifications | ||
| /// to the test environment via the provided channel | ||
| pub fn start_trie_update_forwarder( | ||
| &mut self, | ||
| node_idx: usize, | ||
| tx: mpsc::UnboundedSender<TrieUpdateEvent>, | ||
| ) where | ||
| AddOns: RethRpcAddOns<Node> + 'static, | ||
| { | ||
| // Take the canonical stream (we can't clone it, so we move it into the task) | ||
| // This means each node can only have one forwarder, which is fine for our use case | ||
| let canonical_stream = std::mem::replace( | ||
| &mut self.canonical_stream, | ||
| self.inner.provider.canonical_state_stream(), | ||
| ); | ||
|
|
||
| tokio::spawn(async move { | ||
| println!("Starting trie update forwarder for node {}", node_idx); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. leftover println |
||
| tracing::debug!("Starting trie update forwarder for node {}", node_idx); | ||
|
|
||
| let mut stream = canonical_stream; | ||
| while let Some(notification) = stream.next().await { | ||
| println!( | ||
| "Node {} received canonical state notification: {:?}", | ||
| node_idx, | ||
| std::mem::discriminant(¬ification) | ||
| ); | ||
| match notification { | ||
| CanonStateNotification::Commit { new } => { | ||
| // Extract trie updates from committed chain | ||
| if let Some(trie_updates) = new.trie_updates() { | ||
| // Get the tip block hash from the chain | ||
| if let Some(tip_header) = new.headers().last() { | ||
| let tip_hash: alloy_primitives::B256 = tip_header.hash(); | ||
|
|
||
| let event = TrieUpdateEvent { | ||
| node_idx, | ||
| block_hash: tip_hash, | ||
| trie_updates: trie_updates.clone(), | ||
| }; | ||
|
|
||
| // Send the event, ignore errors if receiver is dropped | ||
| let _ = tx.send(event); | ||
|
|
||
| tracing::debug!( | ||
| "Forwarded trie updates for committed block {} from node {}", | ||
| tip_hash, | ||
| node_idx | ||
| ); | ||
| } | ||
| } else { | ||
| println!("Node {}: No trie updates in committed chain", node_idx); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. to remove after debugging |
||
| tracing::debug!( | ||
| "No trie updates in committed chain for node {}", | ||
| node_idx | ||
| ); | ||
| } | ||
| } | ||
| CanonStateNotification::Reorg { old: _old, new } => { | ||
| // Extract trie updates from the new canonical chain after reorg | ||
| if let Some(trie_updates) = new.trie_updates() { | ||
| // Get all blocks in the new chain and forward their trie updates | ||
| for header in new.headers() { | ||
| let block_hash: alloy_primitives::B256 = header.hash(); | ||
|
|
||
| let event = TrieUpdateEvent { | ||
| node_idx, | ||
| block_hash, | ||
| trie_updates: trie_updates.clone(), | ||
| }; | ||
|
|
||
| // Send the event, ignore errors if receiver is dropped | ||
| let _ = tx.send(event); | ||
|
|
||
| tracing::debug!( | ||
| "Forwarded trie updates for reorged block {} from node {}", | ||
| block_hash, | ||
| node_idx | ||
| ); | ||
| } | ||
| } else { | ||
| println!("Node {}: No trie updates in reorged chain", node_idx); | ||
| tracing::debug!( | ||
| "No trie updates in reorged chain for node {}", | ||
| node_idx | ||
| ); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| tracing::debug!("Trie update forwarder ended for node {}", node_idx); | ||
| }); | ||
| } | ||
|
|
||
| /// Creates a [`crate::testsuite::NodeClient`] from this test context. | ||
| /// | ||
| /// This helper method extracts the necessary handles and creates a client | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@shekhirin @mediocregopher @Rjected would appreciate a look over here as its what is required to replicate the test.
However as with anything with the trie state would need a double look
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my understanding of single block chains - the chain contains exactly one executed block
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Afaict this is ok, the only behavior change is that CanonStateNotification::Commit contains trie updates for single block chains now, which the documentation states is the case anyway.