Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ members = [
"unified-scheduler-pool",
"upload-perf",
"validator",
"verified-packet-receiver",
"version",
"vortexor",
"vote",
Expand Down Expand Up @@ -185,6 +186,7 @@ agave-precompiles = { path = "precompiles", version = "=3.0.0" }
agave-reserved-account-keys = { path = "reserved-account-keys", version = "=3.0.0" }
agave-thread-manager = { path = "thread-manager", version = "=3.0.0" }
agave-transaction-view = { path = "transaction-view", version = "=3.0.0" }
agave-verified-packet-receiver = { path = "verified-packet-receiver", version = "=3.0.0" }
agave-xdp = { path = "xdp", version = "=3.0.0" }
ahash = "0.8.11"
anyhow = "1.0.98"
Expand Down
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ frozen-abi = [
agave-banking-stage-ingress-types = { workspace = true }
agave-feature-set = { workspace = true }
agave-transaction-view = { workspace = true }
agave-verified-packet-receiver = { workspace = true }
ahash = { workspace = true }
anyhow = { workspace = true }
arrayvec = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ mod tpu_entry_notifier;
pub mod tvu;
pub mod unfrozen_gossip_verified_vote_hashes;
pub mod validator;
mod vortexor_receiver_adapter;
pub mod vote_simulator;
pub mod voting_service;
pub mod warm_quic_cache_service;
Expand Down
137 changes: 94 additions & 43 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use {
staked_nodes_updater_service::StakedNodesUpdaterService,
tpu_entry_notifier::TpuEntryNotifier,
validator::{BlockProductionMethod, GeneratorConfig, TransactionStructure},
vortexor_receiver_adapter::VortexorReceiverAdapter,
},
bytes::Bytes,
crossbeam_channel::{bounded, unbounded, Receiver},
Expand Down Expand Up @@ -79,18 +80,34 @@ pub struct TpuSockets {
pub vote_quic: Vec<UdpSocket>,
/// Client-side socket for the forwarding votes.
pub vote_forwarding_client: UdpSocket,
pub vortexor_receivers: Option<Vec<UdpSocket>>,
}

/// The `SigVerifier` enum is used to determine whether to use a local or remote signature verifier.
enum SigVerifier {
Local(SigVerifyStage),
Remote(VortexorReceiverAdapter),
}

impl SigVerifier {
fn join(self) -> thread::Result<()> {
match self {
SigVerifier::Local(sig_verify_stage) => sig_verify_stage.join(),
SigVerifier::Remote(vortexor_receiver_adapter) => vortexor_receiver_adapter.join(),
}
}
}

pub struct Tpu {
fetch_stage: FetchStage,
sigverify_stage: SigVerifyStage,
sig_verifier: SigVerifier,
vote_sigverify_stage: SigVerifyStage,
banking_stage: BankingStage,
forwarding_stage: JoinHandle<()>,
cluster_info_vote_listener: ClusterInfoVoteListener,
broadcast_stage: BroadcastStage,
tpu_quic_t: thread::JoinHandle<()>,
tpu_forwards_quic_t: thread::JoinHandle<()>,
tpu_quic_t: Option<thread::JoinHandle<()>>,
tpu_forwards_quic_t: Option<thread::JoinHandle<()>>,
tpu_entry_notifier: Option<TpuEntryNotifier>,
staked_nodes_updater_service: StakedNodesUpdaterService,
tracer_thread_hdl: TracerThread,
Expand Down Expand Up @@ -150,6 +167,7 @@ impl Tpu {
transactions_forwards_quic: transactions_forwards_quic_sockets,
vote_quic: tpu_vote_quic_sockets,
vote_forwarding_client: vote_forwarding_client_socket,
vortexor_receivers,
} = sockets;

let (packet_sender, packet_receiver) = unbounded();
Expand Down Expand Up @@ -203,47 +221,75 @@ impl Tpu {
)
.unwrap();

// Streamer for TPU
let SpawnServerResult {
endpoints: _,
thread: tpu_quic_t,
key_updater,
} = spawn_server_multi(
"solQuicTpu",
"quic_streamer_tpu",
transactions_quic_sockets,
keypair,
packet_sender,
exit.clone(),
staked_nodes.clone(),
tpu_quic_server_config,
)
.unwrap();
let (tpu_quic_t, key_updater) = if vortexor_receivers.is_none() {
// Streamer for TPU
let SpawnServerResult {
endpoints: _,
thread: tpu_quic_t,
key_updater,
} = spawn_server_multi(
"solQuicTpu",
"quic_streamer_tpu",
transactions_quic_sockets,
keypair,
packet_sender,
exit.clone(),
staked_nodes.clone(),
tpu_quic_server_config,
)
.unwrap();
(Some(tpu_quic_t), Some(key_updater))
} else {
(None, None)
};

// Streamer for TPU forward
let SpawnServerResult {
endpoints: _,
thread: tpu_forwards_quic_t,
key_updater: forwards_key_updater,
} = spawn_server_multi(
"solQuicTpuFwd",
"quic_streamer_tpu_forwards",
transactions_forwards_quic_sockets,
keypair,
forwarded_packet_sender,
exit.clone(),
staked_nodes.clone(),
tpu_fwd_quic_server_config,
)
.unwrap();
let (tpu_forwards_quic_t, forwards_key_updater) = if vortexor_receivers.is_none() {
// Streamer for TPU forward
let SpawnServerResult {
endpoints: _,
thread: tpu_forwards_quic_t,
key_updater: forwards_key_updater,
} = spawn_server_multi(
"solQuicTpuFwd",
"quic_streamer_tpu_forwards",
transactions_forwards_quic_sockets,
keypair,
forwarded_packet_sender,
exit.clone(),
staked_nodes.clone(),
tpu_fwd_quic_server_config,
)
.unwrap();
(Some(tpu_forwards_quic_t), Some(forwards_key_updater))
} else {
(None, None)
};

let (forward_stage_sender, forward_stage_receiver) = bounded(1024);
let sigverify_stage = {
let sig_verifier = if let Some(vortexor_receivers) = vortexor_receivers {
info!("starting vortexor adapter");
let sockets = vortexor_receivers.into_iter().map(Arc::new).collect();
let adapter = VortexorReceiverAdapter::new(
sockets,
Duration::from_millis(5),
tpu_coalesce,
non_vote_sender,
enable_block_production_forwarding.then(|| forward_stage_sender.clone()),
exit.clone(),
);
SigVerifier::Remote(adapter)
} else {
info!("starting regular sigverify stage");
let verifier = TransactionSigVerifier::new(
non_vote_sender,
enable_block_production_forwarding.then(|| forward_stage_sender.clone()),
);
SigVerifyStage::new(packet_receiver, verifier, "solSigVerTpu", "tpu-verifier")
SigVerifier::Local(SigVerifyStage::new(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To confirm, if we're using vortexor then we don't create a local sigverifier and thus have no way to ingest transactions should vortexor connection get interrupted right ?

I guess vortexor is opt-in as of now so this is possibly fine

Copy link
Copy Markdown
Author

@lijunwangs lijunwangs Jun 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. That is the current behavior. In the heartbeat support which I am going to submit PR for after this, we will monitor traffic and heartbeat from the vortexor. And if we decide the vortexor is down or disconnected, it can auto switch back to the local sig verifier.

packet_receiver,
verifier,
"solSigVerTpu",
"tpu-verifier",
))
};

let vote_sigverify_stage = {
Expand Down Expand Up @@ -329,14 +375,19 @@ impl Tpu {
);

let mut key_notifiers = key_notifiers.write().unwrap();
key_notifiers.add(KeyUpdaterType::Tpu, key_updater);
key_notifiers.add(KeyUpdaterType::TpuForwards, forwards_key_updater);
if let Some(key_updater) = key_updater {
key_notifiers.add(KeyUpdaterType::Tpu, key_updater);
}
if let Some(forwards_key_updater) = forwards_key_updater {
key_notifiers.add(KeyUpdaterType::TpuForwards, forwards_key_updater);
}
key_notifiers.add(KeyUpdaterType::TpuVote, vote_streamer_key_updater);

key_notifiers.add(KeyUpdaterType::Forward, client_updater);

Self {
fetch_stage,
sigverify_stage,
sig_verifier,
vote_sigverify_stage,
banking_stage,
forwarding_stage,
Expand All @@ -354,14 +405,14 @@ impl Tpu {
pub fn join(self) -> thread::Result<()> {
let results = vec![
self.fetch_stage.join(),
self.sigverify_stage.join(),
self.sig_verifier.join(),
self.vote_sigverify_stage.join(),
self.cluster_info_vote_listener.join(),
self.banking_stage.join(),
self.forwarding_stage.join(),
self.staked_nodes_updater_service.join(),
self.tpu_quic_t.join(),
self.tpu_forwards_quic_t.join(),
self.tpu_quic_t.map_or(Ok(()), |t| t.join()),
self.tpu_forwards_quic_t.map_or(Ok(()), |t| t.join()),
self.tpu_vote_quic_t.join(),
];
let broadcast_result = self.broadcast_stage.join();
Expand Down
1 change: 1 addition & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1611,6 +1611,7 @@ impl Validator {
transactions_forwards_quic: node.sockets.tpu_forwards_quic,
vote_quic: node.sockets.tpu_vote_quic,
vote_forwarding_client: node.sockets.tpu_vote_forwarding_client,
vortexor_receivers: node.sockets.vortexor_receivers,
},
&rpc_subscriptions,
transaction_status_sender,
Expand Down
Loading
Loading