Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions core/src/banking_simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ use {
log::*,
solana_clock::{Slot, DEFAULT_MS_PER_SLOT, HOLD_TRANSACTIONS_SLOT_OFFSET},
solana_genesis_config::GenesisConfig,
solana_gossip::{
cluster_info::{ClusterInfo, Node},
contact_info::ContactInfoQuery,
},
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfoQuery, node::Node},
solana_keypair::Keypair,
solana_ledger::{
blockstore::{Blockstore, PurgeType},
Expand Down
2 changes: 1 addition & 1 deletion core/src/cluster_slots_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl ClusterSlotsService {
mod test {
use {
super::*,
solana_gossip::{cluster_info::Node, crds_data::LowestSlot},
solana_gossip::{crds_data::LowestSlot, node::Node},
solana_keypair::Keypair,
solana_signer::Signer,
solana_streamer::socket::SocketAddrSpace,
Expand Down
3 changes: 2 additions & 1 deletion core/src/repair/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -914,8 +914,9 @@ mod test {
vote_simulator::VoteSimulator,
},
solana_gossip::{
cluster_info::{ClusterInfo, Node},
cluster_info::ClusterInfo,
contact_info::{ContactInfo, Protocol},
node::Node,
},
solana_hash::Hash,
solana_keypair::Keypair,
Expand Down
2 changes: 1 addition & 1 deletion core/src/repair/repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1265,7 +1265,7 @@ mod test {
use {
super::*,
crate::repair::quic_endpoint::RemoteRequest,
solana_gossip::{cluster_info::Node, contact_info::ContactInfo},
solana_gossip::{contact_info::ContactInfo, node::Node},
solana_keypair::Keypair,
solana_ledger::{
blockstore::{
Expand Down
2 changes: 1 addition & 1 deletion core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4332,7 +4332,7 @@ pub(crate) mod tests {
solana_clock::NUM_CONSECUTIVE_LEADER_SLOTS,
solana_entry::entry::{self, Entry},
solana_genesis_config as genesis_config,
solana_gossip::{cluster_info::Node, crds::Cursor},
solana_gossip::{crds::Cursor, node::Node},
solana_hash::Hash,
solana_instruction::error::InstructionError,
solana_keypair::Keypair,
Expand Down
2 changes: 1 addition & 1 deletion core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ pub mod tests {
repair::quic_endpoint::RepairQuicAsyncSenders,
},
serial_test::serial,
solana_gossip::cluster_info::{ClusterInfo, Node},
solana_gossip::{cluster_info::ClusterInfo, node::Node},
solana_keypair::Keypair,
solana_ledger::{
blockstore::BlockstoreSignals,
Expand Down
3 changes: 2 additions & 1 deletion core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ use {
},
solana_gossip::{
cluster_info::{
ClusterInfo, Node, DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
ClusterInfo, DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
DEFAULT_CONTACT_SAVE_INTERVAL_MILLIS,
},
contact_info::ContactInfo,
crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
gossip_service::GossipService,
node::Node,
},
solana_hard_forks::HardForks,
solana_hash::Hash,
Expand Down
275 changes: 6 additions & 269 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,14 @@ use {
solana_keypair::{signable::Signable, Keypair},
solana_ledger::shred::Shred,
solana_net_utils::{
bind_in_range, bind_to_unspecified, find_available_ports_in_range,
sockets::{
bind_gossip_port_in_range, bind_in_range_with_config, bind_more_with_config,
bind_to_with_config, bind_two_in_range_with_offset_and_config,
localhost_port_range_for_tests, multi_bind_in_range_with_config,
SocketConfiguration as SocketConfig,
},
PortRange, VALIDATOR_PORT_RANGE,
bind_in_range, bind_to_unspecified, sockets::bind_gossip_port_in_range, PortRange,
VALIDATOR_PORT_RANGE,
},
solana_perf::{
data_budget::DataBudget,
packet::{Packet, PacketBatch, PacketBatchRecycler, PacketRef, PinnedPacketBatch},
},
solana_pubkey::Pubkey,
solana_quic_definitions::QUIC_PORT_OFFSET,
solana_rayon_threadlimit::get_thread_count,
solana_runtime::bank_forks::BankForks,
solana_sanitize::Sanitize,
Expand All @@ -75,7 +68,6 @@ use {
solana_streamer::{
atomic_udp_socket::AtomicUdpSocket,
packet,
quic::DEFAULT_QUIC_ENDPOINTS,
socket::SocketAddrSpace,
streamer::{ChannelSend, PacketBatchReceiver},
},
Expand All @@ -90,7 +82,7 @@ use {
io::{BufReader, BufWriter, Write},
iter::repeat,
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
num::{NonZero, NonZeroUsize},
num::NonZeroUsize,
ops::{Deref, Div},
path::{Path, PathBuf},
rc::Rc,
Expand Down Expand Up @@ -2429,264 +2421,6 @@ impl AsRef<[IpAddr]> for BindIpAddrs {
}
}

#[derive(Debug)]
pub struct Node {
pub info: ContactInfo,
pub sockets: Sockets,
}

impl Node {
/// create localhost node for tests
pub fn new_localhost() -> Self {
let pubkey = solana_pubkey::new_rand();
Self::new_localhost_with_pubkey(&pubkey)
}

/// create localhost node for tests with provided pubkey
/// unlike the [new_with_external_ip], this will also bind RPC sockets.
pub fn new_localhost_with_pubkey(pubkey: &Pubkey) -> Self {
let port_range = localhost_port_range_for_tests();
let bind_ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
let config = NodeConfig {
bind_ip_addrs: BindIpAddrs {
addrs: vec![bind_ip_addr],
},
gossip_port: port_range.0,
port_range,
advertised_ip: bind_ip_addr,
public_tpu_addr: None,
public_tpu_forwards_addr: None,
num_tvu_receive_sockets: NonZero::new(1).unwrap(),
num_tvu_retransmit_sockets: NonZero::new(1).unwrap(),
num_quic_endpoints: NonZero::new(DEFAULT_QUIC_ENDPOINTS)
.expect("Number of QUIC endpoints can not be zero"),
vortexor_receiver_addr: None,
};
let mut node = Self::new_with_external_ip(pubkey, config);
let rpc_ports: [u16; 2] = find_available_ports_in_range(bind_ip_addr, port_range).unwrap();
let rpc_addr = SocketAddr::new(bind_ip_addr, rpc_ports[0]);
let rpc_pubsub_addr = SocketAddr::new(bind_ip_addr, rpc_ports[1]);
node.info.set_rpc(rpc_addr).unwrap();
node.info.set_rpc_pubsub(rpc_pubsub_addr).unwrap();
node
}

#[deprecated(since = "3.0.0", note = "use new_with_external_ip")]
pub fn new_single_bind(
pubkey: &Pubkey,
gossip_addr: &SocketAddr,
port_range: PortRange,
bind_ip_addr: IpAddr,
) -> Self {
let config = NodeConfig {
bind_ip_addrs: BindIpAddrs {
addrs: vec![bind_ip_addr],
},
gossip_port: gossip_addr.port(),
port_range,
advertised_ip: bind_ip_addr,
public_tpu_addr: None,
public_tpu_forwards_addr: None,
num_tvu_receive_sockets: NonZero::new(1).unwrap(),
num_tvu_retransmit_sockets: NonZero::new(1).unwrap(),
num_quic_endpoints: NonZero::new(DEFAULT_QUIC_ENDPOINTS)
.expect("Number of QUIC endpoints can not be zero"),
vortexor_receiver_addr: None,
};
let mut node = Self::new_with_external_ip(pubkey, config);
let rpc_ports: [u16; 2] = find_available_ports_in_range(bind_ip_addr, port_range).unwrap();
let rpc_addr = SocketAddr::new(bind_ip_addr, rpc_ports[0]);
let rpc_pubsub_addr = SocketAddr::new(bind_ip_addr, rpc_ports[1]);
node.info.set_rpc(rpc_addr).unwrap();
node.info.set_rpc_pubsub(rpc_pubsub_addr).unwrap();
node
}

pub fn new_with_external_ip(pubkey: &Pubkey, config: NodeConfig) -> Node {
let NodeConfig {
advertised_ip,
gossip_port,
port_range,
bind_ip_addrs,
public_tpu_addr,
public_tpu_forwards_addr,
num_tvu_receive_sockets,
num_tvu_retransmit_sockets,
num_quic_endpoints,
vortexor_receiver_addr,
} = config;
let bind_ip_addr = bind_ip_addrs.primary();

let gossip_addr = SocketAddr::new(advertised_ip, gossip_port);
let (gossip_port, (gossip, ip_echo)) =
bind_gossip_port_in_range(&gossip_addr, port_range, bind_ip_addr);

let socket_config = SocketConfig::default();

let (tvu_port, tvu_sockets) = multi_bind_in_range_with_config(
bind_ip_addr,
port_range,
socket_config,
num_tvu_receive_sockets.get(),
)
.expect("tvu multi_bind");
let (tvu_quic_port, tvu_quic) =
bind_in_range_with_config(bind_ip_addr, port_range, socket_config)
.expect("tvu_quic bind");

let ((tpu_port, tpu_socket), (_tpu_port_quic, tpu_quic)) =
bind_two_in_range_with_offset_and_config(
bind_ip_addr,
port_range,
QUIC_PORT_OFFSET,
socket_config,
socket_config,
)
.expect("tpu_socket primary bind");
let tpu_sockets =
bind_more_with_config(tpu_socket, 32, socket_config).expect("tpu_sockets multi_bind");

let tpu_quic = bind_more_with_config(tpu_quic, num_quic_endpoints.get(), socket_config)
.expect("tpu_quic bind");

let ((tpu_forwards_port, tpu_forwards_socket), (_, tpu_forwards_quic)) =
bind_two_in_range_with_offset_and_config(
bind_ip_addr,
port_range,
QUIC_PORT_OFFSET,
socket_config,
socket_config,
)
.expect("tpu_forwards primary bind");
let tpu_forwards_sockets = bind_more_with_config(tpu_forwards_socket, 8, socket_config)
.expect("tpu_forwards multi_bind");
let tpu_forwards_quic =
bind_more_with_config(tpu_forwards_quic, num_quic_endpoints.get(), socket_config)
.expect("tpu_forwards_quic multi_bind");

let (tpu_vote_port, tpu_vote_sockets) =
multi_bind_in_range_with_config(bind_ip_addr, port_range, socket_config, 1)
.expect("tpu_vote multi_bind");

let (tpu_vote_quic_port, tpu_vote_quic) =
bind_in_range_with_config(bind_ip_addr, port_range, socket_config)
.expect("tpu_vote_quic");
let tpu_vote_quic =
bind_more_with_config(tpu_vote_quic, num_quic_endpoints.get(), socket_config)
.expect("tpu_vote_quic multi_bind");

let (_, retransmit_sockets) = multi_bind_in_range_with_config(
bind_ip_addr,
port_range,
socket_config,
num_tvu_retransmit_sockets.get(),
)
.expect("retransmit multi_bind");

let (_, repair) = bind_in_range_with_config(bind_ip_addr, port_range, socket_config)
.expect("repair bind");
let (_, repair_quic) = bind_in_range_with_config(bind_ip_addr, port_range, socket_config)
.expect("repair_quic bind");

let (serve_repair_port, serve_repair) =
bind_in_range_with_config(bind_ip_addr, port_range, socket_config)
.expect("serve_repair");
let (serve_repair_quic_port, serve_repair_quic) =
bind_in_range_with_config(bind_ip_addr, port_range, socket_config)
.expect("serve_repair_quic");

let (_, broadcast) =
multi_bind_in_range_with_config(bind_ip_addr, port_range, socket_config, 4)
.expect("broadcast multi_bind");

let (_, ancestor_hashes_requests) =
bind_in_range_with_config(bind_ip_addr, port_range, socket_config)
.expect("ancestor_hashes_requests bind");
let (_, ancestor_hashes_requests_quic) =
bind_in_range_with_config(bind_ip_addr, port_range, socket_config)
.expect("ancestor_hashes_requests QUIC bind should succeed");

// These are client sockets, so the port is set to be 0 because it must be ephimeral.
let tpu_vote_forwarding_client =
bind_to_with_config(bind_ip_addr, 0, socket_config).unwrap();
let tpu_transaction_forwarding_client =
bind_to_with_config(bind_ip_addr, 0, socket_config).unwrap();
let quic_vote_client = bind_to_with_config(bind_ip_addr, 0, socket_config).unwrap();
let rpc_sts_client = bind_to_with_config(bind_ip_addr, 0, socket_config).unwrap();

let mut info = ContactInfo::new(
*pubkey,
timestamp(), // wallclock
0u16, // shred_version
);
use contact_info::Protocol::{QUIC, UDP};
info.set_gossip((advertised_ip, gossip_port)).unwrap();
info.set_tvu(UDP, (advertised_ip, tvu_port)).unwrap();
info.set_tvu(QUIC, (advertised_ip, tvu_quic_port)).unwrap();
info.set_tpu(public_tpu_addr.unwrap_or_else(|| SocketAddr::new(advertised_ip, tpu_port)))
.unwrap();
info.set_tpu_forwards(
public_tpu_forwards_addr
.unwrap_or_else(|| SocketAddr::new(advertised_ip, tpu_forwards_port)),
)
.unwrap();
info.set_tpu_vote(UDP, (advertised_ip, tpu_vote_port))
.unwrap();
info.set_tpu_vote(QUIC, (advertised_ip, tpu_vote_quic_port))
.unwrap();
info.set_serve_repair(UDP, (advertised_ip, serve_repair_port))
.unwrap();
info.set_serve_repair(QUIC, (advertised_ip, serve_repair_quic_port))
.unwrap();

let vortexor_receivers = vortexor_receiver_addr.map(|vortexor_receiver_addr| {
multi_bind_in_range_with_config(
vortexor_receiver_addr.ip(),
(
vortexor_receiver_addr.port(),
vortexor_receiver_addr.port() + 1,
),
socket_config,
32,
)
.unwrap_or_else(|_| {
panic!("Could not bind to the set vortexor_receiver_addr {vortexor_receiver_addr}")
})
.1
});

info!("vortexor_receivers is {vortexor_receivers:?}");
trace!("new ContactInfo: {info:?}");
let sockets = Sockets {
gossip: AtomicUdpSocket::new(gossip),
tvu: tvu_sockets,
tvu_quic,
tpu: tpu_sockets,
tpu_forwards: tpu_forwards_sockets,
tpu_vote: tpu_vote_sockets,
broadcast,
repair,
repair_quic,
retransmit_sockets,
serve_repair,
serve_repair_quic,
ip_echo: Some(ip_echo),
ancestor_hashes_requests,
ancestor_hashes_requests_quic,
tpu_quic,
tpu_forwards_quic,
tpu_vote_quic,
tpu_vote_forwarding_client,
quic_vote_client,
tpu_transaction_forwarding_client,
rpc_sts_client,
vortexor_receivers,
};
info!("Bound all network sockets as follows: {:#?}", &sockets);
Node { info, sockets }
}
}

pub fn push_messages_to_peer_for_tests(
messages: Vec<CrdsValue>,
self_id: Pubkey,
Expand Down Expand Up @@ -2869,14 +2603,17 @@ mod tests {
crds_gossip_pull::tests::MIN_NUM_BLOOM_FILTERS,
crds_value::{CrdsValue, CrdsValueLabel},
duplicate_shred::tests::new_rand_shred,
node::Node,
protocol::tests::new_rand_remote_node,
socketaddr,
},
bincode::serialize,
itertools::izip,
solana_keypair::Keypair,
solana_ledger::shred::Shredder,
solana_net_utils::sockets::localhost_port_range_for_tests,
solana_signer::Signer,
solana_streamer::quic::DEFAULT_QUIC_ENDPOINTS,
solana_vote_program::{
vote_instruction,
vote_state::{Vote, MAX_LOCKOUT_HISTORY},
Expand Down
Loading
Loading