diff --git a/Cargo.lock b/Cargo.lock index 81db10feaf4..b148cd80131 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3220,6 +3220,7 @@ dependencies = [ "bytes", "concat-kdf", "ctr", + "ethereum-types", "ethrex-blockchain", "ethrex-common", "ethrex-rlp", @@ -3330,6 +3331,7 @@ dependencies = [ "bytes", "cfg-if", "envy", + "ethereum-types", "ethrex-blockchain", "ethrex-common", "ethrex-p2p", diff --git a/crates/common/rlp/decode.rs b/crates/common/rlp/decode.rs index 348999ff414..7ea69b25040 100644 --- a/crates/common/rlp/decode.rs +++ b/crates/common/rlp/decode.rs @@ -513,7 +513,7 @@ pub fn decode_bytes(data: &[u8]) -> Result<(&[u8], &[u8]), RLPDecodeError> { /// Pads a slice of bytes with zeros on the left to make it a fixed size slice. /// The size of the data must be less than or equal to the size of the output array. #[inline] -pub(crate) fn static_left_pad(data: &[u8]) -> Result<[u8; N], RLPDecodeError> { +pub fn static_left_pad(data: &[u8]) -> Result<[u8; N], RLPDecodeError> { let mut result = [0; N]; if data.is_empty() { diff --git a/crates/common/rlp/encode.rs b/crates/common/rlp/encode.rs index f18b7ab9f1e..9daad068853 100644 --- a/crates/common/rlp/encode.rs +++ b/crates/common/rlp/encode.rs @@ -248,7 +248,7 @@ impl RLPEncode for Vec { } } -pub(crate) fn encode_length(total_len: usize, buf: &mut dyn BufMut) { +pub fn encode_length(total_len: usize, buf: &mut dyn BufMut) { if total_len < 56 { buf.put_u8(0xc0 + total_len as u8); } else { diff --git a/crates/common/rlp/error.rs b/crates/common/rlp/error.rs index a2bc3349cda..91d4961db2e 100644 --- a/crates/common/rlp/error.rs +++ b/crates/common/rlp/error.rs @@ -15,6 +15,8 @@ pub enum RLPDecodeError { UnexpectedString, #[error("InvalidCompression")] InvalidCompression(#[from] snap::Error), + #[error("IncompatibleProtocol")] + IncompatibleProtocol, #[error("{0}")] Custom(String), } diff --git a/crates/common/types/block.rs b/crates/common/types/block.rs index acb259c7312..68f69d577e3 100644 --- a/crates/common/types/block.rs +++ b/crates/common/types/block.rs @@ -256,7 +256,7 @@ pub fn compute_receipts_root(receipts: &[Receipt]) -> H256 { let iter = receipts .iter() .enumerate() - .map(|(idx, receipt)| (idx.encode_to_vec(), receipt.encode_inner())); + .map(|(idx, receipt)| (idx.encode_to_vec(), receipt.encode_inner_with_bloom())); Trie::compute_hash_from_unsorted_iter(iter) } diff --git a/crates/common/types/receipt.rs b/crates/common/types/receipt.rs index fd2cf074f14..816cdda0665 100644 --- a/crates/common/types/receipt.rs +++ b/crates/common/types/receipt.rs @@ -17,11 +17,106 @@ pub struct Receipt { pub tx_type: TxType, pub succeeded: bool, pub cumulative_gas_used: u64, - pub bloom: Bloom, pub logs: Vec, } impl Receipt { + pub fn new(tx_type: TxType, succeeded: bool, cumulative_gas_used: u64, logs: Vec) -> Self { + Self { + tx_type, + succeeded, + cumulative_gas_used, + logs, + } + } + + pub fn encode_inner(&self) -> Vec { + let mut encoded_data = vec![]; + let tx_type: u8 = self.tx_type as u8; + Encoder::new(&mut encoded_data) + .encode_field(&tx_type) + .encode_field(&self.succeeded) + .encode_field(&self.cumulative_gas_used) + .encode_field(&self.logs) + .finish(); + encoded_data + } + + pub fn encode_inner_with_bloom(&self) -> Vec { + let mut encode_buff = match self.tx_type { + TxType::Legacy => { + vec![] + } + _ => { + vec![self.tx_type as u8] + } + }; + let bloom = bloom_from_logs(&self.logs); + Encoder::new(&mut encode_buff) + .encode_field(&self.succeeded) + .encode_field(&self.cumulative_gas_used) + .encode_field(&bloom) + .encode_field(&self.logs) + .finish(); + encode_buff + } +} + +pub fn bloom_from_logs(logs: &[Log]) -> Bloom { + let mut bloom = Bloom::zero(); + for log in logs { + bloom.accrue(BloomInput::Raw(log.address.as_ref())); + for topic in log.topics.iter() { + bloom.accrue(BloomInput::Raw(topic.as_ref())); + } + } + bloom +} + +impl RLPEncode for Receipt { + fn encode(&self, buf: &mut dyn bytes::BufMut) { + let encoded_inner = self.encode_inner(); + buf.put_slice(&encoded_inner); + } +} + +impl RLPDecode for Receipt { + fn decode_unfinished(rlp: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> { + let decoder = Decoder::new(rlp)?; + let (tx_type, decoder): (u8, _) = decoder.decode_field("tx-type")?; + let (succeeded, decoder) = decoder.decode_field("succeeded")?; + let (cumulative_gas_used, decoder) = decoder.decode_field("cumulative_gas_used")?; + let (logs, decoder) = decoder.decode_field("logs")?; + + let Some(tx_type) = TxType::from_u8(tx_type) else { + return Err(RLPDecodeError::Custom( + "Invalid transaction type".to_string(), + )); + }; + + Ok(( + Receipt { + tx_type, + succeeded, + cumulative_gas_used, + logs, + }, + decoder.finish()?, + )) + } +} + +/// Result of a transaction +#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] +pub struct ReceiptWithBloom { + pub tx_type: TxType, + pub succeeded: bool, + pub cumulative_gas_used: u64, + pub bloom: Bloom, + pub logs: Vec, +} + +impl ReceiptWithBloom { pub fn new(tx_type: TxType, succeeded: bool, cumulative_gas_used: u64, logs: Vec) -> Self { Self { tx_type, @@ -31,6 +126,7 @@ impl Receipt { logs, } } + // By reading the typed transactions EIP, and some geth code: // - https://eips.ethereum.org/EIPS/eip-2718 // - https://github.com/ethereum/go-ethereum/blob/330190e476e2a2de4aac712551629a4134f802d5/core/types/receipt.go#L143 @@ -69,7 +165,7 @@ impl Receipt { /// Decodes Receipts in the following formats: /// A) Legacy receipts: rlp(receipt) /// B) Non legacy receipts: tx_type | rlp(receipt). - pub fn decode_inner(rlp: &[u8]) -> Result { + pub fn decode_inner(rlp: &[u8]) -> Result { // Obtain TxType let (tx_type, rlp) = match rlp.first() { Some(tx_type) if *tx_type < 0x7f => { @@ -97,7 +193,7 @@ impl Receipt { let (logs, decoder) = decoder.decode_field("logs")?; decoder.finish()?; - Ok(Receipt { + Ok(Self { tx_type, succeeded, cumulative_gas_used, @@ -107,18 +203,7 @@ impl Receipt { } } -fn bloom_from_logs(logs: &[Log]) -> Bloom { - let mut bloom = Bloom::zero(); - for log in logs { - bloom.accrue(BloomInput::Raw(log.address.as_ref())); - for topic in log.topics.iter() { - bloom.accrue(BloomInput::Raw(topic.as_ref())); - } - } - bloom -} - -impl RLPEncode for Receipt { +impl RLPEncode for ReceiptWithBloom { /// Receipts can be encoded in the following formats: /// A) Legacy receipts: rlp(receipt) /// B) Non legacy receipts: rlp(Bytes(tx_type | rlp(receipt))). @@ -137,7 +222,7 @@ impl RLPEncode for Receipt { } } -impl RLPDecode for Receipt { +impl RLPDecode for ReceiptWithBloom { /// Receipts can be encoded in the following formats: /// A) Legacy receipts: rlp(receipt) /// B) Non legacy receipts: rlp(Bytes(tx_type | rlp(receipt))). @@ -169,7 +254,7 @@ impl RLPDecode for Receipt { let (logs, decoder) = decoder.decode_field("logs")?; Ok(( - Receipt { + ReceiptWithBloom { tx_type, succeeded, cumulative_gas_used, @@ -181,6 +266,29 @@ impl RLPDecode for Receipt { } } +impl From<&Receipt> for ReceiptWithBloom { + fn from(receipt: &Receipt) -> Self { + Self { + tx_type: receipt.tx_type, + succeeded: receipt.succeeded, + cumulative_gas_used: receipt.cumulative_gas_used, + bloom: bloom_from_logs(&receipt.logs), + logs: receipt.logs.clone(), + } + } +} + +impl From<&ReceiptWithBloom> for Receipt { + fn from(receipt: &ReceiptWithBloom) -> Self { + Self { + tx_type: receipt.tx_type, + succeeded: receipt.succeeded, + cumulative_gas_used: receipt.cumulative_gas_used, + logs: receipt.logs.clone(), + } + } +} + /// Data record produced during the execution of a transaction. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct Log { @@ -224,7 +332,6 @@ mod test { tx_type: TxType::Legacy, succeeded: true, cumulative_gas_used: 1200, - bloom: Bloom::random(), logs: vec![Log { address: Address::random(), topics: vec![], @@ -241,7 +348,6 @@ mod test { tx_type: TxType::EIP4844, succeeded: true, cumulative_gas_used: 1500, - bloom: Bloom::random(), logs: vec![Log { address: Address::random(), topics: vec![], @@ -251,10 +357,9 @@ mod test { let encoded_receipt = receipt.encode_to_vec(); assert_eq!(receipt, Receipt::decode(&encoded_receipt).unwrap()) } - #[test] fn test_encode_decode_inner_receipt_legacy() { - let receipt = Receipt { + let receipt = ReceiptWithBloom { tx_type: TxType::Legacy, succeeded: true, cumulative_gas_used: 1200, @@ -266,12 +371,15 @@ mod test { }], }; let encoded_receipt = receipt.encode_inner(); - assert_eq!(receipt, Receipt::decode_inner(&encoded_receipt).unwrap()) + assert_eq!( + receipt, + ReceiptWithBloom::decode_inner(&encoded_receipt).unwrap() + ) } #[test] fn test_encode_decode_receipt_inner_non_legacy() { - let receipt = Receipt { + let receipt = ReceiptWithBloom { tx_type: TxType::EIP4844, succeeded: true, cumulative_gas_used: 1500, @@ -283,6 +391,9 @@ mod test { }], }; let encoded_receipt = receipt.encode_inner(); - assert_eq!(receipt, Receipt::decode_inner(&encoded_receipt).unwrap()) + assert_eq!( + receipt, + ReceiptWithBloom::decode_inner(&encoded_receipt).unwrap() + ) } } diff --git a/crates/networking/p2p/Cargo.toml b/crates/networking/p2p/Cargo.toml index 81494fd41c2..f375aa32893 100644 --- a/crates/networking/p2p/Cargo.toml +++ b/crates/networking/p2p/Cargo.toml @@ -11,7 +11,7 @@ ethrex-blockchain.workspace = true ethrex-rlp.workspace = true ethrex-storage.workspace = true ethrex-trie.workspace = true - +ethereum-types.workspace = true tracing.workspace = true tokio.workspace = true tokio-util.workspace = true diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index fd6cde14419..ded9f448d14 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -17,7 +17,7 @@ use crate::{ blocks::{ BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, BLOCK_HEADER_LIMIT, }, - receipts::{GetReceipts, Receipts}, + receipts::GetReceipts, }, message::Message as RLPxMessage, p2p::{Capability, SUPPORTED_ETH_CAPABILITIES, SUPPORTED_SNAP_CAPABILITIES}, @@ -280,10 +280,11 @@ impl PeerHandler { if let Some(receipts) = tokio::time::timeout(PEER_REPLY_TIMEOUT, async move { loop { match receiver.recv().await { - Some(RLPxMessage::Receipts(Receipts { id, receipts })) - if id == request_id => - { - return Some(receipts) + Some(RLPxMessage::Receipts(receipts)) => { + if receipts.get_id() == request_id { + return Some(receipts.get_receipts()); + } + return None; } // Ignore replies that don't match the expected id (such as late responses) Some(_) => continue, diff --git a/crates/networking/p2p/rlpx/connection.rs b/crates/networking/p2p/rlpx/connection.rs index 720b6684dbb..002d0ea1725 100644 --- a/crates/networking/p2p/rlpx/connection.rs +++ b/crates/networking/p2p/rlpx/connection.rs @@ -6,6 +6,7 @@ use crate::{ backend, blocks::{BlockBodies, BlockHeaders}, receipts::{GetReceipts, Receipts}, + status::StatusMessage, transactions::{GetPooledTransactions, Transactions}, }, frame::RLPxCodec, @@ -46,12 +47,16 @@ use tokio_util::codec::Framed; use tracing::debug; use super::{ - eth::transactions::NewPooledTransactionHashes, p2p::DisconnectReason, utils::log_peer_warn, + eth::{transactions::NewPooledTransactionHashes, update::BlockRangeUpdate}, + p2p::DisconnectReason, + utils::log_peer_warn, }; const PERIODIC_PING_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10); const PERIODIC_TX_BROADCAST_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500); const PERIODIC_TASKS_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500); +const PERIODIC_BLOCK_RANGE_UPDATE_INTERVAL: std::time::Duration = + std::time::Duration::from_secs(60); pub const MAX_PEERS_TCP_CONNECTIONS: usize = 100; pub(crate) type Aes256Ctr64BE = ctr::Ctr64BE; @@ -83,6 +88,8 @@ pub(crate) struct RLPxConnection { negotiated_snap_capability: Option, next_periodic_ping: Instant, next_tx_broadcast: Instant, + next_block_range_update: Instant, + last_block_range_update_block: u64, broadcasted_txs: HashSet, client_version: String, /// Send end of the channel used to broadcast messages @@ -119,6 +126,8 @@ impl RLPxConnection { negotiated_snap_capability: None, next_periodic_ping: Instant::now() + PERIODIC_TASKS_CHECK_INTERVAL, next_tx_broadcast: Instant::now() + PERIODIC_TX_BROADCAST_INTERVAL, + next_block_range_update: Instant::now() + PERIODIC_BLOCK_RANGE_UPDATE_INTERVAL, + last_block_range_update_block: 0, broadcasted_txs: HashSet::new(), client_version, connection_broadcast_send: connection_broadcast, @@ -402,7 +411,13 @@ impl RLPxConnection { if Instant::now() >= self.next_tx_broadcast { self.send_new_pooled_tx_hashes().await?; self.next_tx_broadcast = Instant::now() + PERIODIC_TX_BROADCAST_INTERVAL; - } + }; + if Instant::now() >= self.next_block_range_update { + self.next_block_range_update = Instant::now() + PERIODIC_BLOCK_RANGE_UPDATE_INTERVAL; + if self.should_send_block_range_update().await? { + self.send_block_range_update().await?; + } + }; Ok(()) } @@ -439,6 +454,30 @@ impl RLPxConnection { Ok(()) } + async fn send_block_range_update(&mut self) -> Result<(), RLPxError> { + // BlockRangeUpdate was introduced in eth/69 + if let Some(eth) = &self.negotiated_eth_capability { + if eth.version >= 69 { + log_peer_debug(&self.node, "Sending BlockRangeUpdate"); + let update = BlockRangeUpdate::new(&self.storage).await?; + let lastet_block = update.lastest_block; + self.send(Message::BlockRangeUpdate(update)).await?; + self.last_block_range_update_block = lastet_block - (lastet_block % 32); + } + } + Ok(()) + } + + async fn should_send_block_range_update(&mut self) -> Result { + let latest_block = self.storage.get_latest_block_number().await?; + if latest_block < self.last_block_range_update_block + || latest_block - self.last_block_range_update_block >= 32 + { + return Ok(true); + } + Ok(false) + } + async fn handle_message( &mut self, message: Message, @@ -463,7 +502,7 @@ impl RLPxConnection { } Message::Status(msg_data) => { if let Some(eth) = &self.negotiated_eth_capability { - backend::validate_status(msg_data, &self.storage, eth.version).await? + backend::validate_status(msg_data, &self.storage, eth).await? }; } Message::GetAccountRange(req) => { @@ -499,12 +538,28 @@ impl RLPxConnection { self.send(Message::BlockBodies(response)).await?; } Message::GetReceipts(GetReceipts { id, block_hashes }) if peer_supports_eth => { - let mut receipts = Vec::new(); - for hash in block_hashes.iter() { - receipts.push(self.storage.get_receipts_for_block(hash)?); + if let Some(eth) = &self.negotiated_eth_capability { + let mut receipts = Vec::new(); + for hash in block_hashes.iter() { + receipts.push(self.storage.get_receipts_for_block(hash)?); + } + let response = Receipts::new(id, receipts, eth)?; + self.send(Message::Receipts(response)).await?; } - let response = Receipts { id, receipts }; - self.send(Message::Receipts(response)).await?; + } + Message::BlockRangeUpdate(update) => { + if update.earliest_block > update.lastest_block { + return Err(RLPxError::InvalidBlockRange); + } + + //TODO implement the logic + log_peer_debug( + &self.node, + &format!( + "Range block update: {} to {}", + update.earliest_block, update.lastest_block + ), + ); } Message::NewPooledTransactionHashes(new_pooled_transaction_hashes) if peer_supports_eth => @@ -579,7 +634,7 @@ impl RLPxConnection { async fn init_peer_conn(&mut self) -> Result<(), RLPxError> { // Sending eth Status if peer supports it if let Some(eth) = self.negotiated_eth_capability.clone() { - let status = backend::get_status(&self.storage, eth.version).await?; + let status = StatusMessage::new(&self.storage, ð).await?; log_peer_debug(&self.node, "Sending status"); self.send(Message::Status(status)).await?; // The next immediate message in the ETH protocol is the @@ -592,7 +647,7 @@ impl RLPxConnection { match msg { Message::Status(msg_data) => { log_peer_debug(&self.node, "Received Status"); - backend::validate_status(msg_data, &self.storage, eth.version).await? + backend::validate_status(msg_data, &self.storage, ð).await? } Message::Disconnect(disconnect) => { return Err(RLPxError::HandshakeError(format!( diff --git a/crates/networking/p2p/rlpx/error.rs b/crates/networking/p2p/rlpx/error.rs index 363e3a02e9c..c939b207863 100644 --- a/crates/networking/p2p/rlpx/error.rs +++ b/crates/networking/p2p/rlpx/error.rs @@ -51,6 +51,10 @@ pub(crate) enum RLPxError { IoError(#[from] std::io::Error), #[error("Failed to decode message due to invalid frame: {0}")] InvalidMessageFrame(String), + #[error("Incompatible Protocol")] + IncompatibleProtocol, + #[error("Invalid block range")] + InvalidBlockRange, } // tokio::sync::mpsc::error::SendError is too large to be part of the RLPxError enum directly diff --git a/crates/networking/p2p/rlpx/eth/backend.rs b/crates/networking/p2p/rlpx/eth/backend.rs index cf9a80d3256..6db802f8cc7 100644 --- a/crates/networking/p2p/rlpx/eth/backend.rs +++ b/crates/networking/p2p/rlpx/eth/backend.rs @@ -1,41 +1,14 @@ -use ethrex_common::{types::ForkId, U256}; +use ethrex_common::types::ForkId; use ethrex_storage::Store; -use crate::rlpx::error::RLPxError; +use crate::rlpx::{error::RLPxError, p2p::Capability}; use super::status::StatusMessage; -pub async fn get_status(storage: &Store, eth_version: u8) -> Result { - let chain_config = storage.get_chain_config()?; - let total_difficulty = U256::from(chain_config.terminal_total_difficulty.unwrap_or_default()); - let network_id = chain_config.chain_id; - - // These blocks must always be available - let genesis_header = storage - .get_block_header(0)? - .ok_or(RLPxError::NotFound("Genesis Block".to_string()))?; - let block_number = storage.get_latest_block_number().await?; - let block_header = storage - .get_block_header(block_number)? - .ok_or(RLPxError::NotFound(format!("Block {block_number}")))?; - - let genesis = genesis_header.hash(); - let block_hash = block_header.hash(); - let fork_id = storage.get_fork_id().await?; - Ok(StatusMessage { - eth_version: eth_version as u32, - network_id, - total_difficulty, - block_hash, - genesis, - fork_id, - }) -} - pub async fn validate_status( msg_data: StatusMessage, storage: &Store, - eth_version: u8, + eth_capability: &Capability, ) -> Result<(), RLPxError> { let chain_config = storage.get_chain_config()?; @@ -56,26 +29,26 @@ pub async fn validate_status( ); //Check networkID - if msg_data.network_id != chain_config.chain_id { + if msg_data.get_network_id() != chain_config.chain_id { return Err(RLPxError::HandshakeError( "Network Id does not match".to_string(), )); } //Check Protocol Version - if msg_data.eth_version as u8 != eth_version { + if msg_data.get_eth_version() != eth_capability.version { return Err(RLPxError::HandshakeError( "Eth protocol version does not match".to_string(), )); } //Check Genesis - if msg_data.genesis != genesis_hash { + if msg_data.get_genesis() != genesis_hash { return Err(RLPxError::HandshakeError( "Genesis does not match".to_string(), )); } // Check ForkID if !fork_id.is_valid( - msg_data.fork_id, + msg_data.get_fork_id(), latest_block_number, latest_block_header.timestamp, chain_config, @@ -90,11 +63,14 @@ pub async fn validate_status( #[cfg(test)] mod tests { use super::validate_status; + use crate::rlpx::eth::eth68::status::StatusMessage68; use crate::rlpx::eth::status::StatusMessage; + use crate::rlpx::p2p::Capability; use ethrex_common::{ types::{ForkId, Genesis}, H256, U256, }; + use ethrex_storage::{EngineType, Store}; use std::{fs::File, io::BufReader}; @@ -120,16 +96,16 @@ mod tests { let genesis_hash = genesis_header.hash(); let fork_id = ForkId::new(config, genesis_header, 2707305664, 123); - let eth_version = 68; - let message = StatusMessage { - eth_version: eth_version as u32, + let eth = Capability::eth(68); + let message = StatusMessage::StatusMessage68(StatusMessage68 { + eth_version: eth.version, network_id: 3503995874084926, total_difficulty, block_hash: H256::random(), genesis: genesis_hash, fork_id, - }; - let result = validate_status(message, &storage, eth_version).await; + }); + let result = validate_status(message, &storage, ð).await; assert!(result.is_ok()); } } diff --git a/crates/networking/p2p/rlpx/eth/eth68/mod.rs b/crates/networking/p2p/rlpx/eth/eth68/mod.rs new file mode 100644 index 00000000000..8abfab10191 --- /dev/null +++ b/crates/networking/p2p/rlpx/eth/eth68/mod.rs @@ -0,0 +1,2 @@ +pub mod receipts; +pub mod status; diff --git a/crates/networking/p2p/rlpx/eth/eth68/receipts.rs b/crates/networking/p2p/rlpx/eth/eth68/receipts.rs new file mode 100644 index 00000000000..100623b552d --- /dev/null +++ b/crates/networking/p2p/rlpx/eth/eth68/receipts.rs @@ -0,0 +1,73 @@ +use crate::rlpx::{ + message::RLPxMessage, + utils::{snappy_compress, snappy_decompress}, +}; +use bytes::BufMut; +use ethrex_common::types::{Receipt, ReceiptWithBloom}; +use ethrex_rlp::{ + error::{RLPDecodeError, RLPEncodeError}, + structs::{Decoder, Encoder}, +}; + +#[derive(Debug)] +pub(crate) struct Receipts68 { + // id is a u64 chosen by the requesting peer, the responding peer must mirror the value for the response + // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages + pub id: u64, + pub receipts: Vec>, +} + +impl Receipts68 { + pub fn new(id: u64, receipts: Vec>) -> Self { + if receipts.is_empty() { + return Self { + id, + receipts: vec![], + }; + } + let mut transformed_receipts = vec![]; + for r in &receipts { + transformed_receipts.push(vec![ReceiptWithBloom::from(&r[0])]); + } + Self { + id, + receipts: transformed_receipts, + } + } + + pub fn get_receipts(&self) -> Vec> { + if self.receipts.is_empty() { + return vec![]; + } + let mut receipts = vec![]; + for r in &self.receipts { + receipts.push(vec![Receipt::from(&r[0])]); + } + receipts + } +} + +impl RLPxMessage for Receipts68 { + const CODE: u8 = 0x0F; + + fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { + let mut encoded_data = vec![]; + Encoder::new(&mut encoded_data) + .encode_field(&self.id) + .encode_field(&self.receipts) + .finish(); + + let msg_data = snappy_compress(encoded_data)?; + buf.put_slice(&msg_data); + Ok(()) + } + + fn decode(msg_data: &[u8]) -> Result { + let decompressed_data = snappy_decompress(msg_data)?; + let decoder = Decoder::new(&decompressed_data)?; + let (id, decoder): (u64, _) = decoder.decode_field("request-id")?; + let (receipts, _): (Vec>, _) = decoder.decode_field("receipts")?; + + Ok(Receipts68 { id, receipts }) + } +} diff --git a/crates/networking/p2p/rlpx/eth/eth68/status.rs b/crates/networking/p2p/rlpx/eth/eth68/status.rs new file mode 100644 index 00000000000..389a8e3d388 --- /dev/null +++ b/crates/networking/p2p/rlpx/eth/eth68/status.rs @@ -0,0 +1,67 @@ +use crate::rlpx::{ + message::RLPxMessage, + utils::{snappy_compress, snappy_decompress}, +}; +use bytes::BufMut; +use ethrex_common::{ + types::{BlockHash, ForkId}, + U256, +}; +use ethrex_rlp::{ + error::{RLPDecodeError, RLPEncodeError}, + structs::{Decoder, Encoder}, +}; + +#[derive(Debug)] +pub(crate) struct StatusMessage68 { + pub(crate) eth_version: u8, + pub(crate) network_id: u64, + pub(crate) total_difficulty: U256, + pub(crate) block_hash: BlockHash, + pub(crate) genesis: BlockHash, + pub(crate) fork_id: ForkId, +} + +impl RLPxMessage for StatusMessage68 { + const CODE: u8 = 0x00; + fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { + let mut encoded_data = vec![]; + Encoder::new(&mut encoded_data) + .encode_field(&self.eth_version) + .encode_field(&self.network_id) + .encode_field(&self.total_difficulty) + .encode_field(&self.block_hash) + .encode_field(&self.genesis) + .encode_field(&self.fork_id) + .finish(); + + let msg_data = snappy_compress(encoded_data)?; + buf.put_slice(&msg_data); + Ok(()) + } + + fn decode(msg_data: &[u8]) -> Result { + let decompressed_data = snappy_decompress(msg_data)?; + let decoder = Decoder::new(&decompressed_data)?; + let (eth_version, decoder): (u32, _) = decoder.decode_field("protocolVersion")?; + + assert_eq!(eth_version, 68, "only eth version 68 is supported"); + + let (network_id, decoder): (u64, _) = decoder.decode_field("networkId")?; + let (total_difficulty, decoder): (U256, _) = decoder.decode_field("totalDifficulty")?; + let (block_hash, decoder): (BlockHash, _) = decoder.decode_field("blockHash")?; + let (genesis, decoder): (BlockHash, _) = decoder.decode_field("genesis")?; + let (fork_id, decoder): (ForkId, _) = decoder.decode_field("forkId")?; + // Implementations must ignore any additional list elements + let _padding = decoder.finish_unchecked(); + + Ok(Self { + eth_version: eth_version as u8, + network_id, + total_difficulty, + block_hash, + genesis, + fork_id, + }) + } +} diff --git a/crates/networking/p2p/rlpx/eth/eth69/mod.rs b/crates/networking/p2p/rlpx/eth/eth69/mod.rs new file mode 100644 index 00000000000..8abfab10191 --- /dev/null +++ b/crates/networking/p2p/rlpx/eth/eth69/mod.rs @@ -0,0 +1,2 @@ +pub mod receipts; +pub mod status; diff --git a/crates/networking/p2p/rlpx/eth/eth69/receipts.rs b/crates/networking/p2p/rlpx/eth/eth69/receipts.rs new file mode 100644 index 00000000000..3d46610d081 --- /dev/null +++ b/crates/networking/p2p/rlpx/eth/eth69/receipts.rs @@ -0,0 +1,49 @@ +use crate::rlpx::{ + message::RLPxMessage, + utils::{snappy_compress, snappy_decompress}, +}; +use bytes::BufMut; +use ethrex_common::types::Receipt; +use ethrex_rlp::{ + error::{RLPDecodeError, RLPEncodeError}, + structs::{Decoder, Encoder}, +}; + +#[derive(Debug)] +pub(crate) struct Receipts69 { + // id is a u64 chosen by the requesting peer, the responding peer must mirror the value for the response + // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages + pub id: u64, + pub receipts: Vec>, +} + +impl Receipts69 { + pub fn new(id: u64, receipts: Vec>) -> Self { + Self { receipts, id } + } +} + +impl RLPxMessage for Receipts69 { + const CODE: u8 = 0x0F; + + fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { + let mut encoded_data = vec![]; + Encoder::new(&mut encoded_data) + .encode_field(&self.id) + .encode_field(&self.receipts) + .finish(); + + let msg_data = snappy_compress(encoded_data)?; + buf.put_slice(&msg_data); + Ok(()) + } + + fn decode(msg_data: &[u8]) -> Result { + let decompressed_data = snappy_decompress(msg_data)?; + let decoder = Decoder::new(&decompressed_data)?; + let (id, decoder): (u64, _) = decoder.decode_field("request-id")?; + let (receipts, _): (Vec>, _) = decoder.decode_field("receipts")?; + + Ok(Self::new(id, receipts)) + } +} diff --git a/crates/networking/p2p/rlpx/eth/eth69/status.rs b/crates/networking/p2p/rlpx/eth/eth69/status.rs new file mode 100644 index 00000000000..f828bc940bb --- /dev/null +++ b/crates/networking/p2p/rlpx/eth/eth69/status.rs @@ -0,0 +1,68 @@ +use crate::rlpx::{ + message::RLPxMessage, + utils::{snappy_compress, snappy_decompress}, +}; +use bytes::BufMut; +use ethrex_common::types::{BlockHash, ForkId}; +use ethrex_rlp::{ + error::{RLPDecodeError, RLPEncodeError}, + structs::{Decoder, Encoder}, +}; + +#[derive(Debug)] +pub(crate) struct StatusMessage69 { + pub(crate) eth_version: u8, + pub(crate) network_id: u64, + pub(crate) genesis: BlockHash, + pub(crate) fork_id: ForkId, + pub(crate) earliest_block: u64, + pub(crate) lastest_block: u64, + pub(crate) lastest_block_hash: BlockHash, +} + +impl RLPxMessage for StatusMessage69 { + const CODE: u8 = 0x00; + fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { + let mut encoded_data = vec![]; + Encoder::new(&mut encoded_data) + .encode_field(&self.eth_version) + .encode_field(&self.network_id) + .encode_field(&self.genesis) + .encode_field(&self.fork_id) + .encode_field(&self.earliest_block) + .encode_field(&self.lastest_block) + .encode_field(&self.lastest_block_hash) + .finish(); + + let msg_data = snappy_compress(encoded_data)?; + buf.put_slice(&msg_data); + Ok(()) + } + + fn decode(msg_data: &[u8]) -> Result { + let decompressed_data = snappy_decompress(msg_data)?; + let decoder = Decoder::new(&decompressed_data)?; + let (eth_version, decoder): (u32, _) = decoder.decode_field("protocolVersion")?; + + assert_eq!(eth_version, 69, "only eth version 69 is supported"); + + let (network_id, decoder): (u64, _) = decoder.decode_field("networkId")?; + let (genesis, decoder): (BlockHash, _) = decoder.decode_field("genesis")?; + let (fork_id, decoder): (ForkId, _) = decoder.decode_field("forkId")?; + let (earliest_block, decoder): (u64, _) = decoder.decode_field("earliestBlock")?; + let (lastest_block, decoder): (u64, _) = decoder.decode_field("lastestBlock")?; + let (lastest_block_hash, decoder): (BlockHash, _) = decoder.decode_field("latestHash")?; + // Implementations must ignore any additional list elements + let _padding = decoder.finish_unchecked(); + + Ok(Self { + eth_version: eth_version as u8, + network_id, + genesis, + fork_id, + earliest_block, + lastest_block, + lastest_block_hash, + }) + } +} diff --git a/crates/networking/p2p/rlpx/eth.rs b/crates/networking/p2p/rlpx/eth/mod.rs similarity index 73% rename from crates/networking/p2p/rlpx/eth.rs rename to crates/networking/p2p/rlpx/eth/mod.rs index a03ff256caa..3d57bd6630c 100644 --- a/crates/networking/p2p/rlpx/eth.rs +++ b/crates/networking/p2p/rlpx/eth/mod.rs @@ -1,5 +1,8 @@ pub(crate) mod backend; pub(crate) mod blocks; +mod eth68; +mod eth69; pub(crate) mod receipts; pub(crate) mod status; pub(crate) mod transactions; +pub(crate) mod update; diff --git a/crates/networking/p2p/rlpx/eth/receipts.rs b/crates/networking/p2p/rlpx/eth/receipts.rs index 09b62301b53..e2907d00a9f 100644 --- a/crates/networking/p2p/rlpx/eth/receipts.rs +++ b/crates/networking/p2p/rlpx/eth/receipts.rs @@ -1,10 +1,17 @@ +use super::eth68::receipts::Receipts68; +use super::eth69::receipts::Receipts69; use crate::rlpx::{ + error::RLPxError, message::RLPxMessage, + p2p::Capability, utils::{snappy_compress, snappy_decompress}, }; +use ethereum_types::Bloom; + use bytes::BufMut; use ethrex_common::types::{BlockHash, Receipt}; use ethrex_rlp::{ + decode::static_left_pad, error::{RLPDecodeError, RLPEncodeError}, structs::{Decoder, Encoder}, }; @@ -50,16 +57,32 @@ impl RLPxMessage for GetReceipts { // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#receipts-0x10 #[derive(Debug)] -pub(crate) struct Receipts { - // id is a u64 chosen by the requesting peer, the responding peer must mirror the value for the response - // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages - pub id: u64, - pub receipts: Vec>, +pub(crate) enum Receipts { + Receipts68(Receipts68), + Receipts69(Receipts69), } impl Receipts { - pub fn new(id: u64, receipts: Vec>) -> Self { - Self { receipts, id } + pub fn new(id: u64, receipts: Vec>, eth: &Capability) -> Result { + match eth.version { + 68 => Ok(Receipts::Receipts68(Receipts68::new(id, receipts))), + 69 => Ok(Receipts::Receipts69(Receipts69::new(id, receipts))), + _ => Err(RLPxError::IncompatibleProtocol), + } + } + + pub fn get_receipts(&self) -> Vec> { + match self { + Receipts::Receipts68(msg) => msg.get_receipts(), + Receipts::Receipts69(msg) => msg.receipts.clone(), + } + } + + pub fn get_id(&self) -> u64 { + match self { + Receipts::Receipts68(msg) => msg.id, + Receipts::Receipts69(msg) => msg.id, + } } } @@ -67,35 +90,93 @@ impl RLPxMessage for Receipts { const CODE: u8 = 0x10; fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { - let mut encoded_data = vec![]; - Encoder::new(&mut encoded_data) - .encode_field(&self.id) - .encode_field(&self.receipts) - .finish(); - - let msg_data = snappy_compress(encoded_data)?; - buf.put_slice(&msg_data); - Ok(()) + match self { + Receipts::Receipts68(msg) => msg.encode(buf), + Receipts::Receipts69(msg) => msg.encode(buf), + } } fn decode(msg_data: &[u8]) -> Result { - let decompressed_data = snappy_decompress(msg_data)?; - let decoder = Decoder::new(&decompressed_data)?; - let (id, decoder): (u64, _) = decoder.decode_field("request-id")?; - let (receipts, _): (Vec>, _) = decoder.decode_field("receipts")?; + if has_bloom(msg_data)? { + Ok(Receipts::Receipts68(Receipts68::decode(msg_data)?)) + } else { + Ok(Receipts::Receipts69(Receipts69::decode(msg_data)?)) + } + } +} + +// We should receive something like this: +// [request-id, [[r1], [r2], [r3],... ]] +// in this fn, we're checking if r1 has a bloom field inside +fn has_bloom(msg_data: &[u8]) -> Result { + let decompressed_data = snappy_decompress(msg_data)?; + let decoder = Decoder::new(&decompressed_data)?; + let (_, decoder): (u64, _) = decoder.decode_field("request-id")?; + + //a list should be received + let (data, _) = decoder.get_encoded_item()?; + let decoder = Decoder::new(&data)?; + //check if the list is empty + if decoder.is_done() { + return Ok(false); + } + + // inner list + let (data, _) = decoder.get_encoded_item()?; + let decoder = Decoder::new(&data)?; + if decoder.is_done() { + return Ok(false); + } - Ok(Self::new(id, receipts)) + // we only need one element + // all elements should be the same + let (data, _) = decoder.get_encoded_item()?; + let data = match data[0] { + 0x80..=0xB7 => { + let length = (data[0] - 0x80) as usize; + if data.len() < length + 1 { + return Err(RLPDecodeError::InvalidLength); + } + &data[1..length + 1] + } + 0xB8..=0xBF => { + let length_of_length = (data[0] - 0xB7) as usize; + if data.len() < length_of_length + 1 { + return Err(RLPDecodeError::InvalidLength); + } + let length_bytes = &data[1..length_of_length + 1]; + let length = usize::from_be_bytes(static_left_pad(length_bytes)?); + if data.len() < length_of_length + length + 1 { + return Err(RLPDecodeError::InvalidLength); + } + &data[length_of_length + 1..length_of_length + length + 1] + } + _ => return Ok(false), + }; + let data = match data[0] { + tx_type if tx_type < 0x7f => &data[1..], + _ => &data[0..], + }; + let decoder = Decoder::new(data)?; + let (_, decoder): (bool, _) = decoder.decode_field("succeeded")?; + let (_, decoder): (u64, _) = decoder.decode_field("cumulative_gas_used")?; + // try to decode the bloom field + match decoder.decode_field::("bloom") { + Ok(_) => Ok(true), + Err(_) => Ok(false), } } #[cfg(test)] mod tests { - use ethrex_common::types::{BlockHash, Receipt}; - + use crate::rlpx::eth::receipts::has_bloom; use crate::rlpx::{ eth::receipts::{GetReceipts, Receipts}, message::RLPxMessage, + p2p::Capability, }; + use ethrex_common::types::transaction::TxType; + use ethrex_common::types::{BlockHash, Receipt}; #[test] fn get_receipts_empty_message() { @@ -130,13 +211,34 @@ mod tests { #[test] fn receipts_empty_message() { let receipts = vec![]; - let receipts = Receipts::new(1, receipts); + let receipts = Receipts::new(1, receipts, &Capability::eth(68)).unwrap(); let mut buf = Vec::new(); receipts.encode(&mut buf).unwrap(); let decoded = Receipts::decode(&buf).unwrap(); - assert_eq!(decoded.id, 1); - assert_eq!(decoded.receipts, Vec::>::new()); + + assert_eq!(decoded.get_id(), 1); + assert_eq!(decoded.get_receipts(), Vec::>::new()); + } + + #[test] + fn receipts_check_bloom() { + let receipts = vec![vec![ + Receipt::new(TxType::EIP7702, true, 210000, vec![]), + Receipt::new(TxType::EIP7702, true, 210000, vec![]), + Receipt::new(TxType::EIP7702, true, 210000, vec![]), + Receipt::new(TxType::EIP7702, true, 210000, vec![]), + ]]; + let receipts68 = Receipts::new(255, receipts.clone(), &Capability::eth(68)).unwrap(); + let receipts69 = Receipts::new(255, receipts, &Capability::eth(69)).unwrap(); + + let mut buf = Vec::new(); + receipts68.encode(&mut buf).unwrap(); + assert!(has_bloom(&buf).unwrap()); + + let mut buf = Vec::new(); + receipts69.encode(&mut buf).unwrap(); + assert!(!has_bloom(&buf).unwrap()); } } diff --git a/crates/networking/p2p/rlpx/eth/status.rs b/crates/networking/p2p/rlpx/eth/status.rs index 575aca833ff..842cce48932 100644 --- a/crates/networking/p2p/rlpx/eth/status.rs +++ b/crates/networking/p2p/rlpx/eth/status.rs @@ -1,67 +1,119 @@ -use crate::rlpx::{ - message::RLPxMessage, - utils::{snappy_compress, snappy_decompress}, -}; +use super::eth68::status::StatusMessage68; +use super::eth69::status::StatusMessage69; +use crate::rlpx::message::RLPxMessage; +use crate::rlpx::utils::snappy_decompress; +use crate::rlpx::{error::RLPxError, p2p::Capability}; use bytes::BufMut; -use ethrex_common::{ - types::{BlockHash, ForkId}, - U256, -}; -use ethrex_rlp::{ - error::{RLPDecodeError, RLPEncodeError}, - structs::{Decoder, Encoder}, -}; +use ethrex_common::types::{BlockHash, ForkId}; +use ethrex_common::U256; +use ethrex_rlp::error::{RLPDecodeError, RLPEncodeError}; +use ethrex_rlp::structs::Decoder; +use ethrex_storage::Store; #[derive(Debug)] -pub(crate) struct StatusMessage { - pub(crate) eth_version: u32, - pub(crate) network_id: u64, - pub(crate) total_difficulty: U256, - pub(crate) block_hash: BlockHash, - pub(crate) genesis: BlockHash, - pub(crate) fork_id: ForkId, +pub enum StatusMessage { + StatusMessage68(StatusMessage68), + StatusMessage69(StatusMessage69), } impl RLPxMessage for StatusMessage { const CODE: u8 = 0x00; fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { - let mut encoded_data = vec![]; - Encoder::new(&mut encoded_data) - .encode_field(&self.eth_version) - .encode_field(&self.network_id) - .encode_field(&self.total_difficulty) - .encode_field(&self.block_hash) - .encode_field(&self.genesis) - .encode_field(&self.fork_id) - .finish(); - - let msg_data = snappy_compress(encoded_data)?; - buf.put_slice(&msg_data); - Ok(()) + match self { + StatusMessage::StatusMessage68(msg) => msg.encode(buf), + StatusMessage::StatusMessage69(msg) => msg.encode(buf), + } } fn decode(msg_data: &[u8]) -> Result { let decompressed_data = snappy_decompress(msg_data)?; let decoder = Decoder::new(&decompressed_data)?; - let (eth_version, decoder): (u32, _) = decoder.decode_field("protocolVersion")?; + let (eth_version, _): (u32, _) = decoder.decode_field("protocolVersion")?; + + match eth_version { + 68 => Ok(StatusMessage::StatusMessage68(StatusMessage68::decode( + msg_data, + )?)), + 69 => Ok(StatusMessage::StatusMessage69(StatusMessage69::decode( + msg_data, + )?)), + _ => Err(RLPDecodeError::IncompatibleProtocol), + } + } +} - assert_eq!(eth_version, 68, "only eth version 68 is supported"); +impl StatusMessage { + pub async fn new(storage: &Store, eth: &Capability) -> Result { + let chain_config = storage.get_chain_config()?; + let total_difficulty = + U256::from(chain_config.terminal_total_difficulty.unwrap_or_default()); + let network_id = chain_config.chain_id; - let (network_id, decoder): (u64, _) = decoder.decode_field("networkId")?; - let (total_difficulty, decoder): (U256, _) = decoder.decode_field("totalDifficulty")?; - let (block_hash, decoder): (BlockHash, _) = decoder.decode_field("blockHash")?; - let (genesis, decoder): (BlockHash, _) = decoder.decode_field("genesis")?; - let (fork_id, decoder): (ForkId, _) = decoder.decode_field("forkId")?; - // Implementations must ignore any additional list elements - let _padding = decoder.finish_unchecked(); + // These blocks must always be available + let genesis_header = storage + .get_block_header(0)? + .ok_or(RLPxError::NotFound("Genesis Block".to_string()))?; + let lastest_block = storage.get_latest_block_number().await?; + let block_header = storage + .get_block_header(lastest_block)? + .ok_or(RLPxError::NotFound(format!("Block {lastest_block}")))?; + + let genesis = genesis_header.hash(); + let lastest_block_hash = block_header.hash(); + let fork_id = ForkId::new( + chain_config, + genesis_header, + block_header.timestamp, + lastest_block, + ); + + match eth.version { + 68 => Ok(StatusMessage::StatusMessage68(StatusMessage68 { + eth_version: eth.version, + network_id, + total_difficulty, + block_hash: lastest_block_hash, + genesis, + fork_id, + })), + 69 => Ok(StatusMessage::StatusMessage69(StatusMessage69 { + eth_version: eth.version, + network_id, + genesis, + fork_id, + earliest_block: 0, + lastest_block, + lastest_block_hash, + })), + _ => Err(RLPxError::IncompatibleProtocol), + } + } + + pub fn get_network_id(&self) -> u64 { + match self { + StatusMessage::StatusMessage68(msg) => msg.network_id, + StatusMessage::StatusMessage69(msg) => msg.network_id, + } + } + + pub fn get_eth_version(&self) -> u8 { + match self { + StatusMessage::StatusMessage68(msg) => msg.eth_version, + StatusMessage::StatusMessage69(msg) => msg.eth_version, + } + } + + pub fn get_fork_id(&self) -> ForkId { + match self { + StatusMessage::StatusMessage68(msg) => msg.fork_id.clone(), + StatusMessage::StatusMessage69(msg) => msg.fork_id.clone(), + } + } - Ok(Self { - eth_version, - network_id, - total_difficulty, - block_hash, - genesis, - fork_id, - }) + pub fn get_genesis(&self) -> BlockHash { + match self { + StatusMessage::StatusMessage68(msg) => msg.genesis, + StatusMessage::StatusMessage69(msg) => msg.genesis, + } } } diff --git a/crates/networking/p2p/rlpx/eth/update.rs b/crates/networking/p2p/rlpx/eth/update.rs new file mode 100644 index 00000000000..91b1b9dce1b --- /dev/null +++ b/crates/networking/p2p/rlpx/eth/update.rs @@ -0,0 +1,67 @@ +use crate::rlpx::error::RLPxError; +use crate::rlpx::{ + message::RLPxMessage, + utils::{snappy_compress, snappy_decompress}, +}; +use bytes::BufMut; +use ethrex_common::types::BlockHash; +use ethrex_rlp::{ + error::{RLPDecodeError, RLPEncodeError}, + structs::{Decoder, Encoder}, +}; +use ethrex_storage::Store; + +#[derive(Debug)] +pub(crate) struct BlockRangeUpdate { + pub(crate) earliest_block: u64, + pub(crate) lastest_block: u64, + pub(crate) lastest_block_hash: BlockHash, +} + +impl BlockRangeUpdate { + pub async fn new(storage: &Store) -> Result { + let lastest_block = storage.get_latest_block_number().await?; + let block_header = storage + .get_block_header(lastest_block)? + .ok_or(RLPxError::NotFound(format!("Block {lastest_block}")))?; + let lastest_block_hash = block_header.hash(); + + Ok(Self { + earliest_block: 0, + lastest_block, + lastest_block_hash, + }) + } +} + +impl RLPxMessage for BlockRangeUpdate { + const CODE: u8 = 0x11; + fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { + let mut encoded_data = vec![]; + Encoder::new(&mut encoded_data) + .encode_field(&self.earliest_block) + .encode_field(&self.lastest_block) + .encode_field(&self.lastest_block_hash) + .finish(); + + let msg_data = snappy_compress(encoded_data)?; + buf.put_slice(&msg_data); + Ok(()) + } + + fn decode(msg_data: &[u8]) -> Result { + let decompressed_data = snappy_decompress(msg_data)?; + let decoder = Decoder::new(&decompressed_data)?; + let (earliest_block, decoder): (u64, _) = decoder.decode_field("earliestBlock")?; + let (lastest_block, decoder): (u64, _) = decoder.decode_field("lastestBlock")?; + let (lastest_block_hash, decoder): (BlockHash, _) = decoder.decode_field("latestHash")?; + // Implementations must ignore any additional list elements + let _padding = decoder.finish_unchecked(); + + Ok(Self { + earliest_block, + lastest_block, + lastest_block_hash, + }) + } +} diff --git a/crates/networking/p2p/rlpx/message.rs b/crates/networking/p2p/rlpx/message.rs index cd6a2e53c91..b14fd79bb80 100644 --- a/crates/networking/p2p/rlpx/message.rs +++ b/crates/networking/p2p/rlpx/message.rs @@ -8,6 +8,7 @@ use super::eth::status::StatusMessage; use super::eth::transactions::{ GetPooledTransactions, NewPooledTransactionHashes, PooledTransactions, Transactions, }; +use super::eth::update::BlockRangeUpdate; use super::p2p::{DisconnectMessage, HelloMessage, PingMessage, PongMessage}; use super::snap::{ AccountRange, ByteCodes, GetAccountRange, GetByteCodes, GetStorageRanges, GetTrieNodes, @@ -45,6 +46,7 @@ pub(crate) enum Message { PooledTransactions(PooledTransactions), GetReceipts(GetReceipts), Receipts(Receipts), + BlockRangeUpdate(BlockRangeUpdate), // snap capability // https://github.com/ethereum/devp2p/blob/master/caps/snap.md GetAccountRange(GetAccountRange), @@ -81,7 +83,7 @@ impl Message { Message::PooledTransactions(_) => ETH_CAPABILITY_OFFSET + PooledTransactions::CODE, Message::GetReceipts(_) => ETH_CAPABILITY_OFFSET + GetReceipts::CODE, Message::Receipts(_) => ETH_CAPABILITY_OFFSET + Receipts::CODE, - + Message::BlockRangeUpdate(_) => ETH_CAPABILITY_OFFSET + BlockRangeUpdate::CODE, // snap capability Message::GetAccountRange(_) => SNAP_CAPABILITY_OFFSET + GetAccountRange::CODE, Message::AccountRange(_) => SNAP_CAPABILITY_OFFSET + AccountRange::CODE, @@ -126,6 +128,9 @@ impl Message { )), GetReceipts::CODE => Ok(Message::GetReceipts(GetReceipts::decode(data)?)), Receipts::CODE => Ok(Message::Receipts(Receipts::decode(data)?)), + BlockRangeUpdate::CODE => { + Ok(Message::BlockRangeUpdate(BlockRangeUpdate::decode(data)?)) + } _ => Err(RLPDecodeError::MalformedData), } } else { @@ -166,6 +171,7 @@ impl Message { Message::PooledTransactions(msg) => msg.encode(buf), Message::GetReceipts(msg) => msg.encode(buf), Message::Receipts(msg) => msg.encode(buf), + Message::BlockRangeUpdate(msg) => msg.encode(buf), Message::GetAccountRange(msg) => msg.encode(buf), Message::AccountRange(msg) => msg.encode(buf), Message::GetStorageRanges(msg) => msg.encode(buf), @@ -196,6 +202,7 @@ impl Display for Message { Message::GetBlockBodies(_) => "eth:GetBlockBodies".fmt(f), Message::GetReceipts(_) => "eth:GetReceipts".fmt(f), Message::Receipts(_) => "eth:Receipts".fmt(f), + Message::BlockRangeUpdate(_) => "eth:BlockRangeUpdate".fmt(f), Message::GetAccountRange(_) => "snap:GetAccountRange".fmt(f), Message::AccountRange(_) => "snap:AccountRange".fmt(f), Message::GetStorageRanges(_) => "snap:GetStorageRanges".fmt(f), diff --git a/crates/networking/rpc/Cargo.toml b/crates/networking/rpc/Cargo.toml index c9b88bf6ce1..c00c7ca40ee 100644 --- a/crates/networking/rpc/Cargo.toml +++ b/crates/networking/rpc/Cargo.toml @@ -21,6 +21,7 @@ ethrex-blockchain.workspace = true ethrex-p2p.workspace = true ethrex-rlp.workspace = true ethrex-storage-rollup = { workspace = true, optional = true } +ethereum-types.workspace = true hex.workspace = true axum-extra = { version = "0.10.0", features = ["typed-header"] } jsonwebtoken.workspace = true diff --git a/crates/networking/rpc/eth/block.rs b/crates/networking/rpc/eth/block.rs index f33b0c521a9..3ed0c355da5 100644 --- a/crates/networking/rpc/eth/block.rs +++ b/crates/networking/rpc/eth/block.rs @@ -278,7 +278,7 @@ impl RpcHandler for GetRawReceipts { let receipts: Vec = get_all_block_receipts(block_number, header, body, storage) .await? .iter() - .map(|receipt| format!("0x{}", hex::encode(receipt.encode_inner()))) + .map(|receipt| format!("0x{}", hex::encode(receipt.encode_inner_with_bloom()))) .collect(); serde_json::to_value(receipts).map_err(|error| RpcErr::Internal(error.to_string())) } diff --git a/crates/networking/rpc/types/receipt.rs b/crates/networking/rpc/types/receipt.rs index a5d37a8f598..dd9a146d6b3 100644 --- a/crates/networking/rpc/types/receipt.rs +++ b/crates/networking/rpc/types/receipt.rs @@ -1,7 +1,10 @@ use ethrex_common::{ constants::GAS_PER_BLOB, serde_utils, - types::{BlockHash, BlockHeader, BlockNumber, Log, Receipt, Transaction, TxKind, TxType}, + types::{ + bloom_from_logs, BlockHash, BlockHeader, BlockNumber, Log, Receipt, Transaction, TxKind, + TxType, + }, Address, Bloom, Bytes, H256, }; use ethrex_vm::create_contract_address; @@ -59,7 +62,7 @@ impl From for RpcReceiptInfo { tx_type: receipt.tx_type, status: receipt.succeeded, cumulative_gas_used: receipt.cumulative_gas_used, - logs_bloom: receipt.bloom, + logs_bloom: bloom_from_logs(&receipt.logs), } } } @@ -203,7 +206,7 @@ mod tests { use super::*; use ethrex_common::{ types::{Log, TxType}, - Bloom, Bytes, + Bytes, }; use hex_literal::hex; @@ -214,7 +217,6 @@ mod tests { tx_type: TxType::EIP4844, succeeded: true, cumulative_gas_used: 147, - bloom: Bloom::zero(), logs: vec![Log { address: Address::zero(), topics: vec![], @@ -240,7 +242,7 @@ mod tests { }, 0, ); - let expected = r#"{"type":"0x3","status":"0x1","cumulativeGasUsed":"0x93","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","logs":[{"address":"0x0000000000000000000000000000000000000000","topics":[],"data":"0x73747261776265727279","logIndex":"0x0","removed":false,"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x1","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x3"}],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x1","from":"0x0000000000000000000000000000000000000000","to":"0x7435ed30a8b4aeb0877cef0c6e8cffe834eb865f","contractAddress":null,"gasUsed":"0x93","effectiveGasPrice":"0x9d","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x3"}"#; + let expected = r#"{"type":"0x3","status":"0x1","cumulativeGasUsed":"0x93","logsBloom":"0x00000000000000000080000000000000000000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","logs":[{"address":"0x0000000000000000000000000000000000000000","topics":[],"data":"0x73747261776265727279","logIndex":"0x0","removed":false,"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x1","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x3"}],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x1","from":"0x0000000000000000000000000000000000000000","to":"0x7435ed30a8b4aeb0877cef0c6e8cffe834eb865f","contractAddress":null,"gasUsed":"0x93","effectiveGasPrice":"0x9d","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x3"}"#; assert_eq!(serde_json::to_string(&receipt).unwrap(), expected); } } diff --git a/crates/storage/store.rs b/crates/storage/store.rs index 48d93a18ef9..0400d36b45a 100644 --- a/crates/storage/store.rs +++ b/crates/storage/store.rs @@ -1408,7 +1408,6 @@ mod tests { tx_type: TxType::EIP2930, succeeded: true, cumulative_gas_used: 1747, - bloom: Bloom::random(), logs: vec![], }; let block_number = 6; diff --git a/crates/storage/store_db/libmdbx.rs b/crates/storage/store_db/libmdbx.rs index 83e3e3a79c0..568f1db8f61 100644 --- a/crates/storage/store_db/libmdbx.rs +++ b/crates/storage/store_db/libmdbx.rs @@ -1314,7 +1314,7 @@ mod tests { use bytes::Bytes; use ethrex_common::{ types::{BlockHash, Index, Log, TxType}, - Address, Bloom, H256, + Address, H256, }; #[test] @@ -1656,7 +1656,6 @@ mod tests { tx_type: TxType::EIP7702, succeeded: true, cumulative_gas_used: u64::MAX, - bloom: Bloom::default(), logs, } }