diff --git a/Cargo.lock b/Cargo.lock index 8eb926449143fd..1d4f31186dd151 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7909,7 +7909,6 @@ dependencies = [ "solana-unified-scheduler-pool", "solana-validator-exit", "solana-version", - "solana-vortexor-receiver", "solana-vote", "solana-vote-program", "solana-wen-restart", @@ -11630,15 +11629,6 @@ dependencies = [ "x509-parser", ] -[[package]] -name = "solana-vortexor-receiver" -version = "3.0.0" -dependencies = [ - "assert_matches", - "solana-perf", - "solana-streamer", -] - [[package]] name = "solana-vote" version = "3.0.0" diff --git a/Cargo.toml b/Cargo.toml index f1a0c44f4599e7..1947708cf12d02 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -135,7 +135,6 @@ members = [ "validator", "version", "vortexor", - "vortexor-receiver", "vote", "watchtower", "wen-restart", @@ -554,7 +553,6 @@ solana-unified-scheduler-logic = { path = "unified-scheduler-logic", version = " solana-unified-scheduler-pool = { path = "unified-scheduler-pool", version = "=3.0.0" } solana-validator-exit = "2.2.1" solana-version = { path = "version", version = "=3.0.0" } -solana-vortexor-receiver = { path = "vortexor-receiver", version = "=3.0.0" } solana-vote = { path = "vote", version = "=3.0.0" } solana-vote-interface = "2.2.5" solana-vote-program = { path = "programs/vote", version = "=3.0.0", default-features = false } diff --git a/core/Cargo.toml b/core/Cargo.toml index 85a0b12fcee584..7622e8a6b93cfe 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -159,7 +159,6 @@ solana-turbine = { workspace = true } solana-unified-scheduler-pool = { workspace = true } solana-validator-exit = { workspace = true } solana-version = { workspace = true } -solana-vortexor-receiver = { workspace = true } solana-vote = { workspace = true } solana-vote-program = { workspace = true } solana-wen-restart = { workspace = true } diff --git a/core/src/lib.rs b/core/src/lib.rs index 4272ee04e3b43e..5c6b6bafdbb001 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -41,7 +41,6 @@ mod tpu_entry_notifier; pub mod tvu; pub mod unfrozen_gossip_verified_vote_hashes; pub mod validator; -mod vortexor_receiver_adapter; pub mod vote_simulator; pub mod voting_service; pub mod warm_quic_cache_service; diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 4dcc7b2ec7e6cb..a6d58452bb6026 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -28,7 +28,6 @@ use { staked_nodes_updater_service::StakedNodesUpdaterService, tpu_entry_notifier::TpuEntryNotifier, validator::{BlockProductionMethod, GeneratorConfig, TransactionStructure}, - vortexor_receiver_adapter::VortexorReceiverAdapter, }, bytes::Bytes, crossbeam_channel::{bounded, unbounded, Receiver}, @@ -80,34 +79,18 @@ pub struct TpuSockets { pub vote_quic: Vec, /// Client-side socket for the forwarding votes. pub vote_forwarding_client: UdpSocket, - pub vortexor_receivers: Option>, -} - -/// The `SigVerifier` enum is used to determine whether to use a local or remote signature verifier. -enum SigVerifier { - Local(SigVerifyStage), - Remote(VortexorReceiverAdapter), -} - -impl SigVerifier { - fn join(self) -> thread::Result<()> { - match self { - SigVerifier::Local(sig_verify_stage) => sig_verify_stage.join(), - SigVerifier::Remote(vortexor_receiver_adapter) => vortexor_receiver_adapter.join(), - } - } } pub struct Tpu { fetch_stage: FetchStage, - sig_verifier: SigVerifier, + sigverify_stage: SigVerifyStage, vote_sigverify_stage: SigVerifyStage, banking_stage: BankingStage, forwarding_stage: JoinHandle<()>, cluster_info_vote_listener: ClusterInfoVoteListener, broadcast_stage: BroadcastStage, - tpu_quic_t: Option>, - tpu_forwards_quic_t: Option>, + tpu_quic_t: thread::JoinHandle<()>, + tpu_forwards_quic_t: thread::JoinHandle<()>, tpu_entry_notifier: Option, staked_nodes_updater_service: StakedNodesUpdaterService, tracer_thread_hdl: TracerThread, @@ -167,7 +150,6 @@ impl Tpu { transactions_forwards_quic: transactions_forwards_quic_sockets, vote_quic: tpu_vote_quic_sockets, vote_forwarding_client: vote_forwarding_client_socket, - vortexor_receivers, } = sockets; let (packet_sender, packet_receiver) = unbounded(); @@ -221,75 +203,47 @@ impl Tpu { ) .unwrap(); - let (tpu_quic_t, key_updater) = if vortexor_receivers.is_none() { - // Streamer for TPU - let SpawnServerResult { - endpoints: _, - thread: tpu_quic_t, - key_updater, - } = spawn_server_multi( - "solQuicTpu", - "quic_streamer_tpu", - transactions_quic_sockets, - keypair, - packet_sender, - exit.clone(), - staked_nodes.clone(), - tpu_quic_server_config, - ) - .unwrap(); - (Some(tpu_quic_t), Some(key_updater)) - } else { - (None, None) - }; + // Streamer for TPU + let SpawnServerResult { + endpoints: _, + thread: tpu_quic_t, + key_updater, + } = spawn_server_multi( + "solQuicTpu", + "quic_streamer_tpu", + transactions_quic_sockets, + keypair, + packet_sender, + exit.clone(), + staked_nodes.clone(), + tpu_quic_server_config, + ) + .unwrap(); - let (tpu_forwards_quic_t, forwards_key_updater) = if vortexor_receivers.is_none() { - // Streamer for TPU forward - let SpawnServerResult { - endpoints: _, - thread: tpu_forwards_quic_t, - key_updater: forwards_key_updater, - } = spawn_server_multi( - "solQuicTpuFwd", - "quic_streamer_tpu_forwards", - transactions_forwards_quic_sockets, - keypair, - forwarded_packet_sender, - exit.clone(), - staked_nodes.clone(), - tpu_fwd_quic_server_config, - ) - .unwrap(); - (Some(tpu_forwards_quic_t), Some(forwards_key_updater)) - } else { - (None, None) - }; + // Streamer for TPU forward + let SpawnServerResult { + endpoints: _, + thread: tpu_forwards_quic_t, + key_updater: forwards_key_updater, + } = spawn_server_multi( + "solQuicTpuFwd", + "quic_streamer_tpu_forwards", + transactions_forwards_quic_sockets, + keypair, + forwarded_packet_sender, + exit.clone(), + staked_nodes.clone(), + tpu_fwd_quic_server_config, + ) + .unwrap(); let (forward_stage_sender, forward_stage_receiver) = bounded(1024); - let sig_verifier = if let Some(vortexor_receivers) = vortexor_receivers { - info!("starting vortexor adapter"); - let sockets = vortexor_receivers.into_iter().map(Arc::new).collect(); - let adapter = VortexorReceiverAdapter::new( - sockets, - Duration::from_millis(5), - tpu_coalesce, - non_vote_sender, - enable_block_production_forwarding.then(|| forward_stage_sender.clone()), - exit.clone(), - ); - SigVerifier::Remote(adapter) - } else { - info!("starting regular sigverify stage"); + let sigverify_stage = { let verifier = TransactionSigVerifier::new( non_vote_sender, enable_block_production_forwarding.then(|| forward_stage_sender.clone()), ); - SigVerifier::Local(SigVerifyStage::new( - packet_receiver, - verifier, - "solSigVerTpu", - "tpu-verifier", - )) + SigVerifyStage::new(packet_receiver, verifier, "solSigVerTpu", "tpu-verifier") }; let vote_sigverify_stage = { @@ -375,19 +329,14 @@ impl Tpu { ); let mut key_notifiers = key_notifiers.write().unwrap(); - if let Some(key_updater) = key_updater { - key_notifiers.add(KeyUpdaterType::Tpu, key_updater); - } - if let Some(forwards_key_updater) = forwards_key_updater { - key_notifiers.add(KeyUpdaterType::TpuForwards, forwards_key_updater); - } + key_notifiers.add(KeyUpdaterType::Tpu, key_updater); + key_notifiers.add(KeyUpdaterType::TpuForwards, forwards_key_updater); key_notifiers.add(KeyUpdaterType::TpuVote, vote_streamer_key_updater); - key_notifiers.add(KeyUpdaterType::Forward, client_updater); Self { fetch_stage, - sig_verifier, + sigverify_stage, vote_sigverify_stage, banking_stage, forwarding_stage, @@ -405,14 +354,14 @@ impl Tpu { pub fn join(self) -> thread::Result<()> { let results = vec![ self.fetch_stage.join(), - self.sig_verifier.join(), + self.sigverify_stage.join(), self.vote_sigverify_stage.join(), self.cluster_info_vote_listener.join(), self.banking_stage.join(), self.forwarding_stage.join(), self.staked_nodes_updater_service.join(), - self.tpu_quic_t.map_or(Ok(()), |t| t.join()), - self.tpu_forwards_quic_t.map_or(Ok(()), |t| t.join()), + self.tpu_quic_t.join(), + self.tpu_forwards_quic_t.join(), self.tpu_vote_quic_t.join(), ]; let broadcast_result = self.broadcast_stage.join(); diff --git a/core/src/validator.rs b/core/src/validator.rs index 4f4308412eb40f..03a5bccd333255 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1611,7 +1611,6 @@ impl Validator { transactions_forwards_quic: node.sockets.tpu_forwards_quic, vote_quic: node.sockets.tpu_vote_quic, vote_forwarding_client: node.sockets.tpu_vote_forwarding_client, - vortexor_receivers: node.sockets.vortexor_receivers, }, &rpc_subscriptions, transaction_status_sender, diff --git a/core/src/vortexor_receiver_adapter.rs b/core/src/vortexor_receiver_adapter.rs deleted file mode 100644 index d6c769788d6f87..00000000000000 --- a/core/src/vortexor_receiver_adapter.rs +++ /dev/null @@ -1,131 +0,0 @@ -//! Vortexor receiver adapter which wraps the VerifiedPacketReceiver -//! to receive packet batches from the remote and sends the packets to the -//! banking stage. - -use { - crate::banking_trace::TracedSender, - agave_banking_stage_ingress_types::BankingPacketBatch, - crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, - solana_perf::packet::PacketBatch, - solana_vortexor_receiver::receiver::VerifiedPacketReceiver, - std::{ - net::UdpSocket, - sync::{atomic::AtomicBool, Arc}, - thread::{self, Builder, JoinHandle}, - time::{Duration, Instant}, - }, -}; - -#[inline] -fn send(sender: &TracedSender, batch: Arc>, count: usize) -> Result<(), String> { - match sender.send(batch) { - Ok(_) => { - trace!("Sent batch: {count} received from vortexor successfully"); - Ok(()) - } - Err(err) => Err(format!("Failed to send batch {count} down {err:?}")), - } -} - -pub struct VortexorReceiverAdapter { - thread_hdl: JoinHandle<()>, - receiver: VerifiedPacketReceiver, -} - -const MAX_PACKET_BATCH_SIZE: usize = 8; - -impl VortexorReceiverAdapter { - pub fn new( - sockets: Vec>, - recv_timeout: Duration, - tpu_coalesce: Duration, - packets_sender: TracedSender, - forward_stage_sender: Option>, - exit: Arc, - ) -> Self { - let (batch_sender, batch_receiver) = unbounded(); - - let receiver = - VerifiedPacketReceiver::new(sockets, &batch_sender, tpu_coalesce, None, exit.clone()); - - let thread_hdl = Builder::new() - .name("vtxRcvAdptr".to_string()) - .spawn(move || { - if let Err(msg) = Self::recv_send( - batch_receiver, - recv_timeout, - MAX_PACKET_BATCH_SIZE, - packets_sender, - forward_stage_sender, - ) { - info!("Quiting VortexorReceiverAdapter: {msg}"); - } - }) - .unwrap(); - Self { - thread_hdl, - receiver, - } - } - - pub fn join(self) -> thread::Result<()> { - self.thread_hdl.join()?; - self.receiver.join() - } - - fn recv_send( - packet_batch_receiver: Receiver, - recv_timeout: Duration, - batch_size: usize, - traced_sender: TracedSender, - forward_stage_sender: Option>, - ) -> Result<(), String> { - loop { - match Self::receive_until(packet_batch_receiver.clone(), recv_timeout, batch_size) { - Ok(packet_batch) => { - let count = packet_batch.len(); - // Send out packet batches - if let Some(forward_stage_sender) = &forward_stage_sender { - send(&traced_sender, packet_batch.clone(), count)?; - // Send out packet batches to forward stage - let _ = forward_stage_sender - .try_send((packet_batch, false /* reject non-vote */)); - } else { - send(&traced_sender, packet_batch, count)?; - } - } - Err(err) => match err { - RecvTimeoutError::Timeout => { - continue; - } - RecvTimeoutError::Disconnected => { - return Err("Disconnected from the input channel".to_string()); - } - }, - } - } - } - - /// Receives packet batches from VerifiedPacketReceiver with a timeout - fn receive_until( - packet_batch_receiver: Receiver, - recv_timeout: Duration, - batch_size: usize, - ) -> Result { - let start = Instant::now(); - - let message = packet_batch_receiver.recv_timeout(recv_timeout)?; - let mut packet_batches = Vec::new(); - packet_batches.push(message); - - while let Ok(message) = packet_batch_receiver.try_recv() { - packet_batches.push(message); - - if start.elapsed() >= recv_timeout || packet_batches.len() >= batch_size { - break; - } - } - - Ok(Arc::new(packet_batches)) - } -} diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index fa1a7f35ade8d0..26802ebaf9c444 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2329,7 +2329,6 @@ pub struct Sockets { pub quic_vote_client: UdpSocket, /// Client-side socket for RPC/SendTransactionService. pub rpc_sts_client: UdpSocket, - pub vortexor_receivers: Option>, } pub struct NodeConfig { @@ -2338,8 +2337,6 @@ pub struct NodeConfig { pub bind_ip_addr: IpAddr, pub public_tpu_addr: Option, pub public_tpu_forwards_addr: Option, - pub vortexor_receiver_addr: Option, - /// The number of TVU receive sockets to create pub num_tvu_receive_sockets: NonZeroUsize, /// The number of TVU retransmit sockets to create @@ -2501,7 +2498,6 @@ impl Node { tpu_transaction_forwarding_client, quic_vote_client, rpc_sts_client, - vortexor_receivers: None, }, } } @@ -2658,7 +2654,6 @@ impl Node { quic_vote_client, tpu_transaction_forwarding_client, rpc_sts_client, - vortexor_receivers: None, }, } } @@ -2673,7 +2668,6 @@ impl Node { num_tvu_receive_sockets, num_tvu_retransmit_sockets, num_quic_endpoints, - vortexor_receiver_addr, } = config; let (gossip_port, (gossip, ip_echo)) = @@ -2795,23 +2789,6 @@ impl Node { info.set_serve_repair(QUIC, (addr, serve_repair_quic_port)) .unwrap(); - let vortexor_receivers = vortexor_receiver_addr.map(|vortexor_receiver_addr| { - multi_bind_in_range_with_config( - vortexor_receiver_addr.ip(), - ( - vortexor_receiver_addr.port(), - vortexor_receiver_addr.port() + 1, - ), - socket_config_reuseport, - 32, - ) - .unwrap_or_else(|_| { - panic!("Could not bind to the set vortexor_receiver_addr {vortexor_receiver_addr}") - }) - .1 - }); - - info!("vortexor_receivers is {vortexor_receivers:?}"); trace!("new ContactInfo: {:?}", info); let sockets = Sockets { gossip, @@ -2836,7 +2813,6 @@ impl Node { quic_vote_client, tpu_transaction_forwarding_client, rpc_sts_client, - vortexor_receivers, }; info!("Bound all network sockets as follows: {:#?}", &sockets); Node { info, sockets } @@ -3315,7 +3291,6 @@ mod tests { num_tvu_receive_sockets: MINIMUM_NUM_TVU_RECEIVE_SOCKETS, num_tvu_retransmit_sockets: MINIMUM_NUM_TVU_RECEIVE_SOCKETS, num_quic_endpoints: DEFAULT_NUM_QUIC_ENDPOINTS, - vortexor_receiver_addr: None, }; let node = Node::new_with_external_ip(&solana_pubkey::new_rand(), config); @@ -3339,7 +3314,6 @@ mod tests { num_tvu_receive_sockets: MINIMUM_NUM_TVU_RECEIVE_SOCKETS, num_tvu_retransmit_sockets: MINIMUM_NUM_TVU_RECEIVE_SOCKETS, num_quic_endpoints: DEFAULT_NUM_QUIC_ENDPOINTS, - vortexor_receiver_addr: None, }; let node = Node::new_with_external_ip(&solana_pubkey::new_rand(), config); diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index a67d30919f0a83..540c9c60710a0e 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6147,7 +6147,6 @@ dependencies = [ "solana-unified-scheduler-pool", "solana-validator-exit", "solana-version", - "solana-vortexor-receiver", "solana-vote", "solana-vote-program", "solana-wen-restart", @@ -9690,14 +9689,6 @@ dependencies = [ "solana-serde-varint", ] -[[package]] -name = "solana-vortexor-receiver" -version = "3.0.0" -dependencies = [ - "solana-perf", - "solana-streamer", -] - [[package]] name = "solana-vote" version = "3.0.0" diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index 0fda3ebf0d638d..a53b2c8949b3d7 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -5994,7 +5994,6 @@ dependencies = [ "solana-unified-scheduler-pool", "solana-validator-exit", "solana-version", - "solana-vortexor-receiver", "solana-vote", "solana-vote-program", "solana-wen-restart", @@ -8789,14 +8788,6 @@ dependencies = [ "solana-serde-varint", ] -[[package]] -name = "solana-vortexor-receiver" -version = "3.0.0" -dependencies = [ - "solana-perf", - "solana-streamer", -] - [[package]] name = "solana-vote" version = "3.0.0" diff --git a/validator/src/commands/run/args.rs b/validator/src/commands/run/args.rs index 7acb31bbe667c7..720192f02dffe4 100644 --- a/validator/src/commands/run/args.rs +++ b/validator/src/commands/run/args.rs @@ -378,15 +378,6 @@ pub fn add_args<'a>(app: App<'a, 'a>, default_args: &'a DefaultArgs) -> App<'a, --entrypoint or localhostwhen --entrypoint is not provided]", ), ) - .arg( - Arg::with_name("tpu_vortexor_receiver_address") - .long("tpu-vortexor-receiver-address") - .value_name("HOST:PORT") - .takes_value(true) - .hidden(hidden_unless_forced()) - .validator(solana_net_utils::is_host_port) - .help("TPU Vortexor Receiver address to which verified transaction packet will be forwarded."), - ) .arg( Arg::with_name("public_rpc_addr") .long("public-rpc-address") diff --git a/validator/src/commands/run/execute.rs b/validator/src/commands/run/execute.rs index d74ccba9f2ef56..bed160b74d1657 100644 --- a/validator/src/commands/run/execute.rs +++ b/validator/src/commands/run/execute.rs @@ -1165,19 +1165,6 @@ pub fn execute( }) .transpose()?; - let tpu_vortexor_receiver_address = - matches - .value_of("tpu_vortexor_receiver_address") - .map(|tpu_vortexor_receiver_address| { - solana_net_utils::parse_host_port(tpu_vortexor_receiver_address).unwrap_or_else( - |err| { - eprintln!("Failed to parse --tpu-vortexor-receiver-address: {err}"); - exit(1); - }, - ) - }); - - info!("tpu_vortexor_receiver_address is {tpu_vortexor_receiver_address:?}"); let num_quic_endpoints = value_t_or_exit!(matches, "num_quic_endpoints", NonZeroUsize); let tpu_max_connections_per_peer = @@ -1204,7 +1191,6 @@ pub fn execute( num_tvu_receive_sockets: tvu_receive_threads, num_tvu_retransmit_sockets: tvu_retransmit_threads, num_quic_endpoints, - vortexor_receiver_addr: tpu_vortexor_receiver_address, }; let cluster_entrypoints = entrypoint_addrs diff --git a/vortexor-receiver/Cargo.toml b/vortexor-receiver/Cargo.toml deleted file mode 100644 index 372886b745c81c..00000000000000 --- a/vortexor-receiver/Cargo.toml +++ /dev/null @@ -1,26 +0,0 @@ -[package] -name = "solana-vortexor-receiver" -description = "Solana TPU Vortexor Receiver" -documentation = "https://docs.rs/solana-vortexor-receiver" -publish = false -version = { workspace = true } -authors = { workspace = true } -repository = { workspace = true } -homepage = { workspace = true } -license = { workspace = true } -edition = { workspace = true } - -[package.metadata.docs.rs] -targets = ["x86_64-unknown-linux-gnu"] - -[lib] -crate-type = ["lib"] -name = "solana_vortexor_receiver" - -[dependencies] -solana-perf = { workspace = true } -solana-streamer = { workspace = true } - -[dev-dependencies] -assert_matches = { workspace = true } -solana-streamer = { workspace = true, features = ["dev-context-only-utils"] } diff --git a/vortexor-receiver/Readme.md b/vortexor-receiver/Readme.md deleted file mode 100644 index 90834f86469ee3..00000000000000 --- a/vortexor-receiver/Readme.md +++ /dev/null @@ -1,60 +0,0 @@ -# Introduction -The Vortexor is a service that can offload the tasks of receiving transactions -from the public, performing signature verifications, and deduplications from the -core validator, enabling it to focus on processing and executing the -transactions. The verified and filtered transactions will then be forwarded to -the validators linked with the Vortexor. This setup makes the TPU transaction -ingestion and verification more scalable compared to a single-node solution. - -This module implements the VerifiedPacketReceiver in the below architecture -which encapsulates the functionality of receiving the verified packet batches -from the vortexor. In the first impelementation, we use UDP to receive the -verified packets from the vortexor. It is designed to support other protocol -option such as using QUIC. - -# Architecture -Figure 1 describes the architecture diagram of the Vortexor and its -relationship with the validator. - - +---------------------+ - | Solana | - | RPC / Web Socket | - | Service | - +---------------------+ - | - v - +--------------------- VORTEXOR ------------------------+ - | | | - | +------------------+ | - | | StakedKeyUpdater | | - | +------------------+ | - | | | - | v | - | +-------------+ +--------------------+ | - TPU --> | | TPU Streamer| -----> | SigVerifier/Dedup | | - /QUIC | +-------------+ +--------------------+ | - | | | | - | v v | - | +----------------+ +------------------------+ | - | | Subscription |<----| VerifiedPacketForwarder| | - | | Management | +------------------------+ | - | +----------------+ | | - +--------------------------------|----------------------+ - ^ | (UDP/QUIC) - Heartbeat/subscriptions | | - | v - +-------------------- AGAVE VALIDATOR ------------------+ - | | - | +----------------+ +-----------------------+ | - Config-> | | Subscription | | VerifiedPacketReceiver| | - Admin RPC | | Management | | | | - | +----------------+ +-----------------------+ | - | | | | - | | v | - | v +-----------+ | - | +--------------------+ | Banking | | - Gossip <--------|--| Gossip/Contact Info| | Stage | | - | +--------------------+ +-----------+ | - +-------------------------------------------------------+ - - Figure 1. \ No newline at end of file diff --git a/vortexor-receiver/src/lib.rs b/vortexor-receiver/src/lib.rs deleted file mode 100644 index 4c0db7eaa35a61..00000000000000 --- a/vortexor-receiver/src/lib.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod receiver; diff --git a/vortexor-receiver/src/receiver.rs b/vortexor-receiver/src/receiver.rs deleted file mode 100644 index 4a8c3490498591..00000000000000 --- a/vortexor-receiver/src/receiver.rs +++ /dev/null @@ -1,58 +0,0 @@ -/// This is responsible for receiving the verified and deduplicated transactions -/// from the vortexor and sending down to the banking stage. -use { - solana_perf::{packet::PacketBatchRecycler, recycler::Recycler}, - solana_streamer::streamer::{self, PacketBatchSender, StreamerReceiveStats}, - std::{ - net::UdpSocket, - sync::{atomic::AtomicBool, Arc}, - thread::{self, JoinHandle}, - time::Duration, - }, -}; - -pub struct VerifiedPacketReceiver { - thread_hdls: Vec>, -} - -impl VerifiedPacketReceiver { - pub fn new( - sockets: Vec>, - sender: &PacketBatchSender, - coalesce: Duration, - in_vote_only_mode: Option>, - exit: Arc, - ) -> Self { - let recycler: PacketBatchRecycler = Recycler::warmed(1000, 1024); - - let tpu_stats = Arc::new(StreamerReceiveStats::new("vortexor_receiver")); - - let thread_hdls = sockets - .into_iter() - .enumerate() - .map(|(i, socket)| { - streamer::receiver( - format!("solVtxRcvr{i:02}"), - socket, - exit.clone(), - sender.clone(), - recycler.clone(), - tpu_stats.clone(), - Some(coalesce), - true, - in_vote_only_mode.clone(), - false, // is_staked_service - ) - }) - .collect(); - - Self { thread_hdls } - } - - pub fn join(self) -> thread::Result<()> { - for thread_hdl in self.thread_hdls { - thread_hdl.join()?; - } - Ok(()) - } -} diff --git a/vortexor/src/sender.rs b/vortexor/src/sender.rs index 89812ee8b7607d..c5526cce311fea 100644 --- a/vortexor/src/sender.rs +++ b/vortexor/src/sender.rs @@ -83,9 +83,7 @@ impl PacketBatchSender { for batch in &packet_batches { for packet_batch in batch.iter() { for packet in packet_batch { - if let Some(data) = packet.data(0..) { - packets.push(data); - } + packets.push(packet.data(0..).unwrap()); } } }