From 23948c75b5a1f8c565f4f7ca1e5730e337303a50 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Thu, 18 Aug 2022 22:39:31 +0000 Subject: [PATCH 1/2] adds hash domain to ping-pong protocol (#27193) In order to maintain backward compatibility, for now the responding node will hash the token both with and without domain so that the other node will accept the response regardless of its upgrade status. Once the cluster has upgraded to the new code, we will remove the legacy domain = false case. (cherry picked from commit 6928b2a5af45bf6985b90ff116cf47f917ac1c9c) # Conflicts: # gossip/src/cluster_info.rs --- core/src/ancestor_hashes_service.rs | 15 +++++++---- core/src/serve_repair.rs | 15 +++++++---- gossip/src/cluster_info.rs | 26 ++++++++++++++++--- gossip/src/ping_pong.rs | 40 ++++++++++++++++++++++++----- 4 files changed, 76 insertions(+), 20 deletions(-) diff --git a/core/src/ancestor_hashes_service.rs b/core/src/ancestor_hashes_service.rs index 7a76146d682..38e7aac8b45 100644 --- a/core/src/ancestor_hashes_service.rs +++ b/core/src/ancestor_hashes_service.rs @@ -425,16 +425,21 @@ impl AncestorHashesService { stats.invalid_packets += 1; return None; } - if ping.verify() { - stats.ping_count += 1; - if let Ok(pong) = Pong::new(&ping, keypair) { + if !ping.verify() { + stats.ping_err_verify_count += 1; + return None; + } + stats.ping_count += 1; + // Respond both with and without domain so that the other node + // will accept the response regardless of its upgrade status. + // TODO: remove domain = false once cluster is upgraded. + for domain in [false, true] { + if let Ok(pong) = Pong::new(domain, &ping, keypair) { let pong = RepairProtocol::Pong(pong); if let Ok(pong_bytes) = serialize(&pong) { let _ignore = ancestor_socket.send_to(&pong_bytes[..], from_addr); } } - } else { - stats.ping_err_verify_count += 1; } None } diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 043585f8aa0..6ad42c63076 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -1031,11 +1031,16 @@ impl ServeRepair { } packet.meta.set_discard(true); stats.ping_count += 1; - if let Ok(pong) = Pong::new(&ping, keypair) { - let pong = RepairProtocol::Pong(pong); - if let Ok(pong_bytes) = serialize(&pong) { - let from_addr = packet.meta.socket_addr(); - pending_pongs.push((pong_bytes, from_addr)); + // Respond both with and without domain so that the other node + // will accept the response regardless of its upgrade status. + // TODO: remove domain = false once cluster is upgraded. + for domain in [false, true] { + if let Ok(pong) = Pong::new(domain, &ping, keypair) { + let pong = RepairProtocol::Pong(pong); + if let Ok(pong_bytes) = serialize(&pong) { + let from_addr = packet.meta.socket_addr(); + pending_pongs.push((pong_bytes, from_addr)); + } } } } diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 27652597d18..1dab7f02f4f 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2160,6 +2160,7 @@ impl ClusterInfo { I: IntoIterator, { let keypair = self.keypair(); +<<<<<<< HEAD let packets: Vec<_> = pings .into_iter() .filter_map(|(addr, ping)| { @@ -2175,6 +2176,21 @@ impl ClusterInfo { }) .collect(); if packets.is_empty() { +======= + let mut pongs_and_dests = Vec::new(); + for (addr, ping) in pings { + // Respond both with and without domain so that the other node will + // accept the response regardless of its upgrade status. + // TODO: remove domain = false once cluster is upgraded. + for domain in [false, true] { + if let Ok(pong) = Pong::new(domain, &ping, &keypair) { + let pong = Protocol::PongMessage(pong); + pongs_and_dests.push((addr, pong)); + } + } + } + if pongs_and_dests.is_empty() { +>>>>>>> 6928b2a5a (adds hash domain to ping-pong protocol (#27193)) None } else { let packet_batch = PacketBatch::new_unpinned_with_recycler_data( @@ -3181,7 +3197,9 @@ mod tests { let pongs: Vec<(SocketAddr, Pong)> = pings .iter() .zip(&remote_nodes) - .map(|(ping, (keypair, socket))| (*socket, Pong::new(ping, keypair).unwrap())) + .map(|(ping, (keypair, socket))| { + (*socket, Pong::new(/*domain:*/ true, ping, keypair).unwrap()) + }) .collect(); let now = now + Duration::from_millis(1); cluster_info.handle_batch_pong_messages(pongs, now); @@ -3224,7 +3242,7 @@ mod tests { .collect(); let pongs: Vec<_> = pings .iter() - .map(|ping| Pong::new(ping, &this_node).unwrap()) + .map(|ping| Pong::new(/*domain:*/ false, ping, &this_node).unwrap()) .collect(); let recycler = PacketBatchRecycler::default(); let packets = cluster_info @@ -3236,9 +3254,9 @@ mod tests { &recycler, ) .unwrap(); - assert_eq!(remote_nodes.len(), packets.len()); + assert_eq!(remote_nodes.len() * 2, packets.len()); for (packet, (_, socket), pong) in izip!( - packets.into_iter(), + packets.into_iter().step_by(2), remote_nodes.into_iter(), pongs.into_iter() ) { diff --git a/gossip/src/ping_pong.rs b/gossip/src/ping_pong.rs index 6c3a219cfdb..16961f26f18 100644 --- a/gossip/src/ping_pong.rs +++ b/gossip/src/ping_pong.rs @@ -16,6 +16,8 @@ use { }, }; +const PING_PONG_HASH_PREFIX: &[u8] = "SOLANA_PING_PONG".as_bytes(); + #[derive(AbiExample, Debug, Deserialize, Serialize)] pub struct Ping { from: Pubkey, @@ -100,8 +102,17 @@ impl Signable for Ping { } impl Pong { - pub fn new(ping: &Ping, keypair: &Keypair) -> Result { - let hash = hash::hash(&serialize(&ping.token)?); + pub fn new( + domain: bool, + ping: &Ping, + keypair: &Keypair, + ) -> Result { + let token = serialize(&ping.token)?; + let hash = if domain { + hash::hashv(&[PING_PONG_HASH_PREFIX, &token]) + } else { + hash::hash(&token) + }; let pong = Pong { from: keypair.pubkey(), hash, @@ -187,9 +198,15 @@ impl PingCache { Some(t) if now.saturating_duration_since(*t) < delay => None, _ => { let ping = pingf()?; - let hash = hash::hash(&serialize(&ping.token).ok()?); - self.pings.put(node, now); + let token = serialize(&ping.token).ok()?; + // For backward compatibility, for now responses both with and + // without domain are accepted. + // TODO: remove no domain case once cluster is upgraded. + let hash = hash::hash(&token); + self.pending_cache.put(hash, node); + let hash = hash::hashv(&[PING_PONG_HASH_PREFIX, &token]); self.pending_cache.put(hash, node); + self.pings.put(node, now); Some(ping) } } @@ -281,10 +298,18 @@ mod tests { assert!(ping.verify()); assert!(ping.sanitize().is_ok()); - let pong = Pong::new(&ping, &keypair).unwrap(); + let pong = Pong::new(/*domain:*/ false, &ping, &keypair).unwrap(); assert!(pong.verify()); assert!(pong.sanitize().is_ok()); assert_eq!(hash::hash(&ping.token), pong.hash); + + let pong = Pong::new(/*domian:*/ true, &ping, &keypair).unwrap(); + assert!(pong.verify()); + assert!(pong.sanitize().is_ok()); + assert_eq!( + hash::hashv(&[PING_PONG_HASH_PREFIX, &ping.token]), + pong.hash + ); } #[test] @@ -339,7 +364,10 @@ mod tests { assert!(ping.is_none()); } Some(ping) => { - let pong = Pong::new(ping, keypair).unwrap(); + let domain = rng.gen_ratio(1, 2); + let pong = Pong::new(domain, ping, keypair).unwrap(); + assert!(cache.add(&pong, *socket, now)); + let pong = Pong::new(!domain, ping, keypair).unwrap(); assert!(cache.add(&pong, *socket, now)); } } From 881b7e37098764def0eb4769abd885b932ebb372 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Thu, 18 Aug 2022 18:48:50 -0400 Subject: [PATCH 2/2] removes mergify merge conflicts --- gossip/src/cluster_info.rs | 29 ++++++++--------------------- 1 file changed, 8 insertions(+), 21 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 1dab7f02f4f..568054e1fc9 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2160,24 +2160,7 @@ impl ClusterInfo { I: IntoIterator, { let keypair = self.keypair(); -<<<<<<< HEAD - let packets: Vec<_> = pings - .into_iter() - .filter_map(|(addr, ping)| { - let pong = Pong::new(&ping, &keypair).ok()?; - let pong = Protocol::PongMessage(pong); - match Packet::from_data(Some(&addr), pong) { - Ok(packet) => Some(packet), - Err(err) => { - error!("failed to write pong packet: {:?}", err); - None - } - } - }) - .collect(); - if packets.is_empty() { -======= - let mut pongs_and_dests = Vec::new(); + let mut packets = Vec::new(); for (addr, ping) in pings { // Respond both with and without domain so that the other node will // accept the response regardless of its upgrade status. @@ -2185,12 +2168,16 @@ impl ClusterInfo { for domain in [false, true] { if let Ok(pong) = Pong::new(domain, &ping, &keypair) { let pong = Protocol::PongMessage(pong); - pongs_and_dests.push((addr, pong)); + match Packet::from_data(Some(&addr), pong) { + Ok(packet) => packets.push(packet), + Err(err) => { + error!("failed to write pong packet: {:?}", err); + } + } } } } - if pongs_and_dests.is_empty() { ->>>>>>> 6928b2a5a (adds hash domain to ping-pong protocol (#27193)) + if packets.is_empty() { None } else { let packet_batch = PacketBatch::new_unpinned_with_recycler_data(