Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ impl RepairService {
blockstore: Arc<Blockstore>,
exit: Arc<AtomicBool>,
repair_socket: Arc<UdpSocket>,
ancestor_hashes_socket: Arc<UdpSocket>,
repair_info: RepairInfo,
verified_vote_receiver: VerifiedVoteReceiver,
outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
Expand All @@ -225,11 +226,10 @@ impl RepairService {
.unwrap()
};

let ancestor_hashes_request_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").unwrap());
let ancestor_hashes_service = AncestorHashesService::new(
exit,
blockstore,
ancestor_hashes_request_socket,
ancestor_hashes_socket,
repair_info,
ancestor_hashes_replay_update_receiver,
);
Expand Down
2 changes: 2 additions & 0 deletions core/src/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ impl RetransmitStage {
cluster_info: Arc<ClusterInfo>,
retransmit_sockets: Arc<Vec<UdpSocket>>,
repair_socket: Arc<UdpSocket>,
ancestor_hashes_socket: Arc<UdpSocket>,
verified_receiver: Receiver<Vec<PacketBatch>>,
exit: Arc<AtomicBool>,
cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
Expand Down Expand Up @@ -486,6 +487,7 @@ impl RetransmitStage {
verified_receiver,
retransmit_sender,
repair_socket,
ancestor_hashes_socket,
exit,
repair_info,
leader_schedule_cache,
Expand Down
5 changes: 5 additions & 0 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ pub struct Sockets {
pub repair: UdpSocket,
pub retransmit: Vec<UdpSocket>,
pub forwards: Vec<UdpSocket>,
pub ancestor_hashes_requests: UdpSocket,
}

#[derive(Default)]
Expand Down Expand Up @@ -148,11 +149,13 @@ impl Tvu {
fetch: fetch_sockets,
retransmit: retransmit_sockets,
forwards: tvu_forward_sockets,
ancestor_hashes_requests: ancestor_hashes_socket,
} = sockets;

let (fetch_sender, fetch_receiver) = channel();

let repair_socket = Arc::new(repair_socket);
let ancestor_hashes_socket = Arc::new(ancestor_hashes_socket);
let fetch_sockets: Vec<Arc<UdpSocket>> = fetch_sockets.into_iter().map(Arc::new).collect();
let forward_sockets: Vec<Arc<UdpSocket>> =
tvu_forward_sockets.into_iter().map(Arc::new).collect();
Expand Down Expand Up @@ -187,6 +190,7 @@ impl Tvu {
cluster_info.clone(),
Arc::new(retransmit_sockets),
repair_socket,
ancestor_hashes_socket,
verified_receiver,
exit.clone(),
cluster_slots_update_receiver,
Expand Down Expand Up @@ -461,6 +465,7 @@ pub mod tests {
retransmit: target1.sockets.retransmit_sockets,
fetch: target1.sockets.tvu,
forwards: target1.sockets.tvu_forwards,
ancestor_hashes_requests: target1.sockets.ancestor_hashes_requests,
}
},
blockstore,
Expand Down
5 changes: 5 additions & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,11 @@ impl Validator {
.iter()
.map(|s| s.try_clone().expect("Failed to clone TVU forwards Sockets"))
.collect(),
ancestor_hashes_requests: node
.sockets
.ancestor_hashes_requests
.try_clone()
.expect("Failed to clone ancestor_hashes_requests socket"),
},
blockstore.clone(),
ledger_signal_receiver,
Expand Down
2 changes: 2 additions & 0 deletions core/src/window_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ impl WindowService {
verified_receiver: CrossbeamReceiver<Vec<PacketBatch>>,
retransmit_sender: Sender<Vec<Shred>>,
repair_socket: Arc<UdpSocket>,
ancestor_hashes_socket: Arc<UdpSocket>,
exit: Arc<AtomicBool>,
repair_info: RepairInfo,
leader_schedule_cache: Arc<LeaderScheduleCache>,
Expand All @@ -479,6 +480,7 @@ impl WindowService {
blockstore.clone(),
exit.clone(),
repair_socket,
ancestor_hashes_socket,
repair_info,
verified_vote_receiver,
outstanding_requests.clone(),
Expand Down
16 changes: 14 additions & 2 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ use {
};

pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000);
pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 10; // VALIDATOR_PORT_RANGE must be at least this wide
pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 11; // VALIDATOR_PORT_RANGE must be at least this wide

/// The Data plane fanout size, also used as the neighborhood size
pub const DATA_PLANE_FANOUT: usize = 200;
Expand Down Expand Up @@ -2742,6 +2742,7 @@ pub struct Sockets {
pub repair: UdpSocket,
pub retransmit_sockets: Vec<UdpSocket>,
pub serve_repair: UdpSocket,
pub ancestor_hashes_requests: UdpSocket,
}

#[derive(Debug)]
Expand Down Expand Up @@ -2775,6 +2776,8 @@ impl Node {
let broadcast = vec![UdpSocket::bind("0.0.0.0:0").unwrap()];
let retransmit_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let serve_repair = UdpSocket::bind("127.0.0.1:0").unwrap();
let ancestor_hashes_requests = UdpSocket::bind("0.0.0.0:0").unwrap();

let info = ContactInfo {
id: *pubkey,
gossip: gossip_addr,
Expand Down Expand Up @@ -2804,6 +2807,7 @@ impl Node {
repair,
retransmit_sockets: vec![retransmit_socket],
serve_repair,
ancestor_hashes_requests,
},
}
}
Expand Down Expand Up @@ -2845,6 +2849,7 @@ impl Node {
let (repair_port, repair) = Self::bind(bind_ip_addr, port_range);
let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range);
let (_, broadcast) = Self::bind(bind_ip_addr, port_range);
let (_, ancestor_hashes_requests) = Self::bind(bind_ip_addr, port_range);

let rpc_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap();
let rpc_pubsub_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap();
Expand Down Expand Up @@ -2880,6 +2885,7 @@ impl Node {
repair,
retransmit_sockets: vec![retransmit_socket],
serve_repair,
ancestor_hashes_requests,
},
}
}
Expand Down Expand Up @@ -2917,6 +2923,8 @@ impl Node {
let (_, broadcast) =
multi_bind_in_range(bind_ip_addr, port_range, 4).expect("broadcast multi_bind");

let (_, ancestor_hashes_requests) = Self::bind(bind_ip_addr, port_range);

let info = ContactInfo {
id: *pubkey,
gossip: SocketAddr::new(gossip_addr.ip(), gossip_port),
Expand Down Expand Up @@ -2948,6 +2956,7 @@ impl Node {
retransmit_sockets,
serve_repair,
ip_echo: Some(ip_echo),
ancestor_hashes_requests,
},
}
}
Expand Down Expand Up @@ -3490,7 +3499,10 @@ mod tests {
fn new_with_external_ip_test_gossip() {
// Can't use VALIDATOR_PORT_RANGE because if this test runs in parallel with others, the
// port returned by `bind_in_range()` might be snatched up before `Node::new_with_external_ip()` runs
let port_range = (VALIDATOR_PORT_RANGE.1 + 10, VALIDATOR_PORT_RANGE.1 + 20);
let port_range = (
VALIDATOR_PORT_RANGE.1 + MINIMUM_VALIDATOR_PORT_RANGE_WIDTH,
VALIDATOR_PORT_RANGE.1 + (2 * MINIMUM_VALIDATOR_PORT_RANGE_WIDTH),
);

let ip = IpAddr::V4(Ipv4Addr::from(0));
let port = bind_in_range(ip, port_range).expect("Failed to bind").0;
Expand Down