Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ members = [
"validator",
"version",
"vortexor",
"vortexor-receiver",
"vote",
"watchtower",
"wen-restart",
Expand Down Expand Up @@ -554,7 +553,6 @@ solana-unified-scheduler-logic = { path = "unified-scheduler-logic", version = "
solana-unified-scheduler-pool = { path = "unified-scheduler-pool", version = "=3.0.0" }
solana-validator-exit = "2.2.1"
solana-version = { path = "version", version = "=3.0.0" }
solana-vortexor-receiver = { path = "vortexor-receiver", version = "=3.0.0" }
solana-vote = { path = "vote", version = "=3.0.0" }
solana-vote-interface = "2.2.5"
solana-vote-program = { path = "programs/vote", version = "=3.0.0", default-features = false }
Expand Down
1 change: 0 additions & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ solana-turbine = { workspace = true }
solana-unified-scheduler-pool = { workspace = true }
solana-validator-exit = { workspace = true }
solana-version = { workspace = true }
solana-vortexor-receiver = { workspace = true }
solana-vote = { workspace = true }
solana-vote-program = { workspace = true }
solana-wen-restart = { workspace = true }
Expand Down
1 change: 0 additions & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ mod tpu_entry_notifier;
pub mod tvu;
pub mod unfrozen_gossip_verified_vote_hashes;
pub mod validator;
mod vortexor_receiver_adapter;
pub mod vote_simulator;
pub mod voting_service;
pub mod warm_quic_cache_service;
Expand Down
137 changes: 43 additions & 94 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use {
staked_nodes_updater_service::StakedNodesUpdaterService,
tpu_entry_notifier::TpuEntryNotifier,
validator::{BlockProductionMethod, GeneratorConfig, TransactionStructure},
vortexor_receiver_adapter::VortexorReceiverAdapter,
},
bytes::Bytes,
crossbeam_channel::{bounded, unbounded, Receiver},
Expand Down Expand Up @@ -80,34 +79,18 @@ pub struct TpuSockets {
pub vote_quic: Vec<UdpSocket>,
/// Client-side socket for the forwarding votes.
pub vote_forwarding_client: UdpSocket,
pub vortexor_receivers: Option<Vec<UdpSocket>>,
}

/// The `SigVerifier` enum is used to determine whether to use a local or remote signature verifier.
enum SigVerifier {
Local(SigVerifyStage),
Remote(VortexorReceiverAdapter),
}

impl SigVerifier {
fn join(self) -> thread::Result<()> {
match self {
SigVerifier::Local(sig_verify_stage) => sig_verify_stage.join(),
SigVerifier::Remote(vortexor_receiver_adapter) => vortexor_receiver_adapter.join(),
}
}
}

pub struct Tpu {
fetch_stage: FetchStage,
sig_verifier: SigVerifier,
sigverify_stage: SigVerifyStage,
vote_sigverify_stage: SigVerifyStage,
banking_stage: BankingStage,
forwarding_stage: JoinHandle<()>,
cluster_info_vote_listener: ClusterInfoVoteListener,
broadcast_stage: BroadcastStage,
tpu_quic_t: Option<thread::JoinHandle<()>>,
tpu_forwards_quic_t: Option<thread::JoinHandle<()>>,
tpu_quic_t: thread::JoinHandle<()>,
tpu_forwards_quic_t: thread::JoinHandle<()>,
tpu_entry_notifier: Option<TpuEntryNotifier>,
staked_nodes_updater_service: StakedNodesUpdaterService,
tracer_thread_hdl: TracerThread,
Expand Down Expand Up @@ -167,7 +150,6 @@ impl Tpu {
transactions_forwards_quic: transactions_forwards_quic_sockets,
vote_quic: tpu_vote_quic_sockets,
vote_forwarding_client: vote_forwarding_client_socket,
vortexor_receivers,
} = sockets;

let (packet_sender, packet_receiver) = unbounded();
Expand Down Expand Up @@ -221,75 +203,47 @@ impl Tpu {
)
.unwrap();

let (tpu_quic_t, key_updater) = if vortexor_receivers.is_none() {
// Streamer for TPU
let SpawnServerResult {
endpoints: _,
thread: tpu_quic_t,
key_updater,
} = spawn_server_multi(
"solQuicTpu",
"quic_streamer_tpu",
transactions_quic_sockets,
keypair,
packet_sender,
exit.clone(),
staked_nodes.clone(),
tpu_quic_server_config,
)
.unwrap();
(Some(tpu_quic_t), Some(key_updater))
} else {
(None, None)
};
// Streamer for TPU
let SpawnServerResult {
endpoints: _,
thread: tpu_quic_t,
key_updater,
} = spawn_server_multi(
"solQuicTpu",
"quic_streamer_tpu",
transactions_quic_sockets,
keypair,
packet_sender,
exit.clone(),
staked_nodes.clone(),
tpu_quic_server_config,
)
.unwrap();

let (tpu_forwards_quic_t, forwards_key_updater) = if vortexor_receivers.is_none() {
// Streamer for TPU forward
let SpawnServerResult {
endpoints: _,
thread: tpu_forwards_quic_t,
key_updater: forwards_key_updater,
} = spawn_server_multi(
"solQuicTpuFwd",
"quic_streamer_tpu_forwards",
transactions_forwards_quic_sockets,
keypair,
forwarded_packet_sender,
exit.clone(),
staked_nodes.clone(),
tpu_fwd_quic_server_config,
)
.unwrap();
(Some(tpu_forwards_quic_t), Some(forwards_key_updater))
} else {
(None, None)
};
// Streamer for TPU forward
let SpawnServerResult {
endpoints: _,
thread: tpu_forwards_quic_t,
key_updater: forwards_key_updater,
} = spawn_server_multi(
"solQuicTpuFwd",
"quic_streamer_tpu_forwards",
transactions_forwards_quic_sockets,
keypair,
forwarded_packet_sender,
exit.clone(),
staked_nodes.clone(),
tpu_fwd_quic_server_config,
)
.unwrap();

let (forward_stage_sender, forward_stage_receiver) = bounded(1024);
let sig_verifier = if let Some(vortexor_receivers) = vortexor_receivers {
info!("starting vortexor adapter");
let sockets = vortexor_receivers.into_iter().map(Arc::new).collect();
let adapter = VortexorReceiverAdapter::new(
sockets,
Duration::from_millis(5),
tpu_coalesce,
non_vote_sender,
enable_block_production_forwarding.then(|| forward_stage_sender.clone()),
exit.clone(),
);
SigVerifier::Remote(adapter)
} else {
info!("starting regular sigverify stage");
let sigverify_stage = {
let verifier = TransactionSigVerifier::new(
non_vote_sender,
enable_block_production_forwarding.then(|| forward_stage_sender.clone()),
);
SigVerifier::Local(SigVerifyStage::new(
packet_receiver,
verifier,
"solSigVerTpu",
"tpu-verifier",
))
SigVerifyStage::new(packet_receiver, verifier, "solSigVerTpu", "tpu-verifier")
};

let vote_sigverify_stage = {
Expand Down Expand Up @@ -375,19 +329,14 @@ impl Tpu {
);

let mut key_notifiers = key_notifiers.write().unwrap();
if let Some(key_updater) = key_updater {
key_notifiers.add(KeyUpdaterType::Tpu, key_updater);
}
if let Some(forwards_key_updater) = forwards_key_updater {
key_notifiers.add(KeyUpdaterType::TpuForwards, forwards_key_updater);
}
key_notifiers.add(KeyUpdaterType::Tpu, key_updater);
key_notifiers.add(KeyUpdaterType::TpuForwards, forwards_key_updater);
key_notifiers.add(KeyUpdaterType::TpuVote, vote_streamer_key_updater);

key_notifiers.add(KeyUpdaterType::Forward, client_updater);

Self {
fetch_stage,
sig_verifier,
sigverify_stage,
vote_sigverify_stage,
banking_stage,
forwarding_stage,
Expand All @@ -405,14 +354,14 @@ impl Tpu {
pub fn join(self) -> thread::Result<()> {
let results = vec![
self.fetch_stage.join(),
self.sig_verifier.join(),
self.sigverify_stage.join(),
self.vote_sigverify_stage.join(),
self.cluster_info_vote_listener.join(),
self.banking_stage.join(),
self.forwarding_stage.join(),
self.staked_nodes_updater_service.join(),
self.tpu_quic_t.map_or(Ok(()), |t| t.join()),
self.tpu_forwards_quic_t.map_or(Ok(()), |t| t.join()),
self.tpu_quic_t.join(),
self.tpu_forwards_quic_t.join(),
self.tpu_vote_quic_t.join(),
];
let broadcast_result = self.broadcast_stage.join();
Expand Down
1 change: 0 additions & 1 deletion core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1611,7 +1611,6 @@ impl Validator {
transactions_forwards_quic: node.sockets.tpu_forwards_quic,
vote_quic: node.sockets.tpu_vote_quic,
vote_forwarding_client: node.sockets.tpu_vote_forwarding_client,
vortexor_receivers: node.sockets.vortexor_receivers,
},
&rpc_subscriptions,
transaction_status_sender,
Expand Down
Loading
Loading