diff --git a/src/consensus/parlia/mod.rs b/src/consensus/parlia/mod.rs index f8fbcad..72be25e 100644 --- a/src/consensus/parlia/mod.rs +++ b/src/consensus/parlia/mod.rs @@ -16,6 +16,7 @@ pub mod attestation; pub mod gas; pub mod hooks; pub mod slash_pool; +pub mod vote_pool; pub mod transaction_splitter; pub mod consensus; pub mod util; @@ -33,6 +34,7 @@ pub use error::ParliaConsensusError; pub use consensus::ParliaConsensus; pub use util::hash_with_chain_id; pub use provider::SnapshotProvider; +pub use vote_pool as votes; // A single object-safe trait to represent the Parlia consensus object when held globally. // This combines the execution-facing validator API with the consensus engine trait. diff --git a/src/consensus/parlia/vote_pool.rs b/src/consensus/parlia/vote_pool.rs new file mode 100644 index 0000000..d798891 --- /dev/null +++ b/src/consensus/parlia/vote_pool.rs @@ -0,0 +1,56 @@ +use once_cell::sync::Lazy; +use std::{collections::HashSet, sync::Mutex}; + +use alloy_primitives::B256; + +use super::vote::VoteEnvelope; + +/// Global in-memory pool of incoming Parlia votes. +/// +/// This mirrors the simple approach used by the slashing pool: keep votes in +/// memory until they're consumed by another component. Votes are de-duplicated +/// by their RLP hash. +struct VotePool { + /// Hashes of votes we've already seen in this window. + seen_hashes: HashSet, + /// Collected votes (deduplicated by `seen_hashes`). + votes: Vec, +} + +impl VotePool { + fn new() -> Self { + Self { seen_hashes: HashSet::new(), votes: Vec::new() } + } + + fn insert(&mut self, vote: VoteEnvelope) { + let vote_hash = vote.hash(); + if self.seen_hashes.insert(vote_hash) { + self.votes.push(vote); + } + } + + fn drain(&mut self) -> Vec { + self.seen_hashes.clear(); + std::mem::take(&mut self.votes) + } + + fn len(&self) -> usize { self.votes.len() } +} + +/// Global singleton pool. +static VOTE_POOL: Lazy> = Lazy::new(|| Mutex::new(VotePool::new())); + +/// Insert a single vote into the pool (deduplicated by hash). +pub fn put_vote(vote: VoteEnvelope) { + VOTE_POOL.lock().expect("vote pool poisoned").insert(vote); +} + +/// Drain all pending votes. +pub fn drain() -> Vec { + VOTE_POOL.lock().expect("vote pool poisoned").drain() +} + +/// Current number of queued votes. +pub fn len() -> usize { VOTE_POOL.lock().expect("vote pool poisoned").len() } + + diff --git a/src/node/network/bsc_protocol/protocol/handler.rs b/src/node/network/bsc_protocol/protocol/handler.rs new file mode 100644 index 0000000..8c59148 --- /dev/null +++ b/src/node/network/bsc_protocol/protocol/handler.rs @@ -0,0 +1,50 @@ +use reth_network_api::PeerId; +use reth_network::protocol::{ConnectionHandler, OnNotSupported, ProtocolHandler}; +use reth_eth_wire::{capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol}; +use std::net::SocketAddr; + +use super::proto::BscProtoMessage; +use crate::node::network::bsc_protocol::stream::BscVotesConnection; + +#[derive(Clone, Debug, Default)] +pub struct BscProtocolHandler; + +pub struct BscConnectionHandler; + +impl ProtocolHandler for BscProtocolHandler { + type ConnectionHandler = BscConnectionHandler; + + fn on_incoming(&self, _socket_addr: SocketAddr) -> Option { + Some(BscConnectionHandler) + } + + fn on_outgoing(&self, _socket_addr: SocketAddr, _peer_id: PeerId) -> Option { + Some(BscConnectionHandler) + } +} + +impl ConnectionHandler for BscConnectionHandler { + type Connection = BscVotesConnection; + + fn protocol(&self) -> Protocol { BscProtoMessage::protocol() } + + fn on_unsupported_by_peer( + self, + _supported: &SharedCapabilities, + _direction: reth_network_api::Direction, + _peer_id: PeerId, + ) -> OnNotSupported { + OnNotSupported::KeepAlive + } + + fn into_connection( + self, + _direction: reth_network_api::Direction, + _peer_id: PeerId, + conn: ProtocolConnection, + ) -> Self::Connection { + BscVotesConnection::new(conn) + } +} + + diff --git a/src/node/network/bsc_protocol/protocol/proto.rs b/src/node/network/bsc_protocol/protocol/proto.rs new file mode 100644 index 0000000..399fd14 --- /dev/null +++ b/src/node/network/bsc_protocol/protocol/proto.rs @@ -0,0 +1,112 @@ +use reth_eth_wire::{protocol::Protocol, Capability}; + +#[repr(u8)] +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum BscProtoMessageId { + Capability = 0x00, + Votes = 0x01, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct BscProtoMessage; + +impl BscProtoMessage { + /// Returns the capability for the `bsc` protocol. + pub fn capability() -> Capability { Capability::new_static("bsc", 1) } + + /// Returns the protocol for the `bsc` protocol. + /// Reserve 2 message IDs (0x00 and 0x01) to allow for extension. + pub fn protocol() -> Protocol { Protocol::new(Self::capability(), 2) } +} + +#[cfg(test)] +mod tests { + use super::BscProtoMessageId; + use alloy_primitives::{hex, B256, FixedBytes}; + use alloy_rlp::{RlpEncodable, RlpDecodable}; + use crate::consensus::parlia::vote::{VoteData, VoteEnvelope}; + use crate::node::network::votes::VotesPacket; + + /// Wrapper struct to match Go's RLP encoding of struct{Votes []*VoteEnvelope} + #[derive(RlpEncodable, RlpDecodable)] + struct VotesWrapper(Vec); + + fn b256(s: &str) -> B256 { + let hex_str = s.trim_start_matches("0x"); + B256::from_slice(&hex::decode(hex_str).unwrap()) + } + + fn bytes48(s: &str) -> FixedBytes<48> { + let v = hex::decode(s).unwrap(); + FixedBytes::from_slice(&v) + } + + fn bytes96(s: &str) -> FixedBytes<96> { + let v = hex::decode(s).unwrap(); + FixedBytes::from_slice(&v) + } + + #[test] + fn test_bsc1_votes_messages_rlp_matches_reference() { + let bls_pub = "b32d4d46a7127dcc865f0d30f2ee3dcd5983b686f4e3a9202afc8b608652001c9938906ae1ff1417486096e32511f1bc"; + let signatures = [ + "91f8a39f99a0b3632d248e635ab0e7b5ff68071fa4def5c2df9f249db7d393c0ba89eb28a65f2a6ba836baddb961b9c312c70a87d130edf944b340649218335c91078cce808da75ff69f673bab3ecdf068c33b1ab147c54298056b19e9cc625d", + "a56cd257f9a4b4830c9bfadaa751c7b1d4e9c6899127c145987a55a7cfa0d1b7d114cb2523ea4e2efee0326cfc5a1cd912eaf7f0c4c0be3193677284533f1709fbd75471a9fb22aea358cdbf2e900628d7c504ce7245e8af6fdd1039dfa3c0bd", + "893f8aff7fc523a7aff006aaba71fbde5f1eee1f4683d405892ffb9ab9282a5dae01054210ff6873ee76f86b9afdef2e128932b26696e3f7e1de7fe7d3fdd1c41273912ff5d1002cba176dbf84e1fe2edb60b114129b89e1329a03f7d9843d04", + "b3585bf55f1e0d8bc0f544a386e6fc4ec37de52330f69b450f579050acda6279a8a38172ed2f01dfdb57cf7dd2a314970aa8a3168234cbd29adfc6a0efd080f57d7e195dafbf5b6db087e8b943aa634f797f8f6d4e5bf04681d7ce2218e59465", + "9366e823b456049cd10ed1aa8f02a58ce2fa4caea7e8c776d6aec9c42f4263b40b0f2d76cc55a598b378381f32ef33520d47e28707027c25e38eb971cddb379e0ded5e814ce70108d65855084a11484fd08447520b7ce79ac1e680020b243747", + "aafd383c9537d750358ea077d45476cf6c1541e717c690ebe5dc5442c2af732fba17b45c60b2c49de94f5121f318b6ae021c56ae06588c6552f1d5b87a166cb8050f287b528b20556033603f6a6649ccec4792c86ae5f6353cf6b7527ac40127", + "90d9dc467a64fe7852b2e0117806409a8f12a036849d69396282088f8d86adb3adcd46b1fde51b4630a6b64c2f8652f30a46609c49b33f50c9f4641e30900ee420f9b81b2ad59a2376dcf4e065ecf832fbf738ad5b73becd2f7add27e6c14d5f", + "8f7d6bc28626dc143208aaa7b97146510f01b1a108dead65f8fddf0ec07835bca91081f9e759656d52dd7d4adaac14220c8c62aa1dd09151fe8000ce4347b100ac496593058ae11b40c74b3049d38076d07301c9dc9586baf93d9c81b4e5d424", + "b6c17077217baa5930fb04e14b6ba7203b3c854e8f1363b48ad753d96db1b4ffed36d60d8b67e86f7f76504f0abefff80ed1e4f11ff192dbfc26a0f059f7b9f66f9e883fef208cc3f58c7ce49d8e854cf8a0e48c59c7407ebfe946cfd62bf3be", + "979b1d101e51731749c72fb160dd1245d69ebd6ca81c0519464d3bca9ec3db493cf4b45ebbb7f60fbd12f0705bd788000558bdedc335cedac2100169965b2794fae8a386b2e9ece86ea6952fadeb8501d9aad00e091713cc06d30c5885c3ecf0", + "8d035b04d8ef6c13117acc1ed9d0586a141f123494f21eeaaead5dd9f623933541b293eef403d2f3e8ede84f9dfe3dc10cbd3fa6bdf3e977dcf2d18a4dca84f8bd9b24fca8e7de4180b9aa6208ad6e756b1c81e98afc8e6994824b5c076857f8", + ]; + + let vote_data_set = [ + (0u64, "0x6d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34", 1u64, "0xd0bc67b50915467ada963c35ee00950f664788e47da8139d8c178653171034f1"), + (0, "0x6d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34", 2, "0xc2d18d5a59d65da573f70c4d30448482418894e018b0d189db24ea4fd02d7aa1"), + (0, "0x6d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34", 4, "0xbd1bdaf8a8f5c00c464df2856a9e2ef23b8dcc906e6490d3cd295ebb5eb124c3"), + (0, "0x6d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34", 8, "0x3073782ecabb5ef0673e95962273482347a2c7b30a0a7124c664443d0a43f1e1"), + (0, "0x6d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34", 16, "0xc119833266327fd7e0cd929c6a847ae7d1689df5066dfdde2e52f51c0ecbcc3f"), + (0, "0x6d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34", 32, "0x3b5650bcb98381e463871a15a3f601cdc26843d76f4d3461333d7feae38a1786"), + (0, "0x6d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34", 64, "0x5e38b4d98904178d60d58f5bc1687b0c7df114a51f2007d3ee3e6e732539f130"), + (0, "0x6d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34", 128, "0xa4a64a7d511d3ff6982b5a79e9a485508477b98996c570a220f9daea0c7682f8"), + (0, "0x6d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34", 256, "0xd313672c2653fc13e75a9dafdcee93f444caf2cffb04585d3e306fd15418b7e2"), + (0, "0x6d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34", 512, "0x3c5fe2e5439ca7a7f1a3de7d5c0914c37261451c87654397dd45f207109839ae"), + (0, "0x6d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34", 1024, "0x088eeeb07acff0db3ae2585195e9fd23bdf54b55077cab87d1632b08dd2c043b"), + ]; + + let vote_address = bytes48(bls_pub); + let mut votes: Vec = Vec::with_capacity(vote_data_set.len()); + for (i, (sn, sh, tn, th)) in vote_data_set.into_iter().enumerate() { + let data = VoteData { source_number: sn, source_hash: b256(sh), target_number: tn, target_hash: b256(th) }; + let signature = bytes96(signatures[i]); + votes.push(VoteEnvelope { vote_address, signature, data }); + } + + // Encode using wrapper struct to match Go's RLP of struct{Votes []*VoteEnvelope} + let wrapped = VotesWrapper(votes.clone()); + let encoded = alloy_rlp::encode(&wrapped); + + let want_hex = "f90973f90970f8d9b0b32d4d46a7127dcc865f0d30f2ee3dcd5983b686f4e3a9202afc8b608652001c9938906ae1ff1417486096e32511f1bcb86091f8a39f99a0b3632d248e635ab0e7b5ff68071fa4def5c2df9f249db7d393c0ba89eb28a65f2a6ba836baddb961b9c312c70a87d130edf944b340649218335c91078cce808da75ff69f673bab3ecdf068c33b1ab147c54298056b19e9cc625df84480a06d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe3401a0d0bc67b50915467ada963c35ee00950f664788e47da8139d8c178653171034f1f8d9b0b32d4d46a7127dcc865f0d30f2ee3dcd5983b686f4e3a9202afc8b608652001c9938906ae1ff1417486096e32511f1bcb860a56cd257f9a4b4830c9bfadaa751c7b1d4e9c6899127c145987a55a7cfa0d1b7d114cb2523ea4e2efee0326cfc5a1cd912eaf7f0c4c0be3193677284533f1709fbd75471a9fb22aea358cdbf2e900628d7c504ce7245e8af6fdd1039dfa3c0bdf84480a06d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe3402a0c2d18d5a59d65da573f70c4d30448482418894e018b0d189db24ea4fd02d7aa1f8d9b0b32d4d46a7127dcc865f0d30f2ee3dcd5983b686f4e3a9202afc8b608652001c9938906ae1ff1417486096e32511f1bcb860893f8aff7fc523a7aff006aaba71fbde5f1eee1f4683d405892ffb9ab9282a5dae01054210ff6873ee76f86b9afdef2e128932b26696e3f7e1de7fe7d3fdd1c41273912ff5d1002cba176dbf84e1fe2edb60b114129b89e1329a03f7d9843d04f84480a06d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe3404a0bd1bdaf8a8f5c00c464df2856a9e2ef23b8dcc906e6490d3cd295ebb5eb124c3f8d9b0b32d4d46a7127dcc865f0d30f2ee3dcd5983b686f4e3a9202afc8b608652001c9938906ae1ff1417486096e32511f1bcb860b3585bf55f1e0d8bc0f544a386e6fc4ec37de52330f69b450f579050acda6279a8a38172ed2f01dfdb57cf7dd2a314970aa8a3168234cbd29adfc6a0efd080f57d7e195dafbf5b6db087e8b943aa634f797f8f6d4e5bf04681d7ce2218e59465f84480a06d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe3408a03073782ecabb5ef0673e95962273482347a2c7b30a0a7124c664443d0a43f1e1f8d9b0b32d4d46a7127dcc865f0d30f2ee3dcd5983b686f4e3a9202afc8b608652001c9938906ae1ff1417486096e32511f1bcb8609366e823b456049cd10ed1aa8f02a58ce2fa4caea7e8c776d6aec9c42f4263b40b0f2d76cc55a598b378381f32ef33520d47e28707027c25e38eb971cddb379e0ded5e814ce70108d65855084a11484fd08447520b7ce79ac1e680020b243747f84480a06d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe3410a0c119833266327fd7e0cd929c6a847ae7d1689df5066dfdde2e52f51c0ecbcc3ff8d9b0b32d4d46a7127dcc865f0d30f2ee3dcd5983b686f4e3a9202afc8b608652001c9938906ae1ff1417486096e32511f1bcb860aafd383c9537d750358ea077d45476cf6c1541e717c690ebe5dc5442c2af732fba17b45c60b2c49de94f5121f318b6ae021c56ae06588c6552f1d5b87a166cb8050f287b528b20556033603f6a6649ccec4792c86ae5f6353cf6b7527ac40127f84480a06d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe3420a03b5650bcb98381e463871a15a3f601cdc26843d76f4d3461333d7feae38a1786f8d9b0b32d4d46a7127dcc865f0d30f2ee3dcd5983b686f4e3a9202afc8b608652001c9938906ae1ff1417486096e32511f1bcb86090d9dc467a64fe7852b2e0117806409a8f12a036849d69396282088f8d86adb3adcd46b1fde51b4630a6b64c2f8652f30a46609c49b33f50c9f4641e30900ee420f9b81b2ad59a2376dcf4e065ecf832fbf738ad5b73becd2f7add27e6c14d5ff84480a06d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe3440a05e38b4d98904178d60d58f5bc1687b0c7df114a51f2007d3ee3e6e732539f130f8dab0b32d4d46a7127dcc865f0d30f2ee3dcd5983b686f4e3a9202afc8b608652001c9938906ae1ff1417486096e32511f1bcb8608f7d6bc28626dc143208aaa7b97146510f01b1a108dead65f8fddf0ec07835bca91081f9e759656d52dd7d4adaac14220c8c62aa1dd09151fe8000ce4347b100ac496593058ae11b40c74b3049d38076d07301c9dc9586baf93d9c81b4e5d424f84580a06d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe348180a0a4a64a7d511d3ff6982b5a79e9a485508477b98996c570a220f9daea0c7682f8f8dbb0b32d4d46a7127dcc865f0d30f2ee3dcd5983b686f4e3a9202afc8b608652001c9938906ae1ff1417486096e32511f1bcb860b6c17077217baa5930fb04e14b6ba7203b3c854e8f1363b48ad753d96db1b4ffed36d60d8b67e86f7f76504f0abefff80ed1e4f11ff192dbfc26a0f059f7b9f66f9e883fef208cc3f58c7ce49d8e854cf8a0e48c59c7407ebfe946cfd62bf3bef84680a06d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34820100a0d313672c2653fc13e75a9dafdcee93f444caf2cffb04585d3e306fd15418b7e2f8dbb0b32d4d46a7127dcc865f0d30f2ee3dcd5983b686f4e3a9202afc8b608652001c9938906ae1ff1417486096e32511f1bcb860979b1d101e51731749c72fb160dd1245d69ebd6ca81c0519464d3bca9ec3db493cf4b45ebbb7f60fbd12f0705bd788000558bdedc335cedac2100169965b2794fae8a386b2e9ece86ea6952fadeb8501d9aad00e091713cc06d30c5885c3ecf0f84680a06d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34820200a03c5fe2e5439ca7a7f1a3de7d5c0914c37261451c87654397dd45f207109839aef8dbb0b32d4d46a7127dcc865f0d30f2ee3dcd5983b686f4e3a9202afc8b608652001c9938906ae1ff1417486096e32511f1bcb8608d035b04d8ef6c13117acc1ed9d0586a141f123494f21eeaaead5dd9f623933541b293eef403d2f3e8ede84f9dfe3dc10cbd3fa6bdf3e977dcf2d18a4dca84f8bd9b24fca8e7de4180b9aa6208ad6e756b1c81e98afc8e6994824b5c076857f8f84680a06d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34820400a0088eeeb07acff0db3ae2585195e9fd23bdf54b55077cab87d1632b08dd2c043b"; + + let want = hex::decode(want_hex).unwrap(); + assert_eq!(encoded, want, "RLP-encoded votes packet must match reference"); + + // For VotesPacket, we expect just the inner votes array (not wrapped) + // since VotesPacket is a newtype around Vec + let votes_only = alloy_rlp::encode(&votes); + let with_msg_id = { + let mut v = Vec::with_capacity(1 + votes_only.len()); + v.push(BscProtoMessageId::Votes as u8); + v.extend_from_slice(&votes_only); + v + }; + let encoded_packet = alloy_rlp::encode(&VotesPacket(votes)); + assert_eq!(with_msg_id, encoded_packet); + } +} + + + diff --git a/src/node/network/bsc_protocol/stream.rs b/src/node/network/bsc_protocol/stream.rs new file mode 100644 index 0000000..4285f65 --- /dev/null +++ b/src/node/network/bsc_protocol/stream.rs @@ -0,0 +1,46 @@ +use alloy_primitives::bytes::BytesMut; +use alloy_rlp::Decodable; +use futures::{Stream, StreamExt}; +use std::{pin::Pin, task::{Context, Poll}}; +use reth_eth_wire::multiplex::ProtocolConnection; + +use crate::node::network::votes::{VotesPacket, BscCapPacket, handle_votes_broadcast}; +use super::protocol::proto::BscProtoMessageId; + +/// Stream that handles incoming BSC protocol messages (currently only Votes). +pub struct BscVotesConnection { + conn: ProtocolConnection, +} + +impl BscVotesConnection { + pub fn new(conn: ProtocolConnection) -> Self { Self { conn } } +} + +impl Stream for BscVotesConnection { + type Item = BytesMut; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + let Some(raw) = futures::ready!(this.conn.poll_next_unpin(cx)) else { return Poll::Ready(None) }; + let slice = raw.as_ref(); + if slice.is_empty() { return Poll::Pending } + match slice[0] { + x if x == BscProtoMessageId::Votes as u8 => { + if let Ok(packet) = VotesPacket::decode(&mut &slice[..]) { + handle_votes_broadcast(packet); + } + } + x if x == BscProtoMessageId::Capability as u8 => { + // Decode and ignore capability for v1 + let _ = BscCapPacket::decode(&mut &slice[..]); + } + _ => { + // Unknown message id; ignore. + } + } + // This protocol does not proactively send responses; keep the connection open. + Poll::Pending + } +} + + diff --git a/src/node/network/mod.rs b/src/node/network/mod.rs index 6f7cf2c..79f8d21 100644 --- a/src/node/network/mod.rs +++ b/src/node/network/mod.rs @@ -31,6 +31,14 @@ pub mod block_import; pub mod bootnodes; pub mod handshake; pub(crate) mod upgrade_status; +pub(crate) mod votes; +pub(crate) mod bsc_protocol { + pub mod protocol { + pub mod handler; + pub mod proto; + } + pub mod stream; +} /// BSC `NewBlock` message value. #[derive(Debug, Clone, PartialEq, Eq)] pub struct BscNewBlock(pub NewBlock); @@ -215,7 +223,9 @@ impl BscNetworkBuilder { .with_pow() .block_import(Box::new(BscBlockImport::new(handle))) .discovery(discv4) - .eth_rlpx_handshake(Arc::new(BscHandshake::default())); + .eth_rlpx_handshake(Arc::new(BscHandshake::default())) + .add_rlpx_sub_protocol(bsc_protocol::protocol::handler::BscProtocolHandler::default()); + let mut network_config = ctx.build_network_config(network_builder); // Ensure our advertised fork ID matches the fork filter we validate against. diff --git a/src/node/network/votes.rs b/src/node/network/votes.rs new file mode 100644 index 0000000..5425cf7 --- /dev/null +++ b/src/node/network/votes.rs @@ -0,0 +1,68 @@ +use alloy_rlp::{Decodable, Encodable, RlpDecodable, RlpEncodable}; +use bytes::{BufMut, Bytes}; + +use crate::consensus::parlia::{vote::VoteEnvelope, votes}; +use crate::node::network::bsc_protocol::protocol::proto::BscProtoMessageId; + +/// BSC capability packet: version + extra RLP value (opaque), message id 0x00 +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BscCapPacket { + pub protocol_version: u64, + pub extra: Bytes, +} + +impl Encodable for BscCapPacket { + fn encode(&self, out: &mut dyn BufMut) { + (BscProtoMessageId::Capability as u8).encode(out); + CapPayload { protocol_version: self.protocol_version, extra: self.extra.clone() } + .encode(out); + } +} + +impl Decodable for BscCapPacket { + fn decode(buf: &mut &[u8]) -> alloy_rlp::Result { + let message_id = u8::decode(buf)?; + if message_id != (BscProtoMessageId::Capability as u8) { + return Err(alloy_rlp::Error::Custom("Invalid message ID for BscCapPacket")); + } + let CapPayload { protocol_version, extra } = CapPayload::decode(buf)?; + Ok(Self { protocol_version, extra }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, RlpEncodable, RlpDecodable)] +struct CapPayload { + protocol_version: u64, + extra: Bytes, +} + +/// VotesPacket carries a list of votes (message id 0x01) +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct VotesPacket(pub Vec); + +impl Encodable for VotesPacket { + fn encode(&self, out: &mut dyn BufMut) { + (BscProtoMessageId::Votes as u8).encode(out); + self.0.encode(out); + } +} + +impl Decodable for VotesPacket { + fn decode(buf: &mut &[u8]) -> alloy_rlp::Result { + let message_id = u8::decode(buf)?; + if message_id != (BscProtoMessageId::Votes as u8) { + return Err(alloy_rlp::Error::Custom("Invalid message ID for VotesPacket")); + } + let votes = Vec::::decode(buf)?; + Ok(Self(votes)) + } +} + +/// Handle an incoming `VotesPacket` from a peer. +/// To avoid DoS from massive batches, only enqueue the first vote if present, +/// mirroring Geth's logic. +pub fn handle_votes_broadcast(packet: VotesPacket) { + if let Some(first) = packet.0.into_iter().next() { + votes::put_vote(first); + } +} \ No newline at end of file