Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/consensus/parlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod attestation;
pub mod gas;
pub mod hooks;
pub mod slash_pool;
pub mod vote_pool;
pub mod transaction_splitter;
pub mod consensus;
pub mod util;
Expand All @@ -33,6 +34,7 @@ pub use error::ParliaConsensusError;
pub use consensus::ParliaConsensus;
pub use util::hash_with_chain_id;
pub use provider::SnapshotProvider;
pub use vote_pool as votes;

// A single object-safe trait to represent the Parlia consensus object when held globally.
// This combines the execution-facing validator API with the consensus engine trait.
Expand Down
56 changes: 56 additions & 0 deletions src/consensus/parlia/vote_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use once_cell::sync::Lazy;
use std::{collections::HashSet, sync::Mutex};

use alloy_primitives::B256;

use super::vote::VoteEnvelope;

/// Global in-memory pool of incoming Parlia votes.
///
/// This mirrors the simple approach used by the slashing pool: keep votes in
/// memory until they're consumed by another component. Votes are de-duplicated
/// by their RLP hash.
struct VotePool {
/// Hashes of votes we've already seen in this window.
seen_hashes: HashSet<B256>,
/// Collected votes (deduplicated by `seen_hashes`).
votes: Vec<VoteEnvelope>,
}

impl VotePool {
fn new() -> Self {
Self { seen_hashes: HashSet::new(), votes: Vec::new() }
}

fn insert(&mut self, vote: VoteEnvelope) {
let vote_hash = vote.hash();
if self.seen_hashes.insert(vote_hash) {
self.votes.push(vote);
}
}

fn drain(&mut self) -> Vec<VoteEnvelope> {
self.seen_hashes.clear();
std::mem::take(&mut self.votes)
}

fn len(&self) -> usize { self.votes.len() }
}

/// Global singleton pool.
static VOTE_POOL: Lazy<Mutex<VotePool>> = Lazy::new(|| Mutex::new(VotePool::new()));

/// Insert a single vote into the pool (deduplicated by hash).
pub fn put_vote(vote: VoteEnvelope) {
VOTE_POOL.lock().expect("vote pool poisoned").insert(vote);
}

/// Drain all pending votes.
pub fn drain() -> Vec<VoteEnvelope> {
VOTE_POOL.lock().expect("vote pool poisoned").drain()
}

/// Current number of queued votes.
pub fn len() -> usize { VOTE_POOL.lock().expect("vote pool poisoned").len() }


50 changes: 50 additions & 0 deletions src/node/network/bsc_protocol/protocol/handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use reth_network_api::PeerId;
use reth_network::protocol::{ConnectionHandler, OnNotSupported, ProtocolHandler};
use reth_eth_wire::{capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol};
use std::net::SocketAddr;

use super::proto::BscProtoMessage;
use crate::node::network::bsc_protocol::stream::BscVotesConnection;

#[derive(Clone, Debug, Default)]
pub struct BscProtocolHandler;

pub struct BscConnectionHandler;

impl ProtocolHandler for BscProtocolHandler {
type ConnectionHandler = BscConnectionHandler;

fn on_incoming(&self, _socket_addr: SocketAddr) -> Option<Self::ConnectionHandler> {
Some(BscConnectionHandler)
}

fn on_outgoing(&self, _socket_addr: SocketAddr, _peer_id: PeerId) -> Option<Self::ConnectionHandler> {
Some(BscConnectionHandler)
}
}

impl ConnectionHandler for BscConnectionHandler {
type Connection = BscVotesConnection;

fn protocol(&self) -> Protocol { BscProtoMessage::protocol() }

fn on_unsupported_by_peer(
self,
_supported: &SharedCapabilities,
_direction: reth_network_api::Direction,
_peer_id: PeerId,
) -> OnNotSupported {
OnNotSupported::KeepAlive
}

fn into_connection(
self,
_direction: reth_network_api::Direction,
_peer_id: PeerId,
conn: ProtocolConnection,
) -> Self::Connection {
BscVotesConnection::new(conn)
}
}


112 changes: 112 additions & 0 deletions src/node/network/bsc_protocol/protocol/proto.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use reth_eth_wire::{protocol::Protocol, Capability};

#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BscProtoMessageId {
Capability = 0x00,
Votes = 0x01,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BscProtoMessage;

impl BscProtoMessage {
/// Returns the capability for the `bsc` protocol.
pub fn capability() -> Capability { Capability::new_static("bsc", 1) }

/// Returns the protocol for the `bsc` protocol.
/// Reserve 2 message IDs (0x00 and 0x01) to allow for extension.
pub fn protocol() -> Protocol { Protocol::new(Self::capability(), 2) }
}

#[cfg(test)]
mod tests {
use super::BscProtoMessageId;
use alloy_primitives::{hex, B256, FixedBytes};
use alloy_rlp::{RlpEncodable, RlpDecodable};
use crate::consensus::parlia::vote::{VoteData, VoteEnvelope};
use crate::node::network::votes::VotesPacket;

/// Wrapper struct to match Go's RLP encoding of struct{Votes []*VoteEnvelope}
#[derive(RlpEncodable, RlpDecodable)]
struct VotesWrapper(Vec<VoteEnvelope>);

fn b256(s: &str) -> B256 {
let hex_str = s.trim_start_matches("0x");
B256::from_slice(&hex::decode(hex_str).unwrap())
}

fn bytes48(s: &str) -> FixedBytes<48> {
let v = hex::decode(s).unwrap();
FixedBytes::from_slice(&v)
}

fn bytes96(s: &str) -> FixedBytes<96> {
let v = hex::decode(s).unwrap();
FixedBytes::from_slice(&v)
}

#[test]
fn test_bsc1_votes_messages_rlp_matches_reference() {
let bls_pub = "b32d4d46a7127dcc865f0d30f2ee3dcd5983b686f4e3a9202afc8b608652001c9938906ae1ff1417486096e32511f1bc";
let signatures = [
"91f8a39f99a0b3632d248e635ab0e7b5ff68071fa4def5c2df9f249db7d393c0ba89eb28a65f2a6ba836baddb961b9c312c70a87d130edf944b340649218335c91078cce808da75ff69f673bab3ecdf068c33b1ab147c54298056b19e9cc625d",
"a56cd257f9a4b4830c9bfadaa751c7b1d4e9c6899127c145987a55a7cfa0d1b7d114cb2523ea4e2efee0326cfc5a1cd912eaf7f0c4c0be3193677284533f1709fbd75471a9fb22aea358cdbf2e900628d7c504ce7245e8af6fdd1039dfa3c0bd",
"893f8aff7fc523a7aff006aaba71fbde5f1eee1f4683d405892ffb9ab9282a5dae01054210ff6873ee76f86b9afdef2e128932b26696e3f7e1de7fe7d3fdd1c41273912ff5d1002cba176dbf84e1fe2edb60b114129b89e1329a03f7d9843d04",
"b3585bf55f1e0d8bc0f544a386e6fc4ec37de52330f69b450f579050acda6279a8a38172ed2f01dfdb57cf7dd2a314970aa8a3168234cbd29adfc6a0efd080f57d7e195dafbf5b6db087e8b943aa634f797f8f6d4e5bf04681d7ce2218e59465",
"9366e823b456049cd10ed1aa8f02a58ce2fa4caea7e8c776d6aec9c42f4263b40b0f2d76cc55a598b378381f32ef33520d47e28707027c25e38eb971cddb379e0ded5e814ce70108d65855084a11484fd08447520b7ce79ac1e680020b243747",
"aafd383c9537d750358ea077d45476cf6c1541e717c690ebe5dc5442c2af732fba17b45c60b2c49de94f5121f318b6ae021c56ae06588c6552f1d5b87a166cb8050f287b528b20556033603f6a6649ccec4792c86ae5f6353cf6b7527ac40127",
"90d9dc467a64fe7852b2e0117806409a8f12a036849d69396282088f8d86adb3adcd46b1fde51b4630a6b64c2f8652f30a46609c49b33f50c9f4641e30900ee420f9b81b2ad59a2376dcf4e065ecf832fbf738ad5b73becd2f7add27e6c14d5f",
"8f7d6bc28626dc143208aaa7b97146510f01b1a108dead65f8fddf0ec07835bca91081f9e759656d52dd7d4adaac14220c8c62aa1dd09151fe8000ce4347b100ac496593058ae11b40c74b3049d38076d07301c9dc9586baf93d9c81b4e5d424",
"b6c17077217baa5930fb04e14b6ba7203b3c854e8f1363b48ad753d96db1b4ffed36d60d8b67e86f7f76504f0abefff80ed1e4f11ff192dbfc26a0f059f7b9f66f9e883fef208cc3f58c7ce49d8e854cf8a0e48c59c7407ebfe946cfd62bf3be",
"979b1d101e51731749c72fb160dd1245d69ebd6ca81c0519464d3bca9ec3db493cf4b45ebbb7f60fbd12f0705bd788000558bdedc335cedac2100169965b2794fae8a386b2e9ece86ea6952fadeb8501d9aad00e091713cc06d30c5885c3ecf0",
"8d035b04d8ef6c13117acc1ed9d0586a141f123494f21eeaaead5dd9f623933541b293eef403d2f3e8ede84f9dfe3dc10cbd3fa6bdf3e977dcf2d18a4dca84f8bd9b24fca8e7de4180b9aa6208ad6e756b1c81e98afc8e6994824b5c076857f8",
];

let vote_data_set = [
(0u64, "0x6d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34", 1u64, "0xd0bc67b50915467ada963c35ee00950f664788e47da8139d8c178653171034f1"),
(0, "0x6d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34", 2, "0xc2d18d5a59d65da573f70c4d30448482418894e018b0d189db24ea4fd02d7aa1"),
(0, "0x6d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34", 4, "0xbd1bdaf8a8f5c00c464df2856a9e2ef23b8dcc906e6490d3cd295ebb5eb124c3"),
(0, "0x6d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34", 8, "0x3073782ecabb5ef0673e95962273482347a2c7b30a0a7124c664443d0a43f1e1"),
(0, "0x6d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34", 16, "0xc119833266327fd7e0cd929c6a847ae7d1689df5066dfdde2e52f51c0ecbcc3f"),
(0, "0x6d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34", 32, "0x3b5650bcb98381e463871a15a3f601cdc26843d76f4d3461333d7feae38a1786"),
(0, "0x6d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34", 64, "0x5e38b4d98904178d60d58f5bc1687b0c7df114a51f2007d3ee3e6e732539f130"),
(0, "0x6d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34", 128, "0xa4a64a7d511d3ff6982b5a79e9a485508477b98996c570a220f9daea0c7682f8"),
(0, "0x6d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34", 256, "0xd313672c2653fc13e75a9dafdcee93f444caf2cffb04585d3e306fd15418b7e2"),
(0, "0x6d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34", 512, "0x3c5fe2e5439ca7a7f1a3de7d5c0914c37261451c87654397dd45f207109839ae"),
(0, "0x6d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34", 1024, "0x088eeeb07acff0db3ae2585195e9fd23bdf54b55077cab87d1632b08dd2c043b"),
];

let vote_address = bytes48(bls_pub);
let mut votes: Vec<VoteEnvelope> = Vec::with_capacity(vote_data_set.len());
for (i, (sn, sh, tn, th)) in vote_data_set.into_iter().enumerate() {
let data = VoteData { source_number: sn, source_hash: b256(sh), target_number: tn, target_hash: b256(th) };
let signature = bytes96(signatures[i]);
votes.push(VoteEnvelope { vote_address, signature, data });
}

// Encode using wrapper struct to match Go's RLP of struct{Votes []*VoteEnvelope}
let wrapped = VotesWrapper(votes.clone());
let encoded = alloy_rlp::encode(&wrapped);

let want_hex = "f90973f90970f8d9b0b32d4d46a7127dcc865f0d30f2ee3dcd5983b686f4e3a9202afc8b608652001c9938906ae1ff1417486096e32511f1bcb86091f8a39f99a0b3632d248e635ab0e7b5ff68071fa4def5c2df9f249db7d393c0ba89eb28a65f2a6ba836baddb961b9c312c70a87d130edf944b340649218335c91078cce808da75ff69f673bab3ecdf068c33b1ab147c54298056b19e9cc625df84480a06d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe3401a0d0bc67b50915467ada963c35ee00950f664788e47da8139d8c178653171034f1f8d9b0b32d4d46a7127dcc865f0d30f2ee3dcd5983b686f4e3a9202afc8b608652001c9938906ae1ff1417486096e32511f1bcb860a56cd257f9a4b4830c9bfadaa751c7b1d4e9c6899127c145987a55a7cfa0d1b7d114cb2523ea4e2efee0326cfc5a1cd912eaf7f0c4c0be3193677284533f1709fbd75471a9fb22aea358cdbf2e900628d7c504ce7245e8af6fdd1039dfa3c0bdf84480a06d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe3402a0c2d18d5a59d65da573f70c4d30448482418894e018b0d189db24ea4fd02d7aa1f8d9b0b32d4d46a7127dcc865f0d30f2ee3dcd5983b686f4e3a9202afc8b608652001c9938906ae1ff1417486096e32511f1bcb860893f8aff7fc523a7aff006aaba71fbde5f1eee1f4683d405892ffb9ab9282a5dae01054210ff6873ee76f86b9afdef2e128932b26696e3f7e1de7fe7d3fdd1c41273912ff5d1002cba176dbf84e1fe2edb60b114129b89e1329a03f7d9843d04f84480a06d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe3404a0bd1bdaf8a8f5c00c464df2856a9e2ef23b8dcc906e6490d3cd295ebb5eb124c3f8d9b0b32d4d46a7127dcc865f0d30f2ee3dcd5983b686f4e3a9202afc8b608652001c9938906ae1ff1417486096e32511f1bcb860b3585bf55f1e0d8bc0f544a386e6fc4ec37de52330f69b450f579050acda6279a8a38172ed2f01dfdb57cf7dd2a314970aa8a3168234cbd29adfc6a0efd080f57d7e195dafbf5b6db087e8b943aa634f797f8f6d4e5bf04681d7ce2218e59465f84480a06d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe3408a03073782ecabb5ef0673e95962273482347a2c7b30a0a7124c664443d0a43f1e1f8d9b0b32d4d46a7127dcc865f0d30f2ee3dcd5983b686f4e3a9202afc8b608652001c9938906ae1ff1417486096e32511f1bcb8609366e823b456049cd10ed1aa8f02a58ce2fa4caea7e8c776d6aec9c42f4263b40b0f2d76cc55a598b378381f32ef33520d47e28707027c25e38eb971cddb379e0ded5e814ce70108d65855084a11484fd08447520b7ce79ac1e680020b243747f84480a06d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe3410a0c119833266327fd7e0cd929c6a847ae7d1689df5066dfdde2e52f51c0ecbcc3ff8d9b0b32d4d46a7127dcc865f0d30f2ee3dcd5983b686f4e3a9202afc8b608652001c9938906ae1ff1417486096e32511f1bcb860aafd383c9537d750358ea077d45476cf6c1541e717c690ebe5dc5442c2af732fba17b45c60b2c49de94f5121f318b6ae021c56ae06588c6552f1d5b87a166cb8050f287b528b20556033603f6a6649ccec4792c86ae5f6353cf6b7527ac40127f84480a06d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe3420a03b5650bcb98381e463871a15a3f601cdc26843d76f4d3461333d7feae38a1786f8d9b0b32d4d46a7127dcc865f0d30f2ee3dcd5983b686f4e3a9202afc8b608652001c9938906ae1ff1417486096e32511f1bcb86090d9dc467a64fe7852b2e0117806409a8f12a036849d69396282088f8d86adb3adcd46b1fde51b4630a6b64c2f8652f30a46609c49b33f50c9f4641e30900ee420f9b81b2ad59a2376dcf4e065ecf832fbf738ad5b73becd2f7add27e6c14d5ff84480a06d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe3440a05e38b4d98904178d60d58f5bc1687b0c7df114a51f2007d3ee3e6e732539f130f8dab0b32d4d46a7127dcc865f0d30f2ee3dcd5983b686f4e3a9202afc8b608652001c9938906ae1ff1417486096e32511f1bcb8608f7d6bc28626dc143208aaa7b97146510f01b1a108dead65f8fddf0ec07835bca91081f9e759656d52dd7d4adaac14220c8c62aa1dd09151fe8000ce4347b100ac496593058ae11b40c74b3049d38076d07301c9dc9586baf93d9c81b4e5d424f84580a06d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe348180a0a4a64a7d511d3ff6982b5a79e9a485508477b98996c570a220f9daea0c7682f8f8dbb0b32d4d46a7127dcc865f0d30f2ee3dcd5983b686f4e3a9202afc8b608652001c9938906ae1ff1417486096e32511f1bcb860b6c17077217baa5930fb04e14b6ba7203b3c854e8f1363b48ad753d96db1b4ffed36d60d8b67e86f7f76504f0abefff80ed1e4f11ff192dbfc26a0f059f7b9f66f9e883fef208cc3f58c7ce49d8e854cf8a0e48c59c7407ebfe946cfd62bf3bef84680a06d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34820100a0d313672c2653fc13e75a9dafdcee93f444caf2cffb04585d3e306fd15418b7e2f8dbb0b32d4d46a7127dcc865f0d30f2ee3dcd5983b686f4e3a9202afc8b608652001c9938906ae1ff1417486096e32511f1bcb860979b1d101e51731749c72fb160dd1245d69ebd6ca81c0519464d3bca9ec3db493cf4b45ebbb7f60fbd12f0705bd788000558bdedc335cedac2100169965b2794fae8a386b2e9ece86ea6952fadeb8501d9aad00e091713cc06d30c5885c3ecf0f84680a06d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34820200a03c5fe2e5439ca7a7f1a3de7d5c0914c37261451c87654397dd45f207109839aef8dbb0b32d4d46a7127dcc865f0d30f2ee3dcd5983b686f4e3a9202afc8b608652001c9938906ae1ff1417486096e32511f1bcb8608d035b04d8ef6c13117acc1ed9d0586a141f123494f21eeaaead5dd9f623933541b293eef403d2f3e8ede84f9dfe3dc10cbd3fa6bdf3e977dcf2d18a4dca84f8bd9b24fca8e7de4180b9aa6208ad6e756b1c81e98afc8e6994824b5c076857f8f84680a06d3c66c5357ec91d5c43af47e234a939b22557cbb552dc45bebbceeed90fbe34820400a0088eeeb07acff0db3ae2585195e9fd23bdf54b55077cab87d1632b08dd2c043b";

let want = hex::decode(want_hex).unwrap();
assert_eq!(encoded, want, "RLP-encoded votes packet must match reference");

// For VotesPacket, we expect just the inner votes array (not wrapped)
// since VotesPacket is a newtype around Vec<VoteEnvelope>
let votes_only = alloy_rlp::encode(&votes);
let with_msg_id = {
let mut v = Vec::with_capacity(1 + votes_only.len());
v.push(BscProtoMessageId::Votes as u8);
v.extend_from_slice(&votes_only);
v
};
let encoded_packet = alloy_rlp::encode(&VotesPacket(votes));
assert_eq!(with_msg_id, encoded_packet);
}
}



46 changes: 46 additions & 0 deletions src/node/network/bsc_protocol/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use alloy_primitives::bytes::BytesMut;
use alloy_rlp::Decodable;
use futures::{Stream, StreamExt};
use std::{pin::Pin, task::{Context, Poll}};
use reth_eth_wire::multiplex::ProtocolConnection;

use crate::node::network::votes::{VotesPacket, BscCapPacket, handle_votes_broadcast};
use super::protocol::proto::BscProtoMessageId;

/// Stream that handles incoming BSC protocol messages (currently only Votes).
pub struct BscVotesConnection {
conn: ProtocolConnection,
}

impl BscVotesConnection {
pub fn new(conn: ProtocolConnection) -> Self { Self { conn } }
}

impl Stream for BscVotesConnection {
type Item = BytesMut;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let Some(raw) = futures::ready!(this.conn.poll_next_unpin(cx)) else { return Poll::Ready(None) };
let slice = raw.as_ref();
if slice.is_empty() { return Poll::Pending }
match slice[0] {
x if x == BscProtoMessageId::Votes as u8 => {
if let Ok(packet) = VotesPacket::decode(&mut &slice[..]) {
handle_votes_broadcast(packet);
}
}
x if x == BscProtoMessageId::Capability as u8 => {
// Decode and ignore capability for v1
let _ = BscCapPacket::decode(&mut &slice[..]);
}
_ => {
// Unknown message id; ignore.
}
}
// This protocol does not proactively send responses; keep the connection open.
Poll::Pending
}
}


12 changes: 11 additions & 1 deletion src/node/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ pub mod block_import;
pub mod bootnodes;
pub mod handshake;
pub(crate) mod upgrade_status;
pub(crate) mod votes;
pub(crate) mod bsc_protocol {
pub mod protocol {
pub mod handler;
pub mod proto;
}
pub mod stream;
}
/// BSC `NewBlock` message value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BscNewBlock(pub NewBlock<BscBlock>);
Expand Down Expand Up @@ -215,7 +223,9 @@ impl BscNetworkBuilder {
.with_pow()
.block_import(Box::new(BscBlockImport::new(handle)))
.discovery(discv4)
.eth_rlpx_handshake(Arc::new(BscHandshake::default()));
.eth_rlpx_handshake(Arc::new(BscHandshake::default()))
.add_rlpx_sub_protocol(bsc_protocol::protocol::handler::BscProtocolHandler::default());


let mut network_config = ctx.build_network_config(network_builder);
// Ensure our advertised fork ID matches the fork filter we validate against.
Expand Down
Loading