diff --git a/Cargo.lock b/Cargo.lock index dfb692199ab11e..55554c393adfe3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7571,6 +7571,7 @@ dependencies = [ "solana-unified-scheduler-logic", "solana-unified-scheduler-pool", "solana-version", + "solana-vortexor-receiver", "solana-vote", "solana-vote-program", "solana-wen-restart", @@ -11123,6 +11124,15 @@ dependencies = [ "x509-parser", ] +[[package]] +name = "solana-vortexor-receiver" +version = "2.3.0" +dependencies = [ + "assert_matches", + "solana-perf", + "solana-streamer", +] + [[package]] name = "solana-vote" version = "2.3.0" diff --git a/Cargo.toml b/Cargo.toml index e861f53e3a3fd6..10e1377a58d9bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -149,6 +149,7 @@ members = [ "validator", "version", "vortexor", + "vortexor-receiver", "vote", "watchtower", "wen-restart", @@ -566,6 +567,7 @@ solana-type-overrides = { path = "type-overrides", version = "=2.3.0" } solana-udp-client = { path = "udp-client", version = "=2.3.0" } solana-validator-exit = "2.2.1" solana-version = { path = "version", version = "=2.3.0" } +solana-vortexor-receiver = { path = "vortexor-receiver", version = "=2.3.0" } solana-vote = { path = "vote", version = "=2.3.0" } solana-vote-interface = "2.2.3" solana-vote-program = { path = "programs/vote", version = "=2.3.0", default-features = false } diff --git a/core/Cargo.toml b/core/Cargo.toml index ab04280ee6dba2..bc3d6762a8a062 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -96,6 +96,7 @@ solana-transaction-status = { workspace = true } solana-turbine = { workspace = true } solana-unified-scheduler-pool = { workspace = true } solana-version = { workspace = true } +solana-vortexor-receiver = { workspace = true } solana-vote = { workspace = true } solana-vote-program = { workspace = true } solana-wen-restart = { workspace = true } diff --git a/core/src/lib.rs b/core/src/lib.rs index 5c6b6bafdbb001..4272ee04e3b43e 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -41,6 +41,7 @@ mod tpu_entry_notifier; pub mod tvu; pub mod unfrozen_gossip_verified_vote_hashes; pub mod validator; +mod vortexor_receiver_adapter; pub mod vote_simulator; pub mod voting_service; pub mod warm_quic_cache_service; diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 281957f500509e..cc18627cf31ea4 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -23,6 +23,7 @@ use { staked_nodes_updater_service::StakedNodesUpdaterService, tpu_entry_notifier::TpuEntryNotifier, validator::{BlockProductionMethod, GeneratorConfig, TransactionStructure}, + vortexor_receiver_adapter::VortexorReceiverAdapter, }, bytes::Bytes, crossbeam_channel::{bounded, unbounded, Receiver}, @@ -71,18 +72,34 @@ pub struct TpuSockets { pub transactions_quic: Vec, pub transactions_forwards_quic: Vec, pub vote_quic: Vec, + pub vortexor_receivers: Option>, +} + +/// The `SigVerifier` enum is used to determine whether to use a local or remote signature verifier. +enum SigVerifier { + Local(SigVerifyStage), + Remote(VortexorReceiverAdapter), +} + +impl SigVerifier { + fn join(self) -> thread::Result<()> { + match self { + SigVerifier::Local(sig_verify_stage) => sig_verify_stage.join(), + SigVerifier::Remote(vortexor_receiver_adapter) => vortexor_receiver_adapter.join(), + } + } } pub struct Tpu { fetch_stage: FetchStage, - sigverify_stage: SigVerifyStage, + sig_verifier: SigVerifier, vote_sigverify_stage: SigVerifyStage, banking_stage: BankingStage, forwarding_stage: JoinHandle<()>, cluster_info_vote_listener: ClusterInfoVoteListener, broadcast_stage: BroadcastStage, - tpu_quic_t: thread::JoinHandle<()>, - tpu_forwards_quic_t: thread::JoinHandle<()>, + tpu_quic_t: Option>, + tpu_forwards_quic_t: Option>, tpu_entry_notifier: Option, staked_nodes_updater_service: StakedNodesUpdaterService, tracer_thread_hdl: TracerThread, @@ -140,6 +157,7 @@ impl Tpu { transactions_quic: transactions_quic_sockets, transactions_forwards_quic: transactions_forwards_quic_sockets, vote_quic: tpu_vote_quic_sockets, + vortexor_receivers, } = sockets; let (packet_sender, packet_receiver) = unbounded(); @@ -193,47 +211,75 @@ impl Tpu { ) .unwrap(); - // Streamer for TPU - let SpawnServerResult { - endpoints: _, - thread: tpu_quic_t, - key_updater, - } = spawn_server_multi( - "solQuicTpu", - "quic_streamer_tpu", - transactions_quic_sockets, - keypair, - packet_sender, - exit.clone(), - staked_nodes.clone(), - tpu_quic_server_config, - ) - .unwrap(); + let (tpu_quic_t, key_updater) = if vortexor_receivers.is_none() { + // Streamer for TPU + let SpawnServerResult { + endpoints: _, + thread: tpu_quic_t, + key_updater, + } = spawn_server_multi( + "solQuicTpu", + "quic_streamer_tpu", + transactions_quic_sockets, + keypair, + packet_sender, + exit.clone(), + staked_nodes.clone(), + tpu_quic_server_config, + ) + .unwrap(); + (Some(tpu_quic_t), Some(key_updater)) + } else { + (None, None) + }; - // Streamer for TPU forward - let SpawnServerResult { - endpoints: _, - thread: tpu_forwards_quic_t, - key_updater: forwards_key_updater, - } = spawn_server_multi( - "solQuicTpuFwd", - "quic_streamer_tpu_forwards", - transactions_forwards_quic_sockets, - keypair, - forwarded_packet_sender, - exit.clone(), - staked_nodes.clone(), - tpu_fwd_quic_server_config, - ) - .unwrap(); + let (tpu_forwards_quic_t, forwards_key_updater) = if vortexor_receivers.is_none() { + // Streamer for TPU forward + let SpawnServerResult { + endpoints: _, + thread: tpu_forwards_quic_t, + key_updater: forwards_key_updater, + } = spawn_server_multi( + "solQuicTpuFwd", + "quic_streamer_tpu_forwards", + transactions_forwards_quic_sockets, + keypair, + forwarded_packet_sender, + exit.clone(), + staked_nodes.clone(), + tpu_fwd_quic_server_config, + ) + .unwrap(); + (Some(tpu_forwards_quic_t), Some(forwards_key_updater)) + } else { + (None, None) + }; let (forward_stage_sender, forward_stage_receiver) = bounded(1024); - let sigverify_stage = { + let sig_verifier = if let Some(vortexor_receivers) = vortexor_receivers { + info!("starting vortexor adapter"); + let sockets = vortexor_receivers.into_iter().map(Arc::new).collect(); + let adapter = VortexorReceiverAdapter::new( + sockets, + Duration::from_millis(5), + tpu_coalesce, + non_vote_sender, + enable_block_production_forwarding.then(|| forward_stage_sender.clone()), + exit.clone(), + ); + SigVerifier::Remote(adapter) + } else { + info!("starting regular sigverify stage"); let verifier = TransactionSigVerifier::new( non_vote_sender, enable_block_production_forwarding.then(|| forward_stage_sender.clone()), ); - SigVerifyStage::new(packet_receiver, verifier, "solSigVerTpu", "tpu-verifier") + SigVerifier::Local(SigVerifyStage::new( + packet_receiver, + verifier, + "solSigVerTpu", + "tpu-verifier", + )) }; let vote_sigverify_stage = { @@ -314,10 +360,18 @@ impl Tpu { turbine_quic_endpoint_sender, ); + let mut key_updaters: Vec> = Vec::new(); + if let Some(key_updater) = key_updater { + key_updaters.push(key_updater); + } + if let Some(forwards_key_updater) = forwards_key_updater { + key_updaters.push(forwards_key_updater); + } + key_updaters.push(vote_streamer_key_updater); ( Self { fetch_stage, - sigverify_stage, + sig_verifier, vote_sigverify_stage, banking_stage, forwarding_stage, @@ -330,21 +384,21 @@ impl Tpu { tracer_thread_hdl, tpu_vote_quic_t, }, - vec![key_updater, forwards_key_updater, vote_streamer_key_updater], + key_updaters, ) } pub fn join(self) -> thread::Result<()> { let results = vec![ self.fetch_stage.join(), - self.sigverify_stage.join(), + self.sig_verifier.join(), self.vote_sigverify_stage.join(), self.cluster_info_vote_listener.join(), self.banking_stage.join(), self.forwarding_stage.join(), self.staked_nodes_updater_service.join(), - self.tpu_quic_t.join(), - self.tpu_forwards_quic_t.join(), + self.tpu_quic_t.map_or(Ok(()), |t| t.join()), + self.tpu_forwards_quic_t.map_or(Ok(()), |t| t.join()), self.tpu_vote_quic_t.join(), ]; let broadcast_result = self.broadcast_stage.join(); diff --git a/core/src/validator.rs b/core/src/validator.rs index b281fd290648cb..64f9444047e071 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1576,6 +1576,7 @@ impl Validator { transactions_quic: node.sockets.tpu_quic, transactions_forwards_quic: node.sockets.tpu_forwards_quic, vote_quic: node.sockets.tpu_vote_quic, + vortexor_receivers: node.sockets.vortexor_receivers, }, &rpc_subscriptions, transaction_status_sender, diff --git a/core/src/vortexor_receiver_adapter.rs b/core/src/vortexor_receiver_adapter.rs new file mode 100644 index 00000000000000..099e2d087055ba --- /dev/null +++ b/core/src/vortexor_receiver_adapter.rs @@ -0,0 +1,131 @@ +//! Vortexor receiver adapter which wraps the VerifiedPacketReceiver +//! to receive packet batches from the remote and sends the packets to the +//! banking stage. + +use { + crate::banking_trace::TracedSender, + agave_banking_stage_ingress_types::BankingPacketBatch, + crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, + solana_perf::packet::PacketBatch, + solana_vortexor_receiver::receiver::VerifiedPacketReceiver, + std::{ + net::UdpSocket, + sync::{atomic::AtomicBool, Arc}, + thread::{self, Builder, JoinHandle}, + time::{Duration, Instant}, + }, +}; + +// Macro to send the packet batch to the sender +macro_rules! send { + ($sender:expr, $batch:expr, $count:expr) => { + match $sender.send($batch) { + Ok(_) => { + trace!("Sent batch: {} received from vortexor successfully", $count); + } + Err(err) => { + debug!("Failed to send batch {} error: {:?}", $count, err); + break; + } + } + }; +} + +pub struct VortexorReceiverAdapter { + thread_hdl: JoinHandle<()>, + receiver: VerifiedPacketReceiver, +} + +impl VortexorReceiverAdapter { + pub fn new( + sockets: Vec>, + recv_timeout: Duration, + tpu_coalesce: Duration, + packets_sender: TracedSender, + forward_stage_sender: Option>, + exit: Arc, + ) -> Self { + let (batch_sender, batch_receiver) = unbounded(); + + let receiver = + VerifiedPacketReceiver::new(sockets, &batch_sender, tpu_coalesce, None, exit.clone()); + + let thread_hdl = Builder::new() + .name("vtxRcvAdptr".to_string()) + .spawn(move || { + Self::recv_send( + batch_receiver, + recv_timeout, + 8, + packets_sender, + forward_stage_sender, + ); + }) + .unwrap(); + Self { + thread_hdl, + receiver, + } + } + + pub fn join(self) -> thread::Result<()> { + self.thread_hdl.join()?; + self.receiver.join() + } + + fn recv_send( + packet_batch_receiver: Receiver, + recv_timeout: Duration, + batch_size: usize, + traced_sender: TracedSender, + forward_stage_sender: Option>, + ) { + loop { + match Self::receive_until(packet_batch_receiver.clone(), recv_timeout, batch_size) { + Ok(packet_batch) => { + let count = packet_batch.len(); + // Send out packet batches + if let Some(forward_stage_sender) = &forward_stage_sender { + send!(traced_sender, packet_batch.clone(), count); + // Send out packet batches to forward stage + let _ = forward_stage_sender + .try_send((packet_batch, false /* reject non-vote */)); + } else { + send!(traced_sender, packet_batch, count); + } + } + Err(err) => match err { + RecvTimeoutError::Timeout => { + continue; + } + RecvTimeoutError::Disconnected => { + break; + } + }, + } + } + } + + /// Receives packet batches from VerifiedPacketReceiver with a timeout + fn receive_until( + packet_batch_receiver: Receiver, + recv_timeout: Duration, + batch_size: usize, + ) -> Result { + let start = Instant::now(); + + let message = packet_batch_receiver.recv_timeout(recv_timeout)?; + let mut packet_batches = Vec::new(); + packet_batches.push(message); + + while let Ok(message) = packet_batch_receiver.try_recv() { + packet_batches.push(message); + + if start.elapsed() >= recv_timeout || packet_batches.len() >= batch_size { + break; + } + } + + Ok(Arc::new(packet_batches)) + } +} diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 47af8b70556b73..ffe6d6ac00495e 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2379,6 +2379,7 @@ pub struct Sockets { pub tpu_quic: Vec, pub tpu_forwards_quic: Vec, pub tpu_vote_quic: Vec, + pub vortexor_receivers: Option>, } pub struct NodeConfig { @@ -2387,6 +2388,8 @@ pub struct NodeConfig { pub bind_ip_addr: IpAddr, pub public_tpu_addr: Option, pub public_tpu_forwards_addr: Option, + pub vortexor_receiver_addr: Option, + /// The number of TVU receive sockets to create pub num_tvu_receive_sockets: NonZeroUsize, /// The number of TVU retransmit sockets to create @@ -2540,6 +2543,7 @@ impl Node { tpu_quic, tpu_forwards_quic, tpu_vote_quic, + vortexor_receivers: None, }, } } @@ -2703,6 +2707,7 @@ impl Node { tpu_quic, tpu_forwards_quic, tpu_vote_quic, + vortexor_receivers: None, }, } } @@ -2717,6 +2722,7 @@ impl Node { num_tvu_receive_sockets, num_tvu_retransmit_sockets, num_quic_endpoints, + vortexor_receiver_addr, } = config; let (gossip_port, (gossip, ip_echo)) = @@ -2830,6 +2836,23 @@ impl Node { info.set_serve_repair(QUIC, (addr, serve_repair_quic_port)) .unwrap(); + let vortexor_receivers = vortexor_receiver_addr.map(|vortexor_receiver_addr| { + multi_bind_in_range_with_config( + vortexor_receiver_addr.ip(), + ( + vortexor_receiver_addr.port(), + vortexor_receiver_addr.port() + 1, + ), + socket_config_reuseport, + 32, + ) + .unwrap_or_else(|_| { + panic!("Could not bind to the set vortexor_receiver_addr {vortexor_receiver_addr}") + }) + .1 + }); + + info!("vortexor_receivers is {vortexor_receivers:?}"); trace!("new ContactInfo: {:?}", info); Node { @@ -2853,6 +2876,7 @@ impl Node { tpu_quic, tpu_forwards_quic, tpu_vote_quic, + vortexor_receivers, }, } } @@ -3327,6 +3351,7 @@ mod tests { num_tvu_receive_sockets: MINIMUM_NUM_TVU_RECEIVE_SOCKETS, num_tvu_retransmit_sockets: MINIMUM_NUM_TVU_RECEIVE_SOCKETS, num_quic_endpoints: DEFAULT_NUM_QUIC_ENDPOINTS, + vortexor_receiver_addr: None, }; let node = Node::new_with_external_ip(&solana_pubkey::new_rand(), config); @@ -3351,6 +3376,7 @@ mod tests { num_tvu_receive_sockets: MINIMUM_NUM_TVU_RECEIVE_SOCKETS, num_tvu_retransmit_sockets: MINIMUM_NUM_TVU_RECEIVE_SOCKETS, num_quic_endpoints: DEFAULT_NUM_QUIC_ENDPOINTS, + vortexor_receiver_addr: None, }; let node = Node::new_with_external_ip(&solana_pubkey::new_rand(), config); diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index b1ae37e0a9f32a..9ac1ec536963c3 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -5911,6 +5911,7 @@ dependencies = [ "solana-turbine", "solana-unified-scheduler-pool", "solana-version", + "solana-vortexor-receiver", "solana-vote", "solana-vote-program", "solana-wen-restart", @@ -9161,6 +9162,14 @@ dependencies = [ "solana-serde-varint", ] +[[package]] +name = "solana-vortexor-receiver" +version = "2.3.0" +dependencies = [ + "solana-perf", + "solana-streamer", +] + [[package]] name = "solana-vote" version = "2.3.0" diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index 0043eb8e9ed961..05e08a52724f22 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -5769,6 +5769,7 @@ dependencies = [ "solana-turbine", "solana-unified-scheduler-pool", "solana-version", + "solana-vortexor-receiver", "solana-vote", "solana-vote-program", "solana-wen-restart", @@ -8502,6 +8503,14 @@ dependencies = [ "solana-serde-varint", ] +[[package]] +name = "solana-vortexor-receiver" +version = "2.3.0" +dependencies = [ + "solana-perf", + "solana-streamer", +] + [[package]] name = "solana-vote" version = "2.3.0" diff --git a/validator/src/commands/run/args.rs b/validator/src/commands/run/args.rs index 1c65982b2a9cd4..98dce6f3a5f4ab 100644 --- a/validator/src/commands/run/args.rs +++ b/validator/src/commands/run/args.rs @@ -377,6 +377,14 @@ pub fn add_args<'a>(app: App<'a, 'a>, default_args: &'a DefaultArgs) -> App<'a, --entrypoint or localhostwhen --entrypoint is not provided]", ), ) + .arg( + Arg::with_name("tpu_vortexor_receiver_address") + .long("tpu-vortexor-receiver-address") + .value_name("HOST:PORT") + .takes_value(true) + .validator(solana_net_utils::is_host_port) + .help("TPU Vortexor Receiver address to which verified transaction packet will be forwarded."), + ) .arg( Arg::with_name("public_rpc_addr") .long("public-rpc-address") diff --git a/validator/src/commands/run/execute.rs b/validator/src/commands/run/execute.rs index 9e0dc8daa97691..e21ab5cbc42813 100644 --- a/validator/src/commands/run/execute.rs +++ b/validator/src/commands/run/execute.rs @@ -1140,6 +1140,19 @@ pub fn execute( }) .transpose()?; + let tpu_vortexor_receiver_address = + matches + .value_of("tpu_vortexor_receiver_address") + .map(|tpu_vortexor_receiver_address| { + solana_net_utils::parse_host_port(tpu_vortexor_receiver_address).unwrap_or_else( + |err| { + eprintln!("Failed to parse --tpu-vortexor-receiver-address: {err}"); + exit(1); + }, + ) + }); + + info!("tpu_vortexor_receiver_address is {tpu_vortexor_receiver_address:?}"); let num_quic_endpoints = value_t_or_exit!(matches, "num_quic_endpoints", NonZeroUsize); let tpu_max_connections_per_peer = @@ -1166,6 +1179,7 @@ pub fn execute( num_tvu_receive_sockets: tvu_receive_threads, num_tvu_retransmit_sockets: tvu_retransmit_threads, num_quic_endpoints, + vortexor_receiver_addr: tpu_vortexor_receiver_address, }; let cluster_entrypoints = entrypoint_addrs diff --git a/vortexor-receiver/Cargo.toml b/vortexor-receiver/Cargo.toml new file mode 100644 index 00000000000000..c8843c9ab86c71 --- /dev/null +++ b/vortexor-receiver/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "solana-vortexor-receiver" +description = "Solana TPU Vortexor Receiver" +documentation = "https://docs.rs/solana-vortexor-receiver" +publish = false +version = { workspace = true } +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +edition = { workspace = true } + +[dependencies] +solana-perf = { workspace = true } +solana-streamer = { workspace = true } + +[dev-dependencies] +assert_matches = { workspace = true } +solana-streamer = { workspace = true, features = ["dev-context-only-utils"] } + +[lib] +crate-type = ["lib"] +name = "solana_vortexor_receiver" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/vortexor-receiver/Readme.md b/vortexor-receiver/Readme.md new file mode 100644 index 00000000000000..90834f86469ee3 --- /dev/null +++ b/vortexor-receiver/Readme.md @@ -0,0 +1,60 @@ +# Introduction +The Vortexor is a service that can offload the tasks of receiving transactions +from the public, performing signature verifications, and deduplications from the +core validator, enabling it to focus on processing and executing the +transactions. The verified and filtered transactions will then be forwarded to +the validators linked with the Vortexor. This setup makes the TPU transaction +ingestion and verification more scalable compared to a single-node solution. + +This module implements the VerifiedPacketReceiver in the below architecture +which encapsulates the functionality of receiving the verified packet batches +from the vortexor. In the first impelementation, we use UDP to receive the +verified packets from the vortexor. It is designed to support other protocol +option such as using QUIC. + +# Architecture +Figure 1 describes the architecture diagram of the Vortexor and its +relationship with the validator. + + +---------------------+ + | Solana | + | RPC / Web Socket | + | Service | + +---------------------+ + | + v + +--------------------- VORTEXOR ------------------------+ + | | | + | +------------------+ | + | | StakedKeyUpdater | | + | +------------------+ | + | | | + | v | + | +-------------+ +--------------------+ | + TPU --> | | TPU Streamer| -----> | SigVerifier/Dedup | | + /QUIC | +-------------+ +--------------------+ | + | | | | + | v v | + | +----------------+ +------------------------+ | + | | Subscription |<----| VerifiedPacketForwarder| | + | | Management | +------------------------+ | + | +----------------+ | | + +--------------------------------|----------------------+ + ^ | (UDP/QUIC) + Heartbeat/subscriptions | | + | v + +-------------------- AGAVE VALIDATOR ------------------+ + | | + | +----------------+ +-----------------------+ | + Config-> | | Subscription | | VerifiedPacketReceiver| | + Admin RPC | | Management | | | | + | +----------------+ +-----------------------+ | + | | | | + | | v | + | v +-----------+ | + | +--------------------+ | Banking | | + Gossip <--------|--| Gossip/Contact Info| | Stage | | + | +--------------------+ +-----------+ | + +-------------------------------------------------------+ + + Figure 1. \ No newline at end of file diff --git a/vortexor-receiver/src/lib.rs b/vortexor-receiver/src/lib.rs new file mode 100644 index 00000000000000..4c0db7eaa35a61 --- /dev/null +++ b/vortexor-receiver/src/lib.rs @@ -0,0 +1 @@ +pub mod receiver; diff --git a/vortexor-receiver/src/receiver.rs b/vortexor-receiver/src/receiver.rs new file mode 100644 index 00000000000000..4a8c3490498591 --- /dev/null +++ b/vortexor-receiver/src/receiver.rs @@ -0,0 +1,58 @@ +/// This is responsible for receiving the verified and deduplicated transactions +/// from the vortexor and sending down to the banking stage. +use { + solana_perf::{packet::PacketBatchRecycler, recycler::Recycler}, + solana_streamer::streamer::{self, PacketBatchSender, StreamerReceiveStats}, + std::{ + net::UdpSocket, + sync::{atomic::AtomicBool, Arc}, + thread::{self, JoinHandle}, + time::Duration, + }, +}; + +pub struct VerifiedPacketReceiver { + thread_hdls: Vec>, +} + +impl VerifiedPacketReceiver { + pub fn new( + sockets: Vec>, + sender: &PacketBatchSender, + coalesce: Duration, + in_vote_only_mode: Option>, + exit: Arc, + ) -> Self { + let recycler: PacketBatchRecycler = Recycler::warmed(1000, 1024); + + let tpu_stats = Arc::new(StreamerReceiveStats::new("vortexor_receiver")); + + let thread_hdls = sockets + .into_iter() + .enumerate() + .map(|(i, socket)| { + streamer::receiver( + format!("solVtxRcvr{i:02}"), + socket, + exit.clone(), + sender.clone(), + recycler.clone(), + tpu_stats.clone(), + Some(coalesce), + true, + in_vote_only_mode.clone(), + false, // is_staked_service + ) + }) + .collect(); + + Self { thread_hdls } + } + + pub fn join(self) -> thread::Result<()> { + for thread_hdl in self.thread_hdls { + thread_hdl.join()?; + } + Ok(()) + } +} diff --git a/vortexor/src/sender.rs b/vortexor/src/sender.rs index c5526cce311fea..89812ee8b7607d 100644 --- a/vortexor/src/sender.rs +++ b/vortexor/src/sender.rs @@ -83,7 +83,9 @@ impl PacketBatchSender { for batch in &packet_batches { for packet_batch in batch.iter() { for packet in packet_batch { - packets.push(packet.data(0..).unwrap()); + if let Some(data) = packet.data(0..) { + packets.push(data); + } } } }