diff --git a/Cargo.lock b/Cargo.lock index 3b9ebaddbda..8dd77d07ec8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3260,6 +3260,7 @@ dependencies = [ "bytes", "concat-kdf", "ctr", + "ethereum-types", "ethrex-blockchain", "ethrex-common", "ethrex-rlp", @@ -3366,6 +3367,7 @@ dependencies = [ "bytes", "cfg-if", "envy", + "ethereum-types", "ethrex-blockchain", "ethrex-common", "ethrex-p2p", diff --git a/crates/common/rlp/decode.rs b/crates/common/rlp/decode.rs index 348999ff414..7ea69b25040 100644 --- a/crates/common/rlp/decode.rs +++ b/crates/common/rlp/decode.rs @@ -513,7 +513,7 @@ pub fn decode_bytes(data: &[u8]) -> Result<(&[u8], &[u8]), RLPDecodeError> { /// Pads a slice of bytes with zeros on the left to make it a fixed size slice. /// The size of the data must be less than or equal to the size of the output array. #[inline] -pub(crate) fn static_left_pad(data: &[u8]) -> Result<[u8; N], RLPDecodeError> { +pub fn static_left_pad(data: &[u8]) -> Result<[u8; N], RLPDecodeError> { let mut result = [0; N]; if data.is_empty() { diff --git a/crates/common/rlp/error.rs b/crates/common/rlp/error.rs index a2bc3349cda..91d4961db2e 100644 --- a/crates/common/rlp/error.rs +++ b/crates/common/rlp/error.rs @@ -15,6 +15,8 @@ pub enum RLPDecodeError { UnexpectedString, #[error("InvalidCompression")] InvalidCompression(#[from] snap::Error), + #[error("IncompatibleProtocol")] + IncompatibleProtocol, #[error("{0}")] Custom(String), } diff --git a/crates/networking/p2p/Cargo.toml b/crates/networking/p2p/Cargo.toml index 3f8f8e75cca..015d43dfe85 100644 --- a/crates/networking/p2p/Cargo.toml +++ b/crates/networking/p2p/Cargo.toml @@ -11,7 +11,7 @@ ethrex-blockchain.workspace = true ethrex-rlp.workspace = true ethrex-storage.workspace = true ethrex-trie.workspace = true - +ethereum-types.workspace = true tracing.workspace = true tokio.workspace = true tokio-util.workspace = true diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index 0ec0771b3af..861a15de662 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -17,7 +17,7 @@ use crate::{ blocks::{ BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, BLOCK_HEADER_LIMIT, }, - receipts::{GetReceipts, Receipts}, + receipts::GetReceipts, }, message::Message as RLPxMessage, p2p::{Capability, SUPPORTED_ETH_CAPABILITIES, SUPPORTED_SNAP_CAPABILITIES}, @@ -207,10 +207,11 @@ impl PeerHandler { if let Some(receipts) = tokio::time::timeout(PEER_REPLY_TIMEOUT, async move { loop { match receiver.recv().await { - Some(RLPxMessage::Receipts(Receipts { id, receipts })) - if id == request_id => - { - return Some(receipts) + Some(RLPxMessage::Receipts(receipts)) => { + if receipts.get_id() == request_id { + return Some(receipts.get_receipts()); + } + return None; } // Ignore replies that don't match the expected id (such as late responses) Some(_) => continue, diff --git a/crates/networking/p2p/rlpx/connection.rs b/crates/networking/p2p/rlpx/connection.rs index da6e5cda2ac..5bc1aa445f5 100644 --- a/crates/networking/p2p/rlpx/connection.rs +++ b/crates/networking/p2p/rlpx/connection.rs @@ -6,6 +6,7 @@ use crate::{ backend, blocks::{BlockBodies, BlockHeaders}, receipts::{GetReceipts, Receipts}, + status::StatusMessage, transactions::{GetPooledTransactions, Transactions}, }, frame::RLPxCodec, @@ -463,7 +464,7 @@ impl RLPxConnection { } Message::Status(msg_data) => { if let Some(eth) = &self.negotiated_eth_capability { - backend::validate_status(msg_data, &self.storage, eth.version).await? + backend::validate_status(msg_data, &self.storage, eth).await? }; } Message::GetAccountRange(req) => { @@ -499,12 +500,14 @@ impl RLPxConnection { self.send(Message::BlockBodies(response)).await?; } Message::GetReceipts(GetReceipts { id, block_hashes }) if peer_supports_eth => { - let mut receipts = Vec::new(); - for hash in block_hashes.iter() { - receipts.push(self.storage.get_receipts_for_block(hash)?); + if let Some(eth) = &self.negotiated_eth_capability { + let mut receipts = Vec::new(); + for hash in block_hashes.iter() { + receipts.push(self.storage.get_receipts_for_block(hash)?); + } + let response = Receipts::new(id, receipts, eth)?; + self.send(Message::Receipts(response)).await?; } - let response = Receipts { id, receipts }; - self.send(Message::Receipts(response)).await?; } Message::NewPooledTransactionHashes(new_pooled_transaction_hashes) if peer_supports_eth => @@ -579,7 +582,7 @@ impl RLPxConnection { async fn init_peer_conn(&mut self) -> Result<(), RLPxError> { // Sending eth Status if peer supports it if let Some(eth) = self.negotiated_eth_capability.clone() { - let status = backend::get_status(&self.storage, eth.version).await?; + let status = StatusMessage::new(&self.storage, ð).await?; log_peer_debug(&self.node, "Sending status"); self.send(Message::Status(status)).await?; // The next immediate message in the ETH protocol is the @@ -592,7 +595,7 @@ impl RLPxConnection { match msg { Message::Status(msg_data) => { log_peer_debug(&self.node, "Received Status"); - backend::validate_status(msg_data, &self.storage, eth.version).await? + backend::validate_status(msg_data, &self.storage, ð).await? } Message::Disconnect(disconnect) => { return Err(RLPxError::HandshakeError(format!( diff --git a/crates/networking/p2p/rlpx/error.rs b/crates/networking/p2p/rlpx/error.rs index 363e3a02e9c..6d482289b0c 100644 --- a/crates/networking/p2p/rlpx/error.rs +++ b/crates/networking/p2p/rlpx/error.rs @@ -51,6 +51,8 @@ pub(crate) enum RLPxError { IoError(#[from] std::io::Error), #[error("Failed to decode message due to invalid frame: {0}")] InvalidMessageFrame(String), + #[error("IncompatibleProtocol")] + IncompatibleProtocol, } // tokio::sync::mpsc::error::SendError is too large to be part of the RLPxError enum directly diff --git a/crates/networking/p2p/rlpx/eth/backend.rs b/crates/networking/p2p/rlpx/eth/backend.rs index 191e4a1bc75..0fdad0dd1e7 100644 --- a/crates/networking/p2p/rlpx/eth/backend.rs +++ b/crates/networking/p2p/rlpx/eth/backend.rs @@ -1,46 +1,14 @@ -use ethrex_common::{types::ForkId, U256}; +use ethrex_common::types::ForkId; use ethrex_storage::Store; -use crate::rlpx::error::RLPxError; +use crate::rlpx::{error::RLPxError, p2p::Capability}; use super::status::StatusMessage; -pub async fn get_status(storage: &Store, eth_version: u8) -> Result { - let chain_config = storage.get_chain_config()?; - let total_difficulty = U256::from(chain_config.terminal_total_difficulty.unwrap_or_default()); - let network_id = chain_config.chain_id; - - // These blocks must always be available - let genesis_header = storage - .get_block_header(0)? - .ok_or(RLPxError::NotFound("Genesis Block".to_string()))?; - let block_number = storage.get_latest_block_number().await?; - let block_header = storage - .get_block_header(block_number)? - .ok_or(RLPxError::NotFound(format!("Block {block_number}")))?; - - let genesis = genesis_header.compute_block_hash(); - let block_hash = block_header.compute_block_hash(); - let fork_id = ForkId::new( - chain_config, - genesis_header, - block_header.timestamp, - block_number, - ); - Ok(StatusMessage { - eth_version: eth_version as u32, - network_id, - total_difficulty, - block_hash, - genesis, - fork_id, - }) -} - pub async fn validate_status( msg_data: StatusMessage, storage: &Store, - eth_version: u8, + eth_capability: &Capability, ) -> Result<(), RLPxError> { let chain_config = storage.get_chain_config()?; @@ -61,26 +29,26 @@ pub async fn validate_status( ); //Check networkID - if msg_data.network_id != chain_config.chain_id { + if msg_data.get_network_id() != chain_config.chain_id { return Err(RLPxError::HandshakeError( "Network Id does not match".to_string(), )); } //Check Protocol Version - if msg_data.eth_version as u8 != eth_version { + if msg_data.get_eth_version() != eth_capability.version { return Err(RLPxError::HandshakeError( "Eth protocol version does not match".to_string(), )); } //Check Genesis - if msg_data.genesis != genesis_hash { + if msg_data.get_genesis() != genesis_hash { return Err(RLPxError::HandshakeError( "Genesis does not match".to_string(), )); } // Check ForkID if !fork_id.is_valid( - msg_data.fork_id, + msg_data.get_fork_id(), latest_block_number, latest_block_header.timestamp, chain_config, @@ -95,11 +63,14 @@ pub async fn validate_status( #[cfg(test)] mod tests { use super::validate_status; + use crate::rlpx::eth::eth68::status::StatusMessage68; use crate::rlpx::eth::status::StatusMessage; + use crate::rlpx::p2p::Capability; use ethrex_common::{ types::{ForkId, Genesis}, H256, U256, }; + use ethrex_storage::{EngineType, Store}; use std::{fs::File, io::BufReader}; @@ -125,16 +96,16 @@ mod tests { let genesis_hash = genesis_header.compute_block_hash(); let fork_id = ForkId::new(config, genesis_header, 2707305664, 123); - let eth_version = 68; - let message = StatusMessage { - eth_version: eth_version as u32, + let eth = Capability::eth(68); + let message = StatusMessage::StatusMessage68(StatusMessage68 { + eth_version: eth.version, network_id: 3503995874084926, total_difficulty, block_hash: H256::random(), genesis: genesis_hash, fork_id, - }; - let result = validate_status(message, &storage, eth_version).await; + }); + let result = validate_status(message, &storage, ð).await; assert!(result.is_ok()); } } diff --git a/crates/networking/p2p/rlpx/eth/eth68/mod.rs b/crates/networking/p2p/rlpx/eth/eth68/mod.rs new file mode 100644 index 00000000000..9ce64e6e8f5 --- /dev/null +++ b/crates/networking/p2p/rlpx/eth/eth68/mod.rs @@ -0,0 +1,2 @@ +pub mod receipt; +pub mod status; diff --git a/crates/networking/p2p/rlpx/eth/eth68/receipt.rs b/crates/networking/p2p/rlpx/eth/eth68/receipt.rs new file mode 100644 index 00000000000..cffdb46bb90 --- /dev/null +++ b/crates/networking/p2p/rlpx/eth/eth68/receipt.rs @@ -0,0 +1,49 @@ +use crate::rlpx::{ + message::RLPxMessage, + utils::{snappy_compress, snappy_decompress}, +}; +use bytes::BufMut; +use ethrex_common::types::Receipt; +use ethrex_rlp::{ + error::{RLPDecodeError, RLPEncodeError}, + structs::{Decoder, Encoder}, +}; + +#[derive(Debug)] +pub(crate) struct Receipts68 { + // id is a u64 chosen by the requesting peer, the responding peer must mirror the value for the response + // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages + pub id: u64, + pub receipts: Vec>, +} + +impl Receipts68 { + pub fn new(id: u64, receipts: Vec>) -> Self { + Self { receipts, id } + } +} + +impl RLPxMessage for Receipts68 { + const CODE: u8 = 0x1F; + + fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { + let mut encoded_data = vec![]; + Encoder::new(&mut encoded_data) + .encode_field(&self.id) + .encode_field(&self.receipts) + .finish(); + + let msg_data = snappy_compress(encoded_data)?; + buf.put_slice(&msg_data); + Ok(()) + } + + fn decode(msg_data: &[u8]) -> Result { + let decompressed_data = snappy_decompress(msg_data)?; + let decoder = Decoder::new(&decompressed_data)?; + let (id, decoder): (u64, _) = decoder.decode_field("request-id")?; + let (receipts, _): (Vec>, _) = decoder.decode_field("receipts")?; + + Ok(Self::new(id, receipts)) + } +} diff --git a/crates/networking/p2p/rlpx/eth/eth68/status.rs b/crates/networking/p2p/rlpx/eth/eth68/status.rs new file mode 100644 index 00000000000..326679bac49 --- /dev/null +++ b/crates/networking/p2p/rlpx/eth/eth68/status.rs @@ -0,0 +1,67 @@ +use crate::rlpx::{ + message::RLPxMessage, + utils::{snappy_compress, snappy_decompress}, +}; +use bytes::BufMut; +use ethrex_common::{ + types::{BlockHash, ForkId}, + U256, +}; +use ethrex_rlp::{ + error::{RLPDecodeError, RLPEncodeError}, + structs::{Decoder, Encoder}, +}; + +#[derive(Debug)] +pub(crate) struct StatusMessage68 { + pub(crate) eth_version: u8, + pub(crate) network_id: u64, + pub(crate) total_difficulty: U256, + pub(crate) block_hash: BlockHash, + pub(crate) genesis: BlockHash, + pub(crate) fork_id: ForkId, +} + +impl RLPxMessage for StatusMessage68 { + const CODE: u8 = 0x10; + fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { + let mut encoded_data = vec![]; + Encoder::new(&mut encoded_data) + .encode_field(&self.eth_version) + .encode_field(&self.network_id) + .encode_field(&self.total_difficulty) + .encode_field(&self.block_hash) + .encode_field(&self.genesis) + .encode_field(&self.fork_id) + .finish(); + + let msg_data = snappy_compress(encoded_data)?; + buf.put_slice(&msg_data); + Ok(()) + } + + fn decode(msg_data: &[u8]) -> Result { + let decompressed_data = snappy_decompress(msg_data)?; + let decoder = Decoder::new(&decompressed_data)?; + let (eth_version, decoder): (u32, _) = decoder.decode_field("protocolVersion")?; + + assert_eq!(eth_version, 68, "only eth version 68 is supported"); + + let (network_id, decoder): (u64, _) = decoder.decode_field("networkId")?; + let (total_difficulty, decoder): (U256, _) = decoder.decode_field("totalDifficulty")?; + let (block_hash, decoder): (BlockHash, _) = decoder.decode_field("blockHash")?; + let (genesis, decoder): (BlockHash, _) = decoder.decode_field("genesis")?; + let (fork_id, decoder): (ForkId, _) = decoder.decode_field("forkId")?; + // Implementations must ignore any additional list elements + let _padding = decoder.finish_unchecked(); + + Ok(Self { + eth_version: eth_version as u8, + network_id, + total_difficulty, + block_hash, + genesis, + fork_id, + }) + } +} diff --git a/crates/networking/p2p/rlpx/eth/eth69/mod.rs b/crates/networking/p2p/rlpx/eth/eth69/mod.rs new file mode 100644 index 00000000000..822c7293f86 --- /dev/null +++ b/crates/networking/p2p/rlpx/eth/eth69/mod.rs @@ -0,0 +1 @@ +pub mod status; diff --git a/crates/networking/p2p/rlpx/eth/eth69/receipt.rs b/crates/networking/p2p/rlpx/eth/eth69/receipt.rs new file mode 100644 index 00000000000..f32e271f891 --- /dev/null +++ b/crates/networking/p2p/rlpx/eth/eth69/receipt.rs @@ -0,0 +1,49 @@ +use crate::rlpx::{ + message::RLPxMessage, + utils::{snappy_compress, snappy_decompress}, +}; +use bytes::BufMut; +use ethrex_common::types::Receipt; +use ethrex_rlp::{ + error::{RLPDecodeError, RLPEncodeError}, + structs::{Decoder, Encoder}, +}; + +#[derive(Debug)] +pub(crate) struct Receipts69 { + // id is a u64 chosen by the requesting peer, the responding peer must mirror the value for the response + // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages + pub id: u64, + pub receipts: Vec>, +} + +impl Receipts69 { + pub fn new(id: u64, receipts: Vec>) -> Self { + Self { receipts, id } + } +} + +impl RLPxMessage for Receipts69 { + const CODE: u8 = 0x1F; + + fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { + let mut encoded_data = vec![]; + Encoder::new(&mut encoded_data) + .encode_field(&self.id) + .encode_field(&self.receipts) + .finish(); + + let msg_data = snappy_compress(encoded_data)?; + buf.put_slice(&msg_data); + Ok(()) + } + + fn decode(msg_data: &[u8]) -> Result { + let decompressed_data = snappy_decompress(msg_data)?; + let decoder = Decoder::new(&decompressed_data)?; + let (id, decoder): (u64, _) = decoder.decode_field("request-id")?; + let (receipts, _): (Vec>, _) = decoder.decode_field("receipts")?; + + Ok(Self::new(id, receipts)) + } +} diff --git a/crates/networking/p2p/rlpx/eth/eth69/status.rs b/crates/networking/p2p/rlpx/eth/eth69/status.rs new file mode 100644 index 00000000000..bb52a228ad6 --- /dev/null +++ b/crates/networking/p2p/rlpx/eth/eth69/status.rs @@ -0,0 +1,67 @@ +use crate::rlpx::{ + message::RLPxMessage, + utils::{snappy_compress, snappy_decompress}, +}; +use bytes::BufMut; +use ethrex_common::{ + types::{BlockHash, ForkId}, + U256, +}; +use ethrex_rlp::{ + error::{RLPDecodeError, RLPEncodeError}, + structs::{Decoder, Encoder}, +}; + +#[derive(Debug)] +pub(crate) struct StatusMessage69 { + pub(crate) eth_version: u8, + pub(crate) network_id: u64, + pub(crate) total_difficulty: U256, + pub(crate) block_hash: BlockHash, + pub(crate) genesis: BlockHash, + pub(crate) fork_id: ForkId, +} + +impl RLPxMessage for StatusMessage69 { + const CODE: u8 = 0x10; + fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { + let mut encoded_data = vec![]; + Encoder::new(&mut encoded_data) + .encode_field(&self.eth_version) + .encode_field(&self.network_id) + .encode_field(&self.total_difficulty) + .encode_field(&self.block_hash) + .encode_field(&self.genesis) + .encode_field(&self.fork_id) + .finish(); + + let msg_data = snappy_compress(encoded_data)?; + buf.put_slice(&msg_data); + Ok(()) + } + + fn decode(msg_data: &[u8]) -> Result { + let decompressed_data = snappy_decompress(msg_data)?; + let decoder = Decoder::new(&decompressed_data)?; + let (eth_version, decoder): (u32, _) = decoder.decode_field("protocolVersion")?; + + assert_eq!(eth_version, 69, "only eth version 69 is supported"); + + let (network_id, decoder): (u64, _) = decoder.decode_field("networkId")?; + let (total_difficulty, decoder): (U256, _) = decoder.decode_field("totalDifficulty")?; + let (block_hash, decoder): (BlockHash, _) = decoder.decode_field("blockHash")?; + let (genesis, decoder): (BlockHash, _) = decoder.decode_field("genesis")?; + let (fork_id, decoder): (ForkId, _) = decoder.decode_field("forkId")?; + // Implementations must ignore any additional list elements + let _padding = decoder.finish_unchecked(); + + Ok(Self { + eth_version: eth_version as u8, + network_id, + total_difficulty, + block_hash, + genesis, + fork_id, + }) + } +} diff --git a/crates/networking/p2p/rlpx/eth.rs b/crates/networking/p2p/rlpx/eth/mod.rs similarity index 84% rename from crates/networking/p2p/rlpx/eth.rs rename to crates/networking/p2p/rlpx/eth/mod.rs index a03ff256caa..08a0d34784f 100644 --- a/crates/networking/p2p/rlpx/eth.rs +++ b/crates/networking/p2p/rlpx/eth/mod.rs @@ -1,5 +1,7 @@ pub(crate) mod backend; pub(crate) mod blocks; +mod eth68; +mod eth69; pub(crate) mod receipts; pub(crate) mod status; pub(crate) mod transactions; diff --git a/crates/networking/p2p/rlpx/eth/receipts.rs b/crates/networking/p2p/rlpx/eth/receipts.rs index 31b23c100cb..9589ed7039d 100644 --- a/crates/networking/p2p/rlpx/eth/receipts.rs +++ b/crates/networking/p2p/rlpx/eth/receipts.rs @@ -1,10 +1,16 @@ +use super::eth68::receipt::Receipts68; use crate::rlpx::{ + error::RLPxError, message::RLPxMessage, + p2p::Capability, utils::{snappy_compress, snappy_decompress}, }; +use ethereum_types::Bloom; + use bytes::BufMut; use ethrex_common::types::{BlockHash, Receipt}; use ethrex_rlp::{ + decode::static_left_pad, error::{RLPDecodeError, RLPEncodeError}, structs::{Decoder, Encoder}, }; @@ -50,16 +56,32 @@ impl RLPxMessage for GetReceipts { // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#receipts-0x10 #[derive(Debug)] -pub(crate) struct Receipts { - // id is a u64 chosen by the requesting peer, the responding peer must mirror the value for the response - // https://github.com/ethereum/devp2p/blob/master/caps/eth.md#protocol-messages - pub id: u64, - pub receipts: Vec>, +pub(crate) enum Receipts { + Receipts68(Receipts68), + // Receipts69(Receipts69), } impl Receipts { - pub fn new(id: u64, receipts: Vec>) -> Self { - Self { receipts, id } + pub fn new(id: u64, receipts: Vec>, eth: &Capability) -> Result { + match eth.version { + 68 => Ok(Receipts::Receipts68(Receipts68::new(id, receipts))), + //69 => Ok(Receipts::Receipts69(Receipts69::new(id, receipts))), + _ => Err(RLPxError::IncompatibleProtocol), + } + } + + pub fn get_receipts(&self) -> Vec> { + match self { + Receipts::Receipts68(msg) => msg.receipts.clone(), + //Receipts::Receipts69(msg) => msg.receipts.clone(), + } + } + + pub fn get_id(&self) -> u64 { + match self { + Receipts::Receipts68(msg) => msg.id, + //Receipts::Receipts69(msg) => msg.id, + } } } @@ -67,35 +89,96 @@ impl RLPxMessage for Receipts { const CODE: u8 = 0x20; fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { - let mut encoded_data = vec![]; - Encoder::new(&mut encoded_data) - .encode_field(&self.id) - .encode_field(&self.receipts) - .finish(); - - let msg_data = snappy_compress(encoded_data)?; - buf.put_slice(&msg_data); - Ok(()) + match self { + Receipts::Receipts68(msg) => msg.encode(buf), + //Receipts::Receipts69(msg) => msg.encode(buf), + } } fn decode(msg_data: &[u8]) -> Result { - let decompressed_data = snappy_decompress(msg_data)?; - let decoder = Decoder::new(&decompressed_data)?; - let (id, decoder): (u64, _) = decoder.decode_field("request-id")?; - let (receipts, _): (Vec>, _) = decoder.decode_field("receipts")?; + if has_bloom(msg_data)? { + Ok(Receipts::Receipts68(Receipts68::decode(msg_data)?)) + } else { + // this should be eth/69 when implemented + //Ok(Receipts::Receipts69(Receipts69::decode(msg_data)?)) + Err(RLPDecodeError::IncompatibleProtocol) + } + } +} + +// We should receive something like this: +// [request-id, [[r1, r2, r3,... ]]] +// in this fn, we're checking if r1 has a bloom field inside +fn has_bloom(msg_data: &[u8]) -> Result { + let decompressed_data = snappy_decompress(msg_data)?; + let decoder = Decoder::new(&decompressed_data)?; + let (_, decoder): (u64, _) = decoder.decode_field("request-id")?; + + //a list should be received + let (data, _) = decoder.get_encoded_item()?; + let decoder = Decoder::new(&data)?; + //check if the list is empty + if decoder.is_done() { + return Ok(true); + } + + // inner list + let (data, _) = decoder.get_encoded_item()?; + let decoder = Decoder::new(&data)?; + if decoder.is_done() { + return Ok(true); + } - Ok(Self::new(id, receipts)) + // we only need one element + // all elements should be the same + let (data, _) = decoder.get_encoded_item()?; + let data = match data[0] { + 0x80..=0xB7 => { + let length = (data[0] - 0x80) as usize; + if data.len() < length + 1 { + return Err(RLPDecodeError::InvalidLength); + } + &data[1..length + 1] + } + 0xB8..=0xBF => { + let length_of_length = (data[0] - 0xB7) as usize; + if data.len() < length_of_length + 1 { + return Err(RLPDecodeError::InvalidLength); + } + let length_bytes = &data[1..length_of_length + 1]; + let length = usize::from_be_bytes(static_left_pad(length_bytes)?); + if data.len() < length_of_length + length + 1 { + return Err(RLPDecodeError::InvalidLength); + } + &data[length_of_length + 1..length_of_length + length + 1] + } + _ => return Err(RLPDecodeError::InvalidLength), + }; + + let data = match data[0] { + tx_type if tx_type < 0x7f => &data[1..], + _ => &data[0..], + }; + let decoder = Decoder::new(data)?; + let (_, decoder): (bool, _) = decoder.decode_field("succeeded")?; + let (_, decoder): (u64, _) = decoder.decode_field("cumulative_gas_used")?; + // try to decode the bloom field + match decoder.decode_field::("bloom") { + Ok(_) => Ok(true), + Err(_) => Ok(false), } } #[cfg(test)] mod tests { - use ethrex_common::types::{BlockHash, Receipt}; - + use crate::rlpx::eth::receipts::has_bloom; use crate::rlpx::{ eth::receipts::{GetReceipts, Receipts}, message::RLPxMessage, + p2p::Capability, }; + use ethrex_common::types::transaction::TxType; + use ethrex_common::types::{BlockHash, Receipt}; #[test] fn get_receipts_empty_message() { @@ -130,13 +213,30 @@ mod tests { #[test] fn receipts_empty_message() { let receipts = vec![]; - let receipts = Receipts::new(1, receipts); + let receipts = Receipts::new(1, receipts, &Capability::eth(68)).unwrap(); let mut buf = Vec::new(); receipts.encode(&mut buf).unwrap(); let decoded = Receipts::decode(&buf).unwrap(); - assert_eq!(decoded.id, 1); - assert_eq!(decoded.receipts, Vec::>::new()); + + assert_eq!(decoded.get_id(), 1); + assert_eq!(decoded.get_receipts(), Vec::>::new()); + } + + #[test] + fn receipts_check_bloom() { + let receipts = vec![vec![ + Receipt::new(TxType::EIP7702, true, 210000, vec![]), + Receipt::new(TxType::EIP7702, true, 210000, vec![]), + Receipt::new(TxType::EIP7702, true, 210000, vec![]), + Receipt::new(TxType::EIP7702, true, 210000, vec![]), + ]]; + let receipts = Receipts::new(255, receipts, &Capability::eth(68)).unwrap(); + + let mut buf = Vec::new(); + receipts.encode(&mut buf).unwrap(); + + assert!(has_bloom(&buf).unwrap()); } } diff --git a/crates/networking/p2p/rlpx/eth/status.rs b/crates/networking/p2p/rlpx/eth/status.rs index 27b7cd1aafc..0655f91c5df 100644 --- a/crates/networking/p2p/rlpx/eth/status.rs +++ b/crates/networking/p2p/rlpx/eth/status.rs @@ -1,67 +1,109 @@ -use crate::rlpx::{ - message::RLPxMessage, - utils::{snappy_compress, snappy_decompress}, -}; +use super::eth68::status::StatusMessage68; +// use super::eth69::status::StatusMessage69; +use crate::rlpx::message::RLPxMessage; +use crate::rlpx::utils::snappy_decompress; +use crate::rlpx::{error::RLPxError, p2p::Capability}; use bytes::BufMut; -use ethrex_common::{ - types::{BlockHash, ForkId}, - U256, -}; -use ethrex_rlp::{ - error::{RLPDecodeError, RLPEncodeError}, - structs::{Decoder, Encoder}, -}; +use ethrex_common::types::{BlockHash, ForkId}; +use ethrex_common::U256; +use ethrex_rlp::error::{RLPDecodeError, RLPEncodeError}; +use ethrex_rlp::structs::Decoder; +use ethrex_storage::Store; #[derive(Debug)] -pub(crate) struct StatusMessage { - pub(crate) eth_version: u32, - pub(crate) network_id: u64, - pub(crate) total_difficulty: U256, - pub(crate) block_hash: BlockHash, - pub(crate) genesis: BlockHash, - pub(crate) fork_id: ForkId, +pub enum StatusMessage { + StatusMessage68(StatusMessage68), + // StatusMessage69(StatusMessage69), } impl RLPxMessage for StatusMessage { const CODE: u8 = 0x10; fn encode(&self, buf: &mut dyn BufMut) -> Result<(), RLPEncodeError> { - let mut encoded_data = vec![]; - Encoder::new(&mut encoded_data) - .encode_field(&self.eth_version) - .encode_field(&self.network_id) - .encode_field(&self.total_difficulty) - .encode_field(&self.block_hash) - .encode_field(&self.genesis) - .encode_field(&self.fork_id) - .finish(); - - let msg_data = snappy_compress(encoded_data)?; - buf.put_slice(&msg_data); - Ok(()) + match self { + StatusMessage::StatusMessage68(msg) => msg.encode(buf), + // StatusMessage::StatusMessage69(msg) => msg.encode(buf), + } } fn decode(msg_data: &[u8]) -> Result { let decompressed_data = snappy_decompress(msg_data)?; let decoder = Decoder::new(&decompressed_data)?; - let (eth_version, decoder): (u32, _) = decoder.decode_field("protocolVersion")?; + let (eth_version, _): (u32, _) = decoder.decode_field("protocolVersion")?; + + match eth_version { + 68 => Ok(StatusMessage::StatusMessage68(StatusMessage68::decode( + msg_data, + )?)), + 69 => Err(RLPDecodeError::IncompatibleProtocol), + _ => Err(RLPDecodeError::IncompatibleProtocol), + } + } +} - assert_eq!(eth_version, 68, "only eth version 68 is supported"); +impl StatusMessage { + pub async fn new(storage: &Store, eth: &Capability) -> Result { + let chain_config = storage.get_chain_config()?; + let total_difficulty = + U256::from(chain_config.terminal_total_difficulty.unwrap_or_default()); + let network_id = chain_config.chain_id; - let (network_id, decoder): (u64, _) = decoder.decode_field("networkId")?; - let (total_difficulty, decoder): (U256, _) = decoder.decode_field("totalDifficulty")?; - let (block_hash, decoder): (BlockHash, _) = decoder.decode_field("blockHash")?; - let (genesis, decoder): (BlockHash, _) = decoder.decode_field("genesis")?; - let (fork_id, decoder): (ForkId, _) = decoder.decode_field("forkId")?; - // Implementations must ignore any additional list elements - let _padding = decoder.finish_unchecked(); + // These blocks must always be available + let genesis_header = storage + .get_block_header(0)? + .ok_or(RLPxError::NotFound("Genesis Block".to_string()))?; + let block_number = storage.get_latest_block_number().await?; + let block_header = storage + .get_block_header(block_number)? + .ok_or(RLPxError::NotFound(format!("Block {block_number}")))?; + + let genesis = genesis_header.compute_block_hash(); + let block_hash = block_header.compute_block_hash(); + let fork_id = ForkId::new( + chain_config, + genesis_header, + block_header.timestamp, + block_number, + ); + + match eth.version { + 68 => Ok(StatusMessage::StatusMessage68(StatusMessage68 { + eth_version: eth.version, + network_id, + total_difficulty, + block_hash, + genesis, + fork_id, + })), + 69 => Err(RLPxError::IncompatibleProtocol), + _ => Err(RLPxError::IncompatibleProtocol), + } + } + + pub fn get_network_id(&self) -> u64 { + match self { + StatusMessage::StatusMessage68(msg) => msg.network_id, + // StatusMessage::StatusMessage69(msg) => msg.network_id, + } + } + + pub fn get_eth_version(&self) -> u8 { + match self { + StatusMessage::StatusMessage68(msg) => msg.eth_version, + // StatusMessage::StatusMessage69(msg) => msg.eth_version, + } + } + + pub fn get_fork_id(&self) -> ForkId { + match self { + StatusMessage::StatusMessage68(msg) => msg.fork_id.clone(), + // StatusMessage::StatusMessage69(msg) => msg.fork_id.clone(), + } + } - Ok(Self { - eth_version, - network_id, - total_difficulty, - block_hash, - genesis, - fork_id, - }) + pub fn get_genesis(&self) -> BlockHash { + match self { + StatusMessage::StatusMessage68(msg) => msg.genesis, + // StatusMessage::StatusMessage69(msg) => msg.genesis, + } } } diff --git a/crates/networking/rpc/Cargo.toml b/crates/networking/rpc/Cargo.toml index 3a9337cbde6..54c2ff1e08b 100644 --- a/crates/networking/rpc/Cargo.toml +++ b/crates/networking/rpc/Cargo.toml @@ -21,6 +21,7 @@ ethrex-blockchain.workspace = true ethrex-p2p.workspace = true ethrex-rlp.workspace = true ethrex-storage-rollup = { workspace = true, optional = true } +ethereum-types.workspace = true hex.workspace = true axum-extra = { version = "0.10.0", features = ["typed-header"] } jsonwebtoken.workspace = true