From 6d7954a219f6c54166a2ed66083b6c6ebed66762 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Sun, 16 Mar 2025 18:22:31 -0700 Subject: [PATCH 1/8] Support receiving verified transactions from the vortexor --- Cargo.lock | 10 +++++ Cargo.toml | 2 + core/Cargo.toml | 1 + core/src/lib.rs | 1 + core/src/tpu.rs | 44 +++++++++++++++++--- core/src/validator.rs | 1 + gossip/src/cluster_info.rs | 26 ++++++++++++ programs/sbf/Cargo.lock | 9 +++++ svm/examples/Cargo.lock | 9 +++++ validator/src/commands/run/args.rs | 8 ++++ validator/src/commands/run/execute.rs | 14 +++++++ vortexor-receiver/Cargo.toml | 26 ++++++++++++ vortexor-receiver/Readme.md | 48 ++++++++++++++++++++++ vortexor-receiver/src/lib.rs | 1 + vortexor-receiver/src/receiver.rs | 58 +++++++++++++++++++++++++++ 15 files changed, 253 insertions(+), 5 deletions(-) create mode 100644 vortexor-receiver/Cargo.toml create mode 100644 vortexor-receiver/Readme.md create mode 100644 vortexor-receiver/src/lib.rs create mode 100644 vortexor-receiver/src/receiver.rs diff --git a/Cargo.lock b/Cargo.lock index dfb692199ab11e..55554c393adfe3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7571,6 +7571,7 @@ dependencies = [ "solana-unified-scheduler-logic", "solana-unified-scheduler-pool", "solana-version", + "solana-vortexor-receiver", "solana-vote", "solana-vote-program", "solana-wen-restart", @@ -11123,6 +11124,15 @@ dependencies = [ "x509-parser", ] +[[package]] +name = "solana-vortexor-receiver" +version = "2.3.0" +dependencies = [ + "assert_matches", + "solana-perf", + "solana-streamer", +] + [[package]] name = "solana-vote" version = "2.3.0" diff --git a/Cargo.toml b/Cargo.toml index e861f53e3a3fd6..10e1377a58d9bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -149,6 +149,7 @@ members = [ "validator", "version", "vortexor", + "vortexor-receiver", "vote", "watchtower", "wen-restart", @@ -566,6 +567,7 @@ solana-type-overrides = { path = "type-overrides", version = "=2.3.0" } solana-udp-client = { path = "udp-client", version = "=2.3.0" } solana-validator-exit = "2.2.1" solana-version = { path = "version", version = "=2.3.0" } +solana-vortexor-receiver = { path = "vortexor-receiver", version = "=2.3.0" } solana-vote = { path = "vote", version = "=2.3.0" } solana-vote-interface = "2.2.3" solana-vote-program = { path = "programs/vote", version = "=2.3.0", default-features = false } diff --git a/core/Cargo.toml b/core/Cargo.toml index ab04280ee6dba2..bc3d6762a8a062 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -96,6 +96,7 @@ solana-transaction-status = { workspace = true } solana-turbine = { workspace = true } solana-unified-scheduler-pool = { workspace = true } solana-version = { workspace = true } +solana-vortexor-receiver = { workspace = true } solana-vote = { workspace = true } solana-vote-program = { workspace = true } solana-wen-restart = { workspace = true } diff --git a/core/src/lib.rs b/core/src/lib.rs index 5c6b6bafdbb001..4272ee04e3b43e 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -41,6 +41,7 @@ mod tpu_entry_notifier; pub mod tvu; pub mod unfrozen_gossip_verified_vote_hashes; pub mod validator; +mod vortexor_receiver_adapter; pub mod vote_simulator; pub mod voting_service; pub mod warm_quic_cache_service; diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 281957f500509e..70eac217f87303 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -23,6 +23,7 @@ use { staked_nodes_updater_service::StakedNodesUpdaterService, tpu_entry_notifier::TpuEntryNotifier, validator::{BlockProductionMethod, GeneratorConfig, TransactionStructure}, + vortexor_receiver_adapter::VortexorReceiverAdapter, }, bytes::Bytes, crossbeam_channel::{bounded, unbounded, Receiver}, @@ -71,11 +72,26 @@ pub struct TpuSockets { pub transactions_quic: Vec, pub transactions_forwards_quic: Vec, pub vote_quic: Vec, + pub vortexor_receivers: Option>, +} + +enum SigVerifier { + Local(SigVerifyStage), + Remote(VortexorReceiverAdapter), +} + +impl SigVerifier { + fn join(self) -> thread::Result<()> { + match self { + SigVerifier::Local(sig_verify_stage) => sig_verify_stage.join(), + SigVerifier::Remote(vortexor_receiver_adapter) => vortexor_receiver_adapter.join(), + } + } } pub struct Tpu { fetch_stage: FetchStage, - sigverify_stage: SigVerifyStage, + sig_verifier: SigVerifier, vote_sigverify_stage: SigVerifyStage, banking_stage: BankingStage, forwarding_stage: JoinHandle<()>, @@ -140,6 +156,7 @@ impl Tpu { transactions_quic: transactions_quic_sockets, transactions_forwards_quic: transactions_forwards_quic_sockets, vote_quic: tpu_vote_quic_sockets, + vortexor_receivers, } = sockets; let (packet_sender, packet_receiver) = unbounded(); @@ -228,12 +245,29 @@ impl Tpu { .unwrap(); let (forward_stage_sender, forward_stage_receiver) = bounded(1024); - let sigverify_stage = { + let sig_verifier = if let Some(vortexor_receivers) = vortexor_receivers { + info!("starting vortexor adapter"); + let sockets = vortexor_receivers.into_iter().map(Arc::new).collect(); + let adapter = VortexorReceiverAdapter::new( + sockets, + Duration::from_millis(5), + tpu_coalesce, + non_vote_sender, + exit.clone(), + ); + SigVerifier::Remote(adapter) + } else { + info!("starting regular sigverify stage"); let verifier = TransactionSigVerifier::new( non_vote_sender, enable_block_production_forwarding.then(|| forward_stage_sender.clone()), ); - SigVerifyStage::new(packet_receiver, verifier, "solSigVerTpu", "tpu-verifier") + SigVerifier::Local(SigVerifyStage::new( + packet_receiver, + verifier, + "solSigVerTpu", + "tpu-verifier", + )) }; let vote_sigverify_stage = { @@ -317,7 +351,7 @@ impl Tpu { ( Self { fetch_stage, - sigverify_stage, + sig_verifier, vote_sigverify_stage, banking_stage, forwarding_stage, @@ -337,7 +371,7 @@ impl Tpu { pub fn join(self) -> thread::Result<()> { let results = vec![ self.fetch_stage.join(), - self.sigverify_stage.join(), + self.sig_verifier.join(), self.vote_sigverify_stage.join(), self.cluster_info_vote_listener.join(), self.banking_stage.join(), diff --git a/core/src/validator.rs b/core/src/validator.rs index b281fd290648cb..64f9444047e071 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1576,6 +1576,7 @@ impl Validator { transactions_quic: node.sockets.tpu_quic, transactions_forwards_quic: node.sockets.tpu_forwards_quic, vote_quic: node.sockets.tpu_vote_quic, + vortexor_receivers: node.sockets.vortexor_receivers, }, &rpc_subscriptions, transaction_status_sender, diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 47af8b70556b73..ffe6d6ac00495e 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2379,6 +2379,7 @@ pub struct Sockets { pub tpu_quic: Vec, pub tpu_forwards_quic: Vec, pub tpu_vote_quic: Vec, + pub vortexor_receivers: Option>, } pub struct NodeConfig { @@ -2387,6 +2388,8 @@ pub struct NodeConfig { pub bind_ip_addr: IpAddr, pub public_tpu_addr: Option, pub public_tpu_forwards_addr: Option, + pub vortexor_receiver_addr: Option, + /// The number of TVU receive sockets to create pub num_tvu_receive_sockets: NonZeroUsize, /// The number of TVU retransmit sockets to create @@ -2540,6 +2543,7 @@ impl Node { tpu_quic, tpu_forwards_quic, tpu_vote_quic, + vortexor_receivers: None, }, } } @@ -2703,6 +2707,7 @@ impl Node { tpu_quic, tpu_forwards_quic, tpu_vote_quic, + vortexor_receivers: None, }, } } @@ -2717,6 +2722,7 @@ impl Node { num_tvu_receive_sockets, num_tvu_retransmit_sockets, num_quic_endpoints, + vortexor_receiver_addr, } = config; let (gossip_port, (gossip, ip_echo)) = @@ -2830,6 +2836,23 @@ impl Node { info.set_serve_repair(QUIC, (addr, serve_repair_quic_port)) .unwrap(); + let vortexor_receivers = vortexor_receiver_addr.map(|vortexor_receiver_addr| { + multi_bind_in_range_with_config( + vortexor_receiver_addr.ip(), + ( + vortexor_receiver_addr.port(), + vortexor_receiver_addr.port() + 1, + ), + socket_config_reuseport, + 32, + ) + .unwrap_or_else(|_| { + panic!("Could not bind to the set vortexor_receiver_addr {vortexor_receiver_addr}") + }) + .1 + }); + + info!("vortexor_receivers is {vortexor_receivers:?}"); trace!("new ContactInfo: {:?}", info); Node { @@ -2853,6 +2876,7 @@ impl Node { tpu_quic, tpu_forwards_quic, tpu_vote_quic, + vortexor_receivers, }, } } @@ -3327,6 +3351,7 @@ mod tests { num_tvu_receive_sockets: MINIMUM_NUM_TVU_RECEIVE_SOCKETS, num_tvu_retransmit_sockets: MINIMUM_NUM_TVU_RECEIVE_SOCKETS, num_quic_endpoints: DEFAULT_NUM_QUIC_ENDPOINTS, + vortexor_receiver_addr: None, }; let node = Node::new_with_external_ip(&solana_pubkey::new_rand(), config); @@ -3351,6 +3376,7 @@ mod tests { num_tvu_receive_sockets: MINIMUM_NUM_TVU_RECEIVE_SOCKETS, num_tvu_retransmit_sockets: MINIMUM_NUM_TVU_RECEIVE_SOCKETS, num_quic_endpoints: DEFAULT_NUM_QUIC_ENDPOINTS, + vortexor_receiver_addr: None, }; let node = Node::new_with_external_ip(&solana_pubkey::new_rand(), config); diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index b1ae37e0a9f32a..9ac1ec536963c3 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -5911,6 +5911,7 @@ dependencies = [ "solana-turbine", "solana-unified-scheduler-pool", "solana-version", + "solana-vortexor-receiver", "solana-vote", "solana-vote-program", "solana-wen-restart", @@ -9161,6 +9162,14 @@ dependencies = [ "solana-serde-varint", ] +[[package]] +name = "solana-vortexor-receiver" +version = "2.3.0" +dependencies = [ + "solana-perf", + "solana-streamer", +] + [[package]] name = "solana-vote" version = "2.3.0" diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index 0043eb8e9ed961..05e08a52724f22 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -5769,6 +5769,7 @@ dependencies = [ "solana-turbine", "solana-unified-scheduler-pool", "solana-version", + "solana-vortexor-receiver", "solana-vote", "solana-vote-program", "solana-wen-restart", @@ -8502,6 +8503,14 @@ dependencies = [ "solana-serde-varint", ] +[[package]] +name = "solana-vortexor-receiver" +version = "2.3.0" +dependencies = [ + "solana-perf", + "solana-streamer", +] + [[package]] name = "solana-vote" version = "2.3.0" diff --git a/validator/src/commands/run/args.rs b/validator/src/commands/run/args.rs index 1c65982b2a9cd4..98dce6f3a5f4ab 100644 --- a/validator/src/commands/run/args.rs +++ b/validator/src/commands/run/args.rs @@ -377,6 +377,14 @@ pub fn add_args<'a>(app: App<'a, 'a>, default_args: &'a DefaultArgs) -> App<'a, --entrypoint or localhostwhen --entrypoint is not provided]", ), ) + .arg( + Arg::with_name("tpu_vortexor_receiver_address") + .long("tpu-vortexor-receiver-address") + .value_name("HOST:PORT") + .takes_value(true) + .validator(solana_net_utils::is_host_port) + .help("TPU Vortexor Receiver address to which verified transaction packet will be forwarded."), + ) .arg( Arg::with_name("public_rpc_addr") .long("public-rpc-address") diff --git a/validator/src/commands/run/execute.rs b/validator/src/commands/run/execute.rs index 9e0dc8daa97691..e21ab5cbc42813 100644 --- a/validator/src/commands/run/execute.rs +++ b/validator/src/commands/run/execute.rs @@ -1140,6 +1140,19 @@ pub fn execute( }) .transpose()?; + let tpu_vortexor_receiver_address = + matches + .value_of("tpu_vortexor_receiver_address") + .map(|tpu_vortexor_receiver_address| { + solana_net_utils::parse_host_port(tpu_vortexor_receiver_address).unwrap_or_else( + |err| { + eprintln!("Failed to parse --tpu-vortexor-receiver-address: {err}"); + exit(1); + }, + ) + }); + + info!("tpu_vortexor_receiver_address is {tpu_vortexor_receiver_address:?}"); let num_quic_endpoints = value_t_or_exit!(matches, "num_quic_endpoints", NonZeroUsize); let tpu_max_connections_per_peer = @@ -1166,6 +1179,7 @@ pub fn execute( num_tvu_receive_sockets: tvu_receive_threads, num_tvu_retransmit_sockets: tvu_retransmit_threads, num_quic_endpoints, + vortexor_receiver_addr: tpu_vortexor_receiver_address, }; let cluster_entrypoints = entrypoint_addrs diff --git a/vortexor-receiver/Cargo.toml b/vortexor-receiver/Cargo.toml new file mode 100644 index 00000000000000..c8843c9ab86c71 --- /dev/null +++ b/vortexor-receiver/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "solana-vortexor-receiver" +description = "Solana TPU Vortexor Receiver" +documentation = "https://docs.rs/solana-vortexor-receiver" +publish = false +version = { workspace = true } +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +edition = { workspace = true } + +[dependencies] +solana-perf = { workspace = true } +solana-streamer = { workspace = true } + +[dev-dependencies] +assert_matches = { workspace = true } +solana-streamer = { workspace = true, features = ["dev-context-only-utils"] } + +[lib] +crate-type = ["lib"] +name = "solana_vortexor_receiver" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/vortexor-receiver/Readme.md b/vortexor-receiver/Readme.md new file mode 100644 index 00000000000000..578f89ed70e89a --- /dev/null +++ b/vortexor-receiver/Readme.md @@ -0,0 +1,48 @@ +The vortexor is a service which can be used to offload receiving transaction from the public, doing signature verifications and deduplications from the core validator which can focus on processing and executing the transactions. The filtered transactions can be forwarded to the validators linked with the vortexor. +This vortexor makes the TPU transaction ingestion more scalable compared to single node solution. + +The archietecture diagram of the Vorexor with the relationship to the validator. + + +--------------------- VORTEXOR ------------------------+ + | | + | +-------------+ +--------------------+ | + TPU --> | | TPU Streamer| -----> | SigVerifier/Dedup | | + /QUIC | +-------------+ +--------------------+ | + | | | | + | v v | + | +----------------+ +------------------------+ | + | | Subscription |<----| VerifiedPacketForwarder| | + | | Management | +------------------------+ | + | +----------------+ | | + +--------------------------------|----------------------+ + ^ | + heart beat/subscriptions | v + +-------------------- AGAVE VALIDATOR ------------------+ + | | + | +----------------+ +-----------------------+ | +Validator Config-> | | Subscription | | VerifiedPacketReceiver| | +Admin RPC | | Management | | | | + | +----------------+ +-----------------------+ | + | | | | + | | v | + | v +-----------+ | + | +--------------------+ | Banking | | + Gossip <--------|--| Gossip/Contact Info| | Stage | | + | +--------------------+ +-----------+ | + +-------------------------------------------------------+ + + +The Vorexor is a new executable which can be deployed on to different nodes from the core Agave validator. +It has the following major components: + +1. The TPU Streamer -- this is built from the existing QUIC based TPU streamer +2. The SigVerify/Dedup -- this is built/refactored from the existing SigVerify component +3. Subscription Management -- This is responsible for managing subscriptions from the validator. + Subscriptions action include subscription for transactions and cancel subscriptions. +4. VerifiedPacketForwarder -- This is responsible for forwarding the verified transaction packets + to the subscribed validators. We target use UDP/QUIC to send transactions to the validators + The validators can use firewall rules to allow transactions only from the vortexor. + +In the validator, there is new component which receives the verified packets sent from the vortexor which directly sends the packets to the banking stage. The validator's Admin RPC is enhanced to configure the peering vortexor. The ContactInfo of the validator is updated with the address of the vortexor when it is linked with the validator. There is periodic heartbeat messages sent from the vortexor to the validator. If there are not transactions sent and no heartbeat messages from the vortexor within configurable timeout window, the validator may decide the vortexor is dead or disconnected it may choose to use another vortexor or use its own native QUI TPU streamer by updating the ContactInfo about TPU address. + +This module implements the VerifiedPacketReceiver. diff --git a/vortexor-receiver/src/lib.rs b/vortexor-receiver/src/lib.rs new file mode 100644 index 00000000000000..4c0db7eaa35a61 --- /dev/null +++ b/vortexor-receiver/src/lib.rs @@ -0,0 +1 @@ +pub mod receiver; diff --git a/vortexor-receiver/src/receiver.rs b/vortexor-receiver/src/receiver.rs new file mode 100644 index 00000000000000..31581620abdb5a --- /dev/null +++ b/vortexor-receiver/src/receiver.rs @@ -0,0 +1,58 @@ +/// This is responsible for receiving the verified and deduplicated transactions +/// from the vortexor and sending down to the banking stage. +use { + solana_perf::{packet::PacketBatchRecycler, recycler::Recycler}, + solana_streamer::streamer::{self, PacketBatchSender, StreamerReceiveStats}, + std::{ + net::UdpSocket, + sync::{atomic::AtomicBool, Arc}, + thread::{self, JoinHandle}, + time::Duration, + }, +}; + +pub struct VerifiedPacketReceiver { + thread_hdls: Vec>, +} + +impl VerifiedPacketReceiver { + pub fn new( + sockets: Vec>, + sender: &PacketBatchSender, + coalesce: Duration, + in_vote_only_mode: Option>, + exit: Arc, + ) -> Self { + let recycler: PacketBatchRecycler = Recycler::warmed(1000, 1024); + + let tpu_stats = Arc::new(StreamerReceiveStats::new("vortexor_receiver")); + + let thread_hdls = sockets + .into_iter() + .enumerate() + .map(|(i, socket)| { + streamer::receiver( + format!("solVtxRcvr{i:02}"), + socket, + exit.clone(), + sender.clone(), + recycler.clone(), + tpu_stats.clone(), + Some(coalesce), + true, + in_vote_only_mode.clone(), + false, // unstaked connections + ) + }) + .collect(); + + Self { thread_hdls } + } + + pub fn join(self) -> thread::Result<()> { + for thread_hdl in self.thread_hdls { + thread_hdl.join()?; + } + Ok(()) + } +} From b656fb678cd324124a5cd1da31e801e7aebbc533 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Sun, 16 Mar 2025 18:41:34 -0700 Subject: [PATCH 2/8] vortexor receiver adapter --- core/src/vortexor_receiver_adapter.rs | 108 ++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 core/src/vortexor_receiver_adapter.rs diff --git a/core/src/vortexor_receiver_adapter.rs b/core/src/vortexor_receiver_adapter.rs new file mode 100644 index 00000000000000..7c68883585880b --- /dev/null +++ b/core/src/vortexor_receiver_adapter.rs @@ -0,0 +1,108 @@ +//! Tempory solution to receive from the receiver and forward the packets to banking stage + +use { + crate::banking_trace::TracedSender, + agave_banking_stage_ingress_types::BankingPacketBatch, + crossbeam_channel::{unbounded, Receiver, RecvTimeoutError}, + solana_perf::packet::PacketBatch, + solana_vortexor_receiver::receiver::VerifiedPacketReceiver, + std::{ + net::UdpSocket, + sync::{atomic::AtomicBool, Arc}, + thread::{self, Builder, JoinHandle}, + time::{Duration, Instant}, + }, +}; + +pub struct VortexorReceiverAdapter { + thread_hdl: JoinHandle<()>, + receiver: VerifiedPacketReceiver, +} + +impl VortexorReceiverAdapter { + pub fn new( + sockets: Vec>, + recv_timeout: Duration, + tpu_coalesce: Duration, + packets_sender: TracedSender, + exit: Arc, + ) -> Self { + let (batch_sender, batch_receiver) = unbounded(); + + let receiver = + VerifiedPacketReceiver::new(sockets, &batch_sender, tpu_coalesce, None, exit.clone()); + + let thread_hdl = Builder::new() + .name("vtxRcvAdptr".to_string()) + .spawn(move || { + Self::recv_send(batch_receiver, recv_timeout, 8, packets_sender); + }) + .unwrap(); + Self { + thread_hdl, + receiver, + } + } + + pub fn join(self) -> thread::Result<()> { + self.thread_hdl.join()?; + self.receiver.join() + } + + fn recv_send( + packet_batch_receiver: Receiver, + recv_timeout: Duration, + batch_size: usize, + traced_sender: TracedSender, + ) { + loop { + match Self::receive_until(packet_batch_receiver.clone(), recv_timeout, batch_size) { + Ok(packet_batch) => { + let count = packet_batch.len(); + // Send out packet batches + match traced_sender.send(packet_batch) { + Ok(_) => { + info!("Sent vortexor batch {count} successfully"); + continue; + } + Err(_err) => { + info!("Failed to send batch {count}"); + break; + } + } + } + Err(err) => match err { + RecvTimeoutError::Timeout => { + continue; + } + RecvTimeoutError::Disconnected => { + break; + } + }, + } + } + } + + /// Receives packet batches from sigverify stage with a timeout + fn receive_until( + packet_batch_receiver: Receiver, + recv_timeout: Duration, + batch_size: usize, + ) -> Result { + let start = Instant::now(); + + let message = packet_batch_receiver.recv_timeout(recv_timeout)?; + let mut packet_batches = Vec::new(); + packet_batches.push(message); + + while let Ok(message) = packet_batch_receiver.try_recv() { + packet_batches.push(message); + + if start.elapsed() >= recv_timeout || packet_batches.len() >= batch_size { + break; + } + } + + Ok(Arc::new(packet_batches)) + } +} From 9858332c1340a7e75d6889aa110a725bcb8ea4d3 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Wed, 9 Apr 2025 13:17:34 -0700 Subject: [PATCH 3/8] Update some comments --- core/src/tpu.rs | 1 + core/src/vortexor_receiver_adapter.rs | 6 ++- vortexor-receiver/Readme.md | 61 ++++++++++++++++----------- vortexor-receiver/src/receiver.rs | 2 +- 4 files changed, 43 insertions(+), 27 deletions(-) diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 70eac217f87303..c7e7a1d5b6dfc0 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -75,6 +75,7 @@ pub struct TpuSockets { pub vortexor_receivers: Option>, } +/// The `SigVerifier` enum is used to determine whether to use a local or remote signature verifier. enum SigVerifier { Local(SigVerifyStage), Remote(VortexorReceiverAdapter), diff --git a/core/src/vortexor_receiver_adapter.rs b/core/src/vortexor_receiver_adapter.rs index 7c68883585880b..14296e86ec30b2 100644 --- a/core/src/vortexor_receiver_adapter.rs +++ b/core/src/vortexor_receiver_adapter.rs @@ -1,4 +1,6 @@ -//! Tempory solution to receive from the receiver and forward the packets to banking stage +//! Vortexor receiver adapter which wraps the VerifiedPacketReceiver +//! to receive packet batches from the remote and sends the packets to the +//! banking stage. use { crate::banking_trace::TracedSender, @@ -83,7 +85,7 @@ impl VortexorReceiverAdapter { } } - /// Receives packet batches from sigverify stage with a timeout + /// Receives packet batches from VerifiedPacketReceiver with a timeout fn receive_until( packet_batch_receiver: Receiver, recv_timeout: Duration, diff --git a/vortexor-receiver/Readme.md b/vortexor-receiver/Readme.md index 578f89ed70e89a..46fd68d509fa8e 100644 --- a/vortexor-receiver/Readme.md +++ b/vortexor-receiver/Readme.md @@ -1,27 +1,53 @@ -The vortexor is a service which can be used to offload receiving transaction from the public, doing signature verifications and deduplications from the core validator which can focus on processing and executing the transactions. The filtered transactions can be forwarded to the validators linked with the vortexor. -This vortexor makes the TPU transaction ingestion more scalable compared to single node solution. +# Introduction +The Vortexor is a service that can offload the tasks of receiving transactions +from the public, performing signature verifications, and deduplications from the +core validator, enabling it to focus on processing and executing the +transactions. The verified and filtered transactions will then be forwarded to +the validators linked with the Vortexor. This setup makes the TPU transaction +ingestion and verification more scalable compared to a single-node solution. -The archietecture diagram of the Vorexor with the relationship to the validator. +This module implements the VerifiedPacketReceiver in the below architecture +which encapsulates the functionality of receiving the verified packet batches +from the vortexor. In the first impelementation, we use UDP to receive the +verified packets from the vortexor. It is designed to support other protocol +option such as using QUIC. +# Architecture +Figure 1 describes the architecture diagram of the Vortexor and its +relationship with the validator. + + +---------------------+ + | Solana | + | RPC / Web Socket | + | Service | + +---------------------+ + | + v +--------------------- VORTEXOR ------------------------+ - | | + | | | + | +------------------+ | + | | StakedKeyUpdater | | + | +------------------+ | + | | | + | v | | +-------------+ +--------------------+ | TPU --> | | TPU Streamer| -----> | SigVerifier/Dedup | | /QUIC | +-------------+ +--------------------+ | - | | | | - | v v | + | | | | + | v v | | +----------------+ +------------------------+ | | | Subscription |<----| VerifiedPacketForwarder| | | | Management | +------------------------+ | | +----------------+ | | +--------------------------------|----------------------+ - ^ | - heart beat/subscriptions | v + ^ | (UDP/QUIC) + Heartbeat/subscriptions | | + | v +-------------------- AGAVE VALIDATOR ------------------+ | | | +----------------+ +-----------------------+ | -Validator Config-> | | Subscription | | VerifiedPacketReceiver| | -Admin RPC | | Management | | | | + Config-> | | Subscription | | VerifiedPacketReceiver| | + Admin RPC | | Management | | | | | +----------------+ +-----------------------+ | | | | | | | v | @@ -31,18 +57,5 @@ Admin RPC | | Management | | | | | +--------------------+ +-----------+ | +-------------------------------------------------------+ + Figure 1. -The Vorexor is a new executable which can be deployed on to different nodes from the core Agave validator. -It has the following major components: - -1. The TPU Streamer -- this is built from the existing QUIC based TPU streamer -2. The SigVerify/Dedup -- this is built/refactored from the existing SigVerify component -3. Subscription Management -- This is responsible for managing subscriptions from the validator. - Subscriptions action include subscription for transactions and cancel subscriptions. -4. VerifiedPacketForwarder -- This is responsible for forwarding the verified transaction packets - to the subscribed validators. We target use UDP/QUIC to send transactions to the validators - The validators can use firewall rules to allow transactions only from the vortexor. - -In the validator, there is new component which receives the verified packets sent from the vortexor which directly sends the packets to the banking stage. The validator's Admin RPC is enhanced to configure the peering vortexor. The ContactInfo of the validator is updated with the address of the vortexor when it is linked with the validator. There is periodic heartbeat messages sent from the vortexor to the validator. If there are not transactions sent and no heartbeat messages from the vortexor within configurable timeout window, the validator may decide the vortexor is dead or disconnected it may choose to use another vortexor or use its own native QUI TPU streamer by updating the ContactInfo about TPU address. - -This module implements the VerifiedPacketReceiver. diff --git a/vortexor-receiver/src/receiver.rs b/vortexor-receiver/src/receiver.rs index 31581620abdb5a..4a8c3490498591 100644 --- a/vortexor-receiver/src/receiver.rs +++ b/vortexor-receiver/src/receiver.rs @@ -41,7 +41,7 @@ impl VerifiedPacketReceiver { Some(coalesce), true, in_vote_only_mode.clone(), - false, // unstaked connections + false, // is_staked_service ) }) .collect(); From 17b2a1b3db9e0fe9f5fdd03cee2f77505d802f7e Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Wed, 9 Apr 2025 15:52:40 -0700 Subject: [PATCH 4/8] Update some comments --- vortexor-receiver/Readme.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/vortexor-receiver/Readme.md b/vortexor-receiver/Readme.md index 46fd68d509fa8e..90834f86469ee3 100644 --- a/vortexor-receiver/Readme.md +++ b/vortexor-receiver/Readme.md @@ -57,5 +57,4 @@ relationship with the validator. | +--------------------+ +-----------+ | +-------------------------------------------------------+ - Figure 1. - + Figure 1. \ No newline at end of file From f5c69ee30b7415e5412efe09d2c34e4ff06f51e5 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Wed, 9 Apr 2025 16:23:33 -0700 Subject: [PATCH 5/8] Gaurd against 0 packets --- vortexor/src/sender.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/vortexor/src/sender.rs b/vortexor/src/sender.rs index c5526cce311fea..89812ee8b7607d 100644 --- a/vortexor/src/sender.rs +++ b/vortexor/src/sender.rs @@ -83,7 +83,9 @@ impl PacketBatchSender { for batch in &packet_batches { for packet_batch in batch.iter() { for packet in packet_batch { - packets.push(packet.data(0..).unwrap()); + if let Some(data) = packet.data(0..) { + packets.push(data); + } } } } From ab02dd8e806517c83d6aef72df4aad90279f61e6 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Wed, 9 Apr 2025 16:34:57 -0700 Subject: [PATCH 6/8] update some debug messages --- core/src/vortexor_receiver_adapter.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/vortexor_receiver_adapter.rs b/core/src/vortexor_receiver_adapter.rs index 14296e86ec30b2..2831ebd9a2bc51 100644 --- a/core/src/vortexor_receiver_adapter.rs +++ b/core/src/vortexor_receiver_adapter.rs @@ -64,11 +64,11 @@ impl VortexorReceiverAdapter { // Send out packet batches match traced_sender.send(packet_batch) { Ok(_) => { - info!("Sent vortexor batch {count} successfully"); + trace!("Sent batch: {count} received from vortexor successfully"); continue; } - Err(_err) => { - info!("Failed to send batch {count}"); + Err(err) => { + debug!("Failed to send batch {count} {err:?}"); break; } } From 9931dfc85dfe68b50742009f3718b2505963ae90 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Fri, 11 Apr 2025 13:15:35 -0700 Subject: [PATCH 7/8] disable regular tpu streamers if vortexor is in-place --- core/src/tpu.rs | 92 +++++++++++++++++++++++++++++-------------------- 1 file changed, 55 insertions(+), 37 deletions(-) diff --git a/core/src/tpu.rs b/core/src/tpu.rs index c7e7a1d5b6dfc0..b890e6304a7739 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -98,8 +98,8 @@ pub struct Tpu { forwarding_stage: JoinHandle<()>, cluster_info_vote_listener: ClusterInfoVoteListener, broadcast_stage: BroadcastStage, - tpu_quic_t: thread::JoinHandle<()>, - tpu_forwards_quic_t: thread::JoinHandle<()>, + tpu_quic_t: Option>, + tpu_forwards_quic_t: Option>, tpu_entry_notifier: Option, staked_nodes_updater_service: StakedNodesUpdaterService, tracer_thread_hdl: TracerThread, @@ -211,39 +211,49 @@ impl Tpu { ) .unwrap(); - // Streamer for TPU - let SpawnServerResult { - endpoints: _, - thread: tpu_quic_t, - key_updater, - } = spawn_server_multi( - "solQuicTpu", - "quic_streamer_tpu", - transactions_quic_sockets, - keypair, - packet_sender, - exit.clone(), - staked_nodes.clone(), - tpu_quic_server_config, - ) - .unwrap(); + let (tpu_quic_t, key_updater) = if vortexor_receivers.is_none() { + // Streamer for TPU + let SpawnServerResult { + endpoints: _, + thread: tpu_quic_t, + key_updater, + } = spawn_server_multi( + "solQuicTpu", + "quic_streamer_tpu", + transactions_quic_sockets, + keypair, + packet_sender, + exit.clone(), + staked_nodes.clone(), + tpu_quic_server_config, + ) + .unwrap(); + (Some(tpu_quic_t), Some(key_updater)) + } else { + (None, None) + }; - // Streamer for TPU forward - let SpawnServerResult { - endpoints: _, - thread: tpu_forwards_quic_t, - key_updater: forwards_key_updater, - } = spawn_server_multi( - "solQuicTpuFwd", - "quic_streamer_tpu_forwards", - transactions_forwards_quic_sockets, - keypair, - forwarded_packet_sender, - exit.clone(), - staked_nodes.clone(), - tpu_fwd_quic_server_config, - ) - .unwrap(); + let (tpu_forwards_quic_t, forwards_key_updater) = if vortexor_receivers.is_none() { + // Streamer for TPU forward + let SpawnServerResult { + endpoints: _, + thread: tpu_forwards_quic_t, + key_updater: forwards_key_updater, + } = spawn_server_multi( + "solQuicTpuFwd", + "quic_streamer_tpu_forwards", + transactions_forwards_quic_sockets, + keypair, + forwarded_packet_sender, + exit.clone(), + staked_nodes.clone(), + tpu_fwd_quic_server_config, + ) + .unwrap(); + (Some(tpu_forwards_quic_t), Some(forwards_key_updater)) + } else { + (None, None) + }; let (forward_stage_sender, forward_stage_receiver) = bounded(1024); let sig_verifier = if let Some(vortexor_receivers) = vortexor_receivers { @@ -349,6 +359,14 @@ impl Tpu { turbine_quic_endpoint_sender, ); + let mut key_updaters: Vec> = Vec::new(); + if let Some(key_updater) = key_updater { + key_updaters.push(key_updater); + } + if let Some(forwards_key_updater) = forwards_key_updater { + key_updaters.push(forwards_key_updater); + } + key_updaters.push(vote_streamer_key_updater); ( Self { fetch_stage, @@ -365,7 +383,7 @@ impl Tpu { tracer_thread_hdl, tpu_vote_quic_t, }, - vec![key_updater, forwards_key_updater, vote_streamer_key_updater], + key_updaters, ) } @@ -378,8 +396,8 @@ impl Tpu { self.banking_stage.join(), self.forwarding_stage.join(), self.staked_nodes_updater_service.join(), - self.tpu_quic_t.join(), - self.tpu_forwards_quic_t.join(), + self.tpu_quic_t.map_or(Ok(()), |t| t.join()), + self.tpu_forwards_quic_t.map_or(Ok(()), |t| t.join()), self.tpu_vote_quic_t.join(), ]; let broadcast_result = self.broadcast_stage.join(); From d956aae962f7b1063432fd96432ce6540384e859 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Fri, 11 Apr 2025 15:34:59 -0700 Subject: [PATCH 8/8] disable regular tpu streamers if vortexor is in-place --- core/src/tpu.rs | 1 + core/src/vortexor_receiver_adapter.rs | 43 ++++++++++++++++++++------- 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/core/src/tpu.rs b/core/src/tpu.rs index b890e6304a7739..cc18627cf31ea4 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -264,6 +264,7 @@ impl Tpu { Duration::from_millis(5), tpu_coalesce, non_vote_sender, + enable_block_production_forwarding.then(|| forward_stage_sender.clone()), exit.clone(), ); SigVerifier::Remote(adapter) diff --git a/core/src/vortexor_receiver_adapter.rs b/core/src/vortexor_receiver_adapter.rs index 2831ebd9a2bc51..099e2d087055ba 100644 --- a/core/src/vortexor_receiver_adapter.rs +++ b/core/src/vortexor_receiver_adapter.rs @@ -5,7 +5,7 @@ use { crate::banking_trace::TracedSender, agave_banking_stage_ingress_types::BankingPacketBatch, - crossbeam_channel::{unbounded, Receiver, RecvTimeoutError}, + crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, solana_perf::packet::PacketBatch, solana_vortexor_receiver::receiver::VerifiedPacketReceiver, std::{ @@ -16,6 +16,21 @@ use { }, }; +// Macro to send the packet batch to the sender +macro_rules! send { + ($sender:expr, $batch:expr, $count:expr) => { + match $sender.send($batch) { + Ok(_) => { + trace!("Sent batch: {} received from vortexor successfully", $count); + } + Err(err) => { + debug!("Failed to send batch {} error: {:?}", $count, err); + break; + } + } + }; +} + pub struct VortexorReceiverAdapter { thread_hdl: JoinHandle<()>, receiver: VerifiedPacketReceiver, @@ -27,6 +42,7 @@ impl VortexorReceiverAdapter { recv_timeout: Duration, tpu_coalesce: Duration, packets_sender: TracedSender, + forward_stage_sender: Option>, exit: Arc, ) -> Self { let (batch_sender, batch_receiver) = unbounded(); @@ -37,7 +53,13 @@ impl VortexorReceiverAdapter { let thread_hdl = Builder::new() .name("vtxRcvAdptr".to_string()) .spawn(move || { - Self::recv_send(batch_receiver, recv_timeout, 8, packets_sender); + Self::recv_send( + batch_receiver, + recv_timeout, + 8, + packets_sender, + forward_stage_sender, + ); }) .unwrap(); Self { @@ -56,21 +78,20 @@ impl VortexorReceiverAdapter { recv_timeout: Duration, batch_size: usize, traced_sender: TracedSender, + forward_stage_sender: Option>, ) { loop { match Self::receive_until(packet_batch_receiver.clone(), recv_timeout, batch_size) { Ok(packet_batch) => { let count = packet_batch.len(); // Send out packet batches - match traced_sender.send(packet_batch) { - Ok(_) => { - trace!("Sent batch: {count} received from vortexor successfully"); - continue; - } - Err(err) => { - debug!("Failed to send batch {count} {err:?}"); - break; - } + if let Some(forward_stage_sender) = &forward_stage_sender { + send!(traced_sender, packet_batch.clone(), count); + // Send out packet batches to forward stage + let _ = forward_stage_sender + .try_send((packet_batch, false /* reject non-vote */)); + } else { + send!(traced_sender, packet_batch, count); } } Err(err) => match err {